
Arrow-Native Design

The whole stack is Arrow-backed. Arrow is the floor everything resolves through: DuckDB hands back Arrow tables, the rolling-aggregation engines take and return pa.Table, Ray pipeline intermediates flow as Arrow batches, and the post-source kernels operate on Arrow buffers directly. There is no pandas in the data path.

This is the data-format half of the platform's two load-bearing rules — the other being the no-query-optimizer rule. Both are spelled out in src/daedalus/pipelines/AGENTS.md.

The pandas-free contract

pandas is never an explicit dependency and is never imported in pipeline code. It remains in uv.lock only as a transitive dependency (via ray[data]), which is fine, but neither an import pandas statement nor a pandas DataFrame may appear in src/.

  • Relational / DataFrame work that genuinely needs a frame uses Polars.
  • Everything ultimately resolves through DuckDB / Arrow.

Ray intermediates are Arrow, never pandas

The unit of flow in the enrich stage is a ray.data.Dataset; actors take and return pa.Table / RecordBatch.

  • Every map_batches must pass batch_format="pyarrow". Ray's default can hand you pandas frames, so the explicit Arrow format is mandatory.
  • No .to_pandas() / from_pandas anywhere in the data path.
```python
import ray

# input_dir, output_dir, and enrich_fn are defined by the caller.
ds = (
    ray.data.read_parquet(input_dir)
    .map_batches(enrich_fn, batch_format="pyarrow")  # actors see pa.Table
)
ds.write_parquet(output_dir)  # plan executes only at the terminal sink
```

Lazy + streaming is mandatory

Build the plan (read_parquet → map_batches → write_parquet) and let it execute only at the terminal sink. Never call .materialize() or collect a full day into memory. Shard compaction streams through a bounded writer fed by dataset.scanner(batch_size=…).to_batches() rather than a serial single-file merge. Ray Data runs with preserve_order=True, so each output shard is a contiguous chronological event_timestamp range.

The Polars rolling engine follows the same discipline on its side: its rolling().agg() runs through the lazy streaming engine (collect(engine="streaming")) so window intermediates are chunked instead of held whole (see Compute Engine).

Vectorized SIMD batch kernels only

Pool and transform whole Arrow batches with NumPy buffer operations: pull the offsets and values into NumPy, do segment reductions, and rebuild the Arrow array from the resulting buffers. No per-row Python loops and no .as_py() / .tolist() on column data. Batch size is effectively the SIMD width and is kept tunable.

The canonical example is avg_pool_embeddings (src/daedalus/catalog/udfs.py) and its enrich-stage sibling avg_pool_subset: they build the output Arrow ListArray directly from numpy buffers + offset arrays, avoiding the per-row .tolist() → Python-list → Arrow round-trip that dominates cost at high row counts. See the avg_pool_embeddings walkthrough.

Why Polars, DuckDB, and Arrow — and not pandas

| Need | Tool |
| --- | --- |
| Source-side relational query (scans, joins, windowed agg) | DuckDB over the polymorphic source |
| In-process DataFrame work that needs a frame | Polars (lazy, streaming) |
| The data interchange floor everything resolves through | Arrow (pa.Table / RecordBatch) |
| Distributed embedding enrichment | Ray Data over Arrow batches + a Lance store |

pandas appears nowhere in that table on purpose: it is eager, single-threaded for most operations, and copies data at the row-oriented boundary that Arrow is built to avoid.

The guard test

These rules are already satisfied repo-wide, and a guard test keeps future edits from quietly regressing them. tests/pipelines/test_arrow_native_contract.py performs static source checks only (no Ray execution):

  • For every *.py in src/daedalus/pipelines/, assert it contains neither import pandas nor from pandas.
  • In dynamic_feature.py, assert no batch_format="pandas" appears, and that every map_batches( call is matched by a batch_format="pyarrow" — so no map_batches can silently fall back to Ray's pandas default.
```bash
uv run pytest tests/pipelines/test_arrow_native_contract.py
```
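The two static checks can be approximated with plain string matching, as in the sketch below. The helper names are hypothetical and the real test may be stricter; in particular, "matched by" is interpreted here as a simple count comparison, which is an assumption.

```python
def has_pandas_import(source: str) -> bool:
    """Hypothetical check: True if the source text mentions a pandas import."""
    return "import pandas" in source or "from pandas" in source


def map_batches_all_pyarrow(source: str) -> bool:
    """Hypothetical check: every map_batches( call has a pyarrow batch_format."""
    if 'batch_format="pandas"' in source:
        return False
    # Count-based pairing: one batch_format="pyarrow" per map_batches( call.
    return source.count("map_batches(") == source.count('batch_format="pyarrow"')


good_source = 'ds = ds.map_batches(fn, batch_format="pyarrow")\n'
bad_source = "import pandas as pd\nds = ds.map_batches(fn)\n"

good_ok = not has_pandas_import(good_source) and map_batches_all_pyarrow(good_source)
bad_ok = not has_pandas_import(bad_source) and map_batches_all_pyarrow(bad_source)
```

Checks of this kind read files as text, so they run in milliseconds and need neither Ray nor any data on disk.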