22
33#include < gtest/gtest.h>
44#include " fboss/fsdb/oper/ExtendedPathBuilder.h"
5+ #include " fboss/fsdb/server/ServiceHandler.h"
56#include " fboss/fsdb/tests/client/FsdbTestClients.h"
67#include " fboss/fsdb/tests/utils/FsdbTestServer.h"
78#include " fboss/lib/CommonUtils.h"
@@ -18,6 +19,7 @@ namespace {
1819
1920const uint32_t kStateServeIntervalMs = 50 ;
2021const uint32_t kStatsServeIntervalMs = 50 ;
22+ const uint32_t kSubscriptionServeQueueSize = 10 ;
2123auto constexpr kSubscriberId = " fsdb_test_subscriber" ;
2224auto constexpr kPublisherId = " fsdb_test_publisher" ;
2325auto constexpr kUnknownPublisherId = " publisher_unknown" ;
@@ -45,7 +47,11 @@ class FsdbPubSubTest : public ::testing::Test {
4547 folly::LoggerDB::get ().setLevel (" fboss.fsdb" , folly::LogLevel::DBG4);
4648 auto config = getFsdbConfig ();
4749 fsdbTestServer_ = std::make_unique<FsdbTestServer>(
48- std::move (config), 0 , kStateServeIntervalMs , kStatsServeIntervalMs );
50+ std::move (config),
51+ 0 ,
52+ kStateServeIntervalMs ,
53+ kStatsServeIntervalMs ,
54+ kSubscriptionServeQueueSize );
4955 publisherStreamEvbThread_ =
5056 std::make_unique<folly::ScopedEventBaseThread>();
5157 subscriberStreamEvbThread_ =
@@ -442,9 +448,91 @@ TYPED_TEST(FsdbPubSubTest, dupSubscriber) {
442448 EXPECT_NO_THROW ({ auto res2 = this ->subscribe (req); });
443449}
444450
445- TYPED_TEST (FsdbPubSubTest, slowSubscriber) {
446- FLAGS_subscriptionServeQueueSize = 2 ;
451+ TYPED_TEST (FsdbPubSubTest, slowSubscriberDisconnectThreshold) {
452+ // verify threshold for number of pending updates for slow subscriber
453+ // disconnect
454+
455+ uint32_t queueSize = this ->pubSubStats ()
456+ ? FLAGS_statsSubscriptionServeQueueSize
457+ : kSubscriptionServeQueueSize ;
458+ uint32_t updatesPublished = 100 + queueSize;
459+ uint32_t subscriptionServeIntervalMs =
460+ this ->pubSubStats () ? kStatsServeIntervalMs : kStateServeIntervalMs ;
461+ uint32_t publishIntervalMs = subscriptionServeIntervalMs + 20 ;
462+ this ->setupConnection (*this ->publisher_ , false );
463+ this ->checkPublishing ({this ->publisher_ ->clientId ()});
447464
465+ // pause subscriber on initial sync long enough for all updates to be
466+ // published and served so that queue builds up.
467+ folly::Baton<> waitForInitialSync, waitForDisconnect;
468+ folly::Baton<> resumeDataCb, resumeReconnect;
469+ bool initialSyncOnce{false }, disconnectOnce{false };
470+ auto slowSub = this ->createSubscriber (
471+ " fsdb_slow_subscriber" ,
472+ [&waitForInitialSync, &resumeDataCb, &initialSyncOnce]() {
473+ if (!initialSyncOnce) {
474+ initialSyncOnce = true ;
475+ waitForInitialSync.post ();
476+ resumeDataCb.wait ();
477+ }
478+ },
479+ [&waitForDisconnect, &resumeReconnect, &disconnectOnce]() {
480+ if (!disconnectOnce) {
481+ disconnectOnce = true ;
482+ waitForDisconnect.post ();
483+ resumeReconnect.wait ();
484+ }
485+ });
486+ this ->setupConnection (*slowSub);
487+ this ->checkSubscribed ({slowSub->clientId ()});
488+ int updateNum{0 };
489+ for (; updateNum < updatesPublished; updateNum++) {
490+ if (this ->pubSubStats ()) {
491+ this ->publishPortStats (makePortStats (updateNum));
492+ } else {
493+ std::string testStr = folly::to<std::string>(" bar" , updateNum);
494+ this ->publishAgentConfig (makeAgentConfig ({{" foo" , testStr}}));
495+ }
496+ std::this_thread::sleep_for (std::chrono::milliseconds (publishIntervalMs));
497+ }
498+
499+ // validate server does not disconnect subscriber yet
500+ waitForInitialSync.wait ();
501+ SubscriptionInfo info = slowSub->getInfo ();
502+ ASSERT_EQ (
503+ info.state , FsdbStreamClient::ReconnectingThriftClient::State::CONNECTED);
504+
505+ // post another update and validate that server disconnects the slow
506+ // subscriber
507+ if (this ->pubSubStats ()) {
508+ this ->publishPortStats (makePortStats (updateNum));
509+ } else {
510+ std::string testStr = folly::to<std::string>(" bar" , updateNum);
511+ this ->publishAgentConfig (makeAgentConfig ({{" foo" , testStr}}));
512+ }
513+ std::this_thread::sleep_for (std::chrono::milliseconds (publishIntervalMs));
514+
515+ // resume subscriber data callback after all updates are published
516+ resumeDataCb.post ();
517+
518+ waitForDisconnect.wait ();
519+ info = slowSub->getInfo ();
520+ ASSERT_EQ (
521+ info.disconnectReason , FsdbErrorCode::SUBSCRIPTION_SERVE_QUEUE_FULL);
522+
523+ WITH_RETRIES_N (90 , {
524+ fb303::ThreadCachedServiceData::get ()->publishStats ();
525+ auto counterName = folly::sformat (
526+ " {}.subscriber.{}.disconnects.slow_subscriber.count" ,
527+ this ->pubSubStats () ? " stats" : " fsdb" ,
528+ " unspecified" );
529+ EXPECT_EVENTUALLY_GT (fb303::ServiceData::get ()->getCounter (counterName), 0 );
530+ });
531+
532+ resumeReconnect.post ();
533+ }
534+
535+ TYPED_TEST (FsdbPubSubTest, slowSubscriber) {
448536 // publishInterval: wait for subscriptionServeIntervalMs+delta to prevent
449537 // published updates from being coalesced
450538 uint32_t updatesPublished = 120 ;
@@ -486,7 +574,6 @@ TYPED_TEST(FsdbPubSubTest, slowSubscriber) {
486574 std::this_thread::sleep_for (std::chrono::milliseconds (publishIntervalMs));
487575 }
488576
489- // TODO: validate subscription serve queue watermark counter
490577 // resume subscriber data callback after all updates are published
491578 resumeDataCb.post ();
492579
@@ -513,7 +600,6 @@ TYPED_TEST(FsdbPubSubTest, slowSubscriber) {
513600}
514601
515602TYPED_TEST (FsdbPubSubTest, slowSubscriberQueueWatermark) {
516- FLAGS_subscriptionServeQueueSize = 100 ;
517603 FLAGS_forceCloseSlowSubscriber = false ;
518604
519605 // publishInterval: wait for subscriptionServeIntervalMs+delta to prevent
0 commit comments