Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 125 additions & 0 deletions src/handlers/flow/test_flows.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -661,3 +661,128 @@ TEST_CASE("Flow specialized_merge + to_prometheus + to_opentelemetry with all gr
target->to_opentelemetry(scope, start_ts, end_ts, {});
CHECK(otel_gauge_value(scope, "flow_records_flows") == pre_b1 + pre_b2);
}

// The fixtures below were hand-rolled to exercise parser branches the
// pre-existing nf5/nf9/ipfix/ecmp captures don't touch — NetflowData.h's
// v1 and v7 dispatch arms and SflowData.h's IPv6 sample-header path.

TEST_CASE("Parse netflow v1 stream", "[netflow][flow]")
{
FlowInputStream stream{"netflow-v1-test"};
stream.config_set("flow_type", "netflow");
stream.config_set("pcap_file", "tests/fixtures/nf1.pcap");

visor::Config c;
auto stream_proxy = stream.add_event_proxy(c);
c.config_set<uint64_t>("num_periods", 1);
FlowStreamHandler flow_handler{"flow-v1", stream_proxy, &c};

flow_handler.start();
stream.start();
stream.stop();
flow_handler.stop();

auto event_data = flow_handler.metrics()->bucket(0)->event_data_locked();
CHECK(event_data.num_events->value() == 1);
CHECK(event_data.num_samples->value() == 1);

nlohmann::json j;
flow_handler.metrics()->bucket(0)->to_json(j);
// Fixture has 2 v1 records (UDP + TCP) sourced from 10.0.0.1.
CHECK(j["devices"]["10.0.0.1"]["records_flows"] == 2);
}

TEST_CASE("Parse netflow v7 stream", "[netflow][flow]")
{
FlowInputStream stream{"netflow-v7-test"};
stream.config_set("flow_type", "netflow");
stream.config_set("pcap_file", "tests/fixtures/nf7.pcap");

visor::Config c;
auto stream_proxy = stream.add_event_proxy(c);
c.config_set<uint64_t>("num_periods", 1);
FlowStreamHandler flow_handler{"flow-v7", stream_proxy, &c};

flow_handler.start();
stream.start();
stream.stop();
flow_handler.stop();

auto event_data = flow_handler.metrics()->bucket(0)->event_data_locked();
CHECK(event_data.num_events->value() == 1);
CHECK(event_data.num_samples->value() == 1);

nlohmann::json j;
flow_handler.metrics()->bucket(0)->to_json(j);
// Fixture has 2 v7 records (UDP + TCP) from agent 10.0.0.1.
CHECK(j["devices"]["10.0.0.1"]["records_flows"] == 2);
}

TEST_CASE("Parse sflow IPv6 sample", "[sflow][flow]")
{
FlowInputStream stream{"sflow-ipv6-test"};
stream.config_set("flow_type", "sflow");
stream.config_set("pcap_file", "tests/fixtures/sflow_ipv6.pcap");

visor::Config c;
auto stream_proxy = stream.add_event_proxy(c);
c.config_set<uint64_t>("num_periods", 1);
FlowStreamHandler flow_handler{"flow-sflow-v6", stream_proxy, &c};

flow_handler.start();
stream.start();
stream.stop();
flow_handler.stop();

auto event_data = flow_handler.metrics()->bucket(0)->event_data_locked();
// Single sFlow datagram with one flow sample carrying an IPv6 packet.
CHECK(event_data.num_events->value() == 1);
CHECK(event_data.num_samples->value() == 1);

nlohmann::json j;
flow_handler.metrics()->bucket(0)->to_json(j);
auto &dev = j["devices"]["10.0.0.99"];
CHECK(dev["records_flows"] == 1);
// Embedded payload is IPv6/UDP; assert IPv6-specific counters to
// prove decodeIPV6 actually ran (a bare records_flows check would
// still pass if the protocol classification regressed).
CHECK(dev["interfaces"]["1"]["in_ipv6_packets"] == 1);
CHECK(dev["interfaces"]["1"]["in_udp_packets"] == 1);
}

TEST_CASE("Parse sflow IPv4/IPv6 keyed sample elements", "[sflow][flow]")
{
// Two flow_samples in the datagram, each carrying a non-header element:
// SFLFLOW_IPV4 (tag=3) → readFlowSample_IPv4 in SflowData.h
// SFLFLOW_IPV6 (tag=4) → readFlowSample_IPv6 in SflowData.h
// Existing sflow fixtures only emit SFLFLOW_HEADER (tag=1).
FlowInputStream stream{"sflow-keyed-test"};
stream.config_set("flow_type", "sflow");
stream.config_set("pcap_file", "tests/fixtures/sflow_keyed.pcap");

visor::Config c;
auto stream_proxy = stream.add_event_proxy(c);
c.config_set<uint64_t>("num_periods", 1);
FlowStreamHandler flow_handler{"flow-sflow-keyed", stream_proxy, &c};

flow_handler.start();
stream.start();
stream.stop();
flow_handler.stop();

auto event_data = flow_handler.metrics()->bucket(0)->event_data_locked();
CHECK(event_data.num_events->value() == 1);
CHECK(event_data.num_samples->value() == 1);

nlohmann::json j;
flow_handler.metrics()->bucket(0)->to_json(j);
auto &dev = j["devices"]["10.0.0.77"];
CHECK(dev["records_flows"] == 2);
// The IPv4 element: src 192.0.2.10 → dst 192.0.2.20, TCP, length 1500.
// The IPv6 element: src 2001:db8::1 → dst 2001:db8::2, UDP, length 1280.
auto &iface = dev["interfaces"]["1"];
CHECK(iface["in_ipv4_packets"] == 1);
CHECK(iface["in_ipv6_packets"] == 1);
CHECK(iface["in_tcp_bytes"] == 1500);
CHECK(iface["in_udp_bytes"] == 1280);
}
Binary file added src/tests/fixtures/nf1.pcap
Binary file not shown.
Binary file added src/tests/fixtures/nf7.pcap
Binary file not shown.
Binary file added src/tests/fixtures/sflow_ipv6.pcap
Binary file not shown.
Binary file added src/tests/fixtures/sflow_keyed.pcap
Binary file not shown.
Loading