Skip to content

Commit

Permalink
recovery: merge: retries of timeouted remove & server-send operations
Browse files Browse the repository at this point in the history
  • Loading branch information
abudnik committed Dec 16, 2015
1 parent bdf6947 commit f251b74
Showing 1 changed file with 22 additions and 10 deletions.
32 changes: 22 additions & 10 deletions recovery/elliptics_recovery/types/merge.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,7 @@ def _server_send(self, keys, addr, backend_id, responses):
iterator = self.session.server_send(keys, flags, list(self.session.groups))

timeouted_keys = []
index = -1
for index, result in enumerate(iterator):
status = result.response.status
self._update_stats(start_time, index + 1, recovers_in_progress, status)
Expand All @@ -316,6 +317,10 @@ def _server_send(self, keys, addr, backend_id, responses):
r = (status, addr, backend_id)
responses[str(key)].append(r)

if index < 0:
log.error("Server-send operation failed: {0}/{1}".format(addr, backend_id))
timeouted_keys = keys

return timeouted_keys

def _remove_bad_keys(self, responses):
Expand All @@ -328,16 +333,23 @@ def _remove_bad_keys(self, responses):
if self._check_bad_key(response):
bad_keys.append((key, ) + response)

results = []
for k in bad_keys:
key, _, addr, backend_id = k
self.remove_session.set_direct_id(addr, backend_id)
result = self.remove_session.remove(elliptics.Id(key))
results.append(result)

for i, r in enumerate(results):
status = r.get()[0].status
log.info("Removing key: {0}, status: ".format(bad_keys[i], status))
for attempt in range(self.ctx.attempts):
if bad_keys:
results = []
for k in bad_keys:
key, _, addr, backend_id = k
self.remove_session.set_direct_id(addr, backend_id)
result = self.remove_session.remove(elliptics.Id(key))
results.append(result)

timeouted_keys = []
is_last_attempt = (attempt == self.ctx.attempts - 1)
for i, r in enumerate(results):
status = r.get()[0].status
log.info("Removing key: {0}, status: {1}, last attempt: {2}".format(bad_keys[i], status, is_last_attempt))
if status == -errno.ETIMEDOUT:
timeouted_keys.append(bad_keys[i])
bad_keys = timeouted_keys

def _check_bad_key(self, response):
status = response[0]
Expand Down

0 comments on commit f251b74

Please sign in to comment.