Skip to content

Commit 5836d54

Browse files
Priyank Warkhedemeta-codesync[bot]
authored andcommitted
Refactor queue size as parameter
Summary: For ease of review, separating this no-op refactoring into separate diff. Refactor subscription serve queue size to be passed as a parameter while creating Subscription object. Also adding queue sizes for Path and default (non-path) subscriptions in StorageParams. In next diff we will use this to choose different queue size for serving stats subscriptions. Differential Revision: D85965178 fbshipit-source-id: a87c2cfe1efbd8a6498e2677f21264ec3a2bff21
1 parent c8fbd32 commit 5836d54

File tree

5 files changed

+85
-52
lines changed

5 files changed

+85
-52
lines changed

fboss/fsdb/oper/NaivePeriodicSubscribableStorageBase.cpp

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,21 @@ DEFINE_bool(
3030
serveHeartbeats,
3131
false,
3232
"Whether or not to serve hearbeats in subscription streams");
33+
// default queue size for subscription serve updates
34+
// is chosen to be large enough so that in prod we
35+
// should not see any dropped updates. This value will
36+
// be tuned to lower value after monitoring and max-scale
37+
// testing.
38+
// Rationale for choice of this specific large value:
39+
// * minimum enqueue interval is state subscription serve interval (50msec).
40+
// * Thrift streams holds ~100 updates before pipe queue starts building up.
41+
// So with 1024 default queue size, pipe can get full
42+
// only in the exceptional scenario where subscriber does not
43+
// read any update for > 1 min.
44+
DEFINE_int32(
45+
subscriptionServeQueueSize,
46+
1024,
47+
"Max subscription serve updates to queue, default 1024");
3348

3449
namespace facebook::fboss::fsdb {
3550

@@ -437,7 +452,8 @@ NaivePeriodicSubscribableStorageBase::subscribe_encoded_impl(
437452
protocol,
438453
getPublisherRoot(path.begin(), path.end()),
439454
heartbeatThread_ ? heartbeatThread_->getEventBase() : nullptr,
440-
heartbeatInterval);
455+
heartbeatInterval,
456+
params_.pathSubscriptionServeQueueSize_);
441457
subMgr().registerSubscription(std::move(subscription));
442458
return std::move(gen);
443459
}
@@ -462,7 +478,8 @@ NaivePeriodicSubscribableStorageBase::subscribe_delta_impl(
462478
protocol,
463479
getPublisherRoot(path.begin(), path.end()),
464480
heartbeatThread_ ? heartbeatThread_->getEventBase() : nullptr,
465-
heartbeatInterval);
481+
heartbeatInterval,
482+
params_.defaultSubscriptionServeQueueSize_);
466483
subMgr().registerSubscription(std::move(subscription));
467484
return std::move(gen);
468485
}
@@ -486,7 +503,8 @@ NaivePeriodicSubscribableStorageBase::subscribe_encoded_extended_impl(
486503
std::move(publisherRoot),
487504
protocol,
488505
heartbeatThread_ ? heartbeatThread_->getEventBase() : nullptr,
489-
heartbeatInterval);
506+
heartbeatInterval,
507+
params_.pathSubscriptionServeQueueSize_);
490508
subMgr().registerExtendedSubscription(std::move(subscription));
491509
return std::move(gen);
492510
}
@@ -510,7 +528,8 @@ NaivePeriodicSubscribableStorageBase::subscribe_delta_extended_impl(
510528
std::move(publisherRoot),
511529
protocol,
512530
heartbeatThread_ ? heartbeatThread_->getEventBase() : nullptr,
513-
heartbeatInterval);
531+
heartbeatInterval,
532+
params_.defaultSubscriptionServeQueueSize_);
514533
subMgr().registerExtendedSubscription(std::move(subscription));
515534
return std::move(gen);
516535
}
@@ -536,7 +555,8 @@ NaivePeriodicSubscribableStorageBase::subscribe_patch_impl(
536555
patchOperProtocol_,
537556
std::move(root),
538557
heartbeatThread_ ? heartbeatThread_->getEventBase() : nullptr,
539-
heartbeatInterval);
558+
heartbeatInterval,
559+
params_.defaultSubscriptionServeQueueSize_);
540560
subMgr().registerExtendedSubscription(std::move(subscription));
541561
return std::move(gen);
542562
}
@@ -562,7 +582,8 @@ NaivePeriodicSubscribableStorageBase::subscribe_patch_extended_impl(
562582
patchOperProtocol_,
563583
std::move(root),
564584
heartbeatThread_ ? heartbeatThread_->getEventBase() : nullptr,
565-
heartbeatInterval);
585+
heartbeatInterval,
586+
params_.defaultSubscriptionServeQueueSize_);
566587
subMgr().registerExtendedSubscription(std::move(subscription));
567588
return std::move(gen);
568589
}

