model.py
import warnings
from abc import ABC, abstractmethod
from itertools import zip_longest

import numpy as np
import tensorflow as tf
import tensorflow_probability as tfp
from gym.spaces import Discrete
from tensorflow.keras import backend as K
from tensorflow.keras import activations, initializers
from tensorflow.keras.layers import Layer

from stable_baselines.common.tf_util import batch_to_seq, seq_to_batch
from stable_baselines.common.tf_layers import conv, linear, conv_to_fc, lstm
from stable_baselines.common.distributions import make_proba_dist_type, CategoricalProbabilityDistribution, \
    MultiCategoricalProbabilityDistribution, DiagGaussianProbabilityDistribution, BernoulliProbabilityDistribution
from stable_baselines.common.input import observation_input
from stable_baselines.common.policies import ActorCriticPolicy, mlp_extractor, nature_cnn

tfd = tfp.distributions


def bnn_extractor(flat_observations, net_arch, act_fun):
"""
Constructs an variational layer that receives observations as an input and outputs a latent representation for the policy and
a value network. The ``net_arch`` parameter allows to specify the amount and size of the hidden layers and how many
of them are shared between the policy network and the value network. It is assumed to be a list with the following
structure:
1. An arbitrary length (zero allowed) number of integers each specifying the number of units in a shared layer.
If the number of ints is zero, there will be no shared layers.
2. An optional dict, to specify the following non-shared layers for the value network and the policy network.
It is formatted like ``dict(vf=[<value layer sizes>], pi=[<policy layer sizes>])``.
If it is missing any of the keys (pi or vf), no non-shared layers (empty list) is assumed.
For example to construct a network with one shared layer of size 55 followed by two non-shared layers for the value
network of size 255 and a single non-shared layer of size 128 for the policy network, the following layers_spec
would be used: ``[55, dict(vf=[255, 255], pi=[128])]``. A simple shared network topology with two layers of size 128
would be specified as [128, 128].
:param flat_observations: (tf.Tensor) The observations to base policy and value function on.
:param net_arch: ([int or dict]) The specification of the policy and value networks.
See above for details on its formatting.
:param act_fun: (tf function) The activation function to use for the networks.
:return: (tf.Tensor, tf.Tensor) latent_policy, latent_value of the specified network.
If all layers are shared, then ``latent_policy == latent_value``
"""
    latent = flat_observations
    policy_only_layers = []  # Layer sizes of the network that only belong to the policy network
    value_only_layers = []  # Layer sizes of the network that only belong to the value network
    kernel_divergence_fn = lambda q, p, _: tfp.distributions.kl_divergence(q, p)
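    # Explanatory note (added comment, not in the original source): this divergence function is
    # handed to every DenseFlipout layer below. Each layer uses it to compute KL(posterior || prior)
    # over its kernel weights and records the result in the layer's ``losses``; whether that KL term
    # actually enters the training objective depends on the surrounding algorithm collecting those
    # losses.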
    # Iterate through the shared layers and build the shared parts of the network
    for idx, layer in enumerate(net_arch):
        if isinstance(layer, int):  # Check that this is a shared layer
            layer_size = layer
            # latent = act_fun(linear(latent, "shared_fc{}".format(idx), layer_size, init_scale=np.sqrt(2)))
            latent = act_fun(tfp.layers.DenseFlipout(layer_size, name="shared_fc{}".format(idx), activation='relu',
                                                     kernel_divergence_fn=kernel_divergence_fn)(latent))
        else:
            assert isinstance(layer, dict), "Error: the net_arch list can only contain ints and dicts"
            if 'pi' in layer:
                assert isinstance(layer['pi'], list), "Error: net_arch[-1]['pi'] must contain a list of integers."
                policy_only_layers = layer['pi']
            if 'vf' in layer:
                assert isinstance(layer['vf'], list), "Error: net_arch[-1]['vf'] must contain a list of integers."
                value_only_layers = layer['vf']
            break  # From here on the network splits up in policy and value network

    # Build the non-shared part of the network
    latent_policy = latent
    latent_value = latent
    for idx, (pi_layer_size, vf_layer_size) in enumerate(zip_longest(policy_only_layers, value_only_layers)):
        if pi_layer_size is not None:
            assert isinstance(pi_layer_size, int), "Error: net_arch[-1]['pi'] must only contain integers."
            # latent_policy = act_fun(linear(latent_policy, "pi_fc{}".format(idx), pi_layer_size, init_scale=np.sqrt(2)))
            latent_policy = act_fun(tfp.layers.DenseFlipout(pi_layer_size, name="pi_fc{}".format(idx), activation='relu',
                                                            kernel_divergence_fn=kernel_divergence_fn)(latent_policy))
        if vf_layer_size is not None:
            assert isinstance(vf_layer_size, int), "Error: net_arch[-1]['vf'] must only contain integers."
            # latent_value = act_fun(linear(latent_value, "vf_fc{}".format(idx), vf_layer_size, init_scale=np.sqrt(2)))
            latent_value = act_fun(tfp.layers.DenseFlipout(vf_layer_size, name="vf_fc{}".format(idx), activation='relu',
                                                           kernel_divergence_fn=kernel_divergence_fn)(latent_value))

    return latent_policy, latent_value
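

# Illustrative usage sketch (explanatory comment, not part of the original module): how a
# ``net_arch`` specification in the format described in the docstring above is consumed by
# ``bnn_extractor``. The placeholder shape (None, 8) and the layer sizes are arbitrary
# assumptions made only for this example.
#
#     flat_obs = tf.placeholder(tf.float32, shape=(None, 8), name="flat_obs")
#     net_arch = [55, dict(vf=[255, 255], pi=[128])]   # one shared layer, then separate heads
#     pi_latent, vf_latent = bnn_extractor(flat_obs, net_arch, tf.tanh)
#     # pi_latent feeds the policy head, vf_latent feeds the value head; with no dict entry
#     # (e.g. net_arch=[128, 128]) the two latents are the same tensor.

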
class FeedForwardPolicy(ActorCriticPolicy):
"""
Policy object that implements actor critic, using a feed forward neural network.
:param sess: (TensorFlow session) The current TensorFlow session
:param ob_space: (Gym Space) The observation space of the environment
:param ac_space: (Gym Space) The action space of the environment
:param n_env: (int) The number of environments to run
:param n_steps: (int) The number of steps to run for each environment
:param n_batch: (int) The number of batch to run (n_envs * n_steps)
:param reuse: (bool) If the policy is reusable or not
:param layers: ([int]) (deprecated, use net_arch instead) The size of the Neural network for the policy
(if None, default to [64, 64])
:param net_arch: (list) Specification of the actor-critic policy network architecture (see mlp_extractor
documentation for details).
:param act_fun: (tf.func) the activation function to use in the neural network.
:param cnn_extractor: (function (TensorFlow Tensor, ``**kwargs``): (TensorFlow Tensor)) the CNN feature extraction
:param feature_extraction: (str) The feature extraction type ("cnn" or "mlp")
:param kwargs: (dict) Extra keyword arguments for the nature CNN feature extraction
"""
    def __init__(self, sess, ob_space, ac_space, n_env, n_steps, n_batch, reuse=False, layers=None, net_arch=None,
                 act_fun=tf.tanh, cnn_extractor=nature_cnn, feature_extraction="cnn", **kwargs):
        super(FeedForwardPolicy, self).__init__(sess, ob_space, ac_space, n_env, n_steps, n_batch, reuse=reuse,
                                                scale=(feature_extraction == "cnn"))

        self._kwargs_check(feature_extraction, kwargs)

        if layers is not None:
            warnings.warn("Usage of the `layers` parameter is deprecated! Use net_arch instead "
                          "(it has different semantics though).", DeprecationWarning)
            if net_arch is not None:
                warnings.warn("The new `net_arch` parameter overrides the deprecated `layers` parameter!",
                              DeprecationWarning)

        if net_arch is None:
            if layers is None:
                layers = [64, 64]
            net_arch = [dict(vf=layers, pi=layers)]

        with tf.variable_scope("model", reuse=reuse):
            if feature_extraction == "cnn":
                pi_latent = vf_latent = cnn_extractor(self.processed_obs, **kwargs)
            elif feature_extraction == "bnn":
                pi_latent, vf_latent = bnn_extractor(tf.layers.flatten(self.processed_obs), net_arch, act_fun)
            else:
                pi_latent, vf_latent = mlp_extractor(tf.layers.flatten(self.processed_obs), net_arch, act_fun)

            self._value_fn = linear(vf_latent, 'vf', 1)

            self._proba_distribution, self._policy, self.q_value = \
                self.pdtype.proba_distribution_from_latent(pi_latent, vf_latent, init_scale=0.01)

        self._setup_init()

    def step(self, obs, state=None, mask=None, deterministic=False):
        if deterministic:
            action, value, neglogp = self.sess.run([self.deterministic_action, self.value_flat, self.neglogp],
                                                   {self.obs_ph: obs})
        else:
            action, value, neglogp = self.sess.run([self.action, self.value_flat, self.neglogp],
                                                   {self.obs_ph: obs})
        return action, value, self.initial_state, neglogp

    def proba_step(self, obs, state=None, mask=None):
        return self.sess.run(self.policy_proba, {self.obs_ph: obs})

    def value(self, obs, state=None, mask=None):
        return self.sess.run(self.value_flat, {self.obs_ph: obs})
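
    # Illustrative sketch (explanatory comment, not part of the original class): querying a
    # constructed policy. Assumes ``policy`` is an instance built by a stable-baselines algorithm
    # inside an active session, and ``obs_batch`` is a NumPy batch of observations.
    #
    #     actions, values, _, neglogps = policy.step(obs_batch)               # sampled actions
    #     det_actions, _, _, _ = policy.step(obs_batch, deterministic=True)   # most likely actions
    #     value_estimates = policy.value(obs_batch)                           # critic estimates only

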
class BnnPolicy(FeedForwardPolicy):
"""
Policy object that implements actor critic, using a Bayesian neural net (2 layers of 64)
:param sess: (TensorFlow session) The current TensorFlow session
:param ob_space: (Gym Space) The observation space of the environment
:param ac_space: (Gym Space) The action space of the environment
:param n_env: (int) The number of environments to run
:param n_steps: (int) The number of steps to run for each environment
:param n_batch: (int) The number of batch to run (n_envs * n_steps)
:param reuse: (bool) If the policy is reusable or not
:param _kwargs: (dict) Extra keyword arguments for the nature CNN feature extraction
"""
def __init__(self, sess, ob_space, ac_space, n_env, n_steps, n_batch, reuse=False, **_kwargs):
super(BnnPolicy, self).__init__(sess, ob_space, ac_space, n_env, n_steps, n_batch, reuse,
feature_extraction="bnn", **_kwargs)
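

# Illustrative usage sketch (explanatory comment, not part of the original file): training a
# stable-baselines algorithm with the Bayesian policy defined above. The environment id, the
# timestep budget and the assumption that this file is importable as ``model`` are placeholders.
#
#     from stable_baselines import PPO2
#     from model import BnnPolicy
#
#     agent = PPO2(BnnPolicy, "CartPole-v1", verbose=1)
#     agent.learn(total_timesteps=100000)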