Skip to content

Commit

Permalink
wip: initial version refactoring #7
Browse files Browse the repository at this point in the history
  • Loading branch information
Petr Matousek committed Dec 2, 2024
1 parent 7233c5c commit e0651a0
Showing 1 changed file with 59 additions and 60 deletions.
119 changes: 59 additions & 60 deletions src/api/qpid-proton/reactor/handler/TxSenderHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -380,46 +380,45 @@ void TxSenderHandler::on_container_start(container &c)
for (std::vector< ::proton::symbol >::const_iterator i = caps.begin(); i != caps.end(); ++i) {
logger(debug) << *i;
}
// connection_options conn_opts;
//
// if (!user.empty()) conn_opts.user(user);
// if (!password.empty()) conn_opts.password(password);
//
// if (conn_sasl_enabled == "false") {
// conn_opts.sasl_enabled(false);
// } else {
// conn_opts.sasl_enabled(true);
// }
//
// conn_opts.sasl_allow_insecure_mechs(true);
// conn_opts.sasl_allowed_mechs(sasl_mechanisms);
// // conn_opts.max_frame_size(max_frame_size);
// conn_opts.failover_urls(conn_urls);
//
// logger(debug) << "Setting a reconnect timer: " << conn_reconnect;
// logger(debug) << "Custom reconnect: " << conn_reconnect_custom;
//
// configure_reconnect(conn_opts);
// configure_ssl(c);
//
// if (conn_heartbeat != 0) {
// logger(debug) << "Heartbeat: " << conn_heartbeat;
//
// duration heartbeat_seconds = conn_heartbeat * duration::SECOND;
//
// conn_opts.idle_timeout(heartbeat_seconds);
// }
//
// logger(debug) << "Creating a sender";
//
// connection conn;
// if (conn_use_config_file) {
// conn = c.connect();
// } else {
// conn = c.connect(broker_url.getUri(), conn_opts);
// }

connection conn = c.connect(broker_url.getUri());
connection_options conn_opts;

if (!user.empty()) conn_opts.user(user);
if (!password.empty()) conn_opts.password(password);

if (conn_sasl_enabled == "false") {
conn_opts.sasl_enabled(false);
} else {
conn_opts.sasl_enabled(true);
}

conn_opts.sasl_allow_insecure_mechs(true);
conn_opts.sasl_allowed_mechs(sasl_mechanisms);
// conn_opts.max_frame_size(max_frame_size);
conn_opts.failover_urls(conn_urls);

logger(debug) << "Setting a reconnect timer: " << conn_reconnect;
logger(debug) << "Custom reconnect: " << conn_reconnect_custom;

configure_reconnect(conn_opts);
configure_ssl(c);

if (conn_heartbeat != 0) {
logger(debug) << "Heartbeat: " << conn_heartbeat;

duration heartbeat_seconds = conn_heartbeat * duration::SECOND;

conn_opts.idle_timeout(heartbeat_seconds);
}

logger(debug) << "Creating a sender";

connection conn;
if (conn_use_config_file) {
conn = c.connect();
} else {
conn = c.connect(broker_url.getUri(), conn_opts);
}

sndr = conn.open_sender(
broker_url.getPath(),
c.sender_options()
Expand All @@ -428,26 +427,26 @@ void TxSenderHandler::on_container_start(container &c)
)
);

// work_q = &sndr.work_queue();
//
// logger(trace) << "Setting up timer";
//
// if (duration_time > 0 && count > 0) {
// interval = duration((duration_time * duration::SECOND) / count);
//
// logger(trace) << "Interval for duration: " << interval.milliseconds() << " ms";
// }
//#if defined(__REACTOR_HAS_TIMER)
// work_q->schedule(duration::IMMEDIATE, make_work(&TxSenderHandler::timerEvent, this));
//
// if (duration_time > 0 && duration_mode == "after-send") {
// work_q->schedule(duration::IMMEDIATE, make_work(&TxSenderHandler::checkIfCanSend, this));
// } else if (duration_time > 0 && duration_mode == "before-send") {
// work_q->schedule(interval, make_work(&TxSenderHandler::checkIfCanSend, this));
// } else {
// work_q->schedule(duration::IMMEDIATE, make_work(&TxSenderHandler::checkIfCanSend, this));
// }
//#endif
work_q = &sndr.work_queue();

logger(trace) << "Setting up timer";

if (duration_time > 0 && count > 0) {
interval = duration((duration_time * duration::SECOND) / count);

logger(trace) << "Interval for duration: " << interval.milliseconds() << " ms";
}
#if defined(__REACTOR_HAS_TIMER)
work_q->schedule(duration::IMMEDIATE, make_work(&TxSenderHandler::timerEvent, this));

if (duration_time > 0 && duration_mode == "after-send") {
work_q->schedule(duration::IMMEDIATE, make_work(&TxSenderHandler::checkIfCanSend, this));
} else if (duration_time > 0 && duration_mode == "before-send") {
work_q->schedule(interval, make_work(&TxSenderHandler::checkIfCanSend, this));
} else {
work_q->schedule(duration::IMMEDIATE, make_work(&TxSenderHandler::checkIfCanSend, this));
}
#endif

tx = transaction();
logger(debug) << "[on_container_start] declare_txn started...";
Expand Down

0 comments on commit e0651a0

Please sign in to comment.