Commit efcef14

Saiprashanth Pulisetti committed
feat(cyber_security): add file_integrity_monitor and url_analyzer; add idna dep
1 parent c668557 commit efcef14

3 files changed: 259 additions & 0 deletions

File tree:
- cyber_security/file_integrity_monitor.py
- cyber_security/url_analyzer.py
- requirements.txt

cyber_security/file_integrity_monitor.py

126 additions & 0 deletions

#!/usr/bin/env python3
"""File Integrity Monitor (FIM).

Two modes:
- init: hash files and create a baseline JSON
- verify: re-hash and report modifications/additions/deletions

Usage:
    python -m cyber_security.file_integrity_monitor init --root /path --output baseline.json --glob "**/*.py"
    python -m cyber_security.file_integrity_monitor verify --root /path --baseline baseline.json --glob "**/*.py"
"""
from __future__ import annotations

import argparse
import fnmatch
import hashlib
import json
import os
import sys
from dataclasses import dataclass
from typing import Dict, Iterable


@dataclass
class FileHash:
    path: str
    sha256: str


def iter_files(root: str, pattern: str | None) -> Iterable[str]:
    for base, _dirs, files in os.walk(root):
        for name in files:
            rel = os.path.relpath(os.path.join(base, name), root)
            if pattern is None or fnmatch.fnmatch(rel, pattern):
                yield rel


def hash_file(root: str, rel_path: str) -> str:
    # Stream the file in 8 KiB chunks so large files do not load into memory.
    h = hashlib.sha256()
    abs_path = os.path.join(root, rel_path)
    with open(abs_path, "rb") as f:
        for chunk in iter(lambda: f.read(8192), b""):
            h.update(chunk)
    return h.hexdigest()


def build_baseline(root: str, pattern: str | None) -> Dict[str, str]:
    result: Dict[str, str] = {}
    for rel in iter_files(root, pattern):
        try:
            result[rel] = hash_file(root, rel)
        except (PermissionError, FileNotFoundError):
            # Skip files that are unreadable or disappear mid-scan.
            continue
    return result


def write_json(path: str, data: Dict[str, str]) -> None:
    with open(path, "w", encoding="utf-8") as f:
        json.dump(data, f, indent=2, sort_keys=True)


def read_json(path: str) -> Dict[str, str]:
    with open(path, "r", encoding="utf-8") as f:
        return json.load(f)


def cmd_init(args: argparse.Namespace) -> int:
    baseline = build_baseline(args.root, args.glob)
    write_json(args.output, baseline)
    print(f"Baseline written: {args.output} ({len(baseline)} files)")
    return 0


def cmd_verify(args: argparse.Namespace) -> int:
    prior = read_json(args.baseline)
    # Re-scan with the same glob the baseline was built with; otherwise
    # files outside the pattern are falsely reported as additions.
    current = build_baseline(args.root, args.glob)

    added = sorted(set(current) - set(prior))
    removed = sorted(set(prior) - set(current))
    modified = sorted(p for p in set(current) & set(prior) if current[p] != prior[p])

    if added:
        print("Added:")
        for p in added:
            print(f"  + {p}")
    if removed:
        print("Removed:")
        for p in removed:
            print(f"  - {p}")
    if modified:
        print("Modified:")
        for p in modified:
            print(f"  * {p}")

    if not (added or removed or modified):
        print("No changes detected.")
        return 0
    return 1


def parse_args(argv: list[str]) -> argparse.Namespace:
    parser = argparse.ArgumentParser(description="File Integrity Monitor (FIM)")
    sub = parser.add_subparsers(dest="command", required=True)

    p_init = sub.add_parser("init", help="Create baseline JSON of file hashes")
    p_init.add_argument("--root", required=True, help="Root directory to scan")
    p_init.add_argument("--output", required=True, help="Path to baseline JSON output")
    p_init.add_argument("--glob", help="Glob-like pattern relative to root (e.g., **/*.py)")
    p_init.set_defaults(func=cmd_init)

    p_ver = sub.add_parser("verify", help="Verify current state against baseline JSON")
    p_ver.add_argument("--root", required=True, help="Root directory to scan")
    p_ver.add_argument("--baseline", required=True, help="Path to baseline JSON")
    p_ver.add_argument("--glob", help="Glob-like pattern used when the baseline was created")
    p_ver.set_defaults(func=cmd_verify)

    return parser.parse_args(argv)


def main(argv: list[str] | None = None) -> int:
    args = parse_args(sys.argv[1:] if argv is None else argv)
    return int(args.func(args))


if __name__ == "__main__":  # pragma: no cover
    raise SystemExit(main())

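For a quick smoke test of both modes, here is a minimal sketch (not part of the commit), assuming the repository root is on sys.path so the new module imports as cyber_security.file_integrity_monitor:

import os
import tempfile

from cyber_security.file_integrity_monitor import build_baseline, read_json, write_json

# Work in a scratch directory so nothing real is touched.
root = tempfile.mkdtemp()
with open(os.path.join(root, "app.py"), "w", encoding="utf-8") as f:
    f.write("print('v1')\n")

# init: hash matching files and persist the baseline.
write_json(os.path.join(root, "baseline.json"), build_baseline(root, "*.py"))

# Tamper with the file, then verify against the baseline.
with open(os.path.join(root, "app.py"), "w", encoding="utf-8") as f:
    f.write("print('v2')\n")

prior = read_json(os.path.join(root, "baseline.json"))
current = build_baseline(root, "*.py")
print([p for p in prior if p in current and prior[p] != current[p]])  # ['app.py']
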
cyber_security/url_analyzer.py

132 additions & 0 deletions

#!/usr/bin/env python3
"""URL Analyzer: heuristic checks for phishing and IDN homograph risks.

Usage:
    python -m cyber_security.url_analyzer https://example.com
"""
from __future__ import annotations

import argparse
import re
import sys
from urllib.parse import urlparse

import idna  # type: ignore


SUSPICIOUS_TLDS = {
    "zip",
    "mov",
    "top",
    "xyz",
}


def is_punycode(label: str) -> bool:
    return label.lower().startswith("xn--")


def has_mixed_scripts(label: str) -> bool:
    # Crude mixed-script detection: Latin plus Cyrillic and/or Greek.
    latin = re.search(r"[A-Za-z]", label)
    cyrillic = re.search(r"[\u0400-\u04FF]", label)
    greek = re.search(r"[\u0370-\u03FF]", label)
    scripts = sum(bool(x) for x in (latin, cyrillic, greek))
    return scripts >= 2


def looks_like_homograph(label: str) -> bool:
    # Very rough: replace common confusables, then check whether the
    # normalized label contains a well-known brand name.
    confusable_map = {
        "0": "o",
        "1": "l",
        "3": "e",
        "5": "s",
        "@": "a",
        "$": "s",
        "!": "i",
        "і": "i",  # Cyrillic i
        "е": "e",  # Cyrillic e
        "о": "o",  # Cyrillic o
    }
    brands = {"google", "apple", "microsoft", "paypal", "facebook", "amazon"}
    norm = label.lower()
    for k, v in confusable_map.items():
        norm = norm.replace(k, v)
    return any(b in norm for b in brands)


def analyze(url: str) -> list[str]:
    issues: list[str] = []
    try:
        parsed = urlparse(url)
    except Exception:
        return ["Invalid URL format."]

    if parsed.scheme not in {"http", "https"}:
        issues.append("Non-HTTP(S) scheme.")

    hostname = parsed.hostname or ""
    if not hostname:
        issues.append("Missing hostname.")
        return issues

    labels = hostname.split(".")
    decoded_labels = []
    for label in labels:
        if is_punycode(label):
            issues.append("Punycode present (possible IDN homograph).")
            try:
                decoded_labels.append(idna.decode(label))
            except Exception:
                decoded_labels.append(label)
        else:
            decoded_labels.append(label)

    for label in decoded_labels:
        if has_mixed_scripts(label):
            issues.append("Mixed Unicode scripts in hostname label.")
        if looks_like_homograph(label):
            issues.append("Label resembles a well-known brand (possible homograph).")

    tld = labels[-1].lower() if labels else ""
    if tld in SUSPICIOUS_TLDS:
        issues.append(f"Suspicious TLD: .{tld}")

    # Path/query heuristics.
    path_query = (parsed.path or "") + ("?" + parsed.query if parsed.query else "")
    if re.search(r"(?i)login|verify|update|secure|account", path_query):
        issues.append("Phishing-related keyword in path/query.")
    if "@" in parsed.netloc:
        issues.append("@ in netloc may hide the true host (userinfo trick).")

    return issues


def parse_args(argv: list[str]) -> argparse.Namespace:
    parser = argparse.ArgumentParser(description="Analyze a URL for phishing/IDN risks.")
    parser.add_argument("url", help="URL to analyze")
    parser.add_argument("--quiet", action="store_true", help="Exit code only: 0 safe-ish, 1 suspicious")
    return parser.parse_args(argv)


def main(argv: list[str] | None = None) -> int:
    args = parse_args(sys.argv[1:] if argv is None else argv)
    issues = analyze(args.url)

    if args.quiet:
        return 1 if issues else 0

    if issues:
        print("Suspicious indicators:")
        for item in issues:
            print(f"  - {item}")
        return 1

    print("No obvious issues detected.")
    return 0


if __name__ == "__main__":  # pragma: no cover
    raise SystemExit(main())

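A sketch of what the heuristics flag, under the same import assumption as above (the hostnames below are illustrative, not live sites):

from cyber_security.url_analyzer import analyze

for url in (
    "https://example.com/",
    "https://xn--pple-43d.com/login",  # punycode label known to decode to an "apple" lookalike
    "http://paypa1-secure.top/verify?account=1",
):
    print(url, "->", analyze(url) or "no findings")
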
requirements.txt

1 addition & 0 deletions

@@ -17,3 +17,4 @@ sympy
 tweepy
 typing_extensions
 xgboost
+idna

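The new idna dependency backs the punycode decoding in url_analyzer. A minimal round-trip sketch (the first letter of the label below is Cyrillic U+0430, not Latin "a"):

import idna

ascii_label = idna.encode("аpple")  # Unicode label -> ASCII/punycode bytes
print(ascii_label)                  # e.g. b'xn--pple-43d'
print(idna.decode(ascii_label))     # back to the Unicode form: 'аpple'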