-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmikrotik_addrlist_upd.py
180 lines (150 loc) · 6.97 KB
/
mikrotik_addrlist_upd.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import re
from sys import exit
from argparse import ArgumentParser
from urllib.request import Request, urlopen
from related_utils import generate_connector, generate_telegram_bot, markdownv2_converter, asns_and_urls
from related_utils import lists_subtraction, ips_from_data, ips_from_asn, collapse_ips, print_output, Report
def args_parser():
    """Parse the command line and return the options as a plain dict.

    Returns:
        dict: Keys are ``sshconf``, ``host``, ``login``, ``password``,
        ``url``, ``list``, ``label``, ``bottoken`` and ``chatid``. Optional
        flags that were not supplied map to ``None``.
    """
    # (short flag, long flag, help text, required) -- kept in help-output order.
    option_specs = (
        ('-s', '--sshconf', 'Path to ssh_config.', False),
        ('-n', '--host', 'Host (in ssh_config or IP/URL for API).', True),
        ('-a', '--login', 'API username for login.', False),
        ('-p', '--password', 'API password for login.', False),
        ('-u', '--url', 'URLs or/and ASNs (comma separated) to IP list.', True),
        ('-i', '--list', 'Name of address list.', True),
        ('-l', '--label', 'Comment as label in list.', True),
        ('-b', '--bottoken', 'Telegram Bot token.', False),
        ('-c', '--chatid', 'Telegram chat id.', False),
    )

    cli = ArgumentParser(description='RouterOS list updater.')
    for short_flag, long_flag, help_text, is_required in option_specs:
        cli.add_argument(short_flag, long_flag, type=str, help=help_text, required=is_required)

    # vars() exposes the namespace's own attribute dict.
    return vars(cli.parse_args())
class ListUpdater:
    """Transport-independent workflow for syncing a device address list.

    The class builds the desired ("fresh") address set from the configured
    ASNs/URLs, computes what has to be added to and removed from the device,
    fills a report and applies the changes. Talking to the device is left to
    subclasses, which override ``generate_current_ip_list``,
    ``update_ip_on_device`` and ``get_identity``.
    """

    def __init__(self, args):
        """Store the run parameters and create the device connector.

        Args:
            args: Mapping of parsed options. The keys ``label``, ``list`` and
                ``url`` are read here; the whole mapping is handed to
                ``generate_connector``.
        """
        self.report = Report()
        self.ip_list_add = []
        self.ip_list_fresh = []
        self.ip_list_remove = []
        self.ip_list_current = []
        self.label = args['label']
        self.list_name = args['list']
        self.ip_list_url = args['url']
        # Not referenced inside this class; kept as a public attribute.
        self.asn_pattern = r'[Aa][Ss][1-9]\d{0,9}'
        self.connect = generate_connector(args)
        self.emoji = {
            'device': '\U0001F4F6',  # 📶
            'list': '\U0001F4CB',  # 📋
            'tag': '\U0001F4CE',  # 📎
        }

    def run(self):
        """Compute the difference and, if there is one, report and apply it."""
        self.generate_lists()
        if self.ip_list_add or self.ip_list_remove:
            # The report is built before the device is modified.
            self.generate_report()
            self.update_ip_on_device()

    def generate_lists(self):
        """Fill the fresh/current lists and derive the add/remove lists."""
        self.generate_fresh_ip_list()
        self.generate_current_ip_list()
        self.ip_list_add = lists_subtraction(self.ip_list_fresh, self.ip_list_current)
        self.ip_list_remove = lists_subtraction(self.ip_list_current, self.ip_list_fresh)

    def generate_fresh_ip_list(self):
        """Collect addresses from every configured ASN and URL.

        The merged result is passed through ``collapse_ips`` and stored in
        ``self.ip_list_fresh``.

        Raises:
            SystemExit: When no address could be collected from any source.
        """
        asns, urls = asns_and_urls(self.ip_list_url)
        ip_list_all = []
        headers = {'User-Agent': 'Mozilla/5.0'}

        for asn in asns or ():
            ip_list_all += ips_from_asn(asn, collapse=False)

        for url in urls or ():
            # The context manager closes the HTTP response even if reading or
            # decoding fails.
            with urlopen(Request(url, headers=headers)) as response:
                charset = response.headers.get_content_charset('UTF-8')
                content = response.read().decode(charset)
            ip_list_all += ips_from_data(content, collapse=False)

        if not ip_list_all:
            exit('Source list is empty.')
        self.ip_list_fresh = collapse_ips(ip_list_all)

    def generate_current_ip_list(self):
        """Hook: load the addresses currently on the device (no-op here)."""

    def update_ip_on_device(self):
        """Hook: apply ``ip_list_remove``/``ip_list_add`` to the device (no-op here)."""

    def get_identity(self):
        """Hook: return the device identity name (returns ``None`` here)."""

    def generate_report(self):
        """Add a header and the added/removed address sections to the report."""
        identity = markdownv2_converter(self.get_identity())
        list_name = markdownv2_converter(self.list_name)
        label = markdownv2_converter(self.label)
        self.report.add(f'Отчёт об изменении на {self.emoji["device"]}*{identity}*')
        # '\\#' emits a literal backslash before '#'. The previous spelling
        # '\#' produced the same text but is an invalid escape sequence.
        self.report.add(
            f' списка {self.emoji["list"]}__{list_name}__ с меткой {self.emoji["tag"]}\\#{label}\n\n'
        )
        if self.ip_list_add:
            self._add_ip_section('Добавлено:\n', self.ip_list_add)
        if self.ip_list_remove:
            self._add_ip_section('Удалено:\n', self.ip_list_remove)

    def _add_ip_section(self, title, ip_list):
        """Append one titled, code-fenced block of addresses to the report.

        Args:
            title: Heading text placed before the fenced block.
            ip_list: Addresses to list, one per line.
        """
        self.report.add(title)
        # The second positional argument is passed only for the fence lines;
        # its meaning is defined by Report.add (not visible in this file).
        self.report.add('```\n', True)
        for ip_elem in ip_list:
            self.report.add(f'{ip_elem}\n')
        self.report.add('```\n', True)
