Browse Source

move go-reuseport-transport here

pull/1459/head
Marten Seemann 3 years ago
parent
commit
def3c71f4f
  1. 113
      p2p/net/reuseport/dial.go
  2. 80
      p2p/net/reuseport/listen.go
  3. 90
      p2p/net/reuseport/multidialer.go
  4. 35
      p2p/net/reuseport/reuseport.go
  5. 44
      p2p/net/reuseport/reuseport_plan9.go
  6. 37
      p2p/net/reuseport/reuseport_posix.go
  7. 51
      p2p/net/reuseport/reuseport_test.go
  8. 16
      p2p/net/reuseport/singledialer.go
  9. 35
      p2p/net/reuseport/transport.go
  10. 280
      p2p/net/reuseport/transport_test.go

113
p2p/net/reuseport/dial.go

@ -0,0 +1,113 @@
package tcpreuse
import (
"context"
"net"
"github.com/libp2p/go-reuseport"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
)
type dialer interface {
Dial(network, addr string) (net.Conn, error)
DialContext(ctx context.Context, network, addr string) (net.Conn, error)
}
// Dial dials the given multiaddr, reusing ports we're currently listening on if
// possible.
//
// Dial attempts to be smart about choosing the source port. For example, If
// we're dialing a loopback address and we're listening on one or more loopback
// ports, Dial will randomly choose one of the loopback ports and addresses and
// reuse it.
func (t *Transport) Dial(raddr ma.Multiaddr) (manet.Conn, error) {
return t.DialContext(context.Background(), raddr)
}
// DialContext is like Dial but takes a context.
func (t *Transport) DialContext(ctx context.Context, raddr ma.Multiaddr) (manet.Conn, error) {
network, addr, err := manet.DialArgs(raddr)
if err != nil {
return nil, err
}
var d dialer
switch network {
case "tcp4":
d = t.v4.getDialer(network)
case "tcp6":
d = t.v6.getDialer(network)
default:
return nil, ErrWrongProto
}
conn, err := d.DialContext(ctx, network, addr)
if err != nil {
return nil, err
}
maconn, err := manet.WrapNetConn(conn)
if err != nil {
conn.Close()
return nil, err
}
return maconn, nil
}
func (n *network) getDialer(network string) dialer {
n.mu.RLock()
d := n.dialer
n.mu.RUnlock()
if d == nil {
n.mu.Lock()
defer n.mu.Unlock()
if n.dialer == nil {
n.dialer = n.makeDialer(network)
}
d = n.dialer
}
return d
}
func (n *network) makeDialer(network string) dialer {
if !reuseport.Available() {
log.Debug("reuseport not available")
return &net.Dialer{}
}
var unspec net.IP
switch network {
case "tcp4":
unspec = net.IPv4zero
case "tcp6":
unspec = net.IPv6unspecified
default:
panic("invalid network: must be either tcp4 or tcp6")
}
// How many ports are we listening on.
var port = 0
for l := range n.listeners {
newPort := l.Addr().(*net.TCPAddr).Port
switch {
case newPort == 0: // Any port, ignore (really, we shouldn't get this case...).
case port == 0: // Haven't selected a port yet, choose this one.
port = newPort
case newPort == port: // Same as the selected port, continue...
default: // Multiple ports, use the multi dialer
return newMultiDialer(unspec, n.listeners)
}
}
// None.
if port == 0 {
return &net.Dialer{}
}
// One. Always dial from the single port we're listening on.
laddr := &net.TCPAddr{
IP: unspec,
Port: port,
}
return (*singleDialer)(laddr)
}

80
p2p/net/reuseport/listen.go

