Skip to content

Commit 8beed8d

Browse files
committed
change according to negotiator
Signed-off-by: Emelia Lei <[email protected]>
1 parent 04c05b5 commit 8beed8d

File tree

5 files changed

+199
-22
lines changed

5 files changed

+199
-22
lines changed

src/groups/mqb/mqba/mqba_initialconnectionhandler.cpp

Lines changed: 126 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,132 @@ void InitialConnectionHandler::readCallback(
6060
bdlbb::Blob* blob,
6161
const InitialConnectionContextSp& context)
6262
{
63-
context->readCallback(status, numNeeded, blob);
63+
enum RcEnum {
64+
// Value for the various RC error categories
65+
rc_SUCCESS = 0,
66+
rc_READ_BLOB_ERROR = -1,
67+
rc_PROCESS_BLOB_ERROR = -2,
68+
};
69+
70+
BALL_LOG_TRACE << "InitialConnectionHandler readCb: [status: " << status
71+
<< ", peer: '" << context->channel()->peerUri() << "']";
72+
73+
bsl::shared_ptr<mqbnet::Session> session;
74+
bmqu::MemOutStream errStream;
75+
bdlbb::Blob outPacket;
76+
77+
bool isFullBlob = true;
78+
int rc = rc_SUCCESS;
79+
bsl::string error;
80+
81+
// The completeCb is not triggered only when there's more to read
82+
// (didn't receive a full blob; or received a full blob and
83+
// successfully scheduled another read)
84+
bdlb::ScopeExitAny guard(
85+
bdlf::BindUtil::bind(&InitialConnectionHandler::complete,
86+
context,
87+
bsl::ref(rc),
88+
bsl::ref(error),
89+
bsl::ref(session)));
90+
91+
rc = readBlob(errStream, &outPacket, &isFullBlob, status, numNeeded, blob);
92+
if (rc != rc_SUCCESS) {
93+
rc = (rc * 10) + rc_READ_BLOB_ERROR;
94+
error = bsl::string(errStream.str().data(), errStream.str().length());
95+
return; // RETURN
96+
}
97+
98+
if (!isFullBlob) {
99+
guard.release();
100+
return; // RETURN
101+
}
102+
103+
rc = processBlob(errStream, &session, outPacket, context);
104+
if (rc != rc_SUCCESS) {
105+
rc = (rc * 10) + rc_PROCESS_BLOB_ERROR;
106+
error = bsl::string(errStream.str().data(), errStream.str().length());
107+
return; // RETURN
108+
}
109+
}
110+
111+
int InitialConnectionHandler::readBlob(bsl::ostream& errorDescription,
112+
bdlbb::Blob* outPacket,
113+
bool* isFullBlob,
114+
const bmqio::Status& status,
115+
int* numNeeded,
116+
bdlbb::Blob* blob)
117+
{
118+
enum RcEnum {
119+
// Value for the various RC error categories
120+
rc_SUCCESS = 0,
121+
rc_READ_ERROR = -1,
122+
rc_UNRECOVERABLE_READ_ERROR = -2
123+
};
124+
125+
if (!status) {
126+
errorDescription << "Read error: " << status;
127+
return (10 * status.category()) + rc_READ_ERROR; // RETURN
128+
}
129+
130+
int rc = bmqio::ChannelUtil::handleRead(outPacket, numNeeded, blob);
131+
if (rc != 0) {
132+
// This indicates a non recoverable error...
133+
errorDescription << "Unrecoverable read error:\n"
134+
<< bmqu::BlobStartHexDumper(blob);
135+
return (rc * 10) + rc_UNRECOVERABLE_READ_ERROR; // RETURN
136+
}
137+
138+
if (outPacket->length() == 0) {
139+
// Don't yet have a full blob
140+
*isFullBlob = false;
141+
return rc_SUCCESS; // RETURN
142+
}
143+
144+
// Have a full blob, indicate no more bytes needed (we have to do this
145+
// because 'handleRead' above set it back to 4 at the end).
146+
*numNeeded = 0;
147+
148+
return rc_SUCCESS;
149+
}
150+
151+
int InitialConnectionHandler::processBlob(
152+
bsl::ostream& errorDescription,
153+
bsl::shared_ptr<mqbnet::Session>* session,
154+
const bdlbb::Blob& blob,
155+
const InitialConnectionContextSp& context)
156+
{
157+
enum RcEnum {
158+
// Value for the various RC error categories
159+
rc_SUCCESS = 0,
160+
rc_INVALID_NEGOTIATION_MESSAGE = -1,
161+
};
162+
163+
bsl::optional<bmqp_ctrlmsg::NegotiationMessage> negotiationMsg;
164+
165+
int rc = decodeInitialConnectionMessage(errorDescription,
166+
blob,
167+
&negotiationMsg);
168+
169+
if (rc != 0) {
170+
return (rc * 10) + rc_INVALID_NEGOTIATION_MESSAGE; // RETURN
171+
}
172+
173+
if (negotiationMsg.has_value()) {
174+
context->negotiationContext()->setNegotiationMessage(
175+
negotiationMsg.value());
176+
177+
rc = d_negotiator_mp->createSessionOnMsgType(errorDescription,
178+
session,
179+
context.get());
180+
}
181+
else {
182+
errorDescription
183+
<< "Decode NegotiationMessage succeeds but nothing is "
184+
"loaded into the NegotiationMessage.";
185+
rc = (rc * 10) + rc_INVALID_NEGOTIATION_MESSAGE;
186+
}
187+
188+
return rc;
64189
}
65190

