Browse Source
Merge pull request #1305 from libp2p/fix/identify-storm
add semaphore to control push/delta concurrency
pull/1306/head
vyzo
3 years ago
committed by
GitHub
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with
13 additions and
0 deletions
-
p2p/protocol/identify/id.go
-
p2p/protocol/identify/peer_loop.go
|
|
@ -43,6 +43,8 @@ const LibP2PVersion = "ipfs/0.1.0" |
|
|
|
|
|
|
|
const ServiceName = "libp2p.identify" |
|
|
|
|
|
|
|
const maxPushConcurrency = 32 |
|
|
|
|
|
|
|
// StreamReadTimeout is the read timeout on all incoming Identify family streams.
|
|
|
|
var StreamReadTimeout = 60 * time.Second |
|
|
|
|
|
|
@ -129,6 +131,10 @@ type idService struct { |
|
|
|
|
|
|
|
addPeerHandlerCh chan addPeerHandlerReq |
|
|
|
rmPeerHandlerCh chan rmPeerHandlerReq |
|
|
|
|
|
|
|
// pushSemaphore limits the push/delta concurrency to avoid storms
|
|
|
|
// that clog the transient scope.
|
|
|
|
pushSemaphore chan struct{} |
|
|
|
} |
|
|
|
|
|
|
|
// NewIDService constructs a new *idService and activates it by
|
|
|
@ -154,6 +160,8 @@ func NewIDService(h host.Host, opts ...Option) (*idService, error) { |
|
|
|
|
|
|
|
addPeerHandlerCh: make(chan addPeerHandlerReq), |
|
|
|
rmPeerHandlerCh: make(chan rmPeerHandlerReq), |
|
|
|
|
|
|
|
pushSemaphore: make(chan struct{}, maxPushConcurrency), |
|
|
|
} |
|
|
|
s.ctx, s.ctxCancel = context.WithCancel(context.Background()) |
|
|
|
|
|
|
|
|
|
@ -179,6 +179,11 @@ func (ph *peerHandler) openStream(ctx context.Context, protos []string) (network |
|
|
|
return nil, errProtocolNotSupported |
|
|
|
} |
|
|
|
|
|
|
|
ph.ids.pushSemaphore <- struct{}{} |
|
|
|
defer func() { |
|
|
|
<-ph.ids.pushSemaphore |
|
|
|
}() |
|
|
|
|
|
|
|
// negotiate a stream without opening a new connection as we "should" already have a connection.
|
|
|
|
ctx, cancel := context.WithTimeout(ctx, 30*time.Second) |
|
|
|
defer cancel() |
|
|
|