> ## Documentation Index
> Fetch the complete documentation index at: https://docs.devset.io/llms.txt
> Use this file to discover all available pages before exploring further.

# Messages API: Send and Browse Broker Messages

> Publish messages to Kafka and RabbitMQ, browse topic history, stream live events over WebSocket, and execute ad-hoc single-step productions.

The Messages API lets you interact with your brokers directly — without building a workflow. You can publish individual messages to Kafka topics or RabbitMQ exchanges, page through message history, open a live WebSocket stream for real-time observation, and fire one-off messages using the single-step executor. All HTTP endpoints are relative to `http://localhost:8082`.

***

## POST /kafka/messages/send

Publish a single message to a Kafka topic. Devset routes the message through the named connector and returns once the broker acknowledges the write.

<ParamField body="producerName" type="string" required>
  The name of the Kafka connector to use. Must match a connector saved via the [Connectors API](/reference/api/connectors).
</ParamField>

<ParamField body="topic" type="string" required>
  The Kafka topic to publish the message to.
</ParamField>

<ParamField body="key" type="string">
  The message key used for partition assignment. Omit to let Kafka assign a partition automatically.
</ParamField>

<ParamField body="message" type="string" required>
  The message value as a string. For JSON payloads, serialize your object to a JSON string before sending.
</ParamField>

<ParamField body="headers" type="object">
  Optional key-value pairs to attach as Kafka message headers.
</ParamField>

**Request example**

```json theme={null}
{
  "producerName": "local-kafka",
  "topic": "orders.events",
  "key": "order-123",
  "message": "{\"id\": \"order-123\", \"amount\": 99}",
  "headers": { "X-Source": "devset" }
}
```

**Response — 200 OK**

This endpoint returns no response body.

***

## GET /kafka/topics

List all topics available on a Kafka connection.

**Query parameters**

<ParamField query="connectionName" type="string" required>
  The name of the Kafka connector to query.
</ParamField>

**Request example**

```
GET /kafka/topics?connectionName=local-kafka
```

**Response — 200 OK**

```json theme={null}
["orders.events", "payments.events", "notifications.events"]
```

The response is a JSON array of topic name strings.

***

## GET /kafka/messages

Fetch a page of messages from a Kafka topic, ordered newest-first. Results are cursor-paginated using `beforeTimestamp`. The maximum page size is 500 messages per request.

**Query parameters**

<ParamField query="connectionName" type="string" required>
  The name of the Kafka connector to read from.
</ParamField>

<ParamField query="topic" type="string" required>
  The topic to read messages from.
</ParamField>

<ParamField query="limit" type="integer">
  Maximum number of messages to return. Defaults to `50`. Hard limit is `500`.
</ParamField>

<ParamField query="beforeTimestamp" type="string">
  An ISO 8601 timestamp (e.g. `2024-01-15T10:30:00Z`). When provided, only messages produced before this instant are returned. Use the `timestamp` of the oldest message in the current page to fetch the next page.
</ParamField>

**Request example**

```
GET /kafka/messages?connectionName=local-kafka&topic=orders.events&limit=50&beforeTimestamp=2024-01-15T10:30:00Z
```

**Response — 200 OK**

```json theme={null}
[
  {
    "partition": 0,
    "offset": 42,
    "timestamp": "2024-01-15T10:29:55Z",
    "key": "order-123",
    "value": "{\"id\": \"order-123\", \"amount\": 99}",
    "headers": { "X-Source": "devset" }
  },
  {
    "partition": 0,
    "offset": 41,
    "timestamp": "2024-01-15T10:29:40Z",
    "key": "order-122",
    "value": "{\"id\": \"order-122\", \"amount\": 50}",
    "headers": {}
  }
]
```

The response is a JSON array of message records, newest first.

<ResponseField name="partition" type="integer">
  The Kafka partition this message was stored in.
</ResponseField>

<ResponseField name="offset" type="integer">
  The message offset within its partition.
</ResponseField>

<ResponseField name="timestamp" type="string">
  ISO 8601 timestamp when the message was produced.
</ResponseField>

<ResponseField name="key" type="string | null">
  The message key, or `null` if the producer did not set one.
</ResponseField>

<ResponseField name="value" type="string">
  The raw message value as a string.
</ResponseField>

<ResponseField name="headers" type="object">
  String key-value pairs attached as Kafka headers.
</ResponseField>

<Tip>
  To paginate, take the `timestamp` of the last message in the current response and pass it as `beforeTimestamp` in your next request.
</Tip>

