Skip to main content
The Messages API lets you interact with your brokers directly — without building a workflow. You can publish individual messages to Kafka topics or RabbitMQ exchanges, page through message history, open a live WebSocket stream for real-time observation, and fire one-off messages using the single-step executor. All HTTP endpoints are relative to http://localhost:8082.

POST /kafka/messages/send

Publish a single message to a Kafka topic. Devset routes the message through the named connector and returns once the broker acknowledges the write.
producerName
string
required
The name of the Kafka connector to use. Must match a connector saved via the Connectors API.
topic
string
required
The Kafka topic to publish the message to.
key
string
The message key used for partition assignment. Omit to let Kafka assign a partition automatically.
message
string
required
The message value as a string. For JSON payloads, serialize your object to a JSON string before sending.
headers
object
Optional key-value pairs to attach as Kafka message headers.
Request example
{
  "producerName": "local-kafka",
  "topic": "orders.events",
  "key": "order-123",
  "message": "{\"id\": \"order-123\", \"amount\": 99}",
  "headers": { "X-Source": "devset" }
}
Response — 200 OK This endpoint returns no response body.

GET /kafka/topics

List all topics available on a Kafka connection. Query parameters
connectionName
string
required
The name of the Kafka connector to query.
Request example
GET /kafka/topics?connectionName=local-kafka
Response — 200 OK
["orders.events", "payments.events", "notifications.events"]
The response is a JSON array of topic name strings.

GET /kafka/messages

Fetch a page of messages from a Kafka topic, ordered newest-first. Results are cursor-paginated using beforeTimestamp. The maximum page size is 500 messages per request. Query parameters
connectionName
string
required
The name of the Kafka connector to read from.
topic
string
required
The topic to read messages from.
limit
integer
Maximum number of messages to return. Defaults to 50. Hard limit is 500.
beforeTimestamp
string
An ISO 8601 timestamp (e.g. 2024-01-15T10:30:00Z). When provided, only messages produced before this instant are returned. Use the timestamp of the oldest message in the current page to fetch the next page.
Request example
GET /kafka/messages?connectionName=local-kafka&topic=orders.events&limit=50&beforeTimestamp=2024-01-15T10:30:00Z
Response — 200 OK
[
  {
    "partition": 0,
    "offset": 42,
    "timestamp": "2024-01-15T10:29:55Z",
    "key": "order-123",
    "value": "{\"id\": \"order-123\", \"amount\": 99}",
    "headers": { "X-Source": "devset" }
  },
  {
    "partition": 0,
    "offset": 41,
    "timestamp": "2024-01-15T10:29:40Z",
    "key": "order-122",
    "value": "{\"id\": \"order-122\", \"amount\": 50}",
    "headers": {}
  }
]
The response is a JSON array of message records, newest first.
partition
integer
The Kafka partition this message was stored in.
offset
integer
The message offset within its partition.
timestamp
string
ISO 8601 timestamp when the message was produced.
key
string | null
The message key, or null if the producer did not set one.
value
string
The raw message value as a string.
headers
object
String key-value pairs attached as Kafka headers.
To paginate, take the timestamp of the last message in the current response and pass it as beforeTimestamp in your next request.

WS /ws/kafka/topic-stream

Open a WebSocket connection to stream live messages from a Kafka topic as they arrive. Devset forwards each incoming record to the client as a JSON text frame. The connection stays open until you close it. Connection URL
ws://localhost:8082/ws/kafka/topic-stream?connectionName=local-kafka&topic=orders.events&offsetMode=latest
Query parameters
connectionName
string
required
The name of the Kafka connector to subscribe on.
topic
string
required
The topic to stream.
offsetMode
string
Controls where consumption starts. Use latest to receive only new messages produced after the connection opens, or earliest to replay the topic from the beginning. Defaults to latest.
Streamed message frame Each WebSocket frame contains a single JSON message record in the same shape as the records returned by GET /kafka/messages:
{
  "partition": 0,
  "offset": 43,
  "timestamp": "2024-01-15T10:30:01Z",
  "key": "order-124",
  "value": "{\"id\": \"order-124\", \"amount\": 120}",
  "headers": {}
}
WebSocket connections are not authenticated. Keep the Devset engine accessible only on localhost or within a trusted network.

POST /rabbit/message/send

Publish a message to a RabbitMQ broker. You can target either a queue directly or an exchange with a routing key. Devset routes the message through the named producer connector.
producerName
string
required
The name of the RabbitMQ connector to use. Must match a connector saved via the Connectors API.
queueName
string
The RabbitMQ queue to publish to directly. Provide either queueName or exchange, not both.
exchange
string
The RabbitMQ exchange to publish to.
routingKey
string
The routing key used to route the message to bound queues. Used with exchange.
message
string
required
The message payload as a string. For JSON payloads, serialize your object to a JSON string before sending.
Request example
{
  "producerName": "local-rabbit",
  "exchange": "orders",
  "routingKey": "order.created",
  "message": "{\"id\": \"order-123\", \"amount\": 99}"
}
Response — 200 OK This endpoint returns no response body.

