diff --git a/p2p/protocol/identify/id.go b/p2p/protocol/identify/id.go index 4c792ccf7..70ff28b63 100644 --- a/p2p/protocol/identify/id.go +++ b/p2p/protocol/identify/id.go @@ -43,6 +43,8 @@ const LibP2PVersion = "ipfs/0.1.0" const ServiceName = "libp2p.identify" +const maxPushConcurrency = 32 + // StreamReadTimeout is the read timeout on all incoming Identify family streams. var StreamReadTimeout = 60 * time.Second @@ -129,6 +131,10 @@ type idService struct { addPeerHandlerCh chan addPeerHandlerReq rmPeerHandlerCh chan rmPeerHandlerReq + + // pushSemaphore limits the push/delta concurrency to avoid storms + // that clog the transient scope. + pushSemaphore chan struct{} } // NewIDService constructs a new *idService and activates it by @@ -154,6 +160,8 @@ func NewIDService(h host.Host, opts ...Option) (*idService, error) { addPeerHandlerCh: make(chan addPeerHandlerReq), rmPeerHandlerCh: make(chan rmPeerHandlerReq), + + pushSemaphore: make(chan struct{}, maxPushConcurrency), } s.ctx, s.ctxCancel = context.WithCancel(context.Background()) diff --git a/p2p/protocol/identify/peer_loop.go b/p2p/protocol/identify/peer_loop.go index ad62910f4..5fd312585 100644 --- a/p2p/protocol/identify/peer_loop.go +++ b/p2p/protocol/identify/peer_loop.go @@ -179,6 +179,11 @@ func (ph *peerHandler) openStream(ctx context.Context, protos []string) (network return nil, errProtocolNotSupported } + ph.ids.pushSemaphore <- struct{}{} + defer func() { + <-ph.ids.pushSemaphore + }() + // negotiate a stream without opening a new connection as we "should" already have a connection. ctx, cancel := context.WithTimeout(ctx, 30*time.Second) defer cancel()