
autorelay: only try to obtain a reservation once per peer, implement backoff

pull/1587/head
Marten Seemann, 2 years ago
commit f4f663945c
  1. p2p/host/autorelay/autorelay_test.go (123 changed lines)
  2. p2p/host/autorelay/options.go (29 changed lines)
  3. p2p/host/autorelay/relay_finder.go (153 changed lines)

p2p/host/autorelay/autorelay_test.go (123 changed lines)

@@ -10,7 +10,6 @@ import (
"github.com/libp2p/go-libp2p/p2p/host/autorelay"
relayv1 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv1/relay"
circuitv2_proto "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/proto"
relayv2 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/relay"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
@@ -21,6 +20,8 @@ import (
"github.com/stretchr/testify/require"
)
const protoIDv2 = circuitv2_proto.ProtoIDv2Hop
func numRelays(h host.Host) int {
peers := make(map[peer.ID]struct{})
for _, addr := range h.Addrs() {
@@ -71,7 +72,7 @@ func newRelay(t *testing.T) host.Host {
require.NoError(t, err)
require.Eventually(t, func() bool {
for _, p := range h.Mux().Protocols() {
if p == circuitv2_proto.ProtoIDv2Hop {
if p == protoIDv2 {
return true
}
}
@@ -103,38 +104,6 @@ func newRelayV1(t *testing.T) host.Host {
return h
}
// creates a node that speaks the relay v2 protocol, but doesn't accept any reservations for the first workAfter tries
func newBrokenRelay(t *testing.T, workAfter int) host.Host {
t.Helper()
h, err := libp2p.New(
libp2p.DisableRelay(),
libp2p.AddrsFactory(func(addrs []ma.Multiaddr) []ma.Multiaddr {
for i, addr := range addrs {
saddr := addr.String()
if strings.HasPrefix(saddr, "/ip4/127.0.0.1/") {
addrNoIP := strings.TrimPrefix(saddr, "/ip4/127.0.0.1")
addrs[i] = ma.StringCast("/dns4/localhost" + addrNoIP)
}
}
return addrs
}),
libp2p.EnableRelayService(),
)
require.NoError(t, err)
var n int32
h.SetStreamHandler(circuitv2_proto.ProtoIDv2Hop, func(str network.Stream) {
str.Reset()
num := atomic.AddInt32(&n, 1)
if int(num) >= workAfter {
h.RemoveStreamHandler(circuitv2_proto.ProtoIDv2Hop)
r, err := relayv2.New(h)
require.NoError(t, err)
t.Cleanup(func() { r.Close() })
}
})
return h
}
func TestSingleCandidate(t *testing.T) {
var counter int
h := newPrivateNode(t,
@@ -147,7 +116,7 @@ func TestSingleCandidate(t *testing.T) {
t.Cleanup(func() { r.Close() })
peerChan <- peer.AddrInfo{ID: r.ID(), Addrs: r.Addrs()}
return peerChan
}),
}, time.Hour),
autorelay.WithMaxCandidates(1),
autorelay.WithNumRelays(99999),
autorelay.WithBootDelay(0),
@@ -177,7 +146,7 @@ func TestSingleRelay(t *testing.T) {
called = true
require.Equal(t, numCandidates, num)
return peerChan
}),
}, time.Hour),
autorelay.WithMaxCandidates(numCandidates),
autorelay.WithNumRelays(1),
autorelay.WithBootDelay(0),
@@ -204,7 +173,7 @@ func TestPreferRelayV2(t *testing.T) {
defer close(peerChan)
peerChan <- peer.AddrInfo{ID: r.ID(), Addrs: r.Addrs()}
return peerChan
}),
}, time.Hour),
autorelay.WithMaxCandidates(1),
autorelay.WithNumRelays(99999),
autorelay.WithBootDelay(0),
@@ -217,7 +186,7 @@ func TestPreferRelayV2(t *testing.T) {
func TestWaitForCandidates(t *testing.T) {
peerChan := make(chan peer.AddrInfo)
h := newPrivateNode(t,
autorelay.WithPeerSource(func(int) <-chan peer.AddrInfo { return peerChan }),
autorelay.WithPeerSource(func(int) <-chan peer.AddrInfo { return peerChan }, time.Hour),
autorelay.WithMinCandidates(2),
autorelay.WithNumRelays(1),
autorelay.WithBootDelay(time.Hour),
@@ -264,7 +233,7 @@ func TestMaxCandidateAge(t *testing.T) {
candidates = candidates[1:]
}
return peerChan
}),
}, time.Hour),
autorelay.WithMaxCandidates(numCandidates),
autorelay.WithNumRelays(1),
autorelay.WithMaxCandidateAge(time.Hour),
@@ -285,53 +254,57 @@ func TestMaxCandidateAge(t *testing.T) {
}
func TestBackoff(t *testing.T) {
const backoff = 10 * time.Second
peerChan := make(chan peer.AddrInfo)
const backoff = 20 * time.Second
cl := clock.NewMock()
h := newPrivateNode(t,
autorelay.WithPeerSource(func(int) <-chan peer.AddrInfo { return peerChan }),
autorelay.WithNumRelays(1),
autorelay.WithBootDelay(0),
autorelay.WithBackoff(backoff),
autorelay.WithClock(cl),
r, err := libp2p.New(
libp2p.DisableRelay(),
libp2p.ForceReachabilityPublic(),
libp2p.AddrsFactory(func(addrs []ma.Multiaddr) []ma.Multiaddr {
for i, addr := range addrs {
saddr := addr.String()
if strings.HasPrefix(saddr, "/ip4/127.0.0.1/") {
addrNoIP := strings.TrimPrefix(saddr, "/ip4/127.0.0.1")
addrs[i] = ma.StringCast("/dns4/localhost" + addrNoIP)
}
}
return addrs
}),
)
defer h.Close()
r1 := newBrokenRelay(t, 1)
t.Cleanup(func() { r1.Close() })
peerChan <- peer.AddrInfo{ID: r1.ID(), Addrs: r1.Addrs()}
// make sure we don't add any relays yet
require.Never(t, func() bool { return numRelays(h) > 0 }, 100*time.Millisecond, 20*time.Millisecond)
cl.Add(backoff * 2 / 3)
require.Never(t, func() bool { return numRelays(h) > 0 }, 100*time.Millisecond, 20*time.Millisecond)
cl.Add(backoff * 2 / 3)
require.Eventually(t, func() bool { return numRelays(h) > 0 }, 3*time.Second, 100*time.Millisecond)
}
require.NoError(t, err)
defer r.Close()
var reservations int32
r.SetStreamHandler(protoIDv2, func(str network.Stream) {
atomic.AddInt32(&reservations, 1)
str.Reset()
})
func TestMaxBackoffs(t *testing.T) {
const backoff = 20 * time.Second
cl := clock.NewMock()
peerChan := make(chan peer.AddrInfo)
var counter int
h := newPrivateNode(t,
autorelay.WithPeerSource(func(int) <-chan peer.AddrInfo { return peerChan }),
autorelay.WithPeerSource(func(int) <-chan peer.AddrInfo {
// always return the same node, and make sure we don't try to connect to it too frequently
counter++
peerChan := make(chan peer.AddrInfo, 1)
peerChan <- peer.AddrInfo{ID: r.ID(), Addrs: r.Addrs()}
close(peerChan)
return peerChan
}, time.Second),
autorelay.WithNumRelays(1),
autorelay.WithBootDelay(0),
autorelay.WithBackoff(backoff),
autorelay.WithMaxAttempts(3),
autorelay.WithClock(cl),
)
defer h.Close()
r := newBrokenRelay(t, 4)
t.Cleanup(func() { r.Close() })
peerChan <- peer.AddrInfo{ID: r.ID(), Addrs: r.Addrs()}
require.Eventually(t, func() bool { return atomic.LoadInt32(&reservations) == 1 }, 3*time.Second, 20*time.Millisecond)
// make sure we don't add any relays yet
for i := 0; i < 5; i++ {
cl.Add(backoff * 3 / 2)
require.Never(t, func() bool { return numRelays(h) > 0 }, 50*time.Millisecond, 20*time.Millisecond)
for i := 0; i < 2; i++ {
cl.Add(backoff / 3)
require.Equal(t, 1, int(atomic.LoadInt32(&reservations)))
}
cl.Add(backoff / 2)
require.Eventually(t, func() bool { return atomic.LoadInt32(&reservations) == 2 }, 3*time.Second, 20*time.Millisecond)
require.Less(t, counter, 100) // just make sure we're not busy-looping
require.Equal(t, 2, int(atomic.LoadInt32(&reservations)))
}
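The reworked TestBackoff above drives time with a mock clock instead of sleeping through the 20-second backoff. A minimal sketch of that pattern, assuming the github.com/benbjohnson/clock package these tests use (the timer below merely stands in for the backoff duration):

package main

import (
	"fmt"
	"time"

	"github.com/benbjohnson/clock"
)

func main() {
	cl := clock.NewMock()               // deterministic clock; no real waiting
	timer := cl.Timer(20 * time.Second) // stands in for the backoff duration

	cl.Add(10 * time.Second) // only half the backoff has elapsed
	select {
	case <-timer.C:
		fmt.Println("unexpected: fired early")
	default:
		fmt.Println("not fired yet")
	}

	cl.Add(15 * time.Second) // now well past the 20s mark
	fmt.Println("fired at:", <-timer.C)
}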
func TestStaticRelays(t *testing.T) {
@@ -361,7 +334,7 @@ func TestRelayV1(t *testing.T) {
close(peerChan)
h := newPrivateNode(t,
autorelay.WithPeerSource(func(int) <-chan peer.AddrInfo { return peerChan }),
autorelay.WithPeerSource(func(int) <-chan peer.AddrInfo { return peerChan }, time.Hour),
autorelay.WithBootDelay(0),
)
defer h.Close()
@@ -377,7 +350,7 @@ func TestRelayV1(t *testing.T) {
close(peerChan)
h := newPrivateNode(t,
autorelay.WithPeerSource(func(int) <-chan peer.AddrInfo { return peerChan }),
autorelay.WithPeerSource(func(int) <-chan peer.AddrInfo { return peerChan }, time.Hour),
autorelay.WithBootDelay(0),
autorelay.WithCircuitV1Support(),
)

p2p/host/autorelay/options.go (29 changed lines)

@@ -11,8 +11,10 @@ import (
)
type config struct {
clock clock.Clock
peerSource func(num int) <-chan peer.AddrInfo
clock clock.Clock
peerSource func(num int) <-chan peer.AddrInfo
// minimum interval between calls to the peerSource callback
minInterval time.Duration
staticRelays []peer.AddrInfo
// see WithMinCandidates
minCandidates int
@@ -23,8 +25,6 @@ type config struct {
bootDelay time.Duration
// backoff is the time we wait after failing to obtain a reservation with a candidate
backoff time.Duration
// If we fail to obtain a reservation more than maxAttempts, we stop trying.
maxAttempts int
// Number of relays we strive to obtain a reservation with.
desiredRelays int
// see WithMaxCandidateAge
@@ -39,7 +39,6 @@ var defaultConfig = config{
maxCandidates: 20,
bootDelay: 3 * time.Minute,
backoff: time.Hour,
maxAttempts: 3,
desiredRelays: 2,
maxCandidateAge: 30 * time.Minute,
}
@@ -95,16 +94,21 @@ func WithDefaultStaticRelays() Option {
}
// WithPeerSource defines a callback for AutoRelay to query for more relay candidates.
// AutoRelay will call this function in regular intervals, until it is connected to the desired number of
// AutoRelay will call this function when it needs new candidates, until it is connected to the desired number of
// relays, and it has enough candidates (in case we get disconnected from one of the relays).
// Implementations must send *at most* numPeers, and close the channel when they don't intend to provide
// any more peers. AutoRelay will not call the callback again until the channel is closed.
func WithPeerSource(f func(numPeers int) <-chan peer.AddrInfo) Option {
// any more peers.
// AutoRelay will not call the callback again until the channel is closed.
// Implementations should send new peers, but may send peers they sent before. AutoRelay implements
// a per-peer backoff (see WithBackoff).
// minInterval is the minimum interval between calls to this callback, even if AutoRelay needs new candidates sooner.
func WithPeerSource(f func(numPeers int) <-chan peer.AddrInfo, minInterval time.Duration) Option {
return func(c *config) error {
if len(c.staticRelays) > 0 {
return errStaticRelaysPeerSource
}
c.peerSource = f
c.minInterval = minInterval
return nil
}
}
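As a rough illustration of the contract documented above (send at most numPeers, close the channel when done, repeats are acceptable because AutoRelay backs off per peer), a caller-side peer source could look like the sketch below; knownRelays and the one-minute interval are hypothetical, not part of the package:

package main

import (
	"time"

	"github.com/libp2p/go-libp2p-core/peer"

	"github.com/libp2p/go-libp2p/p2p/host/autorelay"
)

// knownRelays is a hypothetical, application-provided list of relay candidates.
var knownRelays []peer.AddrInfo

func peerSourceOption() autorelay.Option {
	return autorelay.WithPeerSource(func(numPeers int) <-chan peer.AddrInfo {
		out := make(chan peer.AddrInfo, numPeers)
		go func() {
			defer close(out) // AutoRelay won't call again until the channel is closed
			for i, ai := range knownRelays {
				if i == numPeers {
					return // send *at most* numPeers
				}
				out <- ai // re-sending known peers is fine: AutoRelay backs off per peer
			}
		}()
		return out
	}, time.Minute) // minInterval: queried at most once per minute
}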
@@ -158,15 +162,6 @@ func WithBackoff(d time.Duration) Option {
}
}
// WithMaxAttempts sets the number of times we attempt to obtain a reservation with a candidate.
// If we still fail to obtain a reservation, this candidate is dropped.
func WithMaxAttempts(n int) Option {
return func(c *config) error {
c.maxAttempts = n
return nil
}
}
// WithCircuitV1Support enables support for circuit v1 relays.
func WithCircuitV1Support() Option {
return func(c *config) error {

p2p/host/autorelay/relay_finder.go (153 changed lines)

@@ -43,15 +43,8 @@ const (
)
type candidate struct {
added time.Time
supportsRelayV2 bool
ai peer.AddrInfo
numAttempts int
}
type candidateOnBackoff struct {
candidate
nextConnAttempt time.Time
}
// relayFinder is a Host that uses relays for connectivity when a NAT is detected.
@@ -72,8 +65,13 @@ type relayFinder struct {
candidateMx sync.Mutex
lastCandidateAdded time.Time
candidates map[peer.ID]*candidate
candidatesOnBackoff []*candidateOnBackoff // this slice is always sorted by the nextConnAttempt time
handleNewCandidateTrigger chan struct{} // cap: 1
backoff map[peer.ID]time.Time
handleNewCandidateTrigger chan struct{} // cap: 1
// Any time _something_ happens that might cause us to need new candidates.
// This could be
// * the disconnection of a relay
// * the failed attempt to obtain a reservation with a current candidate
maybeRequestNewCandidates chan struct{} // cap: 1.
relayUpdated chan struct{}
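The maybeRequestNewCandidates channel described above uses the usual Go idiom for coalescing wake-ups: capacity 1 plus a non-blocking send (see notifyMaybeNeedNewCandidates further down). A self-contained sketch of the pattern, not autorelay code:

package main

import (
	"fmt"
	"time"
)

func main() {
	notify := make(chan struct{}, 1) // at most one pending signal

	request := func() {
		select {
		case notify <- struct{}{}:
		default: // a signal is already pending; this event is coalesced into it
		}
	}

	go func() {
		for range notify {
			fmt.Println("re-checking whether new relay candidates are needed")
		}
	}()

	request()
	request() // coalesced with the first if it hasn't been consumed yet
	time.Sleep(100 * time.Millisecond)
}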
@@ -91,16 +89,16 @@ func newRelayFinder(host *basic.BasicHost, peerSource func(int) <-chan peer.Addr
conf: conf,
peerSource: peerSource,
candidates: make(map[peer.ID]*candidate),
backoff: make(map[peer.ID]time.Time),
candidateFound: make(chan struct{}, 1),
handleNewCandidateTrigger: make(chan struct{}, 1),
maybeRequestNewCandidates: make(chan struct{}, 1),
relays: make(map[peer.ID]*circuitv2.Reservation),
relayUpdated: make(chan struct{}, 1),
}
}
func (rf *relayFinder) background(ctx context.Context) {
relayDisconnected := make(chan struct{}, 1)
if rf.usesStaticRelay() {
rf.refCount.Add(1)
go func() {
@@ -111,7 +109,7 @@ func (rf *relayFinder) background(ctx context.Context) {
rf.refCount.Add(1)
go func() {
defer rf.refCount.Done()
rf.findNodes(ctx, relayDisconnected)
rf.findNodes(ctx)
}()
}
@@ -152,10 +150,7 @@ func (rf *relayFinder) background(ctx context.Context) {
if rf.usingRelay(evt.Peer) { // we were disconnected from a relay
log.Debugw("disconnected from relay", "id", evt.Peer)
delete(rf.relays, evt.Peer)
select {
case relayDisconnected <- struct{}{}:
default:
}
rf.notifyMaybeNeedNewCandidates()
push = true
}
rf.relayMx.Unlock()
@@ -174,8 +169,14 @@ func (rf *relayFinder) background(ctx context.Context) {
case now := <-refreshTicker.C:
push = rf.refreshReservations(ctx, now)
case now := <-backoffTicker.C:
log.Debug("backoff timer fired")
rf.checkForCandidatesOnBackoff(now)
rf.candidateMx.Lock()
for id, t := range rf.backoff {
if !t.Add(rf.conf.backoff).After(now) {
log.Debugw("removing backoff for node", "id", id)
delete(rf.backoff, id)
}
}
rf.candidateMx.Unlock()
case <-ctx.Done():
return
}
@@ -193,46 +194,58 @@ func (rf *relayFinder) background(ctx context.Context) {
// It is run on both public and private nodes.
// It garbage collects old entries, so that the list of nodes doesn't overflow.
// This makes sure that as soon as we need to find relay candidates, we have them available.
func (rf *relayFinder) findNodes(ctx context.Context, relayDisconnected <-chan struct{}) {
func (rf *relayFinder) findNodes(ctx context.Context) {
peerChan := rf.peerSource(rf.conf.maxCandidates)
lastCallToPeerSource := rf.conf.clock.Now()
timer := newTimer(rf.conf.clock)
for {
rf.candidateMx.Lock()
numCandidates := len(rf.candidates)
rf.candidateMx.Unlock()
if peerChan == nil {
now := rf.conf.clock.Now()
nextAllowedCallToPeerSource := lastCallToPeerSource.Add(rf.conf.minInterval).Sub(now)
if numCandidates < rf.conf.minCandidates {
log.Debugw("not enough candidates. Resetting timer", "num", numCandidates, "desired", rf.conf.minCandidates)
timer.Reset(nextAllowedCallToPeerSource)
} else if !rf.lastCandidateAdded.IsZero() {
newestCandidateAge := now.Sub(rf.lastCandidateAdded)
log.Debugw("resetting timer. candidate will be too old in", "dur", rf.conf.maxCandidateAge-newestCandidateAge)
t := rf.conf.maxCandidateAge - newestCandidateAge
if t < nextAllowedCallToPeerSource {
t = nextAllowedCallToPeerSource
}
timer.Reset(t)
}
}
select {
case <-relayDisconnected:
timer.Reset(0)
case <-rf.maybeRequestNewCandidates:
continue
case now := <-timer.Chan():
timer.SetRead()
if peerChan != nil {
// We're still reading peers from the peerChan. No need to query for more peers now.
continue
}
rf.relayMx.Lock()
numRelays := len(rf.relays)
rf.relayMx.Unlock()
rf.candidateMx.Lock()
numCandidates := len(rf.candidates)
rf.candidateMx.Unlock()
// Even if we are connected to the desired number of relays, or have enough candidates,
// we want to make sure that our candidate list doesn't become outdated.
newestCandidateAge := now.Sub(rf.lastCandidateAdded)
if (numRelays >= rf.conf.desiredRelays || numCandidates >= rf.conf.maxCandidates) &&
newestCandidateAge < rf.conf.maxCandidateAge {
timer.Reset(rf.conf.maxCandidateAge - newestCandidateAge)
continue
}
lastCallToPeerSource = now
peerChan = rf.peerSource(rf.conf.maxCandidates)
case pi, ok := <-peerChan:
if !ok {
peerChan = nil
timer.Reset(rf.conf.maxCandidateAge - rf.conf.clock.Now().Sub(rf.lastCandidateAdded))
continue
}
log.Debugw("found node", "id", pi.ID)
rf.candidateMx.Lock()
numCandidates := len(rf.candidates)
backoffStart, isOnBackoff := rf.backoff[pi.ID]
rf.candidateMx.Unlock()
if isOnBackoff {
log.Debugw("skipping node that we recently failed to obtain a reservation with", "id", pi.ID, "last attempt", rf.conf.clock.Since(backoffStart))
continue
}
if numCandidates >= rf.conf.maxCandidates {
log.Debugw("skipping node. Already have enough candidates", "id", pi.ID, "num", numCandidates, "max", rf.conf.maxCandidates)
continue
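For readability, the timer arithmetic in the peerChan == nil branch above can be read as a standalone helper. This is only a sketch of the same decision; the name and signature are illustrative and not part of the package:

package sketch

import "time"

// nextQueryDelay mirrors the scheduling in findNodes: when short on candidates,
// query again as soon as minInterval allows; otherwise wait until the newest
// candidate approaches maxCandidateAge, but never sooner than minInterval permits.
// ok is false when the timer is left untouched (enough candidates, none added yet).
func nextQueryDelay(now, lastCall, lastCandidateAdded time.Time,
	numCandidates, minCandidates int,
	minInterval, maxCandidateAge time.Duration) (d time.Duration, ok bool) {
	nextAllowed := lastCall.Add(minInterval).Sub(now)
	if numCandidates < minCandidates {
		return nextAllowed, true
	}
	if lastCandidateAdded.IsZero() {
		return 0, false
	}
	d = maxCandidateAge - now.Sub(lastCandidateAdded)
	if d < nextAllowed {
		d = nextAllowed
	}
	return d, true
}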
@@ -266,6 +279,13 @@ func (rf *relayFinder) handleStaticRelays(ctx context.Context) {
rf.notifyNewCandidate()
}
func (rf *relayFinder) notifyMaybeNeedNewCandidates() {
select {
case rf.maybeRequestNewCandidates <- struct{}{}:
default:
}
}
func (rf *relayFinder) notifyNewCandidate() {
select {
case rf.candidateFound <- struct{}{}:
@@ -433,11 +453,16 @@ func (rf *relayFinder) handleNewCandidate(ctx context.Context) {
usingRelay := rf.usingRelay(id)
rf.relayMx.Unlock()
if usingRelay {
rf.candidateMx.Lock()
delete(rf.candidates, id)
rf.candidateMx.Unlock()
rf.notifyMaybeNeedNewCandidates()
continue
}
rsvp, err := rf.connectToRelay(ctx, cand)
if err != nil {
log.Debugw("failed to connect to relay", "peer", id, "error", err)
rf.notifyMaybeNeedNewCandidates()
continue
}
log.Debugw("adding new relay", "id", id)
@@ -445,6 +470,7 @@ func (rf *relayFinder) handleNewCandidate(ctx context.Context) {
rf.relays[id] = rsvp
numRelays := len(rf.relays)
rf.relayMx.Unlock()
rf.notifyMaybeNeedNewCandidates()
rf.host.ConnManager().Protect(id, autorelayTag) // protect the connection
@@ -452,6 +478,7 @@ func (rf *relayFinder) handleNewCandidate(ctx context.Context) {
case rf.relayUpdated <- struct{}{}:
default:
}
if numRelays >= rf.conf.desiredRelays {
break
}
@@ -476,6 +503,10 @@ func (rf *relayFinder) connectToRelay(ctx context.Context, cand *candidate) (*ci
return nil, fmt.Errorf("failed to connect: %w", err)
}
}
rf.candidateMx.Lock()
rf.backoff[id] = rf.conf.clock.Now()
rf.candidateMx.Unlock()
var err error
if cand.supportsRelayV2 {
rsvp, err = circuitv2.Reserve(ctx, rf.host, cand.ai)
@@ -487,60 +518,12 @@ func (rf *relayFinder) connectToRelay(ctx context.Context, cand *candidate) (*ci
rf.candidateMx.Lock()
defer rf.candidateMx.Unlock()
if failed {
cand.numAttempts++
delete(rf.candidates, id)
// We failed to obtain a reservation for too many times. We give up.
if cand.numAttempts >= rf.conf.maxAttempts {
return nil, fmt.Errorf("failed to obtain a reservation too many times: %w", err)
}
rf.moveCandidateToBackoff(cand)
return nil, err
}
return rsvp, nil
}
// must be called with mutex locked
func (rf *relayFinder) moveCandidateToBackoff(cand *candidate) {
if len(rf.candidatesOnBackoff) >= rf.conf.maxCandidates {
log.Debugw("already have enough candidates on backoff. Dropping.", "id", cand.ai.ID)
return
}
log.Debugw("moving candidate to backoff", "id", cand.ai.ID)
backoff := rf.conf.backoff * (1 << (cand.numAttempts - 1))
// introduce a bit of jitter
backoff = (backoff * time.Duration(16+rand.Intn(8))) / time.Duration(20)
rf.candidatesOnBackoff = append(rf.candidatesOnBackoff, &candidateOnBackoff{
candidate: *cand,
nextConnAttempt: rf.conf.clock.Now().Add(backoff),
})
}
func (rf *relayFinder) checkForCandidatesOnBackoff(now time.Time) {
rf.candidateMx.Lock()
defer rf.candidateMx.Unlock()
for _, cand := range rf.candidatesOnBackoff {
if cand.nextConnAttempt.After(now) {
log.Debug("break")
break
}
if len(rf.candidates) >= rf.conf.maxCandidates {
// drop this candidate if we already have enough others
log.Debugw("cannot move backoff'ed candidate back. Already have enough candidates.", "id", cand.ai.ID)
} else {
log.Debugw("moving backoff'ed candidate back", "id", cand.ai.ID)
rf.candidates[cand.ai.ID] = &candidate{
added: cand.added,
supportsRelayV2: cand.supportsRelayV2,
ai: cand.ai,
numAttempts: cand.numAttempts,
}
rf.notifyNewCandidate()
}
rf.candidatesOnBackoff = rf.candidatesOnBackoff[1:]
}
}
func (rf *relayFinder) refreshReservations(ctx context.Context, now time.Time) bool {
rf.relayMx.Lock()
