Skip to content

Commit

Permalink
fixed the delete entities bug
Browse files Browse the repository at this point in the history
  • Loading branch information
yaelibarg committed Jan 15, 2025
1 parent b0e5283 commit ce50ccf
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 85 deletions.
20 changes: 16 additions & 4 deletions port_ocean/clients/port/mixins/entities.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,18 +98,30 @@ async def upsert_entity(
if result_entity.is_using_search_identifier:
return None

# In order to save memory we'll keep only the identifier, blueprint and relations of the
# upserted entity result for later calculations
return self._reduce_entity(result_entity)

@staticmethod
def _reduce_entity(entity: Entity) -> Entity:
"""
Reduces an entity to only keep identifier, blueprint and processed relations.
This helps save memory by removing unnecessary data.
Args:
entity: The entity to reduce
Returns:
Entity: A new entity with only the essential data
"""
reduced_entity = Entity(
identifier=result_entity.identifier, blueprint=result_entity.blueprint
identifier=entity.identifier, blueprint=entity.blueprint
)

# Dict typed relations (raw search relations) must be turned into None
# so that we can correctly calculate the related entities that participated
# and ignore the ones that didn't, as they weren't upserted
reduced_entity.relations = {
key: None if isinstance(relation, dict) else relation
for key, relation in result_entity.relations.items()
for key, relation in entity.relations.items()
}

return reduced_entity
Expand Down
48 changes: 21 additions & 27 deletions port_ocean/core/integrations/mixins/sync_raw.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ async def _map_entities_compared_with_port(
entities: list[Entity],
resource: ResourceConfig,
user_agent_type: UserAgentType,
) -> tuple[list[Entity], list[Entity]]:
) -> list[Entity]:
query = self._create_entities_identifier_query(entities)
entities_at_port_with_properties = await ocean.port_client.search_entities(
user_agent_type,
Expand All @@ -176,7 +176,7 @@ async def _map_entities_compared_with_port(

if len(entities_at_port_with_properties) > 0:
return map_entities(entities, entities_at_port_with_properties)
return entities, []
return entities

async def _register_resource_raw(
self,
Expand All @@ -190,11 +190,10 @@ async def _register_resource_raw(
[(resource, results)], parse_all, send_raw_data_examples_amount
)
modified_objects = []
unrelevant_entities = []

if ocean.app.is_saas():
try:
changed_entities, unrelevant_entities = await self._map_entities_compared_with_port(
changed_entities = await self._map_entities_compared_with_port(
objects_diff[0].entity_selector_diff.passed,
resource,
user_agent_type
Expand All @@ -203,25 +202,27 @@ async def _register_resource_raw(
if changed_entities:
logger.info("Upserting changed entities", changed_entities=len(changed_entities),
total_entities=len(objects_diff[0].entity_selector_diff.passed))
modified_objects = await self.entities_state_applier.upsert(
await self.entities_state_applier.upsert(
changed_entities, user_agent_type
)
else:
logger.info("no changed entities, not upserting", total_entities=len(objects_diff[0].entity_selector_diff.passed))

modified_objects = [ocean.port_client._reduce_entity(entity) for entity in objects_diff[0].entity_selector_diff.passed]

except Exception as e:
logger.warning(f"Failed to map entities with Port, falling back to upserting all entities: {str(e)}")
modified_objects = await self.entities_state_applier.upsert(
objects_diff[0].entity_selector_diff.passed, user_agent_type
)
else:
modified_objects = await self.entities_state_applier.upsert(
objects_diff[0].entity_selector_diff.passed, user_agent_type
objects_diff[0].entity_selector_diff.passed, user_agent_type
)
return CalculationResult(
objects_diff[0].entity_selector_diff._replace(passed=modified_objects),
errors=objects_diff[0].errors,
misonfigured_entity_keys=objects_diff[0].misonfigured_entity_keys,
unrelevant_entities=unrelevant_entities
misonfigured_entity_keys=objects_diff[0].misonfigured_entity_keys
)

async def _unregister_resource_raw(
Expand All @@ -237,7 +238,7 @@ async def _unregister_resource_raw(
return [], []

objects_diff = await self._calculate_raw([(resource, results)])
entities_selector_diff, errors, _, _ = objects_diff[0]
entities_selector_diff, errors, _ = objects_diff[0]

await self.entities_state_applier.delete(
entities_selector_diff.passed, user_agent_type
Expand All @@ -247,7 +248,7 @@ async def _unregister_resource_raw(

async def _register_in_batches(
self, resource_config: ResourceConfig, user_agent_type: UserAgentType
) -> tuple[list[Entity], list[Exception], list[Entity]]:
) -> tuple[list[Entity], list[Exception]]:
results, errors = await self._get_resource_raw_results(resource_config)
async_generators: list[ASYNC_GENERATOR_RESYNC_TYPE] = []
raw_results: RAW_RESULT = []
Expand All @@ -262,7 +263,6 @@ async def _register_in_batches(
)

passed_entities = []
unrelevant_entities = []
if raw_results:
calculation_result = await self._register_resource_raw(
resource_config,
Expand All @@ -272,7 +272,6 @@ async def _register_in_batches(
)
errors.extend(calculation_result.errors)
passed_entities = list(calculation_result.entity_selector_diff.passed)
unrelevant_entities = list(calculation_result.unrelevant_entities)

for generator in async_generators:
try:
Expand All @@ -290,14 +289,13 @@ async def _register_in_batches(
)
errors.extend(calculation_result.errors)
passed_entities.extend(calculation_result.entity_selector_diff.passed)
unrelevant_entities.extend(calculation_result.unrelevant_entities)
except* OceanAbortException as error:
errors.append(error)

logger.info(
f"Finished registering change for {len(results)} raw results for kind: {resource_config.kind}. {len(passed_entities)} entities were affected"
)
return passed_entities, errors, unrelevant_entities
return passed_entities, errors

async def register_raw(
self,
Expand Down Expand Up @@ -326,7 +324,7 @@ async def register_raw(
if not resource_mappings:
return []

diffs, errors, _, misconfigured_entity_keys = zip(
diffs, errors, misconfigured_entity_keys = zip(
*await asyncio.gather(
*(
self._register_resource_raw(
Expand Down Expand Up @@ -564,8 +562,7 @@ async def sync_raw_all(
return

logger.info("Starting resync diff calculation")
flat_created_entities, errors, unrelevant_entities = zip_and_sum(creation_results) or [
[],
flat_created_entities, errors = zip_and_sum(creation_results) or [
[],
[],
]
Expand All @@ -584,15 +581,12 @@ async def sync_raw_all(
logger.info(
f"Running resync diff calculation, number of entities created during sync: {len(flat_created_entities)}"
)
if ocean.app.is_saas() and unrelevant_entities:
await self.entities_state_applier._safe_delete(unrelevant_entities, flat_created_entities, user_agent_type)
else:
entities_at_port = await ocean.port_client.search_entities(
user_agent_type
)
await self.entities_state_applier.delete_diff(
{"before": entities_at_port, "after": flat_created_entities},
user_agent_type,
)
entities_at_port = await ocean.port_client.search_entities(
user_agent_type
)
await self.entities_state_applier.delete_diff(
{"before": entities_at_port, "after": flat_created_entities},
user_agent_type,
)

logger.info("Resync finished successfully")
1 change: 0 additions & 1 deletion port_ocean/core/ocean_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ class EntitySelectorDiff(NamedTuple):
class CalculationResult(NamedTuple):
entity_selector_diff: EntitySelectorDiff
errors: list[Exception]
unrelevant_entities: list[Entity] = field(default_factory=list)
misonfigured_entity_keys: dict[str, str] = field(default_factory=dict)


Expand Down
17 changes: 4 additions & 13 deletions port_ocean/core/utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,22 +137,19 @@ def are_entities_different(first_entity: Entity, second_entity: Entity) -> bool:

def map_entities(
third_party_entities: list[Entity], port_entities: list[Entity]
) -> tuple[list[Entity], list[Entity]]:
) -> list[Entity]:
"""
Maps the entities into two lists:
- Filtered list of third party entities, excluding matches found in port_entities that needs to be upserted
- List of entities that are not relevant that should be deleted from Port
Maps the entities into a filtered list of third party entities that need to be upserted, excluding matches found in port_entities
Args:
third_party_entities: List of entities from third party source
port_entities: List of existing Port entities
Returns:
tuple[list[Entity], list[Entity]]: Filtered list of third party entities, excluding matches found in port_entities and list of entities that are not relevant
list[Entity]: Filtered list of third party entities, excluding matches found in port_entities
"""
port_entities_dict = {}
third_party_entities_dict = {}
changed_entities = []
unrelevant_entities = []

for entity in port_entities:
key = (entity.identifier, entity.blueprint)
Expand All @@ -171,10 +168,4 @@ def map_entities(
elif are_entities_different(entity, port_entities_dict[key]):
changed_entities.append(entity)

for entity in port_entities:
key = (entity.identifier, entity.blueprint)
entity_at_third_party = third_party_entities_dict.get(key, None)
if entity_at_third_party is None:
unrelevant_entities.append(entity)

return changed_entities, unrelevant_entities
return changed_entities
42 changes: 23 additions & 19 deletions port_ocean/tests/core/handlers/mixins/test_sync_raw.py
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,10 @@ def get_entities_wrapper(*args: Any, **kwargs: Any) -> Any:

@pytest.mark.asyncio
async def test_register_raw(
mock_sync_raw_mixin_with_jq_processor: SyncRawMixin, mock_ocean: Ocean
mock_sync_raw_mixin_with_jq_processor: SyncRawMixin,
mock_ocean: Ocean,
mock_context: PortOceanContext,
monkeypatch: pytest.MonkeyPatch,
) -> None:
kind = "service"
user_agent_type = UserAgentType.exporter
Expand All @@ -428,6 +431,9 @@ async def test_register_raw(
},
]

# Set is_saas to False
monkeypatch.setattr(mock_context.app, "is_saas", lambda: False)

async with event_context(EventType.HTTP_REQUEST, trigger_type="machine") as event:
# Use patch to mock the method instead of direct assignment
with patch.object(
Expand Down Expand Up @@ -490,7 +496,10 @@ def upsert_side_effect(

@pytest.mark.asyncio
async def test_unregister_raw(
mock_sync_raw_mixin_with_jq_processor: SyncRawMixin, mock_ocean: Ocean
mock_sync_raw_mixin_with_jq_processor: SyncRawMixin,
mock_ocean: Ocean,
mock_context: PortOceanContext,
monkeypatch: pytest.MonkeyPatch,
) -> None:
kind = "service"
user_agent_type = UserAgentType.exporter
Expand All @@ -506,6 +515,9 @@ async def test_unregister_raw(
},
]

# Set is_saas to False
monkeypatch.setattr(mock_context.app, "is_saas", lambda: False)

async with event_context(EventType.HTTP_REQUEST, trigger_type="machine") as event:
# Use patch to mock the method instead of direct assignment
with patch.object(
Expand Down Expand Up @@ -584,15 +596,12 @@ async def test_map_entities_compared_with_port_no_port_entities(
mock_ocean.port_client.search_entities.return_value = [] # type: ignore

# Execute test
changed_entities, irelevant_entities = (
await mock_sync_raw_mixin._map_entities_compared_with_port(
entities, resource, UserAgentType.exporter
)
changed_entities = await mock_sync_raw_mixin._map_entities_compared_with_port(
entities, resource, UserAgentType.exporter
)

# Verify results
assert len(changed_entities) == 2
assert len(irelevant_entities) == 0
assert [e.identifier for e in changed_entities] == ["entity_1", "entity_2"]


Expand Down Expand Up @@ -674,7 +683,6 @@ class CalculationResult:
errors: List[Any]
misconfigurations: List[Any]
misonfigured_entity_keys: Optional[List[Any]] = None
unrelevant_entities: Optional[List[Entity]] = None


@pytest.mark.asyncio
Expand All @@ -689,8 +697,8 @@ async def test_register_resource_raw_saas_no_changes(

# Mock dependencies
entity = Entity(identifier="1", blueprint="service")
mock_sync_raw_mixin._calculate_raw = AsyncMock(return_value=[CalculationResult(entity_selector_diff=EntitySelectorDiff(passed=[entity], failed=[]), errors=[], misconfigurations=[], misonfigured_entity_keys=[], unrelevant_entities=[])]) # type: ignore
mock_sync_raw_mixin._map_entities_compared_with_port = AsyncMock(return_value=([], [entity])) # type: ignore
mock_sync_raw_mixin._calculate_raw = AsyncMock(return_value=[CalculationResult(entity_selector_diff=EntitySelectorDiff(passed=[entity], failed=[]), errors=[], misconfigurations=[], misonfigured_entity_keys=[])]) # type: ignore
mock_sync_raw_mixin._map_entities_compared_with_port = AsyncMock(return_value=([])) # type: ignore
mock_sync_raw_mixin.entities_state_applier.upsert = AsyncMock() # type: ignore

async with event_context(EventType.RESYNC, trigger_type="machine") as event:
Expand All @@ -704,8 +712,7 @@ async def test_register_resource_raw_saas_no_changes(
)

# Assertions
assert len(result.entity_selector_diff.passed) == 0
assert len(result.unrelevant_entities or []) == 1
assert len(result.entity_selector_diff.passed) == 1
mock_sync_raw_mixin._calculate_raw.assert_called_once()
mock_sync_raw_mixin.entities_state_applier.upsert.assert_not_called()
mock_sync_raw_mixin._map_entities_compared_with_port.assert_called_once()
Expand All @@ -723,8 +730,8 @@ async def test_register_resource_raw_saas_with_changes(

# Mock dependencies
entity = Entity(identifier="1", blueprint="service")
mock_sync_raw_mixin._calculate_raw = AsyncMock(return_value=[CalculationResult(entity_selector_diff=EntitySelectorDiff(passed=[entity], failed=[]), errors=[], misconfigurations=[], misonfigured_entity_keys=[], unrelevant_entities=[])]) # type: ignore
mock_sync_raw_mixin._map_entities_compared_with_port = AsyncMock(return_value=([entity], [])) # type: ignore
mock_sync_raw_mixin._calculate_raw = AsyncMock(return_value=[CalculationResult(entity_selector_diff=EntitySelectorDiff(passed=[entity], failed=[]), errors=[], misconfigurations=[], misonfigured_entity_keys=[])]) # type: ignore
mock_sync_raw_mixin._map_entities_compared_with_port = AsyncMock(return_value=([entity])) # type: ignore
mock_sync_raw_mixin.entities_state_applier.upsert = AsyncMock(return_value=[entity]) # type: ignore

async with event_context(EventType.RESYNC, trigger_type="machine") as event:
Expand All @@ -739,7 +746,6 @@ async def test_register_resource_raw_saas_with_changes(

# Assertions
assert len(result.entity_selector_diff.passed) == 1
assert len(result.unrelevant_entities or []) == 0
mock_sync_raw_mixin._calculate_raw.assert_called_once()
mock_sync_raw_mixin.entities_state_applier.upsert.assert_called_once()
mock_sync_raw_mixin._map_entities_compared_with_port.assert_called_once()
Expand All @@ -762,7 +768,6 @@ async def test_register_resource_raw_non_saas(
errors=[],
misconfigurations=[],
misonfigured_entity_keys=[],
unrelevant_entities=[],
)
mock_sync_raw_mixin._calculate_raw = AsyncMock(return_value=[calculation_result]) # type: ignore
mock_sync_raw_mixin._map_entities_compared_with_port = AsyncMock() # type: ignore
Expand All @@ -780,7 +785,6 @@ async def test_register_resource_raw_non_saas(

# Assertions
assert len(result.entity_selector_diff.passed) == 1
assert result.unrelevant_entities == calculation_result.unrelevant_entities
mock_sync_raw_mixin._calculate_raw.assert_called_once()
mock_sync_raw_mixin._map_entities_compared_with_port.assert_not_called()
mock_sync_raw_mixin.entities_state_applier.upsert.assert_called_once()
Expand All @@ -799,8 +803,8 @@ async def test_register_resource_raw_saas_with_errors(
# Mock dependencies
failed_entity = Entity(identifier="1", blueprint="service")
error = Exception("Test error")
mock_sync_raw_mixin._calculate_raw = AsyncMock(return_value=[CalculationResult(entity_selector_diff=EntitySelectorDiff(passed=[], failed=[failed_entity]), errors=[error], misconfigurations=[], misonfigured_entity_keys=[], unrelevant_entities=[])]) # type: ignore
mock_sync_raw_mixin._map_entities_compared_with_port = AsyncMock(return_value=([], [])) # type: ignore
mock_sync_raw_mixin._calculate_raw = AsyncMock(return_value=[CalculationResult(entity_selector_diff=EntitySelectorDiff(passed=[], failed=[failed_entity]), errors=[error], misconfigurations=[], misonfigured_entity_keys=[])]) # type: ignore
mock_sync_raw_mixin._map_entities_compared_with_port = AsyncMock(return_value=([])) # type: ignore
mock_sync_raw_mixin.entities_state_applier.upsert = AsyncMock() # type: ignore

async with event_context(EventType.RESYNC, trigger_type="machine") as event:
Expand Down
Loading

0 comments on commit ce50ccf

Please sign in to comment.