Skip to content

Add best_effort_qos_send_buffer_limit to support bandwidth-constrained usage #340

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ Parameters are provided to configure the behavior of the bridge. These parameter
* __topic_whitelist__: List of regular expressions ([ECMAScript grammar](https://en.cppreference.com/w/cpp/regex/ecmascript)) of whitelisted topic names. Defaults to `[".*"]`.
* __service_whitelist__: List of regular expressions ([ECMAScript grammar](https://en.cppreference.com/w/cpp/regex/ecmascript)) of whitelisted service names. Defaults to `[".*"]`.
* __param_whitelist__: List of regular expressions ([ECMAScript grammar](https://en.cppreference.com/w/cpp/regex/ecmascript)) of whitelisted parameter names. Defaults to `[".*"]`.
* __client_topic_whitelist__: List of regular expressions ([ECMAScript grammar](https://en.cppreference.com/w/cpp/regex/ecmascript)) of whitelisted client-published topic names. Defaults to `[".*"]`.
* __client_topic_whitelist__: List of regular expressions ([ECMAScript grammar](https://en.cppreference.com/w/cpp/regex/ecmascript)) of whitelisted client-published topic names. Defaults to `[".*"]`.
* __send_buffer_limit__: Connection send buffer limit in bytes. Messages will be dropped when a connection's send buffer reaches this limit to avoid a queue of outdated messages building up. Defaults to `10000000` (10 MB).
* __use_compression__: Use websocket compression (permessage-deflate). It is recommended to leave this turned off as it increases CPU usage and per-message compression often yields low compression ratios for robotics data. Defaults to `false`.
* __capabilities__: List of supported [server capabilities](https://github.com/foxglove/ws-protocol/blob/main/docs/spec.md). Defaults to `[clientPublish,parameters,parametersSubscribe,services,connectionGraph,assets]`.
Expand All @@ -85,9 +85,13 @@ Parameters are provided to configure the behavior of the bridge. These parameter
* (ROS 2) __min_qos_depth__: Minimum depth used for the QoS profile of subscriptions. Defaults to `1`. This is to set a lower limit for a subscriber's QoS depth which is computed by summing up depths of all publishers. See also [#208](https://github.com/foxglove/ros-foxglove-bridge/issues/208).
* (ROS 2) __max_qos_depth__: Maximum depth used for the QoS profile of subscriptions. Defaults to `25`.
* (ROS 2) __best_effort_qos_topic_whitelist__: List of regular expressions (ECMAScript) for topics that should be forced to use 'best_effort' QoS. Unmatched topics will use 'reliable' QoS if ALL publishers are 'reliable', 'best_effort' if any publishers are 'best_effort'. Defaults to `["(?!)"]` (match nothing).
* (ROS 2) __best_effort_qos_send_buffer_limit__: Connection send buffer limit in bytes for `best_effort` messages. Defaults to `10000000` (10 MB).
* (ROS 2) __include_hidden__: Include hidden topics and services. Defaults to `false`.
* (ROS 2) __disable_load_message__: Do not publish as loaned message when publishing a client message. Defaults to `true`.

#### Running in a bandwidth-constrained environment
When running the foxglove bridge in a bandwidth constrained environment, messages may queue up in the `send_buffer` resulting in dropped messages. In order to control which messages are dropped, `best_effort_qos_topic_whitelist` can be used to force some topics to be "best_effort" rather than reliable. This can be used in conjunction with a high `send_buffer_limit` (for reliable messages) and a lower `best_effort_qos_send_buffer_limit` to ensure `best_effort` messages are dropped first.

## Building from source

### Fetch source and install dependencies
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ struct ServerOptions {
std::vector<std::string> supportedEncodings;
std::unordered_map<std::string, std::string> metadata;
size_t sendBufferLimitBytes = DEFAULT_SEND_BUFFER_LIMIT_BYTES;
size_t bestEffortQosSendBufferLimitBytes = DEFAULT_SEND_BUFFER_LIMIT_BYTES;
bool useTls = false;
std::string certfile = "";
std::string keyfile = "";
Expand Down Expand Up @@ -95,7 +96,7 @@ class ServerInterface {
virtual void setHandlers(ServerHandlers<ConnectionHandle>&& handlers) = 0;

virtual void sendMessage(ConnectionHandle clientHandle, ChannelId chanId, uint64_t timestamp,
const uint8_t* payload, size_t payloadSize) = 0;
const uint8_t* payload, size_t payloadSize, bool bestEffort) = 0;
virtual void broadcastTime(uint64_t timestamp) = 0;
virtual void sendServiceResponse(ConnectionHandle clientHandle,
const ServiceResponse& response) = 0;
Expand Down
31 changes: 23 additions & 8 deletions foxglove_bridge_base/include/foxglove_bridge/websocket_server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ class Server final : public ServerInterface<ConnHandle> {
void setHandlers(ServerHandlers<ConnHandle>&& handlers) override;

void sendMessage(ConnHandle clientHandle, ChannelId chanId, uint64_t timestamp,
const uint8_t* payload, size_t payloadSize) override;
const uint8_t* payload, size_t payloadSize, bool bestEffort) override;
void broadcastTime(uint64_t timestamp) override;
void sendServiceResponse(ConnHandle clientHandle, const ServiceResponse& response) override;
void sendServiceFailure(ConnHandle clientHandle, ServiceId serviceId, uint32_t callId,
Expand Down Expand Up @@ -957,20 +957,35 @@ inline void Server<ServerConfiguration>::removeServices(const std::vector<Servic
template <typename ServerConfiguration>
inline void Server<ServerConfiguration>::sendMessage(ConnHandle clientHandle, ChannelId chanId,
uint64_t timestamp, const uint8_t* payload,
size_t payloadSize) {
size_t payloadSize, bool bestEffort) {
websocketpp::lib::error_code ec;
const auto con = _server.get_con_from_hdl(clientHandle, ec);
if (ec || !con) {
return;
}

const auto bufferSizeinBytes = con->get_buffered_amount();
if (bufferSizeinBytes + payloadSize >= _options.sendBufferLimitBytes) {
const auto logFn = [this, clientHandle]() {
sendStatusAndLogMsg(clientHandle, StatusLevel::Warning, "Send buffer limit reached");
};
FOXGLOVE_DEBOUNCE(logFn, 2500);
return;
// There are 2 different buffer limits to consider, one for "best_effort" and one for "reliable"
// topics

if (bestEffort) {
if (bufferSizeinBytes >= _options.bestEffortQosSendBufferLimitBytes) {
const auto logFn = [this, clientHandle]() {
sendStatusAndLogMsg(clientHandle, StatusLevel::Warning,
"'Best Effort' send buffer limit reached");
};
FOXGLOVE_DEBOUNCE(logFn, 2500);
return;
}
} else {
if (bufferSizeinBytes >= _options.sendBufferLimitBytes) {
const auto logFn = [this, clientHandle]() {
sendStatusAndLogMsg(clientHandle, StatusLevel::Warning,
"'Reliable' send buffer limit reached");
};
FOXGLOVE_DEBOUNCE(logFn, 2500);
return;
}
}

SubscriptionId subId = std::numeric_limits<SubscriptionId>::max();
Expand Down
2 changes: 1 addition & 1 deletion ros1_foxglove_bridge/src/ros1_foxglove_bridge_nodelet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -841,7 +841,7 @@ class FoxgloveBridge : public nodelet::Nodelet {
const ros::MessageEvent<ros_babel_fish::BabelFishMessage const>& msgEvent) {
const auto& msg = msgEvent.getConstMessage();
const auto receiptTimeNs = msgEvent.getReceiptTime().toNSec();
_server->sendMessage(clientHandle, channelId, receiptTimeNs, msg->buffer(), msg->size());
_server->sendMessage(clientHandle, channelId, receiptTimeNs, msg->buffer(), msg->size(), false);
}

void serviceRequest(const foxglove::ServiceRequest& request, ConnectionHandle clientHandle) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ constexpr char PARAM_KEYFILE[] = "keyfile";
constexpr char PARAM_MIN_QOS_DEPTH[] = "min_qos_depth";
constexpr char PARAM_MAX_QOS_DEPTH[] = "max_qos_depth";
constexpr char PARAM_BEST_EFFORT_QOS_TOPIC_WHITELIST[] = "best_effort_qos_topic_whitelist";
constexpr char PARAM_BEST_EFFORT_QOS_SEND_BUFFER_LIMIT[] = "best_effort_qos_send_buffer_limit";
constexpr char PARAM_TOPIC_WHITELIST[] = "topic_whitelist";
constexpr char PARAM_SERVICE_WHITELIST[] = "service_whitelist";
constexpr char PARAM_PARAMETER_WHITELIST[] = "param_whitelist";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ class FoxgloveBridge : public rclcpp::Node {
void logHandler(LogLevel level, char const* msg);

void rosMessageHandler(const foxglove::ChannelId& channelId, ConnectionHandle clientHandle,
std::shared_ptr<const rclcpp::SerializedMessage> msg);
std::shared_ptr<const rclcpp::SerializedMessage> msg, rclcpp::QoS qos);

void serviceRequest(const foxglove::ServiceRequest& request, ConnectionHandle clientHandle);

Expand Down
13 changes: 13 additions & 0 deletions ros2_foxglove_bridge/src/param_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,19 @@ void declareParameters(rclcpp::Node* node) {
node->declare_parameter(PARAM_BEST_EFFORT_QOS_TOPIC_WHITELIST, std::vector<std::string>({"(?!)"}),
bestEffortQosTopicWhiteListDescription);

auto bestEffortQosSendBufferLimit = rcl_interfaces::msg::ParameterDescriptor{};
bestEffortQosSendBufferLimit.name = PARAM_BEST_EFFORT_QOS_SEND_BUFFER_LIMIT;
bestEffortQosSendBufferLimit.type = rcl_interfaces::msg::ParameterType::PARAMETER_INTEGER;
bestEffortQosSendBufferLimit.description =
"Connection send buffer limit in bytes for 'best_effort' messages";
bestEffortQosSendBufferLimit.read_only = true;
bestEffortQosSendBufferLimit.integer_range.resize(1);
bestEffortQosSendBufferLimit.integer_range[0].from_value = 0;
bestEffortQosSendBufferLimit.integer_range[0].to_value = std::numeric_limits<int64_t>::max();
bestEffortQosSendBufferLimit.read_only = true;
node->declare_parameter(PARAM_BEST_EFFORT_QOS_SEND_BUFFER_LIMIT, DEFAULT_SEND_BUFFER_LIMIT,
bestEffortQosSendBufferLimit);

auto topicWhiteListDescription = rcl_interfaces::msg::ParameterDescriptor{};
topicWhiteListDescription.name = PARAM_TOPIC_WHITELIST;
topicWhiteListDescription.type = rcl_interfaces::msg::ParameterType::PARAMETER_STRING_ARRAY;
Expand Down
17 changes: 11 additions & 6 deletions ros2_foxglove_bridge/src/ros2_foxglove_bridge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ FoxgloveBridge::FoxgloveBridge(const rclcpp::NodeOptions& options)

const auto port = static_cast<uint16_t>(this->get_parameter(PARAM_PORT).as_int());
const auto address = this->get_parameter(PARAM_ADDRESS).as_string();
const auto send_buffer_limit =
const auto sendBufferLimit =
static_cast<size_t>(this->get_parameter(PARAM_SEND_BUFFER_LIMIT).as_int());
const auto useTLS = this->get_parameter(PARAM_USETLS).as_bool();
const auto certfile = this->get_parameter(PARAM_CERTFILE).as_string();
Expand All @@ -39,6 +39,8 @@ FoxgloveBridge::FoxgloveBridge(const rclcpp::NodeOptions& options)
const auto bestEffortQosTopicWhiteList =
this->get_parameter(PARAM_BEST_EFFORT_QOS_TOPIC_WHITELIST).as_string_array();
_bestEffortQosTopicWhiteListPatterns = parseRegexStrings(this, bestEffortQosTopicWhiteList);
const auto bestEffortQosSendBufferLimit =
static_cast<size_t>(this->get_parameter(PARAM_BEST_EFFORT_QOS_SEND_BUFFER_LIMIT).as_int());
const auto topicWhiteList = this->get_parameter(PARAM_TOPIC_WHITELIST).as_string_array();
_topicWhitelistPatterns = parseRegexStrings(this, topicWhiteList);
const auto serviceWhiteList = this->get_parameter(PARAM_SERVICE_WHITELIST).as_string_array();
Expand Down Expand Up @@ -67,7 +69,8 @@ FoxgloveBridge::FoxgloveBridge(const rclcpp::NodeOptions& options)
}
serverOptions.supportedEncodings = {"cdr", "json"};
serverOptions.metadata = {{"ROS_DISTRO", rosDistro}};
serverOptions.sendBufferLimitBytes = send_buffer_limit;
serverOptions.sendBufferLimitBytes = sendBufferLimit;
serverOptions.bestEffortQosSendBufferLimitBytes = bestEffortQosSendBufferLimit;
serverOptions.sessionId = std::to_string(std::time(nullptr));
serverOptions.useCompression = useCompression;
serverOptions.useTls = useTLS;
Expand Down Expand Up @@ -543,8 +546,8 @@ void FoxgloveBridge::subscribe(foxglove::ChannelId channelId, ConnectionHandle c
try {
auto subscriber = this->create_generic_subscription(
topic, datatype, qos,
[this, channelId, clientHandle](std::shared_ptr<const rclcpp::SerializedMessage> msg) {
this->rosMessageHandler(channelId, clientHandle, msg);
[this, channelId, clientHandle, qos](std::shared_ptr<const rclcpp::SerializedMessage> msg) {
this->rosMessageHandler(channelId, clientHandle, msg, qos);
},
subscriptionOptions);
subscriptionsByClient.emplace(clientHandle, std::move(subscriber));
Expand Down Expand Up @@ -860,14 +863,16 @@ void FoxgloveBridge::logHandler(LogLevel level, char const* msg) {

void FoxgloveBridge::rosMessageHandler(const foxglove::ChannelId& channelId,
ConnectionHandle clientHandle,
std::shared_ptr<const rclcpp::SerializedMessage> msg) {
std::shared_ptr<const rclcpp::SerializedMessage> msg,
rclcpp::QoS qos) {
// NOTE: Do not call any RCLCPP_* logging functions from this function. Otherwise, subscribing
// to `/rosout` will cause a feedback loop
const auto timestamp = this->now().nanoseconds();
assert(timestamp >= 0 && "Timestamp is negative");
const auto rclSerializedMsg = msg->get_rcl_serialized_message();
auto bestEffort = qos.reliability() == rclcpp::ReliabilityPolicy::BestEffort;
_server->sendMessage(clientHandle, channelId, static_cast<uint64_t>(timestamp),
rclSerializedMsg.buffer, rclSerializedMsg.buffer_length);
rclSerializedMsg.buffer, rclSerializedMsg.buffer_length, bestEffort);
}

void FoxgloveBridge::serviceRequest(const foxglove::ServiceRequest& request,
Expand Down