153153#include < mqbcfg_brokerconfig.h>
154154#include < mqbi_cluster.h>
155155#include < mqbi_queue.h>
156+ #include < mqbnet_authenticationcontext.h>
156157#include < mqbnet_tcpsessionfactory.h>
157158#include < mqbstat_brokerstats.h>
158159#include < mqbu_messageguidutil.h>
@@ -2631,14 +2632,16 @@ ClientSession::ClientSession(
26312632 const bsl::shared_ptr<bmqio::Channel>& channel,
26322633 const bmqp_ctrlmsg::NegotiationMessage& negotiationMessage,
26332634 const bsl::string& sessionDescription,
2634- mqbi::Dispatcher* dispatcher,
2635- mqbblp::ClusterCatalog* clusterCatalog,
2636- mqbi::DomainFactory* domainFactory,
2637- bslma::ManagedPtr<bmqst::StatContext>& clientStatContext,
2638- ClientSessionState::BlobSpPool* blobSpPool,
2639- bdlbb::BlobBufferFactory* bufferFactory,
2640- bdlmt::EventScheduler* scheduler,
2641- bslma::Allocator* allocator)
2635+ const bsl::shared_ptr<mqbnet::AuthenticationContext>&
2636+ authenticationContext,
2637+ mqbi::Dispatcher* dispatcher,
2638+ mqbblp::ClusterCatalog* clusterCatalog,
2639+ mqbi::DomainFactory* domainFactory,
2640+ bslma::ManagedPtr<bmqst::StatContext>& clientStatContext,
2641+ ClientSessionState::BlobSpPool* blobSpPool,
2642+ bdlbb::BlobBufferFactory* bufferFactory,
2643+ bdlmt::EventScheduler* scheduler,
2644+ bslma::Allocator* allocator)
26422645: d_self(this ) // use default allocator
26432646, d_operationState(e_RUNNING)
26442647, d_isDisconnecting(false )
@@ -2650,6 +2653,7 @@ ClientSession::ClientSession(
26502653 bmqp::MessagePropertiesFeatures::k_MESSAGE_PROPERTIES_EX,
26512654 d_clientIdentity_p->features ()))
26522655, d_description(sessionDescription, allocator)
2656+ , d_authenticationContext(authenticationContext)
26532657, d_channel_sp(channel)
26542658, d_state(clientStatContext,
26552659 blobSpPool,
@@ -2716,7 +2720,54 @@ void ClientSession::processEvent(const bmqp::Event& event,
27162720{
27172721 // executed by the *IO* thread
27182722
2719- if (event.isControlEvent ()) {
2723+ if (!event.isAuthenticationEvent () && !d_authenticationContext) {
2724+ BALL_LOG_ERROR << " The authentication lifetime has expired. Need to "
2725+ " re-authenticate." ;
2726+ return ; // RETURN
2727+ }
2728+
2729+ if (event.isAuthenticationEvent ()) {
2730+ if (d_authenticationContext->state ().testAndSwap (
2731+ AuthnState::e_AUTHENTICATED,
2732+ AuthnState::e_AUTHENTICATING) != AuthnState::e_AUTHENTICATED) {
2733+ BALL_LOG_ERROR << " #CLIENT_IMPROPER_BEHAVIOR " << description ()
2734+ << " : received Authentication event while "
2735+ " authentication is in progress" ;
2736+ return ; // RETURN
2737+ }
2738+
2739+ bmqp_ctrlmsg::AuthenticationMessage authenticationMessage;
2740+ int rc = event.loadAuthenticationEvent (&authenticationMessage);
2741+ if (rc != 0 ) {
2742+ BALL_LOG_ERROR << " #CORRUPTED_EVENT " << description ()
2743+ << " : Received invalid authentication message "
2744+ " from client [reason: 'failed to decode', rc: "
2745+ << rc << " ]:\n "
2746+ << bmqu::BlobStartHexDumper (event.blob ());
2747+ return ; // RETURN
2748+ }
2749+
2750+ BALL_LOG_INFO << description () << " : Received authentication message: "
2751+ << authenticationMessage;
2752+
2753+ d_authenticationContext->setAuthenticationMessage (
2754+ authenticationMessage);
2755+ d_authenticationContext->setAuthenticationEncodingType (
2756+ event.authenticationEventEncodingType ());
2757+
2758+ bmqu::MemOutStream errorStream;
2759+ rc = d_authenticationContext->reAuthenticateCb ()(
2760+ errorStream,
2761+ d_authenticationContext,
2762+ d_channel_sp);
2763+ if (rc != 0 ) {
2764+ BALL_LOG_ERROR << " #AUTHENTICATION_FAILED " << description ()
2765+ << " : Authentication failed [reason: '"
2766+ << errorStream.str () << " ', rc: " << rc << " ]" ;
2767+ return ; // RETURN
2768+ }
2769+ }
2770+ else if (event.isControlEvent ()) {
27202771 bdlma::LocalSequentialAllocator<2048 > localAllocator (
27212772 d_state.d_allocator_p );
27222773 bmqp_ctrlmsg::ControlMessage controlMessage (&localAllocator);
0 commit comments