22 *| This source code is provided under the Apache 2.0 license --
33 *| and is provided AS IS with no warranty or guarantee of fit for purpose. --
44 *| See the project's LICENSE.md for details. --
5- *| Copyright (C) 2019-2023 Refinitiv. All rights reserved. --
5+ *| Copyright (C) 2019-2024 Refinitiv. All rights reserved. --
66 *|-----------------------------------------------------------------------------
77 */
88
@@ -93,7 +93,8 @@ OmmBaseImpl::OmmBaseImpl(ActiveConfig& activeConfig) :
9393 _pErrorClientHandler( 0 ),
9494 _theTimeOuts(),
9595 _bApiDispatchThreadStarted(false ),
96- _bUninitializeInvoked(false )
96+ _bUninitializeInvoked(false ),
97+ _negotiatedPingTimeout( 0 )
9798{
9899 _adminClosure = 0 ;
99100 _OAuthReactorConfig = NULL ;
@@ -140,7 +141,8 @@ OmmBaseImpl::OmmBaseImpl(ActiveConfig& activeConfig, OmmConsumerClient& adminCli
140141 _pErrorClientHandler(0 ),
141142 _theTimeOuts(),
142143 _bApiDispatchThreadStarted(false ),
143- _bUninitializeInvoked(false )
144+ _bUninitializeInvoked(false ),
145+ _negotiatedPingTimeout( 0 )
144146{
145147 _adminClosure = adminClosure;
146148 _OAuthReactorConfig = NULL ;
@@ -187,7 +189,8 @@ OmmBaseImpl::OmmBaseImpl(ActiveConfig& activeConfig, OmmConsumerClient& adminCli
187189 _pErrorClientHandler(0 ),
188190 _theTimeOuts(),
189191 _bApiDispatchThreadStarted(false ),
190- _bUninitializeInvoked(false )
192+ _bUninitializeInvoked(false ),
193+ _negotiatedPingTimeout( 0 )
191194{
192195 _adminClosure = adminClosure;
193196 _OAuthReactorConfig = NULL ;
@@ -234,7 +237,8 @@ OmmBaseImpl::OmmBaseImpl(ActiveConfig& activeConfig, OmmOAuth2ConsumerClient& oA
234237 _pErrorClientHandler(0 ),
235238 _theTimeOuts(),
236239 _bApiDispatchThreadStarted(false ),
237- _bUninitializeInvoked(false )
240+ _bUninitializeInvoked(false ),
241+ _negotiatedPingTimeout( 0 )
238242{
239243 _adminClosure = adminClosure;
240244 _OAuthReactorConfig = NULL ;
@@ -281,7 +285,8 @@ OmmBaseImpl::OmmBaseImpl(ActiveConfig& activeConfig, OmmProviderClient& adminCli
281285 _pErrorClientHandler(0 ),
282286 _theTimeOuts(),
283287 _bApiDispatchThreadStarted(false ),
284- _bUninitializeInvoked(false )
288+ _bUninitializeInvoked(false ),
289+ _negotiatedPingTimeout( 0 )
285290{
286291 _adminClosure = adminClosure;
287292 _OAuthReactorConfig = NULL ;
@@ -329,7 +334,8 @@ OmmBaseImpl::OmmBaseImpl( ActiveConfig& activeConfig, OmmConsumerErrorClient& cl
329334 _pErrorClientHandler( 0 ),
330335 _theTimeOuts(),
331336 _bApiDispatchThreadStarted(false ),
332- _bUninitializeInvoked(false )
337+ _bUninitializeInvoked(false ),
338+ _negotiatedPingTimeout( 0 )
333339{
334340 _OAuthReactorConfig = NULL ;
335341 _LoginReactorConfig = NULL ;
@@ -386,7 +392,8 @@ OmmBaseImpl::OmmBaseImpl(ActiveConfig& activeConfig, OmmOAuth2ConsumerClient& oA
386392 _pErrorClientHandler(0 ),
387393 _theTimeOuts(),
388394 _bApiDispatchThreadStarted(false ),
389- _bUninitializeInvoked(false )
395+ _bUninitializeInvoked(false ),
396+ _negotiatedPingTimeout( 0 )
390397{
391398 _adminClosure = adminClosure;
392399 _OAuthReactorConfig = NULL ;
@@ -443,7 +450,8 @@ OmmBaseImpl::OmmBaseImpl(ActiveConfig& activeConfig, OmmConsumerClient& adminCli
443450 _pErrorClientHandler(0 ),
444451 _theTimeOuts(),
445452 _bApiDispatchThreadStarted(false ),
446- _bUninitializeInvoked(false )
453+ _bUninitializeInvoked(false ),
454+ _negotiatedPingTimeout( 0 )
447455{
448456 _adminClosure = adminClosure;
449457 _OAuthReactorConfig = NULL ;
@@ -500,7 +508,8 @@ OmmBaseImpl::OmmBaseImpl(ActiveConfig& activeConfig, OmmConsumerClient& adminCli
500508 _pErrorClientHandler(0 ),
501509 _theTimeOuts(),
502510 _bApiDispatchThreadStarted(false ),
503- _bUninitializeInvoked(false )
511+ _bUninitializeInvoked(false ),
512+ _negotiatedPingTimeout( 0 )
504513{
505514 _adminClosure = adminClosure;
506515 _OAuthReactorConfig = NULL ;
@@ -557,7 +566,8 @@ OmmBaseImpl::OmmBaseImpl( ActiveConfig& activeConfig, OmmProviderErrorClient& cl
557566 _pErrorClientHandler( 0 ),
558567 _theTimeOuts(),
559568 _bApiDispatchThreadStarted(false ),
560- _bUninitializeInvoked(false )
569+ _bUninitializeInvoked(false ),
570+ _negotiatedPingTimeout( 0 )
561571{
562572 _adminClosure = 0 ;
563573 _OAuthReactorConfig = NULL ;
@@ -613,7 +623,8 @@ OmmBaseImpl::OmmBaseImpl(ActiveConfig& activeConfig, OmmProviderClient& adminCli
613623 _pErrorClientHandler(0 ),
614624 _theTimeOuts(),
615625 _bApiDispatchThreadStarted(false ),
616- _bUninitializeInvoked(false )
626+ _bUninitializeInvoked(false ),
627+ _negotiatedPingTimeout( 0 )
617628{
618629 _adminClosure = adminClosure;
619630 _OAuthReactorConfig = NULL ;
@@ -2772,14 +2783,16 @@ void OmmBaseImpl::addCommonSocket()
27722783#endif
27732784}
27742785
2775- Int64 OmmBaseImpl::rsslReactorDispatchLoop ( Int64 timeOut , UInt32 count, bool & bMsgDispRcvd )
2786+ Int64 OmmBaseImpl::rsslReactorDispatchLoop ( Int64 timeOutValue , UInt32 count, bool & bMsgDispRcvd )
27762787{
27772788 bMsgDispRcvd = false ;
27782789
27792790 Int64 startTime = GetTime::getMicros ();
27802791 Int64 endTime = 0 ;
27812792 Int64 nextTimer = 0 ;
27822793
2794+ Int64 timeOut = timeOutValue;
2795+
27832796 bool userTimeoutExists ( TimeOut::getTimeOutInMicroSeconds ( *this , nextTimer ) );
27842797 if ( userTimeoutExists )
27852798 {
@@ -2796,6 +2809,15 @@ Int64 OmmBaseImpl::rsslReactorDispatchLoop( Int64 timeOut, UInt32 count, bool& b
27962809 RsslRet reactorRetCode = RSSL_RET_SUCCESS;
27972810 UInt64 loopCount = 0 ;
27982811
2812+ Int64 negotiatedTimeOutInMicroSeconds = DEFAULT_CONNECTION_PINGTIMEOUT * 1000 / 2 ;
2813+
2814+ // Get the negotiated ping timeout.
2815+ if (_negotiatedPingTimeout > 0 )
2816+ {
2817+ negotiatedTimeOutInMicroSeconds = _negotiatedPingTimeout * 1000 ;
2818+ negotiatedTimeOutInMicroSeconds /= 2 ;
2819+ }
2820+
27992821 endTime = GetTime::getMicros ();
28002822
28012823 if ( ( timeOut >= 0 ) && ( endTime - startTime >= timeOut ) )
@@ -2823,6 +2845,11 @@ Int64 OmmBaseImpl::rsslReactorDispatchLoop( Int64 timeOut, UInt32 count, bool& b
28232845 return bMsgDispRcvd ? 0 : -1 ;
28242846 }
28252847
2848+ if ( timeOut < 0 || negotiatedTimeOutInMicroSeconds < timeOut )
2849+ {
2850+ timeOut = negotiatedTimeOutInMicroSeconds;
2851+ }
2852+
28262853#if defined( USING_SELECT )
28272854
28282855 fd_set useReadFds = _readFds;
@@ -2894,6 +2921,8 @@ Int64 OmmBaseImpl::rsslReactorDispatchLoop( Int64 timeOut, UInt32 count, bool& b
28942921#error "No Implementation for Operating System That Does Not Implement ppoll"
28952922#endif
28962923
2924+ timeOut = timeOutValue;
2925+
28972926 if ( selectRetCode > 0 )
28982927 {
28992928 loopCount = 0 ;
@@ -2905,54 +2934,13 @@ Int64 OmmBaseImpl::rsslReactorDispatchLoop( Int64 timeOut, UInt32 count, bool& b
29052934 ++loopCount;
29062935 }
29072936 while ( reactorRetCode > RSSL_RET_SUCCESS && !bMsgDispRcvd && loopCount < 10 );
2908-
2909- if ( reactorRetCode < RSSL_RET_SUCCESS )
2910- {
2911- if ( OmmLoggerClient::ErrorEnum >= _activeConfig.loggerConfig .minLoggerSeverity )
2912- {
2913- EmaString temp ( " Call to rsslReactorDispatch() failed. Internal sysError='" );
2914- temp.append ( _reactorDispatchErrorInfo.rsslError .sysError )
2915- .append ( " ' Error Id " ).append ( _reactorDispatchErrorInfo.rsslError .rsslErrorId ).append ( " ' " )
2916- .append ( " ' Error Location='" ).append ( _reactorDispatchErrorInfo.errorLocation ).append ( " ' " )
2917- .append ( " ' Error text='" ).append ( _reactorDispatchErrorInfo.rsslError .text ).append ( " '. " );
2918-
2919- _userLock.lock ();
2920- if ( _pLoggerClient ) _pLoggerClient->log ( _activeConfig.instanceName , OmmLoggerClient::ErrorEnum, temp );
2921- _userLock.unlock ();
2922- }
2923-
2924- return -2 ;
2925- }
2926-
2927- if ( bMsgDispRcvd ) return 0 ;
2928-
2929- TimeOut::execute ( *this );
2930-
2931- if ( bMsgDispRcvd ) return 0 ;
2932-
2933- endTime = GetTime::getMicros ();
2934-
2935- if ( timeOut >= 0 )
2936- {
2937- if ( endTime > startTime + timeOut ) return -1 ;
2938-
2939- timeOut -= ( endTime - startTime );
2940- }
29412937 }
29422938 else if ( selectRetCode == 0 )
29432939 {
2944- TimeOut::execute ( *this );
2945-
2946- if ( bMsgDispRcvd ) return 0 ;
2947-
2948- endTime = GetTime::getMicros ();
2949-
2950- if ( timeOut >= 0 )
2951- {
2952- if ( endTime > startTime + timeOut ) return -1 ;
2953-
2954- timeOut -= ( endTime - startTime );
2955- }
2940+ // When select/ppoll breaks by timeout, it calls rsslReactorDispatch to allow check the channel Ping timeout
2941+ _userLock.lock ();
2942+ reactorRetCode = _pRsslReactor ? rsslReactorDispatch ( _pRsslReactor, &dispatchOpts, &_reactorDispatchErrorInfo ) : RSSL_RET_SUCCESS;
2943+ _userLock.unlock ();
29562944 }
29572945 else if ( selectRetCode < 0 )
29582946 {
@@ -2986,6 +2974,40 @@ Int64 OmmBaseImpl::rsslReactorDispatchLoop( Int64 timeOut, UInt32 count, bool& b
29862974#endif
29872975 return -2 ;
29882976 }
2977+
2978+ // Check the return code of rsslReactorDispatch()
2979+ if ( reactorRetCode < RSSL_RET_SUCCESS )
2980+ {
2981+ if ( OmmLoggerClient::ErrorEnum >= _activeConfig.loggerConfig .minLoggerSeverity )
2982+ {
2983+ EmaString temp ( " Call to rsslReactorDispatch() failed. Internal sysError='" );
2984+ temp.append ( _reactorDispatchErrorInfo.rsslError .sysError )
2985+ .append ( " ' Error Id " ).append ( _reactorDispatchErrorInfo.rsslError .rsslErrorId ).append ( " ' " )
2986+ .append ( " ' Error Location='" ).append ( _reactorDispatchErrorInfo.errorLocation ).append ( " ' " )
2987+ .append ( " ' Error text='" ).append ( _reactorDispatchErrorInfo.rsslError .text ).append ( " '. " );
2988+
2989+ _userLock.lock ();
2990+ if ( _pLoggerClient ) _pLoggerClient->log ( _activeConfig.instanceName , OmmLoggerClient::ErrorEnum, temp );
2991+ _userLock.unlock ();
2992+ }
2993+
2994+ return -2 ;
2995+ }
2996+
2997+ if ( bMsgDispRcvd ) return 0 ;
2998+
2999+ TimeOut::execute ( *this );
3000+
3001+ if ( bMsgDispRcvd ) return 0 ;
3002+
3003+ endTime = GetTime::getMicros ();
3004+
3005+ if ( timeOut >= 0 )
3006+ {
3007+ if ( endTime > startTime + timeOut ) return -1 ;
3008+
3009+ timeOut -= ( endTime - startTime );
3010+ }
29893011 } while ( true );
29903012}
29913013
@@ -3583,3 +3605,12 @@ void OmmBaseImpl::modifyReactorIOCtl(Int32 code, Int32 value)
35833605 _userLock.unlock ();
35843606 return ;
35853607}
3608+
3609+ void OmmBaseImpl::saveNegotiatedPingTimeout (UInt32 timeoutMs)
3610+ {
3611+ if ( timeoutMs > 0 )
3612+ {
3613+ if ( _negotiatedPingTimeout == 0 || timeoutMs < _negotiatedPingTimeout )
3614+ _negotiatedPingTimeout = timeoutMs;
3615+ }
3616+ }
0 commit comments