-
Notifications
You must be signed in to change notification settings - Fork 65
/
Copy pathsmallfile.py
2381 lines (2054 loc) · 86.2 KB
/
smallfile.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# -*- coding: utf-8 -*-
"""
smallfile.py -- SmallfileWorkload class used in each workload thread
Copyright 2012 -- Ben England
Licensed under the Apache License at http://www.apache.org/licenses/LICENSE-2.0
See Appendix on this page for instructions pertaining to license.
Created on Apr 22, 2009
"""
# repeat a file operation N times
# allow for multi-thread tests with stonewalling
# we can launch any combination of these to simulate more complex workloads
# possible enhancements:
# embed parallel python and thread launching logic so we can have both
# CLI and GUI interfaces to same code
#
# to run all unit tests:
# python smallfile.py
# to run just one of unit tests do
# python -m unittest smallfile.Test.your-unit-test
# alternative single-test syntax:
# python smallfile.py -v Test.test_c1_Mkdir
#
# on older Fedoras:
# yum install python-unittest2
# on Fedora 33 with python 3.9.2, unittest is built in and no package is needed
import codecs
import copy
import errno
import logging
import math
import os
import os.path
import random
import socket
import sys
import threading
import time
from os.path import exists, join
from shutil import rmtree
from sync_files import ensure_deleted, ensure_dir_exists, touch
OK = 0 # system call return code for success
NOTOK = 1
KB_PER_GB = 1 << 20
USEC_PER_SEC = 1000000.0
# min % of files processed considered acceptable for a test run
# this should be a parameter but we'll just lower it to 70% for now
# FIXME: should be able to calculate default based on thread count, etc.
pct_files_min = 70
# we have to support a variety of python environments,
# so for optional features don't blow up if they aren't there, just remember
xattr_installed = False
try:
import xattr
xattr_installed = True
except ImportError:
pass
fadvise_installed = False
try:
import drop_buffer_cache
fadvise_installed = True
except ImportError:
pass
fallocate_installed = False
try:
import fallocate # not yet in python os module
fallocate_installed = True
except ImportError:
pass
unittest_module = None
try:
import unittest2
unittest_module = unittest2
except ImportError:
pass
try:
import unittest
unittest_module = unittest
except ImportError:
pass
# makes using python -m pdb easier with unit tests
# set .pdbrc file to contain something like:
# b run_unit_tests
# c
# b Test.test_whatever
def run_unit_tests():
if unittest_module:
unittest_module.main()
else:
raise SMFRunException("no python unittest module available")
# python threading module method name isAlive changed to is_alive in python3
use_isAlive = sys.version_info[0] < 3
# Windows 2008 server seemed to have this environment variable
# didn't check if it's universal
is_windows_os = os.getenv("HOMEDRIVE") is not None
# O_BINARY variable means we don't need to special-case windows
# in every open statement
O_BINARY = 0
if is_windows_os:
O_BINARY = os.O_BINARY
# for timeout debugging
debug_timeout = os.getenv("DEBUG_TIMEOUT")
# FIXME: pass in file pathname instead of file number
class MFRdWrExc(Exception):
def __init__(self, opname_in, filenum_in, rqnum_in, bytesrtnd_in):
self.opname = opname_in
self.filenum = filenum_in
self.rqnum = rqnum_in
self.bytesrtnd = bytesrtnd_in
def __str__(self):
return "file {filenum} request {rqnum} byte count {bc} {op}".format(
filenum=str(self.filenum),
rqnum=str(self.rqnum),
bc=str(self.bytesrtnd),
op=self.opname,
)
class SMFResultException(Exception):
pass
class SMFRunException(Exception):
pass
def myassert(bool_expr):
if not bool_expr:
raise SMFRunException("assertion failed!")
# abort routine just cleans up threads
def abort_test(abort_fn, thread_list):
if not os.path.exists(abort_fn):
touch(abort_fn)
for t in thread_list:
t.terminate()
# hide difference between python2 and python3
# python threading module method name isAlive changed to is_alive in python3
def thrd_is_alive(thrd):
use_isAlive = sys.version_info[0] < 3
return thrd.isAlive() if use_isAlive else thrd.is_alive()
# next two routines are for asynchronous replication
# we remember the time when a file was completely written
# and its size using xattr,
# then we read xattr in do_await_create operation
# and compute latencies from that
def remember_ctime_size_xattr(filedesc):
nowtime = str(time.time())
st = os.fstat(filedesc)
xattr.setxattr(
filedesc,
"user.smallfile-ctime-size",
nowtime + "," + str(st.st_size / SmallfileWorkload.BYTES_PER_KB),
)
def recall_ctime_size_xattr(pathname):
(ctime, size_kb) = (None, None)
try:
with open(pathname, "r") as fd:
xattr_str = xattr.getxattr(fd, "user.smallfile-ctime-size")
token_pair = str(xattr_str).split(",")
ctime = float(token_pair[0][2:])
size_kb = int(token_pair[1].split(".")[0])
except IOError as e:
eno = e.errno
if eno != errno.ENODATA:
raise e
return (ctime, size_kb)
def get_hostname(h):
if h is None:
h = socket.gethostname()
return h
def hostaddr(h): # return the IP address of a hostname
if h is None:
a = socket.gethostbyname(socket.gethostname())
else:
a = socket.gethostbyname(h)
return a
def hexdump(b):
s = ""
for j in range(0, len(b)):
s += "%02x" % b[j]
return s
def binary_buf_str(b): # display a binary buffer as a text string
if sys.version < "3":
return codecs.unicode_escape_decode(b)[0]
else:
if isinstance(b, str):
return bytes(b).decode("UTF-8", "backslashreplace")
else:
return b.decode("UTF-8", "backslashreplace")
class SmallfileWorkload:
rename_suffix = ".rnm"
all_op_names = [
"create",
"delete",
"append",
"overwrite",
"read",
"readdir",
"rename",
"delete-renamed",
"cleanup",
"symlink",
"mkdir",
"rmdir",
"stat",
"chmod",
"setxattr",
"getxattr",
"swift-get",
"swift-put",
"ls-l",
"await-create",
"truncate-overwrite",
]
OK = 0
NOTOK = 1
BYTES_PER_KB = 1024
MICROSEC_PER_SEC = 1000000.0
# number of files between stonewalling check at smallest file size
max_files_between_checks = 100
# default for UNIX
tmp_dir = os.getenv("TMPDIR")
if tmp_dir is None: # windows case
tmp_dir = os.getenv("TEMP")
if tmp_dir is None: # assume POSIX-like
tmp_dir = "/var/tmp"
# constant file size
fsdistr_fixed = -1
# a file size distribution type that results in a few files much larger
# than the mean and mostly files much smaller than the mean
fsdistr_random_exponential = 0
# multiply mean size by this to get max file size
random_size_limit = 8
# large prime number used to randomly select directory given file number
some_prime = 900593
# build largest supported buffer, and fill it full of random hex digits,
# then just use a substring of it below
biggest_buf_size_bits = 20
random_seg_size_bits = 10
biggest_buf_size = 1 << biggest_buf_size_bits
# initialize files with up to this many different random patterns
buf_offset_range = 1 << 10
loggers = {} # so we only instantiate logger for a given thread name once
# constructor sets up initial, default values for test parameters
# user overrides these values using CLI interface parameters
# for boolean parameters,
# preceding comment describes what happens if parameter is set to True
def __init__(self):
# all threads share same directory
self.is_shared_dir = False
# file operation type, default idempotent
self.opname = "cleanup"
# how many files accessed, default = quick test
self.iterations = 200
# top of directory tree, default always exists on local fs
top = join(self.tmp_dir, "smf")
# file that tells thread when to start running
self.starting_gate = None
# transfer size (KB), 0 = default to file size
self.record_sz_kb = 0
# total data read/written in KB
self.total_sz_kb = 64
# file size distribution, default = all files same size
self.filesize_distr = self.fsdistr_fixed
# how many directories to use
self.files_per_dir = 100
# fanout if > 1 dir/thread needed
self.dirs_per_dir = 10
# size of xattrs to read/write
self.xattr_size = 0
# number of xattrs to read/write
self.xattr_count = 0
# test-over polling rate
self.files_between_checks = 20
# prepend this to file name
self.prefix = ""
# append this to file name
self.suffix = ""
# directories are accessed randomly
self.hash_to_dir = False
# fsync() issued after a file is modified
self.fsync = False
# update xattr with ctime+size
self.record_ctime_size = False
# end test as soon as any thread finishes
self.stonewall = True
# finish remaining requests after test ends
self.finish_all_rq = False
# append response times to .rsptimes
self.measure_rsptimes = False
# write/expect binary random (incompressible) data
self.incompressible = False
# , compare read data to what was written
self.verify_read = True
# should we attempt to adjust pause between files
self.auto_pause = False
# sleep this long between each file op
self.pause_between_files = 0.0
# collect samples for this long, then add to start time
self.pause_history_duration = 1.0
# wait this long after cleanup for async. deletion activity to finish
self.cleanup_delay_usec_per_file = 0
# which host the invocation ran on
self.onhost = get_hostname(None)
# thread ID
self.tid = ""
# debug to screen
self.log_to_stderr = False
# print debug messages
self.verbose = False
# create directories as needed
self.dirs_on_demand = False
# for internal use only
self.set_top([top])
# logging level, default is just informational, warning or error
self.log_level = logging.INFO
# will be initialized later with thread-safe python logging object
self.log = None
# buffer for reads and writes will be here
self.buf = None
# copy from here on writes, compare to here on reads
self.biggest_buf = None
# random seed used to control sequence of random numbers,
# default to different sequence every time
self.randstate = random.Random()
# number of hosts/pods in test, default is 1 smallfile host/pod
self.total_hosts = 1
# number of threads in each host/pod
self.threads = 1
# reset object state variables
self.reset()
# FIXME: should be converted to dictionary and output in JSON
# convert object to string for logging, etc.
def __str__(self):
s = " opname=" + self.opname
s += " iterations=" + str(self.iterations)
s += " top_dirs=" + str(self.top_dirs)
s += " src_dirs=" + str(self.src_dirs)
s += " dest_dirs=" + str(self.dest_dirs)
s += " network_dir=" + str(self.network_dir)
s += " shared=" + str(self.is_shared_dir)
s += " record_sz_kb=" + str(self.record_sz_kb)
s += " total_sz_kb=" + str(self.total_sz_kb)
s += " filesize_distr=" + str(self.filesize_distr)
s += " files_per_dir=%d" % self.files_per_dir
s += " dirs_per_dir=%d" % self.dirs_per_dir
s += " dirs_on_demand=" + str(self.dirs_on_demand)
s += " xattr_size=%d" % self.xattr_size
s += " xattr_count=%d" % self.xattr_count
s += " starting_gate=" + str(self.starting_gate)
s += " prefix=" + self.prefix
s += " suffix=" + self.suffix
s += " hash_to_dir=" + str(self.hash_to_dir)
s += " fsync=" + str(self.fsync)
s += " stonewall=" + str(self.stonewall)
s += " cleanup_delay_usec_per_file=" + str(self.cleanup_delay_usec_per_file)
s += " files_between_checks=" + str(self.files_between_checks)
s += " pause=" + str(self.pause_between_files)
s += " pause_sec=" + str(self.pause_sec)
s += " auto_pause=" + str(self.auto_pause)
s += " verify_read=" + str(self.verify_read)
s += " incompressible=" + str(self.incompressible)
s += " finish_all_rq=" + str(self.finish_all_rq)
s += " rsp_times=" + str(self.measure_rsptimes)
s += " tid=" + self.tid
s += " loglevel=" + str(self.log_level)
s += " filenum=" + str(self.filenum)
s += " filenum_final=" + str(self.filenum_final)
s += " rq=" + str(self.rq)
s += " rq_final=" + str(self.rq_final)
s += " total_hosts=" + str(self.total_hosts)
s += " threads=" + str(self.threads)
s += " start=" + str(self.start_time)
s += " end=" + str(self.end_time)
s += " elapsed=" + str(self.elapsed_time)
s += " host=" + str(self.onhost)
s += " status=" + str(self.status)
s += " abort=" + str(self.abort)
s += " log_to_stderr=" + str(self.log_to_stderr)
s += " verbose=" + str(self.verbose)
return s
# if you want to use the same instance for multiple tests
# call reset() method between tests
def reset(self):
# results returned in variables below
self.filenum = 0 # how many files have been accessed so far
self.filenum_final = None # how many files accessed when test ended
self.rq = 0 # how many reads/writes have been attempted so far
self.rq_final = None # how many reads/writes completed when test ended
self.abort = False
self.file_dirs = [] # subdirectores within per-thread dir
self.status = ok
# response time samples for auto-pause feature
self.pause_rsptime_count = 100
# special value that means no response times have been measured yet
self.pause_rsptime_unmeasured = -11
self.files_between_pause = 5
self.pause_rsptime_index = self.pause_rsptime_unmeasured
self.pause_rsptime_history = [0 for k in range(0, self.pause_rsptime_count)]
self.pause_sample_count = 0
# start time for this history interval
self.pause_history_start_time = 0.0
self.pause_sec = self.pause_between_files / self.MICROSEC_PER_SEC
# recalculate this to capture any changes in self.total_hosts and self.threads
self.total_threads = self.total_hosts * self.threads
self.throttling_factor = 0.1 * math.log(self.total_threads + 1, 2)
# to measure per-thread elapsed time
self.start_time = None
self.end_time = None
self.elapsed_time = None
# to measure file operation response times
self.op_start_time = None
self.rsptimes = []
self.rsptime_filename = None
# given a set of top-level directories (e.g. for NFS benchmarking)
# set up shop in them
# we only use one directory for network synchronization
def set_top(self, top_dirs, network_dir=None):
self.top_dirs = top_dirs
# create/read files here
self.src_dirs = [join(d, "file_srcdir") for d in top_dirs]
# rename files to here
self.dest_dirs = [join(d, "file_dstdir") for d in top_dirs]
# directory for synchronization files shared across hosts
self.network_dir = join(top_dirs[0], "network_shared")
if network_dir:
self.network_dir = network_dir
def create_top_dirs(self, is_multi_host):
if os.path.exists(self.network_dir):
rmtree(self.network_dir)
if is_multi_host:
# so all remote clients see that directory was recreated
time.sleep(2.1)
ensure_dir_exists(self.network_dir)
for dlist in [self.src_dirs, self.dest_dirs]:
for d in dlist:
ensure_dir_exists(d)
if is_multi_host:
# workaround to force cross-host synchronization
time.sleep(1.1) # lets NFS mount option actimeo=1 take effect
os.listdir(self.network_dir)
# create per-thread log file
# we have to avoid getting the logger for self.tid more than once,
# or else we'll add a handler more than once to this logger
# and cause duplicate log messages in per-invoke log file
def start_log(self):
try:
self.log = self.loggers[self.tid]
except KeyError:
self.log = logging.getLogger(self.tid)
self.loggers[self.tid] = self.log
if self.log_to_stderr:
h = logging.StreamHandler()
else:
h = logging.FileHandler(self.log_fn())
log_format = self.tid + " %(asctime)s - %(levelname)s - %(message)s"
formatter = logging.Formatter(log_format)
h.setFormatter(formatter)
self.log.addHandler(h)
self.loglevel = logging.INFO
if self.verbose:
self.loglevel = logging.DEBUG
self.log.setLevel(self.loglevel)
# indicate start of an operation
def op_starttime(self, starttime=None):
if not starttime:
self.op_start_time = time.time()
else:
self.op_start_time = starttime
# indicate end of an operation,
# this appends the elapsed time of the operation to .rsptimes array
def op_endtime(self, opname):
end_time = time.time()
rsp_time = end_time - self.op_start_time
if self.measure_rsptimes:
self.rsptimes.append((opname, self.op_start_time, rsp_time))
self.op_start_time = None
if self.auto_pause:
self.adjust_pause_time(end_time, rsp_time)
# save response times seen by this thread
def save_rsptimes(self):
fname = "rsptimes_{tid}_{host}_{op}_{ts}.csv".format(
tid=str(self.tid),
host=get_hostname(None),
op=self.opname,
ts=str(self.start_time),
)
rsptime_fname = join(self.network_dir, fname)
with open(rsptime_fname, "w") as f:
for opname, start_time, rsp_time in self.rsptimes:
# time granularity is microseconds, accuracy is less
f.write(
"%8s, %9.6f, %9.6f\n"
% (opname, start_time - self.start_time, rsp_time)
)
os.fsync(f.fileno()) # particularly for NFS this is needed
# compute pause time based on available response time samples,
# assuming all threads converge to roughly the same average response time
# we treat the whole system as one big queueing center and apply
# little's law U = XS to it to estimate what pause time should be
# to achieve max throughput without excessive queueing and unfairness
def calculate_pause_time(self, end_time):
# there are samples to process
mean_rsptime = sum(self.pause_rsptime_history) / self.pause_rsptime_count
time_so_far = end_time - self.pause_history_start_time
# estimate system throughput assuming all threads are same
# per-thread throughput is measured by number of rsptime samples
# in this interval divided by length of interval
est_throughput = self.pause_sample_count * self.total_threads / time_so_far
# assumption: all threads converge to the same throughput
mean_utilization = mean_rsptime * est_throughput
old_pause = self.pause_sec
new_pause = mean_utilization * mean_rsptime * self.throttling_factor
self.pause_sec = (old_pause + 2 * new_pause) / 3.0
self.log.debug(
"time_so_far %f samples %d index %d mean_rsptime %f throttle %f est_throughput %f mean_util %f"
% (
time_so_far,
self.pause_sample_count,
self.pause_rsptime_index,
mean_rsptime,
self.throttling_factor,
est_throughput,
mean_utilization,
)
)
self.log.info(
"per-thread pause changed from %9.6f to %9.6f" % (old_pause, self.pause_sec)
)
# adjust pause time based on whether response time was significantly bigger than pause time
# we lower the pause time until
def adjust_pause_time(self, end_time, rsp_time):
self.log.debug(
"adjust_pause_time %f %f %f %f"
% (end_time, rsp_time, self.pause_sec, self.pause_history_start_time)
)
if self.pause_rsptime_index == self.pause_rsptime_unmeasured:
self.pause_sec = 0.00001
self.pause_history_start_time = end_time - rsp_time
# try to get the right order of magnitude for response time estimate immediately
self.pause_rsptime_history = [
rsp_time for k in range(0, self.pause_rsptime_count)
]
self.pause_rsptime_index = 1
self.pause_sample_count = 1
self.pause_sec = self.throttling_factor * rsp_time
# self.calculate_pause_time(end_time)
self.log.info("per-thread pause initialized to %9.6f" % self.pause_sec)
else:
# insert response time into ring buffer of most recent response times
self.pause_rsptime_history[self.pause_rsptime_index] = rsp_time
self.pause_rsptime_index += 1
if self.pause_rsptime_index >= self.pause_rsptime_count:
self.pause_rsptime_index = 0
self.pause_sample_count += 1
# if it's time to adjust pause_sec...
if (
self.pause_history_start_time + self.pause_history_duration < end_time
or self.pause_sample_count > self.pause_rsptime_count / 2
):
self.calculate_pause_time(end_time)
self.pause_history_start_time = end_time
self.pause_sample_count = 0
# determine if test interval is over for this thread
# each thread uses this to signal that it is at the starting gate
# (i.e. it is ready to immediately begin generating workload)
def gen_thread_ready_fname(self, tid, hostname=None):
return join(self.tmp_dir, "thread_ready." + tid + ".tmp")
# each host uses this to signal that it is
# ready to immediately begin generating workload
# each host places this file in a directory shared by all hosts
# to indicate that this host is ready
def gen_host_ready_fname(self, hostname=None):
if not hostname:
hostname = self.onhost
return join(self.network_dir, "host_ready." + hostname + ".tmp")
# abort file tells other threads not to start test
# because something has already gone wrong
def abort_fn(self):
return join(self.network_dir, "abort.tmp")
# stonewall file stops test measurement
# (does not stop worker thread unless --finish N is used)
def stonewall_fn(self):
return join(self.network_dir, "stonewall.tmp")
# log file for this worker thread goes here
def log_fn(self):
return join(self.tmp_dir, "invoke_logs-%s.log" % self.tid)
# file for result stored as pickled python object
def host_result_filename(self, result_host=None):
if result_host is None:
result_host = self.onhost
return join(self.network_dir, result_host + "_result.pickle")
# we use the seed function to control per-thread random sequence
# we want seed to be saved
# so that operations subsequent to initial create will know
# what file size is for thread T's file j without having to stat the file
def init_random_seed(self):
fn = self.gen_thread_ready_fname(self.tid, hostname=self.onhost) + ".seed"
thread_seed = str(time.time())
self.log.debug("seed opname: " + self.opname)
if self.opname == "create" or self.opname == "swift-put":
thread_seed = str(time.time()) + " " + self.tid
ensure_deleted(fn)
with open(fn, "w") as seedfile:
seedfile.write(str(thread_seed))
self.log.debug("write seed %s " % thread_seed)
# elif ['append', 'read', 'swift-get'].__contains__(self.opname):
else:
try:
with open(fn, "r") as seedfile:
thread_seed = seedfile.readlines()[0].strip()
self.log.debug("read seed %s " % thread_seed)
except OSError as e:
if e.errno == errno.ENOENT and self.opname in [
"cleanup",
"rmdir",
"delete",
]:
self.log.info(
"no saved random seed found in %s but it does not matter for deletes"
% fn
)
self.randstate.seed(thread_seed)
def get_next_file_size(self):
next_size = self.total_sz_kb
if self.filesize_distr == self.fsdistr_random_exponential:
next_size = max(
1,
min(
int(self.randstate.expovariate(1.0 / self.total_sz_kb)),
self.total_sz_kb * self.random_size_limit,
),
)
if self.log_level == logging.DEBUG:
self.log.debug("rnd expn file size %d KB" % next_size)
else:
self.log.debug("fixed file size %d KB" % next_size)
return next_size
# tell test driver that we're at the starting gate
# this is a 2 phase process
# first wait for each thread on this host to reach starting gate
# second, wait for each host in test to reach starting gate
# in case we have a lot of threads/hosts, sleep 1 sec between polls
# also, wait 2 sec after seeing starting gate to maximize probability
# that other hosts will also see it at the same time
def wait_for_gate(self):
if self.starting_gate:
gateReady = self.gen_thread_ready_fname(self.tid)
touch(gateReady)
delay_time = 0.1
while not os.path.exists(self.starting_gate):
if os.path.exists(self.abort_fn()):
raise SMFRunException("thread " + str(self.tid) + " saw abort flag")
# wait a little longer so that
# other clients have time to see that gate exists
delay_time = delay_time * 1.5
if delay_time > 2.0:
delay_time = 2.0
time.sleep(delay_time)
gateinfo = os.stat(self.starting_gate)
synch_time = gateinfo.st_mtime + 3.0 - time.time()
if synch_time > 0.0:
time.sleep(synch_time)
if synch_time < 0.0:
self.log.warn("other threads may have already started")
if self.verbose:
self.log.debug(
"started test at %f sec after waiting %f sec"
% (time.time(), synch_time)
)
# record info needed to compute test statistics
def end_test(self):
# be sure end_test is not called more than once
# during do_workload()
if self.test_ended():
return
myassert(
self.end_time is None
and self.rq_final is None
and self.filenum_final is None
)
self.rq_final = self.rq
self.filenum_final = self.filenum
self.end_time = time.time()
self.elapsed_time = self.end_time - self.start_time
stonewall_path = self.stonewall_fn()
if self.filenum >= self.iterations and not os.path.exists(stonewall_path):
try:
touch(stonewall_path)
self.log.info("stonewall file %s written" % stonewall_path)
except IOError as e:
err = e.errno
if err != errno.EEXIST:
# workaround for possible bug in Gluster
if err != errno.EINVAL:
self.log.error(
"unable to write stonewall file %s" % stonewall_path
)
self.log.exception(e)
self.status = err
else:
self.log.info("saw EINVAL on stonewall, ignoring it")
def test_ended(self):
return (self.end_time is not None) and (self.end_time > self.start_time)
# see if we should do one more file
# to minimize overhead, do not check stonewall file before every iteration
def do_another_file(self):
if self.stonewall and (((self.filenum + 1) % self.files_between_checks) == 0):
stonewall_path = self.stonewall_fn()
if self.verbose:
self.log.debug(
"checking for stonewall file %s after %s iterations"
% (stonewall_path, self.filenum)
)
if os.path.exists(stonewall_path):
self.log.info(
"stonewall file %s seen after %d iterations"
% (stonewall_path, self.filenum)
)
self.end_test()
# if user doesn't want to finish all requests and test has ended, stop
if not self.finish_all_rq and self.test_ended():
return False
if self.status != ok:
self.end_test()
return False
if self.filenum >= self.iterations:
self.end_test()
return False
if self.abort:
raise SMFRunException("thread " + str(self.tid) + " saw abort flag")
self.filenum += 1
if self.pause_sec > 0.0 and self.iterations % self.files_between_pause == 0:
time.sleep(self.pause_sec * self.files_between_pause)
return True
# in this method of directory selection, as filenum increments upwards,
# we place F = files_per_dir files into directory,
# then next F files into directory D+1, etc.
# we generate directory pathnames like radix-D numbers
# where D is subdirectories per directory
# see URL http://gmplib.org/manual/Binary-to-Radix.html#Binary-to-Radix
# this algorithm should take O(log(F))
def mk_seq_dir_name(self, file_num):
dir_in = file_num // self.files_per_dir
# generate powers of self.files_per_dir not greater than dir_in
level_dirs = []
dirs_for_this_level = self.dirs_per_dir
while dirs_for_this_level <= dir_in:
level_dirs.append(dirs_for_this_level)
dirs_for_this_level *= self.dirs_per_dir
# generate each "digit" in radix-D number as result of quotients
# from dividing remainder by next lower power of D (think of base 10)
levels = len(level_dirs)
level = levels - 1
pathlist = []
while level > -1:
dirs_in_level = level_dirs[level]
quotient = dir_in // dirs_in_level
dir_in = dir_in - quotient * dirs_in_level
dirnm = "d_" + str(quotient).zfill(3)
pathlist.append(dirnm)
level -= 1
pathlist.append("d_" + str(dir_in).zfill(3))
return os.sep.join(pathlist)
def mk_hashed_dir_name(self, file_num):
pathlist = []
random_hash = file_num * self.some_prime % self.iterations
dir_num = random_hash // self.files_per_dir
while dir_num > 1:
dir_num_hash = dir_num * self.some_prime % self.dirs_per_dir
pathlist.insert(0, "h_" + str(dir_num_hash).zfill(3))
dir_num //= self.dirs_per_dir
return os.sep.join(pathlist)
def mk_dir_name(self, file_num):
if self.hash_to_dir:
return self.mk_hashed_dir_name(file_num)
else:
return self.mk_seq_dir_name(file_num)
# generate file name to put in this directory
# prefix can be used for process ID or host ID for example
# names are unique to each thread
# automatically computes subdirectory for file based on
# files_per_dir, dirs_per_dir and placing file as high in tree as possible
# for multiple-mountpoint tests,
# we need to select top-level dir based on file number
# to spread load across mountpoints,
# so we use round-robin mountpoint selection
# NOTE: this routine is called A LOT,
# so need to optimize by avoiding lots of os.path.join calls
def mk_file_nm(self, base_dirs, filenum=-1):
if filenum == -1:
filenum = self.filenum
listlen = len(base_dirs)
tree = base_dirs[filenum % listlen]
components = [
tree,
os.sep,
self.file_dirs[filenum],
os.sep,
self.prefix,
"_",
self.onhost,
"_",
self.tid,
"_",
str(filenum),
"_",
self.suffix,
]
return "".join(components)
# generate buffer contents, use these on writes and
# compare against them for reads where random data is used,
def create_biggest_buf(self, contents_random):
# generate random byte sequence if desired.
random_segment_size = 1 << self.random_seg_size_bits
if not self.incompressible:
# generate a random byte sequence of length 2^random_seg_size_bits
# and then repeat the sequence
# until we get to size 2^biggest_buf_size_bits in length
if contents_random:
biggest_buf = bytearray(
[
self.randstate.randrange(0, 127)
for k in range(0, random_segment_size)
]