Can you clarify which aspect of the existing solution you don’t like? The Unix pipeline example you give is pretty similar to what we do with pandas method chains.
(pd.read_json(…)
    .join(pd.read_json(…))
    .assign(
        texts=…,
        embeddings=…,
    )
    .pipe(extract_something)
    .query(some_filter)
    .pipe(send_somewhere, batch=50))
(This is just thrown together, untested, on mobile, and I’m not sure exactly what your data looks like or how you’re transforming it, but that’s the general form.)
Is it the syntax you don’t like, or is your data too large to fit in memory, or something else? This can certainly be made more concise if we want to wrap things in classes, decorators, or context managers, overload the actual pipe operator, or use other syntactic sugar.
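To make the "overload the actual pipe operator" option concrete, here is a minimal sketch in plain Python. Everything in it (`Stage`, `each`, `where`) is a made-up name for illustration, not part of pandas or any other library, and the input lines are invented sample data.

```python
import json
from typing import Callable, Iterable, Iterator


class Stage:
    """Hypothetical wrapper that lets pipeline stages be chained with `|`."""

    def __init__(self, fn: Callable[[Iterable], Iterable]):
        self.fn = fn

    def __or__(self, other: "Stage") -> "Stage":
        # stage_a | stage_b -> a new stage that runs a, then b
        return Stage(lambda items: other.fn(self.fn(items)))

    def __ror__(self, items: Iterable) -> Iterator:
        # iterable | stage -> lazily run the stage over the iterable
        return iter(self.fn(items))


def each(fn: Callable) -> Stage:
    """Apply `fn` to every record."""
    return Stage(lambda items: (fn(x) for x in items))


def where(pred: Callable) -> Stage:
    """Keep only the records for which `pred` is true."""
    return Stage(lambda items: (x for x in items if pred(x)))


# Invented sample input: one JSON document per line.
lines = ['{"text": "hello world"}', '{"text": "hi"}']

result = list(
    lines
    | each(json.loads)
    | where(lambda r: len(r["text"]) > 3)
    | each(lambda r: r["text"].upper())
)
```

The stages are generators, so nothing runs until the result is consumed, which is roughly how a shell pipeline behaves. Whether this reads better than a `.pipe()` chain is a matter of taste.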
Most tools you find will be aimed at handling scale, and in the name of efficiency will therefore be in a columnar form, which is why you see a bunch of tools that do this already but only on tabular data. JSON is slow to work with beyond the initial load, so often you’d see it loaded into pandas or polars, maybe with minor transformation, then dumped to parquet files for some downstream tool to pick up and process.
These systems are also geared toward being robust. In a real-world system there are all kinds of unexpected errors to handle. What happens if the API you’re pushing to is down, or starts rate limiting you, or someone adds or renames a field, or your two data sets don’t have matching keys? Real pipelines have to account for this. Depending on your scenario you might want to retry or roll back changes, and this can get ridiculously complicated, because in API land you often have nothing analogous to the transactions and rollback that exist in SQL. Without a distributed transaction coordinator (its own complexity), streaming data will lead to distributed systems drifting out of sync, and now you need a batch job on top of your streaming data to ensure it hasn’t fallen out of sync. So in reality, unless you have heavy complexity needs, just use SQL so you have transactions, or just use batch jobs.
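For the "API is down or rate limiting you" case, the usual first step is a retry wrapper with exponential backoff. The sketch below is one way to write it, under assumptions: `TransientError` is a hypothetical exception that your own client code would raise for timeouts or rate-limit responses, and `with_retry` is an illustrative helper, not a library function.

```python
import random
import time


class TransientError(Exception):
    """Hypothetical marker for failures worth retrying (timeouts, rate limits)."""


def with_retry(fn, attempts=4, base_delay=0.5, sleep=time.sleep):
    """Wrap `fn` so TransientError is retried with exponential backoff."""

    def wrapper(*args, **kwargs):
        for attempt in range(attempts):
            try:
                return fn(*args, **kwargs)
            except TransientError:
                if attempt == attempts - 1:
                    raise  # out of retries: surface the error to the caller
                # Delay doubles on each attempt, plus jitter so that many
                # clients do not all retry at the same instant.
                sleep(base_delay * 2 ** attempt + random.uniform(0, base_delay))

    return wrapper
```

Note that this only helps when the call is safe to repeat. If the remote side applied the write before the error reached you, a retry sends it twice, which is exactly the kind of drift described above.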
That’s the reason you don’t see this record-based fire-and-forget approach: it’s too slow and lacks error handling. It’s also why everything is columnar and heavyweight instead (with DAGs, workflow orchestrators, etc.).
Maybe I’m missing your point, but your Python code could definitely be as concise as any of the other examples you gave.
Thanks for laying that out. You raise great points: Pandas (or Polars) chaining is indeed closer to what we want than raw imperative loops, and it handles a lot of the transformations. But it still assumes an in-memory, columnar approach, which doesn’t always gel with row-by-row side effects like pushing data to an external API. Also, real-world systems do require robust error handling and partial retries, which is exactly the scenario we’d love a functional pipeline to address by design, rather than patching in big try/except blocks. Tools like DAG orchestrators solve that at a workflow level, but not so much at the record/stream level.
So that’s the core frustration: yes, we can hack it in Python with a pipeline style, yes we can load data into DataFrames and transform it, but as soon as we do side-effectful calls (like collection.add(...) per record or batch), we have to drop out of the nice .pipe() flow. For me, it’s less about syntax and more about a missing standard approach to composable streams + side effects. Elixir Broadway, or even a DSL in Lisp/Clojure, addresses that more natively.
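For illustration, this is roughly the shape I mean by a side effect that stays inside the flow: a generator stage that calls a sink once per batch and then passes the records through unchanged. It is a sketch under assumptions, not an existing API; `batched`, `tap_batches`, and the sample records are invented, and `sent.append` stands in for a real call such as `collection.add(...)`.

```python
from itertools import islice
from typing import Callable, Iterable, Iterator, List


def batched(items: Iterable, size: int) -> Iterator[List]:
    """Yield lists of up to `size` records from any iterable."""
    it = iter(items)
    while True:
        batch = list(islice(it, size))
        if not batch:
            return
        yield batch


def tap_batches(items: Iterable, sink: Callable[[List], None], size: int) -> Iterator:
    """Call `sink` once per batch, then pass the records through unchanged."""
    for batch in batched(items, size):
        sink(batch)       # the side effect; stand-in for collection.add(...)
        yield from batch  # downstream stages still see every record


# Invented sample data; `sent` records what the sink received.
sent = []
records = ({"id": i, "text": f"doc {i}"} for i in range(5))
out = list(tap_batches(records, sent.append, size=2))
```

Here the sink receives batches of 2, 2, and 1 records while `out` still contains all five, so further stages can be chained after the side effect. What this sketch does not address is failure: if `sink` raises, the whole stream stops.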
“Just use batch jobs / just use SQL transactions” works only if your external system supports transactions or if you keep your data in a single database. But many people use specialized systems (e.g. vector DBs, text embeddings) that don’t have an ACID-like model. Tools like Elixir Broadway target exactly that gap for streaming: concurrency, batching, backpressure, built-in retry, etc. That’s part of the reason we’re exploring new paradigms, rather than simply reading into Pandas or writing a standard “for line in file” script.
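To show what backpressure means in practice, here is a rough Python analogy using only the standard library: a bounded queue between a producer and a few worker threads. This is not how Broadway is implemented and not a replacement for it; `run_with_backpressure` and its parameters are invented for the sketch.

```python
import queue
import threading

_DONE = object()  # sentinel telling a worker to stop


def run_with_backpressure(records, handle, workers=2, max_pending=10):
    """Feed `records` to worker threads through a bounded queue."""
    pending = queue.Queue(maxsize=max_pending)
    results, errors = [], []
    lock = threading.Lock()

    def worker():
        while True:
            item = pending.get()
            if item is _DONE:
                return
            try:
                out = handle(item)
            except Exception as exc:  # keep the worker alive on bad records
                with lock:
                    errors.append((item, exc))
            else:
                with lock:
                    results.append(out)

    threads = [threading.Thread(target=worker, daemon=True) for _ in range(workers)]
    for t in threads:
        t.start()
    for record in records:
        # put() blocks while the queue is full, so the producer can never
        # get more than `max_pending` records ahead of the workers.
        pending.put(record)
    for _ in threads:
        pending.put(_DONE)
    for t in threads:
        t.join()
    return results, errors
```

Results come back in completion order, not input order. Even this small version needs a sentinel, a lock, and per-record error capture, which is part of why a standard framework for it is attractive.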
In short, I’m not saying Pandas can’t be coerced into something close to a pipeline. It’s more that we lack a standard, built-in, composable “stream of records -> transform -> side effect -> handle errors” solution. Once you leave the purely tabular, in-memory paradigm, or need partial success handling, you quickly outgrow the typical DataFrame approach. That’s why I find frameworks like Elixir Broadway, or even an RxJS-like pipeline, so appealing: they make chunk-based side effects a first-class citizen.
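A minimal sketch of what "partial success handling" could look like at the record level, assuming nothing beyond the standard library. `Report` and `run` are hypothetical names, and `transform` and `send` are whatever callables your pipeline supplies.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Iterable, List, Tuple


@dataclass
class Report:
    """Outcome of a run: which records went through and which did not."""
    succeeded: List[Any] = field(default_factory=list)
    failed: List[Tuple[Any, Exception]] = field(default_factory=list)


def run(records: Iterable, transform: Callable, send: Callable) -> Report:
    """stream of records -> transform -> side effect -> handle errors."""
    report = Report()
    for record in records:
        try:
            send(transform(record))
        except Exception as exc:
            # Broad on purpose: one bad record must not stop the stream.
            # The original record is kept so it can be retried or inspected.
            report.failed.append((record, exc))
        else:
            report.succeeded.append(record)
    return report
```

The failed list plays the role of a dead-letter queue: the run finishes, and the caller decides afterwards whether to retry, log, or drop those records. A DataFrame chain has no natural place for that per-record outcome.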
So the question is: do we keep building one-off solutions in Python, or do we push for a more standardized, composable approach that can handle both transformations and side effects gracefully?