Skip to content

Commit

Permalink
add oauthclient implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
PeyGis committed Jan 15, 2025
1 parent d3fabb1 commit 6133ad7
Show file tree
Hide file tree
Showing 4 changed files with 193 additions and 7 deletions.
16 changes: 16 additions & 0 deletions integrations/dynatrace/.port/spec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ features:
- kind: problem
- kind: slo
- kind: entity
- kind: user
- kind: group
configurations:
- name: dynatraceApiKey
required: true
Expand All @@ -19,3 +21,17 @@ configurations:
type: url
required: true
description: "Dynatrace environment URL in the format https://<environment-id>.live.dynatrace.com e.g https://abc123.live.dynatrace.com. You can get this from the URL when you are logged into your Dynatrace environment."
- name: dynatraceAccountId
type: string
required: false
description: "Dynatrace account ID. You can get this from the URL when you are logged into your Dynatrace environment."
- name: dynatraceOauthClientId
required: false
type: string
sensitive: true
description: Dynatrace OAuth Client ID. Read <a target="_blank" href="https://www.dynatrace.com/support/help/shortlink/api-authentication#generate-a-client-id-and-client-secret">Dynatrace authentication documentation</a> on how to create one.
- name: dynatraceOauthClientSecret
required: false
type: string
sensitive: true
description: Dynatrace OAuth Client Secret. Read <a target="_blank" href="https://www.dynatrace.com/support/help/shortlink/api-authentication#generate-a-client-id-and-client-secret">Dynatrace authentication documentation</a> on how to create one.
53 changes: 49 additions & 4 deletions integrations/dynatrace/client.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import typing
from enum import StrEnum
from typing import Any, AsyncGenerator
from typing import Any, AsyncGenerator, Optional, cast

import httpx
from loguru import logger
from port_ocean.context.event import event
from port_ocean.utils import http_async_client
from oauth_client import OAuthClient

from integration import DynatraceResourceConfig, EntityFieldsType

Expand All @@ -24,11 +24,38 @@ class ResourceKey(StrEnum):


class DynatraceClient:
def __init__(self, host_url: str, api_key: str) -> None:
def __init__(
self, host_url: str, api_key: str, oauth_client: Optional[OAuthClient] = None
) -> None:
self.host = host_url.rstrip("/")
self.host_url = f"{host_url.rstrip('/')}/api/v2"
self.client = http_async_client
self.client.headers.update({"Authorization": f"Api-Token {api_key}"})
self.oauth_client = oauth_client
self.account_management_url = "https://api.dynatrace.com/iam/v1"

async def _get_paginated_resources_with_oauth(
self, base_url: str, resource_key: str, params: dict[str, Any] = {}
) -> AsyncGenerator[list[dict[str, Any]], None]:
"""Fetch paginated resources with OAuth."""
logger.info(f"Fetching {resource_key} from {base_url} with params {params}")
if self.oauth_client is None:
raise ValueError("OAuth client is not initialized")
try:
while True:
response = await self.oauth_client.send_request(
"GET", base_url, params=params
)
resources = response.get(resource_key, [])
yield resources

next_page_key = response.get("nextPageKey")
if not next_page_key:
break
params = {"nextPageKey": next_page_key}
except httpx.HTTPError as e:
logger.error(f"HTTP error on {base_url}: {e}")
raise

