Skip to content

Commit

Permalink
test subscription for disabled users prototype
Browse files Browse the repository at this point in the history
  • Loading branch information
John Tordoff committed Feb 28, 2024
1 parent 794c660 commit ff2b581
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from django.core.management.base import BaseCommand
from app.tasks import listen_for_disabled_users

class Command(BaseCommand):
help = 'Subscribes to an RPC exchange and listens for messages.'

def handle(self, *args, **options):
listen_for_disabled_users()
15 changes: 15 additions & 0 deletions app/celery.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import os
from celery import Celery

# Set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'your_project_name.settings')

app = Celery(broker='amqp://guest:guest@192.168.168.167:5672//')
app.conf.result_backend = 'rpc://'

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django apps.
app.autodiscover_tasks()
9 changes: 9 additions & 0 deletions app/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,19 @@
USER_REFERENCE_LOOKUP_URL = "https://api.osf.io/v2/users/me/"
USER_REFERENCE_COOKIE = "osf"

CELERY_BROKER_URL = 'amqp://guest:guest@192.168.168.167:5672/'
CELERY_RESULT_BACKEND = 'django-db' # Store results in the Django DB
CELERY_CACHE_BACKEND = 'django-cache' # Use Django's cache framework

CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'

URI_ID = "http://osf.example/"
AUTH_URI_ID = "http://osf.auth/"

ALLOWED_HOSTS = env.ALLOWED_HOSTS
BROKER_URL = 'amqp://guest:guest@192.168.168.167:5672/'


# Application definition
Expand Down
16 changes: 16 additions & 0 deletions app/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from app.celery import app
from kombu import Queue, Exchange, Consumer


def listen_for_disabled_users():
exchange = Exchange('test_exchange', type='direct')
queue = Queue(name='test_rpc_queue', exchange=exchange, routing_key='test_rpc_routing_key')

def process_message(body, message):
print(f"Received message: {body}")
message.ack()

with app.connection() as connection:
with Consumer(connection, queues=queue, callbacks=[process_message], accept=['json']):
while True:
connection.drain_events()

0 comments on commit ff2b581

Please sign in to comment.