Browse Source

Merge pull request #899 from libp2p/fix/obserations

fix: keep observed addrs alive as long as their associated connections are alive
pull/901/head v0.8.2
Steven Allen 5 years ago
committed by GitHub
parent
commit
ef1cfe78bd
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      go.sum
  2. 31
      p2p/protocol/identify/id.go
  3. 245
      p2p/protocol/identify/obsaddr.go
  4. 214
      p2p/protocol/identify/obsaddr_test.go

3
go.sum

@ -1,6 +1,7 @@
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
github.com/AndreasBriese/bbloom v0.0.0-20180913140656-343706a395b7/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8=
github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8=
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/Kubuxu/go-os-helper v0.0.1/go.mod h1:N8B+I7vPCT80IcP58r50u4+gEEcsZETFUpAzWW2ep1Y=
github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII=
@ -427,6 +428,7 @@ go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/multierr v1.5.0 h1:KCa4XfM8CWFCpxXRGok+Q0SS/0XBhMDbHHGABQLvD2A=
go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU=
go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee h1:0mgffUl7nfd+FpvXMVz4IDEaUSmT1ysygQC7qYo7sG4=
go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA=
go.uber.org/zap v1.10.0 h1:ORx85nbTijNz8ljznvCMR1ZBIPKFn3jQrag10X2AsuM=
go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
@ -532,4 +534,5 @@ gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.4 h1:/eiJrUcujPVeJ3xlSWaiNi3uSVmDGBK1pDHUHAnao1I=
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.1-2019.2.3 h1:3JgtbtFHMiCmsznwGVTUWbgGov+pVqnlf1dEJTNAXeM=
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=

31
p2p/protocol/identify/id.go

