Browse Source

Merge pull request #52 from zixuanzh/libp2p-new-node

End to end messaging bugfixes
pull/55/head
stuckinaboot 6 years ago
committed by GitHub
parent
commit
b4272918d9
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 5
      host/basic_host.py
  2. 9
      libp2p/libp2p.py
  3. 76
      muxer/mplex/muxed_connection.py
  4. 8
      muxer/mplex/muxed_stream.py
  5. 2
      muxer/mplex/utils.py
  6. 2
      network/multiaddr.py
  7. 8
      network/stream/net_stream.py
  8. 58
      network/swarm.py
  9. 1
      requirements.txt
  10. 47
      tests/libp2p/test_libp2p.py
  11. 35
      transport/tcp/tcp.py

5
host/basic_host.py

@ -11,7 +11,6 @@ class BasicHost(IHost):
def __init__(self, _network): def __init__(self, _network):
self.network = _network self.network = _network
self.peerstore = self.network.peerstore self.peerstore = self.network.peerstore
# self.stream_handlers = {}
def get_id(self): def get_id(self):
""" """
@ -49,13 +48,13 @@ class BasicHost(IHost):
# protocol_id can be a list of protocol_ids # protocol_id can be a list of protocol_ids
# stream will decide which protocol_id to run on # stream will decide which protocol_id to run on
def new_stream(self, peer_id, protocol_id): async def new_stream(self, peer_id, protocol_id):
""" """
:param peer_id: peer_id that host is connecting :param peer_id: peer_id that host is connecting
:param proto_id: protocol id that stream runs on :param proto_id: protocol id that stream runs on
:return: true if successful :return: true if successful
""" """
# TODO: host should return a mux stream not a raw stream # TODO: host should return a mux stream not a raw stream
stream = self.network.new_stream(peer_id) stream = await self.network.new_stream(peer_id, protocol_id)
stream.set_protocol(protocol_id) stream.set_protocol(protocol_id)
return stream return stream

9
libp2p/libp2p.py

@ -8,7 +8,7 @@ from Crypto.PublicKey import RSA
class Libp2p(object): class Libp2p(object):
def __init__(self, idOpt = None, \ def __init__(self, idOpt = None, \
transportOpt = ["/ip4/127.0.0.1/tcp/10000"], \ transportOpt = ["/ip4/127.0.0.1/tcp/8001"], \
muxerOpt = ["mplex/6.7.0"], \ muxerOpt = ["mplex/6.7.0"], \
secOpt = ["secio"], \ secOpt = ["secio"], \
peerstore = PeerStore()): peerstore = PeerStore()):
@ -25,16 +25,15 @@ class Libp2p(object):
self.secOpt = secOpt self.secOpt = secOpt
self.peerstore = peerstore self.peerstore = peerstore
def new_node(self): async def new_node(self):
upgrader = TransportUpgrader(self.secOpt, self.transportOpt) upgrader = TransportUpgrader(self.secOpt, self.transportOpt)
swarm = Swarm(self.idOpt, self.peerstore, upgrader) swarm = Swarm(self.idOpt, self.peerstore, upgrader)
tcp = TCP() tcp = TCP()
swarm.add_transport(tcp) swarm.add_transport(tcp)
swarm.listen(self.transportOpts) await swarm.listen(self.transportOpt[0])
host = BasicHost(swarm) host = BasicHost(swarm)
# TODO MuxedConnection currently contains all muxing logic # TODO MuxedConnection currently contains all muxing logic (move to a Muxer)
# TODO routing unimplemented # TODO routing unimplemented
return host return host

76
muxer/mplex/muxed_connection.py

