Skip to content

Commit

Permalink
Setup rabbit listener for disabled users
Browse files Browse the repository at this point in the history
  • Loading branch information
John Tordoff committed Feb 29, 2024
1 parent 794c660 commit ed4293d
Show file tree
Hide file tree
Showing 7 changed files with 53 additions and 3 deletions.
9 changes: 9 additions & 0 deletions addon_service/apps.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,15 @@
import threading

from django.apps import AppConfig

from addon_service.tasks import listen_for_osf_signals


class AddonServiceConfig(AppConfig):
default_auto_field = "django.db.models.BigAutoField"
name = "addon_service"

def ready(self):
thread = threading.Thread(target=listen_for_osf_signals)
thread.daemon = True
thread.start()
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from django.core.management.base import BaseCommand
from addon_service.tasks import listen_for_disabled_users

class Command(BaseCommand):
help = 'Subscribes to an RPC exchange and listens for messages.'

def handle(self, *args, **options):
listen_for_disabled_users()
3 changes: 3 additions & 0 deletions addon_service/osf_listener.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from celery import Celery

osf_listener = Celery(broker='amqp://guest:guest@192.168.168.167:5672//')
26 changes: 26 additions & 0 deletions addon_service/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from addon_service.osf_listener import osf_listener
from kombu import Queue, Exchange, Consumer
OSF_DEFAULT_QUEUE = 'celery'


def process_disabled_user_message(body, message):
print(f"Received message: {body}")
print(f"Received message: {message}")
message.ack()


def listen_for_osf_signals():
with osf_listener.connection() as connection:
with Consumer(
connection,
queues=Queue(
name=OSF_DEFAULT_QUEUE, # TODO: Any plans here?
exchange=Exchange(OSF_DEFAULT_QUEUE), # TODO: Define exchange(s) for merged/disabled
),
callbacks=[ # TODO: Multiple callbacks or multiple queues?
process_disabled_user_message
],
accept=['json']
):
while True:
connection.drain_events()
2 changes: 1 addition & 1 deletion app/authentication.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
)


TODO: Improve dockerization of OSF so that we don't need this
# TODO: Improve dockerization of OSF so that we don't need this
def handle_redirects(response):
"""Redirect fix for localhost during local development."""
if settings.DEBUG and response.status_code in {301, 302, 303, 307, 308}:
Expand Down
5 changes: 4 additions & 1 deletion app/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,15 @@
USER_REFERENCE_LOOKUP_URL = "https://api.osf.io/v2/users/me/"
USER_REFERENCE_COOKIE = "osf"

CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'

URI_ID = "http://osf.example/"
AUTH_URI_ID = "http://osf.auth/"

ALLOWED_HOSTS = env.ALLOWED_HOSTS


# Application definition

INSTALLED_APPS = (
Expand Down
3 changes: 2 additions & 1 deletion requirements/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ psycopg>=3.1.8
djangorestframework==3.14.0
djangorestframework-jsonapi==6.1.0
django-filter
httpx==0.26.0
httpx==0.26.0
celery

0 comments on commit ed4293d

Please sign in to comment.