Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion detection_rules/rule_validators.py
Original file line number Diff line number Diff line change
Expand Up @@ -762,7 +762,8 @@ def unique_fields(self) -> list[str]: # type: ignore[reportIncompatibleMethodOv
return [field["name"] for field in self.esql_unique_fields]
return []

def get_esql_query_indices(self, query: str) -> tuple[str, list[str]]:
@staticmethod
def get_esql_query_indices(query: str) -> tuple[str, list[str]]:
"""Extract indices from an ES|QL query."""
match = FROM_SOURCES_REGEX.search(query)
if not match:
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "detection_rules"
version = "1.6.27"
version = "1.6.28"
description = "Detection Rules is the home for rules used by Elastic Security. This repository is used for the development, maintenance, testing, validation, and release of rules for Elastic Security’s Detection Engine."
readme = "README.md"
requires-python = ">=3.12"
Expand Down
68 changes: 67 additions & 1 deletion tests/test_all_rules.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from marshmallow import ValidationError
from semver import Version

from detection_rules import atlas, attack
from detection_rules import atlas, attack, ecs
from detection_rules.config import load_current_package_version
from detection_rules.integrations import (
find_latest_compatible_version,
Expand All @@ -34,6 +34,7 @@
TOMLRuleContents,
)
from detection_rules.rule_loader import FILE_PATTERN, RULES_CONFIG
from detection_rules.rule_validators import ESQLValidator
from detection_rules.schemas import definitions, get_min_supported_stack_version, get_stack_schemas
from detection_rules.utils import INTEGRATION_RULE_DIR, PatchedTemplate, get_path, load_etc_dump, make_git
from detection_rules.version_lock import loaded_version_lock
Expand All @@ -43,6 +44,20 @@
PACKAGE_STACK_VERSION = Version.parse(current_stack_version(), optional_minor_and_patch=True)


def _strip_query_literals(query: str) -> str:
"""Strip string literals and line comments from a query body.

Used when scanning ES|QL / KQL / EQL queries for field references via regex,
so matches inside string literals (e.g. KQL filters embedded as
KQL(\"\"\"...\"\"\")) and inside `// comments` don't false-positive.
Triple-quoted, double-quoted, and `//` comments cover the patterns used by
rules in this repo.
"""
query = re.sub(r'"""[\s\S]*?"""', "", query)
query = re.sub(r'"[^"\n]*"', "", query)
return re.sub(r"//[^\n]*", "", query)


class TestValidRules(BaseRuleTest):
"""Test that all detection rules load properly without duplicates."""

Expand Down Expand Up @@ -200,6 +215,57 @@ def test_index_or_data_view_id_present(self):
"""
self.fail(fail_msg + "\n".join(failures))

def test_alerts_only_rules_no_constant_keyword_fields(self):
"""Query rules targeting only .alerts-* indices must not reference constant_keyword ECS fields.

Kibana's .alerts-ecs-mappings system component template excludes
constant_keyword ECS fields by design (a constant_keyword can hold only one
value per index, which is incompatible with alerts indices that aggregate
signals from many data streams). Querying such a field on alerts-only rules
fails at runtime with "Unknown column" / "Unknown field" — use event.dataset
instead of data_stream.dataset, and similar substitutions.

The static AST validators don't catch this: they merge ECS into every
validation target's schema (rule_validators.py:202-210), so any field
defined in ECS validates as "known" regardless of whether it's actually
mapped on .alerts-*. This test fills that gap.

Multi-index rules (e.g. ".alerts-security.*" + "logs-okta.system-*") are
not flagged — the field resolves from the integration mapping side at
runtime. Machine-learning rules have no query and are excluded by the
QueryRuleData filter.
"""
constant_keyword_fields = sorted(
f for f, info in ecs.get_schema().items() if info.get("type") == "constant_keyword"
)
failures: list[str] = []
for rule in self.all_rules:
data = rule.contents.data
if not isinstance(data, QueryRuleData):
continue
query = data.get("query") or ""
if not query:
continue
# ES|QL targets indices via the FROM clause; everything else uses the
# rule's index field.
if data.get("language") == "esql":
_, sources = ESQLValidator.get_esql_query_indices(query)
else:
sources = list(data.index_or_dataview or [])
if not sources or not all(s.startswith(".alerts-") for s in sources):
continue
body = _strip_query_literals(query)
offenders = [f for f in constant_keyword_fields if re.search(rf"\b{re.escape(f)}\b", body)]
if offenders:
failures.append(f"{self.rule_str(rule)} references {offenders} on {sources}")

if failures:
self.fail(
"Rules targeting only .alerts-* indices must not reference constant_keyword "
"ECS fields (e.g. use event.dataset instead of data_stream.dataset):\n"
+ "\n".join(failures)
)


class TestThreatMappings(BaseRuleTest):
"""Test threat mapping data for rules."""
Expand Down
Loading