forked from Sudomemo/sudomemoDNS
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsudomemoDNS.py
255 lines (199 loc) · 7.7 KB
/
sudomemoDNS.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
# sudomemoDNS
from datetime import datetime
from json import loads
from socket import socket, AF_INET, SOCK_DGRAM, gethostbyname
from sys import platform
from time import sleep
from dnslib import A, AAAA, CNAME, MX, NS, SOA, TXT
from dnslib import DNSLabel, QTYPE, RD, RR
from dnslib.server import DNSServer
def get_platform():
    """Return a display name for the operating system running this script.

    Known ``sys.platform`` identifiers are translated to a friendly name;
    any other identifier is returned unchanged.
    """
    friendly_names = {
        'linux1': 'Linux',
        'linux2': 'Linux',
        'darwin': 'macOS',
        'win32': 'Windows',
    }
    # Fall back to the raw identifier when no friendly name is known.
    return friendly_names.get(platform, platform)
# Version string interpolated into the startup banner.
SUDOMEMODNS_VERSION = "1.2.1"
# Zero-pads every octet of a dotted IPv4 address to three digits,
# e.g. `1.1.1.1` becomes `001.001.001.001`.
def format_ip(address):
    """Return *address* with its first four octets padded to three digits each."""
    fields = str(address).split(".")
    # Only the first four dot-separated fields are used; each must parse as an int.
    first, second, third, fourth = (int(fields[i]) for i in range(4))
    return "{:03d}.{:03d}.{:03d}.{:03d}".format(first, second, third, fourth)
def get_ip():
    """Return the local IPv4 address the OS would use for outbound traffic.

    A UDP socket is connected to a private address and the socket's own
    address is then read back.

    Returns:
        The local address as a dotted string, or ``'127.0.0.1'`` when the
        probe fails with a socket-level error.
    """
    with socket(AF_INET, SOCK_DGRAM) as probe:
        try:
            # doesn't even have to be reachable
            probe.connect(('10.255.255.255', 1))
            return probe.getsockname()[0]
        except OSError:
            # Only socket failures fall back to loopback; KeyboardInterrupt
            # and SystemExit are allowed to propagate.
            return '127.0.0.1'
# SERIAL is the number of whole seconds between EPOCH (1970-01-01) and the
# UTC time at startup; Record uses it as the serial number of SOA records.
EPOCH = datetime(1970, 1, 1)
SERIAL = int((datetime.utcnow() - EPOCH).total_seconds())
# Address of this machine: shown to the user below and used as the bind
# address when the DNS servers are created.
MY_IP = get_ip()
# Startup banner.
print("+===============================+")
print("| Sudomemo DNS Server |")
print(f"| Version {SUDOMEMODNS_VERSION} |")
print("+===============================+\n")
print("== Welcome to sudomemoDNS! ==")
print(
    "This server will allow you to connect to Sudomemo when your Internet Service Provider does not work with custom DNS.\n")
# Usage instructions; the primary DNS entry is this machine's zero-padded address.
print("== How To Use ==")
print("First, make sure that your console is connected to the same network as this computer.\n")
print("Then, put these settings in for DNS on your console:")
print("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~")
print(f"Primary DNS: {format_ip(MY_IP)}")
print("Secondary DNS: 008.008.008.008")
print("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~\n")
print("== Getting Help ==")
print("Need help? Visit our Discord server or check out https://support.sudomemo.net\n")
print("[INFO] Starting sudomemoDNS...")
# Maps each supported dnslib record-data class to the matching QTYPE value.
# Record uses it to derive a record's type from the class (or instance) it is given.
TYPE_LOOKUP = {
    A: QTYPE.A,
    AAAA: QTYPE.AAAA,
    CNAME: QTYPE.CNAME,
    MX: QTYPE.MX,
    NS: QTYPE.NS,
    SOA: QTYPE.SOA,
    TXT: QTYPE.TXT,
}
# DNSLogger can't seem to be turned off by passing None, so this stand-in
# provides the logger hooks itself and leaves most of them empty.
class SudomemoDNSLogger:
    """Console logger handed to the DNS servers.

    Requests, replies and errors each print one line naming the client
    address; every other hook does nothing.
    """

    @staticmethod
    def _report(prefix, handler):
        # Every message ends with the first element of the handler's client address.
        print(prefix + handler.client_address[0])

    def log_recv(self, handler, data):
        """Do nothing."""

    def log_send(self, handler, data):
        """Do nothing."""

    def log_request(self, handler, request):
        """Print a line announcing an incoming request."""
        self._report("[INFO] Received DNS request from console at ", handler)

    def log_reply(self, handler, reply):
        """Print a line announcing a sent response."""
        self._report("[INFO] Sent response to console at ", handler)

    def log_error(self, handler, e):
        """Print a line announcing an invalid request; *e* is not shown."""
        self._report("[ERROR] Invalid DNS request from ", handler)

    def log_truncated(self, handler, reply):
        """Do nothing."""

    def log_data(self, dnsobj):
        """Do nothing."""
