From e87beba1b2672d462aef5179893de3a4a6e462c8 Mon Sep 17 00:00:00 2001 From: Fred Suter Date: Sun, 29 Sep 2024 19:05:26 -0400 Subject: [PATCH] Revert "add tiny delay at the end of Stream open to avoid race condition + test revalidation" This reverts commit 864c921b6a702c5d9b6f3d72aabf466874243dff. --- src/Engine.cpp | 4 ++-- src/Stream.cpp | 9 +++------ test/dtl_file_engine.cpp | 2 +- test/dtl_stream.cpp | 6 +++--- 4 files changed, 9 insertions(+), 12 deletions(-) diff --git a/src/Engine.cpp b/src/Engine.cpp index 3a78ec7..32e11ec 100644 --- a/src/Engine.cpp +++ b/src/Engine.cpp @@ -138,7 +138,7 @@ void Engine::end_pub_transaction() void Engine::pub_close() { auto self = sg4::Actor::self(); - XBT_DEBUG("Publisher '%s' is closing the engine '%s'", self->get_cname(), get_cname()); + XBT_DEBUG("Publisher '%s' is closing the engine", self->get_cname()); if (not pub_closing_) { // I'm the first to close pub_closing_ = true; @@ -221,7 +221,7 @@ void Engine::end_sub_transaction() void Engine::sub_close() { auto self = sg4::Actor::self(); - XBT_DEBUG("Subscriber '%s' is closing the engine '%s'", self->get_cname(), get_cname()); + XBT_DEBUG("Subscriber '%s' is closing the engine", self->get_cname()); if (not sub_closing_) { // I'm the first to close sub_closing_ = true; diff --git a/src/Stream.cpp b/src/Stream.cpp index f7852f2..071b259 100644 --- a/src/Stream.cpp +++ b/src/Stream.cpp @@ -133,7 +133,6 @@ std::shared_ptr Stream::open(const std::string& name, Mode mode) dtl_->lock(); if (not engine_) { - XBT_DEBUG("Create Engine"); if (engine_type_ == Engine::Type::Staging) { engine_ = std::make_shared(name, this); } else if (engine_type_ == Engine::Type::File) { @@ -146,7 +145,7 @@ std::shared_ptr Stream::open(const std::string& name, Mode mode) dtl_->unlock(); while (not engine_) - sg4::this_actor::sleep_for(0.01); + sg4::this_actor::sleep_for(0.05); // Then we register the actors calling Stream::open as publishers or subscribers in the newly created Engine. if (mode == dtlmod::Stream::Mode::Publish) { @@ -155,11 +154,9 @@ std::shared_ptr Stream::open(const std::string& name, Mode mode) engine_->add_subscriber(sg4::Actor::self(), rendez_vous_); } - XBT_DEBUG("Stream '%s' uses engine '%s' of type '%s' and transport '%s' (%zu Pub. / %zu Sub.)", get_cname(), engine_->get_cname(), - get_engine_type_str(), get_transport_method_str(), engine_->get_num_publishers(), engine_->get_num_subscribers()); + XBT_DEBUG("Stream '%s' uses engine '%s' and transport '%s' (%zu Pub. / %zu Sub.)", get_cname(), get_engine_type_str(), + get_transport_method_str(), engine_->get_num_publishers(), engine_->get_num_subscribers()); - sg4::this_actor::sleep_for(0.05); - return engine_; } diff --git a/test/dtl_file_engine.cpp b/test/dtl_file_engine.cpp index 6409420..419233f 100644 --- a/test/dtl_file_engine.cpp +++ b/test/dtl_file_engine.cpp @@ -234,7 +234,7 @@ TEST_F(DTLFileEngineTest, MultiplePubSingleSubSharedStorage) XBT_INFO("Start a Transaction"); ASSERT_NO_THROW(engine->begin_transaction()); XBT_INFO("Transition can start as publishers have finished writing"); - ASSERT_DOUBLE_EQ(sg4::Engine::get_clock(), 15.701431842993127); + ASSERT_DOUBLE_EQ(sg4::Engine::get_clock(), 15.651431842993127); XBT_INFO("Get the entire Variable 'var' from the DTL"); ASSERT_NO_THROW(engine->get(var_sub)); XBT_INFO("End a Transaction"); diff --git a/test/dtl_stream.cpp b/test/dtl_stream.cpp index 513a1a0..0e17fd8 100644 --- a/test/dtl_stream.cpp +++ b/test/dtl_stream.cpp @@ -138,7 +138,7 @@ TEST_F(DTLStreamTest, PublishFileMultipleOpen) XBT_INFO("Open the Stream in Stream::Mode::Publish mode"); ASSERT_NO_THROW(engine = stream->open("zone:fs:/pfs/file", dtlmod::Stream::Mode::Publish)); XBT_INFO("Check current number of publishers and subscribers"); - ASSERT_EQ(stream->get_num_publishers(), 2); + ASSERT_EQ(stream->get_num_publishers(), 1); ASSERT_EQ(stream->get_num_subscribers(), 0); ASSERT_NO_THROW(sg4::this_actor::sleep_for(1)); XBT_INFO("Close the engine"); @@ -194,8 +194,8 @@ TEST_F(DTLStreamTest, OpenWithRendezVous) ASSERT_NO_THROW(stream->set_rendez_vous()); XBT_INFO("Open the Stream in Stream::Mode::Publish mode"); ASSERT_NO_THROW(engine = stream->open("foo", dtlmod::Stream::Mode::Publish)); - XBT_INFO("Open complete. Clock should be at 10.05s"); - ASSERT_DOUBLE_EQ(sg4::Engine::get_clock(), 10.05); + XBT_INFO("Open complete. Clock should be at 10s"); + ASSERT_DOUBLE_EQ(sg4::Engine::get_clock(), 10.0); XBT_INFO("Let actor %s sleep for 1 second", sg4::this_actor::get_cname()); ASSERT_NO_THROW(sg4::this_actor::sleep_for(1)); XBT_INFO("Close the engine");