Skip to content

Fix view_controller_chains #2026

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 7 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions ros2controlcli/doc/userdoc.rst
Original file line number Diff line number Diff line change
Expand Up @@ -320,15 +320,16 @@ view_controller_chains
.. code-block:: console

$ ros2 control view_controller_chains -h
usage: ros2 control view_controller_chains [-h] [--spin-time SPIN_TIME] [-s] [-c CONTROLLER_MANAGER] [--include-hidden-nodes] [--ros-args ...]
usage: ros2 control view_controller_chains [-h] [--spin-time SPIN_TIME] [-s] [--save] [-c CONTROLLER_MANAGER] [--include-hidden-nodes] [--ros-args ...]

Generates a diagram of the loaded chained controllers into /tmp/controller_diagram.gv.pdf
Generates a diagram of the loaded chained controllers

options:
-h, --help show this help message and exit
--spin-time SPIN_TIME
Spin time in seconds to wait for discovery (only applies when not using an already running daemon)
-s, --use-sim-time Enable ROS simulation time
--save Save PDF to controller_diagram.pdf instead of viewing image
-c CONTROLLER_MANAGER, --controller-manager CONTROLLER_MANAGER
Name of the controller manager ROS node (default: controller_manager)
--include-hidden-nodes
Expand Down
2 changes: 1 addition & 1 deletion ros2controlcli/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
<depend>controller_manager</depend>
<depend>controller_manager_msgs</depend>
<exec_depend>rosidl_runtime_py</exec_depend>
<exec_depend>python3-pygraphviz</exec_depend>
<exec_depend>python3-graphviz</exec_depend>

<export>
<build_type>ament_python</build_type>
Expand Down
78 changes: 46 additions & 32 deletions ros2controlcli/ros2controlcli/verb/view_controller_chains.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,47 +12,46 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from controller_manager import list_controllers
from controller_manager import list_hardware_interfaces
from controller_manager import list_controllers, list_hardware_components

from ros2cli.node.direct import add_arguments
from ros2cli.node.strategy import NodeStrategy
from ros2cli.verb import VerbExtension

from ros2controlcli.api import add_controller_mgr_parsers

import pygraphviz as pgz
import graphviz


def make_controller_node(
s,
controller_name,
state_interfaces,
command_interfaces,
input_controllers,
output_controllers,
input_chain_connections,
output_chain_connections,
port_map,
):
state_interfaces = sorted(list(state_interfaces))
command_interfaces = sorted(list(command_interfaces))
input_controllers = sorted(list(input_controllers))
output_controllers = sorted(list(output_controllers))
input_chain_connections = sorted(list(input_chain_connections))
output_chain_connections = sorted(list(output_chain_connections))

inputs_str = ""
for ind, state_interface in enumerate(state_interfaces):
deliminator = "|"
if ind == len(state_interface) - 1:
deliminator = ""
inputs_str += "<{}> {} {} ".format(
"state_end_" + state_interface, state_interface, deliminator
"state_end_" + state_interface, state_interface + " (state)", deliminator
)

for ind, input_controller in enumerate(input_controllers):
for ind, input_controller in enumerate(input_chain_connections):
deliminator = "|"
if ind == len(input_controller) - 1:
deliminator = ""
inputs_str += "<{}> {} {} ".format(
"controller_end_" + input_controller, input_controller, deliminator
"controller_end_" + input_controller, input_controller + " (exp ref)", deliminator
)
port_map["controller_end_" + input_controller] = controller_name

Expand All @@ -62,18 +61,20 @@ def make_controller_node(
if ind == len(command_interface) - 1:
deliminator = ""
outputs_str += "<{}> {} {} ".format(
"command_start_" + command_interface, command_interface, deliminator
"command_start_" + command_interface, command_interface + " (cmd)", deliminator
)

for ind, output_controller in enumerate(output_controllers):
for ind, output_controller in enumerate(output_chain_connections):
deliminator = "|"
if ind == len(output_controller) - 1:
deliminator = ""
outputs_str += "<{}> {} {} ".format(
"controller_start_" + output_controller, output_controller, deliminator
"controller_start_" + output_controller,
output_controller + " (exp state)",
deliminator,
)

s.add_node(controller_name, label=f"{controller_name}|{{{{{inputs_str}}}|{{{outputs_str}}}}}")
s.node(controller_name, f"{controller_name}|{{{{{inputs_str}}}|{{{outputs_str}}}}}")


def make_command_node(s, command_interfaces):
Expand All @@ -87,9 +88,7 @@ def make_command_node(s, command_interfaces):
"command_end_" + command_interface, command_interface, deliminator
)

s.add_node(
"command_interfaces", label="{}|{{{{{}}}}}".format("command_interfaces", outputs_str)
)
s.node("command_interfaces", "{}|{{{{{}}}}}".format("hw command_interfaces", outputs_str))


def make_state_node(s, state_interfaces):
Expand All @@ -103,7 +102,7 @@ def make_state_node(s, state_interfaces):
"state_start_" + state_interface, state_interface, deliminator
)

