@ -12,10 +12,11 @@
# define UNIQUE_NAME(base) base##__LINE__
# endif // UNIQUE_NAME
# define OUTCOME_CB_I(var, res) \
auto & & ( var ) = ( res ) ; \
if ( ( var ) . has_error ( ) ) { \
return cb ( ( var ) . error ( ) ) ; \
# define OUTCOME_CB_I(var, res) \
auto & & ( var ) = ( res ) ; \
if ( ( var ) . has_error ( ) ) { \
self - > eraseWriteBuffer ( ctx . write_buffer ) ; \
return cb ( ( var ) . error ( ) ) ; \
}
# define OUTCOME_CB_NAME_I(var, val, res) \
@ -40,10 +41,7 @@ namespace libp2p::connection {
frame_buffer_ {
std : : make_shared < common : : ByteArray > ( security : : noise : : kMaxMsgLen ) } ,
framer_ { std : : make_shared < security : : noise : : InsecureReadWriter > (
raw_connection_ , frame_buffer_ ) } ,
already_read_ { 0 } ,
already_wrote_ { 0 } ,
plaintext_len_to_write_ { 0 } {
raw_connection_ , frame_buffer_ ) } {
BOOST_ASSERT ( raw_connection_ ) ;
BOOST_ASSERT ( key_marshaller_ ) ;
BOOST_ASSERT ( encoder_cs_ ) ;
@ -63,63 +61,90 @@ namespace libp2p::connection {
void NoiseConnection : : read ( gsl : : span < uint8_t > out , size_t bytes ,
libp2p : : basic : : Reader : : ReadCallbackFunc cb ) {
OperationContext context { . bytes_served = 0 ,
. total_bytes = bytes ,
. write_buffer = write_buffers_ . end ( ) } ;
read ( out , bytes , context , std : : move ( cb ) ) ;
}
void NoiseConnection : : read ( gsl : : span < uint8_t > out , size_t bytes ,
OperationContext ctx , ReadCallbackFunc cb ) {
size_t out_size { out . empty ( ) ? 0u : static_cast < size_t > ( out . size ( ) ) } ;
BOOST_ASSERT ( out_size > = bytes ) ;
if ( bytes = = 0 ) {
auto n { already_read_ } ;
already_read_ = 0 ;
return cb ( n ) ;
if ( 0 = = bytes ) {
BOOST_ASSERT ( ctx . bytes_served = = ctx . total_bytes ) ;
return cb ( ctx . bytes_served ) ;
}
readSome ( out , bytes ,
[ self { shared_from_this ( ) } , out , bytes ,
cb { std : : move ( cb ) } ] ( auto _n ) mutable {
[ self { shared_from_this ( ) } , out , bytes , cb { std : : move ( cb ) } ,
ctx ] ( auto _n ) mutable {
OUTCOME_CB ( n , _n ) ;
self - > already_read_ + = n ;
self - > read ( out . subspan ( n ) , bytes - n , std : : move ( cb ) ) ;
ctx . bytes_served + = n ;
self - > read ( out . subspan ( n ) , bytes - n , ctx , std : : move ( cb ) ) ;
} ) ;
}
void NoiseConnection : : readSome ( gsl : : span < uint8_t > out , size_t bytes ,
libp2p : : basic : : Reader : : ReadCallbackFunc cb ) {
OperationContext context { . bytes_served = 0 ,
. total_bytes = bytes ,
. write_buffer = write_buffers_ . end ( ) } ;
readSome ( out , bytes , context , std : : move ( cb ) ) ;
}
void NoiseConnection : : readSome ( gsl : : span < uint8_t > out , size_t bytes ,
OperationContext ctx , ReadCallbackFunc cb ) {
if ( not frame_buffer_ - > empty ( ) ) {
auto n { std : : min ( bytes , frame_buffer_ - > size ( ) ) } ;
auto begin { frame_buffer_ - > begin ( ) } ;
auto end { begin + n } ;
auto end { begin + static_cast < int64_t > ( n ) } ;
std : : copy ( begin , end , out . begin ( ) ) ;
frame_buffer_ - > erase ( begin , end ) ;
return cb ( n ) ;
}
framer_ - > read ( [ self { shared_from_this ( ) } , out , bytes ,
cb { std : : move ( cb ) } ] ( auto _data ) mutable {
framer_ - > read ( [ self { shared_from_this ( ) } , out , bytes , cb { std : : move ( cb ) } ,
ctx ] ( auto _data ) mutable {
OUTCOME_CB ( data , _data ) ;
OUTCOME_CB ( decrypted , self - > decoder_cs_ - > decrypt ( { } , * data , { } ) ) ;
self - > frame_buffer_ - > assign ( decrypted . begin ( ) , decrypted . end ( ) ) ;
self - > readSome ( out , bytes , std : : move ( cb ) ) ;
self - > readSome ( out , bytes , ctx , std : : move ( cb ) ) ;
} ) ;
}
void NoiseConnection : : write ( gsl : : span < const uint8_t > in , size_t bytes ,
libp2p : : basic : : Writer : : WriteCallbackFunc cb ) {
if ( 0 = = plaintext_len_to_write_ ) {
plaintext_len_to_write_ = bytes ;
}
if ( bytes = = 0 ) {
BOOST_ASSERT ( already_wrote_ > = plaintext_len_to_write_ ) ;
auto n { plaintext_len_to_write_ } ;
already_wrote_ = 0 ;
plaintext_len_to_write_ = 0 ;
return cb ( n ) ;
OperationContext context { . bytes_served = 0 ,
. total_bytes = bytes ,
. write_buffer = write_buffers_ . end ( ) } ;
write ( in , bytes , context , std : : move ( cb ) ) ;
}
void NoiseConnection : : write ( gsl : : span < const uint8_t > in , size_t bytes ,
NoiseConnection : : OperationContext ctx ,
basic : : Writer : : WriteCallbackFunc cb ) {
auto * self { this } ; // for OUTCOME_CB
if ( 0 = = bytes ) {
BOOST_ASSERT ( ctx . bytes_served > = ctx . total_bytes ) ;
eraseWriteBuffer ( ctx . write_buffer ) ;
return cb ( ctx . total_bytes ) ;
}
auto n { std : : min ( bytes , security : : noise : : kMaxPlainText ) } ;
OUTCOME_CB ( encrypted , encoder_cs_ - > encrypt ( { } , in . subspan ( 0 , n ) , { } ) ) ;
writing_ = std : : move ( encrypted ) ;
framer_ - > write ( writing_ ,
[ self { shared_from_this ( ) } , in { in . subspan ( n ) } ,
bytes { bytes - n } , cb { std : : move ( cb ) } ] ( auto _n ) mutable {
OUTCOME_CB ( n , _n ) ;
self - > already_wrote_ + = n ;
self - > write ( in , bytes , std : : move ( cb ) ) ;
} ) ;
if ( write_buffers_ . end ( ) = = ctx . write_buffer ) {
constexpr auto dummy_size = 1 ;
constexpr auto dummy_value = 0x0 ;
ctx . write_buffer =
write_buffers_ . emplace ( write_buffers_ . end ( ) , dummy_size , dummy_value ) ;
}
ctx . write_buffer - > swap ( encrypted ) ;
framer_ - > write (
* ctx . write_buffer ,
[ self { shared_from_this ( ) } , in { in . subspan ( static_cast < int64_t > ( n ) ) } ,
bytes { bytes - n } , cb { std : : move ( cb ) } , ctx ] ( auto _n ) mutable {
OUTCOME_CB ( n , _n ) ;
ctx . bytes_served + = n ;
self - > write ( in , bytes , ctx , std : : move ( cb ) ) ;
} ) ;
}
void NoiseConnection : : writeSome ( gsl : : span < const uint8_t > in , size_t bytes ,
@ -165,4 +190,12 @@ namespace libp2p::connection {
const {
return remote_ ;
}
void NoiseConnection : : eraseWriteBuffer ( BufferList : : iterator & iterator ) {
if ( write_buffers_ . end ( ) = = iterator ) {
return ;
}
write_buffers_ . erase ( iterator ) ;
iterator = write_buffers_ . end ( ) ;
}
} // namespace libp2p::connection