GET /rabbit/broker-resources

List the queues and exchanges available on a RabbitMQ connection. This endpoint requires the RabbitMQ Management Plugin to be enabled on your broker. If the plugin is not available, the response returns "available": false instead of raising an error. Query parameters
connectionName
string
required
The name of the RabbitMQ connector to query.
Request example
GET /rabbit/broker-resources?connectionName=local-rabbit
Response — 200 OK (plugin available)
{
  "available": true,
  "queues": ["orders.queue", "payments.queue"],
  "exchanges": ["orders", "payments", ""]
}
Response — 200 OK (plugin not enabled)
{
  "available": false,
  "queues": [],
  "exchanges": []
}
available
boolean
true if the RabbitMQ Management Plugin is reachable; false otherwise.
queues
array
Names of all queues declared on the broker. Empty when available is false.
exchanges
array
Names of all exchanges declared on the broker. Empty when available is false.
If available is false, enable the RabbitMQ Management Plugin by running rabbitmq-plugins enable rabbitmq_management on your broker host and restarting the broker.

POST /single-step/execute

Execute a single ad-hoc message production immediately, without creating or referencing a saved workflow. This is useful for quick one-off tests or exploratory development. Returns a run identifier you can use to look up events via the Engine API.
producerName
string
required
The name of the connector to use for this execution.
messageType
string
required
The broker type: kafka or rabbit.
contentType
string
The payload content type. Defaults to json.
topic
string
The Kafka topic to publish to. Required when messageType is kafka.
exchange
string
The RabbitMQ exchange to publish to. Used when messageType is rabbit.
routingKey
string
The RabbitMQ routing key. Used when messageType is rabbit.
stage
string
A name label for this pipeline stage.
event
string
A name label for the event being produced.
set
object
The payload fields to set. Supports Devset DSL functions such as { "$fn": "uuid()" } for dynamic values.
headers
object
Key-value pairs to attach as message headers.
executions
integer
Number of times to execute. Defaults to 1.
state
object
Initial state map available to $ref and $path expressions in set.
Request example
{
  "producerName": "local-kafka",
  "topic": "orders.events",
  "messageType": "kafka",
  "stage": "order-created",
  "event": "order-created",
  "set": {
    "id": { "$fn": "uuid()" },
    "status": "READY"
  },
  "headers": {},
  "executions": 1
}
Response — 202 Accepted
{
  "historyId": "e1b2c3d4-0000-0000-0000-000000000001",
  "runId": "a3f1c2d4-89ab-4e12-b456-426614174000",
  "status": "PENDING",
  "executions": 1,
  "workflowId": "order-created"
}
historyId
string
Identifier of the persisted history entry for this execution.
runId
string
The underlying engine run ID. Use it with GET /engine/runs/{runId}/events to retrieve the produced events.
status
string
Initial run status. Typically PENDING immediately after submission.
executions
integer
Number of executions that were requested.
workflowId
string
The workflow identifier associated with this execution.

GET /single-step/history

Retrieve the history of all single-step executions performed in the current session. Results are ordered most-recent-first. Response — 200 OK
[
  {
    "id": "e1b2c3d4-0000-0000-0000-000000000001",
    "createdAtEpochMillis": 1705316400000,
    "runId": "a3f1c2d4-89ab-4e12-b456-426614174000",
    "workflowId": "order-created",
    "messageType": "kafka",
    "producerName": "local-kafka",
    "topic": "orders.events",
    "stage": "order-created",
    "event": "order-created",
    "executions": 1,
    "set": {
      "id": { "$fn": "uuid()" },
      "status": "READY"
    },
    "headers": {}
  }
]
id
string
Unique identifier of this history entry.
createdAtEpochMillis
integer
Creation timestamp in Unix epoch milliseconds.
runId
string
The engine run ID associated with this execution.
workflowId
string
The workflow identifier used for this execution.
messageType
string
The broker type used: kafka or rabbit.
producerName
string
The connector used for the execution.
topic
string | null
The Kafka topic that was targeted, or null for RabbitMQ executions.
exchange
string | null
The RabbitMQ exchange that was targeted, or null for Kafka executions.
routingKey
string | null
The RabbitMQ routing key used, or null for Kafka executions.
stage
string
The stage name label used for this execution.
event
string
The event name label used for this execution.
executions
integer
The number of executions that were requested.
set
object
The payload definition that was used, including any DSL expressions.
headers
object
The message headers that were applied.