class Record:
    """One zone entry that can be rendered as a dnslib ``RR``.

    ``rdata_type`` is either a record-data class listed in ``TYPE_LOOKUP``
    (instantiated with ``*args``) or an already-built ``RD`` instance.
    ``rtype`` overrides the derived record type when truthy, ``rname`` pins the
    owner name, and ``ttl`` defaults to ``sensible_ttl()``. Remaining keyword
    arguments are stored and passed to ``RR`` unchanged.
    """

    def __init__(self, rdata_type, *args, rtype=None, rname=None, ttl=None, **kwargs):
        if isinstance(rdata_type, RD):
            # Ready-made record data was passed instead of a class.
            rdata = rdata_type
            self._rtype = TYPE_LOOKUP[rdata.__class__]
        else:
            self._rtype = TYPE_LOOKUP[rdata_type]
            if rdata_type == SOA and len(args) == 2:
                # An SOA built from two arguments gets default timing values appended.
                hour = 60 * 60
                soa_times = (
                    SERIAL,     # serial number
                    1 * hour,   # refresh
                    3 * hour,   # retry
                    24 * hour,  # expire
                    1 * hour,   # minimum
                )
                args = args + (soa_times,)
            rdata = rdata_type(*args)

        if rtype:
            self._rtype = rtype
        self._rname = rname

        # The default TTL depends on the final record type, so resolve it last.
        effective_ttl = self.sensible_ttl() if ttl is None else ttl
        self.kwargs = dict(rdata=rdata, ttl=effective_ttl, **kwargs)

    def try_rr(self, q):
        """Return an ``RR`` for question *q* if its type matches, else ``None``."""
        wanted = q.qtype
        if wanted != QTYPE.ANY and wanted != self._rtype:
            return None
        return self.as_rr(q.qname)

    def as_rr(self, alt_rname):
        """Build the ``RR``, using *alt_rname* when no owner name was pinned."""
        owner = self._rname or alt_rname
        return RR(rname=owner, rtype=self._rtype, **self.kwargs)

    def sensible_ttl(self):
        """Return one day for NS/SOA records and 300 seconds for everything else."""
        long_lived = self._rtype in (QTYPE.NS, QTYPE.SOA)
        return 60 * 60 * 24 if long_lived else 300

    @property
    def is_soa(self):
        """Whether this record's type is SOA."""
        return self._rtype == QTYPE.SOA

    def __str__(self):
        return f'{QTYPE[self._rtype]} {self.kwargs}'
# Zone table filled from dns_zones.json: zone name -> list of Record objects.
ZONES = {}

try:
    # Read everything up front so the file handle is closed before parsing.
    with open("dns_zones.json", "rb") as zones_file:
        raw_zones = zones_file.read()
except OSError as e:
    print("[ERROR] Unable load DNS data.")
    print("[ERROR] Exception: ", e)
    # SystemExit does not rely on the site-provided exit() helper being present.
    raise SystemExit(1)

try:
    zones = loads(raw_zones)
except ValueError as e:
    print("[ERROR] Unable load DNS data: Invalid JSON.")
    print("[ERROR] Exception: ", e)
    raise SystemExit(1)

