Skip to content

Commit c668557

Browse files
author
Saiprashanth Pulisetti
committed
feat(cyber_security): add multithreaded port_scanner with banner grabbing
1 parent 81233e3 commit c668557

1 file changed

Lines changed: 113 additions & 0 deletions

File tree

cyber_security/port_scanner.py

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
#!/usr/bin/env python3
2+
"""Multithreaded TCP port scanner with optional banner grabbing.
3+
4+
Usage:
5+
python -m cyber_security.port_scanner 192.168.1.10 --ports 1-1024 --threads 200 --timeout 0.5 --banner
6+
"""
7+
from __future__ import annotations
8+
9+
import argparse
10+
import concurrent.futures
11+
import socket
12+
import sys
13+
from dataclasses import dataclass
14+
from typing import Iterable, Tuple
15+
16+
17+
DEFAULT_TIMEOUT = 0.7
18+
DEFAULT_THREADS = 200
19+
20+
21+
@dataclass
22+
class ScanResult:
23+
host: str
24+
port: int
25+
is_open: bool
26+
banner: str | None
27+
28+
29+
def parse_ports(spec: str) -> Iterable[int]:
30+
parts = (p.strip() for p in spec.split(","))
31+
for part in parts:
32+
if not part:
33+
continue
34+
if "-" in part:
35+
start_s, end_s = part.split("-", 1)
36+
start = int(start_s)
37+
end = int(end_s)
38+
for p in range(max(1, start), min(65535, end) + 1):
39+
yield p
40+
else:
41+
yield int(part)
42+
43+
44+
def try_connect(host: str, port: int, timeout: float, banner: bool) -> ScanResult:
45+
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
46+
sock.settimeout(timeout)
47+
try:
48+
result = sock.connect_ex((host, port))
49+
if result != 0:
50+
return ScanResult(host, port, False, None)
51+
recv_banner: str | None = None
52+
if banner:
53+
try:
54+
sock.sendall(b"\r\n")
55+
data = sock.recv(1024)
56+
if data:
57+
recv_banner = data.decode("utf-8", errors="replace").strip()
58+
except Exception:
59+
recv_banner = None
60+
return ScanResult(host, port, True, recv_banner)
61+
except Exception:
62+
return ScanResult(host, port, False, None)
63+
finally:
64+
try:
65+
sock.close()
66+
except Exception:
67+
pass
68+
69+
70+
def parse_args(argv: list[str]) -> argparse.Namespace:
71+
parser = argparse.ArgumentParser(description="TCP port scanner with multi-threading and banner grab.")
72+
parser.add_argument("host", help="Target IPv4/hostname")
73+
parser.add_argument("--ports", default="1-1024", help="Port spec, e.g., 1-1024,22,80,443")
74+
parser.add_argument("--threads", type=int, default=DEFAULT_THREADS, help="Max worker threads")
75+
parser.add_argument("--timeout", type=float, default=DEFAULT_TIMEOUT, help="Socket timeout in seconds")
76+
parser.add_argument("--banner", action="store_true", help="Attempt to grab service banner")
77+
return parser.parse_args(argv)
78+
79+
80+
def main(argv: list[str] | None = None) -> int:
81+
args = parse_args(sys.argv[1:] if argv is None else argv)
82+
83+
# Resolve host once
84+
try:
85+
host = socket.gethostbyname(args.host)
86+
except socket.gaierror:
87+
print(f"Unable to resolve host: {args.host}", file=sys.stderr)
88+
return 2
89+
90+
ports = list(parse_ports(args.ports))
91+
if not ports:
92+
print("No ports to scan.", file=sys.stderr)
93+
return 2
94+
95+
tasks: list[Tuple[str, int]] = [(host, p) for p in ports]
96+
97+
with concurrent.futures.ThreadPoolExecutor(max_workers=max(1, args.threads)) as executor:
98+
future_to_port = {
99+
executor.submit(try_connect, host, port, args.timeout, args.banner): port for _, port in tasks
100+
}
101+
for future in concurrent.futures.as_completed(future_to_port):
102+
res = future.result()
103+
if res.is_open:
104+
line = f"{res.host}:{res.port} open"
105+
if args.banner and res.banner:
106+
line += f" | {res.banner}"
107+
print(line)
108+
109+
return 0
110+
111+
112+
if __name__ == "__main__": # pragma: no cover
113+
raise SystemExit(main())

0 commit comments

Comments
 (0)