Skip to content

Commit

Permalink
ruff: T6583: Reformat to comply with code style
Browse files Browse the repository at this point in the history
  • Loading branch information
indrajitr committed Jan 17, 2025
1 parent a31d5b6 commit 5f06e1c
Show file tree
Hide file tree
Showing 2 changed files with 267 additions and 102 deletions.
148 changes: 101 additions & 47 deletions python/vyos/kea.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
'time_offset': 'time-offset',
'wpad_url': 'wpad-url',
'ipv6_only_preferred': 'v6-only-preferred',
'captive_portal': 'v4-captive-portal'
'captive_portal': 'v4-captive-portal',
}

kea6_options = {
Expand All @@ -55,21 +55,23 @@
'nisplus_domain': 'nisp-domain-name',
'nisplus_server': 'nisp-servers',
'sntp_server': 'sntp-servers',
'captive_portal': 'v6-captive-portal'
'captive_portal': 'v6-captive-portal',
}

kea_ctrl_socket = '/run/kea/dhcp{inet}-ctrl-socket'


def _format_hex_string(in_str):
out_str = ""
out_str = ''
# if input is divisible by 2, add : every 2 chars
if len(in_str) > 0 and len(in_str) % 2 == 0:
out_str = ':'.join(a+b for a,b in zip(in_str[::2], in_str[1::2]))
out_str = ':'.join(a + b for a, b in zip(in_str[::2], in_str[1::2]))
else:
out_str = in_str

return out_str


