Skip to content

ROS1 Py Interfaces

Francesco Ganci edited this page Jun 28, 2022 · 5 revisions

Remarks

⚠️ work in progress ⚠️

⚠️ check the tabulation! Sometimes the snippets here contain mixed spaces and tabs. The standard tabulation method I use is tab. ⚠️

Node Header

We're using the following header here:

import rospy

NODE_NAME = ???str???
DEV_MODE = False

LOGSQUARE = lambda mstr : f"[{mstr}] "
OUTLABEL = LOGSQUARE( NODE_NAME )
TLOG  = lambda m : rospy.loginfo( OUTLABEL + str( m ) )
TWARN = lambda m : rospy.logwarn( OUTLABEL + "WARNING: " + str( m ) )
TERR  = lambda m : rospy.logerr( OUTLABEL + "ERROR" + str( m ) )
TDEV  = lambda m : rospy.logdebug( OUTLABEL + "ERROR" + str( m ) ) if DEV_MODE

Naming Conventions

⚠️ work in progress ⚠️

Naming Conventions for nodes:

  • NODE_NAME = "???"
  • classnode_???

Naming Conventions for topics:

  • TOPIC_???
  • Q_SZ_???
  • RATE_??? (only for continuous publishers)
  • pub_???
  • sub_???

Naming Conventions for Services:

  • ...

Naming Conventions for Actions:

  • ...

📧 ROS1 python topics

Global definitions

Suppose you want to use the message pkg::msgt inside the package pkg. The topic is simply /top.

from pkg.msg import msgt 
# msg type: msgt

TOPIC_TOP = "/top"
Q_SZ_TOP = 1000
RATE_TOP = 5

Open a Publisher

Some remarks:

  • the main call is pub_top = rospy.Publisher( TOPIC_???, ???msg type??? , queue_size=???int??? ) which takes as input the topic name, its type (i.e. String, Twist, etc.) and the queue_size (i.e. 1000).
# define globally
pub_top = None

# in the IF main statement
TLOG( "Creating publisher " + LOGSQUARE( TOPIC_TOP ) + " ... " )
pub_top = rospy.Publisher( TOPIC_TOP, msgt , queue_size=Q_SZ_TOP )
TLOG( "Creating publisher " + LOGSQUARE( TOPIC_TOP ) + " ... OK!" )

Use a publisher

the main call is pub_top.publish( msg ) which simply publishes the message through the topic.

m = tmsg( )

// fill in the message
// m.field = value

pub_top.publish( m );

Continuous publisher at a given frequency

pubrate_top = rospy.Rate( RATE_TOP )
count_top = 0
while not rospy.is_shutdown( ):
    # prepare the messages
    # m = msgt( )
    # m.field = value
    
    # publish the message and count 
    pub_top.publish( m )
    count_top = count_top + 1
    
    # wait until the net cycle
    pubrate_top.sleep( )

open a subscriber

# define globally
sub_top = None
# the last received message
last_msg_top = msgt( )

# define the callback
def cbk_top( data ): # data : msgt ???fields list???
	global last_msg_top
	
	# save the last message
	last_msg_top = data
	
	pass # TODO

# in IF main, define the interface
TLOG( "Subscribing to the topic " + LOGSQUARE( TOPIC_TOP ) + " ... " )
rospy.wait_for_message( TOPIC_TOP, tmsg )
sub_top = rospy.subscriber( TOPIC_TOP, msgt , queue_size=Q_SZ_TOP )
TLOG( "Subscribing to the topic " + LOGSQUARE( TOPIC_TOP ) + " ... OK!" )

In case the method is inside the class, just use the dot notation myclass.cbk_???.

🔗 Useful Links

  • ...
Clone this wiki locally