diff --git a/include/libp2p/security/noise/noise_connection.hpp b/include/libp2p/security/noise/noise_connection.hpp index b1472166..39d270fb 100644 --- a/include/libp2p/security/noise/noise_connection.hpp +++ b/include/libp2p/security/noise/noise_connection.hpp @@ -75,6 +75,12 @@ namespace libp2p::connection { outcome::result remotePublicKey() const override; private: + void read(gsl::span out, size_t bytes, OperationContext ctx, + ReadCallbackFunc cb); + + void readSome(gsl::span out, size_t bytes, OperationContext ctx, + ReadCallbackFunc cb); + void write(gsl::span in, size_t bytes, OperationContext ctx, WriteCallbackFunc cb); @@ -88,7 +94,6 @@ namespace libp2p::connection { std::shared_ptr decoder_cs_; std::shared_ptr frame_buffer_; std::shared_ptr framer_; - size_t already_read_; BufferList write_buffers_; log::Logger log_ = log::createLogger("NoiseConnection"); diff --git a/src/security/noise/noise_connection.cpp b/src/security/noise/noise_connection.cpp index eaa7332f..0d0c35d9 100644 --- a/src/security/noise/noise_connection.cpp +++ b/src/security/noise/noise_connection.cpp @@ -41,8 +41,7 @@ namespace libp2p::connection { frame_buffer_{ std::make_shared(security::noise::kMaxMsgLen)}, framer_{std::make_shared( - raw_connection_, frame_buffer_)}, - already_read_{0} { + raw_connection_, frame_buffer_)} { BOOST_ASSERT(raw_connection_); BOOST_ASSERT(key_marshaller_); BOOST_ASSERT(encoder_cs_); @@ -62,24 +61,39 @@ namespace libp2p::connection { void NoiseConnection::read(gsl::span out, size_t bytes, libp2p::basic::Reader::ReadCallbackFunc cb) { + OperationContext context{.bytes_served = 0, + .total_bytes = bytes, + .write_buffer = write_buffers_.end()}; + read(out, bytes, context, std::move(cb)); + } + + void NoiseConnection::read(gsl::span out, size_t bytes, + OperationContext ctx, ReadCallbackFunc cb) { size_t out_size{out.empty() ? 0u : static_cast(out.size())}; BOOST_ASSERT(out_size >= bytes); - if (bytes == 0) { - auto n{already_read_}; - already_read_ = 0; - return cb(n); + if (0 == bytes) { + BOOST_ASSERT(ctx.bytes_served == ctx.total_bytes); + return cb(ctx.bytes_served); } readSome(out, bytes, - [self{shared_from_this()}, out, bytes, - cb{std::move(cb)}](auto _n) mutable { + [self{shared_from_this()}, out, bytes, cb{std::move(cb)}, + ctx](auto _n) mutable { OUTCOME_CB(n, _n); - self->already_read_ += n; - self->read(out.subspan(n), bytes - n, std::move(cb)); + ctx.bytes_served += n; + self->read(out.subspan(n), bytes - n, ctx, std::move(cb)); }); } void NoiseConnection::readSome(gsl::span out, size_t bytes, libp2p::basic::Reader::ReadCallbackFunc cb) { + OperationContext context{.bytes_served = 0, + .total_bytes = bytes, + .write_buffer = write_buffers_.end()}; + readSome(out, bytes, context, std::move(cb)); + } + + void NoiseConnection::readSome(gsl::span out, size_t bytes, + OperationContext ctx, ReadCallbackFunc cb) { if (not frame_buffer_->empty()) { auto n{std::min(bytes, frame_buffer_->size())}; auto begin{frame_buffer_->begin()}; @@ -88,12 +102,12 @@ namespace libp2p::connection { frame_buffer_->erase(begin, end); return cb(n); } - framer_->read([self{shared_from_this()}, out, bytes, - cb{std::move(cb)}](auto _data) mutable { + framer_->read([self{shared_from_this()}, out, bytes, cb{std::move(cb)}, + ctx](auto _data) mutable { OUTCOME_CB(data, _data); OUTCOME_CB(decrypted, self->decoder_cs_->decrypt({}, *data, {})); self->frame_buffer_->assign(decrypted.begin(), decrypted.end()); - self->readSome(out, bytes, std::move(cb)); + self->readSome(out, bytes, ctx, std::move(cb)); }); }