Skip to content

BrokenPipeError when combining Optuna with SubprocVecEnv (multiprocessing) #161

@seb0908

Description

@seb0908

Hi all,

I am trying to use multiprocessing (via SubprocVecEnv) together with Optuna in BOPTEST-Gym. However, my program crashes with a BrokenPipeError (see below).

  • SubprocVecEnv without Optuna works fine.
  • Optuna with DummyVecEnv also works.
  • But combining both (SubprocVecEnv + Optuna) fails.

Could someone help me change the code so that it works with multiprocessing?

Here is my code:

import os
import numpy as np
import multiprocessing as mp
import optuna as op
from optuna.pruners import MedianPrunerOptunaEvalCallback
from stable_baselines3 import SAC
from stable_baselines3.common.vec_env import DummyVecEnv, SubprocVecEnv, VecMonitor
from boptestGymEnv import BoptestGymEnv, NormalizedObservationWrapper, DiscretizedActionWrapper, 
from examples import run_baseline, run_sample, run_save_callback,\
    run_variable_episode, run_vectorized, run_multiaction, train_RL

class OptunaOptimization():
  """Hyperparameter search for a SAC agent driven by an Optuna study.

  Each trial samples SAC hyperparameters, trains an agent on a vectorized
  environment built from ``run_vectorized.make_env`` and reports the value
  exposed by the evaluation callback as the trial objective.
  """

  def __init__(self, url, n_envs, n_trials, max_timesteps):
    """Store the search configuration.

    Args:
      url: Stored on the instance; not read by any method of this class.
      n_envs: Number of training environments used per trial.
      n_trials: Number of Optuna trials to run.
      max_timesteps: Total training timesteps per trial.
    """
    self.url = url
    self.n_envs = n_envs
    self.n_trials = n_trials
    self.max_timesteps = max_timesteps

  def run_training(self):
    """Create a maximizing study with a median pruner and run all trials.

    Returns:
      The Optuna study, so callers can inspect the results afterwards.
    """
    study = op.create_study(
      direction="maximize",
      pruner=MedianPruner(n_startup_trials=5, n_warmup_steps=2),
    )
    study.optimize(self.objective, n_trials=self.n_trials)
    return study

  def objective(self, trial):
    """Sample hyperparameters for one trial and return its objective value.

    Args:
      trial: The Optuna trial used to draw the hyperparameters.

    Returns:
      Whatever ``train_agent`` returns for the sampled configuration.
    """
    log_dir = os.path.join("./logs", f"trial_{trial.number}")
    os.makedirs(log_dir, exist_ok=True)
    learning_rate = trial.suggest_float('learning_rate', 1e-5, 1e-2, log=True)
    gamma = trial.suggest_float('gamma', 0.9, 0.999)
    ent_coef = trial.suggest_float('ent_coef', 0.1, 1.0)
    tau = trial.suggest_float('tau', 0.001, 0.1)
    # suggest_int expects integer bounds; 1e5 / 1e7 are floats.
    buffer_size = trial.suggest_int('buffer_size', int(1e5), int(1e7), log=True)
    batch_size = trial.suggest_categorical('batch_size', [32, 64, 128, 256, 512, 1024])
    return self.train_agent(
      trial, n_envs=self.n_envs, max_timesteps=self.max_timesteps,
      learning_rate=learning_rate, gamma=gamma, ent_coef=ent_coef, tau=tau,
      buffer_size=buffer_size, batch_size=batch_size,
    )

  @staticmethod
  def _eval_freq(max_timesteps, n_splits, n_envs):
    """Return the callback-call interval that yields ``n_splits`` evaluations.

    One callback call corresponds to one step of the vectorized environment,
    i.e. ``n_envs`` timesteps, so the per-timestep interval is divided by
    ``n_envs``. The result is an integer and never smaller than 1.
    """
    return max(int(max_timesteps) // (int(n_splits) * int(n_envs)), 1)

  @staticmethod
  def _close_envs(*envs):
    """Close every given environment, warning instead of raising on failure.

    Cleanup is best effort: closing a vectorized env whose worker process
    already died can itself fail, and that must not mask the original error.
    """
    import warnings

    for env in envs:
      if env is None:
        continue
      try:
        env.close()
      except (EOFError, OSError) as err:
        warnings.warn(f"Failed to close environment cleanly: {err!r}")

  def train_agent(self, trial, n_envs=2, max_timesteps=int(1e+5), n_splits=20, verbose=1,
                  learning_rate=0.1, gamma=0.99, ent_coef=0.1, tau=0.1, buffer_size=int(1e+6),
                  batch_size=128):
    """Train one SAC agent and return the callback's last mean evaluation reward.

    Args:
      trial: Optuna trial handed to the evaluation callback.
      n_envs: Number of training environments; 1 uses ``DummyVecEnv``,
        anything else uses ``SubprocVecEnv``.
      max_timesteps: Total timesteps passed to ``model.learn``.
      n_splits: Number of evaluations spread over the training run.
      verbose: Verbosity forwarded to ``SAC``.
      learning_rate, gamma, ent_coef, tau, buffer_size, batch_size:
        SAC hyperparameters forwarded unchanged.

    Returns:
      ``eval_callback.last_eval_mean_reward`` after training.
    """
    # Decide on the argument (not self.n_envs) so the env type always matches
    # the number of environments that is actually created.
    if n_envs == 1:
      env_train = VecMonitor(venv=DummyVecEnv([run_vectorized.make_env(seed=1)]))
    else:
      env_train = VecMonitor(
        venv=SubprocVecEnv([run_vectorized.make_env(seed=1 + i) for i in range(n_envs)])
      )

    env_evaluate = None
    try:
      # make_env(...) already returns the factory callable DummyVecEnv needs.
      env_evaluate = DummyVecEnv([run_vectorized.make_env(seed=1)])

      # NOTE(review): assumes OptunaEvalCallback counts eval_freq in callback
      # calls like stable-baselines3's EvalCallback -- confirm.
      eval_callback = OptunaEvalCallback(
        env_evaluate, trial=trial, n_eval_episodes=2,
        eval_freq=self._eval_freq(max_timesteps, n_splits, n_envs),
        log_path=None, best_model_save_path=None, deterministic=True, render=False,
      )

      model = SAC(
        'MlpPolicy', env_train, verbose=verbose, learning_rate=learning_rate, gamma=gamma,
        ent_coef=ent_coef, tau=tau, buffer_size=buffer_size, batch_size=batch_size,
        learning_starts=24, train_freq=1, tensorboard_log="./logs/", device='cpu',
      )

      model.learn(
        total_timesteps=max_timesteps, callback=eval_callback,
        tb_log_name="SAC_Trial{}".format(trial.number), progress_bar=True,
      )
      return eval_callback.last_eval_mean_reward
    finally:
      # Always release the environments (and their worker processes), even
      # when training raises, so failed trials do not leak subprocesses.
      self._close_envs(env_train, env_evaluate)

if __name__ == "__main__":
  # Select the "spawn" start method (overriding any earlier choice) before
  # any environment worker process is created.
  mp.set_start_method("spawn", force=True)
  optimizer = OptunaOptimization(
    url='http://127.0.0.1',
    n_envs=2,
    n_trials=20,
    max_timesteps=int(1e+4),
  )
  optimizer.run_training()

Here is the error traceback:

Traceback (most recent call last):
  File "C:\Users\sebas\AppData\Local\Programs\Python\Python312\Lib\multiprocessing\process.py", line 314, in _bootstrap
    self.run()
  File "C:\Users\sebas\AppData\Local\Programs\Python\Python312\Lib\multiprocessing\process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Users\sebas\AppData\Local\Programs\Python\Python312\Lib\site-packages\stable_baselines3\common\vec_env\subproc_vec_env.py", line 46, in _worker
    observation, reset_info = env.reset(seed=data[0], **maybe_options)
                              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\sebas\AppData\Local\Programs\Python\Python312\Lib\site-packages\gymnasium\core.py", line 553, in reset
    obs, info = self.env.reset(seed=seed, options=options)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\sebas\Documents\Studium\Masterarbeit\Code\project1-boptest-gym\boptestGymEnv.py", line 490, in reset
    res = requests.put('{0}/initialize/{1}'.format(self.url,self.testid),
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
KeyError: 'payload'
[W 2025-05-06 19:55:02,720] Trial 0 failed with parameters: {'learning_rate': 0.0004144161427253373, 'gamma': 0.9823286814016307, 'ent_coef': 0.8818468464865998, 'tau': 0.01163540012881728, 'buffer_size': 254671, 'batch_size': 32} because of the following error: EOFError().
Traceback (most recent call last):
  File "C:\Users\sebas\AppData\Local\Programs\Python\Python312\Lib\multiprocessing\connection.py", line 328, in _recv_bytes
    nread, err = ov.GetOverlappedResult(True)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
BrokenPipeError: [WinError 109] Die Pipe wurde beendet

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Users\sebas\AppData\Local\Programs\Python\Python312\Lib\site-packages\optuna\study\_optimize.py", line 197, in _run_trial
    value_or_values = func(trial)
                      ^^^^^^^^^^^
  File "c:\Users\sebas\Documents\Studium\Masterarbeit\Code\project1-boptest-gym\testcases\testcase1_SAC.py", line 44, in objective
    return self.train_agent(trial, n_envs=self.n_envs, max_timesteps=self.max_timesteps, learning_rate=learning_rate,
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "c:\Users\sebas\Documents\Studium\Masterarbeit\Code\project1-boptest-gym\testcases\testcase1_SAC.py", line 65, in train_agent
    model.learn(total_timesteps=max_timesteps, callback=eval_callback, tb_log_name="SAC_Trial{}".format(trial.number), progress_bar=True)
  File "C:\Users\sebas\AppData\Local\Programs\Python\Python312\Lib\site-packages\stable_baselines3\sac\sac.py", line 308, in learn
    return super().learn(
           ^^^^^^^^^^^^^^
  File "C:\Users\sebas\AppData\Local\Programs\Python\Python312\Lib\site-packages\stable_baselines3\common\off_policy_algorithm.py", line 314, in learn
    total_timesteps, callback = self._setup_learn(
                                ^^^^^^^^^^^^^^^^^^
  File "C:\Users\sebas\AppData\Local\Programs\Python\Python312\Lib\site-packages\stable_baselines3\common\off_policy_algorithm.py", line 297, in _setup_learn
    return super()._setup_learn(
           ^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\sebas\AppData\Local\Programs\Python\Python312\Lib\site-packages\stable_baselines3\common\base_class.py", line 424, in _setup_learn
    self._last_obs = self.env.reset()  # type: ignore[assignment]
                     ^^^^^^^^^^^^^^^^
  File "C:\Users\sebas\AppData\Local\Programs\Python\Python312\Lib\site-packages\stable_baselines3\common\vec_env\vec_monitor.py", line 70, in reset
    obs = self.venv.reset()
          ^^^^^^^^^^^^^^^^^
  File "C:\Users\sebas\AppData\Local\Programs\Python\Python312\Lib\site-packages\stable_baselines3\common\vec_env\subproc_vec_env.py", line 145, in reset
    results = [remote.recv() for remote in self.remotes]
               ^^^^^^^^^^^^^
  File "C:\Users\sebas\AppData\Local\Programs\Python\Python312\Lib\multiprocessing\connection.py", line 250, in recv
    buf = self._recv_bytes()
          ^^^^^^^^^^^^^^^^^^
  File "C:\Users\sebas\AppData\Local\Programs\Python\Python312\Lib\multiprocessing\connection.py", line 337, in _recv_bytes
    raise EOFError
EOFError
[W 2025-05-06 19:55:02,726] Trial 0 failed with value None.
Traceback (most recent call last):
  File "C:\Users\sebas\AppData\Local\Programs\Python\Python312\Lib\multiprocessing\connection.py", line 328, in _recv_bytes
    nread, err = ov.GetOverlappedResult(True)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
BrokenPipeError: [WinError 109] Die Pipe wurde beendet

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "c:\Users\sebas\Documents\Studium\Masterarbeit\Code\project1-boptest-gym\testcases\testcase1_SAC.py", line 73, in <module>
    opt.run_training()
  File "c:\Users\sebas\Documents\Studium\Masterarbeit\Code\project1-boptest-gym\testcases\testcase1_SAC.py", line 33, in run_training
    study.optimize(self.objective, n_trials = self.n_trials)
  File "C:\Users\sebas\AppData\Local\Programs\Python\Python312\Lib\site-packages\optuna\study\study.py", line 475, in optimize
    _optimize(
  File "C:\Users\sebas\AppData\Local\Programs\Python\Python312\Lib\site-packages\optuna\study\_optimize.py", line 63, in _optimize
    _optimize_sequential(
  File "C:\Users\sebas\AppData\Local\Programs\Python\Python312\Lib\site-packages\optuna\study\_optimize.py", line 160, in _optimize_sequential
    frozen_trial = _run_trial(study, func, catch)
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\sebas\AppData\Local\Programs\Python\Python312\Lib\site-packages\optuna\study\_optimize.py", line 248, in _run_trial
    raise func_err
  File "C:\Users\sebas\AppData\Local\Programs\Python\Python312\Lib\site-packages\optuna\study\_optimize.py", line 197, in _run_trial
    value_or_values = func(trial)
                      ^^^^^^^^^^^
  File "c:\Users\sebas\Documents\Studium\Masterarbeit\Code\project1-boptest-gym\testcases\testcase1_SAC.py", line 44, in objective
    return self.train_agent(trial, n_envs=self.n_envs, max_timesteps=self.max_timesteps, learning_rate=learning_rate,
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "c:\Users\sebas\Documents\Studium\Masterarbeit\Code\project1-boptest-gym\testcases\testcase1_SAC.py", line 65, in train_agent
    model.learn(total_timesteps=max_timesteps, callback=eval_callback, tb_log_name="SAC_Trial{}".format(trial.number), progress_bar=True)
  File "C:\Users\sebas\AppData\Local\Programs\Python\Python312\Lib\site-packages\stable_baselines3\sac\sac.py", line 308, in learn
    return super().learn(
           ^^^^^^^^^^^^^^
  File "C:\Users\sebas\AppData\Local\Programs\Python\Python312\Lib\site-packages\stable_baselines3\common\off_policy_algorithm.py", line 314, in learn
    total_timesteps, callback = self._setup_learn(
                                ^^^^^^^^^^^^^^^^^^
  File "C:\Users\sebas\AppData\Local\Programs\Python\Python312\Lib\site-packages\stable_baselines3\common\off_policy_algorithm.py", line 297, in _setup_learn
    return super()._setup_learn(
           ^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\sebas\AppData\Local\Programs\Python\Python312\Lib\site-packages\stable_baselines3\common\base_class.py", line 424, in _setup_learn
    self._last_obs = self.env.reset()  # type: ignore[assignment]
                     ^^^^^^^^^^^^^^^^
  File "C:\Users\sebas\AppData\Local\Programs\Python\Python312\Lib\site-packages\stable_baselines3\common\vec_env\vec_monitor.py", line 70, in reset
    obs = self.venv.reset()
          ^^^^^^^^^^^^^^^^^
  File "C:\Users\sebas\AppData\Local\Programs\Python\Python312\Lib\site-packages\stable_baselines3\common\vec_env\subproc_vec_env.py", line 145, in reset
    results = [remote.recv() for remote in self.remotes]
               ^^^^^^^^^^^^^
  File "C:\Users\sebas\AppData\Local\Programs\Python\Python312\Lib\multiprocessing\connection.py", line 250, in recv
    buf = self._recv_bytes()
          ^^^^^^^^^^^^^^^^^^
  File "C:\Users\sebas\AppData\Local\Programs\Python\Python312\Lib\multiprocessing\connection.py", line 337, in _recv_bytes
    raise EOFError
EOFError

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions