From ba3ee177657393b5abeab2665e90901d6b09e8c1 Mon Sep 17 00:00:00 2001
From: abudnik
Date: Tue, 15 Dec 2015 17:56:07 +0400
Subject: [PATCH] recovery: merge: removed obsolete Recovery code; used server-send, copy-iter everywhere

---
 recovery/elliptics_recovery/types/merge.py | 682 ++++----------------
 1 file changed, 122 insertions(+), 560 deletions(-)

diff --git a/recovery/elliptics_recovery/types/merge.py b/recovery/elliptics_recovery/types/merge.py
index e47cf9818..36dbf2bbb 100755
--- a/recovery/elliptics_recovery/types/merge.py
+++ b/recovery/elliptics_recovery/types/merge.py
@@ -31,10 +31,10 @@
 import traceback
 import threading
 import errno
-from bisect import bisect
+import time
 
 from ..etime import Time
-from ..utils.misc import elliptics_create_node, RecoverStat, LookupDirect, RemoveDirect, WindowedRecovery
+from ..utils.misc import elliptics_create_node
 from ..route import RouteList
 from ..iterator import MergeRecoveryIterator
 from ..range import IdRange
@@ -43,289 +43,6 @@
 log = logging.getLogger(__name__)
 
 
-class Recovery(object):
-    '''
-    Base class for recovering one key with specified timestamp and size from specified address to group
-    with optional check that group doesn't have or have an older version of the key.
-    If Recovery was inited with callback then this callback will be called when all work is done.
-    '''
-    def __init__(self, key, timestamp, flags, size, address, backend_id, group, ctx, node, check=True, callback=None):
-        self.key = key
-        self.key_timestamp = timestamp
-        self.key_flags = flags
-        self.address = address
-        self.backend_id = backend_id
-        self.group = group
-        self.node = node
-        self.direct_session = elliptics.Session(node)
-        self.direct_session.trace_id = ctx.trace_id
-        self.direct_session.set_direct_id(self.address, self.backend_id)
-        self.direct_session.groups = [group]
-        self.session = elliptics.Session(node)
-        self.session.groups = [group]
-        self.session.trace_id = ctx.trace_id
-        self.ctx = ctx
-        self.stats = RecoverStat()
-        self.result = True
-        self.attempt = 0
-        self.total_size = size
-        self.recovered_size = 0
-        # if size of object more that size of one chunk than file should be read/written in chunks
-        self.chunked = self.total_size > self.ctx.chunk_size
-        self.check = check
-        self.callback = callback
-        self.complete = threading.Event()
-        log.debug("Created Recovery object for key: {0}, node: {1}/{2}".format(repr(key), address, backend_id))
-
-    def run(self):
-        log.info("Recovering key: {0}, node: {1}/{2}".format(repr(self.key), self.address, self.backend_id))
-        if self.key_flags & elliptics.record_flags.uncommitted:
-            log.info('Key: {0} is uncommitted. Remove it'.format(self.key))
-            self.remove()
-            self.stats.skipped += 1
-            return
-        address, _, backend_id = self.ctx.routes.filter_by_group(self.group).get_id_routes(self.key)[0]
-        if (address, backend_id) == (self.address, self.backend_id):
-            log.debug("Key: {0} already on the right node: {1}/{2}"
-                      .format(repr(self.key), self.address, self.backend_id))
-            self.stats.skipped += 1
-            self.stop(True)
-            return
-        else:
-            log.debug("Key: {0} should be on node: {1}/{2}"
-                      .format(repr(self.key), address, backend_id))
-        self.dest_address = address
-        self.dest_backend_id = backend_id
-        if self.check:
-            log.debug("Lookup key: {0} on node: {1}/{2}".format(repr(self.key),
-                                                                self.dest_address,
-                                                                self.dest_backend_id))
-            LookupDirect(self.dest_address,
-                         self.dest_backend_id,
-                         self.key,
-                         self.group,
-                         self.ctx,
-                         self.node,
-                         self.onlookup).run()
-        elif self.ctx.dry_run:
-            log.debug("Dry-run mode is turned on. Skipping reading, writing and removing stages.")
-            self.stop(True)
-            return
-        else:
-            self.attempt = 0
-            self.read()
-
-    def stop(self, result):
-        self.result = result
-        log.debug("Finished recovering key: {0} with result: {1}".format(self.key, self.result))
-        if self.callback:
-            self.callback(self.result, self.stats)
-        self.complete.set()
-
-    def read(self):
-        size = 0
-        try:
-            log.debug("Reading key: {0} from node: {1}/{2}, chunked: {3}"
-                      .format(repr(self.key), self.address, self.backend_id, self.chunked))
-            if self.chunked:
-                # size of chunk that should be read/written next
-                size = min(self.total_size - self.recovered_size, self.ctx.chunk_size)
-                if self.recovered_size != 0:
-                    if self.key_flags & elliptics.record_flags.chunked_csum:
-                        # if record was checksummed by chunks there is no need to disable checksum verification
-                        self.direct_session.ioflags &= ~elliptics.io_flags.nocsum
-                    else:
-                        # if it is not first chunk then do not check checksum on read
-                        self.direct_session.ioflags |= elliptics.io_flags.nocsum
-            self.direct_session.read_data(self.key,
-                                          offset=self.recovered_size,
-                                          size=size).connect(self.onread)
-        except Exception, e:
-            log.error("Read key: {0} by offset: {1} and size: {2} raised exception: {3}, traceback: {4}"
-                      .format(self.key, self.recovered_size, size, repr(e), traceback.format_exc()))
-            self.stop(False)
-
-    def write(self):
-        try:
-            log.debug("Writing key: {0} to node: {1}/{2}".format(repr(self.key),
-                                                                 self.dest_address,
-                                                                 self.dest_backend_id))
-            if self.chunked:
-                if self.recovered_size == 0:
-                    # if it is first chunk - write it via prepare
-                    self.write_result = self.session.write_prepare(key=self.key,
-                                                                   data=self.write_data,
-                                                                   remote_offset=self.recovered_size,
-                                                                   psize=self.total_size)
-                elif self.recovered_size + len(self.write_data) < self.total_size:
-                    # if it is not last chunk - write it via write_plain
-                    self.write_result = self.session.write_plain(key=self.key,
-                                                                 data=self.write_data,
-                                                                 remote_offset=self.recovered_size)
-                else:
-                    # if it is the last chunk - write it via write_commit
-                    self.write_result = self.session.write_commit(key=self.key,
-                                                                  data=self.write_data,
-                                                                  remote_offset=self.recovered_size,
-                                                                  csize=self.total_size)
-            else:
-                # if object was not splitted by chunks then write it via write_data
-                self.write_result = self.session.write_data(key=self.key,
-                                                            data=self.write_data,
-                                                            offset=self.recovered_size)
-            self.write_result.connect(self.onwrite)
-        except Exception, e:
-            log.error("Write exception: {0}, traceback: {1}"
-                      .format(repr(e), traceback.format_exc()))
-            self.Stop(False)
-
-    def remove(self):
-        if self.ctx.safe or self.ctx.dry_run:
-            if self.ctx.safe:
-                log.info("Safe mode is turned on. Skip removing key: {0}".format(repr(self.key)))
-            else:
-                log.info("Dry-run mode is turned on. Skip removing key: {0}.".format(repr(self.key)))
-            self.stop(True)
-        else:
-            log.info("Removing key: {0} from node: {1}/{2}".format(repr(self.key), self.address, self.backend_id))
-            # remove object directly from address by using RemoveDirect
-            RemoveDirect(self.address,
-                         self.backend_id,
-                         self.key,
-                         self.group,
-                         self.ctx,
-                         self.node,
-                         self.onremove).run()
-
-    def onlookup(self, result, stats):
-        try:
-            self.stats += stats
-            if result and self.key_timestamp < result.timestamp:
-                log.debug("Key: {0} on node: {1}/{2} is newer. Just removing it from node: {3}/{4}."
-                          .format(repr(self.key), self.dest_address, self.dest_backend_id,
-                                  self.address, self.backend_id))
-                self.attempt = 0
-                self.remove()
-                return
-
-            log.debug("Key: {0} on node: {1}/{2} is older or miss. Reading it from node: {3}/{4}"
-                      .format(repr(self.key), self.dest_address, self.dest_backend_id, self.address, self.backend_id))
-            if self.ctx.dry_run:
-                log.debug("Dry-run mode is turned on. Skipping reading, writing and removing stages.")
-                return
-            self.attempt = 0
-            self.read()
-        except Exception as e:
-            log.error("Onlookup exception: {0}, traceback: {1}"
-                      .format(repr(e), traceback.format_exc()))
-            self.stop(False)
-
-    def onread(self, results, error):
-        try:
-            if error.code or len(results) < 1:
-                log.debug("Read key: {0} on node: {1}/{2} has been timed out: {3}"
-                          .format(repr(self.key), self.address, self.backend_id, error))
-                if self.attempt < self.ctx.attempts:
-                    old_timeout = self.session.timeout
-                    self.session.timeout *= 2
-                    self.attempt += 1
-                    log.debug("Retry to read key: {0} attempt: {1}/{2} "
-                              "increased timeout: {3}/{4}"
-                              .format(repr(self.key), self.attempt,
-                                      self.ctx.attempts,
-                                      self.direct_session.timeout, old_timeout))
-                    self.read()
-                    self.stats.read_retries += 1
-                    return
-                log.error("Reading key: {0} on the node: {1}/{2} failed. "
-                          "Skipping it: {3}"
-                          .format(repr(self.key),
-                                  self.address,
-                                  self.backend_id,
-                                  error))
-                self.stats.read_failed += 1
-                self.stop(False)
-                return
-
-            if self.recovered_size == 0:
-                self.session.user_flags = results[0].user_flags
-                self.session.timestamp = results[0].timestamp
-                self.key_flags = results[0].record_flags
-                if self.total_size != results[0].total_size:
-                    self.total_size = results[0].total_size
-                    self.chunked = self.total_size > self.ctx.chunk_size
-            self.stats.read += 1
-            self.write_data = results[0].data
-            self.total_size = results[0].io_attribute.total_size
-            self.stats.read_bytes += results[0].size
-            self.attempt = 0
-            self.write()
-        except Exception as e:
-            log.error("Onread exception: {0}, traceback: {1}"
-                      .format(repr(e), traceback.format_exc()))
-            self.stop(False)
-
-    def onwrite(self, results, error):
-        self.write_result = None
-        try:
-            if error.code or len(results) < 1:
-                log.debug("Write key: {0} on node: {1}/{2} has been timed out: {3}"
-                          .format(repr(self.key),
-                                  self.dest_address,
-                                  self.dest_backend_id,
-                                  error))
-                if self.attempt < self.ctx.attempts:
-                    old_timeout = self.session.timeout
-                    self.session.timeout *= 2
-                    self.attempt += 1
-                    log.debug("Retry to write key: {0} attempt: {1}/{2} "
-                              "increased timeout: {3}/{4}"
-                              .format(repr(self.key),
-                                      self.attempt, self.ctx.attempts,
-                                      self.direct_session.timeout, old_timeout))
-                    self.stats.write_retries += 1
-                    self.write()
-                    return
-                log.error("Writing key: {0} to node: {1}/{2} failed. "
-                          "Skipping it: {3}"
-                          .format(repr(self.key),
-                                  self.dest_address,
-                                  self.dest_backend_id,
-                                  error))
-                self.stats.write_failed += 1
-                self.stop(False)
-                return
-
-            self.stats.write += 1
-            self.stats.written_bytes += len(self.write_data)
-            self.recovered_size += len(self.write_data)
-            self.attempt = 0
-
-            if self.recovered_size < self.total_size:
-                self.read()
-            else:
-                log.debug("Key: {0} has been copied to node: {1}/{2}. So we can delete it from node: {3}/{4}"
-                          .format(repr(self.key), self.dest_address, self.dest_backend_id,
-                                  self.address, self.backend_id))
-                self.remove()
-        except Exception as e:
-            log.error("Onwrite exception: {0}, traceback: {1}"
-                      .format(repr(e), traceback.format_exc()))
-            self.stop(False)
-
-    def onremove(self, removed, stats):
-        self.stats += stats
-        self.stop(removed)
-
-    def wait(self):
-        if not self.complete.is_set():
-            self.complete.wait()
-
-    def succeeded(self):
-        self.wait()
-        return self.result
-
-
 def iterate_node(ctx, node, address, backend_id, ranges, eid, stats):
     try:
         log.debug("Running iterator on node: {0}/{1}".format(address, backend_id))
@@ -355,91 +72,52 @@ def iterate_node(ctx, node, address, backend_id, ranges, eid, stats):
         return None
 
 
-class WindowedMerge(WindowedRecovery):
-    def __init__(self, ctx, address, backend_id, group, node, results, stats):
-        super(WindowedMerge, self).__init__(ctx, stats)
-        self.address = address
-        self.backend_id = backend_id
-        self.group = group
-        self.node = node
-        self.results = iter(results)
-
-    def run_one(self):
-        try:
-            response = None
-            with self.lock:
-                response = next(self.results)
-                self.recovers_in_progress += 1
-            Recovery(key=response.key,
-                     timestamp=response.timestamp,
-                     size=response.size,
-                     flags=response.record_flags,
-                     address=self.address,
-                     backend_id=self.backend_id,
-                     group=self.group,
-                     ctx=self.ctx,
-                     node=self.node,
-                     callback=self.callback).run()
-            return True
-        except StopIteration:
-            pass
-        return False
-
-
-def recover(ctx, address, backend_id, group, node, results, stats):
-    if results is None or len(results) < 1:
-        log.warning("Recover skipped iterator results are empty for node: {0}/{1}"
-                    .format(address, backend_id))
-        return True
-
-    ret = WindowedMerge(ctx=ctx,
-                        address=address,
-                        backend_id=backend_id,
-                        group=group,
-                        node=node,
-                        results=results,
-                        stats=stats).run()
-
-    return ret
-
-
 def process_node_backend(ctx, address, backend_id, group, ranges):
-    log.debug("Processing node: {0}/{1} from group: {2} for ranges: {3}"
-              .format(address, backend_id, group, ranges))
-    stats = ctx.stats['node_{0}/{1}'.format(address, backend_id)]
-    stats.timer('process', 'started')
-
-    elog = elliptics.Logger(ctx.log_file, int(ctx.log_level))
-    node = elliptics_create_node(address=ctx.address,
-                                 elog=elog,
-                                 wait_timeout=ctx.wait_timeout,
-                                 remotes=ctx.remotes,
-                                 io_thread_num=4)
-
-    stats.timer('process', 'iterate')
-    results = iterate_node(ctx=ctx,
-                           node=node,
-                           address=address,
-                           backend_id=backend_id,
-                           ranges=ranges,
-                           eid=ctx.routes.get_address_backend_route_id(address, backend_id),
-                           stats=stats)
-    if results is None or len(results) == 0:
-        log.warning('Iterator result is empty, skipping')
-        return True
-
-    stats.timer('process', 'dump_keys')
-    dump_path = os.path.join(ctx.tmp_dir, 'dump_{0}.{1}'.format(address, backend_id))
-    log.debug("Dump iterated keys to file: {0}".format(dump_path))
-    with open(dump_path, 'w') as dump_f:
-        for r in results:
-            dump_f.write('{0}\n'.format(r.key))
-
-    stats.timer('process', 'recover')
-    ret = recover(ctx, address, backend_id, group, node, results, stats)
-    stats.timer('process', 'finished')
+    try:
+        log.debug("Processing node: {0}/{1} from group: {2} for ranges: {3}"
+                  .format(address, backend_id, group, ranges))
+        stats = ctx.stats['node_{0}/{1}'.format(address, backend_id)]
+        stats.timer('process', 'started')
+
+        elog = elliptics.Logger(ctx.log_file, int(ctx.log_level))
+        node = elliptics_create_node(address=ctx.address,
+                                     elog=elog,
+                                     wait_timeout=ctx.wait_timeout,
+                                     remotes=ctx.remotes,
+                                     io_thread_num=4)
+
+        stats.timer('process', 'iterate')
+        results = iterate_node(ctx=ctx,
+                               node=node,
+                               address=address,
+                               backend_id=backend_id,
+                               ranges=ranges,
+                               eid=ctx.routes.get_address_backend_route_id(address, backend_id),
+                               stats=stats)
+        if results is None or len(results) == 0:
+            log.warning('Iterator result is empty, skipping')
+            return True
 
-    return ret
+        stats.timer('process', 'dump_keys')
+        dump_path = os.path.join(ctx.tmp_dir, 'dump_{0}.{1}'.format(address, backend_id))
+        log.debug("Dump iterated keys to file: {0}".format(dump_path))
+        with open(dump_path, 'w') as dump_f:
+            for r in results:
+                dump_f.write('{0}\n'.format(r.key))
+
+        stats.timer('process', 'recover')
+        ss_rec = ServerSendRecovery(ctx, node, group, stats, address, backend_id)
+        ret = True
+        for batch_id, batch in groupby(enumerate(results), key=lambda x: x[0] / ctx.batch_size):
+            keys = [val.key for _, val in batch]
+            ret &= ss_rec.recover(keys)
+        stats.timer('process', 'finished')
+
+        return ret
+    except Exception as e:
+        log.error("Processing node failed for: {0}/{1}: {2}, traceback: {3}"
+                  .format(address, backend_id, repr(e), traceback.format_exc()))
+        return False
 
 
 def get_ranges(ctx, group):
@@ -549,167 +227,38 @@ def main(ctx):
     return ret
 
 
-# special recovery class that lookups id on all nodes in group
-# finds out newest version and reads/writes it to node where it should live.
-class DumpRecover(object):
-    def __init__(self, routes, node, id, group, ctx):
-        self.node = node
-        self.id = id
-        self.routes = routes.filter_by_group(group)
-        self.group = group
-        self.ctx = ctx
-        # determines node where the id lives
-        self.address, _, self.backend_id = self.routes.get_id_routes(self.id)[0]
-        self.lookups_count = 0
-        self.recover_address = None
-        self.stats = RecoverStat()
-        self.complete = threading.Event()
-        self.result = True
-
-    def run(self):
-        self.lookup_results = []
-        # looks up for id on each node in group
-        if self.ctx.one_node:
-            id_host = self.routes.get_id_routes(self.id)[0][0], self.routes.get_id_routes(self.id)[0][2]
-            if self.ctx.backend_id is not None:
-                addresses_with_backends = [(self.ctx.address, self.ctx.backend_id)]
-            else:
-                address_routes = self.routes.filter_by_address(self.ctx.address)
-                addresses_with_backends = list(address_routes.addresses_with_backends())
-            if id_host not in addresses_with_backends:
-                addresses_with_backends.append(id_host)
-        else:
-            addresses_with_backends = self.routes.addresses_with_backends()
-        if len(addresses_with_backends) <= 1:
-            log.debug("Key: {0} already on the right node: {1}/{2}. Skip it"
-                      .format(repr(self.id), id_host[0], id_host[1]))
-            self.stats.skipped += 1
-            self.stop(True)
-            return
-        self.lookups_count = len(addresses_with_backends)
-        for addr, backend_id in addresses_with_backends:
-            LookupDirect(addr,
-                         backend_id,
-                         self.id,
-                         self.group,
-                         self.ctx,
-                         self.node,
-                         self.onlookup).run()
-
-    def stop(self, result):
-        self.result = result
-        log.debug("Finished recovering key: {0} with result: {1}".format(self.id, self.result))
-        self.complete.set()
-
-    def onlookup(self, result, stats):
-        self.stats += stats
-        self.lookup_results.append(result)
-        if len(self.lookup_results) == self.lookups_count:
-            self.check()
-
-    def check(self):
-        # finds timestamp of newest object
-        self.lookup_results = [r for r in self.lookup_results if r]
-        if not self.lookup_results:
-            log.debug("Key: {0} has not been found in group {1}. Skip it".format(self.id, self.group))
-            self.stats.skipped += 1
-            self.stop(True)
-            return
-        max_ts = max([r.timestamp for r in self.lookup_results])
-        log.debug("Max timestamp of key: {0}: {1}".format(repr(self.id), max_ts))
-        # filters objects with newest timestamp
-        results = [r for r in self.lookup_results if r and r.timestamp == max_ts]
-        # finds max size of newest object
-        max_size = max([r.total_size for r in results])
-        log.debug("Max size of latest replicas for key: {0}: {1}".format(repr(self.id), max_size))
-        # filters newest objects with max size
-        results = [(r.address, r.backend_id, r.record_flags) for r in results if r.total_size == max_size]
-        if (self.address, self.backend_id) in results:
-            log.debug("Node: {0} already has the latest version of key: {1}."
-                      .format(self.address, repr(self.id), self.group))
-            # if destination node already has newest object then just remove key from unproper nodes
-            self.remove()
-        else:
-            # if destination node has outdated object - recovery it from one of filtered nodes
-            self.timestamp = max_ts
-            self.size = max_size
-            self.recover_address, self.recover_backend_id, self.key_flags = results[0]
-            log.debug("Node: {0} has the newer version of key: {1}. Recovering it on node: {2}"
-                      .format(self.recover_address, repr(self.id), self.address))
-            self.recover()
-
-    def recover(self):
-        self.recover_result = Recovery(key=self.id,
-                                       timestamp=self.timestamp,
-                                       size=self.size,
-                                       flags=self.key_flags,
-                                       address=self.recover_address,
-                                       backend_id=self.recover_backend_id,
-                                       group=self.group,
-                                       ctx=self.ctx,
-                                       node=self.node,
-                                       check=False,
-                                       callback=self.onrecover)
-        self.recover_result.run()
-
-    def onrecover(self, result, stats):
-        self.stats += stats
-        self.result &= result
-        self.remove()
-
-    def remove(self):
-        # remove id from node with positive lookups but not from destination node and node that took a part in recovery
-        def check(r):
-            return r and r.address not in (self.address, self.recover_address)
-        addresses_with_backends = [(r.address, r.backend_id) for r in self.lookup_results if check(r)]
-        if addresses_with_backends and not self.ctx.safe:
-            log.debug("Removing key: {0} from nodes: {1}".format(repr(self.id), addresses_with_backends))
-            for addr, backend_id in addresses_with_backends:
-                RemoveDirect(addr, backend_id, self.id, self.group,
-                             self.ctx, self.node, self.onremove).run()
-        else:
-            self.stop(True)
-
-    def wait(self):
-        if not self.complete.is_set():
-            self.complete.wait()
-
-    def onremove(self, removed, stats):
-        self.stats += stats
-        self.stop(self.result & removed)
-
-    def succeeded(self):
-        self.wait()
-        return self.result
-
-
 class ServerSendRecovery(object):
     '''
     Special recovery class that tries to recover keys from backends that
     should not contain them, to the proper backend via the server-send operation.
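+
+    A minimal usage sketch (names are illustrative; it assumes a prepared
+    recovery ctx carrying routes, stats, attempts and batch_size, and an
+    elliptics node connected to the cluster):
+
+        ss_rec = ServerSendRecovery(ctx, node, group, stats, address)
+        for _, batch in groupby(enumerate(keys), key=lambda x: x[0] / ctx.batch_size):
+            ss_rec.recover([key for _, key in batch])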
     '''
-    def __init__(self, ctx, node, group):
-        self.routes, self.backends = self._prepare_routes(ctx, group)
+    def __init__(self, ctx, node, group, stats, address, backend_id=None):
+        self.routes, self.backends = self._prepare_routes(ctx, group, address, backend_id)
         self.session = elliptics.Session(node)
         self.session.exceptions_policy = elliptics.exceptions_policy.no_exceptions
         self.session.set_filter(elliptics.filters.all)
         self.session.timeout = 60
         self.session.groups = [group]
         self.session.trace_id = ctx.trace_id
+        self.remove_session = self.session.clone()
+        self.remove_session.set_filter(elliptics.filters.all_final)
         self.ctx = ctx
+        self.stats = stats
 
-    def _prepare_routes(self, ctx, group):
+    def _prepare_routes(self, ctx, group, address, backend_id):
         '''
         Returns pair: route list of @group and list of pairs (address, backend)
         '''
         group_routes = ctx.routes.filter_by_groups([group])
         backends = []
-        if ctx.one_node:
+        if backend_id is not None:
+            backends.append((address, backend_id))
+        elif ctx.one_node:
             if ctx.backend_id is not None:
-                backends = [(ctx.address, ctx.backend_id)]
+                backends.append((address, ctx.backend_id))
             else:
-                for backend_id in group_routes.get_address_backends(ctx.address):
-                    backends.append((ctx.address, backend_id))
+                for backend_id in group_routes.get_address_backends(address):
+                    backends.append((address, backend_id))
         else:
             backends = group_routes.addresses_with_backends()
@@ -731,29 +280,43 @@ def contain(address, backend_id, key):
         responses = {str(k): [] for k in keys}  # key -> [list of responses]
         for addr, backend_id in self.backends:
             key_candidates = [k for k in keys if not contain(addr, backend_id, k)]
-            if key_candidates:
-                self._server_send(key_candidates, addr, backend_id, responses)
+            for i in range(self.ctx.attempts):
+                if key_candidates:
+                    timeouted_keys = self._server_send(key_candidates, addr, backend_id, responses)
+                    key_candidates = timeouted_keys
 
         self._remove_bad_keys(responses)
-        return self._get_unrecovered_keys(responses)
+        return not self._has_unrecovered_keys(responses)
 
     def _server_send(self, keys, addr, backend_id, responses):
         '''
         Calls server-send with a given list of keys to the specific backend.
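+
+        Returns the list of keys whose transfer timed out (-ETIMEDOUT), so that
+        recover() can retry just those keys on its next attempt; every other
+        result is appended to responses as str(key) -> (status, address, backend_id).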
         '''
         log.debug("Server-send: address: {0}, backend: {1}, num keys: {2}".format(addr, backend_id, len(keys)))
+
+        timeouted_keys = []
         if self.ctx.dry_run:
-            return
+            return timeouted_keys
+
+        start_time = time.time()
+        recovers_in_progress = len(keys)
 
         self.session.set_direct_id(addr, backend_id)
         flags = 0 if self.ctx.safe else elliptics.iterator_flags.move
         iterator = self.session.server_send(keys, flags, list(self.session.groups))
-        for result in iterator:
+
+        for index, result in enumerate(iterator):
             status = result.response.status
-            key = str(result.response.key)
-            r = (status, addr, backend_id)
+            self._update_stats(start_time, index + 1, recovers_in_progress, status)
+            key = result.response.key
             log.debug("Server-send result: key: {0}, status: {1}".format(key, status))
-            responses[key].append(r)
+            if status == -errno.ETIMEDOUT:
+                timeouted_keys.append(key)
+            else:
+                r = (status, addr, backend_id)
+                responses[str(key)].append(r)
+
+        return timeouted_keys
 
     def _remove_bad_keys(self, responses):
         '''
@@ -769,8 +332,8 @@ def _remove_bad_keys(self, responses):
         results = []
         for k in bad_keys:
             key, _, addr, backend_id = k
-            self.session.set_direct_id(addr, backend_id)
-            result = self.session.remove(elliptics.Id(key))
+            self.remove_session.set_direct_id(addr, backend_id)
+            result = self.remove_session.remove(elliptics.Id(key))
             results.append(result)
 
         for i, r in enumerate(results):
@@ -781,17 +344,14 @@ def _check_bad_key(self, response):
         status = response[0]
         return status == -errno.EBADFD or status == -errno.EILSEQ
 
-    def _get_unrecovered_keys(self, responses):
+    def _has_unrecovered_keys(self, responses):
         '''
-        Returns keys that was not recovered via server-send.
+        Returns True if some key was not recovered via server-send.
         '''
-        keys = []
-        for val in responses.iteritems():
-            key_responses = val[1]
-            if not key_responses or self._check_unrecovered_key(key_responses):
-                key = elliptics.Id(val[0])
-                keys.append(key)
-        return keys
+        for key_responses in responses.itervalues():
+            if self._check_unrecovered_key(key_responses):
+                return True
+        return False
 
     def _check_unrecovered_key(self, responses):
         '''
@@ -803,41 +363,43 @@
                 return True
         return False
 
+    def _update_stats(self, start_time, processed_keys, recovers_in_progress, status):
+        speed = processed_keys / (time.time() - start_time)
+        recovers_in_progress -= processed_keys
+        self.stats.set_counter('recovery_speed', round(speed, 2))
+        self.stats.set_counter('recovers_in_progress', recovers_in_progress)
+        self.stats.counter('recovered_keys', 1 if status == 0 else -1)
+        self.ctx.stats.counter('recovered_keys', 1 if status == 0 else -1)
+
 
 def dump_process_group((ctx, group)):
-    log.debug("Processing group: {0}".format(group))
-    stats = ctx.stats['group_{0}'.format(group)]
-    stats.timer('process', 'started')
-    if group not in ctx.routes.groups():
-        log.error("Group: {0} is not presented in route list".format(group))
+    try:
+        log.debug("Processing group: {0}".format(group))
+        stats = ctx.stats['group_{0}'.format(group)]
+        stats.timer('process', 'started')
+        if group not in ctx.routes.groups():
+            log.error("Group: {0} is not presented in route list".format(group))
+            return False
+        elog = elliptics.Logger(ctx.log_file, int(ctx.log_level))
+        node = elliptics_create_node(address=ctx.address,
+                                     elog=elog,
+                                     wait_timeout=ctx.wait_timeout,
+                                     net_thread_num=1,
+                                     io_thread_num=1,
+                                     remotes=ctx.remotes)
+        ret = True
+        with open(ctx.dump_file, 'r') as dump:
+            ss_rec = ServerSendRecovery(ctx, node, group, stats, ctx.address)
+            # splits ids from dump file into batches and recovers them
+            for batch_id, batch in groupby(enumerate(dump), key=lambda x: x[0] / ctx.batch_size):
+                keys = [elliptics.Id(val) for _, val in batch]
+                ret &= ss_rec.recover(keys)
+        stats.timer('process', 'finished')
+        return ret
+    except Exception as e:
+        log.error("Processing group failed for: {0}, group: {1}: {2}, traceback: {3}"
+                  .format(ctx.address, group, repr(e), traceback.format_exc()))
         return False
-    elog = elliptics.Logger(ctx.log_file, int(ctx.log_level))
-    node = elliptics_create_node(address=ctx.address,
-                                 elog=elog,
-                                 wait_timeout=ctx.wait_timeout,
-                                 net_thread_num=1,
-                                 io_thread_num=1,
-                                 remotes=ctx.remotes)
-    ret = True
-    with open(ctx.dump_file, 'r') as dump:
-        ss_rec = ServerSendRecovery(ctx, node, group)
-        # splits ids from dump file in batchs and recovers it
-        for batch_id, batch in groupby(enumerate(dump), key=lambda x: x[0] / ctx.batch_size):
-            recovers = []
-            rs = RecoverStat()
-            keys = [elliptics.Id(val) for _, val in batch]
-            keys = ss_rec.recover(keys)
-            for k in keys:
-                rec = DumpRecover(routes=ctx.routes, node=node, id=k, group=group, ctx=ctx)
-                recovers.append(rec)
-                rec.run()
-            for r in recovers:
-                r.wait()
-                ret &= r.succeeded()
-                rs += r.stats
-            rs.apply(stats)
-    stats.timer('process', 'finished')
-    return ret
 
 
 def dump_main(ctx):
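
Note: both recover() call sites above rely on the same batching idiom. Below is
a standalone sketch of just that idiom (key values are illustrative; the patch
itself runs under Python 2, where x[0] / ctx.batch_size is integer division, so
it is written as // here to behave identically on Python 3):

    from itertools import groupby

    batch_size = 3
    keys = ['k1', 'k2', 'k3', 'k4', 'k5', 'k6', 'k7']

    # enumerate() attaches a running index to each key; integer-dividing that
    # index by batch_size yields runs of equal batch ids, which groupby()
    # slices into consecutive batches of at most batch_size keys each.
    for batch_id, batch in groupby(enumerate(keys), key=lambda x: x[0] // batch_size):
        print(batch_id, [key for _, key in batch])
    # 0 ['k1', 'k2', 'k3']
    # 1 ['k4', 'k5', 'k6']
    # 2 ['k7']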