Skip to content

Commit

Permalink
[Integration] [AWS] | Added support to choose specific regions to que…
Browse files Browse the repository at this point in the history
…ry resources from (#1099)
  • Loading branch information
mk-armah authored Nov 4, 2024
1 parent 92bd75a commit bc4c19b
Show file tree
Hide file tree
Showing 9 changed files with 360 additions and 53 deletions.
5 changes: 5 additions & 0 deletions integrations/aws/.port/spec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ configurations:
type: string
sensitive: true
description: AWS API Key for custom events, used to validate the event source for real-time event updates.
- name: maximumConcurrentAccounts
type: integer
require: false
description: The number of concurrent accounts to scan. By default, it is set to 50.
default: 50
deploymentMethodRequirements:
- type: default
configurations: ['awsAccessKeyId', 'awsSecretAccessKey']
Expand Down
15 changes: 15 additions & 0 deletions integrations/aws/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,27 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

<!-- towncrier release notes start -->

## 0.2.53 (2024-10-31)


### Improvements

- Added the option to query resources from specific regions, configurable via the regionPolicy in the selector field of the mapping.
- Introduced `maximumConcurrentAccount` parameter to control the maximum number of accounts synced concurrently.

### Bug Fixes

- Skip missing resources in a region without interrupting sync across other regions.


## 0.2.52 (2024-10-30)


### Bug Fixes

- Updated `joined_timestamp` mapping in AWS Organizations to comply with RFC3339 timestamp format by replacing the space delimiter with 'T' in the `JoinedTimestamp` field.


## 0.2.51 (2024-10-23)


Expand Down
106 changes: 77 additions & 29 deletions integrations/aws/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,30 +28,37 @@
from port_ocean.context.ocean import ocean
from loguru import logger
from port_ocean.core.ocean_types import ASYNC_GENERATOR_RESYNC_TYPE
from port_ocean.context.event import event
from utils.overrides import AWSPortAppConfig, AWSResourceConfig
from utils.misc import (
get_matching_kinds_and_blueprints_from_config,
CustomProperties,
ResourceKindsWithSpecialHandling,
is_access_denied_exception,
is_server_error,
semaphore,
get_semaphore,
)
from port_ocean.utils.async_iterators import (
stream_async_iterators_tasks,
semaphore_async_iterator,
)
import functools

semaphore = get_semaphore()


async def _handle_global_resource_resync(
kind: str,
credentials: AwsCredentials,
aws_resource_config: AWSResourceConfig,
) -> ASYNC_GENERATOR_RESYNC_TYPE:
denied_access_to_default_region = False
default_region = get_default_region_from_credentials(credentials)
default_session = await credentials.create_session(default_region)
try:
async for batch in resync_cloudcontrol(kind, default_session):
async for batch in resync_cloudcontrol(
kind, default_session, aws_resource_config
):
yield batch
except Exception as e:
if is_access_denied_exception(e):
Expand All @@ -63,7 +70,9 @@ async def _handle_global_resource_resync(
logger.info(f"Trying to resync {kind} in all regions until success")
async for session in credentials.create_session_for_each_region():
try:
async for batch in resync_cloudcontrol(kind, session):
async for batch in resync_cloudcontrol(
kind, session, aws_resource_config
):
yield batch
break
except Exception as e:
Expand All @@ -77,13 +86,19 @@ async def resync_resources_for_account(
"""Function to handle fetching resources for a single account."""
errors, regions = [], []

aws_resource_config = typing.cast(AWSResourceConfig, event.resource_config)

if is_global_resource(kind):
async for batch in _handle_global_resource_resync(kind, credentials):
async for batch in _handle_global_resource_resync(
kind, credentials, aws_resource_config
):
yield batch
else:
async for session in credentials.create_session_for_each_region():
try:
async for batch in resync_cloudcontrol(kind, session):
async for batch in resync_cloudcontrol(
kind, session, aws_resource_config
):
yield batch
except Exception as exc:
regions.append(session.region_name)
Expand Down Expand Up @@ -123,6 +138,7 @@ async def resync_account(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
@ocean.on_resync(kind=ResourceKindsWithSpecialHandling.ELASTICACHE_CLUSTER)
async def resync_elasticache(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
await update_available_access_credentials()
aws_resource_config = typing.cast(AWSResourceConfig, event.resource_config)

tasks = [
semaphore_async_iterator(
Expand All @@ -135,6 +151,7 @@ async def resync_elasticache(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
"describe_cache_clusters",
"CacheClusters",
"Marker",
aws_resource_config,
),
)
async for session in get_sessions()
Expand All @@ -149,6 +166,7 @@ async def resync_elasticache(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
async def resync_elv2_load_balancer(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
await update_available_access_credentials()

aws_resource_config = typing.cast(AWSResourceConfig, event.resource_config)
tasks = [
semaphore_async_iterator(
semaphore,
Expand All @@ -160,6 +178,7 @@ async def resync_elv2_load_balancer(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
"describe_load_balancers",
"LoadBalancers",
"Marker",
aws_resource_config,
),
)
async for session in get_sessions()
Expand All @@ -175,6 +194,7 @@ async def resync_elv2_load_balancer(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
async def resync_acm(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
await update_available_access_credentials()

aws_resource_config = typing.cast(AWSResourceConfig, event.resource_config)
tasks = [
semaphore_async_iterator(
semaphore,
Expand All @@ -186,6 +206,7 @@ async def resync_acm(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
"list_certificates",
"CertificateSummaryList",
"NextToken",
aws_resource_config,
),
)
async for session in get_sessions()
Expand All @@ -200,6 +221,8 @@ async def resync_acm(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
@ocean.on_resync(kind=ResourceKindsWithSpecialHandling.AMI_IMAGE)
async def resync_ami(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
await update_available_access_credentials()

aws_resource_config = typing.cast(AWSResourceConfig, event.resource_config)
tasks = [
semaphore_async_iterator(
semaphore,
Expand All @@ -211,6 +234,7 @@ async def resync_ami(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
"describe_images",
"Images",
"NextToken",
aws_resource_config,
{"Owners": ["self"]},
),
)
Expand All @@ -225,6 +249,8 @@ async def resync_ami(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
@ocean.on_resync(kind=ResourceKindsWithSpecialHandling.CLOUDFORMATION_STACK)
async def resync_cloudformation(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
await update_available_access_credentials()

aws_resource_config = typing.cast(AWSResourceConfig, event.resource_config)
tasks = [
semaphore_async_iterator(
semaphore,
Expand All @@ -236,6 +262,7 @@ async def resync_cloudformation(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
"describe_stacks",
"Stacks",
"NextToken",
aws_resource_config,
),
)
async for session in get_sessions()
Expand Down Expand Up @@ -293,10 +320,35 @@ async def webhook(update: ResourceUpdate, response: Response) -> fastapi.Respons
with logger.contextualize(
account_id=account_id, resource_type=resource_type, identifier=identifier
):
matching_resource_configs = get_matching_kinds_and_blueprints_from_config(
resource_type
aws_port_app_config = typing.cast(AWSPortAppConfig, event.port_app_config)
if not isinstance(aws_port_app_config, AWSPortAppConfig):
logger.info("No resources configured in the port app config")
return fastapi.Response(status_code=status.HTTP_200_OK)

allowed_configs, disallowed_configs = (
get_matching_kinds_and_blueprints_from_config(
resource_type, region, aws_port_app_config.resources
)
)

if disallowed_configs:
logger.info(
f"Unregistering resource {identifier} of type {resource_type} in region {region} and account {account_id} for blueprint {disallowed_configs.values()} because it is not allowed"
)
await ocean.unregister(
[
Entity(blueprint=blueprint, identifier=identifier)
for blueprints in disallowed_configs.values()
for blueprint in blueprints
]
)

if not allowed_configs:
logger.info(
f"{resource_type} not found or disabled for region {region} in account {account_id}"
)
return fastapi.Response(status_code=status.HTTP_200_OK)

logger.debug(
"Querying full resource on AWS before registering change in port"
)
Expand All @@ -310,33 +362,29 @@ async def webhook(update: ResourceUpdate, response: Response) -> fastapi.Respons
logger.error(
f"Cannot sync {resource_type} in region {region} in account {account_id} due to missing access permissions {e}"
)
return fastapi.Response(
status_code=status.HTTP_200_OK,
)
return fastapi.Response(status_code=status.HTTP_200_OK)
if is_server_error(e):
logger.error(
f"Cannot sync {resource_type} in region {region} in account {account_id} due to server error {e}"
)
return fastapi.Response(
status_code=status.HTTP_200_OK,
)
return fastapi.Response(status_code=status.HTTP_200_OK)

logger.error(
f"Failed to retrieve '{resource_type}' resource with ID '{identifier}' in region '{region}' for account '{account_id}'. "
f"Verify that the resource exists and that the necessary permissions are granted."
)

resource = None

for kind in matching_resource_configs:
blueprints = matching_resource_configs[kind]
for kind, blueprints in allowed_configs.items():
if not resource: # Resource probably deleted
for blueprint in blueprints:
logger.info(
"Resource not found in AWS, un-registering from port"
)
await ocean.unregister(
[
Entity(
blueprint=blueprint,
identifier=identifier,
)
]
)
logger.info("Resource not found in AWS, un-registering from port")
await ocean.unregister(
[
Entity(blueprint=blueprint, identifier=identifier)
for blueprint in blueprints
]
)
else: # Resource found in AWS, update port
logger.info("Resource found in AWS, registering change in port")
resource.update(
Expand All @@ -347,14 +395,14 @@ async def webhook(update: ResourceUpdate, response: Response) -> fastapi.Respons
}
)
await ocean.register_raw(
kind,
[fix_unserializable_date_properties(resource)],
kind, [fix_unserializable_date_properties(resource)]
)

logger.info("Webhook processed successfully")
return fastapi.Response(
status_code=status.HTTP_200_OK, content=json.dumps({"ok": True})
)

except Exception as e:
logger.exception("Failed to process event from aws")
return fastapi.Response(
Expand Down
2 changes: 1 addition & 1 deletion integrations/aws/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "aws"
version = "0.2.52"
version = "0.2.53"
description = "This integration will map all your resources in all the available accounts to your Port entities"
authors = ["Shalev Avhar <shalev@getport.io>", "Erik Zaadi <erik@getport.io>"]

Expand Down
Loading

0 comments on commit bc4c19b

Please sign in to comment.