diff --git a/integrations/gcp/CHANGELOG.md b/integrations/gcp/CHANGELOG.md index 7702ce3d85..15c55a5934 100644 --- a/integrations/gcp/CHANGELOG.md +++ b/integrations/gcp/CHANGELOG.md @@ -6,6 +6,13 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## 0.1.83 (2024-12-29) + + +### Bug Fixes + +- Fixed the issue with `get_current_resource_config()` and the `preserveApiResponseCaseStyle` selector in real-time event which leads to the failure of the event processing + ## 0.1.82 (2024-12-26) diff --git a/integrations/gcp/gcp_core/overrides.py b/integrations/gcp/gcp_core/overrides.py index 3fcef3a8b4..a0d99fa133 100644 --- a/integrations/gcp/gcp_core/overrides.py +++ b/integrations/gcp/gcp_core/overrides.py @@ -5,7 +5,7 @@ ResourceConfig, Selector, ) -from pydantic import Field +from pydantic import Field, BaseModel class GCPCloudResourceSelector(Selector): @@ -39,3 +39,7 @@ class GCPPortAppConfig(PortAppConfig): resources: list[GCPCloudResourceConfig | GCPResourceConfig | ResourceConfig] = ( Field(default_factory=list) ) + + +class ProtoConfig(BaseModel): + preserving_proto_field_name: typing.Optional[bool] = None diff --git a/integrations/gcp/gcp_core/search/resource_searches.py b/integrations/gcp/gcp_core/search/resource_searches.py index 00877e4202..6fab7b5360 100644 --- a/integrations/gcp/gcp_core/search/resource_searches.py +++ b/integrations/gcp/gcp_core/search/resource_searches.py @@ -1,4 +1,4 @@ -from typing import Any +from typing import Any, Optional import typing from google.api_core.exceptions import NotFound, PermissionDenied @@ -15,7 +15,6 @@ from loguru import logger from port_ocean.core.ocean_types import ASYNC_GENERATOR_RESYNC_TYPE, RAW_ITEM from port_ocean.utils.cache import cache_iterator_result - from gcp_core.errors import ResourceNotFoundError from gcp_core.utils import ( EXTRA_PROJECT_FIELD, @@ -28,6 +27,7 @@ from gcp_core.search.paginated_query import paginated_query, DEFAULT_REQUEST_TIMEOUT from gcp_core.helpers.ratelimiter.base import MAXIMUM_CONCURRENT_REQUESTS from asyncio import BoundedSemaphore +from gcp_core.overrides import ProtoConfig DEFAULT_SEMAPHORE = BoundedSemaphore(MAXIMUM_CONCURRENT_REQUESTS) @@ -213,34 +213,46 @@ async def search_all_organizations() -> ASYNC_GENERATOR_RESYNC_TYPE: yield organizations -async def get_single_project(project_name: str) -> RAW_ITEM: +async def get_single_project( + project_name: str, config: Optional[ProtoConfig] = None +) -> RAW_ITEM: async with ProjectsAsyncClient() as projects_client: return parse_protobuf_message( await projects_client.get_project( name=project_name, timeout=DEFAULT_REQUEST_TIMEOUT - ) + ), + config, ) -async def get_single_folder(folder_name: str) -> RAW_ITEM: +async def get_single_folder( + folder_name: str, config: Optional[ProtoConfig] = None +) -> RAW_ITEM: async with FoldersAsyncClient() as folders_client: return parse_protobuf_message( await folders_client.get_folder( name=folder_name, timeout=DEFAULT_REQUEST_TIMEOUT - ) + ), + config, ) -async def get_single_organization(organization_name: str) -> RAW_ITEM: +async def get_single_organization( + organization_name: str, config: Optional[ProtoConfig] = None +) -> RAW_ITEM: async with OrganizationsAsyncClient() as organizations_client: return parse_protobuf_message( await organizations_client.get_organization( name=organization_name, timeout=DEFAULT_REQUEST_TIMEOUT - ) + ), + config, ) -async def get_single_topic(project_id: str, topic_id: str) -> RAW_ITEM: +async def get_single_topic( + topic_id: str, + config: Optional[ProtoConfig] = None, +) -> RAW_ITEM: """ The Topics are handled specifically due to lacks of data in the asset itself within the asset inventory- e.g. some properties missing. Here the PublisherAsyncClient is used, ignoring state in assets inventory @@ -249,11 +261,15 @@ async def get_single_topic(project_id: str, topic_id: str) -> RAW_ITEM: return parse_protobuf_message( await async_publisher_client.get_topic( topic=topic_id, timeout=DEFAULT_REQUEST_TIMEOUT - ) + ), + config, ) -async def get_single_subscription(project_id: str, subscription_id: str) -> RAW_ITEM: +async def get_single_subscription( + subscription_id: str, + config: Optional[ProtoConfig] = None, +) -> RAW_ITEM: """ Subscriptions are handled specifically due to lacks of data in the asset itself within the asset inventory- e.g. some properties missing. Here the SubscriberAsyncClient is used, ignoring state in assets inventory @@ -262,7 +278,8 @@ async def get_single_subscription(project_id: str, subscription_id: str) -> RAW_ return parse_protobuf_message( await async_subscriber_client.get_subscription( subscription=subscription_id, timeout=DEFAULT_REQUEST_TIMEOUT - ) + ), + config, ) @@ -287,35 +304,45 @@ async def search_single_resource( async def feed_event_to_resource( - asset_type: str, asset_name: str, project_id: str, asset_data: dict[str, Any] + asset_type: str, + asset_name: str, + project_id: str, + asset_data: dict[str, Any], + config: Optional[ProtoConfig] = None, ) -> RAW_ITEM: resource = None if asset_data.get("deleted") is True: resource = asset_data["priorAsset"]["resource"]["data"] - resource[EXTRA_PROJECT_FIELD] = await get_single_project(project_id) + resource[EXTRA_PROJECT_FIELD] = await get_single_project(project_id, config) else: match asset_type: case AssetTypesWithSpecialHandling.TOPIC: topic_name = asset_name.replace("//pubsub.googleapis.com/", "") - resource = await get_single_topic(project_id, topic_name) - resource[EXTRA_PROJECT_FIELD] = await get_single_project(project_id) + resource = await get_single_topic(topic_name, config) + resource[EXTRA_PROJECT_FIELD] = await get_single_project( + project_id, config + ) case AssetTypesWithSpecialHandling.SUBSCRIPTION: topic_name = asset_name.replace("//pubsub.googleapis.com/", "") - resource = await get_single_subscription(project_id, topic_name) - resource[EXTRA_PROJECT_FIELD] = await get_single_project(project_id) + resource = await get_single_subscription(topic_name, config) + resource[EXTRA_PROJECT_FIELD] = await get_single_project( + project_id, config + ) case AssetTypesWithSpecialHandling.FOLDER: folder_id = asset_name.replace( "//cloudresourcemanager.googleapis.com/", "" ) - resource = await get_single_folder(folder_id) + resource = await get_single_folder(folder_id, config) case AssetTypesWithSpecialHandling.ORGANIZATION: organization_id = asset_name.replace( "//cloudresourcemanager.googleapis.com/", "" ) - resource = await get_single_organization(organization_id) + resource = await get_single_organization(organization_id, config) case AssetTypesWithSpecialHandling.PROJECT: - resource = await get_single_project(project_id) + resource = await get_single_project(project_id, config) case _: resource = asset_data["asset"]["resource"]["data"] - resource[EXTRA_PROJECT_FIELD] = await get_single_project(project_id) + resource[EXTRA_PROJECT_FIELD] = await get_single_project( + project_id, config + ) return resource diff --git a/integrations/gcp/gcp_core/utils.py b/integrations/gcp/gcp_core/utils.py index e2c0e5dbac..422699c6fb 100644 --- a/integrations/gcp/gcp_core/utils.py +++ b/integrations/gcp/gcp_core/utils.py @@ -3,15 +3,14 @@ import os import typing from collections.abc import MutableSequence -from typing import Any, TypedDict, Tuple - +from typing import Any, TypedDict, Tuple, Optional from gcp_core.errors import ResourceNotFoundError from loguru import logger import proto # type: ignore from port_ocean.context.event import event from port_ocean.core.handlers.port_app_config.models import ResourceConfig -from gcp_core.overrides import GCPCloudResourceConfig +from gcp_core.overrides import GCPCloudResourceConfig, ProtoConfig from port_ocean.context.ocean import ocean import json from pathlib import Path @@ -82,15 +81,24 @@ def should_use_snake_case() -> bool: Returns: bool: True to use snake_case, False to preserve API's original case style """ + selector = get_current_resource_config().selector preserve_api_case = getattr(selector, "preserve_api_response_case_style", False) return not preserve_api_case -def parse_protobuf_message(message: proto.Message) -> dict[str, Any]: +def parse_protobuf_message( + message: proto.Message, + config: Optional[ProtoConfig] = None, +) -> dict[str, Any]: """ Parse protobuf message to dict, controlling field name case style. """ + if config and config.preserving_proto_field_name is not None: + use_snake_case = not config.preserving_proto_field_name + return proto.Message.to_dict( + message, preserving_proto_field_name=use_snake_case + ) use_snake_case = should_use_snake_case() return proto.Message.to_dict(message, preserving_proto_field_name=use_snake_case) diff --git a/integrations/gcp/main.py b/integrations/gcp/main.py index 93da63f57e..482c6bf928 100644 --- a/integrations/gcp/main.py +++ b/integrations/gcp/main.py @@ -5,6 +5,7 @@ from fastapi import Request, Response from loguru import logger + from port_ocean.context.ocean import ocean from port_ocean.core.ocean_types import ASYNC_GENERATOR_RESYNC_TYPE @@ -13,7 +14,13 @@ GotFeedCreatedSuccessfullyMessageError, ) from gcp_core.feed_event import get_project_name_from_ancestors, parse_asset_data -from gcp_core.overrides import GCPCloudResourceSelector +from gcp_core.overrides import ( + GCPCloudResourceSelector, + GCPPortAppConfig, + GCPResourceSelector, + ProtoConfig, +) +from port_ocean.context.event import event from gcp_core.search.iterators import iterate_per_available_project from gcp_core.search.resource_searches import ( feed_event_to_resource, @@ -180,19 +187,41 @@ async def feed_events_callback(request: Request) -> Response: logger.info( f"Got Real-Time event for kind: {asset_type} with name: {asset_name} from project: {asset_project}" ) - asset_resource_data = await feed_event_to_resource( - asset_type, asset_name, asset_project, asset_data - ) - if asset_data.get("deleted") is True: - logger.info( - f"Resource {asset_type} : {asset_name} has been deleted in GCP, unregistering from port" + resource_configs = typing.cast( + GCPPortAppConfig, event.port_app_config + ).resources + matching_resource_configs = [ + resource_config + for resource_config in resource_configs + if ( + resource_config.kind == asset_type + and isinstance(resource_config.selector, GCPResourceSelector) + ) + ] + for matching_resource_config in matching_resource_configs: + selector = matching_resource_config.selector + config = ProtoConfig( + preserving_proto_field_name=bool( + getattr(selector, "preserve_api_response_case_style", False) + ) ) - await ocean.unregister_raw(asset_type, [asset_resource_data]) - else: - logger.info( - f"Registering creation/update of resource {asset_type} : {asset_name} in project {asset_project} in Port" + asset_resource_data = await feed_event_to_resource( + asset_type, + asset_name, + asset_project, + asset_data, + config, ) - await ocean.register_raw(asset_type, [asset_resource_data]) + if asset_data.get("deleted") is True: + logger.info( + f"Resource {asset_type} : {asset_name} has been deleted in GCP, unregistering from port" + ) + await ocean.unregister_raw(asset_type, [asset_resource_data]) + else: + logger.info( + f"Registering creation/update of resource {asset_type} : {asset_name} in project {asset_project} in Port" + ) + await ocean.register_raw(asset_type, [asset_resource_data]) except AssetHasNoProjectAncestorError: logger.exception( f"Couldn't find project ancestor to asset {asset_name}. Other types of ancestors and not supported yet." diff --git a/integrations/gcp/pyproject.toml b/integrations/gcp/pyproject.toml index ffa9e4610a..13aef788c8 100644 --- a/integrations/gcp/pyproject.toml +++ b/integrations/gcp/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "gcp" -version = "0.1.82" +version = "0.1.83" description = "A GCP ocean integration" authors = ["Matan Geva "] diff --git a/integrations/gcp/tests/gcp_core/search/test_resource_searches.py b/integrations/gcp/tests/gcp_core/search/test_resource_searches.py index f1b8beecf8..a80c9ce70b 100644 --- a/integrations/gcp/tests/gcp_core/search/test_resource_searches.py +++ b/integrations/gcp/tests/gcp_core/search/test_resource_searches.py @@ -94,13 +94,10 @@ async def test_get_single_subscription( "state": 0, "topic": "", } - mock_project = "project_name" # Act within event context async with event_context("test_event"): - actual_subscription = await get_single_subscription( - mock_project, "subscription_name" - ) + actual_subscription = await get_single_subscription("subscription_name") # Assert assert actual_subscription == expected_subscription @@ -231,13 +228,10 @@ async def test_preserve_case_style_combined( "state": 0, "topic": "projects/project_name/topics/topic_name", } - mock_project = "project_name" # Act within event context for preserve_case_style = True async with event_context("test_event"): - actual_subscription_true = await get_single_subscription( - mock_project, "subscription_name" - ) + actual_subscription_true = await get_single_subscription("subscription_name") # Assert for preserve_case_style = True assert actual_subscription_true == expected_subscription_true @@ -264,9 +258,7 @@ async def test_preserve_case_style_combined( # Act within event context for preserve_case_style = False async with event_context("test_event"): - actual_subscription_false = await get_single_subscription( - mock_project, "subscription_name" - ) + actual_subscription_false = await get_single_subscription("subscription_name") # Assert for preserve_case_style = False assert actual_subscription_false == expected_subscription_false