Browse Source

fix: address code review

fix/obserations
Steven Allen 5 years ago
parent
commit
84b923c7bf
  1. 2
      p2p/protocol/identify/id.go
  2. 26
      p2p/protocol/identify/obsaddr.go
  3. 2
      p2p/protocol/identify/obsaddr_test.go

2
p2p/protocol/identify/id.go

@ -116,7 +116,7 @@ func NewIDService(h host.Host, opts ...Option) *IDService {
ctx: hostCtx, ctx: hostCtx,
ctxCancel: cancel, ctxCancel: cancel,
conns: make(map[network.Conn]chan struct{}), conns: make(map[network.Conn]chan struct{}),
observedAddrs: NewObservedAddrManager(h), observedAddrs: NewObservedAddrManager(hostCtx, h),
} }
// handle local protocol handler updates, and push deltas to peers. // handle local protocol handler updates, and push deltas to peers.

26
p2p/protocol/identify/obsaddr.go

@ -1,6 +1,7 @@
package identify package identify
import ( import (
"context"
"sync" "sync"
"time" "time"
@ -79,17 +80,18 @@ type ObservedAddrManager struct {
// NewObservedAddrManager returns a new address manager using // NewObservedAddrManager returns a new address manager using
// peerstore.OwnObservedAddressTTL as the TTL. // peerstore.OwnObservedAddressTTL as the TTL.
func NewObservedAddrManager(host host.Host) *ObservedAddrManager { func NewObservedAddrManager(ctx context.Context, host host.Host) *ObservedAddrManager {
oas := &ObservedAddrManager{ oas := &ObservedAddrManager{
addrs: make(map[string][]*ObservedAddr), addrs: make(map[string][]*ObservedAddr),
ttl: peerstore.OwnObservedAddrTTL, ttl: peerstore.OwnObservedAddrTTL,
wch: make(chan newObservation, observedAddrManagerWorkerChannelSize), wch: make(chan newObservation, observedAddrManagerWorkerChannelSize),
host: host, host: host,
activeConns: make(map[network.Conn]ma.Multiaddr), activeConns: make(map[network.Conn]ma.Multiaddr),
// refresh every ttl/2 so we don't forget observations from connected peers
refreshTimer: time.NewTimer(peerstore.OwnObservedAddrTTL / 2), refreshTimer: time.NewTimer(peerstore.OwnObservedAddrTTL / 2),
} }
oas.host.Network().Notify((*obsAddrNotifiee)(oas)) oas.host.Network().Notify((*obsAddrNotifiee)(oas))
go oas.worker() go oas.worker(ctx)
return oas return oas
} }
@ -162,13 +164,13 @@ func (oas *ObservedAddrManager) teardown() {
oas.mu.Unlock() oas.mu.Unlock()
} }
func (oas *ObservedAddrManager) worker() { func (oas *ObservedAddrManager) worker(ctx context.Context) {
defer oas.teardown() defer oas.teardown()
ticker := time.NewTicker(GCInterval) ticker := time.NewTicker(GCInterval)
defer ticker.Stop() defer ticker.Stop()
closingCh := oas.host.Network().Process().Closing() hostClosing := oas.host.Network().Process().Closing()
for { for {
select { select {
case obs := <-oas.wch: case obs := <-oas.wch:
@ -177,7 +179,9 @@ func (oas *ObservedAddrManager) worker() {
oas.gc() oas.gc()
case <-oas.refreshTimer.C: case <-oas.refreshTimer.C:
oas.refresh() oas.refresh()
case <-closingCh: case <-hostClosing:
return
case <-ctx.Done():
return return
} }
} }
@ -199,6 +203,7 @@ func (oas *ObservedAddrManager) refresh() {
for _, obs := range recycledObservations { for _, obs := range recycledObservations {
oas.recordObservationUnlocked(obs.conn, obs.observed) oas.recordObservationUnlocked(obs.conn, obs.observed)
} }
// refresh every ttl/2 so we don't forget observations from connected peers
oas.refreshTimer.Reset(oas.ttl / 2) oas.refreshTimer.Reset(oas.ttl / 2)
} }
@ -356,6 +361,7 @@ func (oas *ObservedAddrManager) SetTTL(ttl time.Duration) {
oas.mu.Lock() oas.mu.Lock()
defer oas.mu.Unlock() defer oas.mu.Unlock()
oas.ttl = ttl oas.ttl = ttl
// refresh every ttl/2 so we don't forget observations from connected peers
oas.refreshTimer.Reset(ttl / 2) oas.refreshTimer.Reset(ttl / 2)
} }

2
p2p/protocol/identify/obsaddr_test.go

@ -70,7 +70,7 @@ func newHarness(ctx context.Context, t *testing.T) harness {
} }
return harness{ return harness{
oas: identify.NewObservedAddrManager(h), oas: identify.NewObservedAddrManager(ctx, h),
mocknet: mn, mocknet: mn,
host: h, host: h,
} }

Loading…
Cancel
Save