Skip to content

Commit f6c5358

Browse files
authored
Fix[BMQ]: handling early channel close when authenticating (#897)
Signed-off-by: dorjesinpo <[email protected]>
1 parent 3a6ef47 commit f6c5358

File tree

4 files changed

+70
-24
lines changed

4 files changed

+70
-24
lines changed

src/groups/mqb/mqbnet/mqbnet_initialconnectioncontext.cpp

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,13 @@ namespace mqbnet {
2626
// -------------------------------------
2727

2828
InitialConnectionContext::InitialConnectionContext(bool isIncoming)
29-
: d_isIncoming(isIncoming)
30-
, d_resultState_p(0)
29+
: d_resultState_p(0)
3130
, d_userData_p(0)
31+
, d_channelSp()
32+
, d_initialConnectionCompleteCb()
33+
, d_negotiationCtxSp()
34+
, d_isIncoming(isIncoming)
35+
, d_isClosed(false)
3236
{
3337
// NOTHING
3438
}
@@ -71,6 +75,11 @@ InitialConnectionContext& InitialConnectionContext::setNegotiationContext(
7175
return *this;
7276
}
7377

78+
void InitialConnectionContext::onClose()
79+
{
80+
d_isClosed = true;
81+
}
82+
7483
bool InitialConnectionContext::isIncoming() const
7584
{
7685
return d_isIncoming;
@@ -108,5 +117,10 @@ InitialConnectionContext::negotiationContext() const
108117
return d_negotiationCtxSp;
109118
}
110119

120+
bool InitialConnectionContext::isClosed() const
121+
{
122+
return d_isClosed;
123+
}
124+
111125
} // close package namespace
112126
} // close enterprise namespace