66191
int InitialConnectionHandler::decodeInitialConnectionMessage(

src/groups/mqb/mqba/mqba_initialconnectionhandler.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,18 @@ class InitialConnectionHandler : public mqbnet::InitialConnectionHandler {
8282
bdlbb::Blob* blob,
8383
const InitialConnectionContextSp& context);
8484

85+
int readBlob(bsl::ostream& errorDescription,
86+
bdlbb::Blob* outPacket,
87+
bool* isFullBlob,
88+
const bmqio::Status& status,
89+
int* numNeeded,
90+
bdlbb::Blob* blob);
91+
92+
int processBlob(bsl::ostream& errorDescription,
93+
bsl::shared_ptr<mqbnet::Session>* session,
94+
const bdlbb::Blob& blob,
95+
const InitialConnectionContextSp& context);
96+
8597
/// Decode the initial connection messages received in the specified
8698
/// `blob` and store it, on success, in the specified optional
8799
/// `negotiationMsg`, returning 0. Return a non-zero code on error and
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
// Copyright 2025 Bloomberg Finance L.P.
2+
// SPDX-License-Identifier: Apache-2.0
3+
//
4+
// Licensed under the Apache License, Version 2.0 (the "License");
5+
// you may not use this file except in compliance with the License.
6+
// You may obtain a copy of the License at
7+
//
8+
// http://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing, software
11+
// distributed under the License is distributed on an "AS IS" BASIS,
12+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
16+
// mqbnet_initialconnectionhandler.cpp -*-C++-*-
17+
#include <mqbnet_initialconnectionhandler.h>
18+
19+
#include <mqbscm_version.h>
20+
21+
namespace BloombergLP {
22+
namespace mqbnet {
23+
24+
// ------------------------------
25+
// class InitialConnectionHandler
26+
// ------------------------------
27+
28+
InitialConnectionHandler::~InitialConnectionHandler()
29+
{
30+
// NOTHING: Pure interface
31+
}
32+
33+
} // close package namespace
34+
} // close enterprise namespace

src/groups/mqb/mqbnet/mqbnet_tcpsessionfactory.cpp

Lines changed: 21 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -348,23 +348,27 @@ void TCPSessionFactory::handleInitialConnection(
348348

349349
// Create a unique InitialConnectionContext for the channel, from
350350
// the OperationContext. This shared_ptr is bound to the
351-
// 'negotiationComplete' callback below, which is what scopes its lifetime.
352-
bsl::shared_ptr<InitialConnectionContext> initialConnectionContext;
353-
initialConnectionContext.createInplace(d_allocator_p,
354-
context->d_isIncoming);
355-
(*initialConnectionContext)
356-
.setUserData(context->d_negotiationUserData_sp.get())
357-
.setResultState(context->d_resultState_p)
358-
.setChannel(channel)
359-
.setCompleteCb(bdlf::BindUtil::bind(
360-
&TCPSessionFactory::negotiationComplete,
361-
this,
362-
bdlf::PlaceHolders::_1, // status
363-
bdlf::PlaceHolders::_2, // errorDescription
364-
bdlf::PlaceHolders::_3, // session
365-
bdlf::PlaceHolders::_4, // channel
366-
bdlf::PlaceHolders::_5, // initialConnectionContext
367-
context));
351+
// 'initialConnectionComplete' callback below, which is what scopes its
352+
// lifetime.
353+
bsl::shared_ptr<InitialConnectionContext> initialConnectionContext =
354+
bsl::allocate_shared<InitialConnectionContext>(
355+
d_allocator_p,
356+
context->d_isIncoming,
357+
bsl::nullptr_t(),
358+
bsl::nullptr_t(),
359+
context->d_negotiationUserData_sp.get(),
360+
context->d_resultState_p,
361+
channel,
362+
bdlf::BindUtil::bindS(
363+
d_allocator_p,
364+
&TCPSessionFactory::negotiationComplete,
365+
this,
366+
bdlf::PlaceHolders::_1, // status
367+
bdlf::PlaceHolders::_2, // errorDescription
368+
bdlf::PlaceHolders::_3, // session
369+
bdlf::PlaceHolders::_4, // channel
370+
bdlf::PlaceHolders::_5, // initialConnectionContext
371+
context));
368372

369373
// Register as observer of the channel to get the 'onClose'
370374
channel->onClose(

src/groups/mqb/mqbnet/package/mqbnet.mem

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,18 @@
1+
mqbnet_authenticationcontext
2+
mqbnet_authenticator
13
mqbnet_channel
24
mqbnet_cluster
35
mqbnet_clusteractivenodemanager
46
mqbnet_clusterimp
57
mqbnet_connectiontype
68
mqbnet_dummysession
9+
mqbnet_elector
10+
mqbnet_initialconnectioncontext
11+
mqbnet_initialconnectionhandler
712
mqbnet_mockcluster
813
mqbnet_multirequestmanager
14+
mqbnet_negotiationcontext
915
mqbnet_negotiator
1016
mqbnet_session
1117
mqbnet_tcpsessionfactory
1218
mqbnet_transportmanager
13-
mqbnet_elector
14-
mqbnet_initialconnectionhandler
15-
mqbnet_initialconnectioncontext
16-
mqbnet_negotiationcontext

0 commit comments

Comments
 (0)