- I have structured data in JSONL files (in.jsonl, out.jsonl)
- I match lines by a key
- I transform them into (text, embedding) pairs
- I optionally filter/map them
- I batch them (into chunks of 50)
- I push each batch into an external system (e.g. vector DB, Chroma)
That’s it. It sounds trivial, but it turns into ugly imperative Python very quickly: nested for-loops, global indices, +=, manual batching, line-by-line handling, low-level JSON parsing.
Here’s what it usually looks like in Python:
```
import json

global_ids = 0
batch = []

with open("in.jsonl", "r") as fin, open("out.jsonl", "r") as fout:
    for in_line, out_line in zip(fin, fout):
        in_data = json.loads(in_line)
        out_data = json.loads(out_line)
        if in_data["custom_id"] != out_data["custom_id"]:
            raise ValueError("custom_id mismatch")
        texts = in_data["body"]["input"]
        embeddings = [d["embedding"] for d in out_data["response"]["body"]["data"]]
        for i in range(len(texts)):
            doc = texts[i]
            emb = embeddings[i]
            metadata = {
                "source": f"chunk-{global_ids}",
            }
            # ...append to the batch, flush every 50, bump global_ids by hand, and so on
```

We’re in 2025, and this is how we’re wiring data into APIs.
---
Why do we tolerate this?
This is a declarative, streaming, data processing problem. Why aren’t we using something more elegant? Something more composable, like functional pipelines?
I'm asking myself: Why don’t we have a composable, streaming, functional DSL for this kind of task?
---
Why not build it like Unix pipes?
What I want is something that feels like:
```
cat input.jsonl \
  | match output.jsonl on custom_id \
  | extract (text, embedding) \
  | filter not-empty \
  | batch 50 \
  | send-to-chroma
```
---

In Lisp / Clojure:

```
(->> (map vector input output)
     (filter (fn [[in out]] (= (:custom_id in) (:custom_id out))))
     (mapcat (fn [[in out]] (map vector (:input in) (:embedding out))))
     (partition-all 50)
     (map send-to-chroma))
```
---

In Elixir + Broadway:

```
Broadway
|> read_stream("in.jsonl", "out.jsonl")
|> match_on(:custom_id)
|> map(&{&1.text, &1.embedding})
|> batch_every(50)
|> send_to_chroma()
```
---

And now, back to Python...
We’re stuck writing imperative soup or building hacky DSLs with things like:
```
load_json_pairs() \
    | where(is_valid) \
    | select(to_embedding_record) \
    | batch(50) \
    | foreach(send_to_chroma)
```
...or, more realistically, writing thousands of lines of with open(...) as f.
And even though libraries like tf.data.Dataset, dask.bag, pandas, or pipe exist, none of them really solves this use case in a cohesive and expressive way. They all focus on tabular data, big data, or ML input pipelines – not this "structured data -> transform -> push to API" pattern.
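(To be fair, the pipe-style sketch above is only a few dozen lines of operator overloading away. Here is a throwaway, untested sketch; where/select/batch/foreach and the load_json_pairs/send_to_chroma stubs are all made up, not a real library:)

```
# A throwaway sketch of the "hacky DSL" above: a thin wrapper that overloads |
# so stages compose left to right over any iterable.
from itertools import islice

class Pipe:
    def __init__(self, fn):
        self.fn = fn
    def __ror__(self, iterable):          # iterable | Pipe(...)
        return self.fn(iterable)

def where(pred):
    return Pipe(lambda xs: (x for x in xs if pred(x)))

def select(f):
    return Pipe(lambda xs: (f(x) for x in xs))

def batch(n):
    def chunks(xs):
        it = iter(xs)
        while chunk := list(islice(it, n)):
            yield chunk
    return Pipe(chunks)

def foreach(action):
    def run(xs):
        for x in xs:
            action(x)
    return Pipe(run)

# usage (stubs assumed):
# load_json_pairs() | where(is_valid) | select(to_embedding_record) \
#     | batch(50) | foreach(send_to_chroma)
```

But that is exactly the kind of one-off, hacky DSL I'd rather not have to maintain in every project.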
---
This is especially absurd now that everyone’s doing RAG
With Retrieval-Augmented Generation (RAG) becoming the norm, we’re all parsing files, extracting embeddings, enriching metadata, batching, and inserting into vector stores.
Why are we all writing the same low-level, ad-hoc code to do this?
Shouldn’t this entire category of work be addressed by a proper DSL or framework?
Wouldn’t it make sense to build...
- a functional DSL for JSON-to-embedding-to-API pipelines?
- or a Python library with proper map, filter, batch, pipe, sink semantics?
- or even a streaming runtime like Elixir Broadway, or a minimal functional Rx-style graph?
Even R with dplyr has more elegant ways to express transformation than what we do in Python for these jobs.
---
Am I missing something?
Is there a tool, a language, or a framework out there that actually solves this well?
Or is this just one of those gaps in the tooling ecosystem that no one has filled yet?
Would love to hear what others are doing – and if anyone’s already working on a solution like this.
Thanks.
One example I find useful for a lot of use cases is Dagster. You can define resources to encapsulate complexity (https://docs.dagster.io/guides/build/external-resources), and with components you can even build on top of custom DSLs (https://docs.dagster.io/guides/labs/components).
At Magenta/Telekom we are building on https://georgheiler.com/event/magenta-data-architecture-25/ - you can follow along with this template: https://github.com/l-mds/local-data-stack/. You may find these examples useful for understanding how to use Dagster and graph-based data pipelines at scale.
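To make the "resources" idea concrete, here is a rough, untested sketch of how the pipeline from the post could look as Dagster assets with a resource wrapping the vector store (the ChromaResource class, its fields, and add_batch are made up for illustration):

```
import dagster as dg

# Hypothetical resource encapsulating the vector-store connection;
# field names and add_batch are illustrative, not a real client API.
class ChromaResource(dg.ConfigurableResource):
    host: str
    collection: str

    def add_batch(self, records: list[tuple[str, list[float]]]) -> None:
        ...  # wrap the real chromadb client here

@dg.asset
def embedding_records() -> list[tuple[str, list[float]]]:
    ...  # the match/extract logic from the original post

@dg.asset
def chroma_index(embedding_records, chroma: ChromaResource) -> None:
    # manual batching stays, but the connection details live in the resource
    for i in range(0, len(embedding_records), 50):
        chroma.add_batch(embedding_records[i : i + 50])

defs = dg.Definitions(
    assets=[embedding_records, chroma_index],
    resources={"chroma": ChromaResource(host="localhost", collection="docs")},
)
```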
I guess you can make the case that a general framework is useful, but TBH I think it either takes too long to include all use cases (and each company probably only needs one), or it becomes too complicated to be a framework.
Not to discourage you though, it's fun to build your own project and I'd love you to try it out. Worst case, no one uses it but you bag a ton of experience.
My experience with Dagster has made me appreciative of plain, simple Python code and highly skeptical of frameworks.
The longer you stick with Python the more elegant you'll find your code. It takes time and trial and error to go from being annoyed with your code to being more comfortable with it and not looking for a tool or framework to be the silver bullet.
- read from an input (source)
- perform some sort of processing
- write the data to some output (sink)
This may either be batch or continuous (stream). The inputs may change, the outputs may change.
I personally think that sql and duckdb are well positioned to do this. SQL is declarative, standardized and has decades worth of mature implementations.
The “source” can be modeled as a table.
The “sink” can also be modeled as a table.
What does a custom dsl provide over sql?
I have a side project called Sqlflow which is attempting to do something similar:
https://github.com/turbolytics/sql-flow
It’s not a DSL but the pipeline is standardized using the source, process, sink stages. Right now the process is pure sql but the source and sink are declarative. SQL has so much prior art, linters and a huge ecosystem with many practitioners.
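For concreteness, here is a rough, untested sketch of the "source as a table" idea using DuckDB's JSONL reader from Python (the field paths custom_id / body.input / response.body.data are assumed from the post above, not a tested schema):

```
import duckdb

con = duckdb.connect()

# Both JSONL files become tables; matching on the key is just a JOIN.
rows = con.execute("""
    SELECT i.body.input         AS texts,
           o.response.body.data AS data
    FROM read_json_auto('in.jsonl')  AS i
    JOIN read_json_auto('out.jsonl') AS o USING (custom_id)
""").fetchall()

# Per-row extraction into (text, embedding) pairs, ready to batch into a sink.
pairs = [
    (text, item["embedding"])
    for texts, data in rows
    for text, item in zip(texts, data)
]
```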
You provided a Clojure alternative, but without using the precise feature designed to alleviate this problem: transducers[0]. Transducers are the combination of functional composition and an indirection designed to decouple the data transformation pipeline from any specific source (e.g. async vs in-memory doesn't matter).
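For readers who don't know Clojure, the same idea looks roughly like this in Python: each "transducer" wraps a reducing function, so the composed pipeline is decoupled from any particular source (a rough sketch, not a library):

```
from functools import reduce

def mapping(f):
    # wrap a reducing step so each element is transformed before it is reduced
    return lambda step: lambda acc, x: step(acc, f(x))

def filtering(pred):
    # wrap a reducing step so non-matching elements are skipped
    return lambda step: lambda acc, x: step(acc, x) if pred(x) else acc

def compose(*xforms):
    # compose transducers so data flows through them left to right
    return lambda step: reduce(lambda s, xf: xf(s), reversed(xforms), step)

def transduce(xform, step, init, source):
    return reduce(xform(step), source, init)

# The pipeline is defined once, with no reference to any particular source...
xform = compose(
    filtering(lambda rec: rec["embedding"]),               # drop empty matches
    mapping(lambda rec: (rec["text"], rec["embedding"])),  # to (text, embedding)
)

# ...and then runs over an in-memory list, a lazy generator over two JSONL
# files, or anything else iterable.
records = [{"text": "a", "embedding": [0.1]}, {"text": "b", "embedding": []}]
pairs = transduce(xform, lambda acc, x: acc + [x], [], records)  # [('a', [0.1])]
```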
Sometimes I wrote small frameworks on my own, sometimes I used 3rd party tools. I also tried to create a universal framework for that, trying to solve exactly what you described. Never finished it, because I am more of the no-finishing guy.
However, at some point I came to the conclusion that it's simply not possible. Which is nonsense, and more of an excuse: as you said, it's just data in and data out, processed through some simple rules.
So I'd know the concept, but there are a million other projects in my head.
To get to a higher level of abstraction than the general purpose languages you have to get more specific, maybe someone just hasn’t done it yet for your use case.
```
(pd.read_json(…)
 .join(pd.read_json(…))
 .assign(
     texts=…,
     embeddings=…,
 )
 .pipe(extract_something)
 .query(some_filter)
 .pipe(send_somewhere, batch=50))
```

(This is just thrown together, untested, on mobile, and I’m not sure what your data looks like exactly, or how you’re transforming it, but that’s the general form.)

Is it the syntax you don’t like, or is your data too large to fit in memory, or what aspect? This can certainly be made more concise if desired, if we want to wrap things in classes, decorators, context managers, overload the actual pipe operator, or other syntactic sugar mechanisms.
Most tools you find will be aimed at handling scale, and in the name of efficiency will therefore be in a columnar form, which is why you see a bunch of tools that do this already but only on tabular data. JSON is slow to work with beyond the initial load, so often you’d see it loaded into pandas or polars, maybe with minor transformation, then dumped to parquet files for some downstream tool to pick up and process.
These systems are also geared toward being robust. In a real-world system there are all kinds of unexpected errors to handle. What happens if the API you’re pushing to is down, or starts rate limiting you, or someone added or renamed a field, or your two data sets don’t have matching keys? Real pipelines have to account for this, and depending on your scenario you might want to retry, or roll back changes. This can get ridiculously complicated, because in API land you often don’t have any analogous concept of transactions and rollback like you have in SQL, so without a distributed transaction coordinator (its own complexity), streaming data will lead to distributed systems drifting out of sync. And now you need a batch job on top of your streaming data to ensure it hasn’t fallen out of sync (so in reality, unless you have heavy complexity needs, just use SQL so you have transactions, or just use batch jobs).
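As a small illustration of how much of that "boring" pipeline code ends up being about failure handling rather than the transform itself, here is a minimal retry-with-backoff wrapper (stdlib only; the attempt counts, delays, and the send_batch stub are arbitrary):

```
import random
import time

def send_with_retry(send_batch, batch, attempts=5, base_delay=1.0):
    """Call send_batch(batch), retrying with exponential backoff and jitter."""
    for attempt in range(attempts):
        try:
            return send_batch(batch)
        except Exception:
            if attempt == attempts - 1:
                raise
            # exponential backoff with jitter: ~1s, ~2s, ~4s, ...
            time.sleep(base_delay * 2 ** attempt + random.random())
```

And even that only covers transient failures; it says nothing about partial writes or replaying from a checkpoint, which is where the heavyweight tooling comes from.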
But that’s the reason you don’t see this record-based fire-and-forget approach, because it’s too slow and lacks error handling, and why instead everything is columnar and heavyweight (with DAGs and workflow orchestrators etc).
Maybe I’m missing your point, but your Python code could definitely be as concise as any of the other examples you gave.