-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathExperiment.py
286 lines (243 loc) · 8.63 KB
/
Experiment.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
__author__ = 'cmantas'
import traceback
from lib.tiramola_logging import get_logger
from os import remove, mkdir, listdir
from shutil import move, copy
from time import strftime
from os.path import isdir, isfile, join, exists, basename
from random import random
from json import load, dumps, loads
from lib.persistance_module import env_vars, reload_env_vars, home
from time import sleep
from sys import exc_info
from ClientsCluster import my_Clients as ClientsCluster
import CassandraCluster
from VM import Timer
## global logger
# Module-wide logger named "EXPERIMENT"; it writes to the Coordinator log file
# under `home` ('INFO' is presumably the log level -- confirm in get_logger).
log = get_logger("EXPERIMENT", 'INFO', logfile=home+'files/logs/Coordinator.log')
# The env_vars overrides of the experiment being run: assigned by
# run_batch_experiments() and dumped into the experiment's "info" file.
o_ev = {}
# Relative path of the directory holding measurements.txt and the
# per-experiment result directories.
measurements_dir = "files/measurements"
def list_files(dir_path):
    """
    Collects the names of the files (not dirs) found directly inside a directory
    :param dir_path: the directory to scan
    :return: a list with the names (not the full paths) of the files
    """
    file_names = []
    for entry in listdir(dir_path):
        # sub-directories are left out
        if isfile(join(dir_path, entry)):
            file_names.append(entry)
    return file_names
def wait_get_one(dir_path):
    """
    Blocks until the given directory holds at least one file (polling once a second)
    and then picks one of the files found
    :param dir_path: the directory to poll
    :return: the path of the picked file, built as dir_path + "/" + file name
    """
    announced = False
    candidates = list_files(dir_path)
    while not candidates:
        # announce the wait only the first time round
        if not announced:
            print("Waiting for files...")
            announced = True
        sleep(1)
        candidates = list_files(dir_path)
    return dir_path + "/" + candidates.pop()
def watch(dir_path, callback):
    """
    watches a directory forever: whenever there is a file available in it, it loads the file's contents to memory,
    moves the file to the "done" sub-directory and then calls the callback function giving the file contents
    as an argument
    :param dir_path: the directory to watch
    :param callback: a function that accepts the contents of a file as its only argument
    :return: never returns; the loop only ends if an exception propagates (e.g. out of the callback)
    """
    done_dir = dir_path + "/done"
    while True:
        fname = wait_get_one(dir_path)
        # the context manager makes sure the handle is closed before the file is moved
        with open(fname, 'r') as f:
            contents = f.read()
        if not exists(done_dir):
            mkdir(done_dir)
        # drop an older processed file of the same name, so that it does not clash with the move
        try:
            remove(done_dir + "/" + basename(fname))
        except OSError:
            # usually just means there was no older file to replace
            pass
        move(fname, done_dir)
        callback(contents)
def run_sinusoid(target, period, offset):
    """
    Runs a workload of type 'sinusoid' on the clients cluster, against the Cassandra hosts
    returned by get_hosts(private=False)
    :param target: passed on to the clients as the "target" of the workload
    :param period: passed on to the clients as the "period" of the workload
    :param offset: passed on to the clients as the "offset" of the workload
    :return:
    """
    workload = {
        'type': 'sinusoid',
        "target": target,
        "period": period,
        'offset': offset,
        'servers': CassandraCluster.get_hosts(private=False),
    }
    ClientsCluster.run(workload)
def run_stress():
    """
    Runs a workload of type 'stress' on the clients cluster, against the Cassandra hosts
    returned by get_hosts(private=True)
    :return:
    """
    log.info("running stress workload")
    ClientsCluster.run({
        'type': 'stress',
        'servers': CassandraCluster.get_hosts(private=True),
    })
def _record_error(dir_path):
    """
    appends the traceback of the exception being handled to the "errors" file of an experiment directory
    :param dir_path: the experiment's results directory
    :return:
    """
    # append (rather than truncate) so that a later failure does not erase the traceback of an earlier one
    with open(dir_path + "/errors", "a") as errors_file:
        traceback.print_exc(file=errors_file)