fboss/fsdb/oper/NaivePeriodicSubscribableStorageBase.h

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
DECLARE_int32(storage_thread_heartbeat_ms);
1919
DECLARE_bool(serveHeartbeats);
20+
DECLARE_int32(subscriptionServeQueueSize);
2021

2122
namespace facebook::fboss::fsdb {
2223

@@ -55,7 +56,11 @@ class NaivePeriodicSubscribableStorageBase {
5556
bool convertToIDPaths = false,
5657
bool requireResponseOnInitialSync = false,
5758
bool exportPerSubscriberMetrics = false,
58-
bool serveGetRequestsWithLastPublishedState = true)
59+
bool serveGetRequestsWithLastPublishedState = true,
60+
int32_t pathSubscriptionServeQueueSize =
61+
FLAGS_subscriptionServeQueueSize,
62+
int32_t defaultSubscriptionServeQueueSize =
63+
FLAGS_subscriptionServeQueueSize)
5964
: subscriptionServeInterval_(subscriptionServeInterval),
6065
subscriptionHeartbeatInterval_(subscriptionHeartbeatInterval),
6166
trackMetadata_(trackMetadata),
@@ -64,7 +69,10 @@ class NaivePeriodicSubscribableStorageBase {
6469
requireResponseOnInitialSync_(requireResponseOnInitialSync),
6570
exportPerSubscriberMetrics_(exportPerSubscriberMetrics),
6671
serveGetRequestsWithLastPublishedState_(
67-
serveGetRequestsWithLastPublishedState) {}
72+
serveGetRequestsWithLastPublishedState),
73+
pathSubscriptionServeQueueSize_(pathSubscriptionServeQueueSize),
74+
defaultSubscriptionServeQueueSize_(
75+
defaultSubscriptionServeQueueSize) {}
6876

