diff --git a/models/__init__.py b/models/__init__.py new file mode 100644 index 0000000..5ac77bb --- /dev/null +++ b/models/__init__.py @@ -0,0 +1,25 @@ +# models/__init__.py + +# trunk-ignore(ruff/F401) +from .arp_table import ArpTableEntry + +# trunk-ignore(ruff/F401) +from .content_version import ContentVersion + +# trunk-ignore(ruff/F401) +from .assurance_report import AssuranceReport + +# trunk-ignore(ruff/F401) +from .ip_sec_tunnel import IPSecTunnelEntry + +# trunk-ignore(ruff/F401) +from .license import LicenseFeatureEntry + +# trunk-ignore(ruff/F401) +from .nics import NetworkInterfaceStatus + +# trunk-ignore(ruff/F401) +from .routes import RouteEntry + +# trunk-ignore(ruff/F401) +from .session_stats import SessionStats diff --git a/models/arp_table.py b/models/arp_table.py new file mode 100644 index 0000000..70fe0ef --- /dev/null +++ b/models/arp_table.py @@ -0,0 +1,11 @@ +# models/arp_table.py +from pydantic import BaseModel + + +class ArpTableEntry(BaseModel): + interface: str + ip: str + mac: str + port: str + status: str + ttl: int diff --git a/models/assurance_report.py b/models/assurance_report.py new file mode 100644 index 0000000..5760ae3 --- /dev/null +++ b/models/assurance_report.py @@ -0,0 +1,22 @@ +# models/assurance_report.py + +from typing import Dict, Optional +from pydantic import BaseModel +from .arp_table import ArpTableEntry +from .content_version import ContentVersion +from .ip_sec_tunnel import IPSecTunnelEntry +from .license import LicenseFeatureEntry +from .nics import NetworkInterfaceStatus +from .routes import RouteEntry +from .session_stats import SessionStats + + +class AssuranceReport(BaseModel): + hostname: str + arp_table: Optional[Dict[str, ArpTableEntry]] = None + content_version: Optional[ContentVersion] = None + ip_sec_tunnels: Optional[Dict[str, IPSecTunnelEntry]] = None + license: Optional[Dict[str, LicenseFeatureEntry]] = None + nics: Optional[Dict[str, NetworkInterfaceStatus]] = None + routes: Optional[Dict[str, RouteEntry]] = None + session_stats: Optional[SessionStats] = None diff --git a/models/content_version.py b/models/content_version.py new file mode 100644 index 0000000..14d4c9d --- /dev/null +++ b/models/content_version.py @@ -0,0 +1,7 @@ +# models/content_version.py + +from pydantic import BaseModel + + +class ContentVersion(BaseModel): + version: str diff --git a/models/ip_sec_tunnel.py b/models/ip_sec_tunnel.py new file mode 100644 index 0000000..a55a54c --- /dev/null +++ b/models/ip_sec_tunnel.py @@ -0,0 +1,16 @@ +# models/ip_sec_tunnel.py + +from pydantic import BaseModel, Field + + +class IPSecTunnelEntry(BaseModel): + peerip: str + name: str + outer_if: str = Field(..., alias="outer-if") + gwid: str + localip: str + state: str + inner_if: str = Field(..., alias="inner-if") + mon: str + owner: str + id: str diff --git a/models/license.py b/models/license.py new file mode 100644 index 0000000..3e8d7b7 --- /dev/null +++ b/models/license.py @@ -0,0 +1,17 @@ +# models/license.py + +from pydantic import BaseModel, Field +from typing import Optional +from collections import OrderedDict + + +class LicenseFeatureEntry(BaseModel): + feature: str + description: str + serial: str + issued: str + expires: str + expired: str + base_license_name: Optional[str] = Field(None, alias="base-license-name") + authcode: Optional[str] + custom: Optional[OrderedDict] = None diff --git a/models/nics.py b/models/nics.py new file mode 100644 index 0000000..f2b1619 --- /dev/null +++ b/models/nics.py @@ -0,0 +1,19 @@ +# models/nics.py + +from pydantic import BaseModel, validator, root_validator + + +class NetworkInterfaceStatus(BaseModel): + status: str + + @root_validator(pre=True) + def parse_root(cls, values): + if isinstance(values, str): + return {"status": values} + raise ValueError("Invalid input for network interface status") + + @validator("status") + def check_status(cls, v): + if v not in ("up", "down"): + raise ValueError('Status must be "up" or "down"') + return v diff --git a/models/routes.py b/models/routes.py new file mode 100644 index 0000000..1005bf2 --- /dev/null +++ b/models/routes.py @@ -0,0 +1,15 @@ +# models/route.py + +from pydantic import BaseModel, Field +from typing import Optional + + +class RouteEntry(BaseModel): + virtual_router: str = Field(..., alias="virtual-router") + destination: str + nexthop: str + metric: str + flags: str + age: Optional[str] + interface: Optional[str] + route_table: str = Field(..., alias="route-table") diff --git a/models/session_stats.py b/models/session_stats.py new file mode 100644 index 0000000..142fc85 --- /dev/null +++ b/models/session_stats.py @@ -0,0 +1,57 @@ +# models/session_stats.py + +from pydantic import BaseModel, Field +from typing import Optional + + +class SessionStats(BaseModel): + age_accel_thresh: Optional[str] = Field(..., alias="age-accel-thresh") + age_accel_tsf: Optional[str] = Field(..., alias="age-accel-tsf") + age_scan_ssf: Optional[str] = Field(..., alias="age-scan-ssf") + age_scan_thresh: Optional[str] = Field(..., alias="age-scan-thresh") + age_scan_tmo: Optional[str] = Field(..., alias="age-scan-tmo") + cps: Optional[str] + dis_def: Optional[str] = Field(..., alias="dis-def") + dis_sctp: Optional[str] = Field(..., alias="dis-sctp") + dis_tcp: Optional[str] = Field(..., alias="dis-tcp") + dis_udp: Optional[str] = Field(..., alias="dis-udp") + icmp_unreachable_rate: Optional[str] = Field(..., alias="icmp-unreachable-rate") + kbps: Optional[str] + max_pending_mcast: Optional[str] = Field(..., alias="max-pending-mcast") + num_active: Optional[str] = Field(..., alias="num-active") + num_bcast: Optional[str] = Field(..., alias="num-bcast") + num_gtpc: Optional[str] = Field(..., alias="num-gtpc") + num_gtpu_active: Optional[str] = Field(..., alias="num-gtpu-active") + num_gtpu_pending: Optional[str] = Field(..., alias="num-gtpu-pending") + num_http2_5gc: Optional[str] = Field(..., alias="num-http2-5gc") + num_icmp: Optional[str] = Field(..., alias="num-icmp") + num_imsi: Optional[str] = Field(..., alias="num-imsi") + num_installed: Optional[str] = Field(..., alias="num-installed") + num_max: Optional[str] = Field(..., alias="num-max") + num_mcast: Optional[str] = Field(..., alias="num-mcast") + num_pfcpc: Optional[str] = Field(..., alias="num-pfcpc") + num_predict: Optional[str] = Field(..., alias="num-predict") + num_sctp_assoc: Optional[str] = Field(..., alias="num-sctp-assoc") + num_sctp_sess: Optional[str] = Field(..., alias="num-sctp-sess") + num_tcp: Optional[str] = Field(..., alias="num-tcp") + num_udp: Optional[str] = Field(..., alias="num-udp") + pps: Optional[str] + tcp_cong_ctrl: Optional[str] = Field(..., alias="tcp-cong-ctrl") + tcp_reject_siw_thresh: Optional[str] = Field(..., alias="tcp-reject-siw-thresh") + tmo_5gcdelete: Optional[str] = Field(..., alias="tmo-5gcdelete") + tmo_cp: Optional[str] = Field(..., alias="tmo-cp") + tmo_def: Optional[str] = Field(..., alias="tmo-def") + tmo_icmp: Optional[str] = Field(..., alias="tmo-icmp") + tmo_sctp: Optional[str] = Field(..., alias="tmo-sctp") + tmo_sctpcookie: Optional[str] = Field(..., alias="tmo-sctpcookie") + tmo_sctpinit: Optional[str] = Field(..., alias="tmo-sctpinit") + tmo_sctpshutdown: Optional[str] = Field(..., alias="tmo-sctpshutdown") + tmo_tcp: Optional[str] = Field(..., alias="tmo-tcp") + tmo_tcp_delayed_ack: Optional[str] = Field(..., alias="tmo-tcp-delayed-ack") + tmo_tcp_unverif_rst: Optional[str] = Field(..., alias="tmo-tcp-unverif-rst") + tmo_tcphalfclosed: Optional[str] = Field(..., alias="tmo-tcphalfclosed") + tmo_tcphandshake: Optional[str] = Field(..., alias="tmo-tcphandshake") + tmo_tcpinit: Optional[str] = Field(..., alias="tmo-tcpinit") + tmo_tcptimewait: Optional[str] = Field(..., alias="tmo-tcptimewait") + tmo_udp: Optional[str] = Field(..., alias="tmo-udp") + vardata_rate: Optional[str] = Field(..., alias="vardata-rate") diff --git a/upgrade.py b/upgrade.py index 60c6858..61102bb 100644 --- a/upgrade.py +++ b/upgrade.py @@ -16,13 +16,16 @@ # Palo Alto Networks panos-upgrade-assurance imports from panos_upgrade_assurance.check_firewall import CheckFirewall from panos_upgrade_assurance.firewall_proxy import FirewallProxy -from assurance import AssuranceOptions # third party imports import defusedxml.ElementTree as ET import xmltodict from pydantic import BaseModel +# project imports +from models import AssuranceReport +from assurance import AssuranceOptions + # ---------------------------------------------------------------------------- # Define logging levels @@ -677,7 +680,7 @@ def run_assurance( operation_type: str, action: str, config: Dict[str, Union[str, int, float, bool]], -) -> Union[Dict[str, Union[str, int, float, bool]], None]: +) -> Union[AssuranceReport, None]: """ Execute specified operational tasks on the Firewall and return the results. @@ -765,7 +768,13 @@ def run_assurance( try: logging.info("Running snapshots...") results = snapshot_node.run_snapshots(snapshots_config=actions) - logging.info(results) + logging.debug(results) + + if results: + # Pass the results to the AssuranceReport model + return AssuranceReport(hostname=firewall.hostname, **results) + else: + return None except Exception as e: logging.error("Error running readiness checks: %s", e) @@ -857,7 +866,7 @@ def main() -> None: # Download the target PAN-OS version logging.info(f"Checking if {args['target_version']} is downloaded...") - software_download(firewall, args["target_version"], ha_details) + image_downloaded = software_download(firewall, args["target_version"], ha_details) if deploy_info == "active" or deploy_info == "passive": logging.info( f"{args['target_version']} has been downloaded and sync'd to HA peer." @@ -867,10 +876,32 @@ def main() -> None: # Begin collecting network state information with panos-upgrade-assurance logging.info("Collecting network state information...") - firewall_assurance = create_panos_assurance_connection(firewall) - logging.info( - f"Network state information collected from {firewall_assurance.serial}" - ) + if image_downloaded: + # Use the modified run_assurance function + assurance_report = run_assurance( + firewall, + operation_type="state_snapshot", + action="arp_table,content_version,ip_sec_tunnels,license,nics,routes,session_stats", + config={}, + ) + + # Check if an assurance report was successfully created + if assurance_report: + # Do something with the assurance report, e.g., log it, save it, etc. + logging.info("Assurance Report created successfully") + assurance_report_json = assurance_report.model_dump_json(indent=4) + logging.debug(assurance_report_json) + + file_path = f"logs/{firewall.serial}-assurance.json" + with open(file_path, "w") as file: + file.write(assurance_report_json) + + else: + logging.error("Failed to create Assurance Report") + + logging.info(f"Network state information collected from {firewall.serial}") + + logging.info(f"Network state information collected from {firewall.serial}") if __name__ == "__main__": diff --git a/utils/__init__.py b/utils/__init__.py new file mode 100644 index 0000000..e69de29