Operator Pipeline
A training run is a DAG of engine-tagged operators. A feature service is compiled into an OperatorPipeline, serialized to an editable YAML file, and run through the executor. This is the unified skeleton → enrich path and the sole training entry point in Core v1 (daeda pipeline train <service>).
The logical model lives in src/daedalus/pipelines/operators.py; the compiler in compiler.py; the executor in executor.py. The operator dataclasses are pure description + schema inference + to_dict round-trip — they hold no data and run nothing.
The engine-tagged operator model
Engine — a declared tier, never inferred
Every operator carries an engine tag — one of three tiers — chosen by the author, not inferred by a query optimizer. Engine is a str-valued enum so it round-trips through YAML:
| Engine | Value | What runs on it |
|---|---|---|
| Engine.SQL | "sql" | Pure DuckDB SQL over the attached source(s): the source layer. |
| Engine.SQL_ARROW_UDF | "sql_arrow_udf" | DuckDB SQL producing a column, then a vectorized Arrow UDF post-SQL (e.g. avg_pool / flatten_distinct over a list column). |
| Engine.PYTHONIC | "pythonic" | Distributed / dynamic Python compute (Ray actors + numpy / Lance): the irreducible embedding-enrichment case. |
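Why a str-valued enum round-trips cleanly can be shown in a few lines. This is a minimal sketch, assuming the enum is declared roughly as below; the member values come from the table above, but the declaration itself is an assumption and is not copied from operators.py.

```python
from enum import Enum


class Engine(str, Enum):
    """Assumed shape of the engine enum; values match the table above."""

    SQL = "sql"
    SQL_ARROW_UDF = "sql_arrow_udf"
    PYTHONIC = "pythonic"


# Serializing writes the plain string value...
serialized = Engine.SQL_ARROW_UDF.value
# ...and parsing looks the member up again by that value.
restored = Engine(serialized)
```

Because each member is also a str, a hand-edited YAML file only ever needs to contain the plain strings "sql", "sql_arrow_udf", or "pythonic".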
The Operator protocol
Every operator satisfies a common Operator Protocol:
from typing import Protocol, runtime_checkable

@runtime_checkable
class Operator(Protocol):
    op_kind: str
    name: str                # unique node id
    inputs: tuple[str, ...]  # upstream operators' output relations
    output: str              # this op's produced relation (defaults to name)
    engine: Engine

    def can_run_on(self) -> frozenset[Engine]: ...
    def to_dict(self) -> dict: ...

can_run_on() reports the engines an op kind is capable of running on (the author picks one of them); the default is "exactly the declared engine."
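To make the protocol concrete, here is a minimal sketch of a dataclass that satisfies it structurally. ExampleFilter and its predicate field are hypothetical names invented for illustration; the real operator dataclasses in operators.py may be shaped differently.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Protocol, runtime_checkable


class Engine(str, Enum):
    SQL = "sql"
    SQL_ARROW_UDF = "sql_arrow_udf"
    PYTHONIC = "pythonic"


@runtime_checkable
class Operator(Protocol):
    op_kind: str
    name: str
    inputs: tuple[str, ...]
    output: str
    engine: Engine

    def can_run_on(self) -> frozenset[Engine]: ...
    def to_dict(self) -> dict: ...


@dataclass(frozen=True)
class ExampleFilter:
    """Hypothetical operator: pure description, holds no data, runs nothing."""

    name: str
    inputs: tuple[str, ...]
    predicate: str
    engine: Engine = Engine.SQL
    op_kind: str = "filter"
    output: str = ""

    def __post_init__(self) -> None:
        # The produced relation defaults to the node name.
        if not self.output:
            object.__setattr__(self, "output", self.name)

    def can_run_on(self) -> frozenset[Engine]:
        # Default behaviour: exactly the declared engine.
        return frozenset({self.engine})

    def to_dict(self) -> dict:
        return {
            "op_kind": self.op_kind,
            "name": self.name,
            "inputs": list(self.inputs),
            "output": self.output,
            "engine": self.engine.value,
            "predicate": self.predicate,
        }


op = ExampleFilter(name="recent_events", inputs=("events",), predicate="ts > 0")
```

Since the protocol is runtime_checkable, isinstance(op, Operator) passes without ExampleFilter inheriting from anything; the check only confirms that the members exist, not that their types match.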
Op kinds
Two execution layers, nine op kinds:
Source layer (Engine.SQL): these ops compile to SQL that is composed into one query per source and handed to DuckDB over the polymorphic source. The source does the optimizing.
| Op kind | op_kind | Role |
|---|---|---|
| Scan | scan | Read a source relation (feed time-series or a feature view); carries source_ref, projected columns, an optional pushdown predicate. |
| Filter | filter | Row filter: a SQL predicate over the input. |
| Project | project | Select / rename / compute scalar expressions (curation, age columns, output ordering + default-fill). |
| Join | join | Plain cross-table equi-join (the non-temporal path); inputs = (left, right), on, how. |
| PointInTimeJoin | pit_join | Temporal-causal ASOF join (strict <): most-recent right rows with right.ts < left.timestamp_column. Used for cumulative dislike history and recent-published lookups. |
| RollingAggregate | rolling_aggregate | Windowed point-in-time aggregation wrapping aggregate_pit_table; carries the spec list, timestamp_column, and rolling_engine (the DuckDB-vs-Polars choice). |
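The strict inequality in the PointInTimeJoin row is what keeps the join causal: a right row stamped at exactly the left timestamp is excluded. The sketch below illustrates that semantics in plain Python over a sorted list of timestamps. It is only an illustration of the matching rule, not how the SQL layer executes it, and asof_strict is a hypothetical helper name.

```python
from bisect import bisect_left
from typing import List, Optional


def asof_strict(left_ts: int, right_ts_sorted: List[int]) -> Optional[int]:
    """Return the most recent right timestamp strictly before left_ts, or None."""
    # bisect_left returns the number of elements < left_ts, so a right row
    # with an equal timestamp is never matched (strict <).
    idx = bisect_left(right_ts_sorted, left_ts)
    return right_ts_sorted[idx - 1] if idx > 0 else None


right = [10, 20, 30]
match_equal = asof_strict(20, right)    # 20 itself is excluded
match_between = asof_strict(25, right)
match_none = asof_strict(10, right)     # nothing is strictly earlier
```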
Post-source compute layer: these ops run lazily, partitioned, and streamed.
| Op kind | op_kind | Role |
|---|---|---|
| ArrowUdfTransform | arrow_udf_transform | Vectorized Arrow UDF over a materialized column (e.g. flatten_distinct, avg-pool). Runs on SQL_ARROW_UDF or PYTHONIC. |
| RayUdfTransform | ray_udf_transform | Distributed dynamic compute / inference (Ray actors + Lance/numpy): embedding enrichment. Always PYTHONIC. |
| Sink | sink | Terminal write: day-partitioned, ordered, atomic parquet. |
RollingAggregate.can_run_on() returns {SQL, SQL_ARROW_UDF, PYTHONIC} (its flatten_distinct specs are split onto SQL_ARROW_UDF), and RayUdfTransform pins itself to PYTHONIC in __post_init__.
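Pinning an engine in __post_init__ can be sketched as follows. ExampleRayUdf is a hypothetical stand-in, and the choice to overwrite the tag silently (rather than raise) is an assumption; the source only says that RayUdfTransform pins itself to PYTHONIC.

```python
from dataclasses import dataclass
from enum import Enum


class Engine(str, Enum):
    SQL = "sql"
    SQL_ARROW_UDF = "sql_arrow_udf"
    PYTHONIC = "pythonic"


@dataclass
class ExampleRayUdf:
    """Hypothetical op that can only ever run on the PYTHONIC tier."""

    name: str
    inputs: tuple
    engine: Engine = Engine.PYTHONIC
    op_kind: str = "ray_udf_transform"

    def __post_init__(self) -> None:
        # Whatever tag the caller or a hand-edited YAML supplied, force PYTHONIC.
        self.engine = Engine.PYTHONIC

    def can_run_on(self) -> frozenset:
        return frozenset({Engine.PYTHONIC})


pinned = ExampleRayUdf(name="enrich", inputs=("assembled",), engine=Engine.SQL)
```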
The whole thing is wrapped by OperatorPipeline — a topologically ordered tuple of operators (each op's inputs precede it) plus the service / version it compiled from. to_yaml_dict() / by_kind() round-trip it.
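The ordering invariant (each op's inputs precede it) is cheap to check in a single pass. The following is a sketch under stated assumptions: Node is a hypothetical stand-in carrying only the wiring fields, source ops are assumed to have empty inputs, and check_topological is not a function from the codebase.

```python
from typing import Iterable, NamedTuple


class Node(NamedTuple):
    """Hypothetical stand-in for an operator: just name, inputs, output."""

    name: str
    inputs: tuple
    output: str


def check_topological(ops: Iterable[Node]) -> None:
    """Raise ValueError if any op reads a relation no earlier op produced."""
    produced = set()
    for op in ops:
        missing = [rel for rel in op.inputs if rel not in produced]
        if missing:
            raise ValueError(f"{op.name}: inputs {missing} not produced upstream")
        produced.add(op.output)


ordered = (
    Node("events", (), "events"),
    Node("agg", ("events",), "agg"),
    Node("sink", ("agg",), "sink"),
)
check_topological(ordered)  # passes silently for a valid ordering
```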
The compiler — service → operator DAG
compiler.py (compile_service_to_pipeline / build_compile_request) is pure: a FeatureServiceDef in, an OperatorPipeline out. It maps the contract's columns and the training aggregation/enrichment specs onto the operator vocabulary, producing a topologically ordered DAG:
events Scan
→ per-referenced-view (Scan + cross-table Join)
→ RollingAggregate (windowed; flatten_distinct split to SQL_ARROW_UDF)
→ PointInTimeJoin lookups (dislike history, recent-published)
→ Project assembly (curation, age cols, output ordering + default-fill)
→ RayUdfTransform enrichment (one per embedding store; PYTHONIC)
→ Sink (day-partitioned parquet)

The compile is contract-driven: a PIT-join is emitted only for declared lookback columns, and an enrichment op only for declared embedding columns, so a service with neither (e.g. the XGB reranker) gets no PIT-join and no Ray UDF stage. It reuses training_adapter.resolve_training_service so feature-ref / output-column derivation stays identical to the training path.
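The contract-driven rule can be illustrated with a deliberately simplified planner. This sketch is not the compiler: plan_stages and its two parameters are hypothetical, and it ignores per-view Scan + Join pairs and the one-op-per-embedding-store fan-out. It only shows that the optional stages appear when, and only when, the contract declares the matching columns.

```python
from typing import List


def plan_stages(lookback_columns: List[str], embedding_columns: List[str]) -> List[str]:
    """Return op kinds in emission order for a simplified service contract."""
    stages = ["scan", "rolling_aggregate"]
    if lookback_columns:
        # PIT-join only for declared lookback columns.
        stages.append("pit_join")
    stages.append("project")
    if embedding_columns:
        # Enrichment only for declared embedding columns.
        stages.append("ray_udf_transform")
    stages.append("sink")
    return stages


# A service with neither kind of column gets no PIT-join and no Ray UDF stage.
plain = plan_stages([], [])
full = plan_stages(["dislike_history"], ["item_embedding"])
```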
The result serializes to an editable pipeline YAML (config/pipelines/<service>.generated.yaml); a hand-edited config/pipelines/<service>.yaml takes precedence when present. Engine tags in the emitted YAML are defaults the author can override.
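The precedence rule between the two files amounts to a single existence check. A minimal sketch, assuming the lookup is done by file presence alone; resolve_pipeline_yaml is a hypothetical helper, and only the two path patterns come from the text above.

```python
from pathlib import Path


def resolve_pipeline_yaml(service: str, root: Path = Path("config/pipelines")) -> Path:
    """Prefer the hand-edited YAML; fall back to the generated one."""
    hand_edited = root / f"{service}.yaml"
    generated = root / f"{service}.generated.yaml"
    return hand_edited if hand_edited.exists() else generated
```

Under this assumption, deleting the hand-edited file is enough to return to the compiler's defaults.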
# Compile a service to an editable operator DAG (inspect / hand-tune)
daeda pipeline compile dssm_ranking

The executor — running the DAG
executor.py (run_pipeline(ExecutionRequest)) interprets the DAG by calling the engine entry points directly — it wraps the proven feature logic rather than reimplementing it:
- The SQL source layer and the post-source SQL ops (Scan / Filter / Join / PointInTimeJoin / RollingAggregate / Project / flatten) are run by training.run_skeleton_stage: one composed, source-optimized DuckDB query per day, already memory-bounded and resumable.
- The Pythonic RayUdfTransform (embedding enrichment) is run by dynamic_feature.run_enrich_stage: the Ray + Lance avg-pool stage, lazy and streamed (see Enrich Stage).
from daedalus.pipelines.executor import run_pipeline, ExecutionRequest
result = run_pipeline(ExecutionRequest(pipeline=pipeline, run_config=cfg))
result.output_path  # the final stage's output directory

The enrich stage only runs when the pipeline declares a RayUdfTransform (pipeline.by_kind("ray_udf_transform")) and enrichment isn't skipped; it runs under the 16c / 64 GiB RayEnvelope. See the Training Pipeline overview for the full per-day flow.
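The gating condition for the enrich stage can be written as a single predicate. In this sketch StubPipeline and should_run_enrich are hypothetical, and by_kind is assumed to return a (possibly empty) collection of matching operators; the real return type is not stated in this document.

```python
from dataclasses import dataclass, field


@dataclass
class StubPipeline:
    """Hypothetical stand-in that exposes only by_kind()."""

    op_kinds: list = field(default_factory=list)

    def by_kind(self, kind: str) -> list:
        return [k for k in self.op_kinds if k == kind]


def should_run_enrich(pipeline: StubPipeline, skip_enrich: bool = False) -> bool:
    """Run enrichment only if a Ray UDF op is declared and it is not skipped."""
    return bool(pipeline.by_kind("ray_udf_transform")) and not skip_enrich


with_enrich = StubPipeline(["scan", "project", "ray_udf_transform", "sink"])
without_enrich = StubPipeline(["scan", "project", "sink"])
```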
Design rule — no query optimizer
This is load-bearing (see src/daedalus/pipelines/AGENTS.md):
Sources self-optimize; daedalus owns only post-source compute
There is deliberately no LogicalPlan → PhysicalPlan duality, no cost model, and no SQL-fusion optimizer. The data sources (DuckLake / Snowflake, and DuckDB over them) already optimize the source-side query. daedalus owns only the post-source compute layer (Arrow-UDF + Ray-UDF), governed by exactly three levers: lazy evaluation, work partitioning, batch streaming.
So the architecture is two thin layers — a SQL source layer (compose pushdown-friendly SQL, hand it to the source) and a post-source compute layer — and the engine is a declared per-operator field, never something an optimizer chooses. See Arrow-Native Design for the data-format half of the same contract.
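The three levers named above (lazy evaluation, work partitioning, batch streaming) can be illustrated with plain generators. This is a toy sketch: read_partition, batched, and run are hypothetical names, and the real layer operates on Arrow batches and Ray actors rather than Python dicts.

```python
from typing import Callable, Iterable, Iterator, List


def read_partition(day: str) -> Iterator[dict]:
    """Lazily yield rows for one day (stand-in for a source read)."""
    for i in range(5):
        yield {"day": day, "value": i}


def batched(rows: Iterable[dict], size: int) -> Iterator[List[dict]]:
    """Group a row stream into bounded batches without materializing it."""
    batch: List[dict] = []
    for row in rows:
        batch.append(row)
        if len(batch) == size:
            yield batch
            batch = []
    if batch:
        yield batch


def run(days: Iterable[str], udf: Callable[[List[dict]], int], batch_size: int) -> Iterator[int]:
    for day in days:  # work partitioning: one day at a time
        for batch in batched(read_partition(day), batch_size):  # batch streaming
            yield udf(batch)  # lazy: nothing executes until the caller iterates


results = list(run(["d1", "d2"], lambda b: sum(r["value"] for r in b), batch_size=2))
```

At most one batch of batch_size rows is held in memory at a time, and a consumer that stops iterating early never triggers reads for the remaining days.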