Enrich Stage
The enrich stage is the Pythonic stage of the training DAG. It reads the per-day skeleton parquets and appends avg-pooled artwork embeddings — image_embedding and like_artwork_avg_embeds — using Ray Data streaming actors backed by a Lance embedding store.
Engine tag: pythonic. Entry point: run_enrich_stage() in src/daedalus/pipelines/dynamic_feature.py. It runs automatically after the skeleton stage in daeda pipeline train, or standalone against an existing skeleton output:
daeda pipeline train dssm_ranking --enrich-only --target-date 2026-05-15
What it does
- Reads every dt=YYYY-MM-DD/ partition under the skeleton output root (plus back-compat flat YYYY-MM-DD.parquet files).
- Appends image_embedding (scalar artwork_id → single Lance lookup) and like_artwork_avg_embeds (avg-pool over like_artwork_ids_7d, the canonical rolling pool) via Ray Data streaming actors.
- Writes sharded dt=YYYY-MM-DD/*.parquet directly from Ray Data with preserve_order on. Each shard is a contiguous, chronological event_timestamp range, so there is no serial single-file merge.
- Emits the Lance-backed embedding columns as HALF_FLOAT[].
This stage is Arrow-native end-to-end — the avg-pool helper builds the output Arrow ListArray directly from numpy buffers and offsets rather than round-tripping through Python lists. See Arrow-Native Design.
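As a rough illustration of the buffer-level avg-pool described above, the sketch below works directly on the two buffers of an Arrow list column (offsets and flat values) using NumPy only. The function name avg_pool, the sorted-ID lookup via searchsorted, and the zero vector for empty or unresolved lists are assumptions made for this example, not the stage's actual helper.

```python
import numpy as np


def avg_pool(offsets, flat_ids, sorted_ids, table):
    """Average-pool embeddings per list row without building Python lists.

    offsets    : int32 (n + 1,)  Arrow-style list offsets into flat_ids
    flat_ids   : int64 (k,)      all IDs of all rows, concatenated
    sorted_ids : int64 (m,)      sorted ID index of the embedding table
    table      : float16 (m, d)  one embedding per entry of sorted_ids
    """
    n = len(offsets) - 1
    dim = table.shape[1]
    lengths = np.diff(offsets)
    # Output row that each flat ID belongs to.
    row_of = np.repeat(np.arange(n), lengths)

    # Resolve IDs to table rows; IDs missing from the index are dropped.
    pos = np.clip(np.searchsorted(sorted_ids, flat_ids), 0, len(sorted_ids) - 1)
    found = sorted_ids[pos] == flat_ids

    # Accumulate in float32 to limit rounding error, then divide by the count.
    sums = np.zeros((n, dim), dtype=np.float32)
    np.add.at(sums, row_of[found], table[pos[found]].astype(np.float32))
    counts = np.bincount(row_of[found], minlength=n)
    pooled = sums / np.maximum(counts, 1)[:, None]  # empty rows stay all-zero

    # Flat FP16 values plus fixed-stride offsets: the list-array layout.
    values = pooled.astype(np.float16).ravel()
    out_offsets = np.arange(n + 1, dtype=np.int32) * dim
    return out_offsets, values


sorted_ids = np.array([10, 20, 30], dtype=np.int64)
table = np.array([[1, 1], [3, 3], [5, 5]], dtype=np.float16)
offsets = np.array([0, 2, 2, 4], dtype=np.int32)       # rows: [10, 20], [], [30, 99]
flat_ids = np.array([10, 20, 30, 99], dtype=np.int64)  # 99 is not in the index

out_offsets, values = avg_pool(offsets, flat_ids, sorted_ids, table)
print(out_offsets.tolist(), values.tolist())
```

The returned pair has the offsets-plus-values layout of an Arrow list column, so it could be handed to pyarrow (for example via ListArray.from_arrays) without materializing one Python list per row.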
Resumable / day-atomic
Days already present in the output directory are skipped, so a failed run is safe to re-run. Each day is processed independently.
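The skip logic can be pictured with a small standard-library sketch. The helper name pending_days and the rule that a day counts as done once its output directory holds at least one parquet shard are assumptions for illustration; the back-compat flat files are ignored here.

```python
import tempfile
from pathlib import Path


def pending_days(skeleton_root: Path, output_root: Path) -> list[str]:
    """Return skeleton day partitions that have no enriched output yet."""
    days = sorted(p.name for p in skeleton_root.glob("dt=*") if p.is_dir())

    def is_done(day: str) -> bool:
        out_dir = output_root / day
        # Assumption: a day is complete once its directory contains a shard.
        return out_dir.is_dir() and any(out_dir.glob("*.parquet"))

    return [day for day in days if not is_done(day)]


# Demo on a throwaway directory tree: one day already enriched, one not.
with tempfile.TemporaryDirectory() as tmp:
    skeleton, enriched = Path(tmp, "skeleton"), Path(tmp, "enriched")
    for day in ("dt=2026-05-14", "dt=2026-05-15"):
        (skeleton / day).mkdir(parents=True)
    (enriched / "dt=2026-05-14").mkdir(parents=True)
    (enriched / "dt=2026-05-14" / "part-0.parquet").touch()
    todo = pending_days(skeleton, enriched)

print(todo)  # ['dt=2026-05-15']
```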
The Ray envelope
The stage spins up a singleton local Ray cluster sized to a 16-CPU / 64 GiB envelope. The defaults come from RayEnvelope in executor.py and the --ray-num-cpus CLI flag:
daeda pipeline train dssm_ranking --enrich-only --ray-num-cpus 16

| Setting | Default | Notes |
|---|---|---|
| --ray-num-cpus / ray_num_cpus | 16 | Ray logical CPUs |
| ray_memory | 64 GiB | Logical memory budget |
| ray_object_store_memory | 20 GiB | Ray object store |
| num_workers | auto | Actor pool auto-sizes to ~80% of the worker-CPU budget |
| preload_day_embeddings | True | Preload the day's FP16 subset once on the driver |
Actor-pool sizing (~80%)
When num_workers is unset, the actor pool is sized to ~80% of the worker-CPU budget (actor_cpu_budget = worker_cpu_budget * 0.8), reserving ~20% for Ray Data read/write tasks. An all-CPU actor pool would starve IO and stall the streaming pipeline. The pool is then capped so workers never exceed the envelope's worker CPUs.
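The sizing rule can be written down as a few lines of arithmetic. This is a minimal sketch, not the code in executor.py: the function name resolve_num_workers, the cpus_per_actor parameter, rounding down, and applying the cap to an explicit num_workers as well are all assumptions.

```python
import math
from typing import Optional


def resolve_num_workers(
    worker_cpu_budget: float,
    num_workers: Optional[int] = None,
    cpus_per_actor: float = 1.0,   # hypothetical: CPUs reserved per actor
    actor_fraction: float = 0.8,   # ~80% for actors, ~20% left for read/write tasks
) -> int:
    """Pick an actor-pool size that leaves CPU headroom for Ray Data IO tasks."""
    # Hard cap: the pool may never need more CPUs than the worker budget.
    max_workers = max(1, math.floor(worker_cpu_budget / cpus_per_actor))
    if num_workers is None:
        actor_cpu_budget = worker_cpu_budget * actor_fraction
        num_workers = max(1, math.floor(actor_cpu_budget / cpus_per_actor))
    return min(num_workers, max_workers)


auto = resolve_num_workers(16)                     # 16 * 0.8 = 12.8, rounded down
capped = resolve_num_workers(16, num_workers=20)   # explicit request above the cap
print(auto, capped)  # 12 16
```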
Resource isolation on read-only-cgroup pods
Ray's enable_resource_isolation (off by default here) needs a writable cgroup. On read-only-cgroup pods, enforce the envelope with an external memory governor instead (bench/memcap.py).
Preloading day embeddings
--preload-day-embeddings is on by default. A single day touches only ~470K distinct artworks, so the day's needed FP16 embedding subset is loaded once on the driver and shared with the actors via the Ray object store. At 1152 dimensions that subset is about 1 GiB (470K × 1152 × 2 bytes ≈ 1.08 GB), a small fraction of the 20 GiB object store. Each actor's __init__ then loads only the ID index, not the full pool.
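A toy version of the preload step, using NumPy only: de-duplicate the day's artwork IDs and gather just those rows from the full pool. The helper name build_day_subset and the sorted-ID lookup are assumptions for this sketch; the hand-off to the actors is only indicated in a comment.

```python
import numpy as np


def build_day_subset(day_ids, sorted_ids, table):
    """Gather the embeddings for the distinct IDs seen in one day."""
    wanted = np.unique(day_ids)  # sorted and de-duplicated
    pos = np.clip(np.searchsorted(sorted_ids, wanted), 0, len(sorted_ids) - 1)
    found = sorted_ids[pos] == wanted  # drop IDs the pool does not contain
    return wanted[found], table[pos[found]]


# Tiny stand-in for the full pool: 4 artworks, 2-dim FP16 embeddings.
sorted_ids = np.array([10, 20, 30, 40], dtype=np.int64)
table = np.arange(8, dtype=np.float16).reshape(4, 2)
day_ids = np.array([30, 10, 30, 99, 10], dtype=np.int64)  # repeats, one unknown ID

subset_ids, subset = build_day_subset(day_ids, sorted_ids, table)
print(subset_ids.tolist(), subset.nbytes)

# On the driver, this pair would then be placed in the object store once
# (for example with ray.put) and the reference passed to every actor.
```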
The Lance embedding store
Artwork embeddings live in a Lance directory (a dataset is a directory, so the path has no extension), configured under embedding_stores in config/training/aggregations.yaml:
embedding_stores:
  - name: artwork_embeddings
    type: lance
    path: data/store/v6/artwork_embedding
    id_column: id
    embedding_column: image_embedding
    embedding_dim: 1152
    embedding_dtype: float16
    build_batch_size: 50000
    source_parquet: data/mewtant/siglip2_vectors

The cumulative v6 pool (data/store/v6/artwork_embedding, ~42.5M rows / 185 GB) is the currently pinned store. Embeddings are stored as FixedSizeList<float16> with a BTREE scalar index on artwork_id. The store is built automatically on first run via a single DuckDB hive_partitioning query that streams batches straight into Lance, keeping peak memory bounded to one batch at a time.
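The bounded-memory property of the build comes from the batching pattern itself, which the standard-library sketch below isolates. The DuckDB query and the Lance writer are deliberately left out; iter_batches, build_store, and the write_batch callback are hypothetical stand-ins.

```python
from itertools import islice
from typing import Callable, Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def iter_batches(rows: Iterable[T], batch_size: int) -> Iterator[List[T]]:
    """Yield consecutive batches, holding at most one batch in memory."""
    it = iter(rows)
    while True:
        batch = list(islice(it, batch_size))
        if not batch:
            return
        yield batch


def build_store(rows: Iterable[T], batch_size: int,
                write_batch: Callable[[List[T]], None]) -> int:
    """Stream rows into a sink batch by batch and return the row count."""
    written = 0
    for batch in iter_batches(rows, batch_size):
        write_batch(batch)  # stand-in for appending one batch to the store
        written += len(batch)
    return written


sizes: List[int] = []
total = build_store(range(120_000), 50_000, lambda b: sizes.append(len(b)))
print(total, sizes)  # 120000 [50000, 50000, 20000]
```

With build_batch_size set to 50000, each step of such a loop holds one batch of source rows at a time rather than the full pool.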
Local-disk only
The store must reside on a local ext4 filesystem (e.g. /local/). GPFS / network mounts are not supported due to a Lance atomic-rename limitation.
Because image_embedding is resolved here from Lance, it lives only on the enrich side — it is never read on the skeleton side.
Memory budget
Production profile (31 workers, 128 CPUs):
| Component | Memory |
|---|---|
| Lance ID index per actor (~23 M artworks) | ~180 MB |
| Per-batch embeddings (50 K rows × 1152-dim float16) | ~115 MB |
| All Ray workers combined | ~5 GB |
| Ray object store | ~10 GB |
| Total peak | ~15 GB |
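The per-component figures in the table can be checked with simple arithmetic. The 8-byte ID width is an assumption (it is consistent with the ~180 MB index figure); the worker and object-store totals are taken from the table as given.

```python
BYTES_FP16 = 2
BYTES_INT64 = 8  # assumption: artwork IDs held as 64-bit integers

# Per-batch embeddings: 50 K rows x 1152 dims x 2 bytes.
batch_rows, dim = 50_000, 1152
per_batch_bytes = batch_rows * dim * BYTES_FP16

# ID index per actor: ~23 M artworks x 8 bytes.
id_index_bytes = 23_000_000 * BYTES_INT64

# Totals as listed in the table (GB).
workers_gb, object_store_gb = 5, 10
total_peak_gb = workers_gb + object_store_gb

print(f"per-batch embeddings: {per_batch_bytes / 1e6:.1f} MB")  # 115.2 MB
print(f"ID index per actor:   {id_index_bytes / 1e6:.1f} MB")   # 184.0 MB
print(f"total peak:           ~{total_peak_gb} GB")
```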
Next
- Skeleton Stage: the SQL source layer that produces the input parquets.
- Overview: the full compile → skeleton → enrich DAG.
- Arrow-Native Design: the zero-copy buffer model this stage relies on.