def _find_list_of_dict_index(lst, key='ip', value=''):
"""
Find the index entry of list of dict matching the dict value
Expand All @@ -81,53 +83,71 @@ def _find_list_of_dict_index(lst, key='ip', value=''):
idx = next((index for (index, d) in enumerate(lst) if d[key] == value), None)
return idx


def kea_parse_options(config):
options = []

for node, option_name in kea4_options.items():
if node not in config:
continue

value = ", ".join(config[node]) if isinstance(config[node], list) else config[node]
value = (
', '.join(config[node]) if isinstance(config[node], list) else config[node]
)
options.append({'name': option_name, 'data': value})

if 'client_prefix_length' in config:
options.append({'name': 'subnet-mask', 'data': netmask_from_cidr('0.0.0.0/' + config['client_prefix_length'])})
options.append(
{
'name': 'subnet-mask',
'data': netmask_from_cidr('0.0.0.0/' + config['client_prefix_length']),
}
)

if 'ip_forwarding' in config:
options.append({'name': 'ip-forwarding', 'data': "true"})
options.append({'name': 'ip-forwarding', 'data': 'true'})

if 'static_route' in config:
default_route = ''

if 'default_router' in config:
default_route = isc_static_route('0.0.0.0/0', config['default_router'])

routes = [isc_static_route(route, route_options['next_hop']) for route, route_options in config['static_route'].items()]

options.append({'name': 'rfc3442-static-route', 'data': ", ".join(routes if not default_route else routes + [default_route])})
options.append({'name': 'windows-static-route', 'data': ", ".join(routes)})
routes = [
isc_static_route(route, route_options['next_hop'])
for route, route_options in config['static_route'].items()
]

options.append(
{
'name': 'rfc3442-static-route',
'data': ', '.join(
routes if not default_route else routes + [default_route]
),
}
)
options.append({'name': 'windows-static-route', 'data': ', '.join(routes)})

if 'time_zone' in config:
with open("/usr/share/zoneinfo/" + config['time_zone'], "rb") as f:
tz_string = f.read().split(b"\n")[-2].decode("utf-8")
with open('/usr/share/zoneinfo/' + config['time_zone'], 'rb') as f:
tz_string = f.read().split(b'\n')[-2].decode('utf-8')

options.append({'name': 'pcode', 'data': tz_string})
options.append({'name': 'tcode', 'data': config['time_zone']})

unifi_controller = dict_search_args(config, 'vendor_option', 'ubiquiti', 'unifi_controller')
unifi_controller = dict_search_args(
config, 'vendor_option', 'ubiquiti', 'unifi_controller'
)
if unifi_controller:
options.append({
'name': 'unifi-controller',
'data': unifi_controller,
'space': 'ubnt'
})
options.append(
{'name': 'unifi-controller', 'data': unifi_controller, 'space': 'ubnt'}
)

return options


def kea_parse_subnet(subnet, config):
out = {'subnet': subnet, 'id': int(config['subnet_id'])}
options = []

if 'option' in config:
out['option-data'] = kea_parse_options(config['option'])
Expand All @@ -149,9 +169,7 @@ def kea_parse_subnet(subnet, config):
pools = []
for num, range_config in config['range'].items():
start, stop = range_config['start'], range_config['stop']
pool = {
'pool': f'{start} - {stop}'
}
pool = {'pool': f'{start} - {stop}'}

if 'option' in range_config:
pool['option-data'] = kea_parse_options(range_config['option'])
Expand Down Expand Up @@ -188,24 +206,31 @@ def kea_parse_subnet(subnet, config):
reservation['option-data'] = kea_parse_options(host_config['option'])

if 'bootfile_name' in host_config['option']:
reservation['boot-file-name'] = host_config['option']['bootfile_name']
reservation['boot-file-name'] = host_config['option'][
'bootfile_name'
]

if 'bootfile_server' in host_config['option']:
reservation['next-server'] = host_config['option']['bootfile_server']
reservation['next-server'] = host_config['option'][
'bootfile_server'
]

reservations.append(reservation)
out['reservations'] = reservations

return out


def kea6_parse_options(config):
options = []

for node, option_name in kea6_options.items():
if node not in config:
continue

value = ", ".join(config[node]) if isinstance(config[node], list) else config[node]
value = (
', '.join(config[node]) if isinstance(config[node], list) else config[node]
)
options.append({'name': option_name, 'data': value})

if 'sip_server' in config:
Expand All @@ -221,17 +246,20 @@ def kea6_parse_options(config):
hosts.append(server)

if addrs:
options.append({'name': 'sip-server-addr', 'data': ", ".join(addrs)})
options.append({'name': 'sip-server-addr', 'data': ', '.join(addrs)})

if hosts:
options.append({'name': 'sip-server-dns', 'data': ", ".join(hosts)})
options.append({'name': 'sip-server-dns', 'data': ', '.join(hosts)})

cisco_tftp = dict_search_args(config, 'vendor_option', 'cisco', 'tftp-server')
if cisco_tftp:
options.append({'name': 'tftp-servers', 'code': 2, 'space': 'cisco', 'data': cisco_tftp})
options.append(
{'name': 'tftp-servers', 'code': 2, 'space': 'cisco', 'data': cisco_tftp}
)

return options


def kea6_parse_subnet(subnet, config):
out = {'subnet': subnet, 'id': int(config['subnet_id'])}

Expand Down Expand Up @@ -269,12 +297,14 @@ def kea6_parse_subnet(subnet, config):
pd_pool = {
'prefix': prefix,
'prefix-len': int(pd_conf['prefix_length']),
'delegated-len': int(pd_conf['delegated_length'])
'delegated-len': int(pd_conf['delegated_length']),
}

if 'excluded_prefix' in pd_conf:
pd_pool['excluded-prefix'] = pd_conf['excluded_prefix']
pd_pool['excluded-prefix-len'] = int(pd_conf['excluded_prefix_length'])
pd_pool['excluded-prefix-len'] = int(
pd_conf['excluded_prefix_length']
)

pd_pools.append(pd_pool)

Expand All @@ -294,9 +324,7 @@ def kea6_parse_subnet(subnet, config):
if 'disable' in host_config:
continue

reservation = {
'hostname': host
}
reservation = {'hostname': host}

if 'mac' in host_config:
reservation['hw-address'] = host_config['mac']
Expand All @@ -305,10 +333,10 @@ def kea6_parse_subnet(subnet, config):
reservation['duid'] = host_config['duid']

if 'ipv6_address' in host_config:
reservation['ip-addresses'] = [ host_config['ipv6_address'] ]
reservation['ip-addresses'] = [host_config['ipv6_address']]

if 'ipv6_prefix' in host_config:
reservation['prefixes'] = [ host_config['ipv6_prefix'] ]
reservation['prefixes'] = [host_config['ipv6_prefix']]

if 'option' in host_config:
reservation['option-data'] = kea6_parse_options(host_config['option'])
Expand All @@ -319,6 +347,7 @@ def kea6_parse_subnet(subnet, config):

return out


def _ctrl_socket_command(inet, command, args=None):
path = kea_ctrl_socket.format(inet=inet)

Expand All @@ -345,6 +374,7 @@ def _ctrl_socket_command(inet, command, args=None):

return json.loads(result.decode('utf-8'))


def kea_get_leases(inet):
leases = _ctrl_socket_command(inet, f'lease{inet}-get-all')

Expand All @@ -353,6 +383,7 @@ def kea_get_leases(inet):

return leases['arguments']['leases']


def kea_delete_lease(inet, ip_address):
args = {'ip-address': ip_address}

Expand All @@ -363,6 +394,7 @@ def kea_delete_lease(inet, ip_address):

return False


def kea_get_active_config(inet):
config = _ctrl_socket_command(inet, 'config-get')

Expand All @@ -371,12 +403,18 @@ def kea_get_active_config(inet):

return config


def kea_get_dhcp_pools(config, inet):
shared_networks = dict_search_args(config, 'arguments', f'Dhcp{inet}', 'shared-networks')
shared_networks = dict_search_args(
config, 'arguments', f'Dhcp{inet}', 'shared-networks'
)
return [network['name'] for network in shared_networks] if shared_networks else []


def kea_get_pool_from_subnet_id(config, inet, subnet_id):
shared_networks = dict_search_args(config, 'arguments', f'Dhcp{inet}', 'shared-networks')
shared_networks = dict_search_args(
config, 'arguments', f'Dhcp{inet}', 'shared-networks'
)

if not shared_networks:
return None
Expand All @@ -397,7 +435,9 @@ def kea_get_static_mappings(config, inet, pools=[]) -> list:
Get DHCP static mapping from active Kea DHCPv4 or DHCPv6 configuration
:return list
"""
shared_networks = dict_search_args(config, 'arguments', f'Dhcp{inet}', 'shared-networks')
shared_networks = dict_search_args(
config, 'arguments', f'Dhcp{inet}', 'shared-networks'
)

