Skip to content

Commit 49feca7

Browse files
feat: 添加高并发流量模拟器,支持二层防御测试
1 parent 9aded25 commit 49feca7

1 file changed

Lines changed: 234 additions & 0 deletions

File tree

scripts/traffic_simulator.py

Lines changed: 234 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,234 @@
1+
#!/usr/bin/env python3
2+
"""
3+
淘宝级别高并发流量模拟器
4+
实现二层防御架构:服务器默认防御(一层) + 本系统应用层防御(二层)
5+
"""
6+
7+
import asyncio
8+
import aiohttp
9+
import random
10+
import time
11+
import argparse
12+
from datetime import datetime
13+
from typing import Dict, Any
14+
15+
BASE_URL = "http://127.0.0.1:8000"
16+
API_ENDPOINT = "/api/v1/stats/logs"
17+
PROTECTION_STATE_ENDPOINT = "/api/v1/protection/state"
18+
19+
TRAFFIC_DISTRIBUTION = {"normal_user": 0.70, "crawler": 0.12, "bot": 0.08, "attacker": 0.10}
20+
ATTACK_TYPES = ["ddos", "sql_injection", "xss", "brute_force", "probe", "phishing", "botnet", "web_attack"]
21+
TAOBAO_URLS = ["/", "/search", "/item/detail", "/cart", "/order/confirm", "/pay", "/user/login", "/api/search"]
22+
NORMAL_UAS = ["Mozilla/5.0 Chrome/120.0.0.0", "Mozilla/5.0 iPhone Safari", "Mozilla/5.0 Mac Safari"]
23+
BOT_UAS = ["Googlebot/2.1", "python-requests/2.28", "scrapy/2.8", "", "Java/1.8"]
24+
25+
GEO = {"CN": (0.75, ["北京", "上海", "广州"]), "US": (0.10, ["New York", "LA"]), "XX": (0.15, ["Unknown"])}
26+
27+
28+
def gen_ip(geo="CN"):
29+
prefixes = {"CN": ["116.25", "180.101"], "US": ["8.8", "104.16"], "XX": ["185.220", "45.33"]}
30+
p = random.choice(prefixes.get(geo, prefixes["CN"]))
31+
return f"{p}.{random.randint(1,254)}.{random.randint(1,254)}"
32+
33+
34+
def select_geo():
35+
r, c = random.random(), 0
36+
for code, (w, cities) in GEO.items():
37+
c += w
38+
if r <= c: return code, random.choice(cities)
39+
return "CN", "北京"
40+
41+
42+
def generate_traffic(protection_active: bool, protection_level: str) -> Dict[str, Any]:
43+
"""
44+
二层防御逻辑:
45+
- 一层(服务器默认): 对明显恶意流量(critical级别)进行拦截
46+
- 二层(本系统): 仅在protection_active时启用,对中高风险进行二次筛选
47+
"""
48+
r, cumulative, traffic_type = random.random(), 0, "normal_user"
49+
for t, w in TRAFFIC_DISTRIBUTION.items():
50+
cumulative += w
51+
if r <= cumulative:
52+
traffic_type = t
53+
break
54+
55+
geo, city = select_geo()
56+
is_threat = traffic_type == "attacker" or (traffic_type == "bot" and random.random() < 0.6) or \
57+
(traffic_type == "crawler" and random.random() < 0.3)
58+
59+
data = {
60+
"source_ip": gen_ip("XX" if is_threat else geo),
61+
"source_port": random.randint(10000, 65535),
62+
"dest_ip": "10.0.0.1",
63+
"dest_port": random.choice([80, 443]),
64+
"protocol": "HTTPS",
65+
"method": random.choice(["GET", "POST"]),
66+
"url": random.choice(TAOBAO_URLS),
67+
"geo_country": "XX" if is_threat else geo,
68+
"geo_city": "Unknown" if is_threat else city,
69+
"processing_time_ms": random.uniform(5, 50),
70+
}
71+
72+
if is_threat:
73+
attack = random.choice(ATTACK_TYPES)
74+
risk_score = random.uniform(0.5, 1.0)
75+
data.update({
76+
"threat_type": attack,
77+
"risk_score": risk_score,
78+
"user_agent": random.choice(BOT_UAS),
79+
})
80+
81+
# 根据风险分数确定风险等级
82+
if risk_score >= 0.85:
83+
data["risk_level"] = "critical"
84+
elif risk_score >= 0.7:
85+
data["risk_level"] = "high"
86+
elif risk_score >= 0.5:
87+
data["risk_level"] = "medium"
88+
else:
89+
data["risk_level"] = "low"
90+
91+
# === 二层防御逻辑 ===
92+
# 一层(服务器默认防御): 只拦截critical级别的明显攻击
93+
if data["risk_level"] == "critical":
94+
data["action"] = "block"
95+
data["blocked_by"] = "server_default" # 服务器一层拦截
96+
elif protection_active:
97+
# 二层(本系统): 根据保护级别进行二次筛选
98+
if protection_level == "strict":
99+
data["action"] = "block"
100+
data["blocked_by"] = "second_layer"
101+
elif protection_level == "high" and data["risk_level"] in ["high", "medium"]:
102+
data["action"] = "block"
103+
data["blocked_by"] = "second_layer"
104+
elif protection_level == "medium" and data["risk_level"] == "high":
105+
data["action"] = "block"
106+
data["blocked_by"] = "second_layer"
107+
elif protection_level == "low":
108+
data["action"] = "log" # 低级只记录
109+
data["blocked_by"] = "none"
110+
else:
111+
data["action"] = random.choice(["challenge", "alert"])
112+
data["blocked_by"] = "none"
113+
else:
114+
# 保护未开启: 非critical的威胁只记录不拦截
115+
data["action"] = "log"
116+
data["blocked_by"] = "none"
117+
else:
118+
data.update({
119+
"threat_type": "benign",
120+
"risk_score": random.uniform(0.0, 0.15),
121+
"risk_level": "safe",
122+
"user_agent": random.choice(NORMAL_UAS),
123+
"action": "allow",
124+
"blocked_by": "none",
125+
})
126+
127+
return data
128+
129+
130+
class TrafficSimulator:
131+
"""二层防御架构流量模拟器"""
132+
133+
def __init__(self, base_url: str = BASE_URL, concurrency: int = 500):
134+
self.base_url = base_url
135+
self.concurrency = concurrency
136+
self.total_sent = 0
137+
self.success_count = 0
138+
self.start_time = None
139+
self.protection_active = False
140+
self.protection_level = "medium"
141+
# 统计
142+
self.server_blocked = 0 # 一层拦截
143+
self.second_layer_blocked = 0 # 二层拦截
144+
self.threats_logged = 0 # 仅记录的威胁
145+
146+
async def check_protection_state(self, session: aiohttp.ClientSession):
147+
try:
148+
async with session.get(f"{self.base_url}{PROTECTION_STATE_ENDPOINT}", timeout=aiohttp.ClientTimeout(total=5)) as resp:
149+
if resp.status == 200:
150+
data = await resp.json()
151+
self.protection_active = data.get("is_active", False)
152+
self.protection_level = data.get("level", "medium")
153+
except: pass
154+
155+
async def send_log(self, session: aiohttp.ClientSession, log_data: Dict) -> bool:
156+
try:
157+
# 统计拦截来源
158+
if log_data.get("action") == "block":
159+
if log_data.get("blocked_by") == "server_default":
160+
self.server_blocked += 1
161+
elif log_data.get("blocked_by") == "second_layer":
162+
self.second_layer_blocked += 1
163+
elif log_data.get("threat_type") != "benign" and log_data.get("action") == "log":
164+
self.threats_logged += 1
165+
166+
async with session.post(f"{self.base_url}{API_ENDPOINT}", json=log_data, timeout=aiohttp.ClientTimeout(total=10)) as resp:
167+
if resp.status == 200:
168+
self.success_count += 1
169+
return True
170+
return False
171+
except: return False
172+
173+
async def send_batch(self, session: aiohttp.ClientSession, batch_size: int):
174+
tasks = [self.send_log(session, generate_traffic(self.protection_active, self.protection_level)) for _ in range(batch_size)]
175+
await asyncio.gather(*tasks, return_exceptions=True)
176+
self.total_sent += batch_size
177+
178+
def print_progress(self, total_target: int):
179+
elapsed = time.time() - self.start_time
180+
rate = self.total_sent / elapsed if elapsed > 0 else 0
181+
pct = (self.total_sent / total_target) * 100
182+
status = f"二层防御开启({self.protection_level})" if self.protection_active else "仅一层防御"
183+
print(f"\r[{datetime.now().strftime('%H:%M:%S')}] {status} | 进度:{pct:.1f}% | 一层拦截:{self.server_blocked} | 二层拦截:{self.second_layer_blocked} | 仅记录:{self.threats_logged} | 速率:{rate:.0f}/s", end="", flush=True)
184+
185+
async def run(self, total_requests: int, batch_size: int = 100):
186+
print(f"\n{'='*70}")
187+
print(f" 淘宝级流量模拟器 - 二层防御架构")
188+
print(f" 目标: {total_requests:,} 请求 | 并发: {self.concurrency}")
189+
print(f"{'='*70}")
190+
print("\n防御架构说明:")
191+
print(" 一层(服务器默认): 拦截critical级别的明显攻击")
192+
print(" 二层(本系统): 开启后对中高风险进行二次筛选\n")
193+
194+
self.start_time = time.time()
195+
connector = aiohttp.TCPConnector(limit=self.concurrency)
196+
197+
async with aiohttp.ClientSession(connector=connector) as session:
198+
await self.check_protection_state(session)
199+
print(f"当前状态: {'二层防御已开启 - ' + self.protection_level if self.protection_active else '二层防御未开启(仅一层默认防御)'}\n")
200+
201+
batches = total_requests // batch_size
202+
check_interval = max(1, batches // 20)
203+
204+
for i in range(batches):
205+
await self.send_batch(session, batch_size)
206+
self.print_progress(total_requests)
207+
if i % check_interval == 0:
208+
await self.check_protection_state(session)
209+
210+
if total_requests % batch_size > 0:
211+
await self.send_batch(session, total_requests % batch_size)
212+
213+
elapsed = time.time() - self.start_time
214+
print(f"\n\n{'='*70}")
215+
print(f" 完成! 耗时:{elapsed:.1f}s | 速率:{self.total_sent/elapsed:.0f}/s")
216+
print(f" 一层拦截:{self.server_blocked} | 二层拦截:{self.second_layer_blocked} | 仅记录:{self.threats_logged}")
217+
print(f"{'='*70}\n")
218+
219+
220+
async def main():
221+
parser = argparse.ArgumentParser()
222+
parser.add_argument("-n", "--requests", type=int, default=1000000)
223+
parser.add_argument("-c", "--concurrency", type=int, default=500)
224+
parser.add_argument("-b", "--batch", type=int, default=100)
225+
parser.add_argument("--url", type=str, default=BASE_URL)
226+
args = parser.parse_args()
227+
await TrafficSimulator(args.url, args.concurrency).run(args.requests, args.batch)
228+
229+
230+
if __name__ == "__main__":
231+
print("\n启动流量模拟器 - 二层防御架构测试")
232+
print("仪表盘: http://127.0.0.1:8000/dashboard")
233+
print("一键保护: http://127.0.0.1:8000/protection\n")
234+
asyncio.run(main())

0 commit comments

Comments
 (0)