Skip to content

Commit

Permalink
use rclcpp serialized messages to write data (#457)
Browse files Browse the repository at this point in the history
* rclcpp writer API

Signed-off-by: Karsten Knese <[email protected]>

* rclcpp reader api

Signed-off-by: Karsten Knese <[email protected]>

* Update rosbag2_cpp/include/rosbag2_cpp/writer.hpp

Co-authored-by: Emerson Knapp <[email protected]>

* linters

Signed-off-by: Karsten Knese <[email protected]>

Co-authored-by: Emerson Knapp <[email protected]>
  • Loading branch information
Karsten1987 and emersonknapp authored Mar 24, 2021
1 parent de70a81 commit d09cdaa
Show file tree
Hide file tree
Showing 6 changed files with 124 additions and 3 deletions.
6 changes: 5 additions & 1 deletion rosbag2_cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ endif()
find_package(ament_cmake REQUIRED)
find_package(ament_index_cpp REQUIRED)
find_package(pluginlib REQUIRED)
find_package(rclcpp REQUIRED)
find_package(rcpputils REQUIRED)
find_package(rcutils REQUIRED)
find_package(rmw REQUIRED)
Expand Down Expand Up @@ -65,6 +66,7 @@ add_library(${PROJECT_NAME} SHARED
ament_target_dependencies(${PROJECT_NAME}
ament_index_cpp
pluginlib
rclcpp
rcpputils
rcutils
rmw
Expand Down Expand Up @@ -103,7 +105,9 @@ install(
ament_export_include_directories(include)
ament_export_libraries(${PROJECT_NAME})
ament_export_targets(export_${PROJECT_NAME})
ament_export_dependencies(pluginlib
ament_export_dependencies(
pluginlib
rclcpp
rmw
rmw_implementation
rosbag2_storage
Expand Down
25 changes: 25 additions & 0 deletions rosbag2_cpp/include/rosbag2_cpp/reader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
#include <string>
#include <vector>

#include "rclcpp/serialization.hpp"
#include "rclcpp/serialized_message.hpp"

#include "rosbag2_cpp/converter_options.hpp"
#include "rosbag2_cpp/readers/sequential_reader.hpp"
#include "rosbag2_cpp/visibility_control.hpp"
Expand Down Expand Up @@ -110,6 +113,28 @@ class ROSBAG2_CPP_PUBLIC Reader final
*/
std::shared_ptr<rosbag2_storage::SerializedBagMessage> read_next();

/**
* Read next message from storage. Will throw if no more messages are available.
* The message will be serialized in the format given to `open`.
*
* Expected usage:
* if (writer.has_next()) message = writer.read_next();
*
* \return next message in non-serialized form
* \throws runtime_error if the Reader is not open.
*/
template<class MessageT>
MessageT read_next()
{
MessageT msg;
auto bag_message = read_next();
rclcpp::SerializedMessage extracted_serialized_msg(*bag_message->serialized_data);
rclcpp::Serialization<MessageT> serialization;
serialization.deserialize_message(&extracted_serialized_msg, &msg);

return msg;
}

/**
* Ask bagfile for its full metadata.
*
Expand Down
45 changes: 44 additions & 1 deletion rosbag2_cpp/include/rosbag2_cpp/writer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@
#include <unordered_map>
#include <vector>

#include "rclcpp/serialization.hpp"
#include "rclcpp/serialized_message.hpp"
#include "rclcpp/time.hpp"

#include "rosbag2_cpp/converter_options.hpp"
#include "rosbag2_cpp/visibility_control.hpp"
#include "rosbag2_cpp/writers/sequential_writer.hpp"
Expand Down Expand Up @@ -112,7 +116,7 @@ class ROSBAG2_CPP_PUBLIC Writer final

/**
* Write a message to a bagfile.
* Opposing a call to \sa write without topic metadata, the topic will be created.
* The topic will be created if it has not been created already.
*
* \param message to be written to the bagfile
* \param topic_name the string of the topic this messages belongs to
Expand All @@ -126,6 +130,45 @@ class ROSBAG2_CPP_PUBLIC Writer final
const std::string & type_name,
const std::string & serialization_format = "cdr");

/**
* Write a serialized message to a bagfile.
* The topic will be created if it has not been created already.
*
* \param message rclcpp::SerializedMessage The serialized message to be written to the bagfile
* \param topic_name the string of the topic this messages belongs to
* \param type_name the string of the type associated with this message
* \param time The time stamp of the message
* \throws runtime_error if the Writer is not open.
*/
void write(
const rclcpp::SerializedMessage & message,
const std::string & topic_name,
const std::string & type_name,
const rclcpp::Time & time);

/**
* Write a non-serialized message to a bagfile.
* The topic will be created if it has not been created already.
*
* \param message MessageT The serialized message to be written to the bagfile
* \param topic_name the string of the topic this messages belongs to
* \param type_name the string of the type associated with this message
* \param time The time stamp of the message
* \throws runtime_error if the Writer is not open.
*/
template<class MessageT>
void write(
const MessageT & message,
const std::string & topic_name,
const rclcpp::Time & time)
{
rclcpp::SerializedMessage serialized_msg;

rclcpp::Serialization<MessageT> serialization;
serialization.serialize_message(&message, &serialized_msg);
return write(serialized_msg, topic_name, rosidl_generator_traits::name<MessageT>(), time);
}

writer_interfaces::BaseWriterInterface & get_implementation_handle() const
{
return *writer_impl_;
Expand Down
1 change: 1 addition & 0 deletions rosbag2_cpp/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

<depend>ament_index_cpp</depend>
<depend>pluginlib</depend>
<depend>rclcpp</depend>
<depend>rcutils</depend>
<depend>rcpputils</depend>
<depend>rmw</depend>
Expand Down
25 changes: 25 additions & 0 deletions rosbag2_cpp/src/rosbag2_cpp/writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,18 @@
#include <string>
#include <utility>

#include "rclcpp/serialized_message.hpp"
#include "rclcpp/time.hpp"

#include "rosbag2_cpp/info.hpp"
#include "rosbag2_cpp/writer_interfaces/base_writer_interface.hpp"

#include "rosbag2_storage/serialized_bag_message.hpp"
#include "rosbag2_storage/storage_options.hpp"
#include "rosbag2_storage/topic_metadata.hpp"

#include "rmw/rmw.h"

namespace rosbag2_cpp
{

Expand Down Expand Up @@ -97,4 +102,24 @@ void Writer::write(
write(message);
}

void Writer::write(
const rclcpp::SerializedMessage & message,
const std::string & topic_name,
const std::string & type_name,
const rclcpp::Time & time)
{
auto serialized_bag_message = std::make_shared<rosbag2_storage::SerializedBagMessage>();
serialized_bag_message->topic_name = topic_name;
serialized_bag_message->time_stamp = time.nanoseconds();

// temporary store the payload in a shared_ptr.
// add custom no-op deleter to avoid deep copying data.
serialized_bag_message->serialized_data = std::shared_ptr<rcutils_uint8_array_t>(
const_cast<rcutils_uint8_array_t *>(&message.get_rcl_serialized_message()),
[](rcutils_uint8_array_t * /* data */) {});

return write(
serialized_bag_message, topic_name, type_name, rmw_get_serialization_format());
}

} // namespace rosbag2_cpp
25 changes: 24 additions & 1 deletion rosbag2_tests/test/rosbag2_tests/test_rosbag2_cpp_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <string>
#include <vector>

#include "rclcpp/clock.hpp"
#include "rclcpp/serialization.hpp"
#include "rclcpp/serialized_message.hpp"

Expand Down Expand Up @@ -74,6 +75,14 @@ TEST(TestRosbag2CPPAPI, minimal_writer_example)
bag_message->topic_name = "/my/other/topic";
writer.write(bag_message, "/my/other/topic", "test_msgs/msg/BasicTypes");

// yet another alternative, writing the rclcpp::SerializedMessage directly
writer.write(
serialized_msg, "/yet/another/topic", "test_msgs/msg/BasicTypes",
rclcpp::Clock().now());

// writing a non-serialized message
writer.write(test_msg, "/a/ros2/message", rclcpp::Clock().now());

// close on scope exit
}

Expand All @@ -92,9 +101,23 @@ TEST(TestRosbag2CPPAPI, minimal_writer_example)

EXPECT_EQ(test_msg, extracted_test_msg);
}
ASSERT_EQ(2u, topics.size());
ASSERT_EQ(4u, topics.size());
EXPECT_EQ("/my/test/topic", topics[0]);
EXPECT_EQ("/my/other/topic", topics[1]);
EXPECT_EQ("/yet/another/topic", topics[2]);
EXPECT_EQ("/a/ros2/message", topics[3]);

// close on scope exit
}

// alternative reader
{
rosbag2_cpp::Reader reader;
reader.open(rosbag_directory.string());
while (reader.has_next()) {
TestMsgT extracted_test_msg = reader.read_next<TestMsgT>();
EXPECT_EQ(test_msg, extracted_test_msg);
}

// close on scope exit
}
Expand Down

0 comments on commit d09cdaa

Please sign in to comment.