@ -0,0 +1,80 @@
package tcpreuse
import (
"net"
"github.com/libp2p/go-reuseport"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
)
type listener struct {
manet.Listener
network *network
}
func (l *listener) Close() error {
l.network.mu.Lock()
delete(l.network.listeners, l)
l.network.dialer = nil
l.network.mu.Unlock()
return l.Listener.Close()
}
// Listen listens on the given multiaddr.
//
// If reuseport is supported, it will be enabled for this listener and future
// dials from this transport may reuse the port.
//
// Note: You can listen on the same multiaddr as many times as you want
// (although only *one* listener will end up handling the inbound connection).
func (t *Transport) Listen(laddr ma.Multiaddr) (manet.Listener, error) {
nw, naddr, err := manet.DialArgs(laddr)
if err != nil {
return nil, err
}
var n *network
switch nw {
case "tcp4":
n = &t.v4
case "tcp6":
n = &t.v6
default:
return nil, ErrWrongProto
}
if !reuseport.Available() {
return manet.Listen(laddr)
}
nl, err := reuseport.Listen(nw, naddr)
if err != nil {
return manet.Listen(laddr)
}
if _, ok := nl.Addr().(*net.TCPAddr); !ok {
nl.Close()
return nil, ErrWrongProto
}
malist, err := manet.WrapNetListener(nl)
if err != nil {
nl.Close()
return nil, err
}
list := &listener{
Listener: malist,
network: n,
}
n.mu.Lock()
defer n.mu.Unlock()
if n.listeners == nil {
n.listeners = make(map[*listener]struct{})
}
n.listeners[list] = struct{}{}
n.dialer = nil
return list, nil
}

90
p2p/net/reuseport/multidialer.go

@ -0,0 +1,90 @@
package tcpreuse
import (
"context"
"fmt"
"math/rand"
"net"
"github.com/libp2p/go-netroute"
)
type multiDialer struct {
listeningAddresses []*net.TCPAddr
loopback []*net.TCPAddr
unspecified []*net.TCPAddr
fallback net.TCPAddr
}
func (d *multiDialer) Dial(network, addr string) (net.Conn, error) {
return d.DialContext(context.Background(), network, addr)
}
func randAddr(addrs []*net.TCPAddr) *net.TCPAddr {
if len(addrs) > 0 {
return addrs[rand.Intn(len(addrs))]
}
return nil
}
// DialContext dials a target addr.
// Dialing preference is
// * If there is a listener on the local interface the OS expects to use to route towards addr, use that.
// * If there is a listener on a loopback address, addr is loopback, use that.
// * If there is a listener on an undefined address (0.0.0.0 or ::), use that.
// * Use the fallback IP specified during construction, with a port that's already being listened on, if one exists.
func (d *multiDialer) DialContext(ctx context.Context, network, addr string) (net.Conn, error) {
tcpAddr, err := net.ResolveTCPAddr(network, addr)
if err != nil {
return nil, err
}
ip := tcpAddr.IP
if !ip.IsLoopback() && !ip.IsGlobalUnicast() {
return nil, fmt.Errorf("undialable IP: %s", ip)
}
if router, err := netroute.New(); err == nil {
if _, _, preferredSrc, err := router.Route(ip); err == nil {
for _, optAddr := range d.listeningAddresses {
if optAddr.IP.Equal(preferredSrc) {
return reuseDial(ctx, optAddr, network, addr)
}
}
}
}
if ip.IsLoopback() && len(d.loopback) > 0 {
return reuseDial(ctx, randAddr(d.loopback), network, addr)
}
if len(d.unspecified) == 0 {
return reuseDial(ctx, &d.fallback, network, addr)
}
return reuseDial(ctx, randAddr(d.unspecified), network, addr)
}
func newMultiDialer(unspec net.IP, listeners map[*listener]struct{}) (m dialer) {
addrs := make([]*net.TCPAddr, 0)
loopback := make([]*net.TCPAddr, 0)
unspecified := make([]*net.TCPAddr, 0)
existingPort := 0
for l := range listeners {
addr := l.Addr().(*net.TCPAddr)
addrs = append(addrs, addr)
if addr.IP.IsLoopback() {
loopback = append(loopback, addr)
} else if addr.IP.IsGlobalUnicast() && existingPort == 0 {
existingPort = addr.Port
} else if addr.IP.IsUnspecified() {
unspecified = append(unspecified, addr)
}
}
m = &multiDialer{
listeningAddresses: addrs,
loopback: loopback,
unspecified: unspecified,
fallback: net.TCPAddr{IP: unspec, Port: existingPort},
}
return
}

35
p2p/net/reuseport/reuseport.go

@ -0,0 +1,35 @@
package tcpreuse
import (
"context"
"net"
reuseport "github.com/libp2p/go-reuseport"
)
var fallbackDialer net.Dialer
// Dials using reuseport and then redials normally if that fails.
func reuseDial(ctx context.Context, laddr *net.TCPAddr, network, raddr string) (con net.Conn, err error) {
if laddr == nil {
return fallbackDialer.DialContext(ctx, network, raddr)
}
d := net.Dialer{
LocalAddr: laddr,
Control: reuseport.Control,
}
con, err = d.DialContext(ctx, network, raddr)
if err == nil {
return con, nil
}
if reuseErrShouldRetry(err) && ctx.Err() == nil {
// We could have an existing socket open or we could have one
// stuck in TIME-WAIT.
log.Debugf("failed to reuse port, will try again with a random port: %s", err)
con, err = fallbackDialer.DialContext(ctx, network, raddr)
}
return con, err
}

