Skip to content

Fix use of duplicated topic names for different message types in ROS2 #656

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
## Latest

* Fixed use of duplicated topic names for different message types in ROS2

## CARLA-ROS-Bridge 0.9.12

* Fixed scenario runner node shutdown for foxy
Expand Down
73 changes: 59 additions & 14 deletions ros_compatibility/src/ros_compatibility/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,18 @@
from ros_compatibility.core import get_ros_version
from ros_compatibility.exceptions import *

ROS_VERSION = get_ros_version()
ROS_VERSION = get_ros_version()

if ROS_VERSION == 1:

import rospy

from ros_compatibility.logging import logdebug, loginfo, logwarn, logerr, logfatal


class CompatibleNode(object):

def __init__(self, name, **kwargs):
pass
pass

def get_param(self, name, alternative_value=None):
if name.startswith('/'):
Expand All @@ -50,13 +49,15 @@ def logfatal(self, msg):

def new_publisher(self, msg_type, topic, qos_profile, callback_group=None):
if isinstance(qos_profile, int):
qos_profile = ros_compatibility.qos.QoSProfile(depth=qos_profile)
qos_profile = ros_compatibility.qos.QoSProfile(
depth=qos_profile)

return rospy.Publisher(topic, msg_type, queue_size=qos_profile.depth, latch=qos_profile.is_latched())

def new_subscription(self, msg_type, topic, callback, qos_profile, callback_group=None):
if isinstance(qos_profile, int):
qos_profile = ros_compatibility.qos.QoSProfile(depth=qos_profile)
qos_profile = ros_compatibility.qos.QoSProfile(
depth=qos_profile)

return rospy.Subscriber(topic, msg_type, callback, queue_size=qos_profile.depth)

Expand Down Expand Up @@ -123,16 +124,16 @@ def destroy(self):
ros_compatibility.qos.DurabilityPolicy.VOLATILE: rclpy.qos.DurabilityPolicy.VOLATILE
}


def _get_rclpy_qos_profile(qos_profile):
return rclpy.qos.QoSProfile(
depth=qos_profile.depth,
durability=_DURABILITY_POLICY_MAP[qos_profile.durability]
)


class CompatibleNode(Node):

created_publisher = {}

def __init__(self, name, **kwargs):
param = Parameter("use_sim_time", Parameter.Type.BOOL, True)
super(CompatibleNode, self).__init__(
Expand Down Expand Up @@ -167,18 +168,58 @@ def logerr(self, text):
def logfatal(self, text):
self.get_logger().fatal(text)

def publisher_topic_conflict_resolver(self, msg_type, topic):
index = 1
test_topic = topic
while test_topic in CompatibleNode.created_publisher:
test_topic = topic + "_{}".format(index)
index = index + 1
return test_topic

def new_publisher(self, msg_type, topic, qos_profile, callback_group=None):
if isinstance(qos_profile, int):
qos_profile = ros_compatibility.qos.QoSProfile(depth=qos_profile)
qos_profile = ros_compatibility.qos.QoSProfile(
depth=qos_profile)
rclpy_qos_profile = _get_rclpy_qos_profile(qos_profile)

publish_counter = 0
if topic in CompatibleNode.created_publisher:
published_msg_type, publish_counter = CompatibleNode.created_publisher[topic]
if published_msg_type != msg_type:
resolved_topic_name = self.publisher_topic_conflict_resolver(
msg_type=msg_type, topic=topic)
if resolved_topic_name == topic:
raise ROSException("Unable to create publisher with type {} for topic {} because another publisher already was created for type {}. Reimplement publisher_topic_conflict_resolver() function to be able to provide an alternative topic name in case of a conflict.".format(
msg_type, topic, published_msg_type))
else:
self.logwarn("Unable to create publisher with type {} for topic {} because another publisher already was created for type {}. Using alternative topic name {} instead.".format(
msg_type, topic, published_msg_type, resolved_topic_name))
topic = resolved_topic_name
publish_counter = 0
publish_counter = publish_counter + 1
CompatibleNode.created_publisher[topic] = (
msg_type, publish_counter)

return self.create_publisher(
msg_type, topic, rclpy_qos_profile,
callback_group=callback_group)

def destroy_publisher(self, publisher):
if publisher.topic in CompatibleNode.created_publisher:
published_msg_type, publish_counter = CompatibleNode.created_publisher[
publisher.topic]
if publish_counter > 1:
publish_counter = publish_counter - 1
CompatibleNode.created_publisher[publisher.topic] = (
published_msg_type, publish_counter)
else:
CompatibleNode.created_publisher.pop(publisher.topic, None)
super(CompatibleNode, self).destroy_publisher(publisher)

def new_subscription(self, msg_type, topic, callback, qos_profile, callback_group=None):
if isinstance(qos_profile, int):
qos_profile = ros_compatibility.qos.QoSProfile(depth=qos_profile)
qos_profile = ros_compatibility.qos.QoSProfile(
depth=qos_profile)
rclpy_qos_profile = _get_rclpy_qos_profile(qos_profile)

return self.create_subscription(
Expand All @@ -196,7 +237,7 @@ def wait_for_message(self, topic, msg_type, timeout=None, qos_profile=1):
"""
Wait for one message from topic.
This will create a new subcription to the topic, receive one message, then unsubscribe.

Do not call this method in a callback or a deadlock may occur.
"""
s = None
Expand All @@ -207,7 +248,8 @@ def wait_for_message(self, topic, msg_type, timeout=None, qos_profile=1):
topic,
lambda msg: future.set_result(msg),
qos_profile=qos_profile)
rclpy.spin_until_future_complete(self, future, self.executor, timeout)
rclpy.spin_until_future_complete(
self, future, self.executor, timeout)
finally:
if s is not None:
self.destroy_subscription(s)
Expand All @@ -223,7 +265,8 @@ def new_client(self, srv_type, srv_name, timeout_sec=None, callback_group=None):
srv_type, srv_name, callback_group=callback_group)
ready = client.wait_for_service(timeout_sec=timeout_sec)
if not ready:
raise ROSException("Timeout of {}sec while waiting for service".format(timeout_sec))
raise ROSException(
"Timeout of {}sec while waiting for service".format(timeout_sec))
return client

def call_service(self, client, req, timeout=None, spin_until_response_received=False):
Expand All @@ -232,7 +275,8 @@ def call_service(self, client, req, timeout=None, spin_until_response_received=F
return response
else:
future = client.call_async(req)
rclpy.spin_until_future_complete(self, future, self.executor, timeout)
rclpy.spin_until_future_complete(
self, future, self.executor, timeout)

if future.done():
return future.result()
Expand All @@ -241,7 +285,8 @@ def call_service(self, client, req, timeout=None, spin_until_response_received=F
raise ServiceException(
'Service did not return a response before timeout {}'.format(timeout))
else:
raise ServiceException('Service did not return a response')
raise ServiceException(
'Service did not return a response')

def spin(self):
rclpy.spin(self, self.executor)
Expand Down