diff --git a/recovery/elliptics_recovery/types/merge.py b/recovery/elliptics_recovery/types/merge.py index ead5a3d3d..5d097d8d1 100755 --- a/recovery/elliptics_recovery/types/merge.py +++ b/recovery/elliptics_recovery/types/merge.py @@ -305,6 +305,7 @@ def _server_send(self, keys, addr, backend_id, responses): iterator = self.session.server_send(keys, flags, list(self.session.groups)) timeouted_keys = [] + index = -1 for index, result in enumerate(iterator): status = result.response.status self._update_stats(start_time, index + 1, recovers_in_progress, status) @@ -316,6 +317,10 @@ def _server_send(self, keys, addr, backend_id, responses): r = (status, addr, backend_id) responses[str(key)].append(r) + if index < 0: + log.error("Server-send operation failed: {0}/{1}".format(addr, backend_id)) + timeouted_keys = keys + return timeouted_keys def _remove_bad_keys(self, responses): @@ -328,16 +333,23 @@ def _remove_bad_keys(self, responses): if self._check_bad_key(response): bad_keys.append((key, ) + response) - results = [] - for k in bad_keys: - key, _, addr, backend_id = k - self.remove_session.set_direct_id(addr, backend_id) - result = self.remove_session.remove(elliptics.Id(key)) - results.append(result) - - for i, r in enumerate(results): - status = r.get()[0].status - log.info("Removing key: {0}, status: ".format(bad_keys[i], status)) + for attempt in range(self.ctx.attempts): + if bad_keys: + results = [] + for k in bad_keys: + key, _, addr, backend_id = k + self.remove_session.set_direct_id(addr, backend_id) + result = self.remove_session.remove(elliptics.Id(key)) + results.append(result) + + timeouted_keys = [] + is_last_attempt = (attempt == self.ctx.attempts - 1) + for i, r in enumerate(results): + status = r.get()[0].status + log.info("Removing key: {0}, status: {1}, last attempt: {2}".format(bad_keys[i], status, is_last_attempt)) + if status == -errno.ETIMEDOUT: + timeouted_keys.append(bad_keys[i]) + bad_keys = timeouted_keys def _check_bad_key(self, response): status = response[0]