***

## WS /ws/kafka/topic-stream

Open a WebSocket connection to stream live messages from a Kafka topic as they arrive. Devset forwards each incoming record to the client as a JSON text frame. The connection stays open until you close it.

**Connection URL**

```
ws://localhost:8082/ws/kafka/topic-stream?connectionName=local-kafka&topic=orders.events&offsetMode=latest
```

**Query parameters**

<ParamField query="connectionName" type="string" required>
  The name of the Kafka connector to subscribe on.
</ParamField>

<ParamField query="topic" type="string" required>
  The topic to stream.
</ParamField>

<ParamField query="offsetMode" type="string">
  Controls where consumption starts. Use `latest` to receive only new messages produced after the connection opens, or `earliest` to replay the topic from the beginning. Defaults to `latest`.
</ParamField>

**Streamed message frame**

Each WebSocket frame contains a single JSON message record in the same shape as the records returned by `GET /kafka/messages`:

```json theme={null}
{
  "partition": 0,
  "offset": 43,
  "timestamp": "2024-01-15T10:30:01Z",
  "key": "order-124",
  "value": "{\"id\": \"order-124\", \"amount\": 120}",
  "headers": {}
}
```

<Note>
  WebSocket connections are not authenticated. Keep the Devset engine accessible only on localhost or within a trusted network.
</Note>

***

## POST /rabbit/message/send

Publish a message to a RabbitMQ broker. You can target either a queue directly or an exchange with a routing key. Devset routes the message through the named producer connector.

<ParamField body="producerName" type="string" required>
  The name of the RabbitMQ connector to use. Must match a connector saved via the [Connectors API](/reference/api/connectors).
</ParamField>

<ParamField body="queueName" type="string">
  The RabbitMQ queue to publish to directly. Provide either `queueName` or `exchange`, not both.
</ParamField>

<ParamField body="exchange" type="string">
  The RabbitMQ exchange to publish to.
</ParamField>

<ParamField body="routingKey" type="string">
  The routing key used to route the message to bound queues. Used with `exchange`.
</ParamField>

<ParamField body="message" type="string" required>
  The message payload as a string. For JSON payloads, serialize your object to a JSON string before sending.
</ParamField>

**Request example**

```json theme={null}
{
  "producerName": "local-rabbit",
  "exchange": "orders",
  "routingKey": "order.created",
  "message": "{\"id\": \"order-123\", \"amount\": 99}"
}
```

**Response — 200 OK**

This endpoint returns no response body.

***

## GET /rabbit/broker-resources

List the queues and exchanges available on a RabbitMQ connection. This endpoint requires the **RabbitMQ Management Plugin** to be enabled on your broker. If the plugin is not available, the response returns `"available": false` instead of raising an error.

**Query parameters**

<ParamField query="connectionName" type="string" required>
  The name of the RabbitMQ connector to query.
</ParamField>

**Request example**

```
GET /rabbit/broker-resources?connectionName=local-rabbit
```

**Response — 200 OK (plugin available)**

```json theme={null}
{
  "available": true,
  "queues": ["orders.queue", "payments.queue"],
  "exchanges": ["orders", "payments", ""]
}
```

**Response — 200 OK (plugin not enabled)**

```json theme={null}
{
  "available": false,
  "queues": [],
  "exchanges": []
}
```

<ResponseField name="available" type="boolean">
  `true` if the RabbitMQ Management Plugin is reachable; `false` otherwise.
</ResponseField>

<ResponseField name="queues" type="array">
  Names of all queues declared on the broker. Empty when `available` is `false`.
</ResponseField>

<ResponseField name="exchanges" type="array">
  Names of all exchanges declared on the broker. Empty when `available` is `false`.
</ResponseField>

<Warning>
  If `available` is `false`, enable the RabbitMQ Management Plugin by running `rabbitmq-plugins enable rabbitmq_management` on your broker host and restarting the broker.
</Warning>

***

## POST /single-step/execute

Execute a single ad-hoc message production immediately, without creating or referencing a saved workflow. This is useful for quick one-off tests or exploratory development. Returns a run identifier you can use to look up events via the [Engine API](/reference/api/engine).

<ParamField body="producerName" type="string" required>
  The name of the connector to use for this execution.
</ParamField>

<ParamField body="messageType" type="string" required>
  The broker type: `kafka` or `rabbit`.
</ParamField>

<ParamField body="contentType" type="string">
  The payload content type. Defaults to `json`.
</ParamField>

