From e9f8dfd4721f97149d0f3a1990c5814515df3fb0 Mon Sep 17 00:00:00 2001 From: Szymon Basan <116343782+sbasan@users.noreply.github.com> Date: Fri, 13 Sep 2024 10:03:18 +0200 Subject: [PATCH] dev: sync 0.35.4 (#825) * refactor * fix bug * bump 0.35.4dev0 --- ENDPOINTS.md | 2 +- catalystwan/apigw_auth.py | 12 +- catalystwan/response.py | 2 +- catalystwan/session.py | 36 ++--- catalystwan/tests/test_vmanage_auth.py | 35 +++-- catalystwan/vmanage_auth.py | 198 +++++++++++++++++-------- pyproject.toml | 2 +- 7 files changed, 185 insertions(+), 102 deletions(-) diff --git a/ENDPOINTS.md b/ENDPOINTS.md index c82c74f7..be7d8fe5 100644 --- a/ENDPOINTS.md +++ b/ENDPOINTS.md @@ -1,6 +1,6 @@ **THIS FILE WAS AUTO-GENERATED DO NOT EDIT** -Generated for: catalystwan-0.35.3.dev0 +Generated for: catalystwan-0.35.4.dev0 All URIs are relative to */dataservice* HTTP request | Supported Versions | Method | Payload Type | Return Type | Tenancy Mode diff --git a/catalystwan/apigw_auth.py b/catalystwan/apigw_auth.py index 30022805..b8e19335 100644 --- a/catalystwan/apigw_auth.py +++ b/catalystwan/apigw_auth.py @@ -32,12 +32,15 @@ class ApiGwAuth(AuthBase, AuthProtocol): 2. Use the token in the Authorization header for subsequent requests. """ - def __init__(self, login: ApiGwLogin, logger: Optional[logging.Logger] = None, verify=False): + def __init__(self, login: ApiGwLogin, logger: Optional[logging.Logger] = None, verify: bool = False): self.login = login self.token = "" self.logger = logger or logging.getLogger(__name__) self.verify = verify + def __str__(self) -> str: + return f"ApiGatewayAuth(mode={self.login.mode})" + def __call__(self, request: PreparedRequest) -> PreparedRequest: self.handle_auth(request) self.build_digest_header(request) @@ -61,7 +64,9 @@ def build_digest_header(self, request: PreparedRequest) -> None: request.headers.update(header) @staticmethod - def get_token(base_url: str, apigw_login: ApiGwLogin, logger: Optional[logging.Logger] = None, verify=False) -> str: + def get_token( + base_url: str, apigw_login: ApiGwLogin, logger: Optional[logging.Logger] = None, verify: bool = False + ) -> str: try: response = post( url=f"{base_url}/apigw/login", @@ -84,9 +89,6 @@ def get_token(base_url: str, apigw_login: ApiGwLogin, logger: Optional[logging.L raise CatalystwanException("Failed to get bearer token") return token - def __str__(self) -> str: - return f"ApiGatewayAuth(mode={self.login.mode})" - def logout(self, client: APIEndpointClient) -> None: return None diff --git a/catalystwan/response.py b/catalystwan/response.py index 8cc6864d..88245090 100644 --- a/catalystwan/response.py +++ b/catalystwan/response.py @@ -109,7 +109,7 @@ def auth_response_debug(response: Response, title: str = "Auth") -> str: if environ.get("catalystwan_auth_trace") is not None: return response_history_debug(response, None) return ", ".join( - [f"{title}: {r.request.method} {r.request.url} <{r.status_code}>" for r in response.history + [response]] + [title] + [f"{r.request.method} {r.request.url} <{r.status_code}>" for r in response.history + [response]] ) diff --git a/catalystwan/session.py b/catalystwan/session.py index 0ceea40f..043a3dec 100644 --- a/catalystwan/session.py +++ b/catalystwan/session.py @@ -30,7 +30,7 @@ from catalystwan.response import ManagerResponse, response_history_debug from catalystwan.utils.session_type import SessionType from catalystwan.version import NullVersion, parse_api_version -from catalystwan.vmanage_auth import vManageAuth +from catalystwan.vmanage_auth import create_vmanage_auth, vManageAuth JSON = Union[Dict[str, "JSON"], List["JSON"], str, int, float, bool, None] @@ -169,7 +169,7 @@ def create_manager_session( Returns: ManagerSession: logged-in and operative session to perform tasks on SDWAN Manager. """ - auth = vManageAuth(username, password, logger=logger) + auth = create_vmanage_auth(username, password, subdomain, logger) session = ManagerSession( base_url=create_base_url(url, port), auth=auth, @@ -313,10 +313,6 @@ def login(self) -> ManagerSession: self.cookies.clear_session_cookies() self._auth.clear() self.auth = self._auth - if self.subdomain: - tenant_id = self.get_tenant_id() - vsession_id = self.get_virtual_session_id(tenant_id) - self.headers.update({"VSessionId": vsession_id}) try: server_info = self.server() except DefaultPasswordError: @@ -361,7 +357,7 @@ def elapsed() -> float: resp = head( self.base_url, timeout=self.polling_requests_timeout, - verify=False, + verify=self.verify, headers={"User-Agent": USER_AGENT}, ) self.logger.debug(self.response_trace(resp, None)) @@ -381,7 +377,7 @@ def elapsed() -> float: resp = get( server_ready_url, timeout=self.polling_requests_timeout, - verify=False, + verify=self.verify, headers={"User-Agent": USER_AGENT}, ) self.logger.debug(self.response_trace(resp, None)) @@ -415,6 +411,16 @@ def request(self, method, url, *args, **kwargs) -> ManagerResponse: self.logger.debug(exception) raise ManagerRequestException(*exception.args, request=exception.request, response=exception.response) + if response.jsessionid_expired and self.state == ManagerSessionState.OPERATIVE: + self.logger.warning("Logging to session. Reason: expired JSESSIONID detected in response headers") + self.state = ManagerSessionState.LOGIN + return self.request(method, url, *args, **_kwargs) + + if response.api_gw_unauthorized and self.state == ManagerSessionState.OPERATIVE: + self.logger.warning("Logging to API GW session. Reason: unauthorized detected in response headers") + self.state = ManagerSessionState.LOGIN + return self.request(method, url, *args, **_kwargs) + if response.request.url and "passwordReset.html" in response.request.url: raise DefaultPasswordError("Password must be changed to use this session.") @@ -478,20 +484,6 @@ def get_tenant_id(self) -> str: return tenant.tenant_id - def get_virtual_session_id(self, tenant_id: str) -> str: - """Get VSessionId for a specific tenant - - Note: In a multitenant vManage system, this API is only available in the Provider view. - - Args: - tenant_id: provider or tenant UUID - Returns: - Virtual session token - """ - url_path = f"/dataservice/tenant/{tenant_id}/vsessionid" - response = self.post(url_path) - return response.json()["VSessionId"] - def logout(self) -> None: self._auth.logout(self) diff --git a/catalystwan/tests/test_vmanage_auth.py b/catalystwan/tests/test_vmanage_auth.py index 8d9f5682..3e917d5a 100644 --- a/catalystwan/tests/test_vmanage_auth.py +++ b/catalystwan/tests/test_vmanage_auth.py @@ -1,6 +1,8 @@ # Copyright 2022 Cisco Systems, Inc. and its affiliates import unittest +from datetime import timedelta +from typing import Callable, Dict, List, Union from unittest import TestCase, mock from uuid import uuid4 @@ -13,11 +15,16 @@ class MockResponse: - def __init__(self, status_code: int, text: str, cookies: dict): + def __init__(self, status_code: int, text: str, cookies: Union[dict, RequestsCookieJar]): self._status_code = status_code self._text = text self.cookies = cookies self.request = Request() + self.history: List = list() + self.reason = "MockResponse" + self.elapsed = timedelta(0) + self.headers: Dict = dict() + self.json: Callable[..., Dict] = lambda: dict() @property def status_code(self) -> int: @@ -29,9 +36,11 @@ def text(self) -> str: # TODO def mock_request_j_security_check(*args, **kwargs): + jsessionid_cookie = RequestsCookieJar() + jsessionid_cookie.set("JSESSIONID", "xyz") url_response = { "https://1.1.1.1:1111/j_security_check": { - "admin": MockResponse(200, "", {"JSESSIONID": "xyz"}), + "admin": MockResponse(200, "", jsessionid_cookie), "invalid_username": MockResponse(200, "error", {}), } } @@ -70,7 +79,9 @@ def test_get_cookie(self, mock_post): "j_password": self.password, } # Act - vManageAuth.get_jsessionid(self.base_url, username, self.password) + vmanage_auth = vManageAuth(username, self.password) + vmanage_auth._base_url = self.base_url + vmanage_auth.get_jsessionid() # Assert mock_post.assert_called_with( @@ -90,11 +101,11 @@ def test_get_cookie_invalid_username(self, mock_post): } # Act with self.assertRaises(UnauthorizedAccessError): - vManageAuth.get_jsessionid(self.base_url, username, self.password) + vManageAuth(username, self.password).get_jsessionid() # Assert mock_post.assert_called_with( - url="https://1.1.1.1:1111/j_security_check", + url="/j_security_check", data=security_payload, verify=False, headers={"Content-Type": "application/x-www-form-urlencoded", "User-Agent": USER_AGENT}, @@ -104,14 +115,18 @@ def test_get_cookie_invalid_username(self, mock_post): def test_fetch_token(self, mock_get): # Arrange valid_url = "https://1.1.1.1:1111/dataservice/client/token" + cookies = RequestsCookieJar() + cookies.set("JSESSIONID", "xyz") # Act - token = vManageAuth.get_xsrftoken(self.base_url, "xyz") + vmanage_auth = vManageAuth("user", self.password) + vmanage_auth._base_url = self.base_url + vmanage_auth.cookies = cookies + token = vmanage_auth.get_xsrftoken() # Assert self.assertEqual(token, "valid-token") - cookies = RequestsCookieJar() - cookies.set("JSESSIONID", "xyz") + mock_get.assert_called_with( url=valid_url, verify=False, @@ -122,12 +137,12 @@ def test_fetch_token(self, mock_get): @mock.patch("catalystwan.vmanage_auth.get", side_effect=mock_invalid_token_status) def test_incorrect_xsrf_token_status(self, mock_get): with self.assertRaises(CatalystwanException): - vManageAuth.get_xsrftoken(self.base_url, "xyz") + vManageAuth("user", self.password).get_xsrftoken() @mock.patch("catalystwan.vmanage_auth.get", side_effect=mock_invalid_token_format) def test_incorrect_xsrf_token_format(self, mock_get): with self.assertRaises(CatalystwanException): - vManageAuth.get_xsrftoken(self.base_url, "xyz") + vManageAuth("user", self.password).get_xsrftoken() if __name__ == "__main__": diff --git a/catalystwan/vmanage_auth.py b/catalystwan/vmanage_auth.py index 0507d567..cfcfdd76 100644 --- a/catalystwan/vmanage_auth.py +++ b/catalystwan/vmanage_auth.py @@ -1,17 +1,19 @@ # Copyright 2022 Cisco Systems, Inc. and its affiliates import logging -from typing import Callable, Optional +from http.cookies import SimpleCookie +from typing import Optional from urllib.parse import urlparse from packaging.version import Version # type: ignore from requests import PreparedRequest, Response, get, post from requests.auth import AuthBase -from requests.cookies import RequestsCookieJar +from requests.cookies import RequestsCookieJar, merge_cookies from catalystwan import USER_AGENT from catalystwan.abstractions import APIEndpointClient, AuthProtocol -from catalystwan.exceptions import CatalystwanException +from catalystwan.exceptions import CatalystwanException, TenantSubdomainNotFound +from catalystwan.models.tenant import Tenant from catalystwan.response import ManagerResponse, auth_response_debug from catalystwan.version import NullVersion @@ -39,6 +41,25 @@ def __str__(self): return f"Trying to access vManage with the following credentials: {self.username}/****. {self.message}" +def update_headers( + request: PreparedRequest, + jsessionid: Optional[str], + xsrftoken: Optional[str] = None, + vsessionid: Optional[str] = None, +) -> None: + if jsessionid is not None: + # preserve existing cookies and insert JSESSIONID + # PreparedRequest.preparce_cookies cannot be used as they can be already set in session context + cookie: SimpleCookie = SimpleCookie() + cookie.load(request.headers.get("Cookie", "")) + cookie["JSESSIONID"] = jsessionid + request.headers["Cookie"] = cookie.output(header="", sep=";").strip() + if xsrftoken is not None: + request.headers["x-xsrf-token"] = xsrftoken + if vsessionid is not None: + request.headers["VSessionId"] = vsessionid + + class vManageAuth(AuthBase, AuthProtocol): """Attaches vManage Authentication to the given Requests object. @@ -49,81 +70,67 @@ class vManageAuth(AuthBase, AuthProtocol): 2. Get a cross-site request forgery prevention token, which is required for most POST operations. """ - def __init__(self, username: str, password: str, logger: Optional[logging.Logger] = None, verify=False): + def __init__(self, username: str, password: str, logger: Optional[logging.Logger] = None, verify: bool = False): self.username = username self.password = password - self.jsessionid: str = "" - self.xsrftoken: str = "" + self.xsrftoken: Optional[str] = None self.verify = verify self.logger = logger or logging.getLogger(__name__) - self._cookie: RequestsCookieJar = RequestsCookieJar() - self._callback: Optional[Callable[[AuthBase], None]] = None - self._base_url: Optional[str] = None + self.cookies: RequestsCookieJar = RequestsCookieJar() + self._base_url: str = "" - def response_hook(self, r: Response, **kwargs) -> Response: - _r = ManagerResponse(r) - if _r.jsessionid_expired: - self.clear() - return r + def __str__(self) -> str: + return f"vManageAuth(username={self.username})" def __call__(self, request: PreparedRequest) -> PreparedRequest: self.handle_auth(request) - self.build_digest_header(request) - request.register_hook("response", self.response_hook) + update_headers(request, self.jsessionid, self.xsrftoken) return request + def sync_cookies(self, cookies: RequestsCookieJar) -> None: + self.cookies = merge_cookies(self.cookies, cookies) + + @property + def jsessionid(self) -> Optional[str]: + return self.cookies.get("JSESSIONID") + def handle_auth(self, request: PreparedRequest): if not self.jsessionid or not self.xsrftoken: self.authenticate(request) - @staticmethod - def get_jsessionid( - base_url: str, username: str, password: str, logger: Optional[logging.Logger] = None, verify: bool = False - ) -> str: + def get_jsessionid(self) -> str: security_payload = { - "j_username": username, - "j_password": password, + "j_username": self.username, + "j_password": self.password, } - url = base_url + "/j_security_check" + url = self._base_url + "/j_security_check" headers = {"Content-Type": "application/x-www-form-urlencoded", "User-Agent": USER_AGENT} - response: Response = post(url=url, headers=headers, data=security_payload, verify=verify) - if logger is not None: - logger.debug(auth_response_debug(response)) - jsessionid = response.cookies.get("JSESSIONID", "") - if response.text != "" or not isinstance(jsessionid, str) or jsessionid == "": - raise UnauthorizedAccessError(username, password) - return jsessionid - - @staticmethod - def get_xsrftoken( - base_url: str, jsessionid: str, logger: Optional[logging.Logger] = None, verify: bool = False - ) -> str: - url = base_url + "/dataservice/client/token" + response: Response = post(url=url, headers=headers, data=security_payload, verify=self.verify) + self.sync_cookies(response.cookies) + self.logger.debug(auth_response_debug(response, str(self))) + if response.text != "" or not isinstance(self.jsessionid, str) or self.jsessionid == "": + raise UnauthorizedAccessError(self.username, self.password) + return self.jsessionid + + def get_xsrftoken(self) -> str: + url = self._base_url + "/dataservice/client/token" headers = {"Content-Type": "application/json", "User-Agent": USER_AGENT} - cookie = RequestsCookieJar() - cookie.set("JSESSIONID", jsessionid) response: Response = get( url=url, - cookies=cookie, + cookies=self.cookies, headers=headers, - verify=verify, + verify=self.verify, ) - if logger is not None: - logger.debug(auth_response_debug(response)) + self.sync_cookies(response.cookies) + self.logger.debug(auth_response_debug(response, str(self))) if response.status_code != 200 or "" in response.text: raise CatalystwanException("Failed to get XSRF token") return response.text def authenticate(self, request: PreparedRequest): self._base_url = f"{str(urlparse(request.url).scheme)}://{str(urlparse(request.url).netloc)}" # noqa: E231 - self.jsessionid = self.get_jsessionid(self._base_url, self.username, self.password, self.logger, self.verify) - self._cookie = RequestsCookieJar() - self._cookie.set("JSESSIONID", self.jsessionid) - self.xsrftoken = self.get_xsrftoken(self._base_url, self.jsessionid, self.logger, self.verify) - - def build_digest_header(self, request: PreparedRequest) -> None: - request.headers["x-xsrf-token"] = self.xsrftoken - request.prepare_cookies(self._cookie) + self.get_jsessionid() + self.xsrftoken = self.get_xsrftoken() def logout(self, client: APIEndpointClient) -> None: if isinstance((version := client.api_version), NullVersion): @@ -131,23 +138,90 @@ def logout(self, client: APIEndpointClient) -> None: elif self._base_url is None: self.logger.warning("Cannot perform logout without known base url") else: + headers = {"x-xsrf-token": self.xsrftoken, "User-Agent": USER_AGENT} if version >= Version("20.12"): - response = post( - f"{self._base_url}/logout", - cookies=self._cookie, - headers={"x-xsrf-token": self.xsrftoken}, - verify=False, - ) + response = post(f"{self._base_url}/logout", headers=headers, cookies=self.cookies, verify=self.verify) else: - response = get(f"{self._base_url}", cookies=self._cookie, verify=False) - self.logger.debug(auth_response_debug(response)) + response = get(f"{self._base_url}/logout", headers=headers, cookies=self.cookies, verify=self.verify) + self.logger.debug(auth_response_debug(response, str(self))) if response.status_code != 200: self.logger.error("Unsuccessfull logout") + self.clear() + + def clear(self) -> None: + self.cookies.clear_session_cookies() + self.xsrftoken = None + + +class vSessionAuth(vManageAuth): + def __init__( + self, + username: str, + password: str, + subdomain: str, + logger: Optional[logging.Logger] = None, + verify: bool = False, + ): + super().__init__(username, password, logger, verify) + self.subdomain = subdomain + self.vsessionid: Optional[str] = None def __str__(self) -> str: - return f"vManageAuth(username={self.username})" + return f"vSessionAuth(username={self.username},subdomain={self.subdomain})" # noqa: E231 + + def __call__(self, request: PreparedRequest) -> PreparedRequest: + self.handle_auth(request) + update_headers(request, self.jsessionid, self.xsrftoken, self.vsessionid) + return request + + def authenticate(self, request: PreparedRequest): + super().authenticate(request) + tenantid = self.get_tenantid() + self.vsessionid = self.get_vsessionid(tenantid) + + def get_tenantid(self) -> str: + url = self._base_url + "/dataservice/tenant" + headers = {"Content-Type": "application/json", "User-Agent": USER_AGENT, "x-xsrf-token": self.xsrftoken} + response: Response = get( + url=url, + cookies=self.cookies, + headers=headers, + verify=self.verify, + ) + self.sync_cookies(response.cookies) + self.logger.debug(auth_response_debug(response, str(self))) + tenants = ManagerResponse(response).dataseq(Tenant) + tenant = tenants.filter(subdomain=self.subdomain).single_or_default() + if not tenant or not tenant.tenant_id: + raise TenantSubdomainNotFound(f"Tenant ID for sub-domain: {self.subdomain} not found") + return tenant.tenant_id + + def get_vsessionid(self, tenantid: str) -> str: + url = self._base_url + f"/dataservice/tenant/{tenantid}/vsessionid" + headers = {"Content-Type": "application/json", "User-Agent": USER_AGENT, "x-xsrf-token": self.xsrftoken} + response: Response = post( + url=url, + cookies=self.cookies, + headers=headers, + verify=self.verify, + ) + self.sync_cookies(response.cookies) + self.logger.debug(auth_response_debug(response, str(self))) + return response.json()["VSessionId"] def clear(self) -> None: - self.jsessionid = "" - self.xsrftoken = "" - self._cookie = RequestsCookieJar() + super().clear() + self.vsessionid = None + + +def create_vmanage_auth( + username: str, + password: str, + subdomain: Optional[str] = None, + logger: Optional[logging.Logger] = None, + verify: bool = False, +) -> vManageAuth: + if subdomain is not None: + return vSessionAuth(username, password, subdomain, logger=logger, verify=verify) + else: + return vManageAuth(username, password, logger=logger, verify=verify) diff --git a/pyproject.toml b/pyproject.toml index 57579a90..dd752dd0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "catalystwan" -version = "0.35.3dev2" +version = "0.35.4dev0" description = "Cisco Catalyst WAN SDK for Python" authors = ["kagorski "] readme = "README.md"