Closed
1 change: 1 addition & 0 deletions docs/how-to/index.md
Original file line number Diff line number Diff line change
@@ -25,6 +25,7 @@ Pick the scenario that matches your job-to-be-done and follow the playbook. Each
## Performance & Aggregation
- [Process Archives at Scale](batch-process-archives.md)
- [Roll Up Logs with Span Windows](span-aggregation-cookbook.md)
- [Analyze Sequential Deltas and Trends](inter-record-analysis.md)

## Advanced Techniques
- [Power-User Techniques](power-user-techniques.md) - Discover overlooked features like pattern normalization, deterministic sampling, JWT parsing, and multi-level fan-out
75 changes: 75 additions & 0 deletions docs/how-to/inter-record-analysis.md
@@ -0,0 +1,75 @@
# Analyze Sequential Changes with `prev`, `lag`, `delta`, and `ewma`

Use inter-record helpers when you need to compare each event to earlier events in the same stream.
They are well suited to spotting latency jumps, detecting drift, and smoothing noisy metrics.

> These functions are **sequential-only**. In `--parallel` mode, calling any of them raises a runtime error.

## 1) Alert on sudden latency jumps (`delta`)

Detect requests whose latency increased sharply compared to the previous record:

```bash
cat <<'JSON' > latency.jsonl
{"svc":"api","duration_ms":100}
{"svc":"api","duration_ms":120}
{"svc":"api","duration_ms":900}
{"svc":"api","duration_ms":910}
JSON

kelora -f json -F json latency.jsonl \
--exec 'e.delta_ms = delta("duration_ms")' \
--filter 'e.delta_ms != () && e.delta_ms > 500'
```

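Only the third event passes the filter, with `delta_ms` equal to `780` (900 - 120). The first event has no history, so its `delta_ms` is `()`, and the other deltas (`20` and `10`) are below the threshold.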
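
To make the arithmetic concrete, here is a minimal Rust sketch of the same "current minus previous" computation. The names `deltas` and `jumps` are hypothetical and this is not Kelora's implementation; `None` stands in for the `()` that the resilient `delta` returns when there is no history.

```rust
/// Hypothetical helper (not Kelora's code): pairs each value with its
/// difference from the previous value. The first record has no history,
/// so its delta is `None`.
fn deltas(values: &[i64]) -> Vec<Option<i64>> {
    let mut prev: Option<i64> = None;
    values
        .iter()
        .map(|&current| {
            let d = prev.map(|p| current - p);
            prev = Some(current);
            d
        })
        .collect()
}

/// Indices of records whose delta is present and strictly above `threshold`,
/// mirroring the `--filter` expression in the walkthrough.
fn jumps(values: &[i64], threshold: i64) -> Vec<usize> {
    deltas(values)
        .iter()
        .enumerate()
        .filter_map(|(i, d)| match d {
            Some(d) if *d > threshold => Some(i),
            _ => None,
        })
        .collect()
}
```

For the four sample durations, `deltas` yields `None, Some(20), Some(780), Some(10)`, so `jumps(.., 500)` selects only index 2, the third record.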

## 2) Compare against an older baseline (`lag(..., n)` + `delta(..., n)`)

Compare current values to values from three records ago:

```bash
cat <<'JSON' > throughput.jsonl
{"value":10}
{"value":20}
{"value":30}
{"value":50}
JSON

kelora -f json -F json throughput.jsonl \
--exec '
e.baseline_3 = lag("value", 3);
e.delta_3 = delta("value", 3);
' \
--filter 'e.delta_3 != () && e.delta_3 >= 40'
```

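Only the fourth event passes the filter, with `baseline_3 = 10` and `delta_3 = 40` (50 - 10). The first three events have no record three positions back, so `lag` and `delta` return `()` for them.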
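
The lookup "n records back" can be pictured as a bounded history buffer. The sketch below is an illustration under assumptions: the `History` type and its methods are hypothetical names, and Kelora's actual storage may differ.

```rust
use std::collections::VecDeque;

/// Hypothetical bounded history of one numeric field (not Kelora's internals).
struct History {
    values: VecDeque<i64>, // most recent value at the back
    capacity: usize,
}

impl History {
    fn new(capacity: usize) -> Self {
        Self { values: VecDeque::with_capacity(capacity), capacity }
    }

    /// Value `n` records back (n >= 1), or `None` when history is too short.
    fn lag(&self, n: usize) -> Option<i64> {
        if n == 0 || n > self.values.len() {
            return None;
        }
        self.values.get(self.values.len() - n).copied()
    }

    /// `current - lag(n)`, or `None` when the lagged value is missing.
    fn delta(&self, current: i64, n: usize) -> Option<i64> {
        self.lag(n).map(|old| current - old)
    }

    /// Store the current value after the event has been processed,
    /// evicting the oldest entry once the buffer is full.
    fn commit(&mut self, current: i64) {
        if self.values.len() == self.capacity {
            self.values.pop_front();
        }
        self.values.push_back(current);
    }
}
```

Feeding `10, 20, 30, 50` through `lag(3)` and `delta(v, 3)`, then `commit(v)`, gives `None` for the first three records and `Some(10)` / `Some(40)` for the fourth.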

## 3) Smooth noisy telemetry in-stream (`ewma`)

Compute an exponentially weighted moving average (EWMA) of latency:

```bash
cat <<'JSON' > noisy.jsonl
{"latency_ms":100}
{"latency_ms":200}
{"latency_ms":50}
JSON

kelora -f json -F json noisy.jsonl \
--exec 'e.latency_smooth = ewma("latency_ms", e.latency_ms.to_float(), 0.5)'
```

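With `alpha = 0.5`, the smoothed values are `100`, `150`, and `100`: the first observation seeds the average, then `0.5 * 200 + 0.5 * 100 = 150` and `0.5 * 50 + 0.5 * 150 = 100`.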
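
The smoothing recurrence is short enough to check by hand. Below is a minimal Rust sketch, assuming the usual form `S_t = alpha * x_t + (1 - alpha) * S_{t-1}` with the first observation used as the seed; `ewma_series` is a hypothetical name, not a Kelora function.

```rust
/// Hypothetical helper: returns the running EWMA for each input value.
/// The first value initializes the state; later values blend the new
/// observation with the previous smoothed value.
fn ewma_series(values: &[f64], alpha: f64) -> Vec<f64> {
    let mut state: Option<f64> = None;
    values
        .iter()
        .map(|&x| {
            let smoothed = match state {
                None => x,
                Some(prev) => alpha * x + (1.0 - alpha) * prev,
            };
            state = Some(smoothed);
            smoothed
        })
        .collect()
}
```

A larger `alpha` tracks recent values more closely; a smaller one smooths more heavily and reacts more slowly.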

## 4) Fail fast with strict variants

Use strict variants when missing history or non-numeric values should stop the run:

```bash
kelora -f json data.jsonl --strict \
--exec 'e.delta = delta_strict("duration_ms")'
```

If the field is missing or non-numeric, Kelora exits with a runtime error that points to the offending function/value.
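
The difference between the two families can be sketched as "absent value" versus "error". The Rust below is an analogy only: `resilient_delta`, `strict_delta`, and `DeltaError` are hypothetical names, and Kelora's real error messages and types are not shown here.

```rust
/// Hypothetical error type for the strict variant (illustrative names).
#[derive(Debug, PartialEq)]
enum DeltaError {
    MissingField,
    MissingHistory,
}

/// Resilient form: any missing input yields `None` (Kelora's `()`).
fn resilient_delta(current: Option<i64>, previous: Option<i64>) -> Option<i64> {
    Some(current? - previous?)
}

/// Strict form: a missing input becomes an error the caller must handle,
/// which is what lets a run stop at the first bad record.
fn strict_delta(current: Option<i64>, previous: Option<i64>) -> Result<i64, DeltaError> {
    let cur = current.ok_or(DeltaError::MissingField)?;
    let prev = previous.ok_or(DeltaError::MissingHistory)?;
    Ok(cur - prev)
}
```

With the resilient form a gap in the data silently produces no value; with the strict form the same gap surfaces immediately as an error.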
40 changes: 40 additions & 0 deletions docs/reference/functions.md
@@ -1625,6 +1625,46 @@ let recent_vals = window.pluck_as_nums("value")
e.spike = e.value > (recent_vals.reduce(|s, x| s + x, 0) / recent_vals.len()) * 2
```

#### `prev(field)` / `lag(field, n)` / strict variants
Read values from prior records in stream order (**sequential mode only**). `prev(field)` is shorthand for `lag(field, 1)`.

- Resilient forms (`prev`, `lag`) return `()` when history is missing.
- Strict forms (`prev_strict`, `lag_strict`) raise a runtime error when history or the field value is missing.
- `n` must satisfy `1 <= n <= 10_000`.

```rhai
e.prev_ms = prev("duration_ms")
e.prev2_ms = lag("duration_ms", 2)
e.prev_status = lag("status", 1)
```

#### `delta(field [,n])` / `delta_strict(field [,n])`
Compute `current - lagged` using the current event value and a prior record value.

- Only native numbers are accepted (`int`/`float`).
- Numeric strings (for example `"123"`) are treated as non-numeric.
- Resilient `delta` returns `()` for missing/non-numeric values.
- `delta_strict` raises runtime errors with the offending value/type.

```rhai
e.delta_ms = delta("duration_ms") // current - previous
e.delta_5 = delta("duration_ms", 5) // current - value 5 records back
```
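
The "native numbers only" rule can be illustrated with a small value enum. This is a sketch under assumptions: the `Value` type and `numeric_delta` are hypothetical, and the sketch widens everything to `f64` for simplicity, which may not match how Kelora types the result.

```rust
/// Hypothetical dynamic value, loosely modelled on a scripting value type.
#[derive(Debug, Clone, PartialEq)]
enum Value {
    Int(i64),
    Float(f64),
    Str(String),
    Unit,
}

/// Only native ints and floats count as numbers; strings are never parsed.
fn as_number(v: &Value) -> Option<f64> {
    match v {
        Value::Int(i) => Some(*i as f64),
        Value::Float(f) => Some(*f),
        Value::Str(_) | Value::Unit => None,
    }
}

/// Resilient delta: `None` (Kelora's `()`) if either side is non-numeric.
fn numeric_delta(current: &Value, lagged: &Value) -> Option<f64> {
    Some(as_number(current)? - as_number(lagged)?)
}
```

Under this model, `Value::Str("123".into())` produces no delta even though it looks numeric, so such fields need converting to a number before they can be compared.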

#### `ewma(key, value, alpha)` / `ewma_strict(...)`
Compute an exponentially weighted moving average:

`S_t = alpha * x_t + (1 - alpha) * S_{t-1}`

- First observation initializes `S_0 = x_0`.
- State is tracked per `key`.
- `alpha` must be in `(0, 1]`.
- Sequential mode only.

```rhai
e.latency_smooth = ewma("latency_ms", e.latency_ms, 0.2)
```
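
The rules above (seed with the first observation, track state per key, restrict `alpha` to `(0, 1]`) can be sketched in Rust as follows. `EwmaState` and `update` are hypothetical names, and returning an error for an out-of-range `alpha` is an assumption of this sketch rather than a statement about how the resilient `ewma` behaves.

```rust
use std::collections::HashMap;

/// Hypothetical per-key EWMA state (not Kelora's internals).
#[derive(Default)]
struct EwmaState {
    by_key: HashMap<String, f64>,
}

impl EwmaState {
    /// Applies S_t = alpha * x_t + (1 - alpha) * S_{t-1} for `key`.
    /// The first observation for a key becomes its initial state.
    fn update(&mut self, key: &str, value: f64, alpha: f64) -> Result<f64, String> {
        // Written so that NaN is rejected along with out-of-range values.
        if !(alpha > 0.0 && alpha <= 1.0) {
            return Err(format!("alpha must be in (0, 1], got {}", alpha));
        }
        let next = match self.by_key.get(key) {
            None => value,
            Some(&prev) => alpha * value + (1.0 - alpha) * prev,
        };
        self.by_key.insert(key.to_string(), next);
        Ok(next)
    }
}
```

Because state is keyed, two metrics smoothed under different keys never influence each other, while reusing one key for unrelated fields would blend them into a single average.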

---

## State Management Functions
10 changes: 10 additions & 0 deletions docs/reference/script-variables.md
@@ -25,6 +25,16 @@ Kelora exposes several built-in variables to Rhai scripts. Their availability de
- Type: `Map`
- Event map during per-event stages. Mutating `e` inside `--exec` updates the emitted event; setting a field to `()` removes it; assigning `e = ()` clears the entire event. Writable.

### Inter-record helper lifecycle (`prev` / `lag` / `delta` / `ewma`)

Kelora commits inter-record history once per processed event, after per-event scripting finishes for that event.

- During `--filter` / `--exec`, helper calls read only previously committed history.
- After scripting completes, the current event snapshot becomes available to the next event.
- Filtered-out events still advance history if they reached per-event scripting.
- History continues across input files in stream order.
- In `--parallel` mode, these helpers raise runtime errors (sequential-only).
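
The commit-after-scripting rule can be modelled with a staged snapshot that is only appended to history once the event has been handled. The sketch below is a simplified, hypothetical model: `Tracker` and `run` are illustrative names, and it tracks a single integer instead of a full event.

```rust
/// Hypothetical tracker illustrating commit-after-scripting semantics.
#[derive(Default)]
struct Tracker {
    committed: Vec<i64>,  // history visible to helper calls
    current: Option<i64>, // staged snapshot of the event being scripted
}

impl Tracker {
    fn set_current(&mut self, v: i64) {
        self.current = Some(v);
    }

    /// Reads committed history only; the staged event is never visible.
    fn prev(&self) -> Option<i64> {
        self.committed.last().copied()
    }

    /// Promote the staged snapshot into history.
    fn commit_current(&mut self) {
        if let Some(v) = self.current.take() {
            self.committed.push(v);
        }
    }

    /// Drop the staged snapshot without recording it (error path).
    fn clear_current(&mut self) {
        self.current = None;
    }
}

/// Runs `values` through a filter; returns each kept value with the `prev` it saw.
fn run(values: &[i64], keep: impl Fn(i64) -> bool) -> Vec<(i64, Option<i64>)> {
    let mut tracker = Tracker::default();
    let mut emitted = Vec::new();
    for &v in values {
        tracker.set_current(v);
        let seen = tracker.prev();
        if keep(v) {
            emitted.push((v, seen));
        }
        tracker.commit_current(); // runs even when the event was filtered out
    }
    emitted
}
```

Running `1, 2, 3` with a filter that keeps odd values emits `3` with a previous value of `2`: the filtered-out event still advanced history.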

### `meta`
- Type: `Map`
- Metadata derived from the pipeline and span system. Attempting to mutate `meta` fields has no effect; use event fields on `e` for custom annotations instead. Read-only.
1 change: 1 addition & 0 deletions mkdocs.yml
@@ -97,6 +97,7 @@ nav:
- Flatten Nested JSON for Analysis: how-to/fan-out-nested-structures.md
- Design Streaming Alerts: how-to/build-streaming-alerts.md
- Process Archives at Scale: how-to/batch-process-archives.md
- Analyze Sequential Deltas and Trends: how-to/inter-record-analysis.md
- Merge Timestamp-Sorted Files: how-to/merge-timestamp-sorted-files.md
- Roll Up Logs with Span Windows: how-to/span-aggregation-cookbook.md
- Integrate Kelora with External Tools: how-to/integrate-external-tools.md
1 change: 1 addition & 0 deletions src/parallel/worker.rs
@@ -28,6 +28,7 @@ pub(crate) fn worker_thread(
ctrl_rx: Receiver<Ctrl>,
) -> Result<()> {
crate::rhai_functions::strings::set_parallel_mode(true);
crate::rhai_functions::inter_record::reset_state();

stats_start_timer();

15 changes: 14 additions & 1 deletion src/pipeline/mod.rs
@@ -461,11 +461,15 @@ impl Pipeline {

for stage in &mut self.script_stages {
result = match result {
ScriptResult::Emit(event) => stage.apply(event, ctx),
ScriptResult::Emit(event) => {
crate::rhai_functions::inter_record::set_current_event(&event);
stage.apply(event, ctx)
}
ScriptResult::EmitMultiple(events) => {
// Process each event through remaining stages
let mut multi_results = Vec::new();
for event in events {
crate::rhai_functions::inter_record::set_current_event(&event);
let original_line = event.original_line.clone(); // Capture before consuming
match stage.apply(event, ctx) {
ScriptResult::Emit(e) => multi_results.push(e),
@@ -487,9 +491,11 @@ impl Pipeline {

// New resiliency model: use strict flag
if ctx.config.strict {
crate::rhai_functions::inter_record::clear_current_event();
return Err(anyhow::anyhow!(msg));
} else {
// Skip errors in resilient mode and continue processing
crate::rhai_functions::inter_record::clear_current_event();
return Ok(results);
}
}
@@ -506,6 +512,13 @@ impl Pipeline {
}
}

match &result {
ScriptResult::Error(_) => {
crate::rhai_functions::inter_record::clear_current_event()
}
_ => crate::rhai_functions::inter_record::commit_current_event(),
}

// Handle final result
let remaining_ops = file_ops::take_pending_ops();
if !remaining_ops.is_empty() {
10 changes: 10 additions & 0 deletions src/rhai_functions/docs.rs
@@ -280,6 +280,16 @@ type_of(value) Get type name as string (builtin)
window.pluck(field) Extract field values from window array (requires --window)
window.pluck_as_nums(field) Extract numeric field values from window array (requires --window)

INTER-RECORD HELPERS (sequential mode only; errors in --parallel mode):
prev(field) Return prior event's field value, else ()
lag(field, n) Return field value n events back (n >= 1, max 10_000), else ()
delta(field [,n]) Return numeric current-lag difference, else ()
ewma(key, value, alpha) Exponentially weighted moving average (alpha in (0, 1])
prev_strict(field) Strict prev; errors when history/value missing
lag_strict(field, n) Strict lag; errors when history/value missing
delta_strict(field [,n]) Strict delta; errors on missing/non-numeric values
ewma_strict(key, value, alpha) Strict EWMA; validates alpha and mode

DRAIN TEMPLATE MINING (sequential mode only; errors in --parallel mode):
drain_template(text [,options]) Add line to Drain model; returns {template, template_id, count,
is_new, sample, first_line, last_line}