Skip to content

Stream

Stream (stream)

Track per-key changes and emit deltas, elapsed times, and optional aggregates.

Transform json

Minimal example

actions:
- stream:
watch: ""
JSON
{
"actions": [
{
"stream": {
"watch": ""
}
}
]
}

Contents

Fields

FieldTypeRequiredDescription
watch Inputfield (string)Field whose value should be tracked.
Examples: data_field
description GeneralstringDescribe this step.
condition Generallua-expression (string)Only run this action when the Lua condition evaluates to true.
Examples: 2 * count()
delta Behaviourboolean (bool)Emit the difference between the current and last value.
marker OutputstringOptional marker added to generated events (when not enriching).
only-changes Behaviourboolean (bool)Only emit records when the watched field changes.
group-by Inputfield (string)Group tracking state by this field’s value.
Examples: data_field
input-time Inputfield (string)Source field providing event time for elapsed calculations.
Examples: data_field
output-field Outputfield (string)Field name used for the emitted delta.
Examples: data_field
elapsed-field Outputfield (string)Field name capturing elapsed milliseconds.
Examples: data_field
fill-strategy Missing DataFill StrategyStrategy applied when the watched value is missing.
recency RecencyRecencyEmit a boolean indicating whether the key changed within a threshold.
aggregations AggregationsAggregations[]Optional incremental aggregates maintained per key.
reset-on-document-end Behaviourboolean (bool)Reset state when a document end marker is observed.
window-size Windowingnumber (integer)Maximum number of recent events to keep per key when computing aggregates.
Examples: 42, 1.2e-10
max-keys Resourcesnumber (integer)Maximum number of concurrent keys tracked.
Examples: 42, 1.2e-10
eviction ResourcesEvictionEviction policy applied when max-keys is exceeded.

General

Show fields
FieldTypeRequiredDescription
descriptionstringDescribe this step.
conditionlua-expression (string)Only run this action when the Lua condition evaluates to true.
Examples: 2 * count()

Behaviour

Show fields
FieldTypeRequiredDescription
deltaboolean (bool)Emit the difference between the current and last value.
only-changesboolean (bool)Only emit records when the watched field changes.
reset-on-document-endboolean (bool)Reset state when a document end marker is observed.

Input

Show fields
FieldTypeRequiredDescription
watchfield (string)Field whose value should be tracked.
Examples: data_field
group-byfield (string)Group tracking state by this field’s value.
Examples: data_field
input-timefield (string)Source field providing event time for elapsed calculations.
Examples: data_field

Output

Show fields
FieldTypeRequiredDescription
markerstringOptional marker added to generated events (when not enriching).
output-fieldfield (string)Field name used for the emitted delta.
Examples: data_field
elapsed-fieldfield (string)Field name capturing elapsed milliseconds.
Examples: data_field

Missing Data

Show fields
FieldTypeRequiredDescription
fill-strategyFill StrategyStrategy applied when the watched value is missing.

Recency

Show fields
FieldTypeRequiredDescription
recencyRecencyEmit a boolean indicating whether the key changed within a threshold.

Aggregations

Show fields
FieldTypeRequiredDescription
aggregationsAggregations[]Optional incremental aggregates maintained per key.

Windowing

Show fields
FieldTypeRequiredDescription
window-sizenumber (integer)Maximum number of recent events to keep per key when computing aggregates.
Examples: 42, 1.2e-10

Resources

Show fields
FieldTypeRequiredDescription
max-keysnumber (integer)Maximum number of concurrent keys tracked.
Examples: 42, 1.2e-10
evictionEvictionEviction policy applied when max-keys is exceeded.

Schema

Fill Strategy Options

OptionNameTypeDescription
forward-fillForward FillmapCarry forward the previous value when the field is missing.
default-valueDefault ValueobjectSubstitute a default JSON value when the field is missing.

Eviction Options

OptionNameTypeDescription
drop-newDrop NewmapDrop new keys when the limit is reached.
lruLrumapEvict the least-recently updated key.
ttlTtlobjectEvict keys that have been idle longer than the configured duration.

Fill Strategy - Default Value Fields

FieldTypeRequiredDescription
valuemap (object)

Recency Fields

FieldTypeRequiredDescription
threshold-msduration (integer)Threshold in milliseconds for considering a key “recent”.
output-fieldfield (string)Field where the recency flag is written.
Examples: data_field

Aggregations Fields

FieldTypeRequiredDescription
fieldfield (string)Source field used for the aggregation.
Examples: data_field
opOpAggregation operation.
Allowed values: avg, mean, min, max, first, last, sum, stddev, …
r-asfield (string)Optional alias for the output field.
Examples: data_field
window-sizenumber (integer)Override window size for this aggregation (falls back to stream-level window).
Examples: 42, 1.2e-10
percentilenumber (integer)Percentile to compute when op = percentile (0-100).
Examples: 42, 1.2e-10

Eviction - Ttl Fields

FieldTypeRequiredDescription
max-idlestring

Aggregations - Op Options

ValueDescription
avgAvg
meanMean
minMin
maxMax
firstFirst
lastLast
sumSum
stddevStddev
rangeRange
earliestEarliest
latestLatest
countCount
distinct-countDistinct Count
medianMedian
percentilePercentile
varianceVariance
variance-populationVariance Population
stddev-populationStddev Population
sum-squaresSum Squares
modeMode
listList
valuesValues