
Operator Pipeline

A training run is a DAG of engine-tagged operators. A feature service is compiled into an OperatorPipeline, serialized to an editable YAML file, and run through the executor. This is the unified skeleton → enrich path and the sole training entry point in Core v1 (daeda pipeline train <service>).

The logical model lives in src/daedalus/pipelines/operators.py, the compiler in compiler.py, and the executor in executor.py. The operator dataclasses are purely descriptive: they provide schema inference and a to_dict round-trip, but hold no data and run nothing.

The engine-tagged operator model

Engine — a declared tier, never inferred

Every operator carries an engine tag — one of three tiers — chosen by the author, not inferred by a query optimizer. Engine is a str-valued enum so it round-trips through YAML:

| Engine | Value | What runs on it |
|---|---|---|
| Engine.SQL | "sql" | Pure DuckDB SQL over the attached source(s); this is the source layer. |
| Engine.SQL_ARROW_UDF | "sql_arrow_udf" | DuckDB SQL producing a column, then a vectorized Arrow UDF applied post-SQL (e.g. avg_pool / flatten_distinct over a list column). |
| Engine.PYTHONIC | "pythonic" | Distributed / dynamic Python compute (Ray actors + numpy / Lance); the irreducible embedding-enrichment case. |
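Because Engine mixes in str, a member serializes as its plain value, and that value maps straight back to the same member. A minimal sketch of the round-trip, using the standard-library json module as a stand-in for the YAML layer (member names and values come from the table; the rest is illustrative):

```python
import json
from enum import Enum


class Engine(str, Enum):
    """Sketch of the three declared tiers; names and values from the table above."""
    SQL = "sql"
    SQL_ARROW_UDF = "sql_arrow_udf"
    PYTHONIC = "pythonic"


# The str mixin makes the member serialize as its plain string value ...
doc = json.dumps({"engine": Engine.SQL_ARROW_UDF})
# ... and looking that value up again returns the very same member.
loaded = Engine(json.loads(doc)["engine"])
print(doc)  # {"engine": "sql_arrow_udf"}
```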

The Operator protocol

Every operator satisfies a common Operator Protocol:

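```python
from __future__ import annotations  # keeps the Engine annotations lazy in this excerpt

from typing import Protocol, runtime_checkable

# Engine is the str-valued enum from the table above.


@runtime_checkable
class Operator(Protocol):
    op_kind: str
    name: str                 # unique node id
    inputs: tuple[str, ...]   # upstream operators' output relations
    output: str               # this op's produced relation (defaults to name)
    engine: Engine

    def can_run_on(self) -> frozenset[Engine]: ...
    def to_dict(self) -> dict: ...
```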

can_run_on() reports the engines an op kind is capable of (the author picks one of them); the default is "exactly the declared engine."
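To make the protocol concrete, here is a sketch of a hypothetical FilterSketch dataclass (not the real Filter class) that satisfies Operator structurally, without inheriting from it. It defaults output to name and uses the default capability of exactly the declared engine. The predicate field and the to_dict() key layout are assumptions.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Protocol, runtime_checkable


class Engine(str, Enum):
    SQL = "sql"
    SQL_ARROW_UDF = "sql_arrow_udf"
    PYTHONIC = "pythonic"


@runtime_checkable
class Operator(Protocol):
    op_kind: str
    name: str
    inputs: tuple[str, ...]
    output: str
    engine: Engine

    def can_run_on(self) -> frozenset[Engine]: ...
    def to_dict(self) -> dict: ...


@dataclass
class FilterSketch:
    """Hypothetical filter op; matches Operator structurally, no inheritance."""
    name: str
    inputs: tuple[str, ...]
    predicate: str              # assumption: a SQL predicate string
    engine: Engine = Engine.SQL
    output: str = ""
    op_kind: str = "filter"

    def __post_init__(self) -> None:
        if not self.output:
            self.output = self.name  # output defaults to the node name

    def can_run_on(self) -> frozenset[Engine]:
        return frozenset({self.engine})  # default: exactly the declared engine

    def to_dict(self) -> dict:
        return {"op_kind": self.op_kind, "name": self.name,
                "inputs": list(self.inputs), "output": self.output,
                "engine": self.engine.value, "predicate": self.predicate}


op = FilterSketch(name="recent", inputs=("events",), predicate="ts >= 100")
print(isinstance(op, Operator))  # True
```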

Op kinds

Two execution layers, nine op kinds:

Source layer (Engine.SQL): these ops compile to SQL that is composed into one query per source and handed to DuckDB over the polymorphic source, which does the optimizing.

| Op kind | op_kind | Role |
|---|---|---|
| Scan | scan | Read a source relation (feed time-series or a feature view); carries source_ref, projected columns, and an optional pushdown predicate. |
| Filter | filter | Row filter: a SQL predicate over the input. |
| Project | project | Select / rename / compute scalar expressions (curation, age columns, output ordering + default-fill). |
| Join | join | Plain cross-table equi-join (the non-temporal path); inputs = (left, right), on, how. |
| PointInTimeJoin | pit_join | Temporal-causal ASOF join (strict <): the most recent right rows with right.ts < left.timestamp_column. Used for cumulative dislike history and recent-published lookups. |
| RollingAggregate | rolling_aggregate | Windowed point-in-time aggregation wrapping aggregate_pit_table; carries the spec list, timestamp_column, and rolling_engine (the DuckDB-vs-Polars choice). |
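The source-layer ops compose into a single statement rather than running one by one. A rough sketch under stated assumptions: a hypothetical compose_query helper turns Scan, Filter, a strict-< PointInTimeJoin, and a default-filling Project into CTEs of one query. Python's built-in sqlite3 stands in for DuckDB so the example runs anywhere. DuckDB has a native ASOF JOIN and SQLite does not, so the lookup here is a correlated subquery with the same strict-< semantics. Table and column names (events, history, dislikes) are illustrative.

```python
import sqlite3


def compose_query(events: str, history: str) -> str:
    """Hypothetical composer: Scan -> Filter -> PointInTimeJoin -> Project as CTEs of one query."""
    return f"""
    WITH scan AS (SELECT user_id, item_id, ts FROM {events}),
         filtered AS (SELECT * FROM scan WHERE ts >= ?),
         pit AS (
             SELECT f.*,
                    (SELECT h.dislikes FROM {history} AS h
                      WHERE h.user_id = f.user_id AND h.ts < f.ts  -- strict <
                      ORDER BY h.ts DESC LIMIT 1) AS dislikes
             FROM filtered AS f
         )
    SELECT user_id, item_id, COALESCE(dislikes, 0) AS dislikes  -- default-fill
    FROM pit
    ORDER BY user_id, ts
    """


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (user_id INTEGER, item_id INTEGER, ts INTEGER)")
conn.execute("CREATE TABLE history (user_id INTEGER, ts INTEGER, dislikes INTEGER)")
conn.executemany("INSERT INTO events VALUES (?, ?, ?)",
                 [(1, 9, 50), (1, 10, 100), (1, 11, 200), (2, 12, 300)])
conn.executemany("INSERT INTO history VALUES (?, ?, ?)",
                 [(1, 100, 3), (1, 150, 4), (2, 300, 7)])

# One round-trip to the engine for the whole composed query.
rows = conn.execute(compose_query("events", "history"), (100,)).fetchall()
print(rows)  # [(1, 10, 0), (1, 11, 4), (2, 12, 0)]
```

The two rows that get 0 each have a history row at exactly the event timestamp, which strict < excludes, so nothing leaks from the event's own instant.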

Post-source compute layer: these ops run lazily, over partitioned work, with streamed batches.

| Op kind | op_kind | Role |
|---|---|---|
| ArrowUdfTransform | arrow_udf_transform | Vectorized Arrow UDF over a materialized column (e.g. flatten_distinct, avg-pool). Runs on SQL_ARROW_UDF or PYTHONIC. |
| RayUdfTransform | ray_udf_transform | Distributed dynamic compute / inference (Ray actors + Lance/numpy): embedding enrichment. Always PYTHONIC. |
| Sink | sink | Terminal write: day-partitioned, ordered, atomic parquet. |
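The table names flatten_distinct and avg-pool without defining them. Assuming flatten_distinct means "concatenate list-valued cells and keep each value once" and avg-pool means "element-wise mean of equal-length embedding vectors", a row-at-a-time reference of those semantics could look like this. The real ops are vectorized Arrow UDFs, and their exact null and ordering behavior may differ.

```python
from collections.abc import Hashable, Sequence
from typing import Optional


def flatten_distinct(cells: Sequence[Optional[Sequence[Hashable]]]) -> list:
    """Concatenate list-valued cells, keeping each value once in first-seen order."""
    seen: dict = {}
    for cell in cells:
        for value in cell or ():          # a null cell contributes nothing
            seen.setdefault(value, None)  # dicts preserve insertion order
    return list(seen)


def avg_pool(vectors: Sequence[Sequence[float]]) -> Optional[list]:
    """Element-wise mean of equal-length vectors; None when there is nothing to pool."""
    if not vectors:
        return None
    dim = len(vectors[0])
    if any(len(v) != dim for v in vectors):
        raise ValueError("avg_pool expects vectors of equal length")
    return [sum(v[i] for v in vectors) / len(vectors) for i in range(dim)]


print(flatten_distinct([[1, 2], [2, 3], None, [1]]))  # [1, 2, 3]
print(avg_pool([[1.0, 2.0], [3.0, 4.0]]))            # [2.0, 3.0]
```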

RollingAggregate.can_run_on() returns {SQL, SQL_ARROW_UDF, PYTHONIC} (its flatten_distinct specs are split onto SQL_ARROW_UDF), and RayUdfTransform pins itself to PYTHONIC in __post_init__.
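A sketch of the two capability patterns just described, using hypothetical stand-in classes. The Ray op overwrites whatever engine it was given in __post_init__, and the rolling aggregate advertises all three tiers while still declaring one. Whether the real __post_init__ overwrites or rejects a mismatched engine is not stated here; this sketch overwrites.

```python
from dataclasses import dataclass
from enum import Enum


class Engine(str, Enum):
    SQL = "sql"
    SQL_ARROW_UDF = "sql_arrow_udf"
    PYTHONIC = "pythonic"


@dataclass
class RayUdfTransformSketch:
    """Hypothetical stand-in: always ends up on PYTHONIC."""
    name: str
    inputs: tuple[str, ...]
    engine: Engine = Engine.PYTHONIC
    op_kind: str = "ray_udf_transform"

    def __post_init__(self) -> None:
        self.engine = Engine.PYTHONIC  # pin, whatever the author declared

    def can_run_on(self) -> frozenset[Engine]:
        return frozenset({Engine.PYTHONIC})


@dataclass
class RollingAggregateSketch:
    """Hypothetical stand-in: capable of every tier, declares one."""
    name: str
    inputs: tuple[str, ...]
    engine: Engine = Engine.SQL
    op_kind: str = "rolling_aggregate"

    def can_run_on(self) -> frozenset[Engine]:
        return frozenset(Engine)  # iterating the enum yields all three members


ray_op = RayUdfTransformSketch("enrich_items", ("assembled",), engine=Engine.SQL)
print(ray_op.engine)  # Engine.PYTHONIC
```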

The full DAG is wrapped in an OperatorPipeline: a topologically ordered tuple of operators (each op's inputs precede it) plus the service and version it was compiled from. to_yaml_dict() serializes it for the YAML round-trip, and by_kind() returns the operators of a given op_kind.
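The ordering invariant is cheap to check in one pass: walk the tuple and confirm every input was produced by an earlier op. A minimal sketch with hypothetical Op and PipelineSketch classes; only by_kind() and the service/version fields mirror the description, and the unproduced_inputs validation helper is an assumption about how such a check could look.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Op:
    """Hypothetical minimal op: only the fields the ordering check needs."""
    op_kind: str
    name: str
    inputs: tuple[str, ...] = ()
    output: str = ""

    @property
    def produced(self) -> str:
        return self.output or self.name  # output defaults to name


def unproduced_inputs(ops: tuple[Op, ...]) -> list[tuple[str, str]]:
    """(op name, input) pairs read before any earlier op produced them."""
    produced: set[str] = set()
    bad: list[tuple[str, str]] = []
    for op in ops:
        bad += [(op.name, i) for i in op.inputs if i not in produced]
        produced.add(op.produced)
    return bad


@dataclass(frozen=True)
class PipelineSketch:
    service: str
    version: str
    ops: tuple[Op, ...]

    def __post_init__(self) -> None:
        if bad := unproduced_inputs(self.ops):
            raise ValueError(f"ops out of topological order: {bad}")

    def by_kind(self, op_kind: str) -> tuple[Op, ...]:
        return tuple(op for op in self.ops if op.op_kind == op_kind)


ops = (
    Op("scan", "events"),
    Op("rolling_aggregate", "aggs", ("events",)),
    Op("sink", "out", ("aggs",)),
)
pipeline = PipelineSketch("dssm_ranking", "v1", ops)  # "v1" is illustrative
```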

The compiler — service → operator DAG

compiler.py (compile_service_to_pipeline / build_compile_request) is pure: a FeatureServiceDef in, an OperatorPipeline out. It maps the contract's columns and the training aggregation/enrichment specs onto the operator vocabulary, producing a topologically ordered DAG:

```text
events Scan
  → per-referenced-view (Scan + cross-table Join)
  → RollingAggregate            (windowed; flatten_distinct split to SQL_ARROW_UDF)
  → PointInTimeJoin lookups     (dislike history, recent-published)
  → Project assembly            (curation, age cols, output ordering + default-fill)
  → RayUdfTransform enrichment  (one per embedding store; PYTHONIC)
  → Sink                        (day-partitioned parquet)
```

The compile is contract-driven: a PIT-join is emitted only for declared lookback columns, and an enrichment op only for declared embedding columns — so a service with neither (e.g. the XGB reranker) gets no PIT-join and no Ray UDF stage. It reuses training_adapter.resolve_training_service so feature-ref / output-column derivation stays identical to the training path.
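The contract-driven rule can be expressed as a small planning function. This sketch uses a hypothetical ServiceContractSketch in place of FeatureServiceDef and returns only the op_kind sequence. For brevity it emits the fixed stages unconditionally; treating each lookback column as its own PointInTimeJoin is an assumption, while one RayUdfTransform per distinct embedding store follows the DAG outline above. The service and view names in the usage lines are illustrative.

```python
from dataclasses import dataclass, field


@dataclass
class ServiceContractSketch:
    """Hypothetical slice of a service contract (stand-in for FeatureServiceDef)."""
    name: str
    referenced_views: list[str] = field(default_factory=list)
    lookback_columns: list[str] = field(default_factory=list)
    embedding_columns: dict[str, str] = field(default_factory=dict)  # column -> store


def plan_op_kinds(svc: ServiceContractSketch) -> list[str]:
    """Return the op_kind sequence the compile would emit, in topological order."""
    kinds = ["scan"]                                     # events Scan
    for _view in svc.referenced_views:                   # per-view Scan + cross-table Join
        kinds += ["scan", "join"]
    kinds.append("rolling_aggregate")
    kinds += ["pit_join"] * len(svc.lookback_columns)    # assumption: one per lookback column
    kinds.append("project")
    stores = dict.fromkeys(svc.embedding_columns.values())  # distinct stores, stable order
    kinds += ["ray_udf_transform"] * len(stores)         # one per embedding store
    kinds.append("sink")
    return kinds


reranker = ServiceContractSketch("xgb_reranker", referenced_views=["item_stats"])
ranker = ServiceContractSketch(
    "ranker",
    lookback_columns=["dislike_history", "recent_published"],
    embedding_columns={"user_emb": "users", "item_emb": "items", "item_title_emb": "items"},
)
print(plan_op_kinds(reranker))  # no pit_join and no ray_udf_transform
```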

The compiled pipeline serializes to an editable pipeline YAML (config/pipelines/<service>.generated.yaml); a hand-edited config/pipelines/<service>.yaml takes precedence when present. Engine tags in the emitted YAML are defaults the author can override.

```bash
# Compile a service to an editable operator DAG (inspect / hand-tune)
daeda pipeline compile dssm_ranking
```
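The precedence rule between the generated and the hand-edited YAML amounts to "hand-edited file first, generated file second". A sketch with a hypothetical resolve_pipeline_yaml helper; raising when neither file exists is an assumption, and the real CLI may compile on demand instead.

```python
import tempfile
from pathlib import Path


def resolve_pipeline_yaml(config_dir: Path, service: str) -> Path:
    """Prefer <service>.yaml over <service>.generated.yaml (hypothetical helper)."""
    hand_edited = config_dir / f"{service}.yaml"
    if hand_edited.exists():
        return hand_edited
    generated = config_dir / f"{service}.generated.yaml"
    if generated.exists():
        return generated
    raise FileNotFoundError(f"no pipeline YAML for {service!r} in {config_dir}")


with tempfile.TemporaryDirectory() as tmp:
    cfg_dir = Path(tmp)
    (cfg_dir / "dssm_ranking.generated.yaml").write_text("ops: []\n")
    first = resolve_pipeline_yaml(cfg_dir, "dssm_ranking").name   # only generated exists
    (cfg_dir / "dssm_ranking.yaml").write_text("ops: []\n")
    second = resolve_pipeline_yaml(cfg_dir, "dssm_ranking").name  # hand edit now wins
```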

The executor — running the DAG

executor.py (run_pipeline(ExecutionRequest)) interprets the DAG by calling the engine entry points directly — it wraps the proven feature logic rather than reimplementing it:

  • The SQL source layer + post-source SQL ops (Scan / Filter / Join / PointInTimeJoin / RollingAggregate / Project / flatten) are run by training.run_skeleton_stage — one composed, source-optimized DuckDB query per day, already memory-bounded and resumable.
  • The Pythonic RayUdfTransform (embedding enrichment) is run by dynamic_feature.run_enrich_stage — the Ray + Lance avg-pool stage, lazy and streamed (see Enrich Stage).
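
```python
from daedalus.pipelines.executor import run_pipeline, ExecutionRequest

# `pipeline` is a compiled OperatorPipeline and `cfg` the run configuration;
# both are built elsewhere and not shown here.
result = run_pipeline(ExecutionRequest(pipeline=pipeline, run_config=cfg))
result.output_path   # the final stage's output directory
```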

The enrich stage only runs when the pipeline declares a RayUdfTransform (pipeline.by_kind("ray_udf_transform")) and enrichment isn't skipped; it runs under the 16c / 64 GiB RayEnvelope. See the Training Pipeline overview for the full per-day flow.
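The stage gating can be sketched as a plain dispatch function. RunPlanSketch, skip_enrich, and the two callables are hypothetical stand-ins for the pipeline, the skip switch, and run_skeleton_stage / run_enrich_stage. Feeding the skeleton output into enrich follows the skeleton → enrich path; the output of whichever stage ran last becomes the result, matching output_path above.

```python
from dataclasses import dataclass
from typing import Callable


@dataclass
class RunPlanSketch:
    op_kinds: tuple[str, ...]  # op_kind of every operator in the pipeline
    skip_enrich: bool = False  # hypothetical "skip enrichment" switch


def run_stages(plan: RunPlanSketch,
               run_skeleton: Callable[[], str],
               run_enrich: Callable[[str], str]) -> str:
    """Run the skeleton stage, then enrich only if a ray_udf_transform
    is declared and enrichment is not skipped."""
    output = run_skeleton()
    if "ray_udf_transform" in plan.op_kinds and not plan.skip_enrich:
        output = run_enrich(output)  # enrich consumes the skeleton output
    return output  # the last stage that ran determines the final output path


def fake_skeleton() -> str:
    return "out/skeleton"


def fake_enrich(path: str) -> str:
    return path + "/enriched"


print(run_stages(RunPlanSketch(("scan", "project", "sink")), fake_skeleton, fake_enrich))
print(run_stages(RunPlanSketch(("scan", "ray_udf_transform", "sink")), fake_skeleton, fake_enrich))
```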

Design rule — no query optimizer

This is load-bearing (see src/daedalus/pipelines/AGENTS.md):

Sources self-optimize; daedalus owns only post-source compute

There is deliberately no LogicalPlan / PhysicalPlan duality, no cost model, and no SQL-fusion optimizer. The data sources (DuckLake / Snowflake, and DuckDB over them) already optimize the source-side query. daedalus owns only the post-source compute layer (Arrow-UDF + Ray-UDF), governed by exactly three levers: lazy evaluation, work partitioning, batch streaming.

So the architecture is two thin layers — a SQL source layer (compose pushdown-friendly SQL, hand it to the source) and a post-source compute layer — and the engine is a declared per-operator field, never something an optimizer chooses. See Arrow-Native Design for the data-format half of the same contract.
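The three post-source levers named in the design rule compose naturally with Python generators. In this sketch (all names hypothetical, with udf standing in for an Arrow or Ray UDF), work is partitioned by day, each day's rows are streamed in fixed-size batches, and evaluation stays lazy: no day is read until the caller iterates.

```python
from datetime import date, timedelta
from typing import Callable, Iterable, Iterator


def day_partitions(start: date, end: date) -> Iterator[date]:
    """Work partitioning: one unit of work per day, start..end inclusive."""
    day = start
    while day <= end:
        yield day
        day += timedelta(days=1)


def stream_batches(rows: Iterable[dict], batch_size: int) -> Iterator[list]:
    """Batch streaming: emit fixed-size batches instead of materializing a day."""
    batch: list = []
    for row in rows:
        batch.append(row)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:
        yield batch  # trailing partial batch


def post_source(start: date, end: date,
                read_day: Callable[[date], Iterable[dict]],
                udf: Callable[[list], object],
                batch_size: int = 1024) -> Iterator[tuple]:
    """Lazy evaluation: the generator body only runs as the caller iterates."""
    for day in day_partitions(start, end):
        for batch in stream_batches(read_day(day), batch_size):
            yield day, udf(batch)


calls: list = []


def read_day(day: date) -> Iterator[dict]:
    calls.append(day)  # records when a day is actually read
    return iter([{"day": day, "i": i} for i in range(3)])


gen = post_source(date(2024, 1, 1), date(2024, 1, 2), read_day, udf=len, batch_size=2)
nothing_read_yet = not calls  # True: creating the generator reads nothing
results = list(gen)           # per day: one batch of 2 rows, then one of 1
```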