async def _get_paginated_resources(
self, url: str, resource_key: str, params: dict[str, Any] = {}
Expand Down Expand Up @@ -86,7 +113,7 @@ async def _get_entities_from_type(
yield entities

async def get_entities(self) -> AsyncGenerator[list[dict[str, Any]], None]:
selector = typing.cast(DynatraceResourceConfig, event.resource_config).selector
selector = cast(DynatraceResourceConfig, event.resource_config).selector

for entity_type in selector.entity_types:
async for entities in self._get_entities_from_type(
Expand All @@ -109,6 +136,24 @@ async def get_single_problem(self, problem_id: str) -> dict[str, Any]:
logger.error(f"HTTP error on {url}: {e}")
raise

async def get_users(self) -> AsyncGenerator[list[dict[str, Any]], None]:
"""Fetch paginated users from the account management API."""
if not self.oauth_client:
raise ValueError("OAuth client is required to fetch users.")

url = f"{self.account_management_url}/accounts/{self.oauth_client.account_id}/users"
async for users in self._get_paginated_resources_with_oauth(url, "items"):
yield users

async def get_groups(self) -> AsyncGenerator[list[dict[str, Any]], None]:
"""Fetch paginated teams from the account management API."""
if not self.oauth_client:
raise ValueError("OAuth client is required to fetch teams.")

url = f"{self.account_management_url}/accounts/{self.oauth_client.account_id}/groups"
async for teams in self._get_paginated_resources_with_oauth(url, "items"):
yield teams

async def healthcheck(self) -> None:
try:
response = await self.client.get(
Expand Down
44 changes: 41 additions & 3 deletions integrations/dynatrace/main.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,45 @@
from enum import StrEnum
from typing import Any
from typing import Any, Optional

from loguru import logger
from port_ocean.context.ocean import ocean
from port_ocean.core.ocean_types import ASYNC_GENERATOR_RESYNC_TYPE

from client import DynatraceClient
from oauth_client import OAuthClient


class ObjectKind(StrEnum):
PROBLEM = "problem"
SLO = "slo"
ENTITY = "entity"
USER = "user"
GROUP = "group"


oauth_client: Optional[OAuthClient] = None


def initialize_client() -> DynatraceClient:
global oauth_client
host_url = ocean.integration_config["dynatrace_host_url"]
api_key = ocean.integration_config["dynatrace_api_key"]

# Optional OAuth configurations
account_id = ocean.integration_config.get("dynatrace_account_id")
oauth_client_id = ocean.integration_config.get("dynatrace_oauth_client_id")
oauth_client_secret = ocean.integration_config.get("dynatrace_oauth_client_secret")

if not oauth_client:
if oauth_client_id and oauth_client_secret and account_id:
oauth_client = OAuthClient(
client_id=oauth_client_id,
client_secret=oauth_client_secret,
account_id=account_id,
)

return DynatraceClient(
ocean.integration_config["dynatrace_host_url"],
ocean.integration_config["dynatrace_api_key"],
host_url=host_url, api_key=api_key, oauth_client=oauth_client
)


Expand Down Expand Up @@ -45,6 +67,22 @@ async def on_resync_entities(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
yield entities


@ocean.on_resync(ObjectKind.USER)
async def on_resync_users(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
dynatrace_client = initialize_client()

async for users in dynatrace_client.get_users():
yield users


@ocean.on_resync(ObjectKind.GROUP)
async def on_resync_groups(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
dynatrace_client = initialize_client()

async for groups in dynatrace_client.get_groups():
yield groups


@ocean.router.post("/webhook/problem")
async def on_problem_event(event: dict[str, str | Any]) -> dict[str, bool]:
"""
Expand Down
87 changes: 87 additions & 0 deletions integrations/dynatrace/oauth_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
from loguru import logger
from typing import Optional, Any
import time
import httpx
from port_ocean.utils import http_async_client


class OAuthClient:
def __init__(
self,
client_id: str,
client_secret: str,
account_id: str,
token_url: str = "https://sso.dynatrace.com/sso/oauth2/token",
) -> None:
self.client_id = client_id
self.client_secret = client_secret
self.account_id = account_id
self.token_url = token_url
self.access_token: Optional[str] = None
self.token_expiry: float = 0
self.client = http_async_client

async def _retrieve_token(self) -> None:
"""Retrieve a new bearer token using the correct URL-encoded format."""
logger.info("Fetching a new OAuth token...")
try:
response = await self.client.post(
self.token_url,
data={
"grant_type": "client_credentials",
"client_id": self.client_id,
"client_secret": self.client_secret,
"scope": "account-idm-read iam:users:read iam:groups:read",
"resource": f"urn:dtaccount:{self.account_id}",
},
headers={"Content-Type": "application/x-www-form-urlencoded"},
)
response.raise_for_status()
token_data = response.json()
self.access_token = token_data["access_token"]
# The token expires in 'expires_in' seconds. Set a buffer of 10 seconds regenerate early.
self.token_expiry = time.time() + token_data.get("expires_in", 300) - 10
logger.warning(
f"OAuth token retrieved successfully. Expiring in {self.token_expiry}"
)
except httpx.HTTPError as e:
logger.error(f"Failed to fetch OAuth token: {e}")
raise

async def _ensure_token(self) -> None:
"""Ensure that a valid access token is available."""
if not self.access_token or time.time() >= self.token_expiry:
logger.info("Access token expired or not available. Refreshing...")
await self._retrieve_token()

async def _prepare_headers(self) -> None:
"""Update headers with the latest access token."""
await self._ensure_token()
self.client.headers.update({"Authorization": f"Bearer {self.access_token}"})

async def send_request(
self, method: str, url: str, **kwargs: Any
) -> dict[str, Any]:
"""Make a request with updated headers."""
await self._prepare_headers()
try:
response = await self.client.request(method, url, **kwargs)
response.raise_for_status()
return response.json()
except httpx.HTTPStatusError as e:
# If we get a 401 Unauthorized, refresh the token and retry the request
if e.response.status_code == 401:
logger.warning(
"Token expired or invalid (401). Refreshing token and retrying request..."
)
# await self._retrieve_token()
await self._prepare_headers()
response = await self.client.request(method, url, **kwargs)
response.raise_for_status()
return response.json()
else:
logger.error(f"HTTP error during {method} request to {url}: {e}")
raise
except httpx.HTTPError as e:
logger.error(f"HTTP error during {method} request to {url}: {e}")
raise

0 comments on commit 6133ad7

Please sign in to comment.