6977
StorageParams& setServeGetRequestsWithLastPublishedState(bool val) {
7078
serveGetRequestsWithLastPublishedState_ = val;
@@ -79,6 +87,8 @@ class NaivePeriodicSubscribableStorageBase {
7987
const bool requireResponseOnInitialSync_;
8088
const bool exportPerSubscriberMetrics_;
8189
bool serveGetRequestsWithLastPublishedState_;
90+
const int32_t pathSubscriptionServeQueueSize_;
91+
const int32_t defaultSubscriptionServeQueueSize_;
8292
};
8393

8494
explicit NaivePeriodicSubscribableStorageBase(

fboss/fsdb/oper/Subscription.cpp

Lines changed: 24 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -7,22 +7,6 @@
77
#include <folly/coro/Sleep.h>
88
#include <optional>
99

10-
// default queue size for subscription serve updates
11-
// is chosen to be large enough so that in prod we
12-
// should not see any dropped updates. This value will
13-
// be tuned to lower value after monitoring and max-scale
14-
// testing.
15-
// Rationale for choice of this specific large value:
16-
// * minimum enqueue interval is state subscription serve interval (50msec).
17-
// * Thrift streams holds ~100 updates before pipe queue starts building up.
18-
// So with 1024 default queue size, pipe can get full
19-
// only in the exceptional scenario where subscriber does not
20-
// read any update for > 1 min.
21-
DEFINE_int32(
22-
subscriptionServeQueueSize,
23-
1024,
24-
"Max subscription serve updates to queue, default 1024");
25-
2610
DEFINE_bool(
2711
forceCloseSlowSubscriber,
2812
true,
@@ -129,9 +113,10 @@ DeltaSubscription::create(
129113
OperProtocol protocol,
130114
std::optional<std::string> publisherRoot,
131115
folly::EventBase* heartbeatEvb,
132-
std::chrono::milliseconds heartbeatInterval) {
133-
auto [generator, pipe] = folly::coro::BoundedAsyncPipe<OperDelta>::create(
134-
FLAGS_subscriptionServeQueueSize);
116+
std::chrono::milliseconds heartbeatInterval,
117+
int32_t pipeCapacity) {
118+
auto [generator, pipe] =
119+
folly::coro::BoundedAsyncPipe<OperDelta>::create(pipeCapacity);
135120
std::vector<std::string> path(begin, end);
136121
auto subscription = std::make_unique<DeltaSubscription>(
137122
std::move(subscriber),
@@ -302,9 +287,10 @@ ExtendedPathSubscription::create(
302287
std::optional<std::string> publisherRoot,
303288
OperProtocol protocol,
304289
folly::EventBase* heartbeatEvb,
305-
std::chrono::milliseconds heartbeatInterval) {
306-
auto [generator, pipe] = folly::coro::BoundedAsyncPipe<gen_type>::create(
307-
FLAGS_subscriptionServeQueueSize);
290+
std::chrono::milliseconds heartbeatInterval,
291+
int32_t pipeCapacity) {
292+
auto [generator, pipe] =
293+
folly::coro::BoundedAsyncPipe<gen_type>::create(pipeCapacity);
308294
auto subscription = std::make_shared<ExtendedPathSubscription>(
309295
std::move(subscriber),
310296
makeSimplePathMap(paths),
@@ -365,9 +351,10 @@ ExtendedDeltaSubscription::create(
365351
std::optional<std::string> publisherRoot,
366352
OperProtocol protocol,
367353
folly::EventBase* heartbeatEvb,
368-
std::chrono::milliseconds heartbeatInterval) {
369-
auto [generator, pipe] = folly::coro::BoundedAsyncPipe<gen_type>::create(
370-
FLAGS_subscriptionServeQueueSize);
354+
std::chrono::milliseconds heartbeatInterval,
355+
int32_t pipeCapacity) {
356+
auto [generator, pipe] =
357+
folly::coro::BoundedAsyncPipe<gen_type>::create(pipeCapacity);
371358
auto subscription = std::make_shared<ExtendedDeltaSubscription>(
372359
std::move(subscriber),
373360
makeSimplePathMap(paths),
@@ -480,7 +467,8 @@ ExtendedPatchSubscription::create(
480467
OperProtocol protocol,
481468
std::optional<std::string> publisherRoot,
482469
folly::EventBase* heartbeatEvb,
483-
std::chrono::milliseconds heartbeatInterval) {
470+
std::chrono::milliseconds heartbeatInterval,
471+
int32_t pipeCapacity) {
484472
RawOperPath p;
485473
p.path() = std::move(path);
486474
return create(
@@ -489,7 +477,8 @@ ExtendedPatchSubscription::create(
489477
std::move(protocol),
490478
std::move(publisherRoot),
491479
std::move(heartbeatEvb),
492-
std::move(heartbeatInterval));
480+
std::move(heartbeatInterval),
481+
pipeCapacity);
493482
}
494483

495484
std::pair<
@@ -501,7 +490,8 @@ ExtendedPatchSubscription::create(
501490
OperProtocol protocol,
502491
std::optional<std::string> publisherRoot,
503492
folly::EventBase* heartbeatEvb,
504-
std::chrono::milliseconds heartbeatInterval) {
493+
std::chrono::milliseconds heartbeatInterval,
494+
int32_t pipeCapacity) {
505495
std::map<SubscriptionKey, ExtendedOperPath> extendedPaths;
506496
for (auto& [key, path] : paths) {
507497
std::vector<OperPathElem> extendedPath;
@@ -517,7 +507,8 @@ ExtendedPatchSubscription::create(
517507
std::move(protocol),
518508
std::move(publisherRoot),
519509
std::move(heartbeatEvb),
520-
std::move(heartbeatInterval));
510+
std::move(heartbeatInterval),
511+
pipeCapacity);
521512
}
522513

523514
std::pair<
@@ -529,9 +520,10 @@ ExtendedPatchSubscription::create(
529520
OperProtocol protocol,
530521
std::optional<std::string> publisherRoot,
531522
folly::EventBase* heartbeatEvb,
532-
std::chrono::milliseconds heartbeatInterval) {
533-
auto [generator, pipe] = folly::coro::BoundedAsyncPipe<gen_type>::create(
534-
FLAGS_subscriptionServeQueueSize);
523+
std::chrono::milliseconds heartbeatInterval,
524+
int32_t pipeCapacity) {
525+
auto [generator, pipe] =
526+
folly::coro::BoundedAsyncPipe<gen_type>::create(pipeCapacity);
535527
auto subscription = std::make_unique<ExtendedPatchSubscription>(
536528
std::move(subscriber),
537529
std::move(paths),

fboss/fsdb/oper/Subscription.h

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
#include <folly/io/async/EventBase.h>
1818
#include <folly/json/dynamic.h>
1919

20-
DECLARE_int32(subscriptionServeQueueSize);
2120
DECLARE_bool(forceCloseSlowSubscriber);
2221

2322
namespace facebook::fboss::fsdb {
@@ -307,9 +306,10 @@ class PathSubscription : public BasePathSubscription,
307306
OperProtocol protocol,
308307
std::optional<std::string> publisherRoot,
309308
folly::EventBase* heartbeatEvb,
310-
std::chrono::milliseconds heartbeatInterval) {
311-
auto [generator, pipe] = folly::coro::BoundedAsyncPipe<value_type>::create(
312-
FLAGS_subscriptionServeQueueSize);
309+
std::chrono::milliseconds heartbeatInterval,
310+
int32_t pipeCapacity) {
311+
auto [generator, pipe] =
312+
folly::coro::BoundedAsyncPipe<value_type>::create(pipeCapacity);
313313
std::vector<std::string> path(begin, end);
314314
auto subscription = std::make_unique<PathSubscription>(
315315
std::move(subscriber),
@@ -435,7 +435,8 @@ class DeltaSubscription : public BaseDeltaSubscription,
435435
OperProtocol protocol,
436436
std::optional<std::string> publisherRoot,
437437
folly::EventBase* heartbeatEvb,
438-
std::chrono::milliseconds heartbeatInterval);
438+
std::chrono::milliseconds heartbeatInterval,
439+
int32_t pipeCapacity);
439440

440441
DeltaSubscription(
441442
SubscriptionIdentifier&& subscriber,
@@ -528,7 +529,8 @@ class ExtendedPathSubscription : public ExtendedSubscription,
528529
std::optional<std::string> publisherRoot,
529530
OperProtocol protocol,
530531
folly::EventBase* heartbeatEvb,
531-
std::chrono::milliseconds heartbeatInterval);
532+
std::chrono::milliseconds heartbeatInterval,
533+
int32_t pipeCapacity);
532534

533535
bool shouldConvertToDynamic() const override {
534536
return false;
@@ -620,7 +622,8 @@ class ExtendedDeltaSubscription : public ExtendedSubscription,
620622
std::optional<std::string> publisherRoot,
621623
OperProtocol protocol,
622624
folly::EventBase* heartbeatEvb,
623-
std::chrono::milliseconds heartbeatInterval);
625+
std::chrono::milliseconds heartbeatInterval,
626+
int32_t pipeCapacity);
624627

625628
void buffer(TaggedOperDelta&& newVal);
626629

@@ -718,7 +721,8 @@ class ExtendedPatchSubscription : public ExtendedSubscription,
718721
OperProtocol protocol,
719722
std::optional<std::string> publisherRoot,
720723
folly::EventBase* heartbeatEvb,
721-
std::chrono::milliseconds heartbeatInterval);
724+
std::chrono::milliseconds heartbeatInterval,
725+
int32_t pipeCapacity);
722726

723727
// Multipath
724728
static std::pair<
@@ -730,7 +734,8 @@ class ExtendedPatchSubscription : public ExtendedSubscription,
730734
OperProtocol protocol,
731735
std::optional<std::string> publisherRoot,
732736
folly::EventBase* heartbeatEvb,
733-
std::chrono::milliseconds heartbeatInterval);
737+
std::chrono::milliseconds heartbeatInterval,
738+
int32_t pipeCapacity);
734739

735740
// Extended paths
736741
static std::pair<
@@ -742,7 +747,8 @@ class ExtendedPatchSubscription : public ExtendedSubscription,
742747
OperProtocol protocol,
743748
std::optional<std::string> publisherRoot,
744749
folly::EventBase* heartbeatEvb,
745-
std::chrono::milliseconds heartbeatInterval);
750+
std::chrono::milliseconds heartbeatInterval,
751+
int32_t pipeCapacity);
746752

747753
ExtendedPatchSubscription(
748754
SubscriptionIdentifier&& subscriber,

fboss/fsdb/oper/tests/SubscriptionTests.cpp

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99

1010
namespace facebook::fboss::fsdb::test {
1111

12+
constexpr int32_t kSubscriptionServeQueueSize = 100;
13+
1214
namespace {
1315
template <typename Gen>
1416
folly::coro::Task<typename Gen::value_type> consumeOne(Gen& generator) {
@@ -35,7 +37,8 @@ class SubscriptionTests : public ::testing::Test {
3537
OperProtocol::BINARY,
3638
std::nullopt,
3739
heartbeatThread_->getEventBase(),
38-
std::chrono::milliseconds(100));
40+
std::chrono::milliseconds(100),
41+
kSubscriptionServeQueueSize);
3942
} else {
4043
return SubscriptionT::create(
4144
SubscriptionIdentifier("test-sub"),
@@ -44,7 +47,8 @@ class SubscriptionTests : public ::testing::Test {
4447
OperProtocol::BINARY,
4548
std::nullopt,
4649
heartbeatThread_->getEventBase(),
47-
std::chrono::milliseconds(100));
50+
std::chrono::milliseconds(100),
51+
kSubscriptionServeQueueSize);
4852
}
4953
}
5054

0 commit comments

Comments
 (0)