Skip to content

Commit

Permalink
add queue and signals
Browse files Browse the repository at this point in the history
  • Loading branch information
John Tordoff committed Mar 18, 2024
1 parent e319191 commit bab4498
Show file tree
Hide file tree
Showing 8 changed files with 147 additions and 16 deletions.
4 changes: 3 additions & 1 deletion framework/auth/signals.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
user_registered = signals.signal('user-registered')
user_confirmed = signals.signal('user-confirmed')
user_email_removed = signals.signal('user-email-removed')
user_merged = signals.signal('user-merged')
user_account_merged = signals.signal('user-account-merged')
user_account_deactivated = signals.signal('user-account-deactivated')
user_account_reactivated = signals.signal('user-account-reactivated')

unconfirmed_user_created = signals.signal('unconfirmed-user-created')
47 changes: 47 additions & 0 deletions osf/external/messages/celery_publishers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
from kombu import Exchange
from framework.celery_tasks import app as celery_app
from framework.celery_tasks.handlers import enqueue_task


def publish_deactivated_user(user):
enqueue_task(
_publish_user_status_change.s(
body={
'action': 'deactivate',
'user_uri': user.url,
},
)
)


def publish_reactivate_user(user):
enqueue_task(
_publish_user_status_change.s(
body={
'action': 'reactivate',
'user_uri': user.url,
},
)
)


def publish_merged_user(user):
enqueue_task(
_publish_user_status_change.s(
body={
'action': 'merge',
'user_uri': user.url,
'merged_user_uri': user.merged_by.url,
},
)
)


@celery_app.task()
def _publish_user_status_change(body: dict):
with celery_app.producer_pool.acquire() as producer:
producer.publish(
body=body,
exchange=Exchange(celery_app.conf.account_status_changes),
serializer='json'
)
39 changes: 39 additions & 0 deletions osf/management/commands/test_publish.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
from django.core.management.base import BaseCommand
from osf.models import OSFUser
from osf.external.messages.celery_publishers import (
publish_deactivated_user,
publish_reactivate_user,
publish_merged_user,
)


class Command(BaseCommand):
help = 'Sends a message to manage a user state, for test purposes only.'

def add_arguments(self, parser):
parser.add_argument('user_guid', type=str, help='URI of the user to post.')
# Adding a new argument to specify the action to perform
parser.add_argument(
'action',
type=str,
help='The action to perform on the user (deactivate, reactivate, merge).',
choices=['deactivate', 'reactivate', 'merge']
)

def handle(self, *args, **options):
user_guid = options['user_guid']
user = OSFUser.objects.get(guids___id=user_guid)
action = options['action']

# Using a mapping of action to function to simplify the control flow
actions_to_functions = {
'deactivate': publish_deactivated_user,
'reactivate': publish_reactivate_user,
'merge': publish_merged_user,
}

if action in actions_to_functions:
actions_to_functions[action](user) # Call the appropriate function
self.stdout.write(self.style.SUCCESS(f'Successfully {action} message for user: {user._id}'))
else:
self.stdout.write(self.style.ERROR('Invalid action specified.'))
13 changes: 4 additions & 9 deletions osf/models/user.py
Original file line number Diff line number Diff line change
Expand Up @@ -746,15 +746,6 @@ def merge_user(self, user):
notifications_configured = user.notifications_configured.copy()
notifications_configured.update(self.notifications_configured)
self.notifications_configured = notifications_configured
if not website_settings.RUNNING_MIGRATION:
for key, value in user.mailchimp_mailing_lists.items():
# subscribe to each list if either user was subscribed
subscription = value or self.mailchimp_mailing_lists.get(key)
signals.user_merged.send(self, list_name=key, subscription=subscription)

# clear subscriptions for merged user
signals.user_merged.send(user, list_name=key, subscription=False, send_goodbye=False)

for target_id, timestamp in user.comments_viewed_timestamp.items():
if not self.comments_viewed_timestamp.get(target_id):
self.comments_viewed_timestamp[target_id] = timestamp
Expand Down Expand Up @@ -872,6 +863,8 @@ def merge_user(self, user):
user.merged_by = self

user.save()
signals.user_account_merged.send(user)
signals.user_account_deactivated.send(self)

def _merge_users_preprints(self, user):
"""
Expand Down Expand Up @@ -993,6 +986,7 @@ def deactivate_account(self):
if isinstance(req, FlaskRequest):
logout()
remove_sessions_for_user(self)
signals.user_account_deactivated.send(user=self)

def reactivate_account(self):
"""
Expand All @@ -1002,6 +996,7 @@ def reactivate_account(self):
self.requested_deactivation = False
from website.mailchimp_utils import subscribe_on_confirm
subscribe_on_confirm(self)
signals.user_account_reactivated.send(self)

