-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathworking2
155 lines (137 loc) · 5.51 KB
/
working2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
import os
import csv
import time
import numpy as np
import subprocess
from scapy.all import sniff
from scapy.layers.dot11 import Dot11, Dot11Beacon, Dot11ProbeResp, Dot11Elt, RadioTap
from datetime import datetime
import sys
# Wi-Fi interfaces that are put into monitor mode and sniffed
interfaces = [
    "wlan1",
    "wlan2",
    "wlan3",
    "wlan4",
]
# Channels the rotation loop steps through, in order
channels = [
    1,
    6,
    11,
]

# Latest per-device readings (filled in by the packet handler)
devices = {}
# Latest per-network readings (filled in by the packet handler)
networks = {}

# Preferred output directory on the external drive (change this if needed)
save_path = "/mnt/usb/wifi_data"
# Output directory used when the external drive is not available
local_path = "./wifi_data"
# Function to ensure the save path exists
def get_save_path():
    """Return the directory capture files are written to, creating it if needed.

    The external location ``save_path`` is chosen when it already exists;
    otherwise the local fallback ``local_path`` is used.

    Returns:
        The chosen directory path as a string.
    """
    if os.path.exists(save_path):
        target_dir = save_path
    else:
        target_dir = local_path
    # exist_ok makes this a no-op when the directory is already present.
    os.makedirs(target_dir, exist_ok=True)
    return target_dir
# Enable monitor mode for an interface
def enable_monitor_mode(interface):
    """Put *interface* into monitor mode.

    Runs three commands through ``sudo`` in order: take the link down,
    set the mode to monitor with ``iwconfig``, and bring the link back up.

    Args:
        interface: Name of the wireless interface, e.g. ``"wlan1"``.

    Raises:
        subprocess.CalledProcessError: If any command exits non-zero
            (each one is run with ``check=True``).
    """
    print(f"[*] Enabling monitor mode on {interface}...")
    steps = (
        ["sudo", "ip", "link", "set", interface, "down"],
        ["sudo", "iwconfig", interface, "mode", "monitor"],
        ["sudo", "ip", "link", "set", interface, "up"],
    )
    for command in steps:
        subprocess.run(command, check=True)
# Set the Wi-Fi channel
def set_channel(interface, channel):
    """Tune *interface* to *channel* using ``iwconfig`` via ``sudo``.

    Args:
        interface: Name of the wireless interface.
        channel: Channel number; converted to a string for the command line.

    Raises:
        subprocess.CalledProcessError: If ``iwconfig`` exits non-zero.
    """
    print(f"[*] Setting {interface} to channel {channel}...")
    command = ["sudo", "iwconfig", interface, "channel", str(channel)]
    subprocess.run(command, check=True)
# Save device data to CSV
def save_devices_to_csv():
    """Append one row per entry in ``devices`` to ``wifi_devices.csv``.

    The file lives in the directory returned by ``get_save_path()`` and is
    opened in append mode, so earlier rows are kept. A header row is written
    once, when the file does not exist yet or is empty; previously no header
    was ever written, leaving the columns unnamed.

    All rows of one call share a single timestamp taken at the start of the
    call.
    """
    filename = os.path.join(get_save_path(), "wifi_devices.csv")
    fieldnames = [
        'Datetime', 'MAC Address', 'Signal Strength (dBm)',
        'Router MAC', 'Time of Flight', 'FFT',
    ]
    # Decide before opening: opening in 'a' mode creates the file.
    needs_header = not os.path.isfile(filename) or os.path.getsize(filename) == 0
    timestamp = datetime.now()

    with open(filename, 'a', newline='') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        if needs_header:
            writer.writeheader()
        writer.writerows(
            {
                'Datetime': timestamp,
                'MAC Address': mac,
                'Signal Strength (dBm)': info['rssi'],
                'Router MAC': info['router_mac'],
                'Time of Flight': info['tof'],
                'FFT': info['fft'],
            }
            for mac, info in devices.items()
        )
