> ## Documentation Index
> Fetch the complete documentation index at: https://docs.devset.io/llms.txt
> Use this file to discover all available pages before exploring further.

# Pipeline Stages: Building Blocks of a Devset Workflow

> A stage is a single step in a workflow pipeline. Stages run sequentially, building and emitting event payloads to your Kafka or RabbitMQ broker.

A stage is one step in a workflow's `pipeline` array. Stages execute in order, one after another, within each workflow execution. Each stage can build a payload from scratch or inherit the previous stage's payload, apply field assignments, mutate shared workflow state, and optionally dispatch the resulting event to your broker. Together, a sequence of stages lets you model realistic multi-event flows — from a single produce operation to complex conditional loops.

## Complete Stage Example

The following stage builds an `order.item.added` event, conditionally emits it only when the item price is positive, increments a state counter, and waits one second after dispatching before the next stage begins:

```json theme={null}
{
  "stage": "add-line-item",
  "event": "order.item.added",
  "schemaId": "order-item-schema",
  "source": "none",
  "repeat": 3,
  "set": {
    "itemId": { "$fn": "uuid()" },
    "productCode": { "$fn": "choice(SKU-001, SKU-002, SKU-003)" },
    "quantity": { "$fn": "int(1, 10)" },
    "unitPrice": { "$fn": "int(100, 9999)" }
  },
  "state": {
    "order.itemCount": { "$fn": "add(state.order.itemCount, 1)" }
  },
  "emit": { "$fn": "gt(.unitPrice, 0)" },
  "wait": "1s",
  "headers": {
    "x-source": "devset",
    "x-schema-version": "2"
  }
}
```

## Field Reference

<ParamField path="stage" type="string" required>
  A unique identifier for this stage within the workflow. Used in logs and error messages. Choose a descriptive slug such as `"build-order"` or `"add-line-item"`.
</ParamField>

<ParamField path="event" type="string">
  The logical name of the event this stage produces, for example `"order.created"` or `"payment.authorised"`. This label appears in the run's event log and is used to match against schema definitions.
</ParamField>

<ParamField path="schemaId" type="string">
  Overrides the workflow-level `schemaId` for this stage only. Use this when one stage produces a structurally different event from the workflow default. See [Schemas](/concepts/schemas) for details.
</ParamField>

<ParamField path="source" type="string" required>
  Controls where the stage's initial payload comes from before `set` assignments are applied.

  * `"none"` — start with an empty payload object.
  * `"previous-stage"` — copy the payload produced by the immediately preceding stage and modify it.
</ParamField>

<ParamField path="repeat" type="integer">
  Repeat this stage a fixed number of times within a single execution. The stage's `set`, `state`, and `emit` logic runs on every iteration. Use alongside `state` mutations to produce sequences of related events.
</ParamField>

<ParamField path="repeatWhile" type="object">
  A DSL expression evaluated **before** each iteration. The stage keeps repeating as long as the expression resolves to `true`. The stage does not execute at all if the condition is false on the first check.

  ```json theme={null}
  {
    "repeatWhile": { "$fn": "lt(state.order.itemCount, 5)" }
  }
  ```
</ParamField>

<ParamField path="repeatUntil" type="object">
  A DSL expression evaluated **after** each iteration. The stage repeats until the expression resolves to `true`, meaning the stage body always executes at least once.

  ```json theme={null}
  {
    "repeatUntil": { "$fn": "gte(state.order.itemCount, 5)" }
  }
  ```
</ParamField>

<ParamField path="emit" type="boolean | object">
  Controls whether this stage dispatches its event to the broker.

  * `true` — always emit.
  * `false` or omitted — never emit; the stage runs its `set` and `state` logic but does not publish.
  * An expression object — emit conditionally based on payload or state values.

  ```json theme={null}
  { "emit": { "$fn": "gt(.unitPrice, 0)" } }
  ```
</ParamField>

<ParamField path="wait" type="string">
  A duration string that pauses execution **after** the stage body runs and **before** the next stage begins. Use this to simulate realistic inter-event timing in your consumer tests.

  The supported suffixes are:

  | Value     | Duration         |
  | --------- | ---------------- |
  | `"500ms"` | 500 milliseconds |
  | `"1s"`    | 1 second         |
  | `"30s"`   | 30 seconds       |
  | `"1m"`    | 1 minute         |

  <Note>
    `wait` is skipped when running a workflow in simulation mode, so your pipeline logic can be validated quickly without real delays.
  </Note>
</ParamField>

<ParamField path="set" type="object">
  Assigns values to fields in the event payload. Keys are dot-notation field paths; values are literals or [DSL expressions](/concepts/expressions).

  ```json theme={null}
  {
    "set": {
      "orderId": { "$fn": "uuid()" },
      "status": "pending",
      "createdAt": { "$fn": "now()" }
    }
  }
  ```
