diff --git a/gazouilloire/cli/__main__.py b/gazouilloire/cli/__main__.py
index 5cddcf6..375914d 100644
--- a/gazouilloire/cli/__main__.py
+++ b/gazouilloire/cli/__main__.py
@@ -177,15 +177,24 @@ def resolve(host, port, path, batch_size, verbose, url_debug, db_name):
                    "and return the conversations "
                    "containing those tweets")
 @click.option("--list-fields", is_flag=True, help="Print the full list of available fields to export then quit.")
+@click.option("--resume", "-r", is_flag=True, help="Resume the export from the last id written to the --output file")
 def export(path, query, exclude_threads, exclude_retweets, verbose, export_threads_from_file, export_tweets_from_file,
-           columns, list_fields, output, since, until):
+           columns, list_fields, output, resume, since, until):
+    if resume and not output:
+        log.error("The --resume option requires a file name set with --output")
+        sys.exit(1)
+
+    if resume and not os.path.isfile(output):
+        log.error("The file {} could not be found".format(output))
+        sys.exit(1)
+
     if list_fields:
         for field in TWEET_FIELDS:
             print(field)
     else:
         conf = load_conf(path)
         export_csv(conf, query, exclude_threads, exclude_retweets, since, until,
-                   verbose, export_threads_from_file, export_tweets_from_file, columns, output)
+                   verbose, export_threads_from_file, export_tweets_from_file, columns, output, resume)
 
 @main.command(help="Get a report about the number of tweets. Type 'gazou count' to get the number of collected tweets "
                    "or 'gazou count médialab' to get the number of tweets that contain médialab")
diff --git a/gazouilloire/exports/export_csv.py b/gazouilloire/exports/export_csv.py
index f3c2648..3187d60 100644
--- a/gazouilloire/exports/export_csv.py
+++ b/gazouilloire/exports/export_csv.py
@@ -11,7 +11,7 @@ from twitwi.utils import custom_get_normalized_hostname
 from twitwi.constants import TWEET_FIELDS
 from gazouilloire.config_format import log
 
-
+from casanova import reverse_reader
 
 def date_to_timestamp(date):
     return str(date.timestamp())
@@ -27,7 +27,7 @@ def post_process_tweet_from_elastic(source):
     return source
 
 
-def yield_csv(queryiterator):
+def yield_csv(queryiterator, last_ids=set()):
     for t in queryiterator:
         try:
             source = t["_source"]
@@ -36,15 +36,15 @@ def yield_csv(queryiterator):
             log.error(t["_id"] + " not found in database")
             continue
         # ignore tweets only caught on deletion missing most fields
-        if len(source) >= 10:
+        if len(source) >= 10 and t["_id"] not in last_ids:
             transform_tweet_into_csv_dict(
                 post_process_tweet_from_elastic(source),
                 item_id=t["_id"],
                 allow_erroneous_plurals=True
             )
             yield source
 
-def build_body(query, exclude_threads, exclude_retweets, since=None, until=None):
-    if len(query) == 0 and not exclude_threads and not exclude_retweets and not since and not until:
+def build_body(query, exclude_threads, exclude_retweets, since=None, until=None, outputfile=None, resume=False):
+    if len(query) == 0 and not exclude_threads and not exclude_retweets and not since and not until and not resume:
         body = {
             "query": {
                 "match_all": {}
             }
@@ -125,8 +125,30 @@ def call_database(conf):
         sys.exit(1)
 
 
+def find_potential_duplicate_ids(outputfile):
+    """
+    Collect the ids of the trailing rows of a previous export that share
+    its last timestamp, so they can be skipped when resuming.
+    Open questions: if there is no timestamp in the initial file, should we
+    error from the beginning? What if the two files do not have the same
+    number of columns?
+    @param outputfile: csv file written by a previous export
+    @return: last_timestamp, last_ids
+    """
+    last_ids = set()
+    last_time = reverse_reader.last_cell(outputfile, 'local_time')
+    with open(outputfile, "r") as f:
+        rev_reader = reverse_reader(f)
+        for row in rev_reader:
+            if row[rev_reader.headers.local_time] == last_time:
+                last_ids.add(row[rev_reader.headers.id])
+            else:
+                return last_time, last_ids
+    # every row shares the last timestamp: still return it rather than None
+    return last_time, last_ids
+
 def export_csv(conf, query, exclude_threads, exclude_retweets, since, until,
-               verbose, export_threads_from_file, export_tweets_from_file, selection, outputfile):
+               verbose, export_threads_from_file, export_tweets_from_file, selection, outputfile, resume):
     threads = conf.get('grab_conversations', False)
     if selection:
         SELECTION = selection.split(",")
@@ -160,24 +182,33 @@ def export_csv(conf, query, exclude_threads, exclude_retweets, since, until,
         with open(export_tweets_from_file) as f:
             body = sorted([t.get("id", t.get("_id")) for t in csv.DictReader(f)])
 
-    else:
-        body = build_body(query, exclude_threads, exclude_retweets, since, until)
-
-    if isinstance(body, list):
+    if export_threads_from_file or export_tweets_from_file:
         count = len(body)
         iterator = yield_csv(db.multi_get(body))
     else:
+        last_ids = set()
+        if resume:
+            last_timestamp, last_ids = find_potential_duplicate_ids(outputfile)
+            since = datetime.fromisoformat(last_timestamp)
+        body = build_body(query, exclude_threads, exclude_retweets, since, until)
         count = db.client.count(index=db.tweets, body=body)['count']
         body["sort"] = ["timestamp_utc"]
-        iterator = yield_csv(helpers.scan(client=db.client, index=db.tweets, query=body, preserve_order=True))
+        iterator = yield_csv(
+            helpers.scan(client=db.client, index=db.tweets, query=body, preserve_order=True),
+            last_ids=last_ids
+        )
 
     if verbose:
         import progressbar
         bar = progressbar.ProgressBar(max_value=count)
         iterator = bar(iterator)
 
-    file = open(outputfile, 'w', newline='') if outputfile else sys.stdout
+    if resume:
+        file = open(outputfile, 'a', newline='')
+    else:
+        file = open(outputfile, 'w', newline='') if outputfile else sys.stdout
     writer = csv.DictWriter(file, fieldnames=SELECTION, restval='', quoting=csv.QUOTE_MINIMAL, extrasaction='ignore')
-    writer.writeheader()
+    if not resume:
+        writer.writeheader()
     for t in iterator:
         writer.writerow(t)
     file.close()
diff --git a/setup.py b/setup.py
index 7ee7ef3..4feb13b 100644
--- a/setup.py
+++ b/setup.py
@@ -32,7 +32,8 @@
         "minet >= 0.52, < 0.53",
         "future",
         "click",
-        "progressbar2"
+        "progressbar2",
+        "casanova >= 0.15.5"
     ],
     entry_points={
         'console_scripts': [
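Note for reviewers on the resume logic: `local_time` has second precision, so several tweets can share the last timestamp written to the output file. Restarting the query at that timestamp alone would re-export those boundary rows, which is why `find_potential_duplicate_ids` gathers their ids and `yield_csv` filters them out. A minimal sketch of that invariant, using only the standard library (the file contents and ids are made up for illustration):

```python
import csv
import io

# Hypothetical tail of a previous export; the last two rows share local_time.
previous_export = io.StringIO(
    "id,local_time,text\n"
    "1001,2021-06-01T10:00:00,first\n"
    "1002,2021-06-01T10:00:01,second\n"
    "1003,2021-06-01T10:00:01,third\n"
)

rows = list(csv.DictReader(previous_export))
last_time = rows[-1]["local_time"]

# Same idea as find_potential_duplicate_ids, without casanova's reverse_reader:
# collect the ids of every trailing row sharing the last timestamp.
last_ids = {row["id"] for row in rows if row["local_time"] == last_time}
assert last_ids == {"1002", "1003"}

# The resumed query restarts at the last timestamp, so the scan yields
# 1002 and 1003 again; the last_ids filter keeps only genuinely new rows.
resumed_scan = [{"_id": "1002"}, {"_id": "1003"}, {"_id": "1004"}]
new_rows = [t for t in resumed_scan if t["_id"] not in last_ids]
assert [t["_id"] for t in new_rows] == ["1004"]
```

With this in place, an export produced by `gazou export --output tweets.csv` can later be extended with `gazou export --resume --output tweets.csv`, appending only the tweets collected since the previous run.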