Skip to content

Message Bus (Aggregator)

LyftData has an internal message bus (the aggregator) used for coordination and observability across the server, workers, and jobs. It is separate from channel-based data flow (events moving over workflow channels).

Use messages when you need a wake-up signal, a runtime notification, or a control-plane side channel. Use workflow channels/transports for bulk data movement.

What A Message Contains

Messages carry:

  • Kind: system vs user.
  • Source: which process generated it (server, worker, job).
  • Type: a stable message type (for example job-run-started, job-run-ended, user-generated, user-alert).
  • Optional tag: a user-defined string used for selective wake-ups and routing. Tags apply to user-generated messages.
  • Optional payload: a “publicly exposed” message payload. Some internal message types intentionally do not expose full payloads.

In the UI, many of these show up in Observe → Messages and Observe → Logs / Issues. In jobs, you consume them via the internal-messages input.

Emit Messages From A Job

There are two different DSL primitives that emit messages.

message Action (Human-Facing Notifications)

Use the message action when you want a runtime alert/info notification (optionally also written into job logs).

actions:
- message:
condition: "true"
notification-type: alert
message-content: "Open listening port detected"
log-event: true

This is the right choice for operator-facing warnings and breadcrumbs.

message Output (Machine-Facing Coordination)

Use the message output when you want to emit a structured payload that other jobs can subscribe to.

output:
message:
tag: stage1-done

By default, the output emits a user-generated message whose payload is the job’s output event JSON.

If you set set-variable-name, the output emits a variable update instead of a user-generated message:

output:
message:
set-variable-name: MY_DYNAMIC_VALUE

Consume Messages In A Job (internal-messages)

The internal-messages input is the “subscribe to the bus” primitive. It is event-driven: as matching messages arrive, they become input events for the job to process.

Example: wake up only on a tagged user-generated message from a specific upstream job.

input:
internal-messages:
filter-kind: user
filter-job: stage1_job
filter-type: [user-generated]
filter-tag: stage1-done

Input Event Schema

Each consumed message becomes one JSON event with a stable top-level shape:

  • id: message ID
  • version: message-attributes schema version (currently 0.1)
  • nanoseconds_since_epoch: message timestamp
  • kind: system or user
  • source: server, worker, or job
  • message_type: a kebab-case type (for example user-generated, job-run-started)
  • job_name (optional): originating job name
  • worker_id (optional): originating worker ID
  • message_data (optional): a public payload, when the message type exposes one

For user-generated messages, message_data includes the tag and the original payload:

{
"id": "",
"version": "0.1",
"nanoseconds_since_epoch": 1739370000000000000,
"kind": "user",
"source": "job",
"message_type": "user-generated",
"job_name": "stage1_job",
"worker_id": "worker-1",
"message_data": {
"type": "UserGeneratedMessage",
"tag": "stage1-done",
"job_event": { "ok": true, "count": 42 }
}
}

Message Payloads vs internal-messages Events

Jobs see message information in two different ways, and they use different expansion paths:

  • Message triggers (input.<x>.trigger.message) expose the triggering message’s public payload via ${msg|...}.
  • The internal-messages input delivers a full message wrapper as the event (including message_type, kind, source, and message_data). In this mode, ${msg|...} is not set; read from the event fields instead.

Example: reading a Trigger invocation ID.

# Message trigger (use ${msg|...})
- add:
output-fields:
invocation_id: "${msg|job_event.invocation_id}"
# internal-messages input (use normal event fields)
- add:
output-fields:
invocation_id: "${message_data.job_event.invocation_id}"
message_type: "${message_type}"

Message Types Worth Knowing

The bus includes many message types, but teams usually build around a small, practical set.

Type (message_type)Typical sourceWhat it’s good forNotes
user-generatedjobJob-to-job coordinationTaggable + filterable (filter-tag). Payload is the emitting job’s event (message_data.job_event).
user-alert / user-notificationjobOperator-facing warnings and breadcrumbsUsually produced by the message action. Useful for humans; not usually for automation.
job-run-started / job-run-endedjobRun boundariesUseful for watchers or to correlate other telemetry.
job-runtime-error / job-errorsjobError evidenceUseful for alerting and auto-triage.
update-variablejobVariable updatesProduced by output.message.set-variable-name.
trigger-responsejobCompleting a Trigger invocationSee Triggers for the response envelope and how it shapes invocation results.
trigger-proxy-responsejobCapturing a proxied HTTP responseTypically auto-emitted by http-post when a trigger_proxy context exists; recorded into the Trigger invocation result.
deployment-phaseserverDeployment progressUseful for “what’s happening right now?” operational dashboards.

Example: subscribe to job runtime errors for one job:

input:
internal-messages:
filter-kind: system
filter-job: important_job
filter-type: [job-runtime-error, job-errors]

Patterns That Build On Messages

“Tagged wake-ups” (workflow message edges)

Message edges in workflows are control-flow wiring (“wake up when signalled”), not data transport. Conceptually:

  • Upstream step emits a tagged user-generated message (output.message.tag).
  • Downstream step subscribes with input.internal-messages.filter-tag.

This is one way to build “run B after A has committed its durable output” without polling.

Fan-in joins (OR vs AND)

If you need “run when all of these upstream steps finished”, model it explicitly:

  • Subscribe to the relevant message types/tags.
  • Track prerequisites in Worker KV.
  • Emit a single tagged message when the join is satisfied.

This keeps coordination state explicit and avoids accidental re-triggers.

Triggers (UI + MCP)

The Triggers registry (UI + MCP dynamic tools) dispatches user-generated messages to a target job. For the operator-facing surface and MCP behavior, see Triggers.

Where To Go Next