recovery: various fixes & cleanups
abudnik committed Dec 15, 2015
1 parent ba3ee17 commit bdf6947
Showing 2 changed files with 42 additions and 30 deletions.
19 changes: 13 additions & 6 deletions recovery/elliptics_recovery/iterator.py
@@ -305,6 +305,8 @@ def start(self,

iterated_keys = 0
total_keys = 0
positive_responses = 0
negative_responses = 0

start = time.time()

@@ -316,16 +318,20 @@ def start(self,

iterated_keys = record.response.iterated_keys
total_keys = record.response.total_keys
if record.response.status == 0:
positive_responses += 1
else:
negative_responses += 1

if iterated_keys % batch_size == 0:
yield (iterated_keys, total_keys, start, end)
yield (iterated_keys, total_keys, positive_responses, negative_responses, start, end)

self._on_key_response(results, record)
end = time.time()

elapsed_time = records.elapsed_time()
self.log.debug("Time spended for iterator: {0}/{1}".format(elapsed_time.tsec, elapsed_time.tnsec))
yield (iterated_keys, total_keys, start, end)
yield (iterated_keys, total_keys, positive_responses, negative_responses, start, end)
if self.separately:
yield results
else:
@@ -372,7 +378,7 @@ def iterate_with_stats(self, eid, timestamp_range,
result = it
break

result_len = iterated_keys = it[0]
result_len = it[0]
self._update_stats(stats, it)

if result is None:
@@ -383,7 +389,7 @@
return result, result_len

def _update_stats(self, stats, it):
iterated_keys, total_keys, start, end = it
iterated_keys, total_keys, _, _, start, end = it
stats.set_counter('iteration_speed', round(iterated_keys / (end - start), 2))
stats.set_counter('iterated_keys', iterated_keys)
stats.set_counter('total_keys', total_keys)
@@ -407,9 +413,10 @@ def _on_key_response(self, results, record):
self._save_record(results, record)

def _update_stats(self, stats, it):
iterated_keys, total_keys, start, end = it
iterated_keys, total_keys, positive_responses, negative_responses, start, end = it
stats.set_counter('recovery_speed', round(iterated_keys / (end - start), 2))
stats.set_counter('recovered_keys', iterated_keys)
stats.set_counter('recovered_keys', positive_responses)
stats.set_counter('recovered_keys', -negative_responses)
stats.set_counter('total_keys', total_keys)


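Note (an illustrative sketch, not part of the commit): start() now yields a 6-tuple that carries per-response success and failure counts alongside the key counters. A minimal, self-contained example of unpacking it; the function name and sample data below are made up for illustration:

import time

def print_progress(progress):
    # The commit widens the yielded tuple from
    #   (iterated_keys, total_keys, start, end)
    # to
    #   (iterated_keys, total_keys, positive_responses, negative_responses, start, end)
    iterated_keys, total_keys, positive, negative, start, end = progress
    speed = round(iterated_keys / (end - start), 2)
    print("iterated {0}/{1} keys, {2} ok, {3} failed, {4} keys/sec".format(
        iterated_keys, total_keys, positive, negative, speed))

if __name__ == '__main__':
    now = time.time()
    print_progress((100, 200, 90, 10, now - 2.0, now))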
53 changes: 29 additions & 24 deletions recovery/elliptics_recovery/types/merge.py
@@ -232,8 +232,9 @@ class ServerSendRecovery(object):
Special recovery class that tries to recover keys from backends that should not
contain them, moving them to the proper backend via the server-send operation.
'''
def __init__(self, ctx, node, group, stats, address, backend_id=None):
self.routes, self.backends = self._prepare_routes(ctx, group, address, backend_id)
def __init__(self, ctx, node, group, stats, address, backend_id):
self.routes = ctx.routes.filter_by_groups([group])
self.backends = self._prepare_backends(ctx, group, address, backend_id)
self.session = elliptics.Session(node)
self.session.exceptions_policy = elliptics.exceptions_policy.no_exceptions
self.session.set_filter(elliptics.filters.all)
@@ -245,25 +246,21 @@ def __init__(self, ctx, node, group, stats, address, backend_id=None):
self.ctx = ctx
self.stats = stats

def _prepare_routes(self, ctx, group, address, backend_id):
def _prepare_backends(self, ctx, group, address, backend_id):
'''
Returns pair: route list of @group and list of pairs (address, backend)
Returns list of pairs (address, backend)
'''
group_routes = ctx.routes.filter_by_groups([group])
backends = []
if backend_id is not None:
backends.append((address, backend_id))
backends = [(address, backend_id)]
elif ctx.one_node:
if ctx.backend_id is not None:
backends.append((address, ctx.backend_id))
else:
for backend_id in group_routes.get_address_backends(address):
backends.append((address, backend_id))
backends = []
for backend_id in self.routes.get_address_backends(address):
backends.append((address, backend_id))
else:
backends = group_routes.addresses_with_backends()
backends = self.routes.addresses_with_backends()

log.info("Server-send recovery: group: {0}, num backends: {1}".format(group, len(backends)))
return group_routes, backends
return backends

def recover(self, keys):
'''
@@ -284,19 +281,21 @@ def contain(address, backend_id, key):
if key_candidates:
timeouted_keys = self._server_send(key_candidates, addr, backend_id, responses)
key_candidates = timeouted_keys
self._update_timeouted_keys_stats(len(key_candidates))

self._remove_bad_keys(responses)

return not self._has_unrecovered_keys(responses)

def _server_send(self, keys, addr, backend_id, responses):
'''
Calls server-send with the given list of keys to a specific backend.
Returns a list of timed-out keys.
'''
log.debug("Server-send: address: {0}, backend: {1}, num keys: {2}".format(addr, backend_id, len(keys)))

timeouted_keys = []
if self.ctx.dry_run:
return timeouted_keys
return []

start_time = time.time()
recovers_in_progress = len(keys)
@@ -305,6 +304,7 @@ def _server_send(self, keys, addr, backend_id, responses):
flags = 0 if self.ctx.safe else elliptics.iterator_flags.move
iterator = self.session.server_send(keys, flags, list(self.session.groups))

timeouted_keys = []
for index, result in enumerate(iterator):
status = result.response.status
self._update_stats(start_time, index + 1, recovers_in_progress, status)
@@ -323,10 +323,9 @@ def _remove_bad_keys(self, responses):
Removes invalid keys with older timestamp or invalid checksum.
'''
bad_keys = []
for val in responses.iteritems():
for response in val[1]:
for key, responses in responses.iteritems():
for response in responses:
if self._check_bad_key(response):
key = val[0]
bad_keys.append((key, ) + response)

results = []
@@ -342,7 +341,7 @@

def _check_bad_key(self, response):
status = response[0]
return status == -errno.EBADFD or status == -errno.EILSEQ
return status in (-errno.EBADFD, -errno.EILSEQ)

def _has_unrecovered_keys(self, responses):
'''
@@ -355,7 +354,7 @@ def _has_unrecovered_keys(self, responses):

def _check_unrecovered_key(self, responses):
'''
Returns True if a valid key exists on the backend but could not be recovered for any reason.
Returns True if a valid key exists at the backend but could not be recovered for any reason.
'''
for r in responses:
status = r[0]
@@ -368,8 +367,13 @@ def _update_stats(self, start_time, processed_keys, recovers_in_progress, status
recovers_in_progress -= processed_keys
self.stats.set_counter('recovery_speed', round(speed, 2))
self.stats.set_counter('recovers_in_progress', recovers_in_progress)
self.stats.counter('recovered_keys', 1 if status == 0 else -1)
self.ctx.stats.counter('recovered_keys', 1 if status == 0 else -1)
if status != -errno.ETIMEDOUT:
self.stats.counter('recovered_keys', 1 if status == 0 else -1)
self.ctx.stats.counter('recovered_keys', 1 if status == 0 else -1)

def _update_timeouted_keys_stats(self, num_timeouted_keys):
self.stats.counter('recovered_keys', -num_timeouted_keys)
self.ctx.stats.counter('recovered_keys', -num_timeouted_keys)


def dump_process_group((ctx, group)):
@@ -389,7 +393,8 @@ def dump_process_group((ctx, group)):
remotes=ctx.remotes)
ret = True
with open(ctx.dump_file, 'r') as dump:
ss_rec = ServerSendRecovery(ctx, node, group, stats, ctx.address)
backend_id = ctx.backend_id if ctx.one_node else None
ss_rec = ServerSendRecovery(ctx, node, group, stats, ctx.address, backend_id)
# splits ids from the dump file into batches and recovers them
for batch_id, batch in groupby(enumerate(dump), key=lambda x: x[0] / ctx.batch_size):
keys = [elliptics.Id(val) for _, val in batch]
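Aside (an illustrative sketch, not from the commit): dump_process_group() batches the dump file by grouping enumerate() indices with an integer-division key. The same idiom in isolation, with made-up ids and batch size; the real code uses Python 2's "/" for integer division and wraps each value in elliptics.Id:

from itertools import groupby

batch_size = 3
dump = ['id-%d' % i for i in range(8)]  # stands in for lines read from ctx.dump_file

for batch_id, batch in groupby(enumerate(dump), key=lambda x: x[0] // batch_size):
    keys = [val for _, val in batch]
    print("batch {0}: {1}".format(batch_id, keys))
# batch 0: ['id-0', 'id-1', 'id-2']
# batch 1: ['id-3', 'id-4', 'id-5']
# batch 2: ['id-6', 'id-7']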
