A stage is one step in a workflow’s pipeline array. Stages execute in order, one after another, within each workflow execution. Each stage can build a payload from scratch or inherit the previous stage’s payload, apply field assignments, mutate shared workflow state, and optionally dispatch the resulting event to your broker. Together, a sequence of stages lets you model realistic multi-event flows — from a single produce operation to complex conditional loops.

Complete Stage Example

The following stage runs three times. On each iteration it builds an order.item.added event, emits it only when the unit price is positive, and increments a state counter. It also attaches two message headers and waits one second after dispatching before the next stage begins:
{
  "stage": "add-line-item",
  "event": "order.item.added",
  "schemaId": "order-item-schema",
  "source": "none",
  "repeat": 3,
  "set": {
    "itemId": { "$fn": "uuid()" },
    "productCode": { "$fn": "choice(SKU-001, SKU-002, SKU-003)" },
    "quantity": { "$fn": "int(1, 10)" },
    "unitPrice": { "$fn": "int(100, 9999)" }
  },
  "state": {
    "order.itemCount": { "$fn": "add(state.order.itemCount, 1)" }
  },
  "emit": { "$fn": "gt(.unitPrice, 0)" },
  "wait": "1s",
  "headers": {
    "x-source": "devset",
    "x-schema-version": "2"
  }
}

Field Reference

stage
string
required
A unique identifier for this stage within the workflow. Used in logs and error messages. Choose a descriptive slug such as "build-order" or "add-line-item".
event
string
The logical name of the event this stage produces, for example "order.created" or "payment.authorised". This label appears in the run’s event log and is used to match against schema definitions.
schemaId
string
Overrides the workflow-level schemaId for this stage only. Use this when one stage produces a structurally different event from the workflow default. See Schemas for details.
source
string
required
Controls where the stage’s initial payload comes from before set assignments are applied.
  • "none" — start with an empty payload object.
  • "previous-stage" — copy the payload produced by the immediately preceding stage and modify it.
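As a minimal sketch of how the two source values combine, the pipeline below builds a payload from scratch in the first stage and inherits it in the second. The stage names, event names, and payload fields are hypothetical and chosen only for illustration.

```json
[
  {
    "stage": "create-order",
    "event": "order.created",
    "source": "none",
    "set": {
      "orderId": { "$fn": "uuid()" },
      "status": "pending"
    },
    "emit": true
  },
  {
    "stage": "confirm-order",
    "event": "order.confirmed",
    "source": "previous-stage",
    "set": {
      "status": "confirmed"
    },
    "emit": true
  }
]
```

Because the second stage copies the previous payload before its set assignments run, its event should carry the same orderId with status overwritten to "confirmed".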
repeat
integer
Repeat this stage a fixed number of times within a single execution. The stage’s set, state, and emit logic runs on every iteration. Use alongside state mutations to produce sequences of related events.
repeatWhile
object
A DSL expression evaluated before each iteration. The stage keeps repeating as long as the expression resolves to true. The stage does not execute at all if the condition is false on the first check.
{
  "repeatWhile": { "$fn": "lt(state.order.itemCount, 5)" }
}
repeatUntil
object
A DSL expression evaluated after each iteration. The stage repeats until the expression resolves to true, meaning the stage body always executes at least once.
{
  "repeatUntil": { "$fn": "gte(state.order.itemCount, 5)" }
}
emit
boolean | object
Controls whether this stage dispatches its event to the broker.
  • true — always emit.
  • false or omitted — never emit; the stage runs its set and state logic but does not publish.
  • An expression object — emit conditionally based on payload or state values.
{ "emit": { "$fn": "gt(.unitPrice, 0)" } }
wait
string
A duration string that pauses execution after the stage body runs and before the next stage begins. Use this to simulate realistic inter-event timing in your consumer tests. The supported suffixes are:

Value     Duration
"500ms"   500 milliseconds
"1s"      1 second
"30s"     30 seconds
"1m"      1 minute
wait is skipped when running a workflow in simulation mode, so your pipeline logic can be validated quickly without real delays.
set
object
Assigns values to fields in the event payload. Keys are dot-notation field paths; values are literals or DSL expressions.
{
  "set": {
    "orderId": { "$fn": "uuid()" },
    "status": "pending",
    "createdAt": { "$fn": "now()" }
  }
}
state
object
Mutates the workflow-scoped state object. Uses the same key-value syntax as set. Changes made here are visible to all subsequent stages in the same execution.
{
  "state": {
    "sequence.index": { "$fn": "add(state.sequence.index, 1)" }
  }
}
headers
object
Free-form string key-value pairs attached as message headers to the dispatched event. Useful for routing metadata, schema version stamps, or correlation IDs.
{
  "headers": {
    "x-correlation-id": "abc123",
    "x-schema-version": "3"
  }
}
key
object
(Kafka only) The Kafka message key. Accepts the same value syntax as set fields, including DSL expressions. Use this to control partition assignment.
{
  "key": { "$ref": "customerId" }
}
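For context, a fuller stage using key might look like the sketch below. It assumes that $ref resolves against the payload after set assignments have been applied, which this page does not state explicitly; the stage name, event name, and fields are hypothetical.

```json
{
  "stage": "customer-updated",
  "event": "customer.updated",
  "source": "none",
  "set": {
    "customerId": { "$fn": "uuid()" },
    "status": "active"
  },
  "key": { "$ref": "customerId" },
  "emit": true
}
```

Keying by customerId means that, with Kafka's default partitioner, events for the same customer land on the same partition and keep their relative order.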
query
object
Runs a MongoDB lookup and stores the result in workflow state before the stage’s set logic runs. Useful for seeding payloads from real data. Fields:

Field        Description
connection   Name of the MongoDB connector
database     Database name
collection   Collection name
find         MongoDB filter document
select       Fields to project
{
  "query": {
    "connection": "local-mongo",
    "database": "shop",
    "collection": "products",
    "find": { "active": true },
    "select": { "productId": 1, "price": 1 }
  }
}
wireFormat
object
Configures Protobuf binary framing when contentType is "application/x-protobuf". See the Schemas page for wire-format details.

Repeat Patterns

Fixed repeat

Use repeat when you know exactly how many iterations you need:
{
  "stage": "add-items",
  "source": "none",
  "repeat": 5,
  "set": {
    "itemId": { "$fn": "uuid()" },
    "quantity": { "$fn": "int(1, 10)" }
  },
  "emit": true
}

Loop with repeatWhile

The condition is checked before each iteration. The stage does not execute if the condition is false from the start:
{
  "stage": "drain-queue",
  "source": "none",
  "repeatWhile": { "$fn": "lt(state.queue.remaining, 100)" },
  "set": {
    "taskId": { "$fn": "uuid()" }
  },
  "state": {
    "queue.remaining": { "$fn": "add(state.queue.remaining, 1)" }
  },
  "emit": true
}

Loop with repeatUntil

The condition is checked after each iteration, so the stage always executes at least once:
{
  "stage": "fill-batch",
  "source": "none",
  "set": {
    "batchId": { "$fn": "uuid()" }
  },
  "state": {
    "batch.size": { "$fn": "add(state.batch.size, 1)" }
  },
  "repeatUntil": { "$fn": "gte(state.batch.size, 10)" },
  "emit": true
}
Avoid using repeatWhile with a condition that never becomes false, or repeatUntil with a condition that never becomes true: either one creates an infinite loop. Always ensure state mutations inside the loop move the condition toward termination.
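One way to keep a loop bounded is to seed the counter explicitly in an earlier stage and increment it on every iteration. The sketch below relies on state accepting literal values (it uses the same key-value syntax as set); the stage names, event name, and state path are hypothetical.

```json
[
  {
    "stage": "init-counter",
    "source": "none",
    "state": {
      "retry.count": 0
    }
  },
  {
    "stage": "retry-payment",
    "event": "payment.retry",
    "source": "none",
    "repeatWhile": { "$fn": "lt(state.retry.count, 3)" },
    "set": {
      "attemptId": { "$fn": "uuid()" }
    },
    "state": {
      "retry.count": { "$fn": "add(state.retry.count, 1)" }
    },
    "emit": true
  }
]
```

Assuming the state mutation is applied on each iteration, the counter moves from 0 to 3 and the loop should stop after three iterations. The first stage omits emit, so it only prepares state and publishes nothing.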

Conditional Emit

Set emit to a $fn expression to publish events selectively. The engine evaluates the expression after set assignments are applied, so you can gate on freshly computed payload fields:
{
  "stage": "high-value-order",
  "source": "none",
  "set": {
    "total": { "$fn": "int(0, 500)" }
  },
  "emit": { "$fn": "gte(.total, 100)" }
}
Only orders with a total of 100 or more are dispatched to the broker.
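The same mechanism can gate on workflow state instead of a payload field. The following sketch is hypothetical: the stage name, event name, and state path are illustrative, and it assumes state paths can be read inside an emit expression the same way they are in repeatWhile.

```json
{
  "stage": "sample-heartbeat",
  "event": "device.heartbeat",
  "source": "none",
  "repeat": 5,
  "set": {
    "heartbeatId": { "$fn": "uuid()" }
  },
  "state": {
    "heartbeat.count": { "$fn": "add(state.heartbeat.count, 1)" }
  },
  "emit": { "$fn": "gt(state.heartbeat.count, 2)" }
}
```

This page does not specify whether the emit expression sees the counter before or after the current iteration's state mutation, so confirm the exact number of emitted events with a simulation-mode run before relying on it.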

Pacing with wait

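Use wait to introduce realistic timing between events, for example simulating a user journey where each action is seconds apart. Because wait pauses after the stage that declares it, the pipeline below pauses 2 seconds after add-to-cart and 5 seconds after checkout: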
[
  {
    "stage": "user-login",
    "event": "user.login",
    "source": "none",
    "set": { "userId": { "$fn": "uuid()" } },
    "emit": true
  },
  {
    "stage": "add-to-cart",
    "event": "cart.item.added",
    "source": "previous-stage",
    "wait": "2s",
    "set": { "productId": { "$fn": "uuid()" } },
    "emit": true
  },
  {
    "stage": "checkout",
    "event": "order.placed",
    "source": "previous-stage",
    "wait": "5s",
    "emit": true
  }
]

Next Steps

  • Expressions — the full reference for $fn, $ref, $path, and conditional values