for zone in zones:
    if zone["type"] == "a":
        # "a" entries: the value is handed straight to an A record.
        ZONES[zone["name"]] = [Record(A, zone["value"])]
    elif zone["type"] == "p":
        # "p" entries: the value is resolved with gethostbyname once, at startup.
        try:
            resolved_address = gethostbyname(zone["value"])
        except OSError as e:
            # Resolution errors are OSError subclasses; report them like the
            # other load failures instead of dying with a traceback.
            print(f"[ERROR] Unable to resolve {zone['value']} for DNS zone {zone['name']}.")
            print("[ERROR] Exception: ", e)
            raise SystemExit(1)
        ZONES[zone["name"]] = [Record(A, resolved_address)]
    # Entries with any other type are ignored.

print("[INFO] DNS information loaded successfully.")
class Resolver:
    """Answers DNS questions from the module-level ``ZONES`` table."""

    def __init__(self):
        # Re-key the zone table by DNSLabel so lookups can use a question's qname.
        self.zones = {}
        for name, records in ZONES.items():
            self.zones[DNSLabel(name)] = records

    def resolve(self, request, handler):
        """Build and return the reply for *request*; *handler* is unused.

        A name with its own zone gets every record whose ``try_rr`` yields an
        answer. Otherwise the first zone that is a suffix of the name and
        holds an SOA record contributes that SOA. The reply may end up with
        no answers at all.
        """
        reply = request.reply()
        question = request.q
        exact_records = self.zones.get(question.qname)

        if exact_records is not None:
            for record in exact_records:
                answer = record.try_rr(question)
                if answer:
                    reply.add_answer(answer)
            return reply

        # no direct zone so look for an SOA record for a higher level zone
        for zone_label, zone_records in self.zones.items():
            if not question.qname.matchSuffix(zone_label):
                continue
            soa_record = next((r for r in zone_records if r.is_soa), None)
            if soa_record is None:
                continue
            reply.add_answer(soa_record.as_rr(zone_label))
            break

        return reply
resolver = Resolver()
dnsLogger = SudomemoDNSLogger()

# Look the platform up once. The Linux check is case-insensitive so that both
# the raw sys.platform value 'linux' and the friendly label 'Linux' match.
detected_platform = get_platform()
print("[INFO] Detected operating system:", detected_platform)
if detected_platform.lower() == 'linux' or detected_platform == 'macOS':
    # Linux and macOS share the same hints about binding to port 53.
    print("[INFO] Please note that you will have to run this as root or with permissions to bind to UDP port 53.")
    print("[INFO] If you aren't seeing any requests, check that this is the case first with lsof -i:53 (requires lsof)")
    print("[INFO] To run as root, prefix the command with 'sudo'")
elif detected_platform == 'Windows':
    print(
        "[INFO] Please note that you may have to allow this application through the firewall. If so, a popup will appear in a moment.")
    print(
        "[INFO] If you are not seeing any requests, make sure you have allowed this application through the firewall. If you have already done so, disregard this message.")

try:
    # One TCP and one UDP server, both on port 53 of this machine's address.
    servers = [
        DNSServer(resolver=resolver, port=53, address=MY_IP, tcp=True, logger=dnsLogger),
        DNSServer(resolver=resolver, port=53, address=MY_IP, tcp=False, logger=dnsLogger)
    ]
except PermissionError:
    print("[ERROR] Permission error: Check that you are running this as an administrator or root")
    # SystemExit does not rely on the site-provided exit() helper being present.
    raise SystemExit(1)
except OSError as e:
    # Any other bind failure (for example the port already being in use).
    print("[ERROR] Unable to start the DNS server on port 53.")
    print("[ERROR] Exception: ", e)
    raise SystemExit(1)

print("[INFO] sudomemoDNS is ready. Now waiting for DNS requests from your console...")
if __name__ == '__main__':
    # Start every server, then idle in the main thread until interrupted.
    for server in servers:
        server.start_thread()

    try:
        while True:
            sleep(0.1)
    except KeyboardInterrupt:
        # Ctrl+C is the normal way to stop; fall through to the cleanup below.
        pass
    finally:
        # Always stop the servers, however the loop ended.
        for server in servers:
            server.stop()