A stage is a single step inside a workflow’s pipeline array. Each execution of the pipeline runs every stage in order — Devset builds the outbound message payload, optionally dispatches it to the broker, and updates workflow state before moving to the next stage. You can configure a stage to repeat, pause, pull data from MongoDB, or emit conditionally based on live values.
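The execution order described above can be modelled in a few lines of Python. This is a minimal sketch, not Devset's implementation: `run_stage`, `run_pipeline`, and the `send` callback are hypothetical names, and the `set` block holds only literal values here.

```python
from typing import Callable, Dict, List


def run_stage(stage: Dict, state: Dict, send: Callable[[Dict], None]) -> Dict:
    """Hypothetical model of one stage execution: build, maybe emit, update state."""
    # 1. Build the payload from the stage's `set` block (literals only in this sketch).
    payload = dict(stage.get("set", {}))
    # 2. Dispatch only when `emit` is truthy; an omitted `emit` means no dispatch.
    if stage.get("emit", False):
        send(payload)
    # 3. Apply state updates after the payload has been resolved.
    state.update(stage.get("state", {}))
    return payload


def run_pipeline(pipeline: List[Dict], state: Dict, send: Callable[[Dict], None]) -> None:
    # Stages run strictly in array order.
    for stage in pipeline:
        run_stage(stage, state, send)


sent: List[Dict] = []
state: Dict = {}
run_pipeline(
    [
        {"stage": "a", "emit": True, "set": {"n": 1}, "state": {"seen_a": True}},
        {"stage": "b", "set": {"n": 2}, "state": {"seen_b": True}},
    ],
    state,
    sent.append,
)
print(sent)   # [{'n': 1}]
print(state)  # {'seen_a': True, 'seen_b': True}
```

Stage "b" omits emit, so it still updates state but sends nothing.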

Full Stage Example

The following stage generates a random order payload, waits 500 ms, emits the payload to the broker only while the running order count is below 100, and updates two state values. It repeats up to three times per pipeline execution:
{
  "stage": "order-created",
  "event": "order-created",
  "schemaId": "order-created-v2",
  "source": "none",
  "repeat": 3,
  "repeatUntil": { "$fn": "gte(.attempt, 3)" },
  "emit": { "$fn": "lt(state.order.count, 100)" },
  "wait": "500ms",
  "set": {
    "id": { "$fn": "uuid()" },
    "amount": { "$fn": "int(10, 500)" },
    "status": { "$fn": "choice(PENDING,ACTIVE)" },
    "createdAt": { "$fn": "now()" }
  },
  "state": {
    "order.total": { "$fn": "add(state.order.total, .amount)" },
    "order.count": { "$fn": "add(state.order.count, 1)" }
  },
  "headers": {
    "x-source": "devset",
    "x-schema-version": "2"
  },
  "key": { "$fn": "uuid()" }
}

Stage Fields

stage
string
required
A unique identifier for this stage within the workflow. Used in logs and error messages to pinpoint which step produced an issue. Stage identifiers do not need to be globally unique across workflows — only within the same pipeline array.
event
string
A logical event or schema name associated with this stage. Devset uses this for display purposes and schema lookup when no explicit schemaId is provided on the stage.
schemaId
string
Overrides the workflow-level schemaId for this stage only. Use this when a pipeline publishes messages to multiple schemas or schema versions within the same workflow run.
source
"none" | "previous-stage"
required
Controls how the initial payload for this stage is seeded.
  • "none" — start with an empty payload object. All fields must be assigned via set.
  • "previous-stage" — copy the fully resolved payload from the previous stage as the starting point. Useful when you want to evolve a message across multiple stages without repeating common fields.
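The two seeding modes can be illustrated with a short Python sketch. The function name `seed_payload` is hypothetical, and the use of a deep copy (so that later edits do not alter the earlier stage's payload) is an assumption rather than documented Devset behaviour.

```python
import copy
from typing import Dict, Optional


def seed_payload(source: str, previous: Optional[Dict]) -> Dict:
    """Return the starting payload for a stage, based on its `source` value."""
    if source == "none":
        return {}  # every field must then come from `set`
    if source == "previous-stage":
        # Assumption: the copy is independent of the previous stage's payload.
        return copy.deepcopy(previous or {})
    raise ValueError(f"unsupported source: {source!r}")


first = {"id": "a1", "status": "PENDING"}
second = seed_payload("previous-stage", first)
second["status"] = "ACTIVE"  # evolve the message in the next stage

print(first)   # {'id': 'a1', 'status': 'PENDING'}
print(second)  # {'id': 'a1', 'status': 'ACTIVE'}
```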
emit
boolean | DslValue
Controls whether the resolved payload is dispatched to the broker.
  • true — always emit.
  • false — build the payload and update state but do not send. This is also the behaviour when emit is omitted entirely.
  • A { "$fn": "..." } expression — evaluate the function and emit only if the result is truthy.
See Conditional Emit below for a worked example.
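The three forms of emit listed above reduce to one decision. The sketch below is illustrative only: `should_emit` is a hypothetical name, and `evaluate` stands in for whatever evaluates a "$fn" expression string.

```python
from typing import Any, Callable, Dict


def should_emit(stage: Dict, evaluate: Callable[[str], Any]) -> bool:
    """Decide whether a stage's payload is dispatched to the broker."""
    emit = stage.get("emit", False)  # omitted behaves like false
    if isinstance(emit, bool):
        return emit
    if isinstance(emit, dict) and "$fn" in emit:
        # Expression form: emit only when the result is truthy.
        return bool(evaluate(emit["$fn"]))
    raise TypeError("emit must be a boolean or a {'$fn': ...} expression")


always_true = lambda expression: True  # placeholder evaluator for the demo

print(should_emit({}, always_true))                             # False
print(should_emit({"emit": True}, always_true))                 # True
print(should_emit({"emit": {"$fn": "lt(1, 2)"}}, always_true))  # True
```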
wait
string
A duration string that sets how long the stage pauses before the message is dispatched. Supported units: ms (milliseconds), s (seconds), m (minutes). For example, "1s" waits one second, "500ms" waits 500 milliseconds, and "2m" waits two minutes. Useful for simulating realistic timing between events in a scenario.
The wait pause is skipped entirely when running in the Devset Playground or any simulation mode. It only takes effect during a live execution against a real broker.
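As a rough illustration of the duration format, the sketch below converts a wait string to milliseconds and returns zero in simulation mode. The helper names are hypothetical, and accepting only whole numbers is an assumption; the reference above does not say whether fractional values are allowed.

```python
import re

# Units listed in the reference: ms, s, m.
_UNIT_TO_MS = {"ms": 1, "s": 1_000, "m": 60_000}
_DURATION = re.compile(r"(\d+)(ms|s|m)")


def parse_wait(value: str) -> int:
    """Convert a wait string such as "500ms" into milliseconds."""
    match = _DURATION.fullmatch(value.strip())
    if match is None:
        raise ValueError(f"unsupported wait value: {value!r}")
    amount, unit = match.groups()
    return int(amount) * _UNIT_TO_MS[unit]


def effective_wait_ms(value: str, simulation: bool) -> int:
    """The pause applies only to live executions, not to simulation modes."""
    return 0 if simulation else parse_wait(value)


print(effective_wait_ms("500ms", simulation=False))  # 500
print(effective_wait_ms("2m", simulation=False))     # 120000
print(effective_wait_ms("2m", simulation=True))      # 0
```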
set
map<string, DslValue>
A map of payload field names to their assigned values. Each value can be a literal string or number, a { "$fn": "..." } expression, or a { "$path": "..." } reference of the kind used in the Full Query Example. Fields are resolved in declaration order, so you can reference an earlier field’s value in a later assignment within the same set block.
"set": {
  "id":        { "$fn": "uuid()" },
  "amount":    { "$fn": "int(10, 500)" },
  "status":    "PENDING",
  "createdAt": { "$fn": "now()" }
}
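Declaration-order resolution means a later field can read an earlier one. The Python sketch below models that idea with callables in place of "$fn" expressions; `resolve_set` and the field names are hypothetical.

```python
from typing import Any, Dict, Optional


def resolve_set(assignments: Dict[str, Any], seed: Optional[Dict] = None) -> Dict:
    """Resolve fields one at a time, in the order they were declared."""
    payload = dict(seed or {})
    for field, value in assignments.items():  # dicts preserve insertion order
        # A callable stands in for an expression and sees the fields resolved so far.
        payload[field] = value(payload) if callable(value) else value
    return payload


payload = resolve_set(
    {
        "amount": 120,
        "tax": lambda p: p["amount"] // 10,         # reads the earlier field
        "total": lambda p: p["amount"] + p["tax"],  # reads both earlier fields
    }
)
print(payload)  # {'amount': 120, 'tax': 12, 'total': 132}
```

Moving "total" above "tax" in this sketch would raise a KeyError, which is why declaration order matters.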
state
map<string, DslValue>
A map of workflow state paths to new values. Mutations are applied after the payload is fully resolved. Paths use dot notation to target nested state keys — for example "order.total" updates state.order.total. See the Workflow DSL page for details on how state persists across executions.
"state": {
  "order.total": { "$fn": "add(state.order.total, .amount)" },
  "order.count": { "$fn": "add(state.order.count, 1)" }
}
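Dot-notation paths can be pictured as walking nested dictionaries. In the sketch below, `get_path` and `set_path` are hypothetical helpers, and treating a missing counter as 0 is an assumption made for the demo.

```python
from typing import Any, Dict


def get_path(state: Dict, path: str, default: Any = None) -> Any:
    """Read a nested value such as "order.total"; return default when missing."""
    node: Any = state
    for key in path.split("."):
        if not isinstance(node, dict) or key not in node:
            return default
        node = node[key]
    return node


def set_path(state: Dict, path: str, value: Any) -> None:
    """Write a nested value, creating intermediate objects as needed."""
    *parents, leaf = path.split(".")
    node = state
    for key in parents:
        node = node.setdefault(key, {})
    node[leaf] = value


state: Dict = {}
payload = {"amount": 40}  # already resolved before state is touched
set_path(state, "order.total", get_path(state, "order.total", 0) + payload["amount"])
set_path(state, "order.count", get_path(state, "order.count", 0) + 1)
print(state)  # {'order': {'total': 40, 'count': 1}}
```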
headers
map<string, string>
Key-value pairs to include as message headers. For Kafka, these become record headers. For RabbitMQ, they are added to the message’s headers property. Values must be plain strings.
"headers": {
  "x-source": "devset",
  "x-correlation-id": "abc123"
}
key
DslValue
The Kafka message key. Accepts a literal string or a { "$fn": "..." } expression. Kafka uses this key for partition assignment, so consistent keys route related messages to the same partition.
"key": { "$fn": "uuid()" }
repeat
integer
Repeat this stage the specified number of times before moving to the next stage. The stage’s set and state blocks are re-evaluated on every iteration, so each repeat produces a fresh payload. Combine with repeatWhile or repeatUntil for conditional loops.
repeatWhile
DslValue
A { "$fn": "..." } expression evaluated before each repeat iteration. If the expression returns false, the loop stops and the pipeline advances to the next stage. The condition is checked before the first iteration, so the stage may execute zero times if the condition is immediately false.
repeatUntil
DslValue
A { "$fn": "..." } expression evaluated after each repeat iteration. If the expression returns true, the loop stops. Because the check happens after execution, the stage always runs at least once regardless of the initial condition.
query
QueryBlock
A MongoDB lookup block that runs before the stage’s set block is evaluated. The results are loaded into workflow state so you can use them in subsequent field assignments. See Query Block below.
wireFormat
WireFormatBlock
Configures the binary framing applied to the serialized Protobuf payload. Only relevant when the workflow’s contentType is "application/x-protobuf". See the Wire Format reference for full details.

Repeat Loops

The repeat, repeatWhile, and repeatUntil fields give you full control over how many times a single stage executes before the pipeline advances.

Fixed repeat

Use repeat alone to run a stage an exact number of times:
{
  "stage": "heartbeat",
  "source": "none",
  "repeat": 5,
  "emit": true,
  "set": {
    "ts": { "$fn": "nowms()" }
  }
}
This stage emits five heartbeat messages in sequence, each with a fresh timestamp.

Pre-condition loop with repeatWhile

repeatWhile is checked before each iteration. The stage stops as soon as the condition becomes false:
{
  "stage": "poll",
  "source": "none",
  "repeatWhile": { "$fn": "lt(state.attempts, 10)" },
  "emit": true,
  "set": {
    "attempt": { "$fn": "add(state.attempts, 1)" }
  },
  "state": {
    "attempts": { "$fn": "add(state.attempts, 1)" }
  }
}

Post-condition loop with repeatUntil

repeatUntil is checked after each iteration. The stage always executes at least once:
{
  "stage": "retry",
  "source": "none",
  "repeatUntil": { "$fn": "gte(state.retries, 3)" },
  "emit": true,
  "set": {
    "retryNum": { "$fn": "add(state.retries, 1)" }
  },
  "state": {
    "retries": { "$fn": "add(state.retries, 1)" }
  }
}
You can combine repeat with repeatWhile or repeatUntil. The loop ends as soon as either the fixed count is reached or the condition triggers — whichever comes first.
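The interaction of the three loop fields can be sketched as a single loop with a pre-check and a post-check. This is an illustrative model with hypothetical names, not Devset's scheduler. It assumes that a stage with none of the three fields runs exactly once.

```python
from typing import Callable, Optional


def run_repeats(
    body: Callable[[], None],
    repeat: Optional[int] = None,
    repeat_while: Optional[Callable[[], bool]] = None,
    repeat_until: Optional[Callable[[], bool]] = None,
) -> int:
    """Run body according to the loop fields and return the iteration count."""
    iterations = 0
    while True:
        if repeat is not None and iterations >= repeat:
            break  # fixed count reached
        if repeat_while is not None and not repeat_while():
            break  # pre-condition: may stop before the first iteration
        body()
        iterations += 1
        if repeat_until is not None and repeat_until():
            break  # post-condition: checked after the body has run
        if repeat is None and repeat_while is None and repeat_until is None:
            break  # assumption: no loop fields means a single execution
    return iterations


state = {"retries": 0}


def bump() -> None:
    state["retries"] += 1


# repeatUntil alone: stops once retries reaches 3.
print(run_repeats(bump, repeat_until=lambda: state["retries"] >= 3))  # 3

# repeat combined with repeatUntil: the fixed count of 2 is reached first.
state["retries"] = 0
print(run_repeats(bump, repeat=2, repeat_until=lambda: state["retries"] >= 3))  # 2
```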

Conditional Emit

The emit field accepts a { "$fn": "..." } expression that is evaluated at runtime. This lets you send messages only when certain conditions hold, without splitting logic across multiple stages.
{
  "stage": "order-created",
  "source": "none",
  "emit": { "$fn": "lt(state.order.count, 100)" },
  "set": {
    "id":     { "$fn": "uuid()" },
    "amount": { "$fn": "int(10, 500)" }
  },
  "state": {
    "order.count": { "$fn": "add(state.order.count, 1)" }
  }
}
In this example, the stage builds the payload and updates state on every execution, but dispatches to the broker only while state.order.count is below 100. If the counter starts at zero, that means the first 100 executions. See the Functions reference for the full list of comparison functions you can use in emit expressions.
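A quick simulation makes the counting explicit. The sketch assumes the counter starts at zero and that the emit condition is evaluated before the state update of the same execution; `simulate` is a hypothetical helper.

```python
from typing import Tuple


def simulate(executions: int, limit: int = 100) -> Tuple[int, int]:
    """Return (messages emitted, final order count) after the given executions."""
    count = 0
    emitted = 0
    for _ in range(executions):
        if count < limit:  # models lt(state.order.count, 100)
            emitted += 1
        count += 1         # models add(state.order.count, 1), run every time
    return emitted, count


print(simulate(150))  # (100, 150)
print(simulate(40))   # (40, 40)
```

The counter keeps growing after the limit is reached because the state update runs whether or not the message is sent.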

Query Block

The query block performs a MongoDB lookup before the stage’s payload is assembled. Use it to seed workflow state with real data from your database, then reference those values in set assignments.
query.connection
string
required
The name of a configured database connector. This must match a connector registered in your Devset environment.
query.database
string
required
The MongoDB database name to query.
query.collection
string
required
The MongoDB collection name to query.
query.find
object
required
A MongoDB filter document using standard db.collection.find({...}) syntax. For example, { "status": "ACTIVE" } returns only active documents.
query.select
map<string, string>
Maps document fields to workflow state paths. The key is the target state path (dot notation) and the value is the source field in the matched document. Nested source fields also use dot notation, as in "subscription.tier".
"select": {
  "lookup.userId":   "userId",
  "lookup.planTier": "subscription.tier"
}
After the query runs, state.lookup.userId holds the document’s userId value and state.lookup.planTier holds subscription.tier.
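The mapping can be pictured as copying values from one nested structure into another. The sketch below uses hypothetical helpers and a made-up document; it assumes that a dotted source value addresses a nested field, as the "subscription.tier" example suggests.

```python
from typing import Any, Dict


def get_path(document: Dict, path: str) -> Any:
    """Read a nested field; raises KeyError if any segment is missing."""
    node: Any = document
    for key in path.split("."):
        node = node[key]
    return node


def set_path(state: Dict, path: str, value: Any) -> None:
    """Write a nested state value, creating intermediate objects as needed."""
    *parents, leaf = path.split(".")
    node = state
    for key in parents:
        node = node.setdefault(key, {})
    node[leaf] = value


def apply_select(document: Dict, select: Dict[str, str], state: Dict) -> None:
    # Key = target state path, value = source field in the matched document.
    for target, source in select.items():
        set_path(state, target, get_path(document, source))


document = {"userId": "u-42", "subscription": {"tier": "gold"}}  # sample data
select = {"lookup.userId": "userId", "lookup.planTier": "subscription.tier"}
state: Dict = {}
apply_select(document, select, state)
print(state)  # {'lookup': {'userId': 'u-42', 'planTier': 'gold'}}
```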

Full Query Example

{
  "stage": "enrich-order",
  "source": "none",
  "query": {
    "connection": "mongo-primary",
    "database": "commerce",
    "collection": "users",
    "find": { "status": "ACTIVE" },
    "select": {
      "lookup.userId":   "userId",
      "lookup.tier":     "subscription.tier"
    }
  },
  "emit": true,
  "set": {
    "userId": { "$path": "state.lookup.userId" },
    "tier":   { "$path": "state.lookup.tier" },
    "amount": { "$fn": "int(10, 500)" }
  }
}
The query block runs once per stage execution. If the filter matches multiple documents, Devset uses the first result. Design your find filter to be as specific as possible to avoid non-deterministic lookups.
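To see why a broad filter is risky, consider this small in-memory sketch. It implements only equality matching, a tiny subset of MongoDB filter semantics, and `find_first` with its sample documents is hypothetical.

```python
from typing import Dict, List, Optional


def find_first(documents: List[Dict], flt: Dict) -> Optional[Dict]:
    """Return the first document whose fields equal every value in the filter."""
    for doc in documents:
        if all(doc.get(field) == expected for field, expected in flt.items()):
            return doc
    return None


users = [
    {"userId": "u1", "status": "ACTIVE"},
    {"userId": "u2", "status": "ACTIVE"},
]

# Broad filter: two documents match, so the result depends on their order.
broad = find_first(users, {"status": "ACTIVE"})
# Specific filter: exactly one document matches, whatever the order.
specific = find_first(users, {"status": "ACTIVE", "userId": "u2"})

print(broad["userId"])     # u1
print(specific["userId"])  # u2
```

Reversing the list changes the result of the broad filter but not of the specific one.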