11"""
22D* Lite grid planning
3- author: vss2sn ([email protected] ) 3+ author: Taha Zahid (@TahaZahid05)
4+ Original author: vss2sn
5+
46Link to papers:
57D* Lite (Link: http://idm-lab.org/bib/abstracts/papers/aaai02b.pdf)
68Improved Fast Replanning for Robot Navigation in Unknown Terrain
79(Link: http://www.cs.cmu.edu/~maxim/files/dlite_icra02.pdf)
8- Implemented maintaining similarity with the pseudocode for understanding.
9- Code can be significantly optimized by using a priority queue for U, etc.
10- Avoiding additional imports based on repository philosophy .
10+
11+ Optimized using heapq ( priority queue) with lazy deletion for O(log n)
12+ priority queue operations .
1113"""
14+ import heapq
1215import math
1316import matplotlib .pyplot as plt
1417import random
@@ -25,6 +28,17 @@ def __init__(self, x: int = 0, y: int = 0, cost: float = 0.0):
2528 self .y = y
2629 self .cost = cost
2730
31+ def __lt__ (self , other ):
32+ return False
33+
34+ def __eq__ (self , other ):
35+ if not isinstance (other , Node ):
36+ return False
37+ return self .x == other .x and self .y == other .y
38+
39+ def __hash__ (self ):
40+ return hash ((self .x , self .y ))
41+
2842
2943def add_coordinates (node1 : Node , node2 : Node ):
3044 new_node = Node ()
@@ -66,13 +80,14 @@ def __init__(self, ox: list, oy: list):
6680 self .obstacles_xy = {(obstacle .x , obstacle .y ) for obstacle in self .obstacles }
6781 self .start = Node (0 , 0 )
6882 self .goal = Node (0 , 0 )
69- self .U = list () # type: ignore
83+ # Priority queue implemented with heapq for O(log n) operations
84+ self .U = [] # Min-heap for open set
85+ self .entry_finder = {} # Maps nodes to heap entries for O(1) lookup
86+ self .counter = 0 # Unique sequence count for tie-breaking
7087 self .km = 0.0
71- self .kold = 0.0
7288 self .rhs = self .create_grid (float ("inf" ))
7389 self .g = self .create_grid (float ("inf" ))
7490 self .detected_obstacles_xy : set [tuple [int , int ]] = set ()
75- self .xy = np .empty ((0 , 2 ))
7691 if show_animation :
7792 self .detected_obstacles_for_plotting_x = list () # type: ignore
7893 self .detected_obstacles_for_plotting_y = list () # type: ignore
@@ -110,9 +125,9 @@ def h(self, s: Node):
110125 # return max(abs(self.start.x - s.x), abs(self.start.y - s.y))
111126 return 1
112127
113- def calculate_key (self , s : Node ):
114- return ( min (self .g [s .x ][s .y ], self .rhs [s .x ][s .y ]) + self .h (s )
115- + self . km , min (self .g [s .x ][s .y ], self .rhs [s .x ][s .y ]))
128+ def calculate_key (self , s : Node ) -> list [ float ] :
129+ return [ min (self .g [s .x ][s .y ], self .rhs [s .x ][s .y ]) + self .h (s ) + self . km ,
130+ min (self .g [s .x ][s .y ], self .rhs [s .x ][s .y ])]
116131
117132 def is_valid (self , node : Node ):
118133 if 0 <= node .x < self .x_max and 0 <= node .y < self .y_max :
@@ -131,6 +146,99 @@ def succ(self, u: Node):
131146 # Grid, so each vertex is connected to the ones around it
132147 return self .get_neighbours (u )
133148
149+ def push (self , task : Node , priority : list ):
150+ """
151+ Add a new node to priority queue or update its priority.
152+ Uses lazy deletion pattern for efficient priority updates.
153+
154+ Args:
155+ task: Node to add/update
156+ priority: Priority key [f-value, g-value]
157+ """
158+ if task in self .entry_finder :
159+ self .remove (task )
160+ count = self .counter
161+ self .counter += 1
162+ entry = [priority , count , task ]
163+ self .entry_finder [task ] = entry
164+ heapq .heappush (self .U , entry )
165+
166+ def remove (self , task : Node ):
167+ """
168+ Mark an existing task as removed (lazy deletion).
169+ The actual removal from heap happens during pop/peek operations.
170+
171+ Args:
172+ task: Node to mark as removed
173+ """
174+ entry = self .entry_finder .pop (task )
175+ entry [- 1 ] = None # Mark as removed
176+
177+ def pop (self ):
178+ """
179+ Remove and return the lowest priority task.
180+ Skips over entries marked as removed (lazy deletion).
181+
182+ Returns:
183+ tuple: (task Node, priority list)
184+
185+ Raises:
186+ KeyError: If heap is empty
187+ """
188+ while self .U :
189+ priority , count , task = heapq .heappop (self .U )
190+ if task is not None :
191+ del self .entry_finder [task ]
192+ return task , priority
193+ raise KeyError ("empty heap" )
194+
195+ def contains (self , task : Node ):
196+ """
197+ Check if a node is in the priority queue.
198+
199+ Args:
200+ task: Node to check
201+
202+ Returns:
203+ bool: True if node is in queue
204+ """
205+ return task in self .entry_finder
206+
207+ def peek (self ):
208+ """
209+ Return the lowest priority task without removing it.
210+ Cleans up entries marked as removed.
211+
212+ Returns:
213+ tuple: (task Node or None, priority list)
214+ """
215+ if not self .U :
216+ return None , [float ('inf' ), float ('inf' )]
217+
218+ while self .U :
219+ entry = self .U [0 ]
220+ priority , count , task = entry
221+
222+ if task is not None :
223+ return task , priority
224+
225+ heapq .heappop (self .U ) # Remove invalid entries
226+
227+ return None , [float ('inf' ), float ('inf' )]
228+
229+ def key_less_than (self , k1 : list , k2 : list ):
230+ """
231+ Lexicographical comparison of priority keys.
232+
233+ Args:
234+ k1: First key [f-value, g-value]
235+ k2: Second key [f-value, g-value]
236+
237+ Returns:
238+ bool: True if k1 < k2 lexicographically
239+ """
240+ return k1 [0 ] < k2 [0 ] or (k1 [0 ] == k2 [0 ] and k1 [1 ] < k2 [1 ])
241+
134242 def initialize (self , start : Node , goal : Node ):
135243 self .start .x = start .x - self .x_min_world
136244 self .start .y = start .y - self .y_min_world
@@ -139,61 +247,96 @@ def initialize(self, start: Node, goal: Node):
139247 if not self .initialized :
140248 self .initialized = True
141249 print ('Initializing' )
142- self .U = list () # Would normally be a priority queue
250+ self .U = []
251+ self .entry_finder .clear ()
252+ self .counter = 0
143253 self .km = 0.0
144254 self .rhs = self .create_grid (math .inf )
145255 self .g = self .create_grid (math .inf )
146256 self .rhs [self .goal .x ][self .goal .y ] = 0
147- self .U . append (( self .goal , self .calculate_key (self .goal ) ))
257+ self .push ( self .goal , self .calculate_key (self .goal ))
148258 self .detected_obstacles_xy = set ()
149259
150260 def update_vertex (self , u : Node ):
151261 if not compare_coordinates (u , self .goal ):
152262 self .rhs [u .x ][u .y ] = min ([self .c (u , sprime ) +
153263 self .g [sprime .x ][sprime .y ]
154264 for sprime in self .succ (u )])
155- if any ([compare_coordinates (u , node ) for node , key in self .U ]):
156- self .U = [(node , key ) for node , key in self .U
157- if not compare_coordinates (node , u )]
158- self .U .sort (key = lambda x : x [1 ])
159265 if self .g [u .x ][u .y ] != self .rhs [u .x ][u .y ]:
160- self .U .append ((u , self .calculate_key (u )))
161- self .U .sort (key = lambda x : x [1 ])
162-
163- def compare_keys (self , key_pair1 : tuple [float , float ],
164- key_pair2 : tuple [float , float ]):
165- return key_pair1 [0 ] < key_pair2 [0 ] or \
166- (key_pair1 [0 ] == key_pair2 [0 ] and key_pair1 [1 ] < key_pair2 [1 ])
266+ if self .contains (u ):
267+ self .remove (u )
268+ self .push (u , self .calculate_key (u ))
269+ elif self .g [u .x ][u .y ] == self .rhs [u .x ][u .y ] and self .contains (u ):
270+ self .remove (u )
167271
168272 def compute_shortest_path (self ):
169- self .U .sort (key = lambda x : x [1 ])
170- has_elements = len (self .U ) > 0
171- start_key_not_updated = self .compare_keys (
172- self .U [0 ][1 ], self .calculate_key (self .start )
173- )
174- rhs_not_equal_to_g = self .rhs [self .start .x ][self .start .y ] != \
175- self .g [self .start .x ][self .start .y ]
176- while has_elements and start_key_not_updated or rhs_not_equal_to_g :
177- self .kold = self .U [0 ][1 ]
178- u = self .U [0 ][0 ]
179- self .U .pop (0 )
180- if self .compare_keys (self .kold , self .calculate_key (u )):
181- self .U .append ((u , self .calculate_key (u )))
182- self .U .sort (key = lambda x : x [1 ])
183- elif (self .g [u .x , u .y ] > self .rhs [u .x , u .y ]).any ():
184- self .g [u .x , u .y ] = self .rhs [u .x , u .y ]
185- for s in self .pred (u ):
186- self .update_vertex (s )
273+ while True :
274+ task , k_old = self .peek ()
275+ if task is None :
276+ break
277+
278+ k_start = self .calculate_key (self .start )
279+
280+ # Stop condition: Start is consistent AND top of heap >= Start Key
281+ if (not self .key_less_than (k_old , k_start ) and
282+ self .rhs [self .start .x ][self .start .y ] == self .g [self .start .x ][self .start .y ]):
283+ break
284+
285+ u , k_old = self .pop ()
286+ k_new = self .calculate_key (u )
287+
288+ if self .key_less_than (k_old , k_new ):
289+ # Node priority has improved, re-insert
290+ self .push (u , k_new )
291+
292+ elif self .g [u .x ][u .y ] > self .rhs [u .x ][u .y ]:
293+ # Overconsistent (path found/improved): Propagate cost to neighbors
294+ self .g [u .x ][u .y ] = self .rhs [u .x ][u .y ]
295+
296+ neighbors_lst = self .pred (u )
297+
298+ for curr in neighbors_lst :
299+ if curr != self .goal :
300+ edge_cost = self .c (curr , u )
301+ self .rhs [curr .x ][curr .y ] = min (self .rhs [curr .x ][curr .y ],
302+ edge_cost + self .g [u .x ][u .y ])
303+ self .update_vertex (curr )
187304 else :
188- self .g [u .x , u .y ] = math .inf
189- for s in self .pred (u ) + [u ]:
190- self .update_vertex (s )
191- self .U .sort (key = lambda x : x [1 ])
192- start_key_not_updated = self .compare_keys (
193- self .U [0 ][1 ], self .calculate_key (self .start )
194- )
195- rhs_not_equal_to_g = self .rhs [self .start .x ][self .start .y ] != \
196- self .g [self .start .x ][self .start .y ]
305+ # Underconsistent (obstacle detected): Reset g to infinity and re-evaluate neighbors
306+ g_old = self .g [u .x ][u .y ]
307+ self .g [u .x ][u .y ] = math .inf
308+
309+ neighbors_lst = self .pred (u )
310+
311+ for curr in (neighbors_lst + [u ]):
312+ if curr == u :
313+ # When curr is u itself, recalculate rhs for u
314+ if curr != self .goal :
315+ temp_rhs = float ('inf' )
316+
317+ curr_neighbors_lst = self .succ (curr )
318+
319+ for j in curr_neighbors_lst :
320+ edge_cost = self .c (curr , j )
321+ temp_rhs = min (temp_rhs , (edge_cost + self .g [j .x ][j .y ]))
322+
323+ self .rhs [curr .x ][curr .y ] = temp_rhs
324+ else :
325+ # For neighbors of u, check if they need rhs recalculation
326+ edge_cost = self .c (curr , u )
327+ if self .rhs [curr .x ][curr .y ] == (edge_cost + g_old ):
328+ if curr != self .goal :
329+ temp_rhs = float ('inf' )
330+
331+ curr_neighbors_lst = self .succ (curr )
332+
333+ for j in curr_neighbors_lst :
334+ edge_cost = self .c (curr , j )
335+ temp_rhs = min (temp_rhs , (edge_cost + self .g [j .x ][j .y ]))
336+
337+ self .rhs [curr .x ][curr .y ] = temp_rhs
338+
339+ self .update_vertex (curr )
197340
198341 def detect_changes (self ):
199342 changed_vertices = list ()
0 commit comments