Feature/anti dos and metrics
Implement Stage 8 (FEC Deduplication) and Stage 9 (Keepalive Feedback) + Security Improvements #11
…raceful shutdown in srtla_rec
…irectional keepalive feedback)
…up counter for metrics, feedback debug log
- Fix rate_limiter.h: check count BEFORE increment (was allowing limit+1 through)
- Fix rate_limiter.h: remove exact count from warn log (info leak to attackers)
- Fix stream_id_validator.h: replace O(n) vector scan with O(1) unordered_set
- Move DEDUP_CACHE_SIZE to receiver_config.h with other tunable constants
- Add dedup_count_ counter to ConnectionGroup for observability
- Add dedup_packets_discarded field to metrics JSON output
- Add debug log showing keepalive feedback values (weight/errors/status)
Walkthrough
This PR implements a comprehensive anti-DoS layer with stream ID validation, IP-based rate limiting, duplicate packet detection, adaptive connection timeouts, and JSON metrics export. It adds signal-driven hot-reload of stream ID allowlists and graceful shutdown signaling.
Changes
Anti-DoS and Stream ID Validation with Metrics Reporting
Sequence Diagram
sequenceDiagram
participant Client
participant SRTLAHandler
participant RateLimiter
participant StreamIdValidator
participant ConnectionGroup
Client->>SRTLAHandler: REG or data packet
alt is registration
SRTLAHandler->>RateLimiter: allow(ip, now)
RateLimiter-->>SRTLAHandler: pass/fail
SRTLAHandler->>SRTLAHandler: count_pending_groups()
SRTLAHandler-->>Client: error if over limit
else is data (unauthenticated group)
SRTLAHandler->>StreamIdValidator: try_authenticate_group()
StreamIdValidator->>StreamIdValidator: extract_stream_id(buf)
StreamIdValidator->>ConnectionGroup: set_authenticated(true)
end
alt SRT packet with sequence number
SRTLAHandler->>ConnectionGroup: is_duplicate_srt_packet(sn)
ConnectionGroup-->>SRTLAHandler: duplicate?
alt duplicate
SRTLAHandler-->>Client: discard
else not duplicate
SRTLAHandler->>SRTLAHandler: register_packet()
end
end
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
Actionable comments posted: 4
🧹 Nitpick comments (2)
src/receiver_main.cpp (1)
288-295: 💤 Low value
Pending group count duplicates SRTLAHandler::count_pending_groups().
The pending group counting logic is duplicated here and in SRTLAHandler::count_pending_groups(). Consider reusing the handler method for consistency.
♻️ Proposed refactor to reuse existing method
  // Metrics: Write JSON metrics file
  if (metrics_writer) {
-     std::size_t pending = 0;
-     for (const auto &g : registry.groups()) {
-         if (!g->is_authenticated()) pending++;
-     }
+     std::size_t pending = srtla_handler.count_pending_groups();
      metrics_writer->write(registry, srtla_handler.rate_limiter(), pending, ts);
  }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/receiver_main.cpp` around lines 288 - 295, The pending group counting loop duplicates SRTLAHandler::count_pending_groups(); replace the manual loop that computes `pending` over `registry.groups()` with a call to `srtla_handler.count_pending_groups()` and pass that value into `metrics_writer->write(registry, srtla_handler.rate_limiter(), pending, ts)` (retain use of `metrics_writer`, `registry`, `srtla_handler.rate_limiter()`, and `ts`) so the handler's single-source-of-truth is reused.
src/metrics/metrics_writer.h (1)
131-145: 💤 Low value
escape_json does not escape all JSON control characters.
The JSON spec requires escaping all control characters (U+0000 through U+001F). Characters like \x00–\x08, \x0B, \x0C, \x0E–\x1F would produce invalid JSON if present in stream_id. Consider escaping them as \uXXXX.
♻️ Proposed enhancement for full JSON compliance
  static std::string escape_json(const std::string &s) {
      std::string result;
      result.reserve(s.size());
      for (char c : s) {
+         unsigned char uc = static_cast<unsigned char>(c);
          switch (c) {
              case '"': result += "\\\""; break;
              case '\\': result += "\\\\"; break;
              case '\n': result += "\\n"; break;
              case '\r': result += "\\r"; break;
              case '\t': result += "\\t"; break;
-             default: result += c; break;
+             default:
+                 if (uc < 0x20) {
+                     char buf[8];
+                     snprintf(buf, sizeof(buf), "\\u%04x", uc);
+                     result += buf;
+                 } else {
+                     result += c;
+                 }
+                 break;
          }
      }
      return result;
  }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/metrics/metrics_writer.h` around lines 131 - 145, The escape_json function currently only handles a subset of escapes; update escape_json to also convert all control characters U+0000 through U+001F into JSON-style unicode escapes (e.g. "\u00XX") while preserving the existing special-case escapes for '"' '\\' '\n' '\r' '\t'; implement this by checking if (unsigned char)c < 0x20 and appending a "\u00" + two-hex-digit representation of the byte (using the function's characters/loop and result string) so characters like NUL, BEL, VT, FF, and others are emitted as \u00XX and produce valid JSON.
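To make the escaping rule concrete, here is a standalone sketch of an escape function that follows the suggestion above. It is a free function written for illustration under the assumption that the input is treated byte by byte; it is not the PR's final code, and bytes at or above 0x80 (for example UTF-8 sequences) are passed through unchanged.

```cpp
#include <cstdio>
#include <string>

// Escapes a string for use inside a JSON string literal.
// Illustrative sketch: control bytes below 0x20 become \u00XX escapes.
std::string escape_json(const std::string &s) {
    std::string result;
    result.reserve(s.size());
    for (char c : s) {
        const unsigned char uc = static_cast<unsigned char>(c);
        switch (c) {
        case '"':  result += "\\\""; break;
        case '\\': result += "\\\\"; break;
        case '\n': result += "\\n";  break;
        case '\r': result += "\\r";  break;
        case '\t': result += "\\t";  break;
        default:
            if (uc < 0x20) {
                // "\uXXXX" is 6 characters plus the terminating NUL.
                char buf[8];
                std::snprintf(buf, sizeof(buf), "\\u%04x",
                              static_cast<unsigned>(uc));
                result += buf;
            } else {
                result += c;
            }
            break;
        }
    }
    return result;
}
```

With this shape, a stream ID containing a stray control byte still yields a parseable metrics file, since every byte below 0x20 is emitted in escaped form.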
📒 Files selected for processing (10)
- src/connection/connection_group.cpp
- src/connection/connection_group.h
- src/connection/connection_registry.cpp
- src/metrics/metrics_writer.h
- src/protocol/srtla_handler.cpp
- src/protocol/srtla_handler.h
- src/receiver_config.h
- src/receiver_main.cpp
- src/security/rate_limiter.h
- src/security/stream_id_validator.h
// Adaptive group timeout:
// - Unauthenticated groups (never validated StreamID): fast 5s timeout
// - Authenticated groups with no connections: standard 30s timeout
int timeout = group->is_authenticated() ? GROUP_TIMEOUT : PENDING_GROUP_TIMEOUT;
if (connections.empty() && (group->created_at() + timeout) < current_time) {
    group_it = groups_.erase(group_it);
Pending-group timeout is bypassed while a connection remains attached
At Line 158, PENDING_GROUP_TIMEOUT is only enforced when connections.empty(). That allows unauthenticated groups to outlive the 5s pending window, weakening the anti-DoS contract.
Suggested fix
- int timeout = group->is_authenticated() ? GROUP_TIMEOUT : PENDING_GROUP_TIMEOUT;
- if (connections.empty() && (group->created_at() + timeout) < current_time) {
+ const bool pending_expired =
+ !group->is_authenticated() &&
+ (group->created_at() + PENDING_GROUP_TIMEOUT) < current_time;
+
+ const bool authenticated_empty_expired =
+ group->is_authenticated() &&
+ connections.empty() &&
+ (group->created_at() + GROUP_TIMEOUT) < current_time;
+
+ if (pending_expired || authenticated_empty_expired) {
group_it = groups_.erase(group_it);
removed_groups++;
- spdlog::info("[Group: {}] Group removed ({}, timeout={}s)",
+ spdlog::info("[Group: {}] Group removed ({}, timeout={}s)",
static_cast<void *>(group.get()),
group->is_authenticated() ? "no connections" : "unauthenticated",
- timeout);
+ group->is_authenticated() ? GROUP_TIMEOUT : PENDING_GROUP_TIMEOUT);
Also applies to: 161-164
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/connection/connection_registry.cpp` around lines 154 - 159, Current logic
only applies PENDING_GROUP_TIMEOUT when connections.empty(), allowing
unauthenticated groups to persist; change the removal condition so pending
timeout is enforced regardless of attached connections: replace the single
timeout variable and check with an explicit branch—if !group->is_authenticated()
then remove when (group->created_at() + PENDING_GROUP_TIMEOUT) < current_time;
else (group->is_authenticated()) remove only when connections.empty() &&
(group->created_at() + GROUP_TIMEOUT) < current_time. Update the condition
around groups_.erase(group_it) (and the analogous block at the other location)
to implement this logic, using group->is_authenticated(), group->created_at(),
PENDING_GROUP_TIMEOUT, GROUP_TIMEOUT, connections.empty(), and
groups_.erase(group_it).
std::ofstream out(tmp_filepath_);
if (!out.is_open()) {
    spdlog::warn("[metrics] Cannot open metrics file: {}", tmp_filepath_);
    return;
}
Check for write errors before atomic rename.
If any write operation fails (disk full, I/O error), the stream's failbit will be set, but the code proceeds to rename() anyway, potentially replacing a valid metrics file with a truncated/corrupted one. Check the stream state before renaming.
🛡️ Proposed fix: check stream state before rename
out << "\n ]\n";
out << "}\n";
out.close();
+ if (out.fail()) {
+ spdlog::warn("[metrics] Write error to metrics file: {}", tmp_filepath_);
+ std::remove(tmp_filepath_.c_str());
+ return;
+ }
+
// Atomic rename: guarantees readers never see partial JSON
std::rename(tmp_filepath_.c_str(), filepath_.c_str());
Also applies to: 124-127
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/metrics/metrics_writer.h` around lines 51 - 55, The code opens an
ofstream named out(tmp_filepath_) and calls rename() even if writes failed;
update the write/flush/close sequence to detect stream errors before performing
atomic rename by checking out.fail() or out.bad() (and/or checking out.flush()
result) after all writes and before calling rename(); if the stream indicates
failure, log a warning with tmp_filepath_ and abort the rename. Apply the same
check-and-abort logic to the other occurrence referenced (the block around the
second ofstream usage at lines ~124-127) so both write paths verify stream state
before moving the temporary file into place.
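As a rough illustration of the write, check, then rename sequence, the sketch below uses a hypothetical helper `write_file_atomically` and an assumed `.tmp` suffix; neither name comes from the PR. It relies on `std::ofstream::close()` setting `failbit` when the final flush fails, and on `std::rename` replacing an existing target, which holds on POSIX systems but is not guaranteed everywhere.

```cpp
#include <cstdio>
#include <fstream>
#include <string>

// Writes `contents` to a temporary file and renames it over `path`
// only if every write succeeded. Returns true when `path` was replaced.
bool write_file_atomically(const std::string &path,
                           const std::string &contents) {
    const std::string tmp_path = path + ".tmp";  // assumed naming scheme
    std::ofstream out(tmp_path, std::ios::trunc);
    if (!out.is_open()) {
        return false;
    }
    out << contents;
    out.close();  // flushes; a failed flush sets failbit
    if (out.fail()) {
        // Keep the previous good file and drop the partial one.
        std::remove(tmp_path.c_str());
        return false;
    }
    return std::rename(tmp_path.c_str(), path.c_str()) == 0;
}
```

Checking the stream after `close()` rather than before it also catches errors that only surface when buffered data is flushed to disk.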
int32_t sn = get_srt_sn(const_cast<char *>(buf), n);
bool is_dup = false;
if (sn >= 0) {
    is_dup = group->is_duplicate_srt_packet(sn);
    register_packet(group, conn, sn);
}

if (is_dup) {
    spdlog::trace("[Group: {}] Duplicate SRT packet (SN: {}) discarded",
                  static_cast<void *>(group.get()), sn);
    return;
}
Duplicate packet check happens after register_packet is called.
The call to register_packet(group, conn, sn) on line 183 runs unconditionally, before the early return for duplicates on line 186. This means duplicates are still registered in recv_log and can trigger SRTLA ACKs, defeating the purpose of early discard. Move the duplicate check and its early return before registration.
🐛 Proposed fix: check for duplicate before registering
int32_t sn = get_srt_sn(const_cast<char *>(buf), n);
- bool is_dup = false;
if (sn >= 0) {
- is_dup = group->is_duplicate_srt_packet(sn);
+ if (group->is_duplicate_srt_packet(sn)) {
+ spdlog::trace("[Group: {}] Duplicate SRT packet (SN: {}) discarded",
+ static_cast<void *>(group.get()), sn);
+ return;
+ }
register_packet(group, conn, sn);
}
- if (is_dup) {
- spdlog::trace("[Group: {}] Duplicate SRT packet (SN: {}) discarded",
- static_cast<void *>(group.get()), sn);
- return;
- }
-
if (!srt_handler_.forward_to_srt_server(group, buf, n)) {
return;
}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/protocol/srtla_handler.cpp` around lines 179 - 190, The duplicate check
currently happens after register_packet, causing duplicates to be recorded and
potentially trigger SRTLA ACKs; reorder logic so you call
group->is_duplicate_srt_packet(sn) before register_packet(group, conn, sn) and
only call register_packet when is_dup is false. Locate get_srt_sn(...) to obtain
sn, then perform is_dup = group->is_duplicate_srt_packet(sn) and if not
duplicate call register_packet; keep the existing spdlog trace and early return
when duplicates are detected to avoid populating recv_log or generating ACKs.
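The ordering can be sketched with a small bounded cache of recent sequence numbers. Everything here is an assumption made for illustration: `DedupCache`, its FIFO eviction policy, and `accept_packet` are hypothetical and do not describe how `ConnectionGroup::is_duplicate_srt_packet` is actually implemented.

```cpp
#include <cstddef>
#include <cstdint>
#include <deque>
#include <unordered_set>

// Remembers the most recent `capacity` sequence numbers (FIFO eviction).
class DedupCache {
public:
    explicit DedupCache(std::size_t capacity) : capacity_(capacity) {}

    // Returns true if `sn` is still cached; otherwise records it.
    bool is_duplicate(std::int32_t sn) {
        if (seen_.count(sn) != 0) {
            ++discarded_;
            return true;
        }
        if (capacity_ == 0) {
            return false;  // nothing can be remembered
        }
        if (order_.size() == capacity_) {
            seen_.erase(order_.front());  // forget the oldest entry
            order_.pop_front();
        }
        order_.push_back(sn);
        seen_.insert(sn);
        return false;
    }

    std::size_t discarded() const { return discarded_; }

private:
    std::size_t capacity_;
    std::deque<std::int32_t> order_;
    std::unordered_set<std::int32_t> seen_;
    std::size_t discarded_ = 0;
};

// Hypothetical receive path: true means "register and forward".
bool accept_packet(DedupCache &cache, std::int32_t sn) {
    if (sn >= 0 && cache.is_duplicate(sn)) {
        return false;  // discard before any registration or forwarding
    }
    return true;
}
```

Because the duplicate test returns before anything else runs, a repeated sequence number never reaches the registration step; packets without a sequence number (negative `sn`) bypass the cache entirely.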
if (entry.count >= REG1_RATE_LIMIT) {
    spdlog::warn("[security] Rate limit exceeded for IP {}", ip);
    return false;
Throttle deny-path logging to avoid log-flood amplification.
Line 47 logs on every blocked request. Under sustained abuse, this can become a secondary DoS vector (CPU/disk/log pipeline pressure), even though the offending packets themselves are dropped.
Suggested fix
struct RateEntry {
int count = 0;
time_t window_start = 0;
+ bool limit_logged = false;
};
@@
if ((now - entry.window_start) >= REG1_RATE_WINDOW) {
entry.count = 0;
entry.window_start = now;
+ entry.limit_logged = false;
}
if (entry.count >= REG1_RATE_LIMIT) {
- spdlog::warn("[security] Rate limit exceeded for IP {}", ip);
+ if (!entry.limit_logged) {
+ spdlog::warn("[security] Rate limit exceeded for IP {}", ip);
+ entry.limit_logged = true;
+ }
return false;
}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/security/rate_limiter.h` around lines 46 - 48, The deny-path currently
calls spdlog::warn on every blocked request (when entry.count >=
REG1_RATE_LIMIT), causing log flooding; modify the rate-limiter to throttle
these logs by storing and checking a per-IP timestamp or counter in the entry
(e.g., add a field like last_warn_ts or warn_suppressed_count to the
RateLimiterEntry), only call spdlog::warn when the last_warn_ts is older than a
configurable interval (or when a sampled threshold is reached), and update that
field when you log; keep the existing return false behavior and reference
entry.count, REG1_RATE_LIMIT and the spdlog::warn call when implementing the
change.
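A self-contained sketch of a fixed-window limiter that combines the check-before-increment fix with once-per-window logging might look like the following. The limit and window values are made up for the example, and a counter stands in for the `spdlog::warn` call so the block has no external dependencies; the real `rate_limiter.h` may differ.

```cpp
#include <ctime>
#include <string>
#include <unordered_map>

constexpr int REG1_RATE_LIMIT = 5;            // hypothetical value
constexpr std::time_t REG1_RATE_WINDOW = 10;  // hypothetical value, seconds

class RateLimiter {
public:
    // Returns true if the request from `ip` at time `now` is allowed.
    bool allow(const std::string &ip, std::time_t now) {
        RateEntry &entry = entries_[ip];
        if (now - entry.window_start >= REG1_RATE_WINDOW) {
            entry.count = 0;
            entry.window_start = now;
            entry.limit_logged = false;
        }
        // Check BEFORE incrementing so exactly REG1_RATE_LIMIT pass.
        if (entry.count >= REG1_RATE_LIMIT) {
            if (!entry.limit_logged) {
                ++warnings_emitted_;  // stand-in for one warn log per window
                entry.limit_logged = true;
            }
            return false;
        }
        ++entry.count;
        return true;
    }

    int warnings_emitted() const { return warnings_emitted_; }

private:
    struct RateEntry {
        int count = 0;
        std::time_t window_start = 0;
        bool limit_logged = false;
    };
    std::unordered_map<std::string, RateEntry> entries_;
    int warnings_emitted_ = 0;
};
```

A per-entry flag keeps log volume bounded at one line per IP per window. A timestamp-based throttle, as the prompt above suggests, would allow a configurable interval instead.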
….cpp as suggested by CodeRabbitAI
This PR implements Stage 8 (selective packet deduplication/FEC) and Stage 9 (bidirectional keepalive feedback) for srtla_rec, along with several security and stability improvements:
- … ConnectionGroup to discard duplicate packets without forwarding them to the SRT server, while still acknowledging them to keep the connection alive.
- … std::unordered_set with dynamic configuration hot-reload via SIGHUP.
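For the allowlist lookup and reload mentioned in the description, a minimal sketch could look like this. `StreamIdAllowlist` and the one-ID-per-line input format are assumptions made for the example; in a real receiver the SIGHUP handler would typically only set a flag, with the main loop calling `reload` afterwards.

```cpp
#include <cstddef>
#include <istream>
#include <sstream>
#include <string>
#include <unordered_set>
#include <utility>

// Holds allowed stream IDs with average O(1) lookup.
class StreamIdAllowlist {
public:
    // Replaces the list with one ID per non-empty line of `in`.
    void reload(std::istream &in) {
        std::unordered_set<std::string> fresh;
        std::string line;
        while (std::getline(in, line)) {
            if (!line.empty() && line.back() == '\r') {
                line.pop_back();  // tolerate CRLF line endings
            }
            if (!line.empty()) {
                fresh.insert(line);
            }
        }
        ids_ = std::move(fresh);  // swap in only after a full parse
    }

    bool is_allowed(const std::string &stream_id) const {
        return ids_.count(stream_id) != 0;
    }

    std::size_t size() const { return ids_.size(); }

private:
    std::unordered_set<std::string> ids_;
};
```

Building the new set separately and moving it into place means lookups never observe a half-loaded list in single-threaded use.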