Skip to content

Commit fc384b8

Browse files
Change test to check for trajectory execution
1 parent 14b3a42 commit fc384b8

File tree

4 files changed

+29
-97
lines changed

4 files changed

+29
-97
lines changed

example_7/CMakeLists.txt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,6 @@ if(BUILD_TESTING)
108108
ament_add_pytest_test(example_7_urdf_xacro test/test_urdf_xacro.py)
109109
ament_add_pytest_test(view_example_7_launch test/test_view_robot_launch.py)
110110
ament_add_pytest_test(run_example_7_launch test/test_r6bot_controller_launch.py)
111-
ament_add_pytest_test(test_send_trajectory_launch test/test_send_trajectory_launch.py)
112111
endif()
113112

114113
## EXPORTS

example_7/reference_generator/send_trajectory.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ int main(int argc, char ** argv)
100100
std::fill(last_point_msg.velocities.begin(), last_point_msg.velocities.end(), 0.0);
101101

102102
pub->publish(trajectory_msg);
103+
RCLCPP_INFO(node->get_logger(), "Trajectory sent with %lu points", trajectory_msg.points.size());
103104
while (rclcpp::ok())
104105
{
105106
}

example_7/test/test_r6bot_controller_launch.py

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,9 @@
3030

3131
import os
3232
import pytest
33+
import psutil
34+
import subprocess
35+
import time
3336
import unittest
3437

3538
from ament_index_python.packages import get_package_share_directory
@@ -39,12 +42,12 @@
3942
from launch_testing.actions import ReadyToTest
4043

4144
import launch_testing.markers
42-
import rclpy
4345
from controller_manager.test_utils import (
4446
check_controllers_running,
4547
check_if_js_published,
4648
check_node_running,
4749
)
50+
import rclpy
4851

4952

5053
# Executes the given launch file and checks if all nodes can be started
@@ -94,6 +97,30 @@ def test_check_if_msgs_published(self):
9497
"/joint_states", ["joint_1", "joint_2", "joint_3", "joint_4", "joint_5", "joint_6"]
9598
)
9699

100+
def test_check_if_trajectory_published(self, proc_output):
101+
102+
topic = "/r6bot_controller/joint_trajectory"
103+
104+
command = [
105+
"ros2",
106+
"launch",
107+
"ros2_control_demo_example_7",
108+
"send_trajectory.launch.py",
109+
]
110+
subprocess.Popen(command) # the process won't finish
111+
time.sleep(1)
112+
pubs = self.node.get_publishers_info_by_topic(topic)
113+
assert len(pubs) > 0, f"No publisher for topic '{topic}' not found!"
114+
115+
# message will be published only once, so WaitForTopics is not suitable here.
116+
# let's just check if the controller received and processed the message
117+
proc_output.assertWaitFor("Trajectory execution complete", timeout=10, stream="stderr")
118+
119+
for proc in psutil.process_iter():
120+
if "send_trajectory" in proc.name():
121+
proc.kill()
122+
print(f"Process {proc.name()} killed")
123+
97124

98125
@launch_testing.post_shutdown_test()
99126
# These tests are run after the processes in generate_test_description() have shutdown.

example_7/test/test_send_trajectory_launch.py

Lines changed: 0 additions & 95 deletions
This file was deleted.

0 commit comments

Comments
 (0)