Skip to content

Commit

Permalink
The simulation app now waits for the thread manager to be done before…
Browse files Browse the repository at this point in the history
… closing. This avoids a massive mess with threads.
  • Loading branch information
AntoineRichard committed Sep 24, 2024
1 parent 797d5e2 commit 73ff931
Show file tree
Hide file tree
Showing 6 changed files with 93 additions and 7 deletions.
3 changes: 3 additions & 0 deletions src/environments/large_scale_lunar.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,9 @@ def update(self) -> None:
def monitor_thread_is_alive(self) -> None:
return self.LSTM.map_manager.hr_dem_gen.monitor_thread.thread.is_alive()

def get_wait_for_threads(self) -> None:
return [self.LSTM.map_manager.hr_dem_gen.monitor_thread.thread.join]

def load(self) -> None:
"""
Loads the lab interactive elements in the stage.
Expand Down
77 changes: 76 additions & 1 deletion src/environments_wrappers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,86 @@

def startSim(cfg: dict):
from omni.isaac.kit import SimulationApp
import omni
from src.environments.rendering import set_lens_flares, set_chromatic_aberrations, set_motion_blur

class SimulationApp_wait(SimulationApp):
def __init__(self, launch_config: dict = None, experience: str = "") -> None:
super().__init__(launch_config, experience)
self.wait_for_threads = []

def add_wait(self, waiting_functions: list) -> None:
"""
Adds a list of functions that will wait until a condition is met before closing the simulation.
"""
self.wait_for_threads += waiting_functions

def close(self, wait_for_replicator=True) -> None:
"""Close the running Omniverse Toolkit."""
try:
# make sure that any replicator workflows finish rendering/writing
import omni.replicator.core as rep

if rep.orchestrator.get_status() not in [
rep.orchestrator.Status.STOPPED,
rep.orchestrator.Status.STOPPING,
]:
rep.orchestrator.stop()
if wait_for_replicator:
rep.orchestrator.wait_until_complete()

# Disable capture on play to avoid replicator engaging on any new timeline events
rep.orchestrator.set_capture_on_play(False)
except Exception:
pass

for wait in self.wait_for_threads:
self._app.print_and_log(f"Waiting for external thread to join: {wait}")
wait()

# workaround for exit issues, clean the stage first:
if omni.usd.get_context().can_close_stage():
omni.usd.get_context().close_stage()
# omni.kit.app.get_app().update()
# check if exited already
if not self._exiting:
self._exiting = True
self._app.print_and_log("Simulation App Shutting Down")

# We are exisitng but something is still loading, wait for it to load to avoid a deadlock
def is_stage_loading() -> bool:
"""Convenience function to see if any files are being loaded.
bool: Convenience function to see if any files are being loaded. True if loading, False otherwise
"""
import omni.usd

context = omni.usd.get_context()
if context is None:
return False
else:
_, _, loading = context.get_stage_loading_status()
return loading > 0

if is_stage_loading():
print(
" Waiting for USD resource operations to complete (this may take a few seconds), use Ctrl-C to exit immediately"
)
while is_stage_loading():
self._app.update()

self._app.shutdown()
# disabled on linux to workaround issues where unloading plugins causes carb to fail
self._framework.unload_all_plugins()
# Force all omni module to unload on close
# This prevents crash on exit
# for m in list(sys.modules.keys()):
# if "omni" in m and m != "omni.kit.app":
# del sys.modules[m]
print("Simulation App Shutdown Complete")

# Starts the simulation and allows to import things related to Isaac and PXR
renderer_cfg = cfg["rendering"]["renderer"]
simulation_app = SimulationApp(renderer_cfg.__dict__)
simulation_app = SimulationApp_wait(renderer_cfg.__dict__)
set_lens_flares(cfg)
set_motion_blur(cfg)
set_chromatic_aberrations(cfg)
Expand Down
7 changes: 7 additions & 0 deletions src/environments_wrappers/ros2/base_wrapper_ros2.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,3 +217,10 @@ def monitor_thread_is_alive(self):
"""

return True

def get_wait_for_threads(self):
"""
Returns the list of waiting threads.
"""

return []
3 changes: 3 additions & 0 deletions src/environments_wrappers/ros2/largescale_ros2.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,3 +124,6 @@ def set_sun_pose(self, data: Pose) -> None:

def monitor_thread_is_alive(self):
return self.LC.monitor_thread_is_alive()

def get_wait_for_threads(self):
return self.LC.get_wait_for_threads()
3 changes: 3 additions & 0 deletions src/environments_wrappers/ros2/simulation_manager_ros2.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,9 @@ def __init__(
self.exec2_thread = Thread(target=self.exec2.spin, daemon=True, args=())
self.exec2_thread.start()

if self.ROSLabManager.get_wait_for_threads():
self.simulation_app.add_wait(self.ROSLabManager.get_wait_for_threads())

# Have you ever asked your self: "Is there a limit of topics one can subscribe to in ROS2?"
# Yes "Josh" there is.
# 24 topics. More than that and you won't reveive any messages.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -802,7 +802,6 @@ def monitor_main_thread(self) -> None:
- The ctrl-c signal. This was added as it seems that the simulation was completely ignoring the ctrl-c signal.
"""

use_multiprocessing = False
while not self.event.is_set():
time.sleep(1)
if not threading.main_thread().is_alive():
Expand All @@ -818,13 +817,9 @@ def monitor_main_thread(self) -> None:
logger.warn(
"Use kill -9 PID to kill the remaining threads. PID being the PIDs returned by the previous command."
)
use_multiprocessing = True
break
if self.ctrl_c:
logger.debug("Ctrl-C caught, shutting down workers.")
break
if use_multiprocessing:
self.apply_shutdowns_in_different_process()
else:
self.apply_shutdowns()
self.apply_shutdowns()
logger.debug("Thread monitor exiting.")

0 comments on commit 73ff931

Please sign in to comment.