Skip to content

Commit 7ddb5bf

Browse files
committed
lil more
1 parent f917de7 commit 7ddb5bf

File tree

3 files changed

+49
-33
lines changed

3 files changed

+49
-33
lines changed

tools/replay/consoleui.py

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#!/usr/bin/env python3
22
import curses
3+
import logging
34
import signal
45
import threading
56
import time
@@ -12,6 +13,24 @@
1213
from openpilot.tools.replay.seg_mgr import ReplayFlags
1314
from openpilot.tools.replay.timeline import FindFlag, TimelineType
1415

16+
17+
class QueueHandler(logging.Handler):
18+
"""Logging handler that queues messages for display in curses UI."""
19+
def __init__(self, queue: list, lock: threading.Lock):
20+
super().__init__()
21+
self._queue = queue
22+
self._lock = lock
23+
24+
def emit(self, record: logging.LogRecord) -> None:
25+
# Map logging levels to our color scheme
26+
level = 0 # DEBUG
27+
if record.levelno >= logging.ERROR:
28+
level = 2 # Critical/Red
29+
elif record.levelno >= logging.WARNING:
30+
level = 1 # Warning/Yellow
31+
with self._lock:
32+
self._queue.append((level, self.format(record)))
33+
1534
BORDER_SIZE = 3
1635

