@@ -135,6 +135,120 @@ ntcCreateInterfaceConfig(const bmqt::SessionOptions& sessionOptions,
135135
136136} // close unnamed namespace
137137
138+ // -------------------------
139+ // class bmqimp::Application::ChannelFactoryPipeline
140+ // -------------------------
141+
142+ Application::ChannelFactoryPipeline::ChannelFactoryPipeline (
143+ const bmqt::SessionOptions& sessionOptions,
144+ bdlbb::BlobBufferFactory* blobBufferFactory,
145+ bdlmt::EventScheduler* scheduler,
146+ const bmqio::StatChannelFactoryConfig::StatContextCreatorFn&
147+ statContextCreator,
148+ const bmqp_ctrlmsg::NegotiationMessage& negotiationMessage,
149+ BlobSpPool* blobSpPool,
150+ bslma::Allocator* allocator)
151+ : d_allocator_p(bslma::Default::allocator(allocator))
152+ , d_channelFactory(ntcCreateInterfaceConfig(sessionOptions, d_allocator_p),
153+ blobBufferFactory,
154+ d_allocator_p)
155+ , d_resolvingChannelFactory(
156+ bmqio::ResolvingChannelFactoryConfig (
157+ &d_channelFactory,
158+ bmqex::ExecutionPolicyUtil::oneWay ().alwaysBlocking().useExecutor(
159+ bmqex::SystemExecutor ())),
160+ d_allocator_p)
161+ , d_reconnectingChannelFactory(
162+ bmqio::ReconnectingChannelFactoryConfig (&d_resolvingChannelFactory,
163+ scheduler,
164+ d_allocator_p),
165+ d_allocator_p)
166+ , d_statChannelFactory(
167+ bmqio::StatChannelFactoryConfig (&d_reconnectingChannelFactory,
168+ statContextCreator,
169+ d_allocator_p),
170+ d_allocator_p)
171+ , d_negotiatedChannelFactory(
172+ NegotiatedChannelFactoryConfig (&d_statChannelFactory,
173+ negotiationMessage,
174+ sessionOptions.connectTimeout(),
175+ blobSpPool,
176+ d_allocator_p),
177+ d_allocator_p)
178+ {
179+ }
180+
181+ int Application::ChannelFactoryPipeline::configureTls (
182+ const bsl::string& caPath)
183+ {
184+ ntca::EncryptionClientOptions encryptionClientOptions;
185+
186+ // TODO: Configure TLS protocol versions
187+
188+ // Set the minimum version to TLS 1.3
189+ encryptionClientOptions.setMinMethod (ntca::EncryptionMethod::e_TLS_V1_3);
190+ encryptionClientOptions.setMaxMethod (ntca::EncryptionMethod::e_DEFAULT);
191+
192+ // Enable server side authentication
193+ encryptionClientOptions.setAuthentication (
194+ ntca::EncryptionAuthentication::e_VERIFY);
195+
196+ encryptionClientOptions.addAuthorityFile (caPath);
197+
198+ ntsa::Error err = d_channelFactory.configureEncryptionClient (
199+ encryptionClientOptions);
200+
201+ return err.code ();
202+ }
203+
204+ void Application::ChannelFactoryPipeline::listen (
205+ bmqio::Status* status,
206+ bslma::ManagedPtr<OpHandle>* handle,
207+ const bmqio::ListenOptions& options,
208+ const ResultCallback& cb)
209+ {
210+ d_negotiatedChannelFactory.listen (status, handle, options, cb);
211+ }
212+
213+ void Application::ChannelFactoryPipeline::connect (
214+ bmqio::Status* status,
215+ bslma::ManagedPtr<OpHandle>* handle,
216+ const bmqio::ConnectOptions& options,
217+ const ResultCallback& cb)
218+ {
219+ d_negotiatedChannelFactory.connect (status, handle, options, cb);
220+ }
221+
222+ int Application::ChannelFactoryPipeline::start ()
223+ {
224+ // Start the channel factories.
225+ int rc = d_channelFactory.start ();
226+ if (rc != 0 ) {
227+ BALL_LOG_ERROR << " Failed to start channelFactory [rc: " << rc << " ]" ;
228+ return bmqt::GenericResult::e_UNKNOWN; // RETURN
229+ }
230+ bdlb::ScopeExitAny tcpScopeGuard (
231+ bdlf::BindUtil::bind (&bmqio::NtcChannelFactory::stop,
232+ &d_channelFactory));
233+
234+ rc = d_reconnectingChannelFactory.start ();
235+ if (rc != 0 ) {
236+ BALL_LOG_ERROR << " Failed to start reconnectingChannelFactory [rc: "
237+ << rc << " ]" ;
238+ return bmqt::GenericResult::e_UNKNOWN; // RETURN
239+ }
240+
241+ tcpScopeGuard.release ();
242+
243+ return bmqt::GenericResult::e_SUCCESS;
244+ }
245+
246+ void Application::ChannelFactoryPipeline::stop ()
247+ {
248+ d_reconnectingChannelFactory.stop ();
249+ d_channelFactory.stop ();
250+ }
251+
138252// -------------------------
139253// class bmqimp::Application
140254// -------------------------
@@ -303,8 +417,7 @@ void Application::brokerSessionStopped(
303417 // This code assumes that there is no need to stop both factories upon
304418 // e_START_FAILURE.
305419 // If we wanted that, we would need another event.
306- d_reconnectingChannelFactory.stop ();
307- d_channelFactory.stop ();
420+ d_channelFactoryPipeline.stop ();
308421 }
309422
310423 d_scheduler.cancelAllEventsAndWait ();
@@ -320,71 +433,23 @@ void Application::brokerSessionStopped(
320433 BALL_LOG_INFO << " bmqimp::Application stop completed" ;
321434}
322435
323- int Application::loadTlsConfig (bmqio::NtcChannelFactory* channelFactory,
324- const bsl::string& caPath)
325- {
326- bmqio::NtcCertificateLoader certLoader =
327- channelFactory->createCertificateLoader ();
328-
329- int rc = 0 ;
330- if ((rc = d_certificateStore.loadCertificateAuthority (certLoader,
331- caPath))) {
332- return rc;
333- }
334-
335- ntca::EncryptionClientOptions encryptionClientOptions;
336- // Set the minimum version to TLS 1.3
337- encryptionClientOptions.setMinMethod (ntca::EncryptionMethod::e_TLS_V1_3);
338- encryptionClientOptions.setMaxMethod (ntca::EncryptionMethod::e_TLS_V1_3);
339- // Enable server side authentication
340- encryptionClientOptions.setAuthentication (
341- ntca::EncryptionAuthentication::e_VERIFY);
342-
343- ntsa::Error err;
344- {
345- bsl::vector<char > authorityData (&d_allocator);
346- err = d_certificateStore.certificateAuthority ()->encode (
347- &authorityData);
348- if (err) {
349- return err.code ();
350- }
351- encryptionClientOptions.addAuthorityData (authorityData);
352- }
353-
354- err = channelFactory->createEncryptionClient (&d_encryptionClient_sp,
355- encryptionClientOptions);
356-
357- return err.code ();
358- }
359-
360436bmqt::GenericResult::Enum Application::startChannel ()
361437{
362438 // executed by the FSM thread
363439
364440 BSLS_ASSERT_SAFE (d_brokerSession.state () ==
365441 bmqimp::BrokerSession::State::e_STARTING);
366442
367- int rc = 0 ;
368-
369- // Start the channel factories.
370- rc = d_channelFactory.start ();
443+ int rc = d_channelFactoryPipeline.start ();
371444 if (rc != 0 ) {
372- BALL_LOG_ERROR << " Failed to start channelFactory [rc: " << rc << " ]" ;
445+ BALL_LOG_ERROR << " Failed to start channelFactoryPipeline [rc: " << rc
446+ << " ]" ;
373447 return bmqt::GenericResult::e_UNKNOWN; // RETURN
374448 }
375- bdlb::ScopeExitAny tcpScopeGuard (
376- bdlf::BindUtil::bind (&bmqio::NtcChannelFactory::stop,
377- &d_channelFactory));
378449
379- rc = d_reconnectingChannelFactory.start ();
380- if (rc != 0 ) {
381- BALL_LOG_ERROR << " Failed to start reconnectingChannelFactory [rc: "
382- << rc << " ]" ;
383- return bmqt::GenericResult::e_UNKNOWN; // RETURN
384- }
385- bdlb::ScopeExitAny reconnectingScopeGuard (
386- bdlf::BindUtil::bind (&bmqio::ReconnectingChannelFactory::stop,
387- &d_reconnectingChannelFactory));
450+ bdlb::ScopeExitAny pipelineScopeGuard (
451+ bdlf::BindUtil::bind (&ChannelFactoryPipeline::stop,
452+ &d_channelFactoryPipeline));
388453
389454 // Connect to the broker.
390455 bmqio::TCPEndpoint endpoint (d_sessionOptions.brokerUri ());
@@ -408,12 +473,7 @@ bmqt::GenericResult::Enum Application::startChannel()
408473 .setAttemptInterval (attemptInterval)
409474 .setAutoReconnect (true );
410475
411- if (d_sessionOptions.isTlsSession ()) {
412- loadTlsConfig (&d_channelFactory,
413- d_sessionOptions.certificateAuthority ());
414- }
415-
416- d_negotiatedChannelFactory.connect (
476+ d_channelFactoryPipeline.connect (
417477 &status,
418478 &d_connectHandle_mp,
419479 options,
@@ -447,8 +507,7 @@ bmqt::GenericResult::Enum Application::startChannel()
447507 bdlf::MemFnUtil::memFn (&Application::snapshotStats, this ));
448508 }
449509
450- reconnectingScopeGuard.release ();
451- tcpScopeGuard.release ();
510+ pipelineScopeGuard.release ();
452511
453512 return bmqt::GenericResult::e_SUCCESS;
454513}
@@ -603,37 +662,17 @@ Application::Application(
603662 bmqp::BlobPoolUtil::createBlobPool (&d_blobBufferFactory,
604663 d_allocators.get(" BlobSpPool" )))
605664, d_scheduler(bsls::SystemClockType::e_MONOTONIC, &d_allocator)
606- , d_channelFactory(ntcCreateInterfaceConfig(sessionOptions, allocator),
607- &d_blobBufferFactory,
608- allocator)
609- , d_resolvingChannelFactory(
610- bmqio::ResolvingChannelFactoryConfig (
611- &d_channelFactory,
612- bmqex::ExecutionPolicyUtil::oneWay ().alwaysBlocking().useExecutor(
613- bmqex::SystemExecutor ())),
614- allocator)
615- , d_reconnectingChannelFactory(
616- bmqio::ReconnectingChannelFactoryConfig (&d_resolvingChannelFactory,
617- &d_scheduler,
618- allocator),
619- allocator)
620- , d_statChannelFactory(
621- bmqio::StatChannelFactoryConfig (
622- &d_reconnectingChannelFactory,
623- bdlf::BindUtil::bind (&Application::channelStatContextCreator,
624- this ,
625- bdlf::PlaceHolders::_1, // channel
626- bdlf::PlaceHolders::_2), // handle
627- allocator),
628- allocator)
629- , d_negotiatedChannelFactory(
630- NegotiatedChannelFactoryConfig (&d_statChannelFactory,
631- negotiationMessage,
632- sessionOptions.connectTimeout(),
633- d_blobSpPool_sp.get(),
634- d_encryptionClient_sp,
635- allocator),
636- allocator)
665+ , d_channelFactoryPipeline(
666+ sessionOptions,
667+ &d_blobBufferFactory,
668+ &d_scheduler,
669+ bdlf::BindUtil::bind (&Application::channelStatContextCreator,
670+ this ,
671+ bdlf::PlaceHolders::_1, // channel
672+ bdlf::PlaceHolders::_2), // handle
673+ negotiationMessage,
674+ d_blobSpPool_sp.get(),
675+ &d_allocator)
637676, d_connectHandle_mp()
638677, d_brokerSession(&d_scheduler,
639678 &d_blobBufferFactory,
0 commit comments