Commit a07420a

gazou export --resume
bmaz committed Jul 7, 2021
1 parent 823ca91 commit a07420a
Showing 3 changed files with 52 additions and 16 deletions.
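
In short, this commit adds a --resume flag to gazou export: an interrupted CSV export can be restarted from the last tweet id already written to the --output file, appending to it instead of overwriting it. A plausible session with the new flag (file name illustrative):

    # first export, interrupted partway through (e.g. Ctrl+C)
    gazou export --output tweets.csv
    # restart from the last rows written to tweets.csv, appending to it
    gazou export --resume --output tweets.csv
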
13 changes: 11 additions & 2 deletions gazouilloire/cli/__main__.py
@@ -177,15 +177,24 @@ def resolve(host, port, path, batch_size, verbose, url_debug, db_name):
               "and return the conversations "
               "containing those tweets")
 @click.option("--list-fields", is_flag=True, help="Print the full list of available fields to export then quit.")
+@click.option("--resume", "-r", is_flag=True, help="Restart the export from the last id specified in --output file")
 def export(path, query, exclude_threads, exclude_retweets, verbose, export_threads_from_file, export_tweets_from_file,
-           columns, list_fields, output, since, until):
+           columns, list_fields, output, resume, since, until):
+    if resume and not output:
+        log.error("The --resume option requires to set a file name with --output")
+        sys.exit(1)
+
+    if resume and not os.path.isfile(output):
+        log.error("The file {} could not be found".format(output))
+        sys.exit(1)
+
     if list_fields:
         for field in TWEET_FIELDS:
             print(field)
     else:
         conf = load_conf(path)
         export_csv(conf, query, exclude_threads, exclude_retweets, since, until,
-                   verbose, export_threads_from_file, export_tweets_from_file, columns, output)
+                   verbose, export_threads_from_file, export_tweets_from_file, columns, output, resume)

 @main.command(help="Get a report about the number of tweets. Type 'gazou count' to get the number of collected tweets "
                    "or 'gazou count médialab' to get the number of tweets that contain médialab")
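
The new guard clauses make --resume fail fast unless --output names an existing file: the resume point is recovered by re-reading that file, and an export to stdout leaves nothing to resume from. Expected behavior given the diff above (the exact log formatting may differ):

    $ gazou export --resume
    The --resume option requires to set a file name with --output
    $ echo $?
    1
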
52 changes: 39 additions & 13 deletions gazouilloire/exports/export_csv.py
@@ -11,7 +11,7 @@
 from twitwi.utils import custom_get_normalized_hostname
 from twitwi.constants import TWEET_FIELDS
 from gazouilloire.config_format import log
-
+from casanova import reverse_reader

 def date_to_timestamp(date):
     return str(date.timestamp())
@@ -27,7 +27,7 @@ def post_process_tweet_from_elastic(source):
     return source


-def yield_csv(queryiterator):
+def yield_csv(queryiterator, last_ids=set()):
     for t in queryiterator:
         try:
             source = t["_source"]
@@ -36,15 +36,15 @@ def yield_csv(queryiterator):
             log.error(t["_id"] + " not found in database")
             continue
         # ignore tweets only caught on deletion missing most fields
-        if len(source) >= 10:
+        if len(source) >= 10 and t["_id"] not in last_ids:
             transform_tweet_into_csv_dict(
                 post_process_tweet_from_elastic(source), item_id=t["_id"], allow_erroneous_plurals=True
             )
             yield source


-def build_body(query, exclude_threads, exclude_retweets, since=None, until=None):
-    if len(query) == 0 and not exclude_threads and not exclude_retweets and not since and not until:
+def build_body(query, exclude_threads, exclude_retweets, since=None, until=None, outputfile=None, resume=False):
+    if len(query) == 0 and not exclude_threads and not exclude_retweets and not since and not until and not resume:
         body = {
             "query": {
                 "match_all": {}
@@ -125,8 +125,25 @@ def call_database(conf):
         sys.exit(1)


+def find_potential_duplicate_ids(outputfile):
+    """
+    if there is no timestamp in the initial file, error from the beginning?
+    what if the 2 files do not have the same number of columns?
+    @param outputfile:
+    @return: last_timestamp, last_ids
+    """
+    last_ids = set()
+    last_time = reverse_reader.last_cell(outputfile, 'local_time')
+    with open(outputfile, "r") as f:
+        rev_reader = reverse_reader(f)
+        for row in rev_reader:
+            if row[rev_reader.headers.local_time] == last_time:
+                last_ids.add(row[rev_reader.headers.id])
+            else:
+                return last_time, last_ids
+
 def export_csv(conf, query, exclude_threads, exclude_retweets, since, until,
-               verbose, export_threads_from_file, export_tweets_from_file, selection, outputfile):
+               verbose, export_threads_from_file, export_tweets_from_file, selection, outputfile, resume):
     threads = conf.get('grab_conversations', False)
     if selection:
         SELECTION = selection.split(",")
@@ -160,24 +177,33 @@ def export_csv(conf, query, exclude_threads, exclude_retweets, since, until,
         with open(export_threads_from_file) as f:
             body = sorted([t.get("id", t.get("_id")) for t in csv.DictReader(f)])

-    else:
-        body = build_body(query, exclude_threads, exclude_retweets, since, until)
-
-    if isinstance(body, list):
-        if export_threads_from_file or export_tweets_from_file:
+    if export_threads_from_file or export_tweets_from_file:
         count = len(body)
         iterator = yield_csv(db.multi_get(body))
     else:
+        last_ids = set()
+        if resume:
+            last_timestamp, last_ids = find_potential_duplicate_ids(outputfile)
+            since = datetime.fromisoformat(last_timestamp)
+        body = build_body(query, exclude_threads, exclude_retweets, since, until)
         count = db.client.count(index=db.tweets, body=body)['count']
         body["sort"] = ["timestamp_utc"]
-        iterator = yield_csv(helpers.scan(client=db.client, index=db.tweets, query=body, preserve_order=True))
+        iterator = yield_csv(
+            helpers.scan(client=db.client, index=db.tweets, query=body, preserve_order=True),
+            last_ids=last_ids
+        )
     if verbose:
         import progressbar
         bar = progressbar.ProgressBar(max_value=count)
         iterator = bar(iterator)

-    file = open(outputfile, 'w', newline='') if outputfile else sys.stdout
+    if resume:
+        file = open(outputfile, 'a', newline='')
+    else:
+        file = open(outputfile, 'w', newline='') if outputfile else sys.stdout
     writer = csv.DictWriter(file, fieldnames=SELECTION, restval='', quoting=csv.QUOTE_MINIMAL, extrasaction='ignore')
-    writer.writeheader()
+    if not resume:
+        writer.writeheader()
     for t in iterator:
         writer.writerow(t)
     file.close()
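
The resume strategy leans on the export being sorted by timestamp_utc: only rows sharing the file's last local_time value can belong to a partially written batch. find_potential_duplicate_ids therefore reads the existing CSV backwards with casanova's reverse_reader, collects every id carrying that trailing timestamp, and export_csv restarts the query from that timestamp while yield_csv skips the collected ids. (The mutable default last_ids=set() in yield_csv is a classic Python pitfall, but a harmless one here, since the set is never mutated inside the function.) A minimal sketch of the same idea using only the standard library, reading forward rather than backwards (fine for a sketch, wasteful for very large files; column names are taken from the diff above):

    import csv

    def ids_at_last_timestamp(path):
        """Return the last local_time in the CSV and the ids of every row
        sharing it; rows are time-sorted, so only that trailing timestamp
        can be incomplete and must be deduplicated on resume."""
        with open(path, newline="") as f:
            rows = list(csv.DictReader(f))
        if not rows:
            return None, set()
        last_time = rows[-1]["local_time"]
        return last_time, {r["id"] for r in rows if r["local_time"] == last_time}
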
3 changes: 2 additions & 1 deletion setup.py
@@ -32,7 +32,8 @@
         "minet >= 0.52, < 0.53",
         "future",
         "click",
-        "progressbar2"
+        "progressbar2",
+        "casanova >= 0.15.5"
     ],
     entry_points={
         'console_scripts': [
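
casanova is pinned at >= 0.15.5, presumably the earliest release shipping the reverse_reader features used in export_csv.py. The one call the resume logic really needs from it is reading a column's last value from the end of the file, as in this small sketch (file name illustrative):

    from casanova import reverse_reader

    # last value of the 'local_time' column, read from the end of the file
    last_time = reverse_reader.last_cell("tweets.csv", "local_time")
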
