Browse Source

Outbound streams optimistic creation (#131)

* yamux connection sends window update msg on new outbound streams

* optimistic outbound stream creation added

* fix
pull/132/head
art-gor 4 years ago
committed by GitHub
parent
commit
c76b5f091e
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      include/libp2p/connection/capable_connection.hpp
  2. 3
      include/libp2p/host/basic_host/basic_host.hpp
  3. 11
      include/libp2p/host/host.hpp
  4. 2
      include/libp2p/muxer/mplex/mplexed_connection.hpp
  5. 2
      include/libp2p/muxer/yamux/yamuxed_connection.hpp
  6. 8
      include/libp2p/network/dialer.hpp
  7. 3
      include/libp2p/network/impl/dialer_impl.hpp
  8. 6
      src/host/basic_host/basic_host.cpp
  9. 23
      src/muxer/mplex/mplexed_connection.cpp
  10. 34
      src/muxer/yamux/yamuxed_connection.cpp
  11. 23
      src/network/impl/dialer_impl.cpp
  12. 3
      test/mock/libp2p/connection/capable_connection_mock.hpp
  13. 3
      test/mock/libp2p/host/host_mock.hpp
  14. 3
      test/mock/libp2p/network/dialer_mock.hpp

6
include/libp2p/connection/capable_connection.hpp

@ -41,6 +41,12 @@ namespace libp2p::connection {
*/
virtual void stop() = 0;
/**
* @brief Opens new stream in a synchronous (optimistic) manner
* @return Stream or error
*/
virtual outcome::result<std::shared_ptr<Stream>> newStream() = 0;
/**
* @brief Opens new stream using this connection
* @param cb - callback to be called, when a new stream is established or

3
include/libp2p/host/basic_host/basic_host.hpp

@ -61,6 +61,9 @@ namespace libp2p::host {
const StreamResultHandler &handler,
std::chrono::milliseconds timeout) override;
void newStream(const peer::PeerId &peer_id, const peer::Protocol &protocol,
const StreamResultHandler &handler) override;
outcome::result<void> listen(const multi::Multiaddress &ma) override;
outcome::result<void> closeListener(const multi::Multiaddress &ma) override;

11
include/libp2p/host/host.hpp

@ -176,6 +176,17 @@ namespace libp2p {
std::chrono::milliseconds::zero());
}
/**
* @brief Open new stream to the peer {@param peer} with protocol
* {@param protocol} in optimistic way. Assuming that connection exists.
* @param peer stream will be opened to this peer
* @param protocol "speak" using this protocol
* @param handler callback, will be executed on success or fail
*/
virtual void newStream(
const peer::PeerId &peer_id, const peer::Protocol &protocol,
const StreamResultHandler &handler) = 0;
/**
* @brief Create listener on given multiaddress.
* @param ma address

2
include/libp2p/muxer/mplex/mplexed_connection.hpp

@ -46,6 +46,8 @@ namespace libp2p::connection {
void stop() override;
outcome::result<std::shared_ptr<Stream>> newStream() override;
void newStream(StreamHandlerFunc cb) override;
void onStream(NewStreamHandlerFunc cb) override;

2
include/libp2p/muxer/yamux/yamuxed_connection.hpp

@ -48,6 +48,8 @@ namespace libp2p::connection {
void stop() override;
outcome::result<std::shared_ptr<Stream>> newStream() override;
void newStream(StreamHandlerFunc cb) override;
void onStream(NewStreamHandlerFunc cb) override;

8
include/libp2p/network/dialer.hpp

@ -59,6 +59,14 @@ namespace libp2p::network {
newStream(peer_info, protocol, std::move(cb),
std::chrono::milliseconds::zero());
}
/**
* NewStream returns a new stream to given peer p.
* If there is no connection to p, returns error.
*/
virtual void newStream(const peer::PeerId &peer_id,
const peer::Protocol &protocol,
StreamResultFunc cb) = 0;
};
} // namespace libp2p::network

3
include/libp2p/network/impl/dialer_impl.hpp

@ -33,6 +33,9 @@ namespace libp2p::network {
StreamResultFunc cb,
std::chrono::milliseconds timeout) override;
void newStream(const peer::PeerId &peer_id, const peer::Protocol &protocol,
StreamResultFunc cb) override;
private:
std::shared_ptr<protocol_muxer::ProtocolMuxer> multiselect_;
std::shared_ptr<TransportManager> tmgr_;

6
src/host/basic_host/basic_host.cpp

@ -119,6 +119,12 @@ namespace libp2p::host {
network_->getDialer().newStream(p, protocol, handler, timeout);
}
void BasicHost::newStream(const peer::PeerId &peer_id,
const peer::Protocol &protocol,
const StreamResultHandler &handler) {
network_->getDialer().newStream(peer_id, protocol, handler);
}
outcome::result<void> BasicHost::listen(const multi::Multiaddress &ma) {
return network_->getListener().listen(ma);
}

23
src/muxer/mplex/mplexed_connection.cpp

