Skip to content

Commit

Permalink
Handling Throttling for Cloud Control Get Resource
Browse files Browse the repository at this point in the history
  • Loading branch information
mk-armah committed Jan 9, 2025
1 parent 5473284 commit 7cca192
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 32 deletions.
6 changes: 6 additions & 0 deletions integrations/aws/utils/misc.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
import enum

from aiolimiter import AsyncLimiter

from port_ocean.context.ocean import ocean
from utils.overrides import AWSResourceConfig
from typing import List
import asyncio

CLOUD_CONTROL_REQUESTS_PER_ACCOUNT = 50
CLOUD_CONTROL_REQUESTS_PER_SECOND_PER_ACCOUNT = 1
cloud_control_rate_limiter = AsyncLimiter(CLOUD_CONTROL_REQUESTS_PER_ACCOUNT, CLOUD_CONTROL_REQUESTS_PER_SECOND_PER_ACCOUNT)


def get_semaphore() -> asyncio.BoundedSemaphore:
max_concurrent_accounts: int = int(
Expand Down
91 changes: 59 additions & 32 deletions integrations/aws/utils/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@
from typing import Any, Literal
import typing

from aiolimiter import AsyncLimiter
import aioboto3
from loguru import logger
from utils.misc import (
CustomProperties,
ResourceKindsWithSpecialHandling,
is_access_denied_exception,
is_resource_not_found_exception,
cloud_control_rate_limiter
)
from utils.aws import get_sessions

Expand Down Expand Up @@ -81,28 +83,44 @@ async def describe_single_resource(
stack = response.get("Stacks")[0]
return fix_unserializable_date_properties(stack)
case _:
async with session.client(
"cloudcontrol",
config=Boto3Config(
retries={"max_attempts": 10, "mode": "standard"},
),
) as cloudcontrol:
response = await cloudcontrol.get_resource(
TypeName=kind, Identifier=identifier
)
resource_description = response.get("ResourceDescription")
serialized = resource_description.copy()
serialized.update(
{
"Properties": json.loads(
resource_description.get("Properties")
),
}
)
return serialized
async with cloud_control_rate_limiter:
async with session.client(
"cloudcontrol",
config=Boto3Config(
retries={"max_attempts": 10, "mode": "standard"},
),
) as cloudcontrol:
response = await cloudcontrol.get_resource(
TypeName=kind, Identifier=identifier
)
resource_description = response.get("ResourceDescription")
serialized = resource_description.copy()
serialized.update(
{
"Properties": json.loads(
resource_description.get("Properties")
),
}
)
return serialized
return {}


async def describe_single_resource_cloudcontrol(
kind: str, identifier: str, cloudcontrol, semaphore, rate_limiter):
async with semaphore:
async with rate_limiter:
response = await cloudcontrol.get_resource(TypeName=kind, Identifier=identifier)
resource_description = response.get("ResourceDescription")
serialized = resource_description.copy()
serialized.update(
{
"Properties": json.loads(resource_description.get("Properties")),
}
)
return serialized


async def resync_custom_kind(
kind: str,
session: aioboto3.Session,
Expand Down Expand Up @@ -188,8 +206,10 @@ async def resync_cloudcontrol(
return
logger.info(f"Resyncing {kind} in account {account_id} in region {region}")
next_token = None
while True:
async with session.client("cloudcontrol") as cloudcontrol:
semaphore = asyncio.Semaphore(10) # Limit to 10 concurrent requests
rate_limiter = AsyncLimiter(50, 1) # Limit to 50 requests per second
async with session.client("cloudcontrol") as cloudcontrol:
while True:
try:
params = {
"TypeName": kind,
Expand All @@ -204,18 +224,25 @@ async def resync_cloudcontrol(
break
page_resources = []
if use_get_resource_api:
resources = await asyncio.gather(
*(
describe_single_resource(
kind,
instance.get("Identifier"),
account_id=account_id,
region=region,
)
for instance in resources
async with session.client(
"cloudcontrol",
config=Boto3Config(
retries={"max_attempts": 50, "mode": "adaptive"},
),
return_exceptions=True,
)
) as cloudcontrol:
resources = await asyncio.gather(
*(
describe_single_resource_cloudcontrol(
kind,
instance.get("Identifier"),
cloudcontrol=cloudcontrol,
semaphore=semaphore,
rate_limiter=rate_limiter,
)
for instance in resources
),
return_exceptions=True,
)
else:
resources = [
{
Expand Down

0 comments on commit 7cca192

Please sign in to comment.