Browse Source

backoff: fix flaky tests in backoff cache (#1516)

* Use mock clock to test time based properties of backoff cache

* Use mockclock library
pull/1522/head
Marco Munizaga 2 years ago
committed by GitHub
parent
commit
bedce2290d
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 47
      p2p/discovery/backoff/backoffcache.go
  2. 95
      p2p/discovery/backoff/backoffcache_test.go
  3. 18
      p2p/discovery/mocks/mocks.go
  4. 4
      p2p/discovery/routing/routing_test.go

47
p2p/discovery/backoff/backoffcache.go

@ -21,6 +21,8 @@ type BackoffDiscovery struct {
parallelBufSz int
returnedBufSz int
clock clock
}
type BackoffDiscoveryOption func(*BackoffDiscovery) error
@ -33,6 +35,8 @@ func NewBackoffDiscovery(disc discovery.Discovery, stratFactory BackoffFactory,
parallelBufSz: 32,
returnedBufSz: 32,
clock: realClock{},
}
for _, opt := range opts {
@ -68,6 +72,24 @@ func WithBackoffDiscoveryReturnedChannelSize(size int) BackoffDiscoveryOption {
}
}
type clock interface {
Now() time.Time
}
type realClock struct{}
func (c realClock) Now() time.Time {
return time.Now()
}
// withClock lets you override the default time.Now() call. Useful for tests.
func withClock(c clock) BackoffDiscoveryOption {
return func(b *BackoffDiscovery) error {
b.clock = c
return nil
}
}
type backoffCache struct {
// strat is assigned on creation and not written to
strat BackoffStrategy
@ -78,6 +100,8 @@ type backoffCache struct {
peers map[peer.ID]peer.AddrInfo
sendingChs map[chan peer.AddrInfo]int
ongoing bool
clock clock
}
func (d *BackoffDiscovery) Advertise(ctx context.Context, ns string, opts ...discovery.Option) (time.Duration, error) {
@ -112,6 +136,7 @@ func (d *BackoffDiscovery) FindPeers(ctx context.Context, ns string, opts ...dis
peers: make(map[peer.ID]peer.AddrInfo),
sendingChs: make(map[chan peer.AddrInfo]int),
strat: d.stratFactory(),
clock: d.clock,
}
d.peerCacheMux.Lock()
@ -128,7 +153,7 @@ func (d *BackoffDiscovery) FindPeers(ctx context.Context, ns string, opts ...dis
c.mux.Lock()
defer c.mux.Unlock()
timeExpired := time.Now().After(c.nextDiscover)
timeExpired := d.clock.Now().After(c.nextDiscover)
// If it's not yet time to search again and no searches are in progress then return cached peers
if !(timeExpired || c.ongoing) {
@ -180,19 +205,19 @@ func findPeerDispatcher(ctx context.Context, c *backoffCache, pch <-chan peer.Ad
defer func() {
c.mux.Lock()
for ch := range c.sendingChs {
close(ch)
}
// If the peer addresses have changed reset the backoff
if checkUpdates(c.prevPeers, c.peers) {
c.strat.Reset()
c.prevPeers = c.peers
}
c.nextDiscover = time.Now().Add(c.strat.Delay())
c.nextDiscover = c.clock.Now().Add(c.strat.Delay())
c.ongoing = false
c.peers = make(map[peer.ID]peer.AddrInfo)
for ch := range c.sendingChs {
close(ch)
}
c.sendingChs = make(map[chan peer.AddrInfo]int)
c.mux.Unlock()
}()
@ -221,13 +246,9 @@ func findPeerDispatcher(ctx context.Context, c *backoffCache, pch <-chan peer.Ad
c.peers[ai.ID] = sendAi
for ch, rem := range c.sendingChs {
ch <- sendAi
if rem == 1 {
close(ch)
delete(c.sendingChs, ch)
break
} else if rem > 0 {
rem--
if rem > 0 {
ch <- sendAi
c.sendingChs[ch] = rem - 1
}
}

95
p2p/discovery/backoff/backoffcache_test.go

@ -3,7 +3,6 @@ package backoff
import (
"context"
"math/rand"
"os"
"testing"
"time"
@ -13,18 +12,14 @@ import (
"github.com/libp2p/go-libp2p-core/discovery"
"github.com/libp2p/go-libp2p-core/peer"
)
func scaleDuration(t time.Duration) time.Duration {
if os.Getenv("CI") != "" {
return 3 * t
}
return t
}
mockClock "github.com/benbjohnson/clock"
)
type delayedDiscovery struct {
disc discovery.Discovery
delay time.Duration
clock *mockClock.Mock
}
func (d *delayedDiscovery) Advertise(ctx context.Context, ns string, opts ...discovery.Option) (time.Duration, error) {
@ -38,11 +33,25 @@ func (d *delayedDiscovery) FindPeers(ctx context.Context, ns string, opts ...dis
}
ch := make(chan peer.AddrInfo, 32)
doneCh := make(chan struct{})
go func() {
defer close(ch)
defer close(doneCh)
for ai := range dch {
ch <- ai
time.Sleep(d.delay)
d.clock.Sleep(d.delay)
}
}()
// Tick the clock forward to advance the sleep above
go func() {
for {
select {
case <-doneCh:
return
default:
d.clock.Add(d.delay)
}
}
}()
@ -75,7 +84,8 @@ func TestBackoffDiscoverySingleBackoff(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
discServer := mocks.NewDiscoveryServer()
clock := mockClock.NewMock()
discServer := mocks.NewDiscoveryServer(clock)
h1 := bhost.NewBlankHost(swarmt.GenSwarm(t))
defer h1.Close()
@ -85,15 +95,15 @@ func TestBackoffDiscoverySingleBackoff(t *testing.T) {
d2 := mocks.NewDiscoveryClient(h2, discServer)
bkf := NewExponentialBackoff(
scaleDuration(time.Millisecond*100),
scaleDuration(time.Second*10),
time.Millisecond*100,
time.Second*10,
NoJitter,
scaleDuration(time.Millisecond*100),
time.Millisecond*100,
2.5,
0,
rand.NewSource(0),
)
dCache, err := NewBackoffDiscovery(d1, bkf)
dCache, err := NewBackoffDiscovery(d1, bkf, withClock(clock))
if err != nil {
t.Fatal(err)
}
@ -102,22 +112,27 @@ func TestBackoffDiscoverySingleBackoff(t *testing.T) {
// try adding a peer then find it
d1.Advertise(ctx, ns, discovery.TTL(time.Hour))
// Advance clock by one step
clock.Add(1)
assertNumPeers(t, ctx, dCache, ns, 1)
// add a new peer and make sure it is still hidden by the caching layer
d2.Advertise(ctx, ns, discovery.TTL(time.Hour))
// Advance clock by one step
clock.Add(1)
assertNumPeers(t, ctx, dCache, ns, 1)
// wait for cache to expire and check for the new peer
time.Sleep(scaleDuration(time.Millisecond * 110))
clock.Add(time.Millisecond * 110)
assertNumPeers(t, ctx, dCache, ns, 2)
}
func TestBackoffDiscoveryMultipleBackoff(t *testing.T) {
clock := mockClock.NewMock()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
discServer := mocks.NewDiscoveryServer()
discServer := mocks.NewDiscoveryServer(clock)
h1 := bhost.NewBlankHost(swarmt.GenSwarm(t))
defer h1.Close()
@ -128,15 +143,15 @@ func TestBackoffDiscoveryMultipleBackoff(t *testing.T) {
// Startup delay is 0ms. First backoff after finding data is 100ms, second backoff is 250ms.
bkf := NewExponentialBackoff(
scaleDuration(time.Millisecond*100),
scaleDuration(time.Second*10),
time.Millisecond*100,
time.Second*10,
NoJitter,
scaleDuration(time.Millisecond*100),
time.Millisecond*100,
2.5,
0,
rand.NewSource(0),
)
dCache, err := NewBackoffDiscovery(d1, bkf)
dCache, err := NewBackoffDiscovery(d1, bkf, withClock(clock))
if err != nil {
t.Fatal(err)
}
@ -144,28 +159,30 @@ func TestBackoffDiscoveryMultipleBackoff(t *testing.T) {
const ns = "test"
// try adding a peer then find it
d1.Advertise(ctx, ns, discovery.TTL(scaleDuration(time.Hour)))
d1.Advertise(ctx, ns, discovery.TTL(time.Hour))
// Advance clock by one step
clock.Add(1)
assertNumPeers(t, ctx, dCache, ns, 1)
// wait a little to make sure the extra request doesn't modify the backoff
time.Sleep(scaleDuration(time.Millisecond * 50)) // 50 < 100
clock.Add(time.Millisecond * 50) // 50 < 100
assertNumPeers(t, ctx, dCache, ns, 1)
// wait for backoff to expire and check if we increase it
time.Sleep(scaleDuration(time.Millisecond * 60)) // 50+60 > 100
clock.Add(time.Millisecond * 60) // 50+60 > 100
assertNumPeers(t, ctx, dCache, ns, 1)
d2.Advertise(ctx, ns, discovery.TTL(scaleDuration(time.Millisecond*400)))
d2.Advertise(ctx, ns, discovery.TTL(time.Millisecond*400))
time.Sleep(scaleDuration(time.Millisecond * 150)) // 150 < 250
clock.Add(time.Millisecond * 150) // 150 < 250
assertNumPeers(t, ctx, dCache, ns, 1)
time.Sleep(scaleDuration(time.Millisecond * 150)) // 150 + 150 > 250
clock.Add(time.Millisecond * 150) // 150 + 150 > 250
assertNumPeers(t, ctx, dCache, ns, 2)
// check that the backoff has been reset
// also checks that we can decrease our peer count (i.e. not just growing a set)
time.Sleep(scaleDuration(time.Millisecond * 110)) // 110 > 100, also 150+150+110>400
clock.Add(time.Millisecond * 110) // 110 > 100, also 150+150+110>400
assertNumPeers(t, ctx, dCache, ns, 1)
}
@ -173,7 +190,8 @@ func TestBackoffDiscoverySimultaneousQuery(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
discServer := mocks.NewDiscoveryServer()
clock := mockClock.NewMock()
discServer := mocks.NewDiscoveryServer(clock)
// Testing with n larger than most internal buffer sizes (32)
n := 40
@ -185,10 +203,10 @@ func TestBackoffDiscoverySimultaneousQuery(t *testing.T) {
advertisers[i] = mocks.NewDiscoveryClient(h, discServer)
}
d1 := &delayedDiscovery{advertisers[0], scaleDuration(time.Millisecond * 10)}
d1 := &delayedDiscovery{advertisers[0], time.Millisecond * 10, clock}
bkf := NewFixedBackoff(scaleDuration(time.Millisecond * 200))
dCache, err := NewBackoffDiscovery(d1, bkf)
bkf := NewFixedBackoff(time.Millisecond * 200)
dCache, err := NewBackoffDiscovery(d1, bkf, withClock(clock))
if err != nil {
t.Fatal(err)
}
@ -200,6 +218,8 @@ func TestBackoffDiscoverySimultaneousQuery(t *testing.T) {
t.Fatal(err)
}
}
// Advance clock by one step
clock.Add(1)
ch1, err := dCache.FindPeers(ctx, ns)
if err != nil {
@ -232,7 +252,8 @@ func TestBackoffDiscoveryCacheCapacity(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
discServer := mocks.NewDiscoveryServer()
clock := mockClock.NewMock()
discServer := mocks.NewDiscoveryServer(clock)
// Testing with n larger than most internal buffer sizes (32)
n := 40
@ -247,10 +268,10 @@ func TestBackoffDiscoveryCacheCapacity(t *testing.T) {
h1 := bhost.NewBlankHost(swarmt.GenSwarm(t))
d1 := mocks.NewDiscoveryClient(h1, discServer)
discoveryInterval := scaleDuration(time.Millisecond * 10)
discoveryInterval := time.Millisecond * 10
bkf := NewFixedBackoff(discoveryInterval)
dCache, err := NewBackoffDiscovery(d1, bkf)
dCache, err := NewBackoffDiscovery(d1, bkf, withClock(clock))
if err != nil {
t.Fatal(err)
}
@ -261,6 +282,8 @@ func TestBackoffDiscoveryCacheCapacity(t *testing.T) {
for i := 0; i < n; i++ {
advertisers[i].Advertise(ctx, ns, discovery.TTL(time.Hour))
}
// Advance clock by one step
clock.Add(1)
// Request all peers, all will be present
assertNumPeersWithLimit(t, ctx, dCache, ns, n, n)
@ -269,7 +292,7 @@ func TestBackoffDiscoveryCacheCapacity(t *testing.T) {
assertNumPeersWithLimit(t, ctx, dCache, ns, n-1, n-1)
// Wait a little time but don't allow cache to expire
time.Sleep(discoveryInterval / 10)
clock.Add(discoveryInterval / 10)
// Request peers with a lower limit this time using cache
// Here we are testing that the cache logic does not block when there are more peers known than the limit requested
@ -277,7 +300,7 @@ func TestBackoffDiscoveryCacheCapacity(t *testing.T) {
assertNumPeersWithLimit(t, ctx, dCache, ns, n-1, n-1)
// Wait for next discovery so next request will bypass cache
time.Sleep(scaleDuration(time.Millisecond * 100))
clock.Add(time.Millisecond * 100)
// Ask for all peers again
assertNumPeersWithLimit(t, ctx, dCache, ns, n, n)

18
p2p/discovery/mocks/mocks.go

@ -10,9 +10,14 @@ import (
"github.com/libp2p/go-libp2p-core/peer"
)
type clock interface {
Now() time.Time
}
type MockDiscoveryServer struct {
mx sync.Mutex
db map[string]map[peer.ID]*discoveryRegistration
mx sync.Mutex
db map[string]map[peer.ID]*discoveryRegistration
clock clock
}
type discoveryRegistration struct {
@ -20,9 +25,10 @@ type discoveryRegistration struct {
expiration time.Time
}
func NewDiscoveryServer() *MockDiscoveryServer {
func NewDiscoveryServer(clock clock) *MockDiscoveryServer {
return &MockDiscoveryServer{
db: make(map[string]map[peer.ID]*discoveryRegistration),
db: make(map[string]map[peer.ID]*discoveryRegistration),
clock: clock,
}
}
@ -35,7 +41,7 @@ func (s *MockDiscoveryServer) Advertise(ns string, info peer.AddrInfo, ttl time.
peers = make(map[peer.ID]*discoveryRegistration)
s.db[ns] = peers
}
peers[info.ID] = &discoveryRegistration{info, time.Now().Add(ttl)}
peers[info.ID] = &discoveryRegistration{info, s.clock.Now().Add(ttl)}
return ttl, nil
}
@ -55,7 +61,7 @@ func (s *MockDiscoveryServer) FindPeers(ns string, limit int) (<-chan peer.AddrI
count = limit
}
iterTime := time.Now()
iterTime := s.clock.Now()
ch := make(chan peer.AddrInfo, count)
numSent := 0
for p, reg := range peers {

4
p2p/discovery/routing/routing_test.go

@ -6,6 +6,7 @@ import (
"testing"
"time"
"github.com/benbjohnson/clock"
"github.com/libp2p/go-libp2p/p2p/discovery/mocks"
"github.com/libp2p/go-libp2p/p2p/discovery/util"
bhost "github.com/libp2p/go-libp2p/p2p/host/blank"
@ -115,7 +116,8 @@ func TestDiscoveryRouting(t *testing.T) {
h1 := bhost.NewBlankHost(swarmt.GenSwarm(t))
h2 := bhost.NewBlankHost(swarmt.GenSwarm(t))
dserver := mocks.NewDiscoveryServer()
clock := clock.NewMock()
dserver := mocks.NewDiscoveryServer(clock)
d1 := mocks.NewDiscoveryClient(h1, dserver)
d2 := mocks.NewDiscoveryClient(h2, dserver)

Loading…
Cancel
Save