Operators & Optimization
The training pipeline is one DAG of engine-tagged operators. Every column a feature service declares is produced by an operator, and every operator carries a declared engine — never one inferred by an optimizer. This page lists the operator vocabulary, walks the pipeline end to end as operators, and explains how optimization happens at the sub-operator level.
The logical model lives in src/daedalus/pipelines/operators.py; the compiler that builds the DAG in compiler.py; the executor that runs it in executor.py. See Operator Pipeline for the compiler / executor architecture.
The operator vocabulary
Two execution layers, nine op kinds. The Engine column shows the default tag the compiler emits. The author can override it in the generated pipeline YAML with any engine in the set returned by that op kind's can_run_on().
| Operator | op_kind | Engine | Role |
|---|---|---|---|
| Scan | scan | sql | Read a source relation (feed time-series or a feature view); carries source_ref, projected columns, and an optional pushdown predicate. |
| Filter | filter | sql | Row filter: a SQL predicate over the input. |
| Project | project | sql | Select / rename / compute scalar expressions (curation, age columns, output ordering + default-fill). |
| Join | join | sql | Plain cross-table equi-join (the non-temporal path); inputs = (left, right), on, how. |
| PointInTimeJoin | pit_join | sql | Temporal-causal ASOF join with strict <: the most recent right rows with right.ts < left.timestamp_column. |
| RollingAggregate | rolling_aggregate | sql | Windowed point-in-time aggregation wrapping aggregate_pit_table; carries the spec list, timestamp_column, and rolling_engine. |
| ArrowUdfTransform | arrow_udf_transform | sql_arrow_udf | Vectorized Arrow UDF over a materialized column (e.g. flatten_distinct, avg-pool). |
| RayUdfTransform | ray_udf_transform | pythonic | Distributed Ray/Lance embedding compute; pinned to pythonic. |
| Sink | sink | sql | Terminal write: day-partitioned, ordered, atomic parquet. |
RollingAggregate.can_run_on() returns {sql, sql_arrow_udf, pythonic} (its flatten_distinct specs are split onto sql_arrow_udf); ArrowUdfTransform runs on sql_arrow_udf or pythonic; RayUdfTransform pins itself to pythonic.
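Here is a minimal sketch of how an engine override could be checked against these sets. The Engine enum and CAN_RUN_ON table are hypothetical and are not the operators.py API. Only the three op kinds whose sets are stated above are listed, and the sketch makes no claim about the others.

```python
from __future__ import annotations

from enum import Enum


class Engine(str, Enum):
    SQL = "sql"
    SQL_ARROW_UDF = "sql_arrow_udf"
    PYTHONIC = "pythonic"


# Hypothetical mirror of the can_run_on() sets stated above. Only the three op
# kinds whose sets this page spells out are listed; the rest are omitted.
CAN_RUN_ON: dict[str, frozenset[Engine]] = {
    "rolling_aggregate": frozenset({Engine.SQL, Engine.SQL_ARROW_UDF, Engine.PYTHONIC}),
    "arrow_udf_transform": frozenset({Engine.SQL_ARROW_UDF, Engine.PYTHONIC}),
    "ray_udf_transform": frozenset({Engine.PYTHONIC}),
}


def resolve_engine(op_kind: str, default: Engine, override: str | None = None) -> Engine:
    """Return the YAML override when the op kind allows it, else the compiler default."""
    if override is None:
        return default
    requested = Engine(override)  # ValueError for an unknown engine name
    allowed = CAN_RUN_ON[op_kind]  # KeyError for an op kind not covered by this sketch
    if requested not in allowed:
        names = sorted(e.value for e in allowed)
        raise ValueError(f"{op_kind} cannot run on {requested.value}; allowed: {names}")
    return requested
```

Rejecting an invalid override up front keeps the declared engine authoritative, in line with never letting an optimizer infer one.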
The pipeline, end to end, as operators
The compiler emits this topologically ordered DAG (compile_service_to_pipeline). Walk it operator by operator.
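The sketch below makes "topologically ordered" concrete by ordering a hypothetical service's operators with Python's standard-library graphlib. The operator ids and edges are illustrative assumptions and are not compile_service_to_pipeline output.

```python
from graphlib import TopologicalSorter

# Hypothetical operator ids for a small service. Each entry maps an operator to
# the operators whose output it consumes. The wiring is illustrative only.
dag = {
    "scan_feed_events": set(),
    "scan_user_profile": set(),
    "join_user_profile": {"scan_feed_events", "scan_user_profile"},
    "filter_sessions": {"join_user_profile"},
    "rolling_like_artwork_ids": {"filter_sessions"},
    "scan_dislike_history": set(),
    "pit_dislike_artwork_ids": {"filter_sessions", "scan_dislike_history"},
    "project_assembly": {"filter_sessions", "rolling_like_artwork_ids", "pit_dislike_artwork_ids"},
    "ray_image_embedding": {"project_assembly"},
    "sink": {"ray_image_embedding"},
}

# static_order() yields every node after all of its predecessors and raises
# graphlib.CycleError if the graph is not acyclic.
order = list(TopologicalSorter(dag).static_order())
print(order)
```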
Source columns — Scan / Filter / Project / Join
The spine starts at a Scan of the event time-series view (feed_events). Each referenced feature view becomes its own Scan plus a cross-table Join (_view_scans_and_joins), LEFT-joining that view's static columns (author_id, model_id, tack_ids, user-profile columns, …) onto the spine on the canonical entity key. Filter applies row predicates (e.g. embedding-coverage and session filters), and Project computes scalar expressions.
These all run on the sql engine: the compiler composes them into one source-optimized DuckDB query per day, handed to the source via source_factory.build_source (DuckLake is attached READ_ONLY).
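Folding Scan, Join, Filter, and Project into a single per-day query can be sketched as plain string building. ViewJoin, compose_day_query, the dt and session_ok columns, and the demo tables are all hypothetical. sqlite3 stands in for DuckDB only so the example runs with the standard library. The real compiler's SQL is not shown here.

```python
import sqlite3
from dataclasses import dataclass


@dataclass
class ViewJoin:
    view: str             # feature-view relation (hypothetical name in the demo)
    columns: list[str]    # static columns to LEFT-join onto the spine
    key: str = "user_id"  # canonical entity key (assumed name)


def compose_day_query(spine, spine_cols, joins, predicates):
    """Fold Scan + Join + Filter + Project into one SQL string with a `?` day parameter."""
    select = [f"s.{c}" for c in spine_cols]            # Project
    from_parts = [f"FROM {spine} AS s"]                # Scan of the spine
    for i, j in enumerate(joins):                      # one Scan + LEFT Join per view
        alias = f"v{i}"
        select += [f"{alias}.{c}" for c in j.columns]
        from_parts.append(f"LEFT JOIN {j.view} AS {alias} ON s.{j.key} = {alias}.{j.key}")
    where = ["s.dt = ?"] + list(predicates)            # day partition + Filter predicates
    return (f"SELECT {', '.join(select)}\n" + "\n".join(from_parts)
            + "\nWHERE " + " AND ".join(where))


conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE feed_events (user_id INTEGER, artwork_id INTEGER, dt TEXT, session_ok INTEGER);
CREATE TABLE user_profile (user_id INTEGER, country TEXT);
INSERT INTO feed_events VALUES
  (1, 10, '2024-05-01', 1), (2, 11, '2024-05-01', 1),
  (1, 12, '2024-05-02', 1), (3, 13, '2024-05-01', 0);
INSERT INTO user_profile VALUES (1, 'JP');
""")
sql = compose_day_query("feed_events", ["user_id", "artwork_id"],
                        [ViewJoin("user_profile", ["country"])], ["s.session_ok = 1"])
rows = conn.execute(sql, ("2024-05-01",)).fetchall()  # one query per day
print(sql)
```

Passing the day as a bound parameter keeps a single query text reusable across days.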
Point-in-time rollups — RollingAggregate
RollingAggregate produces the windowed sequence features — like_artwork_ids_*, view_artwork_ids_*, click_artwork_ids_*, the count rollups, and the set-style rollups like like_artwork_tack_ids_*. For each spine row it aggregates the user's prior history within the window. It wraps the proven aggregate_pit_table engine; rolling_engine records the DuckDB-vs-Polars choice for the rolling computation (see Compute Engine).
flatten_distinct specs (set-style rollups) are split into a second RollingAggregate tagged sql_arrow_udf, so the list-flatten kernel runs as a vectorized Arrow UDF after the SQL rollup rather than inside the window.
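Here is a pure-Python reference for the two stages. It assumes (user_id, ts, value) history tuples and an inclusive lower window bound, and both are assumptions. rolling_collect and flatten_distinct are hypothetical stand-ins, not aggregate_pit_table or the real Arrow kernel. The count rollups would be len() of each collected list.

```python
from bisect import bisect_left
from collections import defaultdict


def rolling_collect(spine, history, window):
    """For each (user_id, ts) spine row, collect the user's history values with
    ts - window <= event_ts < ts (the inclusive lower bound is an assumption)."""
    events = defaultdict(list)
    for user_id, event_ts, value in sorted(history, key=lambda r: r[1]):
        events[user_id].append((event_ts, value))
    times = {u: [t for t, _ in evs] for u, evs in events.items()}
    out = []
    for user_id, ts in spine:
        evs, ts_list = events.get(user_id, []), times.get(user_id, [])
        lo = bisect_left(ts_list, ts - window)
        hi = bisect_left(ts_list, ts)  # strict <: an event at exactly ts is excluded
        out.append([v for _, v in evs[lo:hi]])
    return out


def flatten_distinct(nested):
    """Post-rollup kernel: flatten a list of lists, keep first-seen order, drop repeats."""
    seen, flat = set(), []
    for inner in nested:
        for x in inner:
            if x not in seen:
                seen.add(x)
                flat.append(x)
    return flat


spine = [(1, 100), (1, 105), (2, 100)]
history = [(1, 90, ["a", "b"]), (1, 100, ["c"]), (1, 98, ["b", "d"]), (2, 50, ["x"])]
rolled = rolling_collect(spine, history, window=20)  # stage 1: windowed rollup
tack_sets = [flatten_distinct(r) for r in rolled]     # stage 2: flatten kernel after the rollup
```

Keeping the flatten out of the window means the windowed stage only collects, and the set logic runs once per output row.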
Temporal lookups — PointInTimeJoin
Cumulative / lookback columns that are lookups rather than windowed rollups become a history Scan plus a PointInTimeJoin (_pit_lookup_ops):
- dislike_artwork_ids: cumulative strict-< dislike history over the whole feed, keyed on user_id.
- user_recent_published_artworks: artwork_properties where author_id == spine user_id, strict-< and window-bounded.
Each lookup is only emitted when the service declares that output column.
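The strict-< lookup can be sketched with a per-key sorted index and bisect. pit_lookup is hypothetical. limit=None gives the cumulative form (as for dislike_artwork_ids), and a finite limit models a bounded "N most recent" form. For user_recent_published_artworks the right-side key would be author_id, matched against the spine's user_id. A time-window bound is not modelled here, and newest-first output order is an assumption.

```python
from bisect import bisect_left
from collections import defaultdict


def pit_lookup(left, right, limit=None):
    """For each (key, ts) in left, return right values with the same key and
    right_ts < ts, newest first, keeping at most `limit` (None = cumulative)."""
    index = defaultdict(list)
    for key, right_ts, value in sorted(right, key=lambda r: r[1]):
        index[key].append((right_ts, value))
    times = {k: [t for t, _ in rows] for k, rows in index.items()}
    out = []
    for key, ts in left:
        rows = index.get(key, [])
        i = bisect_left(times.get(key, []), ts)  # rows[:i] all satisfy right_ts < ts
        newest_first = [v for _, v in reversed(rows[:i])]
        out.append(newest_first if limit is None else newest_first[:limit])
    return out


dislikes = [(1, 5, 101), (1, 9, 102), (1, 12, 103), (2, 3, 201)]  # (user_id, ts, artwork_id)
spine = [(1, 9), (1, 20), (2, 1)]                                  # (user_id, event ts)
cumulative = pit_lookup(spine, dislikes)
bounded = pit_lookup(spine, dislikes, limit=2)
```

Note that the dislike at ts=9 is not visible to the spine row at ts=9. That is the strict-< rule.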
Final assembly — Project
A Project (assembly) consumes the spine-after-joins plus every temporal contribution and produces the final column set: model-type curation, age columns derived from *_created_at timestamps, retriever-id remap, output column ordering, and default-fill for absent columns.
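One row's assembly can be sketched as follows. The *_age_seconds naming and the seconds unit are assumptions, as are assemble and its parameters. Curation and the retriever-id remap are left out to keep the example focused.

```python
import copy
from datetime import datetime, timezone


def assemble(row, output_columns, defaults, event_ts="event_timestamp"):
    """Derive an age column per *_created_at timestamp, then emit output_columns
    in order, filling absent columns from defaults."""
    out = dict(row)
    for col, value in row.items():
        if col.endswith("_created_at") and value is not None:
            # Hypothetical naming and unit: <prefix>_age_seconds.
            age_col = col[: -len("_created_at")] + "_age_seconds"
            out[age_col] = (row[event_ts] - value).total_seconds()
    # Only output_columns are emitted, in order; defaults are copied so rows never
    # share one mutable default object.
    return {c: (out[c] if c in out else copy.copy(defaults.get(c))) for c in output_columns}


utc = timezone.utc
row = {
    "user_id": 1,
    "event_timestamp": datetime(2024, 5, 1, 12, tzinfo=utc),
    "artwork_created_at": datetime(2024, 5, 1, 11, tzinfo=utc),
}
assembled = assemble(row, ["user_id", "artwork_age_seconds", "dislike_artwork_ids"],
                     defaults={"dislike_artwork_ids": []})
```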
Embedding enrichment — RayUdfTransform
For each declared embedding store the compiler emits a RayUdfTransform (_enrichment_ops, always pythonic). This is the irreducible dynamic-compute operator: it appends avg-pooled artwork embeddings (image_embedding, like_artwork_avg_embeds) by running Ray Data streaming actors (EmbeddingEnricher) over a Lance embedding store, vectorized through avg_pool_subset. Contract-driven: a service with no embedding columns gets no RayUdfTransform.
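Here is a single-process sketch of the pooling step. A dict stands in for the Lance store, and plain lists stand in for Arrow/NumPy arrays. This avg_pool_subset is a hypothetical re-implementation, and returning None when no ids are found is an assumption. EnricherSketch only mirrors the shape of a stateful callable class, which Ray Data's Dataset.map_batches can run as actors. It is not EmbeddingEnricher.

```python
def avg_pool_subset(ids, store):
    """Mean of the vectors for the ids present in store; None if none are present (assumed)."""
    vecs = [store[i] for i in ids if i in store]
    if not vecs:
        return None
    # zip(*vecs) groups the d-th component of every vector together.
    return [sum(component) / len(vecs) for component in zip(*vecs)]


class EnricherSketch:
    """Hypothetical per-actor batch callable: holds the store and maps one batch at a time."""

    def __init__(self, store, in_col="like_artwork_ids", out_col="like_artwork_avg_embeds"):
        self.store, self.in_col, self.out_col = store, in_col, out_col

    def __call__(self, batch):
        enriched = dict(batch)  # leave the caller's batch untouched
        enriched[self.out_col] = [avg_pool_subset(ids, self.store) for ids in batch[self.in_col]]
        return enriched


store = {10: [1.0, 0.0], 11: [3.0, 2.0]}
out = EnricherSketch(store)({"like_artwork_ids": [[10, 11], [11, 99], [99]]})
```

Loading the store once in __init__ and reusing it across batches is why a class, not a plain function, is the natural shape for an actor.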
Write — Sink
A Sink writes the day-partitioned, ordered, atomic parquet (dt=YYYY-MM-DD/part-N-0.parquet).
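An atomic write is typically done by writing a temp file in the target directory and then renaming it into place. The sketch below uses hypothetical partition_path and atomic_write helpers. It writes placeholder bytes where the real Sink would write encoded, ordered parquet.

```python
import os
import tempfile
from pathlib import Path


def partition_path(root, day, part):
    """Layout from the text: dt=YYYY-MM-DD/part-N-0.parquet."""
    return Path(root) / f"dt={day}" / f"part-{part}-0.parquet"


def atomic_write(path, payload: bytes):
    """Write to a temp file beside the target, then rename it over the target."""
    path.parent.mkdir(parents=True, exist_ok=True)
    fd, tmp = tempfile.mkstemp(dir=str(path.parent), prefix=".tmp-", suffix=".parquet")
    try:
        with os.fdopen(fd, "wb") as f:
            f.write(payload)
            f.flush()
            os.fsync(f.fileno())  # make the bytes durable before they become visible
        os.replace(tmp, path)     # atomic on POSIX when both paths share a filesystem
    except BaseException:
        os.unlink(tmp)            # never leave a half-written temp file behind
        raise


with tempfile.TemporaryDirectory() as root:
    target = partition_path(root, "2024-05-01", 0)
    atomic_write(target, b"PAR1")  # placeholder bytes standing in for a parquet file
    print(target.relative_to(root).as_posix())  # dt=2024-05-01/part-0-0.parquet
```

Readers therefore see either the previous file or the complete new one, never a partial write.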
Sub-operator optimization
Optimization is per operator through its declared engine — there is no global query planner and no cost model. Each engine tier self-optimizes:
- sql: the operator compiles to DuckDB SQL and is pushed down to the source. The source (DuckLake / Snowflake / Postgres, and DuckDB over them) optimizes its own scan / filter / join. Daedalus does not re-plan it.
- sql_arrow_udf: SQL produces a column, then a vectorized Arrow UDF (e.g. avg_pool, flatten_distinct) runs over Arrow batches post-SQL.
- pythonic: distributed Ray actors over Arrow batches and the Lance embedding store.
Across all tiers, daedalus owns only the post-source compute layer and only three levers: lazy evaluation, work partitioning, batch streaming — Arrow-native throughout. See the no-query-optimizer design rule and Arrow-Native Design.
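The three levers can be sketched with generators. Nothing executes until iteration starts (lazy evaluation), work is split by day (partitioning), and rows arrive in fixed-size batches that a column-at-a-time UDF consumes (batch streaming, shaped like the sql_arrow_udf tier). sqlite3 and Python lists stand in for DuckDB and Arrow record batches. The events table, run_days, and dedupe_tags are hypothetical. Days run sequentially here.

```python
import sqlite3


def stream_batches(conn, sql, params, batch_size):
    """Lazily yield result batches: the query only runs once iteration starts."""
    cur = conn.execute(sql, params)
    while True:
        batch = cur.fetchmany(batch_size)
        if not batch:
            return
        yield batch


def run_days(conn, days, udf, batch_size=2):
    """Partition work by day; each batch's column goes through the UDF in one call."""
    sql = "SELECT tags FROM events WHERE dt = ? ORDER BY rowid"
    for day in days:
        for batch in stream_batches(conn, sql, (day,), batch_size):
            yield day, udf([row[0] for row in batch])


def dedupe_tags(column):
    # Column-at-a-time UDF: split comma-separated tags, keep sorted distinct values.
    return [sorted(set(s.split(","))) for s in column]


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (dt TEXT, tags TEXT)")
conn.executemany("INSERT INTO events VALUES (?, ?)",
                 [("d1", "a,b,a"), ("d1", "c"), ("d1", "b,b"), ("d2", "x")])
results = list(run_days(conn, ["d1", "d2"], dedupe_tags))
```

Memory stays bounded by batch_size rather than by the size of a day. The same structure holds when the batches are Arrow record batches.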
Load-bearing operator invariants
Curation must precede the content-filter drop
Upstream data infra mis-tags every ANIMATED_ARTWORK row with model_type = 'SD_V1_MODEL'. The assembly Project rewrites those to DEFAULT_I2V_MODEL before the content-filter drops SD_V1_MODEL rows. In the SQL driver both run against the same curated model_type expression so the order holds; if you hand-edit the assembly, the animated-curation rewrite must precede the SD_V1_MODEL drop or legitimate animated artworks are silently lost.
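The ordering hazard is easy to reproduce on three rows. The artwork_type field, the STILL_ARTWORK and OTHER_MODEL values, and both functions are hypothetical. Only the ANIMATED_ARTWORK, SD_V1_MODEL, and DEFAULT_I2V_MODEL tags come from the text above.

```python
def curate_model_type(row):
    """Undo the upstream mis-tag: animated artworks tagged SD_V1_MODEL become DEFAULT_I2V_MODEL."""
    if row["artwork_type"] == "ANIMATED_ARTWORK" and row["model_type"] == "SD_V1_MODEL":
        return {**row, "model_type": "DEFAULT_I2V_MODEL"}
    return row


def content_filter(rows):
    """Drop SD_V1_MODEL rows."""
    return [r for r in rows if r["model_type"] != "SD_V1_MODEL"]


rows = [
    {"id": 1, "artwork_type": "ANIMATED_ARTWORK", "model_type": "SD_V1_MODEL"},
    {"id": 2, "artwork_type": "STILL_ARTWORK", "model_type": "SD_V1_MODEL"},
    {"id": 3, "artwork_type": "STILL_ARTWORK", "model_type": "OTHER_MODEL"},
]

curate_then_drop = content_filter([curate_model_type(r) for r in rows])
drop_then_curate = [curate_model_type(r) for r in content_filter(rows)]
print([r["id"] for r in curate_then_drop])  # [1, 3]
print([r["id"] for r in drop_then_curate])  # [3]: the animated artwork is silently lost
```

In SQL the same guarantee comes from filtering on the curated model_type expression rather than on the raw column.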
Strict-< causal window semantics
PointInTimeJoin and RollingAggregate are strictly causal: a spine row only ever sees history with ts < event_timestamp (no leakage of the row's own event). exclude_self further filters the row's own value out of set-style rollups. Keep these semantics intact when overriding an operator's engine — the DuckDB path implements them with RANGE … EXCLUDE GROUP / bounded arg_max(…, N), and the Polars path is bit-for-bit equivalent.
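When overriding an engine, a small pure-Python oracle for these semantics can help spot-check results. causal_set_rollup is hypothetical. It assumes an inclusive lower window bound, and it reads exclude_self as "drop the spine row's own value from the set", following the description above.

```python
def causal_set_rollup(events, user_id, ts, own_value, window, exclude_self=True):
    """Distinct values from the user's events with ts - window <= e_ts < ts,
    optionally minus the spine row's own value; first-seen order."""
    seen, out = set(), []
    for e_user, e_ts, value in sorted(events, key=lambda e: e[1]):
        if e_user != user_id or not (ts - window <= e_ts < ts):
            continue  # other user, outside the window, or not strictly before ts
        if exclude_self and value == own_value:
            continue  # exclude_self: the row's own value never appears in its rollup
        if value not in seen:
            seen.add(value)
            out.append(value)
    return out


events = [(1, 5, 10), (1, 8, 11), (1, 10, 12), (2, 7, 13)]  # (user_id, ts, artwork_id)
with_self_excluded = causal_set_rollup(events, 1, 10, own_value=10, window=10)
with_self_kept = causal_set_rollup(events, 1, 10, own_value=10, window=10, exclude_self=False)
```

The event at ts=10 never appears (strict <), and artwork 10 is dropped only when exclude_self is on.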
Next
- Execution & Tuning: running the DAG, day parallelism, the Ray envelope, config files.
- Overview: the one-pipeline model and output layout.
- Operator Pipeline: compiler + executor.
- Compute Engine: DuckDB & Polars rolling aggregation behind RollingAggregate.