Arrow-Native Design
The whole stack is Arrow-backed. Arrow is the floor everything resolves through: DuckDB hands back Arrow tables, the rolling-aggregation engines take and return pa.Table, Ray pipeline intermediates flow as Arrow batches, and the post-source kernels operate on Arrow buffers directly. There is no pandas in the data path.
This is the data-format half of the platform's two load-bearing rules — the other being the no-query-optimizer rule. Both are spelled out in src/daedalus/pipelines/AGENTS.md.
The pandas-free contract
pandas is never an explicit dependency and is never imported in pipeline code. It remains in uv.lock only as a transitive dependency (via ray[data]), which is fine, but neither import pandas nor pandas DataFrames may appear anywhere in src/.
- Relational / DataFrame work that genuinely needs a frame uses Polars.
- Everything ultimately resolves through DuckDB / Arrow.
Ray intermediates are Arrow, never pandas
The unit of flow in the enrich stage is a ray.data.Dataset; actors take and return pa.Table / RecordBatch.
- Every map_batches must pass batch_format="pyarrow". Ray's default can hand you pandas frames, so the explicit Arrow format is mandatory.
- No .to_pandas() / from_pandas anywhere in the data path.
```python
import ray

# input_dir, output_dir, and enrich_fn are defined elsewhere in the pipeline.
ds = (
    ray.data.read_parquet(input_dir)
    .map_batches(enrich_fn, batch_format="pyarrow")  # actors see pa.Table
)
ds.write_parquet(output_dir)  # plan executes only at the terminal sink
```

Lazy + streaming is mandatory
Build the plan (read_parquet → map_batches → write_parquet) and let it execute only at the terminal sink. Never .materialize() or collect a full day into memory. Shard compaction streams through a bounded dataset.scanner(batch_size=…).to_batches() writer rather than a serial single-file merge, and Ray Data runs with preserve_order=True so each output shard is a contiguous chronological event_timestamp range.
The Polars rolling engine follows the same discipline on its side: its rolling().agg() runs through the lazy streaming engine (collect(engine="streaming")) so window intermediates are chunked instead of held whole (see Compute Engine).
Vectorized SIMD batch kernels only
Pool and transform whole Arrow batches with numpy buffer ops: pull the offsets and values into numpy, run segment reductions over them, and rebuild the Arrow array from the resulting buffers. No per-row Python loops, and no .as_py() / .tolist() on column data. Batch size is the unit each vectorized (SIMD-friendly) numpy call works over, so it is kept tunable.
The canonical example is avg_pool_embeddings (src/daedalus/catalog/udfs.py) and its enrich-stage sibling avg_pool_subset: they build the output Arrow ListArray directly from numpy buffers + offset arrays, avoiding the per-row .tolist() → Python-list → Arrow round-trip that dominates cost at high row counts. See the avg_pool_embeddings walkthrough.
Why Polars, DuckDB, and Arrow — and not pandas
| Need | Tool |
|---|---|
| Source-side relational query (scans, joins, windowed agg) | DuckDB over the polymorphic source |
| In-process DataFrame work that needs a frame | Polars (lazy, streaming) |
| The data interchange floor everything resolves through | Arrow (pa.Table / RecordBatch) |
| Distributed embedding enrichment | Ray Data over Arrow batches + a Lance store |
pandas appears nowhere in that table on purpose. It is eager and single-threaded for most operations. Moving data between it and Arrow (to_pandas / from_pandas) also generally copies, which is exactly the conversion cost Arrow is built to avoid.
The guard test
These rules are already satisfied repo-wide, and a guard test keeps a future edit from quietly regressing them: tests/pipelines/test_arrow_native_contract.py performs static source checks (no Ray execution):
- For every *.py in src/daedalus/pipelines/, assert it contains neither import pandas nor from pandas.
- In dynamic_feature.py, assert that no batch_format="pandas" appears and that every map_batches( call is matched by a batch_format="pyarrow", so no map_batches can silently fall back to Ray's pandas default.
```shell
uv run pytest tests/pipelines/test_arrow_native_contract.py
```
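The checks described above reduce to plain text scans over source files. The sketch below is minimal and assumes simple substring and regex matching. The helper names imports_pandas, map_batches_is_arrow_only, and check_pipelines are hypothetical, and the real test file may structure its checks differently. Using a recursive rglob over the directory is also an assumption.

```python
import re
from pathlib import Path

# Matches batch_format="pyarrow" / batch_format='pandas', tolerating spaces around "=".
PYARROW_FORMAT = re.compile(r"""batch_format\s*=\s*["']pyarrow["']""")
PANDAS_FORMAT = re.compile(r"""batch_format\s*=\s*["']pandas["']""")


def imports_pandas(source: str) -> bool:
    """True if the source text contains either pandas import form."""
    return "import pandas" in source or "from pandas" in source


def map_batches_is_arrow_only(source: str) -> bool:
    """True if nothing requests pandas and every map_batches( has a pyarrow format.

    Counting heuristic: the number of batch_format="pyarrow" occurrences must be
    at least the number of map_batches( calls.
    """
    if PANDAS_FORMAT.search(source):
        return False
    calls = source.count("map_batches(")
    return len(PYARROW_FORMAT.findall(source)) >= calls


def check_pipelines(root: Path) -> list[str]:
    """Return human-readable pandas-import violations under root (empty means clean)."""
    problems = []
    for path in sorted(root.rglob("*.py")):
        if imports_pandas(path.read_text(encoding="utf-8")):
            problems.append(f"{path}: imports pandas")
    return problems
```

In pytest these would be called from test_* functions, for example assert check_pipelines(Path("src/daedalus/pipelines")) == []. The counting approach stays cheap because it needs no Ray execution. It has a known limit: a stray batch_format="pyarrow" elsewhere in the file could mask a call that is missing one.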