|
17 | 17 | from marshmallow import ValidationError |
18 | 18 | from semver import Version |
19 | 19 |
|
20 | | -from detection_rules import atlas, attack |
| 20 | +from detection_rules import atlas, attack, ecs |
21 | 21 | from detection_rules.config import load_current_package_version |
22 | 22 | from detection_rules.integrations import ( |
23 | 23 | find_latest_compatible_version, |
|
34 | 34 | TOMLRuleContents, |
35 | 35 | ) |
36 | 36 | from detection_rules.rule_loader import FILE_PATTERN, RULES_CONFIG |
| 37 | +from detection_rules.rule_validators import ESQLValidator |
37 | 38 | from detection_rules.schemas import definitions, get_min_supported_stack_version, get_stack_schemas |
38 | 39 | from detection_rules.utils import INTEGRATION_RULE_DIR, PatchedTemplate, get_path, load_etc_dump, make_git |
39 | 40 | from detection_rules.version_lock import loaded_version_lock |
|
43 | 44 | PACKAGE_STACK_VERSION = Version.parse(current_stack_version(), optional_minor_and_patch=True) |
44 | 45 |
|
45 | 46 |
|
| 47 | +def _strip_query_literals(query: str) -> str: |
| 48 | + """Strip string literals and line comments from a query body. |
| 49 | +
|
| 50 | + Used when scanning ES|QL / KQL / EQL queries for field references via regex, |
| 51 | + so matches inside string literals (e.g. KQL filters embedded as |
| 52 | + KQL(\"\"\"...\"\"\")) and inside `// comments` don't false-positive. |
| 53 | + Triple-quoted, double-quoted, and `//` comments cover the patterns used by |
| 54 | + rules in this repo. |
| 55 | + """ |
| 56 | + query = re.sub(r'"""[\s\S]*?"""', "", query) |
| 57 | + query = re.sub(r'"[^"\n]*"', "", query) |
| 58 | + query = re.sub(r"//[^\n]*", "", query) |
| 59 | + return query |
| 60 | + |
| 61 | + |
46 | 62 | class TestValidRules(BaseRuleTest): |
47 | 63 | """Test that all detection rules load properly without duplicates.""" |
48 | 64 |
|
@@ -200,6 +216,57 @@ def test_index_or_data_view_id_present(self): |
200 | 216 | """ |
201 | 217 | self.fail(fail_msg + "\n".join(failures)) |
202 | 218 |
|
def test_alerts_only_rules_no_constant_keyword_fields(self):
    """Alerts-only query rules must not reference constant_keyword ECS fields.

    Background: Kibana's .alerts-ecs-mappings system component template leaves
    out constant_keyword ECS fields on purpose (such a field holds a single
    value per index, which does not fit alerts indices that collect signals
    from many data streams). A rule that only targets .alerts-* and queries
    one of these fields fails at runtime with "Unknown column" / "Unknown
    field" — e.g. event.dataset should be used instead of data_stream.dataset.

    The static AST validators cannot detect this because they merge ECS into
    every validation target's schema (rule_validators.py:202-210), so every
    ECS-defined field looks "known" even when .alerts-* does not map it.

    Scope of the check:
    - Only QueryRuleData rules with a non-empty query are inspected (this
      filter also leaves out machine-learning rules, which have no query).
    - For ES|QL the targeted indices come from the query's FROM clause; for
      every other language they come from the rule's index / data view.
    - A rule is checked only when every target starts with ".alerts-".
      Multi-index rules (e.g. ".alerts-security.*" + "logs-okta.system-*")
      are skipped since the field resolves from the integration mapping.
    - String literals and `//` comments are stripped before matching.
    """
    # Compile one whole-word pattern per constant_keyword field, once, in sorted order.
    field_patterns = [
        (name, re.compile(rf"\b{re.escape(name)}\b"))
        for name in sorted(
            name for name, spec in ecs.get_schema().items() if spec.get("type") == "constant_keyword"
        )
    ]

    failures: list[str] = []
    for rule in self.all_rules:
        rule_data = rule.contents.data
        if not isinstance(rule_data, QueryRuleData):
            continue

        query_text = rule_data.get("query") or ""
        if not query_text:
            continue

        # ES|QL names its indices in the FROM clause; other languages rely on
        # the rule-level index / data view setting.
        if rule_data.get("language") == "esql":
            _, targets = ESQLValidator.get_esql_query_indices(query_text)
        else:
            targets = list(rule_data.index_or_dataview or [])

        # Only rules whose every target is an alerts index are in scope.
        if not (targets and all(target.startswith(".alerts-") for target in targets)):
            continue

        searchable = _strip_query_literals(query_text)
        hits = [name for name, pattern in field_patterns if pattern.search(searchable)]
        if hits:
            failures.append(f"{self.rule_str(rule)} references {hits} on {targets}")

    if not failures:
        return

    self.fail(
        "Rules targeting only .alerts-* indices must not reference constant_keyword "
        "ECS fields (e.g. use event.dataset instead of data_stream.dataset):\n"
        + "\n".join(failures)
    )
| 269 | + |
203 | 270 |
|
204 | 271 | class TestThreatMappings(BaseRuleTest): |
205 | 272 | """Test threat mapping data for rules.""" |
|
0 commit comments