From 78bfa538c25d58c0d11f32e8fdc3b1b1b855c40d Mon Sep 17 00:00:00 2001
From: Adebayo Oluwadunsin Iyanuoluwa
<88881603+oiadebayo@users.noreply.github.com>
Date: Sat, 28 Dec 2024 16:29:35 +0100
Subject: [PATCH] [Integration][GCP] Fixed issues preventing the proper
processing of real-time events (#1247)
# Description
What - Updated the `should_use_snake_case` function to support real-time
event processing by allowing it to use `matching_resource_configs` when
processing real-time events. Resync processes continue using the
existing implementation with `get_current_resource_config()`.
Why - The previous implementation relied solely on
`get_current_resource_config()`, which is unsuitable for webhook event
processing because the `event.resource_config` used for resync events
returns `None` during real-time events.
How - Modified the `should_use_snake_case` function to accept an
optional `matching_resource_configs` parameter and also updated the
`feed_events_callback` function to pass `matching_resource_configs` to
`should_use_snake_case` during webhook processing.
## Type of change
Please leave one option from the following and delete the rest:
- [x] Bug fix (non-breaking change which fixes an issue)
All tests should be run against the port production
environment (using a testing org).
### Core testing checklist
- [ ] Integration able to create all default resources from scratch
- [ ] Resync finishes successfully
- [ ] Resync able to create entities
- [ ] Resync able to update entities
- [ ] Resync able to detect and delete entities
- [ ] Scheduled resync able to abort existing resync and start a new one
- [ ] Tested with at least 2 integrations from scratch
- [ ] Tested with Kafka and Polling event listeners
- [ ] Tested deletion of entities that don't pass the selector
### Integration testing checklist
- [ ] Integration able to create all default resources from scratch
- [ ] Resync able to create entities
- [ ] Resync able to update entities
- [ ] Resync able to detect and delete entities
- [ ] Resync finishes successfully
- [ ] If new resource kind is added or updated in the integration, add
example raw data, mapping and expected result to the `examples` folder
in the integration directory.
- [ ] If resource kind is updated, run the integration with the example
data and check if the expected result is achieved
- [ ] If new resource kind is added or updated, validate that
live-events for that resource are working as expected
- [ ] Docs PR link [here](#)
### Preflight checklist
- [ ] Handled rate limiting
- [ ] Handled pagination
- [ ] Implemented the code in async
- [ ] Support Multi account
## Screenshots
Include screenshots from your environment showing how the resources of
the integration will look.
## API Documentation
Provide links to the API documentation used for this integration.
---------
Co-authored-by: PagesCoffy
Co-authored-by: Michael Kofi Armah
Co-authored-by: MatanGevaPort
---
integrations/gcp/CHANGELOG.md | 7 ++
integrations/gcp/gcp_core/overrides.py | 6 +-
.../gcp/gcp_core/search/resource_searches.py | 71 +++++++++++++------
integrations/gcp/gcp_core/utils.py | 16 +++--
integrations/gcp/main.py | 53 ++++++++++----
integrations/gcp/pyproject.toml | 2 +-
.../gcp_core/search/test_resource_searches.py | 14 +---
7 files changed, 118 insertions(+), 51 deletions(-)
diff --git a/integrations/gcp/CHANGELOG.md b/integrations/gcp/CHANGELOG.md
index 7702ce3d85..15c55a5934 100644
--- a/integrations/gcp/CHANGELOG.md
+++ b/integrations/gcp/CHANGELOG.md
@@ -6,6 +6,13 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
+## 0.1.83 (2024-12-29)
+
+
+### Bug Fixes
+
+- Fixed the issue with `get_current_resource_config()` and the `preserveApiResponseCaseStyle` selector in real-time event which leads to the failure of the event processing
+
## 0.1.82 (2024-12-26)
diff --git a/integrations/gcp/gcp_core/overrides.py b/integrations/gcp/gcp_core/overrides.py
index 3fcef3a8b4..a0d99fa133 100644
--- a/integrations/gcp/gcp_core/overrides.py
+++ b/integrations/gcp/gcp_core/overrides.py
@@ -5,7 +5,7 @@
ResourceConfig,
Selector,
)
-from pydantic import Field
+from pydantic import Field, BaseModel
class GCPCloudResourceSelector(Selector):
@@ -39,3 +39,7 @@ class GCPPortAppConfig(PortAppConfig):
resources: list[GCPCloudResourceConfig | GCPResourceConfig | ResourceConfig] = (
Field(default_factory=list)
)
+
+
+class ProtoConfig(BaseModel):
+ preserving_proto_field_name: typing.Optional[bool] = None
diff --git a/integrations/gcp/gcp_core/search/resource_searches.py b/integrations/gcp/gcp_core/search/resource_searches.py
index 00877e4202..6fab7b5360 100644
--- a/integrations/gcp/gcp_core/search/resource_searches.py
+++ b/integrations/gcp/gcp_core/search/resource_searches.py
@@ -1,4 +1,4 @@
-from typing import Any
+from typing import Any, Optional
import typing
from google.api_core.exceptions import NotFound, PermissionDenied
@@ -15,7 +15,6 @@
from loguru import logger
from port_ocean.core.ocean_types import ASYNC_GENERATOR_RESYNC_TYPE, RAW_ITEM
from port_ocean.utils.cache import cache_iterator_result
-
from gcp_core.errors import ResourceNotFoundError
from gcp_core.utils import (
EXTRA_PROJECT_FIELD,
@@ -28,6 +27,7 @@
from gcp_core.search.paginated_query import paginated_query, DEFAULT_REQUEST_TIMEOUT
from gcp_core.helpers.ratelimiter.base import MAXIMUM_CONCURRENT_REQUESTS
from asyncio import BoundedSemaphore
+from gcp_core.overrides import ProtoConfig
DEFAULT_SEMAPHORE = BoundedSemaphore(MAXIMUM_CONCURRENT_REQUESTS)
@@ -213,34 +213,46 @@ async def search_all_organizations() -> ASYNC_GENERATOR_RESYNC_TYPE:
yield organizations
-async def get_single_project(project_name: str) -> RAW_ITEM:
+async def get_single_project(
+ project_name: str, config: Optional[ProtoConfig] = None
+) -> RAW_ITEM:
async with ProjectsAsyncClient() as projects_client:
return parse_protobuf_message(
await projects_client.get_project(
name=project_name, timeout=DEFAULT_REQUEST_TIMEOUT
- )
+ ),
+ config,
)
-async def get_single_folder(folder_name: str) -> RAW_ITEM:
+async def get_single_folder(
+ folder_name: str, config: Optional[ProtoConfig] = None
+) -> RAW_ITEM:
async with FoldersAsyncClient() as folders_client:
return parse_protobuf_message(
await folders_client.get_folder(
name=folder_name, timeout=DEFAULT_REQUEST_TIMEOUT
- )
+ ),
+ config,
)
-async def get_single_organization(organization_name: str) -> RAW_ITEM:
+async def get_single_organization(
+ organization_name: str, config: Optional[ProtoConfig] = None
+) -> RAW_ITEM:
async with OrganizationsAsyncClient() as organizations_client:
return parse_protobuf_message(
await organizations_client.get_organization(
name=organization_name, timeout=DEFAULT_REQUEST_TIMEOUT
- )
+ ),
+ config,
)
-async def get_single_topic(project_id: str, topic_id: str) -> RAW_ITEM:
+async def get_single_topic(
+ topic_id: str,
+ config: Optional[ProtoConfig] = None,
+) -> RAW_ITEM:
"""
The Topics are handled specifically due to lacks of data in the asset itself within the asset inventory- e.g. some properties missing.
Here the PublisherAsyncClient is used, ignoring state in assets inventory
@@ -249,11 +261,15 @@ async def get_single_topic(project_id: str, topic_id: str) -> RAW_ITEM:
return parse_protobuf_message(
await async_publisher_client.get_topic(
topic=topic_id, timeout=DEFAULT_REQUEST_TIMEOUT
- )
+ ),
+ config,
)
-async def get_single_subscription(project_id: str, subscription_id: str) -> RAW_ITEM:
+async def get_single_subscription(
+ subscription_id: str,
+ config: Optional[ProtoConfig] = None,
+) -> RAW_ITEM:
"""
Subscriptions are handled specifically due to lacks of data in the asset itself within the asset inventory- e.g. some properties missing.
Here the SubscriberAsyncClient is used, ignoring state in assets inventory
@@ -262,7 +278,8 @@ async def get_single_subscription(project_id: str, subscription_id: str) -> RAW_
return parse_protobuf_message(
await async_subscriber_client.get_subscription(
subscription=subscription_id, timeout=DEFAULT_REQUEST_TIMEOUT
- )
+ ),
+ config,
)
@@ -287,35 +304,45 @@ async def search_single_resource(
async def feed_event_to_resource(
- asset_type: str, asset_name: str, project_id: str, asset_data: dict[str, Any]
+ asset_type: str,
+ asset_name: str,
+ project_id: str,
+ asset_data: dict[str, Any],
+ config: Optional[ProtoConfig] = None,
) -> RAW_ITEM:
resource = None
if asset_data.get("deleted") is True:
resource = asset_data["priorAsset"]["resource"]["data"]
- resource[EXTRA_PROJECT_FIELD] = await get_single_project(project_id)
+ resource[EXTRA_PROJECT_FIELD] = await get_single_project(project_id, config)
else:
match asset_type:
case AssetTypesWithSpecialHandling.TOPIC:
topic_name = asset_name.replace("//pubsub.googleapis.com/", "")
- resource = await get_single_topic(project_id, topic_name)
- resource[EXTRA_PROJECT_FIELD] = await get_single_project(project_id)
+ resource = await get_single_topic(topic_name, config)
+ resource[EXTRA_PROJECT_FIELD] = await get_single_project(
+ project_id, config
+ )
case AssetTypesWithSpecialHandling.SUBSCRIPTION:
topic_name = asset_name.replace("//pubsub.googleapis.com/", "")
- resource = await get_single_subscription(project_id, topic_name)
- resource[EXTRA_PROJECT_FIELD] = await get_single_project(project_id)
+ resource = await get_single_subscription(topic_name, config)
+ resource[EXTRA_PROJECT_FIELD] = await get_single_project(
+ project_id, config
+ )
case AssetTypesWithSpecialHandling.FOLDER:
folder_id = asset_name.replace(
"//cloudresourcemanager.googleapis.com/", ""
)
- resource = await get_single_folder(folder_id)
+ resource = await get_single_folder(folder_id, config)
case AssetTypesWithSpecialHandling.ORGANIZATION:
organization_id = asset_name.replace(
"//cloudresourcemanager.googleapis.com/", ""
)
- resource = await get_single_organization(organization_id)
+ resource = await get_single_organization(organization_id, config)
case AssetTypesWithSpecialHandling.PROJECT:
- resource = await get_single_project(project_id)
+ resource = await get_single_project(project_id, config)
case _:
resource = asset_data["asset"]["resource"]["data"]
- resource[EXTRA_PROJECT_FIELD] = await get_single_project(project_id)
+ resource[EXTRA_PROJECT_FIELD] = await get_single_project(
+ project_id, config
+ )
return resource
diff --git a/integrations/gcp/gcp_core/utils.py b/integrations/gcp/gcp_core/utils.py
index e2c0e5dbac..422699c6fb 100644
--- a/integrations/gcp/gcp_core/utils.py
+++ b/integrations/gcp/gcp_core/utils.py
@@ -3,15 +3,14 @@
import os
import typing
from collections.abc import MutableSequence
-from typing import Any, TypedDict, Tuple
-
+from typing import Any, TypedDict, Tuple, Optional
from gcp_core.errors import ResourceNotFoundError
from loguru import logger
import proto # type: ignore
from port_ocean.context.event import event
from port_ocean.core.handlers.port_app_config.models import ResourceConfig
-from gcp_core.overrides import GCPCloudResourceConfig
+from gcp_core.overrides import GCPCloudResourceConfig, ProtoConfig
from port_ocean.context.ocean import ocean
import json
from pathlib import Path
@@ -82,15 +81,24 @@ def should_use_snake_case() -> bool:
Returns:
bool: True to use snake_case, False to preserve API's original case style
"""
+
selector = get_current_resource_config().selector
preserve_api_case = getattr(selector, "preserve_api_response_case_style", False)
return not preserve_api_case
-def parse_protobuf_message(message: proto.Message) -> dict[str, Any]:
+def parse_protobuf_message(
+ message: proto.Message,
+ config: Optional[ProtoConfig] = None,
+) -> dict[str, Any]:
"""
Parse protobuf message to dict, controlling field name case style.
"""
+ if config and config.preserving_proto_field_name is not None:
+ use_snake_case = not config.preserving_proto_field_name
+ return proto.Message.to_dict(
+ message, preserving_proto_field_name=use_snake_case
+ )
use_snake_case = should_use_snake_case()
return proto.Message.to_dict(message, preserving_proto_field_name=use_snake_case)
diff --git a/integrations/gcp/main.py b/integrations/gcp/main.py
index 93da63f57e..482c6bf928 100644
--- a/integrations/gcp/main.py
+++ b/integrations/gcp/main.py
@@ -5,6 +5,7 @@
from fastapi import Request, Response
from loguru import logger
+
from port_ocean.context.ocean import ocean
from port_ocean.core.ocean_types import ASYNC_GENERATOR_RESYNC_TYPE
@@ -13,7 +14,13 @@
GotFeedCreatedSuccessfullyMessageError,
)
from gcp_core.feed_event import get_project_name_from_ancestors, parse_asset_data
-from gcp_core.overrides import GCPCloudResourceSelector
+from gcp_core.overrides import (
+ GCPCloudResourceSelector,
+ GCPPortAppConfig,
+ GCPResourceSelector,
+ ProtoConfig,
+)
+from port_ocean.context.event import event
from gcp_core.search.iterators import iterate_per_available_project
from gcp_core.search.resource_searches import (
feed_event_to_resource,
@@ -180,19 +187,41 @@ async def feed_events_callback(request: Request) -> Response:
logger.info(
f"Got Real-Time event for kind: {asset_type} with name: {asset_name} from project: {asset_project}"
)
- asset_resource_data = await feed_event_to_resource(
- asset_type, asset_name, asset_project, asset_data
- )
- if asset_data.get("deleted") is True:
- logger.info(
- f"Resource {asset_type} : {asset_name} has been deleted in GCP, unregistering from port"
+ resource_configs = typing.cast(
+ GCPPortAppConfig, event.port_app_config
+ ).resources
+ matching_resource_configs = [
+ resource_config
+ for resource_config in resource_configs
+ if (
+ resource_config.kind == asset_type
+ and isinstance(resource_config.selector, GCPResourceSelector)
+ )
+ ]
+ for matching_resource_config in matching_resource_configs:
+ selector = matching_resource_config.selector
+ config = ProtoConfig(
+ preserving_proto_field_name=bool(
+ getattr(selector, "preserve_api_response_case_style", False)
+ )
)
- await ocean.unregister_raw(asset_type, [asset_resource_data])
- else:
- logger.info(
- f"Registering creation/update of resource {asset_type} : {asset_name} in project {asset_project} in Port"
+ asset_resource_data = await feed_event_to_resource(
+ asset_type,
+ asset_name,
+ asset_project,
+ asset_data,
+ config,
)
- await ocean.register_raw(asset_type, [asset_resource_data])
+ if asset_data.get("deleted") is True:
+ logger.info(
+ f"Resource {asset_type} : {asset_name} has been deleted in GCP, unregistering from port"
+ )
+ await ocean.unregister_raw(asset_type, [asset_resource_data])
+ else:
+ logger.info(
+ f"Registering creation/update of resource {asset_type} : {asset_name} in project {asset_project} in Port"
+ )
+ await ocean.register_raw(asset_type, [asset_resource_data])
except AssetHasNoProjectAncestorError:
logger.exception(
f"Couldn't find project ancestor to asset {asset_name}. Other types of ancestors and not supported yet."
diff --git a/integrations/gcp/pyproject.toml b/integrations/gcp/pyproject.toml
index ffa9e4610a..13aef788c8 100644
--- a/integrations/gcp/pyproject.toml
+++ b/integrations/gcp/pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "gcp"
-version = "0.1.82"
+version = "0.1.83"
description = "A GCP ocean integration"
authors = ["Matan Geva "]
diff --git a/integrations/gcp/tests/gcp_core/search/test_resource_searches.py b/integrations/gcp/tests/gcp_core/search/test_resource_searches.py
index f1b8beecf8..a80c9ce70b 100644
--- a/integrations/gcp/tests/gcp_core/search/test_resource_searches.py
+++ b/integrations/gcp/tests/gcp_core/search/test_resource_searches.py
@@ -94,13 +94,10 @@ async def test_get_single_subscription(
"state": 0,
"topic": "",
}
- mock_project = "project_name"
# Act within event context
async with event_context("test_event"):
- actual_subscription = await get_single_subscription(
- mock_project, "subscription_name"
- )
+ actual_subscription = await get_single_subscription("subscription_name")
# Assert
assert actual_subscription == expected_subscription
@@ -231,13 +228,10 @@ async def test_preserve_case_style_combined(
"state": 0,
"topic": "projects/project_name/topics/topic_name",
}
- mock_project = "project_name"
# Act within event context for preserve_case_style = True
async with event_context("test_event"):
- actual_subscription_true = await get_single_subscription(
- mock_project, "subscription_name"
- )
+ actual_subscription_true = await get_single_subscription("subscription_name")
# Assert for preserve_case_style = True
assert actual_subscription_true == expected_subscription_true
@@ -264,9 +258,7 @@ async def test_preserve_case_style_combined(
# Act within event context for preserve_case_style = False
async with event_context("test_event"):
- actual_subscription_false = await get_single_subscription(
- mock_project, "subscription_name"
- )
+ actual_subscription_false = await get_single_subscription("subscription_name")
# Assert for preserve_case_style = False
assert actual_subscription_false == expected_subscription_false