Skip to content

Commit

Permalink
new function 'create_firewall_mapping' to construct a mapping between…
Browse files Browse the repository at this point in the history
… firewall hostnames and their associated data and firewall object

Ensure a safe operation by working with a copy of the firewall object within fetch_firewall_info function
  • Loading branch information
cdot65 committed Feb 14, 2024
1 parent 664c81d commit 1595fba
Showing 1 changed file with 129 additions and 34 deletions.
163 changes: 129 additions & 34 deletions pan_os_upgrade/upgrade.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
"""

# standard library imports
import copy
import importlib.resources as pkg_resources
import ipaddress
import json
Expand Down Expand Up @@ -3057,6 +3058,67 @@ def console_welcome_banner(
typer.echo(banner)


def create_firewall_mapping(
all_firewalls: List[Firewall], firewalls_info: List[Dict[str, Any]]
) -> Dict[str, Dict[str, Any]]:
"""
Constructs a mapping between firewall hostnames and their associated data, including the corresponding Firewall
object and additional firewall details. This mapping facilitates easy access to both the Firewall object and its
attributes like serial number, management IP, and any other relevant information provided in the firewalls_info list.
This function iterates through each provided firewall's information, matches it with the corresponding Firewall
object based on the serial number, and then combines these into a single dictionary. This combined dictionary is
indexed by the hostname of each firewall, allowing for quick lookup of firewall details and the associated Firewall
object.
Parameters
----------
all_firewalls : List[Firewall]
A list of instantiated Firewall objects, each representing a specific firewall device with connectivity
and operational capabilities.
firewalls_info : List[Dict[str, Any]]
A list of dictionaries, with each dictionary containing detailed information about a firewall, such as
its hostname, serial number, management IP, and potentially other metadata.
Returns
-------
Dict[str, Dict[str, Any]]
A dictionary where each key is a firewall's hostname and each value is a dictionary containing the
corresponding Firewall object under the 'object' key and merged with the firewall's detailed information
from the firewalls_info list.
Example
-------
Creating a mapping of firewalls to their details and objects:
>>> all_firewalls = [Firewall('fw1'), Firewall('fw2')]
>>> firewalls_info = [{'hostname': 'fw1', 'serial': '12345', 'ip': '10.0.0.1'},
{'hostname': 'fw2', 'serial': '67890', 'ip': '10.0.0.2'}]
>>> mapping = create_firewall_mapping(all_firewalls, firewalls_info)
>>> mapping['fw1']
{'object': <Firewall object>, 'hostname': 'fw1', 'serial': '12345', 'ip': '10.0.0.1'}
Notes
-----
- This function assumes that each firewall's serial number is unique and uses it as the key to match
Firewall objects with their corresponding details.
- The function does not validate the presence of keys within the firewalls_info dictionaries; it is
assumed that each dictionary contains at least the 'serial' and 'hostname' keys.
"""
firewall_mapping = {}
firewall_object_mapping = {fw.serial: fw for fw in all_firewalls}

for fw_info in firewalls_info:
serial = fw_info["serial"]
firewall_object = firewall_object_mapping.get(serial)
if firewall_object:
firewall_mapping[fw_info["hostname"]] = {
"object": firewall_object,
**fw_info,
}

return firewall_mapping


def ensure_directory_exists(file_path: str) -> None:
"""
Ensures the existence of the directory path for a given file path, creating it if necessary.
Expand Down Expand Up @@ -3427,20 +3489,46 @@ def get_emoji(action: str) -> str:

def fetch_firewall_info(firewall: Firewall) -> Dict[str, Any]:
"""
Fetches system information for a single firewall and returns it as a dictionary.
Retrieves detailed system information from a specified firewall device and organizes it into a dictionary.
This function communicates with the firewall to gather essential system details such as hostname, IP address,
model, serial number, software version, and application version. It's designed to support diagnostics, inventory
management, and operational monitoring by providing a snapshot of the firewall's current state and configuration.
Parameters
----------
firewall : Firewall
A Firewall object for which to fetch system information.
The Firewall instance from which system information will be fetched. This object should be initialized with
the necessary authentication credentials and network details to facilitate API communication with the firewall.
Returns
-------
Dict[str, Any]
A dictionary containing system information for the firewall.
A dictionary containing key system information elements of the firewall, such as hostname, IP address, model,
serial number, software version, and application version. If the function encounters an error during information
retrieval, it returns a dictionary with the available data and marks the status as "Offline or Unavailable".
Example
-------
Fetching system information for a firewall:
>>> firewall_instance = Firewall(hostname='192.168.1.1', api_username='admin', api_password='admin')
>>> firewall_info = fetch_firewall_info(firewall_instance)
>>> print(firewall_info)
{'hostname': 'fw-hostname', 'ip-address': '192.168.1.1', 'model': 'PA-850', 'serial': '0123456789', 'sw-version': '10.0.0', 'app-version': '8200-1234'}
Notes
-----
- This function is intended for use in environments where firewall configuration and status monitoring is necessary.
- Error handling is implemented to ensure that partial or default information is returned if the firewall is unreachable
or if any issues arise during the data retrieval process, allowing for graceful degradation of functionality.
"""
# Ensure a safe operation by working with a copy of the firewall object
fw_copy = copy.deepcopy(firewall)

