
autorelay: refactor relay finder and start autorelay after identify (#2120)

* Refactor relay_finder and start autorelay after identify

* Clock fork

* Remove multiple timers and use a single rate limiting chan for findNodes (see the sketch below)

* Remove clock fork

* Rename

* Use scheduledWork.nextAllowedCallToPeerSource.Add(rf.conf.minInterval)

* Fix flaky test that relied on time
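
The bullets above describe the new control flow: the background scheduler owns all timing and grants findNodes permission to call the peer source at most once per minInterval, via a channel with capacity 1 (peerSourceRateLimiter in the diff below). A minimal, self-contained sketch of that rate-limiting pattern; the names here (worker, limiter, source, minInterval) are illustrative, not part of the libp2p API:

package main

import (
	"context"
	"fmt"
	"time"
)

// worker blocks on the limiter before every call to an expensive source
// function, mirroring how findNodes now waits on peerSourceRateLimiter
// before calling rf.peerSource.
func worker(ctx context.Context, limiter <-chan struct{}, source func() []string) {
	for {
		select {
		case <-limiter:
			fmt.Println("peer source returned:", source())
		case <-ctx.Done():
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 3500*time.Millisecond)
	defer cancel()

	const minInterval = time.Second
	limiter := make(chan struct{}, 1) // capacity 1: at most one pending permit

	go worker(ctx, limiter, func() []string { return []string{"relay-candidate"} })

	// Scheduler side: hand out one permit per minInterval with a
	// non-blocking send, so a slow worker never accumulates a backlog.
	ticker := time.NewTicker(minInterval)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			select {
			case limiter <- struct{}{}:
			default: // a permit is already pending; don't queue another
			}
		case <-ctx.Done():
			return
		}
	}
}

The capacity-1 channel plus the non-blocking send bounds the worker to a single pending permit, which is the same shape as the select with a default case in runScheduledWork below.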
Marco Munizaga, 2 years ago (committed by GitHub) · commit b74205d265 · release-v0260
1. config/config.go (4 lines changed)
2. p2p/host/autorelay/autorelay.go (5 lines changed)
3. p2p/host/autorelay/autorelay_test.go (106 lines changed)
4. p2p/host/autorelay/host.go (4 lines changed)
5. p2p/host/autorelay/relay_finder.go (223 lines changed)
6. p2p/host/autorelay/timer.go (42 lines changed)

config/config.go (4 lines changed)

@@ -433,7 +433,9 @@ func (cfg *Config) NewNode() (host.Host, error) {
ho = routed.Wrap(h, router)
}
if ar != nil {
return autorelay.NewAutoRelayHost(ho, ar), nil
arh := autorelay.NewAutoRelayHost(ho, ar)
arh.Start()
ho = arh
}
return ho, nil
}

p2p/host/autorelay/autorelay.go (5 lines changed)

@@ -49,12 +49,15 @@ func NewAutoRelay(bhost *basic.BasicHost, opts ...Option) (*AutoRelay, error) {
r.relayFinder = newRelayFinder(bhost, conf.peerSource, &conf)
bhost.AddrsFactory = r.hostAddrs
return r, nil
}
func (r *AutoRelay) Start() {
r.refCount.Add(1)
go func() {
defer r.refCount.Done()
r.background()
}()
return r, nil
}
func (r *AutoRelay) background() {

p2p/host/autorelay/autorelay_test.go (106 lines changed)

@@ -2,14 +2,12 @@ package autorelay_test
import (
"context"
"os"
"strings"
"sync/atomic"
"testing"
"time"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
@@ -97,7 +95,7 @@ func newRelay(t *testing.T) host.Host {
}
}
return false
}, 500*time.Millisecond, 10*time.Millisecond)
}, time.Second, 10*time.Millisecond)
return h
}
@@ -121,7 +119,7 @@ func TestSingleCandidate(t *testing.T) {
)
defer h.Close()
require.Eventually(t, func() bool { return numRelays(h) > 0 }, 3*time.Second, 100*time.Millisecond)
require.Eventually(t, func() bool { return numRelays(h) > 0 }, 10*time.Second, 100*time.Millisecond)
// test that we don't add any more relays
require.Never(t, func() bool { return numRelays(h) > 1 }, 200*time.Millisecond, 50*time.Millisecond)
require.Equal(t, 1, counter, "expected the peer source callback to only have been called once")
@@ -179,7 +177,7 @@ func TestWaitForCandidates(t *testing.T) {
r2 := newRelay(t)
t.Cleanup(func() { r2.Close() })
peerChan <- peer.AddrInfo{ID: r2.ID(), Addrs: r2.Addrs()}
require.Eventually(t, func() bool { return numRelays(h) > 0 }, 3*time.Second, 100*time.Millisecond)
require.Eventually(t, func() bool { return numRelays(h) > 0 }, 10*time.Second, 100*time.Millisecond)
}
func TestBackoff(t *testing.T) {
@@ -225,15 +223,19 @@ func TestBackoff(t *testing.T) {
)
defer h.Close()
require.Eventually(t, func() bool { return reservations.Load() == 1 }, 3*time.Second, 20*time.Millisecond)
require.Eventually(t, func() bool {
return reservations.Load() == 1
}, 10*time.Second, 20*time.Millisecond, "reservations load should be 1 was %d", reservations.Load())
// make sure we don't add any relays yet
for i := 0; i < 2; i++ {
cl.Add(backoff / 3)
require.Equal(t, 1, int(reservations.Load()))
}
cl.Add(backoff / 2)
require.Eventually(t, func() bool { return reservations.Load() == 2 }, 3*time.Second, 20*time.Millisecond)
require.Less(t, int(counter.Load()), 100) // just make sure we're not busy-looping
cl.Add(backoff)
require.Eventually(t, func() bool {
return reservations.Load() == 2
}, 10*time.Second, 100*time.Millisecond, "reservations load should be 2 was %d", reservations.Load())
require.Less(t, int(counter.Load()), 300) // just make sure we're not busy-looping
require.Equal(t, 2, int(reservations.Load()))
}
@@ -252,7 +254,7 @@ func TestStaticRelays(t *testing.T) {
)
defer h.Close()
require.Eventually(t, func() bool { return numRelays(h) > 0 }, 2*time.Second, 50*time.Millisecond)
require.Eventually(t, func() bool { return numRelays(h) > 0 }, 10*time.Second, 50*time.Millisecond)
}
func TestConnectOnDisconnect(t *testing.T) {
@@ -275,7 +277,7 @@ func TestConnectOnDisconnect(t *testing.T) {
)
defer h.Close()
require.Eventually(t, func() bool { return numRelays(h) > 0 }, 3*time.Second, 100*time.Millisecond)
require.Eventually(t, func() bool { return numRelays(h) > 0 }, 10*time.Second, 100*time.Millisecond)
relaysInUse := usedRelays(h)
require.Len(t, relaysInUse, 1)
oldRelay := relaysInUse[0]
@@ -286,7 +288,7 @@
}
}
require.Eventually(t, func() bool { return numRelays(h) > 0 }, 3*time.Second, 100*time.Millisecond)
require.Eventually(t, func() bool { return numRelays(h) > 0 }, 10*time.Second, 100*time.Millisecond)
relaysInUse = usedRelays(h)
require.Len(t, relaysInUse, 1)
require.NotEqualf(t, oldRelay, relaysInUse[0], "old relay should not be used again")
@@ -332,28 +334,31 @@ func TestMaxAge(t *testing.T) {
)
defer h.Close()
require.Eventually(t, func() bool { return numRelays(h) > 0 }, 3*time.Second, 100*time.Millisecond)
require.Eventually(t, func() bool {
return numRelays(h) > 0
}, 10*time.Second, 100*time.Millisecond)
relays := usedRelays(h)
require.Len(t, relays, 1)
waitFor := 500 * time.Millisecond
tick := 100 * time.Millisecond
if os.Getenv("CI") != "" {
// Only increase the waitFor since we are increasing the mock clock every tick.
waitFor *= 10
}
require.Eventually(t, func() bool {
// we don't know exactly when the timer is reset, just advance our timer multiple times if necessary
cl.Add(time.Second)
cl.Add(30 * time.Second)
return len(peerChans) == 0
}, waitFor, tick)
}, 10*time.Second, 100*time.Millisecond)
cl.Add(10 * time.Minute)
for _, r := range relays2 {
peerChan2 <- peer.AddrInfo{ID: r.ID(), Addrs: r.Addrs()}
}
cl.Add(11 * time.Minute)
require.Eventually(t, func() bool {
relays = usedRelays(h)
return len(relays) == 1
}, 10*time.Second, 100*time.Millisecond)
// by now the 3 relays should have been garbage collected
// And we should only be using a single relay. Let's close it.
var oldRelay peer.ID
for _, r := range relays1 {
if r.ID() == relays[0] {
@@ -369,7 +374,7 @@
return false
}
return relays[0] != oldRelay
}, 3*time.Second, 100*time.Millisecond)
}, 10*time.Second, 100*time.Millisecond)
require.Len(t, relays, 1)
ids := make([]peer.ID, 0, len(relays2))
@@ -379,40 +384,6 @@
require.Contains(t, ids, relays[0])
}
func expectDeltaInAddrUpdated(t *testing.T, addrUpdated event.Subscription, expectedDelta int) {
t.Helper()
delta := 0
for {
select {
case evAny := <-addrUpdated.Out():
ev := evAny.(event.EvtLocalAddressesUpdated)
for _, updatedAddr := range ev.Removed {
if updatedAddr.Action == event.Removed {
if _, err := updatedAddr.Address.ValueForProtocol(ma.P_CIRCUIT); err == nil {
delta--
if delta == expectedDelta {
return
}
}
}
}
for _, updatedAddr := range ev.Current {
if updatedAddr.Action == event.Added {
if _, err := updatedAddr.Address.ValueForProtocol(ma.P_CIRCUIT); err == nil {
delta++
if delta == expectedDelta {
return
}
}
}
}
case <-time.After(10 * time.Second):
t.Fatal("timeout waiting for address updated event")
}
}
}
func TestReconnectToStaticRelays(t *testing.T) {
cl := clock.NewMock()
var staticRelays []peer.AddrInfo
@@ -428,16 +399,14 @@ func TestReconnectToStaticRelays(t *testing.T) {
h := newPrivateNodeWithStaticRelays(t,
staticRelays,
autorelay.WithClock(cl),
autorelay.WithBackoff(30*time.Minute),
)
defer h.Close()
addrUpdated, err := h.EventBus().Subscribe(new(event.EvtLocalAddressesUpdated))
require.NoError(t, err)
expectDeltaInAddrUpdated(t, addrUpdated, 1)
cl.Add(time.Minute)
require.Eventually(t, func() bool {
return numRelays(h) == 1
}, 10*time.Second, 100*time.Millisecond)
relaysInUse := usedRelays(h)
oldRelay := relaysInUse[0]
@@ -446,12 +415,18 @@
r.Network().ClosePeer(h.ID())
}
}
require.Eventually(t, func() bool {
return numRelays(h) == 0
}, 10*time.Second, 100*time.Millisecond)
cl.Add(time.Hour)
expectDeltaInAddrUpdated(t, addrUpdated, -1)
require.Eventually(t, func() bool {
return numRelays(h) == 1
}, 10*time.Second, 100*time.Millisecond)
}
func TestMinInterval(t *testing.T) {
cl := clock.NewMock()
h := newPrivateNode(t,
func(context.Context, int) <-chan peer.AddrInfo {
peerChan := make(chan peer.AddrInfo, 1)
@@ -461,6 +436,7 @@ func TestMinInterval(t *testing.T) {
peerChan <- peer.AddrInfo{ID: r1.ID(), Addrs: r1.Addrs()}
return peerChan
},
autorelay.WithClock(cl),
autorelay.WithMinCandidates(2),
autorelay.WithNumRelays(1),
autorelay.WithBootDelay(time.Hour),
@@ -468,7 +444,9 @@
)
defer h.Close()
cl.Add(500 * time.Millisecond)
// The second call to peerSource should happen after 1 second
require.Never(t, func() bool { return numRelays(h) > 0 }, 500*time.Millisecond, 100*time.Millisecond)
require.Eventually(t, func() bool { return numRelays(h) > 0 }, 1*time.Second, 100*time.Millisecond)
cl.Add(500 * time.Millisecond)
require.Eventually(t, func() bool { return numRelays(h) > 0 }, 10*time.Second, 100*time.Millisecond)
}
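
The test changes above mostly raise timeouts (3s to 10s) and drive time through the injected mock clock so the assertions stop depending on wall-clock scheduling. A small, hypothetical example of the technique with the same github.com/benbjohnson/clock package the tests use; fireAfter is an invented helper, not part of the autorelay API:

package example

import (
	"testing"
	"time"

	"github.com/benbjohnson/clock"
	"github.com/stretchr/testify/require"
)

// fireAfter is a hypothetical helper: it closes the returned channel once d
// has elapsed on the injected clock. Injecting clock.Clock is the same style
// the autorelay tests rely on via autorelay.WithClock(cl).
func fireAfter(cl clock.Clock, d time.Duration) <-chan struct{} {
	done := make(chan struct{})
	timer := cl.Timer(d)
	go func() {
		<-timer.C
		close(done)
	}()
	return done
}

func TestFireAfterWithMockClock(t *testing.T) {
	cl := clock.NewMock()
	done := fireAfter(cl, time.Minute)

	// Nothing fires on its own: the mock clock only moves when we advance it.
	select {
	case <-done:
		t.Fatal("fired before the mock clock was advanced")
	case <-time.After(50 * time.Millisecond):
	}

	cl.Add(2 * time.Minute) // jump virtual time past the deadline
	require.Eventually(t, func() bool {
		select {
		case <-done:
			return true
		default:
			return false
		}
	}, 10*time.Second, 10*time.Millisecond)
}

Because the mock only advances when the test calls cl.Add, the generous require.Eventually timeouts cost nothing when the test passes and only matter on a slow CI machine.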

p2p/host/autorelay/host.go (4 lines changed)

@@ -14,6 +14,10 @@ func (h *AutoRelayHost) Close() error {
return h.Host.Close()
}
func (h *AutoRelayHost) Start() {
h.ar.Start()
}
func NewAutoRelayHost(h host.Host, ar *AutoRelay) *AutoRelayHost {
return &AutoRelayHost{Host: h, ar: ar}
}

p2p/host/autorelay/relay_finder.go (223 lines changed)

@@ -99,11 +99,20 @@ func newRelayFinder(host *basic.BasicHost, peerSource PeerSource, conf *config)
}
}
type scheduledWorkTimes struct {
leastFrequentInterval time.Duration
nextRefresh time.Time
nextBackoff time.Time
nextOldCandidateCheck time.Time
nextAllowedCallToPeerSource time.Time
}
func (rf *relayFinder) background(ctx context.Context) {
peerSourceRateLimiter := make(chan struct{}, 1)
rf.refCount.Add(1)
go func() {
defer rf.refCount.Done()
rf.findNodes(ctx)
rf.findNodes(ctx, peerSourceRateLimiter)
}()
rf.refCount.Add(1)
@@ -121,17 +130,33 @@ func (rf *relayFinder) background(ctx context.Context) {
bootDelayTimer := rf.conf.clock.Timer(rf.conf.bootDelay)
defer bootDelayTimer.Stop()
refreshTicker := rf.conf.clock.Ticker(rsvpRefreshInterval)
defer refreshTicker.Stop()
backoffTicker := rf.conf.clock.Ticker(rf.conf.backoff / 5)
defer backoffTicker.Stop()
oldCandidateTicker := rf.conf.clock.Ticker(rf.conf.maxCandidateAge / 5)
defer oldCandidateTicker.Stop()
for {
// when true, we need to identify push
var push bool
// This is the least frequent event. It's our fallback timer if we don't have any other work to do.
leastFrequentInterval := rf.conf.minInterval
if rf.conf.backoff > leastFrequentInterval {
leastFrequentInterval = rf.conf.backoff
}
if rf.conf.maxCandidateAge > leastFrequentInterval {
leastFrequentInterval = rf.conf.maxCandidateAge
}
if rsvpRefreshInterval > leastFrequentInterval {
leastFrequentInterval = rsvpRefreshInterval
}
now := rf.conf.clock.Now()
scheduledWork := &scheduledWorkTimes{
leastFrequentInterval: leastFrequentInterval,
nextRefresh: now.Add(rsvpRefreshInterval),
nextBackoff: now.Add(rf.conf.backoff / 5),
nextOldCandidateCheck: now.Add(rf.conf.maxCandidateAge / 5),
nextAllowedCallToPeerSource: now.Add(-time.Second), // allow immediately
}
workTimer := rf.conf.clock.Timer(rf.runScheduledWork(ctx, now, scheduledWork, peerSourceRateLimiter).Sub(now))
defer workTimer.Stop()
for {
select {
case ev, ok := <-subConnectedness.Out():
if !ok {
@@ -141,6 +166,8 @@ func (rf *relayFinder) background(ctx context.Context) {
if evt.Connectedness != network.NotConnected {
continue
}
push := false
rf.relayMx.Lock()
if rf.usingRelay(evt.Peer) { // we were disconnected from a relay
log.Debugw("disconnected from relay", "id", evt.Peer)
@@ -150,85 +177,156 @@ func (rf *relayFinder) background(ctx context.Context) {
push = true
}
rf.relayMx.Unlock()
if push {
rf.clearCachedAddrsAndSignalAddressChange()
}
case <-rf.candidateFound:
rf.notifyMaybeConnectToRelay()
case <-bootDelayTimer.C:
rf.notifyMaybeConnectToRelay()
case <-rf.relayUpdated:
push = true
case now := <-refreshTicker.C:
push = rf.refreshReservations(ctx, now)
case now := <-backoffTicker.C:
rf.candidateMx.Lock()
for id, t := range rf.backoff {
if !t.Add(rf.conf.backoff).After(now) {
log.Debugw("removing backoff for node", "id", id)
delete(rf.backoff, id)
}
}
rf.candidateMx.Unlock()
case now := <-oldCandidateTicker.C:
var deleted bool
rf.candidateMx.Lock()
for id, cand := range rf.candidates {
if !cand.added.Add(rf.conf.maxCandidateAge).After(now) {
deleted = true
log.Debugw("deleting candidate due to age", "id", id)
delete(rf.candidates, id)
}
}
rf.candidateMx.Unlock()
if deleted {
rf.notifyMaybeNeedNewCandidates()
}
rf.clearCachedAddrsAndSignalAddressChange()
case <-workTimer.C:
now := rf.conf.clock.Now()
nextTime := rf.runScheduledWork(ctx, now, scheduledWork, peerSourceRateLimiter)
workTimer.Reset(nextTime.Sub(now))
case <-ctx.Done():
return
}
}
}
if push {
rf.relayMx.Lock()
rf.cachedAddrs = nil
rf.relayMx.Unlock()
rf.host.SignalAddressChange()
func (rf *relayFinder) clearCachedAddrsAndSignalAddressChange() {
rf.relayMx.Lock()
rf.cachedAddrs = nil
rf.relayMx.Unlock()
rf.host.SignalAddressChange()
}
func (rf *relayFinder) runScheduledWork(ctx context.Context, now time.Time, scheduledWork *scheduledWorkTimes, peerSourceRateLimiter chan<- struct{}) time.Time {
nextTime := now.Add(scheduledWork.leastFrequentInterval)
if now.After(scheduledWork.nextRefresh) {
scheduledWork.nextRefresh = now.Add(rsvpRefreshInterval)
if rf.refreshReservations(ctx, now) {
rf.clearCachedAddrsAndSignalAddressChange()
}
}
if now.After(scheduledWork.nextBackoff) {
scheduledWork.nextBackoff = rf.clearBackoff(now)
}
if now.After(scheduledWork.nextOldCandidateCheck) {
scheduledWork.nextOldCandidateCheck = rf.clearOldCandidates(now)
}
if now.After(scheduledWork.nextAllowedCallToPeerSource) {
scheduledWork.nextAllowedCallToPeerSource = scheduledWork.nextAllowedCallToPeerSource.Add(rf.conf.minInterval)
select {
case peerSourceRateLimiter <- struct{}{}:
default:
}
}
// Find the next time we need to run scheduled work.
if scheduledWork.nextRefresh.Before(nextTime) {
nextTime = scheduledWork.nextRefresh
}
if scheduledWork.nextBackoff.Before(nextTime) {
nextTime = scheduledWork.nextBackoff
}
if scheduledWork.nextOldCandidateCheck.Before(nextTime) {
nextTime = scheduledWork.nextOldCandidateCheck
}
if scheduledWork.nextAllowedCallToPeerSource.Before(nextTime) {
nextTime = scheduledWork.nextAllowedCallToPeerSource
}
if nextTime == now {
// Only happens in CI with a mock clock
nextTime = nextTime.Add(1) // avoids an infinite loop
}
return nextTime
}
// clearOldCandidates clears old candidates from the map. Returns the next time
// to run this function.
func (rf *relayFinder) clearOldCandidates(now time.Time) time.Time {
// If we don't have any candidates, we should run this again in rf.conf.maxCandidateAge.
nextTime := now.Add(rf.conf.maxCandidateAge)
var deleted bool
rf.candidateMx.Lock()
defer rf.candidateMx.Unlock()
for id, cand := range rf.candidates {
expiry := cand.added.Add(rf.conf.maxCandidateAge)
if expiry.After(now) {
if expiry.Before(nextTime) {
nextTime = expiry
}
} else {
deleted = true
log.Debugw("deleting candidate due to age", "id", id)
delete(rf.candidates, id)
}
}
if deleted {
rf.notifyMaybeNeedNewCandidates()
}
return nextTime
}
// clearBackoff clears old backoff entries from the map. Returns the next time
// to run this function.
func (rf *relayFinder) clearBackoff(now time.Time) time.Time {
nextTime := now.Add(rf.conf.backoff)
rf.candidateMx.Lock()
defer rf.candidateMx.Unlock()
for id, t := range rf.backoff {
expiry := t.Add(rf.conf.backoff)
if expiry.After(now) {
if expiry.Before(nextTime) {
nextTime = expiry
}
} else {
log.Debugw("removing backoff for node", "id", id)
delete(rf.backoff, id)
}
}
return nextTime
}
// findNodes accepts nodes from the channel and tests if they support relaying.
// It is run on both public and private nodes.
// It garbage collects old entries, so that the set of nodes doesn't overflow.
// This makes sure that as soon as we need to find relay candidates, we have them available.
func (rf *relayFinder) findNodes(ctx context.Context) {
peerChan := rf.peerSource(ctx, rf.conf.maxCandidates)
// peerSourceRateLimiter is used to limit how often we call the peer source.
func (rf *relayFinder) findNodes(ctx context.Context, peerSourceRateLimiter <-chan struct{}) {
var peerChan <-chan peer.AddrInfo
var wg sync.WaitGroup
lastCallToPeerSource := rf.conf.clock.Now()
timer := newTimer(rf.conf.clock)
for {
rf.candidateMx.Lock()
numCandidates := len(rf.candidates)
rf.candidateMx.Unlock()
if peerChan == nil {
now := rf.conf.clock.Now()
nextAllowedCallToPeerSource := lastCallToPeerSource.Add(rf.conf.minInterval).Sub(now)
if numCandidates < rf.conf.minCandidates {
log.Debugw("not enough candidates. Resetting timer", "num", numCandidates, "desired", rf.conf.minCandidates)
timer.Reset(nextAllowedCallToPeerSource)
if peerChan == nil && numCandidates < rf.conf.minCandidates {
select {
case <-peerSourceRateLimiter:
peerChan = rf.peerSource(ctx, rf.conf.maxCandidates)
case <-ctx.Done():
return
}
}
select {
case <-rf.maybeRequestNewCandidates:
continue
case now := <-timer.Chan():
timer.SetRead()
if peerChan != nil {
// We're still reading peers from the peerChan. No need to query for more peers now.
continue
}
lastCallToPeerSource = now
peerChan = rf.peerSource(ctx, rf.conf.maxCandidates)
case pi, ok := <-peerChan:
if !ok {
wg.Wait()
@@ -530,9 +628,12 @@ func (rf *relayFinder) usingRelay(p peer.ID) bool {
// selectCandidates returns an ordered slice of relay candidates.
// Callers should attempt to obtain reservations with the candidates in this order.
func (rf *relayFinder) selectCandidates() []*candidate {
now := rf.conf.clock.Now()
candidates := make([]*candidate, 0, len(rf.candidates))
for _, cand := range rf.candidates {
candidates = append(candidates, cand)
if cand.added.Add(rf.conf.maxCandidateAge).After(now) {
candidates = append(candidates, cand)
}
}
// TODO: better relay selection strategy; this just selects random relays,
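
runScheduledWork above replaces the old refresh, backoff, and old-candidate tickers with a single timer: each piece of periodic work records its own next-run time, due work executes when the timer fires, and the timer is re-armed to the earliest remaining deadline. A stripped-down sketch of that shape, with made-up task names and a real-time timer standing in for rf.conf.clock:

package main

import (
	"fmt"
	"time"
)

// task is one piece of periodic work with its own cadence, analogous to the
// refresh / backoff / old-candidate / peer-source entries in scheduledWorkTimes.
type task struct {
	name     string
	interval time.Duration
	next     time.Time
	run      func()
}

// runDue executes every task whose deadline has passed and returns the
// earliest upcoming deadline; that is what the single timer gets reset to.
func runDue(now time.Time, tasks []*task) time.Time {
	next := now.Add(time.Hour) // fallback, like leastFrequentInterval
	for _, t := range tasks {
		if !t.next.After(now) { // due
			fmt.Println("running", t.name)
			t.run()
			t.next = now.Add(t.interval)
		}
		if t.next.Before(next) {
			next = t.next
		}
	}
	if !next.After(now) {
		next = now.Add(time.Nanosecond) // guard against a zero-duration (busy) reset
	}
	return next
}

func main() {
	start := time.Now()
	tasks := []*task{
		{name: "refresh reservations", interval: 400 * time.Millisecond, next: start, run: func() {}},
		{name: "clear backoff", interval: time.Second, next: start, run: func() {}},
	}

	timer := time.NewTimer(0) // fires immediately; runDue computes the real deadline
	defer timer.Stop()
	for time.Since(start) < 2*time.Second {
		<-timer.C
		now := time.Now()
		timer.Reset(runDue(now, tasks).Sub(now))
	}
}

Consolidating the timing this way is also what makes the custom timer wrapper in p2p/host/autorelay/timer.go (deleted below) unnecessary.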

p2p/host/autorelay/timer.go (42 lines changed)

@@ -1,42 +0,0 @@
package autorelay
import (
"time"
"github.com/benbjohnson/clock"
)
type timer struct {
timer *clock.Timer
running bool
read bool
}
func newTimer(cl clock.Clock) *timer {
t := cl.Timer(100 * time.Hour) // There's no way to initialize a stopped timer
t.Stop()
return &timer{timer: t}
}
func (t *timer) Chan() <-chan time.Time {
return t.timer.C
}
func (t *timer) Stop() {
if !t.running {
return
}
if !t.timer.Stop() && !t.read {
<-t.timer.C
}
t.read = false
}
func (t *timer) SetRead() {
t.read = true
}
func (t *timer) Reset(d time.Duration) {
t.Stop()
t.timer.Reset(d)
}