Skip to content

rsigma-runtime🔗

The streaming runtime. Wraps rsigma-eval in an async pipeline with input adapters, sinks, hot-reload, dynamic source resolution, and pluggable metrics.

When to use🔗

  • Embed the daemon shape into a larger Rust program (alternate front-end, sidecar, custom orchestrator).
  • Reuse the input parsers (JSON, syslog, logfmt, CEF, EVTX) or the dynamic-source resolver outside the daemon.
  • Build a custom event sink that the engine writes to.

For a one-shot evaluation against in-memory events, you do not need this crate; use rsigma-eval directly. For the supported daemon binary, cargo install rsigma --features daemon.

Install🔗

[dependencies]
rsigma-parser = "0.12.0"
rsigma-eval = "0.12.0"
rsigma-runtime = "0.12.0"
tokio = { version = "1", features = ["full"] }
Feature Effect
nats NATS JetStream as an event source, sink, and dynamic-pipeline source.
otlp OTLP/HTTP and OTLP/gRPC log decoding (opentelemetry-proto, prost).
logfmt logfmt input parser.
cef CEF input parser.
evtx .evtx (Windows Event Log) file reader.
daachorse-index Pass-through to rsigma-eval/daachorse-index for the cross-rule AC pre-filter.

Public surface🔗

Type Purpose
RuntimeEngine Wraps an Engine or CorrelationEngine plus the on-disk rule path, pipelines, and the dynamic source resolver. Supports hot-reload via load_rules.
LogProcessor An ArcSwap<Mutex<RuntimeEngine>> with batch processing methods (process_batch_lines, process_batch_with_format) and a reload_rules helper. The daemon glues this to its bounded mpsc plumbing.
EventSource trait, Sink trait The plug-in surfaces for inputs and outputs. Built-in: StdinSource, StdoutSink, FileSink, and NatsSource/NatsSink under the nats feature.
spawn_source(source) -> mpsc::Receiver<RawEvent> Convenience helper that runs an EventSource on its own task.
EventFilter trait Optional jq/JSONPath pre-extraction applied to each input line.
MetricsHook trait + NoopMetrics The plug-in surface that the daemon uses to wire prometheus counters. Reimplement to ship metrics elsewhere (Datadog, OpenTelemetry, custom registry).
input::parse_line(...) Format-aware line parser. Auto-detects JSON, syslog, plain text; honours format hints.
EvtxFileReader (feature evtx) Streaming .evtx reader.
SourceResolver trait + DefaultSourceResolver Dynamic-pipeline source resolution: HTTP, command, file, NATS.
SourceCache TTL-aware cache for resolved source values. Optional SQLite backing for cross-restart persistence.
TemplateExpander, RefreshScheduler, RefreshTrigger Substitutes ${source.X} references in pipeline vars: and runs the refresh policies.
Enricher trait + EnrichmentPipeline Post-evaluation enrichment surface. Drives the four primitives (TemplateEnricher, LookupEnricher, HttpEnricher, CommandEnricher) and any bespoke types registered via register_builtin.
EnricherKind, OnError, Scope, EnrichError, EnrichErrorKind Configuration types: declared kind, error policy, scope filter, and the typed error returned by Enricher::enrich.
HttpResponseCache (re-exported from enrichment::http_cache) (method, url, body_hash)-keyed in-memory response cache with TTL and lazy eviction. Each HttpEnricher instance owns its own.
register_builtin(name, factory) -> Result<(), String> Process-global, append-only registry hook. External crates use it to ship a bespoke Rust-coded enricher type addressable via type: <name> in the daemon's enrichers config. Reserved names (template / lookup / http / command) and duplicate registrations are rejected.
lookup_builtin(name) Read-only registry probe used by the daemon config loader.

The full pipeline architecture, source resolution flow, and dynamic-pipeline contract are in the crate README and the Architecture reference.

Minimum example: in-process batch evaluation🔗

use rsigma_runtime::{LogProcessor, NoopMetrics, RuntimeEngine};
use rsigma_eval::CorrelationConfig;
use std::sync::Arc;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut engine = RuntimeEngine::new(
        "rules/".into(),         // rules_path
        Vec::new(),              // pipelines
        CorrelationConfig::default(),
        false,                   // include_event
    );
    engine.load_rules().map_err(|e| e.to_string())?;

    let processor = LogProcessor::new(engine, Arc::new(NoopMetrics));

    let lines = vec![
        r#"{"CommandLine":"cmd /c whoami"}"#.to_string(),
        r#"{"CommandLine":"powershell -enc ..."}"#.to_string(),
    ];

    let outcomes = processor.process_batch_lines(&lines, None);
    for r in outcomes {
        for m in r.matches { println!("matched: {}", m.rule_title.unwrap_or_default()); }
    }
    Ok(())
}

