import pandas as pd
from rich import print as rprint
#
# 1. Excel to Pandas
#
# Each worksheet of the workbook becomes its own DataFrame. These reads run at
# import time, so importing this module opens "firewall_flows.xlsx" from the
# current working directory.
_WORKBOOK = "firewall_flows.xlsx"


def _read_sheet(sheet_name):
    """Return the named worksheet of the workbook as a DataFrame."""
    return pd.read_excel(_WORKBOOK, sheet_name=sheet_name)


df_flows = _read_sheet("flows")
df_networks = _read_sheet("networks")
df_services = _read_sheet("services")
#
# 2. Pandas to Aerleon
#
# Functions for working with DataFrame data
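def get_firewalls(df):
    """Return the distinct firewall names in ``df``, in first-seen order.

    Args:
        df: flows DataFrame with a ``firewall`` column.

    Returns:
        list of the unique ``firewall`` values, ordered by first occurrence.
    """
    # ``unique()`` already de-duplicates while preserving order, so the
    # earlier ``drop_duplicates()`` pass was redundant.
    return df["firewall"].unique().tolist()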
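def get_platform(df, fw):
    """Return the platform recorded for firewall ``fw``.

    Args:
        df: flows DataFrame with ``firewall`` and ``platform`` columns.
        fw: firewall name to look up.

    Returns:
        The ``platform`` value of the first row whose ``firewall`` equals ``fw``.

    Raises:
        IndexError: if no row of ``df`` belongs to ``fw``. The original code
            raised a bare IndexError from ``[0]``; the type is kept so callers
            catching it still work, but the message now names the firewall.
    """
    platforms = df.loc[df["firewall"] == fw, "platform"].tolist()
    if not platforms:
        raise IndexError(f"no platform row found for firewall {fw!r}")
    # A firewall usually appears on many flow rows; the first row's platform
    # wins and disagreement between rows is not checked here.
    return platforms[0]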
def get_filter_names(df, fw):
    """Return the distinct filter (policy) names used by firewall ``fw``.

    Args:
        df: flows DataFrame with ``firewall`` and ``filter_name`` columns.
        fw: firewall name to select rows for.

    Returns:
        list of unique ``filter_name`` values for ``fw``, in first-seen order;
        empty when ``fw`` has no rows.
    """
    rows_for_fw = df.loc[df["firewall"] == fw]
    names = rows_for_fw["filter_name"].drop_duplicates()
    return names.tolist()
def get_filter_terms(df, fw, filter):
    """Return the flow rows that belong to one filter on one firewall.

    Args:
        df: flows DataFrame with ``firewall`` and ``filter_name`` columns.
        fw: firewall name.
        filter: filter name (parameter name shadows the builtin but is part
            of the existing signature, so it is kept).

    Returns:
        DataFrame of the rows where both ``firewall == fw`` and
        ``filter_name == filter``; the original index labels are preserved.
    """
    on_firewall = df["firewall"] == fw
    in_filter = df["filter_name"] == filter
    return df[on_firewall & in_filter]
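def build_term(row, *, action="accept"):
    """Build one Aerleon term dict from a single flow row.

    Args:
        row: a flow row (as yielded by ``DataFrame.itertuples()``) exposing
            ``description``, ``src_ip``, ``dst_ip``, ``dst_port`` and ``proto``.
        action: term action. Defaults to ``"accept"``, which is what every
            row produced before this parameter existed; it is keyword-only so
            existing positional callers are unaffected. Note that with the
            default, every spreadsheet row becomes a permit rule.

    Returns:
        dict in Aerleon term format.
    """
    # The term name is derived from the free-text description: lower-cased
    # with spaces turned into hyphens. Other characters pass through unchanged,
    # and two rows sharing a description yield duplicate term names.
    term_name = row.description.lower().replace(" ", "-")
    return {
        "name": term_name,
        # Addresses are copied verbatim from the spreadsheet; nothing here
        # checks that they name a defined network.
        "source-address": row.src_ip,
        "destination-address": row.dst_ip,
        # Stringified so numeric and named ports are emitted alike. A
        # float-typed column (e.g. 443.0 read from Excel) would render as
        # "443.0" — confirm the column's dtype at the source if that matters.
        "destination-port": str(row.dst_port),
        "protocol": row.proto,
        "action": action,
    }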
def build_aerlon_policy(df_flows):
    """Build the Aerleon policy list from the flows DataFrame.

    One entry is produced per firewall (``filename`` = firewall name), each
    holding one filter per distinct ``filter_name`` on that firewall, each
    filter holding one term per flow row. The resulting shape is::

        [
            {
                "filename": "fw1-asa",
                "filters": [
                    {
                        "header": {"targets": {"ciscoasa": "acl-outside"}},
                        "terms": [
                            {
                                "name": "client-to-web",
                                "source-address": "ALL",
                                "destination-address": "ALL",
                                "destination-port": "HTTPS",
                                "protocol": "tcp",
                                "action": "accept",
                            }
                        ],
                    },
                    ...  # more filters
                ],
            },
            ...  # more firewalls
        ]

    Args:
        df_flows: flows DataFrame with ``firewall``, ``platform``,
            ``filter_name`` and the per-term columns used by ``build_term``.

    Returns:
        list of per-firewall policy dicts.
    """
    policy = []
    for fw in get_firewalls(df_flows):
        # The platform is looked up before the filter names, as before.
        platform = get_platform(df_flows, fw)
        filters = []
        for filter_name in get_filter_names(df_flows, fw):
            rows = get_filter_terms(df_flows, fw, filter_name)
            filters.append(
                {
                    "header": {"targets": {platform: filter_name}},
                    "terms": [build_term(row) for row in rows.itertuples()],
                }
            )
        # Key order matches the original dict literal: filters first, then filename.
        policy.append({"filters": filters, "filename": fw})
    return policy
def build_aerlon_def_network(df_networks):
    """Build the Aerleon network-definitions dict from the networks DataFrame.

    Args:
        df_networks: DataFrame with ``network`` and ``address`` columns.

    Returns:
        ``{"networks": {<network>: {"values": [{"address": <address>}]}}}``.
        When a network name appears on several rows, the last row wins, as
        in the original per-row assignment. Addresses are copied verbatim and
        are not validated here.
    """
    definitions = {
        row.network: {"values": [{"address": row.address}]}
        for row in df_networks.itertuples()
    }
    return {"networks": definitions}
def build_aerlon_def_service(df_services):
    """Build the Aerleon service-definitions dict from the services DataFrame.

    Args:
        df_services: DataFrame with ``service``, ``protocol`` and ``port`` columns.

    Returns:
        ``{"services": {<service>: [{"protocol": <protocol>, "port": <port>}]}}``.
        A service name listed on several rows keeps only the last row's entry,
        matching the original per-row assignment; the single-element list is
        kept as-is. Protocol and port values are copied without validation.
    """
    definitions = {}
    for row in df_services.itertuples():
        entry = {"protocol": row.protocol, "port": row.port}
        definitions[row.service] = [entry]
    return {"services": definitions}
if __name__ == "__main__":
    # Build all three Aerleon inputs first, then pretty-print them in the same
    # order as before: policy, network definitions, service definitions.
    artifacts = [
        build_aerlon_policy(df_flows),
        build_aerlon_def_network(df_networks),
        build_aerlon_def_service(df_services),
    ]
    for artifact in artifacts:
        rprint(artifact)