Skip to content

Commit

Permalink
logging
Browse files Browse the repository at this point in the history
  • Loading branch information
Petr Matousek committed Jan 12, 2025
1 parent f47e0bf commit 94636b7
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 14 deletions.
23 changes: 11 additions & 12 deletions src/api/qpid-proton/reactor/handler/TxReceiverHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -241,40 +241,39 @@ int TxReceiverHandler::getBatchSize() const

void TxReceiverHandler::on_session_open(session &s) {
sess = s;
std::cout << " [on_session_open] declare_txn started..." << std::endl;
logger(trace) << "[on_session_open] declare_txn started...";
s.declare_transaction(*this);
std::cout << " [on_session_open] declare_txn ended..." << std::endl;
logger(trace) << "[on_session_open] declare_txn ended...";
logger(debug) << "[on_session_open] transaction batch size: " << batch_size;
}

void TxReceiverHandler::on_transaction_declare_failed(transaction) {}

void TxReceiverHandler::on_transaction_commit_failed(transaction t) {
std::cout << "Transaction Commit Failed" << std::endl;
logger(debug) << "[on_transaction_commit_failed] Transaction Commit Failed";
t.connection().close();
exit(-1);
}

void TxReceiverHandler::on_transaction_declared(transaction t) {
std::cout << "[on_transaction_declared] txn called " << (&t)
<< std::endl;
std::cout << "[on_transaction_declared] txn is_empty " << (t.is_empty())
<< "\t" << tx.is_empty() << std::endl;
logger(trace) << "[on_transaction_declared] txn called " << (&t);
logger(debug) << "[on_transaction_declared] txn is_empty " << (t.is_empty());
// TODO needed?
// recv.add_credit(batch_size);
tx = t;
}

void TxReceiverHandler::on_transaction_aborted(transaction t) {
confirmed += current_batch;
std::cout << "messages aborted" << std::endl;
logger(debug) << "[on_transaction_aborted] messages aborted";
}

void TxReceiverHandler::on_transaction_committed(transaction t) {
confirmed += current_batch;
current_batch = 0;
std::cout<<" [OnTxnCommitted] Committed:"<< confirmed << std::endl;
logger(trace) << "[on_transaction_committed] Committed:"<< confirmed;
if(confirmed == count) {
std::cout << "All messages committed" << std::endl;
logger(info) << "[on_transaction_committed] All messages committed";
t.connection().close();
}
else {
Expand Down Expand Up @@ -534,11 +533,11 @@ void TxReceiverHandler::on_message(delivery &d, message &m)
// TODO
tx.accept(d);
current_batch += 1;
std::cout<<"# CURRENT VS SIZE: " << current_batch <<": " << batch_size << std::endl;
logger(debug) << "[on_message] current batch: " << current_batch;
if(current_batch == batch_size) {
//tx = transaction(); // null
// TODO: I think we should do a commit here ! rakhi is not doing
std::cout<<"# COMMIT: " << current_batch <<": " << batch_size << std::endl;
logger(debug) << "[on_message] messages commited: " << current_batch;
tx.commit();
//sess.declare_transaction(*this);
}
Expand Down
4 changes: 2 additions & 2 deletions src/api/qpid-proton/reactor/handler/TxSenderHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -354,9 +354,9 @@ void TxSenderHandler::on_sender_close(sender &s) {

void TxSenderHandler::on_session_open(session &s) {
sess = s;
std::cout << " [on_session_open] declare_txn started..." << std::endl;
logger(trace) << "[on_session_open] declare_txn started...";
s.declare_transaction(*this);
std::cout << " [on_session_open] declare_txn ended..." << std::endl;
logger(trace) << "[on_session_open] declare_txn ended...";
}

void TxSenderHandler::on_container_start(container &c)
Expand Down

0 comments on commit 94636b7

Please sign in to comment.