From d3d5351d8160b6b531fc39327a8313a3b8bb6228 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Tue, 7 May 2019 15:40:45 -0700 Subject: [PATCH 1/2] ping: return a stream of results Otherwise, we can't return errors. This is a breaking change but unlikely to have a large impact on anyone but go-ipfs. Part of https://github.com/ipfs/go-ipfs/issues/6298 --- p2p/protocol/ping/ping.go | 47 ++++++++++++++++++++++++++-------- p2p/protocol/ping/ping_test.go | 12 ++++----- 2 files changed, 42 insertions(+), 17 deletions(-) diff --git a/p2p/protocol/ping/ping.go b/p2p/protocol/ping/ping.go index b76133cc1..8f33513b5 100644 --- a/p2p/protocol/ping/ping.go +++ b/p2p/protocol/ping/ping.go @@ -71,42 +71,67 @@ func (p *PingService) PingHandler(s inet.Stream) { } } -func (ps *PingService) Ping(ctx context.Context, p peer.ID) (<-chan time.Duration, error) { +// Result is a result of a ping attempt, either an RTT or an error. +type Result struct { + RTT time.Duration + Error error +} + +func (ps *PingService) Ping(ctx context.Context, p peer.ID) <-chan Result { return Ping(ctx, ps.Host, p) } -func Ping(ctx context.Context, h host.Host, p peer.ID) (<-chan time.Duration, error) { +// Ping pings the remote peer until the context is canceled, returning a stream +// of RTTs or errors. +func Ping(ctx context.Context, h host.Host, p peer.ID) <-chan Result { s, err := h.NewStream(ctx, p, ID) if err != nil { - return nil, err + ch := make(chan Result, 1) + ch <- Result{Error: err} + close(ch) + return ch } - out := make(chan time.Duration) + ctx, cancel := context.WithCancel(ctx) + + out := make(chan Result) go func() { defer close(out) - defer s.Reset() + defer cancel() + for { select { case <-ctx.Done(): return default: - t, err := ping(s) - if err != nil { - log.Debugf("ping error: %s", err) + var res Result + res.RTT, res.Error = ping(s) + + // canceled, ignore everything. + if ctx.Err() != nil { return } - h.Peerstore().RecordLatency(p, t) + // No error, record the RTT. + if res.Error == nil { + h.Peerstore().RecordLatency(p, res.RTT) + } + select { - case out <- t: + case out <- res: case <-ctx.Done(): return } } } }() + go func() { + // forces the ping to abort. + <-ctx.Done() + s.Reset() + }() - return out, nil + return out } func ping(s inet.Stream) (time.Duration, error) { diff --git a/p2p/protocol/ping/ping_test.go b/p2p/protocol/ping/ping_test.go index f8ea0cc79..043eac0fd 100644 --- a/p2p/protocol/ping/ping_test.go +++ b/p2p/protocol/ping/ping_test.go @@ -37,15 +37,15 @@ func TestPing(t *testing.T) { func testPing(t *testing.T, ps *ping.PingService, p peer.ID) { pctx, cancel := context.WithCancel(context.Background()) defer cancel() - ts, err := ps.Ping(pctx, p) - if err != nil { - t.Fatal(err) - } + ts := ps.Ping(pctx, p) for i := 0; i < 5; i++ { select { - case took := <-ts: - t.Log("ping took: ", took) + case res := <-ts: + if res.Error != nil { + t.Fatal(res.Error) + } + t.Log("ping took: ", res.RTT) case <-time.After(time.Second * 4): t.Fatal("failed to receive ping") } From d0ab45164c3f9715db464e1f45832a8535b18ca7 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Tue, 7 May 2019 15:43:48 -0700 Subject: [PATCH 2/2] ping: simplify ping loop --- p2p/protocol/ping/ping.go | 35 +++++++++++++++-------------------- 1 file changed, 15 insertions(+), 20 deletions(-) diff --git a/p2p/protocol/ping/ping.go b/p2p/protocol/ping/ping.go index 8f33513b5..d8e143bd0 100644 --- a/p2p/protocol/ping/ping.go +++ b/p2p/protocol/ping/ping.go @@ -99,29 +99,24 @@ func Ping(ctx context.Context, h host.Host, p peer.ID) <-chan Result { defer close(out) defer cancel() - for { + for ctx.Err() == nil { + var res Result + res.RTT, res.Error = ping(s) + + // canceled, ignore everything. + if ctx.Err() != nil { + return + } + + // No error, record the RTT. + if res.Error == nil { + h.Peerstore().RecordLatency(p, res.RTT) + } + select { + case out <- res: case <-ctx.Done(): return - default: - var res Result - res.RTT, res.Error = ping(s) - - // canceled, ignore everything. - if ctx.Err() != nil { - return - } - - // No error, record the RTT. - if res.Error == nil { - h.Peerstore().RecordLatency(p, res.RTT) - } - - select { - case out <- res: - case <-ctx.Done(): - return - } } } }()