diff --git a/examples/ros2/subscriber/CMakeLists.txt b/examples/ros2/subscriber/CMakeLists.txt index 2663257..dc0f224 100644 --- a/examples/ros2/subscriber/CMakeLists.txt +++ b/examples/ros2/subscriber/CMakeLists.txt @@ -41,3 +41,24 @@ install( TARGETS rt_ros2_subscriber_complex_data DESTINATION lib/${PROJECT_NAME} ) + +add_executable(rt_ros2_subscriber_queued_data + queued_data.cc +) + +target_link_libraries(rt_ros2_subscriber_queued_data + PRIVATE + cactus_rt +) + +setup_cactus_rt_target_options(rt_ros2_subscriber_queued_data) + +ament_target_dependencies(rt_ros2_subscriber_queued_data + PUBLIC + std_msgs +) + +install( + TARGETS rt_ros2_subscriber_queued_data + DESTINATION lib/${PROJECT_NAME} +) diff --git a/examples/ros2/subscriber/queued_data.cc b/examples/ros2/subscriber/queued_data.cc new file mode 100644 index 0000000..ff450dc --- /dev/null +++ b/examples/ros2/subscriber/queued_data.cc @@ -0,0 +1,73 @@ +#include +#include + +#include +#include +#include + +using RealtimeType = std_msgs::msg::Int64; +using RosType = std_msgs::msg::Int64; + +class RTROS2SubscriberThread : public cactus_rt::CyclicThread, public cactus_rt::ros2::Ros2ThreadMixin { + int64_t run_duration_; + + // We force turn off the trivial data check, because the ROS message data type + // has a defined constructor with code in it. That said, the code really is + // pretty trivial so it is safe to use in real-time. We thus disable the trivial + // type check manually. + std::shared_ptr> subscription_; + + static cactus_rt::CyclicThreadConfig CreateThreadConfig() { + cactus_rt::CyclicThreadConfig thread_config; + thread_config.period_ns = 1'000'000; + thread_config.cpu_affinity = std::vector{2}; + thread_config.SetFifoScheduler(80); + + return thread_config; + } + + public: + explicit RTROS2SubscriberThread(std::chrono::nanoseconds run_duration = std::chrono::seconds(30)) + : cactus_rt::CyclicThread("RTROS2Subscriber", CreateThreadConfig()), + run_duration_(run_duration.count()) {} + + void InitializeForRos2() override { + subscription_ = ros2_adapter_->CreateSubscriptionQueued("/cactus_rt/simple", rclcpp::QoS(100)); + } + + protected: + LoopControl Loop(int64_t elapsed_ns) noexcept override { + while (true) { + auto msg = subscription_->Read(); + if (!msg) { + break; + } + + LOG_INFO(Logger(), "Got new data at {}: {} {}", elapsed_ns, msg->id, msg->value.data); + } + + return elapsed_ns > run_duration_ ? LoopControl::Stop : LoopControl::Continue; + } +}; + +int main(int argc, const char* argv[]) { + cactus_rt::ros2::App app(argc, argv, "QueuedDataROS2Subscriber"); + + constexpr std::chrono::seconds time(30); + std::cout << "Testing RT loop for " << time.count() << " seconds.\n"; + + auto thread = app.CreateROS2EnabledThread(time); + + app.Start(); + + std::thread t([&app, &time]() { + std::this_thread::sleep_for(time); + app.RequestStop(); + }); + t.detach(); + + app.Join(); + + std::cout << "Done\n"; + return 0; +} diff --git a/include/cactus_rt/ros2/ros2_adapter.h b/include/cactus_rt/ros2/ros2_adapter.h index 3c5f443..da9c572 100644 --- a/include/cactus_rt/ros2/ros2_adapter.h +++ b/include/cactus_rt/ros2/ros2_adapter.h @@ -78,6 +78,19 @@ class Ros2Adapter { return subscriber; } + template + std::shared_ptr> CreateSubscriptionQueued( + const std::string& topic_name, + const rclcpp::QoS& qos, + size_t rt_queue_size = 1000 + ) { + auto subscriber = SubscriptionQueued::Create(logger_, *this->ros_node_, topic_name, qos, rt_queue_size); + + const std::scoped_lock lock(mut_); + subscriptions_.push_back(subscriber); + return subscriber; + } + private: void TimerCallback(); void DrainQueues(); diff --git a/include/cactus_rt/ros2/subscription.h b/include/cactus_rt/ros2/subscription.h index a501e51..67bd856 100644 --- a/include/cactus_rt/ros2/subscription.h +++ b/include/cactus_rt/ros2/subscription.h @@ -1,7 +1,10 @@ #ifndef CACTUS_RT_ROS2_SUBSCRIPTION_H_ #define CACTUS_RT_ROS2_SUBSCRIPTION_H_ +#include + #include +#include #include #include @@ -85,6 +88,73 @@ class SubscriptionLatest : public ISubscription { return latest_value_.Read(); } }; + +template +class SubscriptionQueued : public ISubscription { + friend class Ros2Adapter; + + static_assert(!CheckForTrivialRealtimeT || std::is_trivial_v, "RealtimeT must be a trivial object to be real-time safe"); + + using NoConversion = std::is_same; + using AdaptedRosType = typename std::conditional_t>; + + using Queue = moodycamel::ReaderWriterQueue>; + + quill::Logger* logger_; + typename rclcpp::Subscription::SharedPtr ros_subscription_; + int64_t current_msg_id_ = 0; + Queue queue_; + + void SubscriptionCallback(const RealtimeT& rt_msg) { + current_msg_id_++; // Useful to detect message has changed by the real-time thread. + + // A copy directly into the queue. + // TODO: warn that we are losing data? + queue_.try_emplace(current_msg_id_, rt_msg); + } + + static std::shared_ptr> Create( + quill::Logger* logger, + rclcpp::Node& node, + const std::string& topic_name, + const rclcpp::QoS& qos, + const size_t rt_queue_size = 1000 + ) { + std::shared_ptr> subscription( + new SubscriptionQueued( + logger, + moodycamel::ReaderWriterQueue>(rt_queue_size) + ) + ); + + subscription->ros_subscription_ = node.create_subscription( + topic_name, + qos, + [subscription](const RealtimeT& rt_msg) { + // TODO: we are capturing the subscription shared_ptr by value. Will this cause a memory leak? + subscription->SubscriptionCallback(rt_msg); + } + ); + + return subscription; + } + + SubscriptionQueued( + quill::Logger* logger, + moodycamel::ReaderWriterQueue>&& queue + ) : logger_(logger), queue_(std::move(queue)) {} + + public: + std::optional> Read() noexcept { + StampedValue v; + if (queue_.try_dequeue(v)) { + return v; + } + + return std::nullopt; + } +}; + } // namespace cactus_rt::ros2 #endif