@ -48,6 +48,25 @@ namespace libp2p::connection {
log_->info("stopping an mplex connection");
}
outcome::result<std::shared_ptr<Stream>> MplexedConnection::newStream() {
if (!is_active_) {
return Error::CONNECTION_INACTIVE;
}
if (streams_.size() >= config_.maximum_streams) {
return Error::TOO_MANY_STREAMS;
}
StreamId new_stream_id{last_issued_stream_number_++, true};
auto new_stream_frame =
createFrameBytes(MplexFrame::Flag::NEW_STREAM, new_stream_id.number);
write({std::move(new_stream_frame), [](auto &&) {}});
auto new_stream =
std::make_shared<MplexStream>(shared_from_this(), new_stream_id);
streams_[new_stream_id] = new_stream;
return new_stream;
}
void MplexedConnection::newStream(StreamHandlerFunc cb) {
// TODO(107): Reentrancy
@ -138,12 +157,12 @@ namespace libp2p::connection {
}
void MplexedConnection::deferReadCallback(outcome::result<size_t> res,
ReadCallbackFunc cb) {
ReadCallbackFunc cb) {
connection_->deferReadCallback(res, std::move(cb));
}
void MplexedConnection::deferWriteCallback(std::error_code ec,
WriteCallbackFunc cb) {
WriteCallbackFunc cb) {
connection_->deferWriteCallback(ec, std::move(cb));
}

34
src/muxer/yamux/yamuxed_connection.cpp

@ -87,6 +87,29 @@ namespace libp2p::connection {
started_ = false;
}
outcome::result<std::shared_ptr<Stream>> YamuxedConnection::newStream() {
if (!started_) {
return YamuxError::CONNECTION_STOPPED;
}
if (streams_.size() >= config_.maximum_streams) {
return YamuxError::TOO_MANY_STREAMS;
}
auto stream_id = new_stream_id_;
new_stream_id_ += 2;
enqueue(newStreamMsg(stream_id));
// Now we self-acked the new stream
auto stream = std::make_shared<YamuxStream>(
shared_from_this(), *this, stream_id, config_.maximum_window_size,
basic::WriteQueue::kDefaultSizeLimit);
streams_[stream_id] = stream;
return stream;
}
void YamuxedConnection::newStream(StreamHandlerFunc cb) {
if (!started_) {
return connection_->deferWriteCallback(
@ -410,13 +433,16 @@ namespace libp2p::connection {
return true;
}
} else if (streams_.count(frame.stream_id) != 0) {
log()->debug("received ACK on existing stream id");
ok = false;
} else {
auto it = pending_outbound_streams_.find(frame.stream_id);
if (it == pending_outbound_streams_.end()) {
log()->debug("received ACK on unknown stream id");
if (streams_.count(frame.stream_id) != 0) {
// Stream was opened in optimistic manner
log()->debug("ignoring received ACK on existing stream id {}",
frame.stream_id);
return true;
}
log()->debug("received ACK on unknown stream id {}", frame.stream_id);
ok = false;
}
stream_handler = std::move(it->second);

23
src/network/impl/dialer_impl.cpp

@ -131,14 +131,31 @@ namespace libp2p::network {
}
this->multiselect_->simpleStreamNegotiate(
rstream.value(),
protocol,
std::move(cb));
rstream.value(), protocol, std::move(cb));
});
},
timeout);
}
void DialerImpl::newStream(const peer::PeerId &peer_id,
const peer::Protocol &protocol,
StreamResultFunc cb) {
// REENTRANT, fix
auto conn = cmgr_->getBestConnectionForPeer(peer_id);
if (!conn) {
return cb(std::errc::not_connected);
}
auto result = conn->newStream();
if (!result) {
return cb(result);
}
multiselect_->simpleStreamNegotiate(result.value(), protocol,
std::move(cb));
}
DialerImpl::DialerImpl(
std::shared_ptr<protocol_muxer::ProtocolMuxer> multiselect,
std::shared_ptr<TransportManager> tmgr,

3
test/mock/libp2p/connection/capable_connection_mock.hpp

@ -16,6 +16,8 @@ namespace libp2p::connection {
public:
~CapableConnectionMock() override = default;
MOCK_METHOD0(newStream, outcome::result<std::shared_ptr<Stream>>());
MOCK_METHOD1(newStream, void(StreamHandlerFunc));
MOCK_METHOD1(onStream, void(NewStreamHandlerFunc));
@ -59,6 +61,7 @@ namespace libp2p::connection {
~CapableConnBasedOnRawConnMock() override = default;
MOCK_METHOD0(newStream, outcome::result<std::shared_ptr<Stream>>());
MOCK_METHOD1(newStream, void(StreamHandlerFunc));
MOCK_METHOD1(onStream, void(NewStreamHandlerFunc));

3
test/mock/libp2p/host/host_mock.hpp

@ -42,6 +42,9 @@ namespace libp2p {
void(const peer::PeerInfo &p, const peer::Protocol &protocol,
const StreamResultHandler &handler,
std::chrono::milliseconds));
MOCK_METHOD3(newStream,
void(const peer::PeerId &p, const peer::Protocol &protocol,
const StreamResultHandler &handler));
MOCK_METHOD1(listen, outcome::result<void>(const multi::Multiaddress &ma));
MOCK_METHOD1(closeListener,
outcome::result<void>(const multi::Multiaddress &ma));

3
test/mock/libp2p/network/dialer_mock.hpp

@ -24,6 +24,9 @@ namespace libp2p::network {
MOCK_METHOD4(newStream,
void(const peer::PeerInfo &, const peer::Protocol &,
StreamResultFunc, std::chrono::milliseconds));
MOCK_METHOD3(newStream,
void(const peer::PeerId &, const peer::Protocol &,
StreamResultFunc));
};
} // namespace libp2p::network

Loading…
Cancel
Save