From a6630e3863476866a3b30b794f0c50152acbcaf8 Mon Sep 17 00:00:00 2001 From: John Tordoff <> Date: Wed, 28 Feb 2024 14:29:02 -0500 Subject: [PATCH] Setup rabbit listener for disabled users --- addon_service/apps.py | 9 ++++++ .../test_disabled_accounts_subscription.py | 8 +++++ addon_service/tasks.py | 29 +++++++++++++++++++ app/authentication.py | 2 +- app/settings.py | 2 ++ requirements/requirements.txt | 3 +- 6 files changed, 51 insertions(+), 2 deletions(-) create mode 100644 addon_service/management/commands/test_disabled_accounts_subscription.py create mode 100644 addon_service/tasks.py diff --git a/addon_service/apps.py b/addon_service/apps.py index e0583163..050ef700 100644 --- a/addon_service/apps.py +++ b/addon_service/apps.py @@ -1,6 +1,15 @@ +import threading + from django.apps import AppConfig +from addon_service.tasks import listen_for_osf_signals + class AddonServiceConfig(AppConfig): default_auto_field = "django.db.models.BigAutoField" name = "addon_service" + + def ready(self): + thread = threading.Thread(target=listen_for_osf_signals) + thread.daemon = True + thread.start() diff --git a/addon_service/management/commands/test_disabled_accounts_subscription.py b/addon_service/management/commands/test_disabled_accounts_subscription.py new file mode 100644 index 00000000..b72d41a4 --- /dev/null +++ b/addon_service/management/commands/test_disabled_accounts_subscription.py @@ -0,0 +1,8 @@ +from django.core.management.base import BaseCommand +from addon_service.tasks import listen_for_disabled_users + +class Command(BaseCommand): + help = 'Subscribes to an RPC exchange and listens for messages.' + + def handle(self, *args, **options): + listen_for_disabled_users() \ No newline at end of file diff --git a/addon_service/tasks.py b/addon_service/tasks.py new file mode 100644 index 00000000..ba8ce28b --- /dev/null +++ b/addon_service/tasks.py @@ -0,0 +1,29 @@ +import Celery + +from kombu import Queue, Exchange, Consumer +OSF_DEFAULT_QUEUE = 'celery' +from addon_service.models import UserReference +from app.settings import OSF_BROKER_URL + +def process_disabled_user_message(body, message): + print(f"Received message: {body}") + user_uri = body['user_uri'] + UserReference.objects.get(user_uri=user_uri).delete() + message.ack() + + +def listen_for_osf_signals(): + with Celery(broker=OSF_BROKER_URL).connection() as connection: + with Consumer( + connection, + queues=Queue( + name=OSF_DEFAULT_QUEUE, # TODO: Any plans here? + exchange=Exchange(OSF_DEFAULT_QUEUE), # TODO: Define exchange(s) for merged/disabled + ), + callbacks=[ # TODO: Multiple callbacks or multiple queues? + process_disabled_user_message + ], + accept=['json'] + ): + while True: + connection.drain_events() \ No newline at end of file diff --git a/app/authentication.py b/app/authentication.py index 15923954..1321f074 100644 --- a/app/authentication.py +++ b/app/authentication.py @@ -14,7 +14,7 @@ ) -TODO: Improve dockerization of OSF so that we don't need this +# TODO: Improve dockerization of OSF so that we don't need this def handle_redirects(response): """Redirect fix for localhost during local development.""" if settings.DEBUG and response.status_code in {301, 302, 303, 307, 308}: diff --git a/app/settings.py b/app/settings.py index a5dec5f5..4d6ab731 100644 --- a/app/settings.py +++ b/app/settings.py @@ -19,6 +19,8 @@ USER_REFERENCE_LOOKUP_URL = "https://api.osf.io/v2/users/me/" USER_REFERENCE_COOKIE = "osf" +OSF_BROKER_URL = 'amqp://guest:guest@192.168.168.167:5672//' + URI_ID = "http://osf.example/" AUTH_URI_ID = "http://osf.auth/" diff --git a/requirements/requirements.txt b/requirements/requirements.txt index e18cc218..09fdb33b 100644 --- a/requirements/requirements.txt +++ b/requirements/requirements.txt @@ -3,4 +3,5 @@ psycopg>=3.1.8 djangorestframework==3.14.0 djangorestframework-jsonapi==6.1.0 django-filter -httpx==0.26.0 \ No newline at end of file +httpx==0.26.0 +celery \ No newline at end of file