From 83237112e22cfc69230dbc59827e80bc4f5eddc2 Mon Sep 17 00:00:00 2001 From: Oliver Jowett Date: Thu, 6 Oct 2022 15:29:13 +0800 Subject: [PATCH 1/2] Rework timeout logic to try to address the problems described in #5 * If the input connection is forced to Radarcape mode, then Radarcape status messages are expected immediately; beast-splitter does not wait to see a status message before starting the timer that expects at least one status message per 15 seconds. * New option, --beast-input-timeout, sets a timeout for non-Radarcape connections (in seconds). If no valid message is received within that timeout, beast-splitter disconnects/reconnects. + some internal cleanups --- beast_input.cc | 55 +++++++++++++++++++++++++++++++++---------- beast_input.h | 11 +++++++-- beast_input_net.cc | 2 +- beast_input_net.h | 5 ++-- beast_input_serial.cc | 3 ++- beast_input_serial.h | 4 ++-- splitter_main.cc | 22 +++++++++++++---- 7 files changed, 76 insertions(+), 26 deletions(-) diff --git a/beast_input.cc b/beast_input.cc index eea311d..a6f92bf 100644 --- a/beast_input.cc +++ b/beast_input.cc @@ -35,7 +35,8 @@ using namespace beast; enum class BeastInput::ParserState { RESYNC, READ_1A, READ_TYPE, READ_DATA, READ_ESCAPED_1A }; -BeastInput::BeastInput(boost::asio::io_service &service_, const Settings &fixed_settings_, const modes::Filter &filter_) : receiver_type(ReceiverType::UNKNOWN), fixed_settings(fixed_settings_), filter(filter_), receiving_gps_timestamps(false), autodetect_timer(service_), reconnect_timer(service_), liveness_timer(service_), good_sync(false), good_messages_count(0), bad_bytes_count(0), first_message(true), state(ParserState::RESYNC) {} +BeastInput::BeastInput(boost::asio::io_service &service_, const Settings &fixed_settings_, const modes::Filter &filter_, std::chrono::milliseconds beast_liveness_interval_) + : receiver_type(ReceiverType::UNKNOWN), fixed_settings(fixed_settings_), filter(filter_), receiving_gps_timestamps(false), autodetect_timer(service_), reconnect_timer(service_), rc_liveness_timer(service_), beast_liveness_interval(beast_liveness_interval_), beast_liveness_timer(service_), good_sync(false), good_messages_count(0), bad_bytes_count(0), first_message(true), state(ParserState::RESYNC) {} void BeastInput::start() { try_to_connect(); } @@ -56,16 +57,19 @@ void BeastInput::connection_established() { current_settings = Settings(); autodetect_timer.cancel(); - if (fixed_settings.radarcape.on()) + if (fixed_settings.radarcape.on()) { receiver_type = ReceiverType::RADARCAPE; - else if (fixed_settings.radarcape.off()) + reset_rc_liveness(); + } else if (fixed_settings.radarcape.off()) { receiver_type = ReceiverType::BEAST; - else { + reset_beast_liveness(); + } else { receiver_type = ReceiverType::UNKNOWN; autodetect_timer.expires_from_now(radarcape_detect_interval); autodetect_timer.async_wait([this, self](const boost::system::error_code &ec) { if (!ec) { receiver_type = ReceiverType::BEAST; + reset_beast_liveness(); send_settings_message(); } }); @@ -77,6 +81,8 @@ void BeastInput::connection_established() { void BeastInput::connection_failed() { good_sync = false; autodetect_timer.cancel(); + rc_liveness_timer.cancel(); + beast_liveness_timer.cancel(); // schedule reconnect. auto self(shared_from_this()); @@ -262,6 +268,33 @@ void BeastInput::lost_sync() { state = ParserState::RESYNC; } +void BeastInput::reset_rc_liveness() { + auto self(shared_from_this()); + rc_liveness_timer.expires_from_now(radarcape_liveness_interval); + rc_liveness_timer.async_wait([this, self](const boost::system::error_code &ec) { + if (!ec) { + std::cerr << what() << ": no recent status messages received, reconnecting" << std::endl; + disconnect(); + connection_failed(); + } + }); +} + +void BeastInput::reset_beast_liveness() { + if (beast_liveness_interval.count() == 0) + return; + + auto self(shared_from_this()); + beast_liveness_timer.expires_from_now(beast_liveness_interval); + beast_liveness_timer.async_wait([this, self](const boost::system::error_code &ec) { + if (!ec) { + std::cerr << what() << ": no recent data received, reconnecting" << std::endl; + disconnect(); + connection_failed(); + } + }); +} + void BeastInput::dispatch_message() { // monitor status messages for GPS timestamp bit // and for radarcape autodetection @@ -270,18 +303,11 @@ void BeastInput::dispatch_message() { if (receiver_type != ReceiverType::RADARCAPE) { receiver_type = ReceiverType::RADARCAPE; autodetect_timer.cancel(); + beast_liveness_timer.cancel(); send_settings_message(); // for the g/G setting } - auto self(shared_from_this()); - liveness_timer.expires_from_now(radarcape_liveness_interval); - liveness_timer.async_wait([this, self](const boost::system::error_code &ec) { - if (!ec) { - std::cerr << what() << ": no recent status messages received" << std::endl; - disconnect(); - connection_failed(); - } - }); + reset_rc_liveness(); } if (!can_dispatch()) @@ -292,6 +318,9 @@ void BeastInput::dispatch_message() { std::cerr << what() << ": connected to a " << (receiver_type == ReceiverType::RADARCAPE ? "Radarcape" : "Beast") << "-style receiver" << std::endl; } + if (receiver_type == ReceiverType::BEAST) + reset_beast_liveness(); + if (!message_notifier) return; diff --git a/beast_input.h b/beast_input.h index fe8f215..7410ed2 100644 --- a/beast_input.h +++ b/beast_input.h @@ -95,7 +95,7 @@ namespace beast { protected: // construct a new input instance - BeastInput(boost::asio::io_service &service_, const Settings &fixed_settings_, const modes::Filter &filter_); + BeastInput(boost::asio::io_service &service_, const Settings &fixed_settings_, const modes::Filter &filter_, std::chrono::milliseconds beast_liveness_interval_); virtual ~BeastInput() {} @@ -106,6 +106,9 @@ namespace beast { unsigned good_messages() const { return good_messages_count; } unsigned bad_bytes() const { return bad_bytes_count; } + void reset_rc_liveness(); + void reset_beast_liveness(); + virtual void saw_good_message(void); virtual bool can_dispatch(void) const; @@ -145,7 +148,11 @@ namespace beast { boost::asio::steady_timer reconnect_timer; // timer that expires after radarcape_liveness_interval - boost::asio::steady_timer liveness_timer; + boost::asio::steady_timer rc_liveness_timer; + + // timer that expires after beast_liveness_interval + std::chrono::milliseconds beast_liveness_interval; + boost::asio::steady_timer beast_liveness_timer; // are we currently in sync? bool good_sync; diff --git a/beast_input_net.cc b/beast_input_net.cc index 244c9d7..ce380d4 100644 --- a/beast_input_net.cc +++ b/beast_input_net.cc @@ -37,7 +37,7 @@ using namespace beast; using boost::asio::ip::tcp; -NetInput::NetInput(boost::asio::io_service &service_, const std::string &host_, const std::string &port_or_service_, const Settings &fixed_settings_, const modes::Filter &filter_) : BeastInput(service_, fixed_settings_, filter_), host(host_), port_or_service(port_or_service_), resolver(service_), socket(service_), reconnect_timer(service_), readbuf(std::make_shared(read_buffer_size)), warned_about_framing(false) {} +NetInput::NetInput(boost::asio::io_service &service_, const std::string &host_, const std::string &port_or_service_, const Settings &fixed_settings_, const modes::Filter &filter_, std::chrono::milliseconds beast_liveness_interval_) : BeastInput(service_, fixed_settings_, filter_, beast_liveness_interval_), host(host_), port_or_service(port_or_service_), resolver(service_), socket(service_), readbuf(std::make_shared(read_buffer_size)), warned_about_framing(false) {} std::string NetInput::what() const { return std::string("net(") + host + std::string(":") + port_or_service + std::string(")"); } diff --git a/beast_input_net.h b/beast_input_net.h index d525d02..c021664 100644 --- a/beast_input_net.h +++ b/beast_input_net.h @@ -41,7 +41,7 @@ namespace beast { const size_t read_buffer_size = 4096; // factory method - static pointer create(boost::asio::io_service &service, const std::string &host, const std::string &port_or_service, const Settings &fixed_settings = Settings(), const modes::Filter &filter = modes::Filter()) { return pointer(new NetInput(service, host, port_or_service, fixed_settings, filter)); } + static pointer create(boost::asio::io_service &service, const std::string &host, const std::string &port_or_service, const Settings &fixed_settings, const modes::Filter &filter, std::chrono::milliseconds beast_liveness_interval) { return pointer(new NetInput(service, host, port_or_service, fixed_settings, filter, beast_liveness_interval)); } protected: std::string what() const override; @@ -51,7 +51,7 @@ namespace beast { private: // construct a new net input instance, don't start yet - NetInput(boost::asio::io_service &service_, const std::string &host_, const std::string &port_or_service_, const Settings &fixed_settings_, const modes::Filter &filter_); + NetInput(boost::asio::io_service &service_, const std::string &host_, const std::string &port_or_service_, const Settings &fixed_settings_, const modes::Filter &filter_, std::chrono::milliseconds beast_liveness_interval_); void resolve_and_connect(const boost::system::error_code &ec = boost::system::error_code()); void try_next_endpoint(); @@ -65,7 +65,6 @@ namespace beast { boost::asio::ip::tcp::resolver resolver; boost::asio::ip::tcp::socket socket; - boost::asio::steady_timer reconnect_timer; boost::asio::ip::tcp::resolver::iterator next_endpoint; // cached buffer used for reads diff --git a/beast_input_serial.cc b/beast_input_serial.cc index da36669..fc6a72b 100644 --- a/beast_input_serial.cc +++ b/beast_input_serial.cc @@ -37,7 +37,8 @@ using namespace beast; -SerialInput::SerialInput(boost::asio::io_service &service_, const std::string &path_, unsigned int fixed_baud_rate_, const Settings &fixed_settings_, const modes::Filter &filter_) : BeastInput(service_, fixed_settings_, filter_), path(path_), port(service_), autobaud_interval(autobaud_base_interval), autobaud_timer(service_), read_timer(service_), readbuf(std::make_shared(read_buffer_size)), warned_about_rate(false) { +SerialInput::SerialInput(boost::asio::io_service &service_, const std::string &path_, unsigned int fixed_baud_rate_, const Settings &fixed_settings_, const modes::Filter &filter_, std::chrono::milliseconds beast_liveness_interval_) + : BeastInput(service_, fixed_settings_, filter_, beast_liveness_interval_), path(path_), port(service_), autobaud_interval(autobaud_base_interval), autobaud_timer(service_), read_timer(service_), readbuf(std::make_shared(read_buffer_size)), warned_about_rate(false) { // set up autobaud if (fixed_baud_rate_ == 0) { autobauding = true; diff --git a/beast_input_serial.h b/beast_input_serial.h index 0106c94..0e64fcc 100644 --- a/beast_input_serial.h +++ b/beast_input_serial.h @@ -59,7 +59,7 @@ namespace beast { const std::chrono::milliseconds read_interval = std::chrono::milliseconds(50); // factory method - static pointer create(boost::asio::io_service &service, const std::string &path, unsigned int fixed_baud_rate = 0, const Settings &fixed_settings = Settings(), const modes::Filter &filter = modes::Filter()) { return pointer(new SerialInput(service, path, fixed_baud_rate, fixed_settings, filter)); } + static pointer create(boost::asio::io_service &service, const std::string &path, unsigned int fixed_baud_rate, const Settings &fixed_settings, const modes::Filter &filter, std::chrono::milliseconds beast_liveness_interval) { return pointer(new SerialInput(service, path, fixed_baud_rate, fixed_settings, filter, beast_liveness_interval)); } protected: std::string what() const override; @@ -72,7 +72,7 @@ namespace beast { private: // construct a new serial input instance, don't start yet - SerialInput(boost::asio::io_service &service_, const std::string &path_, unsigned int fixed_baud_rate, const Settings &fixed_settings_, const modes::Filter &filter_); + SerialInput(boost::asio::io_service &service_, const std::string &path_, unsigned int fixed_baud_rate, const Settings &fixed_settings_, const modes::Filter &filter_, std::chrono::milliseconds beast_liveness_interval_); void start_reading(const boost::system::error_code &ec = boost::system::error_code()); void advance_autobaud(void); diff --git a/splitter_main.cc b/splitter_main.cc index a4ec657..10108d9 100644 --- a/splitter_main.cc +++ b/splitter_main.cc @@ -127,8 +127,18 @@ static int realmain(int argc, char **argv) { modes::FilterDistributor distributor; po::options_description desc("Allowed options"); - desc.add_options()("help", "produce help message")("serial", po::value(), "read from given serial device")("net", po::value(), "read from given network host:port")("status-file", po::value(), "set path to status file")("fixed-baud", po::value()->default_value(0), "set a fixed baud rate, or 0 for autobauding")("listen", po::value>(), "specify a [host:]port[:settings] to listen on")( - "connect", po::value>(), "specify a host:port[:settings] to connect to")("force", po::value()->default_value(beast::Settings()), "specify settings to force on or off when configuring the Beast"); + // clang-format off + desc.add_options() + ("help", "produce help message") + ("serial", po::value(), "read from given serial device") + ("net", po::value(), "read from given network host:port") + ("status-file", po::value(), "set path to status file") + ("fixed-baud", po::value()->default_value(0), "set a fixed baud rate, or 0 for autobauding") + ("listen", po::value>(), "specify a [host:]port[:settings] to listen on") + ("connect", po::value>(), "specify a host:port[:settings] to connect to") + ("force", po::value()->default_value(beast::Settings()), "specify settings to force on or off when configuring the Beast") + ("beast-input-timeout", po::value()->default_value(0), "input timeout in non-Radarcape mode, in floating-point seconds (zero disables)"); + // clang-format on po::variables_map opts; @@ -152,12 +162,16 @@ static int realmain(int argc, char **argv) { return EXIT_NO_RESTART; } + beast::Settings force_settings = opts["force"].as(); + std::chrono::duration beast_input_timeout_seconds(opts["beast-input-timeout"].as()); + std::chrono::milliseconds beast_input_timeout = std::chrono::duration_cast(beast_input_timeout_seconds); + beast::BeastInput::pointer input; if (opts.count("serial")) { - input = beast::SerialInput::create(io_service, opts["serial"].as(), opts["fixed-baud"].as(), opts["force"].as()); + input = beast::SerialInput::create(io_service, opts["serial"].as(), opts["fixed-baud"].as(), force_settings, modes::Filter(), beast_input_timeout); } else if (opts.count("net")) { auto net = opts["net"].as(); - input = beast::NetInput::create(io_service, net.host, net.port, opts["force"].as()); + input = beast::NetInput::create(io_service, net.host, net.port, force_settings, modes::Filter(), beast_input_timeout); } else { std::cerr << "A --serial or --net argument is needed" << std::endl; std::cerr << desc << std::endl; From 87bb0a36d215c64b5ad91702dcfcaf575dc4ead6 Mon Sep 17 00:00:00 2001 From: Oliver Jowett Date: Thu, 6 Oct 2022 15:35:49 +0800 Subject: [PATCH 2/2] Add a note about --beast-input-timeout to the README --- README.md | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/README.md b/README.md index 821d7a9..320300f 100644 --- a/README.md +++ b/README.md @@ -25,6 +25,15 @@ host:port to connect to via the --net option. You can use this to chain beast-splitter instances together: specify --listen on the beast-splitter closer to the Beast, and --net on the other beast-splitter. +## Input side - Timeouts + +If the --beast-input-timeout option is set, then a Beast-Classic connection +is expected to produce at least one valid message per timeout interval. If it +does not, the connection is considered bad and disconnected/reconnected. This +is useful for non-local network connections to avoid beast-splitter waiting +indefinitely for data on a connection that has silently disconnected on the +remote side. + ## Configuring Beast settings beast-splitter will, by default, autodetect the capabilities of the Beast and