def experiment(name, target, period, offset, periods_count):
    """
    runs a full experiment and outputs the results to a directory inside the measurements dir.
    The result directory receives the measurements (and predictions, if any), an "info" file describing the
    experiment, a copy of the Coordinator log, the graphs drawn by draw_exp and an "errors" file with the
    tracebacks of anything that failed.
    :param name: the name of the experiment; used as the name of the results directory
    :param target: sinusoid workload parameter, passed to run_sinusoid
    :param period: sinusoid workload parameter, passed to run_sinusoid; the Coordinator runs for 60*period
    :param offset: sinusoid workload parameter, passed to run_sinusoid
    :param periods_count: how many times the workload/Coordinator cycle is repeated
    :return: True if all the periods ran without raising, False otherwise
    """
    #delete any previous measurements
    try:
        remove("%s/measurements.txt" % measurements_dir)
    except OSError:
        pass
    #empty the contents of the coordinator.log
    try:
        open('files/logs/Coordinator.log', 'w+').close()
    except (IOError, OSError):
        pass
    # kill the workload (in case there were leftover clients running)
    log.info("killing workload")
    ClientsCluster.kill_nodes()
    # create a directory for the experiment results
    dir_path = measurements_dir + "/" + name
    if isdir(dir_path):
        # do not overwrite the results of an older experiment of the same name
        dir_path += "_" + str(int(random()*1000))
    try:
        mkdir(dir_path)
    except OSError:
        log.error("Could not create experiment directory")
        raise SystemExit(-1)
    success = False
    try:
        # actually run the tiramola automatic provisioning algorithm
        try:
            import Coordinator
            for i in range(periods_count):
                CassandraCluster.compaction()
                run_sinusoid(target, period, offset)
                log.info("Running the Coordinator for period " + str(i))
                Coordinator.run(60 * period)
                log.info("killing workload")
                ClientsCluster.kill_nodes()
            success = True
        except Exception:
            print(traceback.format_exc())
            _record_error(dir_path)
        if not success:
            # the failure may have left clients running
            log.info(" killing workload")
            ClientsCluster.kill_nodes()
        # move the measurements file
        move("files/measurements/measurements.txt", dir_path)
        # move the predictions file
        if isfile("files/measurements/predictions.txt"):
            move("files/measurements/predictions.txt", dir_path)
        # NOTE(review): period is divided by 60 here but multiplied by 60 for Coordinator.run, and the
        # periods count is labeled "min" -- confirm the intended units before changing the format
        info_long = "target = %d\noffset = %d\nperiod = %dmin\nperiods = %dmin\ndate = %s" %\
                    (target, offset, period/60, periods_count, strftime('%b%d-%H:%M'))
        # str() keeps this working for a numeric gain as well
        info_long += "\ngain = " + str(env_vars['gain'])
        info_long += "\ndecision_interval = " + str(env_vars['decision_interval'])
        info_long += "\ndecision_threshold = " + str(int(float(env_vars['decision_threshold'])*100)) + "%"
        # best effort: also note the env_vars that this experiment had overridden
        try:
            info_long += "\n" + dumps(o_ev, indent=3)
        except Exception:
            pass
        #write information to file
        with open(dir_path + "/info", 'w+') as f:
            f.write(info_long)
        # copy the Coordinator log
        try:
            copy("files/logs/Coordinator.log", dir_path)
        except (IOError, OSError):
            pass
        #draw the result graphs
        from lib.draw_experiment import draw_exp
        try:
            draw_exp(dir_path + "/measurements.txt")
        except Exception:
            _record_error(dir_path)
        log.info("EXPERIMENT DONE: Result measurements in: " + dir_path)
    except Exception:
        # collecting the results failed; keep the traceback but still report how the run itself went
        _record_error(dir_path)
    return success
