diff --git a/ecal/tests/cpp/pubsub_test/src/pubsub_connection_test.cpp b/ecal/tests/cpp/pubsub_test/src/pubsub_connection_test.cpp index 83a943a64d..2072a45466 100644 --- a/ecal/tests/cpp/pubsub_test/src/pubsub_connection_test.cpp +++ b/ecal/tests/cpp/pubsub_test/src/pubsub_connection_test.cpp @@ -232,3 +232,69 @@ TEST(core_cpp_pubsub, TestPublisherIsSubscribedTiming) // finalize eCAL API eCAL::Finalize(); } + +TEST(core_cpp_pubsub, TestChainedPublisherSubscriberCallback) +{ + // initialize eCAL API + EXPECT_EQ(0, eCAL::Initialize(0, nullptr, "chained_publisher_subscriber")); + + // enable data loopback + eCAL::Util::EnableLoopback(true); + + // Set up counters for sent and received messages + const int message_count = 10; + std::atomic publisher1_sent_count(0); + std::atomic subscriber2_received_count(0); + + // Publisher1 in thread 1 + auto publisher1_thread = [&]() { + eCAL::CPublisher pub1("topic1"); + while (!pub1.IsSubscribed()) + { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + for (int i = 0; i < message_count; ++i) + { + pub1.Send(std::to_string(i)); + publisher1_sent_count++; + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + }; + + // Publisher2 + eCAL::CPublisher pub2("topic2"); + + // Subscriber1 with callback that triggers Publisher2 + eCAL::CSubscriber sub1("topic1"); + auto subscriber1_callback = [&](const char* /*topic_name*/, const eCAL::SReceiveCallbackData* data) { + // On receiving data from Publisher1, Publisher2 sends the same data + std::string received_data(static_cast(data->buf), data->size); + pub2.Send(received_data); + }; + sub1.AddReceiveCallback(subscriber1_callback); + + // Subscriber2 that receives data from Publisher2 + eCAL::CSubscriber sub2("topic2"); + auto subscriber2_callback = [&](const char* /*topic_name*/, const eCAL::SReceiveCallbackData* data) { + // Count each received message from Publisher2 + subscriber2_received_count++; + std::cout << "Subscriber2 Receiving " << std::string(static_cast(data->buf), data->size) << std::endl; + }; + sub2.AddReceiveCallback(subscriber2_callback); + + // Start publisher1 thread + std::thread pub1_thread(publisher1_thread); + + // Wait until Publisher1 has sent all messages and Subscriber2 has received them + pub1_thread.join(); + while (subscriber2_received_count < message_count) + { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + + // Validate that Subscriber2 received all messages sent by Publisher1 + EXPECT_EQ(publisher1_sent_count, subscriber2_received_count); + + // finalize eCAL API + eCAL::Finalize(); +}