Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
# ATOMIC_RED_TEAM_REPO=
# Atomic tests directory (e.g. atomics/)
# ATOMIC_TESTS_PATH=
# Platform (e.g. linux , default = windows)
# PLATFORM=linux


# --- Splunk Connection ---
# Splunk host (hostname or IP)
Expand Down Expand Up @@ -61,6 +64,10 @@ ATTACK_TIDS=T1059.001,T1087.001,T1003.001
# VM_PASSWORD=
# Safe directory on VM for Atomic Red Team (e.g. C:\AtomicRedTeam)
# VM_SAFE_DIR=
# VM SSH port (default 22, e.g. 2222 for VirtualBox NAT)
# VM_SSH_PORT=22
# Path to SSH private key (optional; if not set, password auth is used)
# VM_SSH_KEY_PATH=

# --- Atomic Red Team Paths (Windows VM; optional) ---
# Path to Invoke-AtomicRedTeam.psd1 on the VM
Expand All @@ -69,6 +76,8 @@ ATTACK_TIDS=T1059.001,T1087.001,T1003.001
# ATOMIC_ATOMICS_PATH=C:\AtomicRedTeam\atomics

# --- Proxmox (optional; for snapshot-based lab VMs) ---
# Set to false if not using Proxmox (e.g. VirtualBox, VMware)
# USE_PROXMOX=true
# Proxmox host
# PROXMOX_HOST=
# Proxmox user (default root)
Expand Down
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,6 @@ data/repos/
__pycache__/
*.pyc
# Logs
*.log
*.log
# Claude Code memory
.claude/
66 changes: 66 additions & 0 deletions add_splunk_macros.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
"""
Splunk ESCU Filter Macro Installer

ESCU (Enterprise Security Content Updates) detection rules reference filter macros
(e.g. `linux_auditd_add_user_account_type_filter`) that must exist in Splunk's
macros.conf for the searches to run without errors. By default, these macros are
not defined, causing "macro not found" errors during detection verification.

This script:
1. Scans all ESCU detection rules for `*_filter` macro references
2. Checks which ones are already defined in macros.conf
3. Adds missing macros with a passthrough definition (`search *`)

Usage:
python add_splunk_macros.py

Note: Requires Splunk to be installed locally. Update SPLUNK_MACROS_CONF path
if your Splunk installation is in a different location. Restart Splunk after running.
"""

import re
import os
import glob

DETECTIONS_PATH = os.path.join(os.path.dirname(__file__), "data", "repos",
"security_content", "detections", "endpoint")

SPLUNK_MACROS_CONF = r"C:\Program Files\Splunk\etc\system\local\macros.conf"

def find_filter_macros():
macros = set()
pattern = re.compile(r'`(\w+_filter)`')
for yml_file in glob.glob(os.path.join(DETECTIONS_PATH, "*.yml")):
with open(yml_file, "r", encoding="utf-8") as f:
for match in pattern.finditer(f.read()):
macros.add(match.group(1))
return sorted(macros)

def read_existing_macros():
existing = set()
if os.path.exists(SPLUNK_MACROS_CONF):
with open(SPLUNK_MACROS_CONF, "r", encoding="utf-8") as f:
for line in f:
m = re.match(r'\[(\w+)\]', line.strip())
if m:
existing.add(m.group(1))
return existing

def main():
macros = find_filter_macros()
existing = read_existing_macros()
new_macros = [m for m in macros if m not in existing]
print(f"Found {len(macros)} filter macros in ESCU rules")
print(f"Already defined: {len(existing)}")
print(f"New to add: {len(new_macros)}")
if not new_macros:
print("Nothing to add!")
return
with open(SPLUNK_MACROS_CONF, "a", encoding="utf-8") as f:
for macro_name in new_macros:
f.write(f"\n[{macro_name}]\ndefinition = search *\n")
print(f"Added {len(new_macros)} macros to {SPLUNK_MACROS_CONF}")
print("Restart Splunk for changes to take effect.")

if __name__ == "__main__":
main()
25 changes: 20 additions & 5 deletions automation/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@
dotenv_path = os.path.join(PROJECT_ROOT, '.env')
load_dotenv(dotenv_path=dotenv_path)

PLATFORM = os.getenv("PLATFORM", "windows").lower()



# Repo base: default data/repos (managed by RepoManager); override via REPOS_BASE_PATH in .env
DEPENDENCIES_PATH = os.path.join(PROJECT_ROOT, 'dependencies')
REPOS_BASE_PATH = os.getenv("REPOS_BASE_PATH", os.path.join(PROJECT_ROOT, "data", "repos"))
Expand Down Expand Up @@ -50,11 +54,18 @@ def _as_bool(val: str | None, default: bool = False) -> bool:
VM_USERNAME = os.getenv("VM_USERNAME")
VM_PASSWORD = os.getenv("VM_PASSWORD")
VM_SAFE_DIR = os.getenv("VM_SAFE_DIR")
VM_SSH_PORT = int(os.getenv("VM_SSH_PORT", "22"))
VM_SSH_KEY_PATH = os.getenv("VM_SSH_KEY_PATH")

ATOMIC_MODULE_PATH = os.getenv("ATOMIC_MODULE_PATH", r"C:\AtomicRedTeam\invoke-atomicredteam\Invoke-AtomicRedTeam.psd1")
ATOMIC_ATOMICS_PATH = os.getenv("ATOMIC_ATOMICS_PATH", r"C:\AtomicRedTeam\atomics")
if PLATFORM =="windows":
ATOMIC_MODULE_PATH = os.getenv("ATOMIC_MODULE_PATH", r"C:\AtomicRedTeam\invoke-atomicredteam\Invoke-AtomicRedTeam.psd1")
ATOMIC_ATOMICS_PATH = os.getenv("ATOMIC_ATOMICS_PATH", r"C:\AtomicRedTeam\atomics")
else:
ATOMIC_MODULE_PATH = None
ATOMIC_ATOMICS_PATH = os.getenv("ATOMIC_ATOMICS_PATH", os.path.join(PROJECT_ROOT, "data", "repos", "atomic-red-team", "atomics"))

