# Dagster Orchestration
Dagster is the execution runtime for production training runs. The module-level `defs` object in `src/daedalus/definitions.py` is the entrypoint that Dagit, the `dg` CLI, and `dagster dev` load:
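```python
# src/daedalus/definitions.py
import dagster as dg
from daedalus.defs.training import (
    DaedalusConfig, dssm_ranking_training, feature_view_specs, training_job, training_schedule,
)

defs = dg.Definitions(
    assets=[*feature_view_specs, dssm_ranking_training],
    jobs=[training_job],
    schedules=[training_schedule],
    resources={"cfg": DaedalusConfig()},
)
```

Everything below is defined in `src/daedalus/defs/training.py`.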
## The hybrid model: a partitioned graph-backed asset
The top abstraction is a daily-partitioned asset per training service — one partition per day, which is also the schedule cadence. The partition is produced by a graph of ops so the separable sub-phases are visible in Dagit and can be retried or tuned individually:
```text
compile_pipeline ─▶ skeleton_stage ─▶ enrich_stage ─▶ (asset materialization)
```

```python
import dagster as dg

daily_partitions = dg.DailyPartitionsDefinition(start_date="2026-02-01")

# compile_pipeline, skeleton_stage and enrich_stage are the @dg.op functions of this module.
@dg.graph_asset(
    name="dssm_ranking_training",
    partitions_def=daily_partitions,
    group_name="training",
)
def dssm_ranking_training() -> dg.MaterializeResult:
    return enrich_stage(skeleton_stage(compile_pipeline()))
```

The ops wrap the existing engine (`pipelines.executor.run_pipeline` over the skeleton/enrich stages), so no feature logic is re-derived. Each op emits rich metadata, which makes a run self-describing in Dagit:
| Op | Phase | Metadata emitted |
|---|---|---|
| `compile_pipeline` | Compile the service to the engine-tagged operator DAG | operators count, the full dag as JSON |
| `skeleton_stage` | SQL source stage: spine + rolling aggregations (`skeleton_only=True`) | rows, output path |
| `enrich_stage` | Pythonic stage: Ray + Lance avg-pool embeddings (`enrich_only=True`) | `rows`, `columns`, `embedding_columns`, `output_path`, `partition` |
`enrich_stage` reads counts and schema from the Parquet metadata only. It never loads the large 1152-d embedding partition into memory just to report stats.
## Feature views as external assets (lineage)
Every feature view in the registry is declared as a Dagster external asset, so the asset graph doubles as the lineage graph. The views appear in the `feature_views` group, keyed as `feature_view/<name>` with the `parquet` kind:
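```python
import dagster as dg

# One spec per feature view; `name` is the view's registry name.
dg.AssetSpec(
    key=dg.AssetKey(["feature_view", name]),
    group_name="feature_views",
    kinds={"parquet"},
)
```

## Job + schedule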
```python
# The job inherits daily_partitions from the selected asset.
training_job = dg.define_asset_job(
    name="dssm_ranking_daily",
    selection=[dssm_ranking_training],
)
training_schedule = dg.build_schedule_from_partitioned_job(training_job)
```

`build_schedule_from_partitioned_job` derives a daily schedule directly from the daily partitions definition. Production runs therefore follow the partition cadence, and there is no separate cron string to maintain.
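The mapping from schedule ticks to partitions can be illustrated in plain Python. With default settings, a schedule built this way targets the most recently *completed* partition. For example, the tick at midnight on 2026-02-02 runs partition `2026-02-01`. The helpers `complete_partition_keys` and `partition_key_for_tick` are hypothetical. They assume UTC, which is the default timezone for Dagster time-window partitions.

```python
from datetime import date, datetime, timedelta
from typing import List, Optional


def complete_partition_keys(start_date: str, tick: datetime) -> List[str]:
    """Daily partition keys whose whole day has elapsed by `tick` (UTC assumed)."""
    day = date.fromisoformat(start_date)
    last_complete = tick.date() - timedelta(days=1)
    keys = []
    while day <= last_complete:
        keys.append(day.isoformat())  # daily keys use the %Y-%m-%d format
        day += timedelta(days=1)
    return keys


def partition_key_for_tick(start_date: str, tick: datetime) -> Optional[str]:
    """Partition a tick would run: the latest complete one, or None if none exist yet."""
    keys = complete_partition_keys(start_date, tick)
    return keys[-1] if keys else None
```

Here `partition_key_for_tick("2026-02-01", datetime(2026, 2, 2))` returns `"2026-02-01"`. A tick earlier on 2026-02-01 returns `None`, because the first day is not over yet.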
## The DaedalusConfig resource
The locations the engine reads config from and writes output to, along with the local Ray resource envelope, are held in a `ConfigurableResource` that can be overridden per run or per environment:
```python
import dagster as dg

class DaedalusConfig(dg.ConfigurableResource):
    runtime_config_path: str = "config/training/runtime.yaml"
    aggregation_config_path: str = "config/training/aggregations.yaml"
    feature_views_dir: str = "feature_views"
    feature_services_dir: str = "feature_services"
    output_root: str = "data/training_output"
    enriched_output_root: str = "data/training_output_enriched"
    # Local Ray envelope (prod overrides to 16c / 64 GiB).
    ray_num_cpus: int = 16
    ray_memory_gb: int = 44
    ray_object_store_gb: int = 12
```

## Daily today, hourly later
The cadence is daily end to end. Moving to hourly is a one-line swap to `HourlyPartitionsDefinition` once the feed supports it.
## Running it
```shell
# Launch Dagit (the web UI) against the definitions module
uv run dagster dev -m daedalus.definitions
# Validate the definitions load cleanly
uv run dg check defs
```

## Agent CLI: ad-hoc materialization & lineage
For agents and quick checks, two top-level `daeda` commands run in-process (`src/daedalus/commands/agent.py`), so no running Dagit instance is required. `materialize-day` wraps the same Dagster asset, while `lineage` reads the platform registry.
### `daeda materialize-day`
Materializes one training-day partition (via `dg.materialize` on `dssm_ranking_training`) so you can inspect sample output. Only the `dssm_ranking` service is supported.
```shell
uv run daeda materialize-day dssm_ranking 2026-02-01
```

It prints a JSON summary pulled from the materialization metadata:
```json
{
  "service": "dssm_ranking",
  "date": "2026-02-01",
  "rows": 1234567,
  "output_path": "data/.../dt=2026-02-01",
  "embedding_columns": ["image_embedding", "like_artwork_avg_embeds"]
}
```

### `daeda lineage`
Shows upstream/downstream lineage for one feature view or service, resolved from the platform registry (not Dagster):
```shell
uv run daeda lineage dssm_ranking               # table (default)
uv run daeda lineage user_likes --format json
```

For a service, it lists the upstream views and their sources. For a view, it lists the upstream sources and the downstream services.
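The two resolution directions can be sketched over an in-memory registry. The `Registry` shape, the `lineage` helper, and the source names `likes_events` and `artworks` are hypothetical. The real command reads the platform registry, whose API is not described here.

```python
import json
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class Registry:
    """Hypothetical in-memory stand-in for the platform registry."""

    services: Dict[str, List[str]]  # service name -> feature views it reads
    views: Dict[str, List[str]]  # feature view name -> sources it reads


def lineage(reg: Registry, name: str) -> dict:
    """Resolve lineage for a service (upstream only) or a feature view (both ways)."""
    if name in reg.services:
        # Service: its upstream views, each with that view's upstream sources.
        return {
            "service": name,
            "upstream_views": {v: reg.views.get(v, []) for v in reg.services[name]},
        }
    if name in reg.views:
        # View: its upstream sources, plus every service that reads it.
        downstream = sorted(s for s, views in reg.services.items() if name in views)
        return {
            "feature_view": name,
            "upstream_sources": reg.views[name],
            "downstream_services": downstream,
        }
    raise KeyError(f"unknown feature view or service: {name!r}")


if __name__ == "__main__":
    reg = Registry(
        services={"dssm_ranking": ["user_likes"]},
        views={"user_likes": ["likes_events"]},
    )
    print(json.dumps(lineage(reg, "user_likes"), indent=2))
```

Computing downstream services by scanning every service is fine at registry scale. A reverse index would only matter with many thousands of services.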
## See also
- Training Pipeline overview: what the ops actually run.
- JSON-RPC API: the `runs.*` and `assets.lineage` methods that reach the same Dagster job over HTTP.
- CLI reference.