diff --git a/addon_service/apps.py b/addon_service/apps.py index e0583163..050ef700 100644 --- a/addon_service/apps.py +++ b/addon_service/apps.py @@ -1,6 +1,15 @@ +import threading + from django.apps import AppConfig +from addon_service.tasks import listen_for_osf_signals + class AddonServiceConfig(AppConfig): default_auto_field = "django.db.models.BigAutoField" name = "addon_service" + + def ready(self): + thread = threading.Thread(target=listen_for_osf_signals) + thread.daemon = True + thread.start() diff --git a/addon_service/management/commands/test_disabled_accounts_subscription.py b/addon_service/management/commands/test_disabled_accounts_subscription.py new file mode 100644 index 00000000..b72d41a4 --- /dev/null +++ b/addon_service/management/commands/test_disabled_accounts_subscription.py @@ -0,0 +1,8 @@ +from django.core.management.base import BaseCommand +from addon_service.tasks import listen_for_disabled_users + +class Command(BaseCommand): + help = 'Subscribes to an RPC exchange and listens for messages.' + + def handle(self, *args, **options): + listen_for_disabled_users() \ No newline at end of file diff --git a/addon_service/osf_listener.py b/addon_service/osf_listener.py new file mode 100644 index 00000000..e66d58af --- /dev/null +++ b/addon_service/osf_listener.py @@ -0,0 +1,3 @@ +from celery import Celery + +osf_listener = Celery(broker='amqp://guest:guest@192.168.168.167:5672//') diff --git a/addon_service/tasks.py b/addon_service/tasks.py new file mode 100644 index 00000000..114cd57a --- /dev/null +++ b/addon_service/tasks.py @@ -0,0 +1,26 @@ +from addon_service.osf_listener import osf_listener +from kombu import Queue, Exchange, Consumer +OSF_DEFAULT_QUEUE = 'celery' + + +def process_disabled_user_message(body, message): + print(f"Received message: {body}") + print(f"Received message: {message}") + message.ack() + + +def listen_for_osf_signals(): + with osf_listener.connection() as connection: + with Consumer( + connection, + queues=Queue( + name=OSF_DEFAULT_QUEUE, # TODO: Any plans here? + exchange=Exchange(OSF_DEFAULT_QUEUE), # TODO: Define exchange(s) for merged/disabled + ), + callbacks=[ # TODO: Multiple callbacks or multiple queues? + process_disabled_user_message + ], + accept=['json'] + ): + while True: + connection.drain_events() \ No newline at end of file diff --git a/app/authentication.py b/app/authentication.py index 15923954..1321f074 100644 --- a/app/authentication.py +++ b/app/authentication.py @@ -14,7 +14,7 @@ ) -TODO: Improve dockerization of OSF so that we don't need this +# TODO: Improve dockerization of OSF so that we don't need this def handle_redirects(response): """Redirect fix for localhost during local development.""" if settings.DEBUG and response.status_code in {301, 302, 303, 307, 308}: diff --git a/app/settings.py b/app/settings.py index a5dec5f5..ba073c74 100644 --- a/app/settings.py +++ b/app/settings.py @@ -19,12 +19,15 @@ USER_REFERENCE_LOOKUP_URL = "https://api.osf.io/v2/users/me/" USER_REFERENCE_COOKIE = "osf" +CELERY_ACCEPT_CONTENT = ['json'] +CELERY_TASK_SERIALIZER = 'json' +CELERY_RESULT_SERIALIZER = 'json' + URI_ID = "http://osf.example/" AUTH_URI_ID = "http://osf.auth/" ALLOWED_HOSTS = env.ALLOWED_HOSTS - # Application definition INSTALLED_APPS = ( diff --git a/requirements/requirements.txt b/requirements/requirements.txt index e18cc218..09fdb33b 100644 --- a/requirements/requirements.txt +++ b/requirements/requirements.txt @@ -3,4 +3,5 @@ psycopg>=3.1.8 djangorestframework==3.14.0 djangorestframework-jsonapi==6.1.0 django-filter -httpx==0.26.0 \ No newline at end of file +httpx==0.26.0 +celery \ No newline at end of file