# reinforcement.py
import argparse
import os
import chess
import torch
import random
import torch.nn as nn
import torch.optim as optim
from tools import initializeStockfish
import numpy as np
import csv
import copy
# Read hyperparameters and configuration parameters
parser = argparse.ArgumentParser()
parser.add_argument("--mode", default="train", nargs="?",
help="Choose to create examples (--mode=examples) with 10 games and 20 turns each "
"or train the model (--mode=train) (default: train)")
parser.add_argument("--epochs", default=10, type=int, nargs="?",
help="Number of epochs to train (default: 10)")
parser.add_argument("--batch", default=600, type=int, nargs="?",
help="Batch size of training examples used for each epoch (default: 600)")
parser.add_argument("--name", default="reinforcement.pt", nargs="?",
help="Name to load/save model (default: reinforcement.pt)")
parser.add_argument("--dataset", default="training.csv", nargs="?",
help="Dataset used for training and saving new examples when interacting with the environment (default: training.csv")
parser.add_argument("--lr", default=0.001, type=float, nargs="?",
help="Learning rate for the model (default: 0.001)")
parser.add_argument("--epsilon", default=0.95, type=float, nargs="?",
help="Epsilon for epsilon greedy (default: 0.95)")
parser.add_argument("--gamma", default=0.95, type=float, nargs="?",
help="Gamma to discount future rewards (default: 0.95)")
parser.add_argument("--enemy-elo", default=1000, type=int, nargs="?",
help="Elo of the Stockfish enemy player in the environment (default: 1000)")
args, unknown = parser.parse_known_args()
# Set arguments passed in call
n_epochs = args.epochs
batch_size = args.batch
csv_file_name = f"data/reinforcement/{args.dataset}"
model_name = f"models/reinforcement/{args.name}"
lr = args.lr
epsilon = args.epsilon
gamma = args.gamma
evaluator = initializeStockfish() # Full-strength Stockfish used for position evaluations
# Whether we train or create new examples is decided in the last lines of this file
# Explanation of network input and output sizes
possible_pieces = 5 * 2 + 1 # Each player has 5 unique pieces (King, Pawn, Knight, Rook, Bishop) and empty field
fields_in_chess_board = 8*8 # 1-8 and a-h
input_size = possible_pieces*fields_in_chess_board # each square is one-hot encoded over 11 possible occupants
no_of_white_pieces = 8 # We only play White, therefore we only need White's actions
output_size = no_of_white_pieces * fields_in_chess_board # Simplified assumption that every piece can move to any square during the game
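# Action encoding: action = piece_index * 64 + (rank - 1) * 8 + file_index, where piece_index selects
# one of the 8 white pieces (Pawn1-4, Rook, King, Knight, Bishop; see map_action_indice_to_move)
# and file_index (a-h) / rank (1-8) give the destination square.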
# One Hot Encoding of the chess squares to act as input for the Neural Network
one_hot_mapping = {
0: [1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], # Empty
1: [0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0], # White Pawn
3: [0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0], # White Bishop
4: [0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0], # White Knight
5: [0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0], # White Rook
1000: [0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0], # White King
-1: [0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0], # Black Pawn
-3: [0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0], # Black Bishop
-4: [0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0], # Black Knight
-5: [0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0], # Black Rook
-1000: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1] # Black King
}
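# The dictionary keys match the numerical piece values produced by convertPositionToString
# (pawn=1, bishop=3, knight=4, rook=5, king=1000; negated for Black, 0 for an empty square).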
def create_starting_board():
"""
Returns a new mini-chess board in starting position
:return: New chess board
"""
return chess.Board("2rknb2/2pppp2/8/8/8/8/2PPPP2/2RKNB2 w - - 0 1")
def convertPositionToString(fen):
"""
Maps a FEN String to a numerical representation: {'p': 1, 'r': 5, 'n': 4, 'b': 3, 'k': 1000}
:param fen: A FEN String
    :return: Comma-separated string of numerical piece values for all 64 squares
"""
    # We don't consider the queen because it's mini-chess
piece_values = {'p': 1, 'r': 5, 'n': 4, 'b': 3, 'k': 1000}
fen_board = chess.Board(fen)
fen_board = str(fen_board)
lines = fen_board.split('\n')
result = []
for line in lines:
for char in line.split(' '):
char = char.strip()
if char.lower() in piece_values:
value = piece_values[char.lower()]
result.append(str(value) if char.islower() else str(-value))
else:
result.append('0')
return ','.join(result)
def transformSingleBoardToOneHot(state_param):
"""
Transforms a chess.Board into a flat one hot encoded vector
:param state_param: board
:return: Flat one hot encoding of the board
"""
evaluator.set_fen_position(state_param.fen())
state_param = convertPositionToString(evaluator.get_fen_position())
state_param = state_param.split(',')
newBoardRepresentation = np.array([])
    for field in state_param:
newBoardRepresentation = np.append(newBoardRepresentation, one_hot_mapping[int(field)])
return newBoardRepresentation
def get_piece_type_of(piece_name):
"""
Maps the piece name to the chess-library enum
:param piece_name: The name of the piece to be mapped
:return: The mapping from the chess library to the piece_name
"""
match piece_name:
case "Rook":
return chess.ROOK
case "King":
return chess.KING
case "Knight":
return chess.KNIGHT
case "Bishop":
return chess.BISHOP
case _:
return chess.PAWN
def determine_pawn_from_file(file):
"""
Maps the file to the associated pawn
:param file: The file of the board
:return: Index of the pawn from the piece_map
"""
match file:
case "c":
return 0
case "d":
return 1
case "e":
return 2
case "f":
return 3
case _:
            return 0  # Can't move there anyway since it's not a pawn file; return 0 just to avoid errors
def map_action_indice_to_move(state, action):
"""
Maps the action index to a move that the chess engine can understand
:param state: Current state
:param action: Considered action as an index
:return: The UCI representation of a move or None if the associated piece doesn't exist anymore
"""
if action is None:
return None
piece_map = ["Pawn1", "Pawn2", "Pawn3", "Pawn4",
"Rook", "King", "Knight", "Bishop"]
piece_name = piece_map[action // 64]
file = chr(action % 8 + 97)
number = (action % 64) // 8 + 1
piece_type = get_piece_type_of(piece_name)
square_indices = list(state.pieces(piece_type, chess.WHITE))
# Can have more than one square_indices if there are multiple pawns
if len(square_indices) > 1:
pawn_index = determine_pawn_from_file(file)
try:
current_position_of_piece = chess.square_name(square_indices[pawn_index])
        except IndexError:
            return None  # Piece doesn't exist anymore
else:
try:
current_position_of_piece = chess.square_name(square_indices[0])
        except IndexError:
            return None  # Piece doesn't exist anymore
return current_position_of_piece + file + str(number)
def get_epsilon_greedy_move_from_q_values(state, q_value_action):
"""
Using Epsilon-Greedy it chooses a move for the agent to play
:param state: Considered board
:param q_value_action: The predicted Q-Values from the Network
:return: A legal move and its index or (chess.Move.from_uci("0000"), -1) if too many iterations were needed to find a legal move
"""
# Only want to try 512 times, otherwise it takes too long to find a legal move sometimes
for i in range(512):
        # Epsilon-greedy: explore with a random action with probability epsilon, otherwise exploit the best predicted action
if np.random.rand() < epsilon:
action_index = np.random.randint(0, output_size)
else:
action_index = np.argmax(q_value_action.detach().numpy())
move = map_action_indice_to_move(state, action_index)
move_algebraic_notation = uci_to_algebraic_notation(move)
if move_algebraic_notation in list(state.legal_moves):
return move_algebraic_notation, action_index
    # If no legal move was found within 512 tries, just return a null move
return chess.Move.from_uci("0000"), -1
def save_example(current_state, action, reward, next_state, action_index, next_state_as_fen):
"""
    Saves the current example in CSV format together with metadata, so that when loading the data we don't have to map from one-hot back to FEN again
:param current_state: Current board state as one hot encoded
:param action: Action taken in UCI form
:param reward: Numerical reward
:param next_state: Next state as one hot encoded
:param action_index: Index of the action in the output of the Q-Network
:param next_state_as_fen: FEN representation of the next state
:return:
"""
if reward is not None:
current_state_as_csv = ','.join(['%.0f' % num for num in current_state])
next_state_as_csv = ','.join(['%.0f' % num for num in next_state])
concatenated_example = f"{current_state_as_csv}+{str(action)}+{str(reward)}+{next_state_as_csv}+{str(action_index)}+{next_state_as_fen}"
try:
with open(csv_file_name, 'a', newline='') as csv_file:
if action_index != -1:
csv_file.write(concatenated_example + '\n')
print(f"save state, {action}, {reward}, next state, action_index, next_state_as_fen to {csv_file_name}")
else:
print("didnt save state because action_index was invalid")
except Exception as e:
print(f"Error in 'save_example': {e}")
else:
print("Shouldn't happen: reward was None")
def determine_reward(before_action, after_action):
"""
    Calculates the reward using the centipawn/mate evaluation of the Stockfish engine
    :param before_action: Centipawn/mate evaluation before the agent executed the action
    :param after_action: Centipawn/mate evaluation after the agent executed the action
:return: Reward for the agent
"""
eval_type_before_action = before_action.get('type')
eval_type_after_action = after_action.get('type')
eval_value_before_action = before_action.get("value")
eval_value_after_action = after_action.get("value")
change_of_cpawn_value = eval_value_after_action - eval_value_before_action
if eval_type_before_action == "cp" and eval_type_after_action == "cp":
        # if the centipawn value changes in our favor it was a good move, otherwise a bad one, or 0 if nothing changed
if change_of_cpawn_value > 0:
return +1
elif change_of_cpawn_value < 0:
return -1
else:
return 0
elif eval_type_before_action == "cp" and eval_type_after_action == "mate":
        # if the evaluation type is now mate, we either have mate in x moves for or against us
        # if it's for us, it's a very good move
if eval_value_after_action > 0:
return 10
        # if it's against us, it's a very bad move to put ourselves in that position
elif eval_value_after_action < 0:
return -10
else:
return 0
elif eval_type_before_action == "mate" and eval_type_after_action == "mate":
        # Means we had a mate in x moves and now it's even fewer moves --> good move
if change_of_cpawn_value > 0:
return +1
# we for example had mate in 5 and now have mate in 7 --> bad move
elif change_of_cpawn_value < 0:
return -1
else:
return 0
elif eval_type_before_action == "mate" and eval_type_after_action == "cp":
        # had a mate but didn't make a move that keeps this advantage
if eval_value_before_action > 0:
return -5
        # had a mate against us but made a good move to escape it. Can this ever happen?
        elif eval_value_before_action < 0:
            return +1  # was getting mated and reached a better situation
def create_new_example(state, enemy_player, q_net):
"""
Creates a new example from a state, an enemy and the current network
:param state: Current state
:param enemy_player: Enemy player
:param q_net: Our Q-Network
:return: The state after the agent and enemy player each did a turn
"""
    # transform state into one-hot
current_state = transformSingleBoardToOneHot(state)
# evaluate current position
evaluator.set_fen_position(state.fen())
before_action_eval = evaluator.get_evaluation()
input_for_net = torch.tensor(current_state, dtype=torch.float32)
# put state into NN
q_values = q_net(input_for_net)
    # epsilon-greedy selection on the output and check that the move is legal, otherwise try again
agent_move, action_index = get_epsilon_greedy_move_from_q_values(state, q_values)
if action_index == -1:
return None
# do step
state.push(agent_move)
# calculate reward
evaluator.set_fen_position(state.fen())
after_action_eval = evaluator.get_evaluation()
reward = determine_reward(before_action_eval, after_action_eval)
# do step with enemy
enemy_player.set_fen_position(state.fen())
    best_enemy_move = enemy_player.get_best_move_time(200)
    if best_enemy_move is not None:  # None if the agent's move already ended the game
        state.push(chess.Move.from_uci(best_enemy_move))
next_state = transformSingleBoardToOneHot(state)
# save as example
save_example(current_state, agent_move, reward, next_state, action_index, state.fen())
return state
def create_new_examples(games, turns, q_net):
"""
Creates new examples to serve as training data
    :param games: Number of games to be played
    :param turns: Number of turns in each game
:param q_net: The Network to be used for selecting agent actions
:return:
"""
for game in range(games):
board = create_starting_board()
next_state = board
print("Starting new game")
for turn in range(turns):
next_state = create_new_example(next_state, enemy_player, q_net)
if next_state is None or next_state.is_game_over():
print("Game over or no action could be found in reasonable time!")
break
def get_number_of_rows_in_training_set():
"""
Determines the number of rows in the training set
:return: Number of rows in training set
"""
with open(csv_file_name) as f:
return sum(1 for row in f)
def transform_csv_row_into_parts(row):
"""
Splits the row up into: current_state, action, reward, next_state, action_index, next_state_as_fen
:param row: Row to be split
:return: current_state, action, reward, next_state, action_index, next_state_as_fen
"""
row = ",".join(row)
parts = row.split("+")
return parts[0], parts[1], parts[2], parts[3], parts[4], parts[5]
def load_training_data(batch_indices):
"""
Loads each row of the training data already split up into the columns
:param batch_indices: Training examples to be loaded
:return: Lists of each column from the training data
"""
current_states = np.array([])
actions = np.array([])
rewards = np.array([])
next_states = np.array([])
action_indices = np.array([], dtype=int)
next_states_as_fen = np.array([])
with open(csv_file_name) as f:
reader = csv.reader(f)
training_rows = [row for idx, row in enumerate(reader) if idx in batch_indices]
for row in training_rows:
current_state, action, reward, next_state, action_index, next_state_as_fen = transform_csv_row_into_parts(row)
current_states = np.append(current_states, np.fromstring(current_state, sep=",", dtype=float))
actions = np.append(actions, action)
rewards = np.append(rewards, int(reward))
next_states = np.append(next_states, np.fromstring(next_state, sep=",", dtype=float))
action_indices = np.append(action_indices, int(action_index))
next_states_as_fen = np.append(next_states_as_fen, next_state_as_fen)
    return (current_states.reshape(len(training_rows), input_size),
            actions,
            rewards,
            next_states.reshape(len(training_rows), input_size),
            action_indices,
            next_states_as_fen)
def load_model(model_name):
"""
    Loads the model if it exists, otherwise creates a new one
:param model_name: Name of the model
:return: The model
"""
if os.path.isfile(model_name):
return torch.load(model_name)
else:
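        # Fresh Q-network: 704 one-hot inputs (11 x 64 squares) -> 256 -> 32 -> 512 outputs (one Q-value per encoded action)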
return nn.Sequential(
nn.Linear(input_size, 256),
nn.ReLU(),
nn.Linear(256, 32),
nn.ReLU(),
nn.Linear(32, output_size)
)
def get_q_value_of_selected_actions(q_values, action_indices):
"""
Returns the Q-Values of the selected actions
:param q_values: List of Q-Values predicted by a network
:param action_indices: List of indices of the actions taken
    :return: Predicted Q-Values of the selected actions, in the same order as the provided predictions
"""
# Map action to index of q_values
selected_q_values = torch.empty(q_values.shape[0])
for i in range(q_values.shape[0]):
selected_q_values[i] = q_values[i][action_indices[i]]
return selected_q_values
def uci_to_algebraic_notation(uci):
"""
    Parses a UCI string into a chess.Move object that the chess library can handle
    :param uci: UCI string to convert
    :return: The parsed move, or a null move if the UCI string (such as "a1a1") was invalid
"""
try:
return chess.Move.from_uci(uci)
    except (TypeError, ValueError):
        # if an invalid move such as "a1a1" (or None) is passed, return "0000" to signal that no move is made
        return chess.Move.from_uci("0000")
def get_highest_legal_q_value_from_predictions(state, q_values):
"""
    Returns the legal move with the highest Q-Value from the given predictions
:param state: Current state of board
:param q_values: Q-Values predicted by network
:return: The legal move with the highest Q-Value
"""
# copy tensor to another one which we can shorten
copy_of_q_values = copy.copy(q_values)
# check if highest move is legal
while copy_of_q_values.shape[0] > 0:
highest_q_value = torch.max(copy_of_q_values)
index_of_highest_q_value_in_copy = torch.argmax(copy_of_q_values)
# Match element in original q_values and return the index of that element
index_in_orginal_q_values = ((q_values == highest_q_value).nonzero(as_tuple=True)[0])
        # If there's more than one matching element, we just take the first one
if index_in_orginal_q_values.shape[0] > 1:
index_in_orginal_q_values = index_in_orginal_q_values[0]
move = map_action_indice_to_move(state, int(index_in_orginal_q_values))
move = uci_to_algebraic_notation(move)
if move is not None and move in list(state.legal_moves):
return move
else:
copy_of_q_values = torch.cat(
[copy_of_q_values[0:index_of_highest_q_value_in_copy], copy_of_q_values[index_of_highest_q_value_in_copy + 1:]])
def select_best_values_for_each_example(predicted_target_values, next_states_as_fen):
"""
    For each tensor of Q-Values (predicted_target_values) it returns the highest Q-Value that belongs to a legal move
:param predicted_target_values: List of Q-Value tensor
:param next_states_as_fen: List of board states in FEN representation
:return: List of highest legal move Q-Values for each provided tensor of Q-Values
"""
max_prediction_values = torch.empty(predicted_target_values.shape[0])
for i in range(predicted_target_values.shape[0]):
considered_state_as_fen = next_states_as_fen[i]
state_of_considered_board = chess.Board(considered_state_as_fen)
considered_tensor = copy.copy(predicted_target_values[i])
found_legal_move = False
while not found_legal_move and considered_tensor.shape[0] > 0:
highest_q_value = torch.max(considered_tensor)
index_of_highest_q_value_in_copy = torch.argmax(considered_tensor)
index_in_orginal_q_values = ((predicted_target_values[i] == highest_q_value).nonzero(as_tuple=True)[0])
if index_in_orginal_q_values.shape[0] > 1: # If multiple elements have same value
index_in_orginal_q_values = index_in_orginal_q_values[0]
move = map_action_indice_to_move(state_of_considered_board, int(index_in_orginal_q_values))
move = uci_to_algebraic_notation(move)
if move is not None and move in list(state_of_considered_board.legal_moves):
found_legal_move = True
max_prediction_values[i] = highest_q_value
            else:  # otherwise remove this element from the tensor because it's an illegal move
considered_tensor = torch.cat([considered_tensor[0:index_of_highest_q_value_in_copy], considered_tensor[index_of_highest_q_value_in_copy + 1:]])
return max_prediction_values
def train(epochs, batch_size, learning_rate):
"""
    Trains the network for a number of epochs with the given batch size and saves it
:param epochs: Number of epochs to be processed
:param batch_size: Size of batch used for each epoch
:param learning_rate: Learning rate to be used for Adam optimizer
:return:
"""
# Load model if exists
q_net = load_model(model_name)
target_net = copy.deepcopy(q_net) # otherwise it'd be the same object
loss_fn = nn.MSELoss()
optimizer = optim.Adam(q_net.parameters(), lr=learning_rate)
for epoch in range(epochs):
# Every 25 epochs lower the epsilon value
if epoch % 25 == 0:
global epsilon
            epsilon = epsilon * 0.95  # Decay epsilon over time
# New training data each epoch with one game and 20 turns
create_new_examples(1, 20, q_net)
print("Training the model...")
number_of_rows = get_number_of_rows_in_training_set()
        possible_indices = list(range(number_of_rows))
        batch_indices = random.sample(possible_indices, min(batch_size, number_of_rows))  # don't request more samples than we have
# Load random batch of training data
current_states, actions, rewards, next_states, action_indices, next_states_as_fen = load_training_data(batch_indices)
X_qnet = torch.tensor(current_states, dtype=torch.float32)
X_tnet = torch.tensor(next_states, dtype=torch.float32)
rewards_tensor = torch.tensor(rewards, dtype=torch.float32)
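        # Standard DQN update: minimize MSE( Q(s, a), r + gamma * max_a' Q_target(s', a') ),
        # where the max over a' is restricted to moves that are legal in s'.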
# Q-net predicts value ONLY for the one action we actually selected in the data creation
predicted_q_values = q_net(X_qnet)
value_of_selected_actions = get_q_value_of_selected_actions(predicted_q_values, action_indices)
        # Target network predicts Q-values for the next states; treated as constants (no gradient through the target net)
        predicted_target_values = target_net(X_tnet).detach()
        # Q-value of the best legal action that can be taken from these next states (used for the target)
best_q_values = select_best_values_for_each_example(predicted_target_values, next_states_as_fen)
        # Target Q-value is the reward from the sample plus the discounted best target-network value
target_q_values = rewards_tensor + gamma * best_q_values
# compute loss
loss = loss_fn(value_of_selected_actions, target_q_values)
# train Q network, target net is fixed
optimizer.zero_grad()
loss.backward()
optimizer.step()
print(f'Finished epoch {epoch}, latest loss {loss}')
# copy Q-Network to target network after some epochs
if epoch % 25 == 0:
target_net = copy.deepcopy(q_net)
# save model
torch.save(q_net, model_name)
if args.enemy_elo is not None:
enemy_player = initializeStockfish(int(args.enemy_elo))
else:
enemy_player = initializeStockfish(1000)
# !!! Most important part: Decides if we train or create new examples !!!
if __name__ == "__main__":
if args.mode == "train":
train(n_epochs, batch_size, lr)
elif args.mode == "examples":
create_new_examples(10, 20, load_model(model_name))
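# Example invocations (sketch; assumes a Stockfish binary is available to tools.initializeStockfish):
#   python reinforcement.py --mode=examples                       # play 10 games of 20 turns each and append them to the dataset
#   python reinforcement.py --mode=train --epochs=50 --batch=600  # train on random batches from data/reinforcement/<dataset>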