Creating Custom Pipeline Steps
Crystallize pipelines are built from small, deterministic steps. This guide shows how to create your own steps with the `@pipeline_step` decorator and how they interact with the immutable context (`ctx`), metrics collection, and caching.
1. Define a Step with @pipeline_step
Use the decorator on a regular function. `data` is required while `ctx` is optional. Additional keyword arguments make your step configurable and are automatically pulled from the execution context when not supplied explicitly.
```python
from crystallize import pipeline_step
from crystallize import FrozenContext

@pipeline_step(cacheable=True)
def scale_data(data: list, ctx: FrozenContext, *, factor: float = 1.0) -> list:
    """Multiply incoming numeric data by ``factor``.

    The ``factor`` parameter is automatically populated from the context if
    it exists, otherwise its default is used.
    """
    return [x * factor for x in data]
```

Calling `scale_data()` returns a `PipelineStep` instance. You can create one directly or pass parameters with `functools.partial` when adding it to a pipeline:
```python
from functools import partial

scaler = scale_data(factor=2.0)  # explicit instantiation
# or later:
steps = [partial(scale_data, factor=2.0), normalize]
```

Either form works with a `Pipeline`.
2. Automatic Parameter Injection
Any keyword argument in the step signature (besides `data` and `ctx`) will be looked up in the `FrozenContext` when the step runs. This keeps your dependencies explicit and avoids repetitive `ctx.get()` calls.
The `scale_data` function above demonstrates this pattern: `factor` is pulled from the context if you don't provide it when constructing the step.
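To see the lookup in action, here is a minimal sketch. In a real experiment the pipeline performs this call for you and the key is usually added by a treatment; the `FrozenContext` constructor signature here is an assumption:

```python
# Sketch only: assumes FrozenContext accepts an initial mapping and that a
# step instance can be invoked directly with (data, ctx).
ctx = FrozenContext({})
ctx.add("factor", 3.0)      # normally done inside a treatment
step = scale_data()         # no explicit factor
result = step([1, 2], ctx)  # -> [3.0, 6.0]; factor injected from the context
```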
3. When to Use ctx
The context object is still important for information that isn't represented by parameters. Examples include:

- reading `ctx.get('replicate')` to know the current replicate number
- recording metrics with `ctx.metrics.add()`
- saving files or other artifacts via `Artifact.write()`
Use parameter injection for simple values and fall back to ctx for these
framework features.
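For instance, a step might use the replicate number and record a metric, combining the calls listed above in one sketch:

```python
@pipeline_step()
def audit(data: list, ctx: FrozenContext) -> list:
    rep = ctx.get("replicate")                # current replicate number
    ctx.metrics.add("input_size", len(data))  # aggregated across replicates
    print(f"replicate {rep}: {len(data)} items")
    return data
```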
A step can also fall back to `ctx.get()` to read plain values with a default:

```python
@pipeline_step()
def normalize(data: list, ctx: FrozenContext) -> list:
    scale = ctx.get("scale_factor", 1.0)
    return [x / scale for x in data]
```

4. Recording Metrics
Each context has a `metrics` object used to accumulate results. Use `ctx.metrics.add(name, value)` to record values in any step. Metrics are aggregated across replicates and passed to hypotheses for verification. Steps may also return a tuple `(data, metrics_dict)` to add metrics without mutating the context directly.
```python
@pipeline_step()
def compute_sum(data):
    total = sum(data)
    return data, {"sum": total}
```

Intermediate steps may also write metrics if useful. All metrics collected across replicates are provided to verifiers.
5. Enabling Caching
By default, steps created with `@pipeline_step` are not cached. Set `cacheable=True` on the decorator to store step outputs based on a hash of the step's parameters, the step's code, and the input data. A cached step reruns only when one of these elements changes.
```python
@pipeline_step(cacheable=True)
def heavy_transform(data, ctx: FrozenContext, method: str = "a"):
    # expensive work
    return complex_operation(data, method)
```

When the same input data and parameters are seen again, Crystallize loads the result from `.cache/<step_hash>/<input_hash>.pkl` instead of executing the step. Change the parameters, modify the code, or pass different input, and the step runs anew.
Caching is useful for deterministic or long-running operations. Avoid it for highly stochastic steps where reuse would give misleading results.
If your step generates random numbers, configure a reproducible seed using `SeedPlugin(seed=value)` or provide a custom `seed_fn` for library-specific RNGs. See Tutorial: Basic Experiment for examples of seeding experiments.
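A minimal sketch, assuming `SeedPlugin` is importable from the top-level `crystallize` package and that `seed_fn` receives the integer seed chosen for each replicate:

```python
import random

import numpy as np
from crystallize import SeedPlugin  # import path is an assumption

def seed_everything(seed: int) -> None:
    random.seed(seed)     # stdlib RNG
    np.random.seed(seed)  # library-specific RNG

# Pass to Experiment(plugins=[...]) alongside your other plugins.
seeding = SeedPlugin(seed=42, seed_fn=seed_everything)
```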
6. Resource Management
For heavyweight resources (LLMs, embedding models, GPU inferencers), prefer class-based steps so initialization happens once per experiment load instead of every replicate:
```python
from crystallize import PipelineStep

class ModelStep(PipelineStep):
    cacheable = False  # set to True if outputs can be reused safely

    def __init__(self):
        self.model = load_heavy_model()  # loaded once when the step is constructed

    def __call__(self, data, ctx):
        return self.model.predict(data)  # reused across replicates in the same process

    @property
    def params(self):
        return {}
```

Instantiate `ModelStep()` once and pass the instance into your `Pipeline` so all replicates share the loaded model within a worker. For distributed/process execution, each worker loads its own model copy; to share across step instances, wrap the loader in `functools.lru_cache` at module scope, as shown below.
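A sketch of that pattern, where `load_heavy_model` is the same hypothetical loader used above:

```python
from functools import lru_cache

from crystallize import PipelineStep

@lru_cache(maxsize=1)
def get_model():
    # First call loads the model; later calls in the same process reuse it.
    return load_heavy_model()

class SharedModelStep(PipelineStep):
    cacheable = False

    def __init__(self):
        self.model = get_model()  # every instance in this process shares one model

    def __call__(self, data, ctx):
        return self.model.predict(data)

    @property
    def params(self):
        return {}
```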
7. Asynchronous Steps
Section titled “7. Asynchronous Steps”Steps can also be defined as async functions when they perform I/O bound work
like network requests. Decorate the async function with @pipeline_step and
Crystallize will await it automatically during asynchronous experiment runs.
Synchronous code can still call :class:Pipeline.run, which wraps the async
execution internally.
```python
import asyncio

@pipeline_step()
async def fetch_remote(data: str, ctx: FrozenContext) -> str:
    await asyncio.sleep(0.1)
    return data + "!"
```

Example: Putting It Together
```python
from functools import partial
from crystallize import ParallelExecution, Experiment, Pipeline, FrozenContext

# Step definitions from above

exp = Experiment(
    datasource=lambda ctx: [1, 2, 3],
    pipeline=Pipeline([
        partial(scale_data, factor=1.5),
        normalize(),
        compute_sum(),
    ]),
    plugins=[ParallelExecution()],
)
exp.validate()  # optional
result = exp.run(replicates=3)
print(result.metrics.baseline.metrics)
```

This small experiment scales and normalizes data, then records the `sum` metric for each replicate.
Troubleshooting & FAQs
- Why can't my step modify `ctx` directly? `FrozenContext` is immutable to avoid side effects. Use `ctx.add()` only to add new keys (usually inside treatments); see the sketch after this list.
- Metrics missing in the result? Verify each step calls `ctx.metrics.add` for values you want to analyze.
- Caching not taking effect? Verify `cacheable=True`, parameters are hashable, and that the input data is identical between runs.
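A sketch of the add-only rule from the first bullet (the exact behavior on overwrite is an assumption implied by the immutability rule above):

```python
ctx.add("scale_factor", 2.0)    # fine: the key did not exist yet
# ctx.add("scale_factor", 3.0)  # rejected: existing keys cannot be overwritten
```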
Next Steps
- Learn how to add custom treatments to pass values into `ctx`.
- See the Reference: PipelineStep for API details.
- Explore Explanation: Reproducibility for more on caching and context design.