mappings = []

Expand All @@ -414,7 +454,9 @@ def kea_get_static_mappings(config, inet, pools=[]) -> list:
mapping = {'pool': p, 'subnet': subnet['subnet']}
mapping.update(reservation)
# rename 'ip(v6)-address' to 'ip', inet6 has 'ipv6-address' and inet has 'ip-address'
mapping['ip'] = mapping.pop('ipv6-address', mapping.pop('ip-address', None))
mapping['ip'] = mapping.pop(
'ipv6-address', mapping.pop('ip-address', None)
)
# rename 'hw-address' to 'mac'
mapping['mac'] = mapping.pop('hw-address', None)
mappings.append(mapping)
Expand All @@ -432,18 +474,28 @@ def kea_get_server_leases(config, inet, pools=[], state=[], origin=None) -> list
data = []
for lease in leases:
lifetime = lease['valid-lft']
expiry = (lease['cltt'] + lifetime)
expiry = lease['cltt'] + lifetime

lease['start_timestamp'] = datetime.fromtimestamp(expiry - lifetime, timezone.utc)
lease['expire_timestamp'] = datetime.fromtimestamp(expiry, timezone.utc) if expiry else None
lease['start_timestamp'] = datetime.fromtimestamp(
expiry - lifetime, timezone.utc
)
lease['expire_timestamp'] = (
datetime.fromtimestamp(expiry, timezone.utc) if expiry else None
)

data_lease = {}
data_lease['ip'] = lease['ip-address']
lease_state_long = {0: 'active', 1: 'rejected', 2: 'expired'}
data_lease['state'] = lease_state_long[lease['state']]
data_lease['pool'] = kea_get_pool_from_subnet_id(config, inet, lease['subnet-id']) if config else '-'
data_lease['end'] = lease['expire_timestamp'].timestamp() if lease['expire_timestamp'] else None
data_lease['origin'] = 'local' # TODO: Determine remote in HA
data_lease['pool'] = (
kea_get_pool_from_subnet_id(config, inet, lease['subnet-id'])
if config
else '-'
)
data_lease['end'] = (
lease['expire_timestamp'].timestamp() if lease['expire_timestamp'] else None
)
data_lease['origin'] = 'local' # TODO: Determine remote in HA
# remove trailing dot in 'hostname' to ensure consistency for `vyos-hostsd-client`
data_lease['hostname'] = lease.get('hostname', '-').rstrip('.')

Expand All @@ -463,7 +515,9 @@ def kea_get_server_leases(config, inet, pools=[], state=[], origin=None) -> list
data_lease['remaining'] = '-'

if lease['valid-lft'] > 0:
data_lease['remaining'] = lease['expire_timestamp'] - datetime.now(timezone.utc)
data_lease['remaining'] = lease['expire_timestamp'] - datetime.now(
timezone.utc
)

if data_lease['remaining'].days >= 0:
# substraction gives us a timedelta object which can't be formatted with strftime
Expand Down
Loading

0 comments on commit 5f06e1c

Please sign in to comment.