Browse Source

autorelay: Split libp2p.EnableAutoRelay into 2 functions (#2022)

* Split libp2p.EnableAutoRelay into 2 functions

Provide two specific ways to enable the autorelay subsystem
libp2p.EnableAutoRelayWithStaticRelays
libp2p.EnableAutoRelayWithPeerSource

* remove minInterval from WithPeerSource

* Use PeerSource type

* Fix typo

* Update p2p/host/autorelay/options.go

Co-authored-by: Marten Seemann <martenseemann@gmail.com>

---------

Co-authored-by: Marco Munizaga <git@marcopolo.io>
Co-authored-by: Marco Munizaga <marco@marcopolo.io>
Co-authored-by: Marten Seemann <martenseemann@gmail.com>
rcmgr-metrics-prefix
Sukun 2 years ago
committed by GitHub
parent
commit
f9c02c1e46
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 28
      options.go
  2. 94
      p2p/host/autorelay/autorelay_test.go
  3. 40
      p2p/host/autorelay/options.go
  4. 4
      p2p/host/autorelay/relay_finder.go
  5. 4
      p2p/protocol/holepunch/holepunch_test.go

28
options.go

@ -16,6 +16,7 @@ import (
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/metrics"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/core/pnet"
"github.com/libp2p/go-libp2p/core/protocol"
@ -307,6 +308,8 @@ func EnableRelayService(opts ...relayv2.Option) Option {
//
// This subsystem performs automatic address rewriting to advertise relay addresses when it
// detects that the node is publicly unreachable (e.g. behind a NAT).
//
// Deprecated: Use EnableAutoRelayWithStaticRelays or EnableAutoRelayWithPeerSource
func EnableAutoRelay(opts ...autorelay.Option) Option {
return func(cfg *Config) error {
cfg.EnableAutoRelay = true
@ -315,6 +318,31 @@ func EnableAutoRelay(opts ...autorelay.Option) Option {
}
}
// EnableAutoRelayWithStaticRelays configures libp2p to enable the AutoRelay subsystem using
// the provided relays as relay candidates.
// This subsystem performs automatic address rewriting to advertise relay addresses when it
// detects that the node is publicly unreachable (e.g. behind a NAT).
func EnableAutoRelayWithStaticRelays(static []peer.AddrInfo, opts ...autorelay.Option) Option {
return func(cfg *Config) error {
cfg.EnableAutoRelay = true
cfg.AutoRelayOpts = append([]autorelay.Option{autorelay.WithStaticRelays(static)}, opts...)
return nil
}
}
// EnableAutoRelayWithPeerSource configures libp2p to enable the AutoRelay
// subsystem using the provided PeerSource callback to get more relay
// candidates. This subsystem performs automatic address rewriting to advertise
// relay addresses when it detects that the node is publicly unreachable (e.g.
// behind a NAT).
func EnableAutoRelayWithPeerSource(peerSource autorelay.PeerSource, opts ...autorelay.Option) Option {
return func(cfg *Config) error {
cfg.EnableAutoRelay = true
cfg.AutoRelayOpts = append([]autorelay.Option{autorelay.WithPeerSource(peerSource)}, opts...)
return nil
}
}
// ForceReachabilityPublic overrides automatic reachability detection in the AutoNAT subsystem,
// forcing the local node to believe it is reachable externally.
func ForceReachabilityPublic() Option {

94
p2p/host/autorelay/autorelay_test.go

@ -52,11 +52,22 @@ func usedRelays(h host.Host) []peer.ID {
return peers
}
func newPrivateNode(t *testing.T, opts ...autorelay.Option) host.Host {
func newPrivateNode(t *testing.T, peerSource func(context.Context, int) <-chan peer.AddrInfo,
opts ...autorelay.Option) host.Host {
t.Helper()
h, err := libp2p.New(
libp2p.ForceReachabilityPrivate(),
libp2p.EnableAutoRelay(opts...),
libp2p.EnableAutoRelayWithPeerSource(peerSource, opts...),
)
require.NoError(t, err)
return h
}
func newPrivateNodeWithStaticRelays(t *testing.T, static []peer.AddrInfo, opts ...autorelay.Option) host.Host {
t.Helper()
h, err := libp2p.New(
libp2p.ForceReachabilityPrivate(),
libp2p.EnableAutoRelayWithStaticRelays(static, opts...),
)
require.NoError(t, err)
return h
@ -117,7 +128,7 @@ func newRelayV1(t *testing.T) host.Host {
func TestSingleCandidate(t *testing.T) {
var counter int
h := newPrivateNode(t,
autorelay.WithPeerSource(func(_ context.Context, num int) <-chan peer.AddrInfo {
func(_ context.Context, num int) <-chan peer.AddrInfo {
counter++
require.Equal(t, 1, num)
peerChan := make(chan peer.AddrInfo, num)
@ -126,10 +137,11 @@ func TestSingleCandidate(t *testing.T) {
t.Cleanup(func() { r.Close() })
peerChan <- peer.AddrInfo{ID: r.ID(), Addrs: r.Addrs()}
return peerChan
}, time.Hour),
},
autorelay.WithMaxCandidates(1),
autorelay.WithNumRelays(99999),
autorelay.WithBootDelay(0),
autorelay.WithMinInterval(time.Hour),
)
defer h.Close()
@ -151,15 +163,16 @@ func TestSingleRelay(t *testing.T) {
close(peerChan)
h := newPrivateNode(t,
autorelay.WithPeerSource(func(_ context.Context, num int) <-chan peer.AddrInfo {
func(_ context.Context, num int) <-chan peer.AddrInfo {
require.False(t, called, "expected the peer source callback to only have been called once")
called = true
require.Equal(t, numCandidates, num)
return peerChan
}, time.Hour),
},
autorelay.WithMaxCandidates(numCandidates),
autorelay.WithNumRelays(1),
autorelay.WithBootDelay(0),
autorelay.WithMinInterval(time.Hour),
)
defer h.Close()
@ -178,15 +191,16 @@ func TestPreferRelayV2(t *testing.T) {
})
h := newPrivateNode(t,
autorelay.WithPeerSource(func(context.Context, int) <-chan peer.AddrInfo {
func(context.Context, int) <-chan peer.AddrInfo {
peerChan := make(chan peer.AddrInfo, 1)
defer close(peerChan)
peerChan <- peer.AddrInfo{ID: r.ID(), Addrs: r.Addrs()}
return peerChan
}, time.Hour),
},
autorelay.WithMaxCandidates(1),
autorelay.WithNumRelays(99999),
autorelay.WithBootDelay(0),
autorelay.WithMinInterval(time.Hour),
)
defer h.Close()
@ -196,10 +210,11 @@ func TestPreferRelayV2(t *testing.T) {
func TestWaitForCandidates(t *testing.T) {
peerChan := make(chan peer.AddrInfo)
h := newPrivateNode(t,
autorelay.WithPeerSource(func(context.Context, int) <-chan peer.AddrInfo { return peerChan }, time.Hour),
func(context.Context, int) <-chan peer.AddrInfo { return peerChan },
autorelay.WithMinCandidates(2),
autorelay.WithNumRelays(1),
autorelay.WithBootDelay(time.Hour),
autorelay.WithMinInterval(time.Hour),
)
defer h.Close()
@ -244,18 +259,19 @@ func TestBackoff(t *testing.T) {
var counter int32 // to be used atomically
h := newPrivateNode(t,
autorelay.WithPeerSource(func(context.Context, int) <-chan peer.AddrInfo {
func(context.Context, int) <-chan peer.AddrInfo {
// always return the same node, and make sure we don't try to connect to it too frequently
atomic.AddInt32(&counter, 1)
peerChan := make(chan peer.AddrInfo, 1)
peerChan <- peer.AddrInfo{ID: r.ID(), Addrs: r.Addrs()}
close(peerChan)
return peerChan
}, time.Second),
},
autorelay.WithNumRelays(1),
autorelay.WithBootDelay(0),
autorelay.WithBackoff(backoff),
autorelay.WithClock(cl),
autorelay.WithMinInterval(time.Second),
)
defer h.Close()
@ -280,8 +296,8 @@ func TestStaticRelays(t *testing.T) {
staticRelays = append(staticRelays, peer.AddrInfo{ID: r.ID(), Addrs: r.Addrs()})
}
h := newPrivateNode(t,
autorelay.WithStaticRelays(staticRelays),
h := newPrivateNodeWithStaticRelays(t,
staticRelays,
autorelay.WithNumRelays(1),
)
defer h.Close()
@ -298,8 +314,9 @@ func TestRelayV1(t *testing.T) {
close(peerChan)
h := newPrivateNode(t,
autorelay.WithPeerSource(func(context.Context, int) <-chan peer.AddrInfo { return peerChan }, time.Hour),
func(context.Context, int) <-chan peer.AddrInfo { return peerChan },
autorelay.WithBootDelay(0),
autorelay.WithMinInterval(time.Hour),
)
defer h.Close()
@ -314,9 +331,10 @@ func TestRelayV1(t *testing.T) {
close(peerChan)
h := newPrivateNode(t,
autorelay.WithPeerSource(func(context.Context, int) <-chan peer.AddrInfo { return peerChan }, time.Hour),
func(context.Context, int) <-chan peer.AddrInfo { return peerChan },
autorelay.WithBootDelay(0),
autorelay.WithCircuitV1Support(),
autorelay.WithMinInterval(time.Hour),
)
defer h.Close()
@ -338,11 +356,12 @@ func TestConnectOnDisconnect(t *testing.T) {
relays = append(relays, r)
}
h := newPrivateNode(t,
autorelay.WithPeerSource(func(context.Context, int) <-chan peer.AddrInfo { return peerChan }, time.Hour),
func(context.Context, int) <-chan peer.AddrInfo { return peerChan },
autorelay.WithMinCandidates(1),
autorelay.WithMaxCandidates(num),
autorelay.WithNumRelays(1),
autorelay.WithBootDelay(0),
autorelay.WithMinInterval(time.Hour),
)
defer h.Close()
@ -387,18 +406,19 @@ func TestMaxAge(t *testing.T) {
close(peerChans)
h := newPrivateNode(t,
autorelay.WithPeerSource(func(context.Context, int) <-chan peer.AddrInfo {
func(context.Context, int) <-chan peer.AddrInfo {
c, ok := <-peerChans
if !ok {
t.Fatal("unexpected call to PeerSource")
}
return c
}, time.Second),
},
autorelay.WithNumRelays(1),
autorelay.WithMaxCandidates(100),
autorelay.WithBootDelay(0),
autorelay.WithMaxCandidateAge(20*time.Minute),
autorelay.WithClock(cl),
autorelay.WithMinInterval(time.Second),
)
defer h.Close()
@ -449,18 +469,6 @@ func TestMaxAge(t *testing.T) {
require.Contains(t, ids, relays[0])
}
func TestIncorrectInit(t *testing.T) {
// Check if we panic if we do not correctly initialize the autorelay system.
// Common since it's easy to initialize without passing in the correct options: https://github.com/libp2p/go-libp2p/issues/1852
defer func() {
if r := recover(); r == nil {
t.Errorf("Expected to panic")
}
}()
_ = newPrivateNode(t)
}
func expectDeltaInAddrUpdated(t *testing.T, addrUpdated event.Subscription, expectedDelta int) {
t.Helper()
delta := 0
@ -507,8 +515,8 @@ func TestReconnectToStaticRelays(t *testing.T) {
staticRelays = append(staticRelays, peer.AddrInfo{ID: r.ID(), Addrs: r.Addrs()})
}
h := newPrivateNode(t,
autorelay.WithStaticRelays(staticRelays),
h := newPrivateNodeWithStaticRelays(t,
staticRelays,
autorelay.WithClock(cl),
)
@ -532,3 +540,25 @@ func TestReconnectToStaticRelays(t *testing.T) {
cl.Add(time.Hour)
expectDeltaInAddrUpdated(t, addrUpdated, -1)
}
func TestMinInterval(t *testing.T) {
h := newPrivateNode(t,
func(context.Context, int) <-chan peer.AddrInfo {
peerChan := make(chan peer.AddrInfo, 1)
defer close(peerChan)
r1 := newRelay(t)
t.Cleanup(func() { r1.Close() })
peerChan <- peer.AddrInfo{ID: r1.ID(), Addrs: r1.Addrs()}
return peerChan
},
autorelay.WithMinCandidates(2),
autorelay.WithNumRelays(1),
autorelay.WithBootDelay(time.Hour),
autorelay.WithMinInterval(500*time.Millisecond),
)
defer h.Close()
// The second call to peerSource should happen after 1 second
require.Never(t, func() bool { return numRelays(h) > 0 }, 500*time.Millisecond, 100*time.Millisecond)
require.Eventually(t, func() bool { return numRelays(h) > 0 }, 1*time.Second, 100*time.Millisecond)
}

40
p2p/host/autorelay/options.go

@ -10,9 +10,22 @@ import (
"github.com/benbjohnson/clock"
)
// AutoRelay will call this function when it needs new candidates because it is
// not connected to the desired number of relays or we get disconnected from one
// of the relays. Implementations must send *at most* numPeers, and close the
// channel when they don't intend to provide any more peers. AutoRelay will not
// call the callback again until the channel is closed. Implementations should
// send new peers, but may send peers they sent before. AutoRelay implements a
// per-peer backoff (see WithBackoff). See WithMinInterval for setting the
// minimum interval between calls to the callback. The context.Context passed
// may be canceled when AutoRelay feels satisfied, it will be canceled when the
// node is shutting down. If the context is canceled you MUST close the output
// channel at some point.
type PeerSource func(ctx context.Context, num int) <-chan peer.AddrInfo
type config struct {
clock clock.Clock
peerSource func(ctx context.Context, num int) <-chan peer.AddrInfo
peerSource PeerSource
// minimum interval used to call the peerSource callback
minInterval time.Duration
// see WithMinCandidates
@ -40,6 +53,7 @@ var defaultConfig = config{
backoff: time.Hour,
desiredRelays: 2,
maxCandidateAge: 30 * time.Minute,
minInterval: 30 * time.Second,
}
var (
@ -65,7 +79,7 @@ func WithStaticRelays(static []peer.AddrInfo) Option {
c <- static[i]
}
return c
}, 30*time.Second)(c)
})(c)
WithMinCandidates(len(static))(c)
WithMaxCandidates(len(static))(c)
WithNumRelays(len(static))(c)
@ -75,23 +89,12 @@ func WithStaticRelays(static []peer.AddrInfo) Option {
}
// WithPeerSource defines a callback for AutoRelay to query for more relay candidates.
// AutoRelay will call this function when it needs new candidates is connected to the desired number of
// relays, and it has enough candidates (in case we get disconnected from one of the relays).
// Implementations must send *at most* numPeers, and close the channel when they don't intend to provide
// any more peers.
// AutoRelay will not call the callback again until the channel is closed.
// Implementations should send new peers, but may send peers they sent before. AutoRelay implements
// a per-peer backoff (see WithBackoff).
// minInterval is the minimum interval this callback is called with, even if AutoRelay needs new candidates.
// The context.Context passed MAY be canceled when AutoRelay feels satisfied, it will be canceled when the node is shutting down.
// If the channel is canceled you MUST close the output channel at some point.
func WithPeerSource(f func(ctx context.Context, numPeers int) <-chan peer.AddrInfo, minInterval time.Duration) Option {
func WithPeerSource(f PeerSource) Option {
return func(c *config) error {
if c.peerSource != nil {
return errAlreadyHavePeerSource
}
c.peerSource = f
c.minInterval = minInterval
return nil
}
}
@ -174,3 +177,12 @@ func WithClock(cl clock.Clock) Option {
return nil
}
}
// WithMinInterval sets the minimum interval after which peerSource callback will be called for more
// candidates even if AutoRelay needs new candidates.
func WithMinInterval(interval time.Duration) Option {
return func(c *config) error {
c.minInterval = interval
return nil
}
}

4
p2p/host/autorelay/relay_finder.go

@ -60,7 +60,7 @@ type relayFinder struct {
ctxCancel context.CancelFunc
ctxCancelMx sync.Mutex
peerSource func(context.Context, int) <-chan peer.AddrInfo
peerSource PeerSource
candidateFound chan struct{} // receives every time we find a new relay candidate
candidateMx sync.Mutex
@ -83,7 +83,7 @@ type relayFinder struct {
cachedAddrsExpiry time.Time
}
func newRelayFinder(host *basic.BasicHost, peerSource func(context.Context, int) <-chan peer.AddrInfo, conf *config) *relayFinder {
func newRelayFinder(host *basic.BasicHost, peerSource PeerSource, conf *config) *relayFinder {
if peerSource == nil {
panic("Can not create a new relayFinder. Need a Peer Source fn or a list of static relays. Refer to the documentation around `libp2p.EnableAutoRelay`")
}

4
p2p/protocol/holepunch/holepunch_test.go

@ -417,9 +417,9 @@ func mkHostWithStaticAutoRelay(t *testing.T, relay host.Host) host.Host {
h, err := libp2p.New(
libp2p.ListenAddrs(ma.StringCast("/ip4/127.0.0.1/tcp/0")),
libp2p.EnableRelay(),
libp2p.EnableAutoRelay(
libp2p.EnableAutoRelayWithStaticRelays(
[]peer.AddrInfo{pi},
autorelay.WithCircuitV1Support(),
autorelay.WithStaticRelays([]peer.AddrInfo{pi}),
),
libp2p.ForceReachabilityPrivate(),
libp2p.ResourceManager(&network.NullResourceManager{}),

Loading…
Cancel
Save