Dynamic Pipeline Sources
Dynamic pipeline sources let a processing pipeline pull values from external systems (files, local commands, HTTP, NATS) at load time and inject them through the standard Sigma vars + value_placeholders mechanism. This page documents the full source specification, every source type, the four data formats, the three extract languages, the five refresh policies, the three error policies, and every resource limit the runtime enforces.
For an introduction to the feature see Processing Pipelines: dynamic pipelines. For end-to-end testing see pipeline resolve. For runtime metrics see Prometheus metrics: dynamic pipeline sources.
Source declaration
External source files (recommended)
The recommended way to declare dynamic sources is in standalone YAML files loaded via the --source flag on the daemon (or --source-file on pipeline resolve). This decouples source declarations from pipeline files and avoids the pipeline YAML becoming a kitchen sink for unrelated configuration.
Each file has a top-level sources: block:
# sources.yml
sources:
  - id: employee_directory
    type: file
    path: ./data/employees.json
    format: json
  - id: kev_catalog
    type: http
    url: https://www.cisa.gov/sites/default/files/feeds/known_exploited_vulnerabilities.json
    format: json
    extract: ".vulnerabilities"
    refresh: 1h
Pass it to the daemon with the --source flag.
A directory path loads all *.yml/*.yaml files in it, sorted alphabetically.
The flag is repeatable, so you can load from multiple files and directories:
rsigma engine daemon -r rules/ -p pipeline.yml \
  --source infra-sources.yml \
  --source threat-intel-sources.yml
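The way repeated --source arguments expand into an ordered file list can be sketched in a few lines of Python. This is only an illustration of the behaviour described above, not the daemon's code: the function name `expand_source_args` is hypothetical, and the sketch assumes directories are scanned non-recursively and sorted by plain file name.

```python
from pathlib import Path


def expand_source_args(args: list[str]) -> list[Path]:
    """Expand each --source argument into YAML files, in load order."""
    files: list[Path] = []
    for arg in args:
        path = Path(arg)
        if path.is_dir():
            # A directory contributes its *.yml / *.yaml files.
            # Assumption: non-recursive scan, sorted by file name.
            matches = [
                p for p in path.iterdir()
                if p.is_file() and p.suffix in (".yml", ".yaml")
            ]
            files.extend(sorted(matches, key=lambda p: p.name))
        else:
            # A plain file path is taken as-is.
            files.append(path)
    return files
```

Arguments keep the order in which they were given; only the files inside a single directory are sorted.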
Pipeline-embedded sources (deprecated)
Deprecated
Declaring sources inside a pipeline file's sources: block is deprecated. Use external source files with --source instead. Run rsigma rule migrate-sources to extract existing pipeline sources into a standalone file. Pipeline-embedded sources will be removed in v1.0.
A pipeline can still declare zero or more sources in a top-level sources: block. Each entry is a YAML mapping:
name: dynamic_threat_intel
priority: 50
sources:
  - id: <source-id>               # required, used in ${source.<id>} refs
    type: <file|http|command|nats>
    # type-specific fields…
    format: <json|yaml|lines|csv>
    extract: <expression>         # optional
    refresh: <once|<duration>|watch|push|on_demand>
    required: <true|false>        # default true
    timeout: <duration>           # default 30s for http/command
    on_error: <use_cached|fail|use_default>
    default: <value>              # required if on_error=use_default
    max_body_size: <bytes>        # default 10485760 (10 MiB)
    max_stdout: <bytes>           # command type only
Source schema
The schema for each source entry is the same regardless of whether it's declared in an external file or a pipeline. The full Rust type lives at rsigma_eval::pipeline::sources::DynamicSource. The parser is at rsigma_eval::pipeline::parsing.
Collision semantics
Source IDs must be unique across all --source files and all pipeline-embedded sources: blocks. If the same ID appears in two different declaration sites, the daemon exits at startup with an error naming both file paths. This ensures operators have one canonical declaration site per source ID.
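The uniqueness check can be pictured as grouping declaration sites by source ID. The following is a minimal sketch under stated assumptions, not the daemon's implementation: `find_id_collisions` and its input shape (a mapping from file path to the IDs declared in that file) are hypothetical, and the sketch only reports IDs that appear in two different files.

```python
from collections import defaultdict


def find_id_collisions(declared: dict[str, list[str]]) -> dict[str, list[str]]:
    """Map each source ID declared in more than one file to those file paths."""
    sites: dict[str, list[str]] = defaultdict(list)
    for path, ids in declared.items():
        for source_id in ids:
            # Record each declaration site once per ID.
            if path not in sites[source_id]:
                sites[source_id].append(path)
    # Only IDs with two or more distinct declaration sites are collisions.
    return {sid: paths for sid, paths in sites.items() if len(paths) > 1}
```

A non-empty result corresponds to the startup error described above, with both file paths available for the message.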
Migration tool
To migrate existing pipeline-embedded sources to standalone files, run rsigma rule migrate-sources.
The command extracts every sources: block from pipeline files in the input directory, consolidates them into a single output file (deduplicating by ID), and rewrites the pipeline files with the sources: block removed. Use --strategy per-pipeline to create one output file per pipeline instead.
Source types
file
Reads a local file, parses it according to format, applies extract if set, and returns the result.
| Field | Type | Required | Description |
|---|---|---|---|
| path | string | yes | Absolute or pipeline-relative path. |
| format | enum | yes | json, yaml, lines, or csv. |
| extract | string or object | no | Filter applied after parsing. |
refresh: watch is only valid for file sources (it uses notify). Under every other refresh policy, a file source behaves like any other source type.
http
Sends a GET request (or another method, if configured), parses the response body according to format, and applies extract if set. Uses reqwest.
- id: ip_blocklist
  type: http
  url: https://feeds.example.com/blocklist.json
  format: json
  extract: ".ips"
  method: GET            # default
  headers:               # optional, static values only
    Authorization: "Bearer <token>"
  timeout: 10s           # default 30s
  refresh: 300s
  on_error: use_cached
| Field | Type | Required | Description |
|---|---|---|---|
| url | string | yes | Full HTTP(S) URL. |
| method | string | no | GET (default), POST, PUT, etc. |
| headers | mapping | no | Request headers. Static values only; env-variable interpolation is not implemented. |
| format | enum | yes | json, yaml, lines, or csv. |
| extract | string or object | no | Filter applied after parsing. |
| timeout | duration | no | Request timeout. Default 30s. |
| max_body_size | bytes | no | Per-source override for the 10 MiB default. |
command
Runs a local executable, captures stdout, parses it according to format. Useful for shelling out to an inventory tool, a script that queries an internal API with credentials only the host has access to, or a generator that produces transformation YAML on demand.
- id: enrichment_rules
  type: command
  command: ["/usr/local/bin/generate-transformations", "--format", "json"]
  format: json
  refresh: once
  timeout: 5s
| Field | Type | Required | Description |
|---|---|---|---|
| command | array of strings | yes | argv array. First element is the executable. |
| format | enum | yes | json, yaml, lines, or csv. |
| extract | string or object | no | Filter applied after parsing. |
| timeout | duration | no | Execution wall-clock cap. Default 30s. |
| max_stdout | bytes | no | Per-source override for the 10 MiB stdout cap. |
The runtime additionally caps stderr at 64 KiB regardless of max_stdout. Stderr is logged on failure but not parsed.
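To make the command contract concrete (argv array, no shell, wall-clock timeout, stdout cap, stderr used only for diagnostics), here is a rough Python analogue. It is a sketch under assumptions rather than the runtime's behaviour: `run_command_source` is a hypothetical name, the output is assumed to be JSON, and the size check runs after the process finishes instead of while reading.

```python
import json
import subprocess

MAX_STDOUT = 10 * 1024 * 1024   # 10 MiB, the documented default cap
MAX_STDERR = 64 * 1024          # 64 KiB, the documented stderr cap


def run_command_source(argv: list[str], timeout_s: float = 30.0,
                       max_stdout: int = MAX_STDOUT):
    """Run argv without a shell and parse its stdout as JSON."""
    # subprocess.run raises TimeoutExpired when the wall-clock cap is exceeded.
    proc = subprocess.run(argv, capture_output=True, timeout=timeout_s)
    if proc.returncode != 0:
        # stderr is reported on failure but never parsed.
        stderr = proc.stderr[:MAX_STDERR].decode(errors="replace")
        raise RuntimeError(f"command exited {proc.returncode}: {stderr}")
    if len(proc.stdout) > max_stdout:
        # Simplification: checked after the fact, not while streaming.
        raise RuntimeError("stdout exceeds max_stdout")
    return json.loads(proc.stdout)
```

Passing an argv list rather than a single shell string means no shell quoting or expansion is involved, which matches the array form of the command field.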
nats
Subscribes to a NATS subject and updates the source value with each message. Requires the daemon-nats build feature.
- id: live_iocs
  type: nats
  url: nats://nats.internal:4222
  subject: rsigma.iocs.current
  format: json
  refresh: push
  required: false
| Field | Type | Required | Description |
|---|---|---|---|
| url | string | yes | nats://host:port. Auth comes from the daemon-level --nats-* flags. |
| subject | string | yes | NATS subject (no wildcards for dynamic-source use). |
| format | enum | yes | json, yaml, lines, or csv. |
| extract | string or object | no | Filter applied after parsing. |
refresh: push is only valid for NATS sources. Each subject message replaces the source value.
Data formats
| Format | Library | Notes |
|---|---|---|
| json | serde_json | Standard JSON. |
| yaml | yaml_serde 0.10 | Multi-document files concatenate into an array. |
| lines | (internal) | One value per non-blank line; the resolved value is a JSON array of strings. |
| csv | csv crate | Header row required; each subsequent row becomes an object keyed by the header. |
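The shapes produced by the lines and csv formats are easy to misjudge, so the following Python sketch mimics them with the standard library. It illustrates the described result shapes only: the helper names are hypothetical, and it assumes lines are kept verbatim and that CSV cells stay strings (no type inference).

```python
import csv
import io


def parse_lines(text: str) -> list[str]:
    """One string per non-blank line, in file order."""
    return [line for line in text.splitlines() if line.strip()]


def parse_csv(text: str) -> list[dict[str, str]]:
    """The header row supplies the keys; every later row becomes one object."""
    return list(csv.DictReader(io.StringIO(text)))


blocklist = parse_lines("10.0.0.1\n\n10.0.0.2\n")
employees = parse_csv("user,dept\nalice,sec\nbob,it\n")
```

Here `blocklist` is a two-element list of strings and `employees` is a list of two objects with the keys `user` and `dept`, which is the structure an extract expression would then operate on.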
Extract languages
The optional extract: filter slices the parsed data after format parsing. Three languages are supported:
| Language | Library | Best for |
|---|---|---|
| jq | jaq | Complex transformations, array iteration, filtering. Familiar to operators. |
| jsonpath | serde_json_path (RFC 9535) | Simple path queries. Fastest of the three. |
| cel | cel-interpreter | Typed expressions with filtering and aggregation. Slower; use for small datasets. |
Plain-string extract: defaults to jq. Use the object form for explicit selection:
# Shorthand: jq
extract: ".indicators[].ip"
# Explicit jq
extract:
  type: jq
  expr: ".indicators[].ip"
# JSONPath
extract:
  type: jsonpath
  expr: "$.indicators[*].ip"
# CEL
extract:
  type: cel
  expr: "data.indicators.filter(i, i.severity > 7).map(i, i.ip)"
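For readers unfamiliar with one of the three languages, the expressions above can be read as the following plain-Python equivalents. The sample document is invented for illustration, and the sketch assumes the outputs of a jq or JSONPath expression are collected into a single array.

```python
# Hypothetical parsed source data (not from any real feed).
data = {
    "indicators": [
        {"ip": "198.51.100.7", "severity": 9},
        {"ip": "203.0.113.4", "severity": 3},
    ]
}

# jq ".indicators[].ip" and JSONPath "$.indicators[*].ip":
# the ip field of every indicator.
all_ips = [i["ip"] for i in data["indicators"]]

# CEL "data.indicators.filter(i, i.severity > 7).map(i, i.ip)":
# the ip field of indicators whose severity exceeds 7.
high_severity_ips = [i["ip"] for i in data["indicators"] if i["severity"] > 7]
```

The jq and JSONPath forms select the same values here; only the CEL example adds a filter.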
Refresh policies
refresh: controls how often the source re-fetches.
| Policy | Behaviour | Valid for |
|---|---|---|
| once | Fetch at startup only. | All source types. |
| <duration> (30s, 5m, 1h) | Re-fetch on a fixed interval. Minimum 1 s; values below clamp to 1 s with a WARN log. | All source types. |
| watch | File-system change notification via notify. | file only. |
| push | New value on each NATS message. | nats only. |
| on_demand | Fetch at startup, then only when explicitly triggered (SIGHUP, POST /api/v1/sources/resolve, NATS control subject rsigma.control.resolve). | All source types. |
A <duration> refresh below MIN_REFRESH_INTERVAL (1 second) is clamped to 1 second, and the runtime logs a warning; it is not rejected. Operators who need fresher data than that should use NATS push or on-demand triggers, or rethink the architecture.
Error policies
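The clamping rule can be expressed as a small parser. This is a sketch of the documented behaviour, not the runtime's parser: `parse_refresh_interval` is a hypothetical name, and the accepted unit suffixes (including `ms`, used here only to show a sub-second value) are assumptions.

```python
import logging
import re

MIN_REFRESH_INTERVAL = 1.0  # seconds, per the documented minimum
# Assumed unit suffixes; the real duration grammar may differ.
_UNITS = {"ms": 0.001, "s": 1.0, "m": 60.0, "h": 3600.0}


def parse_refresh_interval(text: str) -> float:
    """Parse a duration such as '30s' or '5m' and clamp it to the minimum."""
    match = re.fullmatch(r"(\d+)(ms|s|m|h)", text.strip())
    if match is None:
        raise ValueError(f"not a duration: {text!r}")
    seconds = int(match.group(1)) * _UNITS[match.group(2)]
    if seconds < MIN_REFRESH_INTERVAL:
        # Clamp rather than reject, and tell the operator about it.
        logging.warning("refresh %s is below the minimum; using 1s", text)
        return MIN_REFRESH_INTERVAL
    return seconds
```

Clamping keeps a misconfigured source running at the slowest safe rate instead of failing the whole pipeline load.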
on_error: controls what happens when a fetch fails (network down, command exits non-zero, parse error, extract returns empty):
| Policy | Behaviour |
|---|---|
| use_cached | Serve the last successfully fetched value. The default when the source has been resolved at least once. |
| fail | For required: true (default): the pipeline load fails. For required: false: log and substitute null. |
| use_default | Substitute the literal default: value declared inline. Requires default: to be set. |
The required flag interacts with on_error:
- required: true + on_error: fail -> startup fails; the daemon exits.
- required: true + on_error: use_cached -> startup succeeds if a cached value exists from a prior run (with --state-db); fails otherwise.
- required: false + on_error: fail -> source resolves to null; pipeline continues.
Template substitution
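The interaction between required and on_error at startup can be summarised as one decision function. This is a sketch of the combinations listed above under stated assumptions: the names are hypothetical, and the case of required: false with use_cached and no cached value is not covered by this page, so resolving it to null is a guess.

```python
class StartupError(Exception):
    """Raised when a failed fetch should abort pipeline load."""


_MISSING = object()  # distinguishes "no value" from a legitimate None


def value_after_failed_fetch(required: bool, on_error: str,
                             cached=_MISSING, default=_MISSING):
    """Return the value a source resolves to after a failed startup fetch."""
    if on_error not in ("use_cached", "fail", "use_default"):
        raise ValueError(f"unknown on_error policy: {on_error}")
    if on_error == "use_default":
        if default is _MISSING:
            raise ValueError("on_error=use_default requires default")
        return default
    if on_error == "use_cached" and cached is not _MISSING:
        return cached
    # Either on_error=fail, or use_cached with nothing cached yet.
    if required:
        raise StartupError("required source could not be resolved")
    # Assumption for use_cached without a cache: same as fail, i.e. null.
    return None
```

For example, `value_after_failed_fetch(False, "fail")` yields `None`, while the same call with `required=True` raises `StartupError`.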
The ${source.<id>} syntax expands ONLY in the vars: block. The expander does NOT substitute references inside typed transformation fields (e.g. add_condition.conditions.X). The supported pattern is to put the resolved value into a vars: entry and reference it from rules via the standard Sigma %name% placeholder, expanded by the value_placeholders transformation:
sources:
  - id: ip_blocklist
    type: http
    url: …
    extract: ".ips"
vars:
  blocklist: "${source.ip_blocklist}"
transformations:
  - type: value_placeholders
Dot-path indexing into a nested structure also works in vars: entries.
Inline templates work too (${source.X} as part of a larger string), but they substitute the source's stringified representation, which is rarely what you want for array sources. Whole-value substitution (where ${source.X} is the entire vars: entry) is the safe form: it expands an array source to multiple vars entries that value_placeholders can map onto rule values. For scalar sources, inline templates compose cleanly.
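The difference between whole-value and inline substitution can be shown with a toy expander. This is a sketch, not the real expander: `expand_var` is a hypothetical name, dot-path indexing is left out, and rendering non-string values as JSON text in the inline case is an assumption about what "stringified" means.

```python
import json
import re

_REF = re.compile(r"\$\{source\.([A-Za-z0-9_]+)\}")


def expand_var(template: str, sources: dict):
    """Expand ${source.<id>} references in a single vars: entry."""
    whole = _REF.fullmatch(template)
    if whole:
        # Whole-value form: the resolved value keeps its type (list, dict, ...).
        return sources[whole.group(1)]

    def stringify(match: re.Match) -> str:
        value = sources[match.group(1)]
        # Assumption: non-string values are rendered as JSON text.
        return value if isinstance(value, str) else json.dumps(value)

    # Inline form: every reference is replaced by a string.
    return _REF.sub(stringify, template)


sources = {"ip_blocklist": ["10.0.0.1", "10.0.0.2"], "env": "prod"}
whole_value = expand_var("${source.ip_blocklist}", sources)
inline_scalar = expand_var("siem-${source.env}", sources)
inline_array = expand_var("x=${source.ip_blocklist}", sources)
```

`whole_value` stays a list that can fan out into multiple values, whereas `inline_array` collapses into a single string, which illustrates why the whole-value form is preferred for array sources.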
Include directives
A source that resolves to a JSON array of transformation objects can be inlined via an include: directive.
Constraints:
- The resolved value must be a JSON array of transformation objects, not a single object.
- Nested includes are rejected (MAX_INCLUDE_DEPTH = 1). If an included fragment itself contains include: directives, expansion fails at startup with a clear error message.
- Remote sources (HTTP, NATS) require --allow-remote-include on the daemon. The default policy restricts include resolution to local sources (file, command) to limit the blast radius of a compromised CDN or NATS broker.
Triggers and hot-reload
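The three constraints can be read as a validation step that runs before a fragment is spliced in. The sketch below is illustrative only: `check_include` is a hypothetical function, and it assumes a nested directive shows up as an `include` key on a transformation object.

```python
LOCAL_TYPES = {"file", "command"}


def check_include(source_type: str, resolved,
                  allow_remote_include: bool = False) -> list[dict]:
    """Validate a resolved source value before inlining it as transformations."""
    # Remote sources are refused unless the operator opted in.
    if source_type not in LOCAL_TYPES and not allow_remote_include:
        raise ValueError(f"{source_type} includes need --allow-remote-include")
    # The value must be an array of objects, not a single object.
    if not isinstance(resolved, list) or not all(
        isinstance(item, dict) for item in resolved
    ):
        raise ValueError("include must resolve to an array of objects")
    # Depth limit of 1: an included fragment may not include anything itself.
    # Assumption: a nested directive appears as an "include" key.
    if any("include" in item for item in resolved):
        raise ValueError("nested include directives are not allowed")
    return resolved
```

Checking the source type first means a remote payload is rejected before any of its content is inspected.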
| Trigger | Re-resolves |
|---|---|
| Filesystem change to a .yml/.yaml rules or pipeline file | Rules + pipelines + all dynamic sources. |
| SIGHUP | Same as above. |
| POST /api/v1/reload | Same as above. |
| POST /api/v1/sources/resolve (no body) | All dynamic sources only; rules are not reloaded. |
| POST /api/v1/sources/resolve with {"source_id":"..."} | One source. |
| DELETE /api/v1/sources/cache/{source_id} | Invalidates the cache. The next read fetches fresh. Always returns 200 OK, even for nonexistent IDs. |
| NATS message on rsigma.control.resolve | All dynamic sources only. |
| Interval timer | The single source whose refresh: interval just elapsed. |
The push policy (NATS) updates the source value continuously on each incoming message, without going through the reload pipeline.
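As an illustration of the two POST /api/v1/sources/resolve forms in the table, the following Python sketch builds the requests with the standard library. The helper name and the base URL used in the example are hypothetical, and sending a Content-Type: application/json header is an assumption about what the daemon expects.

```python
import json
import urllib.request
from typing import Optional


def build_resolve_request(base_url: str,
                          source_id: Optional[str] = None) -> urllib.request.Request:
    """Build a POST that re-resolves all sources, or one source if an ID is given."""
    url = base_url.rstrip("/") + "/api/v1/sources/resolve"
    if source_id is None:
        # No body: re-resolve every dynamic source.
        return urllib.request.Request(url, method="POST")
    body = json.dumps({"source_id": source_id}).encode()
    return urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers={"Content-Type": "application/json"},  # assumed header
    )


# Hypothetical daemon address; adjust to the actual API listener.
request = build_resolve_request("http://127.0.0.1:8080", "ip_blocklist")
```

The request is only constructed here; passing it to `urllib.request.urlopen` would actually send it.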
Resource limits
Every dynamic source path enforces hard limits to bound resource consumption. Per-source overrides are noted in the table.
| Limit | Constant | Default | Per-source override |
|---|---|---|---|
| HTTP response body size | MAX_SOURCE_RESPONSE_BYTES | 10 MiB | max_body_size |
| Command stdout size | MAX_SOURCE_RESPONSE_BYTES | 10 MiB | max_stdout |
| Command stderr size | (hard-coded) | 64 KiB | not configurable |
| Command execution timeout | DEFAULT_COMMAND_TIMEOUT | 30 s | timeout |
| HTTP request timeout | (hard-coded default) | 30 s | timeout |
| Refresh interval minimum | MIN_REFRESH_INTERVAL | 1 s | not configurable (lower values clamp with a warning) |
| NATS message size cap | MAX_SOURCE_RESPONSE_BYTES | 10 MiB | not configurable |
| Include nesting depth | MAX_INCLUDE_DEPTH | 1 | not configurable |
| Remote include resolution | (none) | off | --allow-remote-include daemon flag |
Exceeding any limit produces a SourceErrorKind::ResourceLimit failure with a descriptive message. See Security Hardening for the broader catalogue.
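A size cap of this kind is typically enforced while reading, so that an oversized body is never fully buffered. The sketch below shows that general technique in Python; it is not the runtime's code, and `read_capped` and `ResourceLimitError` are hypothetical names standing in for the SourceErrorKind::ResourceLimit failure.

```python
import io

MAX_SOURCE_RESPONSE_BYTES = 10 * 1024 * 1024  # 10 MiB default from the table


class ResourceLimitError(Exception):
    """Stand-in for a resource-limit failure."""


def read_capped(stream, limit: int = MAX_SOURCE_RESPONSE_BYTES,
                chunk_size: int = 64 * 1024) -> bytes:
    """Read a byte stream fully, failing once it exceeds limit bytes."""
    buf = bytearray()
    while True:
        # Never request more than one byte past the limit.
        chunk = stream.read(min(chunk_size, limit + 1 - len(buf)))
        if not chunk:
            return bytes(buf)
        buf.extend(chunk)
        if len(buf) > limit:
            raise ResourceLimitError(f"body exceeds {limit} bytes")


body = read_capped(io.BytesIO(b"abcd"), limit=4)
```

Reading at most one byte beyond the limit is enough to tell "exactly at the cap" apart from "over the cap" without holding more than `limit + 1` bytes in memory.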
See also
- Processing Pipelines: dynamic pipelines for the narrative version.
- pipeline resolve for offline source testing.
- rule validate --resolve-sources for the strict CI gate.
- Prometheus metrics: dynamic pipeline sources for what every successful and failing resolve exposes.
- HTTP API: sources for the daemon control endpoints.
- Security Hardening for every other resource limit the runtime enforces.
- rsigma_runtime::sources source for the implementation.