Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(json): add json response feature #29

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# WiFi Manager

Lang : Micropython
Tested : 1.8 and 1.9.3
Tested : 1.15

<b>Description</b> : WiFi manager for ESP8266 - ESP12 - ESP32 for micropython

Expand All @@ -11,11 +11,19 @@ Tested : 1.8 and 1.9.3
- Save wifi password in "wifi.dat" (csv format)
- Easy to apply

<b>Added Features:</b>
- Add Json response feature instead of html
- Can Integrate to any native app with json response

<b>Usage:</b>

Upload main.py and wifimgr.py to ESP.
Upload main.py and wifimanager-html.py to ESP.(for configure with browser)
Use wifimanager-json.py to ESP.(for configure with Native App)
Write your code into main.py or import it from main.py.

- 192.168.1.4 will return available network ssid in json response.
- 192.168.1.4/configure send ssid and password in json data and it will return success/failed message in json response.

<b>Logic:</b>
1. step: Check "wifi.dat" file and try saved networks/passwords.
2. step: Publish web page to configure new wifi.
Expand Down
38 changes: 24 additions & 14 deletions wifimgr.py → wifimanager-html.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,13 @@ def get_connection():
wlan_sta.active(True)
networks = wlan_sta.scan()

AUTHMODE = {0: "open", 1: "WEP", 2: "WPA-PSK", 3: "WPA2-PSK", 4: "WPA/WPA2-PSK"}
AUTHMODE = {0: "open", 1: "WEP", 2: "WPA-PSK",
3: "WPA2-PSK", 4: "WPA/WPA2-PSK"}
for ssid, bssid, channel, rssi, authmode, hidden in sorted(networks, key=lambda x: x[3], reverse=True):
ssid = ssid.decode('utf-8')
encrypted = authmode > 0
print("ssid: %s chan: %d rssi: %d authmode: %s" % (ssid, channel, rssi, AUTHMODE.get(authmode, '?')))
print("ssid: %s chan: %d rssi: %d authmode: %s" %
(ssid, channel, rssi, AUTHMODE.get(authmode, '?')))
if encrypted:
if ssid in profiles:
password = profiles[ssid]
Expand Down Expand Up @@ -93,13 +95,14 @@ def do_connect(ssid, password):
time.sleep(0.1)
print('.', end='')
if connected:
print('\nConnected. Network config: ', wlan_sta.ifconfig())
print('\nConnected. Network config: ', wlan_sta.ifconfig()[0])
else:
wlan_sta.disconnect()
print('\nFailed. Not Connected to: ' + ssid)
return connected


def send_header(client, status_code=200, content_length=None ):
def send_header(client, status_code=200, content_length=None):
client.sendall("HTTP/1.0 {} OK\r\n".format(status_code))
client.sendall("Content-Type: text/html\r\n")
if content_length is not None:
Expand Down Expand Up @@ -135,7 +138,7 @@ def handle_root(client):
client.sendall("""\
<tr>
<td colspan="2">
<input type="radio" name="ssid" value="{0}" />{0}
<input type="radio" name="ssid" value="{0}"/>{0}
</td>
</tr>
""".format(ssid))
Expand Down Expand Up @@ -186,11 +189,13 @@ def handle_configure(client, request):
return False
# version 1.9 compatibility
try:
ssid = match.group(1).decode("utf-8").replace("%3F", "?").replace("%21", "!")
password = match.group(2).decode("utf-8").replace("%3F", "?").replace("%21", "!")
ssid = match.group(1).decode(
"utf-8").replace("%3F", "?").replace("%21", "!").replace("%2F", "/").replace("%20", " ").replace("+", " ")
password = match.group(2).decode(
"utf-8").replace("%3F", "?").replace("%21", "!").replace("%2F", "/").replace("%20", " ").replace("+", " ")
except Exception:
ssid = match.group(1).replace("%3F", "?").replace("%21", "!")
password = match.group(2).replace("%3F", "?").replace("%21", "!")
ssid = match.group(1).replace("%3F", "?").replace("%21", "!").replace("%2F", "/").replace("%20", " ").replace("+", " ")
password = match.group(2).replace("%3F", "?").replace("%21", "!").replace("%2F", "/").replace("%20", " ").replace("+", " ")

