Browse Source

autonat: don't use autonat for address discovery (#2148)

* dont use autonat for address discovery

* dont increase timeout
pull/2151/head v0.26.1
Sukun 2 years ago
committed by GitHub
parent
commit
c738a48d02
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 109
      p2p/host/autonat/autonat.go
  2. 18
      p2p/host/autonat/autonat_test.go
  3. 21
      p2p/host/autonat/client.go
  4. 5
      p2p/host/autonat/interface.go
  5. 21
      p2p/host/autonat/svc_test.go
  6. 14
      p2p/host/basic/basic_host.go

109
p2p/host/autonat/autonat.go

@ -2,7 +2,6 @@ package autonat
import (
"context"
"errors"
"math/rand"
"sync/atomic"
"time"
@ -20,6 +19,8 @@ import (
var log = logging.Logger("autonat")
const maxConfidence = 3
// AmbientAutoNAT is the implementation of ambient NAT autodiscovery
type AmbientAutoNAT struct {
host host.Host
@ -31,9 +32,9 @@ type AmbientAutoNAT struct {
backgroundRunning chan struct{} // is closed when the background go routine exits
inboundConn chan network.Conn
observations chan autoNATResult
observations chan network.Reachability
// status is an autoNATResult reflecting current status.
status atomic.Pointer[autoNATResult]
status atomic.Pointer[network.Reachability]
// Reflects the confidence on of the NATStatus being private, as a single
// dialback may fail for reasons unrelated to NAT.
// If it is <3, then multiple autoNAT peers may be contacted for dialback
@ -58,11 +59,6 @@ type StaticAutoNAT struct {
service *autoNATService
}
type autoNATResult struct {
network.Reachability
address ma.Multiaddr
}
// New creates a new NAT autodiscovery system attached to a host
func New(h host.Host, options ...Option) (AutoNAT, error) {
var err error
@ -111,13 +107,14 @@ func New(h host.Host, options ...Option) (AutoNAT, error) {
host: h,
config: conf,
inboundConn: make(chan network.Conn, 5),
observations: make(chan autoNATResult, 1),
observations: make(chan network.Reachability, 1),
emitReachabilityChanged: emitReachabilityChanged,
service: service,
recentProbes: make(map[peer.ID]time.Time),
}
as.status.Store(&autoNATResult{network.ReachabilityUnknown, nil})
reachability := network.ReachabilityUnknown
as.status.Store(&reachability)
subscriber, err := as.host.EventBus().Subscribe(
[]any{new(event.EvtLocalAddressesUpdated), new(event.EvtPeerIdentificationCompleted)},
@ -137,25 +134,15 @@ func New(h host.Host, options ...Option) (AutoNAT, error) {
// Status returns the AutoNAT observed reachability status.
func (as *AmbientAutoNAT) Status() network.Reachability {
s := as.status.Load()
return s.Reachability
return *s
}
func (as *AmbientAutoNAT) emitStatus() {
status := as.status.Load()
as.emitReachabilityChanged.Emit(event.EvtLocalReachabilityChanged{Reachability: status.Reachability})
status := *as.status.Load()
as.emitReachabilityChanged.Emit(event.EvtLocalReachabilityChanged{Reachability: status})
if as.metricsTracer != nil {
as.metricsTracer.ReachabilityStatus(status.Reachability)
}
}
// PublicAddr returns the publicly connectable Multiaddr of this node if one is known.
func (as *AmbientAutoNAT) PublicAddr() (ma.Multiaddr, error) {
s := as.status.Load()
if s.Reachability != network.ReachabilityPublic {
return nil, errors.New("NAT status is not public")
as.metricsTracer.ReachabilityStatus(status)
}
return s.address, nil
}
func ipInList(candidate ma.Multiaddr, list []ma.Multiaddr) bool {
@ -174,7 +161,6 @@ func (as *AmbientAutoNAT) background() {
// before starting autodetection
delay := as.config.bootDelay
var lastAddrUpdated time.Time
subChan := as.subscriber.Out()
defer as.subscriber.Close()
defer as.emitReachabilityChanged.Close()
@ -187,10 +173,6 @@ func (as *AmbientAutoNAT) background() {
// new inbound connection.
case conn := <-as.inboundConn:
localAddrs := as.host.Addrs()
ca := as.status.Load()
if ca.address != nil {
localAddrs = append(localAddrs, ca.address)
}
if manet.IsPublicAddr(conn.RemoteMultiaddr()) &&
!ipInList(conn.RemoteMultiaddr(), localAddrs) {
as.lastInbound = time.Now()
@ -199,16 +181,15 @@ func (as *AmbientAutoNAT) background() {
case e := <-subChan:
switch e := e.(type) {
case event.EvtLocalAddressesUpdated:
if !lastAddrUpdated.Add(time.Second).After(time.Now()) {
lastAddrUpdated = time.Now()
if as.confidence > 1 {
as.confidence--
}
// On local address update, reduce confidence from maximum so that we schedule
// the next probe sooner
if as.confidence == maxConfidence {
as.confidence--
}
case event.EvtPeerIdentificationCompleted:
if s, err := as.host.Peerstore().SupportsProtocols(e.Peer, AutoNATProto); err == nil && len(s) > 0 {
currentStatus := as.status.Load()
if currentStatus.Reachability == network.ReachabilityUnknown {
currentStatus := *as.status.Load()
if currentStatus == network.ReachabilityUnknown {
as.tryProbe(e.Peer)
}
}
@ -256,7 +237,7 @@ func (as *AmbientAutoNAT) scheduleProbe() time.Duration {
// * recent inbound connections (implying continued connectivity) should decrease the retry when public
// * recent inbound connections when not public mean we should try more actively to see if we're public.
fixedNow := time.Now()
currentStatus := as.status.Load()
currentStatus := *as.status.Load()
nextProbe := fixedNow
// Don't look for peers in the peer store more than once per second.
@ -268,13 +249,13 @@ func (as *AmbientAutoNAT) scheduleProbe() time.Duration {
}
if !as.lastProbe.IsZero() {
untilNext := as.config.refreshInterval
if currentStatus.Reachability == network.ReachabilityUnknown {
if currentStatus == network.ReachabilityUnknown {
untilNext = as.config.retryInterval
} else if as.confidence < 3 {
} else if as.confidence < maxConfidence {
untilNext = as.config.retryInterval
} else if currentStatus.Reachability == network.ReachabilityPublic && as.lastInbound.After(as.lastProbe) {
} else if currentStatus == network.ReachabilityPublic && as.lastInbound.After(as.lastProbe) {
untilNext *= 2
} else if currentStatus.Reachability != network.ReachabilityPublic && as.lastInbound.After(as.lastProbe) {
} else if currentStatus != network.ReachabilityPublic && as.lastInbound.After(as.lastProbe) {
untilNext /= 5
}
@ -289,14 +270,14 @@ func (as *AmbientAutoNAT) scheduleProbe() time.Duration {
}
// Update the current status based on an observed result.
func (as *AmbientAutoNAT) recordObservation(observation autoNATResult) {
currentStatus := as.status.Load()
func (as *AmbientAutoNAT) recordObservation(observation network.Reachability) {
currentStatus := *as.status.Load()
if observation.Reachability == network.ReachabilityPublic {
log.Debugf("NAT status is public")
if observation == network.ReachabilityPublic {
changed := false
if currentStatus.Reachability != network.ReachabilityPublic {
if currentStatus != network.ReachabilityPublic {
// Aggressively switch to public from other states ignoring confidence
log.Debugf("NAT status is public")
// we are flipping our NATStatus, so confidence drops to 0
as.confidence = 0
@ -304,19 +285,20 @@ func (as *AmbientAutoNAT) recordObservation(observation autoNATResult) {
as.service.Enable()
}
changed = true
} else if as.confidence < 3 {
} else if as.confidence < maxConfidence {
as.confidence++
}
as.status.Store(&observation)
if changed {
as.emitStatus()
}
} else if observation.Reachability == network.ReachabilityPrivate {
log.Debugf("NAT status is private")
if currentStatus.Reachability != network.ReachabilityPrivate {
} else if observation == network.ReachabilityPrivate {
if currentStatus != network.ReachabilityPrivate {
if as.confidence > 0 {
as.confidence--
} else {
log.Debugf("NAT status is private")
// we are flipping our NATStatus, so confidence drops to 0
as.confidence = 0
as.status.Store(&observation)
@ -325,7 +307,7 @@ func (as *AmbientAutoNAT) recordObservation(observation autoNATResult) {
}
as.emitStatus()
}
} else if as.confidence < 3 {
} else if as.confidence < maxConfidence {
as.confidence++
as.status.Store(&observation)
}
@ -334,8 +316,8 @@ func (as *AmbientAutoNAT) recordObservation(observation autoNATResult) {
as.confidence--
} else {
log.Debugf("NAT status is unknown")
as.status.Store(&autoNATResult{network.ReachabilityUnknown, nil})
if currentStatus.Reachability != network.ReachabilityUnknown {
as.status.Store(&observation)
if currentStatus != network.ReachabilityUnknown {
if as.service != nil {
as.service.Enable()
}
@ -376,19 +358,18 @@ func (as *AmbientAutoNAT) probe(pi *peer.AddrInfo) {
ctx, cancel := context.WithTimeout(as.ctx, as.config.requestTimeout)
defer cancel()
a, err := cli.DialBack(ctx, pi.ID)
err := cli.DialBack(ctx, pi.ID)
var result autoNATResult
var result network.Reachability
switch {
case err == nil:
log.Debugf("Dialback through %s successful; public address is %s", pi.ID.Pretty(), a.String())
result.Reachability = network.ReachabilityPublic
result.address = a
log.Debugf("Dialback through %s successful", pi.ID.Pretty())
result = network.ReachabilityPublic
case IsDialError(err):
log.Debugf("Dialback through %s failed", pi.ID.Pretty())
result.Reachability = network.ReachabilityPrivate
result = network.ReachabilityPrivate
default:
result.Reachability = network.ReachabilityUnknown
result = network.ReachabilityUnknown
}
select {
@ -455,14 +436,6 @@ func (s *StaticAutoNAT) Status() network.Reachability {
return s.reachability
}
// PublicAddr returns the publicly connectable Multiaddr of this node if one is known.
func (s *StaticAutoNAT) PublicAddr() (ma.Multiaddr, error) {
if s.reachability != network.ReachabilityPublic {
return nil, errors.New("NAT status is not public")
}
return nil, errors.New("no available address")
}
func (s *StaticAutoNAT) Close() error {
if s.service != nil {
s.service.Disable()

18
p2p/host/autonat/autonat_test.go

@ -15,7 +15,6 @@ import (
"github.com/libp2p/go-msgio/pbio"
ma "github.com/multiformats/go-multiaddr"
"github.com/stretchr/testify/require"
)
@ -209,8 +208,7 @@ func TestAutoNATObservationRecording(t *testing.T) {
t.Fatalf("failed to subscribe to event EvtLocalRoutabilityPublic, err=%s", err)
}
addr, _ := ma.NewMultiaddr("/ip4/127.0.0.1/udp/1234")
an.recordObservation(autoNATResult{network.ReachabilityPublic, addr})
an.recordObservation(network.ReachabilityPublic)
if an.Status() != network.ReachabilityPublic {
t.Fatalf("failed to transition to public.")
}
@ -218,7 +216,7 @@ func TestAutoNATObservationRecording(t *testing.T) {
expectEvent(t, s, network.ReachabilityPublic, 3*time.Second)
// a single recording should have confidence still at 0, and transition to private quickly.
an.recordObservation(autoNATResult{network.ReachabilityPrivate, nil})
an.recordObservation(network.ReachabilityPrivate)
if an.Status() != network.ReachabilityPrivate {
t.Fatalf("failed to transition to private.")
}
@ -226,22 +224,20 @@ func TestAutoNATObservationRecording(t *testing.T) {
expectEvent(t, s, network.ReachabilityPrivate, 3*time.Second)
// stronger public confidence should be harder to undo.
an.recordObservation(autoNATResult{network.ReachabilityPublic, addr})
an.recordObservation(autoNATResult{network.ReachabilityPublic, addr})
an.recordObservation(network.ReachabilityPublic)
an.recordObservation(network.ReachabilityPublic)
if an.Status() != network.ReachabilityPublic {
t.Fatalf("failed to transition to public.")
}
expectEvent(t, s, network.ReachabilityPublic, 3*time.Second)
an.recordObservation(autoNATResult{network.ReachabilityPrivate, nil})
an.recordObservation(network.ReachabilityPrivate)
if an.Status() != network.ReachabilityPublic {
t.Fatalf("too-extreme private transition.")
}
// don't emit events on observed address change
newAddr, _ := ma.NewMultiaddr("/ip4/127.0.0.1/udp/12345")
an.recordObservation(autoNATResult{network.ReachabilityPublic, newAddr})
// Don't emit events if reachability hasn't changed
an.recordObservation(network.ReachabilityPublic)
if an.Status() != network.ReachabilityPublic {
t.Fatalf("reachability should stay public")
}

21
p2p/host/autonat/client.go

@ -11,8 +11,6 @@ import (
"github.com/libp2p/go-libp2p/p2p/host/autonat/pb"
"github.com/libp2p/go-msgio/pbio"
ma "github.com/multiformats/go-multiaddr"
)
// NewAutoNATClient creates a fresh instance of an AutoNATClient
@ -36,22 +34,22 @@ type client struct {
// Note: A returned error Message_E_DIAL_ERROR does not imply that the server
// actually performed a dial attempt. Servers that run a version < v0.20.0 also
// return Message_E_DIAL_ERROR if the dial was skipped due to the dialPolicy.
func (c *client) DialBack(ctx context.Context, p peer.ID) (ma.Multiaddr, error) {
func (c *client) DialBack(ctx context.Context, p peer.ID) error {
s, err := c.h.NewStream(ctx, p, AutoNATProto)
if err != nil {
return nil, err
return err
}
if err := s.Scope().SetService(ServiceName); err != nil {
log.Debugf("error attaching stream to autonat service: %s", err)
s.Reset()
return nil, err
return err
}
if err := s.Scope().ReserveMemory(maxMsgSize, network.ReservationPriorityAlways); err != nil {
log.Debugf("error reserving memory for autonat stream: %s", err)
s.Reset()
return nil, err
return err
}
defer s.Scope().ReleaseMemory(maxMsgSize)
@ -66,17 +64,17 @@ func (c *client) DialBack(ctx context.Context, p peer.ID) (ma.Multiaddr, error)
req := newDialMessage(peer.AddrInfo{ID: c.h.ID(), Addrs: c.addrFunc()})
if err := w.WriteMsg(req); err != nil {
s.Reset()
return nil, err
return err
}
var res pb.Message
if err := r.ReadMsg(&res); err != nil {
s.Reset()
return nil, err
return err
}
if res.GetType() != pb.Message_DIAL_RESPONSE {
s.Reset()
return nil, fmt.Errorf("unexpected response: %s", res.GetType().String())
return fmt.Errorf("unexpected response: %s", res.GetType().String())
}
status := res.GetDialResponse().GetStatus()
@ -85,10 +83,9 @@ func (c *client) DialBack(ctx context.Context, p peer.ID) (ma.Multiaddr, error)
}
switch status {
case pb.Message_OK:
addr := res.GetDialResponse().GetAddr()
return ma.NewMultiaddrBytes(addr)
return nil
default:
return nil, Error{Status: status, Text: res.GetDialResponse().GetStatusText()}
return Error{Status: status, Text: res.GetDialResponse().GetStatusText()}
}
}

5
p2p/host/autonat/interface.go

@ -14,9 +14,6 @@ import (
type AutoNAT interface {
// Status returns the current NAT status
Status() network.Reachability
// PublicAddr returns the public dial address when NAT status is public and an
// error otherwise
PublicAddr() (ma.Multiaddr, error)
io.Closer
}
@ -24,7 +21,7 @@ type AutoNAT interface {
type Client interface {
// DialBack requests from a peer providing AutoNAT services to test dial back
// and report the address on a successful connection.
DialBack(ctx context.Context, p peer.ID) (ma.Multiaddr, error)
DialBack(ctx context.Context, p peer.ID) error
}
// AddrFunc is a function returning the candidate addresses for the local host.

21
p2p/host/autonat/svc_test.go

@ -12,7 +12,6 @@ import (
bhost "github.com/libp2p/go-libp2p/p2p/host/blank"
swarmt "github.com/libp2p/go-libp2p/p2p/net/swarm/testing"
ma "github.com/multiformats/go-multiaddr"
"github.com/stretchr/testify/require"
)
@ -58,7 +57,7 @@ func TestAutoNATServiceDialRefused(t *testing.T) {
defer hc.Close()
connect(t, c.host, hc)
_, err := ac.DialBack(ctx, c.host.ID())
err := ac.DialBack(ctx, c.host.ID())
if err == nil {
t.Fatal("Dial back succeeded unexpectedly!")
}
@ -82,7 +81,7 @@ func TestAutoNATServiceDialSuccess(t *testing.T) {
defer hc.Close()
connect(t, c.host, hc)
_, err := ac.DialBack(ctx, c.host.ID())
err := ac.DialBack(ctx, c.host.ID())
if err != nil {
t.Fatalf("Dial back failed: %s", err.Error())
}
@ -106,12 +105,12 @@ func TestAutoNATServiceDialRateLimiter(t *testing.T) {
defer hc.Close()
connect(t, c.host, hc)
_, err := ac.DialBack(ctx, c.host.ID())
err := ac.DialBack(ctx, c.host.ID())
if err != nil {
t.Fatal(err)
}
_, err = ac.DialBack(ctx, c.host.ID())
err = ac.DialBack(ctx, c.host.ID())
if err == nil {
t.Fatal("Dial back succeeded unexpectedly!")
}
@ -122,7 +121,7 @@ func TestAutoNATServiceDialRateLimiter(t *testing.T) {
time.Sleep(400 * time.Millisecond)
_, err = ac.DialBack(ctx, c.host.ID())
err = ac.DialBack(ctx, c.host.ID())
if err != nil {
t.Fatal(err)
}
@ -149,7 +148,7 @@ func TestAutoNATServiceGlobalLimiter(t *testing.T) {
hc, ac := makeAutoNATClient(t)
connect(t, hs, hc)
_, err := ac.DialBack(ctx, hs.ID())
err := ac.DialBack(ctx, hs.ID())
if err != nil {
t.Fatal(err)
}
@ -158,7 +157,7 @@ func TestAutoNATServiceGlobalLimiter(t *testing.T) {
hc, ac := makeAutoNATClient(t)
defer hc.Close()
connect(t, hs, hc)
_, err := ac.DialBack(ctx, hs.ID())
err := ac.DialBack(ctx, hs.ID())
if err == nil {
t.Fatal("Dial back succeeded unexpectedly!")
}
@ -207,7 +206,7 @@ func TestAutoNATServiceStartup(t *testing.T) {
hc, ac := makeAutoNATClient(t)
connect(t, h, hc)
_, err = ac.DialBack(context.Background(), h.ID())
err = ac.DialBack(context.Background(), h.ID())
if err != nil {
t.Fatal("autonat service be active in unknown mode.")
}
@ -215,11 +214,11 @@ func TestAutoNATServiceStartup(t *testing.T) {
sub, _ := h.EventBus().Subscribe(new(event.EvtLocalReachabilityChanged))
anc := an.(*AmbientAutoNAT)
anc.recordObservation(autoNATResult{Reachability: network.ReachabilityPublic, address: ma.StringCast("/ip4/127.0.0.1/tcp/1234")})
anc.recordObservation(network.ReachabilityPublic)
<-sub.Out()
_, err = ac.DialBack(context.Background(), h.ID())
err = ac.DialBack(context.Background(), h.ID())
if err != nil {
t.Fatalf("autonat should be active, was %v", err)
}

14
p2p/host/basic/basic_host.go

@ -776,7 +776,6 @@ func (h *BasicHost) AllAddrs() []ma.Multiaddr {
h.addrMu.RLock()
filteredIfaceAddrs := h.filteredInterfaceAddrs
allIfaceAddrs := h.allInterfaceAddrs
autonat := h.autoNat
h.addrMu.RUnlock()
// Iterate over all _unresolved_ listen addresses, resolving our primary
@ -790,19 +789,6 @@ func (h *BasicHost) AllAddrs() []ma.Multiaddr {
finalAddrs = append(finalAddrs, resolved...)
}
// add autonat PublicAddr Consider the following scenario
// For example, it is deployed on a cloud server,
// it provides an elastic ip accessible to the public network,
// but not have an external network card,
// so net.InterfaceAddrs() not has the public ip
// The host can indeed be dialed !!!
if autonat != nil {
publicAddr, _ := autonat.PublicAddr()
if publicAddr != nil {
finalAddrs = append(finalAddrs, publicAddr)
}
}
finalAddrs = dedupAddrs(finalAddrs)
var natMappings []inat.Mapping

Loading…
Cancel
Save