diff --git a/RedditUtils.py b/RedditUtils.py new file mode 100644 index 0000000..c812f92 --- /dev/null +++ b/RedditUtils.py @@ -0,0 +1,74 @@ +import os +import re + +import praw +from praw.exceptions import RedditAPIException +from praw.models import Submission, Comment +from psaw import PushshiftAPI + +from WikiClient import WikiClient +from const import Const +from replies import get_response_message + + +class RedditUtils: + client_id = os.environ.get('CLIENT_ID') + client_secret = os.environ.get('CLIENT_SECRET') + username = os.environ.get('USERNAME') + password = os.environ.get('PASS') + + def __init__(self): + self.reddit = praw.Reddit(client_id=self.client_id, + client_secret=self.client_secret, + user_agent='windows:github.com/matej2/location-info:v0.6 (by /u/mtj510)', + username=self.username, + password=self.password) + if not self.reddit.read_only: + print("Connected and running.") + + def get_meta_post(self): + """ + Get meta post + :return: + praw.models.Submission: A meta post + """ + api = PushshiftAPI() + sub = self.reddit.subreddit('u_LocationInfoBot') + + gen = api.search_submissions(filter=['title', 'selftext', 'url'], limit=20, q='meta', author=Const.user) + result = list(gen) + + if result == [] or result is None or result[0].selftext == '': + post = sub.submit(title='meta', selftext='{ "status": "created" }') + else: + post = Submission(self.reddit, url=result[0].url) + return post + + @staticmethod + def reply_to_comment(city: str, target_comment: Comment): + + if city is None: + new_comment = get_response_message(None, Const.NO_BODY, None) + else: + wiki_meta = WikiClient.get_location_meta(city) + + if wiki_meta is None: + new_comment = get_response_message(None, Const.LOC_NOT_FOUND.format(city), None) + else: + nearby_locations = WikiClient.get_nearby_locations(wiki_meta.lon, wiki_meta.lat) + new_comment = get_response_message(wiki_meta.title, wiki_meta.desc, nearby_locations) + + print(Const.successfully_processed(city)) + + try: + result_comment = target_comment.reply(new_comment) + except RedditAPIException as e: + print(e) + return result_comment.id + + def get_location_from_comment(c): + result = re.search('Information for location:\s*(.*):$', c.body, flags=re.IGNORECASE | re.MULTILINE) + if result is not None: + return result.group(1) + else: + return None diff --git a/WikiClient.py b/WikiClient.py new file mode 100644 index 0000000..3a464d8 --- /dev/null +++ b/WikiClient.py @@ -0,0 +1,80 @@ +import json +import re + +import mwparserfromhell +import requests +import wikipedia +from mwparserfromhell.nodes.extras import Parameter + +from models import LocationMeta + + +class WikiClient: + @staticmethod + def get_location_meta(city): + search = wikipedia.search(city) + st = 0 + + if search is None: + return False + + for result in search: + try: + page = wikipedia.page(title=result, auto_suggest=False) + except wikipedia.DisambiguationError: + return None + except wikipedia.PageError: + return None + + if WikiClient.is_location(page): + summary = wikipedia.summary(page.title, sentences=3, auto_suggest=False) + return LocationMeta(page.title, summary, page.coordinates[0], page.coordinates[1], page.url) + + if st > 3: + return None + st = st + 1 + + return None + + @staticmethod + def is_location(page): + for attr in page.categories: + if attr == 'Coordinates on Wikidata': + return True + return False + + @staticmethod + def get_nearby_locations(lon, lat): + loc_list = wikipedia.geosearch(lon, lat, results=10) + return ', '.join(loc_list) + + # See https://stackoverflow.com/a/33336820/10538678 + @staticmethod + def get_taxonomy(title): + infobox = None + parsed_params = [] + a = '' + + r = requests.get( + 'https://en.wikipedia.org/w/api.php?action=query&titles=' + title + '&prop=revisions&rvprop=content&rvsection' + '=0&format=json') + t = json.loads(r.text) + + for i in t['query']['pages']: + a = t['query']['pages'][i]['revisions'][0]['*'] + + template_list = mwparserfromhell.parse(a).filter_templates() + for template in template_list: + if 'Infobox' in template.name: + infobox = template + + if infobox is None: + return None + else: + for param in infobox.params: + add_par = Parameter( + name=param.name.strip(), + value=re.sub('\\n', '', param.value.strip()) + ) + parsed_params.append(add_par) + return parsed_params \ No newline at end of file diff --git a/const.py b/const.py index 46639fd..06802d9 100644 --- a/const.py +++ b/const.py @@ -24,6 +24,11 @@ class Const: SPACE_REGEX = '\s+' NONE = 'None' + + # Reddit meta + user = 'LocationInfoBot' + mention = f'u/{user}' + @staticmethod def successfully_processed(city: str): """ diff --git a/main.py b/main.py index 6d48691..70882af 100644 --- a/main.py +++ b/main.py @@ -1,319 +1,25 @@ -import json -import os import re from time import sleep -import mwparserfromhell -import praw -import requests -import wikipedia -from mwparserfromhell.nodes.extras import Parameter -from praw.exceptions import RedditAPIException -from praw.models import Submission, Comment -from psaw import PushshiftAPI - +from RedditUtils import RedditUtils from const import Const -from models import LocationMeta -from replies import get_response_message - -user = 'LocationInfoBot' -mention = f'u/{user}' -client_id = os.environ.get('CLIENT_ID') -client_secret = os.environ.get('CLIENT_SECRET') -username = os.environ.get('USERNAME') -password = os.environ.get('PASS') - - -def get_reddit_instance(): - reddit = praw.Reddit(client_id=client_id, - client_secret=client_secret, - user_agent='windows:github.com/matej2/location-info:v0.6 (by /u/mtj510)', - username=username, - password=password) - if not reddit.read_only: - print("Connected and running.") - return reddit - else: - return False - - -def reply_to_comment(city: str, target_comment: Comment): - - if city is None: - new_comment = get_response_message(None, Const.NO_BODY, None) - else: - wiki_meta = get_location_meta(city) - - if wiki_meta is None: - new_comment = get_response_message(None, Const.LOC_NOT_FOUND.format(city), None) - else: - nearby_locations = get_nearby_locations(wiki_meta.lon, wiki_meta.lat) - new_comment = get_response_message(wiki_meta.title, wiki_meta.desc, nearby_locations) - - print(Const.successfully_processed(city)) - - try: - result_comment = target_comment.reply(new_comment) - except RedditAPIException as e: - print(e) - return result_comment.id - - -def send_photo(city, photo): - response = {} - if city is None: - comment = get_response_message(None, Const.NO_BODY, None) - else: - wiki_obj = get_location_meta(city) - - if wiki_obj is not None and ('i.redd.it' in photo.url or 'imgur' in photo.url ): - payload = { - "location": { - "title": str(wiki_obj.title), - "lat": str(wiki_obj.lon), - "lng": str(wiki_obj.lat) - }, - "photo": { - "title": str(photo.title), - "url": str('https://www.reddit.com' + photo.permalink), - "thumb": str(photo.thumbnail) - } - } - - url = "http://127.0.0.1:8000/photo/" - - headers = { - 'Content-Type': 'application/javascript' - } - - response = requests.request("POST", url, headers=headers, json=payload) - - print(f'{city} successfully processed') - else: - print(f'{city} not found') - - -def get_location_meta(city): - search = wikipedia.search(city) - st = 0 - - if search is None: - return False - - for result in search: - try: - page = wikipedia.page(title=result, auto_suggest=False) - except wikipedia.DisambiguationError: - return None - except wikipedia.PageError: - return None - - if is_location(page): - summary = wikipedia.summary(page.title, sentences=3, auto_suggest=False) - return LocationMeta(page.title, summary, page.coordinates[0], page.coordinates[1], page.url) - - if st > 3: - return None - st = st + 1 - - return None - - -def is_location(page): - for attr in page.categories: - if attr == 'Coordinates on Wikidata': - return True - return False - - -def get_nearby_locations(lon, lat): - loc_list = wikipedia.geosearch(lon, lat, results=10) - return ', '.join(loc_list) - - -# See https://stackoverflow.com/a/33336820/10538678 -def get_taxonomy(title): - infobox = None - parsed_params = [] - a = '' - - r = requests.get( - 'https://en.wikipedia.org/w/api.php?action=query&titles=' + title + '&prop=revisions&rvprop=content&rvsection' - '=0&format=json') - t = json.loads(r.text) - - for i in t['query']['pages']: - a = t['query']['pages'][i]['revisions'][0]['*'] - - template_list = mwparserfromhell.parse(a).filter_templates() - for template in template_list: - if 'Infobox' in template.name: - infobox = template - - if infobox is None: - return None - else: - for param in infobox.params: - add_par = Parameter( - name=param.name.strip(), - value=re.sub('\\n', '', param.value.strip()) - ) - parsed_params.append(add_par) - return parsed_params - - -def is_replied(submission): - for comment in submission.comments: - if comment.author is not None and comment.author.name == user: - return True - return False - - -def process_inbox_by_keywords(): - api = PushshiftAPI() - r = get_reddit_instance() - config = get_config() - last_processed_key = 'last_processed' - last_processed = '' - - # Retrieves subs ordered by time descending - gen = api.search_submissions( - limit=300, - filter=['id', 'title', 'url', 'permalink'], - title='Location:|location:', - q='i.reddit|imgur', - sort='created_utc:desc') - comment_results = list(gen) - - for comment in comment_results: - if Const.TRIGGER_PHARSE in comment.title.lower(): - if comment.id == config.get(last_processed_key): - return True - - # extract the word from the comment - body = re.search(Const.KEYWORD, comment.title, flags=re.IGNORECASE) - - if body is not None: - word = re.sub(Const.SPECIAL_CHARS, '', body.group(1)).strip() - - post = Submission(r, id=comment.id) - if is_replied(post) is False: - #send_photo(word, s) - print('sending') - else: - return True - - if last_processed == '': - last_processed = post.id - sleep(3) - - last = { - last_processed_key: last_processed - } - #update_config(last) - return True - - -def get_meta_post(): - r = get_reddit_instance() - api = PushshiftAPI() - sub = r.subreddit('u_LocationInfoBot') - - gen = api.search_submissions(filter=['title', 'selftext', 'url'], limit=20, q='meta', author=user) - result = list(gen) - - if result == [] or result is None or result[0].selftext == '': - post = sub.submit(title='meta', selftext='{ "status": "created" }') - else: - post = Submission(r, url=result[0].url) - return post - - -def update_config(obj): - conf = get_meta_post() - curr = json.loads(conf.selftext) - - curr.update(obj) - - updated = json.dumps(curr) - conf.edit(updated) - - -def get_config(): - return json.loads(get_meta_post().selftext) - - -def main(): - reddit = get_reddit_instance() - try: - inbox = list(reddit.inbox.unread()) - except praw.exceptions.APIException: - print('Rate limited.') - return False - inbox.reverse() - - for item in inbox: - if mention.lower() in item.body.lower(): - text = item.body - result = re.search(Const.body_regex(mention), text, flags=re.IGNORECASE) - if result is not None: - body = result.group(1) - - if reply_to_comment(body, item): - item.mark_read() - else: - item.reply(Const.NOT_DETECTED) - sleep(10) def main_stream(): - reddit = get_reddit_instance() + reddit_client = RedditUtils() + reddit_instance = reddit_client.reddit - for item in reddit.inbox.stream(): - if mention.lower() in item.body.lower(): + for item in reddit_instance.inbox.stream(): + if Const.mention.lower() in item.body.lower(): text = item.body - result = re.search(Const.body_regex(mention), text, flags=re.IGNORECASE) + result = re.search(Const.body_regex(Const.mention), text, flags=re.IGNORECASE) if result is not None: body = result.group(1) - if reply_to_comment(body, item): + if reddit_instance.reply_to_comment(body, item): item.mark_read() else: item.reply(Const.NOT_DETECTED) sleep(10) -def get_comments(): - reddit = get_reddit_instance() - comments = reddit.redditor(user).comments.new(limit=100) - filtered_comments = [] - - for c in comments: - if Const.LOC_NOT_FOUND not in c.body and 'I am a bot and this was an automated message' in c.body: - filtered_comments.append(c) - return filtered_comments - - -def get_location_from_comment(c): - result = re.search('Information for location:\s*(.*):$', c.body, flags=re.IGNORECASE | re.MULTILINE) - if result is not None: - return result.group(1) - else: - return None - - -def process_past_comments(): - comments = get_comments() - for c in comments: - location = get_location_from_comment(c) - if location is not None: - submission = c.submission - send_photo(location, submission) - - -def purge(): - reddit = get_reddit_instance() - for comment in reddit.redditor(user).comments.new(limit=20): - if comment.score < 0: - print(f'Removing comment {comment.body}') - comment.delete() diff --git a/manual.py b/manual.py index d0a2dbc..9a00c10 100644 --- a/manual.py +++ b/manual.py @@ -1,5 +1,4 @@ -from main import main, process_inbox_by_keywords, process_past_comments, main_stream +from main import main_stream -#process_keywords() -main_stream() -process_past_comments() \ No newline at end of file +if __name__ == '__main__': + main_stream() \ No newline at end of file diff --git a/replies.py b/replies.py index 2e167d8..412c576 100644 --- a/replies.py +++ b/replies.py @@ -55,3 +55,9 @@ def get_response_message(city, msg, nearby: Optional[str]): {FOOTER}''' return message + +def is_replied(submission): + for comment in submission.comments: + if comment.author is not None and comment.author.name == user: + return True + return False \ No newline at end of file diff --git a/scheduler.py b/scheduler.py deleted file mode 100644 index c02f707..0000000 --- a/scheduler.py +++ /dev/null @@ -1,20 +0,0 @@ -import asyncio - -from apscheduler.schedulers.asyncio import AsyncIOScheduler -from apscheduler.triggers.interval import IntervalTrigger - -from main import main, purge, process_inbox_by_keywords - -if __name__ == '__main__': - scheduler = AsyncIOScheduler() - - scheduler.add_job(main, trigger=IntervalTrigger(minutes=15)) - scheduler.add_job(process_inbox_by_keywords, trigger=IntervalTrigger(minutes=10)) - scheduler.add_job(purge, trigger=IntervalTrigger(hours=23)) - - scheduler.start() - # Execution will block here until Ctrl+C (Ctrl+Break on Windows) is pressed. - try: - asyncio.get_event_loop().run_forever() - except (KeyboardInterrupt, SystemExit): - pass \ No newline at end of file