Skip to content

Commit

Permalink
recovery: merge: removed obsolete Recovery code; used server-send, co…
Browse files Browse the repository at this point in the history
…py-iter everywhere
  • Loading branch information
abudnik committed Sep 27, 2016
1 parent 2473014 commit d3382e6
Show file tree
Hide file tree
Showing 2 changed files with 203 additions and 608 deletions.
38 changes: 28 additions & 10 deletions recovery/elliptics_recovery/iterator.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,8 @@ def start(self,

iterated_keys = 0
total_keys = 0
positive_responses = 0
negative_responses = 0

start = time.time()

Expand All @@ -316,16 +318,20 @@ def start(self,

iterated_keys = record.response.iterated_keys
total_keys = record.response.total_keys
if record.response.status == 0:
positive_responses += 1
else:
negative_responses += 1

if iterated_keys % batch_size == 0:
yield (iterated_keys, total_keys, start, end)
yield (iterated_keys, total_keys, positive_responses, negative_responses, start, end)

self._on_key_response(results, record)
self._on_key_response(results, record, address, backend_id)
end = time.time()

elapsed_time = records.elapsed_time()
self.log.debug("Time spended for iterator: {0}/{1}".format(elapsed_time.tsec, elapsed_time.tnsec))
yield (iterated_keys, total_keys, start, end)
yield (iterated_keys, total_keys, positive_responses, negative_responses, start, end)
if self.separately:
yield results
else:
Expand All @@ -343,7 +349,7 @@ def _start_iterator(self, eid, ranges, flags, timestamp_range):
timestamp_range[0],
timestamp_range[1])

def _on_key_response(self, results, record):
def _on_key_response(self, results, record, address, backend_id):
if record.response.status == 0:
self._save_record(results, record)

Expand Down Expand Up @@ -372,11 +378,8 @@ def iterate_with_stats(self, eid, timestamp_range,
result = it
break

iterated_keys, total_keys, start, end = it
result_len = iterated_keys
stats.set_counter('iteration_speed', round(iterated_keys / (end - start), 2))
stats.set_counter('iterated_keys', iterated_keys)
stats.set_counter('total_keys', total_keys)
result_len = it[0]
self._update_stats(stats, it)

if result is None:
stats.set_counter('iterations', -1)
Expand All @@ -385,6 +388,12 @@ def iterate_with_stats(self, eid, timestamp_range,

return result, result_len

def _update_stats(self, stats, it):
iterated_keys, total_keys, _, _, start, end = it
stats.set_counter('iteration_speed', round(iterated_keys / (end - start), 2))
stats.set_counter('iterated_keys', iterated_keys)
stats.set_counter('total_keys', total_keys)


class MergeRecoveryIterator(Iterator):
'''
Expand All @@ -399,10 +408,19 @@ def _start_iterator(self, eid, ranges, flags, timestamp_range):
flags |= elliptics.iterator_flags.move
return self.session.start_copy_iterator(eid, ranges, [eid.group_id], flags, timestamp_range[0], timestamp_range[1])

def _on_key_response(self, results, record):
def _on_key_response(self, results, record, address, backend_id):
if record.response.status != 0:
self.log.error("Key recovery on node: {0}/{1} failed: {2}, key: {3}"
.format(address, backend_id, record.response.status, record.response.key))
self._save_record(results, record)

def _update_stats(self, stats, it):
iterated_keys, total_keys, positive_responses, negative_responses, start, end = it
stats.set_counter('recovery_speed', round(iterated_keys / (end - start), 2))
stats.set_counter('recovered_keys', positive_responses)
stats.set_counter('recovered_keys', -negative_responses)
stats.set_counter('total_keys', total_keys)


class MergeData(object):
"""
Expand Down
Loading

0 comments on commit d3382e6

Please sign in to comment.