# --- Proxmox settings (from .env) ---
USE_PROXMOX = _as_bool(os.getenv("USE_PROXMOX"), True)
PROXMOX_HOST = os.getenv("PROXMOX_HOST")
PROXMOX_USER = os.getenv("PROXMOX_USER", "root")
PROXMOX_PASSWORD = os.getenv("PROXMOX_PASSWORD")
Expand All @@ -69,7 +80,7 @@ def _as_bool(val: str | None, default: bool = False) -> bool:
# Time padding around execution window when querying Splunk (seconds)
SPLUNK_TIME_PAD_SECONDS = int(os.getenv("SPLUNK_TIME_PAD_SECONDS", "300"))
# Post-test wait (seconds) before powering off VM to allow UF to forward events
POST_EXEC_FORWARD_WAIT_SECONDS = int(os.getenv("POST_EXEC_FORWARD_WAIT_SECONDS", "30"))
POST_EXEC_FORWARD_WAIT_SECONDS = int(os.getenv("POST_EXEC_FORWARD_WAIT_SECONDS", "100"))

# --- Per-test verification settings ---
PER_TEST_VERIFICATION = _as_bool(os.getenv("PER_TEST_VERIFICATION"), False)
Expand All @@ -86,11 +97,15 @@ def _as_bool(val: str | None, default: bool = False) -> bool:
# --- VM command execution timeout (seconds) ---
VM_COMMAND_TIMEOUT_SECONDS = int(os.getenv("VM_COMMAND_TIMEOUT_SECONDS", "600"))

ATTACK_TIDS_DEFAULT = "T1059.001,T1087.001,T1003.001"
if PLATFORM =="windows":
ATTACK_TIDS_DEFAULT = "T1059.001,T1087.001,T1003.001"
else:
ATTACK_TIDS_DEFAULT = "T1059.004,T1087.001,T1222.002"

ATTACK_LIST = [t.strip().upper() for t in os.getenv("ATTACK_TIDS", ATTACK_TIDS_DEFAULT).split(",") if t.strip()]

# --- Output paths ---
# Main report: dist/ for AJAX loading by index.html
REPORT_JSON_PATH = os.path.join(PROJECT_ROOT, "dist", "attack_rule_map.json")
REPORT_JSON_PATH = os.path.join(PROJECT_ROOT, "dist", f"attack_rule_map_{PLATFORM}.json")
# dist/ for MITRE layer and HTML (keeps root clean)
DIST_PATH = os.path.join(PROJECT_ROOT, "dist")
20 changes: 17 additions & 3 deletions automation/dynamic_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
"ScriptBlockText=": "Message=",
"ScriptBlockText IN": "Message IN",
"field=ScriptBlockText": "field=Message",
# Linux (auditd)
"exe=": "process_exec=",
}

# GitHub raw URLs for rule links (master branch)
Expand All @@ -56,12 +58,15 @@ def _apply_cim_mapping(spl: str) -> str:
"""Apply CIM-compliant field name replacements for Sigma->Splunk compatibility."""
if not spl or not isinstance(spl, str):
return spl
if config.PLATFORM == "linux":
return spl
result = spl
for old, new in sorted(CIM_MAPPING.items(), key=lambda x: -len(x[0])):
result = result.replace(old, new)
return result



def _normalize_sigma_spl_for_splunk(query: str) -> str:
"""
pySigma çıktısını Splunk için normalize eder.
Expand Down Expand Up @@ -141,6 +146,12 @@ def collect_for_technique(self, technique_id: str) -> tuple[list, list]:
doc = utils.load_yaml_file(fp)
if not isinstance(doc, dict) or "detection" not in doc or "title" not in doc:
continue

if config.PLATFORM == "linux":
product = doc.get("logsource", {}).get("product", "")
if product != "linux":
continue

tags = doc.get("tags") or []
if not isinstance(tags, list):
continue
Expand Down Expand Up @@ -209,7 +220,10 @@ def run_attack(technique_id: str, test_number: int = 1) -> tuple[bool, float, fl
logging.warning("VM not ready for %s", technique_id)
return False, 0.0, 0.0
start_time = time.time()
ok = execution_handler.run_invoke_atomic_test(technique_id, test_number)
if config.PLATFORM == "windows":
ok = execution_handler.run_invoke_atomic_test(technique_id, test_number)
else:
ok = execution_handler.run_bash_atomic_test(technique_id, test_number)
if not ok:
vm_handler.stop_vm()
return False, start_time, time.time()
Expand Down Expand Up @@ -328,9 +342,9 @@ def run(self) -> list:

for technique_id in self.technique_ids:
tid = technique_id.upper()
tests = atomic_parser.get_tests_for_technique(tid, platform_filter="windows")
tests = atomic_parser.get_tests_for_technique(tid, platform_filter=config.PLATFORM)
if not tests:
logging.info("========== Technique %s (no Windows tests) ==========", tid)
logging.info(f"========== Technique {tid} (no {config.PLATFORM} tests) ==========")
continue

sigma_spl_list, escu_spl_list = self.rule_mapper.collect_for_technique(tid)
Expand Down
Loading