Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(fix): correct buoy pair generation, among other things #196

Open
wants to merge 2 commits into
base: popkarsd/buoy_action_server_integration
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 89 additions & 13 deletions all_seaing_autonomy/all_seaing_autonomy/roboboat/waypoint_finder.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#!/usr/bin/env python3
from typing_extensions import Generic
import yaml
import rclpy
from rclpy.node import Node
Expand Down Expand Up @@ -61,7 +62,7 @@ def __init__(self):
self.first_map = True

self.safe_margin = self.get_parameter("safe_margin").get_parameter_value().double_value

color_label_mappings_file = self.get_parameter(
"color_label_mappings_file"
).value
Expand Down Expand Up @@ -375,7 +376,7 @@ def pair_angle_to_pose(self, pair, angle):

def generate_waypoints(self):
"""
Runs every time a new obstacle map is received, keeps
Runs every time a new obstacle map is received, keeps
track of the pair of buoys the robot is heading towards,
checks if it passed it (the robot is in front of the pair of buoys)
(#TODO: add a margin of error such that the robot is considered to have passed the buoys if it's a bit in front of them)
Expand Down Expand Up @@ -407,23 +408,98 @@ def generate_waypoints(self):
self.get_logger().debug("No next buoy pair to go to.")
# wait for next spin
return
# TODO: there is no longer a case where it is done wiht the task.
# also, what happens when eg. the green buoy is passed but not the red?
# TODO: there is no longer a case where it is done with the task.
# also, what happens when e.g. the green buoy is passed but not the red?

buoy_pairs = [self.pair_to]
next_buoy_pair = self.next_pair(self.pair_to, red_buoys, green_buoys)
buoy_pairs = [self.pair_to, next_buoy_pair]
waypoints = [self.midpoint_pair(self.pair_to)]

self.get_logger().debug(f"pair to: {len(buoy_pairs)}")

# form a sequence of buoy pairs (and the respective waypoints) that form a path that the robot can follow
# will terminate if we run out of either green or red buoys

while True:
next_buoy_pair = self.next_pair(buoy_pairs[-1], red_buoys, green_buoys)
if next_buoy_pair is None:
break
buoy_pairs.append(next_buoy_pair)
waypoints.append(self.midpoint_pair(next_buoy_pair))
# types:
# ---
# T :: ([a], [b]) description: `(buoy_pairs, waypoints)`

# check if `x` exists according to `t` (i.e. t(x) has a readable value)
# x :: a
# t :: a → b | None
# exists :: t → x → bool
exists = lambda t: lambda x: t(x) is not None

# perform `f` over `x` so long as a predicate `p` is true
# p :: a → bool
# f :: a → a
# x :: a
# do_while :: p → f → x → a
do_while = lambda p: lambda f: lambda x: do_while(p)(f)(f(x)) if p(x) else x

# get buoy pairs from `(buoy_pairs, waypoints)`
# x :: T
# get_buoy_pairs :: x → [a]
get_buoy_pairs = lambda x: x[0]

# get next buoy pair from `(buoy_pairs, waypoints)`
# x :: T
# get_buoy_pairs :: x → a
get_next_buoy_pair = lambda x: get_buoy_pairs(x)[-1]

# get waypoints from `(buoy_pairs, waypoints)`
# x :: T
# get_buoy_pairs :: x → [b]
get_waypoints = lambda x: x[1]

# check whether next buoy pair exists (takes implicit parameter `x = (buoy_pairs, waypoints)`)
# x :: T
# next_buoy_pair_exists :: x → bool
next_buoy_pair_exists = exists(get_next_buoy_pair)

# perform `f` over `x` so long as next buoy pair (last entry in `buoy_pairs` exists); `f` and `x` are implicit parameters
# f :: T → T
# x :: T
# do_while_next_buoy_pair_exists :: f → x → T
do_while_next_buoy_pair_exists = do_while(next_buoy_pair_exists)

# generate next buoy pair from `(buoy_pairs, waypoints)`
# x :: T
# generate_next_buoy_pair :: a
generate_next_buoy_pair = lambda x: self.next_pair(get_next_buoy_pair(x), red_buoys, green_buoys)

# generate new buoy pairs list from `(buoy_pairs, waypoints)`
# x :: T
# generate_buoy_pairs :: T → [a]
generate_buoy_pairs = lambda x: get_buoy_pairs(x) + [generate_next_buoy_pair(x)]

# generate new waypoints list from `(buoy_pairs, waypoints)`
# x :: T
# generate_waypoints :: T → [b]
generate_waypoints = lambda x: get_waypoints(x) + [self.midpoint_pair(get_next_buoy_pair(x))]

# generate next iteration of points structure (list of tuples `(buoy_pairs, waypoints)`)
# x :: T
# generate_points :: x → T
generate_points = lambda x: (generate_buoy_pairs(x), generate_waypoints(x))

# run all iterations of `generate_points` provdied buoy pairs can still be found,
# with implicit parameter `x` for initial `(buoy_pairs, waypoints)` to start the iteration wtih
# x :: T
# generate_all_points :: T
generate_all_points = do_while_next_buoy_pair_exists(generate_points)

# resulting list of all buoy pairs and waypoints as list of tuples `(buoy_pairs, waypoints)`
# points :: T
points = generate_all_points((buoy_pairs, waypoints))

# select all buoy pairs except last one (which is `None`)
buoy_pairs = get_buoy_pairs(points)[:-1]

# select all waypoints; list will be of the same size as `buoy_pairs`
# due to the starting sizes of both lists; `buoy_pairs` starts off one item longer than `waypoints`,
# but removing the last element ensures they have the same length
waypoints = get_waypoints(points)

# convert the sequence to a format appropriate to publishing for the path planner to use
waypoint_arr = WaypointArray(
Expand Down Expand Up @@ -465,9 +541,9 @@ def generate_waypoints(self):
self.get_logger().debug(f"cur_waypoint: {waypoint}, sent_waypoints: {self.sent_waypoints}")
self.get_logger().debug(f"len(waypoints): {len(waypoints)}")

#check if waypoint is close enough (check_dist) to some previous waypoint
# check if waypoint is close enough (check_dist) to some previous waypoint
passed_waypoint = False
check_dist = 1 #magic number T_T
check_dist = 1 # magic number T_T
for sent_waypoint in self.sent_waypoints:
if (waypoint[0]-sent_waypoint[0])**2+(waypoint[1]-sent_waypoint[1])**2 < check_dist**2:
passed_waypoint = True
Expand Down