class ListUpdaterSSH(ListUpdater):
    """Updater that drives the device with CLI commands sent via ``self.connect``.

    NOTE(review): ``self.list_name`` and ``self.label`` are interpolated into
    the commands unquoted, as before; values containing spaces would need
    quoting -- confirm the expected inputs.
    """

    def run(self):
        """Run the base workflow inside an enabled session.

        ``disconnect()`` is called even when the workflow raises or exits
        (for example on an empty source list), so the session is not leaked.
        """
        self.connect.enable()
        try:
            super().run()
        finally:
            self.connect.disconnect()

    def generate_current_ip_list(self):
        """Read the entries of this list that carry this label.

        The query is restricted to both the list name and the comment, the
        same selection the API-based updater uses. ``self.ip_list_current``
        is left unchanged when the device returns no output.
        """
        command = (
            '/ip firewall address-list print without-paging '
            f'where list={self.list_name} comment={self.label}'
        )
        output = print_output(self.connect, command)
        if output:
            self.ip_list_current = ips_from_data(output)

    def update_ip_on_device(self):
        """Remove stale entries, then add the missing ones."""
        for ip_addr in self.ip_list_remove:
            # Scope the lookup to this list and label. Matching on the address
            # alone would also delete it from unrelated address lists.
            entry = (
                '/ip firewall address-list find '
                f'list={self.list_name} comment={self.label} address={ip_addr}'
            )
            self.connect.send_command(f'/ip firewall address-list remove [{entry}]')
        for ip_addr in self.ip_list_add:
            entry = f'list={self.list_name} comment={self.label} address={ip_addr}'
            self.connect.send_command(f'/ip firewall address-list add {entry}')

    def get_identity(self):
        """Return the name printed by ``/system identity print``.

        Raises:
            ValueError: When no ``name: ...`` line is present in the output.
        """
        identity = print_output(self.connect, '/system identity print')
        # Accept leading indentation and extra lines around the "name:" line.
        found = re.search(r'^[ \t]*name: (.*)$', identity or '', re.MULTILINE)
        if found is None:
            raise ValueError(f'Cannot parse device identity from output: {identity!r}')
        return found.group(1)
class ListUpdaterAPI(ListUpdater):
    """Updater that talks to the device through the API object in ``self.connect``."""

    def generate_current_ip_list(self):
        """Load the addresses of this list that carry this label."""
        resource = self.connect.get_resource('/ip/firewall/address-list')
        entries = resource.get(comment=self.label, list=self.list_name)
        self.ip_list_current = [entry['address'] for entry in entries]

    def update_ip_on_device(self):
        """Remove stale entries, then add the missing ones.

        Every entry matching a stale address is removed. An address that has
        already disappeared from the device is skipped instead of raising
        ``IndexError``, and duplicate entries are all cleaned up in one run.
        """
        address_list = self.connect.get_resource('/ip/firewall/address-list')
        for ip_addr in self.ip_list_remove:
            matches = address_list.get(comment=self.label, list=self.list_name, address=ip_addr)
            for entry in matches:
                address_list.remove(numbers=entry['id'])
        for ip_addr in self.ip_list_add:
            address_list.add(list=self.list_name, comment=self.label, address=ip_addr)

    def get_identity(self):
        """Return the ``name`` field of the first ``system/identity/print`` row."""
        identity = self.connect.get_resource('/').call('system/identity/print')
        return identity[0]['name']
def main():
    """Entry point: parse options, run the matching updater, send the report.

    The SSH updater is used when ``--sshconf`` is given; otherwise the API
    updater is used when both ``--login`` and ``--password`` are given.

    Raises:
        SystemExit: When neither an SSH config nor API credentials are supplied.
    """
    args_in = args_parser()
    telegram_bot = generate_telegram_bot(args_in['bottoken'], args_in['chatid'])

    if args_in['sshconf']:
        list_upd = ListUpdaterSSH(args_in)
    elif args_in['login'] and args_in['password']:
        list_upd = ListUpdaterAPI(args_in)
    else:
        exit('SSH or API?')

    list_upd.run()

    messages = list_upd.report.messages
    # Check for emptiness before indexing, so a run that produced no report
    # cannot fail with IndexError. The bot is only probed when there is
    # something to send.
    if messages and messages[0] and telegram_bot and telegram_bot.alive():
        for message in messages:
            telegram_bot.send_text_message(message)


if __name__ == '__main__':
    main()