From ff2b581806ac879f4217f8aebfd038bd9310085c Mon Sep 17 00:00:00 2001 From: John Tordoff <> Date: Wed, 28 Feb 2024 14:29:02 -0500 Subject: [PATCH] test subscription for disabled users prototype --- .../test_disabled_accounts_subscription.py | 8 ++++++++ app/celery.py | 15 +++++++++++++++ app/settings.py | 9 +++++++++ app/tasks.py | 16 ++++++++++++++++ 4 files changed, 48 insertions(+) create mode 100644 addon_service/management/commands/test_disabled_accounts_subscription.py create mode 100644 app/celery.py create mode 100644 app/tasks.py diff --git a/addon_service/management/commands/test_disabled_accounts_subscription.py b/addon_service/management/commands/test_disabled_accounts_subscription.py new file mode 100644 index 00000000..11264495 --- /dev/null +++ b/addon_service/management/commands/test_disabled_accounts_subscription.py @@ -0,0 +1,8 @@ +from django.core.management.base import BaseCommand +from app.tasks import listen_for_disabled_users + +class Command(BaseCommand): + help = 'Subscribes to an RPC exchange and listens for messages.' + + def handle(self, *args, **options): + listen_for_disabled_users() \ No newline at end of file diff --git a/app/celery.py b/app/celery.py new file mode 100644 index 00000000..b377f663 --- /dev/null +++ b/app/celery.py @@ -0,0 +1,15 @@ +import os +from celery import Celery + +# Set the default Django settings module for the 'celery' program. +os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'your_project_name.settings') + +app = Celery(broker='amqp://guest:guest@192.168.168.167:5672//') +app.conf.result_backend = 'rpc://' + +# Using a string here means the worker doesn't have to serialize +# the configuration object to child processes. +app.config_from_object('django.conf:settings', namespace='CELERY') + +# Load task modules from all registered Django apps. +app.autodiscover_tasks() \ No newline at end of file diff --git a/app/settings.py b/app/settings.py index a5dec5f5..a98fdf38 100644 --- a/app/settings.py +++ b/app/settings.py @@ -19,10 +19,19 @@ USER_REFERENCE_LOOKUP_URL = "https://api.osf.io/v2/users/me/" USER_REFERENCE_COOKIE = "osf" +CELERY_BROKER_URL = 'amqp://guest:guest@192.168.168.167:5672/' +CELERY_RESULT_BACKEND = 'django-db' # Store results in the Django DB +CELERY_CACHE_BACKEND = 'django-cache' # Use Django's cache framework + +CELERY_ACCEPT_CONTENT = ['json'] +CELERY_TASK_SERIALIZER = 'json' +CELERY_RESULT_SERIALIZER = 'json' + URI_ID = "http://osf.example/" AUTH_URI_ID = "http://osf.auth/" ALLOWED_HOSTS = env.ALLOWED_HOSTS +BROKER_URL = 'amqp://guest:guest@192.168.168.167:5672/' # Application definition diff --git a/app/tasks.py b/app/tasks.py new file mode 100644 index 00000000..001bc32a --- /dev/null +++ b/app/tasks.py @@ -0,0 +1,16 @@ +from app.celery import app +from kombu import Queue, Exchange, Consumer + + +def listen_for_disabled_users(): + exchange = Exchange('test_exchange', type='direct') + queue = Queue(name='test_rpc_queue', exchange=exchange, routing_key='test_rpc_routing_key') + + def process_message(body, message): + print(f"Received message: {body}") + message.ack() + + with app.connection() as connection: + with Consumer(connection, queues=queue, callbacks=[process_message], accept=['json']): + while True: + connection.drain_events() \ No newline at end of file