Creating Custom Pipeline Steps

Crystallize pipelines are built from small, deterministic steps. This guide shows how to create your own steps with the @pipeline_step decorator and how they interact with the immutable context (ctx), metrics collection, and caching.

Use the decorator on a regular function. data is required while ctx is optional. Additional keyword arguments make your step configurable and are automatically pulled from the execution context when not supplied explicitly.

from crystallize import FrozenContext, pipeline_step

@pipeline_step(cacheable=True)
def scale_data(data: list, ctx: FrozenContext, *, factor: float = 1.0) -> list:
    """Multiply incoming numeric data by ``factor``.

    The ``factor`` parameter is automatically populated from the context if it
    exists; otherwise its default is used.
    """
    return [x * factor for x in data]

Calling scale_data() returns a PipelineStep instance. You can create one directly or pass parameters with functools.partial when adding it to a pipeline:

from functools import partial
scaler = scale_data(factor=2.0) # explicit instantiation
# or later:
steps = [partial(scale_data, factor=2.0), normalize]

Either form works with a Pipeline.
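
For a concrete comparison, the two forms look like this when building a Pipeline (a minimal sketch using only the scale_data step defined above):

from functools import partial
from crystallize import Pipeline

# Both pipelines configure scale_data identically; choose whichever reads better.
pipeline_a = Pipeline([scale_data(factor=2.0)])           # explicit instantiation
pipeline_b = Pipeline([partial(scale_data, factor=2.0)])  # deferred via functools.partial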

Any keyword argument in the step signature (besides data and ctx) will be looked up in the FrozenContext when the step runs. This keeps your dependencies explicit and avoids repetitive ctx.get() calls.

The scale_data function above demonstrates this pattern—factor is pulled from the context if you don’t provide it when constructing the step.
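
To make the pattern explicit, here is a hypothetical step (not part of Crystallize) whose keyword-only parameters are all injectable from the context:

@pipeline_step()
def clip_values(data: list, ctx: FrozenContext, *, low: float = 0.0, high: float = 1.0) -> list:
    # ``low`` and ``high`` are resolved from the context by name when not
    # passed at construction time; otherwise these defaults apply.
    return [min(max(x, low), high) for x in data]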

The context object is still important for information that isn’t represented by parameters. Examples include:

  • reading ctx.get('replicate') to know the current replicate number
  • recording metrics with ctx.metrics.add()
  • saving files or other artifacts via Artifact.write()

Use parameter injection for simple values and fall back to ctx for these framework features.

@pipeline_step()
def normalize(data: list, ctx: FrozenContext) -> list:
    scale = ctx.get("scale_factor", 1.0)
    return [x / scale for x in data]

Each context has a metrics object used to accumulate results. Use ctx.metrics.add(name, value) to record values in any step. Metrics are aggregated across replicates and passed to hypotheses for verification. Steps may also return a tuple (data, metrics_dict) to add metrics without mutating the context directly.

@pipeline_step()
def compute_sum(data):
    total = sum(data)
    return data, {"sum": total}

Intermediate steps may also write metrics if useful. All metrics collected across replicates are provided to verifiers.
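
For instance, an intermediate step might record a diagnostic value while passing its input through unchanged (a hypothetical example using the ctx.metrics.add API described above):

@pipeline_step()
def log_row_count(data: list, ctx: FrozenContext) -> list:
    # Record a diagnostic metric without modifying the data.
    ctx.metrics.add("row_count", len(data))
    return data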

By default steps created with @pipeline_step are not cached. Set cacheable=True on the decorator to store step outputs based on a hash of the step’s parameters, the step’s code, and the input data. A cached step reruns only when one of these elements changes.

@pipeline_step(cacheable=True)
def heavy_transform(data, ctx: FrozenContext, method: str = "a"):
    # expensive work
    return complex_operation(data, method)

When the same input data and parameters are seen again, Crystallize loads the result from .cache/<step_hash>/<input_hash>.pkl instead of executing the step. Change the parameters, modify the code, or pass different input, and the step runs anew.

Caching is useful for deterministic or long-running operations. Avoid it for highly stochastic steps where reuse would give misleading results.

If your step generates random numbers, configure a reproducible seed using SeedPlugin(seed=value) or provide a custom seed_fn for library-specific RNGs. See Tutorial: Basic Experiment for examples of seeding experiments.
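
A sketch of how seeding might be wired up (the SeedPlugin import path and the seed_fn keyword are assumptions; check your Crystallize version):

import random

from crystallize import SeedPlugin  # import location is an assumption

def seed_everything(seed: int) -> None:
    # Seed library-specific RNGs here (random, numpy, torch, ...).
    random.seed(seed)

# Passed to Experiment(plugins=[...]) as in the full example below.
seed_plugin = SeedPlugin(seed=42, seed_fn=seed_everything)  # seed_fn keyword is an assumption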

For heavyweight resources (LLMs, embedding models, GPU inferencers), prefer class-based steps so initialization happens once per experiment load instead of every replicate:

from crystallize import PipelineStep

class ModelStep(PipelineStep):
    cacheable = False  # set to True if outputs can be reused safely

    def __init__(self):
        self.model = load_heavy_model()  # loaded once when the step is constructed

    def __call__(self, data, ctx):
        return self.model.predict(data)  # reused across replicates in the same process

    @property
    def params(self):
        return {}

Instantiate ModelStep() once and pass the instance into your Pipeline so all replicates share the loaded model within a worker. For distributed/process execution, each worker loads its own model copy; to share across step instances, wrap the loader in functools.lru_cache at module scope.
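
A minimal sketch of that module-scope cache (load_heavy_model and the model class are placeholders for your actual loader):

from functools import lru_cache

@lru_cache(maxsize=1)
def load_heavy_model():
    # The expensive construction runs once per process; later calls from other
    # ModelStep instances reuse the cached object.
    return SomeHeavyModel()  # placeholder for the real loading code

ModelStep.__init__ above then calls load_heavy_model(), so every instance created in the same process receives the same model object.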

Steps can also be defined as async functions when they perform I/O-bound work like network requests. Decorate the async function with @pipeline_step and Crystallize will await it automatically during asynchronous experiment runs. Synchronous code can still call Pipeline.run, which wraps the async execution internally.

import asyncio

@pipeline_step()
async def fetch_remote(data: str, ctx: FrozenContext) -> str:
    await asyncio.sleep(0.1)
    return data + "!"

Putting these pieces together, a small experiment can run the steps defined above:

from functools import partial
from crystallize import ParallelExecution, Experiment, Pipeline, FrozenContext

# Step definitions from above
exp = Experiment(
    datasource=lambda ctx: [1, 2, 3],
    pipeline=Pipeline([
        partial(scale_data, factor=1.5),
        normalize(),
        compute_sum(),
    ]),
    plugins=[ParallelExecution()],
)
exp.validate()  # optional
result = exp.run(replicates=3)
print(result.metrics.baseline.metrics)

This small experiment scales and normalizes data, then records the sum metric for each replicate.

  • Why can’t my step modify ctx directly? FrozenContext is immutable to avoid side effects. Use ctx.add() only to add new keys (usually inside treatments).
  • Metrics missing in the result? Verify each step calls ctx.metrics.add (or returns a (data, metrics) tuple) for the values you want to analyze.
  • Caching not taking effect? Verify cacheable=True, parameters are hashable, and that the input data is identical between runs.