Skeleton Stage
The skeleton stage is the SQL source layer of the training DAG. It is catalog-based: it iterates every month from feed_start through feed_end (inclusive), collecting every day with feed data across the range, and for each target day produces a per-day parquet of feature joins and rolling aggregations.
Engine tag: sql. Entry point: run_skeleton_stage() in src/daedalus/pipelines/training.py. Run it on its own with --skeleton-only:
daeda pipeline train dssm_ranking --skeleton-only --target-date 2026-05-15

Per-day flow
For each target day, the pipeline runs the following steps in order:
| Step | Function | What happens |
|---|---|---|
| 1 | _build_time_window_entity_spine | 7-day entity spine from raw feed (user_id, artwork_id, event_timestamp, event, optional feed_session) |
| 2 | _apply_embedding_coverage_filter | Drop spine rows whose artwork_id is not in the Lance embedding pool (avoids dead rows downstream of enrich) |
| 3 | _preload_filtered_features | Load each catalog view once, DuckDB-filtered to spine IDs; deduped to the latest row per entity by the source timestamp_field |
| 4 | _enrich_spine_for_rolling | LEFT JOIN artwork-static columns (author_id, model_id, tack_ids) onto the full spine for rolling aggregation |
| 5 | _inject_dislike_history | When dislike_artwork_ids is configured, concat historical dislike_click rows (ts < window_start) for users in the spine so the cumulative dislike aggregation sees the user's full history |
| 6 | aggregate_pit_table | Rolling aggregations on the full 7-day spine (incl. flatten_distinct for set-style rollups like like_artwork_tack_ids_*) |
| 7 | _filter_to_target_day | Keep only rows in [day_start, day_end); strips injected historical dislike rows whose timestamps fall outside the window |
| 8 | _apply_session_filter | Optional — drops view-only sessions and/or single-like sessions when feed has feed_session |
| 9 | _compute_user_recent_published_artworks | Direct windowed join against artwork_properties keyed on author_id = user_id (replaces the older synthetic event='publish' row injection) |
| 10 | Per-chunk loop | _join_preloaded_features → _compute_age_columns → _curate_animated_model_type → _apply_content_filter → write via ParquetSink (dt=YYYY-MM-DD/part-N-0.parquet) |
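The half-open window in step 7 can be illustrated with a minimal sketch. This is not the pipeline's `_filter_to_target_day`; the function name `filter_to_target_day`, the row-dict representation, and the sample event values are assumptions made for illustration only.

```python
from datetime import date, datetime, timedelta

def filter_to_target_day(rows, target_day):
    """Keep rows whose event_timestamp falls in [day_start, day_end)."""
    day_start = datetime(target_day.year, target_day.month, target_day.day)
    day_end = day_start + timedelta(days=1)
    return [r for r in rows if day_start <= r["event_timestamp"] < day_end]

# Hypothetical sample rows: one injected historical dislike, one row from
# earlier in the 7-day spine, one on the target day, one at the next midnight.
rows = [
    {"event": "dislike_click", "event_timestamp": datetime(2026, 4, 1, 9, 0)},
    {"event": "view", "event_timestamp": datetime(2026, 5, 14, 23, 59, 59)},
    {"event": "like", "event_timestamp": datetime(2026, 5, 15, 0, 0)},
    {"event": "view", "event_timestamp": datetime(2026, 5, 16, 0, 0)},
]
kept = filter_to_target_day(rows, date(2026, 5, 15))
```

Because the filter runs after the rolling aggregation, the rows it drops (including the injected dislike history) have already contributed to the aggregates of the rows it keeps.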
Load-bearing per-chunk ordering
Inside step 10, _curate_animated_model_type MUST precede _apply_content_filter. Upstream tags every ANIMATED_ARTWORK row as SD_V1_MODEL; curation rewrites those to DEFAULT_I2V_MODEL before _apply_content_filter drops SD_V1_MODEL rows. Reverse the order and legitimate animated artworks are silently dropped.
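The effect of the ordering can be seen in a small sketch. It assumes the tags are stored as string values in the artwork_type and model_type columns; the two functions are simplified stand-ins, not the real `_curate_animated_model_type` and `_apply_content_filter`.

```python
ANIMATED_ARTWORK = "ANIMATED_ARTWORK"
SD_V1_MODEL = "SD_V1_MODEL"
DEFAULT_I2V_MODEL = "DEFAULT_I2V_MODEL"

def curate_animated_model_type(rows):
    """Rewrite the upstream SD_V1_MODEL tag on animated artworks."""
    return [
        {**r, "model_type": DEFAULT_I2V_MODEL}
        if r["artwork_type"] == ANIMATED_ARTWORK and r["model_type"] == SD_V1_MODEL
        else r
        for r in rows
    ]

def apply_content_filter(rows):
    """Drop every row still tagged SD_V1_MODEL."""
    return [r for r in rows if r["model_type"] != SD_V1_MODEL]

# Hypothetical chunk: "IMAGE" and "OTHER_MODEL" are placeholder values.
chunk = [
    {"artwork_id": 1, "artwork_type": ANIMATED_ARTWORK, "model_type": SD_V1_MODEL},
    {"artwork_id": 2, "artwork_type": "IMAGE", "model_type": SD_V1_MODEL},
    {"artwork_id": 3, "artwork_type": "IMAGE", "model_type": "OTHER_MODEL"},
]

correct = apply_content_filter(curate_animated_model_type(chunk))
reversed_order = curate_animated_model_type(apply_content_filter(chunk))
```

In the correct order the animated artwork (ID 1) survives with its rewritten tag; in the reversed order it is removed before curation ever sees it, and no error is raised.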
Feature-join design
feature_refs in runtime.yaml selects which views to load. All columns defined in view.features are loaded — not just the referenced column. Joins are plain LEFT JOINs; ASOF is used only by aggregate_pit_table for rolling aggregation windows, never for feature lookup.
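As a rough illustration of "latest row per entity, then plain LEFT JOIN", the sketch below uses plain Python dicts instead of DuckDB. The helper names, the `updated_at` timestamp field, and the sample values are hypothetical; the real preload is done in SQL.

```python
def latest_per_entity(view_rows, key, timestamp_field):
    """Deduplicate a feature view to the newest row per entity key."""
    latest = {}
    for row in view_rows:
        current = latest.get(row[key])
        if current is None or row[timestamp_field] > current[timestamp_field]:
            latest[row[key]] = row
    return latest

def left_join(spine, lookup, key, feature_columns):
    """Attach feature columns to every spine row; unmatched rows get None."""
    joined_rows = []
    for row in spine:
        match = lookup.get(row[key])
        joined = dict(row)
        for col in feature_columns:
            joined[col] = match[col] if match is not None else None
        joined_rows.append(joined)
    return joined_rows

spine = [{"user_id": 7, "artwork_id": 10}, {"user_id": 7, "artwork_id": 11}]
artwork_aesthetic = [
    {"artwork_id": 10, "aes_score": 0.4, "updated_at": 1},
    {"artwork_id": 10, "aes_score": 0.7, "updated_at": 2},  # newer row wins
]

lookup = latest_per_entity(artwork_aesthetic, "artwork_id", "updated_at")
joined = left_join(spine, lookup, "artwork_id", ["aes_score"])
```

Note that the lookup ignores the spine row's event_timestamp entirely, which is the difference from an ASOF join: every spine row for an entity receives the same latest feature values.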
Rolling aggregations
The rolling step (aggregate_pit_table, step 6) is driven by aggregation_specs in config/training/aggregations.yaml. The skeleton driver runs resource-isolated DuckDB COPY stages per day and always uses DuckDB (compute.engine is still honored for legacy aggregate_pit_table call sites such as catalog ops and notebooks). The interchangeable DuckDB / Polars implementations are documented in the Compute Engine section.
Notes from the specs:
- like_artwork_ids_* set exclude_self: true so an impression row for artwork X cannot see X in its own like history, which plugs the candidate-ID leak. Author / model / tack rolls intentionally keep candidate matches as legitimate taste signal.
- dislike_artwork_ids is cumulative across the user's full history (no window cap), computed in a separate post-rolling pipeline that ASOF-joins the cumulative list onto the target-day rows.
- user_recent_published_artworks is not a regular sequence aggregation; it joins artwork_properties to the spine on author_id = user_id with a windowed predicate (step 9).
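The exclude_self behaviour can be sketched in plain Python. This is an illustration only: the function `rolling_like_ids`, the `"like"` / `"view"` event values, and the trailing window bounds `[ts - window, ts)` are assumptions, and the real aggregation runs in DuckDB via aggregate_pit_table.

```python
from datetime import datetime, timedelta

def rolling_like_ids(events, window, exclude_self):
    """For each row, list artwork IDs the same user liked in [ts - window, ts)."""
    ordered = sorted(events, key=lambda e: e["event_timestamp"])
    out = []
    for row in ordered:
        window_start = row["event_timestamp"] - window
        liked = [
            e["artwork_id"]
            for e in ordered
            if e["event"] == "like"
            and e["user_id"] == row["user_id"]
            and window_start <= e["event_timestamp"] < row["event_timestamp"]
            # exclude_self: hide the candidate artwork from its own history
            and not (exclude_self and e["artwork_id"] == row["artwork_id"])
        ]
        out.append({**row, "like_artwork_ids_1d": liked})
    return out

events = [
    {"user_id": 1, "artwork_id": 101, "event": "like",
     "event_timestamp": datetime(2026, 5, 15, 10, 0)},
    {"user_id": 1, "artwork_id": 202, "event": "like",
     "event_timestamp": datetime(2026, 5, 15, 11, 0)},
    {"user_id": 1, "artwork_id": 101, "event": "view",
     "event_timestamp": datetime(2026, 5, 15, 12, 0)},
]

with_exclusion = rolling_like_ids(events, timedelta(days=1), exclude_self=True)
without_exclusion = rolling_like_ids(events, timedelta(days=1), exclude_self=False)
```

For the final impression of artwork 101, the history contains 101 only when exclude_self is off. That is the leak the flag closes: a model could otherwise learn to match the candidate ID against the user's like list.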
Output columns
Output is controlled by output_columns in runtime.yaml. Grouped by source:
| Source | Columns |
|---|---|
| feed spine | user_id, artwork_id, event_timestamp, event, current_like_count |
| feed spine (geo / provenance) | country_code, platform_code (ISO numeric / platform enum; 0 when absent or unknown), retriever_id, member_source |
| user_profile | blur_nsfw, show_nsfw, safe_search, followed_user_ids, user_age |
| artwork_properties | author_id, hide_prompts, is_sensitive, artwork_type, artwork_age |
| artwork_generation | model_id, model_type, width, height, lightning, sampling_steps |
| artwork_aesthetic | aes_score |
| artwork_tack | tack_ids |
| artwork_vector | media_safety_score, text_safety_score |
| rolling agg (like_*) | like_artwork_ids_{1d,3d,7d}, like_artwork_author_ids_{1d,3d,7d}, like_artwork_model_ids_{1d,3d,7d}, like_artwork_cnts_{1d,3d,7d}, like_artwork_tack_ids_{1d,3d,7d} |
| rolling agg (view / click) | view_artwork_ids_{1d,3d,7d}, click_artwork_ids_{1d,3d,7d} |
| rolling agg (cumulative / windowed-join) | dislike_artwork_ids (cumulative), user_recent_published_artworks |
| dynamic feature (Stage 2) | image_embedding, like_artwork_avg_embeds (emitted as typed NULL placeholders here; populated by enrich) |
Output column defaults
Columns in output_columns written as {name, default} dicts are filled with the specified literal (e.g. default: 0, default: [0], default: null) when absent from the chunk — keeping a consistent schema even before the enrich stage runs. The two dynamic columns use default_sql: "CAST(NULL AS FLOAT[])" so they stay typed-NULL until enrich replaces them.
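A minimal sketch of the literal-default fill, modelling a chunk as a list of row dicts. The function name `apply_output_defaults` is hypothetical, the handling of plain-string entries (treated as required) is an assumption, and default_sql entries are not modelled here.

```python
import copy

def apply_output_defaults(rows, output_columns):
    """Project rows onto output_columns, filling {name, default} columns when absent."""
    result = []
    for row in rows:
        out = {}
        for spec in output_columns:
            if isinstance(spec, dict):
                name = spec["name"]
                if name in row:
                    out[name] = row[name]
                else:
                    # deepcopy so list defaults such as [0] are not shared between rows
                    out[name] = copy.deepcopy(spec["default"])
            else:
                # Assumption: a plain column name must already exist (KeyError otherwise)
                out[spec] = row[spec]
        result.append(out)
    return result

output_columns = [
    "user_id",
    {"name": "country_code", "default": 0},
    {"name": "followed_user_ids", "default": [0]},
]
chunk = [{"user_id": 1, "country_code": 840}, {"user_id": 2}]
filled = apply_output_defaults(chunk, output_columns)
```

Every row comes out with the same three keys in the same order, which is the schema-consistency property described above.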
Runtime config knobs
Default knobs live in config/training/runtime.yaml:
| Key | Default | Description |
|---|---|---|
| feed_start / feed_end | "202602" / "202605" | Inclusive YYYYMM month range to process |
| entity_spine_chunk_rows | 500000 | Rows per output chunk (accepted for CLI compat; the staged SQL driver does not chunk in Python) |
| day_workers | 1 | Target days processed concurrently; also --day-workers N on the CLI |
| output_root | data/training_output | Output directory (relative resolves against CWD) |
| compute.engine | duckdb | Rolling-agg engine for legacy call sites; the skeleton always uses DuckDB |
| compute.duckdb_config | {threads, memory_limit, temp_directory, …} | Forwarded to every duckdb.connect(config=…) the skeleton opens |
| feature_refs | 6 views | Which feature views to preload (user_profile, artwork_properties, artwork_generation, artwork_aesthetic, artwork_tack, artwork_vector) |
| embedding_coverage_filter | enabled | Spine-level filter; drops reco_artwork_view/reco_artwork_view2 rows missing from the Lance store to avoid NULL-embedding rows |
| output_columns | see above | Columns written to output parquets |
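To make the inclusive feed_start / feed_end semantics concrete, here is a small sketch of a YYYYMM month iterator. The helper `iter_months` is hypothetical and is not taken from the pipeline source.

```python
def iter_months(feed_start, feed_end):
    """Yield YYYYMM strings from feed_start through feed_end, both inclusive."""
    year, month = int(feed_start[:4]), int(feed_start[4:])
    end_year, end_month = int(feed_end[:4]), int(feed_end[4:])
    while (year, month) <= (end_year, end_month):
        yield f"{year:04d}{month:02d}"
        month += 1
        if month > 12:  # roll over into January of the next year
            year, month = year + 1, 1

default_range = list(iter_months("202602", "202605"))
across_year = list(iter_months("202511", "202602"))
```

With the defaults in the table, the range covers four months, February through May 2026.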
Day parallelism
Keep day_workers at 1 unless the host has enough CPU, memory, and DuckDB spill bandwidth for N independent daily pipelines at the configured per-day duckdb_config limits.
Output
Each target day is written to dt=YYYY-MM-DD/part-N-0.parquet (Zstd, via ParquetSink). If the feed carries feed_session, an incremental integer qid is assigned and persisted in output_root/qid_map.parquet, which grows across runs.
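The incremental qid assignment can be pictured with the sketch below, where an in-memory dict stands in for qid_map.parquet. The function `assign_qids`, the one-qid-per-feed_session mapping, and the choice to start numbering at 0 are assumptions for illustration; the source does not specify them.

```python
def assign_qids(feed_sessions, qid_map):
    """Return one qid per row, extending qid_map in place for unseen sessions."""
    # Assumption: numbering starts at 0 and continues from the largest stored qid.
    next_qid = max(qid_map.values(), default=-1) + 1
    qids = []
    for session in feed_sessions:
        if session not in qid_map:
            qid_map[session] = next_qid
            next_qid += 1
        qids.append(qid_map[session])
    return qids

qid_map = {}  # stand-in for the persisted map
first_run = assign_qids(["sess-a", "sess-b", "sess-a"], qid_map)
second_run = assign_qids(["sess-b", "sess-c"], qid_map)
```

Sessions seen in an earlier run keep their qid, and only new sessions extend the map, which is why the persisted file grows across runs.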
Next
- Enrich Stage — appends avg-pooled artwork embeddings.
- Overview — the full compile → skeleton → enrich DAG.