Skip to content

Commit

Permalink
[change] Moved logic to get_one_time_login_url utility function
Browse files Browse the repository at this point in the history
  • Loading branch information
pandafy authored Jan 10, 2024
1 parent cca5c19 commit b162832
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 40 deletions.
26 changes: 4 additions & 22 deletions openwisp_radius/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from openwisp_utils.tasks import OpenwispCeleryTask

from .radclient.client import RadClient
from .utils import load_model
from .utils import get_one_time_login_url, load_model

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -75,29 +75,13 @@ def convert_called_station_id(unique_id=None):
@shared_task(base=OpenwispCeleryTask)
def send_login_email(accounting_data):
from allauth.account.models import EmailAddress
from sesame.utils import get_query_string

Organization = swapper.load_model('openwisp_users', 'Organization')
username = accounting_data.get('username', None)
org_uuid = accounting_data.get('organization')
organization = Organization.objects.select_related('radius_settings').get(
id=org_uuid
)

try:
org_radius_settings = organization.radius_settings
except ObjectDoesNotExist:
logger.warning(
f'Organization "{organization.name}" does not '
'have any OpenWISP RADIUS settings configured'
)
return

login_url = org_radius_settings.login_url
if not login_url:
logger.debug(f'login_url is not defined for {organization.name} organization')
return

try:
user = (
EmailAddress.objects.select_related('user')
Expand All @@ -107,14 +91,12 @@ def send_login_email(accounting_data):
except ObjectDoesNotExist:
logger.warning(f'user with username "{username}" does not exists')
return
if not user.is_member(organization):
logger.warning(
f'user with username "{username}" is not member of "{organization.name}"'
)

one_time_login_url = get_one_time_login_url(user, organization)
if not one_time_login_url:
return

with translation.override(user.language):
one_time_login_url = login_url + get_query_string(user)
subject = _('New WiFi session started')
context = {
'user': user,
Expand Down
35 changes: 18 additions & 17 deletions openwisp_radius/tests/test_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,19 +113,32 @@ def test_delete_unverified_users(self):
self.assertEqual(User.objects.count(), 0)

@mock.patch('openwisp_radius.tasks.logger')
@mock.patch('openwisp_radius.utils.logger')
@mock.patch('django.utils.translation.activate')
# @capture_stdout()
def test_send_login_email(self, translation_activate, logger):
def test_send_login_email(self, translation_activate, utils_logger, task_logger):
accounting_data = _RADACCT.copy()
organization = self._get_org()
accounting_data['organization'] = organization.id
total_mails = len(mail.outbox)
radius_settings = organization.radius_settings

with self.subTest('do not send email if username is invalid'):
tasks.send_login_email.delay(accounting_data)
self.assertEqual(len(mail.outbox), total_mails)
username = accounting_data.get('username')
task_logger.warning.assert_called_with(
f'user with username "{username}" does not exists'
)

user = self._get_user()
accounting_data['username'] = user.username

with self.subTest(
'do not send mail if login_url does not exists for the organization'
):
tasks.send_login_email.delay(accounting_data)
self.assertEqual(len(mail.outbox), total_mails)
logger.debug.assert_called_with(
utils_logger.debug.assert_called_with(
f'login_url is not defined for {organization.name} organization'
)
translation_activate.assert_not_called()
Expand All @@ -135,22 +148,10 @@ def test_send_login_email(self, translation_activate, logger):
radius_settings.save(update_fields=['login_url'])
total_mails = len(mail.outbox)

with self.subTest('do not send email if username is invalid'):
tasks.send_login_email.delay(accounting_data)
self.assertEqual(len(mail.outbox), total_mails)
username = accounting_data.get('username')
logger.warning.assert_called_with(
f'user with username "{username}" does not exists'
)

logger.reset_mock()
user = self._get_user()
accounting_data['username'] = user.username

with self.subTest('do not send mail if user is not a member of organization'):
tasks.send_login_email.delay(accounting_data)
self.assertEqual(len(mail.outbox), total_mails)
logger.warning.assert_called_with(
utils_logger.warning.assert_called_with(
f'user with username "{user.username}" is '
f'not member of "{organization.name}"'
)
Expand Down Expand Up @@ -238,7 +239,7 @@ def test_send_login_email(self, translation_activate, logger):
organization.radius_settings.delete()
tasks.send_login_email.delay(accounting_data)
self.assertEqual(len(mail.outbox), total_mails)
logger.warning.assert_called_with(
utils_logger.warning.assert_called_with(
f'Organization "{organization.name}" does not '
'have any OpenWISP RADIUS settings configured'
)
Expand Down
8 changes: 7 additions & 1 deletion openwisp_radius/tests/test_utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from django.contrib.auth import get_user_model
from django.core.exceptions import ValidationError
from django.test import override_settings

from ..utils import find_available_username, validate_csvfile
from ..utils import find_available_username, get_one_time_login_url, validate_csvfile
from . import FileMixin
from .mixins import BaseTestCase

Expand Down Expand Up @@ -32,3 +33,8 @@ def test_validate_csvfile(self):
with self.assertRaises(ValidationError) as error:
validate_csvfile(open(improper_csv_path, 'rt'))
self.assertTrue('Improper CSV format' in error.exception.message)

@override_settings(AUTHENTICATION_BACKENDS=[])
def test_get_one_time_login_url(self):
login_url = get_one_time_login_url(None, None)
self.assertEqual(login_url, None)
30 changes: 30 additions & 0 deletions openwisp_radius/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,3 +251,33 @@ def get_group_checks(group):
for group_check in group_checks:
result[group_check.attribute] = group_check
return result


def get_one_time_login_url(user, organization):
if 'sesame.backends.ModelBackend' not in getattr(
settings, 'AUTHENTICATION_BACKENDS', []
):
return
from sesame.utils import get_query_string

try:
org_radius_settings = organization.radius_settings
except ObjectDoesNotExist:
logger.warning(
f'Organization "{organization.name}" does not '
'have any OpenWISP RADIUS settings configured'
)
return

login_url = org_radius_settings.login_url
if not login_url:
logger.debug(f'login_url is not defined for {organization.name} organization')
return

if not user.is_member(organization):
logger.warning(
f'user with username "{user}" is not member of "{organization.name}"'
)
return

return login_url + get_query_string(user)

0 comments on commit b162832

Please sign in to comment.