Enrich Stage

The enrich stage is the Pythonic stage of the training DAG. It reads the per-day skeleton parquets and appends two artwork embedding columns, image_embedding (a direct lookup) and like_artwork_avg_embeds (avg-pooled), using Ray Data streaming actors backed by a Lance embedding store.

Engine tag: pythonic. Entry point: run_enrich_stage() in src/daedalus/pipelines/dynamic_feature.py. It runs automatically after the skeleton stage in daeda pipeline train, or standalone against an existing skeleton output:

```bash
daeda pipeline train dssm_ranking --enrich-only --target-date 2026-05-15
```

What it does

  • Reads every dt=YYYY-MM-DD/ partition under the skeleton output root (plus back-compat flat YYYY-MM-DD.parquet files).
  • Appends image_embedding (scalar artwork_id → single Lance lookup) and like_artwork_avg_embeds (avg-pool over like_artwork_ids_7d, the canonical rolling pool) via Ray Data streaming actors.
  • Writes sharded dt=YYYY-MM-DD/*.parquet directly from Ray Data with preserve_order on — each shard is a contiguous, chronological event_timestamp range, so there is no serial single-file merge.
  • Emits the Lance-backed embedding columns as HALF_FLOAT[].
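The input discovery described in the first bullet can be sketched as follows. This is a minimal illustration rather than the stage's actual code: the helper name discover_skeleton_days and the rule that a hive-style partition wins over a flat file for the same day are assumptions; only the dt=YYYY-MM-DD/ and flat YYYY-MM-DD.parquet naming comes from the description above.

```python
import re
import tempfile
from pathlib import Path

DAY_DIR = re.compile(r"^dt=(\d{4}-\d{2}-\d{2})$")
DAY_FILE = re.compile(r"^(\d{4}-\d{2}-\d{2})\.parquet$")


def discover_skeleton_days(root: Path) -> dict[str, Path]:
    """Map each day string to its skeleton input (partition dir or flat file)."""
    days: dict[str, Path] = {}
    for entry in sorted(root.iterdir()):
        if entry.is_dir() and (m := DAY_DIR.match(entry.name)):
            # Assumption: a dt= partition overrides a flat file for the same day.
            days[m.group(1)] = entry
        elif entry.is_file() and (m := DAY_FILE.match(entry.name)):
            # Back-compat flat file, used only if no partition claimed the day.
            days.setdefault(m.group(1), entry)
    return dict(sorted(days.items()))


# Demo on a throwaway directory with one partition, one flat file, one stray file.
with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "dt=2026-05-14").mkdir()
    (root / "2026-05-15.parquet").touch()
    (root / "_SUCCESS").touch()
    found = {day: path.name for day, path in discover_skeleton_days(root).items()}
```

Entries that match neither pattern are ignored, so stray marker files in the skeleton root do not become days.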

This stage is Arrow-native end-to-end — the avg-pool helper builds the output Arrow ListArray directly from numpy buffers and offsets rather than round-tripping through Python lists. See Arrow-Native Design.
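To make the "buffers and offsets" idea concrete, here is a NumPy-only sketch of average pooling over variable-length lists without touching Python lists. It is not the project's helper: the function name avg_pool_segments, the prefix-sum approach, and the choice to emit a zero vector for an empty list are all assumptions for illustration. The flat float16 result is the kind of contiguous buffer that could then be wrapped as an Arrow list array.

```python
import numpy as np


def avg_pool_segments(flat_vectors: np.ndarray, offsets: np.ndarray) -> np.ndarray:
    """Mean-pool rows of flat_vectors per list segment.

    flat_vectors: (total_items, dim) embeddings for all list elements, concatenated.
    offsets: (n_lists + 1,) Arrow-style offsets; list i spans offsets[i]:offsets[i+1].
    """
    n_rows, dim = flat_vectors.shape
    # Prefix sums with a leading zero row, accumulated in float64 for stability.
    prefix = np.zeros((n_rows + 1, dim), dtype=np.float64)
    prefix[1:] = np.cumsum(flat_vectors, axis=0, dtype=np.float64)
    # Segment sum = prefix at segment end minus prefix at segment start.
    sums = prefix[offsets[1:]] - prefix[offsets[:-1]]
    counts = np.diff(offsets)
    # Assumption: empty lists pool to a zero vector (divide by 1 instead of 0).
    means = sums / np.maximum(counts, 1)[:, None]
    return means.astype(np.float16)


# Three events whose like-lists hold 2, 0, and 1 artworks respectively.
flat = np.array([[1, 2], [3, 4], [5, 6]], dtype=np.float16)
offsets = np.array([0, 2, 2, 3], dtype=np.int64)
pooled = avg_pool_segments(flat, offsets)
```

The prefix-sum form is used here because it handles empty segments cleanly; the real helper may use a different reduction.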

Resumable / day-atomic

Days already present in the output directory are skipped, so a failed run is safe to re-run. Each day is processed independently.
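The skip logic amounts to filtering the day list against the output directory. The sketch below assumes a day counts as "present" when its dt=YYYY-MM-DD/ output directory contains at least one .parquet shard; the real completion check is not specified here, and pending_days is a hypothetical name.

```python
import tempfile
from pathlib import Path


def pending_days(days: list[str], output_root: Path) -> list[str]:
    """Return the days that still need enriching, in the order given."""
    todo = []
    for day in days:
        out_dir = output_root / f"dt={day}"
        # Assumption: a day is done once its directory holds a parquet shard.
        done = out_dir.is_dir() and any(out_dir.glob("*.parquet"))
        if not done:
            todo.append(day)
    return todo


# Demo: 2026-05-14 already has a shard, 2026-05-15 does not exist yet.
with tempfile.TemporaryDirectory() as tmp:
    out_root = Path(tmp)
    (out_root / "dt=2026-05-14").mkdir()
    (out_root / "dt=2026-05-14" / "part-0.parquet").touch()
    todo = pending_days(["2026-05-14", "2026-05-15"], out_root)
```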

The Ray envelope

The stage spins up a singleton local Ray cluster sized to the 16-CPU / 64 GiB envelope. The defaults come from RayEnvelope in executor.py; the CPU count can be set with the --ray-num-cpus CLI flag:

```bash
daeda pipeline train dssm_ranking --enrich-only --ray-num-cpus 16
```

| Setting | Default | Notes |
|---|---|---|
| --ray-num-cpus / ray_num_cpus | 16 | Ray logical CPUs |
| ray_memory | 64 GiB | Logical memory budget |
| ray_object_store_memory | 20 GiB | Ray object store |
| num_workers | auto | Actor pool auto-sizes to ~80% of the worker-CPU budget |
| preload_day_embeddings | True | Preload the day's FP16 subset once on the driver |

Actor-pool sizing (~80%)

When num_workers is unset, the actor pool is sized to ~80% of the worker-CPU budget (actor_cpu_budget = worker_cpu_budget * 0.8), reserving ~20% for Ray Data read/write tasks. An all-CPU actor pool would starve IO and stall the streaming pipeline. The pool is then capped so workers never exceed the envelope's worker CPUs.
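The sizing rule can be written down in a few lines. This is a sketch under assumptions: size_actor_pool is a hypothetical name, rounding down to a whole worker and keeping at least one actor are my choices, and how worker_cpu_budget is derived from ray_num_cpus is not covered here.

```python
import math
from typing import Optional


def size_actor_pool(
    worker_cpu_budget: int,
    num_workers: Optional[int] = None,
    actor_fraction: float = 0.8,
) -> int:
    """Pick the actor count: ~80% of worker CPUs when unset, capped at the budget."""
    if num_workers is None:
        actor_cpu_budget = worker_cpu_budget * actor_fraction
        # Assumption: round down, but never drop below one actor.
        num_workers = max(1, math.floor(actor_cpu_budget))
    # Cap so the pool never exceeds the envelope's worker CPUs.
    return min(num_workers, worker_cpu_budget)


auto_sized = size_actor_pool(16)                  # 16 * 0.8 = 12.8, floored
capped = size_actor_pool(16, num_workers=40)      # explicit request above budget
```

With a 16-CPU worker budget this leaves roughly four CPUs for Ray Data read/write tasks.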

Resource isolation on read-only-cgroup pods

Ray's enable_resource_isolation requires a writable cgroup, so it is off by default here. On read-only-cgroup pods, enforce the envelope with an external memory governor instead (bench/memcap.py).

Preloading day embeddings

--preload-day-embeddings is on by default. A single day touches only ~470K distinct artworks, so the day's FP16 embedding subset is loaded once on the driver and shared with the actors via the Ray object store. At 1152 dimensions that is about 470K × 1152 × 2 bytes ≈ 1.08 GB, a small fraction of the 20 GiB object store. Each actor's __init__ then loads only the ID index, not the full pool.
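Selecting the day's subset is essentially a deduplicate-and-gather. The sketch below uses in-memory NumPy arrays as a stand-in for the Lance store, and build_day_subset is a hypothetical helper; in the stage the resulting arrays would be what gets placed in the Ray object store (for example with ray.put), which is not shown here.

```python
import numpy as np


def build_day_subset(pool_ids, pool_vectors, day_ids):
    """Return (sorted unique ids found in the pool, their embedding rows)."""
    wanted = np.unique(day_ids)                    # distinct artworks for the day
    order = np.argsort(pool_ids, kind="stable")    # sort the pool ids once
    sorted_ids = pool_ids[order]
    pos = np.searchsorted(sorted_ids, wanted)
    pos = np.clip(pos, 0, len(sorted_ids) - 1)     # keep out-of-range probes valid
    hit = sorted_ids[pos] == wanted                # drop ids missing from the pool
    rows = order[pos[hit]]
    return wanted[hit], pool_vectors[rows]


# Toy pool of three artworks; the day references ids 20 and 10 (plus unknown 99).
pool_ids = np.array([30, 10, 20], dtype=np.int64)
pool_vectors = np.array([[3, 3], [1, 1], [2, 2]], dtype=np.float16)
day_ids = np.array([20, 10, 20, 99], dtype=np.int64)
subset_ids, subset_vectors = build_day_subset(pool_ids, pool_vectors, day_ids)
```

Because the subset ids come back sorted, an actor can resolve an artwork to its row with a binary search over subset_ids.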

The Lance embedding store

Artwork embeddings live in a Lance directory (a dataset is a directory, so the path has no extension), configured under embedding_stores in config/training/aggregations.yaml:

```yaml
embedding_stores:
    - name: artwork_embeddings
      type: lance
      path: data/store/v6/artwork_embedding
      id_column: id
      embedding_column: image_embedding
      embedding_dim: 1152
      embedding_dtype: float16
      build_batch_size: 50000
      source_parquet: data/mewtant/siglip2_vectors
```

The cumulative v6 pool (data/store/v6/artwork_embedding, ~42.5M rows / 185 GB) is the currently pinned store. Embeddings are stored as FixedSizeList<float16> with a BTREE scalar index on artwork_id. The store is built automatically on first run via a single DuckDB hive_partitioning query that streams batches straight into Lance, keeping peak memory bounded to one batch at a time.
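The bounded-memory property of the build comes from consuming the source one batch at a time. The following is a generic, standard-library illustration of that pattern only; it does not use DuckDB or Lance, and iter_batches is a hypothetical name. The batch size mirrors build_batch_size from the config above.

```python
from itertools import islice
from typing import Iterable, Iterator


def iter_batches(rows: Iterable, batch_size: int) -> Iterator[list]:
    """Yield successive lists of at most batch_size rows from any iterable."""
    it = iter(rows)
    # Only one batch is materialized at a time; the source is never fully loaded.
    while batch := list(islice(it, batch_size)):
        yield batch


# 120,000 source rows consumed in batches of 50,000.
batch_sizes = [len(batch) for batch in iter_batches(range(120_000), 50_000)]
```

Each yielded batch would be handed to the writer and released before the next one is read.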

Local-disk only

The store must reside on a local ext4 filesystem (e.g. /local/). GPFS / network mounts are not supported due to a Lance atomic-rename limitation.

Because image_embedding is resolved here from Lance, it lives only on the enrich side — it is never read on the skeleton side.

Memory budget

Production profile (31 workers, 128 CPUs):

| Component | Memory |
|---|---|
| Lance ID index per actor (~23 M artworks) | ~180 MB |
| Per-batch embeddings (50 K rows × 1152-dim float16) | ~115 MB |
| All Ray workers combined | ~5 GB |
| Ray object store | ~10 GB |
| Total peak | ~15 GB |
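Two of the per-component figures can be sanity-checked with simple arithmetic. The per-batch number follows directly from the batch shape and float16 width; the ID-index estimate assumes 8 bytes per artwork ID, which is my assumption and only roughly reproduces the ~180 MB figure.

```python
def dense_matrix_bytes(rows: int, dim: int, bytes_per_value: int = 2) -> int:
    """Size of a dense rows x dim matrix; 2 bytes per value for float16."""
    return rows * dim * bytes_per_value


# Per-batch embeddings: 50 K rows x 1152 dims x 2 bytes.
batch_bytes = dense_matrix_bytes(50_000, 1152)
batch_mb = batch_bytes / 1e6

# ID index: ~23 M artworks, assuming one 8-byte integer per ID.
index_mb = 23_000_000 * 8 / 1e6

# Total peak as listed: all workers combined plus the object store, in GB.
total_peak_gb = 5 + 10
```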
