diff --git a/dvaas/packet_injection.cc b/dvaas/packet_injection.cc index 80221736..5c88c40f 100644 --- a/dvaas/packet_injection.cc +++ b/dvaas/packet_injection.cc @@ -134,7 +134,7 @@ absl::StatusOr SendTestPacketsAndCollectOutputs( const Packet& packet = packet_test_vector.input().packet(); // Inject to egress of control switch. - RETURN_IF_ERROR(gpins::InjectEgressPacket( + RETURN_IF_ERROR(pins::InjectEgressPacket( packet.port(), absl::HexStringToBytes(packet.hex()), control_ir_p4info, &control_switch, injection_delay)); } else { diff --git a/lib/pins_control_device.cc b/lib/pins_control_device.cc index c7b136e7..0a3c7c9a 100644 --- a/lib/pins_control_device.cc +++ b/lib/pins_control_device.cc @@ -162,7 +162,7 @@ absl::Status PinsControlDevice::SendPacket( "No P4RuntimeSession exists; Likely failed to establish another " "P4RuntimeSession."); } - return gpins::InjectEgressPacket(interface_name_to_port_id_[interface], + return pins::InjectEgressPacket(interface_name_to_port_id_[interface], std::string(packet), ir_p4_info_, control_session_.get(), packet_delay); } diff --git a/tests/forwarding/BUILD.bazel b/tests/forwarding/BUILD.bazel index b325a934..3047b3b2 100644 --- a/tests/forwarding/BUILD.bazel +++ b/tests/forwarding/BUILD.bazel @@ -190,15 +190,50 @@ cc_library( cc_library( name = "watch_port_test", testonly = True, + srcs = ["watch_port_test.cc"], hdrs = ["watch_port_test.h"], linkstatic = 1, deps = [ ":group_programming_util", ":packet_test_util", + ":util", + "//dvaas:test_vector_cc_proto", + "//gutil:collections", + "//gutil:proto_matchers", + "//gutil:status_matchers", + "//gutil:testing", + "//lib/gnmi:gnmi_helper", + "//p4_pdpi:ir_cc_proto", "//p4_pdpi:p4_runtime_session", + "//p4_pdpi:pd", + "//p4_pdpi/packetlib", + "//p4_pdpi/packetlib:packetlib_cc_proto", + "//p4_pdpi/string_encodings:decimal_string", + "//p4rt_app/tests/lib:p4runtime_grpc_service", + "//sai_p4/instantiations/google:instantiations", "//sai_p4/instantiations/google:sai_p4info_cc", + "//sai_p4/instantiations/google:sai_pd_cc_proto", + "//tests:thinkit_sanity_tests", + "//thinkit:mirror_testbed", "//thinkit:mirror_testbed_fixture", + "//thinkit:switch", + "//thinkit:test_environment", + "@com_github_gnmi//proto/gnmi:gnmi_cc_grpc_proto", + "@com_github_google_glog//:glog", "@com_github_grpc_grpc//:grpc++", + "@com_github_p4lang_p4runtime//:p4info_cc_proto", + "@com_github_p4lang_p4runtime//:p4runtime_cc_grpc", + "@com_github_p4lang_p4runtime//:p4runtime_cc_proto", + "@com_google_absl//absl/container:flat_hash_map", + "@com_google_absl//absl/container:flat_hash_set", + "@com_google_absl//absl/random", + "@com_google_absl//absl/status", + "@com_google_absl//absl/status:statusor", + "@com_google_absl//absl/strings", + "@com_google_absl//absl/synchronization", + "@com_google_absl//absl/time", + "@com_google_absl//absl/types:span", + "@com_google_googletest//:gtest", ], alwayslink = True, ) diff --git a/tests/forwarding/group_programming_util.cc b/tests/forwarding/group_programming_util.cc index 1d68b599..b39d091b 100644 --- a/tests/forwarding/group_programming_util.cc +++ b/tests/forwarding/group_programming_util.cc @@ -33,9 +33,9 @@ namespace pins { absl::Status ProgramNextHops(thinkit::TestEnvironment& test_environment, - pdpi::P4RuntimeSession* const p4_session, + pdpi::P4RuntimeSession& p4_session, const pdpi::IrP4Info& ir_p4info, - std::vector& members) { + std::vector& members) { int index = 0; std::vector nexthops; std::vector pi_entries; @@ -103,7 +103,7 @@ absl::Status ProgramNextHops(thinkit::TestEnvironment& test_environment, // Program the switch. RETURN_IF_ERROR( - (pdpi::InstallPiTableEntries(p4_session, ir_p4info, pi_entries))); + (pdpi::InstallPiTableEntries(&p4_session, ir_p4info, pi_entries))); // Write the PI & PD entries to artifacts. for (const auto& pi : pi_entries) { @@ -124,10 +124,10 @@ absl::Status ProgramNextHops(thinkit::TestEnvironment& test_environment, } absl::Status ProgramGroupWithMembers(thinkit::TestEnvironment& test_environment, - pdpi::P4RuntimeSession* const p4_session, + pdpi::P4RuntimeSession& p4_session, const pdpi::IrP4Info& ir_p4info, absl::string_view group_id, - absl::Span members, + absl::Span members, const p4::v1::Update_Type& type) { auto group_update = gutil::ParseProtoOrDie(absl::Substitute( R"pb( @@ -168,7 +168,7 @@ absl::Status ProgramGroupWithMembers(thinkit::TestEnvironment& test_environment, update->set_type(type); *update->mutable_entity()->mutable_table_entry() = pi_entry; RETURN_IF_ERROR( - pdpi::SetMetadataAndSendPiWriteRequest(p4_session, write_request)); + pdpi::SetMetadataAndSendPiWriteRequest(&p4_session, write_request)); // Append the PI & PD entries. RETURN_IF_ERROR(test_environment.AppendToTestArtifact( @@ -178,7 +178,7 @@ absl::Status ProgramGroupWithMembers(thinkit::TestEnvironment& test_environment, return absl::OkStatus(); } -absl::Status DeleteGroup(pdpi::P4RuntimeSession* const p4_session, +absl::Status DeleteGroup(pdpi::P4RuntimeSession& p4_session, const pdpi::IrP4Info& ir_p4info, absl::string_view group_id) { ASSIGN_OR_RETURN( @@ -196,19 +196,20 @@ absl::Status DeleteGroup(pdpi::P4RuntimeSession* const p4_session, )pb", group_id)))); - return pdpi::SetMetadataAndSendPiWriteRequest(p4_session, write_request); + return pdpi::SetMetadataAndSendPiWriteRequest(&p4_session, write_request); } // Verifies the actual members received from P4 read response matches the // expected members. absl::Status VerifyGroupMembersFromP4Read( - pdpi::P4RuntimeSession* const p4_session, const pdpi::IrP4Info& ir_p4info, - absl::string_view group_id, absl::Span expected_members) { + pdpi::P4RuntimeSession& p4_session, const pdpi::IrP4Info& ir_p4info, + absl::string_view group_id, + absl::Span expected_members) { p4::v1::ReadRequest read_request; read_request.add_entities()->mutable_table_entry(); ASSIGN_OR_RETURN( p4::v1::ReadResponse read_response, - pdpi::SetMetadataAndSendPiReadRequest(p4_session, read_request)); + pdpi::SetMetadataAndSendPiReadRequest(&p4_session, read_request)); // Filter out WCMP group entries separately from the whole read response. absl::flat_hash_map @@ -238,7 +239,7 @@ absl::Status VerifyGroupMembersFromP4Read( gutil::ParseProtoOrDie( absl::Substitute(R"pb( action { set_nexthop_id { nexthop_id: "$0" } } - weight: 0 + weight: 1 watch_port: "" )pb", member.nexthop)); @@ -306,8 +307,8 @@ int RescaleWeightForTomahawk3(int weight) { return (weight - 1) / 2; } -void RescaleMemberWeights(std::vector& members) { - for (Member& member : members) { +void RescaleMemberWeights(std::vector& members) { + for (GroupMember& member : members) { int old_weight = member.weight; member.weight = RescaleWeightForTomahawk3(old_weight); LOG(INFO) << "Rescaling member id: " << member.port @@ -316,4 +317,32 @@ void RescaleMemberWeights(std::vector& members) { } } +std::string DescribeDistribution( + int expected_total_test_packets, + absl::Span members, + const absl::flat_hash_map& num_packets_per_port, + bool expect_single_port) { + double total_weight = 0; + for (const auto& member : members) { + total_weight += member.weight; + } + std::string explanation = ""; + for (const auto& member : members) { + double actual_packets = num_packets_per_port.contains(member.port) + ? num_packets_per_port.at(member.port) + : 0; + if (expect_single_port) { + absl::StrAppend(&explanation, "\nport ", member.port, + ": actual_count = ", actual_packets); + } else { + double expected_packets = + expected_total_test_packets * member.weight / total_weight; + absl::StrAppend(&explanation, "\nport ", member.port, " with weight ", + member.weight, ": expected_count = ", expected_packets, + ", actual_count = ", actual_packets); + } + } + return explanation; +} + } // namespace pins diff --git a/tests/forwarding/group_programming_util.h b/tests/forwarding/group_programming_util.h index 850f35a5..b54ae866 100644 --- a/tests/forwarding/group_programming_util.h +++ b/tests/forwarding/group_programming_util.h @@ -28,7 +28,7 @@ namespace pins { // Structure that holds the member details like port, weight and the // nexthop object key (output) that was created. -struct Member { +struct GroupMember { int weight = 1; int port = 0; std::string nexthop; @@ -39,31 +39,31 @@ struct Member { // members.nexthop is an output here with the updated nexthop object that was // created. absl::Status ProgramNextHops(thinkit::TestEnvironment& test_environment, - pdpi::P4RuntimeSession* const p4_session, + pdpi::P4RuntimeSession& p4_session, const pdpi::IrP4Info& ir_p4info, - std::vector& members); + std::vector& members); // Programs (insert/modify) a nexthop group on the switch with the given // set of nexthops and weights. It is expected that the dependant nexthops are // already created for an insert/modify operation. absl::Status ProgramGroupWithMembers(thinkit::TestEnvironment& test_environment, - pdpi::P4RuntimeSession* const p4_session, + pdpi::P4RuntimeSession& p4_session, const pdpi::IrP4Info& ir_p4info, absl::string_view group_id, - absl::Span members, + absl::Span members, const p4::v1::Update_Type& type); // Deletes the group with the given group_id. It is expected that the caller // takes care of cleaning up the dependant nexthops. -absl::Status DeleteGroup(pdpi::P4RuntimeSession* const p4_session, +absl::Status DeleteGroup(pdpi::P4RuntimeSession& p4_session, const pdpi::IrP4Info& ir_p4info, absl::string_view group_id); // Verifies the actual members received from P4 read response matches the // expected members. absl::Status VerifyGroupMembersFromP4Read( - pdpi::P4RuntimeSession* const p4_session, const pdpi::IrP4Info& ir_p4info, - absl::string_view group_id, absl::Span expected_members); + pdpi::P4RuntimeSession& p4_session, const pdpi::IrP4Info& ir_p4info, + absl::string_view group_id, absl::Span expected_members); // Verifies the actual members inferred from receive traffic matches the // expected members. @@ -85,7 +85,17 @@ int RescaleWeightForTomahawk3(int weight); // hardware behaviour, remove when hardware supports > 128 weights. // Halves member weights >= 2 and works only for sum of initial member weights // <= 256. -void RescaleMemberWeights(std::vector& members); +void RescaleMemberWeights(std::vector& members); + +// Returns a human-readable description of the actual vs expected +// distribution of packets on the group member ports. +// expect_single_port specifies whether all packets are expected on a single +// output port(since no hashing applies) or multiple ports(with hashing). +std::string DescribeDistribution( + int expected_total_test_packets, + absl::Span members, + const absl::flat_hash_map& num_packets_per_port, + bool expect_single_port); } // namespace pins diff --git a/tests/forwarding/packet_test_util.cc b/tests/forwarding/packet_test_util.cc index 417f0d54..3affde68 100644 --- a/tests/forwarding/packet_test_util.cc +++ b/tests/forwarding/packet_test_util.cc @@ -110,6 +110,17 @@ uint32_t GetIthL4Port(int i, uint32_t base) { } // namespace +// Clears the received packet output vector and the packet statistics counters. +void TestData::ClearReceivedPackets() { + absl::MutexLock lock(&mutex); + for (auto& [packet, input_output] : input_output_per_packet) { + input_output.output.clear(); + } + total_packets_sent = 0; + total_packets_received = 0; + total_invalid_packets_received = 0; +} + // Is this a valid test configuration? Not all configurations are valid, e.g. // you can't modify the flow label in an IPv4 packet (because there is no flow // label there). diff --git a/tests/forwarding/packet_test_util.h b/tests/forwarding/packet_test_util.h index 3fe5c85f..b0207cca 100644 --- a/tests/forwarding/packet_test_util.h +++ b/tests/forwarding/packet_test_util.h @@ -79,6 +79,9 @@ struct TestData { int total_invalid_packets_received = 0; absl::flat_hash_map input_output_per_packet ABSL_GUARDED_BY(mutex); + // Clears the received packets in the output vector and the send/receive + // counters. + void ClearReceivedPackets() ABSL_LOCKS_EXCLUDED(mutex); }; // Checks wehether this a valid test configuration. Not all configurations are diff --git a/tests/forwarding/util.cc b/tests/forwarding/util.cc index 4fa14fd7..d831c1f6 100644 --- a/tests/forwarding/util.cc +++ b/tests/forwarding/util.cc @@ -23,7 +23,7 @@ #include "p4_pdpi/ir.pb.h" #include "sai_p4/tools/packetio_tools.h" -namespace gpins { +namespace pins { absl::Status TryUpToNTimes(int n, absl::Duration delay, std::function callback) { @@ -91,4 +91,4 @@ absl::Status InjectIngressPacket(const std::string& packet, << request.ShortDebugString(); } -} // namespace gpins +} // namespace pins diff --git a/tests/forwarding/util.h b/tests/forwarding/util.h index ef46884c..e3be1412 100644 --- a/tests/forwarding/util.h +++ b/tests/forwarding/util.h @@ -24,7 +24,7 @@ #include "p4_pdpi/ir.pb.h" #include "p4_pdpi/p4_runtime_session.h" -namespace gpins { +namespace pins { // Calls given callback up to the given number of times with the given delay in // between successive attempts, returning ok status as soon as the callback @@ -68,6 +68,6 @@ absl::StatusOr TryStatusOrUpToNTimes( return result; } -} // namespace gpins +} // namespace pins #endif // PINS_TESTS_FORWARDING_UTIL_H_ diff --git a/tests/forwarding/watch_port_test.cc b/tests/forwarding/watch_port_test.cc new file mode 100644 index 00000000..9236b64d --- /dev/null +++ b/tests/forwarding/watch_port_test.cc @@ -0,0 +1,1022 @@ +// Copyright 2021 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "tests/forwarding/watch_port_test.h" + +#include +#include +#include // NOLINT +#include +#include + +#include "absl/container/flat_hash_map.h" +#include "absl/container/flat_hash_set.h" +#include "absl/random/random.h" +#include "absl/status/status.h" +#include "absl/status/statusor.h" +#include "absl/strings/escaping.h" +#include "absl/strings/match.h" +#include "absl/strings/str_cat.h" +#include "absl/strings/str_join.h" +#include "absl/strings/str_split.h" +#include "absl/strings/string_view.h" +#include "absl/strings/substitute.h" +#include "absl/synchronization/mutex.h" +#include "absl/time/clock.h" +#include "absl/time/time.h" +#include "absl/types/span.h" +#include "dvaas/test_vector.pb.h" +#include "glog/logging.h" +#include "gmock/gmock.h" +#include "gtest/gtest.h" +#include "gutil/collections.h" +#include "gutil/proto_matchers.h" +#include "gutil/status_matchers.h" +#include "gutil/testing.h" +#include "lib/gnmi/gnmi_helper.h" +#include "p4/config/v1/p4info.pb.h" +#include "p4/v1/p4runtime.grpc.pb.h" +#include "p4/v1/p4runtime.pb.h" +#include "p4_pdpi/ir.pb.h" +#include "p4_pdpi/packetlib/packetlib.h" +#include "p4_pdpi/packetlib/packetlib.pb.h" +#include "p4_pdpi/pd.h" +#include "p4_pdpi/p4_runtime_session.h" +#include "p4_pdpi/string_encodings/decimal_string.h" +#include "p4rt_app/tests/lib/p4runtime_grpc_service.h" +#include "proto/gnmi/gnmi.grpc.pb.h" +#include "sai_p4/instantiations/google/instantiations.h" +#include "sai_p4/instantiations/google/sai_p4info.h" +#include "sai_p4/instantiations/google/sai_pd.pb.h" +#include "tests/forwarding/group_programming_util.h" +#include "tests/forwarding/packet_test_util.h" +#include "tests/forwarding/util.h" +#include "tests/thinkit_sanity_tests.h" +#include "thinkit/mirror_testbed.h" +#include "thinkit/mirror_testbed_fixture.h" +#include "thinkit/switch.h" +#include "thinkit/test_environment.h" +// Tests for the watchport functionality in Action Profile Group operation. + +namespace pins { + +namespace { +// Admin down/up state used for interfaces. +enum class AdminState { + kDown, + kUp, +}; + +// Group id used in this test. +constexpr absl::string_view kGroupId = "group-1"; + +// Vrf used in the test. +constexpr absl::string_view kVrfId = "vrf-1"; + +// Time to wait after which received packets are processed. +constexpr absl::Duration kDurationToWaitForPackets = absl::Seconds(5); + +// Number of members used in the test. +constexpr int kNumWcmpMembersForTest = 5; + +// Number of packets used in the test. +constexpr int kNumTestPackets = 5000; + +// Default input port index of the group members vector, on which packets +// arrive. +constexpr int kDefaultInputPortIndex = 0; + +// Helper function that sets up the input port for packet recieve. +// Creates the router interface for the input port. Without this input packets +// get dropped (b/190736007). +absl::Status SetUpInputPortForPacketReceive(pdpi::P4RuntimeSession& p4_session, + const pdpi::IrP4Info& ir_p4info, + int input_port) { + ASSIGN_OR_RETURN( + p4::v1::WriteRequest write_request, + pdpi::PdWriteRequestToPi( + ir_p4info, gutil::ParseProtoOrDie(absl::Substitute( + R"pb( + updates { + type: INSERT + table_entry { + router_interface_table_entry { + match { router_interface_id: "$0" } + action { + set_port_and_src_mac { + port: "$1" + src_mac: "00:02:03:04:05:06" + } + } + } + } + } + )pb", + input_port, input_port)))); + + return pdpi::SetMetadataAndSendPiWriteRequest(&p4_session, write_request); +} + +// Helper function that creates/deletes V4, V6 default route entries. +absl::Status ProgramDefaultRoutes(pdpi::P4RuntimeSession& p4_session, + const pdpi::IrP4Info& ir_p4info, + absl::string_view default_vrf, + const p4::v1::Update_Type& type) { + if (!p4::v1::Update_Type_IsValid(type) || + type == p4::v1::Update_Type_UNSPECIFIED) { + return absl::InvalidArgumentError( + absl::StrCat("Type: ", type, " not supported.")); + } + std::string type_str = p4::v1::Update_Type_Name(type); + // Add minimal set of flows to allow forwarding. + auto ipv4_fallback = gutil::ParseProtoOrDie(absl::Substitute( + R"pb( + type: $0 + table_entry { + ipv4_table_entry { + match { vrf_id: "$1" } + action { set_wcmp_group_id { wcmp_group_id: "$2" } } + } + })pb", + type_str, default_vrf, kGroupId)); + auto ipv6_fallback = gutil::ParseProtoOrDie(absl::Substitute( + R"pb( + type: $0 + table_entry { + ipv6_table_entry { + match { vrf_id: "$1" } + action { set_wcmp_group_id { wcmp_group_id: "$2" } } + } + })pb", + type_str, default_vrf, kGroupId)); + + p4::v1::WriteRequest write_request; + for (const auto& pd_entry : {ipv4_fallback, ipv6_fallback}) { + ASSIGN_OR_RETURN( + p4::v1::Update pi_entry, pdpi::PdUpdateToPi(ir_p4info, pd_entry), + _.SetPrepend() << "Failed in PD table conversion to PI, entry: " + << pd_entry.DebugString() << " error: "); + *write_request.add_updates() = pi_entry; + } + return pdpi::SetMetadataAndSendPiWriteRequest(&p4_session, write_request); +} + +// Installs a default vrf for all packets on the SUT. +absl::Status SetUpSut(pdpi::P4RuntimeSession& p4_session, + const pdpi::IrP4Info& ir_p4info, + absl::string_view default_vrf) { + // Set default VRF for all packets. + ASSIGN_OR_RETURN( + p4::v1::TableEntry pi_entry, + pdpi::PartialPdTableEntryToPiTableEntry( + ir_p4info, gutil::ParseProtoOrDie(absl::Substitute( + R"pb( + acl_pre_ingress_table_entry { + match {} # Wildcard match + action { set_vrf { vrf_id: "$0" } } # Default vrf + priority: 1129 + })pb", + default_vrf)))); + + return pdpi::InstallPiTableEntry(&p4_session, pi_entry); +} + +// Punts all packets on the control switch. +absl::Status SetUpControlSwitch(pdpi::P4RuntimeSession& p4_session, + const pdpi::IrP4Info& ir_p4info) { + // Trap all packets on control switch. + ASSIGN_OR_RETURN( + p4::v1::TableEntry punt_all_pi_entry, + pdpi::PartialPdTableEntryToPiTableEntry( + ir_p4info, + gutil::ParseProtoOrDie( + R"pb( + acl_ingress_table_entry { + match {} # Wildcard match. + action { acl_trap { qos_queue: "0x7" } } # Action: punt. + priority: 1 # Highest priority. + } + )pb"))); + return pdpi::InstallPiTableEntry(&p4_session, punt_all_pi_entry); +} + +// Creates members by filling in the controller port ids and random weights for +// each member that add upto 30(random). Skips the default input port on which +// traffic is received, since that is excluded from the traffic forwarding +// members in the group. +absl::StatusOr> CreateGroupMembers( + int group_size, absl::Span controller_port_ids) { + if (group_size + /*input_port=*/1 > controller_port_ids.size()) { + return absl::InvalidArgumentError(absl::StrCat( + "Not enough members: ", controller_port_ids.size(), + " to reserve an input port and create the group with size: ", + group_size)); + } + std::vector members; + for (int i = 0; i < controller_port_ids.size() && members.size() < group_size; + i++) { + // Add port ids except for the default input port id. + if (i != kDefaultInputPortIndex) { + members.push_back( + pins::GroupMember{.weight = 0, .port = controller_port_ids[i]}); + } + } + + ASSIGN_OR_RETURN(std::vector weights, + GenerateNRandomWeights(group_size, + /*total_weight=*/30)); + for (int i = 0; i < members.size(); i++) { + members[i].weight = weights[i]; + } + return members; +} + +// Creates a set of expected port ids from the member ports. +absl::flat_hash_set CreateExpectedMemberPorts( + absl::Span members) { + absl::flat_hash_set expected_member_ports; + for (const auto& member : members) { + expected_member_ports.insert(member.port); + } + return expected_member_ports; +} + +// Returns a map of number of packets received per port. +absl::StatusOr> CountNumPacketsPerPort( + absl::Span output_packets) { + absl::flat_hash_map num_packets_per_port; + for (const auto& output : output_packets) { + ASSIGN_OR_RETURN(int out_port, pdpi::DecimalStringToInt(output.port())); + num_packets_per_port[out_port]++; + } + return num_packets_per_port; +} + +// Sends N packets from the control switch to sut at a rate of 500 packets/sec. +absl::Status SendNPacketsToSut(int num_packets, + const TestConfiguration& test_config, + absl::Span members, + absl::Span port_ids, + const pdpi::IrP4Info& ir_p4info, + pdpi::P4RuntimeSession& p4_session, + thinkit::TestEnvironment& test_environment) { + const absl::Time start_time = absl::Now(); + for (int i = 0; i < num_packets; i++) { + // Rate limit to 500 packets per second. + auto earliest_send_time = start_time + (i * absl::Seconds(1) / 500.0); + absl::SleepFor(earliest_send_time - absl::Now()); + + // Vary the port on which to send the packet if the hash field selected is + // input port. + int port = port_ids[kDefaultInputPortIndex]; + if (test_config.field == PacketField::kInputPort) { + port = port_ids[i % members.size()]; + } + + ASSIGN_OR_RETURN(packetlib::Packet packet, + pins::GenerateIthPacket(test_config, i)); + ASSIGN_OR_RETURN(std::string raw_packet, SerializePacket(packet)); + ASSIGN_OR_RETURN(std::string port_string, pdpi::IntToDecimalString(port)); + RETURN_IF_ERROR( + pins::InjectEgressPacket(port_string, raw_packet, ir_p4info, &p4_session)); + + dvaas::Packet p; + p.set_port(port_string); + *p.mutable_parsed() = packet; + p.set_hex(absl::BytesToHexString(raw_packet)); + // Save log of packets. + RETURN_IF_ERROR(test_environment.AppendToTestArtifact( + absl::StrCat( + "packets-for-config-", + absl::StrJoin(absl::StrSplit(DescribeTestConfig(test_config), " "), + "-"), + ".txt"), + p.DebugString())); + } + + LOG(INFO) << "Sent " << num_packets << " packets in " + << (absl::Now() - start_time) << "."; + return absl::OkStatus(); +} + +void PrettyPrintDistribution( + const TestConfiguration& config, const TestInputOutput& test, + const TestData& test_data, absl::Span members, + const absl::flat_hash_map& num_packets_per_port) { + LOG(INFO) << "Results for " << DescribeTestConfig(config) << ":"; + LOG(INFO) << "- received " << test.output.size() << " packets"; + LOG(INFO) << "- observed distribution was:" + << DescribeDistribution(test_data.total_packets_sent, members, + num_packets_per_port, + /*expect_single_port=*/false); + LOG(INFO) << "Number of sent packets: " + << test_data.total_packets_sent; + LOG(INFO) << "Number of received packets (valid): " + << test_data.total_packets_received; + LOG(INFO) << "Number of received packets (invalid): " + << test_data.total_invalid_packets_received; +} + +// Creates the port_names_per_port_id map from GNMI config. +absl::StatusOr> +GetPortNamePerPortId(gnmi::gNMI::StubInterface& gnmi_stub) { + absl::flat_hash_map port_name_per_port_id; + ASSIGN_OR_RETURN(auto port_id_per_port_name, + pins_test::GetAllInterfaceNameToPortId(gnmi_stub)); + for (const auto& [name, port_id] : port_id_per_port_name) { + port_name_per_port_id[port_id] = name; + } + return port_name_per_port_id; +} + +// Sets the admin state of the interface to UP/DOWN using GNMI config path. +// Queries the state path to verify if the desired state is achieved or not. +absl::Status SetInterfaceAdminState(gnmi::gNMI::StubInterface& gnmi_stub, + absl::string_view if_name, + AdminState admin_state) { + const std::string if_status = + admin_state == AdminState::kDown ? "DOWN" : "UP"; + const std::string config_value = + admin_state == AdminState::kDown ? "false" : "true"; + const std::string if_admin_config_path = + absl::StrCat("interfaces/interface[name=", if_name, "]/config/enabled"); + LOG(INFO) << "Setting interface " << if_name << " to admin " << if_status; + RETURN_IF_ERROR(SetGnmiConfigPath( + &gnmi_stub, if_admin_config_path, pins_test::GnmiSetType::kUpdate, + absl::Substitute("{\"enabled\":$0}", config_value))); + // Wait for the admin state to take effect. + absl::SleepFor(absl::Seconds(1)); + // Verifies /interfaces/interface[name=]/state/admin-status = UP/DOWN. + const std::string if_state_path = + absl::StrCat("interfaces/interface[name=", if_name, "]/state"); + ASSIGN_OR_RETURN(const std::string state_path_response, + pins_test::GetGnmiStatePathInfo( + &gnmi_stub, if_state_path, + /*resp_parse_str=*/"openconfig-interfaces:state")); + if (!absl::StrContains(state_path_response, if_status)) { + return absl::UnknownError(absl::StrCat("Unable to set interface ", if_name, + " to admin ", if_status)); + } + return absl::OkStatus(); +} + +} // namespace + +void WatchPortTestFixture::SetUp() { + GetParam().testbed->SetUp(); + thinkit::MirrorTestbed& testbed = GetParam().testbed->GetMirrorTestbed(); + + // Push gnmi config to the sut and control switch. + const std::string& gnmi_config = GetParam().gnmi_config; + ASSERT_OK( + testbed.Environment().StoreTestArtifact("gnmi_config.txt", gnmi_config)); + ASSERT_OK(pins_test::PushGnmiConfig(testbed.Sut(), gnmi_config)); + ASSERT_OK(pins_test::PushGnmiConfig(testbed.ControlSwitch(), gnmi_config)); + + ASSERT_OK(testbed.Environment().StoreTestArtifact("p4info.pb.txt", + GetP4Info().DebugString())); + + // Setup SUT & control switch. + + ASSERT_OK_AND_ASSIGN(sut_p4_session_, + pdpi::P4RuntimeSession::CreateWithP4InfoAndClearTables( + testbed.Sut(), GetP4Info())); + ASSERT_OK_AND_ASSIGN(control_p4_session_, + pdpi::P4RuntimeSession::CreateWithP4InfoAndClearTables( + testbed.ControlSwitch(), GetP4Info())); + ASSERT_OK(SetUpSut(*sut_p4_session_, GetIrP4Info(), kVrfId)); + ASSERT_OK(SetUpControlSwitch(*control_p4_session_, GetIrP4Info())); + + // Create GNMI stub for admin operations. + ASSERT_OK_AND_ASSIGN(control_gnmi_stub_, + testbed.ControlSwitch().CreateGnmiStub()); + + // Start the receiver thread for control switch to listen for packets from + // SUT, this thread is terminated in the TearDown. + receive_packet_thread_ = std::thread([&]() { + p4::v1::StreamMessageResponse pi_response; + while (control_p4_session_->StreamChannelRead(pi_response)) { + absl::MutexLock lock(&test_data_.mutex); + sai::StreamMessageResponse pd_response; + ASSERT_OK(pdpi::PiStreamMessageResponseToPd(GetIrP4Info(), pi_response, + &pd_response)) + << " PacketIn PI to PD failed: "; + ASSERT_TRUE(pd_response.has_packet()) + << " Received unexpected stream message for packet in: " + << pd_response.DebugString(); + absl::string_view raw_packet = pd_response.packet().payload(); + dvaas::Packet packet; + packet.set_port(pd_response.packet().metadata().ingress_port()); + packet.set_hex(absl::BytesToHexString(raw_packet)); + *packet.mutable_parsed() = packetlib::ParsePacket(raw_packet); + std::string key = packet.parsed().payload(); + if (test_data_.input_output_per_packet.contains(key)) { + test_data_.input_output_per_packet[key].output.push_back(packet); + test_data_.total_packets_received += 1; + } else { + ASSERT_OK(testbed.Environment().AppendToTestArtifact( + "control_unexpected_packet_ins.pb.txt", + absl::StrCat(packet.DebugString(), "\n"))); + test_data_.total_invalid_packets_received += 1; + } + } + }); +} + +void WatchPortTestFixture::TearDown() { + thinkit::MirrorTestbedInterface& testbed = *GetParam().testbed; + + // Clear table entries. + if (sut_p4_session_ != nullptr) { + EXPECT_OK(pdpi::ClearTableEntries(sut_p4_session_.get())); + EXPECT_OK(sut_p4_session_->Finish()); + } + // Stop RPC sessions. + if (control_p4_session_ != nullptr) { + EXPECT_OK( + pdpi::ClearTableEntries(control_p4_session_.get())); + EXPECT_OK(control_p4_session_->Finish()); + } + if (receive_packet_thread_.joinable()) { + receive_packet_thread_.join(); + } + if (control_gnmi_stub_) { + ASSERT_OK_AND_ASSIGN(const auto port_name_per_port_id, + GetPortNamePerPortId(*control_gnmi_stub_)); + // Restore the admin state to UP. + for (const auto& [port_id, name] : port_name_per_port_id) { + EXPECT_OK( + SetInterfaceAdminState(*control_gnmi_stub_, name, AdminState::kUp)); + } + } + testbed.TearDown(); +} + +// TODO: Parameterize over the different instantiations like +// MiddleBlock, FBR400. +const p4::config::v1::P4Info& WatchPortTestFixture::GetP4Info() { + return sai::GetP4Info(sai::Instantiation::kMiddleblock); +} +const pdpi::IrP4Info& WatchPortTestFixture::GetIrP4Info() { + return sai::GetIrP4Info(sai::Instantiation::kMiddleblock); +} + +namespace { + +// Verifies basic WCMP behavior by programming a group with multiple members +// with random weights and ensuring that all members receive some part of +// the sent traffic. +TEST_P(WatchPortTestFixture, VerifyBasicWcmpPacketDistribution) { + thinkit::TestEnvironment& environment = + GetParam().testbed->GetMirrorTestbed().Environment(); + + absl::Span controller_port_ids = GetParam().port_ids; + const int group_size = kNumWcmpMembersForTest; + ASSERT_OK_AND_ASSIGN(std::vector members, + CreateGroupMembers(group_size, controller_port_ids)); + + const int input_port = controller_port_ids[kDefaultInputPortIndex]; + ASSERT_OK(SetUpInputPortForPacketReceive(*sut_p4_session_, GetIrP4Info(), + input_port)); + + // Programs the required router interfaces, nexthops for wcmp group. + ASSERT_OK(pins::ProgramNextHops(environment, *sut_p4_session_, GetIrP4Info(), + members)); + ASSERT_OK(pins::ProgramGroupWithMembers(environment, *sut_p4_session_, + GetIrP4Info(), kGroupId, members, + p4::v1::Update::INSERT)) + << "Failed to program WCMP group: "; + + // Program default routing for all packets on SUT. + ASSERT_OK(ProgramDefaultRoutes(*sut_p4_session_, GetIrP4Info(), kVrfId, + p4::v1::Update::INSERT)); + + // TODO: Revisit for newer chipsets. + // Rescale the member weights (temp workaround) to what would have been + // programmed by the hardware. + RescaleMemberWeights(members); + + // Generate test configuration, pick any field (IP_SRC) used by hashing to + // vary for every packet so that it gets sent to all the members. + TestConfiguration test_config = { + .field = PacketField::kIpSrc, + .ipv4 = true, + .encapped = false, + .inner_ipv4 = false, + .decap = false, + }; + ASSERT_TRUE(IsValidTestConfiguration(test_config)); + + // Create test data entry. + std::string test_config_key = TestConfigurationToPayload(test_config); + { + absl::MutexLock lock(&test_data_.mutex); + test_data_.input_output_per_packet[test_config_key] = TestInputOutput{ + .config = test_config, + }; + } + + // Send 5000 packets and check for packet distribution. + ASSERT_OK(SendNPacketsToSut(kNumTestPackets, test_config, members, + controller_port_ids, GetIrP4Info(), + *control_p4_session_, environment)); + test_data_.total_packets_sent = kNumTestPackets; + + // Wait for packets from the SUT to arrive. + absl::SleepFor(kDurationToWaitForPackets); + + // For the test configuration, check the output distribution. + { + absl::MutexLock lock(&test_data_.mutex); + const TestInputOutput& test = + test_data_.input_output_per_packet[test_config_key]; + EXPECT_EQ(test.output.size(), test_data_.total_packets_sent) + << "Mismatch in expected: " << test_data_.total_packets_sent + << " and actual: " << test.output.size() << "packets received for " + << DescribeTestConfig(test_config); + + ASSERT_OK_AND_ASSIGN(auto num_packets_per_port, + CountNumPacketsPerPort(test.output)); + absl::flat_hash_set expected_member_ports = + CreateExpectedMemberPorts(members); + + ASSERT_OK(VerifyGroupMembersFromP4Read(*sut_p4_session_, GetIrP4Info(), + kGroupId, members)); + ASSERT_OK(VerifyGroupMembersFromReceiveTraffic(num_packets_per_port, + expected_member_ports)); + PrettyPrintDistribution(test_config, test, test_data_, members, + num_packets_per_port); + } +} + +// Bring down/up ActionProfileGroup member and verify traffic is distributed +// only to the up ports. +TEST_P(WatchPortTestFixture, VerifyBasicWatchPortAction) { + thinkit::TestEnvironment& environment = + GetParam().testbed->GetMirrorTestbed().Environment(); + absl::Span controller_port_ids = GetParam().port_ids; + const int group_size = kNumWcmpMembersForTest; + ASSERT_OK_AND_ASSIGN(std::vector members, + CreateGroupMembers(group_size, controller_port_ids)); + + const int input_port = controller_port_ids[kDefaultInputPortIndex]; + ASSERT_OK(SetUpInputPortForPacketReceive(*sut_p4_session_, GetIrP4Info(), + input_port)); + + // Programs the required router interfaces, nexthops for wcmp group. + ASSERT_OK(pins::ProgramNextHops(environment, *sut_p4_session_, GetIrP4Info(), + members)); + ASSERT_OK(pins::ProgramGroupWithMembers(environment, *sut_p4_session_, + GetIrP4Info(), kGroupId, members, + p4::v1::Update::INSERT)); + // Program default routing for all packets on SUT. + ASSERT_OK(ProgramDefaultRoutes(*sut_p4_session_, GetIrP4Info(), kVrfId, + p4::v1::Update::INSERT)); + + // TODO: Revisit for newer chipsets. + // Rescale the member weights to what would have been programmed by the + // hardware. + RescaleMemberWeights(members); + + // Generate test configuration, pick any field used by hashing to vary for + // every packet so that it gets sent to all the members. + TestConfiguration test_config = { + .field = PacketField::kIpDst, + .ipv4 = true, + .encapped = false, + .inner_ipv4 = false, + .decap = false, + }; + ASSERT_TRUE(IsValidTestConfiguration(test_config)); + + // Create test data entry. + std::string test_config_key = TestConfigurationToPayload(test_config); + { + absl::MutexLock lock(&test_data_.mutex); + test_data_.input_output_per_packet[test_config_key] = TestInputOutput{ + .config = test_config, + }; + } + + absl::flat_hash_set expected_member_ports = + CreateExpectedMemberPorts(members); + + // Select one random member of the group to toggle. + absl::BitGen gen; + const int random_member_index = + absl::Uniform(absl::IntervalClosedOpen, gen, 0, members.size()); + const int selected_port_id = members[random_member_index].port; + ASSERT_OK_AND_ASSIGN(const auto port_name_per_port_id, + GetPortNamePerPortId(*control_gnmi_stub_)); + for (auto operation : {AdminState::kDown, AdminState::kUp}) { + ASSERT_OK_AND_ASSIGN(const auto& port_name, + gutil::FindOrStatus(port_name_per_port_id, + absl::StrCat(selected_port_id))); + ASSERT_OK( + SetInterfaceAdminState(*control_gnmi_stub_, port_name, operation)); + + // TODO: Adding watch port up action causes unexpected traffic + // loss. Remove after the bug in OrchAgent is fixed. + absl::SleepFor(absl::Seconds(5)); + + // Clear the counters before the test. + test_data_.ClearReceivedPackets(); + + // Send 5000 packets and check for packet distribution. + ASSERT_OK(SendNPacketsToSut(kNumTestPackets, test_config, members, + controller_port_ids, GetIrP4Info(), + *control_p4_session_, environment)); + test_data_.total_packets_sent = kNumTestPackets; + + // Wait for packets from the SUT to arrive. + absl::SleepFor(kDurationToWaitForPackets); + + // For the test configuration, check the output distribution. + { + absl::MutexLock lock(&test_data_.mutex); + TestInputOutput& test = + test_data_.input_output_per_packet[test_config_key]; + EXPECT_EQ(test.output.size(), test_data_.total_packets_sent) + << "Mismatch in expected: " << test_data_.total_packets_sent + << " and actual: " << test.output.size() << "packets received for " + << DescribeTestConfig(test_config); + + ASSERT_OK_AND_ASSIGN(auto num_packets_per_port, + CountNumPacketsPerPort(test.output)); + + // Add/remove the selected member from expected_member_ports for admin + // up/down case. + if (operation == AdminState::kDown) { + expected_member_ports.erase(selected_port_id); + } else { + expected_member_ports.insert(selected_port_id); + } + + ASSERT_OK(VerifyGroupMembersFromP4Read(*sut_p4_session_, GetIrP4Info(), + kGroupId, members)); + ASSERT_OK(VerifyGroupMembersFromReceiveTraffic(num_packets_per_port, + expected_member_ports)); + PrettyPrintDistribution(test_config, test, test_data_, members, + num_packets_per_port); + } + } +} + +// TODO: Bring down APG member (when in critical state) and verify traffic is +// distributed only to the up ports. +TEST_P(WatchPortTestFixture, VerifyWatchPortActionInCriticalState){}; + +// Bring up/down the only ActionProfileGroup member and verify traffic is +// forwarded/dropped. +TEST_P(WatchPortTestFixture, VerifyWatchPortActionForSingleMember) { + thinkit::TestEnvironment& environment = + GetParam().testbed->GetMirrorTestbed().Environment(); + + absl::Span controller_port_ids = GetParam().port_ids; + const int group_size = 1; + ASSERT_OK_AND_ASSIGN(std::vector members, + CreateGroupMembers(group_size, controller_port_ids)); + + const int input_port = controller_port_ids[kDefaultInputPortIndex]; + ASSERT_OK(SetUpInputPortForPacketReceive(*sut_p4_session_, GetIrP4Info(), + input_port)); + + // Programs the required router interfaces, nexthops for wcmp group. + ASSERT_OK(pins::ProgramNextHops(environment, *sut_p4_session_, GetIrP4Info(), + members)); + ASSERT_OK(pins::ProgramGroupWithMembers(environment, *sut_p4_session_, + GetIrP4Info(), kGroupId, members, + p4::v1::Update::INSERT)); + // Program default routing for all packets on SUT. + ASSERT_OK(ProgramDefaultRoutes(*sut_p4_session_, GetIrP4Info(), kVrfId, + p4::v1::Update::INSERT)); + + // TODO: Revisit for newer chipsets. + // Rescale the member weights to what would have been programmed by the + // hardware. + RescaleMemberWeights(members); + + // Generate test configuration, pick any field used by hashing to vary for + // every packet so that it gets sent to all the members. + TestConfiguration test_config = { + .field = PacketField::kL4SrcPort, + .ipv4 = true, + .encapped = false, + .inner_ipv4 = false, + .decap = false, + }; + ASSERT_TRUE(IsValidTestConfiguration(test_config)); + + // Create test data entry. + std::string test_config_key = TestConfigurationToPayload(test_config); + { + absl::MutexLock lock(&test_data_.mutex); + test_data_.input_output_per_packet[test_config_key] = TestInputOutput{ + .config = test_config, + }; + } + + absl::flat_hash_set expected_member_ports = + CreateExpectedMemberPorts(members); + + // Pickup the only member (index 0) in the group and toggle down/up and verify + // traffic distribution. + ASSERT_THAT(members, testing::SizeIs(1)) + << "Unexpected member size for single member group"; + const int single_member_port_id = members[0].port; + ASSERT_OK_AND_ASSIGN(const auto port_name_per_port_id, + GetPortNamePerPortId(*control_gnmi_stub_)); + for (auto operation : {AdminState::kDown, AdminState::kUp}) { + ASSERT_OK_AND_ASSIGN( + const auto& port_name, + gutil::FindOrStatus(port_name_per_port_id, + absl::StrCat(single_member_port_id))); + ASSERT_OK( + SetInterfaceAdminState(*control_gnmi_stub_, port_name, operation)); + + // Clear the counters before the test. + test_data_.ClearReceivedPackets(); + + // TODO: Adding watch port up action causes unexpected traffic + // loss. Remove after the bug in OrchAgent is fixed. + absl::SleepFor(absl::Seconds(5)); + + // Send 5000 packets and check for packet distribution. + ASSERT_OK(SendNPacketsToSut(kNumTestPackets, test_config, members, + controller_port_ids, GetIrP4Info(), + *control_p4_session_, environment)); + test_data_.total_packets_sent = kNumTestPackets; + + // Wait for packets from the SUT to arrive. + absl::SleepFor(kDurationToWaitForPackets); + + // For the test configuration, check the output distribution. + { + absl::MutexLock lock(&test_data_.mutex); + TestInputOutput& test = + test_data_.input_output_per_packet[test_config_key]; + if (operation == AdminState::kDown) { + // Expect all packets to be lost for single member group watch port down + // action. + EXPECT_EQ(test.output.size(), 0) + << "Expected all packets to be lost for single member group watch " + "port down action, but received " + << test.output.size() << " actual packets"; + } else { + expected_member_ports.insert(single_member_port_id); + EXPECT_EQ(test.output.size(), test_data_.total_packets_sent) + << "Mismatch in expected: " << test_data_.total_packets_sent + << " and actual: " << test.output.size() << " packets received for " + << DescribeTestConfig(test_config); + } + ASSERT_OK_AND_ASSIGN(auto num_packets_per_port, + CountNumPacketsPerPort(test.output)); + + ASSERT_OK(VerifyGroupMembersFromP4Read(*sut_p4_session_, GetIrP4Info(), + kGroupId, members)); + ASSERT_OK(VerifyGroupMembersFromReceiveTraffic(num_packets_per_port, + expected_member_ports)); + PrettyPrintDistribution(test_config, test, test_data_, members, + num_packets_per_port); + } + } +}; + +// Modify ActionProfileGroup member and verify traffic is distributed +// accordingly. +TEST_P(WatchPortTestFixture, VerifyWatchPortActionForMemberModify) { + thinkit::TestEnvironment& environment = + GetParam().testbed->GetMirrorTestbed().Environment(); + + absl::Span controller_port_ids = GetParam().port_ids; + const int group_size = kNumWcmpMembersForTest; + ASSERT_OK_AND_ASSIGN(std::vector members, + CreateGroupMembers(group_size, controller_port_ids)); + + const int input_port = controller_port_ids[kDefaultInputPortIndex]; + ASSERT_OK(SetUpInputPortForPacketReceive(*sut_p4_session_, GetIrP4Info(), + input_port)); + + // Programs the required router interfaces, nexthops for wcmp group. + ASSERT_OK(pins::ProgramNextHops(environment, *sut_p4_session_, GetIrP4Info(), + members)); + ASSERT_OK(pins::ProgramGroupWithMembers(environment, *sut_p4_session_, + GetIrP4Info(), kGroupId, members, + p4::v1::Update::INSERT)); + // Program default routing for all packets on SUT. + ASSERT_OK(ProgramDefaultRoutes(*sut_p4_session_, GetIrP4Info(), kVrfId, + p4::v1::Update::INSERT)); + + // TODO: Revisit for newer chipsets. + // Rescale the member weights to what would have been programmed by the + // hardware. + RescaleMemberWeights(members); + + // Generate test configuration, pick any field used by hashing to vary for + // every packet so that it gets sent to all the members. + TestConfiguration test_config = { + .field = PacketField::kIpDst, + .ipv4 = true, + .encapped = false, + .inner_ipv4 = false, + .decap = false, + }; + ASSERT_TRUE(IsValidTestConfiguration(test_config)); + + // Create test data entry. + std::string test_config_key = TestConfigurationToPayload(test_config); + { + absl::MutexLock lock(&test_data_.mutex); + test_data_.input_output_per_packet[test_config_key] = TestInputOutput{ + .config = test_config, + }; + } + + // Select one random member of the group to be brought down. + absl::BitGen gen; + const int random_member_index = + absl::Uniform(absl::IntervalClosedOpen, gen, 0, members.size()); + const int selected_port_id = members[random_member_index].port; + ASSERT_OK_AND_ASSIGN(const auto port_name_per_port_id, + GetPortNamePerPortId(*control_gnmi_stub_)); + ASSERT_OK_AND_ASSIGN(const auto& selected_port_name, + gutil::FindOrStatus(port_name_per_port_id, + absl::StrCat(selected_port_id))); + ASSERT_OK(SetInterfaceAdminState(*control_gnmi_stub_, selected_port_name, + AdminState::kDown)); + + // Send Modify request to remove the down member from the group. + members.erase(members.begin() + random_member_index); + ASSERT_OK(pins::ProgramGroupWithMembers(environment, *sut_p4_session_, + GetIrP4Info(), kGroupId, members, + p4::v1::Update::MODIFY)); + // Bring the down member watch port up. + ASSERT_OK(SetInterfaceAdminState(*control_gnmi_stub_, selected_port_name, + AdminState::kUp)); + + // TODO: Adding watch port up action causes unexpected traffic + // loss. Remove after the bug in OrchAgent is fixed. + absl::SleepFor(absl::Seconds(5)); + + // Send 5000 packets and check for packet distribution. + ASSERT_OK(SendNPacketsToSut(kNumTestPackets, test_config, members, + controller_port_ids, GetIrP4Info(), + *control_p4_session_, environment)); + test_data_.total_packets_sent = kNumTestPackets; + + // Wait for packets from the SUT to arrive. + absl::SleepFor(kDurationToWaitForPackets); + + // For the test configuration, check the output distribution. + { + absl::MutexLock lock(&test_data_.mutex); + TestInputOutput& test = test_data_.input_output_per_packet[test_config_key]; + EXPECT_EQ(test.output.size(), test_data_.total_packets_sent) + << "Mismatch in expected: " << test_data_.total_packets_sent + << " and actual: " << test.output.size() << "packets received for " + << DescribeTestConfig(test_config); + + ASSERT_OK_AND_ASSIGN(auto num_packets_per_port, + CountNumPacketsPerPort(test.output)); + + absl::flat_hash_set expected_member_ports = + CreateExpectedMemberPorts(members); + + ASSERT_OK(VerifyGroupMembersFromP4Read(*sut_p4_session_, GetIrP4Info(), + kGroupId, members)); + ASSERT_OK(VerifyGroupMembersFromReceiveTraffic(num_packets_per_port, + expected_member_ports)); + PrettyPrintDistribution(test_config, test, test_data_, members, + num_packets_per_port); + } + + // Delete default routes to prepare for delete group. + ASSERT_OK(ProgramDefaultRoutes(*sut_p4_session_, GetIrP4Info(), kVrfId, + p4::v1::Update::DELETE)); + + // Delete group and verify no errors. + ASSERT_OK(DeleteGroup(*sut_p4_session_, GetIrP4Info(), kGroupId)); +}; + +// Add ActionProfileGroup member whose watch port is down (during create) and +// verify traffic distribution when port is down/up. +TEST_P(WatchPortTestFixture, VerifyWatchPortActionForDownPortMemberInsert) { + thinkit::TestEnvironment& environment = + GetParam().testbed->GetMirrorTestbed().Environment(); + environment.SetTestCaseID("e54da480-d2cc-42c6-bced-0354b5ab3329"); + absl::Span controller_port_ids = GetParam().port_ids; + const int input_port = controller_port_ids[kDefaultInputPortIndex]; + ASSERT_OK(SetUpInputPortForPacketReceive(*sut_p4_session_, GetIrP4Info(), + input_port)); + + const int group_size = kNumWcmpMembersForTest; + ASSERT_OK_AND_ASSIGN(std::vector members, + CreateGroupMembers(group_size, controller_port_ids)); + // Select one random port from the member port ids to be brought down/up. + absl::BitGen gen; + const int random_member_index = + absl::Uniform(absl::IntervalClosedOpen, gen, 0, members.size()); + const int selected_port_id = members[random_member_index].port; + ASSERT_OK_AND_ASSIGN(const auto port_name_per_port_id, + GetPortNamePerPortId(*control_gnmi_stub_)); + ASSERT_OK_AND_ASSIGN(const auto& selected_port_name, + gutil::FindOrStatus(port_name_per_port_id, + absl::StrCat(selected_port_id))); + // Bring the port down before the group and members are created. + ASSERT_OK(SetInterfaceAdminState(*control_gnmi_stub_, selected_port_name, + AdminState::kDown)); + + // Programs the required router interfaces, nexthops for wcmp group. + ASSERT_OK(pins::ProgramNextHops(environment, *sut_p4_session_, GetIrP4Info(), + members)); + ASSERT_OK(pins::ProgramGroupWithMembers(environment, *sut_p4_session_, + GetIrP4Info(), kGroupId, members, + p4::v1::Update::INSERT)); + // Program default routing for all packets on SUT. + ASSERT_OK(ProgramDefaultRoutes(*sut_p4_session_, GetIrP4Info(), kVrfId, + p4::v1::Update::INSERT)); + + // Generate test configuration, pick any field used by hashing to vary for + // every packet so that it gets sent to all the members. + TestConfiguration test_config = { + .field = PacketField::kL4DstPort, + .ipv4 = true, + .encapped = false, + .inner_ipv4 = false, + .decap = false, + }; + ASSERT_TRUE(IsValidTestConfiguration(test_config)); + + // Create test data entry. + std::string test_config_key = TestConfigurationToPayload(test_config); + { + absl::MutexLock lock(&test_data_.mutex); + test_data_.input_output_per_packet[test_config_key] = TestInputOutput{ + .config = test_config, + }; + } + + for (auto operation : {AdminState::kDown, AdminState::kUp}) { + // Down operation is a no-op here since the port is already down. + ASSERT_OK(SetInterfaceAdminState(*control_gnmi_stub_, selected_port_name, + operation)); + + // Clear the counters before the test. + test_data_.ClearReceivedPackets(); + + // TODO: Adding watch port up action causes unexpected traffic + // loss. Remove after the bug in OrchAgent is fixed. + absl::SleepFor(absl::Seconds(5)); + + // Send 5000 packets and check for packet distribution. + ASSERT_OK(SendNPacketsToSut(kNumTestPackets, test_config, members, + controller_port_ids, GetIrP4Info(), + *control_p4_session_, environment)); + test_data_.total_packets_sent = kNumTestPackets; + + // Wait for packets from the SUT to arrive. + absl::SleepFor(kDurationToWaitForPackets); + + // For the test configuration, check the output distribution. + { + absl::MutexLock lock(&test_data_.mutex); + TestInputOutput& test = + test_data_.input_output_per_packet[test_config_key]; + EXPECT_EQ(test.output.size(), test_data_.total_packets_sent) + << "Mismatch in expected: " << test_data_.total_packets_sent + << " and actual: " << test.output.size() << "packets received for " + << DescribeTestConfig(test_config); + + absl::flat_hash_set expected_member_ports = + CreateExpectedMemberPorts(members); + // Remove the selected member from expected_member_ports for admin + // down case. + if (operation == AdminState::kDown) { + expected_member_ports.erase(selected_port_id); + } + ASSERT_OK_AND_ASSIGN(auto num_packets_per_port, + CountNumPacketsPerPort(test.output)); + + ASSERT_OK(VerifyGroupMembersFromP4Read(*sut_p4_session_, GetIrP4Info(), + kGroupId, members)); + ASSERT_OK(VerifyGroupMembersFromReceiveTraffic(num_packets_per_port, + expected_member_ports)); + PrettyPrintDistribution(test_config, test, test_data_, members, + num_packets_per_port); + } + } +} + +} // namespace +} // namespace pins diff --git a/tests/forwarding/watch_port_test.h b/tests/forwarding/watch_port_test.h index 496b9fe5..94ed6e8d 100644 --- a/tests/forwarding/watch_port_test.h +++ b/tests/forwarding/watch_port_test.h @@ -15,21 +15,55 @@ #ifndef PINS_TESTS_FORWARDING_WATCH_PORT_TEST_H_ #define PINS_TESTS_FORWARDING_WATCH_PORT_TEST_H_ +#include +#include +#include +#include #include // NOLINT: Need threads (instead of fiber) for upstream code. #include +#include "absl/status/status.h" +#include "gtest/gtest.h" +#include "p4/config/v1/p4info.pb.h" +#include "p4_pdpi/ir.pb.h" #include "p4_pdpi/p4_runtime_session.h" -#include "sai_p4/instantiations/google/sai_p4info.h" +#include "proto/gnmi/gnmi.grpc.pb.h" #include "tests/forwarding/group_programming_util.h" #include "tests/forwarding/packet_test_util.h" #include "thinkit/mirror_testbed_fixture.h" +#include "thinkit/switch.h" namespace pins { -// WatchPortTestFixture that holds member functions needed for testing watch -// port action. -// TODO: To be implemented. -class WatchPortTestFixture : public thinkit::MirrorTestbedFixture {}; +// Holds the common params needed for watch port test. +struct WatchPortTestParams { + thinkit::MirrorTestbedInterface* testbed; + std::string gnmi_config; + // TODO: Remove port ids from here and derive from gNMI config. + std::vector port_ids; +}; + +// WatchPortTestFixture for testing watch port action. +class WatchPortTestFixture + : public testing::TestWithParam { + protected: + void SetUp() override; + + void TearDown() override; + + // Returns the P4Info used by the test, for now just Middleblock. + const p4::config::v1::P4Info& GetP4Info(); + const pdpi::IrP4Info& GetIrP4Info(); + + TestData test_data_; + std::unique_ptr sut_p4_session_; + std::unique_ptr control_p4_session_; + std::unique_ptr control_gnmi_stub_; + // Stores the receive thread that is created in SetUp() and joined in + // TearDown(). Accesses control_p4_session_->StreamChannelRead to read + // packets, which must not be used by other threads. + std::thread receive_packet_thread_; +}; } // namespace pins