44
p2p/net/reuseport/reuseport_plan9.go

@ -0,0 +1,44 @@
package tcpreuse
import (
"net"
"os"
)
const (
EADDRINUSE = "address in use"
ECONNREFUSED = "connection refused"
)
// reuseErrShouldRetry diagnoses whether to retry after a reuse error.
// if we failed to bind, we should retry. if bind worked and this is a
// real dial error (remote end didnt answer) then we should not retry.
func reuseErrShouldRetry(err error) bool {
if err == nil {
return false // hey, it worked! no need to retry.
}
// if it's a network timeout error, it's a legitimate failure.
if nerr, ok := err.(net.Error); ok && nerr.Timeout() {
return false
}
e, ok := err.(*net.OpError)
if !ok {
return true
}
e1, ok := e.Err.(*os.PathError)
if !ok {
return true
}
switch e1.Err.Error() {
case EADDRINUSE:
return true
case ECONNREFUSED:
return false
default:
return true // optimistically default to retry.
}
}

37
p2p/net/reuseport/reuseport_posix.go

@ -0,0 +1,37 @@
//go:build !plan9
// +build !plan9
package tcpreuse
import (
"net"
"syscall"
)
// reuseErrShouldRetry diagnoses whether to retry after a reuse error.
// if we failed to bind, we should retry. if bind worked and this is a
// real dial error (remote end didnt answer) then we should not retry.
func reuseErrShouldRetry(err error) bool {
if err == nil {
return false // hey, it worked! no need to retry.
}
// if it's a network timeout error, it's a legitimate failure.
if nerr, ok := err.(net.Error); ok && nerr.Timeout() {
return false
}
errno, ok := err.(syscall.Errno)
if !ok { // not an errno? who knows what this is. retry.
return true
}
switch errno {
case syscall.EADDRINUSE, syscall.EADDRNOTAVAIL:
return true // failure to bind. retry.
case syscall.ECONNREFUSED:
return false // real dial error
default:
return true // optimistically default to retry.
}
}

51
p2p/net/reuseport/reuseport_test.go

@ -0,0 +1,51 @@
//go:build !plan9
// +build !plan9
package tcpreuse
import (
"net"
"syscall"
"testing"
)
type netTimeoutErr struct {
timeout bool
}
func (e netTimeoutErr) Error() string {
return ""
}
func (e netTimeoutErr) Timeout() bool {
return e.timeout
}
func (e netTimeoutErr) Temporary() bool {
panic("not checked")
}
func TestReuseError(t *testing.T) {
var nte1 net.Error = &netTimeoutErr{true}
var nte2 net.Error = &netTimeoutErr{false}
cases := map[error]bool{
nil: false,
syscall.EADDRINUSE: true,
syscall.EADDRNOTAVAIL: true,
syscall.ECONNREFUSED: false,
nte1: false,
nte2: true, // this ones a little weird... we should check neterror.Temporary() too
// test 'default' to true
syscall.EBUSY: true,
}
for k, v := range cases {
if reuseErrShouldRetry(k) != v {
t.Fatalf("expected %t for %#v", v, k)
}
}
}

16
p2p/net/reuseport/singledialer.go

@ -0,0 +1,16 @@
package tcpreuse
import (
"context"
"net"
)
type singleDialer net.TCPAddr
func (d *singleDialer) Dial(network, address string) (net.Conn, error) {
return d.DialContext(context.Background(), network, address)
}
func (d *singleDialer) DialContext(ctx context.Context, network, address string) (net.Conn, error) {
return reuseDial(ctx, (*net.TCPAddr)(d), network, address)
}

35
p2p/net/reuseport/transport.go

