diff --git a/auth_saml/__manifest__.py b/auth_saml/__manifest__.py index f3596d6d5..bf6d3cc79 100644 --- a/auth_saml/__manifest__.py +++ b/auth_saml/__manifest__.py @@ -4,7 +4,7 @@ { "name": "SAML2 Authentication", - "version": "17.0.1.0.0", + "version": "18.0.1.0.0", "category": "Tools", "author": "XCG Consulting, Odoo Community Association (OCA)", "maintainers": ["vincent-hatakeyama"], diff --git a/auth_saml/controllers/main.py b/auth_saml/controllers/main.py index f241d3d50..e48dd4206 100644 --- a/auth_saml/controllers/main.py +++ b/auth_saml/controllers/main.py @@ -17,9 +17,7 @@ exceptions, http, models, -) -from odoo import ( - registry as registry_get, + modules, ) from odoo.http import request from odoo.tools.misc import clean_context @@ -173,7 +171,7 @@ def _get_saml_extra_relaystate(self): } return state - @http.route("/auth_saml/get_auth_request", type="http", auth="none") + @http.route("/auth_saml/get_auth_request", type="http", auth="none", readonly=False) def get_auth_request(self, pid): provider_id = int(pid) @@ -191,7 +189,9 @@ def get_auth_request(self, pid): redirect.autocorrect_location_header = True return redirect - @http.route("/auth_saml/signin", type="http", auth="none", csrf=False) + @http.route( + "/auth_saml/signin", type="http", auth="none", csrf=False, readonly=False + ) @fragment_to_query_string def signin(self, **kw): """ @@ -241,7 +241,13 @@ def signin(self, **kw): url = f"/#action={action}" elif menu: url = f"/#menu_id={menu}" - pre_uid = request.session.authenticate(*credentials) + + credentials_dict = { + "login": credentials[1], + "token": credentials[2], + "type": "saml_token", + } + pre_uid = request.session.authenticate(dbname, credentials_dict) resp = request.redirect(_get_login_redirect_url(pre_uid, url), 303) resp.autocorrect_location_header = False return resp @@ -265,7 +271,9 @@ def signin(self, **kw): redirect.autocorrect_location_header = False return redirect - @http.route("/auth_saml/metadata", type="http", auth="none", csrf=False) + @http.route( + "/auth_saml/metadata", type="http", auth="none", csrf=False, readonly=False + ) def saml_metadata(self, **kw): provider = kw.get("p") dbname = kw.get("d") @@ -273,17 +281,15 @@ def saml_metadata(self, **kw): if not dbname or not provider: _logger.debug("Metadata page asked without database name or provider id") - return request.not_found(_("Missing parameters")) + raise request.not_found(_("Missing parameters")) provider = int(provider) - registry = registry_get(dbname) - - with registry.cursor() as cr: + with modules.registry.Registry(dbname).cursor() as cr: env = api.Environment(cr, SUPERUSER_ID, {}) client = env["auth.saml.provider"].sudo().browse(provider) if not client.exists(): - return request.not_found(_("Unknown provider")) + raise request.not_found(_("Unknown provider")) return request.make_response( client._metadata_string( diff --git a/auth_saml/models/auth_saml_provider.py b/auth_saml/models/auth_saml_provider.py index fc75664ee..e55ddf7fc 100644 --- a/auth_saml/models/auth_saml_provider.py +++ b/auth_saml/models/auth_saml_provider.py @@ -164,7 +164,7 @@ def _compute_sp_metadata_url(self): qs = urllib.parse.urlencode({"p": record.id, "d": self.env.cr.dbname}) record.sp_metadata_url = urllib.parse.urljoin( - base_url, f"/auth_saml/metadata?{qs}" + base_url, (f"/auth_saml/metadata?{qs}") ) def _get_cert_key_path(self, field="sp_pem_public"): @@ -277,6 +277,38 @@ def _get_auth_request(self, extra_state=None, url_root=None): return redirect_url + def _logout(self, saml_user, redirect, url_root=None): + """ + build a logout request and give it back to our client + """ + state = { + "d": self.env.cr.dbname, + "p": self.id, + } + + sig_alg = ds.SIG_RSA_SHA1 + if self.sig_alg: + sig_alg = getattr(ds, self.sig_alg) + + saml_client = self._get_client_for_provider(url_root) + reqid, info = saml_client.prepare_for_logout( + sign=self.sign_authenticate_requests, + relay_state=json.dumps(state), + sigalg=sig_alg, + ) + + redirect_url = None + # Select the IdP URL to send the AuthN request to + for key, value in info["headers"]: + if key == "Location": + redirect_url = value + + self._store_outstanding_request(reqid) + + saml_user.saml_access_token = None + + return redirect_url + def _validate_auth_response(self, token: str, base_url: str = None): """return the validation data corresponding to the access token""" self.ensure_one() diff --git a/auth_saml/models/res_users.py b/auth_saml/models/res_users.py index 412b5c699..841fc63dd 100644 --- a/auth_saml/models/res_users.py +++ b/auth_saml/models/res_users.py @@ -7,7 +7,7 @@ import passlib -from odoo import SUPERUSER_ID, _, api, fields, models, registry, tools +from odoo import SUPERUSER_ID, _, api, fields, models, modules, tools from odoo.exceptions import AccessDenied, ValidationError from .ir_config_parameter import ALLOW_SAML_UID_AND_PASSWORD @@ -48,7 +48,7 @@ def _auth_saml_signin(self, provider: int, validation: dict, saml_response) -> s if len(user) != 1: raise AccessDenied() - with registry(self.env.cr.dbname).cursor() as new_cr: + with modules.registry.Registry(self.env.cr.dbname).cursor() as new_cr: new_env = api.Environment(new_cr, self.env.uid, self.env.context) # Update the token. Need to be committed, otherwise the token is not visible # to other envs, like the one used in login_and_redirect @@ -76,7 +76,7 @@ def auth_saml(self, provider: int, saml_response: str, base_url: str = None): # return user credentials return self.env.cr.dbname, login, saml_response - def _check_credentials(self, password, env): + def _check_credentials(self, credential, env): """Override to handle SAML auths. The token can be a password if the user has used the normal form... @@ -84,10 +84,16 @@ def _check_credentials(self, password, env): and the interesting code is inside the "except" clause. """ try: - # Attempt a regular login (via other auth addons) first. - return super()._check_credentials(password, env) + if self.allow_saml_and_password(): + # If both SAML and password are allowed we can try first the normal auth + return super()._check_credentials(credential, env) + else: + # If only SAML we go to the except clause + raise AccessDenied() from None except (AccessDenied, passlib.exc.PasswordSizeError): + if not (credential["type"] == "saml_token" and credential["token"]): + raise passwd_allowed = ( env["interactive"] or not self.env.user._rpc_api_keys_only() ) @@ -100,12 +106,16 @@ def _check_credentials(self, password, env): .search( [ ("user_id", "=", self.env.user.id), - ("saml_access_token", "=", password), + ("saml_access_token", "=", credential["token"]), ] ) ) if token: - return + return { + "uid": self.env.user.id, + "auth_method": "saml", + "mfa": "default", + } raise AccessDenied() from None @api.model diff --git a/auth_saml/tests/test_pysaml.py b/auth_saml/tests/test_pysaml.py index 9abccf576..7cd3db0a7 100644 --- a/auth_saml/tests/test_pysaml.py +++ b/auth_saml/tests/test_pysaml.py @@ -125,7 +125,7 @@ def test__compute_sp_metadata_url__provider_has_sp_baseurl(self): {"p": self.saml_provider.id, "d": self.env.cr.dbname} ) expected_url = urllib.parse.urljoin( - "http://example.com", f"/auth_saml/metadata?{expected_qs}" + "http://example.com", (f"/auth_saml/metadata?{expected_qs}") ) # Assert that sp_metadata_url is set correctly self.assertEqual(self.saml_provider.sp_metadata_url, expected_url) @@ -200,7 +200,10 @@ def test_login_no_saml(self): # Try to log in with a non-existing SAML token with self.assertRaises(AccessDenied): - self.authenticate(user="test@example.com", password="test_saml_token") + self.user._check_credentials( + {"type": "password", "password": "test_saml_token"}, + {"interactive": True}, + ) redirect_url = self.saml_provider._get_auth_request() self.assertIn("http://localhost:8000/sso/redirect?SAMLRequest=", redirect_url) @@ -254,7 +257,10 @@ def test_login_with_saml(self): # We should not be able to log in with the wrong token with self.assertRaises(AccessDenied): - self.authenticate(user="test@example.com", password=f"{token}-WRONG") + self.user._check_credentials( + {"type": "password", "password": "WRONG_TOKEN"}, + {"interactive": True}, + ) # User should now be able to log in with the token self.authenticate(user="test@example.com", password=token) @@ -268,8 +274,9 @@ def test_disallow_user_password_when_changing_ir_config_parameter(self): ).value = "False" # The password should be blank and the user should not be able to connect with self.assertRaises(AccessDenied): - self.authenticate( - user="user@example.com", password="NesTNSte9340D720te>/-A" + self.user._check_credentials( + {"type": "password", "password": "NesTNSte9340D720te>/-A"}, + {"interactive": True}, ) def test_disallow_user_password_new_user(self): @@ -332,18 +339,19 @@ def test_disallow_user_password_no_password_set(self): with self.assertRaises(ValidationError): user.password = "new password" - def test_disallow_user_password(self): + def test_disallow_user_password_on_option_disable(self): """Test that existing user password is deleted when adding an SAML provider when the disallow option is set.""" + self.authenticate(user="test@example.com", password="Lu,ums-7vRU>0i]=YDLa") # change the option self.browse_ref( "auth_saml.allow_saml_uid_and_internal_password" ).value = "False" - # Test that existing user password is deleted when adding an SAML provider - self.authenticate(user="test@example.com", password="Lu,ums-7vRU>0i]=YDLa") - self.add_provider_to_user() with self.assertRaises(AccessDenied): - self.authenticate(user="test@example.com", password="Lu,ums-7vRU>0i]=YDLa") + self.user._check_credentials( + {"type": "password", "password": "Lu,ums-7vRU>0i]=YDLa"}, + {"interactive": True}, + ) def test_disallow_user_admin_can_have_password(self): """Test that admin can have its password set @@ -417,6 +425,32 @@ def test_disallow_user_password_when_changing_settings(self): ).execute() with self.assertRaises(AccessDenied): - self.authenticate( - user="user@example.com", password="NesTNSte9340D720te>/-A" + self.user._check_credentials( + {"type": "password", "password": "NesTNSte9340D720te>/-A"}, + {"interactive": True}, + ) + + def test_saml_metadata_invalid_provider(self): + """Accessing SAML metadata with an invalid provider ID should return 404.""" + response = self.url_open(f"/auth_saml/metadata?p=999999&d={self.env.cr.dbname}") + self.assertEqual(response.status_code, 404) + self.assertIn("Unknown provider", response.text) + + def test_saml_metadata_missing_parameters(self): + """Accessing the SAML metadata endpoint without params should return 404.""" + response = self.url_open("/auth_saml/metadata") + self.assertEqual(response.status_code, 404) + self.assertIn("Missing parameters", response.text) + + def test_saml_provider_deactivation(self): + """A deactivated SAML provider should not be usable for authentication.""" + self.saml_provider.active = False + + redirect_url = self.saml_provider._get_auth_request() + response = self.idp.fake_login(redirect_url) + unpacked_response = response._unpack() + + with self.assertRaises(AccessDenied): + self.env["res.users"].sudo().auth_saml( + self.saml_provider.id, unpacked_response.get("SAMLResponse"), None ) diff --git a/auth_saml/views/auth_saml.xml b/auth_saml/views/auth_saml.xml index 9ee7dc033..e9ed50207 100644 --- a/auth_saml/views/auth_saml.xml +++ b/auth_saml/views/auth_saml.xml @@ -48,12 +48,12 @@ auth.saml.provider.list auth.saml.provider - + - + @@ -168,10 +168,10 @@ name="attribute_mapping_ids" context="{'default_provider_id': id}" > - + - +

Mapped attributes are copied from the SAML response at every logon, if available. If multiple values are returned (i.e. a list) then the first value is used. @@ -191,7 +191,7 @@ Providers auth.saml.provider - tree,form + list,form - + - +