Message Bus (Aggregator)
LyftData has an internal message bus (the aggregator) used for coordination and observability across the server, workers, and jobs. It is separate from channel-based data flow (events moving over workflow channels).
Use messages when you need a wake-up signal, a runtime notification, or a control-plane side channel. Use workflow channels/transports for bulk data movement.
What A Message Contains
Messages carry:
- Kind: system vs user.
- Source: which process generated it (server, worker, job).
- Type: a stable message type (for example job-run-started, job-run-ended, user-generated, user-alert).
- Optional tag: a user-defined string used for selective wake-ups and routing. Tags apply to user-generated messages.
- Optional payload: a “publicly exposed” message payload. Some internal message types intentionally do not expose full payloads.
In the UI, many of these show up in Observe → Messages and Observe → Logs / Issues.
In jobs, you consume them via the internal-messages input.
Emit Messages From A Job
There are two different DSL primitives that emit messages.
message Action (Human-Facing Notifications)
Use the message action when you want a runtime alert/info notification (optionally also written into job logs).

    actions:
      - message:
          condition: "true"
          notification-type: alert
          message-content: "Open listening port detected"
          log-event: true

This is the right choice for operator-facing warnings and breadcrumbs.
message Output (Machine-Facing Coordination)
Use the message output when you want to emit a structured payload that other jobs can subscribe to.

    output:
      message:
        tag: stage1-done

By default, the output emits a user-generated message whose payload is the job’s output event JSON.
If you set set-variable-name, the output emits a variable update instead of a user-generated message:

    output:
      message:
        set-variable-name: MY_DYNAMIC_VALUE

Consume Messages In A Job (internal-messages)
The internal-messages input is the “subscribe to the bus” primitive.
It is event-driven: as matching messages arrive, they become input events for the job to process.
Example: wake up only on a tagged user-generated message from a specific upstream job.

    input:
      internal-messages:
        filter-kind: user
        filter-job: stage1_job
        filter-type: [user-generated]
        filter-tag: stage1-done

Input Event Schema
Each consumed message becomes one JSON event with a stable top-level shape:
- id: message ID
- version: message-attributes schema version (currently 0.1)
- nanoseconds_since_epoch: message timestamp
- kind: system or user
- source: server, worker, or job
- message_type: a kebab-case type (for example user-generated, job-run-started)
- job_name (optional): originating job name
- worker_id (optional): originating worker ID
- message_data (optional): a public payload, when the message type exposes one
For user-generated messages, message_data includes the tag and the original payload:

    {
      "id": "…",
      "version": "0.1",
      "nanoseconds_since_epoch": 1739370000000000000,
      "kind": "user",
      "source": "job",
      "message_type": "user-generated",
      "job_name": "stage1_job",
      "worker_id": "worker-1",
      "message_data": {
        "type": "UserGeneratedMessage",
        "tag": "stage1-done",
        "job_event": { "ok": true, "count": 42 }
      }
    }

Message Payloads vs internal-messages Events
Jobs see message information in two different ways, and they use different expansion paths:
- Message triggers (input.<x>.trigger.message) expose the triggering message’s public payload via ${msg|...}.
- The internal-messages input delivers a full message wrapper as the event (including message_type, kind, source, and message_data). In this mode, ${msg|...} is not set; read from the event fields instead.
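
The difference between the two views can be modelled in a few lines of Python. This is only an illustrative sketch, not the product's expansion engine: the lookup helper, the sample event, and the invocation ID value inv-123 are all hypothetical, and the sketch assumes that a message trigger's payload corresponds to the wrapper's message_data.

```python
# Hypothetical event, shaped like the internal-messages wrapper described above.
event = {
    "kind": "user",
    "source": "job",
    "message_type": "user-generated",
    "message_data": {
        "type": "UserGeneratedMessage",
        "tag": "stage1-done",
        "job_event": {"invocation_id": "inv-123"},  # made-up value
    },
}


def lookup(obj, dotted_path):
    """Walk a dotted path through nested dicts; return None if any key is missing."""
    current = obj
    for key in dotted_path.split("."):
        if not isinstance(current, dict) or key not in current:
            return None
        current = current[key]
    return current


# internal-messages view: paths start at the full wrapper.
from_event = lookup(event, "message_data.job_event.invocation_id")

# Message-trigger view (assumed): paths start at the public payload only.
from_msg = lookup(event["message_data"], "job_event.invocation_id")
```

Both lookups reach the same value, but wrapper fields such as message_type are only reachable from the full event, which is why the two modes use different path prefixes.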
Example: reading a Trigger invocation ID.

    # Message trigger (use ${msg|...})
    - add:
        output-fields:
          invocation_id: "${msg|job_event.invocation_id}"

    # internal-messages input (use normal event fields)
    - add:
        output-fields:
          invocation_id: "${message_data.job_event.invocation_id}"
          message_type: "${message_type}"

Message Types Worth Knowing
The bus includes many message types, but teams usually build around a small, practical set.
| Type (message_type) | Typical source | What it’s good for | Notes |
|---|---|---|---|
| user-generated | job | Job-to-job coordination | Taggable + filterable (filter-tag). Payload is the emitting job’s event (message_data.job_event). |
| user-alert / user-notification | job | Operator-facing warnings and breadcrumbs | Usually produced by the message action. Useful for humans; not usually for automation. |
| job-run-started / job-run-ended | job | Run boundaries | Useful for watchers or to correlate other telemetry. |
| job-runtime-error / job-errors | job | Error evidence | Useful for alerting and auto-triage. |
| update-variable | job | Variable updates | Produced by output.message.set-variable-name. |
| trigger-response | job | Completing a Trigger invocation | See Triggers for the response envelope and how it shapes invocation results. |
| trigger-proxy-response | job | Capturing a proxied HTTP response | Typically auto-emitted by http-post when a trigger_proxy context exists; recorded into the Trigger invocation result. |
| deployment-phase | server | Deployment progress | Useful for “what’s happening right now?” operational dashboards. |
Example: subscribe to job runtime errors for one job:

    input:
      internal-messages:
        filter-kind: system
        filter-job: important_job
        filter-type: [job-runtime-error, job-errors]

Patterns That Build On Messages
“Tagged wake-ups” (workflow message edges)
Message edges in workflows are control-flow wiring (“wake up when signalled”), not data transport. Conceptually:
- Upstream step emits a tagged user-generated message (output.message.tag).
- Downstream step subscribes with input.internal-messages.filter-tag.
This is one way to build “run B after A has committed its durable output” without polling.
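
To make the wake-up condition concrete, the following Python sketch models how a downstream subscription might decide whether a message matches. It is a hedged illustration only: the matches function is hypothetical, and it assumes that every filter that is set must pass (AND semantics) and that filter-type accepts any of the listed types. The real input's matching rules may differ.

```python
def matches(event, filter_kind=None, filter_job=None, filter_type=None, filter_tag=None):
    """Return True if the event passes every filter that is set (assumed AND semantics)."""
    if filter_kind is not None and event.get("kind") != filter_kind:
        return False
    if filter_job is not None and event.get("job_name") != filter_job:
        return False
    # filter_type is a list; the event's type must be one of its entries (assumption).
    if filter_type is not None and event.get("message_type") not in filter_type:
        return False
    if filter_tag is not None:
        data = event.get("message_data") or {}
        if data.get("tag") != filter_tag:
            return False
    return True


# Hypothetical events shaped like the wrapper in Input Event Schema.
stage1_done = {
    "kind": "user",
    "source": "job",
    "message_type": "user-generated",
    "job_name": "stage1_job",
    "message_data": {"type": "UserGeneratedMessage", "tag": "stage1-done", "job_event": {}},
}
run_started = {
    "kind": "system",
    "source": "job",
    "message_type": "job-run-started",
    "job_name": "stage1_job",
}

subscription = dict(
    filter_kind="user",
    filter_job="stage1_job",
    filter_type=["user-generated"],
    filter_tag="stage1-done",
)

wakes_on_tagged = matches(stage1_done, **subscription)
wakes_on_run_start = matches(run_started, **subscription)
```

Under these assumptions the downstream step wakes only on the tagged message from stage1_job and ignores the same job's run-boundary messages.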
Fan-in joins (OR vs AND)
If you need “run when all of these upstream steps finished”, model it explicitly:
- Subscribe to the relevant message types/tags.
- Track prerequisites in Worker KV.
- Emit a single tagged message when the join is satisfied.
This keeps coordination state explicit and avoids accidental re-triggers.
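
The AND-join steps above can be sketched in Python. This is a minimal model under stated assumptions, not LyftData's implementation: a plain dict stands in for Worker KV (whose real API is not shown here), the FanInJoin class is hypothetical, and run_id is an assumed correlation key that groups the upstream messages belonging to one logical run.

```python
class FanInJoin:
    """Fire once per run_id, after every required upstream tag has been observed."""

    def __init__(self, required_tags, kv=None):
        self.required = set(required_tags)
        # A dict stands in for Worker KV here (assumption for illustration).
        self.kv = kv if kv is not None else {}

    def observe(self, run_id, tag):
        """Record one upstream tag; return True exactly once, when the join is satisfied."""
        if tag not in self.required:
            return False  # Unrelated tag: ignore it.
        state = self.kv.setdefault(run_id, {"seen": set(), "fired": False})
        state["seen"].add(tag)
        if state["fired"] or state["seen"] != self.required:
            return False  # Already fired, or prerequisites still missing.
        state["fired"] = True
        return True  # Caller would emit the single tagged "join satisfied" message now.


join = FanInJoin(["extract-done", "enrich-done"])  # hypothetical tags

results = [
    join.observe("run-1", "extract-done"),  # first prerequisite
    join.observe("run-1", "extract-done"),  # duplicate delivery
    join.observe("run-1", "enrich-done"),   # join satisfied
    join.observe("run-1", "enrich-done"),   # late duplicate, must not re-trigger
]
```

Keeping a fired flag next to the seen set is what prevents accidental re-triggers when an upstream message is delivered more than once.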
Triggers (UI + MCP)
The Triggers registry (UI + MCP dynamic tools) dispatches user-generated messages to a target job. For the operator-facing surface and MCP behavior, see Triggers.
Where To Go Next
- Messages (UI) for the live feed and filtering ergonomics.
- Notifications & Alerts for message action patterns.
- Deployments Core Concepts for message edges vs channel edges.
- Triggers for curated invocation surfaces that dispatch messages.