Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/connection/connection_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ ConnectionGroup::ConnectionGroup(const char *client_id, time_t timestamp)
std::copy(random_bytes,
random_bytes + (SRTLA_ID_LEN / 2),
id_.begin() + (SRTLA_ID_LEN / 2));

// Initialize duplicate packet cache for FEC/selective duplication
dedup_cache_.fill(-1);
}

ConnectionGroup::~ConnectionGroup() {
Expand Down
78 changes: 53 additions & 25 deletions src/connection/connection_group.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include <array>
#include <array>
#include <string>
#include <memory>
#include <unordered_map>
#include <vector>
Expand Down Expand Up @@ -50,32 +51,59 @@ class ConnectionGroup {
bool load_balancing_enabled() const { return load_balancing_enabled_; }
void set_load_balancing_enabled(bool enabled) { load_balancing_enabled_ = enabled; }


std::unordered_map<uint64_t, NakHashEntry> &nak_cache() { return nak_seen_hash_; }

std::vector<struct sockaddr_storage> get_client_addresses() const;
void write_socket_info_file() const;
void remove_socket_info_file() const;

void set_epoll_fd(int fd) { epoll_fd_ = fd; }

private:
std::array<char, SRTLA_ID_LEN> id_ {};
std::vector<ConnectionPtr> conns_;
time_t created_at_ = 0;
int srt_sock_ = -1;
struct sockaddr_storage last_addr_ {};


std::unordered_map<uint64_t, NakHashEntry> &nak_cache() { return nak_seen_hash_; }

std::vector<struct sockaddr_storage> get_client_addresses() const;
void write_socket_info_file() const;
void remove_socket_info_file() const;

void set_epoll_fd(int fd) { epoll_fd_ = fd; }

// Anti-DoS: StreamID authentication state
bool is_authenticated() const { return authenticated_; }
void set_authenticated(bool auth) { authenticated_ = auth; }
const std::string &stream_id() const { return stream_id_; }
void set_stream_id(const std::string &sid) { stream_id_ = sid; }

// Selective duplication / FEC deduplication
bool is_duplicate_srt_packet(int32_t sn) {
if (sn < 0) return false;
std::size_t idx = static_cast<uint32_t>(sn) % DEDUP_CACHE_SIZE;
if (dedup_cache_[idx] == sn) {
dedup_count_++;
return true;
}
dedup_cache_[idx] = sn;
return false;
}
uint64_t dedup_count() const { return dedup_count_; }

private:
std::array<char, SRTLA_ID_LEN> id_ {};
std::vector<ConnectionPtr> conns_;
time_t created_at_ = 0;
int srt_sock_ = -1;
struct sockaddr_storage last_addr_ {};

uint64_t total_target_bandwidth_ = 0;
time_t last_quality_eval_ = 0;
time_t last_load_balance_eval_ = 0;
bool load_balancing_enabled_ = true;


std::unordered_map<uint64_t, NakHashEntry> nak_seen_hash_;
int epoll_fd_ = -1;
};

using ConnectionGroupPtr = std::shared_ptr<ConnectionGroup>;

} // namespace srtla::connection

std::unordered_map<uint64_t, NakHashEntry> nak_seen_hash_;
int epoll_fd_ = -1;

// Anti-DoS state
bool authenticated_ = false;
std::string stream_id_;

// Selective duplication / FEC deduplication cache
std::array<int32_t, DEDUP_CACHE_SIZE> dedup_cache_;
uint64_t dedup_count_ = 0;
};

using ConnectionGroupPtr = std::shared_ptr<ConnectionGroup>;

} // namespace srtla::connection
17 changes: 14 additions & 3 deletions src/connection/connection_registry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,11 @@ bool addresses_equal(const struct sockaddr_storage &a, const struct sockaddr_sto
}

bool conn_timed_out(const ConnectionPtr &conn, time_t ts) {
return (conn->last_received() + CONN_TIMEOUT) < ts;
// Adaptive timeout: connections that have successfully transmitted data
// get a longer grace period (30s) for IRL scenarios (tunnels, dead zones).
// New connections that never transmitted keep the default 15s timeout.
int timeout = (conn->stats().packets_received > 0) ? CONN_TIMEOUT_ACTIVE : CONN_TIMEOUT;
return (conn->last_received() + timeout) < ts;
}

} // namespace
Expand Down Expand Up @@ -147,10 +151,17 @@ void ConnectionRegistry::cleanup_inactive(time_t current_time,
}
}

