Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dev: security mode and nextHopIpv6 #5

Merged
merged 4 commits into from
Nov 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion catalystwan/models/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -676,7 +676,18 @@ def str_as_interface_list(val: Union[str, Sequence[InterfaceStr]]) -> Sequence[I
"umbrella",
]

PolicyModeType = Literal["security", "unified"]
_PolicyModeType = Literal["security", "unified"]

def parse_policy_mode(val: Optional[str]) -> _PolicyModeType:
if isinstance(val, str) and val == "unified":
return "unified"
return "security"

PolicyModeType = Annotated[
_PolicyModeType,
BeforeValidator(parse_policy_mode)
]


CoreRegion = Literal[
"core",
Expand Down
10 changes: 7 additions & 3 deletions catalystwan/models/policy/definition/traffic_data.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Copyright 2023 Cisco Systems, Inc. and its affiliates

from ipaddress import IPv4Address, IPv4Network, IPv6Network
from ipaddress import IPv4Address, IPv4Network, IPv6Address, IPv6Network
from typing import List, Literal, Optional, Set, Tuple, Union, overload
from uuid import UUID

Expand Down Expand Up @@ -49,6 +49,7 @@
Match,
NATAction,
NextHopActionEntry,
NextHopIpv6ActionEntry,
NextHopLooseEntry,
PacketLengthEntry,
PLPEntry,
Expand Down Expand Up @@ -274,8 +275,11 @@ def associate_nat_action(
self._insert_action(nat_action)

@accept_action
def associate_next_hop_action(self, next_hop: IPv4Address, loose: bool = False) -> None:
self._insert_action_in_set(NextHopActionEntry(value=next_hop))
def associate_next_hop_action(self, next_hop: Union[IPv4Address, IPv6Address], loose: bool = False) -> None:
if isinstance(next_hop, IPv6Address):
self._insert_action_in_set(NextHopIpv6ActionEntry(value=next_hop))
else:
self._insert_action_in_set(NextHopActionEntry(value=next_hop))
self._insert_action_in_set(NextHopLooseEntry(value=loose))

@accept_action
Expand Down
12 changes: 4 additions & 8 deletions catalystwan/models/policy/definition/zone_based_firewall.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from pydantic import BaseModel, ConfigDict, Field
from typing_extensions import Annotated

from catalystwan.models.common import PolicyModeType
from catalystwan.models.misc.application_protocols import ApplicationProtocol
from catalystwan.models.policy.policy_definition import (
AdvancedInspectionProfileAction,
Expand Down Expand Up @@ -220,12 +221,6 @@ class ZoneBasedFWPolicyEntry(BaseModel):
model_config = ConfigDict(populate_by_name=True)


class ZoneBasedFWPolicyHeader(PolicyDefinitionBase):
type: Literal["zoneBasedFW"] = "zoneBasedFW"
mode: str = Field(default="security")
model_config = ConfigDict(populate_by_name=True)


class ZoneBasedFWPolicyDefinition(DefinitionWithSequencesCommonBase):
default_action: ZoneBasedFirewallDefaultAction = Field(
default=ZoneBasedFirewallDefaultAction(type="drop"),
Expand All @@ -236,9 +231,10 @@ class ZoneBasedFWPolicyDefinition(DefinitionWithSequencesCommonBase):
entries: List[ZoneBasedFWPolicyEntry] = []


class ZoneBasedFWPolicy(ZoneBasedFWPolicyHeader):
class ZoneBasedFWPolicy(PolicyDefinitionBase):
model_config = ConfigDict(populate_by_name=True)
type: Literal["zoneBasedFW"] = "zoneBasedFW"
mode: Literal["security", "unified"] = "security"
mode: PolicyModeType = "security"
definition: ZoneBasedFWPolicyDefinition = ZoneBasedFWPolicyDefinition()

def add_ipv4_rule(
Expand Down
6 changes: 6 additions & 0 deletions catalystwan/models/policy/policy_definition.py
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,11 @@ class NextHopActionEntry(BaseModel):
value: Union[IPv4Address, IPv6Address]


class NextHopIpv6ActionEntry(BaseModel):
field: Literal["nextHopIpv6"] = "nextHopIpv6"
value: IPv6Address


class NextHopMatchEntry(BaseModel):
field: Literal["nextHop"] = "nextHop"
ref: UUID
Expand Down Expand Up @@ -1205,6 +1210,7 @@ class CloudSaaSAction(BaseModel):
MetricEntry,
MetricTypeEntry,
NextHopActionEntry,
NextHopIpv6ActionEntry,
NextHopLooseEntry,
OMPTagEntry,
OriginatorEntry,
Expand Down
16 changes: 8 additions & 8 deletions catalystwan/models/policy/security.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ class UnifiedSecurityPolicyDefinition(PolicyDefinition):


class SecurityPolicy(PolicyCreationPayload):
policy_mode: Literal[None, "security"] = Field(
policy_mode: Literal[None, "", "security"] = Field(
default="security", serialization_alias="policyMode", validation_alias="policyMode"
)
policy_type: str = Field(default="feature", serialization_alias="policyType", validation_alias="policyType")
Expand All @@ -138,26 +138,26 @@ class SecurityPolicy(PolicyCreationPayload):
def get_assemby_item_uuids(self) -> Set[UUID]:
return set((item.definition_id for item in self.policy_definition.assembly))

def add_item(self, item: SecurityPolicyAssemblyItem) -> None:
def _add_item(self, item: SecurityPolicyAssemblyItem) -> None:
self.policy_definition.assembly.append(item)

def add_zone_based_fw(self, definition_id: UUID) -> None:
self.add_item(ZoneBasedFWAssemblyItem(definition_id=definition_id))
self._add_item(ZoneBasedFWAssemblyItem(definition_id=definition_id))

def add_dns_security(self, definition_id: UUID) -> None:
self.add_item(DNSSecurityAssemblyItem(definition_id=definition_id))
self._add_item(DNSSecurityAssemblyItem(definition_id=definition_id))

def add_intrusion_prevention(self, definition_id: UUID) -> None:
self.add_item(IntrusionPreventionAssemblyItem(definition_id=definition_id))
self._add_item(IntrusionPreventionAssemblyItem(definition_id=definition_id))

def add_url_filtering(self, definition_id: UUID) -> None:
self.add_item(URLFilteringAssemblyItem(definition_id=definition_id))
self._add_item(URLFilteringAssemblyItem(definition_id=definition_id))

def add_advanced_malware_protection(self, definition_id: UUID) -> None:
self.add_item(AdvancedMalwareProtectionAssemblyItem(definition_id=definition_id))
self._add_item(AdvancedMalwareProtectionAssemblyItem(definition_id=definition_id))

def add_ssl_decryption(self, definition_id: UUID) -> None:
self.add_item(SSLDecryptionAssemblyItem(definition_id=definition_id))
self._add_item(SSLDecryptionAssemblyItem(definition_id=definition_id))

@field_validator("policy_definition", mode="before")
@classmethod
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "catalystwan"
version = "0.40.0dev0"
version = "0.40.0dev1"
description = "Cisco Catalyst WAN SDK for Python"
authors = ["kagorski <kagorski@cisco.com>"]
readme = "README.md"
Expand Down