try:
info = firewall.show_system_info()
# Attempt to retrieve system information from the firewall
info = fw_copy.show_system_info()
# Organize and return the fetched information as a dictionary
return {
"hostname": info["system"]["hostname"],
"ip-address": info["system"]["ip-address"],
Expand All @@ -3450,12 +3538,13 @@ def fetch_firewall_info(firewall: Firewall) -> Dict[str, Any]:
"app-version": info["system"]["app-version"],
}
except Exception as e:
print(f"Error retrieving info for {firewall.serial}: {str(e)}")
# Log and return default values in case of an error
logging.error(f"Error retrieving info for {fw_copy.serial}: {str(e)}")
return {
"hostname": firewall.hostname or "Unknown",
"hostname": fw_copy.hostname or "Unknown",
"ip-address": "N/A",
"model": "N/A",
"serial": firewall.serial,
"serial": fw_copy.serial,
"sw-version": "N/A",
"app-version": "N/A",
"status": "Offline or Unavailable",
Expand Down Expand Up @@ -3505,27 +3594,49 @@ def get_firewalls_from_panorama(panorama: Panorama) -> list[Firewall]:

def get_firewalls_info(firewalls: List[Firewall]) -> List[Dict[str, Any]]:
"""
Fetches system information for each firewall in the list concurrently and returns it as a list of dictionaries.
Retrieves detailed system information for a list of firewalls using concurrent executions to improve efficiency.
This function iterates over a list of Firewall objects, fetching system information for each one in parallel to
minimize total execution time. It utilizes a thread pool to handle concurrent requests, making it well-suited for
scenarios where information from multiple devices needs to be aggregated swiftly. The collected information includes,
but is not limited to, software version, system uptime, and serial numbers, structured as a dictionary for each firewall.
Parameters
----------
firewalls : List[Firewall]
A list of Firewall objects for which to fetch system information.
A list of Firewall objects, each representing a device from which system information is to be fetched. These
objects should be initialized with the necessary connection details.
Returns
-------
List[Dict[str, Any]]
A list of dictionaries containing system information for each firewall.
A list of dictionaries, with each dictionary containing system information for a respective firewall. The
structure and content of these dictionaries depend on the implementation of the `fetch_firewall_info` function
but typically include keys such as 'hostname', 'version', 'serial number', etc.
Example
-------
Fetching information for a list of firewall objects:
>>> firewalls = [Firewall('192.168.1.1', api_key='apikey1'), Firewall('192.168.1.2', api_key='apikey2')]
>>> info = get_firewalls_info(firewalls)
# This returns a list of dictionaries, each containing information about a firewall.
Notes
-----
- This function leverages concurrent threads to fetch data, significantly reducing the total time required to
obtain information from multiple devices.
- The actual data fetched and the structure of the returned dictionaries are determined by the `fetch_firewall_info`
function, which this function depends on.
"""
firewalls_info = []
with ThreadPoolExecutor(max_workers=10) as executor:
# Mapping the fetch_firewall_info function to each firewall object
future_to_firewall = {
# Creating a future for each firewall info fetch task
future_to_firewall_info = {
executor.submit(fetch_firewall_info, fw): fw for fw in firewalls
}

# Collecting results as they are completed
for future in as_completed(future_to_firewall):
# Iterating over completed fetch tasks and collecting their results
for future in as_completed(future_to_firewall_info):
firewall_info = future.result()
firewalls_info.append(firewall_info)

Expand Down Expand Up @@ -4389,16 +4500,8 @@ def batch(
)
firewalls_info = get_firewalls_info(all_firewalls)

# Initialize an empty dictionary to map Firewall objects to their details
firewall_mapping = {}

# Iterate over each Firewall object and its corresponding details
for fw, fw_info in zip(all_firewalls, firewalls_info):
# Create a dictionary entry for each firewall with its details
firewall_mapping[fw_info["hostname"]] = {
"object": fw,
**fw_info, # Unpack all info details into the dictionary
}
# Create a mapping of firewalls for selection
firewall_mapping = create_firewall_mapping(all_firewalls, firewalls_info)

# Check if inventory.yaml exists and if it does, read the selected devices
if inventory_file_path.exists():
Expand Down Expand Up @@ -4641,16 +4744,8 @@ def inventory(
)
firewalls_info = get_firewalls_info(all_firewalls)

# Initialize an empty dictionary to map Firewall objects to their details
firewall_mapping = {}

# Iterate over each Firewall object and its corresponding details
for fw, fw_info in zip(all_firewalls, firewalls_info):
# Create a dictionary entry for each firewall with its details
firewall_mapping[fw_info["hostname"]] = {
"object": fw,
**fw_info, # Unpack all info details into the dictionary
}
# Create a mapping of firewalls for selection
firewall_mapping = create_firewall_mapping(all_firewalls, firewalls_info)

user_selected_hostnames = select_devices_from_table(firewall_mapping)

Expand Down

0 comments on commit 1595fba

Please sign in to comment.