Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ This repository contains the `spot_manipulation_driver` package required to oper
- Execute arm joint trajectories
- Execute gripper trajectories
- Execute arm and gripper trajectories as a single synced trajectory
- Execute body, arm and gripper trajectories as a single synced trajectory
- Execute cartesian position/force trajectory for the EE
- Given pixel coordinates, camera info, and a transform snapshot have the robot go and grasp the pixel-specified object in an image.

Expand All @@ -34,5 +35,9 @@ ros2 launch spot_moveit_config spot_execution.launch.py # same as above, but lau

This package is configured to consider environmental obstacles localized to the map frame, so the `spot_navigation` localization capabilities need to be running for this to work. If you wish to operate without obstacle avoidance (which you shouldn't), you can comment out the `spot_mobility_joint` entry in the robot SRDF and comment out all of the 3D sensors in the `sensors_3d.yaml` file.

## Spot_bringup Launch Args
- `manipulation_action_namespace:="/spot_moveit"` for moveit-related planning
- `kinematic_model:="mobile_manipulation"` to publish mobile-manipulation-related joint states

## Authors
Janak Panthi (aka Crasun Jans) and Alex Navarro
45 changes: 45 additions & 0 deletions spot_manipulation_driver/spot_manipulation_driver/ros_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
from sensor_msgs.msg import Image, CameraInfo
from tf2_msgs.msg import TFMessage
from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint
import math
from geometry_msgs.msg import PoseStamped, TransformStamped, Pose2D
from tf2_geometry_msgs import do_transform_pose

import rclpy.time

Expand Down Expand Up @@ -377,3 +380,45 @@ def cartesian_request_to_command(msg: ArmCartesianCommand.Goal, tf_buffer: Buffe
arm_cartesian_request.wrench_trajectory_in_task.CopyFrom(wrench_trajectory)

return arm_cartesian_request

def transform_2d_pose(transform: TransformStamped, x: float, y: float, theta: float) -> Pose2D:
"""
Transform a 2D pose from source frame to target frame.

Example:
# Transform from base_footprint to odom
transform = tf_buffer.lookup_transform('odom', 'base_footprint', ...)
pose_odom = transform_2d_pose(transform, x_base, y_base, theta_base)
print(f"Position: ({pose_odom.x}, {pose_odom.y}), Theta: {pose_odom.theta}")

Args:
transform: TransformStamped from source frame to target frame
x: x position in source frame (meters)
y: y position in source frame (meters)
theta: yaw angle in source frame (radians)

Returns:
Pose2D: (x, y, theta) all expressed in target frame
"""
# Create pose in source frame
pose_in = PoseStamped()
pose_in.pose.position.x = x
pose_in.pose.position.y = y
pose_in.pose.position.z = 0.0

# Convert theta to quaternion (rotation around z-axis in source frame)
pose_in.pose.orientation.x = 0.0
pose_in.pose.orientation.y = 0.0
pose_in.pose.orientation.z = math.sin(theta / 2.0)
pose_in.pose.orientation.w = math.cos(theta / 2.0)

# Transform the pose from source frame to target frame
pose_out = do_transform_pose(pose_in.pose, transform)

# Extract and create Pose2D result
result = Pose2D()
result.x = pose_out.position.x
result.y = pose_out.position.y
result.theta = 2.0 * math.atan2(pose_out.orientation.z, pose_out.orientation.w)

return result
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,12 @@
from bosdyn.api.mobility_command_pb2 import MobilityCommand
from bosdyn.api.arm_command_pb2 import ArmCommand, ArmJointMoveCommand, ArmJointPosition
from bosdyn.client.frame_helpers import (ODOM_FRAME_NAME, GROUND_PLANE_FRAME_NAME, HAND_FRAME_NAME, BODY_FRAME_NAME,
GRAV_ALIGNED_BODY_FRAME_NAME, VISION_FRAME_NAME, get_a_tform_b)
GRAV_ALIGNED_BODY_FRAME_NAME, VISION_FRAME_NAME, get_a_tform_b, get_vision_tform_body)
from bosdyn.client.math_helpers import SE3Pose
from bosdyn.client.robot_command import (RobotCommandBuilder, blocking_command)
from bosdyn.client.exceptions import RpcError
from bosdyn.client.inverse_kinematics import InverseKinematicsClient
from bosdyn.util import seconds_to_timestamp, seconds_to_duration
from bosdyn.util import seconds_to_timestamp, seconds_to_duration, timestamp_to_sec
from google.protobuf import duration_pb2, timestamp_pb2
from spot_driver.async_queries import AsyncRobotState
from spot_driver.spot_lease_manager import SpotLeaseManager
Expand All @@ -61,7 +61,19 @@

MAX_BODY_POSES = 100
MAX_ARM_POINTS = 10
MAX_MOBILE_MANIPULATION_POINTS = 10
ARM_NUM_JOINTS = 6
GRIPPER_NUM_JOINTS = 1
BODY_MOBILE_NUM_JOINTS = 3
MOBILE_MANIPULATION_NUM_JOINTS = ARM_NUM_JOINTS + BODY_MOBILE_NUM_JOINTS + GRIPPER_NUM_JOINTS
GRIPPER_CLOSE_TORQUE = 15.0
MAX_GRIPPER_JOINT_VEL = 1000 # rad/s
MAX_GRIPPER_JOINT_ACC = 1000 # rad/s^2
MAX_ARM_JOINT_VEL = 1000 # rad/s
MAX_ARM_JOINT_ACC = 1000 # rad/s^2
MAX_BODY_VEL_LINEAR = 1.5 # m/s
MAX_BODY_VEL_ANGULAR = 1.5 # rad/s
BODY_HEIGHT_FOR_MOBILE_MANIPULATION = 0.0 # meters

class GraspStrategy(Enum):
TOP_DOWN_GRASP = 1
Expand Down Expand Up @@ -194,7 +206,7 @@ def verify_power_and_estop(self):

self.verify_estop()

# Execute long trajectories
# Execute long arm trajectories
def arm_long_trajectory_executor(
self, traj_point_positions, traj_point_velocities, timepoints, cancel_event: threading.Event
) -> None:
Expand Down Expand Up @@ -464,6 +476,205 @@ def arm_and_gripper_long_trajectory_executor(
# self._logger.info("Successfully executed arm and gripper trajectory")
# return True

def create_mobile_manipulation_command(self, traj_points: list, times: list[float], ref_time: float) -> robot_command_pb2.RobotCommand:
"""
Creates a synchronized command for mobile manipulation (i.e. body, arm, and gripper). The body points are expected to be in the vision frame.

Args:
traj_point_positions: List of joint positions for each trajectory point in the mobile manipulation trajectory. Includes body and arm joints.
times: List of time offsets from reference time
ref_time: Reference timestamp for the trajectory

Returns:
RobotCommand with synchronized gripper, arm and mobility commands, or None on error
"""

# Validate length of times and trajectory points
if len(times) != len(traj_points):
self._logger.error("Length mismatch between times and trajectory points.")
return None

# Validate number of joints in each trajectory point
for i, positions in enumerate(traj_points):
if len(positions) != MOBILE_MANIPULATION_NUM_JOINTS:
self._logger.error(f"Invalid number of joints at index {i}: expected {MOBILE_MANIPULATION_NUM_JOINTS}, got {len(positions)}")
return None

# Separate body and arm points
body_points = [point[:BODY_MOBILE_NUM_JOINTS] for point in traj_points]
arm_positions = [point[BODY_MOBILE_NUM_JOINTS:BODY_MOBILE_NUM_JOINTS + ARM_NUM_JOINTS] for point in traj_points]# Next ARM_NUM_JOINTS in each point are arm joints
gripper_positions_list = [point[-GRIPPER_NUM_JOINTS:] for point in traj_points] # Last GRIPPER_NUM_JOINTS in each point are gripper joints
gripper_positions = [elem for sublist in gripper_positions_list for elem in sublist] # flatten the list

# Build gripper command
gripper_command = RobotCommandBuilder.claw_gripper_command_helper(
gripper_positions=gripper_positions,
times=times,
ref_time=ref_time,
max_vel=MAX_GRIPPER_JOINT_VEL,
max_acc=MAX_GRIPPER_JOINT_ACC
)
# Build arm command
arm_command = RobotCommandBuilder.arm_joint_move_helper(
joint_positions=arm_positions,
times=times,
ref_time=ref_time,
max_vel=MAX_ARM_JOINT_VEL,
max_acc=MAX_ARM_JOINT_ACC
)

# Build mobility command
mobility_command = robot_command_pb2.RobotCommand()

# Reference frame
mobility_command.synchronized_command.mobility_command.se2_trajectory_request.se2_frame_name = VISION_FRAME_NAME

# Mobility params
speed_limit = geometry_pb2.SE2VelocityLimit(
max_vel=geometry_pb2.SE2Velocity(
linear=geometry_pb2.Vec2(x=MAX_BODY_VEL_LINEAR, y=MAX_BODY_VEL_LINEAR),
angular=MAX_BODY_VEL_ANGULAR
),
min_vel=geometry_pb2.SE2Velocity(
linear=geometry_pb2.Vec2(x=-MAX_BODY_VEL_LINEAR, y=-MAX_BODY_VEL_LINEAR),
angular=-MAX_BODY_VEL_ANGULAR
)
)

mobility_params = spot_command_pb2.MobilityParams(vel_limit=speed_limit)
mobility_command.synchronized_command.mobility_command.params.CopyFrom(RobotCommandBuilder._to_any(mobility_params))

# Reference and end times
mobility_command.synchronized_command.mobility_command.se2_trajectory_request.trajectory.reference_time.CopyFrom(ref_time)
end_time = seconds_to_timestamp(timestamp_to_sec(ref_time) + times[-1])
mobility_command.synchronized_command.mobility_command.se2_trajectory_request.end_time.CopyFrom(end_time)

# Se2 trajectory
for i in range(len(body_points)):
x_vision = body_points[i][0]
y_vision = body_points[i][1]
yaw_vision = body_points[i][2]

# Add points to trajectory
point = mobility_command.synchronized_command.mobility_command.se2_trajectory_request.trajectory.points.add()
point.pose.position.x = x_vision
point.pose.position.y = y_vision
point.pose.angle = yaw_vision
duration = seconds_to_duration(times[i])
point.time_since_reference.CopyFrom(duration)

# Combine gripper, arm and mobility commands into synchronized command
command = RobotCommandBuilder.build_synchro_command(mobility_command, arm_command, gripper_command)

return command

def get_body_corrected_command(self, robot_cmd):
"""
Returns the command to correct the body portion of a mobile manipulation command using vision-based odometry.
Extracts the last commanded body position from the input command, computes the error between the current state and the commanded position, and corrects the body position accordingly. Commands are assumed to be in the VISION frame.
Args:
robot_cmd: RobotCommand containing synchronized mobility, arm, and gripper commands
Returns:
corrected_robot_cmd: RobotCommand with corrected body positions
end_time_secs: float indicating the end time for the command execution
"""

TIMESTAMP_SHIFT = 0.3

robot_state = self._lease_manager._robot_state_client.get_robot_state()
vision_T_body = get_vision_tform_body(robot_state.kinematic_state.transforms_snapshot)

# Extract last commanded point for the body
commanded = robot_cmd.synchronized_command.mobility_command.se2_trajectory_request.trajectory.points[-1]

# Compute error
dx = vision_T_body.position.x - commanded.pose.position.x
dy = vision_T_body.position.y - commanded.pose.position.y
dyaw = vision_T_body.rot.to_yaw() - commanded.pose.angle

# Extract last positions from the robot command
# The last point contains body (x, y, yaw), then arm joints, then gripper
last_body_x = commanded.pose.position.x
last_body_y = commanded.pose.position.y
last_body_yaw = commanded.pose.angle

# Compute position correction
corrected_positions = [last_body_x - dx, last_body_y - dy, last_body_yaw - dyaw]

# Extract arm and gripper positions from synchronized command
arm_traj = robot_cmd.synchronized_command.arm_command.arm_joint_move_command.trajectory
gripper_traj = robot_cmd.synchronized_command.gripper_command.claw_gripper_command.trajectory

arm_end_pos = arm_traj.points[-1].position
last_arm_positions = [arm_end_pos.sh0.value, arm_end_pos.sh1.value, arm_end_pos.el0.value, arm_end_pos.el1.value, arm_end_pos.wr0.value, arm_end_pos.wr1.value] # shoulder yaw, shoulder pitch, elbow pitch, elbow roll, wrist pitch, wrist roll
last_gripper_position = gripper_traj.points[-1].point

# Combine into full mobile manipulation point
corrected_positions.extend(last_arm_positions)
corrected_positions.append(last_gripper_position)

# Extract last timestamp and compute corrected timestamp
last_timestamp = commanded.time_since_reference.seconds + commanded.time_since_reference.nanos / 1e9
corrected_timestamp = last_timestamp + TIMESTAMP_SHIFT

# Extract reference time from the original command
ref_time_proto = robot_cmd.synchronized_command.mobility_command.se2_trajectory_request.trajectory.reference_time
ref_time_seconds = timestamp_to_sec(ref_time_proto) + TIMESTAMP_SHIFT
ref_time = seconds_to_timestamp(ref_time_seconds)

corrected_robot_cmd = self.create_mobile_manipulation_command([corrected_positions], [corrected_timestamp], ref_time)
end_time_secs = time.time() + TIMESTAMP_SHIFT
return corrected_robot_cmd, end_time_secs

# Execute long mobile manipulation trajectories
def mobile_manipulation_long_trajectory_executor(
self, traj_point_positions, traj_point_velocities, timepoints, cancel_event: threading.Event
) -> None:

# Make sure the robot is powered on (which implicitly implies that it's estopped as well)
try:
if self._lease_manager.robot.is_powered_on(5):
self._logger.info("Spot is about to move its whole body --DEBUG .")
else:
self._logger.warn("Cannot execute mobile manipulation long trajectory, robot is not powered on")
return False
except RpcError as e:
self._logger.warn(f"Cannot execute mobile manipulation long trajectory, unable to communicate with robot\n\tMsg: {e}")
return False

start_time = time.time()
ref_time = seconds_to_timestamp(start_time)

# Initialize trajectory manager to extract slices from the original trajectory for execution
mobile_manipulation_trajectory_manager = TrajectoryManager(
points=traj_point_positions,
times_since_ref=timepoints,
ref_time=start_time
)

while not mobile_manipulation_trajectory_manager.done():
if cancel_event.is_set():
self.stop_robot()
self._logger.info("Mobile manipulation trajectory action cancelled early. Stopping robot")
return False

window, timestamps, sleep_time = mobile_manipulation_trajectory_manager.get_window(MAX_MOBILE_MANIPULATION_POINTS)

robot_cmd = self.create_mobile_manipulation_command(window, timestamps, ref_time)

success, msg, _ = self._lease_manager.robot_command(robot_cmd, end_time_secs=time.time()+sleep_time)

if not success:
self._logger.warn(f"mobile_manipulation_long_trajectory_executor: Error executing robot command: {msg}")
return False

if mobile_manipulation_trajectory_manager.last():
time.sleep(sleep_time)
else:
time.sleep(max(sleep_time - 0.2, 0))

self._logger.info("Successfully executed mobile manipulation trajectory")
return True

def body_manipulation_trajectory_executor(
self, body_trajectory: List[SE3Pose], arm_trajectory: List[List[float]], timestamps: List[float]
Expand Down
Loading