Browse Source

better nat mapping

1. Update to work with https://github.com/libp2p/go-libp2p-nat/pull/14.
2. Avoid observed addrs when our NAT tells us about external addrs.
3. Ignore bad addrs reported by our NAT. Substitute with observed addrs.
4. Fix https://github.com/libp2p/go-libp2p/issues/428.
pull/549/head
Steven Allen 6 years ago
parent
commit
3617750071
  1. 2
      go.mod
  2. 2
      go.sum
  3. 156
      p2p/host/basic/basic_host.go
  4. 223
      p2p/host/basic/natmgr.go
  5. 4
      p2p/protocol/identify/id.go
  6. 40
      p2p/protocol/identify/obsaddr.go
  7. 4
      package.json

2
go.mod

@ -19,7 +19,7 @@ require (
github.com/libp2p/go-libp2p-interface-pnet v0.0.1 github.com/libp2p/go-libp2p-interface-pnet v0.0.1
github.com/libp2p/go-libp2p-loggables v0.0.1 github.com/libp2p/go-libp2p-loggables v0.0.1
github.com/libp2p/go-libp2p-metrics v0.0.1 github.com/libp2p/go-libp2p-metrics v0.0.1
github.com/libp2p/go-libp2p-nat v0.0.1 github.com/libp2p/go-libp2p-nat v0.0.2
github.com/libp2p/go-libp2p-net v0.0.1 github.com/libp2p/go-libp2p-net v0.0.1
github.com/libp2p/go-libp2p-netutil v0.0.1 github.com/libp2p/go-libp2p-netutil v0.0.1
github.com/libp2p/go-libp2p-peer v0.0.1 github.com/libp2p/go-libp2p-peer v0.0.1

2
go.sum

@ -108,6 +108,8 @@ github.com/libp2p/go-libp2p-metrics v0.0.1 h1:yumdPC/P2VzINdmcKZd0pciSUCpou+s0lw
github.com/libp2p/go-libp2p-metrics v0.0.1/go.mod h1:jQJ95SXXA/K1VZi13h52WZMa9ja78zjyy5rspMsC/08= github.com/libp2p/go-libp2p-metrics v0.0.1/go.mod h1:jQJ95SXXA/K1VZi13h52WZMa9ja78zjyy5rspMsC/08=
github.com/libp2p/go-libp2p-nat v0.0.1 h1:on/zju7XE+JXc8gH+vTKmIh2UJFC1K8kGnJYluQrlz4= github.com/libp2p/go-libp2p-nat v0.0.1 h1:on/zju7XE+JXc8gH+vTKmIh2UJFC1K8kGnJYluQrlz4=
github.com/libp2p/go-libp2p-nat v0.0.1/go.mod h1:4L6ajyUIlJvx1Cbh5pc6Ma6vMDpKXf3GgLO5u7W0oQ4= github.com/libp2p/go-libp2p-nat v0.0.1/go.mod h1:4L6ajyUIlJvx1Cbh5pc6Ma6vMDpKXf3GgLO5u7W0oQ4=
github.com/libp2p/go-libp2p-nat v0.0.2 h1:sKI5hiCsGFhuEKdXMsF9mywQu2qhfoIGX6a+VG6zelE=
github.com/libp2p/go-libp2p-nat v0.0.2/go.mod h1:QrjXQSD5Dj4IJOdEcjHRkWTSomyxRo6HnUkf/TfQpLQ=
github.com/libp2p/go-libp2p-net v0.0.1 h1:xJ4Vh4yKF/XKb8fd1Ev0ebAGzVjMxXzrxG2kjtU+F5Q= github.com/libp2p/go-libp2p-net v0.0.1 h1:xJ4Vh4yKF/XKb8fd1Ev0ebAGzVjMxXzrxG2kjtU+F5Q=
github.com/libp2p/go-libp2p-net v0.0.1/go.mod h1:Yt3zgmlsHOgUWSXmt5V/Jpz9upuJBE8EgNU9DrCcR8c= github.com/libp2p/go-libp2p-net v0.0.1/go.mod h1:Yt3zgmlsHOgUWSXmt5V/Jpz9upuJBE8EgNU9DrCcR8c=
github.com/libp2p/go-libp2p-netutil v0.0.1 h1:LgD6+skofkOx8z6odD9+MZHKjupv3ng1u6KRhaADTnA= github.com/libp2p/go-libp2p-netutil v0.0.1 h1:LgD6+skofkOx8z6odD9+MZHKjupv3ng1u6KRhaADTnA=

156
p2p/host/basic/basic_host.go

@ -3,12 +3,14 @@ package basichost
import ( import (
"context" "context"
"io" "io"
"net"
"time" "time"
logging "github.com/ipfs/go-log" logging "github.com/ipfs/go-log"
goprocess "github.com/jbenet/goprocess" goprocess "github.com/jbenet/goprocess"
goprocessctx "github.com/jbenet/goprocess/context" goprocessctx "github.com/jbenet/goprocess/context"
ifconnmgr "github.com/libp2p/go-libp2p-interface-connmgr" ifconnmgr "github.com/libp2p/go-libp2p-interface-connmgr"
inat "github.com/libp2p/go-libp2p-nat"
inet "github.com/libp2p/go-libp2p-net" inet "github.com/libp2p/go-libp2p-net"
peer "github.com/libp2p/go-libp2p-peer" peer "github.com/libp2p/go-libp2p-peer"
pstore "github.com/libp2p/go-libp2p-peerstore" pstore "github.com/libp2p/go-libp2p-peerstore"
@ -17,6 +19,7 @@ import (
ping "github.com/libp2p/go-libp2p/p2p/protocol/ping" ping "github.com/libp2p/go-libp2p/p2p/protocol/ping"
ma "github.com/multiformats/go-multiaddr" ma "github.com/multiformats/go-multiaddr"
madns "github.com/multiformats/go-multiaddr-dns" madns "github.com/multiformats/go-multiaddr-dns"
manet "github.com/multiformats/go-multiaddr-net"
msmux "github.com/multiformats/go-multistream" msmux "github.com/multiformats/go-multistream"
) )
@ -485,17 +488,15 @@ func (h *BasicHost) Addrs() []ma.Multiaddr {
} }
// mergeAddrs merges input address lists, leave only unique addresses // mergeAddrs merges input address lists, leave only unique addresses
func mergeAddrs(addrLists ...[]ma.Multiaddr) (uniqueAddrs []ma.Multiaddr) { func dedupAddrs(addrs []ma.Multiaddr) (uniqueAddrs []ma.Multiaddr) {
exists := make(map[string]bool) exists := make(map[string]bool)
for _, addrList := range addrLists { for _, addr := range addrs {
for _, addr := range addrList { k := string(addr.Bytes())
k := string(addr.Bytes()) if exists[k] {
if exists[k] { continue
continue
}
exists[k] = true
uniqueAddrs = append(uniqueAddrs, addr)
} }
exists[k] = true
uniqueAddrs = append(uniqueAddrs, addr)
} }
return uniqueAddrs return uniqueAddrs
} }
@ -507,19 +508,140 @@ func (h *BasicHost) AllAddrs() []ma.Multiaddr {
if err != nil { if err != nil {
log.Debug("error retrieving network interface addrs") log.Debug("error retrieving network interface addrs")
} }
var observedAddrs []ma.Multiaddr var natMappings []inat.Mapping
if h.ids != nil {
// peer observed addresses
observedAddrs = h.ids.OwnObservedAddrs()
}
var natAddrs []ma.Multiaddr
// natmgr is nil if we do not use nat option; // natmgr is nil if we do not use nat option;
// h.natmgr.NAT() is nil if not ready, or no nat is available. // h.natmgr.NAT() is nil if not ready, or no nat is available.
if h.natmgr != nil && h.natmgr.NAT() != nil { if h.natmgr != nil && h.natmgr.NAT() != nil {
natAddrs = h.natmgr.NAT().ExternalAddrs() natMappings = h.natmgr.NAT().Mappings()
} }
return mergeAddrs(listenAddrs, observedAddrs, natAddrs) finalAddrs := listenAddrs
if len(natMappings) > 0 {
// We have successfully mapped ports on our NAT. Use those
// instead of observed addresses (mostly).
// First, generate a mapping table.
// protocol -> internal port -> external addr
ports := make(map[string]map[int]net.Addr)
for _, m := range natMappings {
addr, err := m.ExternalAddr()
if err != nil {
// mapping not ready yet.
continue
}
protoPorts, ok := ports[m.Protocol()]
if !ok {
protoPorts = make(map[int]net.Addr)
ports[m.Protocol()] = protoPorts
}
protoPorts[m.InternalPort()] = addr
}
// Next, apply this mapping to our addresses.
for _, listen := range listenAddrs {
found := false
transport, rest := ma.SplitFunc(listen, func(c ma.Component) bool {
if found {
return true
}
switch c.Protocol().Code {
case ma.P_TCP, ma.P_UDP:
found = true
}
return false
})
if !manet.IsThinWaist(transport) {
continue
}
naddr, err := manet.ToNetAddr(transport)
if err != nil {
log.Error("error parsing net multiaddr %q: %s", transport, err)
continue
}
var (
ip net.IP
iport int
protocol string
)
switch naddr := naddr.(type) {
case *net.TCPAddr:
ip = naddr.IP
iport = naddr.Port
protocol = "tcp"
case *net.UDPAddr:
ip = naddr.IP
iport = naddr.Port
protocol = "udp"
default:
continue
}
if !ip.IsGlobalUnicast() {
// We only map global unicast ports.
continue
}
mappedAddr, ok := ports[protocol][iport]
if !ok {
// Not mapped.
continue
}
mappedMaddr, err := manet.FromNetAddr(mappedAddr)
if err != nil {
log.Errorf("mapped addr can't be turned into a multiaddr %q: %s", mappedAddr, err)
continue
}
// Did the router give us a routable public addr?
if manet.IsPublicAddr(mappedMaddr) {
// Yes, use it.
extMaddr := mappedMaddr
if rest != nil {
extMaddr = ma.Join(extMaddr, rest)
}
// Add in the mapped addr.
finalAddrs = append(finalAddrs, extMaddr)
continue
}
// No. Ok, let's try our observed addresses.
// Now, check if we have any observed addresses that
// differ from the one reported by the router. Routers
// don't always give the most accurate information.
observed := h.ids.ObservedAddrsFor(listen)
if len(observed) == 0 {
continue
}
// Drop the IP from the external maddr
_, extMaddrNoIP := ma.SplitFirst(mappedMaddr)
for _, obsMaddr := range observed {
// Extract a public observed addr.
ip, _ := ma.SplitFirst(obsMaddr)
if ip == nil || !manet.IsPublicAddr(ip) {
continue
}
finalAddrs = append(finalAddrs, ma.Join(ip, extMaddrNoIP))
}
}
} else {
var observedAddrs []ma.Multiaddr
if h.ids != nil {
observedAddrs = h.ids.OwnObservedAddrs()
}
finalAddrs = append(finalAddrs, observedAddrs...)
}
return dedupAddrs(finalAddrs)
} }
// Close shuts down the Host's services (network, etc). // Close shuts down the Host's services (network, etc).

