Compute Engine
The compute engine performs point-in-time rolling aggregations — for each spine row, aggregate the entity's history within a window that respects causality (no leakage from the row's own moment forward). It is the heart of the skeleton stage's rolling-feature columns.
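To make the causality rule concrete, here is a naive pure-Python reference sketch. It is not how either engine is implemented; `pit_rolling_sum`, the `(entity, timestamp, value)` row layout, and the `include_current` flag are hypothetical names chosen for illustration, and the window is assumed to reach `window_days` back from each row's timestamp.

```python
from datetime import datetime, timedelta


def pit_rolling_sum(rows, window_days, include_current=False):
    """Naive O(n^2) reference: for each row, sum the same entity's values whose
    timestamps fall in [ts - window_days, ts), or [ts - window_days, ts] when
    include_current is set. Output order matches input order."""
    window = timedelta(days=window_days)
    out = []
    for entity, ts, _ in rows:
        lower = ts - window
        total = 0.0
        for other_entity, other_ts, other_value in rows:
            if other_entity != entity or other_ts < lower:
                continue
            # Strict "<" keeps the row's own moment (and anything later) out.
            in_window = other_ts <= ts if include_current else other_ts < ts
            if in_window:
                total += other_value
        out.append(total)
    return out


rows = [
    ("u1", datetime(2024, 1, 1), 1.0),
    ("u1", datetime(2024, 1, 3), 2.0),
    ("u2", datetime(2024, 1, 3), 10.0),
    ("u1", datetime(2024, 1, 5), 4.0),
]
strict = pit_rolling_sum(rows, window_days=3)
inclusive = pit_rolling_sum(rows, window_days=3, include_current=True)
```

With the strict window the first row of each entity sees an empty history, which is the no-leakage behaviour described above.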
aggregate_pit_table — one function, two engines
aggregate_pit_table is a plain module-level function, not a class. Two implementations live side-by-side under src/daedalus/catalog/compute/, both exposing the identical signature:
aggregate_pit_table(pit_table, aggregation_specs, arrow_udfs=None) -> pa.Table
- daedalus.catalog.compute.duckdb: DuckDB windowed list-aggregation.
- daedalus.catalog.compute.polars: Polars rolling().agg().
Both take an Arrow Table in and return an Arrow Table out, with one rolling-aggregate column per spec, in the same row order as the input.
Selecting an engine — the factory
Pick an engine explicitly via the Engine enum and the get_aggregate_pit_table() factory exported from the package. Do not rely on a magical "default" import.
from daedalus.catalog.compute import Engine, get_aggregate_pit_table
aggregate = get_aggregate_pit_table(Engine.DUCKDB)
result = aggregate(spine, specs, duckdb_config={"threads": "32"})
Engine is a str-based enum (Engine.DUCKDB == "duckdb", Engine.POLARS == "polars"), so it round-trips through YAML/JSON without a custom serializer and get_aggregate_pit_table("duckdb") works directly on a YAML-loaded value. For the training pipeline the choice lives in runtime.yaml under compute.engine (with compute.duckdb_config for per-call DuckDB settings); run_skeleton_stage builds the aggregator via run_config.compute.make_aggregator() so call sites stay engine-agnostic.
Why a factory, not a default
A package-level aggregate_pit_table re-export of the DuckDB engine is kept for catalog-internal call sites that need a fixed callable (e.g. AggregateOp), but the training driver always goes through get_aggregate_pit_table so the engine choice is config-driven and visible.
DuckDB engine design
compute/duckdb.py emits one CTE per call. The CTE computes every windowed aggregate; a postprocess SELECT then applies the list-shaping transforms.
Strict-< semantics by construction. Each spec becomes a window expression with RANGE … PRECEDING ... AND CURRENT ROW, and — when the spec does not include the current row — EXCLUDE GROUP:
agg OVER (
PARTITION BY <group_by>
ORDER BY <timestamp>
RANGE BETWEEN INTERVAL <window_days> DAY PRECEDING AND CURRENT ROW
EXCLUDE GROUP -- omits the anchor row AND every timestamp peer
) AS <spec_name>
EXCLUDE GROUP (not EXCLUDE CURRENT ROW) is the right match for Polars closed="left": it drops the anchor row and every row sharing its ORDER BY value, which is the strict-< upper bound. EXCLUDE CURRENT ROW would only drop the literal anchor, letting millisecond-timestamp peers leak in.
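The difference between the two exclusion modes only shows up at timestamp ties. The following pure-Python simulation of frame membership is a sketch for illustration only; `frame_members` is a hypothetical helper, timestamps are plain integers, and the frame is assumed to be `[ts - window, ts]` before exclusion is applied.

```python
def frame_members(rows, anchor_idx, window, exclude):
    """Return the indices of rows inside the anchor's RANGE frame
    [ts - window, ts] after applying the given exclusion mode.

    rows: list of (group, ts) tuples.
    exclude: "current_row" drops only the anchor itself;
             "group" drops the anchor and every peer with the same ts.
    """
    group, ts = rows[anchor_idx]
    members = []
    for i, (g, t) in enumerate(rows):
        if g != group or not (ts - window <= t <= ts):
            continue
        if exclude == "current_row" and i == anchor_idx:
            continue
        if exclude == "group" and t == ts:
            continue
        members.append(i)
    return members


# Rows 1 and 2 share ts=20, so they are peers of each other.
rows = [("u1", 10), ("u1", 20), ("u1", 20), ("u1", 25)]
peers_leak = frame_members(rows, anchor_idx=1, window=15, exclude="current_row")
strict_lt = frame_members(rows, anchor_idx=1, window=15, exclude="group")
```

Here `peers_leak` still contains row 2, the same-timestamp peer, while `strict_lt` contains only the genuinely earlier row.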
Bounded list path — arg_max heap. A capped list spec uses arg_max(value, (ts, value), N) — a bounded heap that returns the top-N by a composite (timestamp, value) score without sorting the full in-window membership. On multi-million-row spines that is roughly 8× faster than a full list(... ORDER BY) capped post-hoc. Scalar aggregates (count, avg, sum, max, min) use the plain aggregate function.
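The bounded-heap idea can be illustrated with Python's `heapq`, which keeps at most N candidates in memory instead of sorting the whole window. This is an analogy for the technique, not the DuckDB internals; `top_n_recent` and `top_n_full_sort` are hypothetical helpers.

```python
import heapq


def top_n_recent(window_rows, n):
    """window_rows: iterable of (ts, value) pairs.
    Return the values of the n rows with the largest (ts, value) score,
    largest first, using a heap bounded to n entries."""
    best = heapq.nlargest(n, window_rows, key=lambda r: (r[0], r[1]))
    return [value for _, value in best]


def top_n_full_sort(window_rows, n):
    """Same result via a full sort followed by a cap, for comparison."""
    ordered = sorted(window_rows, key=lambda r: (r[0], r[1]), reverse=True)
    return [value for _, value in ordered[:n]]


window_rows = [(3, "c"), (1, "a"), (3, "b"), (2, "z"), (5, "e")]
capped = top_n_recent(window_rows, 3)
```

Both helpers agree on the output; the heap version does O(m log N) work for m in-window rows, whereas the full sort does O(m log m), which matters when windows are large and N is small.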
Postprocess pass. A second SELECT applies, per spec: coalesce(x, []) (empty list for empty windows), flatten + order-preserving dedup (matching Polars' unique(maintain_order=True)), list_slice for the max_items cap, and a list_filter for exclude_self. flatten_distinct (used by set-style rollups such as like_artwork_tack_ids_*) collects the most-recent max_items inner lists via the bounded heap, then flattens + dedups + caps.
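The per-spec list shaping can be mirrored in a few lines of plain Python. This sketch assumes the transforms run in the order listed above (coalesce, flatten, dedup, cap, then the self filter); the actual ordering inside the engine may differ, and `postprocess` is a hypothetical name.

```python
def postprocess(lists, max_items=None, exclude_self=None):
    """lists: None for an empty window, else a list of inner lists (newest first)."""
    nested = lists if lists is not None else []          # coalesce(x, [])
    flat = [item for inner in nested for item in inner]  # flatten
    shaped = list(dict.fromkeys(flat))                   # dedup, first-seen order kept
    if max_items is not None:
        shaped = shaped[:max_items]                      # list_slice cap
    if exclude_self is not None:
        shaped = [x for x in shaped if x != exclude_self]  # list_filter
    return shaped


empty = postprocess(None)
shaped = postprocess(
    [["a", "b"], ["b", "c"], ["a", "d"]],
    max_items=3,
    exclude_self="b",
)
```

`dict.fromkeys` keeps the first occurrence of each item, which is the same first-seen ordering that an order-preserving unique provides.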
from daedalus.catalog.compute.duckdb import aggregate_pit_table
out = aggregate_pit_table(
pit_table,
specs,
duckdb_config={"threads": "32", "memory_limit": "256GiB"},
)
Cap threads and memory explicitly on shared pods
DuckDB derives its default thread count and memory limit from the host's /proc/cpuinfo and /proc/meminfo, which on shared-node devpods leak through the cgroup and describe the whole node rather than the pod's allocation. Pass explicit threads / memory_limit via duckdb_config when running interactively.
Polars engine design
compute/polars.py groups specs by their rolling parameters (group_by, timestamp_column, window_days, include_current) and runs a single rolling().agg() pass per group. Each pass:
- Projects only the columns it needs (group_by, timestamp, each spec's value column and condition column) before sorting, so wide list columns from prior passes aren't dragged through the sort.
- Sorts by (group_by, ts, scalar_value_cols) so the rolling tail(N) cap is deterministic at timestamp ties: two spines with the same content but different row order produce identical capped lists.
- Applies SQL IN / comparison conditions as null-masked value columns (when(mask).then(col).otherwise(None)), parsed by _parse_sql_condition.
- Uses closed="left" for strict-< (excludes the current timestamp) and closed="both" to include it, mirroring the DuckDB EXCLUDE GROUP choice.
- Runs .agg() through the lazy streaming engine (collect(engine="streaming")) so window intermediates are chunked rather than held whole, with a gc.collect() between passes to release pinned Arrow buffers.
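The spec-grouping step can be sketched without Polars. The `Spec` dataclass, its field names, and both helpers below are hypothetical; they only illustrate how specs sharing the same rolling parameters collapse into one pass and how `include_current` maps to the `closed` setting.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass(frozen=True)
class Spec:
    """Hypothetical aggregation spec with only the fields needed for grouping."""
    name: str
    group_by: str
    timestamp_column: str
    window_days: int
    include_current: bool = False


def group_by_rolling_params(specs):
    """Bucket spec names by the parameters that define a rolling pass."""
    groups = defaultdict(list)
    for spec in specs:
        key = (
            spec.group_by,
            spec.timestamp_column,
            spec.window_days,
            spec.include_current,
        )
        groups[key].append(spec.name)
    return dict(groups)


def closed_for(include_current):
    """Map the include_current flag to the rolling window's closed side."""
    return "both" if include_current else "left"


specs = [
    Spec("clicks_7d", "user_id", "ts", 7),
    Spec("views_7d", "user_id", "ts", 7),
    Spec("clicks_30d", "user_id", "ts", 30),
]
passes = group_by_rolling_params(specs)
```

Three specs become two passes here, because the two 7-day specs share a key and can be computed together.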
Arrow UDFs (operation="arrow_udf") are applied post-hoc on the collected list columns; original row order is restored via an __orig_idx tag before returning Arrow.
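Restoring row order with an index tag follows a simple pattern: tag, sort, compute, scatter back. The sketch below uses plain Python lists rather than Polars frames; `with_restored_order` and `prior_count` are hypothetical helpers, and the tag plays the role of the `__orig_idx` column.

```python
def with_restored_order(rows, sort_key, process):
    """Sort rows for processing, then return results in the input order."""
    tagged = list(enumerate(rows))                 # (orig_idx, row) pairs
    tagged.sort(key=lambda pair: sort_key(pair[1]))
    results = process([row for _, row in tagged])  # computed in sorted order
    restored = [None] * len(rows)
    for (orig_idx, _), result in zip(tagged, results):
        restored[orig_idx] = result                # scatter back by the tag
    return restored


def prior_count(sorted_rows):
    """For each (entity, ts) row, count earlier rows of the same entity."""
    seen = {}
    out = []
    for entity, _ in sorted_rows:
        out.append(seen.get(entity, 0))
        seen[entity] = seen.get(entity, 0) + 1
    return out


rows = [("u1", 9), ("u2", 5), ("u1", 2)]
restored = with_restored_order(rows, sort_key=lambda r: r, process=prior_count)
```

The first input row is the later of the two `u1` events, so it receives a count of 1 even though it is processed second in sorted order.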
Bit-for-bit equivalence
The two engines are bit-for-bit equivalent on every production spec. The DuckDB path was deliberately built to match Polars at every tie-break: lists are emitted newest-first with a value secondary sort (matching Polars' tail(N).reverse()), EXCLUDE GROUP matches closed="left", and flatten_distinct preserves first-seen (newest-first) order on both sides. The Polars engine is kept around for A/B testing and as a fallback; the DuckDB engine is the production default. Cross-engine parity is pinned by the engine tests under tests/engines/.
avg_pool_embeddings — Arrow-buffer construction
Embedding pooling (src/daedalus/catalog/udfs.py) is the canonical example of the Arrow-native, vectorized-kernel discipline (see Arrow-Native Design). avg_pool_embeddings takes a LIST(LIST(DOUBLE)) (one list-of-embeddings per row) and returns a LIST(DOUBLE) (one average embedding per row).
It builds the output Arrow ListArray directly from numpy buffers and offset arrays — pulling outer_offsets, inner_offsets, and the flat values to numpy once, pre-scanning for the common embedding dimension, then accumulating means into a pre-allocated flat output buffer and assembling the result with pa.ListArray.from_arrays(offsets, values, mask=...). This avoids the per-row .tolist() → Python-list → Arrow round-trip that otherwise dominates cost at high row counts.
import pyarrow as pa
from daedalus.catalog.udfs import avg_pool_embeddings
pooled = avg_pool_embeddings(col) # LIST(LIST(DOUBLE)) → LIST(DOUBLE)
assert pooled.type == pa.list_(pa.float64())
Null and empty rows are carried through via the output null_mask, so a row with no embeddings becomes a null entry rather than a spurious zero vector.
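The offset arithmetic behind this approach can be shown with plain Python lists standing in for the numpy buffers. This is a simplified sketch, not the code in udfs.py: `avg_pool_from_buffers` is a hypothetical name, every non-empty embedding is assumed to share one dimension, the output grows by appending rather than being pre-allocated, and `True` in the mask is assumed to mark a null row.

```python
def avg_pool_from_buffers(outer_offsets, inner_offsets, values):
    """Average-pool a LIST(LIST(DOUBLE)) column given its raw buffers.

    outer_offsets[i]:outer_offsets[i+1] selects row i's inner lists;
    inner_offsets[j]:inner_offsets[j+1] selects inner list j's flat values.
    Returns (out_offsets, out_values, null_mask) for a LIST(DOUBLE) column.
    """
    n_rows = len(outer_offsets) - 1
    # Pre-scan: take the width of the first non-empty embedding as the dimension.
    dim = 0
    for j in range(len(inner_offsets) - 1):
        width = inner_offsets[j + 1] - inner_offsets[j]
        if width > 0:
            dim = width
            break
    out_offsets = [0] * (n_rows + 1)
    null_mask = [False] * n_rows
    out_values = []
    for i in range(n_rows):
        start, stop = outer_offsets[i], outer_offsets[i + 1]
        count = stop - start
        if count == 0 or dim == 0:
            null_mask[i] = True                  # no embeddings: null, not zeros
            out_offsets[i + 1] = out_offsets[i]
            continue
        acc = [0.0] * dim
        for j in range(start, stop):
            base = inner_offsets[j]
            for k in range(dim):
                acc[k] += values[base + k]
        out_values.extend(a / count for a in acc)
        out_offsets[i + 1] = out_offsets[i] + dim
    return out_offsets, out_values, null_mask


# Rows: [[1, 2], [3, 4]], [], [[5, 6]]
offsets, pooled_values, mask = avg_pool_from_buffers(
    outer_offsets=[0, 2, 2, 3],
    inner_offsets=[0, 2, 4, 6],
    values=[1.0, 2.0, 3.0, 4.0, 5.0, 6.0],
)
```

The three returned buffers correspond to the offsets, values, and mask arguments that an Arrow list array is assembled from, so no per-row Python list is ever materialized for the output column.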