<ParamField body="topic" type="string">
  The Kafka topic to publish to. Required when `messageType` is `kafka`.
</ParamField>

<ParamField body="exchange" type="string">
  The RabbitMQ exchange to publish to. Used when `messageType` is `rabbit`.
</ParamField>

<ParamField body="routingKey" type="string">
  The RabbitMQ routing key. Used when `messageType` is `rabbit`.
</ParamField>

<ParamField body="stage" type="string">
  A name label for this pipeline stage.
</ParamField>

<ParamField body="event" type="string">
  A name label for the event being produced.
</ParamField>

<ParamField body="set" type="object">
  The payload fields to set. Supports Devset DSL functions such as `{ "$fn": "uuid()" }` for dynamic values.
</ParamField>

<ParamField body="headers" type="object">
  Key-value pairs to attach as message headers.
</ParamField>

<ParamField body="executions" type="integer">
  Number of times to execute. Defaults to `1`.
</ParamField>

<ParamField body="state" type="object">
  Initial state map available to `$ref` and `$path` expressions in `set`.
</ParamField>

**Request example**

```json theme={null}
{
  "producerName": "local-kafka",
  "topic": "orders.events",
  "messageType": "kafka",
  "stage": "order-created",
  "event": "order-created",
  "set": {
    "id": { "$fn": "uuid()" },
    "status": "READY"
  },
  "headers": {},
  "executions": 1
}
```

**Response — 202 Accepted**

```json theme={null}
{
  "historyId": "e1b2c3d4-0000-0000-0000-000000000001",
  "runId": "a3f1c2d4-89ab-4e12-b456-426614174000",
  "status": "PENDING",
  "executions": 1,
  "workflowId": "order-created"
}
```

<ResponseField name="historyId" type="string">
  Identifier of the persisted history entry for this execution.
</ResponseField>

<ResponseField name="runId" type="string">
  The underlying engine run ID. Use it with `GET /engine/runs/{runId}/events` to retrieve the produced events.
</ResponseField>

<ResponseField name="status" type="string">
  Initial run status. Typically `PENDING` immediately after submission.
</ResponseField>

<ResponseField name="executions" type="integer">
  Number of executions that were requested.
</ResponseField>

<ResponseField name="workflowId" type="string">
  The workflow identifier associated with this execution.
</ResponseField>

***

## GET /single-step/history

Retrieve the history of all single-step executions performed in the current session. Results are ordered most-recent-first.

**Response — 200 OK**

```json theme={null}
[
  {
    "id": "e1b2c3d4-0000-0000-0000-000000000001",
    "createdAtEpochMillis": 1705316400000,
    "runId": "a3f1c2d4-89ab-4e12-b456-426614174000",
    "workflowId": "order-created",
    "messageType": "kafka",
    "producerName": "local-kafka",
    "topic": "orders.events",
    "stage": "order-created",
    "event": "order-created",
    "executions": 1,
    "set": {
      "id": { "$fn": "uuid()" },
      "status": "READY"
    },
    "headers": {}
  }
]
```

<ResponseField name="id" type="string">
  Unique identifier of this history entry.
</ResponseField>

<ResponseField name="createdAtEpochMillis" type="integer">
  Creation timestamp in Unix epoch milliseconds.
</ResponseField>

<ResponseField name="runId" type="string">
  The engine run ID associated with this execution.
</ResponseField>

<ResponseField name="workflowId" type="string">
  The workflow identifier used for this execution.
</ResponseField>

<ResponseField name="messageType" type="string">
  The broker type used: `kafka` or `rabbit`.
</ResponseField>

<ResponseField name="producerName" type="string">
  The connector used for the execution.
</ResponseField>

<ResponseField name="topic" type="string | null">
  The Kafka topic that was targeted, or `null` for RabbitMQ executions.
</ResponseField>

<ResponseField name="exchange" type="string | null">
  The RabbitMQ exchange that was targeted, or `null` for Kafka executions.
</ResponseField>

<ResponseField name="routingKey" type="string | null">
  The RabbitMQ routing key used, or `null` for Kafka executions.
</ResponseField>

<ResponseField name="stage" type="string">
  The stage name label used for this execution.
</ResponseField>

<ResponseField name="event" type="string">
  The event name label used for this execution.
</ResponseField>

<ResponseField name="executions" type="integer">
  The number of executions that were requested.
</ResponseField>

<ResponseField name="set" type="object">
  The payload definition that was used, including any DSL expressions.
</ResponseField>

<ResponseField name="headers" type="object">
  The message headers that were applied.
</ResponseField>
