
autorelay: fix busy loop bug and flaky tests in relay finder (#2208)

* Don't run clock add in eventually loop

* Fix busy loop

* Fix scheduling bug

* Add new mock clock

* Add busy loop test

* With comments

* Fix comment

* Move mockclock to separate file

* Fix race

* Fix potential deadlock

* Fix flaky TestBackoff

* Fix import

* Fix how mock implements interface
Marco Munizaga committed 2 years ago (via GitHub)
commit 950151e861
Changed files:
  1. core/test/mockclock.go (128)
  2. core/test/mockclock_test.go (44)
  3. p2p/host/autorelay/autorelay_test.go (113)
  4. p2p/host/autorelay/options.go (54)
  5. p2p/host/autorelay/relay_finder.go (54)

core/test/mockclock.go (128)

@@ -0,0 +1,128 @@
package test

import (
	"sort"
	"sync"
	"time"
)

type MockClock struct {
	mu           sync.Mutex
	now          time.Time
	timers       []*mockInstantTimer
	advanceBySem chan struct{}
}

type mockInstantTimer struct {
	c      *MockClock
	mu     sync.Mutex
	when   time.Time
	active bool
	ch     chan time.Time
}

func (t *mockInstantTimer) Ch() <-chan time.Time {
	return t.ch
}

func (t *mockInstantTimer) Reset(d time.Time) bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	wasActive := t.active
	t.active = true
	t.when = d

	// Schedule any timers that need to run. This will run this timer if t.when is before c.now
	go t.c.AdvanceBy(0)

	return wasActive
}

func (t *mockInstantTimer) Stop() bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	wasActive := t.active
	t.active = false
	return wasActive
}

func NewMockClock() *MockClock {
	return &MockClock{now: time.Unix(0, 0), advanceBySem: make(chan struct{}, 1)}
}

func (c *MockClock) InstantTimer(when time.Time) *mockInstantTimer {
	c.mu.Lock()
	defer c.mu.Unlock()
	t := &mockInstantTimer{
		c:      c,
		when:   when,
		ch:     make(chan time.Time, 1),
		active: true,
	}
	c.timers = append(c.timers, t)
	return t
}

// Since implements autorelay.ClockWithInstantTimer
func (c *MockClock) Since(t time.Time) time.Duration {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.now.Sub(t)
}

func (c *MockClock) Now() time.Time {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.now
}

func (c *MockClock) AdvanceBy(dur time.Duration) {
	c.advanceBySem <- struct{}{}
	defer func() { <-c.advanceBySem }()

	c.mu.Lock()
	now := c.now
	endTime := c.now.Add(dur)
	c.mu.Unlock()

	// sort timers by when
	if len(c.timers) > 1 {
		sort.Slice(c.timers, func(i, j int) bool {
			c.timers[i].mu.Lock()
			c.timers[j].mu.Lock()
			defer c.timers[i].mu.Unlock()
			defer c.timers[j].mu.Unlock()
			return c.timers[i].when.Before(c.timers[j].when)
		})
	}

	for _, t := range c.timers {
		t.mu.Lock()
		if !t.active {
			t.mu.Unlock()
			continue
		}
		if !t.when.After(now) {
			t.active = false
			t.mu.Unlock()
			// This may block if the channel is full, but that's intended. This way our mock clock never gets too far ahead of consumer.
			// This also prevents us from dropping times because we're advancing too fast.
			t.ch <- now
		} else if !t.when.After(endTime) {
			now = t.when
			c.mu.Lock()
			c.now = now
			c.mu.Unlock()
			t.active = false
			t.mu.Unlock()
			// This may block if the channel is full, but that's intended. See comment above
			t.ch <- c.now
		} else {
			t.mu.Unlock()
		}
	}

	c.mu.Lock()
	c.now = endTime
	c.mu.Unlock()
}

core/test/mockclock_test.go (44)

@@ -0,0 +1,44 @@
package test

import (
	"testing"
	"time"
)

func TestMockClock(t *testing.T) {
	cl := NewMockClock()
	t1 := cl.InstantTimer(cl.Now().Add(2 * time.Second))
	t2 := cl.InstantTimer(cl.Now().Add(time.Second))

	// Advance the clock by 500ms
	cl.AdvanceBy(time.Millisecond * 500)
	// No event
	select {
	case <-t1.Ch():
		t.Fatal("t1 fired early")
	case <-t2.Ch():
		t.Fatal("t2 fired early")
	default:
	}

	// Advance the clock by 500ms
	cl.AdvanceBy(time.Millisecond * 500)
	// t2 fires
	select {
	case <-t1.Ch():
		t.Fatal("t1 fired early")
	case <-t2.Ch():
	}

	// Advance the clock by 2s
	cl.AdvanceBy(time.Second * 2)
	// t1 fires
	select {
	case <-t1.Ch():
	case <-t2.Ch():
		t.Fatal("t2 fired again")
	}
}
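
One property of this mock is worth spelling out, since the tests below rely on it: each timer channel has capacity 1 and AdvanceBy blocks on the send, so the clock can never run more than one fire ahead of a consumer that hasn't drained its channel. This is the backpressure described by the comments in AdvanceBy. A small sketch (not part of the commit) that would sit in the same core/test package:

package test

import (
	"fmt"
	"time"
)

func ExampleMockClock_backpressure() {
	cl := NewMockClock()
	t1 := cl.InstantTimer(cl.Now().Add(time.Second))
	cl.AdvanceBy(2 * time.Second)       // t1 fires at t=1s; the size-1 buffer absorbs it
	t1.Reset(cl.Now().Add(time.Second)) // re-arm for t=3s

	done := make(chan struct{})
	go func() {
		// Blocks on delivering the t=3s fire until the t=1s one is drained.
		cl.AdvanceBy(2 * time.Second)
		close(done)
	}()

	fmt.Println((<-t1.Ch()).Unix()) // drains t=1s, unblocking AdvanceBy
	fmt.Println((<-t1.Ch()).Unix()) // t=3s
	<-done
	// Output:
	// 1
	// 3
}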

p2p/host/autorelay/autorelay_test.go (113)

@@ -2,6 +2,7 @@ package autorelay_test
 import (
 	"context"
+	"fmt"
 	"strings"
 	"sync/atomic"
 	"testing"
@@ -11,16 +12,30 @@ import (
 	"github.com/libp2p/go-libp2p/core/host"
 	"github.com/libp2p/go-libp2p/core/network"
 	"github.com/libp2p/go-libp2p/core/peer"
+	"github.com/libp2p/go-libp2p/core/test"
 	"github.com/libp2p/go-libp2p/p2p/host/autorelay"
 	circuitv2_proto "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/proto"
-	"github.com/benbjohnson/clock"
 	ma "github.com/multiformats/go-multiaddr"
 	"github.com/stretchr/testify/require"
 )

 const protoIDv2 = circuitv2_proto.ProtoIDv2Hop

+type mockClock struct {
+	*test.MockClock
+}
+
+func (c mockClock) InstantTimer(when time.Time) autorelay.InstantTimer {
+	return c.MockClock.InstantTimer(when)
+}
+
+func newMockClock() mockClock {
+	return mockClock{MockClock: test.NewMockClock()}
+}
+
+var _ autorelay.ClockWithInstantTimer = mockClock{}
+
 func numRelays(h host.Host) int {
 	return len(usedRelays(h))
 }
@@ -182,7 +197,7 @@ func TestWaitForCandidates(t *testing.T) {
 func TestBackoff(t *testing.T) {
 	const backoff = 20 * time.Second
-	cl := clock.NewMock()
+	cl := newMockClock()
 	r, err := libp2p.New(
 		libp2p.DisableRelay(),
 		libp2p.ForceReachabilityPublic(),
@@ -201,8 +216,8 @@ func TestBackoff(t *testing.T) {
 	defer r.Close()
 	var reservations atomic.Int32
 	r.SetStreamHandler(protoIDv2, func(str network.Stream) {
-		reservations.Add(1)
-		str.Reset()
+		defer reservations.Add(1)
+		str.Close()
 	})
 	var counter atomic.Int32
@@ -218,24 +233,36 @@ func TestBackoff(t *testing.T) {
 		autorelay.WithNumRelays(1),
 		autorelay.WithBootDelay(0),
 		autorelay.WithBackoff(backoff),
+		autorelay.WithMinCandidates(1),
+		autorelay.WithMaxCandidateAge(1),
 		autorelay.WithClock(cl),
-		autorelay.WithMinInterval(time.Second),
+		autorelay.WithMinInterval(0),
 	)
 	defer h.Close()

 	require.Eventually(t, func() bool {
 		return reservations.Load() == 1
-	}, 10*time.Second, 20*time.Millisecond, "reservations load should be 1 was %d", reservations.Load())
+	}, 3*time.Second, 20*time.Millisecond, "reservations load should be 1")
+
+	// We need to wait
+	cl.AdvanceBy(1) // Increment the time a little so we can make another peer source call
+	require.Eventually(t, func() bool {
+		// The reservation will fail, and autorelay will ask the peer source for
+		// more candidates. Wait until it does so, this way we know that client
+		// knows the relay connection has failed before we advance the time.
+		return counter.Load() > 1
+	}, 2*time.Second, 100*time.Millisecond, "counter load should be 2")
+
 	// make sure we don't add any relays yet
 	for i := 0; i < 2; i++ {
-		cl.Add(backoff / 3)
+		cl.AdvanceBy(backoff / 3)
 		require.Equal(t, 1, int(reservations.Load()))
 	}
-	cl.Add(backoff)
+	cl.AdvanceBy(backoff)
 	require.Eventually(t, func() bool {
 		return reservations.Load() == 2
-	}, 10*time.Second, 100*time.Millisecond, "reservations load should be 2 was %d", reservations.Load())
-	require.Less(t, int(counter.Load()), 300) // just make sure we're not busy-looping
+	}, 3*time.Second, 100*time.Millisecond, "reservations load should be 2")
+	require.Less(t, int(counter.Load()), 10) // just make sure we're not busy-looping
+	require.Equal(t, 2, int(reservations.Load()))
 }
@@ -295,7 +322,7 @@ func TestConnectOnDisconnect(t *testing.T) {
 }

 func TestMaxAge(t *testing.T) {
-	cl := clock.NewMock()
+	cl := newMockClock()

 	const num = 4
 	peerChan1 := make(chan peer.AddrInfo, num)
@@ -330,7 +357,7 @@ func TestMaxAge(t *testing.T) {
 		autorelay.WithBootDelay(0),
 		autorelay.WithMaxCandidateAge(20*time.Minute),
 		autorelay.WithClock(cl),
-		autorelay.WithMinInterval(time.Second),
+		autorelay.WithMinInterval(30*time.Second),
 	)
 	defer h.Close()
@@ -340,17 +367,16 @@ func TestMaxAge(t *testing.T) {
 	relays := usedRelays(h)
 	require.Len(t, relays, 1)

+	cl.AdvanceBy(time.Minute)
 	require.Eventually(t, func() bool {
-		// we don't know exactly when the timer is reset, just advance our timer multiple times if necessary
-		cl.Add(30 * time.Second)
 		return len(peerChans) == 0
 	}, 10*time.Second, 100*time.Millisecond)

-	cl.Add(10 * time.Minute)
+	cl.AdvanceBy(10 * time.Minute)
 	for _, r := range relays2 {
 		peerChan2 <- peer.AddrInfo{ID: r.ID(), Addrs: r.Addrs()}
 	}

-	cl.Add(11 * time.Minute)
+	cl.AdvanceBy(11 * time.Minute)

 	require.Eventually(t, func() bool {
 		relays = usedRelays(h)
@@ -381,11 +407,21 @@ func TestMaxAge(t *testing.T) {
 	for _, r := range relays2 {
 		ids = append(ids, r.ID())
 	}
+
+	require.Eventually(t, func() bool {
+		for _, id := range ids {
+			if id == relays[0] {
+				return true
+			}
+		}
+		fmt.Println("waiting for", ids, "to contain", relays[0])
+		return false
+	}, 3*time.Second, 100*time.Millisecond)
 	require.Contains(t, ids, relays[0])
 }

 func TestReconnectToStaticRelays(t *testing.T) {
-	cl := clock.NewMock()
+	cl := newMockClock()
 	var staticRelays []peer.AddrInfo
 	const numStaticRelays = 1
 	relays := make([]host.Host, 0, numStaticRelays)
@@ -403,7 +439,7 @@ func TestReconnectToStaticRelays(t *testing.T) {
 	)
 	defer h.Close()

-	cl.Add(time.Minute)
+	cl.AdvanceBy(time.Minute)
 	require.Eventually(t, func() bool {
 		return numRelays(h) == 1
 	}, 10*time.Second, 100*time.Millisecond)
@@ -419,14 +455,14 @@ func TestReconnectToStaticRelays(t *testing.T) {
 		return numRelays(h) == 0
 	}, 10*time.Second, 100*time.Millisecond)

-	cl.Add(time.Hour)
+	cl.AdvanceBy(time.Hour)
 	require.Eventually(t, func() bool {
 		return numRelays(h) == 1
 	}, 10*time.Second, 100*time.Millisecond)
 }

 func TestMinInterval(t *testing.T) {
-	cl := clock.NewMock()
+	cl := newMockClock()
 	h := newPrivateNode(t,
 		func(context.Context, int) <-chan peer.AddrInfo {
 			peerChan := make(chan peer.AddrInfo, 1)
@@ -444,9 +480,40 @@ func TestMinInterval(t *testing.T) {
 	)
 	defer h.Close()

-	cl.Add(500 * time.Millisecond)
+	cl.AdvanceBy(400 * time.Millisecond)
 	// The second call to peerSource should happen after 1 second
 	require.Never(t, func() bool { return numRelays(h) > 0 }, 500*time.Millisecond, 100*time.Millisecond)
-	cl.Add(500 * time.Millisecond)
-	require.Eventually(t, func() bool { return numRelays(h) > 0 }, 10*time.Second, 100*time.Millisecond)
+	cl.AdvanceBy(600 * time.Millisecond)
+	require.Eventually(t, func() bool { return numRelays(h) > 0 }, 3*time.Second, 100*time.Millisecond)
 }
+
+func TestNoBusyLoop0MinInterval(t *testing.T) {
+	var calledTimes uint64
+	cl := newMockClock()
+	h := newPrivateNode(t,
+		func(context.Context, int) <-chan peer.AddrInfo {
+			atomic.AddUint64(&calledTimes, 1)
+			peerChan := make(chan peer.AddrInfo, 1)
+			defer close(peerChan)
+			r1 := newRelay(t)
+			t.Cleanup(func() { r1.Close() })
+			peerChan <- peer.AddrInfo{ID: r1.ID(), Addrs: r1.Addrs()}
+			return peerChan
+		},
+		autorelay.WithClock(cl),
+		autorelay.WithMinCandidates(1),
+		autorelay.WithMaxCandidates(1),
+		autorelay.WithNumRelays(0),
+		autorelay.WithBootDelay(time.Hour),
+		autorelay.WithMinInterval(time.Millisecond),
+	)
+	defer h.Close()
+
+	require.Never(t, func() bool {
+		cl.AdvanceBy(time.Second)
+		val := atomic.LoadUint64(&calledTimes)
+		return val >= 2
+	}, 500*time.Millisecond, 100*time.Millisecond)
+	val := atomic.LoadUint64(&calledTimes)
+	require.Less(t, val, uint64(2))
+}
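
The TestMaxAge hunk above is the first commit-message bullet ("Don't run clock add in eventually loop") in action: advancing virtual time inside a require.Eventually poll ties the amount of time that elapses to how often the poller happens to run, reintroducing the real-time scheduling dependence a mock clock is meant to remove. The pattern in general form, with condition() as a hypothetical stand-in:

	// Flaky: how far the clock advances depends on the polling cadence.
	require.Eventually(t, func() bool {
		cl.Add(30 * time.Second)
		return condition()
	}, 10*time.Second, 100*time.Millisecond)

	// Deterministic: advance virtual time once, then only observe.
	cl.AdvanceBy(time.Minute)
	require.Eventually(t, func() bool {
		return condition()
	}, 10*time.Second, 100*time.Millisecond)

(TestNoBusyLoop0MinInterval still calls AdvanceBy inside require.Never, but there the repeated advancing is the point: it provokes peer-source calls rather than waiting on a state change.)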

p2p/host/autorelay/options.go (54)

@@ -6,8 +6,6 @@ import (
 	"time"

 	"github.com/libp2p/go-libp2p/core/peer"
-
-	"github.com/benbjohnson/clock"
 )
@@ -24,7 +22,7 @@ import (
 type PeerSource func(ctx context.Context, num int) <-chan peer.AddrInfo

 type config struct {
-	clock      clock.Clock
+	clock      ClockWithInstantTimer
 	peerSource PeerSource
 	// minimum interval used to call the peerSource callback
 	minInterval time.Duration
@@ -45,7 +43,7 @@ type config struct {
 }

 var defaultConfig = config{
-	clock:         clock.New(),
+	clock:         RealClock{},
 	minCandidates: 4,
 	maxCandidates: 20,
 	bootDelay:     3 * time.Minute,
@@ -162,7 +160,53 @@ func WithMaxCandidateAge(d time.Duration) Option {
 	}
 }

-func WithClock(cl clock.Clock) Option {
+// InstantTimer is a timer that triggers at some instant rather than some duration
+type InstantTimer interface {
+	Reset(d time.Time) bool
+	Stop() bool
+	Ch() <-chan time.Time
+}
+
+// ClockWithInstantTimer is a clock that can create timers that trigger at some
+// instant rather than some duration
+type ClockWithInstantTimer interface {
+	Now() time.Time
+	Since(t time.Time) time.Duration
+	InstantTimer(when time.Time) InstantTimer
+}
+
+type RealTimer struct{ t *time.Timer }
+
+var _ InstantTimer = (*RealTimer)(nil)
+
+func (t RealTimer) Ch() <-chan time.Time {
+	return t.t.C
+}
+
+func (t RealTimer) Reset(d time.Time) bool {
+	return t.t.Reset(time.Until(d))
+}
+
+func (t RealTimer) Stop() bool {
+	return t.t.Stop()
+}
+
+type RealClock struct{}
+
+var _ ClockWithInstantTimer = RealClock{}
+
+func (RealClock) Now() time.Time {
+	return time.Now()
+}
+
+func (RealClock) Since(t time.Time) time.Duration {
+	return time.Since(t)
+}
+
+func (RealClock) InstantTimer(when time.Time) InstantTimer {
+	t := time.NewTimer(time.Until(when))
+	return &RealTimer{t}
+}
+
+func WithClock(cl ClockWithInstantTimer) Option {
 	return func(c *config) error {
 		c.clock = cl
 		return nil

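For reference, a sketch of how these options are consumed when constructing a host. It assumes go-libp2p's EnableAutoRelayWithPeerSource constructor option, which is not part of this diff; the do-nothing PeerSource is likewise only illustrative:

package main

import (
	"context"

	"github.com/libp2p/go-libp2p"
	"github.com/libp2p/go-libp2p/core/peer"
	"github.com/libp2p/go-libp2p/p2p/host/autorelay"
)

func main() {
	// A trivial PeerSource; a real one would send up to num relay candidates.
	ps := func(ctx context.Context, num int) <-chan peer.AddrInfo {
		ch := make(chan peer.AddrInfo)
		close(ch)
		return ch
	}
	h, err := libp2p.New(
		libp2p.EnableAutoRelayWithPeerSource(ps,
			autorelay.WithClock(autorelay.RealClock{}), // the default; tests pass a mock instead
		),
	)
	if err != nil {
		panic(err)
	}
	defer h.Close()
}
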
p2p/host/autorelay/relay_finder.go (54)

@@ -77,6 +77,9 @@ type relayFinder struct {
 	cachedAddrs       []ma.Multiaddr
 	cachedAddrsExpiry time.Time
+
+	// A channel that triggers a run of `runScheduledWork`.
+	triggerRunScheduledWork chan struct{}
 }
@@ -94,6 +97,7 @@ func newRelayFinder(host *basic.BasicHost, peerSource PeerSource, conf *config)
 		candidateFound:             make(chan struct{}, 1),
 		maybeConnectToRelayTrigger: make(chan struct{}, 1),
 		maybeRequestNewCandidates:  make(chan struct{}, 1),
+		triggerRunScheduledWork:    make(chan struct{}, 1),
 		relays:                     make(map[peer.ID]*circuitv2.Reservation),
 		relayUpdated:               make(chan struct{}, 1),
 	}
@@ -128,32 +132,32 @@ func (rf *relayFinder) background(ctx context.Context) {
 	}
 	defer subConnectedness.Close()

-	bootDelayTimer := rf.conf.clock.Timer(rf.conf.bootDelay)
+	now := rf.conf.clock.Now()
+	bootDelayTimer := rf.conf.clock.InstantTimer(now.Add(rf.conf.bootDelay))
 	defer bootDelayTimer.Stop()

 	// This is the least frequent event. It's our fallback timer if we don't have any other work to do.
 	leastFrequentInterval := rf.conf.minInterval
-	if rf.conf.backoff > leastFrequentInterval {
+	// Check if leastFrequentInterval is 0 to avoid busy looping
+	if rf.conf.backoff > leastFrequentInterval || leastFrequentInterval == 0 {
 		leastFrequentInterval = rf.conf.backoff
 	}
-	if rf.conf.maxCandidateAge > leastFrequentInterval {
+	if rf.conf.maxCandidateAge > leastFrequentInterval || leastFrequentInterval == 0 {
 		leastFrequentInterval = rf.conf.maxCandidateAge
 	}
-	if rsvpRefreshInterval > leastFrequentInterval {
-		leastFrequentInterval = rf.conf.maxCandidateAge
-	}
+	if rsvpRefreshInterval > leastFrequentInterval || leastFrequentInterval == 0 {
+		leastFrequentInterval = rsvpRefreshInterval
+	}

-	now := rf.conf.clock.Now()
 	scheduledWork := &scheduledWorkTimes{
 		leastFrequentInterval:       leastFrequentInterval,
 		nextRefresh:                 now.Add(rsvpRefreshInterval),
-		nextBackoff:                 now.Add(rf.conf.backoff / 5),
-		nextOldCandidateCheck:       now.Add(rf.conf.maxCandidateAge / 5),
+		nextBackoff:                 now.Add(rf.conf.backoff),
+		nextOldCandidateCheck:       now.Add(rf.conf.maxCandidateAge),
 		nextAllowedCallToPeerSource: now.Add(-time.Second), // allow immediately
 	}

-	workTimer := rf.conf.clock.Timer(rf.runScheduledWork(ctx, now, scheduledWork, peerSourceRateLimiter).Sub(now))
+	workTimer := rf.conf.clock.InstantTimer(rf.runScheduledWork(ctx, now, scheduledWork, peerSourceRateLimiter))
 	defer workTimer.Stop()

 	for {
@@ -183,14 +187,19 @@ func (rf *relayFinder) background(ctx context.Context) {
 			}
 		case <-rf.candidateFound:
 			rf.notifyMaybeConnectToRelay()
-		case <-bootDelayTimer.C:
+		case <-bootDelayTimer.Ch():
 			rf.notifyMaybeConnectToRelay()
 		case <-rf.relayUpdated:
 			rf.clearCachedAddrsAndSignalAddressChange()
-		case <-workTimer.C:
-			now := rf.conf.clock.Now()
+		case now := <-workTimer.Ch():
+			// Note: `now` is not guaranteed to be the current time. It's the time
+			// that the timer was fired. This is okay because we'll schedule
+			// future work at a specific time.
 			nextTime := rf.runScheduledWork(ctx, now, scheduledWork, peerSourceRateLimiter)
-			workTimer.Reset(nextTime.Sub(now))
+			workTimer.Reset(nextTime)
+		case <-rf.triggerRunScheduledWork:
+			// Ignore the next time because we aren't scheduling any future work here
+			_ = rf.runScheduledWork(ctx, rf.conf.clock.Now(), scheduledWork, peerSourceRateLimiter)
 		case <-ctx.Done():
 			return
 		}
@@ -223,11 +232,19 @@ func (rf *relayFinder) runScheduledWork(ctx context.Context, now time.Time, sche
 	}

 	if now.After(scheduledWork.nextAllowedCallToPeerSource) {
-		scheduledWork.nextAllowedCallToPeerSource = scheduledWork.nextAllowedCallToPeerSource.Add(rf.conf.minInterval)
 		select {
 		case peerSourceRateLimiter <- struct{}{}:
+			scheduledWork.nextAllowedCallToPeerSource = now.Add(rf.conf.minInterval)
+			if scheduledWork.nextAllowedCallToPeerSource.Before(nextTime) {
+				nextTime = scheduledWork.nextAllowedCallToPeerSource
+			}
 		default:
 		}
-	}
+	} else {
+		// We still need to schedule this work if it's sooner than nextTime
+		if scheduledWork.nextAllowedCallToPeerSource.Before(nextTime) {
+			nextTime = scheduledWork.nextAllowedCallToPeerSource
+		}
+	}

 	// Find the next time we need to run scheduled work.
@@ -240,9 +257,6 @@ func (rf *relayFinder) runScheduledWork(ctx context.Context, now time.Time, sche
 	if scheduledWork.nextOldCandidateCheck.Before(nextTime) {
 		nextTime = scheduledWork.nextOldCandidateCheck
 	}
-	if scheduledWork.nextAllowedCallToPeerSource.Before(nextTime) {
-		nextTime = scheduledWork.nextAllowedCallToPeerSource
-	}
 	if nextTime == now {
 		// Only happens in CI with a mock clock
 		nextTime = nextTime.Add(1) // avoids an infinite loop
@@ -319,6 +333,10 @@ func (rf *relayFinder) findNodes(ctx context.Context, peerSourceRateLimiter <-ch
 		select {
 		case <-peerSourceRateLimiter:
 			peerChan = rf.peerSource(ctx, rf.conf.maxCandidates)
+			select {
+			case rf.triggerRunScheduledWork <- struct{}{}:
+			default:
+			}
 		case <-ctx.Done():
 			return
 		}
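
Taken together, the relay_finder.go hunks close the busy loop from two directions. With WithMinInterval(0), the old runScheduledWork advanced nextAllowedCallToPeerSource by zero on every pass, so the deadline stayed in the past and the work timer kept being re-armed with a non-positive duration. A simplified trace of the old code under that assumption:

	// Old code, minInterval == 0, rate limiter not ready:
	//   nextAllowedCallToPeerSource = nextAllowedCallToPeerSource.Add(0) // unchanged, still in the past
	//   nextTime = nextAllowedCallToPeerSource                           // <= now
	//   workTimer.Reset(nextTime.Sub(now))                               // non-positive: fires immediately
	// ...and the loop repeats with identical state. The new code advances the
	// deadline only once the rate limiter actually accepts a token
	// (nextAllowedCallToPeerSource = now.Add(minInterval)), and the
	// `leastFrequentInterval == 0` checks guarantee a non-zero fallback interval.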
