Putting It All Together – Building a Graph Experiment

Crystallize experiments can depend on one another by publishing artifacts and consuming them downstream. The CLI recognises these relationships and runs the graph in topological order. This tutorial builds a minimal two-stage pipeline: a producer writes an artifact, and a consumer reads it.
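The ordering idea is plain dependency-first traversal. As a toy illustration (not Crystallize's actual scheduler — just a sketch of the topological ordering it performs), consider:

```python
# Toy sketch of dependency-ordered execution.
# Each experiment lists the experiments whose outputs it consumes.
deps = {
    "producer": [],
    "consumer": ["producer"],
}

def topological_order(deps: dict[str, list[str]]) -> list[str]:
    """Return experiment names so every dependency runs before its consumer."""
    order: list[str] = []
    visited: set[str] = set()

    def visit(name: str) -> None:
        if name in visited:
            return
        visited.add(name)
        for upstream in deps[name]:
            visit(upstream)  # run dependencies first
        order.append(name)

    for name in deps:
        visit(name)
    return order

print(topological_order(deps))  # producer comes before consumer
```

With the two-stage pipeline built in this tutorial, the traversal always yields producer before consumer, which is exactly the order the CLI enforces.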

  1. Launch crystallize and press n.

  2. Name the experiment producer. Include datasources.py, steps.py, and outputs.py. Enable Add example code for quick scaffolding.

  3. In outputs.py, add helpers that turn a Python dictionary into JSON bytes and back:

    import json
    from pathlib import Path

    def dump_json(payload: dict) -> bytes:
        return json.dumps(payload).encode("utf-8")

    def load_json(path: Path) -> dict:
        return json.loads(path.read_text())
  4. Update config.yaml:

    name: producer
    outputs:
      summary:
        file_name: summary.json
        writer: dump_json
        loader: load_json
    steps:
      - produce_summary
    treatments:
      baseline: {}
  5. Implement produce_summary in steps.py:

    from crystallize import pipeline_step, Artifact

    @pipeline_step()
    def produce_summary(data: dict, *, summary: Artifact):
        summary.write({"total": sum(data["numbers"])})
        return data
  6. Ensure the datasource returns the numbers:

    from crystallize import data_source, FrozenContext

    @data_source
    def numbers(ctx: FrozenContext) -> dict[str, list[int]]:
        return {"numbers": [1, 2, 3]}

Running producer now writes data/producer/v0/.../summary.json.
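As a quick sanity check outside of Crystallize, the dump_json and load_json helpers from outputs.py are inverses of each other. A minimal round-trip through a temporary file:

```python
import json
import tempfile
from pathlib import Path

def dump_json(payload: dict) -> bytes:
    return json.dumps(payload).encode("utf-8")

def load_json(path: Path) -> dict:
    return json.loads(path.read_text())

# Write the encoded bytes to a file, then load them back.
with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "summary.json"
    path.write_bytes(dump_json({"total": 6}))
    assert load_json(path) == {"total": 6}
```

If the round-trip ever fails, the consumer would receive a payload that does not match what the producer wrote, so it is worth verifying writer/loader pairs like this before wiring experiments together.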

  1. Press n again and name the experiment consumer. Include datasources.py and steps.py. Tick Use outputs from other experiments and select producer -> summary.

  2. The generated config.yaml includes a datasource reference:

    datasource:
      producer_summary: producer#summary
    steps:
      - inspect_summary
  3. Implement the step that uses the loaded JSON:

    from crystallize import pipeline_step

    @pipeline_step()
    def inspect_summary(data: dict):
        summary = data["producer_summary"]  # already a dict thanks to load_json
        total = summary["total"]
        return data, {"total_from_producer": total}

Because we defined a loader in the producer, data["producer_summary"] is a dictionary instead of raw bytes.
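To see what the loader saves you, here is the manual decoding a step would otherwise need to perform (the byte string below is illustrative, standing in for the file contents the producer wrote):

```python
import json

# Without a loader, the payload entry would arrive as raw bytes.
raw = b'{"total": 6}'

# The step would then have to decode and parse it by hand:
summary = json.loads(raw.decode("utf-8"))
assert summary == {"total": 6}
```

Registering load_json once in the producer's config keeps this boilerplate out of every downstream step.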

  • On the selection screen, the consumer experiment is marked as a graph (its datasource references another experiment).
  • Highlight consumer and press Enter. The CLI automatically runs producer first, then consumer, respecting the dependency.
  • The summary tab includes separate sections for the producer and consumer, with artifacts and metrics for each.
  • Graph experiments can reference multiple outputs. Each alias becomes an entry in the datasource payload.
  • The CLI persists treatment state per experiment. If you disable a treatment upstream, the consumer run uses the subset of outputs generated by the active treatments.
  • You can load the entire directory programmatically via ExperimentGraph.from_yaml("experiments/") and call .visualize_from_yaml(...) to render the DAG.
  • When branching graphs become large, use cli.priority and cli.group to keep the selection tree navigable.