Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add gv messaging #208

Draft
wants to merge 19 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ def fake():
'mark': 'enable_search',
'replacement': mock.MagicMock()
},
'website.search.elastic_search': {
'mark': 'enable_search',
'osf.external.messages.celery_publishers._publish_user_status_change': {
'mark': 'enable_account_status_messaging',
'replacement': mock.MagicMock()
}
}
Expand Down
4 changes: 3 additions & 1 deletion framework/auth/signals.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
user_registered = signals.signal('user-registered')
user_confirmed = signals.signal('user-confirmed')
user_email_removed = signals.signal('user-email-removed')
user_merged = signals.signal('user-merged')
user_account_merged = signals.signal('user-account-merged')
user_account_deactivated = signals.signal('user-account-deactivated')
user_account_reactivated = signals.signal('user-account-reactivated')

unconfirmed_user_created = signals.signal('unconfirmed-user-created')
2 changes: 2 additions & 0 deletions framework/celery_tasks/routers.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ def match_by_module(task_path):
return CeleryConfig.task_high_queue
if task_subpath in CeleryConfig.remote_computing_modules:
return CeleryConfig.task_remote_computing_queue
if task_subpath in CeleryConfig.task_account_status_changes_queue:
return CeleryConfig.task_account_status_changes_queue
return CeleryConfig.task_default_queue


Expand Down
Empty file.
45 changes: 45 additions & 0 deletions osf/external/messages/celery_publishers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import waffle
from kombu import Exchange
from framework.celery_tasks import app as celery_app
from website import settings
from osf import features
from osf.utils.requests import get_current_request


def publish_deactivated_user(user):
_publish_user_status_change(
body={
'action': 'deactivate',
'user_uri': user.get_semantic_iri(),
}
)


def publish_reactivate_user(user):
_publish_user_status_change(
body={
'action': 'reactivate',
'user_uri': user.get_semantic_iri(),
},
)


def publish_merged_user(user):
assert user.merged_by, 'User received merge signal, but has no `merged_by` reference.'
_publish_user_status_change(
body={
'action': 'merge',
'into_user_uri': user.merged_by.get_semantic_iri(),
'from_user_uri': user.get_semantic_iri(),
},
)


def _publish_user_status_change(body: dict):
if settings.USE_CELERY and waffle.flag_is_active(get_current_request(), features.ENABLE_GV):
with celery_app.producer_pool.acquire() as producer:
producer.publish(
body=body,
exchange=Exchange(celery_app.conf.task_account_status_changes_queue),
serializer='json'
)
40 changes: 40 additions & 0 deletions osf/management/commands/publish_account_changes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
from django.core.management.base import BaseCommand
from osf.models import OSFUser
from osf.external.messages.celery_publishers import (
publish_deactivated_user,
publish_reactivate_user,
publish_merged_user,
)

actions_to_functions = {
'deactivate': publish_deactivated_user,
'reactivate': publish_reactivate_user,
'merge': publish_merged_user,
}


class Command(BaseCommand):
help = 'Sends a message to manage a user state, for test purposes only.'

def add_arguments(self, parser):
parser.add_argument('user_guid', type=str, help='use the guid of the user to post.')
# Adding a new argument to specify the action to perform
parser.add_argument(
'action',
type=str,
help='The action to perform on the user (deactivate, reactivate, merge).',
choices=['deactivate', 'reactivate', 'merge']
)

def handle(self, *args, **options):
user_guid = options['user_guid']
user = OSFUser.load(user_guid)
action = options['action']

# Using a mapping of action to function to simplify the control flow

if action in actions_to_functions:
actions_to_functions[action](user) # Call the appropriate function
self.stdout.write(self.style.SUCCESS(f'Successfully {action} message for user: {user._id}'))
else:
self.stdout.write(self.style.ERROR('Invalid action specified.'))
12 changes: 3 additions & 9 deletions osf/models/user.py
Original file line number Diff line number Diff line change
Expand Up @@ -746,15 +746,6 @@ def merge_user(self, user):
notifications_configured = user.notifications_configured.copy()
notifications_configured.update(self.notifications_configured)
self.notifications_configured = notifications_configured
if not website_settings.RUNNING_MIGRATION:
for key, value in user.mailchimp_mailing_lists.items():
# subscribe to each list if either user was subscribed
subscription = value or self.mailchimp_mailing_lists.get(key)
signals.user_merged.send(self, list_name=key, subscription=subscription)

