
autorelay: poll for new candidates, when needed

Marten Seemann, 2 years ago
commit 74eb79401e (pull/1587/head)
  1. p2p/host/autorelay/autorelay.go (40 lines changed)
  2. p2p/host/autorelay/autorelay_test.go (99 lines changed)
  3. p2p/host/autorelay/options.go (22 lines changed)
  4. p2p/host/autorelay/relay_finder.go (108 lines changed)

p2p/host/autorelay/autorelay.go (40 lines changed)

@@ -4,12 +4,11 @@ import (
 	"context"
 	"sync"
 
-	basic "github.com/libp2p/go-libp2p/p2p/host/basic"
-
 	"github.com/libp2p/go-libp2p-core/event"
 	"github.com/libp2p/go-libp2p-core/host"
 	"github.com/libp2p/go-libp2p-core/network"
-	"github.com/libp2p/go-libp2p-core/peer"
+
+	basic "github.com/libp2p/go-libp2p/p2p/host/basic"
 
 	logging "github.com/ipfs/go-log/v2"
 	ma "github.com/multiformats/go-multiaddr"

@@ -29,8 +28,6 @@ type AutoRelay struct {
 	relayFinder *relayFinder
 
-	peerChanOut chan peer.AddrInfo // capacity 20
-
 	host   host.Host
 	addrsF basic.AddrsFactory
 }

@@ -48,9 +45,8 @@ func NewAutoRelay(bhost *basic.BasicHost, opts ...Option) (*AutoRelay, error) {
 		}
 	}
 	r.ctx, r.ctxCancel = context.WithCancel(context.Background())
-	r.peerChanOut = make(chan peer.AddrInfo, conf.maxCandidates)
 	r.conf = &conf
-	r.relayFinder = newRelayFinder(bhost, r.peerChanOut, &conf)
+	r.relayFinder = newRelayFinder(bhost, conf.peerSource, &conf)
 	bhost.AddrsFactory = r.hostAddrs
 
 	r.refCount.Add(1)

@@ -69,25 +65,6 @@ func (r *AutoRelay) background() {
 	}
 	defer subReachability.Close()
 
-	var peerChan <-chan peer.AddrInfo
-	if len(r.conf.staticRelays) == 0 {
-		peerChan = r.conf.peerChan
-	} else {
-		pc := make(chan peer.AddrInfo)
-		peerChan = pc
-		r.refCount.Add(1)
-		go func() {
-			defer r.refCount.Done()
-			for _, sr := range r.conf.staticRelays {
-				select {
-				case pc <- sr:
-				case <-r.ctx.Done():
-					return
-				}
-			}
-		}()
-	}
-
 	for {
 		select {
 		case <-r.ctx.Done():

@@ -109,17 +86,6 @@ func (r *AutoRelay) background() {
 			r.mx.Lock()
 			r.status = evt.Reachability
 			r.mx.Unlock()
-		case pi := <-peerChan:
-			select {
-			case r.peerChanOut <- pi: // if there's space in the channel, great
-			default:
-				// no space left in the channel. Drop the oldest entry.
-				select {
-				case <-r.peerChanOut:
-				default: // The consumer might just have emptied the channel. Make sure we don't block in that case.
-				}
-				r.peerChanOut <- pi
-			}
 		}
 	}
 }

p2p/host/autorelay/autorelay_test.go (99 lines changed)

@@ -135,20 +135,19 @@ func newBrokenRelay(t *testing.T, workAfter int) host.Host {
 	return h
 }
 
-func TestSingleRelay(t *testing.T) {
-	const numPeers = 5
-	peerChan := make(chan peer.AddrInfo)
-	done := make(chan struct{})
-	go func() {
-		defer close(done)
-		for i := 0; i < numPeers; i++ {
+func TestSingleCandidate(t *testing.T) {
+	var counter int
+	h := newPrivateNode(t,
+		autorelay.WithPeerSource(func(num int) <-chan peer.AddrInfo {
+			counter++
+			require.Equal(t, 1, num)
+			peerChan := make(chan peer.AddrInfo, num)
+			defer close(peerChan)
 			r := newRelay(t)
 			t.Cleanup(func() { r.Close() })
 			peerChan <- peer.AddrInfo{ID: r.ID(), Addrs: r.Addrs()}
-		}
-	}()
-	h := newPrivateNode(t,
-		autorelay.WithPeerSource(peerChan),
+			return peerChan
+		}),
 		autorelay.WithMaxCandidates(1),
 		autorelay.WithNumRelays(99999),
 		autorelay.WithBootDelay(0),

@@ -156,11 +155,39 @@ func TestSingleRelay(t *testing.T) {
 	defer h.Close()
 	require.Eventually(t, func() bool { return numRelays(h) > 0 }, 3*time.Second, 100*time.Millisecond)
-	<-done
 	// test that we don't add any more relays
-	require.Never(t, func() bool { return numRelays(h) != 1 }, 200*time.Millisecond, 50*time.Millisecond)
+	require.Never(t, func() bool { return numRelays(h) > 1 }, 200*time.Millisecond, 50*time.Millisecond)
+	require.Equal(t, 1, counter, "expected the peer source callback to only have been called once")
 }
 
+func TestSingleRelay(t *testing.T) {
+	const numCandidates = 3
+	var called bool
+	peerChan := make(chan peer.AddrInfo, numCandidates)
+	for i := 0; i < numCandidates; i++ {
+		r := newRelay(t)
+		t.Cleanup(func() { r.Close() })
+		peerChan <- peer.AddrInfo{ID: r.ID(), Addrs: r.Addrs()}
+	}
+	close(peerChan)
+	h := newPrivateNode(t,
+		autorelay.WithPeerSource(func(num int) <-chan peer.AddrInfo {
+			require.False(t, called, "expected the peer source callback to only have been called once")
+			called = true
+			require.Equal(t, numCandidates, num)
+			return peerChan
+		}),
+		autorelay.WithMaxCandidates(numCandidates),
+		autorelay.WithNumRelays(1),
+		autorelay.WithBootDelay(0),
+	)
+	defer h.Close()
+	require.Eventually(t, func() bool { return numRelays(h) > 0 }, 5*time.Second, 100*time.Millisecond)
+	// test that we don't add any more relays
+	require.Never(t, func() bool { return numRelays(h) > 1 }, 200*time.Millisecond, 50*time.Millisecond)
+}
+
 func TestPreferRelayV2(t *testing.T) {
 	r := newRelay(t)
 	defer r.Close()

@@ -170,10 +197,14 @@ func TestPreferRelayV2(t *testing.T) {
 		str.Reset()
 		t.Fatal("used relay v1")
 	})
-	peerChan := make(chan peer.AddrInfo, 1)
-	peerChan <- peer.AddrInfo{ID: r.ID(), Addrs: r.Addrs()}
 	h := newPrivateNode(t,
-		autorelay.WithPeerSource(peerChan),
+		autorelay.WithPeerSource(func(int) <-chan peer.AddrInfo {
+			peerChan := make(chan peer.AddrInfo, 1)
+			defer close(peerChan)
+			peerChan <- peer.AddrInfo{ID: r.ID(), Addrs: r.Addrs()}
+			return peerChan
+		}),
 		autorelay.WithMaxCandidates(1),
 		autorelay.WithNumRelays(99999),
 		autorelay.WithBootDelay(0),

@@ -186,7 +217,7 @@ func TestPreferRelayV2(t *testing.T) {
 func TestWaitForCandidates(t *testing.T) {
 	peerChan := make(chan peer.AddrInfo)
 	h := newPrivateNode(t,
-		autorelay.WithPeerSource(peerChan),
+		autorelay.WithPeerSource(func(int) <-chan peer.AddrInfo { return peerChan }),
 		autorelay.WithMinCandidates(2),
 		autorelay.WithNumRelays(1),
 		autorelay.WithBootDelay(time.Hour),

@@ -212,7 +243,7 @@ func TestBackoff(t *testing.T) {
 	peerChan := make(chan peer.AddrInfo)
 	cl := clock.NewMock()
 	h := newPrivateNode(t,
-		autorelay.WithPeerSource(peerChan),
+		autorelay.WithPeerSource(func(int) <-chan peer.AddrInfo { return peerChan }),
 		autorelay.WithNumRelays(1),
 		autorelay.WithBootDelay(0),
 		autorelay.WithBackoff(backoff),

@@ -229,7 +260,7 @@ func TestBackoff(t *testing.T) {
 	cl.Add(backoff * 2 / 3)
 	require.Never(t, func() bool { return numRelays(h) > 0 }, 100*time.Millisecond, 20*time.Millisecond)
 	cl.Add(backoff * 2 / 3)
-	require.Eventually(t, func() bool { return numRelays(h) > 0 }, 500*time.Millisecond, 10*time.Millisecond)
+	require.Eventually(t, func() bool { return numRelays(h) > 0 }, 3*time.Second, 100*time.Millisecond)
 }
 
 func TestMaxBackoffs(t *testing.T) {
@@ -237,7 +268,7 @@ func TestMaxBackoffs(t *testing.T) {
 	cl := clock.NewMock()
 	peerChan := make(chan peer.AddrInfo)
 	h := newPrivateNode(t,
-		autorelay.WithPeerSource(peerChan),
+		autorelay.WithPeerSource(func(int) <-chan peer.AddrInfo { return peerChan }),
 		autorelay.WithNumRelays(1),
 		autorelay.WithBootDelay(0),
 		autorelay.WithBackoff(backoff),

@@ -277,14 +308,14 @@ func TestStaticRelays(t *testing.T) {
 func TestRelayV1(t *testing.T) {
 	t.Run("relay v1 support disabled", func(t *testing.T) {
-		peerChan := make(chan peer.AddrInfo)
-		go func() {
-			r := newRelayV1(t)
-			peerChan <- peer.AddrInfo{ID: r.ID(), Addrs: r.Addrs()}
-			t.Cleanup(func() { r.Close() })
-		}()
+		peerChan := make(chan peer.AddrInfo, 1)
+		r := newRelayV1(t)
+		t.Cleanup(func() { r.Close() })
+		peerChan <- peer.AddrInfo{ID: r.ID(), Addrs: r.Addrs()}
+		close(peerChan)
 		h := newPrivateNode(t,
-			autorelay.WithPeerSource(peerChan),
+			autorelay.WithPeerSource(func(int) <-chan peer.AddrInfo { return peerChan }),
			autorelay.WithBootDelay(0),
 		)
 		defer h.Close()

@@ -293,14 +324,14 @@ func TestRelayV1(t *testing.T) {
 	})
 
 	t.Run("relay v1 support enabled", func(t *testing.T) {
-		peerChan := make(chan peer.AddrInfo)
-		go func() {
-			r := newRelayV1(t)
-			peerChan <- peer.AddrInfo{ID: r.ID(), Addrs: r.Addrs()}
-			t.Cleanup(func() { r.Close() })
-		}()
+		peerChan := make(chan peer.AddrInfo, 1)
+		r := newRelayV1(t)
+		t.Cleanup(func() { r.Close() })
+		peerChan <- peer.AddrInfo{ID: r.ID(), Addrs: r.Addrs()}
+		close(peerChan)
 		h := newPrivateNode(t,
-			autorelay.WithPeerSource(peerChan),
+			autorelay.WithPeerSource(func(int) <-chan peer.AddrInfo { return peerChan }),
 			autorelay.WithBootDelay(0),
 			autorelay.WithCircuitV1Support(),
 		)

p2p/host/autorelay/options.go (22 lines changed)

@@ -12,7 +12,7 @@ import (
 type config struct {
 	clock      clock.Clock
-	peerChan   <-chan peer.AddrInfo
+	peerSource func(num int) <-chan peer.AddrInfo
 	staticRelays []peer.AddrInfo
 	// see WithMinCandidates
 	minCandidates int

@@ -41,7 +41,10 @@ var defaultConfig = config{
 	desiredRelays: 2,
 }
 
-var errStaticRelaysMinCandidates = errors.New("cannot use WithMinCandidates and WithStaticRelays")
+var (
+	errStaticRelaysMinCandidates = errors.New("cannot use WithMinCandidates and WithStaticRelays")
+	errStaticRelaysPeerSource    = errors.New("cannot use WithPeerSource and WithStaticRelays")
+)
 
 // DefaultRelays are the known PL-operated v1 relays; will be decommissioned in 2022.
 var DefaultRelays = []string{

@@ -72,6 +75,9 @@ func WithStaticRelays(static []peer.AddrInfo) Option {
 		if c.setMinCandidates {
 			return errStaticRelaysMinCandidates
 		}
+		if c.peerSource != nil {
+			return errStaticRelaysPeerSource
+		}
 		if len(c.staticRelays) > 0 {
 			return errors.New("can't set static relays, static relays already configured")
 		}

@@ -85,9 +91,17 @@ func WithDefaultStaticRelays() Option {
 	return WithStaticRelays(defaultStaticRelays)
 }
 
-func WithPeerSource(peerChan <-chan peer.AddrInfo) Option {
+// WithPeerSource defines a callback for AutoRelay to query for more relay candidates.
+// AutoRelay will call this function in regular intervals, until it is connected to the desired number of
+// relays, and it has enough candidates (in case we get disconnected from one of the relays).
+// Implementations must send *at most* numPeers, and close the channel when they don't intend to provide
+// any more peers. AutoRelay will not call the callback again until the channel is closed.
+func WithPeerSource(f func(numPeers int) <-chan peer.AddrInfo) Option {
 	return func(c *config) error {
-		c.peerChan = peerChan
+		if len(c.staticRelays) > 0 {
+			return errStaticRelaysPeerSource
+		}
+		c.peerSource = f
 		return nil
 	}
 }
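For illustration, here is a minimal sketch of a peer source a caller might pass to the new WithPeerSource option. The helper name (peerSourceFromSlice) and the fixed slice of candidates are stand-ins for whatever discovery mechanism an application actually uses; the point is the contract from the doc comment above: send at most numPeers candidates per call and close the channel when the batch is done, since AutoRelay won't call the callback again until then.

package example

import (
	"github.com/libp2p/go-libp2p-core/peer"
)

// peerSourceFromSlice returns a callback usable with autorelay.WithPeerSource.
// It hands out candidates from a fixed slice, at most numPeers at a time.
func peerSourceFromSlice(candidates []peer.AddrInfo) func(numPeers int) <-chan peer.AddrInfo {
	var offset int // AutoRelay invokes the callback sequentially, so no locking is needed here
	return func(numPeers int) <-chan peer.AddrInfo {
		out := make(chan peer.AddrInfo, numPeers)
		defer close(out) // close once the batch is written; AutoRelay waits for this before calling again
		for i := 0; i < numPeers && offset < len(candidates); i++ {
			out <- candidates[offset] // buffered, so this never blocks
			offset++
		}
		return out
	}
}

A host would then be configured with something like autorelay.WithPeerSource(peerSourceFromSlice(myRelays)), for example via libp2p.EnableAutoRelay, which at the time of this commit forwards autorelay options.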

p2p/host/autorelay/relay_finder.go (108 lines changed)

@@ -66,10 +66,11 @@ type relayFinder struct {
 	ctxCancel   context.CancelFunc
 	ctxCancelMx sync.Mutex
 
-	peerChan <-chan peer.AddrInfo
+	peerSource func(int) <-chan peer.AddrInfo
 
 	candidateFound chan struct{} // receives every time we find a new relay candidate
 	candidateMx sync.Mutex
+	lastCandidateAdded time.Time
 	candidates map[peer.ID]*candidate
 	candidatesOnBackoff []*candidateOnBackoff // this slice is always sorted by the nextConnAttempt time
 	handleNewCandidateTrigger chan struct{} // cap: 1

@@ -83,27 +84,37 @@ type relayFinder struct {
 	cachedAddrsExpiry time.Time
 }
 
-func newRelayFinder(host *basic.BasicHost, peerChan <-chan peer.AddrInfo, conf *config) *relayFinder {
-	r := &relayFinder{
+func newRelayFinder(host *basic.BasicHost, peerSource func(int) <-chan peer.AddrInfo, conf *config) *relayFinder {
+	return &relayFinder{
 		bootTime:                  conf.clock.Now(),
 		host:                      host,
 		conf:                      conf,
-		peerChan:                  peerChan,
+		peerSource:                peerSource,
 		candidates:                make(map[peer.ID]*candidate),
 		candidateFound:            make(chan struct{}, 1),
 		handleNewCandidateTrigger: make(chan struct{}, 1),
 		relays:                    make(map[peer.ID]*circuitv2.Reservation),
 		relayUpdated:              make(chan struct{}, 1),
 	}
-	return r
 }
 
 func (rf *relayFinder) background(ctx context.Context) {
-	rf.refCount.Add(1)
-	go func() {
-		defer rf.refCount.Done()
-		rf.findNodes(ctx)
-	}()
+	relayDisconnected := make(chan struct{}, 1)
+
+	if rf.usesStaticRelay() {
+		rf.refCount.Add(1)
+		go func() {
+			defer rf.refCount.Done()
+			rf.handleStaticRelays(ctx)
+		}()
+	} else {
+		rf.refCount.Add(1)
+		go func() {
+			defer rf.refCount.Done()
+			rf.findNodes(ctx, relayDisconnected)
+		}()
+	}
+
 	rf.refCount.Add(1)
 	go func() {
 		defer rf.refCount.Done()

@@ -141,6 +152,10 @@ func (rf *relayFinder) background(ctx context.Context) {
 			if rf.usingRelay(evt.Peer) { // we were disconnected from a relay
 				log.Debugw("disconnected from relay", "id", evt.Peer)
 				delete(rf.relays, evt.Peer)
+				select {
+				case relayDisconnected <- struct{}{}:
+				default:
+				}
 				push = true
 			}
 			rf.relayMx.Unlock()

@@ -178,10 +193,44 @@ func (rf *relayFinder) background(ctx context.Context) {
 // It is run on both public and private nodes.
 // It garbage collects old entries, so that nodes doesn't overflow.
 // This makes sure that as soon as we need to find relay candidates, we have them available.
-func (rf *relayFinder) findNodes(ctx context.Context) {
+func (rf *relayFinder) findNodes(ctx context.Context, relayDisconnected <-chan struct{}) {
+	peerChan := rf.peerSource(rf.conf.maxCandidates)
+
+	const tick = 5 * time.Minute
+	const maxAge = 30 * time.Minute
+	timer := rf.conf.clock.Timer(tick)
+	defer timer.Stop()
 	for {
 		select {
-		case pi := <-rf.peerChan:
+		case <-relayDisconnected:
+			if !timer.Stop() {
+				<-timer.C
+			}
+			timer.Reset(0)
+		case now := <-timer.C:
+			if peerChan != nil {
+				// We're still reading peers from the peerChan. No need to query for more peers now.
+				timer.Reset(tick)
+				continue
+			}
+			rf.relayMx.Lock()
+			numRelays := len(rf.relays)
+			rf.relayMx.Unlock()
+			rf.candidateMx.Lock()
+			numCandidates := len(rf.candidates)
+			rf.candidateMx.Unlock()
+			// Even if we are connected to the desired number of relays, or have enough candidates,
+			// we want to make sure that our candidate list doesn't become outdated.
+			if (numRelays >= rf.conf.desiredRelays || numCandidates >= rf.conf.maxCandidates) && now.Sub(rf.lastCandidateAdded) < maxAge {
+				timer.Reset(tick)
+				continue
+			}
+			peerChan = rf.peerSource(rf.conf.maxCandidates)
+		case pi, ok := <-peerChan:
+			if !ok {
+				peerChan = nil
+				continue
+			}
 			log.Debugw("found node", "id", pi.ID)
 			rf.candidateMx.Lock()
 			numCandidates := len(rf.candidates)

@@ -190,6 +239,7 @@ func (rf *relayFinder) findNodes(ctx context.Context) {
 				log.Debugw("skipping node. Already have enough candidates", "id", pi.ID, "num", numCandidates, "max", rf.conf.maxCandidates)
 				continue
 			}
+			rf.lastCandidateAdded = rf.conf.clock.Now()
 			rf.refCount.Add(1)
 			go func() {
 				defer rf.refCount.Done()

@@ -201,6 +251,23 @@ func (rf *relayFinder) findNodes(ctx context.Context) {
 	}
 }
 
+func (rf *relayFinder) handleStaticRelays(ctx context.Context) {
+	sem := make(chan struct{}, 4)
+	var wg sync.WaitGroup
+	wg.Add(len(rf.conf.staticRelays))
+	for _, pi := range rf.conf.staticRelays {
+		sem <- struct{}{}
+		go func(pi peer.AddrInfo) {
+			defer wg.Done()
+			defer func() { <-sem }()
+			rf.handleNewNode(ctx, pi)
+		}(pi)
+	}
+	wg.Wait()
+	log.Debug("processed all static relays")
+	rf.notifyNewCandidate()
+}
+
 func (rf *relayFinder) notifyNewCandidate() {
 	select {
 	case rf.candidateFound <- struct{}{}:

@@ -236,7 +303,10 @@ func (rf *relayFinder) handleNewNode(ctx context.Context, pi peer.AddrInfo) {
 	rf.candidates[pi.ID] = &candidate{ai: pi, supportsRelayV2: supportsV2}
 	rf.candidateMx.Unlock()
 
-	rf.notifyNewCandidate()
+	// Don't notify when we're using static relays. We need to process all entries first.
+	if !rf.usesStaticRelay() {
+		rf.notifyNewCandidate()
+	}
 }
 
 // tryNode checks if a peer actually supports either circuit v1 or circuit v2.

@@ -343,13 +413,7 @@ func (rf *relayFinder) handleNewCandidate(ctx context.Context) {
 	}
 
 	rf.candidateMx.Lock()
-	if len(rf.conf.staticRelays) != 0 {
-		// make sure we read all static relays before continuing
-		if len(rf.peerChan) > 0 && len(rf.candidates) < rf.conf.minCandidates && rf.conf.clock.Since(rf.bootTime) < rf.conf.bootDelay {
-			rf.candidateMx.Unlock()
-			return
-		}
-	} else if len(rf.relays) == 0 && len(rf.candidates) < rf.conf.minCandidates && rf.conf.clock.Since(rf.bootTime) < rf.conf.bootDelay {
+	if !rf.usesStaticRelay() && len(rf.relays) == 0 && len(rf.candidates) < rf.conf.minCandidates && rf.conf.clock.Since(rf.bootTime) < rf.conf.bootDelay {
 		// During the startup phase, we don't want to connect to the first candidate that we find.
 		// Instead, we wait until we've found at least minCandidates, and then select the best of those.
 		// However, if that takes too long (longer than bootDelay), we still go ahead.

@@ -584,6 +648,10 @@ func (rf *relayFinder) relayAddrs(addrs []ma.Multiaddr) []ma.Multiaddr {
 	return raddrs
 }
 
+func (rf *relayFinder) usesStaticRelay() bool {
+	return len(rf.conf.staticRelays) > 0
+}
+
 func (rf *relayFinder) Start() error {
 	rf.ctxCancelMx.Lock()
 	defer rf.ctxCancelMx.Unlock()
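The rewritten findNodes loop leans on a standard Go idiom: receiving from a nil channel blocks forever, so once the peer source closes its batch channel the loop disables that select case by setting peerChan to nil, and the timer decides when to ask for a fresh batch. The relayDisconnected case forces an immediate re-query by resetting that timer to zero. Below is a self-contained sketch of the same pattern; the names (pollLoop, source, needMore, refresh) are illustrative and not part of the library.

package main

import (
	"fmt"
	"time"
)

// pollLoop reads one batch from a channel, disables that select case by
// setting the channel to nil once the producer closes it, and uses a timer
// to decide when to request the next batch. A receive on refresh (the
// analogue of relayDisconnected) forces an immediate re-query.
func pollLoop(source func(n int) <-chan int, needMore func() bool, refresh, stop <-chan struct{}) {
	const tick = time.Second
	batch := source(4)
	timer := time.NewTimer(tick)
	defer timer.Stop()
	for {
		select {
		case v, ok := <-batch:
			if !ok {
				batch = nil // a nil channel blocks forever, disabling this case
				continue
			}
			fmt.Println("got candidate", v)
		case <-refresh:
			if !timer.Stop() {
				<-timer.C // drain the timer if it already fired
			}
			timer.Reset(0) // fire immediately
		case <-timer.C:
			// Only query the source again once the previous batch is
			// exhausted and we actually need more candidates.
			if batch == nil && needMore() {
				batch = source(4)
			}
			timer.Reset(tick)
		case <-stop:
			return
		}
	}
}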