@ -17,8 +17,9 @@ class MuxedConn(IMuxedConn):
self.initiator = initiator self.initiator = initiator
self.buffers = {} self.buffers = {}
self.streams = {} self.streams = {}
self.stream_queue = asyncio.Queue()
self.add_incoming_task() asyncio.ensure_future(self.handle_incoming())
def close(self): def close(self):
""" """
@ -33,7 +34,12 @@ class MuxedConn(IMuxedConn):
""" """
pass pass
def read_buffer(self, stream_id): async def read_buffer(self, stream_id):
# Empty buffer or nonexistent stream
# TODO: propagate up timeout exception and catch
if stream_id not in self.buffers or not self.buffers[stream_id]:
await self.handle_incoming()
data = self.buffers[stream_id] data = self.buffers[stream_id]
self.buffers[stream_id] = bytearray() self.buffers[stream_id] = bytearray()
return data return data
@ -43,37 +49,22 @@ class MuxedConn(IMuxedConn):
creates a new muxed_stream creates a new muxed_stream
:return: a new stream :return: a new stream
""" """
stream = MuxedStream(peer_id, multi_addr, self) stream = MuxedStream(stream_id, multi_addr, self)
self.streams[stream_id] = stream self.streams[stream_id] = stream
self.buffers[stream_id] = bytearray()
return stream return stream
def accept_stream(self): async def accept_stream(self):
""" """
accepts a muxed stream opened by the other end accepts a muxed stream opened by the other end
:return: the accepted stream :return: the accepted stream
""" """
data = bytearray()
while True:
chunk = self.raw_conn.reader.read(100)
if not chunk:
break
data += chunk
header, end_index = decode_uvarint(data, 0)
length, end_index = decode_uvarint(data, end_index)
message = data[end_index, end_index + length]
flag = header & 0x07
stream_id = header >> 3
# TODO update to pull out protocol_id from message # TODO update to pull out protocol_id from message
protocol_id = "/echo/1.0.0" protocol_id = "/echo/1.0.0"
stream_id = await self.stream_queue.get()
stream = MuxedStream(stream_id, False, self) stream = MuxedStream(stream_id, False, self)
return stream, stream_id, protocol_id return stream, stream_id, protocol_id
def send_message(self, flag, data, stream_id): async def send_message(self, flag, data, stream_id):
""" """
sends a message over the connection sends a message over the connection
:param header: header to use :param header: header to use
@ -86,7 +77,8 @@ class MuxedConn(IMuxedConn):
header = encode_uvarint(header) header = encode_uvarint(header)
data_length = encode_uvarint(len(data)) data_length = encode_uvarint(len(data))
_bytes = header + data_length + data _bytes = header + data_length + data
return self.write_to_stream(_bytes)
return await self.write_to_stream(_bytes)
async def write_to_stream(self, _bytes): async def write_to_stream(self, _bytes):
self.raw_conn.writer.write(_bytes) self.raw_conn.writer.write(_bytes)
@ -95,25 +87,23 @@ class MuxedConn(IMuxedConn):
async def handle_incoming(self): async def handle_incoming(self):
data = bytearray() data = bytearray()
while True: try:
chunk = self.raw_conn.reader.read(100) chunk = await asyncio.wait_for(self.raw_conn.reader.read(1024), timeout=5)
if not chunk:
break
data += chunk data += chunk
header, end_index = decode_uvarint(data, 0)
length, end_index = decode_uvarint(data, end_index) header, end_index = decode_uvarint(data, 0)
message = data[end_index, end_index + length] length, end_index = decode_uvarint(data, end_index)
# Deal with other types of messages message = data[end_index:end_index + length + 1]
flag = header & 0x07
stream_id = header >> 3 # Deal with other types of messages
flag = header & 0x07
self.buffers[stream_id] = self.buffers[stream_id] + message stream_id = header >> 3
# Read header
# Read message length if stream_id not in self.buffers:
# Read message into corresponding buffer self.buffers[stream_id] = message
await self.stream_queue.put(stream_id)
def add_incoming_task(self): else:
loop = asyncio.get_event_loop() self.buffers[stream_id] = self.buffers[stream_id] + message
handle_incoming_task = loop.create_task(self.handle_incoming()) except asyncio.TimeoutError:
handle_incoming_task.add_done_callback(self.add_incoming_task) print('timeout!')

8
muxer/mplex/muxed_stream.py