# clear subscriptions for merged user
signals.user_merged.send(user, list_name=key, subscription=False, send_goodbye=False)

for target_id, timestamp in user.comments_viewed_timestamp.items():
if not self.comments_viewed_timestamp.get(target_id):
self.comments_viewed_timestamp[target_id] = timestamp
Expand Down Expand Up @@ -872,6 +863,7 @@ def merge_user(self, user):
user.merged_by = self

user.save()
signals.user_account_merged.send(user)

def _merge_users_preprints(self, user):
"""
Expand Down Expand Up @@ -986,6 +978,7 @@ def deactivate_account(self):
# Call to `unsubscribe` above saves, and can lead to stale data
self.reload()
self.is_disabled = True
signals.user_account_deactivated.send(self)

# we must call both methods to ensure the current session is cleared and all existing
# sessions are revoked.
Expand All @@ -1002,6 +995,7 @@ def reactivate_account(self):
self.requested_deactivation = False
from website.mailchimp_utils import subscribe_on_confirm
subscribe_on_confirm(self)
signals.user_account_reactivated.send(self)

def update_is_active(self):
"""Update ``is_active`` to be consistent with the fields that
Expand Down
4 changes: 2 additions & 2 deletions osf_tests/test_user.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from importlib import import_module

from framework.auth.exceptions import ExpiredTokenError, InvalidTokenError, ChangePasswordError
from framework.auth.signals import user_merged
from framework.auth.signals import user_account_merged
from framework.analytics import get_total_activity_count
from framework.exceptions import PermissionsError
from framework.celery_tasks import handlers
Expand Down Expand Up @@ -1507,7 +1507,7 @@ def test_send_user_merged_signal(self, mock_get_mailchimp_api, dupe, merge_dupe)

with capture_signals() as mock_signals:
merge_dupe()
assert mock_signals.signals_sent() == set([user_merged])
assert mock_signals.signals_sent() == set([user_account_merged])

@pytest.mark.enable_enqueue_task
@mock.patch('website.mailchimp_utils.get_mailchimp_api')
Expand Down
127 changes: 127 additions & 0 deletions tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,38 @@
import time
import unittest
from django.utils import timezone
from django.dispatch import receiver


from flask import Flask
from nose.tools import * # noqa (PEP8 asserts)
import blinker

from tests.base import OsfTestCase, DbTestCase
from osf_tests.factories import RegistrationFactory, UserFactory, fake_email
from framework.auth.signals import (
user_account_deactivated,
user_account_reactivated,
user_account_merged
)

from framework.auth.utils import generate_csl_given_name
from framework.routing import Rule, json_renderer
from framework.utils import secure_filename, throttle_period_expired
from api.base.utils import waterbutler_api_url_for
from osf.utils.functional import rapply
from waffle.testutils import override_flag
from website.routes import process_rules, OsfWebRenderer
from website import settings
from website.util import paths
from website.util import web_url_for, api_url_for, is_json_request, conjunct, api_v2_url
from website.project import utils as project_utils
from website.profile import utils as profile_utils

from osf import features

from kombu import Exchange

try:
import magic # noqa
LIBMAGIC_AVAILABLE = True
Expand Down Expand Up @@ -453,3 +465,118 @@ def test_build_create_user_time_conflict(self):
user_one_create = UserFactory()
user_two_create = UserFactory()
assert user_one_create.username != user_two_create.username


@pytest.mark.django_db
class TestUserSignals:

@pytest.fixture
def user(self, db):
return UserFactory()

@pytest.fixture
def old_user(self, db):
return UserFactory()

@pytest.fixture
def deactivated_user(self, db):
user = UserFactory()
user.deactivate_account()
return user

@pytest.fixture
def account_status_changes_exchange(self):
return Exchange('account_status_changes')

@mock.patch('osf.external.messages.celery_publishers.publish_deactivated_user')
def test_user_account_deactivated_signal(self, mock_publish_deactivated_user, user):
# Connect a mock receiver to the signal for testing
@receiver(user_account_deactivated)
def mock_receiver(user, **kwargs):
return mock_publish_deactivated_user(user)

# Trigger the signal
user.deactivate_account()