def simulate():
    """
    runs the training-set simulation of the RL decision maker (instead of a live experiment), moves the
    measurements file to files/measurements/simulation/ and draws the graphs for it
    :return:
    """
    # start without the measurements of any previous run
    try:
        remove("files/measurements/measurements.txt")
    except OSError:
        pass
    from new_decision_module import RLDecisionMaker
    fsm = RLDecisionMaker("localhost", 8)
    # presumably this is what (re)creates measurements.txt -- confirm in new_decision_module
    fsm.simulate_training_set()
    from lib.draw_experiment import draw_exp
    # the simulation directory is re-used between runs, so it may already be there
    try:
        mkdir("files/measurements/simulation/")
    except OSError:
        pass
    move("files/measurements/measurements.txt", "files/measurements/simulation/measurements.txt")
    draw_exp("files/measurements/simulation/measurements.txt")
def clean_start():
    """
    brings the system to a clean state: kills the Cassandra nodes, bootstraps a cluster of
    env_vars["min_cluster_size"] nodes and runs a 'load' workload of env_vars['records'] records on the clients.
    On any error it waits for 120 seconds and starts over, until the whole sequence succeeds.
    :return:
    """
    success = False
    while not success:
        try:
            # kill whatever is running and bootstrap a minimum-size cluster
            CassandraCluster.kill_nodes()
            used = env_vars["min_cluster_size"]
            CassandraCluster.bootstrap_cluster(used)
            #load_data
            svr_hosts = CassandraCluster.get_hosts()
            args = {'type': 'load', 'servers': svr_hosts, 'records': env_vars['records']}
            ClientsCluster.run(args)
            success = True
        except Exception:
            # Exception (not a bare except) so that Ctrl-C / SystemExit can still break this endless retry loop
            print("Unexpected error on clean:" + str(exc_info()[0]))
            print(traceback.format_exc())
            log.error("Failed to clean, restarting")
            sleep(120)
def run_experiments(experiment_file):
    """
    loads the experiments from a file to a list and runs them in batch
    :param experiment_file: path of a JSON file holding the list of experiments
    :return:
    """
    #load the file with all the experiments
    # the context manager closes the file as soon as it has been parsed
    with open(experiment_file) as exp_file:
        exp_list = load(exp_file)
    run_batch_experiments(exp_list)
def run_experiments_from_string(string_exp):
    """
    parses the experiments from a JSON string to a list and runs them in batch
    :param string_exp: a JSON string holding the list of experiments
    :return:
    """
    run_batch_experiments(loads(string_exp))
def run_batch_experiments(exp_list):
    """
    runs a batch of experiments, one after the other.
    Each experiment is a dict with the keys:
      'env_vars': the env_vars to override for this experiment
      'simulation': (optional) if true, a simulation is run instead of a live experiment
      'name', 'periods_count' and 'workload' (with 'target', 'period', 'offset'): the live experiment's parameters
      'clean': (optional, default true) whether to clean-start the cluster before each try
    A failed live experiment is retried, for up to 5 tries in total.
    :param exp_list: the list of the experiments
    :return:
    """
    # this has to come before the names are used in this function: declaring them global after the
    # import below is a SyntaxError on python 3 (and a SyntaxWarning on python 2)
    global o_ev, env_vars
    #run each one of the experiments
    log.info("running batch experiments")
    for exp in exp_list:
        # overwrite the given env_vars
        from lib.persistance_module import env_vars
        reload_env_vars()
        o_ev = exp['env_vars']
        env_vars.update(o_ev)
        if exp.get('simulation'):
            simulate()
        else:
            target = int(exp['workload']["target"])
            period = int(exp['workload']["period"])
            offset = int(exp['workload']["offset"])
            periods_count = int(exp["periods_count"])
            name = exp['name']
            #run the experiment
            tries = 5
            success = False
            while not success and tries > 0:
                # clean-start the cluster by default, or if 'clean' is true
                if exp.get('clean', True):
                    clean_start()
                else:
                    #make sure the cluster is at its min size
                    CassandraCluster.set_cluster_size(env_vars["min_cluster_size"])
                success = experiment(name, target, period, offset, periods_count)
                if not success:
                    log.info("Experiment failed, sleeping 10mins and Retrying")
                    sleep(600)
                tries -= 1
if __name__ == '__main__':
    # when executed as a script: run the batch of experiments described in test.json
    print('testing experiments creation')
    run_experiments("test.json")