Skip to content

Commit 04bfe31

Browse files
authored
Merge branch 'main' into it-legacy-to-fsm-switch-via-quorum1
2 parents 8afc20d + a818cc6 commit 04bfe31

34 files changed

+499
-243
lines changed

src/groups/bmq/bmqc/bmqc_orderedhashmapwithhistory.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,5 +20,13 @@
2020
namespace BloombergLP {
2121
namespace bmqc {
2222

23+
// -------------------------------------------
24+
// struct OrderedHashMapWithHistory_ImpDetails
25+
// -------------------------------------------
26+
27+
const int
28+
OrderedHashMapWithHistory_ImpDetails::k_INSERT_GC_MESSAGES_BATCH_SIZE =
29+
1000;
30+
2331
} // close package namespace
2432
} // close enterprise namespace

src/groups/bmq/bmqc/bmqc_orderedhashmapwithhistory.h

Lines changed: 57 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,19 @@
5555
namespace BloombergLP {
5656
namespace bmqc {
5757

58+
// ===========================================
59+
// struct OrderedHashMapWithHistory_ImpDetails
60+
// ===========================================
61+
62+
/// PRIVATE CLASS.
63+
// For use only by `bmqc::OrderedHashMapWithHistory` implementation.
64+
struct OrderedHashMapWithHistory_ImpDetails {
65+
// PRIVATE CLASS DATA
66+
/// How many messages to GC when GC required in
67+
/// `bmqc::OrderedHashMapWithHistory::insert`
68+
static const int k_INSERT_GC_MESSAGES_BATCH_SIZE;
69+
};
70+
5871
// ========================================
5972
// class OrderedHashMapWithHistory_Iterator
6073
// ========================================
@@ -219,7 +232,27 @@ class OrderedHashMapWithHistory {
219232

220233
size_t d_historySize; // how many historical (!d_isLive) items
221234

222-
gc_iterator d_gcIt; // where to start 'gc'
235+
/// Whether this container has more elements to `gc`. This flag might be
236+
/// set or unset during every `gc` call according to this container's
237+
/// needs.
238+
bool d_requireGC;
239+
240+
/// The `now` time of the last GC. We assume that the current actual time
241+
/// is no less than this timestamp.
242+
TimeType d_lastGCTime;
243+
244+
/// The iterator pointing to the element where garbage collection should
245+
/// continue once `gc` is called. According to contract, this iterator
246+
/// only goes forward. All the elements passed by this iterator are either
247+
/// removed or marked for removal, depending on what happened first:
248+
/// - If the element was not erased by the user before, but its timeout
249+
/// happened in this container, it is marked for deletion in `gc` and
250+
/// iterator goes forward. Next, it is the user's responsibility to call
251+
/// `erase` on this element to fully remove it.
252+
/// - If the user removes the element before its timeout happened, the
253+
/// element becomes `not alive`, but still lives in the history.
254+
/// Eventually `gc` reaches this element and fully removes it.
255+
gc_iterator d_gcIt;
223256

224257
// PRIVATE CLASS METHODS
225258
static const KEY& get_key(const bsl::pair<const KEY, VALUE>& value)
@@ -492,6 +525,8 @@ inline OrderedHashMapWithHistory<KEY, VALUE, HASH, VALUE_TYPE>::
492525
, d_first(d_impl.end())
493526
, d_last(d_impl.end())
494527
, d_historySize(0)
528+
, d_requireGC(false)
529+
, d_lastGCTime(0)
495530
, d_gcIt(endGc())
496531
{
497532
// NOTHING
@@ -521,6 +556,8 @@ inline void OrderedHashMapWithHistory<KEY, VALUE, HASH, VALUE_TYPE>::clear()
521556

522557
d_first = d_last = end();
523558
d_gcIt = endGc();
559+
d_requireGC = false;
560+
d_lastGCTime = 0;
524561
d_historySize = 0;
525562
}
526563

@@ -623,8 +660,15 @@ OrderedHashMapWithHistory<KEY, VALUE, HASH, VALUE_TYPE>::insert(
623660
const SOURCE_TYPE& value,
624661
TimeType timePoint)
625662
{
626-
TimeType time = d_timeout ? timePoint + d_timeout : 0;
663+
TimeType time = d_timeout ? timePoint + d_timeout : 0;
664+
if (d_requireGC) {
665+
gc(bsl::max(timePoint, d_lastGCTime),
666+
OrderedHashMapWithHistory_ImpDetails::
667+
k_INSERT_GC_MESSAGES_BATCH_SIZE);
668+
}
669+
627670
bsl::pair<gc_iterator, bool> result = d_impl.insert(Value(value, time));
671+
628672
// No need to keep track of element's timePoint if the map is not
629673
// maintaining any history (i.e., if d_timeout == 0).
630674

@@ -668,7 +712,9 @@ OrderedHashMapWithHistory<KEY, VALUE, HASH, VALUE_TYPE>::gc(TimeType now,
668712
// Try to advance to either a young item, or to the end of the batch, or to
669713
// the end of collection.
670714

671-
if (d_gcIt == endGc()) {
715+
d_lastGCTime = now;
716+
if (BSLS_PERFORMANCEHINT_PREDICT_UNLIKELY(d_gcIt == endGc())) {
717+
BSLS_PERFORMANCEHINT_UNLIKELY_HINT;
672718
d_gcIt = beginGc();
673719
}
674720
const int initialHistorySize = d_historySize;
@@ -681,26 +727,30 @@ OrderedHashMapWithHistory<KEY, VALUE, HASH, VALUE_TYPE>::gc(TimeType now,
681727
}
682728
gc_iterator it = d_gcIt++;
683729
if (it->d_isLive) {
684-
// This is an old item. It should be removed by 'erase'.
685-
// No need to return to this item. No need to check time again.
686-
// Indicate that it needs to be removed by setting its time to 0.
730+
// This item was not erased by the user yet, but its timeout in
731+
// this container happened. Mark it for deletion by setting
732+
// `d_time` to 0, so the next time user calls `erase` on it, it
733+
// will be fully removed.
687734
it->d_time = 0;
688735
}
689736
else {
737+
// This item was erased by the user before, and we can fully remove
738+
// it right here.
690739
d_impl.erase(it);
691740
--d_historySize;
692741
}
693742
// Meaning, there is no need for 'd_gcIt' to step back. Only forward.
694743

695744
if (--batchSize == 0) {
696745
// Remember where we have stopped and resume from there next time
697-
746+
d_requireGC = true;
698747
const int historyChange = initialHistorySize - d_historySize;
699748
// Note that we return either a negative value or 0 here:
700749
return -historyChange; // RETURN
701750
}
702751
}
703752

753+
d_requireGC = false;
704754
const int historyChange = initialHistorySize - d_historySize;
705755
// Note that we return either a positive value or 0 here:
706756
return historyChange;

src/groups/bmq/bmqt/bmqt_uri.t.cpp

Lines changed: 168 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,17 +22,23 @@
2222
// BDE
2323
#include <ball_log.h>
2424
#include <bdlf_bind.h>
25+
#include <bdlma_localsequentialallocator.h>
2526
#include <bsl_string.h>
2627
#include <bsl_utility.h>
2728
#include <bsl_vector.h>
2829
#include <bslh_defaulthashalgorithm.h>
2930
#include <bslh_hash.h>
3031
#include <bslmt_barrier.h>
32+
#include <bslmt_latch.h>
3133
#include <bslmt_threadgroup.h>
3234

3335
// TEST DRIVER
3436
#include <bmqtst_testhelper.h>
35-
#include <bsl_functional.h>
37+
38+
// BENCHMARKING LIBRARY
39+
#ifdef BMQTST_BENCHMARK_ENABLED
40+
#include <benchmark/benchmark.h>
41+
#endif
3642

3743
// CONVENIENCE
3844
using namespace BloombergLP;
@@ -647,6 +653,123 @@ static void test7_testLongUri()
647653
bmqt::UriParser::shutdown();
648654
}
649655

656+
#ifdef BMQTST_BENCHMARK_ENABLED
657+
658+
struct UriParserBenchmark {
659+
static void bench(bslmt::Latch* initLatch_p,
660+
bslmt::Barrier* startBarrier_p,
661+
bslmt::Latch* finishLatch_p)
662+
{
663+
// PRECONDITIONS
664+
BSLS_ASSERT_OPT(initLatch_p);
665+
BSLS_ASSERT_OPT(startBarrier_p);
666+
BSLS_ASSERT_OPT(finishLatch_p);
667+
668+
const size_t k_NUM_ITERATIONS = 100000;
669+
const bsl::string k_SAMPLE_URI(
670+
"bmq://my.sample.domain.~dev/my-queue-name?id=consumer123",
671+
bmqtst::TestHelperUtil::allocator());
672+
673+
bmqt::Uri uri(bmqtst::TestHelperUtil::allocator());
674+
bsl::string error(bmqtst::TestHelperUtil::allocator());
675+
676+
initLatch_p->arrive();
677+
startBarrier_p->wait();
678+
679+
for (size_t i = 0; i < k_NUM_ITERATIONS; ++i) {
680+
bmqt::UriParser::parse(&uri, &error, k_SAMPLE_URI);
681+
}
682+
683+
finishLatch_p->arrive();
684+
}
685+
};
686+
687+
struct UriConstructorBenchmark {
688+
static void bench(bslmt::Latch* initLatch_p,
689+
bslmt::Barrier* startBarrier_p,
690+
bslmt::Latch* finishLatch_p)
691+
{
692+
// PRECONDITIONS
693+
BSLS_ASSERT_OPT(initLatch_p);
694+
BSLS_ASSERT_OPT(startBarrier_p);
695+
BSLS_ASSERT_OPT(finishLatch_p);
696+
697+
const size_t k_NUM_ITERATIONS = 100000;
698+
const bsl::string k_SAMPLE_URI(
699+
"bmq://my.sample.domain.~dev/my-queue-name?id=consumer123",
700+
bmqtst::TestHelperUtil::allocator());
701+
702+
// Test allocator is slow and might skew the benchmarks
703+
bdlma::LocalSequentialAllocator<256> lsa(
704+
bmqtst::TestHelperUtil::allocator());
705+
706+
initLatch_p->arrive();
707+
startBarrier_p->wait();
708+
709+
for (size_t i = 0; i < k_NUM_ITERATIONS; ++i) {
710+
bmqt::Uri uri(k_SAMPLE_URI, &lsa);
711+
(void)uri;
712+
}
713+
714+
finishLatch_p->arrive();
715+
}
716+
};
717+
718+
template <size_t NUM_THREADS, typename BENCHMARK>
719+
static void testN1_benchmark(benchmark::State& state)
720+
// ------------------------------------------------------------------------
721+
// URI PERFORMANCE TEST
722+
//
723+
// Plan: spawn NUM_THREADS and measure the time taken for BENCHMARK::bench
724+
//
725+
// Testing:
726+
// Performance
727+
// ------------------------------------------------------------------------
728+
{
729+
bmqtst::TestHelper::printTestName("URI PERFORMANCE TEST");
730+
731+
bmqt::UriParser::initialize(bmqtst::TestHelperUtil::allocator());
732+
733+
bslmt::Latch initThreadLatch(NUM_THREADS);
734+
bslmt::Barrier startBenchmarkBarrier(NUM_THREADS + 1);
735+
bslmt::Latch finishBenchmarkLatch(NUM_THREADS);
736+
737+
bslmt::ThreadGroup threadGroup(bmqtst::TestHelperUtil::allocator());
738+
for (size_t i = 0; i < NUM_THREADS; ++i) {
739+
const int rc = threadGroup.addThread(
740+
bdlf::BindUtil::bindS(bmqtst::TestHelperUtil::allocator(),
741+
&(BENCHMARK::bench),
742+
&initThreadLatch,
743+
&startBenchmarkBarrier,
744+
&finishBenchmarkLatch));
745+
BMQTST_ASSERT_EQ_D(i, rc, 0);
746+
}
747+
748+
initThreadLatch.wait();
749+
750+
size_t iter = 0;
751+
for (auto _ : state) {
752+
// Benchmark time start
753+
754+
// We don't support running multi-iteration benchmarks because we
755+
// prepare and start complex tasks in separate threads.
756+
// Once these tasks are finished, we cannot simply re-run them without
757+
// reinitialization, and it goes against benchmark library design.
758+
// Make sure we run this only once.
759+
BSLS_ASSERT_OPT(0 == iter++ && "Must be run only once");
760+
761+
startBenchmarkBarrier.wait();
762+
finishBenchmarkLatch.wait();
763+
764+
// Benchmark time end
765+
}
766+
767+
threadGroup.joinAll();
768+
bmqt::UriParser::shutdown();
769+
}
770+
771+
#endif // BMQTST_BENCHMARK_ENABLED
772+
650773
// ============================================================================
651774
// MAIN PROGRAM
652775
// ----------------------------------------------------------------------------
@@ -664,6 +787,50 @@ int main(int argc, char* argv[])
664787
case 3: test3_URIBuilderMultiThreaded(); break;
665788
case 2: test2_URIBuilder(); break;
666789
case 1: test1_breathingTest(); break;
790+
case -1: {
791+
#ifdef BMQTST_BENCHMARK_ENABLED
792+
BENCHMARK(testN1_benchmark<1, UriParserBenchmark>)
793+
->Name("bmqt::UriParser::parse threads=1")
794+
->Iterations(1)
795+
->Unit(benchmark::kMillisecond);
796+
BENCHMARK(testN1_benchmark<2, UriParserBenchmark>)
797+
->Name("bmqt::UriParser::parse threads=2")
798+
->Iterations(1)
799+
->Unit(benchmark::kMillisecond);
800+
BENCHMARK(testN1_benchmark<4, UriParserBenchmark>)
801+
->Name("bmqt::UriParser::parse threads=4")
802+
->Iterations(1)
803+
->Unit(benchmark::kMillisecond);
804+
BENCHMARK(testN1_benchmark<8, UriParserBenchmark>)
805+
->Name("bmqt::UriParser::parse threads=8")
806+
->Iterations(1)
807+
->Unit(benchmark::kMillisecond);
808+
809+
BENCHMARK(testN1_benchmark<1, UriConstructorBenchmark>)
810+
->Name("bmqt::Uri::Uri threads=1")
811+
->Iterations(1)
812+
->Unit(benchmark::kMillisecond);
813+
BENCHMARK(testN1_benchmark<2, UriConstructorBenchmark>)
814+
->Name("bmqt::Uri::Uri threads=2")
815+
->Iterations(1)
816+
->Unit(benchmark::kMillisecond);
817+
BENCHMARK(testN1_benchmark<4, UriConstructorBenchmark>)
818+
->Name("bmqt::Uri::Uri threads=4")
819+
->Iterations(1)
820+
->Unit(benchmark::kMillisecond);
821+
BENCHMARK(testN1_benchmark<8, UriConstructorBenchmark>)
822+
->Name("bmqt::Uri::Uri threads=8")
823+
->Iterations(1)
824+
->Unit(benchmark::kMillisecond);
825+
826+
benchmark::Initialize(&argc, argv);
827+
benchmark::RunSpecifiedBenchmarks();
828+
#else
829+
cerr << "WARNING: BENCHMARK '" << _testCase
830+
<< "' IS NOT SUPPORTED ON THIS PLATFORM." << endl;
831+
bmqtst::TestHelperUtil::testStatus() = -1;
832+
#endif // BMQTST_BENCHMARK_ENABLED
833+
} break;
667834
default: {
668835
cerr << "WARNING: CASE '" << _testCase << "' NOT FOUND." << endl;
669836
bmqtst::TestHelperUtil::testStatus() = -1;

src/groups/mqb/mqbblp/mqbblp_cluster.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -636,7 +636,7 @@ void Cluster::initiateShutdownDispatched(const VoidFunctor& callback)
636636
d_clusterData.membership().setSelfNodeStatus(
637637
bmqp_ctrlmsg::NodeStatus::E_STOPPING);
638638

639-
d_clusterOrchestrator.queueHelper().requestToStopPushing();
639+
d_clusterOrchestrator.queueHelper().requestToStopQueues();
640640

641641
bsls::TimeInterval whenToStop(
642642
bsls::SystemTime::now(bsls::SystemClockType::e_MONOTONIC));

src/groups/mqb/mqbblp/mqbblp_clusterproxy.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ void ClusterProxy::initiateShutdownDispatched(const VoidFunctor& callback)
170170
// Mark self as stopping.
171171
d_isStopping = true;
172172

173-
d_queueHelper.requestToStopPushing();
173+
d_queueHelper.requestToStopQueues();
174174
// 'checkUnconfirmedV2' serves as synchronization.
175175
// It makes sure stopPushing() gets executed before the return.
176176
bsls::TimeInterval whenToStop(

src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5296,9 +5296,7 @@ void ClusterQueueHelper::processShutdownEvent()
52965296
}
52975297
}
52985298

5299-
/// Stop sending PUSHes but continue receiving CONFIRMs, receiving and
5300-
/// sending PUTs and ACKs.
5301-
void ClusterQueueHelper::requestToStopPushing()
5299+
void ClusterQueueHelper::requestToStopQueues()
53025300
{
53035301
// executed by the cluster *DISPATCHER* thread
53045302

@@ -5309,7 +5307,7 @@ void ClusterQueueHelper::requestToStopPushing()
53095307
// Assume Shutdown V2
53105308
d_isShutdownLogicOn = true;
53115309

5312-
// Prevent future queue operations from sending PUSHes.
5310+
// Prevent future queue operations from sending PUSHes and GC.
53135311
for (QueueContextMapIter it = d_queues.begin(); it != d_queues.end();
53145312
++it) {
53155313
QueueContextSp& queueContextSp = it->second;
@@ -5321,7 +5319,7 @@ void ClusterQueueHelper::requestToStopPushing()
53215319
}
53225320

53235321
queue->dispatcher()->execute(
5324-
bdlf::BindUtil::bind(&mqbi::Queue::stopPushing, queue),
5322+
bdlf::BindUtil::bind(&mqbi::Queue::setStopping, queue),
53255323
queue);
53265324
}
53275325
}

0 commit comments

Comments
 (0)