Browse Source

Fix kademlia (#191)

Signed-off-by: ortyomka <iurin.art@gmail.com>
Co-authored-by: turuslan <turuslan.devbox@gmail.com>
pull/193/head v0.1.7
Ruslan Tushov 2 years ago
committed by GitHub
parent
commit
48f3457dd7
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      example/02-kademlia/rendezvous_chat.cpp
  2. 44
      include/libp2p/protocol/kademlia/content_id.hpp
  3. 13
      include/libp2p/protocol/kademlia/impl/put_value_executor.hpp
  4. 1
      include/libp2p/protocol/kademlia/message.hpp
  5. 2
      include/libp2p/protocol/kademlia/node_id.hpp
  6. 1
      src/protocol/kademlia/impl/CMakeLists.txt
  7. 54
      src/protocol/kademlia/impl/content_id.cpp
  8. 2
      src/protocol/kademlia/impl/find_providers_executor.cpp
  9. 2
      src/protocol/kademlia/impl/get_value_executor.cpp
  10. 75
      src/protocol/kademlia/impl/kademlia_impl.cpp
  11. 36
      src/protocol/kademlia/impl/put_value_executor.cpp
  12. 36
      src/protocol/kademlia/message.cpp
  13. 2
      test/libp2p/protocol/kademlia/content_routing_table_test.cpp

6
example/02-kademlia/rendezvous_chat.cpp

@ -283,7 +283,7 @@ int main(int argc, char *argv[]) {
host->setProtocolHandler({"/chat/1.1.0"}, handleIncomingStream);
// Key for group of chat
libp2p::protocol::kademlia::ContentId content_id("meet me here");
auto content_id = libp2p::protocol::kademlia::makeKeySha256("meet me here");
auto &scheduler = injector.create<libp2p::basic::Scheduler &>();
@ -330,8 +330,8 @@ int main(int argc, char *argv[]) {
host->start();
auto cid = libp2p::multi::ContentIdentifierCodec::decode(content_id.data)
.value();
auto cid =
libp2p::multi::ContentIdentifierCodec::decode(content_id).value();
auto peer_id =
libp2p::peer::PeerId::fromHash(cid.content_address).value();

44
include/libp2p/protocol/kademlia/content_id.hpp

@ -6,50 +6,14 @@
#ifndef LIBP2P_KADEMLIA_KADEMLIA_CONTENTID
#define LIBP2P_KADEMLIA_KADEMLIA_CONTENTID
#include <boost/optional.hpp>
#include <gsl/span>
#include <libp2p/common/byteutil.hpp>
#include <string_view>
#include <vector>
namespace libp2p::protocol::kademlia {
/// DHT key. Arbitrary bytes.
using ContentId = common::ByteArray;
/// DHT key. Contains sha256 hash of Key bytes in CIDv0 format
struct ContentId {
std::vector<uint8_t> data;
/// Ctor for consistency. Data will contain sha256 of empty input
ContentId();
explicit ContentId(std::string_view str);
explicit ContentId(const std::vector<uint8_t> &v);
bool operator<(const ContentId &x) const {
return data < x.data;
}
bool operator==(const ContentId &x) const {
return data == x.data;
}
/// Validates and stores CID received from the network
static boost::optional<ContentId> fromWire(std::string_view str);
static boost::optional<ContentId> fromWire(gsl::span<const uint8_t> bytes);
private:
struct FromWire {};
ContentId(FromWire, gsl::span<const uint8_t> bytes);
};
ContentId makeKeySha256(std::string_view str);
} // namespace libp2p::protocol::kademlia
namespace std {
template <>
struct hash<libp2p::protocol::kademlia::ContentId> {
std::size_t operator()(
const libp2p::protocol::kademlia::ContentId &x) const;
};
} // namespace std
#endif // LIBP2P_KADEMLIA_KADEMLIA_CONTENTID

13
include/libp2p/protocol/kademlia/impl/put_value_executor.hpp

@ -28,7 +28,8 @@
namespace libp2p::protocol::kademlia {
class PutValueExecutor
: public std::enable_shared_from_this<PutValueExecutor> {
: public ResponseHandler,
public std::enable_shared_from_this<PutValueExecutor> {
public:
PutValueExecutor(const Config &config, std::shared_ptr<Host> host,
std::shared_ptr<basic::Scheduler> scheduler,
@ -39,6 +40,16 @@ namespace libp2p::protocol::kademlia {
outcome::result<void> start();
/// @see ResponseHandler::responseTimeout
Time responseTimeout() const override;
/// @see ResponseHandler::match
bool match(const Message &msg) const override;
/// @see ResponseHandler::onResult
void onResult(const std::shared_ptr<Session> &session,
outcome::result<Message> msg_res) override;
private:
/// Spawns new request
void spawn();

1
include/libp2p/protocol/kademlia/message.hpp

@ -17,7 +17,6 @@ namespace libp2p::protocol::kademlia {
INVALID_CONNECTEDNESS = 1,
INVALID_PEER_ID,
INVALID_ADDRESSES,
INVALID_KEY,
};
enum class Type {

2
include/libp2p/protocol/kademlia/node_id.hpp

@ -66,7 +66,7 @@ namespace libp2p::protocol::kademlia {
}
explicit NodeId(const ContentId &content_id) {
auto digest_res = crypto::sha256(content_id.data);
auto digest_res = crypto::sha256(content_id);
BOOST_ASSERT(digest_res.has_value());
data_ = std::move(digest_res.value());
}

1
src/protocol/kademlia/impl/CMakeLists.txt

@ -17,6 +17,7 @@ libp2p_add_library(p2p_kademlia
)
target_link_libraries(p2p_kademlia
p2p_basic_scheduler
p2p_byteutil
p2p_kademlia_message
p2p_kademlia_error
)

54
src/protocol/kademlia/impl/content_id.cpp

@ -5,18 +5,12 @@
#include <libp2p/protocol/kademlia/content_id.hpp>
#include <cstring>
#include <libp2p/crypto/sha/sha256.hpp>
#include <libp2p/multi/content_identifier.hpp>
#include <libp2p/multi/content_identifier_codec.hpp>
namespace libp2p::protocol::kademlia {
ContentId::ContentId()
: data(multi::ContentIdentifierCodec::encodeCIDV0("", 0)) {}
ContentId::ContentId(std::string_view str) {
ContentId makeKeySha256(std::string_view str) {
auto digest_res = crypto::sha256(gsl::make_span(
// NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
reinterpret_cast<const uint8_t *>(str.data()),
@ -27,51 +21,7 @@ namespace libp2p::protocol::kademlia {
libp2p::multi::HashType::sha256, digest_res.value());
BOOST_ASSERT(mhash_res.has_value());
data = multi::ContentIdentifierCodec::encodeCIDV1(
return multi::ContentIdentifierCodec::encodeCIDV1(
libp2p::multi::MulticodecType::Code::RAW, mhash_res.value());
}
ContentId::ContentId(const std::vector<uint8_t> &v)
: data(multi::ContentIdentifierCodec::encodeCIDV0(v.data(), v.size())) {}
boost::optional<ContentId> ContentId::fromWire(std::string_view str) {
gsl::span<const uint8_t> bytes(
// NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
reinterpret_cast<const uint8_t *>(str.data()),
static_cast<ptrdiff_t>(str.size()));
return fromWire(bytes);
}
boost::optional<ContentId> ContentId::fromWire(
gsl::span<const uint8_t> bytes) {
outcome::result<multi::ContentIdentifier> cid_res =
multi::ContentIdentifierCodec::decode(bytes);
if (cid_res.has_value()) {
auto &cid = cid_res.value();
if (cid.content_address.getType() == multi::sha256) {
return ContentId(FromWire{}, bytes);
}
}
return {};
}
ContentId::ContentId(FromWire, gsl::span<const uint8_t> bytes)
: data(bytes.begin(), bytes.end()) {}
} // namespace libp2p::protocol::kademlia
namespace std {
std::size_t hash<libp2p::protocol::kademlia::ContentId>::operator()(
const libp2p::protocol::kademlia::ContentId &x) const {
size_t h = 0;
size_t sz = x.data.size();
// N.B. x.data() is a hash itself
const size_t n = sizeof(size_t);
if (sz >= n) {
memcpy(&h, &x.data[sz - n], n);
}
return h;
}
} // namespace std

2
src/protocol/kademlia/impl/find_providers_executor.cpp

@ -218,7 +218,7 @@ namespace libp2p::protocol::kademlia {
// Check if message type is appropriate
msg.type == Message::Type::kGetProviders
// Check if response is accorded to request
&& msg.key == content_id_.data;
&& msg.key == content_id_;
}
void FindProvidersExecutor::onResult(const std::shared_ptr<Session> &session,

2
src/protocol/kademlia/impl/get_value_executor.cpp

@ -207,7 +207,7 @@ namespace libp2p::protocol::kademlia {
// Check if message type is appropriate
msg.type == Message::Type::kGetValue
// Check if response is accorded to request
&& msg.key == key_.data;
&& msg.key == key_;
}
void GetValueExecutor::onResult(const std::shared_ptr<Session> &session,

75
src/protocol/kademlia/impl/kademlia_impl.cpp

@ -112,7 +112,7 @@ namespace libp2p::protocol::kademlia {
}
outcome::result<void> KademliaImpl::putValue(Key key, Value value) {
log_.debug("CALL: PutValue ({})", multi::detail::encodeBase58(key.data));
log_.debug("CALL: PutValue ({})", multi::detail::encodeBase58(key));
if (auto res = storage_->putValue(key, std::move(value));
not res.has_value()) {
@ -124,7 +124,7 @@ namespace libp2p::protocol::kademlia {
outcome::result<void> KademliaImpl::getValue(const Key &key,
FoundValueHandler handler) {
log_.debug("CALL: GetValue ({})", multi::detail::encodeBase58(key.data));
log_.debug("CALL: GetValue ({})", multi::detail::encodeBase58(key));
// Check if has actual value locally
if (auto res = storage_->getValue(key); res.has_value()) {
@ -144,7 +144,7 @@ namespace libp2p::protocol::kademlia {
outcome::result<void> KademliaImpl::provide(const Key &key,
bool need_notify) {
log_.debug("CALL: Provide ({})", multi::detail::encodeBase58(key.data));
log_.debug("CALL: Provide ({})", multi::detail::encodeBase58(key));
content_routing_table_->addProvider(key, self_id_);
@ -159,8 +159,7 @@ namespace libp2p::protocol::kademlia {
outcome::result<void> KademliaImpl::findProviders(
const Key &key, size_t limit, FoundProvidersHandler handler) {
log_.debug("CALL: FindProviders ({})",
multi::detail::encodeBase58(key.data));
log_.debug("CALL: FindProviders ({})", multi::detail::encodeBase58(key));
// Try to find locally
auto providers = content_routing_table_->getProvidersFor(key, limit);
@ -304,7 +303,7 @@ namespace libp2p::protocol::kademlia {
}
auto &[key, value, ts] = msg.record.value();
log_.debug("MSG: PutValue ({})", multi::detail::encodeBase58(key.data));
log_.debug("MSG: PutValue ({})", multi::detail::encodeBase58(key));
auto validation_res = validator_->validate(key, value);
if (not validation_res) {
@ -318,6 +317,15 @@ namespace libp2p::protocol::kademlia {
log_.warn("incoming PutValue failed: {}", res.error().message());
return;
}
// echo request
auto buffer = std::make_shared<std::vector<uint8_t>>();
if (not msg.serialize(*buffer)) {
session->close(Error::MESSAGE_SERIALIZE_ERROR);
BOOST_UNREACHABLE_RETURN();
}
session->write(buffer, {});
}
void KademliaImpl::onGetValue(const std::shared_ptr<Session> &session,
@ -327,16 +335,9 @@ namespace libp2p::protocol::kademlia {
return;
}
auto cid_res = ContentId::fromWire(msg.key);
if (!cid_res) {
log_.warn("incoming GetValue failed: invalid key in message");
return;
}
auto &cid = cid_res.value();
log_.debug("MSG: GetValue ({})", multi::detail::encodeBase58(cid.data));
log_.debug("MSG: GetValue ({})", multi::detail::encodeBase58(msg.key));
if (auto providers = content_routing_table_->getProvidersFor(cid);
if (auto providers = content_routing_table_->getProvidersFor(msg.key);
not providers.empty()) {
std::vector<Message::Peer> peers;
peers.reserve(config_.closerPeerCount);
@ -356,10 +357,10 @@ namespace libp2p::protocol::kademlia {
msg.provider_peers = std::move(peers);
}
auto res = storage_->getValue(cid);
auto res = storage_->getValue(msg.key);
if (res) {
auto &[value, expire] = res.value();
msg.record = Message::Record{std::move(cid), std::move(value),
msg.record = Message::Record{std::move(msg.key), std::move(value),
std::to_string(expire.count())};
}
@ -379,21 +380,14 @@ namespace libp2p::protocol::kademlia {
return;
}
auto cid_res = ContentId::fromWire(msg.key);
if (not cid_res) {
log_.warn("AddProvider failed: invalid key in message");
return;
}
auto &cid = cid_res.value();
log_.debug("MSG: AddProvider ({})", multi::detail::encodeBase58(cid.data));
log_.debug("MSG: AddProvider ({})", multi::detail::encodeBase58(msg.key));
auto &providers = msg.provider_peers.value();
for (auto &provider : providers) {
if (auto peer_id_res = session->stream()->remotePeerId()) {
if (peer_id_res.value() == provider.info.id) {
// Save providers who have provided themselves
content_routing_table_->addProvider(cid, provider.info.id);
content_routing_table_->addProvider(msg.key, provider.info.id);
addPeer(provider.info, false);
}
}
@ -407,17 +401,10 @@ namespace libp2p::protocol::kademlia {
return;
}
auto cid_res = ContentId::fromWire(msg.key);
if (not cid_res) {
log_.warn("GetProviders failed: invalid key in message");
return;
}
auto &cid = cid_res.value();
log_.debug("MSG: GetProviders ({})", multi::detail::encodeBase58(cid.data));
log_.debug("MSG: GetProviders ({})", multi::detail::encodeBase58(msg.key));
auto peer_ids = content_routing_table_->getProvidersFor(
cid, config_.closerPeerCount * 2);
msg.key, config_.closerPeerCount * 2);
if (not peer_ids.empty()) {
std::vector<Message::Peer> peers;
@ -441,7 +428,7 @@ namespace libp2p::protocol::kademlia {
}
peer_ids = peer_routing_table_->getNearestPeers(
NodeId(cid), config_.closerPeerCount * 2);
NodeId(msg.key), config_.closerPeerCount * 2);
if (not peer_ids.empty()) {
std::vector<Message::Peer> peers;
@ -475,6 +462,11 @@ namespace libp2p::protocol::kademlia {
void KademliaImpl::onFindNode(const std::shared_ptr<Session> &session,
Message &&msg) {
if (msg.key.empty()) {
log_.warn("FindNode failed: empty key in message");
return;
}
if (msg.closer_peers) {
for (auto &peer : msg.closer_peers.value()) {
if (peer.conn_status != Message::Connectedness::CAN_NOT_CONNECT) {
@ -493,17 +485,10 @@ namespace libp2p::protocol::kademlia {
msg.closer_peers.reset();
}
auto cid_res = ContentId::fromWire(msg.key);
if (not cid_res) {
log_.warn("FindNode failed: invalid key in message");
return;
}
auto &cid = cid_res.value();
log_.debug("MSG: FindNode ({})", multi::detail::encodeBase58(cid.data));
log_.debug("MSG: FindNode ({})", multi::detail::encodeBase58(msg.key));
auto ids = peer_routing_table_->getNearestPeers(
NodeId(cid), config_.closerPeerCount * 2);
NodeId(msg.key), config_.closerPeerCount * 2);
std::vector<Message::Peer> peers;
peers.reserve(config_.closerPeerCount);

36
src/protocol/kademlia/impl/put_value_executor.cpp

@ -132,15 +132,41 @@ namespace libp2p::protocol::kademlia {
auto session = session_host_->openSession(stream);
if (!session->write(serialized_request_, shared_from_this())) {
--requests_in_progress_;
log_.debug("write to {} failed; done {}, active {}, in queue {}", addr,
requests_succeed_, requests_in_progress_,
addressees_.size() - addressees_idx_);
spawn();
}
}
Time PutValueExecutor::responseTimeout() const {
return config_.responseTimeout;
}
bool PutValueExecutor::match(const Message &msg) const {
return
// Check if message type is appropriate
msg.type == Message::Type::kPutValue
// Check if response is accorded to request
&& msg.key == key_;
}
void PutValueExecutor::onResult(const std::shared_ptr<Session> &session,
outcome::result<Message> msg_res) {
--requests_in_progress_;
if (session->write(serialized_request_, {})) {
if (msg_res.has_error()) {
log_.debug(
"write to {} failed; done {}, active {}, in queue {}; error: {}",
session->stream()->remotePeerId().value().toBase58(),
requests_succeed_, requests_in_progress_,
addressees_.size() - addressees_idx_, msg_res.error().message());
} else {
++requests_succeed_;
log_.debug("write to {} successfuly; done {}, active {}, in queue {}",
addr, requests_succeed_, requests_in_progress_,
addressees_.size() - addressees_idx_);
} else {
log_.debug("write to {} failed; done {}, active {}, in queue {}", addr,
session->stream()->remotePeerId().value().toBase58(),
requests_succeed_, requests_in_progress_,
addressees_.size() - addressees_idx_);
}

36
src/protocol/kademlia/message.cpp

@ -19,8 +19,6 @@ OUTCOME_CPP_DEFINE_CATEGORY(libp2p::protocol::kademlia, Message::Error, e) {
return "invalid peer id";
case E::INVALID_ADDRESSES:
return "invalid peer addresses";
case E::INVALID_KEY:
return "invalid key";
}
return "unknown error (libp2p::protocol::kademlia::Message::Error)";
}
@ -48,7 +46,8 @@ namespace libp2p::protocol::kademlia {
auto peer_id_res = PeerId::fromBytes(gsl::span<const uint8_t>(
// NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
reinterpret_cast<const uint8_t *>(src.id().data()), src.id().size()));
reinterpret_cast<const uint8_t *>(src.id().data()),
gsl::narrow<ptrdiff_t>(src.id().size())));
if (!peer_id_res) {
return Message::Error::INVALID_PEER_ID;
}
@ -57,7 +56,8 @@ namespace libp2p::protocol::kademlia {
for (const auto &addr : src.addrs()) {
auto res = multi::Multiaddress::create(gsl::span<const uint8_t>(
// NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
reinterpret_cast<const uint8_t *>(addr.data()), addr.size()));
reinterpret_cast<const uint8_t *>(addr.data()),
gsl::narrow<ptrdiff_t>(addr.size())));
if (!res) {
return Message::Error::INVALID_ADDRESSES;
}
@ -87,16 +87,10 @@ namespace libp2p::protocol::kademlia {
}
template <class PbContainer>
outcome::result<void> assign_record(Message::Record &dst,
const PbContainer &src) {
auto ca_res = Key::fromWire(src.key());
if (!ca_res) {
return Message::Error::INVALID_KEY;
}
dst.key = std::move(ca_res.value());
void assign_record(Message::Record &dst, const PbContainer &src) {
assign_blob(dst.key, src.key());
dst.time_received = src.timereceived();
assign_blob(dst.value, src.value());
return outcome::success();
}
} // namespace
@ -113,7 +107,7 @@ namespace libp2p::protocol::kademlia {
bool Message::deserialize(const void *data, size_t sz) {
clear();
pb::Message pb_msg;
if (!pb_msg.ParseFromArray(data, sz)) {
if (!pb_msg.ParseFromArray(data, gsl::narrow<int>(sz))) {
error_message_ = "Invalid protobuf data";
return false;
}
@ -125,11 +119,7 @@ namespace libp2p::protocol::kademlia {
assign_blob(key, pb_msg.key());
if (pb_msg.has_record()) {
record.emplace();
auto res = assign_record(record.value(), pb_msg.record());
if (not res.has_value()) {
error_message_ = "Bad record: " + res.error().message();
return false;
}
assign_record(record.value(), pb_msg.record());
}
auto closer_res = assign_peers(closer_peers, pb_msg.closerpeers());
if (not closer_res) {
@ -151,7 +141,7 @@ namespace libp2p::protocol::kademlia {
if (record) {
const Record &rec_src = record.value();
pb::Record rec;
rec.set_key(rec_src.key.data.data(), rec_src.key.data.size());
rec.set_key(rec_src.key.data(), rec_src.key.size());
rec.set_value(rec_src.value.data(), rec_src.value.size());
rec.set_timereceived(rec_src.time_received);
*pb_msg.mutable_record() = std::move(rec);
@ -187,7 +177,7 @@ namespace libp2p::protocol::kademlia {
buffer.resize(prefix_sz + msg_sz);
memcpy(buffer.data(), varint_vec.data(), prefix_sz);
return pb_msg.SerializeToArray(buffer.data() + prefix_sz, // NOLINT
msg_sz);
gsl::narrow<int>(msg_sz));
}
void Message::selfAnnounce(PeerInfo self) {
@ -206,7 +196,7 @@ namespace libp2p::protocol::kademlia {
boost::optional<PeerInfo> self_announce) {
Message msg;
msg.type = Message::Type::kGetValue;
msg.key = key.data;
msg.key = key;
if (self_announce) {
msg.selfAnnounce(std::move(self_announce.value()));
}
@ -216,7 +206,7 @@ namespace libp2p::protocol::kademlia {
Message createAddProviderRequest(PeerInfo self, const Key &key) {
Message msg;
msg.type = Message::Type::kAddProvider;
msg.key = key.data;
msg.key = key;
msg.provider_peers = Message::Peers{
{Message::Peer{std::move(self), Message::Connectedness::CAN_CONNECT}}};
return msg;
@ -226,7 +216,7 @@ namespace libp2p::protocol::kademlia {
boost::optional<PeerInfo> self_announce) {
Message msg;
msg.type = Message::Type::kGetProviders;
msg.key = key.data;
msg.key = key;
if (self_announce) {
msg.selfAnnounce(std::move(self_announce.value()));
}

2
test/libp2p/protocol/kademlia/content_routing_table_test.cpp

@ -43,7 +43,7 @@ struct ContentRoutingTableTest : public ::testing::Test {
std::shared_ptr<Bus> bus_;
std::unique_ptr<ContentRoutingTable> table_;
PeerId self_id = "1"_peerid;
ContentId cid = ContentId{"content_key"};
ContentId cid = makeKeySha256("content_key");
};
template <typename A>

Loading…
Cancel
Save