</ParamField>

<ParamField path="state" type="object">
  Mutates the workflow-scoped state object. Uses the same key-value syntax as `set`. Changes made here are visible to all subsequent stages in the same execution.

  ```json theme={null}
  {
    "state": {
      "sequence.index": { "$fn": "add(state.sequence.index, 1)" }
    }
  }
  ```
</ParamField>

<ParamField path="headers" type="object">
  Free-form string key-value pairs attached as message headers to the dispatched event. Useful for routing metadata, schema version stamps, or correlation IDs.

  ```json theme={null}
  {
    "headers": {
      "x-correlation-id": "abc123",
      "x-schema-version": "3"
    }
  }
  ```
</ParamField>

<ParamField path="key" type="object">
  *(Kafka only)* The Kafka message key. Accepts the same value syntax as `set` fields, including DSL expressions. Use this to control partition assignment.

  ```json theme={null}
  {
    "key": { "$ref": "customerId" }
  }
  ```
</ParamField>

<ParamField path="query" type="object">
  Runs a MongoDB lookup and stores the result in workflow state before the stage's `set` logic runs. Useful for seeding payloads from real data.

  Fields:

  | Field        | Description                   |
  | ------------ | ----------------------------- |
  | `connection` | Name of the MongoDB connector |
  | `database`   | Database name                 |
  | `collection` | Collection name               |
  | `find`       | MongoDB filter document       |
  | `select`     | Fields to project             |

  ```json theme={null}
  {
    "query": {
      "connection": "local-mongo",
      "database": "shop",
      "collection": "products",
      "find": { "active": true },
      "select": { "productId": 1, "price": 1 }
    }
  }
  ```
</ParamField>

<ParamField path="wireFormat" type="object">
  Configures Protobuf binary framing when `contentType` is `"application/x-protobuf"`. See the [Schemas](/concepts/schemas) page for wire-format details.
</ParamField>

## Repeat Patterns

### Fixed repeat

Use `repeat` when you know exactly how many iterations you need:

```json theme={null}
{
  "stage": "add-items",
  "source": "none",
  "repeat": 5,
  "set": {
    "itemId": { "$fn": "uuid()" },
    "quantity": { "$fn": "int(1, 10)" }
  },
  "emit": true
}
```

### Loop with `repeatWhile`

The condition is checked before each iteration. The stage does not execute if the condition is false from the start:

```json theme={null}
{
  "stage": "drain-queue",
  "source": "none",
  "repeatWhile": { "$fn": "lt(state.queue.remaining, 100)" },
  "set": {
    "taskId": { "$fn": "uuid()" }
  },
  "state": {
    "queue.remaining": { "$fn": "add(state.queue.remaining, 1)" }
  },
  "emit": true
}
```

### Loop with `repeatUntil`

The condition is checked after each iteration, so the stage always executes at least once:

```json theme={null}
{
  "stage": "fill-batch",
  "source": "none",
  "set": {
    "batchId": { "$fn": "uuid()" }
  },
  "state": {
    "batch.size": { "$fn": "add(state.batch.size, 1)" }
  },
  "repeatUntil": { "$fn": "gte(state.batch.size, 10)" },
  "emit": true
}
```

<Warning>
  Avoid using `repeatWhile` with a condition that is never satisfied — this creates an infinite loop. Always ensure state mutations inside the loop move the condition toward termination.
</Warning>

## Conditional Emit

Set `emit` to a `$fn` expression to publish events selectively. The engine evaluates the expression after `set` assignments are applied, so you can gate on freshly computed payload fields:

```json theme={null}
{
  "stage": "high-value-order",
  "source": "none",
  "set": {
    "total": { "$fn": "int(0, 500)" }
  },
  "emit": { "$fn": "gte(.total, 100)" }
}
```

Only orders with a `total` of 100 or more are dispatched to the broker.

## Pacing with `wait`

Use `wait` to introduce realistic timing between events, for example simulating a user journey where each action is seconds apart:

```json theme={null}
[
  {
    "stage": "user-login",
    "event": "user.login",
    "source": "none",
    "set": { "userId": { "$fn": "uuid()" } },
    "emit": true
  },
  {
    "stage": "add-to-cart",
    "event": "cart.item.added",
    "source": "previous-stage",
    "wait": "2s",
    "set": { "productId": { "$fn": "uuid()" } },
    "emit": true
  },
  {
    "stage": "checkout",
    "event": "order.placed",
    "source": "previous-stage",
    "wait": "5s",
    "emit": true
  }
]
```

## Next Steps

* [Expressions](/concepts/expressions) — the full reference for `$fn`, `$ref`, `$path`, and conditional values