@ -0,0 +1,35 @@
// Package tcpreuse provides a basic transport for automatically (and intelligently) reusing TCP ports.
//
// To use, construct a new Transport and configure listeners tr.Listen(...).
// When dialing (tr.Dial(...)), the transport will attempt to reuse the ports it's currently listening on,
// choosing the best one depending on the destination address.
//
// It is recommended to set set SO_LINGER to 0 for all connections, otherwise
// reusing the port may fail when re-dialing a recently closed connection.
// See https://hea-www.harvard.edu/~fine/Tech/addrinuse.html for details.
package tcpreuse
import (
"errors"
"sync"
logging "github.com/ipfs/go-log/v2"
)
var log = logging.Logger("reuseport-transport")
// ErrWrongProto is returned when dialing a protocol other than tcp.
var ErrWrongProto = errors.New("can only dial TCP over IPv4 or IPv6")
// Transport is a TCP reuse transport that reuses listener ports.
// The zero value is safe to use.
type Transport struct {
v4 network
v6 network
}
type network struct {
mu sync.RWMutex
listeners map[*listener]struct{}
dialer dialer
}

280
p2p/net/reuseport/transport_test.go

@ -0,0 +1,280 @@
package tcpreuse
import (
"net"
"runtime"
"testing"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
)
var loopbackV4, _ = ma.NewMultiaddr("/ip4/127.0.0.1/tcp/0")
var loopbackV6, _ = ma.NewMultiaddr("/ip6/::1/tcp/0")
var unspecV6, _ = ma.NewMultiaddr("/ip6/::/tcp/0")
var unspecV4, _ = ma.NewMultiaddr("/ip4/0.0.0.0/tcp/0")
var globalV4 ma.Multiaddr
var globalV6 ma.Multiaddr
func init() {
addrs, err := manet.InterfaceMultiaddrs()
if err != nil {
return
}
for _, addr := range addrs {
if !manet.IsIP6LinkLocal(addr) && !manet.IsIPLoopback(addr) {
tcp, _ := ma.NewMultiaddr("/tcp/0")
switch addr.Protocols()[0].Code {
case ma.P_IP4:
if globalV4 == nil {
globalV4 = addr.Encapsulate(tcp)
}
case ma.P_IP6:
if globalV6 == nil {
globalV6 = addr.Encapsulate(tcp)
}
}
}
}
}
func setLingerZero(c manet.Conn) {
if runtime.GOOS == "darwin" {
c.(interface{ SetLinger(int) error }).SetLinger(0)
}
}
func acceptOne(t *testing.T, listener manet.Listener) <-chan manet.Conn {
t.Helper()
done := make(chan manet.Conn, 1)
go func() {
defer close(done)
c, err := listener.Accept()
if err != nil {
t.Error(err)
return
}
setLingerZero(c)
done <- c
}()
return done
}
func dialOne(t *testing.T, tr *Transport, listener manet.Listener, expected ...int) int {
t.Helper()
connChan := acceptOne(t, listener)
c, err := tr.Dial(listener.Multiaddr())
if err != nil {
t.Fatal(err)
}
setLingerZero(c)
port := c.LocalAddr().(*net.TCPAddr).Port
serverConn := <-connChan
serverConn.Close()
c.Close()
if len(expected) == 0 {
return port
}
for _, p := range expected {
if p == port {
return port
}
}
t.Errorf("dialed %s from %v. expected to dial from port %v", listener.Multiaddr(), c.LocalAddr(), expected)
return 0
}
func TestNoneAndSingle(t *testing.T) {
var trA Transport
var trB Transport
listenerA, err := trA.Listen(loopbackV4)
if err != nil {
t.Fatal(err)
}
defer listenerA.Close()
dialOne(t, &trB, listenerA)
listenerB, err := trB.Listen(loopbackV4)
if err != nil {
t.Fatal(err)
}
defer listenerB.Close()
dialOne(t, &trB, listenerA, listenerB.Addr().(*net.TCPAddr).Port)
}
func TestTwoLocal(t *testing.T) {
var trA Transport
var trB Transport
listenerA, err := trA.Listen(loopbackV4)
if err != nil {
t.Fatal(err)
}
defer listenerA.Close()
listenerB1, err := trB.Listen(loopbackV4)
if err != nil {
t.Fatal(err)
}
defer listenerB1.Close()
listenerB2, err := trB.Listen(loopbackV4)
if err != nil {
t.Fatal(err)
}
defer listenerB2.Close()
dialOne(t, &trB, listenerA,
listenerB1.Addr().(*net.TCPAddr).Port,
listenerB2.Addr().(*net.TCPAddr).Port)
}
func TestGlobalPreferenceV4(t *testing.T) {
if globalV4 == nil {
t.Skip("no global IPv4 addresses configured")
return
}
t.Logf("when listening on %v, should prefer %v over %v", loopbackV4, loopbackV4, globalV4)
testPrefer(t, loopbackV4, loopbackV4, globalV4)
t.Logf("when listening on %v, should prefer %v over %v", loopbackV4, unspecV4, globalV4)
testPrefer(t, loopbackV4, unspecV4, globalV4)
t.Logf("when listening on %v, should prefer %v over %v", globalV4, unspecV4, loopbackV4)
testPrefer(t, globalV4, unspecV4, loopbackV4)
}
func TestGlobalPreferenceV6(t *testing.T) {
if globalV6 == nil {
t.Skip("no global IPv6 addresses configured")
return
}
testPrefer(t, loopbackV6, loopbackV6, globalV6)
testPrefer(t, loopbackV6, unspecV6, globalV6)
testPrefer(t, globalV6, unspecV6, loopbackV6)
}
func TestLoopbackPreference(t *testing.T) {
testPrefer(t, loopbackV4, loopbackV4, unspecV4)
testPrefer(t, loopbackV6, loopbackV6, unspecV6)
}
func testPrefer(t *testing.T, listen, prefer, avoid ma.Multiaddr) {
var trA Transport
var trB Transport
listenerA, err := trA.Listen(listen)
if err != nil {
t.Fatal(err)
}
defer listenerA.Close()
listenerB1, err := trB.Listen(avoid)
if err != nil {
t.Fatal(err)
}
defer listenerB1.Close()
dialOne(t, &trB, listenerA, listenerB1.Addr().(*net.TCPAddr).Port)
listenerB2, err := trB.Listen(prefer)
if err != nil {
t.Fatal(err)
}
defer listenerB2.Close()
dialOne(t, &trB, listenerA, listenerB2.Addr().(*net.TCPAddr).Port)
// Closing the listener should reset the dialer.
listenerB2.Close()
dialOne(t, &trB, listenerA, listenerB1.Addr().(*net.TCPAddr).Port)
}
func TestV6V4(t *testing.T) {
if runtime.GOOS == "darwin" {
t.Skip("This test is failing on OSX: https://github.com/libp2p/go-reuseport-transport/issues/40")
}
testUseFirst(t, loopbackV4, loopbackV4, loopbackV6)
testUseFirst(t, loopbackV6, loopbackV6, loopbackV4)
}
func TestGlobalToGlobal(t *testing.T) {
if globalV4 == nil {
t.Skip("no globalV4 addresses configured")
return
}
testUseFirst(t, globalV4, globalV4, loopbackV4)
testUseFirst(t, globalV6, globalV6, loopbackV6)
}
func testUseFirst(t *testing.T, listen, use, never ma.Multiaddr) {
var trA Transport
var trB Transport
listenerA, err := trA.Listen(globalV4)
if err != nil {
t.Fatal(err)
}
defer listenerA.Close()
listenerB1, err := trB.Listen(loopbackV4)
if err != nil {
t.Fatal(err)
}
defer listenerB1.Close()
// It works (random port)
dialOne(t, &trB, listenerA)
listenerB2, err := trB.Listen(globalV4)
if err != nil {
t.Fatal(err)
}
defer listenerB2.Close()
// Uses globalV4 port.
dialOne(t, &trB, listenerA, listenerB2.Addr().(*net.TCPAddr).Port)
// Closing the listener should reset the dialer.
listenerB2.Close()
// It still works.
dialOne(t, &trB, listenerA)
}
func TestDuplicateGlobal(t *testing.T) {
if globalV4 == nil {
t.Skip("no globalV4 addresses configured")
return
}
var trA Transport
var trB Transport
listenerA, err := trA.Listen(globalV4)
if err != nil {
t.Fatal(err)
}
defer listenerA.Close()
listenerB1, err := trB.Listen(globalV4)
if err != nil {
t.Fatal(err)
}
defer listenerB1.Close()
listenerB2, err := trB.Listen(globalV4)
if err != nil {
t.Fatal(err)
}
defer listenerB2.Close()
// Check which port we're using
port := dialOne(t, &trB, listenerA)
// Check consistency
for i := 0; i < 10; i++ {
dialOne(t, &trB, listenerA, port)
}
}
Loading…
Cancel
Save