@ -36,19 +36,19 @@ class MuxedStream(IMuxedStream):
else: else:
return HEADER_TAGS[action] - 1 return HEADER_TAGS[action] - 1
def read(self): async def read(self):
""" """
read messages associated with stream from buffer til end of file read messages associated with stream from buffer til end of file
:return: bytes of input :return: bytes of input
""" """
return self.muxed_conn.read_buffer(self.id) return await self.muxed_conn.read_buffer(self.id)
def write(self, data): async def write(self, data):
""" """
write to stream write to stream
:return: number of bytes written :return: number of bytes written
""" """
return self.muxed_conn.send_message(self.get_flag("MESSAGE"), data, self.id) return await self.muxed_conn.send_message(self.get_flag("MESSAGE"), data, self.id)
def close(self): def close(self):
""" """

2
muxer/mplex/utils.py

@ -22,4 +22,4 @@ def decode_uvarint(buff, index):
break break
index += 1 index += 1
return result, index return result, index + 1

2
network/multiaddr.py

@ -2,6 +2,8 @@ class MultiAddr:
# Validates input string and constructs internal representation. # Validates input string and constructs internal representation.
def __init__(self, addr): def __init__(self, addr):
self.protocol_map = dict()
# Empty multiaddrs are valid. # Empty multiaddrs are valid.
if not addr: if not addr:
self.protocol_map = dict() self.protocol_map = dict()

8
network/stream/net_stream.py

@ -19,19 +19,19 @@ class NetStream(INetStream):
""" """
self.protocol_id = protocol_id self.protocol_id = protocol_id
def read(self): async def read(self):
""" """
read from stream read from stream
:return: bytes of input until EOF :return: bytes of input until EOF
""" """
return self.muxed_stream.read() return await self.muxed_stream.read()
def write(self, bytes): async def write(self, bytes):
""" """
write to stream write to stream
:return: number of bytes written :return: number of bytes written
""" """
return self.muxed_stream.write(bytes) return await self.muxed_stream.write(bytes)
def close(self): def close(self):
""" """

58
network/swarm.py