if (connections.empty() && (group->created_at() + GROUP_TIMEOUT) < current_time) {
// Adaptive group timeout:
// - Unauthenticated groups (never validated StreamID): fast 5s timeout
// - Authenticated groups with no connections: standard 30s timeout
int timeout = group->is_authenticated() ? GROUP_TIMEOUT : PENDING_GROUP_TIMEOUT;
if (connections.empty() && (group->created_at() + timeout) < current_time) {
group_it = groups_.erase(group_it);
Comment on lines +154 to 159

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Pending-group timeout is bypassed while a connection remains attached

At Line 158, PENDING_GROUP_TIMEOUT is only enforced when connections.empty(). That allows unauthenticated groups to outlive the 5s pending window, weakening the anti-DoS contract.

Suggested fix
-        int timeout = group->is_authenticated() ? GROUP_TIMEOUT : PENDING_GROUP_TIMEOUT;
-        if (connections.empty() && (group->created_at() + timeout) < current_time) {
+        const bool pending_expired =
+            !group->is_authenticated() &&
+            (group->created_at() + PENDING_GROUP_TIMEOUT) < current_time;
+
+        const bool authenticated_empty_expired =
+            group->is_authenticated() &&
+            connections.empty() &&
+            (group->created_at() + GROUP_TIMEOUT) < current_time;
+
+        if (pending_expired || authenticated_empty_expired) {
             group_it = groups_.erase(group_it);
             removed_groups++;
-            spdlog::info("[Group: {}] Group removed ({}, timeout={}s)",
+            spdlog::info("[Group: {}] Group removed ({}, timeout={}s)",
                          static_cast<void *>(group.get()),
                          group->is_authenticated() ? "no connections" : "unauthenticated",
-                         timeout);
+                         group->is_authenticated() ? GROUP_TIMEOUT : PENDING_GROUP_TIMEOUT);

Also applies to: 161-164

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/connection/connection_registry.cpp` around lines 154 - 159, Current logic
only applies PENDING_GROUP_TIMEOUT when connections.empty(), allowing
unauthenticated groups to persist; change the removal condition so pending
timeout is enforced regardless of attached connections: replace the single
timeout variable and check with an explicit branch—if !group->is_authenticated()
then remove when (group->created_at() + PENDING_GROUP_TIMEOUT) < current_time;
else (group->is_authenticated()) remove only when connections.empty() &&
(group->created_at() + GROUP_TIMEOUT) < current_time. Update the condition
around groups_.erase(group_it) (and the analogous block at the other location)
to implement this logic, using group->is_authenticated(), group->created_at(),
PENDING_GROUP_TIMEOUT, GROUP_TIMEOUT, connections.empty(), and
groups_.erase(group_it).

removed_groups++;
spdlog::info("[Group: {}] Group removed (no connections)", static_cast<void *>(group.get()));
spdlog::info("[Group: {}] Group removed ({}, timeout={}s)",
static_cast<void *>(group.get()),
group->is_authenticated() ? "no connections" : "unauthenticated",
timeout);
} else {
if (before_conns != connections.size()) {
group->write_socket_info_file();
Expand Down
154 changes: 154 additions & 0 deletions src/metrics/metrics_writer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
#pragma once

/*
srtla_rec - JSON Metrics Writer

Periodically writes per-group and per-connection metrics to a JSON file.
The Go Manager (StreamStudio) or any monitoring tool can read this file
to display real-time connection stats in the web UI.

Copyright (C) 2025 IRLServer.com

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
*/

#include <cstdio>
#include <ctime>
#include <fstream>
#include <string>

#include <spdlog/spdlog.h>

#include "../connection/connection_registry.h"
#include "../security/rate_limiter.h"

extern "C" {
#include "../common.h"
}

namespace srtla::metrics {

class MetricsWriter {
public:
explicit MetricsWriter(const std::string &filepath)
: filepath_(filepath), tmp_filepath_(filepath + ".tmp") {}

/// Write current metrics to the JSON file.
/// Uses write-to-temp + rename for atomic updates (no partial reads).
void write(const connection::ConnectionRegistry &registry,
const security::RateLimiter &rate_limiter,
std::size_t pending_groups,
time_t now) {
// Throttle writes to every METRICS_WRITE_PERIOD seconds
if ((now - last_write_) < METRICS_WRITE_PERIOD) {
return;
}
last_write_ = now;

std::ofstream out(tmp_filepath_);
if (!out.is_open()) {
spdlog::warn("[metrics] Cannot open metrics file: {}", tmp_filepath_);
return;
}
Comment on lines +51 to +55

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Check for write errors before atomic rename.

If any write operation fails (disk full, I/O error), the stream's failbit will be set, but the code proceeds to rename() anyway, potentially replacing a valid metrics file with a truncated/corrupted one. Check the stream state before renaming.

🛡️ Proposed fix: check stream state before rename
         out << "\n  ]\n";
         out << "}\n";
         out.close();

+        if (out.fail()) {
+            spdlog::warn("[metrics] Write error to metrics file: {}", tmp_filepath_);
+            std::remove(tmp_filepath_.c_str());
+            return;
+        }
+
         // Atomic rename: guarantees readers never see partial JSON
         std::rename(tmp_filepath_.c_str(), filepath_.c_str());

Also applies to: 124-127

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/metrics/metrics_writer.h` around lines 51 - 55, The code opens an
ofstream named out(tmp_filepath_) and calls rename() even if writes failed;
update the write/flush/close sequence to detect stream errors before performing
atomic rename by checking out.fail() or out.bad() (and/or checking out.flush()
result) after all writes and before calling rename(); if the stream indicates
failure, log a warning with tmp_filepath_ and abort the rename. Apply the same
check-and-abort logic to the other occurrence referenced (the block around the
second ofstream usage at lines ~124-127) so both write paths verify stream state
before moving the temporary file into place.


out << "{\n";
out << " \"timestamp\": " << now << ",\n";
out << " \"total_groups\": " << registry.groups().size() << ",\n";
out << " \"pending_groups\": " << pending_groups << ",\n";
out << " \"rate_limiter_entries\": " << rate_limiter.size() << ",\n";
out << " \"groups\": [\n";

bool first_group = true;
for (const auto &group : registry.groups()) {
if (!first_group) out << ",\n";
first_group = false;

// Group ID as hex string (first 16 bytes for readability)
char id_hex[33];
for (int i = 0; i < 16; i++) {
snprintf(id_hex + i * 2, 3, "%02x",
static_cast<unsigned char>(group->id()[i]));
}
id_hex[32] = '\0';

out << " {\n";
out << " \"id\": \"" << id_hex << "\",\n";
out << " \"authenticated\": " << (group->is_authenticated() ? "true" : "false") << ",\n";
out << " \"stream_id\": \"" << escape_json(group->stream_id()) << "\",\n";
out << " \"created_at\": " << group->created_at() << ",\n";
out << " \"age_seconds\": " << (now - group->created_at()) << ",\n";
out << " \"srt_socket\": " << group->srt_socket() << ",\n";
out << " \"dedup_packets_discarded\": " << group->dedup_count() << ",\n";
out << " \"connections\": [\n";

bool first_conn = true;
for (const auto &conn : group->connections()) {
if (!first_conn) out << ",\n";
first_conn = false;

auto *addr_ptr = const_cast<struct sockaddr *>(
reinterpret_cast<const struct sockaddr *>(&conn->address()));
const char *ip = print_addr(addr_ptr);
int port = port_no(addr_ptr);

const auto &stats = conn->stats();
time_t last_ago = now - conn->last_received();

out << " {\n";
out << " \"ip\": \"" << (ip ? ip : "unknown") << "\",\n";
out << " \"port\": " << port << ",\n";
out << " \"packets_received\": " << stats.packets_received << ",\n";
out << " \"bytes_received\": " << stats.bytes_received << ",\n";
out << " \"packets_lost\": " << stats.packets_lost << ",\n";
out << " \"rtt_ms\": " << stats.rtt_ms << ",\n";
out << " \"nack_count\": " << stats.nack_count << ",\n";
out << " \"weight_percent\": " << static_cast<int>(stats.weight_percent) << ",\n";
out << " \"error_points\": " << stats.error_points << ",\n";
out << " \"sender_bitrate_bps\": " << stats.sender_bitrate_bps << ",\n";
out << " \"window\": " << stats.window << ",\n";
out << " \"in_flight\": " << stats.in_flight << ",\n";
out << " \"last_received_ago_s\": " << last_ago << ",\n";
out << " \"recovery_active\": " << (conn->recovery_start() > 0 ? "true" : "false") << "\n";
out << " }";
}

out << "\n ]\n";
out << " }";
}

out << "\n ]\n";
out << "}\n";
out.close();

// Atomic rename: guarantees readers never see partial JSON
std::rename(tmp_filepath_.c_str(), filepath_.c_str());
}

private:
static std::string escape_json(const std::string &s) {
std::string result;
result.reserve(s.size());
for (char c : s) {
switch (c) {
case '"': result += "\\\""; break;
case '\\': result += "\\\\"; break;
case '\n': result += "\\n"; break;
case '\r': result += "\\r"; break;
case '\t': result += "\\t"; break;
default: result += c; break;
}
}
return result;
}

std::string filepath_;
std::string tmp_filepath_;
time_t last_write_ = 0;

static constexpr int METRICS_WRITE_PERIOD = 2; // Write every 2 seconds
};

} // namespace srtla::metrics
Loading