if len(ssid) == 0:
send_response(client, "SSID must be provided", status_code=400)
Expand All @@ -203,13 +208,13 @@ def handle_configure(client, request):
<br><br>
<h1 style="color: #5e9ca0; text-align: center;">
<span style="color: #ff0000;">
ESP successfully connected to WiFi network %(ssid)s.
ESP successfully connected to WiFi network {} with IP {}.
</span>
</h1>
<br><br>
</center>
</html>
""" % dict(ssid=ssid)
""".format(ssid,wlan_sta.ifconfig()[0])
send_response(client, response)
try:
profiles = read_profiles()
Expand Down Expand Up @@ -269,12 +274,15 @@ def start(port=80):
server_socket.bind(addr)
server_socket.listen(1)

print('Connect to WiFi ssid ' + ap_ssid + ', default password: ' + ap_password)
print('Connect to WiFi ssid ' + ap_ssid +
', default password: ' + ap_password)
print('and access the ESP via your favorite web browser at 192.168.4.1.')
print('Listening on:', addr)

while True:
if wlan_sta.isconnected():
# stop AP mode to save energy
wlan_ap.active(False)
return True

client, addr = server_socket.accept()
Expand All @@ -295,9 +303,11 @@ def start(port=80):

# version 1.9 compatibility
try:
url = ure.search("(?:GET|POST) /(.*?)(?:\\?.*?)? HTTP", request).group(1).decode("utf-8").rstrip("/")
url = ure.search("(?:GET|POST) /(.*?)(?:\\?.*?)? HTTP",
request).group(1).decode("utf-8").rstrip("/")
except Exception:
url = ure.search("(?:GET|POST) /(.*?)(?:\\?.*?)? HTTP", request).group(1).rstrip("/")
url = ure.search(
"(?:GET|POST) /(.*?)(?:\\?.*?)? HTTP", request).group(1).rstrip("/")
print("URL is {}".format(url))

if url == "":
Expand Down
238 changes: 238 additions & 0 deletions wifimanager-json.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,238 @@
import network
import socket
import ure
import ujson as json
import time

ap_ssid = "WifiManager"
ap_password = "tayfunulu"
ap_authmode = 3 # WPA2

NETWORK_PROFILES = 'wifi.dat'

wlan_ap = network.WLAN(network.AP_IF)
wlan_sta = network.WLAN(network.STA_IF)

server_socket = None


def get_connection():
"""return a working WLAN(STA_IF) instance or None"""

# First check if there already is any connection:
if wlan_sta.isconnected():
return wlan_sta

connected = False
try:
# ESP connecting to WiFi takes time, wait a bit and try again:
time.sleep(3)
if wlan_sta.isconnected():
return wlan_sta

# Read known network profiles from file
profiles = read_profiles()

# Search WiFis in range
wlan_sta.active(True)
networks = wlan_sta.scan()

AUTHMODE = {0: "open", 1: "WEP", 2: "WPA-PSK",
3: "WPA2-PSK", 4: "WPA/WPA2-PSK"}
for ssid, bssid, channel, rssi, authmode, hidden in sorted(networks, key=lambda x: x[3], reverse=True):
ssid = ssid.decode('utf-8')
encrypted = authmode > 0
print("ssid: %s chan: %d rssi: %d authmode: %s" %
(ssid, channel, rssi, AUTHMODE.get(authmode, '?')))
if encrypted:
if ssid in profiles:
password = profiles[ssid]
connected = do_connect(ssid, password)
else:
print("skipping unknown encrypted network")
else: # open
connected = do_connect(ssid, None)
if connected:
break

except OSError as e:
print("exception", str(e))

# start web server for connection manager:
if not connected:
connected = start()

return wlan_sta if connected else None


def read_profiles():
with open(NETWORK_PROFILES) as f:
lines = f.readlines()
profiles = {}
for line in lines:
ssid, password = line.strip("\n").split(";")
profiles[ssid] = password
return profiles


