Skip to content

Commit 053aff6

Browse files
committed
application pass reauthentication to authenticatedChannelFactory
Signed-off-by: Emelia Lei <[email protected]>
1 parent cc26b69 commit 053aff6

File tree

3 files changed

+89
-47
lines changed

3 files changed

+89
-47
lines changed

src/groups/bmq/bmqimp/bmqimp_application.cpp

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,24 @@ void Application::readCb(
217217
BALL_LOG_TRACE << channel->peerUri() << ": ReadCallback got a blob\n"
218218
<< bmqu::BlobStartHexDumper(&readBlob);
219219

220-
d_brokerSession.processPacket(event);
220+
if (event.isAuthenticationEvent()) {
221+
// Application received a broker response to an re-authentication
222+
// request. The callback function `channelStateCallback` should
223+
// only be called for failed cases.
224+
d_authenticatedChannelFactory.processAuthenticationEvent(
225+
event,
226+
bdlf::BindUtil::bindS(&d_allocator,
227+
&Application::channelStateCallback,
228+
this,
229+
channel->peerUri(),
230+
bdlf::PlaceHolders::_1, // event
231+
bdlf::PlaceHolders::_2, // status
232+
bdlf::PlaceHolders::_3), // channel
233+
channel);
234+
}
235+
else {
236+
d_brokerSession.processPacket(event);
237+
}
221238
}
222239
}
223240

src/groups/bmq/bmqimp/bmqimp_authenticatedchannelfactory.cpp

