-
Notifications
You must be signed in to change notification settings - Fork 6
/
pf_ulv_ros2.py
executable file
·186 lines (155 loc) · 7.89 KB
/
pf_ulv_ros2.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
#!/usr/bin/env python
# Common python libaries
import os
import time
import math
import argparse
import numpy as np
import matplotlib.pyplot as plt
import itertools
import copy
# ROS libaries
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy
from geometry_msgs.msg import PoseStamped
from sensor_msgs.msg import Range
from nav_msgs.msg import Odometry
from depthai_ros_msgs.msg import SpatialDetectionArray
from pfilter import ParticleFilter, squared_error
# from tensorflow import keras
from utlis import utils
import pf_ulv
class UWBRelativePositionNode(Node) :
'''
ROS Node that estimates relative position of two robots using odom and single uwb range.
'''
def __init__(self, pf_estimator, uwb_pair = [(0,1)], robot_ids=[0,1]):
'''
Define the parameters, publishers, and subscribers
'''
# Init node
super().__init__('relative_pf_rclpy')
# Define QoS profile for odom and UWB subscribers
self.qos = QoSProfile(
reliability=ReliabilityPolicy.BEST_EFFORT,
history=HistoryPolicy.KEEP_LAST,
depth=5,
)
self.pf_estimator = pf_estimator
self.uwb_pairs = uwb_pair
self.robot_ids = robot_ids
self.uwb_ranges_dict = {p:0.0 for p in self.uwb_pairs}
self.odom_data_dict = {id:np.array([]) for id in self.robot_ids}
self.odom_data_aligned_dict = {id:np.array([]) for id in self.robot_ids}
self.mocap_data_dict = {id:np.array([]) for id in self.robot_ids}
self.spatial_dict={id:[] for id in self.robot_ids}
self.pose_estimations = []
self.get_logger().info("Subscribing to topics......")
# subscribe to uwb ranges
self.uwb_subs = [
self.create_subscription(Range, "/uwb/tof/n_{}/n_{}/distance".format(p[0], p[1]),
self.create_uwb_ranges_cb(p),qos_profile=self.qos) for i, p in enumerate(uwb_pair)]
self.get_logger().info("{} UWB ranges subscribed!".format(len(self.uwb_ranges_dict)))
# subscribe to optitrack mocap poses
self.mocap_subs = [
self.create_subscription(PoseStamped, "/vrpn_client_node/tb0{}/pose".format(i),
self.create_mocap_pose_cb(i), qos_profile=self.qos) for i in robot_ids]
self.get_logger().info("{} Mocaps poses subscribed!".format(len(self.mocap_data_dict)))
# subscribe to odometries
self.odom_subs = [
self.create_subscription(Odometry, "/turtle0{}/odom".format(i),
self.create_odom_cb(i), qos_profile=self.qos) for i in robot_ids]
self.get_logger().info("{} odom poses subscribed!".format(len(self.odom_data_dict)))
# subscribe to spatial detections
self.spatial_subs = [
self.create_subscription(SpatialDetectionArray, "/turtle0{}/color/yolov4_Spatial_detections".format(i),
self.create_spatial_cb(i), qos_profile=self.qos) for i in robot_ids]
self.get_logger().info("{} spatial detections subscribed!".format(len(self.spatial_dict)))
self.get_logger().info("\\\\\\Class Initialized Done//////")
def create_uwb_ranges_cb(self, p):
return lambda range : self.uwb_range_cb(p, range)
def uwb_range_cb(self, p, range):
self.uwb_ranges_dict[p] = range.range
def create_mocap_pose_cb(self, p):
return lambda pos : self.mocap_pose_cb(p, pos)
def mocap_pose_cb(self, p, pos):
self.mocap_data_dict[p] = np.array([pos.pose.position.x, pos.pose.position.y, pos.pose.position.z, pos.pose.orientation.x, pos.pose.orientation.y, pos.pose.orientation.z, pos.pose.orientation.w])
def create_odom_cb(self, p):
return lambda odom : self.odom_cb(p, odom)
def odom_cb(self, p, odom):
self.odom_data_dict[p] = np.array([odom.pose.pose.position.x, odom.pose.pose.position.y, odom.pose.pose.position.z, odom.pose.pose.orientation.x, odom.pose.pose.orientation.y, odom.pose.pose.orientation.z, odom.pose.pose.orientation.w])
def create_spatial_cb(self, p):
return lambda detections : self.spatial_cb(p, detections)
def spatial_cb(self, p, detections):
self.spatial_dict[p] = np.array([[det.position.x, det.position.y, det.position.y] for det in detections.detections])
'''
Update the data for the particle filter
apply the odom to mocap data alignment
'''
def update_pf_data(self):
# res = utils.dicts_empty([self.mocap_data_dict])
res = utils.dict_values_empty(self.mocap_data_dict)
# print(res)
# the robot 0 has no odometry, so we use mocap data plus noise instead
if res:
# print("update odom data with mocap data......")
self.odom_data_aligned_dict = self.mocap_data_dict
return True
# self.odom_data_dict[0] = self.mocap_data_dict[0] + np.random.normal(0, 0.01, 7)
# if utils.dict_values_empty(self.odom_data_dict):
# self.odom_data_aligned_dict = copy.deepcopy(self.odom_data_dict)
# # apply the odom to mocap data alignment
# for k in self.odom_data_dict:
# transformed_source = np.dot(self.odom_data_dict[k][0:3], np.array(odom_transform[k][0])) + np.array(odom_transform[k][1])
# self.odom_data_aligned_dict[k][0:3] = transformed_source
# return True
# else:
# return False
else:
return False
def update_pf_filter(self):
# time.sleep(0.1)
# flag = self.update_pf_data()
if self.update_pf_data():
# print("Updating particle filter......")
self.pf_estimator.update_input(self.uwb_ranges_dict, self.odom_data_aligned_dict, self.spatial_dict)
# self.get_logger().info("Updating particle filter......")
self.pf_estimator.update_filter()
# self.pf_estimator.plot_particles()
print(">>>>>> robot poses saved!")
# print("\n")
def __del__(self):
# self.get_logger().info(np.array(self.pose_estimations).shape)
self.pose_estimations = self.pf_estimator.get_robot_poses()
if len(self.pose_estimations) > 0:
print(np.array(self.pf_estimator.get_robot_poses()).shape)
np.savetxt("pose_estimations.csv", np.array(self.pose_estimations), delimiter=",", fmt ='% s')
np.savetxt("odom_saved.csv", np.array(self.pf_estimator.odom_save), delimiter=",", fmt ='% s')
self.get_logger().info("Destroying UWB relative position node......")
def main(args=None):
rclpy.init(args=args)
# Set the robot ids and uwb pairs
robot_ids = [0, 1, 2, 3, 4]
uwb_pair = [(0,1), (0,2), (0,3), (0,4), (1,2), (1,3), (1,4), (2,3), (2,4), (3,4)]
# Initialize the particle filter
estimator = pf_ulv.UWBParticleFilter(spatial_enable = False, lstm_enable = False, identical_thresh = 0.1, robot_ids = robot_ids)
# Initialize the ros node and timer
ros_node = UWBRelativePositionNode(estimator, uwb_pair = uwb_pair,
robot_ids = robot_ids)
filter_timer = ros_node.create_timer(1/20.0, ros_node.update_pf_filter)
try: # ros exception
try: # keyboard interrupt
while rclpy.ok() :
rclpy.spin(ros_node)
except KeyboardInterrupt :
ros_node.get_logger().error('Keyboard Interrupt detected! Trying to stop ros_node node!')
except Exception as e:
ros_node.destroy_node()
ros_node.get_logger().info("UWB particle ros_node failed %r."%(e,))
finally:
rclpy.shutdown()
ros_node.destroy_timer(filter_timer)
ros_node.destroy_node()
if __name__ == '__main__':
main()