Operators & Optimization

The training pipeline is one DAG of engine-tagged operators. Every column a feature service declares is produced by an operator, and every operator carries a declared engine — never one inferred by an optimizer. This page lists the operator vocabulary, walks the pipeline end to end as operators, and explains how optimization happens at the sub-operator level.

The logical model lives in src/daedalus/pipelines/operators.py, the compiler that builds the DAG in compiler.py, and the executor that runs it in executor.py. See Operator Pipeline for the compiler / executor architecture.

The operator vocabulary

Two execution layers, nine op kinds. The Engine column is the default tag the compiler emits; the author can override it in the generated pipeline YAML, but only with an engine from the set that op kind's can_run_on() returns.

| Operator | op_kind | Engine | Role |
|---|---|---|---|
| Scan | scan | sql | Read a source relation (feed time-series or a feature view); carries source_ref, projected columns, an optional pushdown predicate. |
| Filter | filter | sql | Row filter: a SQL predicate over the input. |
| Project | project | sql | Select / rename / compute scalar expressions (curation, age columns, output ordering + default-fill). |
| Join | join | sql | Plain cross-table equi-join (the non-temporal path); inputs = (left, right), on, how. |
| PointInTimeJoin | pit_join | sql | Temporal-causal ASOF join, strict <: most-recent right rows with right.ts < left.timestamp_column. |
| RollingAggregate | rolling_aggregate | sql | Windowed point-in-time aggregation wrapping aggregate_pit_table; carries the spec list, timestamp_column, and rolling_engine. |
| ArrowUdfTransform | arrow_udf_transform | sql_arrow_udf | Vectorized Arrow UDF over a materialized column (e.g. flatten_distinct, avg-pool). |
| RayUdfTransform | ray_udf_transform | pythonic | Distributed Ray/Lance embedding compute. Pinned to pythonic. |
| Sink | sink | sql | Terminal write: day-partitioned, ordered, atomic parquet. |

RollingAggregate.can_run_on() returns {sql, sql_arrow_udf, pythonic} (its flatten_distinct specs are split onto sql_arrow_udf); ArrowUdfTransform runs on sql_arrow_udf or pythonic; RayUdfTransform pins itself to pythonic.
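
The override rule can be pictured with a minimal sketch. Everything here (the Engine enum, the Op dataclass, with_engine, and the ALLOWED mapping) is a hypothetical illustration, not the contents of operators.py; only the three engine sets stated above are encoded, and the other op kinds are left out rather than guessed.

```python
from dataclasses import dataclass, replace
from enum import Enum


class Engine(str, Enum):
    SQL = "sql"
    SQL_ARROW_UDF = "sql_arrow_udf"
    PYTHONIC = "pythonic"


# Allowed engines per op kind, limited to the three sets stated in the text.
ALLOWED = {
    "rolling_aggregate": {Engine.SQL, Engine.SQL_ARROW_UDF, Engine.PYTHONIC},
    "arrow_udf_transform": {Engine.SQL_ARROW_UDF, Engine.PYTHONIC},
    "ray_udf_transform": {Engine.PYTHONIC},
}


@dataclass(frozen=True)
class Op:
    """Hypothetical stand-in for an engine-tagged operator."""
    name: str
    op_kind: str
    engine: Engine

    def can_run_on(self):
        return ALLOWED[self.op_kind]

    def with_engine(self, engine):
        # An override is accepted only inside the op kind's allowed set.
        if engine not in self.can_run_on():
            raise ValueError(f"{self.op_kind} cannot run on {engine.value}")
        return replace(self, engine=engine)


rollups = Op("rollups", "rolling_aggregate", Engine.SQL)
overridden = rollups.with_engine(Engine.PYTHONIC)  # accepted
```

Asking the same of a ray_udf_transform op with Engine.SQL raises ValueError, which mirrors the "pinned to pythonic" rule.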

The pipeline, end to end, as operators

The compiler emits this topologically ordered DAG (compile_service_to_pipeline). Walk it operator by operator.
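
As a rough illustration of "topologically ordered", the sketch below orders a small dependency graph with Python's standard graphlib. The node names and edges are assumptions modeled on the sections that follow (and simplified: the history Scan feeding the temporal lookup is omitted); they are not the actual output of compile_service_to_pipeline.

```python
from graphlib import TopologicalSorter

# node -> set of upstream nodes it depends on (illustrative edges only)
deps = {
    "scan_feed_events": set(),
    "scan_view": set(),
    "join_view": {"scan_feed_events", "scan_view"},
    "rolling_aggregate": {"join_view"},
    "pit_join": {"join_view"},
    "assembly_project": {"join_view", "rolling_aggregate", "pit_join"},
    "ray_udf_transform": {"assembly_project"},
    "sink": {"ray_udf_transform"},
}

# static_order() yields every node after all of its upstream nodes.
order = list(TopologicalSorter(deps).static_order())
```

Any order returned this way is safe to execute front to back, because no operator appears before the operators that produce its inputs.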

Source columns — Scan / Filter / Project / Join

The spine starts at a Scan of the event time-series view (feed_events). Each referenced feature view becomes its own Scan plus a cross-table Join (_view_scans_and_joins), LEFT-joining that view's static columns (author_id, model_id, tack_ids, user-profile columns, …) onto the spine on the canonical entity key. Filter applies row predicates (e.g. embedding-coverage and session filters), and Project computes scalar expressions.

These all run on the sql engine: the compiler composes them into one source-optimized DuckDB query per day, handed to the source via source_factory.build_source (DuckLake attaches READ_ONLY).

Point-in-time rollups — RollingAggregate

RollingAggregate produces the windowed sequence features — like_artwork_ids_*, view_artwork_ids_*, click_artwork_ids_*, the count rollups, and the set-style rollups like like_artwork_tack_ids_*. For each spine row it aggregates the user's prior history within the window. It wraps the proven aggregate_pit_table engine; rolling_engine records the DuckDB-vs-Polars choice for the rolling computation (see Compute Engine).

flatten_distinct specs (set-style rollups) are split into a second RollingAggregate tagged sql_arrow_udf, so the list-flatten kernel runs as a vectorized Arrow UDF after the SQL rollup rather than inside the window.
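
A minimal sketch of that split, assuming a spec is identified by an aggregation name. The Spec dataclass, the split_specs helper, the "list" and "count" aggregation names, and the output names are all hypothetical; only flatten_distinct and the two engine tags come from the text.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Spec:
    output: str  # output column name (hypothetical examples below)
    agg: str     # aggregation kind; "flatten_distinct" marks set-style rollups


def split_specs(specs):
    """Partition specs into (engine, specs) groups, one per RollingAggregate."""
    sql_specs = [s for s in specs if s.agg != "flatten_distinct"]
    udf_specs = [s for s in specs if s.agg == "flatten_distinct"]
    groups = []
    if sql_specs:
        groups.append(("sql", sql_specs))
    if udf_specs:
        # Set-style rollups go to a second operator tagged sql_arrow_udf.
        groups.append(("sql_arrow_udf", udf_specs))
    return groups


groups = split_specs([
    Spec("example_ids_rollup", "list"),
    Spec("example_count_rollup", "count"),
    Spec("example_tack_ids_rollup", "flatten_distinct"),
])
```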

Temporal lookups — PointInTimeJoin

Cumulative / lookback columns that are lookups rather than windowed rollups become a history Scan plus a PointInTimeJoin (_pit_lookup_ops):

  • dislike_artwork_ids: cumulative strict-< dislike history over the whole feed, keyed on user_id.
  • user_recent_published_artworks: artwork_properties where author_id == spine user_id, strict-< and window-bounded.

Each lookup is only emitted when the service declares that output column.
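
The strict-< matching rule of a point-in-time join can be sketched in pure Python. This is an illustration of the semantics only, not the SQL the compiler emits; the asof_strict helper and the row shapes are assumptions.

```python
from bisect import bisect_left


def asof_strict(left_rows, right_rows, key, left_ts, right_ts):
    """For each left row, return the most recent right row with
    right[right_ts] < left[left_ts] for the same key, or None."""
    by_key = {}
    for r in sorted(right_rows, key=lambda r: r[right_ts]):
        by_key.setdefault(r[key], []).append(r)

    out = []
    for l in left_rows:
        hist = by_key.get(l[key], [])
        # bisect_left finds the first right row with ts >= left ts, so the
        # row just before it is the latest one that is strictly earlier.
        i = bisect_left([r[right_ts] for r in hist], l[left_ts])
        out.append((l, hist[i - 1] if i > 0 else None))
    return out


history = [
    {"user_id": 1, "ts": 10, "v": "a"},
    {"user_id": 1, "ts": 20, "v": "b"},
]
spine = [
    {"user_id": 1, "ts": 10},  # nothing strictly earlier
    {"user_id": 1, "ts": 20},  # ts == 20 is excluded, so matches ts 10
    {"user_id": 1, "ts": 25},  # matches ts 20
]
matches = asof_strict(spine, history, "user_id", "ts", "ts")
```

The second spine row is the interesting one: a right row at exactly the same timestamp is not visible, which is what keeps the join causal.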

Final assembly — Project

A Project (assembly) consumes the spine-after-joins plus every temporal contribution and produces the final column set: model-type curation, age columns derived from *_created_at timestamps, retriever-id remap, output column ordering, and default-fill for absent columns.
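
The ordering and default-fill steps amount to the following sketch over a single row. The assemble helper and the default values are hypothetical; the real assembly is expressed as a Project, not as Python dictionaries.

```python
def assemble(row, output_columns, defaults):
    """Emit columns in the declared order, filling absent ones from defaults."""
    return {col: row.get(col, defaults.get(col)) for col in output_columns}


output_columns = ["author_id", "model_id", "dislike_artwork_ids"]
defaults = {"dislike_artwork_ids": []}  # assumed default for an absent list column

row = {"model_id": "m1", "author_id": "u7"}  # arrives unordered, one column absent
assembled = assemble(row, output_columns, defaults)
```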

Embedding enrichment — RayUdfTransform

For each declared embedding store the compiler emits a RayUdfTransform (_enrichment_ops, always pythonic). This is the irreducible dynamic-compute operator: it appends avg-pooled artwork embeddings (image_embedding, like_artwork_avg_embeds) by running Ray Data streaming actors (EmbeddingEnricher) over a Lance embedding store, vectorized through avg_pool_subset. Contract-driven: a service with no embedding columns gets no RayUdfTransform.
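
Average pooling itself is simple to state. The sketch below is a plain-Python illustration and not the avg_pool_subset implementation: the avg_pool_sketch name, the dict-based store, skipping ids with no embedding, and returning a zero vector for an empty subset are all assumptions.

```python
def avg_pool_sketch(ids, store, dim):
    """Element-wise mean of the embeddings found for `ids`."""
    vectors = [store[i] for i in ids if i in store]  # assumed: skip missing ids
    if not vectors:
        return [0.0] * dim  # assumed fallback for an empty subset
    return [sum(col) / len(vectors) for col in zip(*vectors)]


store = {"a": [1.0, 3.0], "b": [3.0, 5.0]}
pooled = avg_pool_sketch(["a", "b", "missing"], store, dim=2)
```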

Write — Sink

A Sink writes the day-partitioned, ordered, atomic parquet (dt=YYYY-MM-DD/part-N-0.parquet).
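
One common way to make a file write atomic is to write to a temporary file in the destination directory and then rename it into place. The sketch below shows that pattern with the standard library; it is an assumption about how atomicity could be achieved, not a description of the Sink's code, and it writes opaque bytes rather than real parquet. The atomic_write_part name and its parameters are hypothetical.

```python
import os
import tempfile
from pathlib import Path


def atomic_write_part(root, day, part, payload):
    """Write payload to <root>/dt=<day>/part-<part>-0.parquet atomically."""
    part_dir = Path(root) / f"dt={day}"
    part_dir.mkdir(parents=True, exist_ok=True)
    final = part_dir / f"part-{part}-0.parquet"

    # The temp file lives in the same directory so the rename stays on one filesystem.
    fd, tmp = tempfile.mkstemp(dir=part_dir, suffix=".tmp")
    try:
        with os.fdopen(fd, "wb") as f:
            f.write(payload)
            f.flush()
            os.fsync(f.fileno())
        os.replace(tmp, final)  # readers see the old file or the new one, never a partial
    except BaseException:
        if os.path.exists(tmp):
            os.unlink(tmp)
        raise
    return final
```

A reader listing the partition directory therefore never observes a half-written part file under its final name.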

Sub-operator optimization

Optimization is per operator through its declared engine — there is no global query planner and no cost model. Each engine tier self-optimizes:

  • sql: the operator compiles to DuckDB SQL and is pushed down to the source. The source (DuckLake / Snowflake / Postgres, and DuckDB over them) optimizes its own scan / filter / join. Daedalus does not re-plan it.
  • sql_arrow_udf: SQL produces a column, then a vectorized Arrow UDF (e.g. avg_pool, flatten_distinct) runs over Arrow batches post-SQL.
  • pythonic: distributed Ray actors over Arrow batches and the Lance embedding store.

Across all tiers, daedalus owns only the post-source compute layer and only three levers: lazy evaluation, work partitioning, batch streaming — Arrow-native throughout. See the no-query-optimizer design rule and Arrow-Native Design.
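
Lazy evaluation and batch streaming can be illustrated with plain generators. In this sketch Python lists stand in for Arrow batches, and iter_batches and transform are hypothetical helpers; the point is only that no source row is read until a downstream consumer asks for a batch.

```python
from itertools import islice


def iter_batches(rows, batch_size):
    """Lazily group an iterable into lists of at most batch_size items."""
    it = iter(rows)
    while True:
        batch = list(islice(it, batch_size))
        if not batch:
            return
        yield batch


def transform(batches, fn):
    """Lazily apply fn to every item, one batch at a time."""
    for batch in batches:
        yield [fn(x) for x in batch]


pulled = []  # records which source rows have actually been read


def source():
    for i in range(10):
        pulled.append(i)
        yield i


stream = transform(iter_batches(source(), 4), lambda x: x * 2)
nothing_read_yet = list(pulled)   # still empty: building the stream reads nothing
first_batch = next(stream)        # pulls exactly one batch from the source
read_after_first = list(pulled)
```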

Load-bearing operator invariants

Curation must precede the content-filter drop

Upstream data infra mis-tags every ANIMATED_ARTWORK row with model_type = 'SD_V1_MODEL'. The assembly Project rewrites those to DEFAULT_I2V_MODEL before the content-filter drops SD_V1_MODEL rows. In the SQL driver both run against the same curated model_type expression so the order holds; if you hand-edit the assembly, the animated-curation rewrite must precede the SD_V1_MODEL drop or legitimate animated artworks are silently lost.
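
The ordering hazard is easy to reproduce in a few lines. This is a sketch over plain dictionaries: the artwork_type column name, the "IMAGE" and "OTHER_MODEL" values, and the helper names are assumptions, since the text does not say which column carries ANIMATED_ARTWORK.

```python
def curate(row):
    """Rewrite the mis-tagged model_type on animated rows."""
    if row["artwork_type"] == "ANIMATED_ARTWORK" and row["model_type"] == "SD_V1_MODEL":
        return {**row, "model_type": "DEFAULT_I2V_MODEL"}
    return row


def keep(row):
    """Content filter: drop SD_V1_MODEL rows."""
    return row["model_type"] != "SD_V1_MODEL"


def curate_then_drop(rows):  # correct order
    return [r for r in map(curate, rows) if keep(r)]


def drop_then_curate(rows):  # wrong order: animated rows are already gone
    return [curate(r) for r in rows if keep(r)]


rows = [
    {"id": 1, "artwork_type": "ANIMATED_ARTWORK", "model_type": "SD_V1_MODEL"},
    {"id": 2, "artwork_type": "IMAGE", "model_type": "SD_V1_MODEL"},
    {"id": 3, "artwork_type": "IMAGE", "model_type": "OTHER_MODEL"},
]
good = [r["id"] for r in curate_then_drop(rows)]
bad = [r["id"] for r in drop_then_curate(rows)]
```

With the correct order the animated row (id 1) survives with its rewritten model_type; with the order reversed it is dropped without any error being raised.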

Strict-< causal window semantics

PointInTimeJoin and RollingAggregate are strictly causal: a spine row only ever sees history with ts < event_timestamp (no leakage of the row's own event). exclude_self further filters the row's own value out of set-style rollups. Keep these semantics intact when overriding an operator's engine — the DuckDB path implements them with RANGE … EXCLUDE GROUP / bounded arg_max(…, N), and the Polars path is bit-for-bit equivalent.
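
A reference-style sketch of those semantics, useful as a mental model when checking an engine override. It is not either engine's implementation: the rolling_distinct helper, the row shapes, and the inclusive lower window bound are assumptions.

```python
def rolling_distinct(spine, history, window, exclude_self=False):
    """Per spine row, collect distinct prior values for the same user with
    row.ts - window <= ts < row.ts (lower bound inclusivity is assumed)."""
    out = []
    for row in spine:
        lo = row["ts"] - window
        values = []
        for h in sorted(history, key=lambda h: h["ts"]):
            if h["user_id"] != row["user_id"]:
                continue
            if not (lo <= h["ts"] < row["ts"]):  # strict <: never the row's own event
                continue
            if exclude_self and h["value"] == row["value"]:
                continue  # drop the row's own value from the set
            if h["value"] not in values:
                values.append(h["value"])
        out.append(values)
    return out


history = [
    {"user_id": 1, "ts": 1, "value": "a"},
    {"user_id": 1, "ts": 5, "value": "b"},
    {"user_id": 1, "ts": 10, "value": "c"},
    {"user_id": 2, "ts": 4, "value": "z"},
]
spine = [{"user_id": 1, "ts": 10, "value": "b"}]

plain = rolling_distinct(spine, history, window=6)
without_self = rolling_distinct(spine, history, window=6, exclude_self=True)
```

Here the event at ts 10 is invisible to the spine row at ts 10, the event at ts 1 falls outside the window, and exclude_self removes "b" because it is the spine row's own value.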
