Skip to content

Commit

Permalink
Revert "add tiny delay at the end of Stream open to avoid race condit…
Browse files Browse the repository at this point in the history
…ion + test revalidation"

This reverts commit 864c921.
  • Loading branch information
frs69wq committed Sep 29, 2024
1 parent e825c13 commit e87beba
Show file tree
Hide file tree
Showing 4 changed files with 9 additions and 12 deletions.
4 changes: 2 additions & 2 deletions src/Engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ void Engine::end_pub_transaction()
void Engine::pub_close()
{
auto self = sg4::Actor::self();
XBT_DEBUG("Publisher '%s' is closing the engine '%s'", self->get_cname(), get_cname());
XBT_DEBUG("Publisher '%s' is closing the engine", self->get_cname());
if (not pub_closing_) {
// I'm the first to close
pub_closing_ = true;
Expand Down Expand Up @@ -221,7 +221,7 @@ void Engine::end_sub_transaction()
void Engine::sub_close()
{
auto self = sg4::Actor::self();
XBT_DEBUG("Subscriber '%s' is closing the engine '%s'", self->get_cname(), get_cname());
XBT_DEBUG("Subscriber '%s' is closing the engine", self->get_cname());
if (not sub_closing_) {
// I'm the first to close
sub_closing_ = true;
Expand Down
9 changes: 3 additions & 6 deletions src/Stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,6 @@ std::shared_ptr<Engine> Stream::open(const std::string& name, Mode mode)
dtl_->lock();

if (not engine_) {
XBT_DEBUG("Create Engine");
if (engine_type_ == Engine::Type::Staging) {
engine_ = std::make_shared<StagingEngine>(name, this);
} else if (engine_type_ == Engine::Type::File) {
Expand All @@ -146,7 +145,7 @@ std::shared_ptr<Engine> Stream::open(const std::string& name, Mode mode)
dtl_->unlock();

while (not engine_)
sg4::this_actor::sleep_for(0.01);
sg4::this_actor::sleep_for(0.05);

// Then we register the actors calling Stream::open as publishers or subscribers in the newly created Engine.
if (mode == dtlmod::Stream::Mode::Publish) {
Expand All @@ -155,11 +154,9 @@ std::shared_ptr<Engine> Stream::open(const std::string& name, Mode mode)
engine_->add_subscriber(sg4::Actor::self(), rendez_vous_);
}

XBT_DEBUG("Stream '%s' uses engine '%s' of type '%s' and transport '%s' (%zu Pub. / %zu Sub.)", get_cname(), engine_->get_cname(),
get_engine_type_str(), get_transport_method_str(), engine_->get_num_publishers(), engine_->get_num_subscribers());
XBT_DEBUG("Stream '%s' uses engine '%s' and transport '%s' (%zu Pub. / %zu Sub.)", get_cname(), get_engine_type_str(),
get_transport_method_str(), engine_->get_num_publishers(), engine_->get_num_subscribers());

sg4::this_actor::sleep_for(0.05);

return engine_;
}

Expand Down
2 changes: 1 addition & 1 deletion test/dtl_file_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ TEST_F(DTLFileEngineTest, MultiplePubSingleSubSharedStorage)
XBT_INFO("Start a Transaction");
ASSERT_NO_THROW(engine->begin_transaction());
XBT_INFO("Transition can start as publishers have finished writing");
ASSERT_DOUBLE_EQ(sg4::Engine::get_clock(), 15.701431842993127);
ASSERT_DOUBLE_EQ(sg4::Engine::get_clock(), 15.651431842993127);
XBT_INFO("Get the entire Variable 'var' from the DTL");
ASSERT_NO_THROW(engine->get(var_sub));
XBT_INFO("End a Transaction");
Expand Down
6 changes: 3 additions & 3 deletions test/dtl_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ TEST_F(DTLStreamTest, PublishFileMultipleOpen)
XBT_INFO("Open the Stream in Stream::Mode::Publish mode");
ASSERT_NO_THROW(engine = stream->open("zone:fs:/pfs/file", dtlmod::Stream::Mode::Publish));
XBT_INFO("Check current number of publishers and subscribers");
ASSERT_EQ(stream->get_num_publishers(), 2);
ASSERT_EQ(stream->get_num_publishers(), 1);
ASSERT_EQ(stream->get_num_subscribers(), 0);
ASSERT_NO_THROW(sg4::this_actor::sleep_for(1));
XBT_INFO("Close the engine");
Expand Down Expand Up @@ -194,8 +194,8 @@ TEST_F(DTLStreamTest, OpenWithRendezVous)
ASSERT_NO_THROW(stream->set_rendez_vous());
XBT_INFO("Open the Stream in Stream::Mode::Publish mode");
ASSERT_NO_THROW(engine = stream->open("foo", dtlmod::Stream::Mode::Publish));
XBT_INFO("Open complete. Clock should be at 10.05s");
ASSERT_DOUBLE_EQ(sg4::Engine::get_clock(), 10.05);
XBT_INFO("Open complete. Clock should be at 10s");
ASSERT_DOUBLE_EQ(sg4::Engine::get_clock(), 10.0);
XBT_INFO("Let actor %s sleep for 1 second", sg4::this_actor::get_cname());
ASSERT_NO_THROW(sg4::this_actor::sleep_for(1));
XBT_INFO("Close the engine");
Expand Down

0 comments on commit e87beba

Please sign in to comment.