def update_is_active(self):
"""Update ``is_active`` to be consistent with the fields that
Expand Down
4 changes: 2 additions & 2 deletions osf_tests/test_user.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from importlib import import_module

from framework.auth.exceptions import ExpiredTokenError, InvalidTokenError, ChangePasswordError
from framework.auth.signals import user_merged
from framework.auth.signals import user_account_merged
from framework.analytics import get_total_activity_count
from framework.exceptions import PermissionsError
from framework.celery_tasks import handlers
Expand Down Expand Up @@ -1507,7 +1507,7 @@ def test_send_user_merged_signal(self, mock_get_mailchimp_api, dupe, merge_dupe)

with capture_signals() as mock_signals:
merge_dupe()
assert mock_signals.signals_sent() == set([user_merged])
assert mock_signals.signals_sent() == set([user_account_merged])

@pytest.mark.enable_enqueue_task
@mock.patch('website.mailchimp_utils.get_mailchimp_api')
Expand Down
48 changes: 46 additions & 2 deletions website/profile/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,11 @@
from framework.auth.decorators import must_be_confirmed
from framework.auth.exceptions import ChangePasswordError
from framework.auth.views import send_confirm_email
from framework.auth.signals import user_merged
from framework.auth.signals import (
user_account_merged,
user_account_deactivated,
user_account_reactivated
)
from framework.exceptions import HTTPError, PermissionsError
from framework.flask import redirect # VOL-aware redirect
from framework.status import push_status_message
Expand All @@ -36,6 +40,12 @@
from website.util import api_v2_url, web_url_for, paths
from website.util.sanitize import escape_html
from addons.base import utils as addon_utils
from osf.external.messages.celery_publishers import (
publish_reactivate_user,
publish_deactivated_user,
publish_merged_user
)


from api.waffle.utils import storage_i18n_flag_active

Expand Down Expand Up @@ -506,7 +516,6 @@ def user_choose_mailing_lists(auth, **kwargs):
return {'message': 'Successfully updated mailing lists', 'result': all_mailing_lists}, 200


@user_merged.connect
def update_mailchimp_subscription(user, list_name, subscription, send_goodbye=True):
""" Update mailing list subscription in mailchimp.
Expand All @@ -527,6 +536,41 @@ def update_mailchimp_subscription(user, list_name, subscription, send_goodbye=Tr
pass


@user_account_merged.connect
def send_account_merged_message(user):
""" Sends a message using Celery messaging to alert other services that an osf.io user has been merged."""
publish_merged_user(user)


@user_account_deactivated.connect
def send_mailchimp_unsubscribe(user):
""" Tells mailchimp a to unsubscribe a merged user """
if not settings.RUNNING_MIGRATION:
for key, value in user.mailchimp_mailing_lists.items():
update_mailchimp_subscription(user, list_name=key, subscription=False, send_goodbye=False)


@user_account_merged.connect
def send_mailchimp_merge(user):
""" Tells mailchimp a to merge a users subscriptions """
if not settings.RUNNING_MIGRATION:
for key, value in user.merged_by.mailchimp_mailing_lists.items():
subscription = value or user.merged_by.mailchimp_mailing_lists.get(key)
update_mailchimp_subscription(user, list_name=key, subscription=subscription, send_goodbye=False)


@user_account_deactivated.connect
def send_account_deactivation_message(user):
""" Sends a message using Celery messaging to alert other services that an osf.io user has been deactivated."""
publish_deactivated_user(user)


@user_account_reactivated.connect
def send_account_reactivation_message(user):
""" Sends a message using Celery messaging to alert other services that an osf.io user has been reactivated."""
publish_reactivate_user(user)


def mailchimp_get_endpoint(**kwargs):
"""Endpoint that the mailchimp webhook hits to check that the OSF is responding"""
return {}, http_status.HTTP_200_OK
Expand Down
1 change: 1 addition & 0 deletions website/settings/defaults.py
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,7 @@ class CeleryConfig:
routing_key=task_med_queue, consumer_arguments={'x-priority': 1}),
Queue(task_high_queue, Exchange(task_high_queue),
routing_key=task_high_queue, consumer_arguments={'x-priority': 10}),
Queue(account_status_changes, Exchange(account_status_changes), routing_key=account_status_changes)
)

task_default_exchange_type = 'direct'
Expand Down
7 changes: 5 additions & 2 deletions website/signals.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
from website.conferences import signals as conference
from website.reviews import signals as reviews

ALL_SIGNALS = [

ALL_SIGNALS = [ # TODO: Fix
project.comment_added,
project.mention_added,
project.unreg_contributor_added,
Expand All @@ -17,7 +18,9 @@
auth.user_confirmed,
auth.user_email_removed,
auth.user_registered,
auth.user_merged,
auth.user_account_deactivated,
auth.user_account_reactivated,
auth.user_account_merged,
auth.unconfirmed_user_created,
event.file_updated,
conference.osf4m_user_created,
Expand Down

0 comments on commit bab4498

Please sign in to comment.