# Verify that the mock receiver was called
mock_publish_deactivated_user.assert_called_once_with(user)

@mock.patch('osf.external.messages.celery_publishers.publish_merged_user')
def test_user_account_merged_signal(self, mock_publish_merged_user, user, old_user):
# Connect a mock receiver to the signal for testing
@receiver(user_account_merged)
def mock_receiver(user, **kwargs):
return mock_publish_merged_user(user)

# Trigger the signal
user.merge_user(old_user)

# Verify that the mock receiver was called
mock_publish_merged_user.assert_called_once_with(old_user)

@mock.patch('osf.external.messages.celery_publishers.publish_reactivate_user')
def test_user_account_deactivate_signal(self, mock_publish_reactivate_user, deactivated_user):
# Connect a mock receiver to the signal for testing
@receiver(user_account_reactivated)
def mock_receiver(user, **kwargs):
return mock_publish_reactivate_user(user)

# Trigger the signal
deactivated_user.reactivate_account()

# Verify that the mock receiver was called
mock_publish_reactivate_user.assert_called_once_with(deactivated_user)

@pytest.mark.enable_account_status_messaging
@mock.patch('osf.external.messages.celery_publishers.celery_app.producer_pool.acquire')
def test_publish_body_on_deactivation(self, mock_publish_user_status_change, user, account_status_changes_exchange):
with mock.patch.object(settings, 'USE_CELERY', True):
with override_flag(features.ENABLE_GV, active=True):
user.deactivate_account()

mock_publish_user_status_change().__enter__().publish.assert_called_once_with(
body={'action': 'deactivate', 'user_uri': f'http://localhost:5000/{user._id}'},
exchange=account_status_changes_exchange,
serializer='json',
)

@pytest.mark.enable_account_status_messaging
@mock.patch('osf.external.messages.celery_publishers.celery_app.producer_pool.acquire')
def test_publish_body_on_reactivation(
self,
mock_publish_user_status_change,
deactivated_user,
account_status_changes_exchange
):
with mock.patch.object(settings, 'USE_CELERY', True):
with override_flag(features.ENABLE_GV, active=True):
deactivated_user.reactivate_account()

mock_publish_user_status_change().__enter__().publish.assert_called_once_with(
body={'action': 'reactivate', 'user_uri': f'http://localhost:5000/{deactivated_user._id}'},
exchange=account_status_changes_exchange,
serializer='json',
)

@pytest.mark.enable_account_status_messaging
@mock.patch('osf.external.messages.celery_publishers.celery_app.producer_pool.acquire')
def test_publish_body_on_merger(
self,
mock_publish_user_status_change,
user,
old_user,
account_status_changes_exchange
):
with mock.patch.object(settings, 'USE_CELERY', True):
with override_flag(features.ENABLE_GV, active=True):
user.merge_user(old_user)

mock_publish_user_status_change().__enter__().publish.assert_called_once_with(
body={
'action': 'merge',
'into_user_uri': f'http://localhost:5000/{user._id}',
'from_user_uri': f'http://localhost:5000/{old_user._id}'
},
exchange=account_status_changes_exchange,
serializer='json',
)
6 changes: 3 additions & 3 deletions website/mailchimp_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def subscribe_mailchimp(list_name, user_id):
user.save()


def unsubscribe_mailchimp(list_name, user_id, username=None, send_goodbye=True):
def unsubscribe_mailchimp(list_name, user_id, username=None):
"""Unsubscribe a user from a mailchimp mailing list given its name.
:param str list_name: mailchimp mailing list name
Expand Down Expand Up @@ -113,10 +113,10 @@ def unsubscribe_mailchimp(list_name, user_id, username=None, send_goodbye=True):
@queued_task
@app.task
@transaction.atomic
def unsubscribe_mailchimp_async(list_name, user_id, username=None, send_goodbye=True):
def unsubscribe_mailchimp_async(list_name, user_id, username=None):
""" Same args as unsubscribe_mailchimp, used to have the task be run asynchronously
"""
unsubscribe_mailchimp(list_name=list_name, user_id=user_id, username=username, send_goodbye=send_goodbye)
unsubscribe_mailchimp(list_name=list_name, user_id=user_id, username=username)

@user_confirmed.connect
def subscribe_on_confirm(user):
Expand Down
Loading
Loading