Skip to content

Commit 57a2d8f

Browse files
committed
create function handleNegotiationMessage
Signed-off-by: Emelia Lei <[email protected]>
1 parent 28a37f6 commit 57a2d8f

File tree

3 files changed

+114
-92
lines changed

3 files changed

+114
-92
lines changed

src/groups/mqb/mqba/mqba_authenticator.cpp

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,15 @@ void Authenticator::authenticate(
182182
// PRECONDITIONS
183183
BSLS_ASSERT(context);
184184

185+
enum RcEnum {
186+
// Value for the various RC error categories
187+
rc_SUCCESS = 0,
188+
rc_AUTHENTICATION_STATE_INCORRECT = -1,
189+
rc_AUTHENTICATION_FAILED = -2,
190+
rc_SEND_AUTHENTICATION_RESPONSE_FAILED = -3,
191+
rc_CONTINUE_READ_FAILED = -4
192+
};
193+
185194
const AuthenticationContextSp& authenticationContext =
186195
context->authenticationContext();
187196

@@ -233,7 +242,7 @@ void Authenticator::authenticate(
233242
<< "Failed to set authentication state for '"
234243
<< channel->peerUri()
235244
<< "' to 'e_AUTHENTICATED' from 'e_AUTHENTICATING'";
236-
context->complete(rc,
245+
context->complete((rc * 10) + rc_AUTHENTICATION_STATE_INCORRECT,
237246
authenticationErrorStream.str(),
238247
bsl::shared_ptr<mqbnet::Session>());
239248
return; // RETURN
@@ -246,7 +255,7 @@ void Authenticator::authenticate(
246255
if (response.status().category() !=
247256
bmqp_ctrlmsg::StatusCategory::E_SUCCESS) {
248257
// If the authentication failed, we do not create a session.
249-
context->complete(rc,
258+
context->complete(rc_AUTHENTICATION_FAILED,
250259
authenticationErrorStream.str(),
251260
bsl::shared_ptr<mqbnet::Session>());
252261
return; // RETURN
@@ -255,14 +264,9 @@ void Authenticator::authenticate(
255264
bsl::shared_ptr<mqbnet::Session> session;
256265
bmqu::MemOutStream errStream;
257266
bsl::string error;
258-
rc = context->negotiationCb()(errStream, &session, context.get());
259267

260-
if (rc != 0) {
261-
error = bsl::string(errStream.str().data(),
262-
errStream.str().length());
263-
}
264-
265-
context->complete(rc, error, session);
268+
rc = context->negotiationCb()(errStream, &session, context.get());
269+
context->complete(rc, errStream.str(), session);
266270

267271
return; // RETURN
268272
}
@@ -275,12 +279,12 @@ void Authenticator::authenticate(
275279
context->authenticationEncodingType());
276280
if (response.status().category() !=
277281
bmqp_ctrlmsg::StatusCategory::E_SUCCESS) {
278-
context->complete(rc,
282+
context->complete(rc_AUTHENTICATION_FAILED,
279283
authenticationErrorStream.str(),
280284
bsl::shared_ptr<mqbnet::Session>());
281285
}
282286
else if (rc != 0) {
283-
context->complete(rc,
287+
context->complete((rc * 10) + rc_SEND_AUTHENTICATION_RESPONSE_FAILED,
284288
sendResponseErrorStream.str(),
285289
bsl::shared_ptr<mqbnet::Session>());
286290
}
@@ -289,9 +293,8 @@ void Authenticator::authenticate(
289293
bmqu::MemOutStream readErrorStream;
290294
rc = context->scheduleReadCb()(readErrorStream, context);
291295
if (rc != 0) {
292-
context->complete(rc,
293-
bsl::string(readErrorStream.str().data(),
294-
readErrorStream.str().length()),
296+
context->complete((rc * 10) + rc_CONTINUE_READ_FAILED,
297+
readErrorStream.str(),
295298
bsl::shared_ptr<mqbnet::Session>());
296299
}
297300

src/groups/mqb/mqba/mqba_initialconnectionhandler.cpp

Lines changed: 91 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,6 @@ int InitialConnectionHandler::processBlob(
163163
// Value for the various RC error categories
164164
rc_SUCCESS = 0,
165165
rc_INVALID_INITIALCONNECTION_MESSAGE = -1,
166-
rc_DEFAULT_CREDENTIAL_DISALLOWED = -2,
167166
};
168167

169168
bsl::optional<bsl::variant<bmqp_ctrlmsg::AuthenticationMessage,
@@ -193,83 +192,95 @@ int InitialConnectionHandler::processBlob(
193192
context,
194193
bsl::get<bmqp_ctrlmsg::AuthenticationMessage>(message.value()));
195194
}
196-
else if (bsl::holds_alternative<bmqp_ctrlmsg::NegotiationMessage>(
197-
message.value())) {
198-
const bmqp_ctrlmsg::NegotiationMessage& negotiationMessage =
199-
bsl::get<bmqp_ctrlmsg::NegotiationMessage>(message.value());
200-
201-
bool isClientyIdentity = negotiationMessage.isClientIdentityValue();
202-
203-
if (isClientyIdentity) {
204-
// Create NegotiationContext only when we receive a ClientIdentity.
205-
// This is been created already if we sent the ClientIdentity and
206-
// are expecting a BrokerResponse.
207-
bsl::shared_ptr<mqbnet::NegotiationContext> negotiationContext =
208-
bsl::allocate_shared<mqbnet::NegotiationContext>(
209-
d_allocator_p,
210-
context.get(), // initialConnectionContext
211-
negotiationMessage, // negotiationMessage
212-
bsl::string(), // clusterName
213-
mqbnet::ConnectionType::e_UNKNOWN, // connectionType
214-
0, // maxMissedHeartbeat
215-
bsl::nullptr_t(), // eventProcessor
216-
bsl::nullptr_t() // cluster
217-
);
218-
219-
context->setNegotiationContext(negotiationContext);
220-
221-
// Received a ClientIdentity before an AuthenticationRequest,
222-
// use the default authentication credential to authenticate.
223-
// In order not to block the IO thread, for default credential, we
224-
// do negotiation in authentication threads
225-
if (!context->authenticationContext()) {
226-
if (!d_authenticator_p->anonymousCredential()) {
227-
errorDescription
228-
<< "Anonymous credential is disallowed, "
229-
<< "cannot negotiate without authentication.";
230-
return (rc * 10) +
231-
rc_DEFAULT_CREDENTIAL_DISALLOWED; // RETURN
232-
}
233-
234-
context->setNegotiationCb(bdlf::BindUtil::bind(
235-
&mqbnet::Negotiator::createSessionOnMsgType,
236-
d_negotiator_p,
237-
bdlf::PlaceHolders::_1, // errorDescription
238-
bdlf::PlaceHolders::_2, // session
239-
bdlf::PlaceHolders::_3 // context
240-
));
241-
242-
bmqp_ctrlmsg::AuthenticationMessage authenticationMessage;
243-
bmqp_ctrlmsg::AuthenticateRequest& authenticateRequest =
244-
authenticationMessage.makeAuthenticateRequest();
245-
246-
const mqbcfg::Credential& anonymousCredential =
247-
d_authenticator_p->anonymousCredential().value();
248-
authenticateRequest.mechanism() =
249-
anonymousCredential.mechanism();
250-
authenticateRequest.data() = bsl::vector<char>(
251-
anonymousCredential.identity().begin(),
252-
anonymousCredential.identity().end());
253-
254-
rc = d_authenticator_p->handleAuthentication(
255-
errorDescription,
256-
context,
257-
authenticationMessage);
258-
259-
return rc; // RETURN
195+
else {
196+
rc = handleNegotiationMessage(
197+
errorDescription,
198+
session,
199+
bsl::get<bmqp_ctrlmsg::NegotiationMessage>(message.value()),
200+
context);
201+
}
202+
203+
return rc;
204+
}
205+
206+
int InitialConnectionHandler::handleNegotiationMessage(
207+
bsl::ostream& errorDescription,
208+
bsl::shared_ptr<mqbnet::Session>* session,
209+
const bmqp_ctrlmsg::NegotiationMessage& negotiationMessage,
210+
const InitialConnectionContextSp& context)
211+
{
212+
enum RcEnum {
213+
// Value for the various RC error categories
214+
rc_SUCCESS = 0,
215+
rc_DEFAULT_CREDENTIAL_DISALLOWED = -1,
216+
};
217+
218+
int rc = 0;
219+
220+
if (negotiationMessage.isClientIdentityValue()) {
221+
// Create NegotiationContext only when we receive a ClientIdentity,
222+
// since it's been created already if we sent the ClientIdentity and
223+
// are expecting a BrokerResponse.
224+
bsl::shared_ptr<mqbnet::NegotiationContext> negotiationContext =
225+
bsl::allocate_shared<mqbnet::NegotiationContext>(
226+
d_allocator_p,
227+
context.get(), // initialConnectionContext
228+
negotiationMessage, // negotiationMessage
229+
bsl::string(), // clusterName
230+
mqbnet::ConnectionType::e_UNKNOWN, // connectionType
231+
0, // maxMissedHeartbeat
232+
bsl::nullptr_t(), // eventProcessor
233+
bsl::nullptr_t() // cluster
234+
);
235+
236+
context->setNegotiationContext(negotiationContext);
237+
238+
// Received a ClientIdentity before an AuthenticationRequest,
239+
// use the default authentication credential to authenticate.
240+
if (!context->authenticationContext()) {
241+
if (!d_authenticator_p->anonymousCredential()) {
242+
errorDescription << "Anonymous credential is disallowed, "
243+
<< "cannot negotiate without authentication.";
244+
return rc_DEFAULT_CREDENTIAL_DISALLOWED; // RETURN
260245
}
261-
}
262-
else {
263-
// Received a BrokerResponse, which is the last message of the
264-
// negotiation protocol.
265-
context->negotiationContext()->setNegotiationMessage(
266-
bsl::get<bmqp_ctrlmsg::NegotiationMessage>(message.value()));
267-
}
268246

269-
rc = d_negotiator_p->createSessionOnMsgType(errorDescription,
270-
session,
271-
context.get());
247+
context->setNegotiationCb(bdlf::BindUtil::bind(
248+
&mqbnet::Negotiator::createSessionOnMsgType,
249+
d_negotiator_p,
250+
bdlf::PlaceHolders::_1, // errorDescription
251+
bdlf::PlaceHolders::_2, // session
252+
bdlf::PlaceHolders::_3 // context
253+
));
254+
255+
bmqp_ctrlmsg::AuthenticationMessage authenticationMessage;
256+
bmqp_ctrlmsg::AuthenticateRequest& authenticateRequest =
257+
authenticationMessage.makeAuthenticateRequest();
258+
259+
const mqbcfg::Credential& anonymousCredential =
260+
d_authenticator_p->anonymousCredential().value();
261+
authenticateRequest.mechanism() = anonymousCredential.mechanism();
262+
authenticateRequest.data() = bsl::vector<char>(
263+
anonymousCredential.identity().begin(),
264+
anonymousCredential.identity().end());
265+
266+
rc = d_authenticator_p->handleAuthentication(
267+
errorDescription,
268+
context,
269+
authenticationMessage);
270+
271+
return rc; // RETURN
272+
}
272273
}
274+
else {
275+
// Received a BrokerResponse, which is the last message of the
276+
// negotiation protocol.
277+
context->negotiationContext()->setNegotiationMessage(
278+
negotiationMessage);
279+
}
280+
281+
rc = d_negotiator_p->createSessionOnMsgType(errorDescription,
282+
session,
283+
context.get());
273284

274285
return rc;
275286
}
@@ -306,9 +317,9 @@ int InitialConnectionHandler::decodeInitialConnectionMessage(
306317
bmqp_ctrlmsg::AuthenticationMessage authenticationMessage;
307318
bmqp_ctrlmsg::NegotiationMessage negotiationMessage;
308319

309-
BALL_LOG_INFO << "Received blob: " << bmqu::BlobStartHexDumper(&blob);
310-
311320
if (event.isAuthenticationEvent()) {
321+
BALL_LOG_DEBUG << "Received AuthenticationEvent: "
322+
<< bmqu::BlobStartHexDumper(&blob);
312323
const int rc = event.loadAuthenticationEvent(&authenticationMessage);
313324
if (rc != 0) {
314325
errorDescription
@@ -323,6 +334,8 @@ int InitialConnectionHandler::decodeInitialConnectionMessage(
323334
*message = authenticationMessage;
324335
}
325336
else if (event.isControlEvent()) {
337+
BALL_LOG_DEBUG << "Received ControlEvent: "
338+
<< bmqu::BlobStartHexDumper(&blob);
326339
const int rc = event.loadControlEvent(&negotiationMessage);
327340
if (rc != 0) {
328341
errorDescription << "Invalid message received [reason: 'control "
@@ -454,7 +467,7 @@ void InitialConnectionHandler::handleInitialConnection(
454467
}
455468

456469
if (rc != 0) {
457-
error = bsl::string(errStream.str().data(), errStream.str().length());
470+
error = errStream.str();
458471
return;
459472
}
460473

src/groups/mqb/mqba/mqba_initialconnectionhandler.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,12 @@ class InitialConnectionHandler : public mqbnet::InitialConnectionHandler {
114114
const bdlbb::Blob& blob,
115115
const InitialConnectionContextSp& context);
116116

117+
int handleNegotiationMessage(
118+
bsl::ostream& errorDescription,
119+
bsl::shared_ptr<mqbnet::Session>* session,
120+
const bmqp_ctrlmsg::NegotiationMessage& negotiationMessage,
121+
const InitialConnectionContextSp& context);
122+
117123
/// Decode the initial connection messages received in the specified
118124
/// `blob` and store it, on success, in the specified optional
119125
/// `negotiationMsg`, returning 0. Return a non-zero code on error and

0 commit comments

Comments
 (0)