ddpg.py
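"""DDPG agent that ties together the actor, the critic, Ornstein-Uhlenbeck
exploration noise, and a replay memory with hindsight experience replay (HER)."""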
import numpy as np
import tensorflow as tf

from actor import Actor
from critic import Critic
from memory import Memory
from noise import Noise


class DDPG:
    def __init__(self, sess, params):
        self.sess = sess
        # expose every entry of params (dimensions, gamma, b_size, ...) as an attribute
        self.__dict__.update(params)
        # create placeholders shared by the actor and the critic
        self.create_input_placeholders()
        # create actor/critic models (each manages its own target network)
        self.actor = Actor(self.sess, self.inputs, **self.actor_params)
        self.critic = Critic(self.sess, self.inputs, **self.critic_params)
        # noise parameters arrive as comma-separated strings; parse them into arrays
        self.noise_params = {k: np.array(list(map(float, v.split(","))))
                             for k, v in self.noise_params.items()}
        self.noise = Noise(**self.noise_params)
        # running Ornstein-Uhlenbeck noise level, one entry per action dimension
        self.ou_level = np.zeros(self.dimensions["u"])
        self.memory = Memory(self.n_mem_objects, self.memory_size)
    def create_input_placeholders(self):
        self.inputs = {}
        with tf.name_scope("inputs"):
            # one placeholder per named dimension (e.g. state "x", action "u")
            for ip_name, dim in self.dimensions.items():
                self.inputs[ip_name] = tf.placeholder(tf.float32,
                                                      shape=(None, dim),
                                                      name=ip_name)
            # action gradients fed from the critic into the actor update
            self.inputs["g"] = tf.placeholder(tf.float32,
                                              shape=self.inputs["u"].shape,
                                              name="a_grad")
            # predicted Q-values used as the critic's regression target
            self.inputs["p"] = tf.placeholder(tf.float32,
                                              shape=(None, 1),
                                              name="pred_q")
    def step(self, x, is_u_discrete, explore=True):
        x = x.reshape(-1, self.dimensions["x"])
        u = self.actor.predict(x)
        if explore:
            # add temporally correlated Ornstein-Uhlenbeck noise for exploration
            self.ou_level = self.noise.ornstein_uhlenbeck_level(self.ou_level)
            u = u + self.ou_level
        q = self.critic.predict(x, u)
        if is_u_discrete:
            # for discrete actions, return the argmax alongside the raw outputs
            return [np.argmax(u), u[0], q[0]]
        return [u[0], u, q[0]]
    def remember(self, experience):
        self.memory.add(experience)
    def train(self):
        # train only once the memory holds enough experiences
        if self.memory.size < 3 * self.b_size:
            return
        x, g, ag, u, r, nx, ng, t = self.get_batch()
        # HER: relabel ~80% of the batch with the achieved goal as the desired
        # goal, marking those transitions as successful and terminal
        her_idxs = np.where(np.random.random(self.b_size) < 0.80)[0]
        g[her_idxs] = ag[her_idxs]
        r[her_idxs] = 1
        t[her_idxs] = 1
        # condition both networks on the (possibly relabelled) goal
        x = np.hstack([x, g])
        nx = np.hstack([nx, ng])
        # Bellman target: r + gamma * Q'(s', pi'(s')), zeroed at terminal states
        nu = self.actor.predict_target(nx)
        tq = r + self.gamma * self.critic.predict_target(nx, nu) * (1 - t)
        self.critic.train(x, u, tq)
        # update the actor along the critic's action gradients
        grad = self.critic.get_action_grads(x, u)
        self.actor.train(x, grad)
        self.update_targets()
    def get_batch(self):
        return self.memory.sample(self.b_size)

    def update_targets(self):
        # refresh the target networks from the learned networks
        self.critic.update_target()
        self.actor.update_target()
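

# --- Minimal usage sketch (illustrative, not part of the original file). ---
# The keys below mirror what DDPG reads via self.__dict__.update(params);
# the contents of actor_params/critic_params and the noise keys depend on the
# Actor, Critic, and Noise constructors, so the values shown are assumptions.
if __name__ == "__main__":
    sess = tf.Session()
    params = {
        "dimensions": {"x": 8, "u": 2},  # state/action sizes (assumed)
        "actor_params": {},              # fill in per Actor's signature
        "critic_params": {},             # fill in per Critic's signature
        "noise_params": {"mu": "0.0,0.0", "sigma": "0.2,0.2"},  # parsed to arrays; keys assumed
        "n_mem_objects": 8,              # one slot per item in (x, g, ag, u, r, nx, ng, t)
        "memory_size": 100000,
        "b_size": 64,                    # replay batch size
        "gamma": 0.99,                   # discount factor
    }
    agent = DDPG(sess, params)
    sess.run(tf.global_variables_initializer())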