Browse Source

optimize: reduce syscalls using a buffered reader.

Previously, each noise message read would make two syscalls:
1. one to read the length prefix.
2. one to read the encrypted payload.

This patch adds bufio.Reader mediation to cushion syscalls, and
significantly enhaces throughput in read-dominated connections, such
as file transfers.
pull/1462/head
Raúl Kripalani 4 years ago
committed by Raúl Kripalani
parent
commit
86b4893f9a
  1. 14
      p2p/security/noise/rw.go
  2. 35
      p2p/security/noise/session.go
  3. 4
      p2p/security/noise/transport_test.go

14
p2p/security/noise/rw.go

@ -129,9 +129,9 @@ func (s *secureSession) Write(data []byte) (int, error) {
return written, nil
}
// readNextInsecureMsgLen reads the length of the next message on the insecure channel.
// readNextInsecureMsgLen reads the length of the next message on the insecureConn channel.
func (s *secureSession) readNextInsecureMsgLen() (int, error) {
_, err := io.ReadFull(s.insecure, s.rlen[:])
_, err := io.ReadFull(s.insecureReader, s.rlen[:])
if err != nil {
return 0, err
}
@ -140,17 +140,17 @@ func (s *secureSession) readNextInsecureMsgLen() (int, error) {
}
// readNextMsgInsecure tries to read exactly len(buf) bytes into buf from
// the insecure channel and returns the error, if any.
// the insecureConn channel and returns the error, if any.
// Ideally, for reading a message, you'd first want to call `readNextInsecureMsgLen`
// to determine the size of the next message to be read from the insecure channel and then call
// to determine the size of the next message to be read from the insecureConn channel and then call
// this function with a buffer of exactly that size.
func (s *secureSession) readNextMsgInsecure(buf []byte) error {
_, err := io.ReadFull(s.insecure, buf)
_, err := io.ReadFull(s.insecureReader, buf)
return err
}
// writeMsgInsecure writes to the insecure conn.
// writeMsgInsecure writes to the insecureConn conn.
// data will be prefixed with its length in bytes, written as a 16-bit uint in network order.
func (s *secureSession) writeMsgInsecure(data []byte) (int, error) {
return s.insecure.Write(data)
return s.insecureConn.Write(data)
}

35
p2p/security/noise/session.go

@ -1,6 +1,7 @@
package noise
import (
"bufio"
"context"
"net"
"sync"
@ -22,7 +23,10 @@ type secureSession struct {
readLock sync.Mutex
writeLock sync.Mutex
insecure net.Conn
insecureConn net.Conn
insecureReader *bufio.Reader // to cushion io read syscalls
// we don't buffer writes to avoid introducing latency; optimisation possible. // TODO revisit
qseek int // queued bytes seek value.
qbuf []byte // queued bytes buffer.
@ -32,15 +36,16 @@ type secureSession struct {
dec *noise.CipherState
}
// newSecureSession creates a Noise session over the given insecure Conn, using
// newSecureSession creates a Noise session over the given insecureConn Conn, using
// the libp2p identity keypair from the given Transport.
func newSecureSession(tpt *Transport, ctx context.Context, insecure net.Conn, remote peer.ID, initiator bool) (*secureSession, error) {
s := &secureSession{
insecure: insecure,
initiator: initiator,
localID: tpt.localID,
localKey: tpt.privateKey,
remoteID: remote,
insecureConn: insecure,
insecureReader: bufio.NewReader(insecure),
initiator: initiator,
localID: tpt.localID,
localKey: tpt.privateKey,
remoteID: remote,
}
// the go-routine we create to run the handshake will
@ -53,7 +58,7 @@ func newSecureSession(tpt *Transport, ctx context.Context, insecure net.Conn, re
select {
case err := <-respCh:
if err != nil {
_ = s.insecure.Close()
_ = s.insecureConn.Close()
}
return s, err
@ -61,14 +66,14 @@ func newSecureSession(tpt *Transport, ctx context.Context, insecure net.Conn, re
// If the context has been cancelled, we close the underlying connection.
// We then wait for the handshake to return because of the first error it encounters
// so we don't return without cleaning up the go-routine.
_ = s.insecure.Close()
_ = s.insecureConn.Close()
<-respCh
return nil, ctx.Err()
}
}
func (s *secureSession) LocalAddr() net.Addr {
return s.insecure.LocalAddr()
return s.insecureConn.LocalAddr()
}
func (s *secureSession) LocalPeer() peer.ID {
@ -84,7 +89,7 @@ func (s *secureSession) LocalPublicKey() crypto.PubKey {
}
func (s *secureSession) RemoteAddr() net.Addr {
return s.insecure.RemoteAddr()
return s.insecureConn.RemoteAddr()
}
func (s *secureSession) RemotePeer() peer.ID {
@ -96,17 +101,17 @@ func (s *secureSession) RemotePublicKey() crypto.PubKey {
}
func (s *secureSession) SetDeadline(t time.Time) error {
return s.insecure.SetDeadline(t)
return s.insecureConn.SetDeadline(t)
}
func (s *secureSession) SetReadDeadline(t time.Time) error {
return s.insecure.SetReadDeadline(t)
return s.insecureConn.SetReadDeadline(t)
}
func (s *secureSession) SetWriteDeadline(t time.Time) error {
return s.insecure.SetWriteDeadline(t)
return s.insecureConn.SetWriteDeadline(t)
}
func (s *secureSession) Close() error {
return s.insecure.Close()
return s.insecureConn.Close()
}

4
p2p/security/noise/transport_test.go

@ -298,7 +298,7 @@ func TestReadUnencryptedFails(t *testing.T) {
msg := make([]byte, len(before)+LengthPrefixLength)
binary.BigEndian.PutUint16(msg, uint16(len(before)))
copy(msg[LengthPrefixLength:], before)
n, err := initConn.insecure.Write(msg)
n, err := initConn.insecureConn.Write(msg)
require.NoError(t, err)
require.Equal(t, len(msg), n)
@ -319,7 +319,7 @@ func TestReadUnencryptedFails(t *testing.T) {
msg = make([]byte, len(before)+LengthPrefixLength)
binary.BigEndian.PutUint16(msg, uint16(len(before)))
copy(msg[LengthPrefixLength:], before)
n, err = initConn.insecure.Write(msg)
n, err = initConn.insecureConn.Write(msg)
require.NoError(t, err)
require.Equal(t, len(msg), n)

Loading…
Cancel
Save