s.add_node("state_interfaces", label="{}|{{{{{}}}}}".format("state_interfaces", inputs_str))
s.node("state_interfaces", "{}|{{{{{}}}}}".format("hw state_interfaces", inputs_str))


def show_graph(
Expand All @@ -115,9 +114,11 @@ def show_graph(
state_interfaces,
visualize,
):
s = pgz.AGraph(name="g", strict=False, directed=True, rankdir="LR")
s.node_attr["shape"] = "record"
s.node_attr["style"] = "rounded"
s = graphviz.Digraph(
"g",
filename="/tmp/controller_diagram.gv",
node_attr={"shape": "record", "style": "rounded"},
)
port_map = dict()
# get all controller names
controller_names = set()
Expand All @@ -142,32 +143,38 @@ def show_graph(

for controller_name in controller_names:
for connection in output_chain_connections[controller_name]:
s.add_edge(
s.edge(
"{}:{}".format(controller_name, "controller_start_" + connection),
"{}:{}".format(
port_map["controller_end_" + connection], "controller_end_" + connection
),
)
for state_connection in state_connections[controller_name]:
s.add_edge(
s.edge(
"{}:{}".format("state_interfaces", "state_start_" + state_connection),
"{}:{}".format(controller_name, "state_end_" + state_connection),
)
for command_connection in command_connections[controller_name]:
s.add_edge(
s.edge(
"{}:{}".format(controller_name, "command_start_" + command_connection),
"{}:{}".format("command_interfaces", "command_end_" + command_connection),
)

s.graph_attr.update(ranksep="2")
s.layout(prog="dot")
s.attr(ranksep="2")
s.attr(rankdir="LR")
if visualize:
s.draw("/tmp/controller_diagram.gv.pdf", format="pdf")
s.view()
else:
s.render(filename="controller_diagram", view=False, cleanup=True)


def parse_response(list_controllers_response, list_hardware_response, visualize=True):
command_interfaces = {x.name for x in list_hardware_response.command_interfaces}
state_interfaces = {x.name for x in list_hardware_response.state_interfaces}
command_interfaces = {
x.name for hw in list_hardware_response.component for x in hw.command_interfaces
}
state_interfaces = {
x.name for hw in list_hardware_response.component for x in hw.state_interfaces
}
command_connections = dict()
state_connections = dict()
input_chain_connections = {x.name: set() for x in list_controllers_response.controller}
Expand Down Expand Up @@ -195,15 +202,22 @@ def parse_response(list_controllers_response, list_hardware_response, visualize=


class ViewControllerChainsVerb(VerbExtension):
"""Generates a diagram of the loaded chained controllers into /tmp/controller_diagram.gv.pdf."""
"""Generates a diagram of the loaded chained controllers."""

def add_arguments(self, parser, cli_name):
add_arguments(parser)
parser.add_argument(
"--save",
action="store_true",
help="Save PDF to controller_diagram.pdf instead of viewing image",
)
add_controller_mgr_parsers(parser)

def main(self, *, args):
with NodeStrategy(args).direct_node as node:
list_controllers_response = list_controllers(node, args.controller_manager)
list_hardware_response = list_hardware_interfaces(node, args.controller_manager)
parse_response(list_controllers_response, list_hardware_response)
list_hardware_response = list_hardware_components(node, args.controller_manager)
parse_response(
list_controllers_response, list_hardware_response, visualize=not args.save
)
return 0
12 changes: 8 additions & 4 deletions ros2controlcli/test/test_view_controller_chains.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,18 @@
import unittest
from controller_manager_msgs.msg import ControllerState
from controller_manager_msgs.msg import HardwareInterface
from controller_manager_msgs.msg import HardwareComponentState
from controller_manager_msgs.msg import ChainConnection
from controller_manager_msgs.srv import ListControllers
from controller_manager_msgs.srv import ListHardwareInterfaces
from controller_manager_msgs.srv import ListHardwareComponents

from ros2controlcli.verb.view_controller_chains import parse_response


class TestViewControllerChains(unittest.TestCase):
def test_expected(self):
list_controllers_response = ListControllers.Response()
list_hardware_response = ListHardwareInterfaces.Response()
list_hardware_response = ListHardwareComponents.Response()

chained_to_controller = ControllerState()
chained_from_controller = ControllerState()
Expand Down Expand Up @@ -72,8 +73,11 @@ def test_expected(self):
controller_list = [chained_from_controller, chained_to_controller]

list_controllers_response.controller = controller_list
list_hardware_response.state_interfaces = state_interfaces
list_hardware_response.command_interfaces = command_interfaces

list_controllers_response_component = HardwareComponentState()
list_controllers_response_component.state_interfaces = state_interfaces
list_controllers_response_component.command_interfaces = command_interfaces
list_hardware_response.component.append(list_controllers_response_component)
try:
parse_response(list_controllers_response, list_hardware_response, visualize=False)
except Exception:
Expand Down