Browse Source

kad put (#274)

Signed-off-by: turuslan <turuslan.devbox@gmail.com>
pull/275/head
Ruslan Tushov 4 weeks ago
committed by GitHub
parent
commit
d85fdc6a51
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 2
      example/02-kademlia/rendezvous_chat.cpp
  2. 3
      include/libp2p/protocol/kademlia/common.hpp
  3. 3
      include/libp2p/protocol/kademlia/config.hpp
  4. 2
      include/libp2p/protocol/kademlia/impl/executors_factory.hpp
  5. 6
      include/libp2p/protocol/kademlia/impl/find_peer_executor.hpp
  6. 2
      include/libp2p/protocol/kademlia/impl/kademlia_impl.hpp
  7. 2
      include/libp2p/protocol/kademlia/message.hpp
  8. 14
      include/libp2p/protocol/kademlia/node_id.hpp
  9. 4
      src/peer/address_repository/inmem_address_repository.cpp
  10. 6
      src/protocol/kademlia/impl/add_provider_executor.cpp
  11. 30
      src/protocol/kademlia/impl/find_peer_executor.cpp
  12. 6
      src/protocol/kademlia/impl/find_providers_executor.cpp
  13. 6
      src/protocol/kademlia/impl/get_value_executor.cpp
  14. 48
      src/protocol/kademlia/impl/kademlia_impl.cpp
  15. 4
      src/protocol/kademlia/message.cpp
  16. 5
      test/libp2p/transport/tcp/tcp_integration_test.cpp

2
example/02-kademlia/rendezvous_chat.cpp

@ -337,7 +337,7 @@ int main(int argc, char *argv[]) {
auto peer_id =
libp2p::peer::PeerId::fromHash(cid.content_address).value();
[[maybe_unused]] auto res = kademlia->findPeer(peer_id, [&](auto) {
[[maybe_unused]] auto res = kademlia->findPeer(peer_id, [&](auto, auto) {
// Say to world about his providing
provide();

3
include/libp2p/protocol/kademlia/common.hpp

@ -26,7 +26,8 @@ namespace libp2p::protocol::kademlia {
using Time = std::chrono::milliseconds;
using ValueAndTime = std::pair<Value, Time>;
using FoundPeerInfoHandler = std::function<void(outcome::result<PeerInfo>)>;
using FoundPeerInfoHandler =
std::function<void(outcome::result<PeerInfo>, std::vector<PeerId>)>;
using FoundProvidersHandler =
std::function<void(outcome::result<std::vector<PeerInfo>>)>;
using FoundValueHandler = std::function<void(outcome::result<Value>)>;

3
include/libp2p/protocol/kademlia/config.hpp

@ -148,6 +148,9 @@ namespace libp2p::protocol::kademlia {
// https://github.com/libp2p/rust-libp2p/blob/c6cf7fec6913aa590622aeea16709fce6e9c99a5/protocols/kad/src/query/peers/closest.rs#L110-L120
size_t query_initial_peers = K_VALUE;
// https://github.com/libp2p/rust-libp2p/blob/9a45db3f82b760c93099e66ec77a7a772d1f6cd3/protocols/kad/src/query/peers/closest.rs#L336-L346
size_t replication_factor = K_VALUE;
};
} // namespace libp2p::protocol::kademlia

2
include/libp2p/protocol/kademlia/impl/executors_factory.hpp

@ -46,7 +46,7 @@ namespace libp2p::protocol::kademlia {
ContentId sought_key, FoundProvidersHandler handler) = 0;
virtual std::shared_ptr<FindPeerExecutor> createFindPeerExecutor(
PeerId peer_id, FoundPeerInfoHandler handler) = 0;
HashedKey key, FoundPeerInfoHandler handler) = 0;
};
} // namespace libp2p::protocol::kademlia

6
include/libp2p/protocol/kademlia/impl/find_peer_executor.hpp

@ -35,7 +35,7 @@ namespace libp2p::protocol::kademlia {
std::shared_ptr<basic::Scheduler> scheduler,
std::shared_ptr<SessionHost> session_host,
const std::shared_ptr<PeerRoutingTable> &peer_routing_table,
PeerId peer_id,
HashedKey target,
FoundPeerInfoHandler handler);
~FindPeerExecutor() override;
@ -70,9 +70,9 @@ namespace libp2p::protocol::kademlia {
std::shared_ptr<SessionHost> session_host_;
// Secondary
const PeerId sought_peer_id_;
const NodeId target_;
HashedKey target_;
std::unordered_set<PeerId> nearest_peer_ids_;
std::vector<PeerId> succeeded_peers_;
FoundPeerInfoHandler handler_;
// Auxiliary

2
include/libp2p/protocol/kademlia/impl/kademlia_impl.hpp

@ -112,7 +112,7 @@ namespace libp2p::protocol::kademlia {
ContentId content_id, FoundProvidersHandler handler) override;
std::shared_ptr<FindPeerExecutor> createFindPeerExecutor(
PeerId peer_id, FoundPeerInfoHandler handler) override;
HashedKey key, FoundPeerInfoHandler handler) override;
outcome::result<void> findRandomPeer() override;
void randomWalk();

2
include/libp2p/protocol/kademlia/message.hpp

@ -77,7 +77,7 @@ namespace libp2p::protocol::kademlia {
Message createGetProvidersRequest(const Key &key,
boost::optional<PeerInfo> self_announce);
Message createFindNodeRequest(const PeerId &node,
Message createFindNodeRequest(Key key,
boost::optional<PeerInfo> self_announce);
} // namespace libp2p::protocol::kademlia

14
include/libp2p/protocol/kademlia/node_id.hpp

@ -10,6 +10,7 @@
#include <climits>
#include <cstring>
#include <memory>
#include <optional>
#include <span>
#include <vector>
@ -106,4 +107,17 @@ namespace libp2p::protocol::kademlia {
Hash256 data_;
};
struct HashedKey {
HashedKey(Key key, std::optional<PeerId> peer)
: key{std::move(key)},
hash{NodeId::hash(this->key)},
peer{std::move(peer)} {}
// NOLINTNEXTLINE(google-explicit-constructor)
HashedKey(Key key) : HashedKey{std::move(key), std::nullopt} {}
// NOLINTNEXTLINE(google-explicit-constructor)
HashedKey(const PeerId &peer) : HashedKey{peer.toVector(), peer} {}
Key key;
NodeId hash;
std::optional<PeerId> peer;
};
} // namespace libp2p::protocol::kademlia

4
src/peer/address_repository/inmem_address_repository.cpp

@ -123,9 +123,9 @@ namespace libp2p::peer {
auto &addresses = *peer_it->second;
auto expires_at = Clock::now() + ttl;
std::for_each(addresses.begin(), addresses.end(), [expires_at](auto &item) {
for (auto &item : addresses) {
item.second = expires_at;
});
}
return outcome::success();
} // namespace libp2p::peer

6
src/protocol/kademlia/impl/add_provider_executor.cpp

@ -36,9 +36,9 @@ namespace libp2p::protocol::kademlia {
nearest_peer_ids_.insert(std::move_iterator(nearest_peer_ids.begin()),
std::move_iterator(nearest_peer_ids.end()));
std::for_each(nearest_peer_ids_.begin(),
nearest_peer_ids_.end(),
[this](auto &peer_id) { queue_.emplace(peer_id, target_); });
for (auto &peer_id : nearest_peer_ids_) {
queue_.emplace(peer_id, target_);
}
log_.debug("created");
}

30
src/protocol/kademlia/impl/find_peer_executor.cpp

@ -25,25 +25,24 @@ namespace libp2p::protocol::kademlia {
std::shared_ptr<basic::Scheduler> scheduler,
std::shared_ptr<SessionHost> session_host,
const std::shared_ptr<PeerRoutingTable> &peer_routing_table,
PeerId sought_peer_id,
HashedKey target,
FoundPeerInfoHandler handler)
: config_(config),
host_(std::move(host)),
scheduler_(std::move(scheduler)),
session_host_(std::move(session_host)),
sought_peer_id_(std::move(sought_peer_id)),
target_(sought_peer_id_),
target_{std::move(target)},
handler_(std::move(handler)),
log_("KademliaExecutor", "kademlia", "FindPeer", ++instance_number) {
auto nearest_peer_ids = peer_routing_table->getNearestPeers(
target_, config_.query_initial_peers);
target_.hash, config_.query_initial_peers);
nearest_peer_ids_.insert(std::move_iterator(nearest_peer_ids.begin()),
std::move_iterator(nearest_peer_ids.end()));
std::for_each(nearest_peer_ids_.begin(),
nearest_peer_ids_.end(),
[this](auto &peer_id) { queue_.emplace(peer_id, target_); });
for (auto &peer_id : nearest_peer_ids_) {
queue_.emplace(peer_id, target_.hash);
}
log_.debug("created");
}
@ -69,7 +68,7 @@ namespace libp2p::protocol::kademlia {
}
Message request =
createFindNodeRequest(sought_peer_id_, std::move(self_announce));
createFindNodeRequest(target_.key, std::move(self_announce));
if (!request.serialize(*serialized_request_)) {
done_ = true;
return Error::MESSAGE_SERIALIZE_ERROR;
@ -100,7 +99,7 @@ namespace libp2p::protocol::kademlia {
} else {
log_.debug("done: {}", result.error());
}
handler_(result);
handler_(result, std::move(succeeded_peers_));
}
void FindPeerExecutor::spawn() {
@ -220,8 +219,6 @@ namespace libp2p::protocol::kademlia {
return
// Check if message type is appropriate
msg.type == Message::Type::kFindNode;
// Check if response is accorded to request
// && msg.key == sought_peer_id_.toVector()
}
void FindPeerExecutor::onResult(const std::shared_ptr<Session> &session,
@ -258,6 +255,8 @@ namespace libp2p::protocol::kademlia {
requests_in_progress_,
queue_.size());
succeeded_peers_.emplace_back(remote_peer_id);
// Append gotten peer to queue
if (msg.closer_peers) {
for (auto &peer : msg.closer_peers.value()) {
@ -283,7 +282,7 @@ namespace libp2p::protocol::kademlia {
}
// Found
if (peer.info.id == sought_peer_id_) {
if (peer.info.id == target_.peer) {
done(peer.info);
}
@ -299,10 +298,15 @@ namespace libp2p::protocol::kademlia {
// New peer add to queue
if (auto [it, ok] = nearest_peer_ids_.emplace(peer.info.id); ok) {
queue_.emplace(*it, target_);
queue_.emplace(*it, target_.hash);
}
}
}
if (succeeded_peers_.size() >= config_.replication_factor) {
// https://github.com/libp2p/rust-libp2p/blob/9a45db3f82b760c93099e66ec77a7a772d1f6cd3/protocols/kad/src/query/peers/closest.rs#L336-L346
done(Error::VALUE_NOT_FOUND);
}
}
} // namespace libp2p::protocol::kademlia

6
src/protocol/kademlia/impl/find_providers_executor.cpp

@ -48,9 +48,9 @@ namespace libp2p::protocol::kademlia {
nearest_peer_ids_.insert(std::move_iterator(nearest_peer_ids.begin()),
std::move_iterator(nearest_peer_ids.end()));
std::for_each(nearest_peer_ids_.begin(),
nearest_peer_ids_.end(),
[this](auto &peer_id) { queue_.emplace(peer_id, target_); });
for (auto &peer_id : nearest_peer_ids_) {
queue_.emplace(peer_id, target_);
}
log_.debug("created");
}

6
src/protocol/kademlia/impl/get_value_executor.cpp

@ -59,9 +59,9 @@ namespace libp2p::protocol::kademlia {
nearest_peer_ids_.insert(std::move_iterator(nearest_peer_ids.begin()),
std::move_iterator(nearest_peer_ids.end()));
std::for_each(nearest_peer_ids_.begin(),
nearest_peer_ids_.end(),
[this](auto &peer_id) { queue_.emplace(peer_id, target_); });
for (auto &peer_id : nearest_peer_ids_) {
queue_.emplace(peer_id, target_);
}
received_records_ = std::make_unique<Table>();
log_.debug("created");

48
src/protocol/kademlia/impl/kademlia_impl.cpp

@ -129,12 +129,23 @@ namespace libp2p::protocol::kademlia {
outcome::result<void> KademliaImpl::putValue(Key key, Value value) {
log_.debug("CALL: PutValue ({})", multi::detail::encodeBase58(key));
if (auto res = storage_->putValue(key, std::move(value));
not res.has_value()) {
return res.as_failure();
}
return outcome::success();
OUTCOME_TRY(storage_->putValue(key, value));
// `FindPeerExecutor` holds itself using `shared_from_this`
return createFindPeerExecutor(
key,
[weak_self{weak_from_this()}, key, value](
const outcome::result<PeerInfo> &,
std::vector<PeerId> succeeded_peers) {
auto self = weak_self.lock();
if (not self) {
return;
}
std::ignore = self->createPutValueExecutor(
key, value, std::move(succeeded_peers))
->start();
})
->start();
}
outcome::result<void> KademliaImpl::getValue(const Key &key,
@ -274,7 +285,7 @@ namespace libp2p::protocol::kademlia {
if (not peer_info.addresses.empty()) {
scheduler_->schedule(
[handler = std::move(handler), peer_info = std::move(peer_info)] {
handler(peer_info);
handler(peer_info, {});
});
log_.debug("{} found locally", peer_id.toBase58());
@ -330,7 +341,7 @@ namespace libp2p::protocol::kademlia {
return;
}
auto res = putValue(key, value);
auto res = storage_->putValue(key, value);
if (!res) {
log_.warn("incoming PutValue failed: {}", res.error());
return;
@ -555,14 +566,15 @@ namespace libp2p::protocol::kademlia {
multi::Multihash::create(multi::HashType::sha256, hash).value())
.value();
FoundPeerInfoHandler handler =
[wp = weak_from_this()](outcome::result<PeerInfo> res) {
if (auto self = wp.lock()) {
if (res.has_value()) {
self->addPeer(res.value(), false);
}
}
};
FoundPeerInfoHandler handler = [wp = weak_from_this()](
outcome::result<PeerInfo> res,
const std::vector<PeerId> &) {
if (auto self = wp.lock()) {
if (res.has_value()) {
self->addPeer(res.value(), false);
}
}
};
return findPeer(peer_id, handler);
}
@ -682,13 +694,13 @@ namespace libp2p::protocol::kademlia {
}
std::shared_ptr<FindPeerExecutor> KademliaImpl::createFindPeerExecutor(
PeerId peer_id, FoundPeerInfoHandler handler) {
HashedKey key, FoundPeerInfoHandler handler) {
return std::make_shared<FindPeerExecutor>(config_,
host_,
scheduler_,
shared_from_this(),
peer_routing_table_,
std::move(peer_id),
std::move(key),
std::move(handler));
}

4
src/protocol/kademlia/message.cpp

@ -225,11 +225,11 @@ namespace libp2p::protocol::kademlia {
return msg;
}
Message createFindNodeRequest(const PeerId &node,
Message createFindNodeRequest(Key key,
boost::optional<PeerInfo> self_announce) {
Message msg;
msg.type = Message::Type::kFindNode;
msg.key = node.toVector();
msg.key = std::move(key);
if (self_announce) {
msg.selfAnnounce(std::move(self_announce.value()));
}

5
test/libp2p/transport/tcp/tcp_integration_test.cpp

@ -193,8 +193,9 @@ TEST(TCP, SingleListenerCanAcceptManyClients) {
});
context->run_for(500ms);
std::for_each(
clients.begin(), clients.end(), [](std::thread &t) { t.join(); });
for (auto &t : clients) {
t.join();
}
ASSERT_EQ(counter, kClients) << "not all clients' requests were handled";
}

Loading…
Cancel
Save