
Dagster Orchestration

Dagster is the execution runtime for production training runs. The module-level defs object in src/daedalus/definitions.py is the entrypoint that Dagit, the dg CLI, and dagster dev load:

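```python
# src/daedalus/definitions.py
import dagster as dg

from daedalus.defs.training import (DaedalusConfig, dssm_ranking_training,
                                    feature_view_specs, training_job, training_schedule)

defs = dg.Definitions(
    assets=[*feature_view_specs, dssm_ranking_training],
    jobs=[training_job],
    schedules=[training_schedule],
    resources={"cfg": DaedalusConfig()},
)
```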

Everything below is defined in src/daedalus/defs/training.py.

The hybrid model: a partitioned graph-backed asset

The top abstraction is a daily-partitioned asset per training service — one partition per day, which is also the schedule cadence. The partition is produced by a graph of ops so the separable sub-phases are visible in Dagit and can be retried or tuned individually:

```text
compile_pipeline ─▶ skeleton_stage ─▶ enrich_stage ─▶ (asset materialization)
```

```python
import dagster as dg

daily_partitions = dg.DailyPartitionsDefinition(start_date="2026-02-01")

# compile_pipeline, skeleton_stage and enrich_stage are ops defined in the same module.
@dg.graph_asset(
    name="dssm_ranking_training",
    partitions_def=daily_partitions,
    group_name="training",
)
def dssm_ranking_training() -> dg.MaterializeResult:
    return enrich_stage(skeleton_stage(compile_pipeline()))
```

The ops wrap the existing engine (pipelines.executor.run_pipeline over the skeleton/enrich stages) — no feature logic is re-derived. Each op emits rich metadata so a run is self-describing in Dagit:

| Op | Phase | Metadata emitted |
|---|---|---|
| compile_pipeline | Compile the service to the engine-tagged operator DAG | operator count, the full DAG as JSON |
| skeleton_stage | SQL source stage: spine + rolling aggregations (skeleton_only=True) | rows, output path |
| enrich_stage | Pythonic stage: Ray + Lance avg-pool embeddings (enrich_only=True) | rows, columns, embedding_columns, output_path, partition |

enrich_stage reads row counts and schema from the parquet metadata only. It never loads the large 1152-d embedding partition into memory just to report stats.

Feature views as external assets (lineage)

Every feature view in the registry is declared as a Dagster external asset, so the asset graph is the lineage. They appear in the feature_views group keyed as feature_view/<name> with the parquet kind:

```python
dg.AssetSpec(
    key=dg.AssetKey(["feature_view", name]),
    group_name="feature_views",
    kinds={"parquet"},
)
```

Job + schedule

```python
training_job = dg.define_asset_job(
    name="dssm_ranking_daily",
    selection=[dssm_ranking_training],
)

training_schedule = dg.build_schedule_from_partitioned_job(training_job)
```

build_schedule_from_partitioned_job derives a daily schedule directly from the daily partition definition, so production runs follow the partition cadence with no separate cron string to maintain.
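A schedule built this way ticks at the end of each partition window and targets the partition that just closed. For example, the 2026-02-01 partition runs at 00:00 UTC on 2026-02-02. The stdlib-only sketch below shows that mapping. The helper name is hypothetical, and it assumes the default midnight-UTC boundaries with no hour or minute offset.

```python
from datetime import datetime, timedelta, timezone


def latest_complete_partition(now: datetime) -> str:
    """Key of the most recent fully elapsed daily partition at `now`.

    Assumes midnight-UTC partition boundaries; naive datetimes are treated as UTC.
    """
    if now.tzinfo is None:
        now = now.replace(tzinfo=timezone.utc)
    now = now.astimezone(timezone.utc)
    # Day D's window [D 00:00, D+1 00:00) only closes at D+1 00:00, so at any
    # moment during D+1 the latest complete partition is D.
    return (now - timedelta(days=1)).strftime("%Y-%m-%d")
```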

The DaedalusConfig resource

The locations the engine reads config from and writes output to are set by a ConfigurableResource. This resource can be overridden per run or per environment:

```python
import dagster as dg


class DaedalusConfig(dg.ConfigurableResource):
    runtime_config_path: str = "config/training/runtime.yaml"
    aggregation_config_path: str = "config/training/aggregations.yaml"
    feature_views_dir: str = "feature_views"
    feature_services_dir: str = "feature_services"
    output_root: str = "data/training_output"
    enriched_output_root: str = "data/training_output_enriched"
    # Local Ray envelope; prod overrides this to 16 CPUs / 64 GiB.
    ray_num_cpus: int = 16
    ray_memory_gb: int = 44
    ray_object_store_gb: int = 12
```

Daily today, hourly later

The cadence is daily end to end. Moving to hourly is a one-line swap to HourlyPartitionsDefinition once the feed supports it.

Running it

```bash
# Launch Dagit (the web UI) against the definitions module
uv run dagster dev -m daedalus.definitions

# Validate that the definitions load cleanly
uv run dg check defs
```

Agent CLI: ad-hoc materialization & lineage

For agents and quick checks, two top-level daeda commands wrap the same Dagster asset in-process (src/daedalus/commands/agent.py), so no running Dagit instance is required.

daeda materialize-day

Materializes one training-day partition (via dg.materialize on dssm_ranking_training) so you can inspect sample output. Only the dssm_ranking service is supported.

```bash
uv run daeda materialize-day dssm_ranking 2026-02-01
```

It prints a JSON summary pulled from the materialization metadata:

```json
{
  "service": "dssm_ranking",
  "date": "2026-02-01",
  "rows": 1234567,
  "output_path": "data/.../dt=2026-02-01",
  "embedding_columns": ["image_embedding", "like_artwork_avg_embeds"]
}
```

daeda lineage

Shows upstream/downstream lineage for one feature view or service, resolved from the platform registry (not Dagster):

```bash
uv run daeda lineage dssm_ranking            # table (default)
uv run daeda lineage user_likes --format json
```

For a service it lists upstream views and their sources; for a view it lists upstream sources and downstream services.
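The resolution rule above can be sketched over a toy in-memory registry. The Registry shape, the lineage helper and the source names used in the test are all hypothetical; the real command reads the platform registry.

```python
from dataclasses import dataclass, field


@dataclass
class Registry:
    # Hypothetical shape: feature view -> sources, service -> feature views.
    view_sources: dict[str, list[str]] = field(default_factory=dict)
    service_views: dict[str, list[str]] = field(default_factory=dict)


def lineage(reg: Registry, name: str) -> dict:
    if name in reg.service_views:
        # Service: upstream views, each with its own sources.
        return {
            "kind": "service",
            "upstream_views": {v: reg.view_sources.get(v, []) for v in reg.service_views[name]},
        }
    if name in reg.view_sources:
        # Feature view: upstream sources plus every service that consumes it.
        return {
            "kind": "feature_view",
            "upstream_sources": reg.view_sources[name],
            "downstream_services": sorted(
                s for s, views in reg.service_views.items() if name in views
            ),
        }
    raise KeyError(f"unknown feature view or service: {name}")
```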

See also