@ -22,13 +22,20 @@ class Swarm(INetwork):
""" """
self.stream_handlers[protocol_id] = stream_handler self.stream_handlers[protocol_id] = stream_handler
def new_stream(self, peer_id, protocol_id): async def new_stream(self, peer_id, protocol_id):
""" """
:param peer_id: peer_id of destination :param peer_id: peer_id of destination
:param protocol_id: protocol id :param protocol_id: protocol id
:return: net stream instance :return: net stream instance
""" """
muxed_conn = None # Get peer info from peer store
addrs = self.peerstore.addrs(peer_id)
if not addrs:
raise SwarmException("No known addresses to peer")
multiaddr = addrs[0]
if peer_id in self.connections: if peer_id in self.connections:
""" """
If muxed connection already exists for peer_id, If muxed connection already exists for peer_id,
@ -37,14 +44,8 @@ class Swarm(INetwork):
""" """
muxed_conn = self.connections[peer_id] muxed_conn = self.connections[peer_id]
else: else:
# Get peer info from peer store
addrs = self.peerstore.addrs(peer_id)
# Transport dials peer (gets back a raw conn) # Transport dials peer (gets back a raw conn)
if not addrs: raw_conn = await self.transport.dial(MultiAddr(multiaddr))
raise SwarmException("No known addresses to peer")
first_addr = addrs[0]
raw_conn = self.transport.dial(first_addr)
# Use upgrader to upgrade raw conn to muxed conn # Use upgrader to upgrade raw conn to muxed conn
muxed_conn = self.upgrader.upgrade_connection(raw_conn, True) muxed_conn = self.upgrader.upgrade_connection(raw_conn, True)
@ -54,28 +55,30 @@ class Swarm(INetwork):
# Use muxed conn to open stream, which returns # Use muxed conn to open stream, which returns
# a muxed stream # a muxed stream
stream_id = str(uuid.uuid4()) # TODO: use better stream IDs
muxed_stream = muxed_conn.open_stream(protocol_id, stream_id, peer_id, first_addr) stream_id = (uuid.uuid4().int & (1<<64)-1) >> 3
muxed_stream = muxed_conn.open_stream(protocol_id, stream_id, peer_id, multiaddr)
# Create a net stream # Create a net stream
net_stream = NetStream(muxed_stream) net_stream = NetStream(muxed_stream)
return net_stream return net_stream
def listen(self, *args): async def listen(self, *args):
""" """
:param *args: one or many multiaddrs to start listening on :param *args: one or many multiaddrs to start listening on
:return: true if at least one success :return: true if at least one success
""" """
"""
# For each multiaddr in args For each multiaddr in args
# Check if a listener for multiaddr exists already Check if a listener for multiaddr exists already
# If listener already exists, continue If listener already exists, continue
# Otherwise, do the following: Otherwise:
# Pass multiaddr into conn handler Capture multiaddr in conn handler
# Have conn handler delegate to stream handler Have conn handler delegate to stream handler
# Call listener listen with the multiaddr Call listener listen with the multiaddr
# Map multiaddr to listener Map multiaddr to listener
"""
for multiaddr_str in args: for multiaddr_str in args:
if multiaddr_str in self.listeners: if multiaddr_str in self.listeners:
return True return True
@ -83,27 +86,28 @@ class Swarm(INetwork):
multiaddr = MultiAddr(multiaddr_str) multiaddr = MultiAddr(multiaddr_str)
multiaddr_dict = multiaddr.to_options() multiaddr_dict = multiaddr.to_options()
def conn_handler(reader, writer): async def conn_handler(reader, writer):
# Upgrade reader/write to a net_stream and pass to appropriate stream handler (using multiaddr) # Upgrade reader/write to a net_stream and pass to appropriate stream handler (using multiaddr)
raw_conn = RawConnection(multiaddr_dict.host, multiaddr_dict.port, reader, writer) raw_conn = RawConnection(multiaddr_dict['host'], multiaddr_dict['port'], reader, writer)
muxed_conn = self.upgrader.upgrade_connection(raw_conn, False) muxed_conn = self.upgrader.upgrade_connection(raw_conn, False)
muxed_stream, stream_id, protocol_id = muxed_conn.accept_stream() muxed_stream, stream_id, protocol_id = await muxed_conn.accept_stream()
net_stream = NetStream(muxed_stream) net_stream = NetStream(muxed_stream)
net_stream.set_protocol(protocol_id) net_stream.set_protocol(protocol_id)
# Give to stream handler # Give to stream handler
# TODO: handle case when stream handler is set # TODO: handle case when stream handler is set
self.stream_handlers[protocol_id](net_stream) # TODO: handle case of multiple protocols over same raw connection
await self.stream_handlers[protocol_id](net_stream)
try: try:
# Success # Success
listener = self.transport.create_listener(conn_handler) listener = self.transport.create_listener(conn_handler)
listener.listen(multiaddr) await listener.listen(multiaddr)
return True return True
except IOError: except IOError:
# Failed. Continue looping. # Failed. Continue looping.
print("Failed to connect to: " + multiaddr) print("Failed to connect to: " + str(multiaddr))
# No multiaddr succeeded # No multiaddr succeeded
return False return False

1
requirements.txt

@ -2,3 +2,4 @@ asyncio
pylint pylint
pytest pytest
pycryptodome pycryptodome
pytest-asyncio

47
tests/libp2p/test_libp2p.py