# Save network data to CSV
def save_networks_to_csv():
    """Append one row per entry in ``networks`` to ``wifi_networks.csv``.

    The file lives in the directory returned by ``get_save_path()`` and is
    opened in append mode, so earlier rows are kept. A header row is written
    once, when the file does not exist yet or is empty; previously no header
    was ever written, leaving the columns unnamed.

    All rows of one call share a single timestamp taken at the start of the
    call.
    """
    filename = os.path.join(get_save_path(), "wifi_networks.csv")
    fieldnames = [
        'Datetime', 'SSID', 'Signal Strength (dBm)',
        'Noise (dBm)', 'FFT',
    ]
    # Decide before opening: opening in 'a' mode creates the file.
    needs_header = not os.path.isfile(filename) or os.path.getsize(filename) == 0
    timestamp = datetime.now()

    with open(filename, 'a', newline='') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        if needs_header:
            writer.writeheader()
        writer.writerows(
            {
                'Datetime': timestamp,
                'SSID': ssid,
                'Signal Strength (dBm)': data['rssi'],
                'Noise (dBm)': data['noise'],
                'FFT': data['fft'],
            }
            for ssid, data in networks.items()
        )
# Perform FFT analysis on signal strengths
def perform_fft(signal_strengths):
    """Compute the discrete Fourier transform of a sequence of signal strengths.

    Args:
        signal_strengths: Sequence of numeric readings.

    Returns:
        A ``(spectrum, frequencies)`` pair: the output of ``np.fft.fft`` and
        the matching sample frequencies from ``np.fft.fftfreq``. When the
        input is empty, ``(None, None)`` is returned instead.
    """
    # Guard clause: nothing to transform.
    if len(signal_strengths) == 0:
        return None, None

    spectrum = np.fft.fft(signal_strengths)
    bin_count = len(spectrum)
    return spectrum, np.fft.fftfreq(bin_count)
# Packet handler function
def _record_device(packet, radiotap, rssi):
    """Store the latest reading for the frame's transmitter in ``devices``."""
    mac_address = packet.addr2
    if mac_address is None:
        # Without a transmitter address there is nothing to key the entry on;
        # storing it would lump unrelated frames under a single None key.
        return
    devices[mac_address] = {
        'rssi': rssi,
        'router_mac': getattr(packet, 'addr1', None),
        # NOTE(review): RadioTap does not appear to define a TimeOfFlight
        # field, so this is presumably always None - confirm.
        'tof': getattr(radiotap, 'TimeOfFlight', None),
        'fft': None,
    }


def _record_network(packet, radiotap, rssi):
    """Store the latest reading for a beacon/probe-response SSID in ``networks``."""
    if not (packet.haslayer(Dot11Beacon) or packet.haslayer(Dot11ProbeResp)):
        return
    element = packet.getlayer(Dot11Elt)
    if element is None:
        # No information element present, so there is no SSID to read.
        return
    ssid = element.info.decode(errors='ignore')
    # The RadioTap noise reading is exposed as dBm_AntNoise; the earlier
    # lookup of 'noise' matched no field, so the value was always None.
    noise = getattr(radiotap, 'dBm_AntNoise', None)
    entry = networks.get(ssid)
    if entry is None:
        networks[ssid] = {'rssi': rssi, 'noise': noise, 'fft': None}
    else:
        entry['rssi'] = rssi


def _refresh_fft(table):
    """Recompute the 'fft' value of every entry in *table* from its own RSSI values."""
    spectrum, _frequencies = perform_fft([entry['rssi'] for entry in table.values()])
    if spectrum is None:
        return
    # The spectrum has exactly one bin per entry, in the table's iteration order.
    for entry, coefficient in zip(table.values(), spectrum):
        entry['fft'] = coefficient