223
p2p/host/basic/natmgr.go

@ -1,11 +1,12 @@
package basichost package basichost
import ( import (
"context" "net"
"strconv"
"sync" "sync"
goprocess "github.com/jbenet/goprocess" goprocess "github.com/jbenet/goprocess"
lgbl "github.com/libp2p/go-libp2p-loggables" goprocessctx "github.com/jbenet/goprocess/context"
inat "github.com/libp2p/go-libp2p-nat" inat "github.com/libp2p/go-libp2p-nat"
inet "github.com/libp2p/go-libp2p-net" inet "github.com/libp2p/go-libp2p-net"
ma "github.com/multiformats/go-multiaddr" ma "github.com/multiformats/go-multiaddr"
@ -37,11 +38,14 @@ func NewNATManager(net inet.Network) NATManager {
// * closing the natManager closes the nat and its mappings. // * closing the natManager closes the nat and its mappings.
type natManager struct { type natManager struct {
net inet.Network net inet.Network
natmu sync.RWMutex // guards nat (ready could obviate this mutex, but safety first.) natmu sync.RWMutex
nat *inat.NAT nat *inat.NAT
ready chan struct{} // closed once the nat is ready to process port mappings ready chan struct{} // closed once the nat is ready to process port mappings
proc goprocess.Process // natManager has a process + children. can be closed.
refreshMu sync.Mutex
proc goprocess.Process // natManager has a process + children. can be closed.
} }
func newNatManager(net inet.Network) *natManager { func newNatManager(net inet.Network) *natManager {
@ -74,7 +78,6 @@ func (nmgr *natManager) Ready() <-chan struct{} {
} }
func (nmgr *natManager) discoverNAT() { func (nmgr *natManager) discoverNAT() {
nmgr.proc.Go(func(worker goprocess.Process) { nmgr.proc.Go(func(worker goprocess.Process) {
// inat.DiscoverNAT blocks until the nat is found or a timeout // inat.DiscoverNAT blocks until the nat is found or a timeout
// is reached. we unfortunately cannot specify timeouts-- the // is reached. we unfortunately cannot specify timeouts-- the
@ -87,131 +90,137 @@ func (nmgr *natManager) discoverNAT() {
// to avoid leaking resources in a non-obvious way. the only case // to avoid leaking resources in a non-obvious way. the only case
// this affects is when the daemon is being started up and _immediately_ // this affects is when the daemon is being started up and _immediately_
// asked to close. other services are also starting up, so ok to wait. // asked to close. other services are also starting up, so ok to wait.
discoverdone := make(chan struct{})
var nat *inat.NAT natInstance, err := inat.DiscoverNAT(goprocessctx.OnClosingContext(worker))
go func() { if err != nil {
defer close(discoverdone) log.Error("DiscoverNAT error:", err)
nat = inat.DiscoverNAT() close(nmgr.ready)
}()
// by this point -- after finding the NAT -- we may have already
// be closing. if so, just exit.
select {
case <-worker.Closing():
return return
case <-discoverdone:
if nat == nil { // no nat, or failed to get it.
return
}
} }
// wire up the nat to close when nmgr closes.
// nmgr.proc is our parent, and waiting for us.
nmgr.proc.AddChild(nat.Process())
// set the nat.
nmgr.natmu.Lock() nmgr.natmu.Lock()
nmgr.nat = nat nmgr.nat = natInstance
nmgr.natmu.Unlock() nmgr.natmu.Unlock()
// signal that we're ready to process nat mappings:
close(nmgr.ready) close(nmgr.ready)
// wire up the nat to close when nmgr closes.
// nmgr.proc is our parent, and waiting for us.
nmgr.proc.AddChild(nmgr.nat.Process())
// sign natManager up for network notifications // sign natManager up for network notifications
// we need to sign up here to avoid missing some notifs // we need to sign up here to avoid missing some notifs
// before the NAT has been found. // before the NAT has been found.
nmgr.net.Notify((*nmgrNetNotifiee)(nmgr)) nmgr.net.Notify((*nmgrNetNotifiee)(nmgr))
nmgr.refresh()
// if any interfaces were brought up while we were setting up
// the nat, now is the time to setup port mappings for them.
// we release ready, then grab them to avoid losing any. adding
// a port mapping is idempotent, so its ok to add the same twice.
addrs := nmgr.net.ListenAddresses()
for _, addr := range addrs {
// we do it async because it's slow and we may want to close beforehand
go addPortMapping(nmgr, addr)
}
}) })
} }
// NAT returns the natManager's nat object. this may be nil, if func (nmgr *natManager) refresh() {
// (a) the search process is still ongoing, or (b) the search process
// found no nat. Clients must check whether the return value is nil.
func (nmgr *natManager) NAT() *inat.NAT {
nmgr.natmu.Lock()
defer nmgr.natmu.Unlock()
return nmgr.nat
}
func addPortMapping(nmgr *natManager, intaddr ma.Multiaddr) {
nat := nmgr.NAT() nat := nmgr.NAT()
if nat == nil { if nat == nil {
panic("natManager addPortMapping called without a nat.") // Nothing to do.
return
} }
// first, check if the port mapping already exists. nmgr.proc.Go(func(_ goprocess.Process) {
for _, mapping := range nat.Mappings() { nmgr.refreshMu.Lock()
if mapping.InternalAddr().Equal(intaddr) { defer nmgr.refreshMu.Unlock()
return // it exists! return.
ports := map[string]map[int]bool{
"tcp": map[int]bool{},
"udp": map[int]bool{},
} }
} for _, maddr := range nmgr.net.ListenAddresses() {
// Strip the IP
maIP, rest := ma.SplitFirst(maddr)
if maIP == nil || rest == nil {
continue
}
ctx := context.TODO() switch maIP.Protocol().Code {
lm := make(lgbl.DeferredMap) case ma.P_IP6, ma.P_IP4:
lm["internalAddr"] = func() interface{} { return intaddr.String() } default:
continue
}
defer log.EventBegin(ctx, "natMgrAddPortMappingWait", lm).Done() // Only bother if we're listening on a
// unicast/unspecified IP.
ip := net.IP(maIP.RawValue())
if !(ip.IsGlobalUnicast() || ip.IsUnspecified()) {
continue
}
select { // Extract the port/protocol
case <-nmgr.proc.Closing(): proto, _ := ma.SplitFirst(rest)
lm["outcome"] = "cancelled" if proto == nil {
return // no use. continue
case <-nmgr.ready: // wait until it's ready. }
}
// actually start the port map (sub-event because waiting may take a while) var protocol string
defer log.EventBegin(ctx, "natMgrAddPortMapping", lm).Done() switch proto.Protocol().Code {
case ma.P_TCP:
protocol = "tcp"
case ma.P_UDP:
protocol = "udp"
default:
continue
}
// get the nat port, err := strconv.ParseUint(proto.Value(), 10, 16)
m, err := nat.NewMapping(intaddr) if err != nil {
if err != nil { // bug in multiaddr
lm["outcome"] = "failure" panic(err)
lm["error"] = err }
return ports[protocol][int(port)] = false
} }
extaddr, err := m.ExternalAddr() var wg sync.WaitGroup
if err != nil { defer wg.Wait()
lm["outcome"] = "failure"
lm["error"] = err // Close old mappings
return for _, m := range nat.Mappings() {
} mappedPort := m.InternalPort()
if _, ok := ports[m.Protocol()][mappedPort]; !ok {
// No longer need this mapping.
wg.Add(1)
go func(m inat.Mapping) {
defer wg.Done()
m.Close()
}(m)
} else {
// already mapped
ports[m.Protocol()][mappedPort] = true
}
}
lm["outcome"] = "success" // Create new mappings.
lm["externalAddr"] = func() interface{} { return extaddr.String() } for proto, pports := range ports {
log.Infof("established nat port mapping: %s <--> %s", intaddr, extaddr) for port, mapped := range pports {
if mapped {
continue
}
wg.Add(1)
go func(proto string, port int) {
defer wg.Done()
_, err := nat.NewMapping(proto, port)
if err != nil {
log.Errorf("failed to port-map %s port %d: %s", proto, port, err)
}
}(proto, port)
}
}
})
} }
func rmPortMapping(nmgr *natManager, intaddr ma.Multiaddr) { // NAT returns the natManager's nat object. this may be nil, if
nat := nmgr.NAT() // (a) the search process is still ongoing, or (b) the search process
if nat == nil { // found no nat. Clients must check whether the return value is nil.
panic("natManager rmPortMapping called without a nat.") func (nmgr *natManager) NAT() *inat.NAT {
} nmgr.natmu.Lock()
defer nmgr.natmu.Unlock()
// list the port mappings (it may be gone on it's own, so we need to return nmgr.nat
// check this list, and not store it ourselves behind the scenes)
// close mappings for this internal address.
for _, mapping := range nat.Mappings() {
if mapping.InternalAddr().Equal(intaddr) {
mapping.Close()
}
}
} }
// nmgrNetNotifiee implements the network notification listening part
// of the natManager. this is merely listening to Listen() and ListenClose()
// events.
type nmgrNetNotifiee natManager type nmgrNetNotifiee natManager
func (nn *nmgrNetNotifiee) natManager() *natManager { func (nn *nmgrNetNotifiee) natManager() *natManager {
@ -219,19 +228,11 @@ func (nn *nmgrNetNotifiee) natManager() *natManager {
} }
func (nn *nmgrNetNotifiee) Listen(n inet.Network, addr ma.Multiaddr) { func (nn *nmgrNetNotifiee) Listen(n inet.Network, addr ma.Multiaddr) {
if nn.natManager().NAT() == nil { nn.natManager().refresh()
return // not ready or doesnt exist.
}
addPortMapping(nn.natManager(), addr)
} }
func (nn *nmgrNetNotifiee) ListenClose(n inet.Network, addr ma.Multiaddr) { func (nn *nmgrNetNotifiee) ListenClose(n inet.Network, addr ma.Multiaddr) {
if nn.natManager().NAT() == nil { nn.natManager().refresh()
return // not ready or doesnt exist.
}
rmPortMapping(nn.natManager(), addr)
} }
func (nn *nmgrNetNotifiee) Connected(inet.Network, inet.Conn) {} func (nn *nmgrNetNotifiee) Connected(inet.Network, inet.Conn) {}

4
p2p/protocol/identify/id.go

@ -74,6 +74,10 @@ func (ids *IDService) OwnObservedAddrs() []ma.Multiaddr {
return ids.observedAddrs.Addrs() return ids.observedAddrs.Addrs()
} }
func (ids *IDService) ObservedAddrsFor(local ma.Multiaddr) []ma.Multiaddr {
return ids.observedAddrs.AddrsFor(local)
}
func (ids *IDService) IdentifyConn(c inet.Conn) { func (ids *IDService) IdentifyConn(c inet.Conn) {
ids.currmu.Lock() ids.currmu.Lock()
if wait, found := ids.currid[c]; found { if wait, found := ids.currid[c]; found {

40
p2p/protocol/identify/obsaddr.go

@ -30,6 +30,7 @@ type ObservedAddr struct {
func (oa *ObservedAddr) activated(ttl time.Duration) bool { func (oa *ObservedAddr) activated(ttl time.Duration) bool {
// cleanup SeenBy set // cleanup SeenBy set
now := time.Now() now := time.Now()
for k, ob := range oa.SeenBy { for k, ob := range oa.SeenBy {
if now.Sub(ob.seenTime) > ttl*ActivationThresh { if now.Sub(ob.seenTime) > ttl*ActivationThresh {
delete(oa.SeenBy, k) delete(oa.SeenBy, k)
@ -51,6 +52,43 @@ type ObservedAddrSet struct {
ttl time.Duration ttl time.Duration
} }
// AddrsFor return all activated observed addresses associated with the given
// (resolved) listen address.
func (oas *ObservedAddrSet) AddrsFor(addr ma.Multiaddr) (addrs []ma.Multiaddr) {
oas.Lock()
defer oas.Unlock()
// for zero-value.
if len(oas.addrs) == 0 {
return nil
}
key := string(addr.Bytes())
observedAddrs, ok := oas.addrs[key]
if !ok {
return
}
now := time.Now()
filteredAddrs := make([]*ObservedAddr, 0, len(observedAddrs))
for _, a := range observedAddrs {
// leave only alive observed addresses
if now.Sub(a.LastSeen) <= oas.ttl {
filteredAddrs = append(filteredAddrs, a)
if a.activated(oas.ttl) {
addrs = append(addrs, a.Addr)
}
}
}
if len(filteredAddrs) > 0 {
oas.addrs[key] = filteredAddrs
} else {
delete(oas.addrs, key)
}
return addrs
}
// Addrs return all activated observed addresses // Addrs return all activated observed addresses
func (oas *ObservedAddrSet) Addrs() (addrs []ma.Multiaddr) { func (oas *ObservedAddrSet) Addrs() (addrs []ma.Multiaddr) {
oas.Lock() oas.Lock()
@ -92,7 +130,7 @@ func (oas *ObservedAddrSet) Add(observed, local, observer ma.Multiaddr,
now := time.Now() now := time.Now()
observerString := observerGroup(observer) observerString := observerGroup(observer)
localString := local.String() localString := string(local.Bytes())
ob := observation{ ob := observation{
seenTime: now, seenTime: now,
connDirection: direction, connDirection: direction,

4
package.json

@ -122,9 +122,9 @@
}, },
{ {
"author": "whyrusleeping", "author": "whyrusleeping",
"hash": "QmZ1zCb95y9oHaJRMQmbXh3FgUuwz1V2mbCXBztnhehJkL", "hash": "QmRbx7DYHgw3uNn2RuU2nv9Bdh96ZdtT65CG1CGPNRQcGZ",
"name": "go-libp2p-nat", "name": "go-libp2p-nat",
"version": "0.8.12" "version": "0.8.13"
}, },
{ {
"author": "whyrusleeping", "author": "whyrusleeping",

Loading…
Cancel
Save