run_cart_pole.py
"""
Example of an efficiently implemented RL model
in a 1D environment. This example uses the CartPole
environment from OpenAI's gym.
"""
import numpy as np
import gym
import time
from mpi4py import MPI
import heapq
import tensorflow as tf
from models import OneDimModel
from utils import *
import multiprocessing
import pickle
def worker(model, max_steps=1000):
    """
    Performs the game simulation, and is called across all processes.
    Returns an array of training data with one row per step, where each row is
    [obs, act, rew, value, next_obs], along with the total reward of the rollout.
    """
    train_data = []

    # https://gym.openai.com/envs/CartPole-v0/
    env = gym.make('CartPole-v0')
    obs = env.reset()

    ep_reward = 0
    for _ in range(max_steps):
        act, val = model.gen_actions_and_values([obs])
        act, val = act[0], val[0]
        next_obs, rew, d, _ = env.step(act)
        train_data.append([obs, act, rew, val, next_obs])
        obs = next_obs

        ep_reward += rew
        if d:
            break

    train_data = np.asarray(train_data)
    ep_reward = np.sum(train_data[:, 2])

    # Calculate GAEs and replace the value column with them.
    train_data[:, 3] = calculate_gaes(train_data[:, 2], train_data[:, 3])

    return train_data, ep_reward
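# For reference, a minimal sketch of the generalized advantage estimation (GAE)
# computation that a helper like utils.calculate_gaes typically performs. This is
# illustrative only: the gamma/lam defaults and the zero terminal bootstrap are
# assumptions, not the actual utils implementation, and the training loop below
# keeps using calculate_gaes from utils.
def _gae_reference(rewards, values, gamma=0.99, lam=0.95):
    rewards = np.asarray(rewards, dtype=np.float64)
    values = np.asarray(values, dtype=np.float64)
    # Bootstrap the value after the final step with 0 (episode ended or was cut off).
    next_values = np.append(values[1:], 0.0)
    # One-step TD residuals: delta_t = r_t + gamma * V(s_{t+1}) - V(s_t)
    deltas = rewards + gamma * next_values - values
    # Accumulate the discounted sum of residuals backwards:
    # A_t = delta_t + gamma * lam * A_{t+1}
    gaes = np.zeros_like(deltas)
    running = 0.0
    for t in reversed(range(len(deltas))):
        running = deltas[t] + gamma * lam * running
        gaes[t] = running
    return gaes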
if __name__ == '__main__':
    ### Setup for MPI ###
    comm = MPI.COMM_WORLD
    rank = comm.Get_rank()
    n_processes = comm.Get_size()
    controller = 0

    ### Define starting parameters ###
    n_epochs = 1000
    n_train_batches = 32
    # Gather rounds per epoch; assumes n_processes evenly divides n_train_batches.
    n_process_batches = n_train_batches // n_processes
    log_freq = 5

    init_logger('training.log')

    ### Enable GPU usage for just the main process, which does the training ###
    if rank == controller:
        device_config = tf.ConfigProto()
    else:
        device_config = tf.ConfigProto(device_count={'GPU': 0})

    model = OneDimModel(comm, controller, rank, n_acts=2,
                        obs_shape=(4,), sess_config=device_config)
    all_rewards = []
    for epoch in range(1, n_epochs + 1):
        batch_data = []
        train_data = []
        for _ in range(n_process_batches):
            ### Simulate more episodes to gain training data ###
            if rank == controller:
                batch_data = comm.gather(worker(model), controller)
                batch_train_data = [datum[0] for datum in batch_data]
                batch_reward_data = [datum[1] for datum in batch_data]

                train_data.extend(batch_train_data)
                all_rewards.extend(batch_reward_data)
            else:
                # Worker ranks still run rollouts and must join the gather,
                # but only the controller keeps the results.
                comm.gather(worker(model), controller)

        if rank == controller:
            ### Log and print reward ###
            if epoch % log_freq == 0:
                print(
                    f'Epoch: {epoch}, Avg Reward: {np.mean(all_rewards[-n_train_batches:])}')
                log(
                    f'Epoch: {epoch}, Avg Reward: {np.mean(all_rewards[-n_train_batches:])}')

            ### Format training data ###
            train_data = np.concatenate(train_data)
            np.random.shuffle(train_data)

            obs_train_data = np.vstack(train_data[:, 0])
            action_train_data = train_data[:, 1]
            reward_train_data = train_data[:, 2]
            gae_train_data = train_data[:, 3]

            ### Train model and sync weights with all processes ###
            model.train_policy(
                obs_train_data, action_train_data, reward_train_data, gae_train_data)
            model.sync_weights()
        else:
            # Worker ranks receive the updated weights from the controller.
            model.sync_weights()
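# Example launch (the process count is illustrative; pick one that divides
# n_train_batches evenly):
#   mpiexec -n 4 python run_cart_pole.py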