fix: cleanup transport suite

Branch: merge-testsuite-transports
Steven Allen committed 3 years ago
parent commit 75a6c48ffc
p2p/transport/testsuite/stream_suite.go (206 lines changed)

@@ -7,7 +7,6 @@ import (
 	"io"
 	"io/ioutil"
 	"os"
-	"runtime/debug"
 	"strconv"
 	"sync"
 	"testing"
@@ -82,15 +81,6 @@ func randBuf(size int) []byte {
 	return randomness[start : start+size]
 }
 
-func checkErr(t *testing.T, err error) {
-	t.Helper()
-	if err != nil {
-		debug.PrintStack()
-		// TODO: not safe to call in parallel
-		t.Fatal(err)
-	}
-}
-
 func debugLog(t *testing.T, s string, args ...interface{}) {
 	if VerboseDebugging {
 		t.Logf(s, args...)
@@ -98,7 +88,6 @@ func debugLog(t *testing.T, s string, args ...interface{}) {
 }
 
 func echoStream(t *testing.T, s mux.MuxedStream) {
-	defer s.Close()
 	// echo everything
 	var err error
 	if VerboseDebugging {
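
A note on the checkErr removal above: checkErr called t.Fatal from whichever goroutine hit the error, and its own TODO flagged that as unsafe. Go's testing package permits Fatal and FailNow only on the goroutine running the test function, while Error and Errorf may be called from any goroutine, which is what the later hunks switch to. A minimal sketch of that rule, with a hypothetical package name and doWork helper that are not part of this file:

package transportsuite_test

import "testing"

// doWork is a hypothetical stand-in for a fallible helper.
func doWork() error { return nil }

func TestReportFromGoroutine(t *testing.T) {
	done := make(chan struct{})
	go func() {
		defer close(done)
		if err := doWork(); err != nil {
			// Error is safe from any goroutine; Fatal here would break
			// the testing package's rules.
			t.Error(err)
		}
	}()
	<-done
}
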
@@ -123,42 +112,45 @@ func (lw *logWriter) Write(buf []byte) (int, error) {
 	return lw.W.Write(buf)
 }
 
-func goServe(t *testing.T, l transport.Listener) (done func()) {
-	closed := make(chan struct{}, 1)
+func echo(t *testing.T, c transport.CapableConn) {
+	var wg sync.WaitGroup
+	defer wg.Wait()
+	for {
+		str, err := c.AcceptStream()
+		if err != nil {
+			break
+		}
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			defer str.Close()
+			echoStream(t, str)
+		}()
+	}
+}
 
-	go func() {
-		for {
-			c, err := l.Accept()
-			if err != nil {
-				select {
-				case <-closed:
-					return // closed naturally.
-				default:
-					checkErr(t, err)
-				}
-			}
-
-			debugLog(t, "accepted connection")
-			go func() {
-				for {
-					str, err := c.AcceptStream()
-					if err != nil {
-						break
-					}
-					go echoStream(t, str)
-				}
-			}()
-		}
-	}()
-
-	return func() {
-		closed <- struct{}{}
+func serve(t *testing.T, l transport.Listener) {
+	var wg sync.WaitGroup
+	defer wg.Wait()
+	for {
+		c, err := l.Accept()
+		if err != nil {
+			return
+		}
+		wg.Add(1)
+		debugLog(t, "accepted connection")
+		go func() {
+			defer wg.Done()
+			defer c.Close()
+			echo(t, c)
+		}()
 	}
 }
 
 func SubtestStress(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, peerA peer.ID, opt Options) {
 	msgsize := 1 << 11
-	errs := make(chan error) // dont block anything.
 	rateLimitN := 5000 // max of 5k funcs, because -race has 8k max.
 	rateLimitChan := make(chan struct{}, rateLimitN)
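
The new echo and serve helpers replace goServe's done-callback with WaitGroup scoping: each defers wg.Wait(), so no goroutine it spawns can outlive it. A self-contained sketch of the same shape, assuming a plain TCP net.Listener in place of the suite's transport.Listener:

package main

import (
	"fmt"
	"io"
	"net"
	"sync"
)

// serveEcho mirrors serve/echo above: every goroutine is tracked by a
// WaitGroup, and the deferred wg.Wait() keeps them from outliving the
// function that spawned them.
func serveEcho(l net.Listener) {
	var wg sync.WaitGroup
	defer wg.Wait()
	for {
		c, err := l.Accept()
		if err != nil {
			return // listener closed
		}
		wg.Add(1)
		go func() {
			defer wg.Done()
			defer c.Close()
			io.Copy(c, c) // echo everything back
		}()
	}
}

func main() {
	l, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		panic(err)
	}

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		serveEcho(l)
	}()

	c, err := net.Dial("tcp", l.Addr().String())
	if err != nil {
		panic(err)
	}
	fmt.Fprint(c, "hello")
	c.(*net.TCPConn).CloseWrite() // send EOF so the echo loop finishes
	reply, _ := io.ReadAll(c)
	c.Close()
	fmt.Printf("echoed %q\n", reply)

	l.Close() // unblocks Accept; serveEcho then drains its goroutines
	wg.Wait()
}
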
@@ -180,7 +172,7 @@ func SubtestStress(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr,
 			bufs <- buf
 			debugLog(t, "%p writing %d bytes (message %d/%d #%x)", s, len(buf), i, opt.MsgNum, buf[:3])
 			if _, err := s.Write(buf); err != nil {
-				errs <- fmt.Errorf("s.Write(buf): %s", err)
+				t.Errorf("s.Write(buf): %s", err)
 				continue
 			}
 		}
@@ -196,12 +188,12 @@ func SubtestStress(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr,
 			debugLog(t, "%p reading %d bytes (message %d/%d #%x)", s, len(buf1), i-1, opt.MsgNum, buf1[:3])
 			if _, err := io.ReadFull(s, buf2); err != nil {
-				errs <- fmt.Errorf("io.ReadFull(s, buf2): %s", err)
+				t.Errorf("io.ReadFull(s, buf2): %s", err)
 				debugLog(t, "%p failed to read %d bytes (message %d/%d #%x)", s, len(buf1), i-1, opt.MsgNum, buf1[:3])
 				continue
 			}
 			if !bytes.Equal(buf1, buf2) {
-				errs <- fmt.Errorf("buffers not equal (%x != %x)", buf1[:3], buf2[:3])
+				t.Errorf("buffers not equal (%x != %x)", buf1[:3], buf2[:3])
 			}
 		}
 	}
@@ -211,7 +203,7 @@ func SubtestStress(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr,
 		s, err := c.OpenStream(context.Background())
 		if err != nil {
-			errs <- fmt.Errorf("failed to create NewStream: %s", err)
+			t.Errorf("failed to create NewStream: %s", err)
 			return
 		}
@@ -228,68 +220,67 @@ func SubtestStress(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr,
 	openConnAndRW := func() {
 		debugLog(t, "openConnAndRW")
 
+		var wg sync.WaitGroup
+		defer wg.Wait()
+
 		l, err := ta.Listen(maddr)
-		checkErr(t, err)
+		if err != nil {
+			t.Error(err)
+			return
+		}
 		defer l.Close()
 
-		done := goServe(t, l)
-		defer done()
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			serve(t, l)
+		}()
 
 		c, err := tb.Dial(context.Background(), l.Multiaddr(), peerA)
-		checkErr(t, err)
+		if err != nil {
+			t.Error(err)
+			return
+		}
 
 		// serve the outgoing conn, because some muxers assume
 		// that we _always_ call serve. (this is an error?)
+		wg.Add(1)
 		go func() {
+			defer wg.Done()
+			defer c.Close()
 			debugLog(t, "serving connection")
-			for {
-				str, err := c.AcceptStream()
-				if err != nil {
-					break
-				}
-				go echoStream(t, str)
-			}
+			echo(t, c)
 		}()
 
-		var wg sync.WaitGroup
+		var openWg sync.WaitGroup
 		for i := 0; i < opt.StreamNum; i++ {
-			wg.Add(1)
+			openWg.Add(1)
 			go rateLimit(func() {
-				defer wg.Done()
+				defer openWg.Done()
 				openStreamAndRW(c)
 			})
 		}
-		wg.Wait()
-		c.Close()
+		openWg.Wait()
 	}
 
-	openConnsAndRW := func() {
-		debugLog(t, "openConnsAndRW, %d conns", opt.ConnNum)
-		var wg sync.WaitGroup
-		for i := 0; i < opt.ConnNum; i++ {
-			wg.Add(1)
-			go rateLimit(func() {
-				defer wg.Done()
-				openConnAndRW()
-			})
-		}
-		wg.Wait()
-	}
-
-	go func() {
-		openConnsAndRW()
-		close(errs) // done
-	}()
+	debugLog(t, "openConnsAndRW, %d conns", opt.ConnNum)
 
-	for err := range errs {
-		t.Error(err)
+	var wg sync.WaitGroup
+	defer wg.Wait()
+	for i := 0; i < opt.ConnNum; i++ {
+		wg.Add(1)
+		go rateLimit(func() {
+			defer wg.Done()
+			openConnAndRW()
+		})
 	}
 }
 
 func SubtestStreamOpenStress(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, peerA peer.ID) {
 	l, err := ta.Listen(maddr)
-	checkErr(t, err)
+	if err != nil {
+		t.Fatal(err)
+	}
 	defer l.Close()
 
 	count := 10000
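
rateLimit itself is defined elsewhere in stream_suite.go and is untouched by this diff; given the rateLimitChan setup above (a buffered channel of rateLimitN slots, sized to stay under the race detector's 8k-goroutine ceiling), it presumably acquires a slot before running its argument. A hypothetical reconstruction of that counting-semaphore shape:

package main

import (
	"fmt"
	"sync"
)

func main() {
	const rateLimitN = 3 // the suite uses 5000
	rateLimitChan := make(chan struct{}, rateLimitN)

	// rateLimit runs f once a slot is free and releases it afterwards,
	// so at most rateLimitN invocations execute concurrently.
	rateLimit := func(f func()) {
		rateLimitChan <- struct{}{}
		defer func() { <-rateLimitChan }()
		f()
	}

	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go rateLimit(func() {
			defer wg.Done()
			fmt.Println("bounded work")
		})
	}
	wg.Wait()
}
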
@@ -311,8 +302,13 @@ func SubtestStreamOpenStress(t *testing.T, ta, tb transport.Transport, maddr ma.
 		accepted <- err
 	}()
 	connB, err = tb.Dial(context.Background(), l.Multiaddr(), peerA)
-	checkErr(t, err)
-	checkErr(t, <-accepted)
+	if err != nil {
+		t.Fatal(err)
+	}
+	err = <-accepted
+	if err != nil {
+		t.Fatal(err)
+	}
 
 	defer func() {
 		if connA != nil {
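
The accepted channel in this hunk lets the test dial and accept concurrently, then check the dial error and the accept error in order. The same rendezvous in runnable form, assuming plain TCP in place of a libp2p transport:

package main

import (
	"fmt"
	"net"
)

func main() {
	l, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		panic(err)
	}
	defer l.Close()

	// Accept in the background; the channel carries the accept error
	// (and orders the connA write before the read below).
	var connA net.Conn
	accepted := make(chan error, 1)
	go func() {
		var err error
		connA, err = l.Accept()
		accepted <- err
	}()

	connB, err := net.Dial("tcp", l.Addr().String())
	if err != nil {
		panic(err)
	}
	if err := <-accepted; err != nil {
		panic(err)
	}
	defer connA.Close()
	defer connB.Close()
	fmt.Println("both sides connected")
}
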
@@ -379,22 +375,36 @@ func SubtestStreamOpenStress(t *testing.T, ta, tb transport.Transport, maddr ma.
 }
 
 func SubtestStreamReset(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, peerA peer.ID) {
+	var wg sync.WaitGroup
+	defer wg.Wait()
+
 	l, err := ta.Listen(maddr)
-	checkErr(t, err)
+	if err != nil {
+		t.Fatal(err)
+	}
 	defer l.Close()
 
-	done := make(chan struct{}, 2)
+	wg.Add(1)
 	go func() {
+		defer wg.Done()
 		muxa, err := l.Accept()
-		checkErr(t, err)
+		if err != nil {
+			t.Error(err)
+			return
+		}
+		defer muxa.Close()
 
 		s, err := muxa.OpenStream(context.Background())
 		if err != nil {
-			panic(err)
+			t.Error(err)
+			return
 		}
+		defer s.Close()
 
 		// Some transports won't open the stream until we write. That's
 		// fine.
-		s.Write([]byte("foo"))
+		_, _ = s.Write([]byte("foo"))
 
 		time.Sleep(time.Millisecond * 50)
@@ -403,22 +413,20 @@ func SubtestStreamReset(t *testing.T, ta, tb transport.Transport, maddr ma.Multi
 			t.Error("should have failed to write")
 		}
-		s.Close()
-		done <- struct{}{}
 	}()
 
 	muxb, err := tb.Dial(context.Background(), l.Multiaddr(), peerA)
-	checkErr(t, err)
-
-	go func() {
-		str, err := muxb.AcceptStream()
-		checkErr(t, err)
-		str.Reset()
-		done <- struct{}{}
-	}()
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer muxb.Close()
 
-	<-done
-	<-done
+	str, err := muxb.AcceptStream()
+	if err != nil {
+		t.Error(err)
+		return
+	}
+	str.Reset()
 }
 
 func SubtestStress1Conn1Stream1Msg(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, peerA peer.ID) {