1736
KEYBOARD_SHORTCUTS = [
@@ -88,12 +107,11 @@ def __init__(self, replay: Replay):
88107
# Set up signal handler for clean exit
89108
signal.signal(signal.SIGINT, lambda s, f: setattr(self, '_exit', True))
90109

91-
# Set up log callback
92-
self.replay._seg_mgr.set_log_callback(self._on_log)
93-
94-
def _on_log(self, level: int, msg: str) -> None:
95-
with self._lock:
96-
self._logs.append((level, msg))
110+
# Set up logging handler to capture logs for display
111+
self._log_handler = QueueHandler(self._logs, self._lock)
112+
self._log_handler.setFormatter(logging.Formatter('%(message)s'))
113+
logging.getLogger("replay").addHandler(self._log_handler)
114+
logging.getLogger("replay").setLevel(logging.DEBUG)
97115

98116
def _init_curses(self, stdscr) -> None:
99117
self._stdscr = stdscr

tools/replay/replay.py

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#!/usr/bin/env python3
2+
import logging
23
import threading
34
import time
45
from enum import IntFlag
@@ -13,6 +14,8 @@
1314
from openpilot.tools.replay.seg_mgr import SegmentManager, ReplayFlags
1415
from openpilot.tools.replay.timeline import Timeline, FindFlag
1516

17+
log = logging.getLogger("replay")
18+
1619
DEMO_ROUTE = "a2a0ccea32023010|2023-07-27--13-01-19"
1720

1821

@@ -66,10 +69,10 @@ def __init__(self, route: str, allow: list[str] = None, block: list[str] = None,
6669

6770
def __del__(self):
6871
if hasattr(self, '_stream_thread') and self._stream_thread is not None and self._stream_thread.is_alive():
69-
print("shutdown: in progress...")
72+
log.info("shutdown: in progress...")
7073
self._interrupt_stream(lambda: setattr(self, '_exit', True) or False)
7174
self._stream_thread.join()
72-
print("shutdown: done")
75+
log.info("shutdown: done")
7376

7477
def _setup_services(self, allow: list[str], block: list[str]) -> None:
7578
active_services = []
@@ -83,7 +86,7 @@ def _setup_services(self, allow: list[str], block: list[str]) -> None:
8386
else:
8487
self._sockets[name] = False
8588

86-
print(f"active services: {', '.join(active_services)}")
89+
log.info(f"active services: {', '.join(active_services)}")
8790
if not self._sm:
8891
self._pm = messaging.PubMaster(active_services)
8992

@@ -159,7 +162,7 @@ def install_event_filter(self, filter_fn: Callable) -> None:
159162
self._event_filter = filter_fn
160163

161164
def load(self) -> bool:
162-
print(f"loading route {self._seg_mgr._route_name}")
165+
log.info(f"loading route {self._seg_mgr._route_name}")
163166
if not self._seg_mgr.load():
164167
return False
165168

@@ -175,7 +178,7 @@ def start(self, seconds: int = 0) -> None:
175178
def pause(self, pause: bool) -> None:
176179
if self._user_paused != pause:
177180
def update():
178-
print(f"{'paused...' if pause else 'resuming'} at {self.current_seconds:.2f} s")
181+
log.info(f"{'paused...' if pause else 'resuming'} at {self.current_seconds:.2f} s")
179182
self._user_paused = pause
180183
return not pause
181184
self._interrupt_stream(update)
@@ -191,10 +194,10 @@ def seek_to(self, seconds: float, relative: bool) -> None:
191194
target_segment = int(target_time / 60)
192195

193196
if not self._seg_mgr.has_segment(target_segment):
194-
print(f"Invalid seek to {target_time:.2f} s (segment {target_segment})")
197+
log.warning(f"Invalid seek to {target_time:.2f} s (segment {target_segment})")
195198
return
196199

197-
print(f"Seeking to {int(target_time)} s, segment {target_segment}")
200+
log.info(f"Seeking to {int(target_time)} s, segment {target_segment}")
198201
if self.on_seeking:
199202
self.on_seeking(target_time)
200203

@@ -272,7 +275,7 @@ def _start_stream(self, segment) -> None:
272275
params.put("CarParams", car_params_bytes)
273276
params.put("CarParamsPersistent", car_params_bytes)
274277
except Exception as e:
275-
print(f"failed to write CarParams: {e}")
278+
log.warning(f"failed to write CarParams: {e}")
276279
break
277280

278281
# Start camera server
@@ -314,7 +317,7 @@ def _stream_thread_fn(self) -> None:
314317
first_idx = i
315318
break
316319
else:
317-
print("waiting for events...")
320+
log.info("waiting for events...")
318321
self._events_ready = False
319322
continue
320323

@@ -328,7 +331,7 @@ def _stream_thread_fn(self) -> None:
328331
if last_idx >= len(events) and not self.has_flag(ReplayFlags.NO_LOOP):
329332
segments = list(self._seg_mgr._segments.keys())
330333
if segments and event_data.is_segment_loaded(max(segments)):
331-
print("reaches the end of route, restart from beginning")
334+
log.info("reaches the end of route, restart from beginning")
332335
self._stream_lock.release()
333336
self.seek_to(self._min_seconds, relative=False)
334337
self._stream_lock.acquire()
@@ -395,7 +398,7 @@ def _publish_message(self, evt) -> None:
395398
msg_bytes = evt.as_builder().to_bytes()
396399
self._pm.send(which, msg_bytes)
397400
except Exception as e:
398-
print(f"stop publishing {which} due to error: {e}")
401+
log.warning(f"stop publishing {which} due to error: {e}")
399402
self._sockets[which] = False
400403

401404
def _publish_frame(self, evt, which: str) -> None:

tools/replay/seg_mgr.py

Lines changed: 11 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
#!/usr/bin/env python3
2+
import logging
23
import threading
34
from dataclasses import dataclass, field
45
from enum import Enum, IntFlag, auto
56
from typing import Callable, Optional
67

78
from openpilot.selfdrive.test.process_replay.migration import migrate_all
9+
10+
log = logging.getLogger("replay")
811
from openpilot.tools.lib.logreader import LogReader
912
from openpilot.tools.lib.route import Route, Segment
1013
from openpilot.tools.lib.framereader import FrameReader
@@ -69,7 +72,6 @@ def __init__(self, route_name: str, flags: int = 0, data_dir: str = "", auto_sou
6972

7073
self.segment_cache_limit = MIN_SEGMENTS_CACHE
7174
self._on_segment_merged_callback: Optional[Callable[[], None]] = None
72-
self._log_callback: Optional[Callable[[int, str], None]] = None # (level, msg)
7375

7476
def __del__(self):
7577
if hasattr(self, '_cv'):
@@ -87,7 +89,7 @@ def load(self) -> bool:
8789
try:
8890
self._route = Route(self._route_name, data_dir=self._data_dir)
8991
except Exception as e:
90-
print(f"failed to load route: {self._route_name}: {e}")
92+
log.error(f"failed to load route: {self._route_name}: {e}")
9193
return False
9294

9395
# Initialize segment slots for all available segments
@@ -97,10 +99,10 @@ def load(self) -> bool:
9799
self._segments[seg_num] = None
98100

99101
if not self._segments:
100-
print(f"no valid segments in route: {self._route_name}")
102+
log.error(f"no valid segments in route: {self._route_name}")
101103
return False
102104

103-
print(f"loaded route {self._route_name} with {len(self._segments)} valid segments")
105+
log.info(f"loaded route {self._route_name} with {len(self._segments)} valid segments")
104106
self._thread = threading.Thread(target=self._manage_segment_cache, daemon=True)
105107
self._thread.start()
106108
return True
@@ -116,13 +118,6 @@ def set_current_segment(self, seg_num: int) -> None:
116118
def set_callback(self, callback: Callable[[], None]) -> None:
117119
self._on_segment_merged_callback = callback
118120

119-
def set_log_callback(self, callback: Callable[[int, str], None]) -> None:
120-
self._log_callback = callback
121-
122-
def _log(self, level: int, msg: str) -> None:
123-
if self._log_callback:
124-
self._log_callback(level, msg)
125-
126121
def set_filters(self, filters: list[bool]) -> None:
127122
self._filters = filters
128123

@@ -188,14 +183,14 @@ def _load_segments_in_range(self, seg_nums: list[int], cur_seg_num: int) -> bool
188183
continue
189184

190185
# Load segment (blocking - downloads and parses)
191-
self._log(0, f"loading segment {seg_num}...")
186+
log.info(f"loading segment {seg_num}...")
192187
seg_data = self._load_segment(seg_num)
193188
with self._cv:
194189
self._segments[seg_num] = seg_data
195190
self._needs_update = True
196191
self._cv.notify_all()
197192
loaded_any = True
198-
self._log(0, f"segment {seg_num} loaded with {len(seg_data.events)} events")
193+
log.info(f"segment {seg_num} loaded with {len(seg_data.events)} events")
199194

200195
# Only load one segment at a time to be responsive
201196
return loaded_any
@@ -224,7 +219,7 @@ def _load_segment(self, seg_num: int) -> SegmentData:
224219
events = list(migrate_all(lr))
225220
seg_data.events = sorted(events, key=lambda x: x.logMonoTime)
226221
except Exception as e:
227-
print(f"failed to load log for segment {seg_num}: {e}")
222+
log.warning(f"failed to load log for segment {seg_num}: {e}")
228223
seg_data.load_state = LoadState.FAILED
229224
return seg_data
230225

@@ -243,7 +238,7 @@ def _load_segment(self, seg_num: int) -> SegmentData:
243238
if segment.qcamera_path and (self._flags & ReplayFlags.QCAMERA):
244239
seg_data.frame_readers['qcam'] = FrameReader(segment.qcamera_path)
245240
except Exception as e:
246-
print(f"failed to load frames for segment {seg_num}: {e}")
241+
log.warning(f"failed to load frames for segment {seg_num}: {e}")
247242
# Don't fail the whole segment, just skip frames
248243

249244
seg_data.load_state = LoadState.LOADED
@@ -289,5 +284,5 @@ def _merge_segments(self, seg_nums: list[int]) -> bool:
289284
self._event_data = merged_event_data
290285
self._merged_segments = segments_to_merge
291286

292-
self._log(0, f"merged segments: {sorted(segments_to_merge)}")
287+
log.debug(f"merged segments: {sorted(segments_to_merge)}")
293288
return True

0 commit comments

Comments
 (0)