> ## Documentation Index
> Fetch the complete documentation index at: https://docs.devset.io/llms.txt
> Use this file to discover all available pages before exploring further.

# Pipeline Stage Fields: Complete Reference Guide

> Full reference for every Devset pipeline stage field: payload assignment, state mutation, repeat loops, conditional emit, and MongoDB queries.

A stage is a single step inside a workflow's `pipeline` array. Each execution of the pipeline runs every stage in order — Devset builds the outbound message payload, optionally dispatches it to the broker, and updates workflow state before moving to the next stage. You can configure a stage to repeat, pause, pull data from MongoDB, or emit conditionally based on live values.

## Full Stage Example

The following stage generates a random order payload, emits it to the broker only when the running count is below one hundred, and updates two state values:

```json theme={null}
{
  "stage": "order-created",
  "event": "order-created",
  "schemaId": "order-created-v2",
  "source": "none",
  "repeat": 3,
  "repeatUntil": { "$fn": "gte(.attempt, 3)" },
  "emit": { "$fn": "lt(state.order.count, 100)" },
  "wait": "500ms",
  "set": {
    "id": { "$fn": "uuid()" },
    "amount": { "$fn": "int(10, 500)" },
    "status": { "$fn": "choice(PENDING,ACTIVE)" },
    "createdAt": { "$fn": "now()" }
  },
  "state": {
    "order.total": { "$fn": "add(state.order.total, .amount)" },
    "order.count": { "$fn": "add(state.order.count, 1)" }
  },
  "headers": {
    "x-source": "devset",
    "x-schema-version": "2"
  },
  "key": { "$fn": "uuid()" }
}
```

## Stage Fields

<ParamField path="stage" type="string" required>
  A unique identifier for this stage within the workflow. Used in logs and error messages to pinpoint which step produced an issue. Stage identifiers do not need to be globally unique across workflows — only within the same `pipeline` array.
</ParamField>

<ParamField path="event" type="string">
  A logical event or schema name associated with this stage. Devset uses this for display purposes and schema lookup when no explicit `schemaId` is provided on the stage.
</ParamField>

<ParamField path="schemaId" type="string">
  Overrides the workflow-level `schemaId` for this stage only. Use this when a pipeline publishes messages to multiple schemas or schema versions within the same workflow run.
</ParamField>

<ParamField path="source" type="&#x22;none&#x22; | &#x22;previous-stage&#x22;" required>
  Controls how the initial payload for this stage is seeded.

  * `"none"` — start with an empty payload object. All fields must be assigned via `set`.
  * `"previous-stage"` — copy the fully resolved payload from the previous stage as the starting point. Useful when you want to evolve a message across multiple stages without repeating common fields.
</ParamField>

