Browse Source

reuse listening connections for dialing

pull/1424/head
lnykww 6 years ago
committed by Marten Seemann
parent
commit
d1193da2f8
  1. 27
      p2p/transport/quic/listener.go
  2. 137
      p2p/transport/quic/reuse.go
  3. 79
      p2p/transport/quic/reuse_test.go
  4. 82
      p2p/transport/quic/transport.go

27
p2p/transport/quic/listener.go

@ -12,14 +12,13 @@ import (
quic "github.com/lucas-clemente/quic-go"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr-net"
)
// A listener listens for QUIC connections.
type listener struct {
quicListener quic.Listener
transport tpt.Transport
quicListener quic.Listener
conn *reuseConn
transport *transport
privKey ic.PrivKey
localPeer peer.ID
localMultiaddr ma.Multiaddr
@ -27,7 +26,7 @@ type listener struct {
var _ tpt.Listener = &listener{}
func newListener(addr ma.Multiaddr, transport tpt.Transport, localPeer peer.ID, key ic.PrivKey, identity *p2ptls.Identity) (tpt.Listener, error) {
func newListener(rconn *reuseConn, t *transport, localPeer peer.ID, key ic.PrivKey, identity *p2ptls.Identity) (tpt.Listener, error) {
var tlsConf tls.Config
tlsConf.GetConfigForClient = func(_ *tls.ClientHelloInfo) (*tls.Config, error) {
// return a tls.Config that verifies the peer's certificate chain.
@ -37,19 +36,7 @@ func newListener(addr ma.Multiaddr, transport tpt.Transport, localPeer peer.ID,
conf, _ := identity.ConfigForAny()
return conf, nil
}
lnet, host, err := manet.DialArgs(addr)
if err != nil {
return nil, err
}
laddr, err := net.ResolveUDPAddr(lnet, host)
if err != nil {
return nil, err
}
conn, err := net.ListenUDP(lnet, laddr)
if err != nil {
return nil, err
}
ln, err := quic.Listen(conn, &tlsConf, quicConfig)
ln, err := quic.Listen(rconn, &tlsConf, quicConfig)
if err != nil {
return nil, err
}
@ -58,8 +45,9 @@ func newListener(addr ma.Multiaddr, transport tpt.Transport, localPeer peer.ID,
return nil, err
}
return &listener{
conn: rconn,
quicListener: ln,
transport: transport,
transport: t,
privKey: key,
localPeer: localPeer,
localMultiaddr: localMultiaddr,
@ -113,6 +101,7 @@ func (l *listener) setupConn(sess quic.Session) (tpt.CapableConn, error) {
// Close closes the listener.
func (l *listener) Close() error {
l.conn.DecreaseCount()
return l.quicListener.Close()
}

137
p2p/transport/quic/reuse.go

@ -0,0 +1,137 @@
package libp2pquic
import (
"net"
"sync"
"sync/atomic"
"github.com/vishvananda/netlink"
)
type reuseConn struct {
net.PacketConn
refCount int32 // to be used as an atomic
}
func newReuseConn(conn net.PacketConn) *reuseConn {
return &reuseConn{PacketConn: conn}
}
func (c *reuseConn) IncreaseCount() { atomic.AddInt32(&c.refCount, 1) }
func (c *reuseConn) DecreaseCount() { atomic.AddInt32(&c.refCount, -1) }
func (c *reuseConn) GetCount() int { return int(atomic.LoadInt32(&c.refCount)) }
type reuse struct {
mutex sync.Mutex
unicast map[string] /* IP.String() */ map[int] /* port */ *reuseConn
// global contains connections that are listening on 0.0.0.0 / ::
global map[int]*reuseConn
}
func newReuse() *reuse {
return &reuse{
unicast: make(map[string]map[int]*reuseConn),
global: make(map[int]*reuseConn),
}
}
func (r *reuse) getSourceIPs(network string, raddr *net.UDPAddr) ([]net.IP, error) {
// Determine the source address that the kernel would use for this IP address.
// Note: This only works on Linux.
// On other OSes, this will return a netlink.ErrNotImplemetned.
routes, err := (&netlink.Handle{}).RouteGet(raddr.IP)
if err != nil {
return nil, err
}
ips := make([]net.IP, 0, len(routes))
for _, route := range routes {
ips = append(ips, route.Src)
}
return ips, nil
}
func (r *reuse) Dial(network string, raddr *net.UDPAddr) (*reuseConn, error) {
ips, err := r.getSourceIPs(network, raddr)
if err != nil && err != netlink.ErrNotImplemented {
return nil, err
}
r.mutex.Lock()
defer r.mutex.Unlock()
conn, err := r.dialLocked(network, raddr, ips)
if err != nil {
return nil, err
}
conn.IncreaseCount()
return conn, nil
}
func (r *reuse) dialLocked(network string, raddr *net.UDPAddr, ips []net.IP) (*reuseConn, error) {
for _, ip := range ips {
// We already have at least one suitable connection...
if conns, ok := r.unicast[ip.String()]; ok {
// ... we don't care which port we're dialing from. Just use the first.
for _, c := range conns {
return c, nil
}
}
}
// Use a connection listening on 0.0.0.0 (or ::).
// Again, we don't care about the port number.
for _, conn := range r.global {
return conn, nil
}
// We don't have a connection that we can use for dialing.
// Dial a new connection from a random port.
var addr *net.UDPAddr
switch network {
case "udp4":
addr = &net.UDPAddr{IP: net.IPv4zero, Port: 0}
case "udp6":
addr = &net.UDPAddr{IP: net.IPv6zero, Port: 0}
}
conn, err := net.ListenUDP(network, addr)
if err != nil {
return nil, err
}
rconn := newReuseConn(conn)
r.global[conn.LocalAddr().(*net.UDPAddr).Port] = rconn
return rconn, nil
}
func (r *reuse) Listen(network string, laddr *net.UDPAddr) (*reuseConn, error) {
conn, err := net.ListenUDP(network, laddr)
if err != nil {
return nil, err
}
localAddr := conn.LocalAddr().(*net.UDPAddr)
rconn := newReuseConn(conn)
rconn.IncreaseCount()
r.mutex.Lock()
defer r.mutex.Unlock()
// Deal with listen on a global address
if laddr.IP.IsUnspecified() {
// The kernel already checked that the laddr is not already listen
// so we need not check here (when we create ListenUDP).
r.global[laddr.Port] = rconn
return rconn, err
}
// Deal with listen on a unicast address
if _, ok := r.unicast[localAddr.IP.String()]; !ok {
r.unicast[laddr.IP.String()] = make(map[int]*reuseConn)
}
// The kernel already checked that the laddr is not already listen
// so we need not check here (when we create ListenUDP).
r.unicast[laddr.IP.String()][localAddr.Port] = rconn
return rconn, err
}

79
p2p/transport/quic/reuse_test.go

@ -0,0 +1,79 @@
package libp2pquic
import (
"net"
"runtime"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)
var _ = Describe("Reuse", func() {
var reuse *reuse
BeforeEach(func() {
reuse = newReuse()
})
It("creates a new global connection when listening on 0.0.0.0", func() {
addr, err := net.ResolveUDPAddr("udp4", "0.0.0.0:0")
Expect(err).ToNot(HaveOccurred())
conn, err := reuse.Listen("udp4", addr)
Expect(err).ToNot(HaveOccurred())
Expect(conn.GetCount()).To(Equal(1))
})
It("creates a new global connection when listening on [::]", func() {
addr, err := net.ResolveUDPAddr("udp6", "[::]:1234")
Expect(err).ToNot(HaveOccurred())
conn, err := reuse.Listen("udp6", addr)
Expect(err).ToNot(HaveOccurred())
Expect(conn.GetCount()).To(Equal(1))
})
It("creates a new global connection when dialing", func() {
addr, err := net.ResolveUDPAddr("udp4", "1.1.1.1:1234")
Expect(err).ToNot(HaveOccurred())
conn, err := reuse.Dial("udp4", addr)
Expect(err).ToNot(HaveOccurred())
Expect(conn.GetCount()).To(Equal(1))
laddr := conn.LocalAddr().(*net.UDPAddr)
Expect(laddr.IP.String()).To(Equal("0.0.0.0"))
Expect(laddr.Port).ToNot(BeZero())
})
It("reuses a connection it created for listening when dialing", func() {
// listen
addr, err := net.ResolveUDPAddr("udp4", "0.0.0.0:0")
Expect(err).ToNot(HaveOccurred())
lconn, err := reuse.Listen("udp4", addr)
Expect(err).ToNot(HaveOccurred())
Expect(lconn.GetCount()).To(Equal(1))
// dial
raddr, err := net.ResolveUDPAddr("udp4", "1.1.1.1:1234")
Expect(err).ToNot(HaveOccurred())
conn, err := reuse.Dial("udp4", raddr)
Expect(err).ToNot(HaveOccurred())
Expect(conn.GetCount()).To(Equal(2))
})
if runtime.GOOS == "linux" {
It("reuses a connection it created for listening on a specific interface", func() {
raddr, err := net.ResolveUDPAddr("udp4", "1.1.1.1:1234")
Expect(err).ToNot(HaveOccurred())
ips, err := reuse.getSourceIPs("udp4", raddr)
Expect(err).ToNot(HaveOccurred())
Expect(ips).ToNot(BeEmpty())
// listen
addr, err := net.ResolveUDPAddr("udp4", ips[0].String()+":0")
Expect(err).ToNot(HaveOccurred())
lconn, err := reuse.Listen("udp4", addr)
Expect(err).ToNot(HaveOccurred())
Expect(lconn.GetCount()).To(Equal(1))
// dial
conn, err := reuse.Dial("udp4", raddr)
Expect(err).ToNot(HaveOccurred())
Expect(conn.GetCount()).To(Equal(2))
})
}
})

82
p2p/transport/quic/transport.go

@ -3,9 +3,7 @@ package libp2pquic
import (
"context"
"errors"
"fmt"
"net"
"sync"
ic "github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/peer"
@ -31,42 +29,42 @@ var quicConfig = &quic.Config{
}
type connManager struct {
mutex sync.Mutex
connIPv4 net.PacketConn
connIPv6 net.PacketConn
reuseUDP4 *reuse
reuseUDP6 *reuse
}
func (c *connManager) GetConnForAddr(network string) (net.PacketConn, error) {
c.mutex.Lock()
defer c.mutex.Unlock()
func newConnManager() *connManager {
return &connManager{
reuseUDP4: newReuse(),
reuseUDP6: newReuse(),
}
}
func (c *connManager) getReuse(network string) (*reuse, error) {
switch network {
case "udp4":
if c.connIPv4 != nil {
return c.connIPv4, nil
}
var err error
c.connIPv4, err = c.createConn(network, "0.0.0.0:0")
return c.connIPv4, err
return c.reuseUDP4, nil
case "udp6":
if c.connIPv6 != nil {
return c.connIPv6, nil
}
var err error
c.connIPv6, err = c.createConn(network, ":0")
return c.connIPv6, err
return c.reuseUDP6, nil
default:
return nil, fmt.Errorf("unsupported network: %s", network)
return nil, errors.New("invalid network: must be either udp4 or udp6")
}
}
func (c *connManager) Listen(network string, laddr *net.UDPAddr) (*reuseConn, error) {
reuse, err := c.getReuse(network)
if err != nil {
return nil, err
}
return reuse.Listen(network, laddr)
}
func (c *connManager) createConn(network, host string) (net.PacketConn, error) {
addr, err := net.ResolveUDPAddr(network, host)
func (c *connManager) Dial(network string, raddr *net.UDPAddr) (*reuseConn, error) {
reuse, err := c.getReuse(network)
if err != nil {
return nil, err
}
return net.ListenUDP(network, addr)
return reuse.Dial(network, raddr)
}
// The Transport implements the tpt.Transport interface for QUIC connections.
@ -94,7 +92,7 @@ func NewTransport(key ic.PrivKey) (tpt.Transport, error) {
privKey: key,
localPeer: localPeer,
identity: identity,
connManager: &connManager{},
connManager: newConnManager(),
}, nil
}
@ -104,7 +102,7 @@ func (t *transport) Dial(ctx context.Context, raddr ma.Multiaddr, p peer.ID) (tp
if err != nil {
return nil, err
}
pconn, err := t.connManager.GetConnForAddr(network)
udpAddr, err := net.ResolveUDPAddr(network, host)
if err != nil {
return nil, err
}
@ -113,15 +111,15 @@ func (t *transport) Dial(ctx context.Context, raddr ma.Multiaddr, p peer.ID) (tp
return nil, err
}
tlsConf, keyCh := t.identity.ConfigForPeer(p)
sess, err := quic.DialContext(ctx, pconn, addr, host, tlsConf, quicConfig)
pconn, err := t.connManager.Dial(network, udpAddr)
if err != nil {
return nil, err
}
localMultiaddr, err := toQuicMultiaddr(sess.LocalAddr())
sess, err := quic.DialContext(ctx, pconn, addr, host, tlsConf, quicConfig)
if err != nil {
pconn.DecreaseCount()
return nil, err
}
// Should be ready by this point, don't block.
var remotePubKey ic.PubKey
select {
@ -129,9 +127,19 @@ func (t *transport) Dial(ctx context.Context, raddr ma.Multiaddr, p peer.ID) (tp
default:
}
if remotePubKey == nil {
pconn.DecreaseCount()
return nil, errors.New("go-libp2p-quic-transport BUG: expected remote pub key to be set")
}
go func() {
<-sess.Context().Done()
pconn.DecreaseCount()
}()
localMultiaddr, err := toQuicMultiaddr(pconn.LocalAddr())
if err != nil {
pconn.DecreaseCount()
return nil, err
}
return &conn{
sess: sess,
transport: t,
@ -151,7 +159,19 @@ func (t *transport) CanDial(addr ma.Multiaddr) bool {
// Listen listens for new QUIC connections on the passed multiaddr.
func (t *transport) Listen(addr ma.Multiaddr) (tpt.Listener, error) {
return newListener(addr, t, t.localPeer, t.privKey, t.identity)
lnet, host, err := manet.DialArgs(addr)
if err != nil {
return nil, err
}
laddr, err := net.ResolveUDPAddr(lnet, host)
if err != nil {
return nil, err
}
conn, err := t.connManager.Listen(lnet, laddr)
if err != nil {
return nil, err
}
return newListener(conn, t, t.localPeer, t.privKey, t.identity)
}
// Proxy returns true if this transport proxies.

Loading…
Cancel
Save