Browse Source

identify: fix flaky tests (#1515)

* Remove sleep in Identify tests

* Use t.Helper()

* Nits

* Remove BufSize option

* Remove old DisableQUIC option
pull/1522/head
Marco Munizaga 2 years ago
committed by GitHub
parent
commit
df1ee9a23c
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 178
      p2p/protocol/identify/id_test.go

178
p2p/protocol/identify/id_test.go

@ -168,7 +168,7 @@ func TestIDService(t *testing.T) {
require.NoError(t, err)
defer ids2.Close()
sub, err := ids1.Host.EventBus().Subscribe(new(event.EvtPeerIdentificationCompleted), eventbus.BufSize(16))
sub, err := ids1.Host.EventBus().Subscribe(new(event.EvtPeerIdentificationCompleted))
if err != nil {
t.Fatal(err)
}
@ -377,6 +377,13 @@ func TestIdentifyDeltaOnProtocolChange(t *testing.T) {
t.Fatal(err)
}
idComplete, err := h1.EventBus().Subscribe(&event.EvtPeerIdentificationCompleted{})
require.NoError(t, err)
defer idComplete.Close()
idFailed, err := h1.EventBus().Subscribe(&event.EvtPeerIdentificationFailed{})
require.NoError(t, err)
defer idFailed.Close()
conn := h1.Network().ConnsToPeer(h2.ID())[0]
select {
case <-ids1.IdentifyWait(conn):
@ -384,6 +391,14 @@ func TestIdentifyDeltaOnProtocolChange(t *testing.T) {
t.Fatal("took over 5 seconds to identify")
}
select {
case <-idComplete.Out():
case evt := <-idFailed.Out():
t.Fatalf("Failed to identify: %v", evt.(event.EvtPeerIdentificationFailed).Reason)
default:
t.Fatal("Missing id event")
}
protos, err := h1.Peerstore().GetProtocols(h2.ID())
if err != nil {
t.Fatal(err)
@ -395,49 +410,65 @@ func TestIdentifyDeltaOnProtocolChange(t *testing.T) {
// set up a subscriber to listen to peer protocol updated events in h1. We expect to receive events from h2
// as protocols are added and removed.
sub, err := h1.EventBus().Subscribe(&event.EvtPeerProtocolsUpdated{}, eventbus.BufSize(16))
sub, err := h1.EventBus().Subscribe(&event.EvtPeerProtocolsUpdated{})
if err != nil {
t.Fatal(err)
}
defer sub.Close()
h1ProtocolsUpdates, err := h1.EventBus().Subscribe(&event.EvtPeerProtocolsUpdated{})
require.NoError(t, err)
defer h1ProtocolsUpdates.Close()
waitForDelta := make(chan struct{})
go func() {
expectedCount := 2
for expectedCount > 0 {
evt := <-h1ProtocolsUpdates.Out()
expectedCount -= len(evt.(event.EvtPeerProtocolsUpdated).Added)
}
close(waitForDelta)
}()
// add two new protocols in h2 and wait for identify to send deltas.
h2.SetStreamHandler(protocol.ID("foo"), func(_ network.Stream) {})
h2.SetStreamHandler(protocol.ID("bar"), func(_ network.Stream) {})
// check that h1 now knows about h2's new protocols.
require.Eventually(t, func() bool {
protos, err = h1.Peerstore().GetProtocols(h2.ID())
if err != nil {
return false
}
have := make(map[string]struct{}, len(protos))
for _, p := range protos {
have[p] = struct{}{}
}
recvWithTimeout(t, waitForDelta, 10*time.Second, "Timed out waiting to read protocol ids from the wire")
_, okfoo := have["foo"]
_, okbar := have["bar"]
return okfoo && okbar
}, time.Second, 10*time.Millisecond)
protos, err = h1.Peerstore().GetProtocols(h2.ID())
require.NoError(t, err)
have := make(map[string]bool, len(protos))
for _, p := range protos {
have[p] = true
}
require.True(t, have["foo"])
require.True(t, have["bar"])
// remove one of the newly added protocols from h2, and wait for identify to send the delta.
h2.RemoveStreamHandler(protocol.ID("bar"))
// check that h1 now has forgotten about h2's bar protocol.
require.Eventually(t, func() bool {
protos, err = h1.Peerstore().GetProtocols(h2.ID())
if err != nil {
return false
}
have := make(map[string]struct{}, len(protos))
for _, p := range protos {
have[p] = struct{}{}
waitForDelta = make(chan struct{})
go func() {
expectedCount := 1
for expectedCount > 0 {
evt := <-h1ProtocolsUpdates.Out()
expectedCount -= len(evt.(event.EvtPeerProtocolsUpdated).Removed)
}
close(waitForDelta)
}()
_, okfoo := have["foo"]
_, okbar := have["bar"]
return okfoo && !okbar
}, time.Second, 10*time.Millisecond)
// check that h1 now has forgotten about h2's bar protocol.
recvWithTimeout(t, waitForDelta, 10*time.Second, "timed out waiting for protocol to be removed")
protos, err = h1.Peerstore().GetProtocols(h2.ID())
require.NoError(t, err)
have = make(map[string]bool, len(protos))
for _, p := range protos {
have[p] = true
}
require.True(t, have["foo"])
require.False(t, have["bar"])
// make sure that h1 emitted events in the eventbus for h2's protocol updates.
done := make(chan struct{})
@ -526,7 +557,7 @@ func TestIdentifyDeltaWhileIdentifyingConn(t *testing.T) {
<-time.After(500 * time.Millisecond)
// subscribe to events in h1; after identify h1 should receive the delta from h2 and publish an event in the bus.
sub, err := h1.EventBus().Subscribe(&event.EvtPeerProtocolsUpdated{}, eventbus.BufSize(16))
sub, err := h1.EventBus().Subscribe(&event.EvtPeerProtocolsUpdated{})
if err != nil {
t.Fatal(err)
}
@ -591,34 +622,42 @@ func TestIdentifyPushOnAddrChange(t *testing.T) {
lad := ma.StringCast("/ip4/127.0.0.1/tcp/1234")
require.NoError(t, h1.Network().Listen(lad))
require.Contains(t, h1.Addrs(), lad)
h2AddrStream := h2.Peerstore().AddrStream(ctx, h1p)
emitAddrChangeEvt(t, h1)
require.Eventually(t, func() bool {
addrs := h2.Peerstore().Addrs(h1p)
for _, ad := range addrs {
if ad.Equal(lad) {
return true
}
// Wait for h2 to process the new addr
waitForAddrInStream(t, h2AddrStream, lad, 10*time.Second, "h2 did not receive addr change")
found := false
addrs := h2.Peerstore().Addrs(h1p)
for _, ad := range addrs {
if ad.Equal(lad) {
found = true
}
return false
}, 1*time.Second, 10*time.Millisecond)
}
require.True(t, found)
require.NotNil(t, getSignedRecord(t, h2, h1p))
// change addr on host2 and ensure host 1 gets a pus
lad = ma.StringCast("/ip4/127.0.0.1/tcp/1235")
require.NoError(t, h2.Network().Listen(lad))
require.Contains(t, h2.Addrs(), lad)
h1AddrStream := h1.Peerstore().AddrStream(ctx, h2p)
emitAddrChangeEvt(t, h2)
require.Eventually(t, func() bool {
addrs := h1.Peerstore().Addrs(h2p)
for _, ad := range addrs {
if ad.Equal(lad) {
return true
}
// Wait for h1 to process the new addr
waitForAddrInStream(t, h1AddrStream, lad, 10*time.Second, "h1 did not receive addr change")
found = false
addrs = h1.Peerstore().Addrs(h2p)
for _, ad := range addrs {
if ad.Equal(lad) {
found = true
}
return false
}, 1*time.Second, 10*time.Millisecond)
}
require.True(t, found)
require.NotNil(t, getSignedRecord(t, h1, h2p))
// change addr on host2 again
@ -627,15 +666,17 @@ func TestIdentifyPushOnAddrChange(t *testing.T) {
require.Contains(t, h2.Addrs(), lad2)
emitAddrChangeEvt(t, h2)
require.Eventually(t, func() bool {
addrs := h1.Peerstore().Addrs(h2p)
for _, ad := range addrs {
if ad.Equal(lad2) {
return true
}
// Wait for h1 to process the new addr
waitForAddrInStream(t, h1AddrStream, lad2, 10*time.Second, "h1 did not receive addr change")
found = false
addrs = h1.Peerstore().Addrs(h2p)
for _, ad := range addrs {
if ad.Equal(lad2) {
found = true
}
return false
}, 1*time.Second, 10*time.Millisecond)
}
require.True(t, found)
require.NotNil(t, getSignedRecord(t, h1, h2p))
}
@ -780,7 +821,7 @@ func TestLargeIdentifyMessage(t *testing.T) {
require.NoError(t, err)
defer ids2.Close()
sub, err := ids1.Host.EventBus().Subscribe(new(event.EvtPeerIdentificationCompleted), eventbus.BufSize(16))
sub, err := ids1.Host.EventBus().Subscribe(new(event.EvtPeerIdentificationCompleted))
require.NoError(t, err)
testKnowsAddrs(t, h1, h2p, []ma.Multiaddr{}) // nothing
@ -983,7 +1024,7 @@ func TestIdentifyResponseReadTimeout(t *testing.T) {
time.Sleep(100 * time.Second)
})
sub, err := ids1.Host.EventBus().Subscribe(new(event.EvtPeerIdentificationFailed), eventbus.BufSize(16))
sub, err := ids1.Host.EventBus().Subscribe(new(event.EvtPeerIdentificationFailed))
require.NoError(t, err)
h2pi := h2.Peerstore().PeerInfo(h2p)
@ -1043,3 +1084,28 @@ func TestIncomingIDStreamsTimeout(t *testing.T) {
}, 1*time.Second, 200*time.Millisecond)
}
}
func recvWithTimeout(t *testing.T, s <-chan struct{}, timeout time.Duration, failMsg string) {
t.Helper()
select {
case <-s:
return
case <-time.After(timeout):
t.Fatalf("Hit time while waiting to recv from channel: %s", failMsg)
}
}
func waitForAddrInStream(t *testing.T, s <-chan ma.Multiaddr, expected ma.Multiaddr, timeout time.Duration, failMsg string) {
t.Helper()
for {
select {
case addr := <-s:
if addr.Equal(expected) {
return
}
continue
case <-time.After(timeout):
t.Fatalf(failMsg)
}
}
}

Loading…
Cancel
Save