-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathquack4.py
180 lines (150 loc) · 6.18 KB
/
quack4.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
import os
import sys
import csv
import threading
import time
from datetime import datetime
from scapy.all import *
import subprocess
import numpy as np
# Interfaces and channels configuration
interfaces = ["wlan1", "wlan2", "wlan3", "wlan4"]
channels = [1, 6, 11]
hop_interval = 0.1 # Interval to switch channels
# CSV file names
network_csv = "wifi_networks.csv"
device_csv = "detected_devices.csv"
fft_csv = "fft_results.csv"
# Data storage
devices = {} # {MAC: (RSSI, Channel, Timestamp, Interface)}
networks = {} # {SSID: {signal_strength, noise, channel, timestamp, interface}}
rssi_buffers = {} # {MAC: [RSSI values]} - Store signal history for FFT
# Ensure script runs with sudo privileges
def ensure_sudo():
if os.geteuid() != 0:
print("[*] Restarting with sudo...")
os.execvp('sudo', ['sudo', 'python3'] + sys.argv)
# Set an interface to monitor mode
def enable_monitor_mode(interface):
subprocess.run(["sudo", "ip", "link", "set", interface, "down"])
subprocess.run(["sudo", "iwconfig", interface, "mode", "monitor"])
subprocess.run(["sudo", "ip", "link", "set", interface, "up"])
# Set the channel for a Wi-Fi interface
def set_channel(interface, channel):
subprocess.run(["sudo", "iwconfig", interface, "channel", str(channel)])
# Clear the console screen
def clear_screen():
os.system('cls' if os.name == 'nt' else 'clear')
# Save FFT results to CSV
def save_fft_to_csv(mac, fft_values):
with open(fft_csv, 'a', newline='') as file:
writer = csv.writer(file)
writer.writerow([mac, datetime.now()] + fft_values)
file.flush()
# Compute and log FFT for a given MAC's RSSI history
def compute_fft(mac):
if mac in rssi_buffers and len(rssi_buffers[mac]) >= 8: # Wait until 8+ samples
rssi_data = np.array(rssi_buffers[mac])
fft_result = np.abs(np.fft.fft(rssi_data))
save_fft_to_csv(mac, fft_result.tolist())
rssi_buffers[mac].clear() # Clear buffer after logging
# Save networks to CSV
def save_networks_to_csv():
with open(network_csv, 'a', newline='') as file:
writer = csv.writer(file)
for ssid, info in networks.items():
writer.writerow([
ssid, info['signal_strength'], info.get('noise', 'N/A'),
info['channel'], info['timestamp'], info['interface']
])
file.flush()
# Save devices to CSV
def save_devices_to_csv():
with open(device_csv, 'a', newline='') as file:
writer = csv.writer(file)
for mac, data in devices.items():
writer.writerow([mac, data[0], data[1], data[2], data[3]])
file.flush()
# Display detected devices and networks
def display_data():
clear_screen()
print("Detected Devices:")
for mac, data in devices.items():
print(f"MAC: {mac} | RSSI: {data[0]} dBm | Channel: {data[1]} | Interface: {data[3]}")
print("\nNearby Networks:")
for ssid, info in networks.items():
print(f"SSID: {ssid} | RSSI: {info['signal_strength']} dBm | Noise: {info.get('noise', 'N/A')} | Channel: {info['channel']}")
# Packet handler
def packet_handler(packet, interface, channel):
if packet.haslayer(Dot11):
mac = packet.addr2
if mac:
# RSSI extraction
rssi = packet[RadioTap].dBm_AntSignal if packet.haslayer(RadioTap) else None
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
# Update RSSI buffer and compute FFT
if mac not in rssi_buffers:
rssi_buffers[mac] = []
rssi_buffers[mac].append(rssi)
compute_fft(mac)
# Update device info and save to CSV
devices[mac] = (rssi, channel, timestamp, interface)
display_data()
save_devices_to_csv()
# Handle network SSID info from beacons or probe responses
if packet.type == 0 and packet.subtype in (8, 5): # Beacon/Probe Response
ssid = packet.info.decode() if packet.info else "<Hidden>"
noise = packet.getfieldval('dBm_noise') if hasattr(packet, 'dBm_noise') else 'N/A'
# Update network info and save to CSV
networks[ssid] = {
'signal_strength': rssi, 'noise': noise,
'channel': channel, 'timestamp': timestamp, 'interface': interface
}
display_data()
save_networks_to_csv()
# Start sniffing on a given interface
def sniff_interface(interface):
sniff(iface=interface, prn=lambda pkt: packet_handler(pkt, interface, current_channel[interface]), store=0)
# Channel hopping function
def hop_channels():
while True:
for interface in interfaces:
for channel in channels:
set_channel(interface, channel)
current_channel[interface] = channel
time.sleep(hop_interval)
# Main function
def main():
ensure_sudo()
# Enable monitor mode for each interface
for interface in interfaces:
enable_monitor_mode(interface)
# Track current channels per interface
global current_channel
current_channel = {interface: channels[0] for interface in interfaces}
# Start channel hopping thread
channel_thread = threading.Thread(target=hop_channels)
channel_thread.daemon = True
channel_thread.start()
# Start sniffing on all interfaces
sniff_threads = []
for interface in interfaces:
thread = threading.Thread(target=sniff_interface, args=(interface,))
sniff_threads.append(thread)
thread.start()
# Wait for all threads to complete (run indefinitely)
for thread in sniff_threads:
thread.join()
if __name__ == "__main__":
# Prepare CSV files with headers
with open(network_csv, 'w', newline='') as file:
writer = csv.writer(file)
writer.writerow(["SSID", "Signal Strength (dBm)", "Noise (dBm)", "Channel", "Timestamp", "Interface"])
with open(device_csv, 'w', newline='') as file:
writer = csv.writer(file)
writer.writerow(["MAC", "Signal Strength (dBm)", "Channel", "Timestamp", "Interface"])
with open(fft_csv, 'w', newline='') as file:
writer = csv.writer(file)
writer.writerow(["MAC", "Timestamp", "FFT Values"])
# Run the main function
main()