Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rework timeout logic to try to address the problems described in #5 #6

Open
wants to merge 2 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,15 @@ host:port to connect to via the --net option. You can use this to chain
beast-splitter instances together: specify --listen on the beast-splitter closer
to the Beast, and --net on the other beast-splitter.

## Input side - Timeouts

If the --beast-input-timeout option is set, then a Beast-Classic connection
is expected to produce at least one valid message per timeout interval. If it
does not, the connection is considered bad and disconnected/reconnected. This
is useful for non-local network connections to avoid beast-splitter waiting
indefinitely for data on a connection that has silently disconnected on the
remote side.

## Configuring Beast settings

beast-splitter will, by default, autodetect the capabilities of the Beast and
Expand Down
55 changes: 42 additions & 13 deletions beast_input.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ using namespace beast;

enum class BeastInput::ParserState { RESYNC, READ_1A, READ_TYPE, READ_DATA, READ_ESCAPED_1A };

BeastInput::BeastInput(boost::asio::io_service &service_, const Settings &fixed_settings_, const modes::Filter &filter_) : receiver_type(ReceiverType::UNKNOWN), fixed_settings(fixed_settings_), filter(filter_), receiving_gps_timestamps(false), autodetect_timer(service_), reconnect_timer(service_), liveness_timer(service_), good_sync(false), good_messages_count(0), bad_bytes_count(0), first_message(true), state(ParserState::RESYNC) {}
BeastInput::BeastInput(boost::asio::io_service &service_, const Settings &fixed_settings_, const modes::Filter &filter_, std::chrono::milliseconds beast_liveness_interval_)
: receiver_type(ReceiverType::UNKNOWN), fixed_settings(fixed_settings_), filter(filter_), receiving_gps_timestamps(false), autodetect_timer(service_), reconnect_timer(service_), rc_liveness_timer(service_), beast_liveness_interval(beast_liveness_interval_), beast_liveness_timer(service_), good_sync(false), good_messages_count(0), bad_bytes_count(0), first_message(true), state(ParserState::RESYNC) {}

void BeastInput::start() { try_to_connect(); }

