Skip to content
This repository has been archived by the owner on Nov 21, 2024. It is now read-only.

Commit

Permalink
dev: sync 0.35.4 (#825)
Browse files Browse the repository at this point in the history
* refactor

* fix bug

* bump 0.35.4dev0
  • Loading branch information
sbasan authored Sep 13, 2024
1 parent 907da38 commit e9f8dfd
Show file tree
Hide file tree
Showing 7 changed files with 185 additions and 102 deletions.
2 changes: 1 addition & 1 deletion ENDPOINTS.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
**THIS FILE WAS AUTO-GENERATED DO NOT EDIT**

Generated for: catalystwan-0.35.3.dev0
Generated for: catalystwan-0.35.4.dev0

All URIs are relative to */dataservice*
HTTP request | Supported Versions | Method | Payload Type | Return Type | Tenancy Mode
Expand Down
12 changes: 7 additions & 5 deletions catalystwan/apigw_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,15 @@ class ApiGwAuth(AuthBase, AuthProtocol):
2. Use the token in the Authorization header for subsequent requests.
"""

def __init__(self, login: ApiGwLogin, logger: Optional[logging.Logger] = None, verify=False):
def __init__(self, login: ApiGwLogin, logger: Optional[logging.Logger] = None, verify: bool = False):
self.login = login
self.token = ""
self.logger = logger or logging.getLogger(__name__)
self.verify = verify

def __str__(self) -> str:
return f"ApiGatewayAuth(mode={self.login.mode})"

def __call__(self, request: PreparedRequest) -> PreparedRequest:
self.handle_auth(request)
self.build_digest_header(request)
Expand All @@ -61,7 +64,9 @@ def build_digest_header(self, request: PreparedRequest) -> None:
request.headers.update(header)

@staticmethod
def get_token(base_url: str, apigw_login: ApiGwLogin, logger: Optional[logging.Logger] = None, verify=False) -> str:
def get_token(
base_url: str, apigw_login: ApiGwLogin, logger: Optional[logging.Logger] = None, verify: bool = False
) -> str:
try:
response = post(
url=f"{base_url}/apigw/login",
Expand All @@ -84,9 +89,6 @@ def get_token(base_url: str, apigw_login: ApiGwLogin, logger: Optional[logging.L
raise CatalystwanException("Failed to get bearer token")
return token

def __str__(self) -> str:
return f"ApiGatewayAuth(mode={self.login.mode})"

def logout(self, client: APIEndpointClient) -> None:
return None

Expand Down
2 changes: 1 addition & 1 deletion catalystwan/response.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ def auth_response_debug(response: Response, title: str = "Auth") -> str:
if environ.get("catalystwan_auth_trace") is not None:
return response_history_debug(response, None)
return ", ".join(
[f"{title}: {r.request.method} {r.request.url} <{r.status_code}>" for r in response.history + [response]]
[title] + [f"{r.request.method} {r.request.url} <{r.status_code}>" for r in response.history + [response]]
)


Expand Down
36 changes: 14 additions & 22 deletions catalystwan/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
from catalystwan.response import ManagerResponse, response_history_debug
from catalystwan.utils.session_type import SessionType
from catalystwan.version import NullVersion, parse_api_version
from catalystwan.vmanage_auth import vManageAuth
from catalystwan.vmanage_auth import create_vmanage_auth, vManageAuth

JSON = Union[Dict[str, "JSON"], List["JSON"], str, int, float, bool, None]

Expand Down Expand Up @@ -169,7 +169,7 @@ def create_manager_session(
Returns:
ManagerSession: logged-in and operative session to perform tasks on SDWAN Manager.
"""
auth = vManageAuth(username, password, logger=logger)
auth = create_vmanage_auth(username, password, subdomain, logger)
session = ManagerSession(
base_url=create_base_url(url, port),
auth=auth,
Expand Down Expand Up @@ -313,10 +313,6 @@ def login(self) -> ManagerSession:
self.cookies.clear_session_cookies()
self._auth.clear()
self.auth = self._auth
if self.subdomain:
tenant_id = self.get_tenant_id()
vsession_id = self.get_virtual_session_id(tenant_id)
self.headers.update({"VSessionId": vsession_id})
try:
server_info = self.server()
except DefaultPasswordError:
Expand Down Expand Up @@ -361,7 +357,7 @@ def elapsed() -> float:
resp = head(
self.base_url,
timeout=self.polling_requests_timeout,
verify=False,
verify=self.verify,
headers={"User-Agent": USER_AGENT},
)
self.logger.debug(self.response_trace(resp, None))
Expand All @@ -381,7 +377,7 @@ def elapsed() -> float:
resp = get(
server_ready_url,
timeout=self.polling_requests_timeout,
verify=False,
verify=self.verify,
headers={"User-Agent": USER_AGENT},
)
self.logger.debug(self.response_trace(resp, None))
Expand Down Expand Up @@ -415,6 +411,16 @@ def request(self, method, url, *args, **kwargs) -> ManagerResponse:
self.logger.debug(exception)
raise ManagerRequestException(*exception.args, request=exception.request, response=exception.response)

if response.jsessionid_expired and self.state == ManagerSessionState.OPERATIVE:
self.logger.warning("Logging to session. Reason: expired JSESSIONID detected in response headers")
self.state = ManagerSessionState.LOGIN
return self.request(method, url, *args, **_kwargs)

if response.api_gw_unauthorized and self.state == ManagerSessionState.OPERATIVE:
self.logger.warning("Logging to API GW session. Reason: unauthorized detected in response headers")
self.state = ManagerSessionState.LOGIN
return self.request(method, url, *args, **_kwargs)

if response.request.url and "passwordReset.html" in response.request.url:
raise DefaultPasswordError("Password must be changed to use this session.")

Expand Down Expand Up @@ -478,20 +484,6 @@ def get_tenant_id(self) -> str:

return tenant.tenant_id

def get_virtual_session_id(self, tenant_id: str) -> str:
"""Get VSessionId for a specific tenant
Note: In a multitenant vManage system, this API is only available in the Provider view.
Args:
tenant_id: provider or tenant UUID
Returns:
Virtual session token
"""
url_path = f"/dataservice/tenant/{tenant_id}/vsessionid"
response = self.post(url_path)
return response.json()["VSessionId"]

def logout(self) -> None:
self._auth.logout(self)

Expand Down
35 changes: 25 additions & 10 deletions catalystwan/tests/test_vmanage_auth.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# Copyright 2022 Cisco Systems, Inc. and its affiliates

import unittest
from datetime import timedelta
from typing import Callable, Dict, List, Union
from unittest import TestCase, mock
from uuid import uuid4

Expand All @@ -13,11 +15,16 @@


class MockResponse:
def __init__(self, status_code: int, text: str, cookies: dict):
def __init__(self, status_code: int, text: str, cookies: Union[dict, RequestsCookieJar]):
self._status_code = status_code
self._text = text
self.cookies = cookies
self.request = Request()
self.history: List = list()
self.reason = "MockResponse"
self.elapsed = timedelta(0)
self.headers: Dict = dict()
self.json: Callable[..., Dict] = lambda: dict()

@property
def status_code(self) -> int:
Expand All @@ -29,9 +36,11 @@ def text(self) -> str: # TODO


def mock_request_j_security_check(*args, **kwargs):
jsessionid_cookie = RequestsCookieJar()
jsessionid_cookie.set("JSESSIONID", "xyz")
url_response = {
"https://1.1.1.1:1111/j_security_check": {
"admin": MockResponse(200, "", {"JSESSIONID": "xyz"}),
"admin": MockResponse(200, "", jsessionid_cookie),
"invalid_username": MockResponse(200, "<html>error</html>", {}),
}
}
Expand Down Expand Up @@ -70,7 +79,9 @@ def test_get_cookie(self, mock_post):
"j_password": self.password,
}
# Act
vManageAuth.get_jsessionid(self.base_url, username, self.password)
vmanage_auth = vManageAuth(username, self.password)
vmanage_auth._base_url = self.base_url
vmanage_auth.get_jsessionid()

# Assert
mock_post.assert_called_with(
Expand All @@ -90,11 +101,11 @@ def test_get_cookie_invalid_username(self, mock_post):
}
# Act
with self.assertRaises(UnauthorizedAccessError):
vManageAuth.get_jsessionid(self.base_url, username, self.password)
vManageAuth(username, self.password).get_jsessionid()

# Assert
mock_post.assert_called_with(
url="https://1.1.1.1:1111/j_security_check",
url="/j_security_check",
data=security_payload,
verify=False,
headers={"Content-Type": "application/x-www-form-urlencoded", "User-Agent": USER_AGENT},
Expand All @@ -104,14 +115,18 @@ def test_get_cookie_invalid_username(self, mock_post):
def test_fetch_token(self, mock_get):
# Arrange
valid_url = "https://1.1.1.1:1111/dataservice/client/token"
cookies = RequestsCookieJar()
cookies.set("JSESSIONID", "xyz")

# Act
token = vManageAuth.get_xsrftoken(self.base_url, "xyz")
vmanage_auth = vManageAuth("user", self.password)
vmanage_auth._base_url = self.base_url
vmanage_auth.cookies = cookies
token = vmanage_auth.get_xsrftoken()

# Assert
self.assertEqual(token, "valid-token")
cookies = RequestsCookieJar()
cookies.set("JSESSIONID", "xyz")

mock_get.assert_called_with(
url=valid_url,
verify=False,
Expand All @@ -122,12 +137,12 @@ def test_fetch_token(self, mock_get):
@mock.patch("catalystwan.vmanage_auth.get", side_effect=mock_invalid_token_status)
def test_incorrect_xsrf_token_status(self, mock_get):
with self.assertRaises(CatalystwanException):
vManageAuth.get_xsrftoken(self.base_url, "xyz")
vManageAuth("user", self.password).get_xsrftoken()

@mock.patch("catalystwan.vmanage_auth.get", side_effect=mock_invalid_token_format)
def test_incorrect_xsrf_token_format(self, mock_get):
with self.assertRaises(CatalystwanException):
vManageAuth.get_xsrftoken(self.base_url, "xyz")
vManageAuth("user", self.password).get_xsrftoken()


if __name__ == "__main__":
Expand Down
Loading

0 comments on commit e9f8dfd

Please sign in to comment.