Lines changed: 51 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ namespace bmqimp {
5050

5151
namespace {
5252

53-
BALL_LOG_SET_NAMESPACE_CATEGORY("BMQIMP.NEGOTIATEDCHANNELFACTORY");
53+
BALL_LOG_SET_NAMESPACE_CATEGORY("BMQIMP.AUTHENTICATEDCHANNELFACTORY");
5454

5555
enum RcEnum {
5656
rc_SUCCESS = 0,
@@ -65,6 +65,12 @@ enum RcEnum {
6565
rc_NEGOTIATION_FAILURE = -10
6666
};
6767

68+
/// Minimum buffer to subtract from lifetimeMs to avoid cutting too close
69+
static const int k_REAUTHN_EARLY_BUFFER = 5000;
70+
71+
/// Proportion of lifetimeMs after which to initiate reauthentication.
72+
const double k_REAUTHN_EARLY_RATIO = 0.9;
73+
6874
} // close unnamed namespace
6975

7076
// ---------------------------------------
@@ -113,7 +119,7 @@ void AuthenticatedChannelFactory::baseResultCallback(
113119
const ResultCallback& cb,
114120
bmqio::ChannelFactoryEvent::Enum event,
115121
const bmqio::Status& status,
116-
const bsl::shared_ptr<bmqio::Channel>& channel)
122+
const bsl::shared_ptr<bmqio::Channel>& channel) const
117123
{
118124
if (event != bmqio::ChannelFactoryEvent::e_CHANNEL_UP) {
119125
cb(event, status, channel);
@@ -215,7 +221,7 @@ void AuthenticatedChannelFactory::readResponse(
215221

216222
void AuthenticatedChannelFactory::authenticate(
217223
const bsl::shared_ptr<bmqio::Channel>& channel,
218-
const ResultCallback& cb)
224+
const ResultCallback& cb) const
219225
{
220226
sendRequest(channel, cb);
221227
readResponse(channel, cb);
@@ -226,7 +232,7 @@ void AuthenticatedChannelFactory::readPacketsCb(
226232
const ResultCallback& cb,
227233
const bmqio::Status& status,
228234
int* numNeeded,
229-
bdlbb::Blob* blob)
235+
bdlbb::Blob* blob) const
230236
{
231237
if (!status) {
232238
// Read failure.
@@ -266,7 +272,7 @@ void AuthenticatedChannelFactory::readPacketsCb(
266272
void AuthenticatedChannelFactory::onBrokerAuthenticationResponse(
267273
const bdlbb::Blob& packet,
268274
const ResultCallback& cb,
269-
const bsl::shared_ptr<bmqio::Channel>& channel)
275+
const bsl::shared_ptr<bmqio::Channel>& channel) const
270276
{
271277
BALL_LOG_TRACE << "Received a packet:\n"
272278
<< bmqu::BlobStartHexDumper(&packet);
@@ -292,12 +298,33 @@ void AuthenticatedChannelFactory::onBrokerAuthenticationResponse(
292298
return; // RETURN
293299
}
294300

301+
processAuthenticationEvent(event, cb, channel);
302+
303+
cb(bmqio::ChannelFactoryEvent::e_CHANNEL_UP, bmqio::Status(), channel);
304+
}
305+
306+
int AuthenticatedChannelFactory::timeoutInterval(int lifetimeMs) const
307+
{
308+
BSLS_ASSERT_SAFE(lifetimeMs >= 0);
309+
const int intervalMsWithRatio = lifetimeMs * k_REAUTHN_EARLY_RATIO;
310+
const int intervalMsWithBuffer = bsl::max(0,
311+
lifetimeMs -
312+
k_REAUTHN_EARLY_BUFFER);
313+
return bsl::min(intervalMsWithRatio, intervalMsWithBuffer);
314+
}
315+
316+
void AuthenticatedChannelFactory::processAuthenticationEvent(
317+
const bmqp::Event& event,
318+
const ResultCallback& cb,
319+
const bsl::shared_ptr<bmqio::Channel>& channel) const
320+
{
295321
bmqp_ctrlmsg::AuthenticationMessage response;
296322
const int rc = event.loadAuthenticationEvent(&response);
297323
if (rc != 0) {
298-
BALL_LOG_ERROR << "Invalid response from broker [reason: 'control "
299-
<< "event is not an AuthenticationMessage', rc: " << rc
300-
<< "]: " << event;
324+
BALL_LOG_ERROR
325+
<< "Invalid response from broker [reason: 'authentication "
326+
<< "event is not an AuthenticationMessage', rc: " << rc
327+
<< "]: " << event;
301328
bmqio::Status status(bmqio::StatusCategory::e_GENERIC_ERROR,
302329
"authenticationError",
303330
rc_INVALID_BROKER_RESPONSE);
@@ -306,9 +333,9 @@ void AuthenticatedChannelFactory::onBrokerAuthenticationResponse(
306333
}
307334

308335
if (!response.isAuthenticateResponseValue()) {
309-
BALL_LOG_ERROR << "Invalid response from broker [reason: 'control "
310-
<< "event is not an authenticateResponse']: "
311-
<< response;
336+
BALL_LOG_ERROR
337+
<< "Invalid response from broker [reason: 'authentication "
338+
<< "event is not an authenticateResponse']: " << response;
312339
bmqio::Status status(bmqio::StatusCategory::e_GENERIC_ERROR,
313340
"authenticationError",
314341
rc_INVALID_BROKER_RESPONSE);
@@ -334,31 +361,24 @@ void AuthenticatedChannelFactory::onBrokerAuthenticationResponse(
334361
// Authentication SUCCEEDED
335362
BALL_LOG_INFO << "Authentication with broker was successful: " << response;
336363

337-
// Schedule recurring reauthentication events if lifetime is specified in
338-
// the response.
364+
// Schedule recurring events to send re-authentication request if lifetime
365+
// is specified in the response.
339366
if (authenticateResponse.lifetimeMs().has_value()) {
340-
BSLS_ASSERT_SAFE(authenticateResponse.lifetimeMs() >= 0);
341-
int lifetimeMs = authenticateResponse.lifetimeMs().value();
367+
int intervalMs = timeoutInterval(
368+
authenticateResponse.lifetimeMs().value());
342369

343-
const int intervalMsWithRatio = lifetimeMs * k_REAUTHN_EARLY_RATIO;
344-
const int intervalMsWithBuffer = bsl::max(0,
345-
lifetimeMs -
346-
k_REAUTHN_EARLY_BUFFER);
347-
const int intervalMs = bsl::min(intervalMsWithRatio,
348-
intervalMsWithBuffer);
370+
BALL_LOG_INFO << "Scheduling reauthentication in " << intervalMs
371+
<< " milliseconds.";
349372

350373
// Pening events will be cancelled when Application stops.
351-
d_config.d_scheduler_p->scheduleRecurringEvent(
352-
bsls::TimeInterval(intervalMs),
353-
bdlf::BindUtil::bind(
354-
bmqu::WeakMemFnUtil::weakMemFn(
355-
&AuthenticatedChannelFactory::authenticate,
356-
d_self.acquireWeak()),
357-
channel,
358-
cb));
374+
d_config.d_scheduler_p->scheduleEvent(
375+
bsls::TimeInterval(bmqsys::Time::nowMonotonicClock())
376+
.addMilliseconds(intervalMs),
377+
bdlf::BindUtil::bind(&AuthenticatedChannelFactory::sendRequest,
378+
this,
379+
channel,
380+
cb));
359381
}
360-
361-
cb(bmqio::ChannelFactoryEvent::e_CHANNEL_UP, bmqio::Status(), channel);
362382
}
363383

364384
// CREATORS

src/groups/bmq/bmqimp/bmqimp_authenticatedchannelfactory.h

Lines changed: 20 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
#include <bmqio_status.h>
3636
#include <bmqp_blobpoolutil.h>
3737
#include <bmqp_ctrlmsg_messages.h>
38+
#include <bmqp_event.h>
3839
#include <bmqt_sessionoptions.h>
3940
#include <bmqu_sharedresource.h>
4041

@@ -112,14 +113,6 @@ class AuthenticatedChannelFactory : public bmqio::ChannelFactory {
112113

113114
typedef bdlmt::EventScheduler::RecurringEventHandle EventHandle;
114115

115-
// CONSTANTS
116-
117-
/// Minimum buffer to subtract from lifetimeMs to avoid cutting too close
118-
const int k_REAUTHN_EARLY_BUFFER = 1000; // 1 second
119-
120-
/// Proportion of lifetimeMs after which to initiate reauthentication.
121-
const double k_REAUTHN_EARLY_RATIO = 0.9;
122-
123116
private:
124117
// PRIVATE DATA
125118
Config d_config;
@@ -142,10 +135,11 @@ class AuthenticatedChannelFactory : public bmqio::ChannelFactory {
142135
// PRIVATE ACCESSORS
143136

144137
/// Handle an event from our base ChannelFactory.
145-
void baseResultCallback(const ResultCallback& cb,
146-
bmqio::ChannelFactoryEvent::Enum event,
147-
const bmqio::Status& status,
148-
const bsl::shared_ptr<bmqio::Channel>& channel);
138+
void
139+
baseResultCallback(const ResultCallback& cb,
140+
bmqio::ChannelFactoryEvent::Enum event,
141+
const bmqio::Status& status,
142+
const bsl::shared_ptr<bmqio::Channel>& channel) const;
149143

150144
void sendRequest(const bsl::shared_ptr<bmqio::Channel>& channel,
151145
const ResultCallback& cb) const;
@@ -154,18 +148,24 @@ class AuthenticatedChannelFactory : public bmqio::ChannelFactory {
154148
const ResultCallback& cb) const;
155149

156150
void authenticate(const bsl::shared_ptr<bmqio::Channel>& channel,
157-
const ResultCallback& cb);
151+
const ResultCallback& cb) const;
158152

159153
void readPacketsCb(const bsl::shared_ptr<bmqio::Channel>& channel,
160154
const ResultCallback& cb,
161155
const bmqio::Status& status,
162156
int* numNeeded,
163-
bdlbb::Blob* blob);
157+
bdlbb::Blob* blob) const;
164158

165159
void onBrokerAuthenticationResponse(
166160
const bdlbb::Blob& packet,
167161
const ResultCallback& cb,
168-
const bsl::shared_ptr<bmqio::Channel>& channel);
162+
const bsl::shared_ptr<bmqio::Channel>& channel) const;
163+
164+
/// Given the specified `lifetimeMs`, return the interval in milliseconds
165+
/// after which reauthentication should be performed. This interval is
166+
/// calculated with a buffer to avoid cutting too close to the actual
167+
/// expiration time.
168+
int timeoutInterval(int lifetimeMs) const;
169169

170170
public:
171171
// CREATORS
@@ -185,6 +185,11 @@ class AuthenticatedChannelFactory : public bmqio::ChannelFactory {
185185
bslma::ManagedPtr<OpHandle>* handle,
186186
const bmqio::ConnectOptions& options,
187187
const ResultCallback& cb) BSLS_KEYWORD_OVERRIDE;
188+
189+
void processAuthenticationEvent(
190+
const bmqp::Event& event,
191+
const ResultCallback& cb,
192+
const bsl::shared_ptr<bmqio::Channel>& channel) const;
188193
};
189194

190195
} // close package namespace

0 commit comments

Comments
 (0)