diff --git a/openwisp_radius/tasks.py b/openwisp_radius/tasks.py index ab31a413..09f63209 100644 --- a/openwisp_radius/tasks.py +++ b/openwisp_radius/tasks.py @@ -16,7 +16,7 @@ from openwisp_utils.tasks import OpenwispCeleryTask from .radclient.client import RadClient -from .utils import load_model +from .utils import get_one_time_login_url, load_model logger = logging.getLogger(__name__) @@ -75,7 +75,6 @@ def convert_called_station_id(unique_id=None): @shared_task(base=OpenwispCeleryTask) def send_login_email(accounting_data): from allauth.account.models import EmailAddress - from sesame.utils import get_query_string Organization = swapper.load_model('openwisp_users', 'Organization') username = accounting_data.get('username', None) @@ -83,21 +82,6 @@ def send_login_email(accounting_data): organization = Organization.objects.select_related('radius_settings').get( id=org_uuid ) - - try: - org_radius_settings = organization.radius_settings - except ObjectDoesNotExist: - logger.warning( - f'Organization "{organization.name}" does not ' - 'have any OpenWISP RADIUS settings configured' - ) - return - - login_url = org_radius_settings.login_url - if not login_url: - logger.debug(f'login_url is not defined for {organization.name} organization') - return - try: user = ( EmailAddress.objects.select_related('user') @@ -107,14 +91,12 @@ def send_login_email(accounting_data): except ObjectDoesNotExist: logger.warning(f'user with username "{username}" does not exists') return - if not user.is_member(organization): - logger.warning( - f'user with username "{username}" is not member of "{organization.name}"' - ) + + one_time_login_url = get_one_time_login_url(user, organization) + if not one_time_login_url: return with translation.override(user.language): - one_time_login_url = login_url + get_query_string(user) subject = _('New WiFi session started') context = { 'user': user, diff --git a/openwisp_radius/tests/test_tasks.py b/openwisp_radius/tests/test_tasks.py index 5db762f3..69f62049 100644 --- a/openwisp_radius/tests/test_tasks.py +++ b/openwisp_radius/tests/test_tasks.py @@ -113,19 +113,32 @@ def test_delete_unverified_users(self): self.assertEqual(User.objects.count(), 0) @mock.patch('openwisp_radius.tasks.logger') + @mock.patch('openwisp_radius.utils.logger') @mock.patch('django.utils.translation.activate') - # @capture_stdout() - def test_send_login_email(self, translation_activate, logger): + def test_send_login_email(self, translation_activate, utils_logger, task_logger): accounting_data = _RADACCT.copy() organization = self._get_org() accounting_data['organization'] = organization.id total_mails = len(mail.outbox) + radius_settings = organization.radius_settings + + with self.subTest('do not send email if username is invalid'): + tasks.send_login_email.delay(accounting_data) + self.assertEqual(len(mail.outbox), total_mails) + username = accounting_data.get('username') + task_logger.warning.assert_called_with( + f'user with username "{username}" does not exists' + ) + + user = self._get_user() + accounting_data['username'] = user.username + with self.subTest( 'do not send mail if login_url does not exists for the organization' ): tasks.send_login_email.delay(accounting_data) self.assertEqual(len(mail.outbox), total_mails) - logger.debug.assert_called_with( + utils_logger.debug.assert_called_with( f'login_url is not defined for {organization.name} organization' ) translation_activate.assert_not_called() @@ -135,22 +148,10 @@ def test_send_login_email(self, translation_activate, logger): radius_settings.save(update_fields=['login_url']) total_mails = len(mail.outbox) - with self.subTest('do not send email if username is invalid'): - tasks.send_login_email.delay(accounting_data) - self.assertEqual(len(mail.outbox), total_mails) - username = accounting_data.get('username') - logger.warning.assert_called_with( - f'user with username "{username}" does not exists' - ) - - logger.reset_mock() - user = self._get_user() - accounting_data['username'] = user.username - with self.subTest('do not send mail if user is not a member of organization'): tasks.send_login_email.delay(accounting_data) self.assertEqual(len(mail.outbox), total_mails) - logger.warning.assert_called_with( + utils_logger.warning.assert_called_with( f'user with username "{user.username}" is ' f'not member of "{organization.name}"' ) @@ -238,7 +239,7 @@ def test_send_login_email(self, translation_activate, logger): organization.radius_settings.delete() tasks.send_login_email.delay(accounting_data) self.assertEqual(len(mail.outbox), total_mails) - logger.warning.assert_called_with( + utils_logger.warning.assert_called_with( f'Organization "{organization.name}" does not ' 'have any OpenWISP RADIUS settings configured' ) diff --git a/openwisp_radius/tests/test_utils.py b/openwisp_radius/tests/test_utils.py index 08a253dc..ce4d84d7 100644 --- a/openwisp_radius/tests/test_utils.py +++ b/openwisp_radius/tests/test_utils.py @@ -1,7 +1,8 @@ from django.contrib.auth import get_user_model from django.core.exceptions import ValidationError +from django.test import override_settings -from ..utils import find_available_username, validate_csvfile +from ..utils import find_available_username, get_one_time_login_url, validate_csvfile from . import FileMixin from .mixins import BaseTestCase @@ -32,3 +33,8 @@ def test_validate_csvfile(self): with self.assertRaises(ValidationError) as error: validate_csvfile(open(improper_csv_path, 'rt')) self.assertTrue('Improper CSV format' in error.exception.message) + + @override_settings(AUTHENTICATION_BACKENDS=[]) + def test_get_one_time_login_url(self): + login_url = get_one_time_login_url(None, None) + self.assertEqual(login_url, None) diff --git a/openwisp_radius/utils.py b/openwisp_radius/utils.py index fa2dfc3d..35203f4b 100644 --- a/openwisp_radius/utils.py +++ b/openwisp_radius/utils.py @@ -251,3 +251,33 @@ def get_group_checks(group): for group_check in group_checks: result[group_check.attribute] = group_check return result + + +def get_one_time_login_url(user, organization): + if 'sesame.backends.ModelBackend' not in getattr( + settings, 'AUTHENTICATION_BACKENDS', [] + ): + return + from sesame.utils import get_query_string + + try: + org_radius_settings = organization.radius_settings + except ObjectDoesNotExist: + logger.warning( + f'Organization "{organization.name}" does not ' + 'have any OpenWISP RADIUS settings configured' + ) + return + + login_url = org_radius_settings.login_url + if not login_url: + logger.debug(f'login_url is not defined for {organization.name} organization') + return + + if not user.is_member(organization): + logger.warning( + f'user with username "{user}" is not member of "{organization.name}"' + ) + return + + return login_url + get_query_string(user)