diff --git a/muxer/mplex/muxed_connection.py b/muxer/mplex/muxed_connection.py index c0b9926..b7caa62 100644 --- a/muxer/mplex/muxed_connection.py +++ b/muxer/mplex/muxed_connection.py @@ -1,4 +1,5 @@ from .muxed_connection_interface import IMuxedConn +from transport.stream.Stream import Stream class MuxedConn(IMuxedConn): """ @@ -26,12 +27,14 @@ class MuxedConn(IMuxedConn): """ pass - def open_stream(self, protocol_id, stream_name): + def open_stream(self, protocol_id, stream_name, peer_id, multi_addr): """ creates a new muxed_stream :return: a new stream """ - pass + + return Stream(peer_id, multi_addr, self) + def accept_stream(self): """ diff --git a/network/swarm.py b/network/swarm.py index bc63c90..edd4446 100644 --- a/network/swarm.py +++ b/network/swarm.py @@ -1,6 +1,6 @@ from .network_interface import INetwork -from ..connection.muxed_connection import MuxedConnection -from ..connection.raw_connection import RawConnection +from muxer.mplex.muxed_connection import MuxedConn +from transport.connection.raw_connection import RawConnection class Swarm(INetwork): @@ -41,7 +41,7 @@ class Swarm(INetwork): muxed_connection = MuxedConnection(conn, True) else: raise Exception("No IP and port in addr") - return muxed_connection.open_stream(protocol_id, "") + return muxed_connection.open_stream(protocol_id, "", peer_id, addrs) def listen(self, *args): """ diff --git a/transport/connection/raw_connection.py b/transport/connection/raw_connection.py index e93e19e..efc3706 100644 --- a/transport/connection/raw_connection.py +++ b/transport/connection/raw_connection.py @@ -3,14 +3,20 @@ from .raw_connection_interface import IRawConnection class RawConnection(IRawConnection): - def __init__(self, ip, port): + def __init__(self, ip, port, reader, writer): self.conn_ip = ip self.conn_port = port - self.reader, self.writer = self.open_connection() + self.reader = reader + self.writer = writer - async def open_connection(self): - """ - opens a connection on self.ip and self.port - :return: a raw connection - """ - return await asyncio.open_connection(self.conn_ip, self.conn_port) + # def __init__(self, ip, port): + # self.conn_ip = ip + # self.conn_port = port + # self.reader, self.writer = self.open_connection() + + # async def open_connection(self): + # """ + # opens a connection on self.ip and self.port + # :return: a raw connection + # """ + # return await asyncio.open_connection(self.conn_ip, self.conn_port) diff --git a/transport/connection/raw_connection_interface.py b/transport/connection/raw_connection_interface.py index 8c07a95..2684355 100644 --- a/transport/connection/raw_connection_interface.py +++ b/transport/connection/raw_connection_interface.py @@ -6,10 +6,10 @@ class IRawConnection(ABC): open_connection should return such a connection """ - @abstractmethod - async def open_connection(self): - """ - opens a connection on ip and port - :return: a raw connection - """ - pass + # @abstractmethod + # async def open_connection(self): + # """ + # opens a connection on ip and port + # :return: a raw connection + # """ + # pass diff --git a/transport/stream/stream.py b/transport/stream/stream.py index cce6ad6..34daf97 100644 --- a/transport/stream/stream.py +++ b/transport/stream/stream.py @@ -4,13 +4,17 @@ from .stream_interface import IStream class Stream(IStream): def __init__(self, peer_id, multi_addr, connection): - IStream.__init__(self, peer_id, multi_addr) + IStream.__init__(self, peer_id, multi_addr, connection) self.peer_id = peer_id - stream_ip = multi_addr.get_protocol_value("ip4") - stream_port = multi_addr.get_protocol_value("tcp") + self.multi_addr = multi_addr + + self.stream_ip = multi_addr.get_protocol_value("ip4") + self.stream_port = multi_addr.get_protocol_value("tcp") + self.reader = connection.reader self.writer = connection.writer + # TODO should construct protocol id from constructor self.protocol_id = None diff --git a/transport/tcp/tcp.py b/transport/tcp/tcp.py index 6ea962d..7981ede 100644 --- a/transport/tcp/tcp.py +++ b/transport/tcp/tcp.py @@ -1,6 +1,7 @@ import asyncio -from .transport_interface import ITransport -from .listener_interface import IListener +from transport.transport_interface import ITransport +from transport.listener_interface import IListener +from transport.connection.raw_connection import RawConnection class TCP(ITransport): @@ -67,9 +68,10 @@ class TCP(ITransport): :return: True if successful """ _multiaddr_dict = multiaddr.to_dict() - reader, writer = await asyncio.open_connection(_multiaddr_dict.host,\ - _multiaddr_dict.port) - return False + host = _multiaddr_dict.host + port = _multiaddr_dict.port + reader, writer = open_conn(host, port) + return RawConnection(host, port, reader, writer) # TODO dial behavior not fully understood def create_listener(self, handler_function, options=None): @@ -81,3 +83,7 @@ class TCP(ITransport): :return: a listener object that implements listener_interface.py """ return self.Listener(handler_function) + +async def open_conn(host, port): + reader, writer = await asyncio.open_connection(host, port) + return reader, writer