-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathaggregator.cpp
More file actions
122 lines (101 loc) · 3.95 KB
/
Copy pathaggregator.cpp
File metadata and controls
122 lines (101 loc) · 3.95 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
#include "aggregator.h"
#include <algorithm>
#include <cstring>
#include "log_parser.h"
void process_chunk(const Chunk& chunk, ThreadStats& s) {
const char* p = chunk.begin;
const char* const end = chunk.end;
while (p < end) {
const char* nl = static_cast<const char*>(
std::memchr(p, '\n', static_cast<std::size_t>(end - p)));
const char* line_end = (nl != nullptr) ? nl : end;
LogRecord rec;
if (parse_line(p, line_end, rec)) {
++s.total_requests;
if (rec.is_error) ++s.total_errors;
++s.endpoint_counts[rec.endpoint];
++s.per_minute[rec.minute];
++s.user_counts[rec.user_id];
s.latencies.push_back(rec.latency);
} else {
++s.skipped;
}
p = (nl != nullptr) ? nl + 1 : end;
}
}
namespace {
// Build the top-N entries (by count, descending) from a label->count map.
template <typename Map, typename ToLabel>
std::vector<LabeledCount> top_n(const Map& m, std::size_t n, ToLabel to_label) {
std::vector<LabeledCount> v;
v.reserve(m.size());
for (const auto& [key, count] : m) v.push_back({to_label(key), count});
const std::size_t keep = std::min(n, v.size());
// partial_sort: only the first `keep` need to be ordered -> O(size * log keep),
// cheaper than a full sort when keep << size.
std::partial_sort(
v.begin(), v.begin() + static_cast<std::ptrdiff_t>(keep), v.end(),
[](const LabeledCount& a, const LabeledCount& b) { return a.count > b.count; });
v.resize(keep);
return v;
}
} // namespace
namespace {
// Nearest-rank percentile of `values`: the element at sorted index
// ceil(size * pct / 100) - 1, i.e. the smallest value v such that at least
// `pct` percent of the elements are <= v. (A floor(size * pct / 100) index is
// one rank too high whenever size * pct is a multiple of 100.)
//
// Only one order statistic is needed, so std::nth_element (average O(n)) is
// used instead of a full sort; `values` is reordered as a side effect.
// Precondition: !values.empty(). `pct` is clamped into a valid rank.
std::uint32_t percentile_nearest_rank(std::vector<std::uint32_t>& values,
                                      std::size_t pct) {
  const std::size_t count = values.size();
  std::size_t rank = (count * pct + 99) / 100;  // ceil(count * pct / 100)
  if (rank == 0) rank = 1;
  if (rank > count) rank = count;
  const auto nth = values.begin() + static_cast<std::ptrdiff_t>(rank - 1);
  std::nth_element(values.begin(), nth, values.end());
  return *nth;
}
}  // namespace

// Combine per-thread partial statistics into a single Report.
//
// parts:       per-thread results; only read here (non-const reference kept to
//              match the existing declaration).
// top_n_count: maximum number of entries in rep.top_endpoints / rep.top_users.
//
// The merged endpoint/minute maps use std::string_view keys, so they borrow
// from whatever storage the ThreadStats keys refer to; every label placed in
// the Report is copied into a std::string first.
// Determinism: the peak minute is the one with the highest count, and the
// lexicographically smallest label wins among equal counts.
Report merge(std::vector<ThreadStats>& parts, std::size_t top_n_count) {
  Report rep;
  std::unordered_map<std::string_view, std::uint64_t> endpoints;
  std::unordered_map<std::string_view, std::uint64_t> minutes;
  std::unordered_map<std::uint64_t, std::uint64_t> users;
  std::vector<std::uint32_t> latencies;

  // Size the combined latency buffer once to avoid repeated reallocation.
  std::size_t total_lat = 0;
  for (const auto& part : parts) total_lat += part.latencies.size();
  latencies.reserve(total_lat);

  for (const auto& part : parts) {
    rep.total_requests += part.total_requests;
    rep.total_errors += part.total_errors;
    rep.skipped += part.skipped;
    for (const auto& [k, v] : part.endpoint_counts) endpoints[k] += v;
    for (const auto& [k, v] : part.per_minute) minutes[k] += v;
    for (const auto& [k, v] : part.user_counts) users[k] += v;
    latencies.insert(latencies.end(), part.latencies.begin(), part.latencies.end());
  }

  rep.distinct_endpoints = endpoints.size();
  rep.distinct_minutes = minutes.size();
  rep.distinct_users = users.size();

  if (rep.total_requests > 0) {
    rep.error_rate =
        static_cast<double>(rep.total_errors) / static_cast<double>(rep.total_requests);
  }

  if (!latencies.empty()) {
    unsigned long long sum = 0;
    for (std::uint32_t v : latencies) sum += v;
    rep.avg_latency = static_cast<double>(sum) / static_cast<double>(latencies.size());
    // Computed after the sum: this reorders `latencies`.
    rep.p95_latency = percentile_nearest_rank(latencies, 95);
  }

  rep.top_endpoints =
      top_n(endpoints, top_n_count, [](std::string_view k) { return std::string(k); });
  rep.top_users = top_n(users, top_n_count,
                        [](std::uint64_t id) { return std::to_string(id); });

  if (!minutes.empty()) {
    rep.avg_requests_per_minute =
        static_cast<double>(rep.total_requests) / static_cast<double>(minutes.size());
    std::uint64_t peak = 0;
    std::string_view peak_label;
    for (const auto& [minute, count] : minutes) {
      // Tie-break on the label so the result does not depend on
      // unordered_map iteration order.
      if (count > peak || (count == peak && count != 0 && minute < peak_label)) {
        peak = count;
        peak_label = minute;
      }
    }
    rep.peak_minute_requests = peak;
    rep.peak_minute = std::string(peak_label);
  }
  return rep;
}