@ -85,8 +85,7 @@ type IDService struct {
addrMu sync.Mutex
// our own observed addresses.
// TODO: instead of expiring, remove these when we disconnect
observedAddrs *ObservedAddrSet
observedAddrs *ObservedAddrManager
subscription event.Subscription
emitters struct {
@ -117,7 +116,7 @@ func NewIDService(h host.Host, opts ...Option) *IDService {
ctx: hostCtx,
ctxCancel: cancel,
conns: make(map[network.Conn]chan struct{}),
observedAddrs: NewObservedAddrSet(hostCtx),
observedAddrs: NewObservedAddrManager(hostCtx, h),
}
// handle local protocol handler updates, and push deltas to peers.
@ -584,31 +583,7 @@ func (ids *IDService) consumeObservedAddress(observed []byte, c network.Conn) {
return
}
// we should only use ObservedAddr when our connection's LocalAddr is one
// of our ListenAddrs. If we Dial out using an ephemeral addr, knowing that
// address's external mapping is not very useful because the port will not be
// the same as the listen addr.
ifaceaddrs, err := ids.Host.Network().InterfaceListenAddresses()
if err != nil {
log.Infof("failed to get interface listen addrs", err)
return
}
log.Debugf("identify identifying observed multiaddr: %s %s", c.LocalMultiaddr(), ifaceaddrs)
if !addrInAddrs(c.LocalMultiaddr(), ifaceaddrs) && !addrInAddrs(c.LocalMultiaddr(), ids.Host.Network().ListenAddresses()) {
// not in our list
return
}
if !HasConsistentTransport(maddr, ids.Host.Addrs()) {
log.Debugf("ignoring observed multiaddr that doesn't match the transports of any addresses we're announcing", c.RemoteMultiaddr())
return
}
// ok! we have the observed version of one of our ListenAddresses!
log.Debugf("added own observed listen addr: %s --> %s", c.LocalMultiaddr(), maddr)
ids.observedAddrs.Add(maddr, c.LocalMultiaddr(), c.RemoteMultiaddr(),
c.Stat().Direction)
ids.observedAddrs.Record(c, maddr)
}
func addrInAddrs(a ma.Multiaddr, as []ma.Multiaddr) bool {

245
p2p/protocol/identify/obsaddr.go

@ -5,10 +5,12 @@ import (
"sync"
"time"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peerstore"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr-net"
)
// ActivationThresh sets how many times an address must be seen as "activated"
@ -24,9 +26,9 @@ var ActivationThresh = 4
// it is older than OwnObservedAddressTTL * ActivationThresh (40 minutes).
var GCInterval = 10 * time.Minute
// observedAddrSetWorkerChannelSize defines how many addresses can be enqueued
// for adding to an ObservedAddrSet.
var observedAddrSetWorkerChannelSize = 16
// observedAddrManagerWorkerChannelSize defines how many addresses can be enqueued
// for adding to an ObservedAddrManager.
var observedAddrManagerWorkerChannelSize = 16
type observation struct {
seenTime time.Time
@ -52,40 +54,52 @@ func (oa *ObservedAddr) activated(ttl time.Duration) bool {
}
type newObservation struct {
observed, local, observer ma.Multiaddr
direction network.Direction
conn network.Conn
observed ma.Multiaddr
}
// ObservedAddrSet keeps track of a set of ObservedAddrs
// the zero-value is ready to be used.
type ObservedAddrSet struct {
sync.RWMutex // guards whole datastruct.
// ObservedAddrManager keeps track of a ObservedAddrs.
type ObservedAddrManager struct {
host host.Host
// latest observation from active connections
// we'll "re-observe" these when we gc
activeConnsMu sync.Mutex
// active connection -> most recent observation
activeConns map[network.Conn]ma.Multiaddr
mu sync.RWMutex
// local(internal) address -> list of observed(external) addresses
addrs map[string][]*ObservedAddr
ttl time.Duration
addrs map[string][]*ObservedAddr
ttl time.Duration
refreshTimer *time.Timer
// this is the worker channel
wch chan newObservation
}
// NewObservedAddrSet returns a new set using peerstore.OwnObservedAddressTTL
// as the TTL.
func NewObservedAddrSet(ctx context.Context) *ObservedAddrSet {
oas := &ObservedAddrSet{
addrs: make(map[string][]*ObservedAddr),
ttl: peerstore.OwnObservedAddrTTL,
wch: make(chan newObservation, observedAddrSetWorkerChannelSize),
// NewObservedAddrManager returns a new address manager using
// peerstore.OwnObservedAddressTTL as the TTL.
func NewObservedAddrManager(ctx context.Context, host host.Host) *ObservedAddrManager {
oas := &ObservedAddrManager{
addrs: make(map[string][]*ObservedAddr),
ttl: peerstore.OwnObservedAddrTTL,
wch: make(chan newObservation, observedAddrManagerWorkerChannelSize),
host: host,
activeConns: make(map[network.Conn]ma.Multiaddr),
// refresh every ttl/2 so we don't forget observations from connected peers
refreshTimer: time.NewTimer(peerstore.OwnObservedAddrTTL / 2),
}
oas.host.Network().Notify((*obsAddrNotifiee)(oas))
go oas.worker(ctx)
return oas
}
// AddrsFor return all activated observed addresses associated with the given
// (resolved) listen address.
func (oas *ObservedAddrSet) AddrsFor(addr ma.Multiaddr) (addrs []ma.Multiaddr) {
oas.RLock()
defer oas.RUnlock()
func (oas *ObservedAddrManager) AddrsFor(addr ma.Multiaddr) (addrs []ma.Multiaddr) {
oas.mu.RLock()
defer oas.mu.RUnlock()
if len(oas.addrs) == 0 {
return nil
@ -108,9 +122,9 @@ func (oas *ObservedAddrSet) AddrsFor(addr ma.Multiaddr) (addrs []ma.Multiaddr) {
}
// Addrs return all activated observed addresses
func (oas *ObservedAddrSet) Addrs() (addrs []ma.Multiaddr) {
oas.RLock()
defer oas.RUnlock()
func (oas *ObservedAddrManager) Addrs() (addrs []ma.Multiaddr) {
oas.mu.RLock()
defer oas.mu.RUnlock()
if len(oas.addrs) == 0 {
return nil
@ -127,43 +141,79 @@ func (oas *ObservedAddrSet) Addrs() (addrs []ma.Multiaddr) {
return addrs
}
// Add attemps to queue a new observed address to be added to the set.
func (oas *ObservedAddrSet) Add(observed, local, observer ma.Multiaddr,
direction network.Direction) {
// Record records an address observation, if valid.
func (oas *ObservedAddrManager) Record(conn network.Conn, observed ma.Multiaddr) {
select {
case oas.wch <- newObservation{observed: observed, local: local, observer: observer, direction: direction}:
case oas.wch <- newObservation{
conn: conn,
observed: observed,
}:
default:
log.Debugf("dropping address observation of %s; buffer full", observed)
log.Debugw("dropping address observation due to full buffer",
"from", conn.RemoteMultiaddr(),
"observed", observed,
)
}
}
func (oas *ObservedAddrSet) worker(ctx context.Context) {
func (oas *ObservedAddrManager) teardown() {
oas.host.Network().StopNotify((*obsAddrNotifiee)(oas))
oas.mu.Lock()
oas.refreshTimer.Stop()
oas.mu.Unlock()
}
func (oas *ObservedAddrManager) worker(ctx context.Context) {
defer oas.teardown()
ticker := time.NewTicker(GCInterval)
defer ticker.Stop()
hostClosing := oas.host.Network().Process().Closing()
for {
select {
case obs := <-oas.wch:
oas.doAdd(obs.observed, obs.local, obs.observer, obs.direction)
oas.maybeRecordObservation(obs.conn, obs.observed)
case <-ticker.C:
oas.gc()
case <-oas.refreshTimer.C:
oas.refresh()
case <-hostClosing:
return
case <-ctx.Done():
return
}
}
}
func (oas *ObservedAddrSet) gc() {
oas.Lock()
defer oas.Unlock()
func (oas *ObservedAddrManager) refresh() {
oas.activeConnsMu.Lock()
recycledObservations := make([]newObservation, 0, len(oas.activeConns))
for conn, observed := range oas.activeConns {
recycledObservations = append(recycledObservations, newObservation{
conn: conn,
observed: observed,
})
}
oas.activeConnsMu.Unlock()
oas.mu.Lock()
defer oas.mu.Unlock()
for _, obs := range recycledObservations {
oas.recordObservationUnlocked(obs.conn, obs.observed)
}
// refresh every ttl/2 so we don't forget observations from connected peers
oas.refreshTimer.Reset(oas.ttl / 2)
}
func (oas *ObservedAddrManager) gc() {
oas.mu.Lock()
defer oas.mu.Unlock()
now := time.Now()
for local, observedAddrs := range oas.addrs {
// TODO we can do this without allocating by compacting the array in place
filteredAddrs := make([]*ObservedAddr, 0, len(observedAddrs))
filteredAddrs := observedAddrs[:0]
for _, a := range observedAddrs {
// clean up SeenBy set
for k, ob := range a.SeenBy {
@ -185,20 +235,92 @@ func (oas *ObservedAddrSet) gc() {
}
}
func (oas *ObservedAddrSet) doAdd(observed, local, observer ma.Multiaddr,
direction network.Direction) {
func (oas *ObservedAddrManager) addConn(conn network.Conn, observed ma.Multiaddr) {
oas.activeConnsMu.Lock()
defer oas.activeConnsMu.Unlock()
// We need to make sure we haven't received a disconnect event for this
// connection yet. The only way to do that right now is to make sure the
// swarm still has the connection.
//
// Doing this under a lock that we _also_ take in a disconnect event
// handler ensures everything happens in the right order.
for _, c := range oas.host.Network().ConnsToPeer(conn.RemotePeer()) {
if c == conn {
oas.activeConns[conn] = observed
return
}
}
}
func (oas *ObservedAddrManager) removeConn(conn network.Conn) {
// DO NOT remove this lock.
// This ensures we don't call addConn at the same time:
// 1. see that we have a connection and pause inside addConn right before recording it.
// 2. process a disconnect event.
// 3. record the connection (leaking it).
oas.activeConnsMu.Lock()
delete(oas.activeConns, conn)
oas.activeConnsMu.Unlock()
}
func (oas *ObservedAddrManager) maybeRecordObservation(conn network.Conn, observed ma.Multiaddr) {
// First, determine if this observation is even worth keeping...
// Ignore observations from loopback nodes. We already know our loopback
// addresses.
if manet.IsIPLoopback(observed) {
return
}
// we should only use ObservedAddr when our connection's LocalAddr is one
// of our ListenAddrs. If we Dial out using an ephemeral addr, knowing that
// address's external mapping is not very useful because the port will not be
// the same as the listen addr.
ifaceaddrs, err := oas.host.Network().InterfaceListenAddresses()
if err != nil {
log.Infof("failed to get interface listen addrs", err)
return
}
local := conn.LocalMultiaddr()
if !addrInAddrs(local, ifaceaddrs) && !addrInAddrs(local, oas.host.Network().ListenAddresses()) {
// not in our list
return
}
// We should reject the connection if the observation doesn't match the
// transports of one of our advertised addresses.
if !HasConsistentTransport(observed, oas.host.Addrs()) {
log.Debugw(
"observed multiaddr doesn't match the transports of any announced addresses",
"from", conn.RemoteMultiaddr(),
"observed", observed,
)
return
}
// Ok, the observation is good, record it.
log.Debugw("added own observed listen addr", "observed", observed)
defer oas.addConn(conn, observed)
oas.mu.Lock()
defer oas.mu.Unlock()
oas.recordObservationUnlocked(conn, observed)
}
func (oas *ObservedAddrManager) recordObservationUnlocked(conn network.Conn, observed ma.Multiaddr) {
now := time.Now()
observerString := observerGroup(observer)
localString := string(local.Bytes())
observerString := observerGroup(conn.RemoteMultiaddr())
localString := string(conn.LocalMultiaddr().Bytes())
ob := observation{
seenTime: now,
connDirection: direction,
connDirection: conn.Stat().Direction,
}
oas.Lock()
defer oas.Unlock()
observedAddrs := oas.addrs[localString]
// check if observed address seen yet, if so, update it
for i, previousObserved := range observedAddrs {
@ -234,16 +356,29 @@ func observerGroup(m ma.Multiaddr) string {
return string(first.Bytes())
}
// SetTTL sets the TTL of an observed address-set.
func (oas *ObservedAddrSet) SetTTL(ttl time.Duration) {
oas.Lock()
defer oas.Unlock()
// SetTTL sets the TTL of an observed address manager.
func (oas *ObservedAddrManager) SetTTL(ttl time.Duration) {
oas.mu.Lock()
defer oas.mu.Unlock()
oas.ttl = ttl
// refresh every ttl/2 so we don't forget observations from connected peers
oas.refreshTimer.Reset(ttl / 2)
}
// TTL gets the TTL of an observed address-set.
func (oas *ObservedAddrSet) TTL() time.Duration {
oas.RLock()
defer oas.RUnlock()
// TTL gets the TTL of an observed address manager.
func (oas *ObservedAddrManager) TTL() time.Duration {
oas.mu.RLock()
defer oas.mu.RUnlock()
return oas.ttl
}
type obsAddrNotifiee ObservedAddrManager
func (on *obsAddrNotifiee) Listen(n network.Network, a ma.Multiaddr) {}
func (on *obsAddrNotifiee) ListenClose(n network.Network, a ma.Multiaddr) {}
func (on *obsAddrNotifiee) Connected(n network.Network, v network.Conn) {}
func (on *obsAddrNotifiee) Disconnected(n network.Network, v network.Conn) {
(*ObservedAddrManager)(on).removeConn(v)
}
func (on *obsAddrNotifiee) OpenedStream(n network.Network, s network.Stream) {}
func (on *obsAddrNotifiee) ClosedStream(n network.Network, s network.Stream) {}

214
p2p/protocol/identify/obsaddr_test.go

@ -1,4 +1,4 @@
package identify
package identify_test
import (
"context"
@ -7,10 +7,75 @@ import (
"time"
detectrace "github.com/ipfs/go-detect-race"
net "github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
p2putil "github.com/libp2p/go-libp2p-netutil"
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
ma "github.com/multiformats/go-multiaddr"
identify "github.com/libp2p/go-libp2p/p2p/protocol/identify"
)
type harness struct {
t *testing.T
mocknet mocknet.Mocknet
host host.Host
oas *identify.ObservedAddrManager
}
func (h *harness) add(observer ma.Multiaddr) peer.ID {
// create a new fake peer.
sk, err := p2putil.RandTestBogusPrivateKey()
if err != nil {
h.t.Fatal(err)
}
h2, err := h.mocknet.AddPeer(sk, observer)
if err != nil {
h.t.Fatal(err)
}
_, err = h.mocknet.LinkPeers(h.host.ID(), h2.ID())
if err != nil {
h.t.Fatal(err)
}
return h2.ID()
}
func (h *harness) conn(observer peer.ID) network.Conn {
c, err := h.mocknet.ConnectPeers(h.host.ID(), observer)
if err != nil {
h.t.Fatal(err)
}
return c
}
func (h *harness) observe(observed ma.Multiaddr, observer peer.ID) {
c := h.conn(observer)
h.oas.Record(c, observed)
time.Sleep(1 * time.Millisecond) // let the worker run
}
func newHarness(ctx context.Context, t *testing.T) harness {
mn := mocknet.New(ctx)
sk, err := p2putil.RandTestBogusPrivateKey()
if err != nil {
t.Fatal(err)
}
h, err := mn.AddPeer(sk, ma.StringCast("/ip4/127.0.0.1/tcp/10086"))
if err != nil {
t.Fatal(err)
}
return harness{
oas: identify.NewObservedAddrManager(ctx, h),
mocknet: mn,
host: h,
}
}
// TestObsAddrSet
func TestObsAddrSet(t *testing.T) {
m := func(s string) ma.Multiaddr {
@ -55,120 +120,125 @@ func TestObsAddrSet(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
oas := NewObservedAddrSet(ctx)
if !addrsMarch(oas.Addrs(), nil) {
harness := newHarness(ctx, t)
if !addrsMarch(harness.oas.Addrs(), nil) {
t.Error("addrs should be empty")
}
add := func(oas *ObservedAddrSet, observed, observer ma.Multiaddr) {
dummyLocal := m("/ip4/127.0.0.1/tcp/10086")
dummyDirection := net.DirOutbound
pa4 := harness.add(a4)
pa5 := harness.add(a5)
oas.Add(observed, dummyLocal, observer, dummyDirection)
time.Sleep(1 * time.Millisecond) // let the worker run
}
pb1 := harness.add(b1)
pb2 := harness.add(b2)
pb3 := harness.add(b3)
pb4 := harness.add(b4)
pb5 := harness.add(b5)
add(oas, a1, a4)
add(oas, a2, a4)
add(oas, a3, a4)
harness.observe(a1, pa4)
harness.observe(a2, pa4)
harness.observe(a3, pa4)
// these are all different so we should not yet get them.
if !addrsMarch(oas.Addrs(), nil) {
if !addrsMarch(harness.oas.Addrs(), nil) {
t.Error("addrs should _still_ be empty (once)")
}
// same observer, so should not yet get them.
add(oas, a1, a4)
add(oas, a2, a4)
add(oas, a3, a4)
if !addrsMarch(oas.Addrs(), nil) {
harness.observe(a1, pa4)
harness.observe(a2, pa4)
harness.observe(a3, pa4)
if !addrsMarch(harness.oas.Addrs(), nil) {
t.Error("addrs should _still_ be empty (same obs)")
}
// different observer, but same observer group.
add(oas, a1, a5)
add(oas, a2, a5)
add(oas, a3, a5)
if !addrsMarch(oas.Addrs(), nil) {
harness.observe(a1, pa5)
harness.observe(a2, pa5)
harness.observe(a3, pa5)
if !addrsMarch(harness.oas.Addrs(), nil) {
t.Error("addrs should _still_ be empty (same obs group)")
}
add(oas, a1, b1)
add(oas, a1, b2)
add(oas, a1, b3)
if !addrsMarch(oas.Addrs(), []ma.Multiaddr{a1}) {
harness.observe(a1, pb1)
harness.observe(a1, pb2)
harness.observe(a1, pb3)
if !addrsMarch(harness.oas.Addrs(), []ma.Multiaddr{a1}) {
t.Error("addrs should only have a1")
}
add(oas, a2, a5)
add(oas, a1, a5)
add(oas, a1, a5)
add(oas, a2, b1)
add(oas, a1, b1)
add(oas, a1, b1)
add(oas, a2, b2)
add(oas, a1, b2)
add(oas, a1, b2)
add(oas, a2, b4)
add(oas, a2, b5)
if !addrsMarch(oas.Addrs(), []ma.Multiaddr{a1, a2}) {
harness.observe(a2, pa5)
harness.observe(a1, pa5)
harness.observe(a1, pa5)
harness.observe(a2, pb1)
harness.observe(a1, pb1)
harness.observe(a1, pb1)
harness.observe(a2, pb2)
harness.observe(a1, pb2)
harness.observe(a1, pb2)
harness.observe(a2, pb4)
harness.observe(a2, pb5)
if !addrsMarch(harness.oas.Addrs(), []ma.Multiaddr{a1, a2}) {
t.Error("addrs should only have a1, a2")
}
// change the timeout constant so we can time it out.
oas.SetTTL(time.Millisecond * 200)
// force a refresh.
harness.oas.SetTTL(time.Millisecond * 200)
<-time.After(time.Millisecond * 210)
if !addrsMarch(oas.Addrs(), nil) {
t.Error("addrs should have timed out")
if !addrsMarch(harness.oas.Addrs(), []ma.Multiaddr{a1, a2}) {
t.Error("addrs should only have a1, a2")
}
}
func TestAddAddrsProfile(b *testing.T) {
if detectrace.WithRace() {
b.Skip("test too slow when the race detector is running")
}
m := func(s string) ma.Multiaddr {
m, err := ma.NewMultiaddr(s)
if err != nil {
b.Fatal(err)
// disconnect from all but b5.
for _, p := range harness.host.Network().Peers() {
if p == pb5 {
continue
}
return m
harness.host.Network().ClosePeer(p)
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
oas := NewObservedAddrSet(ctx)
// wait for all other addresses to time out.
<-time.After(time.Millisecond * 210)
// Should still have a2
if !addrsMarch(harness.oas.Addrs(), []ma.Multiaddr{a2}) {
t.Error("should only have a2, have: ", harness.oas.Addrs())
}
harness.host.Network().ClosePeer(pb5)
add := func(oas *ObservedAddrSet, observed, observer ma.Multiaddr) {
dummyLocal := m("/ip4/127.0.0.1/tcp/10086")
dummyDirection := net.DirOutbound
// wait for all addresses to timeout
<-time.After(time.Millisecond * 400)
oas.Add(observed, dummyLocal, observer, dummyDirection)
time.Sleep(1 * time.Millisecond) // let the worker run
// Should still have a2
if !addrsMarch(harness.oas.Addrs(), nil) {
t.Error("addrs should have timed out")
}
}
a1 := m("/ip4/1.2.3.4/tcp/1231")
a2 := m("/ip4/1.2.3.4/tcp/1232")
a3 := m("/ip4/1.2.3.4/tcp/1233")
a4 := m("/ip4/1.2.3.4/tcp/1234")
a5 := m("/ip4/1.2.3.4/tcp/1235")
func TestAddAddrsProfile(t *testing.T) {
if detectrace.WithRace() {
t.Skip("test too slow when the race detector is running")
}
b1 := m("/ip4/1.2.3.6/tcp/1236")
b2 := m("/ip4/1.2.3.7/tcp/1237")
b3 := m("/ip4/1.2.3.8/tcp/1237")
b4 := m("/ip4/1.2.3.9/tcp/1237")
b5 := m("/ip4/1.2.3.10/tcp/1237")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
harness := newHarness(ctx, t)
_ = []ma.Multiaddr{a1, a2, a3, a4, a5, b1, b2, b3, b4, b5}
addr := ma.StringCast("/ip4/1.2.3.4/tcp/1231")
p := harness.add(ma.StringCast("/ip4/1.2.3.6/tcp/1236"))
c := harness.conn(p)
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for j := 0; j < 10000; j++ {
add(oas, a1, b1)
harness.oas.Record(c, addr)
time.Sleep(1 * time.Millisecond)
}
}()
}

Loading…
Cancel
Save