Browse Source

switch local route binding to use netroute (#134)

switch local route binding to use netroute
pull/1424/head
Will 5 years ago
committed by GitHub
parent
commit
3ec3559c03
  1. 2
      p2p/transport/quic/libp2pquic_suite_test.go
  2. 10
      p2p/transport/quic/netlink_linux.go
  3. 9
      p2p/transport/quic/netlink_other.go
  4. 39
      p2p/transport/quic/reuse.go
  5. 42
      p2p/transport/quic/reuse_linux_test.go
  6. 68
      p2p/transport/quic/reuse_not_win.go
  7. 33
      p2p/transport/quic/reuse_test.go
  8. 30
      p2p/transport/quic/reuse_win.go
  9. 11
      p2p/transport/quic/transport.go

2
p2p/transport/quic/libp2pquic_suite_test.go

@ -32,7 +32,7 @@ var (
func isGarbageCollectorRunning() bool {
var b bytes.Buffer
pprof.Lookup("goroutine").WriteTo(&b, 1)
return strings.Contains(b.String(), "go-libp2p-quic-transport.(*reuseBase).runGarbageCollector")
return strings.Contains(b.String(), "go-libp2p-quic-transport.(*reuse).runGarbageCollector")
}
var _ = BeforeEach(func() {

10
p2p/transport/quic/netlink_linux.go

@ -1,10 +0,0 @@
// +build linux
package libp2pquic
import "golang.org/x/sys/unix"
// We just need netlink_route here.
// note: We should avoid the use of netlink_xfrm or netlink_netfilter has it is
// not allowed by Android in his base policy.
var SupportedNlFamilies = []int{unix.NETLINK_ROUTE}

9
p2p/transport/quic/netlink_other.go

@ -1,9 +0,0 @@
// +build !linux
// +build !windows
package libp2pquic
import "github.com/vishvananda/netlink/nl"
// SupportedNlFamilies is the default netlink families used by the netlink package
var SupportedNlFamilies = nl.SupportedNlFamilies

39
p2p/transport/quic/reuse_base.go → p2p/transport/quic/reuse.go

@ -6,6 +6,7 @@ import (
"time"
filter "github.com/libp2p/go-maddr-filter"
"github.com/libp2p/go-netroute"
)
// Constant. Defined as variables to simplify testing.
@ -51,7 +52,7 @@ func (c *reuseConn) ShouldGarbageCollect(now time.Time) bool {
return !c.unusedSince.IsZero() && c.unusedSince.Add(maxUnusedDuration).Before(now)
}
type reuseBase struct {
type reuse struct {
mutex sync.Mutex
filters *filter.Filters
@ -63,15 +64,15 @@ type reuseBase struct {
global map[int]*reuseConn
}
func newReuseBase(filters *filter.Filters) reuseBase {
return reuseBase{
func newReuse(filters *filter.Filters) *reuse {
return &reuse{
filters: filters,
unicast: make(map[string]map[int]*reuseConn),
global: make(map[int]*reuseConn),
}
}
func (r *reuseBase) runGarbageCollector() {
func (r *reuse) runGarbageCollector() {
ticker := time.NewTicker(garbageCollectInterval)
defer ticker.Stop()
@ -110,17 +111,37 @@ func (r *reuseBase) runGarbageCollector() {
}
// must be called while holding the mutex
func (r *reuseBase) maybeStartGarbageCollector() {
func (r *reuse) maybeStartGarbageCollector() {
if !r.garbageCollectorRunning {
r.garbageCollectorRunning = true
go r.runGarbageCollector()
}
}
func (r *reuse) Dial(network string, raddr *net.UDPAddr) (*reuseConn, error) {
var ip *net.IP
if router, err := netroute.New(); err == nil {
_, _, src, err := router.Route(raddr.IP)
if err == nil && !src.IsUnspecified() {
ip = &src
}
}
r.mutex.Lock()
defer r.mutex.Unlock()
conn, err := r.dialLocked(network, raddr, ip)
if err != nil {
return nil, err
}
conn.IncreaseCount()
r.maybeStartGarbageCollector()
return conn, nil
}
func (r *reuseBase) dialLocked(network string, raddr *net.UDPAddr, ips []net.IP) (*reuseConn, error) {
for _, ip := range ips {
func (r *reuse) dialLocked(network string, raddr *net.UDPAddr, source *net.IP) (*reuseConn, error) {
if source != nil {
// We already have at least one suitable connection...
if conns, ok := r.unicast[ip.String()]; ok {
if conns, ok := r.unicast[source.String()]; ok {
// ... we don't care which port we're dialing from. Just use the first.
for _, c := range conns {
return c, nil
@ -152,7 +173,7 @@ func (r *reuseBase) dialLocked(network string, raddr *net.UDPAddr, ips []net.IP)
return rconn, nil
}
func (r *reuseBase) Listen(network string, laddr *net.UDPAddr) (*reuseConn, error) {
func (r *reuse) Listen(network string, laddr *net.UDPAddr) (*reuseConn, error) {
conn, err := net.ListenUDP(network, laddr)
if err != nil {
return nil, err

42
p2p/transport/quic/reuse_linux_test.go

@ -1,42 +0,0 @@
// +build linux
package libp2pquic
import (
"net"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)
var _ = Describe("Reuse (on Linux)", func() {
var reuse *reuse
BeforeEach(func() {
var err error
reuse, err = newReuse(nil)
Expect(err).ToNot(HaveOccurred())
})
Context("creating and reusing connections", func() {
AfterEach(func() { closeAllConns(reuse) })
It("reuses a connection it created for listening on a specific interface", func() {
raddr, err := net.ResolveUDPAddr("udp4", "1.1.1.1:1234")
Expect(err).ToNot(HaveOccurred())
ips, err := reuse.getSourceIPs("udp4", raddr)
Expect(err).ToNot(HaveOccurred())
Expect(ips).ToNot(BeEmpty())
// listen
addr, err := net.ResolveUDPAddr("udp4", ips[0].String()+":0")
Expect(err).ToNot(HaveOccurred())
lconn, err := reuse.Listen("udp4", addr)
Expect(err).ToNot(HaveOccurred())
Expect(lconn.GetCount()).To(Equal(1))
// dial
conn, err := reuse.Dial("udp4", raddr)
Expect(err).ToNot(HaveOccurred())
Expect(conn.GetCount()).To(Equal(2))
})
})
})

68
p2p/transport/quic/reuse_not_win.go

@ -1,68 +0,0 @@
// +build !windows
package libp2pquic
import (
"net"
filter "github.com/libp2p/go-maddr-filter"
"github.com/vishvananda/netlink"
)
type reuse struct {
reuseBase
handle *netlink.Handle // Only set on Linux. nil on other systems.
}
func newReuse(filters *filter.Filters) (*reuse, error) {
handle, err := netlink.NewHandle(SupportedNlFamilies...)
if err == netlink.ErrNotImplemented {
handle = nil
} else if err != nil {
return nil, err
}
return &reuse{
reuseBase: newReuseBase(filters),
handle: handle,
}, nil
}
// Get the source IP that the kernel would use for dialing.
// This only works on Linux.
// On other systems, this returns an empty slice of IP addresses.
func (r *reuse) getSourceIPs(network string, raddr *net.UDPAddr) ([]net.IP, error) {
if r.handle == nil {
return nil, nil
}
routes, err := r.handle.RouteGet(raddr.IP)
if err != nil {
return nil, err
}
ips := make([]net.IP, 0, len(routes))
for _, route := range routes {
ips = append(ips, route.Src)
}
return ips, nil
}
func (r *reuse) Dial(network string, raddr *net.UDPAddr) (*reuseConn, error) {
ips, err := r.getSourceIPs(network, raddr)
if err != nil {
return nil, err
}
r.mutex.Lock()
defer r.mutex.Unlock()
conn, err := r.dialLocked(network, raddr, ips)
if err != nil {
return nil, err
}
conn.IncreaseCount()
r.maybeStartGarbageCollector()
return conn, nil
}

33
p2p/transport/quic/reuse_test.go

@ -4,6 +4,7 @@ import (
"net"
"time"
"github.com/libp2p/go-netroute"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)
@ -32,13 +33,19 @@ func closeAllConns(reuse *reuse) {
Eventually(isGarbageCollectorRunning).Should(BeFalse())
}
func OnPlatformsWithRoutingTablesIt(description string, f interface{}) {
if _, err := netroute.New(); err == nil {
It(description, f)
} else {
PIt(description, f)
}
}
var _ = Describe("Reuse", func() {
var reuse *reuse
BeforeEach(func() {
var err error
reuse, err = newReuse(nil)
Expect(err).ToNot(HaveOccurred())
reuse = newReuse(nil)
})
Context("creating and reusing connections", func() {
@ -85,6 +92,26 @@ var _ = Describe("Reuse", func() {
Expect(err).ToNot(HaveOccurred())
Expect(conn.GetCount()).To(Equal(2))
})
OnPlatformsWithRoutingTablesIt("reuses a connection it created for listening on a specific interface", func() {
router, err := netroute.New()
Expect(err).ToNot(HaveOccurred())
raddr, err := net.ResolveUDPAddr("udp4", "1.1.1.1:1234")
Expect(err).ToNot(HaveOccurred())
_, _, ip, err := router.Route(raddr.IP)
Expect(err).ToNot(HaveOccurred())
// listen
addr, err := net.ResolveUDPAddr("udp4", ip.String()+":0")
Expect(err).ToNot(HaveOccurred())
lconn, err := reuse.Listen("udp4", addr)
Expect(err).ToNot(HaveOccurred())
Expect(lconn.GetCount()).To(Equal(1))
// dial
conn, err := reuse.Dial("udp4", raddr)
Expect(err).ToNot(HaveOccurred())
Expect(conn.GetCount()).To(Equal(2))
})
})
Context("garbage-collecting connections", func() {

30
p2p/transport/quic/reuse_win.go

@ -1,30 +0,0 @@
// +build windows
package libp2pquic
import (
"net"
filter "github.com/libp2p/go-maddr-filter"
)
type reuse struct {
reuseBase
}
func newReuse(filters *filter.Filters) (*reuse, error) {
return &reuse{reuseBase: newReuseBase(filters)}, nil
}
func (r *reuse) Dial(network string, raddr *net.UDPAddr) (*reuseConn, error) {
r.mutex.Lock()
defer r.mutex.Unlock()
conn, err := r.dialLocked(network, raddr, nil)
if err != nil {
return nil, err
}
conn.IncreaseCount()
r.maybeStartGarbageCollector()
return conn, nil
}

11
p2p/transport/quic/transport.go

@ -49,14 +49,9 @@ type connManager struct {
}
func newConnManager(filters *filter.Filters) (*connManager, error) {
reuseUDP4, err := newReuse(filters)
if err != nil {
return nil, err
}
reuseUDP6, err := newReuse(filters)
if err != nil {
return nil, err
}
reuseUDP4 := newReuse(filters)
reuseUDP6 := newReuse(filters)
return &connManager{
reuseUDP4: reuseUDP4,
reuseUDP6: reuseUDP6,

Loading…
Cancel
Save