def packet_handler(packet):
    """Process one sniffed frame and persist the updated tables.

    Frames without a Dot11 layer are ignored. For frames that carry a
    RadioTap signal reading, the transmitter is recorded in ``devices`` and,
    for beacons and probe responses, the SSID is recorded in ``networks``.
    FFT values are then recomputed for both tables and both CSV files are
    appended to.

    Changes from the earlier version:
      * Frames with no RadioTap layer or no signal reading no longer reach the
        network update with an undefined ``rssi``; they skip both updates.
      * Network FFT values are computed from the networks' own RSSI values.
        They used to be read from the device spectrum by position, which
        raised IndexError once there were more networks than devices.
      * Missing layers and missing addresses are checked instead of indexed
        blindly.

    Args:
        packet: A frame delivered by scapy's ``sniff``.
    """
    if not packet.haslayer(Dot11):
        return

    radiotap = packet.getlayer(RadioTap)
    # getattr tolerates radiotap being None as well as an absent reading.
    rssi = getattr(radiotap, 'dBm_AntSignal', None)
    if rssi is not None:
        _record_device(packet, radiotap, rssi)
        _record_network(packet, radiotap, rssi)

    _refresh_fft(devices)
    _refresh_fft(networks)

    # Persist after every frame, as before.
    save_devices_to_csv()
    save_networks_to_csv()
# Rotate channels every 0.1 seconds
def rotate_channels(dwell=0.1):
    """Cycle every interface through ``channels`` forever.

    Intended to run in a background thread. For each interface in
    ``interfaces`` and each channel in ``channels``, the channel is set and
    the loop then waits *dwell* seconds before the next change.

    A failing channel change is reported and skipped. Previously the
    exception raised by ``set_channel`` ended the thread, which silently
    stopped all channel rotation.

    Args:
        dwell: Seconds to wait after each channel change. Defaults to 0.1.
    """
    while True:
        for interface in interfaces:
            for channel in channels:
                try:
                    set_channel(interface, channel)
                except (subprocess.CalledProcessError, OSError) as error:
                    # Keep hopping on the remaining interfaces and channels.
                    print(f"[!] Could not set {interface} to channel {channel}: {error}")
                time.sleep(dwell)
# Main function to start sniffing
def main():
    """Enable monitor mode, start channel rotation, and sniff until interrupted.

    Every interface in ``interfaces`` is switched to monitor mode, a daemon
    thread running ``rotate_channels`` is started, and the interfaces are
    then sniffed one after another in 10-second slices, each frame being
    passed to ``packet_handler``.

    Errors are reported and the whole sequence is retried. The retry is a
    loop: the earlier version recovered by calling ``restart_script()``,
    which called ``main()`` again, so every error deepened the call stack
    and started another channel-rotation thread. When the process is not
    running as root, recovery is still delegated to ``restart_script()`` so
    the script gets re-executed under sudo.
    """
    # Function-scope import, as in the original.
    from threading import Thread

    rotator = None
    while True:
        try:
            # Enable monitor mode for all interfaces
            for interface in interfaces:
                enable_monitor_mode(interface)

            # Start the channel-rotation thread once; restart it only if it died.
            if rotator is None or not rotator.is_alive():
                rotator = Thread(target=rotate_channels, daemon=True)
                rotator.start()

            print("[*] Listening for packets...")
            # Sniff packets indefinitely
            while True:
                for interface in interfaces:
                    sniff(iface=interface, prn=packet_handler, store=0, timeout=10)
        except Exception as e:
            print(f"[!] Error: {e}. Restarting...")
            if os.geteuid() != 0:
                # Not root: let restart_script re-execute the script under sudo.
                restart_script()
                return
            # Brief pause so a persistent failure does not become a busy loop.
            time.sleep(1)
# Restart the script as sudo
def restart_script():
    """Run ``main()`` as root, re-executing the script under sudo if necessary.

    When the effective user id is not 0, the current process is replaced by
    ``sudo <python> <argv...>``. The interpreter path is passed explicitly;
    the earlier version ran ``sudo <script>`` directly, which fails unless
    the script file is itself executable. When already root, ``main()`` is
    called.

    If either step raises, the error is printed and the step is retried
    after 5 seconds. The retry is a loop rather than a recursive call, so
    repeated failures do not grow the call stack.
    """
    while True:
        try:
            if os.geteuid() != 0:
                print("[*] Restarting with sudo...")
                # execvp only returns by raising; on success the process is replaced.
                os.execvp("sudo", ["sudo", sys.executable] + sys.argv)
            else:
                main()
            return
        except Exception as e:
            print(f"[!] Failed to restart: {e}")
            time.sleep(5)
# Script entry point: start (or re-launch under sudo) only when run directly.
if __name__ == "__main__":
    restart_script()