|
1 | 1 | from collections.abc import Callable |
| 2 | +from typing import Final |
2 | 3 |
|
3 | 4 | import rclpy |
4 | | -from mavros_msgs.msg import OverrideRCIn |
| 5 | +from mavros_msgs.msg import ManualControl |
5 | 6 | from rclpy.executors import MultiThreadedExecutor |
6 | 7 | from rclpy.node import Node |
7 | 8 | from rclpy.qos import QoSPresetProfiles |
|
17 | 18 |
|
18 | 19 | # Range of values Pixhawk takes |
19 | 20 | # In microseconds |
20 | | -ZERO_SPEED = 1500 |
21 | | -MAX_RANGE_SPEED = 400 |
22 | | -RANGE_SPEED = MAX_RANGE_SPEED * SPEED_THROTTLE |
| 21 | +ZERO_SPEED: Final = 0 |
| 22 | +Z_ZERO_SPEED: Final = 500 |
| 23 | +MAX_RANGE_SPEED: Final = 2000 |
| 24 | +Z_MAX_RANGE_SPEED: Final = 1000 |
| 25 | +RANGE_SPEED: Final = MAX_RANGE_SPEED * SPEED_THROTTLE |
| 26 | +Z_RANGE_SPEED: Final = Z_MAX_RANGE_SPEED * SPEED_THROTTLE |
| 27 | + |
| 28 | +EXTENSIONS_CODE: Final = 0b00000011 |
23 | 29 |
|
24 | 30 | # Channels for RC command |
25 | 31 | MAX_CHANNEL = 8 |
@@ -57,36 +63,39 @@ def __init__(self) -> None: |
57 | 63 | QoSPresetProfiles.DEFAULT.value, |
58 | 64 | ) |
59 | 65 |
|
60 | | - self.rc_pub = self.create_publisher( |
61 | | - OverrideRCIn, 'mavros/rc/override', QoSPresetProfiles.DEFAULT.value |
| 66 | + self.mc_pub = self.create_publisher( |
| 67 | + ManualControl, 'mavros/manual_control/send', QoSPresetProfiles.DEFAULT.value |
62 | 68 | ) |
63 | 69 |
|
64 | 70 | @staticmethod |
65 | 71 | def apply(msg: PixhawkInstruction, function_to_apply: Callable[[float], float]) -> None: |
66 | 72 | """Apply a function to each dimension of this PixhawkInstruction.""" |
67 | 73 | msg.forward = function_to_apply(msg.forward) |
68 | | - msg.vertical = function_to_apply(msg.vertical) |
| 74 | + msg.vertical = msg.vertical |
69 | 75 | msg.lateral = function_to_apply(msg.lateral) |
70 | 76 | msg.pitch = function_to_apply(msg.pitch) |
71 | 77 | msg.yaw = function_to_apply(msg.yaw) |
72 | 78 | msg.roll = function_to_apply(msg.roll) |
73 | 79 |
|
74 | 80 | @staticmethod |
75 | | - def to_override_rc_in(msg: PixhawkInstruction) -> OverrideRCIn: |
| 81 | + def to_manual_control(msg: PixhawkInstruction) -> ManualControl: |
76 | 82 | """Convert this PixhawkInstruction to an rc_msg with the appropriate channels array.""" |
77 | | - rc_msg = OverrideRCIn() |
| 83 | + mc_msg = ManualControl() |
78 | 84 |
|
79 | 85 | # Maps to PWM |
80 | | - MultiplexerNode.apply(msg, lambda value: int(RANGE_SPEED * value) + ZERO_SPEED) |
| 86 | + MultiplexerNode.apply(msg, lambda value: (RANGE_SPEED * value) + ZERO_SPEED) |
81 | 87 |
|
82 | | - rc_msg.channels[FORWARD_CHANNEL] = msg.forward |
83 | | - rc_msg.channels[THROTTLE_CHANNEL] = msg.vertical |
84 | | - rc_msg.channels[LATERAL_CHANNEL] = msg.lateral |
85 | | - rc_msg.channels[PITCH_CHANNEL] = msg.pitch |
86 | | - rc_msg.channels[YAW_CHANNEL] = msg.yaw |
87 | | - rc_msg.channels[ROLL_CHANNEL] = msg.roll |
| 88 | + mc_msg.x = msg.forward |
| 89 | + mc_msg.z = ( |
| 90 | + Z_RANGE_SPEED * msg.vertical |
| 91 | + ) + Z_ZERO_SPEED # To account for different z limits |
| 92 | + mc_msg.y = msg.lateral |
| 93 | + mc_msg.r = msg.yaw |
| 94 | + mc_msg.enabled_extensions = EXTENSIONS_CODE |
| 95 | + mc_msg.s = msg.pitch |
| 96 | + mc_msg.t = msg.roll |
88 | 97 |
|
89 | | - return rc_msg |
| 98 | + return mc_msg |
90 | 99 |
|
91 | 100 | def state_control( |
92 | 101 | self, req: AutonomousFlight.Request, res: AutonomousFlight.Response |
@@ -114,7 +123,7 @@ def control_callback(self, msg: PixhawkInstruction) -> None: |
114 | 123 | else: |
115 | 124 | return |
116 | 125 |
|
117 | | - self.rc_pub.publish(msg=self.to_override_rc_in(msg)) |
| 126 | + self.mc_pub.publish(self.to_manual_control(msg)) |
118 | 127 |
|
119 | 128 |
|
120 | 129 | def main() -> None: |
|
0 commit comments