Execution & Tuning

daeda pipeline train <service> compiles a feature service and runs its operator DAG through the executor. This page covers the run flags, day partitioning and parallelism, the Ray envelope that bounds the pythonic enrichment operators, and the config files that drive a run.

```bash
daeda pipeline train dssm_ranking
```

For the one-pipeline mental model see Overview; for the operator-by-operator breakdown see Operators & Optimization.

daeda pipeline train flags

| Flag | Default | Description |
| --- | --- | --- |
| SERVICE | (required) | Feature service name (e.g. dssm_ranking). |
| --feature-views-dir | feature_views | Directory of feature view YAML. |
| --feature-services-dir | feature_services | Directory of feature service YAML. |
| --runtime-config-path | config/training/runtime.yaml | Training runtime config. |
| --aggregation-config-path | config/training/aggregations.yaml | Aggregation + embedding-store config. |
| --output-root | (from config) | Override the output root. |
| --target-date | (all days) | Narrow a run to a single YYYY-MM-DD day. |
| --day-workers | (from config, 1) | Concurrent target days. |
| --skeleton-only | off | Run only the source-side operators. |
| --enrich-only | off | Run only the enrichment operators. |
| --ray-num-cpus | 16 | Ray logical CPUs for the pythonic enrichment operators. |

--skeleton-only / --enrich-only run a sub-range of the DAG

These flags do not select "stages" — they select a contiguous sub-range of the one operator DAG:

  • --skeleton-only runs only the source-side operators — every sql / sql_arrow_udf operator (Scan / Filter / Project / Join / PointInTimeJoin / RollingAggregate / Sink). It stops before the RayUdfTransform enrichment operators.
  • --enrich-only runs only the enrichment operators (the pythonic RayUdfTransform operators) over an already-written source-side output.

A run with no flag executes the full DAG: source-side operators first, then the enrichment operators (only if the pipeline declares a RayUdfTransform). Both sub-ranges are resumable, so a failed run is safe to re-run.

```bash
# Full DAG, single day
daeda pipeline train dssm_ranking --target-date 2026-05-15

# Source-side operators only, then enrichment operators only
daeda pipeline train dssm_ranking --skeleton-only --target-date 2026-05-15
daeda pipeline train dssm_ranking --enrich-only   --target-date 2026-05-15

# Override config + output locations, size the Ray envelope
daeda pipeline train dssm_ranking \
    --runtime-config-path config/training/runtime.yaml \
    --aggregation-config-path config/training/aggregations.yaml \
    --output-root /path/to/output \
    --ray-num-cpus 16
```
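
The sub-range selection behind --skeleton-only and --enrich-only can be pictured as a slice over one ordered operator list. The sketch below is illustrative only and is not daeda's implementation: the `Operator` dataclass, the `select_subrange` helper, the backend labels, and the per-operator backend assignments are all hypothetical, and treating the two flags as mutually exclusive is an assumption.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Operator:
    """Hypothetical operator record: a name plus the backend that runs it."""
    name: str
    backend: str  # illustrative labels: "sql", "sql_arrow_udf", "pythonic"


SOURCE_SIDE_BACKENDS = {"sql", "sql_arrow_udf"}


def select_subrange(dag, skeleton_only=False, enrich_only=False):
    """Return the contiguous slice of operators that a run would execute."""
    if skeleton_only and enrich_only:
        # Assumption: the two flags are mutually exclusive.
        raise ValueError("choose at most one of skeleton_only / enrich_only")
    # Index of the first enrichment operator, or len(dag) if there is none.
    split = next(
        (i for i, op in enumerate(dag) if op.backend not in SOURCE_SIDE_BACKENDS),
        len(dag),
    )
    if skeleton_only:
        return dag[:split]
    if enrich_only:
        return dag[split:]
    return dag


# Backend assignments below are made up for the example.
dag = [
    Operator("Scan", "sql"),
    Operator("PointInTimeJoin", "sql"),
    Operator("RollingAggregate", "sql_arrow_udf"),
    Operator("Sink", "sql"),
    Operator("RayUdfTransform", "pythonic"),
]
print([op.name for op in select_subrange(dag, skeleton_only=True)])
print([op.name for op in select_subrange(dag, enrich_only=True)])
```

With no flag set, the helper returns the whole list, which matches the full-DAG behavior described above.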

Day partitioning & parallelism

The pipeline iterates every day from feed_start through feed_end (inclusive), processing each as an independent unit and writing one dt=YYYY-MM-DD/part-N-0.parquet partition per day. --target-date narrows a run to a single day; the month range and chunk sizes are config-driven in config/training/runtime.yaml.
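
As a rough illustration of the inclusive day range and the per-day partition naming, the following sketch enumerates target days and builds the dt=YYYY-MM-DD directory name. The helper names `target_days` and `partition_dir` are hypothetical, and validation of a target date against the feed range is deliberately left out because the source does not describe it.

```python
from datetime import date, timedelta


def target_days(feed_start, feed_end, target_date=None):
    """Yield every day in [feed_start, feed_end], or only target_date if given."""
    if target_date is not None:
        # Range validation omitted: the real behavior is not documented here.
        yield target_date
        return
    day = feed_start
    while day <= feed_end:  # inclusive upper bound
        yield day
        day += timedelta(days=1)


def partition_dir(day):
    """Hive-style partition directory name for one day."""
    return f"dt={day.isoformat()}"


days = list(target_days(date(2026, 5, 14), date(2026, 5, 16)))
print([partition_dir(d) for d in days])
# ['dt=2026-05-14', 'dt=2026-05-15', 'dt=2026-05-16']
```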

--day-workers N (or day_workers in runtime.yaml) processes N target days concurrently. The default is 1; raise it only when the host has enough CPU, memory, and DuckDB spill bandwidth for N independent daily pipelines. The source-side operators are memory-bounded per day — each major operator is materialized as its own DuckDB COPY stage so memory is released between the spine, rolling, lookup, and assembly work.
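
The effect of day_workers can be sketched with a bounded worker pool: at most N days are in flight at once, and each day is handled as an independent unit. This is a minimal sketch under assumptions. `run_day` is a placeholder, and whether daeda uses threads, processes, or something else for day-level concurrency is not stated in the source.

```python
from concurrent.futures import ThreadPoolExecutor
from datetime import date


def run_day(day):
    """Placeholder for one independent daily pipeline (hypothetical)."""
    return f"dt={day.isoformat()}"


def run_days(days, day_workers=1):
    """Process up to `day_workers` days concurrently; results keep input order."""
    if day_workers < 1:
        raise ValueError("day_workers must be >= 1")
    with ThreadPoolExecutor(max_workers=day_workers) as pool:
        return list(pool.map(run_day, days))


days = [date(2026, 5, d) for d in (14, 15, 16)]
print(run_days(days, day_workers=2))
```

With day_workers=1 the pool degenerates to sequential processing, which is the documented default.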

The Ray envelope for pythonic operators

The RayUdfTransform enrichment operators run under the 16c / 64 GiB RayEnvelope. A singleton local Ray cluster is sized to that budget; the actor pool auto-sizes to ~80% of the worker-CPU budget so Ray Data read/write tasks aren't starved.

| Resource | Default | Notes |
| --- | --- | --- |
| Ray logical CPUs (--ray-num-cpus) | 16 | Total CPUs exposed to the local Ray node. |
| Ray memory budget | 64 GiB | Total, including object store + system reservation. |
| Object store | 20 GiB | Ray object store size. |
| Ray system reservation | 1 CPU / 4 GiB | Reserved for Ray system processes. |
| Worker CPU budget | ray-num-cpus − 1 | Actor pool auto-sizes to ~80% of this. |
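
The arithmetic in the table can be worked through in a few lines. The sketch below is not the project's RayEnvelope class: `EnvelopeSketch` is a hypothetical stand-in, floor rounding for the "~80%" actor pool is an assumption, and the remaining-memory figure is simply derived by subtracting the object store and system reservation from the total.

```python
from dataclasses import dataclass

GIB = 1024 ** 3


@dataclass(frozen=True)
class EnvelopeSketch:
    """Hypothetical stand-in that mirrors the defaults in the table above."""
    num_cpus: int = 16
    memory_bytes: int = 64 * GIB
    object_store_bytes: int = 20 * GIB
    system_cpus: int = 1
    system_memory_bytes: int = 4 * GIB

    @property
    def worker_cpu_budget(self):
        # ray-num-cpus minus the CPU reserved for Ray system processes.
        return self.num_cpus - self.system_cpus

    @property
    def actor_pool_size(self):
        # ~80% of the worker CPU budget; floor rounding is an assumption.
        return max(1, (self.worker_cpu_budget * 80) // 100)

    @property
    def remaining_memory_bytes(self):
        # Derived figure: total minus object store minus system reservation.
        return self.memory_bytes - self.object_store_bytes - self.system_memory_bytes


env = EnvelopeSketch()
print(env.worker_cpu_budget, env.actor_pool_size, env.remaining_memory_bytes // GIB)
```

Under these assumptions the defaults give a worker CPU budget of 15, an actor pool of 12, and 40 GiB left over after the object store and system reservation.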

Key behaviors:

  • --preload-day-embeddings (default on) — the day's needed FP16 embedding subset is loaded once on the driver and shared with actors via the object store. A single day touches only ~470K distinct artworks, so this stays under 1 GiB.
  • Chronological sharded output — output is sharded parquet under dt=YYYY-MM-DD/ with Ray Data preserve_order on, so each shard is a contiguous chronological event_timestamp range (no serial single-file merge).
  • Day-atomic & resumable — days are processed sequentially for memory isolation; an already-enriched day is skipped, so a failed run resumes cleanly.
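
One common way to get day-atomic, resumable behavior is to write a day into a staging directory and publish it with a single rename, skipping any day whose final directory already exists. The sketch below shows that general pattern only. It is an assumption about how such behavior could be built, not a description of daeda's mechanism, and the `.staging-` prefix, `enrich_day`, and `fake_writer` are hypothetical.

```python
import shutil
import tempfile
from pathlib import Path


def enrich_day(output_root, day, write_shards):
    """Enrich one day atomically; return False if the day was already done."""
    final_dir = output_root / f"dt={day}"
    if final_dir.exists():
        return False  # already enriched: a re-run skips this day
    staging = output_root / f".staging-dt={day}"
    if staging.exists():
        # Leftover from a failed attempt: discard it and redo the day.
        shutil.rmtree(staging)
    staging.mkdir(parents=True)
    write_shards(staging)
    staging.rename(final_dir)  # publish the whole day in one step
    return True


def fake_writer(directory):
    """Stand-in for the real shard writer (hypothetical)."""
    (directory / "part-0.parquet").write_bytes(b"")


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    print(enrich_day(root, "2026-05-15", fake_writer))  # first run does the work
    print(enrich_day(root, "2026-05-15", fake_writer))  # re-run skips the day
```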

Read-only cgroup pods

Ray's enable_resource_isolation needs a writable cgroup v2. On read-only-cgroup devpods it is off by default — enforce the envelope with the external memory governor (bench/memcap.py) instead.
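
Whether a pod has a writable cgroup v2 can be probed before deciding how to enforce the envelope. The check below is a best-effort sketch: `cgroup_v2_writable` is a hypothetical helper, the default mount point /sys/fs/cgroup is the conventional location rather than something the source specifies, and how daeda itself makes this decision is not documented here.

```python
import os


def cgroup_v2_writable(root="/sys/fs/cgroup"):
    """Best-effort check: is `root` a cgroup v2 mount this process may write to?"""
    # A cgroup v2 hierarchy exposes a cgroup.controllers file at its root.
    is_v2 = os.path.isfile(os.path.join(root, "cgroup.controllers"))
    return is_v2 and os.access(root, os.W_OK)


use_resource_isolation = cgroup_v2_writable()
print(f"enable_resource_isolation={use_resource_isolation}")
```

When the check returns False, the external memory governor described above is the fallback.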

The Lance embedding pool is cumulative (current pinned: data/store/v6/artwork_embedding, ~42.5M rows / 185 GB), and image_embedding lives only on the enrichment side — it is never read by the source-side operators.

Config files

A run is driven by three layered config files (Pydantic Settings with YAML sources):

| File | Drives |
| --- | --- |
| config/base.yaml | Feature dimensions, feature lists, S3 paths. |
| config/training/runtime.yaml | Chunk sizes, feed_start / feed_end, feature_refs, output_columns, day_workers, compute.engine (the rolling_engine for RollingAggregate). |
| config/training/aggregations.yaml | aggregation_specs, enrichment_specs, embedding_stores. |

feature_refs selects which views the source-side Scan/Join operators load (all columns in each view's features are loaded, not just the referenced one). output_columns entries given as {name, default} dicts are filled with the specified literal (e.g. NULL) when absent — so the schema is consistent even for a --skeleton-only run that hasn't appended embeddings yet.
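
The default-filling behavior of output_columns can be illustrated on a single row. This is a sketch, not the pipeline's code: `apply_output_columns` is a hypothetical helper, `user_id` is a made-up column name, and treating plain string entries as required columns is an assumption.

```python
def apply_output_columns(row, output_columns):
    """Project a row onto output_columns, filling declared defaults when absent."""
    out = {}
    for spec in output_columns:
        if isinstance(spec, dict):
            # {name, default} entry: use the literal default if the column is missing.
            out[spec["name"]] = row.get(spec["name"], spec["default"])
        else:
            # Plain string entry: assumed to be required, so a missing key raises.
            out[spec] = row[spec]
    return out


output_columns = [
    "user_id",
    "event_timestamp",
    {"name": "image_embedding", "default": None},
]
# A row as it might look after a --skeleton-only run, before embeddings exist.
skeleton_row = {"user_id": 7, "event_timestamp": "2026-05-15T00:00:00"}
print(apply_output_columns(skeleton_row, output_columns))
```

The row comes out with image_embedding present and set to None, so the column set is the same whether or not enrichment has run.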
