@@ -265,7 +265,7 @@ void TxReceiverHandler::on_transaction_declared(transaction t) {
265
265
266
266
void TxReceiverHandler::on_transaction_aborted (transaction t) {
267
267
confirmed += current_batch;
268
- logger (debug) << " [on_transaction_aborted] messages aborted" ;
268
+ logger (debug) << " [on_transaction_aborted] messages aborted, confirmed: " << confirmed ;
269
269
if (confirmed == count) {
270
270
logger (info) << " [on_transaction_committed] All messages proccessed" ;
271
271
t.connection ().close ();
@@ -278,7 +278,7 @@ void TxReceiverHandler::on_transaction_aborted(transaction t) {
278
278
void TxReceiverHandler::on_transaction_committed (transaction t) {
279
279
confirmed += current_batch;
280
280
current_batch = 0 ;
281
- logger (trace ) << " [on_transaction_committed] Committed: " << confirmed;
281
+ logger (debug ) << " [on_transaction_aborted] messages committed, confirmed: " << confirmed;
282
282
if (confirmed == count) {
283
283
logger (info) << " [on_transaction_committed] All messages proccessed" ;
284
284
t.connection ().close ();
@@ -290,21 +290,24 @@ void TxReceiverHandler::on_transaction_committed(transaction t) {
290
290
291
291
void TxReceiverHandler::on_container_start (container &c)
292
292
{
293
- logger (debug) << " Starting messaging handler" ;
293
+ logger (debug) << " [on_container_start] Starting messaging transaction handler" ;
294
+ logger (debug) << " [on_container_start] User: " << user;
295
+ logger (debug) << " [on_container_start] Password: " << password;
296
+ logger (debug) << " [on_container_start] SASL mechanisms: " << sasl_mechanisms;
297
+ logger (debug) << " [on_container_start] SASL enabled: " << conn_sasl_enabled;
298
+ logger (debug) << " [on_container_start] Maximum frame size: " << max_frame_size;
299
+ logger (debug) << " [on_container_start] Topic: " << is_topic;
300
+ logger (debug) << " [on_container_start] Transaction batch size: " << batch_size;
301
+ logger (debug) << " [on_container_start] Transaction action: " << tx_action;
302
+ logger (debug) << " [on_container_start] Transaction endloop action: " << tx_endloop_action;
303
+ logger (trace) << " [on_container_start] Messages count: " << count;
304
+ logger (debug) << " [on_container_start] Messages confirmed: " << confirmed;
305
+ logger (debug) << " [on_container_start] Peer to Peer: " << recv_listen;
294
306
295
307
if (recv_listen == " true" ) {
296
308
cont = &c;
297
309
}
298
310
299
- logger (debug) << " User: " << user;
300
- logger (debug) << " Password: " << password;
301
- logger (debug) << " SASL mechanisms: " << sasl_mechanisms;
302
- logger (debug) << " SASL enabled: " << conn_sasl_enabled;
303
-
304
- logger (debug) << " Maximum frame size: " << max_frame_size;
305
-
306
- logger (debug) << " Topic: " << is_topic;
307
-
308
311
connection_options conn_opts;
309
312
std::vector< ::proton::symbol > caps;
310
313
@@ -321,7 +324,7 @@ void TxReceiverHandler::on_container_start(container &c)
321
324
}
322
325
}
323
326
324
- logger (debug) << " Source capabilities: " ;
327
+ logger (debug) << " [on_container_start] Source capabilities: " ;
325
328
for (std::vector< ::proton::symbol >::const_iterator i = caps.begin (); i != caps.end (); ++i) {
326
329
logger (debug) << *i;
327
330
}
@@ -340,24 +343,24 @@ void TxReceiverHandler::on_container_start(container &c)
340
343
// conn_opts.max_frame_size(max_frame_size);
341
344
conn_opts.failover_urls (conn_urls);
342
345
343
- logger (debug) << " Setting a reconnect timer: " << conn_reconnect;
344
- logger (debug) << " Custom reconnect: " << conn_reconnect_custom;
346
+ logger (debug) << " [on_container_start] Setting a reconnect timer: " << conn_reconnect;
347
+ logger (debug) << " [on_container_start] Custom reconnect: " << conn_reconnect_custom;
345
348
346
349
configure_reconnect (conn_opts);
347
350
configure_ssl (c);
348
351
349
352
if (conn_heartbeat != 0 ) {
350
- logger (debug) << " Heartbeat: " << conn_heartbeat;
353
+ logger (debug) << " [on_container_start] Heartbeat: " << conn_heartbeat;
351
354
352
355
duration heartbeat_seconds = conn_heartbeat * duration::SECOND;
353
356
354
357
conn_opts.idle_timeout (heartbeat_seconds);
355
358
}
356
359
357
- logger (debug) << " Browsing: " << browse;
360
+ logger (debug) << " [on_container_start] Browsing: " << browse;
358
361
359
362
if (browse) {
360
- logger (debug) << " Creating a receiver and connecting to the server" ;
363
+ logger (debug) << " [on_container_start] Creating a receiver and connecting to the server" ;
361
364
362
365
source_options s_opts = source_options ()
363
366
.distribution_mode (source::COPY)
@@ -398,18 +401,18 @@ void TxReceiverHandler::on_container_start(container &c)
398
401
399
402
work_q = &recv.work_queue ();
400
403
} else {
401
- logger (debug) << " Peer-to-peer: " << recv_listen;
402
- logger (debug) << " Peer-to-peer port: " << recv_listen_port;
404
+ logger (debug) << " [on_container_start] Peer-to-peer: " << recv_listen;
405
+ logger (debug) << " [on_container_start] Peer-to-peer port: " << recv_listen_port;
403
406
404
407
if (recv_listen == " true" ) {
405
- logger (debug) << " Creating a listener" ;
408
+ logger (debug) << " [on_container_start] Creating a listener" ;
406
409
// P2P
407
410
stringstream ss;
408
411
ss << " 0.0.0.0:" ;
409
412
ss << recv_listen_port;
410
413
lsnr = c.listen (ss.str (), conn_opts);
411
414
} else {
412
- logger (debug) << " Creating a receiver and connecting to the server" ;
415
+ logger (debug) << " [on_container_start] Creating a receiver and connecting to the server" ;
413
416
414
417
source_options s_opts = source_options ().filters (this ->fm ).capabilities (caps);
415
418
@@ -448,7 +451,7 @@ void TxReceiverHandler::on_container_start(container &c)
448
451
work_q = &recv.work_queue ();
449
452
}
450
453
}
451
- logger (debug) << " Connected to the broker/p2p and waiting for messages" ;
454
+ logger (debug) << " [on_container_start] Connected to the broker/p2p and waiting for messages" ;
452
455
453
456
if (subscriber_unsubscribe && durable_subscriber_name != " " ) {
454
457
recv.close ();
@@ -471,10 +474,10 @@ void TxReceiverHandler::on_message(delivery &d, message &m)
471
474
// TODO useless now ?? usew confirmed ?
472
475
msg_received_cnt += 1 ;
473
476
474
- logger (debug) << " Processing received message" ;
477
+ logger (debug) << " [on_message] Processing received message" ;
475
478
476
479
if (log_msgs == " dict" ) {
477
- logger (trace) << " Decoding message" ;
480
+ logger (trace) << " [on_message] Decoding message" ;
478
481
ReactorDecoder decoder = ReactorDecoder (m);
479
482
480
483
std::ostringstream stream;
@@ -492,7 +495,7 @@ void TxReceiverHandler::on_message(delivery &d, message &m)
492
495
}
493
496
494
497
if (duration_time > 0 && duration_mode == " after-receive" ) {
495
- logger (debug) << " Waiting..." ;
498
+ logger (debug) << " [on_message] Waiting..." ;
496
499
sleep4next (ts, count, duration_time, msg_received_cnt);
497
500
}
498
501
@@ -508,20 +511,20 @@ void TxReceiverHandler::on_message(delivery &d, message &m)
508
511
// TODO: not implemented yet
509
512
}
510
513
511
- logger (debug) << " Process-reply-to: " << process_reply_to;
514
+ logger (debug) << " [on_message] Process-reply-to: " << process_reply_to;
512
515
513
516
if (process_reply_to) {
514
517
if (m.reply_to () != " " ) {
515
- logger (debug) << " Reply-to address: " << m.reply_to ();
518
+ logger (debug) << " [on_message] Reply-to address: " << m.reply_to ();
516
519
517
520
do_process_reply_to (m);
518
521
} else {
519
- logger (debug) << " Reply-to address is not set" ;
522
+ logger (debug) << " [on_message] Reply-to address is not set" ;
520
523
}
521
524
}
522
525
523
526
if (recv_drain_after_credit_window && msg_received_cnt == recv_credit_window) {
524
- logger (debug) << " Scheduling drain" ;
527
+ logger (debug) << " [on_message] Scheduling drain" ;
525
528
d.receiver ().work_queue ().add (make_work (&TxReceiverHandler::drain, this ));
526
529
}
527
530
@@ -564,22 +567,22 @@ void TxReceiverHandler::on_message(delivery &d, message &m)
564
567
}
565
568
566
569
void TxReceiverHandler::on_receiver_drain_finish (receiver &r) {
567
- logger (debug) << " Receiver drain finished" ;
570
+ logger (debug) << " [on_receiver_drain_finish] Receiver drain finished" ;
568
571
}
569
572
570
573
void TxReceiverHandler::on_tracker_accept (tracker &t)
571
574
{
572
- logger (debug) << " Delivery accepted" ;
575
+ logger (debug) << " [on_tracker_accept] Delivery accepted" ;
573
576
}
574
577
575
578
576
579
void TxReceiverHandler::on_tracker_reject (tracker &t)
577
580
{
578
- logger (debug) << " Delivery rejected" ;
581
+ logger (debug) << " [on_tracker_reject] Delivery rejected" ;
579
582
}
580
583
581
584
void TxReceiverHandler::on_transport_close (transport &t) {
582
- logger (debug) << " Closing the transport" ;
585
+ logger (debug) << " [on_transport_close] Closing the transport" ;
583
586
584
587
if (conn_reconnect == " false" ) {
585
588
exit (1 );
@@ -589,7 +592,8 @@ void TxReceiverHandler::on_transport_close(transport &t) {
589
592
}
590
593
591
594
void TxReceiverHandler::on_transport_error (transport &t) {
592
- logger (error) << " The connection with " << broker_url.getHost () << " :" << broker_url.getPort () << " was interrupted: " << t.error ().what ();
595
+ logger (error) << " [on_transport_error] The connection with " << broker_url.getHost () << " :"
596
+ << broker_url.getPort () << " was interrupted: " << t.error ().what ();
593
597
594
598
if (t.error ().what ().find (" unauthorized" ) != string::npos) {
595
599
exit (1 );
@@ -598,20 +602,16 @@ void TxReceiverHandler::on_transport_error(transport &t) {
598
602
599
603
void TxReceiverHandler::on_connection_close (connection &conn)
600
604
{
601
- logger (debug) << " Disconnecting ..." ;
605
+ logger (debug) << " [on_connection_close] Disconnecting ..." ;
602
606
}
603
607
604
608
void TxReceiverHandler::on_connection_error (connection &c)
605
609
{
606
- logger (error) << " Failed to connect to " << broker_url.getHost () << " :" << broker_url.getPort ();
610
+ logger (error) << " [on_connection_error] Failed to connect to " << broker_url.getHost () << " :" << broker_url.getPort ();
607
611
608
612
if (c.error ().what ().find (" Unable to validate user" ) != string::npos) {
609
613
exit (1 );
610
614
}
611
615
}
612
616
613
- void TxReceiverHandler::do_disconnect ()
614
- {
615
-
616
- }
617
-
617
+ void TxReceiverHandler::do_disconnect () {}
0 commit comments