@ -1,30 +1,39 @@
import pytest
from libp2p.libp2p import Libp2p from libp2p.libp2p import Libp2p
@pytest.mark.asyncio
async def test_simple_messages():
libA = Libp2p(transportOpt=["/ip4/127.0.0.1/tcp/8001/ipfs/hostA"])
libB = Libp2p(transportOpt=["/ip4/127.0.0.1/tcp/8000/ipfs/hostB"])
hostA = await libA.new_node()
hostB = await libB.new_node()
# TODO: are these connections async? how do we wait on responses? async def stream_handler(stream):
def test_simple_messages(): while True:
lib = Libp2p() read_string = (await stream.read()).decode()
print("host B received:" + read_string)
hostA = lib.new_node() response = "ack:" + read_string
hostB = lib.new_node() print("sending response:" + response)
await stream.write(response.encode())
def stream_handler(stream): hostB.set_stream_handler("/echo/1.0.0", stream_handler)
print("stream received in host B")
read_string = stream.read().decode() # Associate the peer with local ip address (see default parameters of Libp2p())
print("host B received: " + read_string) hostA.get_peerstore().add_addr("hostB", "/ip4/127.0.0.1/tcp/8000", 10)
response = "ack: " + read_string stream = await hostA.new_stream("hostB", "/echo/1.0.0")
stream.write(response.encode()) messages = ["hello" + str(x) for x in range(10)]
hostB.set_stream_handler("/echo/1.0.0", stream_handler) for message in messages:
await stream.write(message.encode())
# associate the peer with local ip address (see default parameters of Libp2p()) response = (await stream.read()).decode()
hostA.get_peerstore().add_addr("hostB", "/ip4/127.0.0.1/tcp/10000")
stream = hostA.new_stream("hostB", "/app/1.0.0") print("res: " + response)
message = "hello" assert response == ("ack:" + message)
stream.write(message.encode())
response = stream.read().decode() # Success, terminate pending tasks.
assert response == ("ack: " + message) return

35
transport/tcp/tcp.py

@ -13,25 +13,26 @@ class TCP(ITransport):
def __init__(self, handler_function=None): def __init__(self, handler_function=None):
self.multiaddrs = [] self.multiaddrs = []
self.server = None self.server = None
self.handler = staticmethod(handler_function) self.handler = handler_function
def listen(self, multiaddr): async def listen(self, multiaddr):
""" """
put listener in listening mode and wait for incoming connections put listener in listening mode and wait for incoming connections
:param multiaddr: multiaddr of peer :param multiaddr: multiaddr of peer
:return: return True if successful :return: return True if successful
""" """
_multiaddr = multiaddr
# TODO check for exceptions # TODO check for exceptions
if "ipfs" in multiaddr.get_protocols(): if "ipfs" in _multiaddr.get_protocols():
# ipfs_id = multiaddr.get_ipfs_id() # ipfs_id = multiaddr.get_ipfs_id()
_multiaddr = multiaddr.remove_protocol("ipfs") _multiaddr.remove_protocol("ipfs")
self.multiaddrs.append(multiaddr) self.multiaddrs.append(_multiaddr)
multiaddr_dict = _multiaddr.to_options() multiaddr_dict = _multiaddr.to_options()
loop = asyncio.get_event_loop() coroutine = asyncio.start_server(self.handler, multiaddr_dict['host'],\
coroutine = asyncio.start_server(self.handler, multiaddr_dict.host,\ multiaddr_dict['port'])
multiaddr_dict.port, loop=loop) self.server = await coroutine
self.server = loop.run_until_complete(coroutine)
return True return True
def get_addrs(self): def get_addrs(self):
@ -59,17 +60,19 @@ class TCP(ITransport):
self.server = None self.server = None
return True return True
def dial(self, multiaddr, options=None): async def dial(self, multiaddr, options=None):
""" """
dial a transport to peer listening on multiaddr dial a transport to peer listening on multiaddr
:param multiaddr: multiaddr of peer :param multiaddr: multiaddr of peer
:param options: optional object :param options: optional object
:return: True if successful :return: True if successful
""" """
_multiaddr_dict = multiaddr.to_dict() _multiaddr_dict = multiaddr.to_options()
host = _multiaddr_dict.host host = _multiaddr_dict['host']
port = _multiaddr_dict.port port = _multiaddr_dict['port']
reader, writer = open_conn(host, port)
reader, writer = await asyncio.open_connection(host, port)
return RawConnection(host, port, reader, writer) return RawConnection(host, port, reader, writer)
def create_listener(self, handler_function, options=None): def create_listener(self, handler_function, options=None):
@ -81,7 +84,3 @@ class TCP(ITransport):
:return: a listener object that implements listener_interface.py :return: a listener object that implements listener_interface.py
""" """
return self.Listener(handler_function) return self.Listener(handler_function)
async def open_conn(host, port):
reader, writer = await asyncio.open_connection(host, port)
return reader, writer

Loading…
Cancel
Save