diff --git a/core/test/mockclock.go b/core/test/mockclock.go
new file mode 100644
index 000000000..1ed725033
--- /dev/null
+++ b/core/test/mockclock.go
@@ -0,0 +1,145 @@
+package test
+
+import (
+	"sort"
+	"sync"
+	"time"
+)
+
+// MockClock is a manually driven clock for tests: its time only moves when
+// AdvanceBy is called.
+type MockClock struct {
+	mu           sync.Mutex
+	now          time.Time
+	timers       []*mockInstantTimer
+	advanceBySem chan struct{}
+}
+
+// mockInstantTimer fires once the owning MockClock reaches the instant in when.
+type mockInstantTimer struct {
+	c      *MockClock
+	mu     sync.Mutex
+	when   time.Time
+	active bool
+	ch     chan time.Time
+}
+
+// Ch returns the channel on which the firing time is delivered.
+func (t *mockInstantTimer) Ch() <-chan time.Time {
+	return t.ch
+}
+
+// Reset re-arms the timer to fire at d and reports whether it was active.
+func (t *mockInstantTimer) Reset(d time.Time) bool {
+	t.mu.Lock()
+	defer t.mu.Unlock()
+	wasActive := t.active
+	t.active = true
+	t.when = d
+
+	// Schedule any timers that need to run. This will run this timer if t.when is before c.now
+	go t.c.AdvanceBy(0)
+
+	return wasActive
+}
+
+// Stop disarms the timer and reports whether it was active.
+func (t *mockInstantTimer) Stop() bool {
+	t.mu.Lock()
+	defer t.mu.Unlock()
+	wasActive := t.active
+	t.active = false
+	return wasActive
+}
+
+// NewMockClock returns a MockClock whose current time is the Unix epoch.
+func NewMockClock() *MockClock {
+	return &MockClock{now: time.Unix(0, 0), advanceBySem: make(chan struct{}, 1)}
+}
+
+// InstantTimer registers and returns an active timer due at when.
+func (c *MockClock) InstantTimer(when time.Time) *mockInstantTimer {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	t := &mockInstantTimer{
+		c:      c,
+		when:   when,
+		ch:     make(chan time.Time, 1),
+		active: true,
+	}
+	c.timers = append(c.timers, t)
+	return t
+}
+
+// Since implements autorelay.ClockWithInstantTimer
+func (c *MockClock) Since(t time.Time) time.Duration {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	return c.now.Sub(t)
+}
+
+// Now returns the clock's current time.
+func (c *MockClock) Now() time.Time {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	return c.now
+}
+
+// AdvanceBy moves the clock forward by dur, firing every active timer that is
+// due by the new time, earliest deadline first. Calls are serialized, and a
+// send to a timer whose channel is still full blocks until it is drained.
+func (c *MockClock) AdvanceBy(dur time.Duration) {
+	c.advanceBySem <- struct{}{}
+	defer func() { <-c.advanceBySem }()
+
+	// Snapshot the timer list under c.mu: InstantTimer appends to c.timers
+	// concurrently, so the shared slice must not be read or sorted unlocked.
+	c.mu.Lock()
+	now := c.now
+	endTime := c.now.Add(dur)
+	timers := make([]*mockInstantTimer, len(c.timers))
+	copy(timers, c.timers)
+	c.mu.Unlock()
+
+	// Sort by a snapshot of each deadline so the comparator never has to hold
+	// two timer locks at once.
+	type pendingTimer struct {
+		t    *mockInstantTimer
+		when time.Time
+	}
+	pending := make([]pendingTimer, len(timers))
+	for i, t := range timers {
+		t.mu.Lock()
+		pending[i] = pendingTimer{t: t, when: t.when}
+		t.mu.Unlock()
+	}
+	sort.SliceStable(pending, func(i, j int) bool {
+		return pending[i].when.Before(pending[j].when)
+	})
+
+	for _, p := range pending {
+		t := p.t
+		t.mu.Lock()
+		// Re-check under the lock: the timer may have been stopped or reset
+		// since the snapshot was taken.
+		if !t.active || (t.when.After(now) && t.when.After(endTime)) {
+			t.mu.Unlock()
+			continue
+		}
+		if t.when.After(now) {
+			now = t.when
+		}
+		t.active = false
+		t.mu.Unlock()
+
+		c.mu.Lock()
+		c.now = now
+		c.mu.Unlock()
+		// This may block if the channel is full, but that's intended. This way our mock clock never gets too far ahead of consumer.
+		// This also prevents us from dropping times because we're advancing too fast.
+		t.ch <- now
+	}
+	c.mu.Lock()
+	c.now = endTime
+	c.mu.Unlock()
+}
diff --git a/core/test/mockclock_test.go b/core/test/mockclock_test.go
new file mode 100644
index 000000000..5fa0d888a
--- /dev/null
+++ b/core/test/mockclock_test.go
@@ -0,0 +1,44 @@
+package test
+
+import (
+	"testing"
+	"time"
+)
+
+func TestMockClock(t *testing.T) {
+	cl := NewMockClock()
+	t1 := cl.InstantTimer(cl.Now().Add(2 * time.Second))
+	t2 := cl.InstantTimer(cl.Now().Add(time.Second))
+
+	// Advance the clock by 500ms
+	cl.AdvanceBy(time.Millisecond * 500)
+
+	// No event
+	select {
+	case <-t1.Ch():
+		t.Fatal("t1 fired early")
+	case <-t2.Ch():
+		t.Fatal("t2 fired early")
+	default:
+	}
+
+	// Advance the clock by 500ms
+	cl.AdvanceBy(time.Millisecond * 500)
+
+	// t2 fires
+	select {
+	case <-t1.Ch():
+		t.Fatal("t1 fired early")
+	case <-t2.Ch():
+	}
+
+	// Advance the clock by 2s
+	cl.AdvanceBy(time.Second * 2)
+
+	// t1 fires
+	select {
+	case <-t1.Ch():
+	case <-t2.Ch():
+		t.Fatal("t2 fired again")
+	}
+}
diff --git a/p2p/host/autorelay/autorelay_test.go b/p2p/host/autorelay/autorelay_test.go
index b31b465b6..410f8cb6d 100644
--- a/p2p/host/autorelay/autorelay_test.go
+++ b/p2p/host/autorelay/autorelay_test.go
@@ -2,6 +2,7 @@ package autorelay_test
 
 import (
 	"context"
+	"fmt"
"strings" "sync/atomic" "testing" @@ -11,16 +12,30 @@ import ( "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/test" "github.com/libp2p/go-libp2p/p2p/host/autorelay" circuitv2_proto "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/proto" - "github.com/benbjohnson/clock" ma "github.com/multiformats/go-multiaddr" "github.com/stretchr/testify/require" ) const protoIDv2 = circuitv2_proto.ProtoIDv2Hop +type mockClock struct { + *test.MockClock +} + +func (c mockClock) InstantTimer(when time.Time) autorelay.InstantTimer { + return c.MockClock.InstantTimer(when) +} + +func newMockClock() mockClock { + return mockClock{MockClock: test.NewMockClock()} +} + +var _ autorelay.ClockWithInstantTimer = mockClock{} + func numRelays(h host.Host) int { return len(usedRelays(h)) } @@ -182,7 +197,7 @@ func TestWaitForCandidates(t *testing.T) { func TestBackoff(t *testing.T) { const backoff = 20 * time.Second - cl := clock.NewMock() + cl := newMockClock() r, err := libp2p.New( libp2p.DisableRelay(), libp2p.ForceReachabilityPublic(), @@ -201,8 +216,8 @@ func TestBackoff(t *testing.T) { defer r.Close() var reservations atomic.Int32 r.SetStreamHandler(protoIDv2, func(str network.Stream) { - reservations.Add(1) - str.Reset() + defer reservations.Add(1) + str.Close() }) var counter atomic.Int32 @@ -218,24 +233,36 @@ func TestBackoff(t *testing.T) { autorelay.WithNumRelays(1), autorelay.WithBootDelay(0), autorelay.WithBackoff(backoff), + autorelay.WithMinCandidates(1), + autorelay.WithMaxCandidateAge(1), autorelay.WithClock(cl), - autorelay.WithMinInterval(time.Second), + autorelay.WithMinInterval(0), ) defer h.Close() require.Eventually(t, func() bool { return reservations.Load() == 1 - }, 10*time.Second, 20*time.Millisecond, "reservations load should be 1 was %d", reservations.Load()) + }, 3*time.Second, 20*time.Millisecond, "reservations load should be 1") + // We need to wait until autorelay has asked the peer source for more candidates before advancing the clock by the backoff. 
+ + cl.AdvanceBy(1) // Increment the time a little so we can make another peer source call + require.Eventually(t, func() bool { + // The reservation will fail, and autorelay will ask the peer source for + // more candidates. Wait until it does so, this way we know that client + // knows the relay connection has failed before we advance the time. + return counter.Load() > 1 + }, 2*time.Second, 100*time.Millisecond, "counter load should be 2") + // make sure we don't add any relays yet for i := 0; i < 2; i++ { - cl.Add(backoff / 3) + cl.AdvanceBy(backoff / 3) require.Equal(t, 1, int(reservations.Load())) } - cl.Add(backoff) + cl.AdvanceBy(backoff) require.Eventually(t, func() bool { return reservations.Load() == 2 - }, 10*time.Second, 100*time.Millisecond, "reservations load should be 2 was %d", reservations.Load()) - require.Less(t, int(counter.Load()), 300) // just make sure we're not busy-looping + }, 3*time.Second, 100*time.Millisecond, "reservations load should be 2") + require.Less(t, int(counter.Load()), 10) // just make sure we're not busy-looping require.Equal(t, 2, int(reservations.Load())) } @@ -295,7 +322,7 @@ func TestConnectOnDisconnect(t *testing.T) { } func TestMaxAge(t *testing.T) { - cl := clock.NewMock() + cl := newMockClock() const num = 4 peerChan1 := make(chan peer.AddrInfo, num) @@ -330,7 +357,7 @@ func TestMaxAge(t *testing.T) { autorelay.WithBootDelay(0), autorelay.WithMaxCandidateAge(20*time.Minute), autorelay.WithClock(cl), - autorelay.WithMinInterval(time.Second), + autorelay.WithMinInterval(30*time.Second), ) defer h.Close() @@ -340,17 +367,16 @@ func TestMaxAge(t *testing.T) { relays := usedRelays(h) require.Len(t, relays, 1) + cl.AdvanceBy(time.Minute) require.Eventually(t, func() bool { - // we don't know exactly when the timer is reset, just advance our timer multiple times if necessary - cl.Add(30 * time.Second) return len(peerChans) == 0 }, 10*time.Second, 100*time.Millisecond) - cl.Add(10 * time.Minute) + cl.AdvanceBy(10 * 
time.Minute) for _, r := range relays2 { peerChan2 <- peer.AddrInfo{ID: r.ID(), Addrs: r.Addrs()} } - cl.Add(11 * time.Minute) + cl.AdvanceBy(11 * time.Minute) require.Eventually(t, func() bool { relays = usedRelays(h) @@ -381,11 +407,21 @@ func TestMaxAge(t *testing.T) { for _, r := range relays2 { ids = append(ids, r.ID()) } + + require.Eventually(t, func() bool { + for _, id := range ids { + if id == relays[0] { + return true + } + } + fmt.Println("waiting for", ids, "to contain", relays[0]) + return false + }, 3*time.Second, 100*time.Millisecond) require.Contains(t, ids, relays[0]) } func TestReconnectToStaticRelays(t *testing.T) { - cl := clock.NewMock() + cl := newMockClock() var staticRelays []peer.AddrInfo const numStaticRelays = 1 relays := make([]host.Host, 0, numStaticRelays) @@ -403,7 +439,7 @@ func TestReconnectToStaticRelays(t *testing.T) { ) defer h.Close() - cl.Add(time.Minute) + cl.AdvanceBy(time.Minute) require.Eventually(t, func() bool { return numRelays(h) == 1 }, 10*time.Second, 100*time.Millisecond) @@ -419,14 +455,14 @@ func TestReconnectToStaticRelays(t *testing.T) { return numRelays(h) == 0 }, 10*time.Second, 100*time.Millisecond) - cl.Add(time.Hour) + cl.AdvanceBy(time.Hour) require.Eventually(t, func() bool { return numRelays(h) == 1 }, 10*time.Second, 100*time.Millisecond) } func TestMinInterval(t *testing.T) { - cl := clock.NewMock() + cl := newMockClock() h := newPrivateNode(t, func(context.Context, int) <-chan peer.AddrInfo { peerChan := make(chan peer.AddrInfo, 1) @@ -444,9 +480,40 @@ func TestMinInterval(t *testing.T) { ) defer h.Close() - cl.Add(500 * time.Millisecond) + cl.AdvanceBy(400 * time.Millisecond) // The second call to peerSource should happen after 1 second require.Never(t, func() bool { return numRelays(h) > 0 }, 500*time.Millisecond, 100*time.Millisecond) - cl.Add(500 * time.Millisecond) - require.Eventually(t, func() bool { return numRelays(h) > 0 }, 10*time.Second, 100*time.Millisecond) + cl.AdvanceBy(600 * 
time.Millisecond) + require.Eventually(t, func() bool { return numRelays(h) > 0 }, 3*time.Second, 100*time.Millisecond) +} + +func TestNoBusyLoop0MinInterval(t *testing.T) { + var calledTimes uint64 + cl := newMockClock() + h := newPrivateNode(t, + func(context.Context, int) <-chan peer.AddrInfo { + atomic.AddUint64(&calledTimes, 1) + peerChan := make(chan peer.AddrInfo, 1) + defer close(peerChan) + r1 := newRelay(t) + t.Cleanup(func() { r1.Close() }) + peerChan <- peer.AddrInfo{ID: r1.ID(), Addrs: r1.Addrs()} + return peerChan + }, + autorelay.WithClock(cl), + autorelay.WithMinCandidates(1), + autorelay.WithMaxCandidates(1), + autorelay.WithNumRelays(0), + autorelay.WithBootDelay(time.Hour), + autorelay.WithMinInterval(time.Millisecond), + ) + defer h.Close() + + require.Never(t, func() bool { + cl.AdvanceBy(time.Second) + val := atomic.LoadUint64(&calledTimes) + return val >= 2 + }, 500*time.Millisecond, 100*time.Millisecond) + val := atomic.LoadUint64(&calledTimes) + require.Less(t, val, uint64(2)) } diff --git a/p2p/host/autorelay/options.go b/p2p/host/autorelay/options.go index 1990e9957..dbacf55e7 100644 --- a/p2p/host/autorelay/options.go +++ b/p2p/host/autorelay/options.go @@ -6,8 +6,6 @@ import ( "time" "github.com/libp2p/go-libp2p/core/peer" - - "github.com/benbjohnson/clock" ) // AutoRelay will call this function when it needs new candidates because it is @@ -24,7 +22,7 @@ import ( type PeerSource func(ctx context.Context, num int) <-chan peer.AddrInfo type config struct { - clock clock.Clock + clock ClockWithInstantTimer peerSource PeerSource // minimum interval used to call the peerSource callback minInterval time.Duration @@ -45,7 +43,7 @@ type config struct { } var defaultConfig = config{ - clock: clock.New(), + clock: RealClock{}, minCandidates: 4, maxCandidates: 20, bootDelay: 3 * time.Minute, @@ -162,7 +160,53 @@ func WithMaxCandidateAge(d time.Duration) Option { } } -func WithClock(cl clock.Clock) Option { +// InstantTimer is a timer that 
triggers at some instant rather than some duration +type InstantTimer interface { + Reset(d time.Time) bool + Stop() bool + Ch() <-chan time.Time +} + +// ClockWithInstantTimer is a clock that can create timers that trigger at some +// instant rather than some duration +type ClockWithInstantTimer interface { + Now() time.Time + Since(t time.Time) time.Duration + InstantTimer(when time.Time) InstantTimer +} + +type RealTimer struct{ t *time.Timer } + +var _ InstantTimer = (*RealTimer)(nil) + +func (t RealTimer) Ch() <-chan time.Time { + return t.t.C +} + +func (t RealTimer) Reset(d time.Time) bool { + return t.t.Reset(time.Until(d)) +} + +func (t RealTimer) Stop() bool { + return t.t.Stop() +} + +type RealClock struct{} + +var _ ClockWithInstantTimer = RealClock{} + +func (RealClock) Now() time.Time { + return time.Now() +} +func (RealClock) Since(t time.Time) time.Duration { + return time.Since(t) +} +func (RealClock) InstantTimer(when time.Time) InstantTimer { + t := time.NewTimer(time.Until(when)) + return &RealTimer{t} +} + +func WithClock(cl ClockWithInstantTimer) Option { return func(c *config) error { c.clock = cl return nil diff --git a/p2p/host/autorelay/relay_finder.go b/p2p/host/autorelay/relay_finder.go index bff330035..ec46d5821 100644 --- a/p2p/host/autorelay/relay_finder.go +++ b/p2p/host/autorelay/relay_finder.go @@ -77,6 +77,9 @@ type relayFinder struct { cachedAddrs []ma.Multiaddr cachedAddrsExpiry time.Time + + // A channel that triggers a run of `runScheduledWork`. 
+ triggerRunScheduledWork chan struct{} } func newRelayFinder(host *basic.BasicHost, peerSource PeerSource, conf *config) *relayFinder { @@ -94,6 +97,7 @@ func newRelayFinder(host *basic.BasicHost, peerSource PeerSource, conf *config) candidateFound: make(chan struct{}, 1), maybeConnectToRelayTrigger: make(chan struct{}, 1), maybeRequestNewCandidates: make(chan struct{}, 1), + triggerRunScheduledWork: make(chan struct{}, 1), relays: make(map[peer.ID]*circuitv2.Reservation), relayUpdated: make(chan struct{}, 1), } @@ -128,32 +132,32 @@ func (rf *relayFinder) background(ctx context.Context) { } defer subConnectedness.Close() - bootDelayTimer := rf.conf.clock.Timer(rf.conf.bootDelay) + now := rf.conf.clock.Now() + bootDelayTimer := rf.conf.clock.InstantTimer(now.Add(rf.conf.bootDelay)) defer bootDelayTimer.Stop() // This is the least frequent event. It's our fallback timer if we don't have any other work to do. leastFrequentInterval := rf.conf.minInterval - if rf.conf.backoff > leastFrequentInterval { + // Check if leastFrequentInterval is 0 to avoid busy looping + if rf.conf.backoff > leastFrequentInterval || leastFrequentInterval == 0 { leastFrequentInterval = rf.conf.backoff } - if rf.conf.maxCandidateAge > leastFrequentInterval { + if rf.conf.maxCandidateAge > leastFrequentInterval || leastFrequentInterval == 0 { leastFrequentInterval = rf.conf.maxCandidateAge } - if rsvpRefreshInterval > leastFrequentInterval { - leastFrequentInterval = rf.conf.maxCandidateAge + if rsvpRefreshInterval > leastFrequentInterval || leastFrequentInterval == 0 { + leastFrequentInterval = rsvpRefreshInterval } - now := rf.conf.clock.Now() - scheduledWork := &scheduledWorkTimes{ leastFrequentInterval: leastFrequentInterval, nextRefresh: now.Add(rsvpRefreshInterval), - nextBackoff: now.Add(rf.conf.backoff / 5), - nextOldCandidateCheck: now.Add(rf.conf.maxCandidateAge / 5), + nextBackoff: now.Add(rf.conf.backoff), + nextOldCandidateCheck: now.Add(rf.conf.maxCandidateAge), 
nextAllowedCallToPeerSource: now.Add(-time.Second), // allow immediately } - workTimer := rf.conf.clock.Timer(rf.runScheduledWork(ctx, now, scheduledWork, peerSourceRateLimiter).Sub(now)) + workTimer := rf.conf.clock.InstantTimer(rf.runScheduledWork(ctx, now, scheduledWork, peerSourceRateLimiter)) defer workTimer.Stop() for { @@ -183,14 +187,19 @@ func (rf *relayFinder) background(ctx context.Context) { } case <-rf.candidateFound: rf.notifyMaybeConnectToRelay() - case <-bootDelayTimer.C: + case <-bootDelayTimer.Ch(): rf.notifyMaybeConnectToRelay() case <-rf.relayUpdated: rf.clearCachedAddrsAndSignalAddressChange() - case <-workTimer.C: - now := rf.conf.clock.Now() + case now := <-workTimer.Ch(): + // Note: `now` is not guaranteed to be the current time. It's the time + // that the timer was fired. This is okay because we'll schedule + // future work at a specific time. nextTime := rf.runScheduledWork(ctx, now, scheduledWork, peerSourceRateLimiter) - workTimer.Reset(nextTime.Sub(now)) + workTimer.Reset(nextTime) + case <-rf.triggerRunScheduledWork: + // Ignore the next time because we aren't scheduling any future work here + _ = rf.runScheduledWork(ctx, rf.conf.clock.Now(), scheduledWork, peerSourceRateLimiter) case <-ctx.Done(): return } @@ -223,11 +232,19 @@ func (rf *relayFinder) runScheduledWork(ctx context.Context, now time.Time, sche } if now.After(scheduledWork.nextAllowedCallToPeerSource) { - scheduledWork.nextAllowedCallToPeerSource = scheduledWork.nextAllowedCallToPeerSource.Add(rf.conf.minInterval) select { case peerSourceRateLimiter <- struct{}{}: + scheduledWork.nextAllowedCallToPeerSource = now.Add(rf.conf.minInterval) + if scheduledWork.nextAllowedCallToPeerSource.Before(nextTime) { + nextTime = scheduledWork.nextAllowedCallToPeerSource + } default: } + } else { + // We still need to schedule this work if it's sooner than nextTime + if scheduledWork.nextAllowedCallToPeerSource.Before(nextTime) { + nextTime = scheduledWork.nextAllowedCallToPeerSource + 
} } // Find the next time we need to run scheduled work. @@ -240,9 +257,6 @@ func (rf *relayFinder) runScheduledWork(ctx context.Context, now time.Time, sche if scheduledWork.nextOldCandidateCheck.Before(nextTime) { nextTime = scheduledWork.nextOldCandidateCheck } - if scheduledWork.nextAllowedCallToPeerSource.Before(nextTime) { - nextTime = scheduledWork.nextAllowedCallToPeerSource - } if nextTime == now { // Only happens in CI with a mock clock nextTime = nextTime.Add(1) // avoids an infinite loop @@ -319,6 +333,10 @@ func (rf *relayFinder) findNodes(ctx context.Context, peerSourceRateLimiter <-ch select { case <-peerSourceRateLimiter: peerChan = rf.peerSource(ctx, rf.conf.maxCandidates) + select { + case rf.triggerRunScheduledWork <- struct{}{}: + default: + } case <-ctx.Done(): return }