<ParamField path="emit" type="boolean | DslValue">
  Controls whether the resolved payload is dispatched to the broker.

  * `true` — always emit.
  * `false` — build the payload and update state but do not send. This is also the behaviour when `emit` is omitted entirely.
  * A `{ "$fn": "..." }` expression — evaluate the function and emit only if the result is truthy.

  See [Conditional Emit](#conditional-emit) below for a worked example.
</ParamField>

<ParamField path="wait" type="string">
  A duration string to pause before the message is dispatched. Supported units: `ms` (milliseconds), `s` (seconds), `m` (minutes). For example `"1s"` waits one second, `"500ms"` waits 500 milliseconds, and `"2m"` waits two minutes. Useful for simulating realistic timing between events in a scenario.

  <Note>
    The `wait` pause is skipped entirely when running in the Devset Playground or any simulation mode. It only takes effect during a live execution against a real broker.
  </Note>
</ParamField>

<ParamField path="set" type="map<string, DslValue>">
  A map of payload field names to their assigned values. Each value can be a literal string or number, or a `{ "$fn": "..." }` expression. Fields are resolved in declaration order, so you can reference an earlier field's value in a later assignment within the same `set` block.

  ```json theme={null}
  "set": {
    "id":        { "$fn": "uuid()" },
    "amount":    { "$fn": "int(10, 500)" },
    "status":    "PENDING",
    "createdAt": { "$fn": "now()" }
  }
  ```
</ParamField>

<ParamField path="state" type="map<string, DslValue>">
  A map of workflow state paths to new values. Mutations are applied after the payload is fully resolved. Paths use dot notation to target nested state keys — for example `"order.total"` updates `state.order.total`. See the [Workflow DSL](/reference/workflow-dsl#execution-model) page for details on how state persists across executions.

  ```json theme={null}
  "state": {
    "order.total": { "$fn": "add(state.order.total, .amount)" },
    "order.count": { "$fn": "add(state.order.count, 1)" }
  }
  ```
</ParamField>

<ParamField path="headers" type="map<string, string>">
  Key-value pairs to include as message headers. For Kafka, these become record headers. For RabbitMQ, they are added to the message's `headers` property. Values must be plain strings.

  ```json theme={null}
  "headers": {
    "x-source": "devset",
    "x-correlation-id": "abc123"
  }
  ```
</ParamField>

<ParamField path="key" type="DslValue">
  The Kafka message key. Accepts a literal string or a `{ "$fn": "..." }` expression. Kafka uses this key for partition assignment, so consistent keys route related messages to the same partition.

  ```json theme={null}
  "key": { "$fn": "uuid()" }
  ```
</ParamField>

<ParamField path="repeat" type="integer">
  Repeat this stage the specified number of times before moving to the next stage. The stage's `set` and `state` blocks are re-evaluated on every iteration, so each repeat produces a fresh payload. Combine with `repeatWhile` or `repeatUntil` for conditional loops.
</ParamField>

<ParamField path="repeatWhile" type="DslValue">
  A `{ "$fn": "..." }` expression evaluated **before** each repeat iteration. If the expression returns `false`, the loop stops and the pipeline advances to the next stage. The condition is checked before the first iteration, so the stage may execute zero times if the condition is immediately false.
</ParamField>

<ParamField path="repeatUntil" type="DslValue">
  A `{ "$fn": "..." }` expression evaluated **after** each repeat iteration. If the expression returns `true`, the loop stops. Because the check happens after execution, the stage always runs at least once regardless of the initial condition.
</ParamField>

<ParamField path="query" type="QueryBlock">
  A MongoDB lookup block that runs before the stage's `set` block is evaluated. The results are loaded into workflow state so you can use them in subsequent field assignments. See [Query Block](#query-block) below.
</ParamField>

<ParamField path="wireFormat" type="WireFormatBlock">
  Configures the binary framing applied to the serialized Protobuf payload. Only relevant when the workflow's `contentType` is `"application/x-protobuf"`. See the [Wire Format](/reference/wire-format) reference for full details.
</ParamField>

***

## Repeat Loops

The `repeat`, `repeatWhile`, and `repeatUntil` fields give you full control over how many times a single stage executes before the pipeline advances.

### Fixed repeat

Use `repeat` alone to run a stage an exact number of times:

```json theme={null}
{
  "stage": "heartbeat",
  "repeat": 5,
  "emit": true,
  "set": {
    "ts": { "$fn": "nowms()" }
  }
}
```

This stage emits five heartbeat messages in sequence, each with a fresh timestamp.

### Pre-condition loop with `repeatWhile`

`repeatWhile` is checked **before** each iteration. The stage stops as soon as the condition becomes false:

```json theme={null}
{
  "stage": "poll",
  "repeatWhile": { "$fn": "lt(state.attempts, 10)" },
  "emit": true,
  "set": {
    "attempt": { "$fn": "add(state.attempts, 1)" }
  },
  "state": {
    "attempts": { "$fn": "add(state.attempts, 1)" }
  }
}
```

### Post-condition loop with `repeatUntil`

`repeatUntil` is checked **after** each iteration. The stage always executes at least once:

```json theme={null}
{
  "stage": "retry",
  "repeatUntil": { "$fn": "gte(state.retries, 3)" },
  "emit": true,
  "set": {
    "retryNum": { "$fn": "add(state.retries, 1)" }
  },
  "state": {
    "retries": { "$fn": "add(state.retries, 1)" }
  }
}
```

<Tip>
  You can combine `repeat` with `repeatWhile` or `repeatUntil`. The loop ends as soon as either the fixed count is reached or the condition triggers — whichever comes first.
</Tip>

***

## Conditional Emit

The `emit` field accepts a `{ "$fn": "..." }` expression that is evaluated at runtime. This lets you send messages only when certain conditions hold, without splitting logic across multiple stages.

```json theme={null}
{
  "stage": "order-created",
  "emit": { "$fn": "lt(state.order.count, 100)" },
  "set": {
    "id":     { "$fn": "uuid()" },
    "amount": { "$fn": "int(10, 500)" }
  },
  "state": {
    "order.count": { "$fn": "add(state.order.count, 1)" }
  }
}
```

In this example, the stage builds and state-mutates on every execution, but only dispatches to the broker for the first 100 executions. See the [Functions](/reference/functions) reference for the full list of comparison functions you can use in `emit` expressions.

***

## Query Block

The `query` block performs a MongoDB lookup before the stage's payload is assembled. Use it to seed workflow state with real data from your database, then reference those values in `set` assignments.

<ParamField path="query.connection" type="string" required>
  The name of a configured database connector. This must match a connector registered in your Devset environment.
</ParamField>

<ParamField path="query.database" type="string" required>
  The MongoDB database name to query.
</ParamField>

<ParamField path="query.collection" type="string" required>
  The MongoDB collection name to query.
</ParamField>

<ParamField path="query.find" type="object" required>
  A MongoDB filter document using standard `db.collection.find({...})` syntax. For example, `{ "status": "ACTIVE" }` returns only active documents.
</ParamField>

<ParamField path="query.select" type="map<string, string>">
  Maps document fields to workflow state paths. The key is the target state path (dot notation) and the value is the source field name from the matched document.

  ```json theme={null}
  "select": {
    "lookup.userId":   "userId",
    "lookup.planTier": "subscription.tier"
  }
  ```

  After the query runs, `state.lookup.userId` holds the document's `userId` value and `state.lookup.planTier` holds `subscription.tier`.
</ParamField>

### Full Query Example

```json theme={null}
{
  "stage": "enrich-order",
  "query": {
    "connection": "mongo-primary",
    "database": "commerce",
    "collection": "users",
    "find": { "status": "ACTIVE" },
    "select": {
      "lookup.userId":   "userId",
      "lookup.tier":     "subscription.tier"
    }
  },
  "emit": true,
  "set": {
    "userId": { "$path": "state.lookup.userId" },
    "tier":   { "$path": "state.lookup.tier" },
    "amount": { "$fn": "int(10, 500)" }
  }
}
```

<Note>
  The `query` block runs once per stage execution. If the filter matches multiple documents, Devset uses the first result. Design your `find` filter to be as specific as possible to avoid non-deterministic lookups.
</Note>
