Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature/chained pubsub #1798

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 66 additions & 0 deletions ecal/tests/cpp/pubsub_test/src/pubsub_connection_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -232,3 +232,69 @@ TEST(core_cpp_pubsub, TestPublisherIsSubscribedTiming)
// finalize eCAL API
eCAL::Finalize();
}

TEST(core_cpp_pubsub, TestChainedPublisherSubscriberCallback)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: all parameters should be named in a function [readability-named-parameter]

Suggested change
TEST(core_cpp_pubsub, TestChainedPublisherSubscriberCallback)
TEST(core_cpp_pubsub /*unused*/, TestChainedPublisherSubscriberCallback /*unused*/)

{
// initialize eCAL API
EXPECT_EQ(0, eCAL::Initialize(0, nullptr, "chained_publisher_subscriber"));

// enable data loopback
eCAL::Util::EnableLoopback(true);

// Set up counters for sent and received messages
const int message_count = 10;
std::atomic<int> publisher1_sent_count(0);
std::atomic<int> subscriber2_received_count(0);

// Publisher1 in thread 1
auto publisher1_thread = [&]() {
eCAL::CPublisher pub1("topic1");
while (!pub1.IsSubscribed())
{
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
for (int i = 0; i < message_count; ++i)
{
pub1.Send(std::to_string(i));
publisher1_sent_count++;
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
};

// Publisher2
eCAL::CPublisher pub2("topic2");

// Subscriber1 with callback that triggers Publisher2
eCAL::CSubscriber sub1("topic1");
auto subscriber1_callback = [&](const char* /*topic_name*/, const eCAL::SReceiveCallbackData* data) {
// On receiving data from Publisher1, Publisher2 sends the same data
std::string received_data(static_cast<const char*>(data->buf), data->size);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: variable 'received_data' of type 'std::string' (aka 'basic_string') can be declared 'const' [misc-const-correctness]

Suggested change
std::string received_data(static_cast<const char*>(data->buf), data->size);
std::string const received_data(static_cast<const char*>(data->buf), data->size);

pub2.Send(received_data);
};
sub1.AddReceiveCallback(subscriber1_callback);

// Subscriber2 that receives data from Publisher2
eCAL::CSubscriber sub2("topic2");
auto subscriber2_callback = [&](const char* /*topic_name*/, const eCAL::SReceiveCallbackData* data) {
// Count each received message from Publisher2
subscriber2_received_count++;
std::cout << "Subscriber2 Receiving " << std::string(static_cast<const char*>(data->buf), data->size) << std::endl;
};
sub2.AddReceiveCallback(subscriber2_callback);

// Start publisher1 thread
std::thread pub1_thread(publisher1_thread);

// Wait until Publisher1 has sent all messages and Subscriber2 has received them
pub1_thread.join();
while (subscriber2_received_count < message_count)
{
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}

// Validate that Subscriber2 received all messages sent by Publisher1
EXPECT_EQ(publisher1_sent_count, subscriber2_received_count);

// finalize eCAL API
eCAL::Finalize();
}
Loading