def write_profiles(profiles):
lines = []
for ssid, password in profiles.items():
lines.append("%s;%s\n" % (ssid, password))
with open(NETWORK_PROFILES, "w") as f:
f.write(''.join(lines))


def do_connect(ssid, password):
wlan_sta.active(True)
if wlan_sta.isconnected():
return None
print('Trying to connect to %s...' % ssid)
wlan_sta.connect(ssid, password)
for retry in range(100):
connected = wlan_sta.isconnected()
if connected:
break
time.sleep(0.1)
print('.', end='')
if connected:
print('\nConnected. Network config: ', wlan_sta.ifconfig()[0])
else:
wlan_sta.disconnect()
print('\nFailed. Not Connected to: ' + ssid)
return connected


def send_header(client, status_code=200, content_length=None):
client.sendall("HTTP/1.0 {} OK\r\n".format(status_code))
client.sendall("Content-Type: application/json; encoding='utf8'\r\n")
if content_length is not None:
client.sendall("Content-Length: {}\r\n".format(content_length))
client.sendall("Connection: close\r\n")
client.sendall("\r\n")


def send_response(client, payload, status_code=200):
content_length = len(payload)
send_header(client, status_code, content_length)
if content_length > 0:
client.sendall(payload)
client.close()


def handle_root(client):
wlan_sta.active(True)
ssids = sorted(ssid.decode('utf-8') for ssid, *_ in wlan_sta.scan())
send_header(client)
ssid_dic = {}
while len(ssids):
ssid = ssids.pop(0)
ssid_dic[ssid] = ssid
client.sendall(json.dumps(ssid_dic))
client.close()


def handle_configure(client,request):
print('request:\n',request)
request = request.decode()
body_json = request.split('\r\n\r\n')
data = json.loads(body_json[-1].split('&')[0])
ssid, password = data['ssid'], data['password']
if do_connect(ssid, password):
response = {"message": "Connected successfully",
"ssid": ssid,
"ip_address": wlan_sta.ifconfig()[0]
}
send_response(client, json.dumps(response))
try:
profiles = read_profiles()
except OSError:
profiles = {}
profiles[ssid] = password
write_profiles(profiles)
time.sleep(5)
return True
else:
response = {"message": "Connection Failed",
"ssid":ssid
}
send_response(client, json.dumps(response))
return False


def handle_not_found(client, url):
send_response(client, json.dumps({"message": "Path not found: {}".format(url)}), status_code=404)


def stop():
global server_socket

if server_socket:
server_socket.close()
server_socket = None


def start(port=80):
global server_socket

addr = socket.getaddrinfo('0.0.0.0', port)[0][-1]

stop()

wlan_sta.active(True)
wlan_ap.active(True)

wlan_ap.config(essid=ap_ssid, password=ap_password, authmode=ap_authmode)

server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

server_socket.bind(addr)
server_socket.listen(1)

print('Connect to WiFi ssid ' + ap_ssid +
', default password: ' + ap_password)
print('and access the ESP via your favorite web browser at 192.168.4.1.')
print('Listening on:', addr)

while True:
if wlan_sta.isconnected():
# stop AP mode to save energy
time.sleep(5)
wlan_ap.active(False)
return True

client, addr = server_socket.accept()
print('client connected from', addr)
try:
client.settimeout(5.0)

request = b""
try:
while "\r\n\r\n" not in request:
request += client.recv(512)
except OSError:
pass

print("Request is: {}".format(request))
if "HTTP" not in request: # skip invalid requests
continue

# version 1.9 compatibility
try:
url = ure.search("(?:GET|POST) /(.*?)(?:\\?.*?)? HTTP",
request).group(1).decode("utf-8").rstrip("/")
except Exception:
url = ure.search(
"(?:GET|POST) /(.*?)(?:\\?.*?)? HTTP", request).group(1).rstrip("/")
print("URL is {}".format(url))

if url == "":
handle_root(client)
elif url == "configure":
handle_configure(client,request)
else:
handle_not_found(client, url)

finally:
client.close()