diff --git a/CHANGELOG.md b/CHANGELOG.md index 145615f7..f014f963 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,7 @@ ## Latest +* Fixed use of duplicated topic names for different message types in ROS2 + ## CARLA-ROS-Bridge 0.9.12 * Fixed scenario runner node shutdown for foxy diff --git a/ros_compatibility/src/ros_compatibility/node.py b/ros_compatibility/src/ros_compatibility/node.py index f80e8536..24d44c5c 100644 --- a/ros_compatibility/src/ros_compatibility/node.py +++ b/ros_compatibility/src/ros_compatibility/node.py @@ -11,7 +11,7 @@ from ros_compatibility.core import get_ros_version from ros_compatibility.exceptions import * -ROS_VERSION = get_ros_version() +ROS_VERSION = get_ros_version() if ROS_VERSION == 1: @@ -19,11 +19,10 @@ from ros_compatibility.logging import logdebug, loginfo, logwarn, logerr, logfatal - class CompatibleNode(object): def __init__(self, name, **kwargs): - pass + pass def get_param(self, name, alternative_value=None): if name.startswith('/'): @@ -50,13 +49,15 @@ def logfatal(self, msg): def new_publisher(self, msg_type, topic, qos_profile, callback_group=None): if isinstance(qos_profile, int): - qos_profile = ros_compatibility.qos.QoSProfile(depth=qos_profile) + qos_profile = ros_compatibility.qos.QoSProfile( + depth=qos_profile) return rospy.Publisher(topic, msg_type, queue_size=qos_profile.depth, latch=qos_profile.is_latched()) def new_subscription(self, msg_type, topic, callback, qos_profile, callback_group=None): if isinstance(qos_profile, int): - qos_profile = ros_compatibility.qos.QoSProfile(depth=qos_profile) + qos_profile = ros_compatibility.qos.QoSProfile( + depth=qos_profile) return rospy.Subscriber(topic, msg_type, callback, queue_size=qos_profile.depth) @@ -123,16 +124,16 @@ def destroy(self): ros_compatibility.qos.DurabilityPolicy.VOLATILE: rclpy.qos.DurabilityPolicy.VOLATILE } - def _get_rclpy_qos_profile(qos_profile): return rclpy.qos.QoSProfile( depth=qos_profile.depth, durability=_DURABILITY_POLICY_MAP[qos_profile.durability] ) - class CompatibleNode(Node): + created_publisher = {} + def __init__(self, name, **kwargs): param = Parameter("use_sim_time", Parameter.Type.BOOL, True) super(CompatibleNode, self).__init__( @@ -167,18 +168,58 @@ def logerr(self, text): def logfatal(self, text): self.get_logger().fatal(text) + def publisher_topic_conflict_resolver(self, msg_type, topic): + index = 1 + test_topic = topic + while test_topic in CompatibleNode.created_publisher: + test_topic = topic + "_{}".format(index) + index = index + 1 + return test_topic + def new_publisher(self, msg_type, topic, qos_profile, callback_group=None): if isinstance(qos_profile, int): - qos_profile = ros_compatibility.qos.QoSProfile(depth=qos_profile) + qos_profile = ros_compatibility.qos.QoSProfile( + depth=qos_profile) rclpy_qos_profile = _get_rclpy_qos_profile(qos_profile) + publish_counter = 0 + if topic in CompatibleNode.created_publisher: + published_msg_type, publish_counter = CompatibleNode.created_publisher[topic] + if published_msg_type != msg_type: + resolved_topic_name = self.publisher_topic_conflict_resolver( + msg_type=msg_type, topic=topic) + if resolved_topic_name == topic: + raise ROSException("Unable to create publisher with type {} for topic {} because another publisher already was created for type {}. Reimplement publisher_topic_conflict_resolver() function to be able to provide an alternative topic name in case of a conflict.".format( + msg_type, topic, published_msg_type)) + else: + self.logwarn("Unable to create publisher with type {} for topic {} because another publisher already was created for type {}. Using alternative topic name {} instead.".format( + msg_type, topic, published_msg_type, resolved_topic_name)) + topic = resolved_topic_name + publish_counter = 0 + publish_counter = publish_counter + 1 + CompatibleNode.created_publisher[topic] = ( + msg_type, publish_counter) + return self.create_publisher( msg_type, topic, rclpy_qos_profile, callback_group=callback_group) + def destroy_publisher(self, publisher): + if publisher.topic in CompatibleNode.created_publisher: + published_msg_type, publish_counter = CompatibleNode.created_publisher[ + publisher.topic] + if publish_counter > 1: + publish_counter = publish_counter - 1 + CompatibleNode.created_publisher[publisher.topic] = ( + published_msg_type, publish_counter) + else: + CompatibleNode.created_publisher.pop(publisher.topic, None) + super(CompatibleNode, self).destroy_publisher(publisher) + def new_subscription(self, msg_type, topic, callback, qos_profile, callback_group=None): if isinstance(qos_profile, int): - qos_profile = ros_compatibility.qos.QoSProfile(depth=qos_profile) + qos_profile = ros_compatibility.qos.QoSProfile( + depth=qos_profile) rclpy_qos_profile = _get_rclpy_qos_profile(qos_profile) return self.create_subscription( @@ -196,7 +237,7 @@ def wait_for_message(self, topic, msg_type, timeout=None, qos_profile=1): """ Wait for one message from topic. This will create a new subcription to the topic, receive one message, then unsubscribe. - + Do not call this method in a callback or a deadlock may occur. """ s = None @@ -207,7 +248,8 @@ def wait_for_message(self, topic, msg_type, timeout=None, qos_profile=1): topic, lambda msg: future.set_result(msg), qos_profile=qos_profile) - rclpy.spin_until_future_complete(self, future, self.executor, timeout) + rclpy.spin_until_future_complete( + self, future, self.executor, timeout) finally: if s is not None: self.destroy_subscription(s) @@ -223,7 +265,8 @@ def new_client(self, srv_type, srv_name, timeout_sec=None, callback_group=None): srv_type, srv_name, callback_group=callback_group) ready = client.wait_for_service(timeout_sec=timeout_sec) if not ready: - raise ROSException("Timeout of {}sec while waiting for service".format(timeout_sec)) + raise ROSException( + "Timeout of {}sec while waiting for service".format(timeout_sec)) return client def call_service(self, client, req, timeout=None, spin_until_response_received=False): @@ -232,7 +275,8 @@ def call_service(self, client, req, timeout=None, spin_until_response_received=F return response else: future = client.call_async(req) - rclpy.spin_until_future_complete(self, future, self.executor, timeout) + rclpy.spin_until_future_complete( + self, future, self.executor, timeout) if future.done(): return future.result() @@ -241,7 +285,8 @@ def call_service(self, client, req, timeout=None, spin_until_response_received=F raise ServiceException( 'Service did not return a response before timeout {}'.format(timeout)) else: - raise ServiceException('Service did not return a response') + raise ServiceException( + 'Service did not return a response') def spin(self): rclpy.spin(self, self.executor)