Browse Source
This commit adds: 1) Methods to SSLContext class that match CPython signature: - `SSLContext.load_cert_chain(certfile, keyfile)` - `SSLContext.load_verify_locations(cafile=, cadata=)` - `SSLContext.get_ciphers()` --> ["CIPHERSUITE"] - `SSLContext.set_ciphers(["CIPHERSUITE"])` 2) `sslsocket.cipher()` to get current ciphersuite and protocol version. 3) `ssl.MBEDTLS_VERSION` string constant. 4) Certificate verification errors info instead of `MBEDTLS_ERR_X509_CERT_VERIFY_FAILED`. 5) Tests in `net_inet` and `multi_net` to test these new methods. `SSLContext.load_cert_chain` method allows loading key and cert from disk passing a filepath in `certfile` or `keyfile` options. `SSLContext.load_verify_locations`'s `cafile` option enables the same functionality for ca files. Signed-off-by: Carlos Gil <carlosgilglez@gmail.com>pull/13098/head
Carlosgg
1 year ago
committed by
Damien George
25 changed files with 722 additions and 113 deletions
@ -0,0 +1,29 @@ |
|||
# Basic test of ssl.SSLContext get_ciphers() and set_ciphers() methods. |
|||
|
|||
try: |
|||
import ssl |
|||
except ImportError: |
|||
print("SKIP") |
|||
raise SystemExit |
|||
|
|||
|
|||
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) |
|||
|
|||
ciphers = ctx.get_ciphers() |
|||
|
|||
for ci in ciphers: |
|||
print(ci) |
|||
|
|||
ctx.set_ciphers(ciphers[:1]) |
|||
|
|||
# Test error cases. |
|||
|
|||
try: |
|||
ctx.set_ciphers(ciphers[0]) |
|||
except TypeError as e: |
|||
print(e) |
|||
|
|||
try: |
|||
ctx.set_ciphers(["BAR"]) |
|||
except OSError as e: |
|||
print(e) |
@ -0,0 +1,6 @@ |
|||
TLS-ECDHE-ECDSA-WITH-AES-256-CBC-SHA384 |
|||
TLS-ECDHE-ECDSA-WITH-AES-128-CBC-SHA256 |
|||
TLS-RSA-WITH-AES-256-CBC-SHA256 |
|||
TLS-RSA-WITH-AES-128-CBC-SHA256 |
|||
object 'str' isn't a tuple or list |
|||
(-24192, 'MBEDTLS_ERR_SSL_BAD_CONFIG') |
Binary file not shown.
Binary file not shown.
Binary file not shown.
@ -0,0 +1,59 @@ |
|||
# Test creating an SSL connection when server_hostname is required but not specified. |
|||
|
|||
try: |
|||
import os |
|||
import socket |
|||
import ssl |
|||
except ImportError: |
|||
print("SKIP") |
|||
raise SystemExit |
|||
|
|||
PORT = 8000 |
|||
|
|||
# These are test certificates. See tests/README.md for details. |
|||
cert = cafile = "multi_net/rsa_cert.der" |
|||
key = "multi_net/rsa_key.der" |
|||
|
|||
try: |
|||
os.stat(cafile) |
|||
os.stat(key) |
|||
except OSError: |
|||
print("SKIP") |
|||
raise SystemExit |
|||
|
|||
|
|||
# Server |
|||
def instance0(): |
|||
multitest.globals(IP=multitest.get_network_ip()) |
|||
s = socket.socket() |
|||
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) |
|||
s.bind(socket.getaddrinfo("0.0.0.0", PORT)[0][-1]) |
|||
s.listen(1) |
|||
multitest.next() |
|||
s2, _ = s.accept() |
|||
server_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) |
|||
server_ctx.load_cert_chain(cert, key) |
|||
multitest.broadcast("ready") |
|||
try: |
|||
s2 = server_ctx.wrap_socket(s2, server_side=True) |
|||
except Exception as e: |
|||
print(e) |
|||
s.close() |
|||
|
|||
|
|||
# Client |
|||
def instance1(): |
|||
multitest.next() |
|||
s = socket.socket() |
|||
s.connect(socket.getaddrinfo(IP, PORT)[0][-1]) |
|||
client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) |
|||
client_ctx.verify_mode = ssl.CERT_REQUIRED |
|||
client_ctx.load_verify_locations(cafile=cafile) |
|||
# The client will reject the SSL connection so wait until the server is ready |
|||
# so that it can see the rejection (otherwise it sees a closed socket). |
|||
multitest.wait("ready") |
|||
try: |
|||
s = client_ctx.wrap_socket(s) |
|||
except Exception as e: |
|||
print(e) |
|||
s.close() |
@ -0,0 +1,4 @@ |
|||
--- instance0 --- |
|||
(-29312, 'MBEDTLS_ERR_SSL_CONN_EOF') |
|||
--- instance1 --- |
|||
CERT_REQUIRED requires server_hostname |
@ -0,0 +1,55 @@ |
|||
# Test creating an SSL connection and getting the peer certificate. |
|||
|
|||
try: |
|||
import os |
|||
import socket |
|||
import ssl |
|||
except ImportError: |
|||
print("SKIP") |
|||
raise SystemExit |
|||
|
|||
PORT = 8000 |
|||
|
|||
# These are test certificates. See tests/README.md for details. |
|||
cert = cafile = "multi_net/rsa_cert.der" |
|||
key = "multi_net/rsa_key.der" |
|||
|
|||
try: |
|||
os.stat(cafile) |
|||
os.stat(key) |
|||
except OSError: |
|||
print("SKIP") |
|||
raise SystemExit |
|||
|
|||
|
|||
# Server |
|||
def instance0(): |
|||
multitest.globals(IP=multitest.get_network_ip()) |
|||
s = socket.socket() |
|||
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) |
|||
s.bind(socket.getaddrinfo("0.0.0.0", PORT)[0][-1]) |
|||
s.listen(1) |
|||
multitest.next() |
|||
s2, _ = s.accept() |
|||
server_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) |
|||
server_ctx.load_cert_chain(cert, key) |
|||
s2 = server_ctx.wrap_socket(s2, server_side=True) |
|||
print(s2.read(16)) |
|||
s2.write(b"server to client") |
|||
s2.close() |
|||
s.close() |
|||
|
|||
|
|||
# Client |
|||
def instance1(): |
|||
multitest.next() |
|||
s = socket.socket() |
|||
s.connect(socket.getaddrinfo(IP, PORT)[0][-1]) |
|||
client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) |
|||
client_ctx.verify_mode = ssl.CERT_REQUIRED |
|||
client_ctx.load_verify_locations(cafile=cafile) |
|||
s = client_ctx.wrap_socket(s, server_hostname="micropython.local") |
|||
print(s.getpeercert(True)) |
|||
s.write(b"client to server") |
|||
print(s.read(16)) |
|||
s.close() |
@ -0,0 +1,5 @@ |
|||
--- instance0 --- |
|||
b'client to server' |
|||
--- instance1 --- |
|||
None |
|||
b'server to client' |
@ -0,0 +1,60 @@ |
|||
# Test creating an SSL connection with certificates as bytes objects. |
|||
|
|||
try: |
|||
import os |
|||
import socket |
|||
import ssl |
|||
except ImportError: |
|||
print("SKIP") |
|||
raise SystemExit |
|||
|
|||
PORT = 8000 |
|||
|
|||
# These are test certificates. See tests/README.md for details. |
|||
certfile = "multi_net/rsa_cert.der" |
|||
keyfile = "multi_net/rsa_key.der" |
|||
|
|||
try: |
|||
os.stat(certfile) |
|||
os.stat(keyfile) |
|||
except OSError: |
|||
print("SKIP") |
|||
raise SystemExit |
|||
|
|||
with open(certfile, "rb") as cf: |
|||
cert = cadata = cf.read() |
|||
|
|||
with open(keyfile, "rb") as kf: |
|||
key = kf.read() |
|||
|
|||
|
|||
# Server |
|||
def instance0(): |
|||
multitest.globals(IP=multitest.get_network_ip()) |
|||
s = socket.socket() |
|||
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) |
|||
s.bind(socket.getaddrinfo("0.0.0.0", PORT)[0][-1]) |
|||
s.listen(1) |
|||
multitest.next() |
|||
s2, _ = s.accept() |
|||
server_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) |
|||
server_ctx.load_cert_chain(cert, key) |
|||
s2 = server_ctx.wrap_socket(s2, server_side=True) |
|||
print(s2.read(16)) |
|||
s2.write(b"server to client") |
|||
s2.close() |
|||
s.close() |
|||
|
|||
|
|||
# Client |
|||
def instance1(): |
|||
multitest.next() |
|||
s = socket.socket() |
|||
s.connect(socket.getaddrinfo(IP, PORT)[0][-1]) |
|||
client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) |
|||
client_ctx.verify_mode = ssl.CERT_REQUIRED |
|||
client_ctx.load_verify_locations(cadata=cadata) |
|||
s = client_ctx.wrap_socket(s, server_hostname="micropython.local") |
|||
s.write(b"client to server") |
|||
print(s.read(16)) |
|||
s.close() |
@ -0,0 +1,4 @@ |
|||
--- instance0 --- |
|||
b'client to server' |
|||
--- instance1 --- |
|||
b'server to client' |
@ -0,0 +1,58 @@ |
|||
# Test creating an SSL connection with specified ciphers. |
|||
|
|||
try: |
|||
import os |
|||
import socket |
|||
import ssl |
|||
except ImportError: |
|||
print("SKIP") |
|||
raise SystemExit |
|||
|
|||
PORT = 8000 |
|||
|
|||
# These are test certificates. See tests/README.md for details. |
|||
cert = cafile = "multi_net/rsa_cert.der" |
|||
key = "multi_net/rsa_key.der" |
|||
|
|||
try: |
|||
os.stat(cafile) |
|||
os.stat(key) |
|||
except OSError: |
|||
print("SKIP") |
|||
raise SystemExit |
|||
|
|||
|
|||
# Server |
|||
def instance0(): |
|||
multitest.globals(IP=multitest.get_network_ip()) |
|||
s = socket.socket() |
|||
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) |
|||
s.bind(socket.getaddrinfo("0.0.0.0", PORT)[0][-1]) |
|||
s.listen(1) |
|||
multitest.next() |
|||
s2, _ = s.accept() |
|||
server_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) |
|||
server_ctx.load_cert_chain(cert, key) |
|||
s2 = server_ctx.wrap_socket(s2, server_side=True) |
|||
assert isinstance(s2.cipher(), tuple) |
|||
print(s2.read(16)) |
|||
s2.write(b"server to client") |
|||
s2.close() |
|||
s.close() |
|||
|
|||
|
|||
# Client |
|||
def instance1(): |
|||
multitest.next() |
|||
s = socket.socket() |
|||
s.connect(socket.getaddrinfo(IP, PORT)[0][-1]) |
|||
client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) |
|||
ciphers = client_ctx.get_ciphers() |
|||
assert "TLS-RSA-WITH-AES-256-CBC-SHA256" in ciphers |
|||
client_ctx.set_ciphers(["TLS-RSA-WITH-AES-256-CBC-SHA256"]) |
|||
client_ctx.verify_mode = ssl.CERT_REQUIRED |
|||
client_ctx.load_verify_locations(cafile=cafile) |
|||
s = client_ctx.wrap_socket(s, server_hostname="micropython.local") |
|||
s.write(b"client to server") |
|||
print(s.read(16)) |
|||
s.close() |
@ -0,0 +1,4 @@ |
|||
--- instance0 --- |
|||
b'client to server' |
|||
--- instance1 --- |
|||
b'server to client' |
@ -0,0 +1,54 @@ |
|||
# Test creating an SSL connection with certificates from a file. |
|||
|
|||
try: |
|||
import os |
|||
import socket |
|||
import ssl |
|||
except ImportError: |
|||
print("SKIP") |
|||
raise SystemExit |
|||
|
|||
PORT = 8000 |
|||
|
|||
# These are test certificates. See tests/README.md for details. |
|||
cert = cafile = "multi_net/rsa_cert.der" |
|||
key = "multi_net/rsa_key.der" |
|||
|
|||
try: |
|||
os.stat(cafile) |
|||
os.stat(key) |
|||
except OSError: |
|||
print("SKIP") |
|||
raise SystemExit |
|||
|
|||
|
|||
# Server |
|||
def instance0(): |
|||
multitest.globals(IP=multitest.get_network_ip()) |
|||
s = socket.socket() |
|||
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) |
|||
s.bind(socket.getaddrinfo("0.0.0.0", PORT)[0][-1]) |
|||
s.listen(1) |
|||
multitest.next() |
|||
s2, _ = s.accept() |
|||
server_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) |
|||
server_ctx.load_cert_chain(cert, key) |
|||
s2 = server_ctx.wrap_socket(s2, server_side=True) |
|||
print(s2.read(16)) |
|||
s2.write(b"server to client") |
|||
s2.close() |
|||
s.close() |
|||
|
|||
|
|||
# Client |
|||
def instance1(): |
|||
multitest.next() |
|||
s = socket.socket() |
|||
s.connect(socket.getaddrinfo(IP, PORT)[0][-1]) |
|||
client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) |
|||
client_ctx.verify_mode = ssl.CERT_REQUIRED |
|||
client_ctx.load_verify_locations(cafile=cafile) |
|||
s = client_ctx.wrap_socket(s, server_hostname="micropython.local") |
|||
s.write(b"client to server") |
|||
print(s.read(16)) |
|||
s.close() |
@ -0,0 +1,4 @@ |
|||
--- instance0 --- |
|||
b'client to server' |
|||
--- instance1 --- |
|||
b'server to client' |
@ -0,0 +1,58 @@ |
|||
# Test creating an SSL connection with an invalid certificate. |
|||
|
|||
try: |
|||
import os |
|||
import socket |
|||
import ssl |
|||
except ImportError: |
|||
print("SKIP") |
|||
raise SystemExit |
|||
|
|||
PORT = 8000 |
|||
|
|||
# These are test certificates. See tests/README.md for details. |
|||
cert = cafile = "multi_net/rsa_cert.der" |
|||
key = "multi_net/rsa_key.der" |
|||
|
|||
try: |
|||
os.stat(cafile) |
|||
os.stat(key) |
|||
except OSError: |
|||
print("SKIP") |
|||
raise SystemExit |
|||
|
|||
|
|||
# Server |
|||
def instance0(): |
|||
multitest.globals(IP=multitest.get_network_ip()) |
|||
s = socket.socket() |
|||
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) |
|||
s.bind(socket.getaddrinfo("0.0.0.0", PORT)[0][-1]) |
|||
s.listen(1) |
|||
multitest.next() |
|||
s2, _ = s.accept() |
|||
server_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) |
|||
server_ctx.load_cert_chain(cert, key) |
|||
try: |
|||
s2 = server_ctx.wrap_socket(s2, server_side=True) |
|||
except Exception as e: |
|||
print(e) |
|||
multitest.broadcast("finished") |
|||
s.close() |
|||
|
|||
|
|||
# Client |
|||
def instance1(): |
|||
multitest.next() |
|||
s = socket.socket() |
|||
s.connect(socket.getaddrinfo(IP, PORT)[0][-1]) |
|||
client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) |
|||
client_ctx.verify_mode = ssl.CERT_REQUIRED |
|||
client_ctx.load_verify_locations(cafile=cafile) |
|||
try: |
|||
s = client_ctx.wrap_socket(s, server_hostname="foobar.local") |
|||
except Exception as e: |
|||
print(e) |
|||
# Don't close the socket until the server has seen our SSL rejection. |
|||
multitest.wait("finished") |
|||
s.close() |
@ -0,0 +1,6 @@ |
|||
--- instance0 --- |
|||
(-30592, 'MBEDTLS_ERR_SSL_FATAL_ALERT_MESSAGE') |
|||
--- instance1 --- |
|||
|
|||
The certificate Common Name (CN) does not match with the expected CN |
|||
|
@ -0,0 +1,58 @@ |
|||
# Test creating an SSL connection with an expired certificate. |
|||
|
|||
try: |
|||
import os |
|||
import socket |
|||
import ssl |
|||
except ImportError: |
|||
print("SKIP") |
|||
raise SystemExit |
|||
|
|||
PORT = 8000 |
|||
|
|||
# These are test certificates. See tests/README.md for details. |
|||
cert = cafile = "multi_net/expired_cert.der" |
|||
key = "multi_net/rsa_key.der" |
|||
|
|||
try: |
|||
os.stat(cafile) |
|||
os.stat(key) |
|||
except OSError: |
|||
print("SKIP") |
|||
raise SystemExit |
|||
|
|||
|
|||
# Server |
|||
def instance0(): |
|||
multitest.globals(IP=multitest.get_network_ip()) |
|||
s = socket.socket() |
|||
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) |
|||
s.bind(socket.getaddrinfo("0.0.0.0", PORT)[0][-1]) |
|||
s.listen(1) |
|||
multitest.next() |
|||
s2, _ = s.accept() |
|||
server_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) |
|||
server_ctx.load_cert_chain(cert, key) |
|||
try: |
|||
s2 = server_ctx.wrap_socket(s2, server_side=True) |
|||
except Exception as e: |
|||
print(e) |
|||
multitest.broadcast("finished") |
|||
s.close() |
|||
|
|||
|
|||
# Client |
|||
def instance1(): |
|||
multitest.next() |
|||
s = socket.socket() |
|||
s.connect(socket.getaddrinfo(IP, PORT)[0][-1]) |
|||
client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) |
|||
client_ctx.verify_mode = ssl.CERT_REQUIRED |
|||
client_ctx.load_verify_locations(cafile=cafile) |
|||
try: |
|||
s = client_ctx.wrap_socket(s, server_hostname="micropython.local") |
|||
except Exception as e: |
|||
print(e) |
|||
# Don't close the socket until the server has seen our SSL rejection. |
|||
multitest.wait("finished") |
|||
s.close() |
@ -0,0 +1,6 @@ |
|||
--- instance0 --- |
|||
(-30592, 'MBEDTLS_ERR_SSL_FATAL_ALERT_MESSAGE') |
|||
--- instance1 --- |
|||
|
|||
The certificate validity has expired |
|||
|
Binary file not shown.
@ -0,0 +1,52 @@ |
|||
import os |
|||
import socket |
|||
import ssl |
|||
|
|||
# This certificate was obtained from micropython.org using openssl: |
|||
# $ openssl s_client -showcerts -connect micropython.org:443 </dev/null 2>/dev/null |
|||
# The certificate is from Let's Encrypt: |
|||
# 1 s:/C=US/O=Let's Encrypt/CN=R3 |
|||
# i:/C=US/O=Internet Security Research Group/CN=ISRG Root X1 |
|||
# Validity |
|||
# Not Before: Sep 4 00:00:00 2020 GMT |
|||
# Not After : Sep 15 16:00:00 2025 GMT |
|||
# Copy PEM content to a file (certmpy.pem) and convert to DER e.g. |
|||
# $ openssl x509 -in certmpy.pem -out certmpy.der -outform DER |
|||
# Then convert to hex format, eg using binascii.hexlify(data). |
|||
|
|||
|
|||
ca_cert_chain = "mpycert.der" |
|||
try: |
|||
os.stat(ca_cert_chain) |
|||
except OSError: |
|||
print("SKIP") |
|||
raise SystemExit |
|||
|
|||
|
|||
def main(use_stream=True): |
|||
context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) |
|||
|
|||
context.verify_mode = ssl.CERT_REQUIRED |
|||
assert context.verify_mode == ssl.CERT_REQUIRED |
|||
|
|||
context.load_verify_locations(cafile=ca_cert_chain) |
|||
|
|||
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
|||
addr = socket.getaddrinfo("micropython.org", 443)[0][-1] |
|||
|
|||
# CPython can wrap the socket even if not connected yet. |
|||
# ssl_sock = context.wrap_socket(s, server_hostname='micropython.org') |
|||
# ssl_sock.connect(addr) |
|||
|
|||
# MicroPython needs to connect first, CPython can do this too. |
|||
s.connect(addr) |
|||
# server_hostname must match CN (Common Name) in the certificate |
|||
# presented by the server |
|||
ssl_sock = context.wrap_socket(s, server_hostname="micropython.org") |
|||
ssl_sock.write(b"GET / HTTP/1.0\r\n\r\n") |
|||
print(ssl_sock.read(17)) |
|||
assert isinstance(ssl_sock.cipher(), tuple) |
|||
ssl_sock.close() |
|||
|
|||
|
|||
main() |
@ -0,0 +1 @@ |
|||
b'HTTP/1.1 200 OK\r\n' |
Loading…
Reference in new issue