Expand All @@ -56,16 +57,19 @@ void BeastInput::connection_established() {
current_settings = Settings();

autodetect_timer.cancel();
if (fixed_settings.radarcape.on())
if (fixed_settings.radarcape.on()) {
receiver_type = ReceiverType::RADARCAPE;
else if (fixed_settings.radarcape.off())
reset_rc_liveness();
} else if (fixed_settings.radarcape.off()) {
receiver_type = ReceiverType::BEAST;
else {
reset_beast_liveness();
} else {
receiver_type = ReceiverType::UNKNOWN;
autodetect_timer.expires_from_now(radarcape_detect_interval);
autodetect_timer.async_wait([this, self](const boost::system::error_code &ec) {
if (!ec) {
receiver_type = ReceiverType::BEAST;
reset_beast_liveness();
send_settings_message();
}
});
Expand All @@ -77,6 +81,8 @@ void BeastInput::connection_established() {
void BeastInput::connection_failed() {
good_sync = false;
autodetect_timer.cancel();
rc_liveness_timer.cancel();
beast_liveness_timer.cancel();

// schedule reconnect.
auto self(shared_from_this());
Expand Down Expand Up @@ -262,6 +268,33 @@ void BeastInput::lost_sync() {
state = ParserState::RESYNC;
}

void BeastInput::reset_rc_liveness() {
auto self(shared_from_this());
rc_liveness_timer.expires_from_now(radarcape_liveness_interval);
rc_liveness_timer.async_wait([this, self](const boost::system::error_code &ec) {
if (!ec) {
std::cerr << what() << ": no recent status messages received, reconnecting" << std::endl;
disconnect();
connection_failed();
}
});
}

void BeastInput::reset_beast_liveness() {
if (beast_liveness_interval.count() == 0)
return;

auto self(shared_from_this());
beast_liveness_timer.expires_from_now(beast_liveness_interval);
beast_liveness_timer.async_wait([this, self](const boost::system::error_code &ec) {
if (!ec) {
std::cerr << what() << ": no recent data received, reconnecting" << std::endl;
disconnect();
connection_failed();
}
});
}

void BeastInput::dispatch_message() {
// monitor status messages for GPS timestamp bit
// and for radarcape autodetection
Expand All @@ -270,18 +303,11 @@ void BeastInput::dispatch_message() {
if (receiver_type != ReceiverType::RADARCAPE) {
receiver_type = ReceiverType::RADARCAPE;
autodetect_timer.cancel();
beast_liveness_timer.cancel();
send_settings_message(); // for the g/G setting
}

auto self(shared_from_this());
liveness_timer.expires_from_now(radarcape_liveness_interval);
liveness_timer.async_wait([this, self](const boost::system::error_code &ec) {
if (!ec) {
std::cerr << what() << ": no recent status messages received" << std::endl;
disconnect();
connection_failed();
}
});
reset_rc_liveness();
}

if (!can_dispatch())
Expand All @@ -292,6 +318,9 @@ void BeastInput::dispatch_message() {
std::cerr << what() << ": connected to a " << (receiver_type == ReceiverType::RADARCAPE ? "Radarcape" : "Beast") << "-style receiver" << std::endl;
}

if (receiver_type == ReceiverType::BEAST)
reset_beast_liveness();

if (!message_notifier)
return;

Expand Down
11 changes: 9 additions & 2 deletions beast_input.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ namespace beast {

protected:
// construct a new input instance
BeastInput(boost::asio::io_service &service_, const Settings &fixed_settings_, const modes::Filter &filter_);
BeastInput(boost::asio::io_service &service_, const Settings &fixed_settings_, const modes::Filter &filter_, std::chrono::milliseconds beast_liveness_interval_);

virtual ~BeastInput() {}

Expand All @@ -106,6 +106,9 @@ namespace beast {
unsigned good_messages() const { return good_messages_count; }
unsigned bad_bytes() const { return bad_bytes_count; }

void reset_rc_liveness();
void reset_beast_liveness();

virtual void saw_good_message(void);
virtual bool can_dispatch(void) const;

Expand Down Expand Up @@ -145,7 +148,11 @@ namespace beast {
boost::asio::steady_timer reconnect_timer;

// timer that expires after radarcape_liveness_interval
boost::asio::steady_timer liveness_timer;
boost::asio::steady_timer rc_liveness_timer;

// timer that expires after beast_liveness_interval
std::chrono::milliseconds beast_liveness_interval;
boost::asio::steady_timer beast_liveness_timer;

// are we currently in sync?
bool good_sync;
Expand Down
2 changes: 1 addition & 1 deletion beast_input_net.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
using namespace beast;
using boost::asio::ip::tcp;

NetInput::NetInput(boost::asio::io_service &service_, const std::string &host_, const std::string &port_or_service_, const Settings &fixed_settings_, const modes::Filter &filter_) : BeastInput(service_, fixed_settings_, filter_), host(host_), port_or_service(port_or_service_), resolver(service_), socket(service_), reconnect_timer(service_), readbuf(std::make_shared<helpers::bytebuf>(read_buffer_size)), warned_about_framing(false) {}
NetInput::NetInput(boost::asio::io_service &service_, const std::string &host_, const std::string &port_or_service_, const Settings &fixed_settings_, const modes::Filter &filter_, std::chrono::milliseconds beast_liveness_interval_) : BeastInput(service_, fixed_settings_, filter_, beast_liveness_interval_), host(host_), port_or_service(port_or_service_), resolver(service_), socket(service_), readbuf(std::make_shared<helpers::bytebuf>(read_buffer_size)), warned_about_framing(false) {}

std::string NetInput::what() const { return std::string("net(") + host + std::string(":") + port_or_service + std::string(")"); }

Expand Down
5 changes: 2 additions & 3 deletions beast_input_net.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ namespace beast {
const size_t read_buffer_size = 4096;

// factory method
static pointer create(boost::asio::io_service &service, const std::string &host, const std::string &port_or_service, const Settings &fixed_settings = Settings(), const modes::Filter &filter = modes::Filter()) { return pointer(new NetInput(service, host, port_or_service, fixed_settings, filter)); }
static pointer create(boost::asio::io_service &service, const std::string &host, const std::string &port_or_service, const Settings &fixed_settings, const modes::Filter &filter, std::chrono::milliseconds beast_liveness_interval) { return pointer(new NetInput(service, host, port_or_service, fixed_settings, filter, beast_liveness_interval)); }

protected:
std::string what() const override;
Expand All @@ -51,7 +51,7 @@ namespace beast {

private:
// construct a new net input instance, don't start yet
NetInput(boost::asio::io_service &service_, const std::string &host_, const std::string &port_or_service_, const Settings &fixed_settings_, const modes::Filter &filter_);
NetInput(boost::asio::io_service &service_, const std::string &host_, const std::string &port_or_service_, const Settings &fixed_settings_, const modes::Filter &filter_, std::chrono::milliseconds beast_liveness_interval_);

void resolve_and_connect(const boost::system::error_code &ec = boost::system::error_code());
void try_next_endpoint();
Expand All @@ -65,7 +65,6 @@ namespace beast {

boost::asio::ip::tcp::resolver resolver;
boost::asio::ip::tcp::socket socket;
boost::asio::steady_timer reconnect_timer;
boost::asio::ip::tcp::resolver::iterator next_endpoint;

// cached buffer used for reads
Expand Down
3 changes: 2 additions & 1 deletion beast_input_serial.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@

using namespace beast;

SerialInput::SerialInput(boost::asio::io_service &service_, const std::string &path_, unsigned int fixed_baud_rate_, const Settings &fixed_settings_, const modes::Filter &filter_) : BeastInput(service_, fixed_settings_, filter_), path(path_), port(service_), autobaud_interval(autobaud_base_interval), autobaud_timer(service_), read_timer(service_), readbuf(std::make_shared<helpers::bytebuf>(read_buffer_size)), warned_about_rate(false) {
SerialInput::SerialInput(boost::asio::io_service &service_, const std::string &path_, unsigned int fixed_baud_rate_, const Settings &fixed_settings_, const modes::Filter &filter_, std::chrono::milliseconds beast_liveness_interval_)
: BeastInput(service_, fixed_settings_, filter_, beast_liveness_interval_), path(path_), port(service_), autobaud_interval(autobaud_base_interval), autobaud_timer(service_), read_timer(service_), readbuf(std::make_shared<helpers::bytebuf>(read_buffer_size)), warned_about_rate(false) {
// set up autobaud
if (fixed_baud_rate_ == 0) {
autobauding = true;
Expand Down
4 changes: 2 additions & 2 deletions beast_input_serial.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ namespace beast {
const std::chrono::milliseconds read_interval = std::chrono::milliseconds(50);

// factory method
static pointer create(boost::asio::io_service &service, const std::string &path, unsigned int fixed_baud_rate = 0, const Settings &fixed_settings = Settings(), const modes::Filter &filter = modes::Filter()) { return pointer(new SerialInput(service, path, fixed_baud_rate, fixed_settings, filter)); }
static pointer create(boost::asio::io_service &service, const std::string &path, unsigned int fixed_baud_rate, const Settings &fixed_settings, const modes::Filter &filter, std::chrono::milliseconds beast_liveness_interval) { return pointer(new SerialInput(service, path, fixed_baud_rate, fixed_settings, filter, beast_liveness_interval)); }

protected:
std::string what() const override;
Expand All @@ -72,7 +72,7 @@ namespace beast {

private:
// construct a new serial input instance, don't start yet
SerialInput(boost::asio::io_service &service_, const std::string &path_, unsigned int fixed_baud_rate, const Settings &fixed_settings_, const modes::Filter &filter_);
SerialInput(boost::asio::io_service &service_, const std::string &path_, unsigned int fixed_baud_rate, const Settings &fixed_settings_, const modes::Filter &filter_, std::chrono::milliseconds beast_liveness_interval_);

void start_reading(const boost::system::error_code &ec = boost::system::error_code());
void advance_autobaud(void);
Expand Down
22 changes: 18 additions & 4 deletions splitter_main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,18 @@ static int realmain(int argc, char **argv) {
modes::FilterDistributor distributor;

po::options_description desc("Allowed options");
desc.add_options()("help", "produce help message")("serial", po::value<std::string>(), "read from given serial device")("net", po::value<net_option>(), "read from given network host:port")("status-file", po::value<std::string>(), "set path to status file")("fixed-baud", po::value<unsigned>()->default_value(0), "set a fixed baud rate, or 0 for autobauding")("listen", po::value<std::vector<listen_option>>(), "specify a [host:]port[:settings] to listen on")(
"connect", po::value<std::vector<connect_option>>(), "specify a host:port[:settings] to connect to")("force", po::value<beast::Settings>()->default_value(beast::Settings()), "specify settings to force on or off when configuring the Beast");
// clang-format off
desc.add_options()
("help", "produce help message")
("serial", po::value<std::string>(), "read from given serial device")
("net", po::value<net_option>(), "read from given network host:port")
("status-file", po::value<std::string>(), "set path to status file")
("fixed-baud", po::value<unsigned>()->default_value(0), "set a fixed baud rate, or 0 for autobauding")
("listen", po::value<std::vector<listen_option>>(), "specify a [host:]port[:settings] to listen on")
("connect", po::value<std::vector<connect_option>>(), "specify a host:port[:settings] to connect to")
("force", po::value<beast::Settings>()->default_value(beast::Settings()), "specify settings to force on or off when configuring the Beast")
("beast-input-timeout", po::value<float>()->default_value(0), "input timeout in non-Radarcape mode, in floating-point seconds (zero disables)");
// clang-format on

po::variables_map opts;

Expand All @@ -152,12 +162,16 @@ static int realmain(int argc, char **argv) {
return EXIT_NO_RESTART;
}

beast::Settings force_settings = opts["force"].as<beast::Settings>();
std::chrono::duration<float> beast_input_timeout_seconds(opts["beast-input-timeout"].as<float>());
std::chrono::milliseconds beast_input_timeout = std::chrono::duration_cast<std::chrono::milliseconds>(beast_input_timeout_seconds);

beast::BeastInput::pointer input;
if (opts.count("serial")) {
input = beast::SerialInput::create(io_service, opts["serial"].as<std::string>(), opts["fixed-baud"].as<unsigned>(), opts["force"].as<beast::Settings>());
input = beast::SerialInput::create(io_service, opts["serial"].as<std::string>(), opts["fixed-baud"].as<unsigned>(), force_settings, modes::Filter(), beast_input_timeout);
} else if (opts.count("net")) {
auto net = opts["net"].as<net_option>();
input = beast::NetInput::create(io_service, net.host, net.port, opts["force"].as<beast::Settings>());
input = beast::NetInput::create(io_service, net.host, net.port, force_settings, modes::Filter(), beast_input_timeout);
} else {
std::cerr << "A --serial or --net argument is needed" << std::endl;
std::cerr << desc << std::endl;
Expand Down