Browse Source

move go-lipbp2-yamux here

pull/1439/head
Marten Seemann 3 years ago
parent
commit
24e313b2a3
  1. 44
      p2p/muxer/yamux/conn.go
  2. 64
      p2p/muxer/yamux/stream.go
  3. 53
      p2p/muxer/yamux/transport.go
  4. 15
      p2p/muxer/yamux/transport_test.go

44
p2p/muxer/yamux/conn.go

@ -0,0 +1,44 @@
package sm_yamux
import (
"context"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-yamux/v3"
)
// conn implements mux.MuxedConn over yamux.Session.
type conn yamux.Session
var _ network.MuxedConn = &conn{}
// Close closes underlying yamux
func (c *conn) Close() error {
return c.yamux().Close()
}
// IsClosed checks if yamux.Session is in closed state.
func (c *conn) IsClosed() bool {
return c.yamux().IsClosed()
}
// OpenStream creates a new stream.
func (c *conn) OpenStream(ctx context.Context) (network.MuxedStream, error) {
s, err := c.yamux().OpenStream(ctx)
if err != nil {
return nil, err
}
return (*stream)(s), nil
}
// AcceptStream accepts a stream opened by the other side.
func (c *conn) AcceptStream() (network.MuxedStream, error) {
s, err := c.yamux().AcceptStream()
return (*stream)(s), err
}
func (c *conn) yamux() *yamux.Session {
return (*yamux.Session)(c)
}

64
p2p/muxer/yamux/stream.go

@ -0,0 +1,64 @@
package sm_yamux
import (
"time"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-yamux/v3"
)
// stream implements mux.MuxedStream over yamux.Stream.
type stream yamux.Stream
var _ network.MuxedStream = &stream{}
func (s *stream) Read(b []byte) (n int, err error) {
n, err = s.yamux().Read(b)
if err == yamux.ErrStreamReset {
err = network.ErrReset
}
return n, err
}
func (s *stream) Write(b []byte) (n int, err error) {
n, err = s.yamux().Write(b)
if err == yamux.ErrStreamReset {
err = network.ErrReset
}
return n, err
}
func (s *stream) Close() error {
return s.yamux().Close()
}
func (s *stream) Reset() error {
return s.yamux().Reset()
}
func (s *stream) CloseRead() error {
return s.yamux().CloseRead()
}
func (s *stream) CloseWrite() error {
return s.yamux().CloseWrite()
}
func (s *stream) SetDeadline(t time.Time) error {
return s.yamux().SetDeadline(t)
}
func (s *stream) SetReadDeadline(t time.Time) error {
return s.yamux().SetReadDeadline(t)
}
func (s *stream) SetWriteDeadline(t time.Time) error {
return s.yamux().SetWriteDeadline(t)
}
func (s *stream) yamux() *yamux.Stream {
return (*yamux.Stream)(s)
}

53
p2p/muxer/yamux/transport.go

@ -0,0 +1,53 @@
package sm_yamux
import (
"io/ioutil"
"math"
"net"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-yamux/v3"
)
var DefaultTransport *Transport
func init() {
config := yamux.DefaultConfig()
// We've bumped this to 16MiB as this critically limits throughput.
//
// 1MiB means a best case of 10MiB/s (83.89Mbps) on a connection with
// 100ms latency. The default gave us 2.4MiB *best case* which was
// totally unacceptable.
config.MaxStreamWindowSize = uint32(16 * 1024 * 1024)
// don't spam
config.LogOutput = ioutil.Discard
// We always run over a security transport that buffers internally
// (i.e., uses a block cipher).
config.ReadBufSize = 0
// Effectively disable the incoming streams limit.
// This is now dynamically limited by the resource manager.
config.MaxIncomingStreams = math.MaxUint32
DefaultTransport = (*Transport)(config)
}
// Transport implements mux.Multiplexer that constructs
// yamux-backed muxed connections.
type Transport yamux.Config
var _ network.Multiplexer = &Transport{}
func (t *Transport) NewConn(nc net.Conn, isServer bool, scope network.PeerScope) (network.MuxedConn, error) {
var s *yamux.Session
var err error
if isServer {
s, err = yamux.Server(nc, t.Config(), scope)
} else {
s, err = yamux.Client(nc, t.Config(), scope)
}
return (*conn)(s), err
}
func (t *Transport) Config() *yamux.Config {
return (*yamux.Config)(t)
}

15
p2p/muxer/yamux/transport_test.go

@ -0,0 +1,15 @@
package sm_yamux
import (
"testing"
tmux "github.com/libp2p/go-libp2p-testing/suites/mux"
)
func TestDefaultTransport(t *testing.T) {
// Yamux doesn't have any backpressure when it comes to opening streams.
// If the peer opens too many streams, those are just reset.
delete(tmux.Subtests, "github.com/libp2p/go-libp2p-testing/suites/mux.SubtestStress1Conn1000Stream10Msg")
tmux.SubtestAll(t, DefaultTransport)
}
Loading…
Cancel
Save