diff --git a/src/handlers/flow/test_flows.cpp b/src/handlers/flow/test_flows.cpp index 675b55570..e2d6d3b47 100644 --- a/src/handlers/flow/test_flows.cpp +++ b/src/handlers/flow/test_flows.cpp @@ -661,3 +661,128 @@ TEST_CASE("Flow specialized_merge + to_prometheus + to_opentelemetry with all gr target->to_opentelemetry(scope, start_ts, end_ts, {}); CHECK(otel_gauge_value(scope, "flow_records_flows") == pre_b1 + pre_b2); } + +// The fixtures below were hand-rolled to exercise parser branches the +// pre-existing nf5/nf9/ipfix/ecmp captures don't touch — NetflowData.h's +// v1 and v7 dispatch arms and SflowData.h's IPv6 sample-header path. + +TEST_CASE("Parse netflow v1 stream", "[netflow][flow]") +{ + FlowInputStream stream{"netflow-v1-test"}; + stream.config_set("flow_type", "netflow"); + stream.config_set("pcap_file", "tests/fixtures/nf1.pcap"); + + visor::Config c; + auto stream_proxy = stream.add_event_proxy(c); + c.config_set("num_periods", 1); + FlowStreamHandler flow_handler{"flow-v1", stream_proxy, &c}; + + flow_handler.start(); + stream.start(); + stream.stop(); + flow_handler.stop(); + + auto event_data = flow_handler.metrics()->bucket(0)->event_data_locked(); + CHECK(event_data.num_events->value() == 1); + CHECK(event_data.num_samples->value() == 1); + + nlohmann::json j; + flow_handler.metrics()->bucket(0)->to_json(j); + // Fixture has 2 v1 records (UDP + TCP) sourced from 10.0.0.1. + CHECK(j["devices"]["10.0.0.1"]["records_flows"] == 2); +} + +TEST_CASE("Parse netflow v7 stream", "[netflow][flow]") +{ + FlowInputStream stream{"netflow-v7-test"}; + stream.config_set("flow_type", "netflow"); + stream.config_set("pcap_file", "tests/fixtures/nf7.pcap"); + + visor::Config c; + auto stream_proxy = stream.add_event_proxy(c); + c.config_set("num_periods", 1); + FlowStreamHandler flow_handler{"flow-v7", stream_proxy, &c}; + + flow_handler.start(); + stream.start(); + stream.stop(); + flow_handler.stop(); + + auto event_data = flow_handler.metrics()->bucket(0)->event_data_locked(); + CHECK(event_data.num_events->value() == 1); + CHECK(event_data.num_samples->value() == 1); + + nlohmann::json j; + flow_handler.metrics()->bucket(0)->to_json(j); + // Fixture has 2 v7 records (UDP + TCP) from agent 10.0.0.1. + CHECK(j["devices"]["10.0.0.1"]["records_flows"] == 2); +} + +TEST_CASE("Parse sflow IPv6 sample", "[sflow][flow]") +{ + FlowInputStream stream{"sflow-ipv6-test"}; + stream.config_set("flow_type", "sflow"); + stream.config_set("pcap_file", "tests/fixtures/sflow_ipv6.pcap"); + + visor::Config c; + auto stream_proxy = stream.add_event_proxy(c); + c.config_set("num_periods", 1); + FlowStreamHandler flow_handler{"flow-sflow-v6", stream_proxy, &c}; + + flow_handler.start(); + stream.start(); + stream.stop(); + flow_handler.stop(); + + auto event_data = flow_handler.metrics()->bucket(0)->event_data_locked(); + // Single sFlow datagram with one flow sample carrying an IPv6 packet. + CHECK(event_data.num_events->value() == 1); + CHECK(event_data.num_samples->value() == 1); + + nlohmann::json j; + flow_handler.metrics()->bucket(0)->to_json(j); + auto &dev = j["devices"]["10.0.0.99"]; + CHECK(dev["records_flows"] == 1); + // Embedded payload is IPv6/UDP; assert IPv6-specific counters to + // prove decodeIPV6 actually ran (a bare records_flows check would + // still pass if the protocol classification regressed). + CHECK(dev["interfaces"]["1"]["in_ipv6_packets"] == 1); + CHECK(dev["interfaces"]["1"]["in_udp_packets"] == 1); +} + +TEST_CASE("Parse sflow IPv4/IPv6 keyed sample elements", "[sflow][flow]") +{ + // Two flow_samples in the datagram, each carrying a non-header element: + // SFLFLOW_IPV4 (tag=3) → readFlowSample_IPv4 in SflowData.h + // SFLFLOW_IPV6 (tag=4) → readFlowSample_IPv6 in SflowData.h + // Existing sflow fixtures only emit SFLFLOW_HEADER (tag=1). + FlowInputStream stream{"sflow-keyed-test"}; + stream.config_set("flow_type", "sflow"); + stream.config_set("pcap_file", "tests/fixtures/sflow_keyed.pcap"); + + visor::Config c; + auto stream_proxy = stream.add_event_proxy(c); + c.config_set("num_periods", 1); + FlowStreamHandler flow_handler{"flow-sflow-keyed", stream_proxy, &c}; + + flow_handler.start(); + stream.start(); + stream.stop(); + flow_handler.stop(); + + auto event_data = flow_handler.metrics()->bucket(0)->event_data_locked(); + CHECK(event_data.num_events->value() == 1); + CHECK(event_data.num_samples->value() == 1); + + nlohmann::json j; + flow_handler.metrics()->bucket(0)->to_json(j); + auto &dev = j["devices"]["10.0.0.77"]; + CHECK(dev["records_flows"] == 2); + // The IPv4 element: src 192.0.2.10 → dst 192.0.2.20, TCP, length 1500. + // The IPv6 element: src 2001:db8::1 → dst 2001:db8::2, UDP, length 1280. + auto &iface = dev["interfaces"]["1"]; + CHECK(iface["in_ipv4_packets"] == 1); + CHECK(iface["in_ipv6_packets"] == 1); + CHECK(iface["in_tcp_bytes"] == 1500); + CHECK(iface["in_udp_bytes"] == 1280); +} diff --git a/src/tests/fixtures/nf1.pcap b/src/tests/fixtures/nf1.pcap new file mode 100644 index 000000000..352e287ed Binary files /dev/null and b/src/tests/fixtures/nf1.pcap differ diff --git a/src/tests/fixtures/nf7.pcap b/src/tests/fixtures/nf7.pcap new file mode 100644 index 000000000..0ac9de72b Binary files /dev/null and b/src/tests/fixtures/nf7.pcap differ diff --git a/src/tests/fixtures/sflow_ipv6.pcap b/src/tests/fixtures/sflow_ipv6.pcap new file mode 100644 index 000000000..12e9f8640 Binary files /dev/null and b/src/tests/fixtures/sflow_ipv6.pcap differ diff --git a/src/tests/fixtures/sflow_keyed.pcap b/src/tests/fixtures/sflow_keyed.pcap new file mode 100644 index 000000000..ce7b02561 Binary files /dev/null and b/src/tests/fixtures/sflow_keyed.pcap differ