From feb151560463881816da3e98f7bf830d70fe0b4a Mon Sep 17 00:00:00 2001 From: Szymon Basan <116343782+sbasan@users.noreply.github.com> Date: Thu, 21 Nov 2024 13:15:45 +0100 Subject: [PATCH] dev: security mode and nextHopIpv6 (#5) * allow empty string as security policy mode * fix: policy mode default to security * fix: data policy nextHopIpv6 action set entry * bump sdk dev version --- catalystwan/models/common.py | 13 ++++++++++++- .../models/policy/definition/traffic_data.py | 10 +++++++--- .../policy/definition/zone_based_firewall.py | 12 ++++-------- catalystwan/models/policy/policy_definition.py | 6 ++++++ catalystwan/models/policy/security.py | 16 ++++++++-------- pyproject.toml | 2 +- 6 files changed, 38 insertions(+), 21 deletions(-) diff --git a/catalystwan/models/common.py b/catalystwan/models/common.py index 9d02afc3..97f9e4c7 100644 --- a/catalystwan/models/common.py +++ b/catalystwan/models/common.py @@ -676,7 +676,18 @@ def str_as_interface_list(val: Union[str, Sequence[InterfaceStr]]) -> Sequence[I "umbrella", ] -PolicyModeType = Literal["security", "unified"] +_PolicyModeType = Literal["security", "unified"] + +def parse_policy_mode(val: Optional[str]) -> _PolicyModeType: + if isinstance(val, str) and val == "unified": + return "unified" + return "security" + +PolicyModeType = Annotated[ + _PolicyModeType, + BeforeValidator(parse_policy_mode) +] + CoreRegion = Literal[ "core", diff --git a/catalystwan/models/policy/definition/traffic_data.py b/catalystwan/models/policy/definition/traffic_data.py index 500a8c7d..d1a7b91c 100644 --- a/catalystwan/models/policy/definition/traffic_data.py +++ b/catalystwan/models/policy/definition/traffic_data.py @@ -1,6 +1,6 @@ # Copyright 2023 Cisco Systems, Inc. and its affiliates -from ipaddress import IPv4Address, IPv4Network, IPv6Network +from ipaddress import IPv4Address, IPv4Network, IPv6Address, IPv6Network from typing import List, Literal, Optional, Set, Tuple, Union, overload from uuid import UUID @@ -49,6 +49,7 @@ Match, NATAction, NextHopActionEntry, + NextHopIpv6ActionEntry, NextHopLooseEntry, PacketLengthEntry, PLPEntry, @@ -274,8 +275,11 @@ def associate_nat_action( self._insert_action(nat_action) @accept_action - def associate_next_hop_action(self, next_hop: IPv4Address, loose: bool = False) -> None: - self._insert_action_in_set(NextHopActionEntry(value=next_hop)) + def associate_next_hop_action(self, next_hop: Union[IPv4Address, IPv6Address], loose: bool = False) -> None: + if isinstance(next_hop, IPv6Address): + self._insert_action_in_set(NextHopIpv6ActionEntry(value=next_hop)) + else: + self._insert_action_in_set(NextHopActionEntry(value=next_hop)) self._insert_action_in_set(NextHopLooseEntry(value=loose)) @accept_action diff --git a/catalystwan/models/policy/definition/zone_based_firewall.py b/catalystwan/models/policy/definition/zone_based_firewall.py index a2987d27..ef6d7267 100644 --- a/catalystwan/models/policy/definition/zone_based_firewall.py +++ b/catalystwan/models/policy/definition/zone_based_firewall.py @@ -7,6 +7,7 @@ from pydantic import BaseModel, ConfigDict, Field from typing_extensions import Annotated +from catalystwan.models.common import PolicyModeType from catalystwan.models.misc.application_protocols import ApplicationProtocol from catalystwan.models.policy.policy_definition import ( AdvancedInspectionProfileAction, @@ -220,12 +221,6 @@ class ZoneBasedFWPolicyEntry(BaseModel): model_config = ConfigDict(populate_by_name=True) -class ZoneBasedFWPolicyHeader(PolicyDefinitionBase): - type: Literal["zoneBasedFW"] = "zoneBasedFW" - mode: str = Field(default="security") - model_config = ConfigDict(populate_by_name=True) - - class ZoneBasedFWPolicyDefinition(DefinitionWithSequencesCommonBase): default_action: ZoneBasedFirewallDefaultAction = Field( default=ZoneBasedFirewallDefaultAction(type="drop"), @@ -236,9 +231,10 @@ class ZoneBasedFWPolicyDefinition(DefinitionWithSequencesCommonBase): entries: List[ZoneBasedFWPolicyEntry] = [] -class ZoneBasedFWPolicy(ZoneBasedFWPolicyHeader): +class ZoneBasedFWPolicy(PolicyDefinitionBase): + model_config = ConfigDict(populate_by_name=True) type: Literal["zoneBasedFW"] = "zoneBasedFW" - mode: Literal["security", "unified"] = "security" + mode: PolicyModeType = "security" definition: ZoneBasedFWPolicyDefinition = ZoneBasedFWPolicyDefinition() def add_ipv4_rule( diff --git a/catalystwan/models/policy/policy_definition.py b/catalystwan/models/policy/policy_definition.py index 02802294..bcd9d701 100644 --- a/catalystwan/models/policy/policy_definition.py +++ b/catalystwan/models/policy/policy_definition.py @@ -492,6 +492,11 @@ class NextHopActionEntry(BaseModel): value: Union[IPv4Address, IPv6Address] +class NextHopIpv6ActionEntry(BaseModel): + field: Literal["nextHopIpv6"] = "nextHopIpv6" + value: IPv6Address + + class NextHopMatchEntry(BaseModel): field: Literal["nextHop"] = "nextHop" ref: UUID @@ -1205,6 +1210,7 @@ class CloudSaaSAction(BaseModel): MetricEntry, MetricTypeEntry, NextHopActionEntry, + NextHopIpv6ActionEntry, NextHopLooseEntry, OMPTagEntry, OriginatorEntry, diff --git a/catalystwan/models/policy/security.py b/catalystwan/models/policy/security.py index 6d6cf9f1..c926c39b 100644 --- a/catalystwan/models/policy/security.py +++ b/catalystwan/models/policy/security.py @@ -124,7 +124,7 @@ class UnifiedSecurityPolicyDefinition(PolicyDefinition): class SecurityPolicy(PolicyCreationPayload): - policy_mode: Literal[None, "security"] = Field( + policy_mode: Literal[None, "", "security"] = Field( default="security", serialization_alias="policyMode", validation_alias="policyMode" ) policy_type: str = Field(default="feature", serialization_alias="policyType", validation_alias="policyType") @@ -138,26 +138,26 @@ class SecurityPolicy(PolicyCreationPayload): def get_assemby_item_uuids(self) -> Set[UUID]: return set((item.definition_id for item in self.policy_definition.assembly)) - def add_item(self, item: SecurityPolicyAssemblyItem) -> None: + def _add_item(self, item: SecurityPolicyAssemblyItem) -> None: self.policy_definition.assembly.append(item) def add_zone_based_fw(self, definition_id: UUID) -> None: - self.add_item(ZoneBasedFWAssemblyItem(definition_id=definition_id)) + self._add_item(ZoneBasedFWAssemblyItem(definition_id=definition_id)) def add_dns_security(self, definition_id: UUID) -> None: - self.add_item(DNSSecurityAssemblyItem(definition_id=definition_id)) + self._add_item(DNSSecurityAssemblyItem(definition_id=definition_id)) def add_intrusion_prevention(self, definition_id: UUID) -> None: - self.add_item(IntrusionPreventionAssemblyItem(definition_id=definition_id)) + self._add_item(IntrusionPreventionAssemblyItem(definition_id=definition_id)) def add_url_filtering(self, definition_id: UUID) -> None: - self.add_item(URLFilteringAssemblyItem(definition_id=definition_id)) + self._add_item(URLFilteringAssemblyItem(definition_id=definition_id)) def add_advanced_malware_protection(self, definition_id: UUID) -> None: - self.add_item(AdvancedMalwareProtectionAssemblyItem(definition_id=definition_id)) + self._add_item(AdvancedMalwareProtectionAssemblyItem(definition_id=definition_id)) def add_ssl_decryption(self, definition_id: UUID) -> None: - self.add_item(SSLDecryptionAssemblyItem(definition_id=definition_id)) + self._add_item(SSLDecryptionAssemblyItem(definition_id=definition_id)) @field_validator("policy_definition", mode="before") @classmethod diff --git a/pyproject.toml b/pyproject.toml index 81f534bc..1942163a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "catalystwan" -version = "0.40.0dev0" +version = "0.40.0dev1" description = "Cisco Catalyst WAN SDK for Python" authors = ["kagorski "] readme = "README.md"