Browse Source

Fix Dialer (#170)

* Fix dialer to establish only one outgoing connection by peer

Signed-off-by: Igor Egorov <igor@soramitsu.co.jp>
pull/171/head
Igor Egorov 3 years ago
committed by GitHub
parent
commit
c97b3959e7
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 42
      include/libp2p/network/impl/dialer_impl.hpp
  2. 4
      include/libp2p/network/impl/network_impl.hpp
  3. 222
      src/network/impl/dialer_impl.cpp
  4. 2
      src/network/impl/network_impl.cpp
  5. 4
      src/peer/address_repository/inmem_address_repository.cpp
  6. 18
      test/libp2p/network/dialer_test.cpp

42
include/libp2p/network/impl/dialer_impl.hpp

@ -6,6 +6,9 @@
#ifndef LIBP2P_DIALER_IMPL_HPP
#define LIBP2P_DIALER_IMPL_HPP
#include <set>
#include <unordered_map>
#include <libp2p/basic/scheduler.hpp>
#include <libp2p/network/connection_manager.hpp>
#include <libp2p/network/dialer.hpp>
@ -15,7 +18,8 @@
namespace libp2p::network {
class DialerImpl : public Dialer {
class DialerImpl : public Dialer,
public std::enable_shared_from_this<DialerImpl> {
public:
~DialerImpl() override = default;
@ -39,11 +43,47 @@ namespace libp2p::network {
StreamResultFunc cb) override;
private:
// A context to handle an intermediary state of the peer we are dialing to
// but the connection is not yet established
struct DialCtx {
/// Known and scheduled addresses to try to dial via
std::set<multi::Multiaddress> addresses;
/// Timeout for a single connection attempt
std::chrono::milliseconds timeout;
/// Addresses we already tried, but no connection was established
std::set<multi::Multiaddress> tried_addresses;
/// Callbacks for all who requested a connection to the peer
std::vector<Dialer::DialResultFunc> callbacks;
/// Result temporary storage to propagate via callbacks
boost::optional<DialResult> result;
// ^ used when all connecting attempts failed and no more known peer
// addresses are left
// indicates that at least one attempt to dial was happened
// (at least one supported network transport was found and used)
bool dialled = false;
};
// Perform a single attempt to dial to the peer via the next known address
void rotate(const peer::PeerId &peer_id);
// Finalize dialing to the peer and propagate a given result to all
// connection requesters
void completeDial(const peer::PeerId &peer_id, const DialResult &result);
std::shared_ptr<protocol_muxer::ProtocolMuxer> multiselect_;
std::shared_ptr<TransportManager> tmgr_;
std::shared_ptr<ConnectionManager> cmgr_;
std::shared_ptr<ListenerManager> listener_;
std::shared_ptr<basic::Scheduler> scheduler_;
log::Logger log_;
// peers we are currently dialing to
std::unordered_map<peer::PeerId, DialCtx> dialing_peers_;
};
} // namespace libp2p::network

4
include/libp2p/network/impl/network_impl.hpp

@ -16,7 +16,7 @@ namespace libp2p::network {
~NetworkImpl() override = default;
NetworkImpl(std::shared_ptr<ListenerManager> listener,
std::unique_ptr<Dialer> dialer,
std::shared_ptr<Dialer> dialer,
std::shared_ptr<ConnectionManager> cmgr);
void closeConnections(const peer::PeerId &p) override;
@ -29,7 +29,7 @@ namespace libp2p::network {
private:
std::shared_ptr<ListenerManager> listener_;
std::unique_ptr<Dialer> dialer_;
std::shared_ptr<Dialer> dialer_;
std::shared_ptr<ConnectionManager> cmgr_;
};

222
src/network/impl/dialer_impl.cpp

@ -3,24 +3,38 @@
* SPDX-License-Identifier: Apache-2.0
*/
#include <functional>
#include <iostream>
#include <libp2p/connection/stream.hpp>
#include <libp2p/log/logger.hpp>
#include <libp2p/network/impl/dialer_impl.hpp>
#define TRACE_ENABLED 0
#include <libp2p/common/trace.hpp>
namespace libp2p::network {
void DialerImpl::dial(const peer::PeerInfo &p, DialResultFunc cb,
std::chrono::milliseconds timeout) {
SL_TRACE(log_, "Dialing to {}", p.id.toBase58().substr(46));
if (auto c = cmgr_->getBestConnectionForPeer(p.id); c != nullptr) {
// we have connection to this peer
TRACE("reusing connection to peer {}", p.id.toBase58().substr(46));
scheduler_->schedule([cb{std::move(cb)}, c{std::move(c)}] () mutable {
cb(std::move(c));
});
SL_TRACE(log_, "Reusing connection to peer {}",
p.id.toBase58().substr(46));
scheduler_->schedule(
[cb{std::move(cb)}, c{std::move(c)}]() mutable { cb(std::move(c)); });
return;
}
if (auto ctx = dialing_peers_.find(p.id); dialing_peers_.end() != ctx) {
SL_TRACE(log_, "Dialing to {} is already in progress",
p.id.toBase58().substr(46));
// populate known addresses for in-progress dial if any new appear
for (const auto &addr : p.addresses) {
if (0 == ctx->second.tried_addresses.count(addr)) {
ctx->second.addresses.insert(addr);
}
}
ctx->second.callbacks.emplace_back(std::move(cb));
return;
}
@ -28,83 +42,111 @@ namespace libp2p::network {
// did user supply its addresses in {@param p}?
if (p.addresses.empty()) {
// we don't have addresses of peer p
scheduler_->schedule([cb{std::move(cb)}] {
cb(std::errc::destination_address_required);
});
scheduler_->schedule(
[cb{std::move(cb)}] { cb(std::errc::destination_address_required); });
return;
}
struct DialHandlerCtx {
bool connected;
size_t calls_remain;
DialResultFunc cb;
DialCtx new_ctx{.addresses = {p.addresses.begin(), p.addresses.end()},
.timeout = timeout};
new_ctx.callbacks.emplace_back(std::move(cb));
bool scheduled = dialing_peers_.emplace(p.id, std::move(new_ctx)).second;
BOOST_ASSERT(scheduled);
rotate(p.id);
}
void DialerImpl::rotate(const peer::PeerId &peer_id) {
auto ctx_found = dialing_peers_.find(peer_id);
if (dialing_peers_.end() == ctx_found) {
SL_ERROR(log_, "State inconsistency - cannot dial {}",
peer_id.toBase58());
return;
}
auto &&ctx = ctx_found->second;
if (ctx.addresses.empty() and not ctx.dialled) {
completeDial(peer_id, std::errc::address_family_not_supported);
return;
}
if (ctx.addresses.empty() and ctx.result.has_value()) {
completeDial(peer_id, ctx.result.value());
return;
}
if (ctx.addresses.empty()) {
// this would never happen. Previous if-statement should work instead'
completeDial(peer_id, std::errc::host_unreachable);
return;
}
DialHandlerCtx(size_t addresses, DialResultFunc cb)
: connected(false), calls_remain(addresses), cb{std::move(cb)} {}
};
auto handler_ctx = std::make_shared<DialHandlerCtx>(p.addresses.size(), cb);
auto dial_handler =
[listener{listener_}, ctx{std::move(handler_ctx)}](
[wp{weak_from_this()}, peer_id](
outcome::result<std::shared_ptr<connection::CapableConnection>>
connection_result) {
--ctx->calls_remain;
if (ctx->connected) {
// we already got connected to the peer via some other address
if (connection_result) {
// lets close the redundant connection if so
auto &&conn = connection_result.value();
if (not conn->isClosed()) {
auto close_res = conn->close();
result) {
if (auto self = wp.lock()) {
auto ctx_found = self->dialing_peers_.find(peer_id);
if (self->dialing_peers_.end() == ctx_found) {
SL_ERROR(
self->log_,
"State inconsistency - uninteresting dial result for peer {}",
peer_id.toBase58());
if (result.has_value() and not result.value()->isClosed()) {
auto close_res = result.value()->close();
BOOST_ASSERT(close_res);
}
} // otherwise we don't care about any failure since that was going
// to be a redundant connection to the moment
return;
}
if (result.has_value()) {
self->listener_->onConnection(result);
self->completeDial(peer_id, result);
return;
}
// store an error otherwise and reschedule one more rotate
ctx_found->second.result = std::move(result);
self->scheduler_->schedule([wp, peer_id] {
if (auto self = wp.lock()) {
self->rotate(peer_id);
}
});
return;
}
if (connection_result) {
// we've got the first successful connection to the peer, hooray!
ctx->connected = true;
// allow the connection accept inbound streams
listener->onConnection(connection_result);
// return connection to the user
ctx->cb(connection_result.value());
return;
// closing the connection when dialer and connection requester
// callback no more exist
if (result.has_value() and not result.value()->isClosed()) {
auto close_res = result.value()->close();
BOOST_ASSERT(close_res);
}
// here we handle failed attempt to connect
if (0 == ctx->calls_remain) {
// that was the last attempt to connect and we are still not
// connected so lets report an error to the user
ctx->cb(connection_result.error());
return;
} // otherwise we don't care about this particular failure because at
// the end we either would get connected or will do the same -
// report the error to the user inside the callback of the last
// dial attempt
};
bool dialled{false};
// for all multiaddresses supplied in peerinfo
for (auto &&ma : p.addresses) {
// try to find best possible transport
if (auto tr = this->tmgr_->findBest(ma); tr != nullptr) {
// we can dial to this peer!
dialled = true;
// dial using best transport
tr->dial(p.id, ma, dial_handler, timeout);
// All the dials are still to be executed sequentially within the single
// boost::asio::io_context. Immediate spawn of all of them allows us to
// have the closest timeout to the specified instead of
// NumberOfMultiaddresses multiplied by a single timeout value.
}
auto first_addr = ctx.addresses.begin();
const auto addr = *first_addr;
ctx.tried_addresses.insert(addr);
ctx.addresses.erase(first_addr);
if (auto tr = tmgr_->findBest(addr); nullptr != tr) {
ctx.dialled = true;
SL_TRACE(log_, "Dial to {} via {}", peer_id.toBase58().substr(46),
addr.getStringAddress());
tr->dial(peer_id, addr, dial_handler, ctx.timeout);
} else {
scheduler_->schedule([wp{weak_from_this()}, peer_id] {
if (auto self = wp.lock()) {
self->rotate(peer_id);
}
});
}
}
if (not dialled) {
// we did not find supported transport
scheduler_->schedule([cb{std::move(cb)}] {
cb(std::errc::address_family_not_supported);
});
void DialerImpl::completeDial(const peer::PeerId &peer_id,
const DialResult &result) {
if (auto ctx_found = dialing_peers_.find(peer_id);
dialing_peers_.end() != ctx_found) {
auto &&ctx = ctx_found->second;
for (auto i = 0u; i < ctx.callbacks.size(); ++i) {
scheduler_->schedule(
[result, cb{std::move(ctx.callbacks[i])}] { cb(result); });
}
dialing_peers_.erase(ctx_found);
}
}
@ -112,10 +154,11 @@ namespace libp2p::network {
const peer::Protocol &protocol,
StreamResultFunc cb,
std::chrono::milliseconds timeout) {
// 1. make new connection or reuse existing
this->dial(
SL_TRACE(log_, "New stream to {} for {} (peer info)",
p.id.toBase58().substr(46), protocol);
dial(
p,
[this, cb{std::move(cb)}, protocol](
[self{shared_from_this()}, cb{std::move(cb)}, protocol](
outcome::result<std::shared_ptr<connection::CapableConnection>>
rconn) mutable {
if (!rconn) {
@ -123,18 +166,14 @@ namespace libp2p::network {
}
auto &&conn = rconn.value();
// 2. open new stream on that connection
conn->newStream(
[this, cb{std::move(cb)},
protocol](outcome::result<std::shared_ptr<connection::Stream>>
rstream) mutable {
if (!rstream) {
return cb(rstream.error());
}
this->multiselect_->simpleStreamNegotiate(
rstream.value(), protocol, std::move(cb));
});
auto result = conn->newStream();
if (!result) {
self->scheduler_->schedule(
[cb{std::move(cb)}, result] { cb(result); });
return;
}
self->multiselect_->simpleStreamNegotiate(result.value(), protocol,
std::move(cb));
},
timeout);
}
@ -142,19 +181,18 @@ namespace libp2p::network {
void DialerImpl::newStream(const peer::PeerId &peer_id,
const peer::Protocol &protocol,
StreamResultFunc cb) {
SL_TRACE(log_, "New stream to {} for {} (peer id)",
peer_id.toBase58().substr(46), protocol);
auto conn = cmgr_->getBestConnectionForPeer(peer_id);
if (!conn) {
scheduler_->schedule([cb{std::move(cb)}] {
cb(std::errc::not_connected);
});
scheduler_->schedule(
[cb{std::move(cb)}] { cb(std::errc::not_connected); });
return;
}
auto result = conn->newStream();
if (!result) {
scheduler_->schedule([cb{std::move(cb)}, result] {
cb(result);
});
scheduler_->schedule([cb{std::move(cb)}, result] { cb(result); });
return;
}
@ -172,12 +210,14 @@ namespace libp2p::network {
tmgr_(std::move(tmgr)),
cmgr_(std::move(cmgr)),
listener_(std::move(listener)),
scheduler_(std::move(scheduler)) {
scheduler_(std::move(scheduler)),
log_(log::createLogger("DialerImpl", "network")) {
BOOST_ASSERT(multiselect_ != nullptr);
BOOST_ASSERT(tmgr_ != nullptr);
BOOST_ASSERT(cmgr_ != nullptr);
BOOST_ASSERT(listener_ != nullptr);
BOOST_ASSERT(scheduler_ != nullptr);
BOOST_ASSERT(log_ != nullptr);
}
} // namespace libp2p::network

2
src/network/impl/network_impl.cpp

@ -24,7 +24,7 @@ namespace libp2p::network {
}
NetworkImpl::NetworkImpl(std::shared_ptr<ListenerManager> listener,
std::unique_ptr<Dialer> dialer,
std::shared_ptr<Dialer> dialer,
std::shared_ptr<ConnectionManager> cmgr)
: listener_(std::move(listener)),
dialer_(std::move(dialer)),

4
src/peer/address_repository/inmem_address_repository.cpp

@ -99,8 +99,8 @@ namespace libp2p::peer {
auto expires_at = Clock::now() + ttl;
for (const auto &m : ma) {
auto [addr_it, added] = addresses.emplace(m, expires_at);
if (added) {
auto [addr_it, emplaced] = addresses.emplace(m, expires_at);
if (emplaced) {
signal_added_(p, m);
added = true;
} else {

18
test/libp2p/network/dialer_test.cpp

@ -20,6 +20,7 @@
#include "mock/libp2p/transport/transport_mock.hpp"
#include "testutil/gmock_actions.hpp"
#include "testutil/outcome.hpp"
#include "testutil/prepare_loggers.hpp"
using namespace libp2p;
using namespace network;
@ -36,6 +37,12 @@ using ::testing::Eq;
using ::testing::Return;
struct DialerTest : public ::testing::Test {
void SetUp() override {
testutil::prepareLoggers();
dialer = std::make_shared<DialerImpl>(proto_muxer, tmgr, cmgr, listener,
scheduler);
}
std::shared_ptr<StreamMock> stream = std::make_shared<StreamMock>();
std::shared_ptr<CapableConnectionMock> connection =
@ -60,8 +67,7 @@ struct DialerTest : public ::testing::Test {
std::shared_ptr<Scheduler> scheduler =
std::make_shared<SchedulerImpl>(scheduler_backend, Scheduler::Config{});
std::shared_ptr<Dialer> dialer = std::make_shared<DialerImpl>(
proto_muxer, tmgr, cmgr, listener, scheduler);
std::shared_ptr<Dialer> dialer;
multi::Multiaddress ma1 = "/ip4/127.0.0.1/tcp/1"_multiaddr;
multi::Multiaddress ma2 = "/ip4/127.0.0.1/tcp/2"_multiaddr;
@ -236,7 +242,7 @@ TEST_F(DialerTest, DialExistingConnection) {
///
/**
* @given existing connection to peer
* @given no connections to peer
* @when newStream is executed
* @then get failure
*/
@ -248,7 +254,7 @@ TEST_F(DialerTest, NewStreamFailed) {
// report random error.
// we simulate a case when "newStream" gets error
outcome::result<std::shared_ptr<Stream>> r = std::errc::io_error;
EXPECT_CALL(*connection, newStream(_)).WillOnce(Arg0CallbackWithArg(r));
EXPECT_CALL(*connection, newStream()).WillOnce(Return(r));
bool executed = false;
dialer->newStream(pinfo, protocol, [&](auto &&rstream) {
@ -275,7 +281,7 @@ TEST_F(DialerTest, NewStreamNegotiationFailed) {
.WillOnce(Return(connection));
// newStream returns valid stream
EXPECT_CALL(*connection, newStream(_)).WillOnce(Arg0CallbackWithArg(stream));
EXPECT_CALL(*connection, newStream()).WillOnce(Return(stream));
outcome::result<std::shared_ptr<Stream>> r = std::errc::io_error;
@ -307,7 +313,7 @@ TEST_F(DialerTest, NewStreamSuccess) {
.WillOnce(Return(connection));
// newStream returns valid stream
EXPECT_CALL(*connection, newStream(_)).WillOnce(Arg0CallbackWithArg(stream));
EXPECT_CALL(*connection, newStream()).WillOnce(Return(stream));
EXPECT_CALL(*proto_muxer, simpleStreamNegotiate(_, protocol, _))
.WillOnce(Arg2CallbackWithArg(stream));

Loading…
Cancel
Save