|
|
@ -15,6 +15,7 @@ |
|
|
|
#define OUTCOME_CB_I(var, res) \ |
|
|
|
auto && (var) = (res); \ |
|
|
|
if ((var).has_error()) { \ |
|
|
|
self->eraseWriteBuffer(ctx.write_buffer); \ |
|
|
|
return cb((var).error()); \ |
|
|
|
} |
|
|
|
|
|
|
@ -40,10 +41,7 @@ namespace libp2p::connection { |
|
|
|
frame_buffer_{ |
|
|
|
std::make_shared<common::ByteArray>(security::noise::kMaxMsgLen)}, |
|
|
|
framer_{std::make_shared<security::noise::InsecureReadWriter>( |
|
|
|
raw_connection_, frame_buffer_)}, |
|
|
|
already_read_{0}, |
|
|
|
already_wrote_{0}, |
|
|
|
plaintext_len_to_write_{0} { |
|
|
|
raw_connection_, frame_buffer_)} { |
|
|
|
BOOST_ASSERT(raw_connection_); |
|
|
|
BOOST_ASSERT(key_marshaller_); |
|
|
|
BOOST_ASSERT(encoder_cs_); |
|
|
@ -63,62 +61,89 @@ namespace libp2p::connection { |
|
|
|
|
|
|
|
void NoiseConnection::read(gsl::span<uint8_t> out, size_t bytes, |
|
|
|
libp2p::basic::Reader::ReadCallbackFunc cb) { |
|
|
|
OperationContext context{.bytes_served = 0, |
|
|
|
.total_bytes = bytes, |
|
|
|
.write_buffer = write_buffers_.end()}; |
|
|
|
read(out, bytes, context, std::move(cb)); |
|
|
|
} |
|
|
|
|
|
|
|
void NoiseConnection::read(gsl::span<uint8_t> out, size_t bytes, |
|
|
|
OperationContext ctx, ReadCallbackFunc cb) { |
|
|
|
size_t out_size{out.empty() ? 0u : static_cast<size_t>(out.size())}; |
|
|
|
BOOST_ASSERT(out_size >= bytes); |
|
|
|
if (bytes == 0) { |
|
|
|
auto n{already_read_}; |
|
|
|
already_read_ = 0; |
|
|
|
return cb(n); |
|
|
|
if (0 == bytes) { |
|
|
|
BOOST_ASSERT(ctx.bytes_served == ctx.total_bytes); |
|
|
|
return cb(ctx.bytes_served); |
|
|
|
} |
|
|
|
readSome(out, bytes, |
|
|
|
[self{shared_from_this()}, out, bytes, |
|
|
|
cb{std::move(cb)}](auto _n) mutable { |
|
|
|
[self{shared_from_this()}, out, bytes, cb{std::move(cb)}, |
|
|
|
ctx](auto _n) mutable { |
|
|
|
OUTCOME_CB(n, _n); |
|
|
|
self->already_read_ += n; |
|
|
|
self->read(out.subspan(n), bytes - n, std::move(cb)); |
|
|
|
ctx.bytes_served += n; |
|
|
|
self->read(out.subspan(n), bytes - n, ctx, std::move(cb)); |
|
|
|
}); |
|
|
|
} |
|
|
|
|
|
|
|
void NoiseConnection::readSome(gsl::span<uint8_t> out, size_t bytes, |
|
|
|
libp2p::basic::Reader::ReadCallbackFunc cb) { |
|
|
|
OperationContext context{.bytes_served = 0, |
|
|
|
.total_bytes = bytes, |
|
|
|
.write_buffer = write_buffers_.end()}; |
|
|
|
readSome(out, bytes, context, std::move(cb)); |
|
|
|
} |
|
|
|
|
|
|
|
void NoiseConnection::readSome(gsl::span<uint8_t> out, size_t bytes, |
|
|
|
OperationContext ctx, ReadCallbackFunc cb) { |
|
|
|
if (not frame_buffer_->empty()) { |
|
|
|
auto n{std::min(bytes, frame_buffer_->size())}; |
|
|
|
auto begin{frame_buffer_->begin()}; |
|
|
|
auto end{begin + n}; |
|
|
|
auto end{begin + static_cast<int64_t>(n)}; |
|
|
|
std::copy(begin, end, out.begin()); |
|
|
|
frame_buffer_->erase(begin, end); |
|
|
|
return cb(n); |
|
|
|
} |
|
|
|
framer_->read([self{shared_from_this()}, out, bytes, |
|
|
|
cb{std::move(cb)}](auto _data) mutable { |
|
|
|
framer_->read([self{shared_from_this()}, out, bytes, cb{std::move(cb)}, |
|
|
|
ctx](auto _data) mutable { |
|
|
|
OUTCOME_CB(data, _data); |
|
|
|
OUTCOME_CB(decrypted, self->decoder_cs_->decrypt({}, *data, {})); |
|
|
|
self->frame_buffer_->assign(decrypted.begin(), decrypted.end()); |
|
|
|
self->readSome(out, bytes, std::move(cb)); |
|
|
|
self->readSome(out, bytes, ctx, std::move(cb)); |
|
|
|
}); |
|
|
|
} |
|
|
|
|
|
|
|
void NoiseConnection::write(gsl::span<const uint8_t> in, size_t bytes, |
|
|
|
libp2p::basic::Writer::WriteCallbackFunc cb) { |
|
|
|
if (0 == plaintext_len_to_write_) { |
|
|
|
plaintext_len_to_write_ = bytes; |
|
|
|
} |
|
|
|
if (bytes == 0) { |
|
|
|
BOOST_ASSERT(already_wrote_ >= plaintext_len_to_write_); |
|
|
|
auto n{plaintext_len_to_write_}; |
|
|
|
already_wrote_ = 0; |
|
|
|
plaintext_len_to_write_ = 0; |
|
|
|
return cb(n); |
|
|
|
OperationContext context{.bytes_served = 0, |
|
|
|
.total_bytes = bytes, |
|
|
|
.write_buffer = write_buffers_.end()}; |
|
|
|
write(in, bytes, context, std::move(cb)); |
|
|
|
} |
|
|
|
|
|
|
|
void NoiseConnection::write(gsl::span<const uint8_t> in, size_t bytes, |
|
|
|
NoiseConnection::OperationContext ctx, |
|
|
|
basic::Writer::WriteCallbackFunc cb) { |
|
|
|
auto *self{this}; // for OUTCOME_CB
|
|
|
|
if (0 == bytes) { |
|
|
|
BOOST_ASSERT(ctx.bytes_served >= ctx.total_bytes); |
|
|
|
eraseWriteBuffer(ctx.write_buffer); |
|
|
|
return cb(ctx.total_bytes); |
|
|
|
} |
|
|
|
auto n{std::min(bytes, security::noise::kMaxPlainText)}; |
|
|
|
OUTCOME_CB(encrypted, encoder_cs_->encrypt({}, in.subspan(0, n), {})); |
|
|
|
writing_ = std::move(encrypted); |
|
|
|
framer_->write(writing_, |
|
|
|
[self{shared_from_this()}, in{in.subspan(n)}, |
|
|
|
bytes{bytes - n}, cb{std::move(cb)}](auto _n) mutable { |
|
|
|
if (write_buffers_.end() == ctx.write_buffer) { |
|
|
|
constexpr auto dummy_size = 1; |
|
|
|
constexpr auto dummy_value = 0x0; |
|
|
|
ctx.write_buffer = |
|
|
|
write_buffers_.emplace(write_buffers_.end(), dummy_size, dummy_value); |
|
|
|
} |
|
|
|
ctx.write_buffer->swap(encrypted); |
|
|
|
framer_->write( |
|
|
|
*ctx.write_buffer, |
|
|
|
[self{shared_from_this()}, in{in.subspan(static_cast<int64_t>(n))}, |
|
|
|
bytes{bytes - n}, cb{std::move(cb)}, ctx](auto _n) mutable { |
|
|
|
OUTCOME_CB(n, _n); |
|
|
|
self->already_wrote_ += n; |
|
|
|
self->write(in, bytes, std::move(cb)); |
|
|
|
ctx.bytes_served += n; |
|
|
|
self->write(in, bytes, ctx, std::move(cb)); |
|
|
|
}); |
|
|
|
} |
|
|
|
|
|
|
@ -165,4 +190,12 @@ namespace libp2p::connection { |
|
|
|
const { |
|
|
|
return remote_; |
|
|
|
} |
|
|
|
|
|
|
|
void NoiseConnection::eraseWriteBuffer(BufferList::iterator &iterator) { |
|
|
|
if (write_buffers_.end() == iterator) { |
|
|
|
return; |
|
|
|
} |
|
|
|
write_buffers_.erase(iterator); |
|
|
|
iterator = write_buffers_.end(); |
|
|
|
} |
|
|
|
} // namespace libp2p::connection
|
|
|
|