For a streaming daemon shape, wire an EventSource (StdinSource, NatsSource, …) through spawn_source into your own loop, batch the channel into Vec<String>, and feed it to process_batch_lines. The daemon code in rsigma-cli/src/daemon/server.rs is the reference implementation.

Hot-reload🔗

LogProcessor holds the live RuntimeEngine in an ArcSwap<Mutex<…>>. processor.reload_rules() rebuilds it from disk and atomically swaps it in; in-flight batches finish against the old engine, the next batch sees the new one. Wire a file watcher (notify), a SIGHUP handler, or an HTTP endpoint to call it:

use rsigma_runtime::LogProcessor;
use std::sync::Arc;

let processor = Arc::new(processor);
let p = processor.clone();

tokio::spawn(async move {
    // On reload trigger:
    match p.reload_rules() {
        Ok(stats) => println!("reloaded {} rules", stats.rules_loaded),
        Err(e) => eprintln!("reload failed: {e}"),
    }
});

The daemon's notify + SIGHUP + POST /api/v1/reload wiring lives in rsigma-cli; LogProcessor just exposes the primitive.

Post-evaluation enrichment🔗

EnrichmentPipeline runs after the engine has produced a ProcessResult and before the sink serializes it. Each enricher implements the Enricher trait, declares a kind: detection | correlation at construction, and writes into RuleHeader::enrichments under a configured inject_field. The pipeline filters per-result by declared kind against the body variant, applies the optional Scope filter, wraps the call in a per-enricher timeout, and applies the configured OnError policy on failure.

use rsigma_runtime::{
    EnricherKind, EnrichmentPipeline, OnError, Scope, TemplateEnricher,
};
use std::time::Duration;

let runbook = TemplateEnricher::new(
    "runbook_det".to_string(),
    EnricherKind::Detection,
    "runbook_url".to_string(),
    "https://wiki.internal/runbooks/${detection.rule.id}".to_string(),
    Duration::from_secs(5),
    OnError::Skip,
    Scope::default(),
);

let pipeline = EnrichmentPipeline::new(
    vec![Box::new(runbook)],
    16, // max_concurrent_enrichments
);

// `results` is the engine's `Vec<EvaluationResult>`.
// pipeline.run(&mut results).await;

Wire a MetricsHook via EnrichmentPipeline::with_metrics to surface rsigma_enrichment_total / rsigma_enrichment_duration_seconds / rsigma_enrichment_queue_depth (and the HTTP cache counters) into your own metrics backend. The daemon's Prometheus-backed Metrics struct implements the hook.

For YAML-driven configuration, use the rsigma-cli daemon::enrichment::build_enrichers_full entry point in your own daemon, or copy the loader pattern from rsigma-cli/src/daemon/enrichment/config.rs.

For the operator-facing schema, the four primitives, and the recipe catalog, see Enrichers.

Custom metrics🔗

Implement MetricsHook to ship metrics into your own registry. NoopMetrics is a no-op implementation suitable for tests and embedders that do not care. The daemon's own prometheus-backed implementation lives in rsigma-cli/src/daemon/metrics.rs and is a good template. The hook methods mirror the 27 Prometheus metrics the daemon exposes.

use rsigma_runtime::{LogProcessor, MetricsHook};
use std::sync::Arc;

struct MyMetrics { /* fields */ }

impl MetricsHook for MyMetrics {
    // Implement the methods you care about. See the trait definition on docs.rs
    // for the full surface.
}

let processor = LogProcessor::new(engine, Arc::new(MyMetrics { /* ... */ }));

Dynamic source resolution🔗

The SourceResolver + TemplateExpander pair is exposed so you can drive it standalone, for example to validate a pipeline's sources from a CI step or to refresh source values from a custom event loop:

use rsigma_runtime::{DefaultSourceResolver, SourceResolver};

let resolver = DefaultSourceResolver::new();
let value = resolver.resolve(&pipeline.sources[0]).await?;

The full spec (source types, data formats, extract languages, refresh policies) lives in Dynamic Pipeline Sources. RefreshScheduler runs the periodic refresh policies; TemplateExpander substitutes ${source.X} references into the pipeline.

Error handling🔗

RuntimeError from thiserror wraps EvalError, SigmaParserError, io::Error, and SourceError. Per-event errors (failed input parse, sink write) are surfaced through the result types instead of aborting; the daemon's optional DLQ collects them.

See also🔗