Browse Source

extmod/webrepl: Allow the page to run from the device (over HTTP).

The device will respond to a non-WS request with a simple page that loads
websocket_content.js from a static host (http or https). However, even
if the resources are https, the page is still http and therefore allows
requesting to a WS (not WSS) websocket on the device.

Removed unused client_handshake from websocket_helper, and then merges the
remainder of this file (server_handshake) into webrepl.py (to reduce
firmware size). Also added the respond-as-HTTP handling to
server_handshake.

The default HTTP response is a simple page that sets the base URL and then
loads webrepl_content.js which document.write's the actual HTML.

Signed-off-by: Jim Mussared <jim.mussared@gmail.com>
pull/8931/head
Jim Mussared 2 years ago
parent
commit
924e55aca1
  1. 2
      extmod/webrepl/manifest.py
  2. 121
      extmod/webrepl/webrepl.py
  3. 1
      extmod/webrepl/webrepl_setup.py
  4. 85
      extmod/webrepl/websocket_helper.py

2
extmod/webrepl/manifest.py

@ -1 +1 @@
freeze(".", ("webrepl.py", "webrepl_setup.py", "websocket_helper.py"))
freeze(".", ("webrepl.py", "webrepl_setup.py"))

121
extmod/webrepl/webrepl.py

@ -1,14 +1,93 @@
# This module should be imported from REPL, not run from command line.
import socket
import uos
import binascii
import hashlib
import network
import uwebsocket
import websocket_helper
import os
import socket
import sys
import websocket
import _webrepl
listen_s = None
client_s = None
DEBUG = 0
_DEFAULT_STATIC_HOST = const("https://micropython.org/webrepl/")
static_host = _DEFAULT_STATIC_HOST
def server_handshake(cl):
req = cl.makefile("rwb", 0)
# Skip HTTP GET line.
l = req.readline()
if DEBUG:
sys.stdout.write(repr(l))
webkey = None
upgrade = False
websocket = False
while True:
l = req.readline()
if not l:
# EOF in headers.
return False
if l == b"\r\n":
break
if DEBUG:
sys.stdout.write(l)
h, v = [x.strip() for x in l.split(b":", 1)]
if DEBUG:
print((h, v))
if h == b"Sec-WebSocket-Key":
webkey = v
elif h == b"Connection" and b"Upgrade" in v:
upgrade = True
elif h == b"Upgrade" and v == b"websocket":
websocket = True
if not (upgrade and websocket and webkey):
return False
if DEBUG:
print("Sec-WebSocket-Key:", webkey, len(webkey))
d = hashlib.sha1(webkey)
d.update(b"258EAFA5-E914-47DA-95CA-C5AB0DC85B11")
respkey = d.digest()
respkey = binascii.b2a_base64(respkey)[:-1]
if DEBUG:
print("respkey:", respkey)
cl.send(
b"""\
HTTP/1.1 101 Switching Protocols\r
Upgrade: websocket\r
Connection: Upgrade\r
Sec-WebSocket-Accept: """
)
cl.send(respkey)
cl.send("\r\n\r\n")
return True
def send_html(cl):
cl.send(
b"""\
HTTP/1.0 200 OK\r
\r
<base href=\""""
)
cl.send(static_host)
cl.send(
b"""\"></base>\r
<script src="webrepl_content.js"></script>\r
"""
)
cl.close()
def setup_conn(port, accept_handler):
global listen_s
@ -25,34 +104,41 @@ def setup_conn(port, accept_handler):
for i in (network.AP_IF, network.STA_IF):
iface = network.WLAN(i)
if iface.active():
print("WebREPL daemon started on ws://%s:%d" % (iface.ifconfig()[0], port))
print("WebREPL server started on http://%s:%d/" % (iface.ifconfig()[0], port))
return listen_s
def accept_conn(listen_sock):
global client_s
cl, remote_addr = listen_sock.accept()
prev = uos.dupterm(None)
uos.dupterm(prev)
if not server_handshake(cl):
send_html(cl)
return False
prev = os.dupterm(None)
os.dupterm(prev)
if prev:
print("\nConcurrent WebREPL connection from", remote_addr, "rejected")
cl.close()
return
return False
print("\nWebREPL connection from:", remote_addr)
client_s = cl
websocket_helper.server_handshake(cl)
ws = uwebsocket.websocket(cl, True)
ws = websocket.websocket(cl, True)
ws = _webrepl._webrepl(ws)
cl.setblocking(False)
# notify REPL on socket incoming data (ESP32/ESP8266-only)
if hasattr(uos, "dupterm_notify"):
cl.setsockopt(socket.SOL_SOCKET, 20, uos.dupterm_notify)
uos.dupterm(ws)
if hasattr(os, "dupterm_notify"):
cl.setsockopt(socket.SOL_SOCKET, 20, os.dupterm_notify)
os.dupterm(ws)
return True
def stop():
global listen_s, client_s
uos.dupterm(None)
os.dupterm(None)
if client_s:
client_s.close()
if listen_s:
@ -60,6 +146,7 @@ def stop():
def start(port=8266, password=None, accept_handler=accept_conn):
global static_host
stop()
webrepl_pass = password
if webrepl_pass is None:
@ -67,6 +154,8 @@ def start(port=8266, password=None, accept_handler=accept_conn):
import webrepl_cfg
webrepl_pass = webrepl_cfg.PASS
if hasattr(webrepl_cfg, "BASE"):
static_host = webrepl_cfg.BASE
except:
print("WebREPL is not configured, run 'import webrepl_setup'")
@ -75,7 +164,9 @@ def start(port=8266, password=None, accept_handler=accept_conn):
if accept_handler is None:
print("Starting webrepl in foreground mode")
accept_conn(s)
# Run accept_conn to serve HTML until we get a websocket connection.
while not accept_conn(s):
pass
elif password is None:
print("Started webrepl in normal mode")
else:

1
extmod/webrepl/webrepl_setup.py

@ -1,6 +1,5 @@
import sys
# import uos as os
import os
import machine

85
extmod/webrepl/websocket_helper.py

@ -1,85 +0,0 @@
try:
import usys as sys
except ImportError:
import sys
try:
import ubinascii as binascii
except:
import binascii
try:
import uhashlib as hashlib
except:
import hashlib
DEBUG = 0
def server_handshake(sock):
clr = sock.makefile("rwb", 0)
l = clr.readline()
# sys.stdout.write(repr(l))
webkey = None
while 1:
l = clr.readline()
if not l:
raise OSError("EOF in headers")
if l == b"\r\n":
break
# sys.stdout.write(l)
h, v = [x.strip() for x in l.split(b":", 1)]
if DEBUG:
print((h, v))
if h == b"Sec-WebSocket-Key":
webkey = v
if not webkey:
raise OSError("Not a websocket request")
if DEBUG:
print("Sec-WebSocket-Key:", webkey, len(webkey))
d = hashlib.sha1(webkey)
d.update(b"258EAFA5-E914-47DA-95CA-C5AB0DC85B11")
respkey = d.digest()
respkey = binascii.b2a_base64(respkey)[:-1]
if DEBUG:
print("respkey:", respkey)
sock.send(
b"""\
HTTP/1.1 101 Switching Protocols\r
Upgrade: websocket\r
Connection: Upgrade\r
Sec-WebSocket-Accept: """
)
sock.send(respkey)
sock.send("\r\n\r\n")
# Very simplified client handshake, works for MicroPython's
# websocket server implementation, but probably not for other
# servers.
def client_handshake(sock):
cl = sock.makefile("rwb", 0)
cl.write(
b"""\
GET / HTTP/1.1\r
Host: echo.websocket.org\r
Connection: Upgrade\r
Upgrade: websocket\r
Sec-WebSocket-Key: foo\r
\r
"""
)
l = cl.readline()
# print(l)
while 1:
l = cl.readline()
if l == b"\r\n":
break
# sys.stdout.write(l)
Loading…
Cancel
Save