"""Dynamic-programming solvers for discrete MDPs: policy iteration and value
iteration, for environments exposing `nS`, `nA` and a transition table `P`."""
import numpy as np


class PolicyIteration:
    def __init__(self, env, alpha=0.7, gamma=0.9, theta=1e-8, max_iterations=1e4) -> None:
        self.env = env
        self.alpha = alpha  # step size of the soft policy update
        self.gamma = gamma  # discount factor
        self.theta = theta  # convergence threshold
        self.max_iterations = max_iterations
        self.V = 0.01 * np.random.rand(self.env.nS)  # the value function
        self.policy = np.ones([self.env.nS, self.env.nA]) / self.env.nA  # uniform initial policy

    def policy_evaluation(self):
        # iteratively evaluate the current policy
        for num_iterations in range(1, int(self.max_iterations) + 1):
            # delta tracks the largest change between two sweeps;
            # if the change is insignificant we stop early
            delta = 0
            for state in range(self.env.nS):
                # the new value of the current state
                v = 0
                # iterate over all actions the agent can take in this state
                for action, action_prob in enumerate(self.policy[state]):
                    # iterate over all states the agent can transition to
                    for state_prob, next_state, reward, done in self.env.P[state][action]:
                        # accumulate the expected return for this state
                        v += action_prob * state_prob * (reward + self.gamma * self.V[next_state])
                # track the largest change of this sweep
                delta = max(np.abs(self.V[state] - v), delta)
                self.V[state] = v
            if delta < self.theta:
                # print("Policy evaluated in {} iterations".format(num_iterations))
                return
        return

    def _next_step(self, state):
        # one-step lookahead: expected return of each action from `state`
        action_table = np.zeros(self.env.nA)
        for action in range(self.env.nA):
            # iterate over all states the agent can transition to next
            for state_prob, next_state, reward, _ in self.env.P[state][action]:
                action_table[action] += state_prob * (reward + self.gamma * self.V[next_state])
        return action_table

    def policy_iteration(self):
        # repeat until the policy converges or the iteration budget is exhausted
        for num_evaluation in range(int(self.max_iterations) + 1):
            policy_stable = True
            # evaluate the current policy (updates self.V in place)
            self.policy_evaluation()
            # policy improvement step
            for state in range(self.env.nS):
                # greedy action under the current policy
                cur_action = np.argmax(self.policy[state])
                action_value = self._next_step(state)
                iterated_action = np.argmax(action_value)
                # check whether the policy table changed
                if cur_action != iterated_action:
                    policy_stable = False
                # soft update: shift probability mass toward the greedy action, then renormalize
                self.policy[state] += self.alpha * np.eye(self.env.nA)[iterated_action]
                row_sums = self.policy[state].sum()
                self.policy[state] = self.policy[state] / row_sums
            # once the policy table is stable we can stop
            if policy_stable:
                # print("Evaluated at {} iterations".format(num_evaluation))
                return
        return
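
# Usage sketch for PolicyIteration (a minimal illustration, not part of the
# original module, assuming an environment that exposes the discrete Gym
# toy-text interface: `nS`, `nA` and a transition table
# `P[s][a] -> [(prob, next_state, reward, done), ...]`):
#
#   agent = PolicyIteration(env)
#   agent.policy_iteration()
#   greedy_actions = np.argmax(agent.policy, axis=1)  # one action per state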


class ValueIteration:
    def __init__(self, env, alpha=0.7, gamma=0.9, theta=1e-8, max_iterations=1e4) -> None:
        self.env = env
        self.alpha = alpha  # kept for interface parity; not used by value iteration
        self.gamma = gamma  # discount factor
        self.theta = theta  # convergence threshold
        self.max_iterations = max_iterations
        self.V = 0.01 * np.random.rand(self.env.nS)  # the value function
        self.policy = np.zeros([self.env.nS, self.env.nA], dtype=int)  # deterministic policy table

    def value_iteration(self):
        for num_iterations in range(1, int(self.max_iterations) + 1):
            delta = 0
            for state in range(self.env.nS):
                # one-step lookahead, then back up the best action value
                action_val = self._next_step(state)
                new_val = np.max(action_val)
                delta = max(np.abs(self.V[state] - new_val), delta)
                # update the value
                self.V[state] = new_val
            if delta < self.theta:
                # print("Value table evaluated in {} iterations".format(num_iterations))
                break
        self._create_policy_table()
        return

    def _create_policy_table(self):
        # derive a deterministic greedy policy from the converged value function
        for state in range(self.env.nS):
            action_value = self._next_step(state)
            optimal_pi = np.argmax(action_value)
            self.policy[state, optimal_pi] = 1
        return

    def _next_step(self, state):
        # one-step lookahead: expected return of each action from `state`
        action_table = np.zeros(self.env.nA)
        for action in range(self.env.nA):
            # iterate over all states the agent can transition to next
            for state_prob, next_state, reward, _ in self.env.P[state][action]:
                action_table[action] += state_prob * (reward + self.gamma * self.V[next_state])
        return action_table
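

# A minimal smoke test (an illustrative sketch, not part of the original
# module): `_ToyMDP` below is a hypothetical two-state environment that mimics
# the discrete interface both solvers assume (`nS`, `nA`, and a transition
# table `P[s][a] -> [(prob, next_state, reward, done), ...]`).
if __name__ == "__main__":
    class _ToyMDP:
        # state 0: action 1 reaches the terminal state 1 with reward 1,
        #          action 0 stays in state 0 with reward 0
        # state 1: terminal; every action self-loops with reward 0
        nS = 2
        nA = 2
        P = {
            0: {0: [(1.0, 0, 0.0, False)], 1: [(1.0, 1, 1.0, True)]},
            1: {0: [(1.0, 1, 0.0, True)], 1: [(1.0, 1, 0.0, True)]},
        }

    env = _ToyMDP()

    pi_agent = PolicyIteration(env)
    pi_agent.policy_iteration()
    print("Policy iteration greedy actions:", np.argmax(pi_agent.policy, axis=1))

    vi_agent = ValueIteration(env)
    vi_agent.value_iteration()
    print("Value iteration greedy actions:", np.argmax(vi_agent.policy, axis=1))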