src/groups/mqb/mqbnet/mqbnet_initialconnectioncontext.h

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -70,11 +70,6 @@ class InitialConnectionContext {
7070
private:
7171
// DATA
7272

73-
/// True if the session being negotiated originates
74-
/// from a remote peer (i.e., a 'listen'); false if
75-
/// it originates from us (i.e., a 'connect).
76-
bool d_isIncoming;
77-
7873
/// Raw pointer, held not owned, to some user data
7974
/// the session factory will pass back to the
8075
/// 'resultCb' method (used to inform of the
@@ -119,11 +114,19 @@ class InitialConnectionContext {
119114
/// The NegotiationContext updated upon receiving a negotiation message.
120115
bsl::shared_ptr<NegotiationContext> d_negotiationCtxSp;
121116

117+
/// True if the session being negotiated originates
118+
/// from a remote peer (i.e., a 'listen'); false if
119+
/// it originates from us (i.e., a 'connect).
120+
bool d_isIncoming;
121+
122+
/// True if the associated channel is closed (with `onClose`).
123+
bool d_isClosed;
124+
122125
public:
123126
// CREATORS
124127

125128
/// Create a new object having the specified `isIncoming` value.
126-
InitialConnectionContext(bool isIncoming);
129+
explicit InitialConnectionContext(bool isIncoming);
127130

128131
~InitialConnectionContext();
129132

@@ -140,6 +143,9 @@ class InitialConnectionContext {
140143
InitialConnectionContext&
141144
setNegotiationContext(const bsl::shared_ptr<NegotiationContext>& value);
142145

146+
/// Called by the IO upon `onCLose` signal
147+
void onClose();
148+
143149
// ACCESSORS
144150

145151
/// Return the value of the corresponding field.
@@ -148,6 +154,7 @@ class InitialConnectionContext {
148154
void* resultState() const;
149155
const bsl::shared_ptr<bmqio::Channel>& channel() const;
150156
const bsl::shared_ptr<NegotiationContext>& negotiationContext() const;
157+
bool isClosed() const;
151158

152159
void complete(int rc,
153160
const bsl::string& error,

src/groups/mqb/mqbnet/mqbnet_tcpsessionfactory.cpp

Lines changed: 38 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -366,6 +366,14 @@ void TCPSessionFactory::handleInitialConnection(
366366
bdlf::PlaceHolders::_5, // initialConnectionContext
367367
context));
368368

369+
// Register as observer of the channel to get the 'onClose'
370+
channel->onClose(
371+
bdlf::BindUtil::bindS(d_allocator_p,
372+
&TCPSessionFactory::onClose,
373+
this,
374+
initialConnectionContext,
375+
bdlf::PlaceHolders::_1 /* bmqio::Status */));
376+
369377
// NOTE: we must ensure the 'initialConnectionCompleteCb' can be invoked
370378
// from the
371379
// 'handleInitialConnection()' call as specified on the
@@ -558,6 +566,24 @@ void TCPSessionFactory::negotiationComplete(
558566

559567
++d_nbSessions;
560568

569+
if (isClientOrProxy(session.get())) {
570+
++d_nbOpenClients;
571+
}
572+
573+
// check if the channel is not closed (we can be in authentication
574+
// thread)
575+
576+
if (initialConnectionContext_p->isClosed()) {
577+
BALL_LOG_WARN
578+
<< "#TCP_UNEXPECTED_STATE TCPSessionFactory '"
579+
<< d_config.name()
580+
<< "' got an already closed channel after negotiation.";
581+
582+
// Since the 'session' has missed 'onClose', call 'tearDown' here.
583+
session->tearDown(session, false);
584+
return; // RETURN
585+
}
586+
561587
info.createInplace(d_allocator_p,
562588
channel,
563589
monitoredSession,
@@ -573,10 +599,6 @@ void TCPSessionFactory::negotiationComplete(
573599
inserted = d_channels.insert(toInsert);
574600
info = inserted.first->second;
575601

576-
if (isClientOrProxy(info->d_session_sp.get())) {
577-
++d_nbOpenClients;
578-
}
579-
580602
if (info->d_monitor.isHearbeatEnabled() &&
581603
d_heartbeatSchedulerActive) {
582604
// Enable/Disable heartbeating under the lock
@@ -711,14 +733,6 @@ void TCPSessionFactory::channelStateCallback(
711733
// Keep track of active channels, for logging purposes
712734
++d_nbActiveChannels;
713735

714-
// Register as observer of the channel to get the 'onClose'
715-
channel->onClose(bdlf::BindUtil::bindS(
716-
d_allocator_p,
717-
&TCPSessionFactory::onClose,
718-
this,
719-
channel,
720-
bdlf::PlaceHolders::_1 /* bmqio::Status */));
721-
722736
handleInitialConnection(channel, context);
723737
}
724738
} break;
@@ -743,11 +757,17 @@ void TCPSessionFactory::channelStateCallback(
743757
}
744758
}
745759

746-
void TCPSessionFactory::onClose(const bsl::shared_ptr<bmqio::Channel>& channel,
747-
const bmqio::Status& status)
760+
void TCPSessionFactory::onClose(
761+
const bsl::shared_ptr<InitialConnectionContext>& initialConnectionContext,
762+
const bmqio::Status& status)
748763
{
764+
// Executed by one of the IO threads.
765+
749766
--d_nbActiveChannels;
750767

768+
const bsl::shared_ptr<bmqio::Channel>& channel =
769+
initialConnectionContext->channel();
770+
751771
int port;
752772
channel->properties().load(
753773
&port,
@@ -758,6 +778,10 @@ void TCPSessionFactory::onClose(const bsl::shared_ptr<bmqio::Channel>& channel,
758778
// Lookup the session and remove it from internal map
759779
bslmt::LockGuard<bslmt::Mutex> guard(&d_mutex); // LOCK
760780

781+
// set the 'isClosed' flag under lock to be checked under lock in
782+
// 'negotiationComplete'.
783+
initialConnectionContext->onClose();
784+
761785
ChannelMap::const_iterator it = d_channels.find(channel.get());
762786
if (it != d_channels.end()) {
763787
channelInfo = it->second;

src/groups/mqb/mqbnet/mqbnet_tcpsessionfactory.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -457,8 +457,9 @@ class TCPSessionFactory {
457457
/// the channel's status, `userData` corresponding to the one provided
458458
/// when calling `addObserver` to register this object as observer of
459459
/// the channel.
460-
virtual void onClose(const bsl::shared_ptr<bmqio::Channel>& channel,
461-
const bmqio::Status& status);
460+
virtual void onClose(const bsl::shared_ptr<InitialConnectionContext>&
461+
initialConnectionContext,
462+
const bmqio::Status& status);
462463

463464
/// Reccuring scheduler event to check for all `heartbeat-enabled`
464465
/// channels : this will send a heartbeat if no data has been received

0 commit comments

Comments
 (0)