move go-libp2p-resource-manager here

pull/1677/head · Marten Seemann · 2 years ago · parent · commit 115858f161
  1.  450   p2p/host/resource-manager/README.md
  2.  215   p2p/host/resource-manager/allowlist.go
  3.  287   p2p/host/resource-manager/allowlist_test.go
  4.   54   p2p/host/resource-manager/docs/allowlist.md
  5.   81   p2p/host/resource-manager/error.go
  6.  147   p2p/host/resource-manager/extapi.go
  7.  264   p2p/host/resource-manager/limit.go
  8.   62   p2p/host/resource-manager/limit_config_test.go
  9.   45   p2p/host/resource-manager/limit_config_test.json
 10.  548   p2p/host/resource-manager/limit_defaults.go
 11.   88   p2p/host/resource-manager/limit_test.go
 12.  168   p2p/host/resource-manager/metrics.go
 13.   49   p2p/host/resource-manager/obs/grafana-dashboards/README.md
 14. 1818   p2p/host/resource-manager/obs/grafana-dashboards/resource-manager.json
 15.  343   p2p/host/resource-manager/obs/stats.go
 16.   39   p2p/host/resource-manager/obs/stats_test.go
 17.  874   p2p/host/resource-manager/rcmgr.go
 18. 1052   p2p/host/resource-manager/rcmgr_test.go
 19.  771   p2p/host/resource-manager/scope.go
 20. 1200   p2p/host/resource-manager/scope_test.go
 21.   11   p2p/host/resource-manager/sys_not_unix.go
 22.   17   p2p/host/resource-manager/sys_unix.go
 23.   11   p2p/host/resource-manager/sys_windows.go
 24.  698   p2p/host/resource-manager/trace.go

450  p2p/host/resource-manager/README.md
@@ -0,0 +1,450 @@
# The libp2p Network Resource Manager

This package contains the canonical implementation of the libp2p
Network Resource Manager interface.

The implementation is based on the concept of Resource Management
Scopes, whereby resource usage is constrained by a DAG of scopes,
accounting for multiple levels of resource constraints.
## Basic Resources

### Memory

Perhaps the most fundamental resource is memory, and in particular
buffers used for network operations. The system must provide an
interface for components to reserve memory that accounts for buffers
(and possibly other live objects), which is scoped within the component.

Before a new buffer is allocated, the component should try a memory
reservation, which can fail if the resource limit is exceeded. It is
then up to the component to react to the error condition, depending on
the situation. For example, a muxer failing to grow a buffer in
response to a window change should simply retain the old buffer and
operate at perhaps degraded performance.
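
As a minimal sketch of this pattern (assuming the `network` package from
go-libp2p-core; `growBuffer` is a hypothetical helper, not part of this
package):

```go
// growBuffer tries to reserve memory for a larger buffer against the
// given scope; if the reservation is denied by the resource manager, it
// falls back to the old buffer.
func growBuffer(scope network.ResourceScope, buf []byte, newSize int) []byte {
    if err := scope.ReserveMemory(newSize-len(buf), network.ReservationPriorityMedium); err != nil {
        // Reservation failed: keep the old buffer and operate at
        // perhaps degraded performance.
        return buf
    }
    newBuf := make([]byte, newSize)
    copy(newBuf, buf)
    return newBuf
}
```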
### File Descriptors

File descriptors are an important resource that uses memory (and
computational time) at the system level. They are also a scarce
resource, as typically (unless the user explicitly intervenes) they
are constrained by the system. Exhaustion of file descriptors may
render the application incapable of operating (e.g. because it is
unable to open a file). This is important for libp2p because most
operating systems represent sockets as file descriptors.
### Connections

Connections are a higher level concept endemic to libp2p; in order to
communicate with another peer, a connection must first be
established. Connections are an important resource in libp2p, as they
consume memory, goroutines, and possibly file descriptors.

We distinguish between inbound and outbound connections, as the former
are initiated by remote peers and consume resources in response to
network events, and thus need to be tightly controlled in order to
protect the application from overload or attack. Outbound
connections are typically initiated by the application's volition and
don't need to be controlled as tightly. However, outbound connections
still consume resources and may be initiated in response to network
events because of (potentially faulty) application logic, so they
still need to be constrained.
### Streams

Streams are the fundamental object of interaction in libp2p; all
protocol interactions happen through a stream that goes over some
connection. Streams are a fundamental resource in libp2p, as they
consume memory and goroutines at all levels of the stack.

Streams always belong to a peer, specify a protocol, and may
belong to some service in the system. This suggests that, apart
from global limits, we can constrain stream usage at finer
granularity, at the protocol and service level.

Once again, we distinguish between inbound and outbound streams.
Inbound streams are initiated by remote peers and consume resources in
response to network events; controlling inbound stream usage is again
paramount for protecting the system from overload or attack.

Outbound streams are normally initiated by the application or some
service in the system in order to effect some protocol
interaction. However, they can also be initiated in response to
network events because of application or service logic, so we still
need to constrain them.
## Resource Scopes

The Resource Manager is based on the concept of resource
scopes. Resource Scopes account for resource usage that is temporally
delimited for the span of the scope. Resource Scopes conceptually
form a DAG, providing us with a mechanism to enforce multiresolution
resource accounting. Downstream resource usage is aggregated at scopes
higher up the graph.

The following diagram depicts the canonical scope graph:
```
System
  +------------> Transient.............+................+
  |                                    .                .
  +------------> Service------------- . ----------+    .
  |                                    .           |    .
  +-------------> Protocol----------- . ----------+    .
  |                                    .           |    .
  +-------------->* Peer               \/          |    .
                   +------------> Connection       |    .
                   |                               \/   \/
                   +---------------------------> Stream
```
### The System Scope

The system scope is the top level scope that accounts for global
resource usage at all levels of the system. This scope nests and
constrains all other scopes and institutes global hard limits.

### The Transient Scope

The transient scope accounts for resources that are in the process of
full establishment. For instance, a new connection prior to the
handshake does not belong to any peer, but it still needs to be
constrained as this opens an avenue for attacks in transient resource
usage. Similarly, a stream that has not negotiated a protocol yet is
constrained by the transient scope.

The transient scope effectively represents a DMZ (DeMilitarized Zone),
where resource usage can be accounted for connections and streams that
are not fully established.
### Service Scopes

The system is typically organized across services, which may be
ambient and provide basic functionality to the system (e.g. identify,
autonat, relay, etc). Alternatively, services may be explicitly
instantiated by the application, and provide core components of its
functionality (e.g. pubsub, the DHT, etc).

Services are logical groupings of streams that implement protocol flow
and may additionally consume resources such as memory. Services
typically have at least one stream handler, so they are subject to
inbound stream creation and resource usage in response to network
events. As such, the system explicitly models them, allowing for
isolated resource usage that can be tuned by the user.
### Protocol Scopes

Protocol Scopes account for resources at the protocol level. They are
an intermediate resource scope that can constrain streams which do not
have a service associated, or provide resource control within a
service. They also provide an opportunity for system operators to
explicitly restrict specific protocols.

For instance, a service that is not aware of the resource manager and
has not been ported to mark its streams may still gain limits
transparently, without any programmer intervention. Furthermore, the
protocol scope can constrain resource usage for services that
implement multiple protocols for the sake of backwards
compatibility. A tighter limit on some older protocol can protect the
application from resource consumption caused by legacy clients or
potential attacks.

For a concrete example, consider pubsub with the gossipsub router: the
service also understands the floodsub protocol for backwards
compatibility and support for unsophisticated clients that are lagging
in the implementation effort. By specifying a lower limit for the
floodsub protocol, we can constrain the resource usage of legacy
clients using an inefficient protocol.
### Peer Scopes

The peer scope accounts for resource usage by an individual peer. This
constrains connections and streams and limits the blast radius of
resource consumption by a single remote peer.

This ensures that no single peer can use more resources than allowed
by the peer limits. Every peer has a default limit, but the programmer
may raise (or lower) limits for specific peers.

### Connection Scopes

The connection scope is delimited to the duration of a connection and
constrains resource usage by a single connection. The scope is a leaf
in the DAG, with a span that begins when a connection is established
and ends when the connection is closed. Its resources are aggregated
to the resource usage of a peer.

### Stream Scopes

The stream scope is delimited to the duration of a stream, and
constrains resource usage by a single stream. This scope is also a
leaf in the DAG, with a span that begins when a stream is created and
ends when the stream is closed. Its resources are aggregated to the
resource usage of a peer, and constrained by a service and protocol
scope.

### User Transaction Scopes

User transaction scopes can be created as a child of any extant
resource scope, and provide the programmer with a delimited scope for
easy resource accounting. Transactions may form a tree that is rooted
at some canonical scope in the scope DAG.

For instance, a programmer may create a transaction scope within a
service that accounts for some control-flow-delimited resource
usage. Similarly, a programmer may create a transaction scope for some
interaction within a stream, e.g. a Request/Response interaction that
uses a buffer, as sketched below.
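
For illustration, a minimal sketch using `BeginSpan` (the mechanism
behind user transaction scopes); `stream` and `respBufSize` are assumed
to be in scope:

```go
// Account for a request/response interaction inside a stream's scope.
span, err := stream.Scope().BeginSpan()
if err != nil {
    return err
}
defer span.Done() // releases everything reserved on the span

if err := span.ReserveMemory(respBufSize, network.ReservationPriorityAlways); err != nil {
    return err
}
buf := make([]byte, respBufSize) // memory accounted against the span
```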
## Limits

Each resource scope has an associated limit object, which designates
limits for all basic resources. The limit is checked every time some
resource is reserved and provides the system with an opportunity to
constrain resource usage.

There are separate limits for each class of scope, allowing for
multiresolution and aggregate resource accounting. As such, we have
limits for the system and transient scopes, default and specific
limits for services, protocols, and peers, and limits for connections
and streams.
### Scaling Limits

When building software that is supposed to run on many different kinds of machines,
with various memory and CPU configurations, it is desirable to have limits that
scale with the size of the machine.

This is done using the `ScalingLimitConfig`. For every scope, this configuration
struct defines the absolutely bare minimum limits, and an (optional) increase of
these limits, which will be applied on nodes that have sufficient memory.

A `ScalingLimitConfig` can be converted into a `LimitConfig` (which can then be
used to initialize a fixed limiter, as shown in [Usage](#usage) below) by calling
the `Scale` method.
The `Scale` method takes two parameters: the amount of memory and the number of file
descriptors that an application is willing to dedicate to libp2p.

These amounts will differ between use cases: A blockchain node running on a dedicated
server might have a lot of memory, and dedicate 1/4 of that memory to libp2p. On the
other end of the spectrum, a desktop companion application running as a background
task on a consumer laptop will probably dedicate significantly less than 1/4 of its system
memory to libp2p.

For convenience, the `ScalingLimitConfig` also provides an `AutoScale` method,
which determines the amount of memory and file descriptors available on the
system, and dedicates up to 1/8 of the memory and 1/2 of the file descriptors to
libp2p.
For example, one might set:
```go
var scalingLimits = ScalingLimitConfig{
    SystemBaseLimit: BaseLimit{
        ConnsInbound:    64,
        ConnsOutbound:   128,
        Conns:           128,
        StreamsInbound:  512,
        StreamsOutbound: 1024,
        Streams:         1024,
        Memory:          128 << 20,
        FD:              256,
    },
    SystemLimitIncrease: BaseLimitIncrease{
        ConnsInbound:    32,
        ConnsOutbound:   64,
        Conns:           64,
        StreamsInbound:  256,
        StreamsOutbound: 512,
        Streams:         512,
        Memory:          256 << 20,
        FDFraction:      1,
    },
}
```
The base limit (`SystemBaseLimit`) here is the minimum configuration that any
node will have, no matter how little memory it possesses. For every GB of memory
passed into the `Scale` method, an increase of (`SystemLimitIncrease`) is added.
For example, calling `Scale` with 4 GB of memory will result in a limit of 384 for
`Conns` (128 + 4*64).

The `FDFraction` defines how many of the file descriptors are allocated to this
scope. In the example above, when called with a file descriptor value of 1000,
this would result in a limit of 1000 (1000 * 1) file descriptors for the system scope.

Note that we only showed the configuration for the system scope here; equivalent
configuration options apply to all other scopes as well.
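
Putting it together, a sketch of scaling these limits and building a
limiter (the 4 GB and 1000 FD figures are arbitrary):

```go
// Dedicate 4 GB of memory and 1000 file descriptors to libp2p,
// then build a fixed limiter from the scaled limits.
limits := scalingLimits.Scale(4<<30, 1000)
limiter := NewFixedLimiter(limits)
```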
### Default limits

By default, the resource manager ships with some reasonable scaling limits and
makes a reasonable guess at how much system memory you want to dedicate to the
go-libp2p process. For the default definitions see `DefaultLimits` and
`ScalingLimitConfig.AutoScale()`.
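
For instance, a sketch of relying entirely on the defaults:

```go
// Scale the default limits against this machine and construct a
// resource manager from them.
limits := DefaultLimits.AutoScale()
rcmgr, err := NewResourceManager(NewFixedLimiter(limits))
if err != nil {
    panic(err)
}
_ = rcmgr
```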
### Tweaking Defaults

If the defaults seem mostly okay but you want to adjust one facet, you can
simply copy the default struct object and update the field you want to change. You can
apply changes to a `BaseLimit`, `BaseLimitIncrease`, and `LimitConfig` with
`.Apply`.
Example:

```go
// An example on how to tweak the default limits
tweakedDefaults := DefaultLimits
tweakedDefaults.ProtocolBaseLimit.Apply(BaseLimit{
    Streams:         1024,
    StreamsInbound:  512,
    StreamsOutbound: 512,
})
```
### How to tune your limits

Once you've set your limits and monitoring (see [Monitoring](#monitoring) below),
you can tune your limits better. The `blocked_resources` metric will tell you what
was blocked and for what scope. If you see a steady stream of these blocked requests,
it means your resource limits are too low for your usage. If you see a rare sudden
spike, this is okay and it means the resource manager protected you from some
anomaly.
### How to disable limits

Sometimes disabling all limits is useful when you want to see how many
resources you use during normal operation. You can then use this information to
define your initial limits. Disable the limits by using `InfiniteLimits`, as
sketched below.
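
A minimal sketch:

```go
// Run without limits to observe real resource usage first; pick
// concrete limits from the measurements afterwards.
rcmgr, err := NewResourceManager(NewFixedLimiter(InfiniteLimits))
```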
### Debug "resource limit exceeded" errors

These errors occur whenever a limit is hit. For example, you'll get this error if
you are at your limit for the number of streams you can have, and you try to
open one more.

If you're seeing a lot of "resource limit exceeded" errors, take a look at the
`blocked_resources` metric for some information on what was blocked. Also take
a look at the resources used per stream and per protocol (the Grafana
Dashboard is ideal for this) and check if you're routinely hitting limits or if
these are rare (but noisy) spikes.

When debugging in general, it may help to search your logs for errors that match
the string "resource limit exceeded" to see if you're hitting some limits
routinely.
## Monitoring

Once you have limits set, you'll want to monitor to see if you're running into
your limits often. This could be a sign that you need to raise your limits
(your process is more intensive than you originally thought) or that you need
to fix something in your application (surely you don't need over 1000 streams?).

There are OpenCensus metrics that can be hooked up to the resource manager. See
`obs/stats_test.go` for an example on how to enable this, and `DefaultViews` in
`stats.go` for recommended views. These metrics can be hooked up to Prometheus
or any other OpenCensus-supported platform.
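
A minimal sketch of wiring this up (the import path of the `obs` package
is an assumption based on this commit's layout; `registerResourceManagerViews`
is a hypothetical helper):

```go
import (
    "go.opencensus.io/stats/view"

    rcmgrObs "github.com/libp2p/go-libp2p/p2p/host/resource-manager/obs"
)

func registerResourceManagerViews() error {
    // Register the recommended resource manager views; any
    // OpenCensus-supported exporter (e.g. Prometheus) can then scrape them.
    return view.Register(rcmgrObs.DefaultViews...)
}
```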
There is also an included Grafana dashboard to help kickstart your
observability into the resource manager. Find more information about it at
`./obs/grafana-dashboards/README.md`.
## Allowlisting multiaddrs to mitigate eclipse attacks

If you have a set of trusted peers and IP addresses, you can use the resource
manager's [Allowlist](./docs/allowlist.md) to protect yourself from eclipse
attacks. The set of peers in the allowlist will have their own limits in case
the normal limits are reached. This means you will always be able to connect to
these trusted peers even if you've already reached your system limits.

Look at `WithAllowlistedMultiaddrs` and its example in the GoDoc to learn more.
## Examples

Here we consider some concrete examples that can elucidate the abstract
design as described so far.
### Stream Lifetime

Let's consider a stream and the limits that apply to it.
When the stream scope is first opened, it is created by calling
`ResourceManager.OpenStream`.

Initially the stream is constrained by:

- the system scope, where global hard limits apply.
- the transient scope, where unnegotiated streams live.
- the peer scope, where the limits for the peer at the other end of the stream
  apply.
Once the protocol has been negotiated, the protocol is set by calling
`StreamManagementScope.SetProtocol`. The constraint from the
transient scope is removed and the stream is now constrained by the
protocol instead.

More specifically, the following constraints apply:

- the system scope, where global hard limits apply.
- the peer scope, where the limits for the peer at the other end of the stream
  apply.
- the protocol scope, where the limits of the specific protocol used apply.
The existence of the protocol limit allows us to implicitly constrain
streams for services that have not been ported to the resource manager
yet. Once the programmer attaches a stream to a service by calling
`StreamScope.SetService`, the stream resources are aggregated and constrained
by the service scope in addition to its protocol scope.

More specifically, the following constraints apply:

- the system scope, where global hard limits apply.
- the peer scope, where the limits for the peer at the other end of the stream
  apply.
- the service scope, where the limits of the specific service owning the stream apply.
- the protocol scope, where the limits of the specific protocol for the stream apply.
The resource transfer that happens in `SetProtocol` and `SetService`
gives the resource manager the opportunity to gate the streams. If
the transfer results in exceeding the scope limits, then an error
indicating "resource limit exceeded" is returned. The wrapped error
includes the name of the scope rejecting the resource acquisition to
aid understanding of applicable limits. Note that the (wrapped) error
implements `net.Error` and is marked as temporary, so that the
programmer can handle it with a backoff and retry, as sketched below.
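
A hedged sketch of such handling (`stream` and `svcName` are assumed to
be in scope):

```go
if err := stream.Scope().SetService(svcName); err != nil {
    var nerr net.Error
    if errors.As(err, &nerr) && nerr.Temporary() {
        // "resource limit exceeded": back off and retry later,
        // or degrade gracefully.
    }
    stream.Reset()
    return err
}
```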
## Usage

This package provides a limiter implementation that applies fixed limits:

```go
limiter := NewFixedLimiter(limits)
```

The `limits` parameter allows fine-grained control of resource usage on all scopes.
## Implementation Notes

- The package only exports a constructor for the resource manager and
  basic types for defining limits. Internals are not exposed.
- Internally, there is a resources object that is embedded in every scope and
  implements resource accounting.
- There is a single implementation of a generic resource scope, that
  provides all necessary interface methods.
- There are concrete types for all canonical scopes, embedding a
  pointer to a generic resource scope.
- Peer and Protocol scopes, which may be created in response to
  network events, are periodically garbage collected.
## Design Considerations

- The Resource Manager must account for basic resource usage at all
  levels of the stack, from the internals to application components
  that use the network facilities of libp2p.
- Basic resources include memory, streams, connections, and file
  descriptors. These account for both space and time used by
  the stack, as each resource has a direct effect on the system
  availability and performance.
- The design must support seamless integration for user applications,
  which should reap the benefits of resource management without any
  changes. That is, existing applications should be oblivious to the
  resource manager and transparently obtain limits which protect them
  from resource exhaustion and OOM conditions.
- At the same time, the design must support opt-in resource usage
  accounting for applications that want to explicitly utilize the
  facilities of the system to inform about and constrain their own
  resource usage.
- The design must allow users to set their own limits, which can be
  static (fixed) or dynamic.

215  p2p/host/resource-manager/allowlist.go
@@ -0,0 +1,215 @@
package rcmgr

import (
    "bytes"
    "errors"
    "fmt"
    "net"
    "sync"

    "github.com/libp2p/go-libp2p-core/peer"

    "github.com/multiformats/go-multiaddr"
    manet "github.com/multiformats/go-multiaddr/net"
)

type Allowlist struct {
    mu sync.RWMutex
    // a simple structure of lists of networks. There is probably a faster way
    // to check if an IP address is in this network than iterating over this
    // list, but this is good enough for small numbers of networks (<1_000).
    // Analyze the benchmark before trying to optimize this.

    // Any peer with these IPs are allowed
    allowedNetworks []*net.IPNet

    // Only the specified peers can use these IPs
    allowedPeerByNetwork map[peer.ID][]*net.IPNet
}
// WithAllowlistedMultiaddrs sets the multiaddrs to be in the allowlist
func WithAllowlistedMultiaddrs(mas []multiaddr.Multiaddr) Option {
    return func(rm *resourceManager) error {
        for _, ma := range mas {
            err := rm.allowlist.Add(ma)
            if err != nil {
                return err
            }
        }
        return nil
    }
}

func newAllowlist() Allowlist {
    return Allowlist{
        allowedPeerByNetwork: make(map[peer.ID][]*net.IPNet),
    }
}

func toIPNet(ma multiaddr.Multiaddr) (*net.IPNet, peer.ID, error) {
    var ipString string
    var mask string
    var allowedPeerStr string
    var allowedPeer peer.ID
    var isIPV4 bool

    multiaddr.ForEach(ma, func(c multiaddr.Component) bool {
        if c.Protocol().Code == multiaddr.P_IP4 || c.Protocol().Code == multiaddr.P_IP6 {
            isIPV4 = c.Protocol().Code == multiaddr.P_IP4
            ipString = c.Value()
        }
        if c.Protocol().Code == multiaddr.P_IPCIDR {
            mask = c.Value()
        }
        if c.Protocol().Code == multiaddr.P_P2P {
            allowedPeerStr = c.Value()
        }
        return ipString == "" || mask == "" || allowedPeerStr == ""
    })

    if ipString == "" {
        return nil, allowedPeer, errors.New("missing ip address")
    }

    if allowedPeerStr != "" {
        var err error
        allowedPeer, err = peer.Decode(allowedPeerStr)
        if err != nil {
            return nil, allowedPeer, fmt.Errorf("failed to decode allowed peer: %w", err)
        }
    }

    if mask == "" {
        ip := net.ParseIP(ipString)
        if ip == nil {
            return nil, allowedPeer, errors.New("invalid ip address")
        }
        var mask net.IPMask
        if isIPV4 {
            mask = net.CIDRMask(32, 32)
        } else {
            mask = net.CIDRMask(128, 128)
        }
        net := &net.IPNet{IP: ip, Mask: mask}
        return net, allowedPeer, nil
    }

    _, ipnet, err := net.ParseCIDR(ipString + "/" + mask)
    return ipnet, allowedPeer, err
}
// Add takes a multiaddr and adds it to the allowlist. The multiaddr should be
// an ip address of the peer with or without a `/p2p` protocol.
// e.g. /ip4/1.2.3.4/p2p/QmFoo, /ip4/1.2.3.4, and /ip4/1.2.3.0/ipcidr/24 are valid.
// /p2p/QmFoo is not valid.
func (al *Allowlist) Add(ma multiaddr.Multiaddr) error {
    ipnet, allowedPeer, err := toIPNet(ma)
    if err != nil {
        return err
    }
    al.mu.Lock()
    defer al.mu.Unlock()

    if allowedPeer != peer.ID("") {
        // We have a peerID constraint
        if al.allowedPeerByNetwork == nil {
            al.allowedPeerByNetwork = make(map[peer.ID][]*net.IPNet)
        }
        al.allowedPeerByNetwork[allowedPeer] = append(al.allowedPeerByNetwork[allowedPeer], ipnet)
    } else {
        al.allowedNetworks = append(al.allowedNetworks, ipnet)
    }
    return nil
}

func (al *Allowlist) Remove(ma multiaddr.Multiaddr) error {
    ipnet, allowedPeer, err := toIPNet(ma)
    if err != nil {
        return err
    }
    al.mu.Lock()
    defer al.mu.Unlock()

    ipNetList := al.allowedNetworks
    if allowedPeer != "" {
        // We have a peerID constraint
        ipNetList = al.allowedPeerByNetwork[allowedPeer]
    }
    if ipNetList == nil {
        return nil
    }

    i := len(ipNetList)
    for i > 0 {
        i--
        if ipNetList[i].IP.Equal(ipnet.IP) && bytes.Equal(ipNetList[i].Mask, ipnet.Mask) {
            // swap remove
            ipNetList[i] = ipNetList[len(ipNetList)-1]
            ipNetList = ipNetList[:len(ipNetList)-1]
            // We only remove one thing
            break
        }
    }

    if allowedPeer != "" {
        al.allowedPeerByNetwork[allowedPeer] = ipNetList
    } else {
        al.allowedNetworks = ipNetList
    }

    return nil
}
func (al *Allowlist) Allowed(ma multiaddr.Multiaddr) bool {
    ip, err := manet.ToIP(ma)
    if err != nil {
        return false
    }
    al.mu.RLock()
    defer al.mu.RUnlock()

    for _, network := range al.allowedNetworks {
        if network.Contains(ip) {
            return true
        }
    }

    for _, allowedNetworks := range al.allowedPeerByNetwork {
        for _, network := range allowedNetworks {
            if network.Contains(ip) {
                return true
            }
        }
    }

    return false
}

func (al *Allowlist) AllowedPeerAndMultiaddr(peerID peer.ID, ma multiaddr.Multiaddr) bool {
    ip, err := manet.ToIP(ma)
    if err != nil {
        return false
    }
    al.mu.RLock()
    defer al.mu.RUnlock()

    for _, network := range al.allowedNetworks {
        if network.Contains(ip) {
            // We found a match that isn't constrained by a peerID
            return true
        }
    }

    if expectedNetworks, ok := al.allowedPeerByNetwork[peerID]; ok {
        for _, expectedNetwork := range expectedNetworks {
            if expectedNetwork.Contains(ip) {
                return true
            }
        }
    }

    return false
}

287  p2p/host/resource-manager/allowlist_test.go
@@ -0,0 +1,287 @@
package rcmgr

import (
    "crypto/rand"
    "fmt"
    "net"
    "testing"

    "github.com/libp2p/go-libp2p-core/peer"
    "github.com/libp2p/go-libp2p-core/test"

    "github.com/multiformats/go-multiaddr"
)

func ExampleWithAllowlistedMultiaddrs() {
    somePeer, err := test.RandPeerID()
    if err != nil {
        panic("Failed to generate somePeer")
    }
    limits := DefaultLimits.AutoScale()
    rcmgr, err := NewResourceManager(NewFixedLimiter(limits), WithAllowlistedMultiaddrs([]multiaddr.Multiaddr{
        // Any peer connecting from this IP address
        multiaddr.StringCast("/ip4/1.2.3.4"),
        // Only the specified peer from this address
        multiaddr.StringCast("/ip4/2.2.3.4/p2p/" + peer.Encode(somePeer)),
        // Only peers from this 1.2.3.0/24 IP address range
        multiaddr.StringCast("/ip4/1.2.3.0/ipcidr/24"),
    }))
    if err != nil {
        panic("Failed to start resource manager")
    }

    // Use rcmgr as before
    _ = rcmgr
}
func TestAllowedSimple(t *testing.T) {
    allowlist := newAllowlist()
    ma := multiaddr.StringCast("/ip4/1.2.3.4/tcp/1234")
    err := allowlist.Add(ma)
    if err != nil {
        t.Fatalf("failed to add ip4: %s", err)
    }

    if !allowlist.Allowed(ma) {
        t.Fatalf("addr should be allowed")
    }
}
func TestAllowedWithPeer(t *testing.T) {
    type testcase struct {
        name      string
        allowlist []string
        endpoint  multiaddr.Multiaddr
        peer      peer.ID
        // Is this endpoint allowed? (We don't have peer info yet)
        isConnAllowed bool
        // Is this peer + endpoint allowed?
        isAllowedWithPeer bool
    }

    peerA := test.RandPeerIDFatal(t)
    peerB := test.RandPeerIDFatal(t)
    multiaddrA := multiaddr.StringCast("/ip4/1.2.3.4/tcp/1234")
    multiaddrB := multiaddr.StringCast("/ip4/2.2.3.4/tcp/1234")

    testcases := []testcase{
        {
            name:              "Blocked",
            isConnAllowed:     false,
            isAllowedWithPeer: false,
            allowlist:         []string{"/ip4/1.2.3.1"},
            endpoint:          multiaddrA,
            peer:              peerA,
        },
        {
            name:              "Blocked wrong peer",
            isConnAllowed:     true,
            isAllowedWithPeer: false,
            allowlist:         []string{"/ip4/1.2.3.4" + "/p2p/" + peer.Encode(peerB)},
            endpoint:          multiaddrA,
            peer:              peerA,
        },
        {
            name:              "allowed on network",
            isConnAllowed:     true,
            isAllowedWithPeer: true,
            allowlist:         []string{"/ip4/1.2.3.0/ipcidr/24"},
            endpoint:          multiaddrA,
            peer:              peerA,
        },
        {
            name:              "Blocked peer not on network",
            isConnAllowed:     true,
            isAllowedWithPeer: true,
            allowlist:         []string{"/ip4/1.2.3.0/ipcidr/24"},
            endpoint:          multiaddrA,
            peer:              peerA,
        }, {
            name:              "allowed. right network, right peer",
            isConnAllowed:     true,
            isAllowedWithPeer: true,
            allowlist:         []string{"/ip4/1.2.3.0/ipcidr/24" + "/p2p/" + peer.Encode(peerA)},
            endpoint:          multiaddrA,
            peer:              peerA,
        }, {
            name:              "allowed. right network, no peer",
            isConnAllowed:     true,
            isAllowedWithPeer: true,
            allowlist:         []string{"/ip4/1.2.3.0/ipcidr/24"},
            endpoint:          multiaddrA,
            peer:              peerA,
        },
        {
            name:              "Blocked. right network, wrong peer",
            isConnAllowed:     true,
            isAllowedWithPeer: false,
            allowlist:         []string{"/ip4/1.2.3.0/ipcidr/24" + "/p2p/" + peer.Encode(peerB)},
            endpoint:          multiaddrA,
            peer:              peerA,
        },
        {
            name:              "allowed peer any ip",
            isConnAllowed:     true,
            isAllowedWithPeer: true,
            allowlist:         []string{"/ip4/0.0.0.0/ipcidr/0"},
            endpoint:          multiaddrA,
            peer:              peerA,
        },
        {
            name:              "allowed peer multiple ips in allowlist",
            isConnAllowed:     true,
            isAllowedWithPeer: true,
            allowlist:         []string{"/ip4/1.2.3.4/p2p/" + peer.Encode(peerA), "/ip4/2.2.3.4/p2p/" + peer.Encode(peerA)},
            endpoint:          multiaddrA,
            peer:              peerA,
        },
        {
            name:              "allowed peer multiple ips in allowlist",
            isConnAllowed:     true,
            isAllowedWithPeer: true,
            allowlist:         []string{"/ip4/1.2.3.4/p2p/" + peer.Encode(peerA), "/ip4/1.2.3.4/p2p/" + peer.Encode(peerA)},
            endpoint:          multiaddrA,
            peer:              peerA,
        },
        {
            name:              "allowed peer multiple ips in allowlist",
            isConnAllowed:     true,
            isAllowedWithPeer: true,
            allowlist:         []string{"/ip4/1.2.3.4/p2p/" + peer.Encode(peerA), "/ip4/2.2.3.4/p2p/" + peer.Encode(peerA)},
            endpoint:          multiaddrB,
            peer:              peerA,
        },
    }

    for _, tc := range testcases {
        t.Run(tc.name, func(t *testing.T) {
            allowlist := newAllowlist()
            for _, maStr := range tc.allowlist {
                ma, err := multiaddr.NewMultiaddr(maStr)
                if err != nil {
                    fmt.Printf("failed to parse multiaddr: %s", err)
                }
                allowlist.Add(ma)
            }

            if allowlist.Allowed(tc.endpoint) != tc.isConnAllowed {
                t.Fatalf("%v: expected %v", !tc.isConnAllowed, tc.isConnAllowed)
            }

            if allowlist.AllowedPeerAndMultiaddr(tc.peer, tc.endpoint) != tc.isAllowedWithPeer {
                t.Fatalf("%v: expected %v", !tc.isAllowedWithPeer, tc.isAllowedWithPeer)
            }
        })
    }
}
func TestRemoved(t *testing.T) {
    type testCase struct {
        name      string
        allowedMA string
    }
    peerA := test.RandPeerIDFatal(t)
    maA := multiaddr.StringCast("/ip4/1.2.3.4")

    testCases := []testCase{
        {name: "ip4", allowedMA: "/ip4/1.2.3.4"},
        {name: "ip4 with peer", allowedMA: "/ip4/1.2.3.4/p2p/" + peer.Encode(peerA)},
        {name: "ip4 network", allowedMA: "/ip4/0.0.0.0/ipcidr/0"},
        {name: "ip4 network with peer", allowedMA: "/ip4/0.0.0.0/ipcidr/0/p2p/" + peer.Encode(peerA)},
    }

    for _, tc := range testCases {
        t.Run(tc.name, func(t *testing.T) {
            allowlist := newAllowlist()
            ma := multiaddr.StringCast(tc.allowedMA)

            err := allowlist.Add(ma)
            if err != nil {
                t.Fatalf("failed to add ip4: %s", err)
            }

            if !allowlist.AllowedPeerAndMultiaddr(peerA, maA) {
                t.Fatalf("addr should be allowed")
            }

            allowlist.Remove(ma)

            if allowlist.AllowedPeerAndMultiaddr(peerA, maA) {
                t.Fatalf("addr should not be allowed")
            }
        })
    }
}
// BenchmarkAllowlistCheck benchmarks the allowlist with plausible conditions.
func BenchmarkAllowlistCheck(b *testing.B) {
    allowlist := newAllowlist()

    // How often do we expect a peer to be specified? 1 in N
    ratioOfSpecifiedPeers := 10

    // How often do we expect an allowlist hit? 1 in N
    ratioOfAllowlistHit := 100

    // How many multiaddrs in our allowlist?
    howManyMultiaddrsInAllowList := 1_000

    // How often is the IP addr an IPV6? 1 in N
    ratioOfIPV6 := 20

    countOfTotalPeersForTest := 100_000

    mas := make([]multiaddr.Multiaddr, countOfTotalPeersForTest)
    for i := 0; i < countOfTotalPeersForTest; i++ {
        ip := make([]byte, 16)
        n, err := rand.Reader.Read(ip)
        if err != nil || n != 16 {
            b.Fatalf("Failed to generate IP address")
        }

        var ipString string
        if i%ratioOfIPV6 == 0 {
            // IPv6
            ip6 := net.IP(ip)
            ipString = "/ip6/" + ip6.String()
        } else {
            // IPv4
            ip4 := net.IPv4(ip[0], ip[1], ip[2], ip[3])
            ipString = "/ip4/" + ip4.String()
        }

        var ma multiaddr.Multiaddr
        if i%ratioOfSpecifiedPeers == 0 {
            ma = multiaddr.StringCast(ipString + "/p2p/" + peer.Encode(test.RandPeerIDFatal(b)))
        } else {
            ma = multiaddr.StringCast(ipString)
        }
        if err != nil {
            b.Fatalf("Failed to generate multiaddr: %v", ipString)
        }

        mas[i] = ma
    }

    for _, ma := range mas[:howManyMultiaddrsInAllowList] {
        err := allowlist.Add(ma)
        if err != nil {
            b.Fatalf("Failed to add multiaddr")
        }
    }

    masInAllowList := mas[:howManyMultiaddrsInAllowList]
    masNotInAllowList := mas[howManyMultiaddrsInAllowList:]

    b.ResetTimer()
    for n := 0; n < b.N; n++ {
        if n%ratioOfAllowlistHit == 0 {
            allowlist.Allowed(masInAllowList[n%len(masInAllowList)])
        } else {
            allowlist.Allowed(masNotInAllowList[n%len(masNotInAllowList)])
        }
    }
}

54  p2p/host/resource-manager/docs/allowlist.md
@@ -0,0 +1,54 @@
# Allowlist

Imagine you have a node that is getting overloaded by possibly malicious
incoming connections. This node won't be able to accept incoming connections
from peers it _knows_ to be good. This node would effectively be _eclipsed_ from
the network since no other nodes will be able to connect to it.

This is the problem that the Allowlist is designed to solve.
## Design Goals

- We should not fail to allocate a resource for an allowlisted peer because the
  normal transient and system scopes are at their limits. This is the minimum
  bar to avoid eclipse attacks.
- Minimal changes to the resource manager and existing code (e.g. go-libp2p).
- The allowlist scope itself is limited to avoid giving an allowlisted peer the
  ability to DoS a node.
- PeerIDs can optionally be fed into the allowlist. This will give an extra
  step of verification before continuing to allow the peer to open streams.
  - A peer may be able to open a connection, but after the handshake, if it's
    not an expected peer id, we move it to the normal system scope.
- We can have multiple PeerIDs for a given IP addr.
- No extra cost for the happy path when we are still below system and transient
  limits.
## Proposed change

Add a change to `ResourceManager.OpenConnection` so that it accepts a multiaddr
parameter of the endpoint the connection is for.

Add a change to `ResourceManager` to initialize it with a set of allowlisted
multiaddrs. This set can be modified at runtime as well for dynamic updating.

For example, an allowlist set could look like:

```
/ip4/1.1.1.1
/ip6/2345:0425:2CA1::0567:5673:23b5
/ip4/192.168.1.1/p2p/QmFoo
/ip4/192.168.1.1/p2p/QmBar
/ip4/1.2.3.0/ipcidr/24
```
When a new connection is opened, the resource manager tries to allocate with the
normal system and transient resource scopes. If that fails, it checks if the
multiaddr matches an item in the set of allowlisted multiaddrs. If so, it
creates the connection resource scope using the allowlist-specific system and
transient resource scopes. If it wasn't an allowlisted multiaddr, it fails as
before.

When an allowlisted connection is tied to a peer id and transferred with
`ConnManagementScope.SetPeer`, we check if that peer id matches the expected
value in the allowlist (if it exists). If it does not match, we attempt to
transfer this resource to the normal system and peer scope. If that transfer
fails, we close the connection.
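
To make the flow concrete, a pseudocode sketch of the proposed
`OpenConnection` check (the helper `openConnection` and the allowlisted
scope fields are illustrative, not the final implementation):

```go
func (r *resourceManager) OpenConnection(dir network.Direction, usefd bool, endpoint multiaddr.Multiaddr) (network.ConnManagementScope, error) {
    // Happy path: no extra cost while below the normal limits.
    scope, err := r.openConnection(r.system, r.transient, dir, usefd, endpoint)
    if err == nil {
        return scope, nil
    }
    // Normal scopes are full: consult the allowlist.
    if r.allowlist.Allowed(endpoint) {
        return r.openConnection(r.allowlistedSystem, r.allowlistedTransient, dir, usefd, endpoint)
    }
    return nil, err
}
```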

81  p2p/host/resource-manager/error.go
@@ -0,0 +1,81 @@
package rcmgr

import (
    "errors"

    "github.com/libp2p/go-libp2p-core/network"
)

type errStreamOrConnLimitExceeded struct {
    current, attempted, limit int
    err                       error
}

func (e *errStreamOrConnLimitExceeded) Error() string { return e.err.Error() }
func (e *errStreamOrConnLimitExceeded) Unwrap() error { return e.err }

// edge may be "" if this is not an edge error
func logValuesStreamLimit(scope, edge string, dir network.Direction, stat network.ScopeStat, err error) []interface{} {
    logValues := make([]interface{}, 0, 2*8)
    logValues = append(logValues, "scope", scope)
    if edge != "" {
        logValues = append(logValues, "edge", edge)
    }
    logValues = append(logValues, "direction", dir)
    var e *errStreamOrConnLimitExceeded
    if errors.As(err, &e) {
        logValues = append(logValues,
            "current", e.current,
            "attempted", e.attempted,
            "limit", e.limit,
        )
    }
    return append(logValues, "stat", stat, "error", err)
}

// edge may be "" if this is not an edge error
func logValuesConnLimit(scope, edge string, dir network.Direction, usefd bool, stat network.ScopeStat, err error) []interface{} {
    logValues := make([]interface{}, 0, 2*9)
    logValues = append(logValues, "scope", scope)
    if edge != "" {
        logValues = append(logValues, "edge", edge)
    }
    logValues = append(logValues, "direction", dir, "usefd", usefd)
    var e *errStreamOrConnLimitExceeded
    if errors.As(err, &e) {
        logValues = append(logValues,
            "current", e.current,
            "attempted", e.attempted,
            "limit", e.limit,
        )
    }
    return append(logValues, "stat", stat, "error", err)
}

type errMemoryLimitExceeded struct {
    current, attempted, limit int64
    priority                  uint8
    err                       error
}

func (e *errMemoryLimitExceeded) Error() string { return e.err.Error() }
func (e *errMemoryLimitExceeded) Unwrap() error { return e.err }

// edge may be "" if this is not an edge error
func logValuesMemoryLimit(scope, edge string, stat network.ScopeStat, err error) []interface{} {
    logValues := make([]interface{}, 0, 2*8)
    logValues = append(logValues, "scope", scope)
    if edge != "" {
        logValues = append(logValues, "edge", edge)
    }
    var e *errMemoryLimitExceeded
    if errors.As(err, &e) {
        logValues = append(logValues,
            "current", e.current,
            "attempted", e.attempted,
            "priority", e.priority,
            "limit", e.limit,
        )
    }
    return append(logValues, "stat", stat, "error", err)
}

147  p2p/host/resource-manager/extapi.go
@@ -0,0 +1,147 @@
package rcmgr

import (
    "bytes"
    "sort"
    "strings"

    "github.com/libp2p/go-libp2p-core/network"
    "github.com/libp2p/go-libp2p-core/peer"
    "github.com/libp2p/go-libp2p-core/protocol"
)

// ResourceScopeLimiter is a trait interface that allows you to access scope limits.
type ResourceScopeLimiter interface {
    Limit() Limit
    SetLimit(Limit)
}

var _ ResourceScopeLimiter = (*resourceScope)(nil)

// ResourceManagerState is a trait that allows you to access resource manager state.
type ResourceManagerState interface {
    ListServices() []string
    ListProtocols() []protocol.ID
    ListPeers() []peer.ID

    Stat() ResourceManagerStat
}

type ResourceManagerStat struct {
    System    network.ScopeStat
    Transient network.ScopeStat
    Services  map[string]network.ScopeStat
    Protocols map[protocol.ID]network.ScopeStat
    Peers     map[peer.ID]network.ScopeStat
}

var _ ResourceManagerState = (*resourceManager)(nil)
func (s *resourceScope) Limit() Limit {
    s.Lock()
    defer s.Unlock()

    return s.rc.limit
}

func (s *resourceScope) SetLimit(limit Limit) {
    s.Lock()
    defer s.Unlock()

    s.rc.limit = limit
}

func (s *protocolScope) SetLimit(limit Limit) {
    s.rcmgr.setStickyProtocol(s.proto)
    s.resourceScope.SetLimit(limit)
}

func (s *peerScope) SetLimit(limit Limit) {
    s.rcmgr.setStickyPeer(s.peer)
    s.resourceScope.SetLimit(limit)
}

func (r *resourceManager) ListServices() []string {
    r.mx.Lock()
    defer r.mx.Unlock()

    result := make([]string, 0, len(r.svc))
    for svc := range r.svc {
        result = append(result, svc)
    }

    sort.Slice(result, func(i, j int) bool {
        return strings.Compare(result[i], result[j]) < 0
    })

    return result
}

func (r *resourceManager) ListProtocols() []protocol.ID {
    r.mx.Lock()
    defer r.mx.Unlock()

    result := make([]protocol.ID, 0, len(r.proto))
    for p := range r.proto {
        result = append(result, p)
    }

    sort.Slice(result, func(i, j int) bool {
        return strings.Compare(string(result[i]), string(result[j])) < 0
    })

    return result
}

func (r *resourceManager) ListPeers() []peer.ID {
    r.mx.Lock()
    defer r.mx.Unlock()

    result := make([]peer.ID, 0, len(r.peer))
    for p := range r.peer {
        result = append(result, p)
    }

    sort.Slice(result, func(i, j int) bool {
        return bytes.Compare([]byte(result[i]), []byte(result[j])) < 0
    })

    return result
}
func (r *resourceManager) Stat() (result ResourceManagerStat) {
    r.mx.Lock()
    svcs := make([]*serviceScope, 0, len(r.svc))
    for _, svc := range r.svc {
        svcs = append(svcs, svc)
    }
    protos := make([]*protocolScope, 0, len(r.proto))
    for _, proto := range r.proto {
        protos = append(protos, proto)
    }
    peers := make([]*peerScope, 0, len(r.peer))
    for _, peer := range r.peer {
        peers = append(peers, peer)
    }
    r.mx.Unlock()

    // Note: there is no global lock, so the system is updating while we are dumping its state...
    // as such stats might not exactly add up to the system level; we take the system stat
    // last nonetheless so that this is the most up-to-date snapshot
    result.Peers = make(map[peer.ID]network.ScopeStat, len(peers))
    for _, peer := range peers {
        result.Peers[peer.peer] = peer.Stat()
    }
    result.Protocols = make(map[protocol.ID]network.ScopeStat, len(protos))
    for _, proto := range protos {
        result.Protocols[proto.proto] = proto.Stat()
    }
    result.Services = make(map[string]network.ScopeStat, len(svcs))
    for _, svc := range svcs {
        result.Services[svc.service] = svc.Stat()
    }
    result.Transient = r.transient.Stat()
    result.System = r.system.Stat()

    return result
}

264  p2p/host/resource-manager/limit.go
@@ -0,0 +1,264 @@
/*
Package rcmgr is the resource manager for go-libp2p. It allows you to track
resources being used throughout your go-libp2p process, and makes sure
that the process doesn't use more resources than what you define as your
limits. The resource manager only knows about things it is told about, so it's
the responsibility of the user of this library (either go-libp2p or a go-libp2p
user) to make sure they check with the resource manager before actually
allocating the resource.
*/
package rcmgr

import (
    "encoding/json"
    "io"

    "github.com/libp2p/go-libp2p-core/network"
    "github.com/libp2p/go-libp2p-core/peer"
    "github.com/libp2p/go-libp2p-core/protocol"
)
// Limit is an object that specifies basic resource limits.
type Limit interface {
    // GetMemoryLimit returns the (current) memory limit.
    GetMemoryLimit() int64
    // GetStreamLimit returns the stream limit, for inbound or outbound streams.
    GetStreamLimit(network.Direction) int
    // GetStreamTotalLimit returns the total stream limit
    GetStreamTotalLimit() int
    // GetConnLimit returns the connection limit, for inbound or outbound connections.
    GetConnLimit(network.Direction) int
    // GetConnTotalLimit returns the total connection limit
    GetConnTotalLimit() int
    // GetFDLimit returns the file descriptor limit.
    GetFDLimit() int
}

// Limiter is the interface for providing limits to the resource manager.
type Limiter interface {
    GetSystemLimits() Limit
    GetTransientLimits() Limit
    GetAllowlistedSystemLimits() Limit
    GetAllowlistedTransientLimits() Limit
    GetServiceLimits(svc string) Limit
    GetServicePeerLimits(svc string) Limit
    GetProtocolLimits(proto protocol.ID) Limit
    GetProtocolPeerLimits(proto protocol.ID) Limit
    GetPeerLimits(p peer.ID) Limit
    GetStreamLimits(p peer.ID) Limit
    GetConnLimits() Limit
}

// NewDefaultLimiterFromJSON creates a new limiter by parsing a json configuration,
// using the default limits for fallback.
func NewDefaultLimiterFromJSON(in io.Reader) (Limiter, error) {
    return NewLimiterFromJSON(in, DefaultLimits.AutoScale())
}

// NewLimiterFromJSON creates a new limiter by parsing a json configuration.
func NewLimiterFromJSON(in io.Reader, defaults LimitConfig) (Limiter, error) {
    cfg, err := readLimiterConfigFromJSON(in, defaults)
    if err != nil {
        return nil, err
    }
    return &fixedLimiter{cfg}, nil
}

func readLimiterConfigFromJSON(in io.Reader, defaults LimitConfig) (LimitConfig, error) {
    var cfg LimitConfig
    if err := json.NewDecoder(in).Decode(&cfg); err != nil {
        return LimitConfig{}, err
    }
    cfg.Apply(defaults)
    return cfg, nil
}

// fixedLimiter is a limiter with fixed limits.
type fixedLimiter struct {
    LimitConfig
}

var _ Limiter = (*fixedLimiter)(nil)

func NewFixedLimiter(conf LimitConfig) Limiter {
    log.Debugw("initializing new limiter with config", "limits", conf)
    return &fixedLimiter{LimitConfig: conf}
}
// BaseLimit is a mixin type for basic resource limits.
type BaseLimit struct {
    Streams         int
    StreamsInbound  int
    StreamsOutbound int
    Conns           int
    ConnsInbound    int
    ConnsOutbound   int
    FD              int
    Memory          int64
}

// Apply overwrites all zero-valued limits with the values of l2
// Must not use a pointer receiver.
func (l *BaseLimit) Apply(l2 BaseLimit) {
    if l.Streams == 0 {
        l.Streams = l2.Streams
    }
    if l.StreamsInbound == 0 {
        l.StreamsInbound = l2.StreamsInbound
    }
    if l.StreamsOutbound == 0 {
        l.StreamsOutbound = l2.StreamsOutbound
    }
    if l.Conns == 0 {
        l.Conns = l2.Conns
    }
    if l.ConnsInbound == 0 {
        l.ConnsInbound = l2.ConnsInbound
    }
    if l.ConnsOutbound == 0 {
        l.ConnsOutbound = l2.ConnsOutbound
    }
    if l.Memory == 0 {
        l.Memory = l2.Memory
    }
    if l.FD == 0 {
        l.FD = l2.FD
    }
}

// BaseLimitIncrease is the increase per GB of system memory.
type BaseLimitIncrease struct {
    Streams         int
    StreamsInbound  int
    StreamsOutbound int
    Conns           int
    ConnsInbound    int
    ConnsOutbound   int
    Memory          int64
    FDFraction      float64
}

// Apply overwrites all zero-valued limits with the values of l2
// Must not use a pointer receiver.
func (l *BaseLimitIncrease) Apply(l2 BaseLimitIncrease) {
    if l.Streams == 0 {
        l.Streams = l2.Streams
    }
    if l.StreamsInbound == 0 {
        l.StreamsInbound = l2.StreamsInbound
    }
    if l.StreamsOutbound == 0 {
        l.StreamsOutbound = l2.StreamsOutbound
    }
    if l.Conns == 0 {
        l.Conns = l2.Conns
    }
    if l.ConnsInbound == 0 {
        l.ConnsInbound = l2.ConnsInbound
    }
    if l.ConnsOutbound == 0 {
        l.ConnsOutbound = l2.ConnsOutbound
    }
    if l.Memory == 0 {
        l.Memory = l2.Memory
    }
    if l.FDFraction == 0 {
        l.FDFraction = l2.FDFraction
    }
}
func (l *BaseLimit) GetStreamLimit(dir network.Direction) int {
    if dir == network.DirInbound {
        return l.StreamsInbound
    } else {
        return l.StreamsOutbound
    }
}

func (l *BaseLimit) GetStreamTotalLimit() int {
    return l.Streams
}

func (l *BaseLimit) GetConnLimit(dir network.Direction) int {
    if dir == network.DirInbound {
        return l.ConnsInbound
    } else {
        return l.ConnsOutbound
    }
}

func (l *BaseLimit) GetConnTotalLimit() int {
    return l.Conns
}

func (l *BaseLimit) GetFDLimit() int {
    return l.FD
}

func (l *BaseLimit) GetMemoryLimit() int64 {
    return l.Memory
}

func (l *fixedLimiter) GetSystemLimits() Limit {
    return &l.System
}

func (l *fixedLimiter) GetTransientLimits() Limit {
    return &l.Transient
}

func (l *fixedLimiter) GetAllowlistedSystemLimits() Limit {
    return &l.AllowlistedSystem
}

func (l *fixedLimiter) GetAllowlistedTransientLimits() Limit {
    return &l.AllowlistedTransient
}

func (l *fixedLimiter) GetServiceLimits(svc string) Limit {
    sl, ok := l.Service[svc]
    if !ok {
        return &l.ServiceDefault
    }
    return &sl
}

func (l *fixedLimiter) GetServicePeerLimits(svc string) Limit {
    pl, ok := l.ServicePeer[svc]
    if !ok {
        return &l.ServicePeerDefault
    }
    return &pl
}

func (l *fixedLimiter) GetProtocolLimits(proto protocol.ID) Limit {
    pl, ok := l.Protocol[proto]
    if !ok {
        return &l.ProtocolDefault
    }
    return &pl
}

func (l *fixedLimiter) GetProtocolPeerLimits(proto protocol.ID) Limit {
    pl, ok := l.ProtocolPeer[proto]
    if !ok {
        return &l.ProtocolPeerDefault
    }
    return &pl
}

func (l *fixedLimiter) GetPeerLimits(p peer.ID) Limit {
    pl, ok := l.Peer[p]
    if !ok {
        return &l.PeerDefault
    }
    return &pl
}

func (l *fixedLimiter) GetStreamLimits(_ peer.ID) Limit {
    return &l.Stream
}

func (l *fixedLimiter) GetConnLimits() Limit {
    return &l.Conn
}

62  p2p/host/resource-manager/limit_config_test.go
@@ -0,0 +1,62 @@
package rcmgr

import (
    "bytes"
    "encoding/json"
    "os"
    "testing"

    "github.com/libp2p/go-libp2p-core/peer"

    "github.com/stretchr/testify/require"
)

func withMemoryLimit(l BaseLimit, m int64) BaseLimit {
    l2 := l
    l2.Memory = m
    return l2
}

func TestLimitConfigParser(t *testing.T) {
    in, err := os.Open("limit_config_test.json")
    require.NoError(t, err)
    defer in.Close()

    DefaultLimits.AddServiceLimit("C", DefaultLimits.ServiceBaseLimit, BaseLimitIncrease{})
    DefaultLimits.AddProtocolPeerLimit("C", DefaultLimits.ServiceBaseLimit, BaseLimitIncrease{})
    defaults := DefaultLimits.AutoScale()
    cfg, err := readLimiterConfigFromJSON(in, defaults)
    require.NoError(t, err)

    require.Equal(t, int64(65536), cfg.System.Memory)
    require.Equal(t, defaults.System.Streams, cfg.System.Streams)
    require.Equal(t, defaults.System.StreamsInbound, cfg.System.StreamsInbound)
    require.Equal(t, defaults.System.StreamsOutbound, cfg.System.StreamsOutbound)
    require.Equal(t, 16, cfg.System.Conns)
    require.Equal(t, 8, cfg.System.ConnsInbound)
    require.Equal(t, 16, cfg.System.ConnsOutbound)
    require.Equal(t, 16, cfg.System.FD)

    require.Equal(t, defaults.Transient, cfg.Transient)

    require.Equal(t, int64(8765), cfg.ServiceDefault.Memory)

    require.Contains(t, cfg.Service, "A")
    require.Equal(t, withMemoryLimit(cfg.ServiceDefault, 8192), cfg.Service["A"])
    require.Contains(t, cfg.Service, "B")
    require.Equal(t, cfg.ServiceDefault, cfg.Service["B"])
    require.Contains(t, cfg.Service, "C")
    require.Equal(t, defaults.Service["C"], cfg.Service["C"])

    require.Equal(t, int64(4096), cfg.PeerDefault.Memory)
    peerID, err := peer.Decode("12D3KooWPFH2Bx2tPfw6RLxN8k2wh47GRXgkt9yrAHU37zFwHWzS")
    require.NoError(t, err)
    require.Contains(t, cfg.Peer, peerID)
    require.Equal(t, int64(4097), cfg.Peer[peerID].Memory)

    // Roundtrip
    jsonBytes, err := json.Marshal(&cfg)
    require.NoError(t, err)
    cfgAfterRoundTrip, err := readLimiterConfigFromJSON(bytes.NewReader(jsonBytes), defaults)
    require.NoError(t, err)
    require.Equal(t, cfg, cfgAfterRoundTrip)
}

45  p2p/host/resource-manager/limit_config_test.json
@@ -0,0 +1,45 @@
{
    "System": {
        "Memory": 65536,
        "Conns": 16,
        "ConnsInbound": 8,
        "ConnsOutbound": 16,
        "FD": 16
    },
    "ServiceDefault": {
        "Memory": 8765
    },
    "Service": {
        "A": {
            "Memory": 8192
        },
        "B": {}
    },
    "ServicePeerDefault": {
        "Memory": 2048
    },
    "ServicePeer": {
        "A": {
            "Memory": 4096
        }
    },
    "ProtocolDefault": {
        "Memory": 2048
    },
    "ProtocolPeerDefault": {
        "Memory": 1024
    },
    "Protocol": {
        "/A": {
            "Memory": 8192
        }
    },
    "PeerDefault": {
        "Memory": 4096
    },
    "Peer": {
        "12D3KooWPFH2Bx2tPfw6RLxN8k2wh47GRXgkt9yrAHU37zFwHWzS": {
            "Memory": 4097
        }
    }
}

548  p2p/host/resource-manager/limit_defaults.go
@@ -0,0 +1,548 @@
package rcmgr

import (
    "encoding/json"
    "math"

    "github.com/libp2p/go-libp2p-core/peer"
    "github.com/libp2p/go-libp2p-core/protocol"

    "github.com/pbnjay/memory"
)

type baseLimitConfig struct {
    BaseLimit         BaseLimit
    BaseLimitIncrease BaseLimitIncrease
}

// ScalingLimitConfig is a struct for configuring default limits.
// {}BaseLimit is the limits that Apply for a minimal node (128 MB of memory for libp2p) and 256 file descriptors.
// {}LimitIncrease is the additional limit granted for every additional 1 GB of RAM.
type ScalingLimitConfig struct {
    SystemBaseLimit     BaseLimit
    SystemLimitIncrease BaseLimitIncrease

    TransientBaseLimit     BaseLimit
    TransientLimitIncrease BaseLimitIncrease

    AllowlistedSystemBaseLimit     BaseLimit
    AllowlistedSystemLimitIncrease BaseLimitIncrease

    AllowlistedTransientBaseLimit     BaseLimit
    AllowlistedTransientLimitIncrease BaseLimitIncrease

    ServiceBaseLimit     BaseLimit
    ServiceLimitIncrease BaseLimitIncrease
    ServiceLimits        map[string]baseLimitConfig // use AddServiceLimit to modify

    ServicePeerBaseLimit     BaseLimit
    ServicePeerLimitIncrease BaseLimitIncrease
    ServicePeerLimits        map[string]baseLimitConfig // use AddServicePeerLimit to modify

    ProtocolBaseLimit     BaseLimit
    ProtocolLimitIncrease BaseLimitIncrease
    ProtocolLimits        map[protocol.ID]baseLimitConfig // use AddProtocolLimit to modify

    ProtocolPeerBaseLimit     BaseLimit
    ProtocolPeerLimitIncrease BaseLimitIncrease
    ProtocolPeerLimits        map[protocol.ID]baseLimitConfig // use AddProtocolPeerLimit to modify

    PeerBaseLimit     BaseLimit
    PeerLimitIncrease BaseLimitIncrease
    PeerLimits        map[peer.ID]baseLimitConfig // use AddPeerLimit to modify

    ConnBaseLimit     BaseLimit
    ConnLimitIncrease BaseLimitIncrease

    StreamBaseLimit     BaseLimit
    StreamLimitIncrease BaseLimitIncrease
}
func (cfg *ScalingLimitConfig) AddServiceLimit(svc string, base BaseLimit, inc BaseLimitIncrease) {
    if cfg.ServiceLimits == nil {
        cfg.ServiceLimits = make(map[string]baseLimitConfig)
    }
    cfg.ServiceLimits[svc] = baseLimitConfig{
        BaseLimit:         base,
        BaseLimitIncrease: inc,
    }
}

func (cfg *ScalingLimitConfig) AddProtocolLimit(proto protocol.ID, base BaseLimit, inc BaseLimitIncrease) {
    if cfg.ProtocolLimits == nil {
        cfg.ProtocolLimits = make(map[protocol.ID]baseLimitConfig)
    }
    cfg.ProtocolLimits[proto] = baseLimitConfig{
        BaseLimit:         base,
        BaseLimitIncrease: inc,
    }
}

func (cfg *ScalingLimitConfig) AddPeerLimit(p peer.ID, base BaseLimit, inc BaseLimitIncrease) {
    if cfg.PeerLimits == nil {
        cfg.PeerLimits = make(map[peer.ID]baseLimitConfig)
    }
    cfg.PeerLimits[p] = baseLimitConfig{
        BaseLimit:         base,
        BaseLimitIncrease: inc,
    }
}

func (cfg *ScalingLimitConfig) AddServicePeerLimit(svc string, base BaseLimit, inc BaseLimitIncrease) {
    if cfg.ServicePeerLimits == nil {
        cfg.ServicePeerLimits = make(map[string]baseLimitConfig)
    }
    cfg.ServicePeerLimits[svc] = baseLimitConfig{
        BaseLimit:         base,
        BaseLimitIncrease: inc,
    }
}

func (cfg *ScalingLimitConfig) AddProtocolPeerLimit(proto protocol.ID, base BaseLimit, inc BaseLimitIncrease) {
    if cfg.ProtocolPeerLimits == nil {
        cfg.ProtocolPeerLimits = make(map[protocol.ID]baseLimitConfig)
    }
    cfg.ProtocolPeerLimits[proto] = baseLimitConfig{
        BaseLimit:         base,
        BaseLimitIncrease: inc,
    }
}
type LimitConfig struct {
    System    BaseLimit `json:",omitempty"`
    Transient BaseLimit `json:",omitempty"`

    // Limits that are applied to resources with an allowlisted multiaddr.
    // These will only be used if the normal System & Transient limits are
    // reached.
    AllowlistedSystem    BaseLimit `json:",omitempty"`
    AllowlistedTransient BaseLimit `json:",omitempty"`

    ServiceDefault BaseLimit            `json:",omitempty"`
    Service        map[string]BaseLimit `json:",omitempty"`

    ServicePeerDefault BaseLimit            `json:",omitempty"`
    ServicePeer        map[string]BaseLimit `json:",omitempty"`

    ProtocolDefault BaseLimit                 `json:",omitempty"`
    Protocol        map[protocol.ID]BaseLimit `json:",omitempty"`

    ProtocolPeerDefault BaseLimit                 `json:",omitempty"`
    ProtocolPeer        map[protocol.ID]BaseLimit `json:",omitempty"`

    PeerDefault BaseLimit             `json:",omitempty"`
    Peer        map[peer.ID]BaseLimit `json:",omitempty"`

    Conn   BaseLimit `json:",omitempty"`
    Stream BaseLimit `json:",omitempty"`
}

func (cfg *LimitConfig) MarshalJSON() ([]byte, error) {
    // we want to marshal the encoded peer id
    encodedPeerMap := make(map[string]BaseLimit, len(cfg.Peer))
    for p, v := range cfg.Peer {
        encodedPeerMap[peer.Encode(p)] = v
    }

    type Alias LimitConfig
    return json.Marshal(&struct {
        *Alias
        Peer map[string]BaseLimit `json:",omitempty"`
    }{
        Alias: (*Alias)(cfg),
        Peer:  encodedPeerMap,
    })
}
func (cfg *LimitConfig) Apply(c LimitConfig) {
    cfg.System.Apply(c.System)
    cfg.Transient.Apply(c.Transient)
    cfg.AllowlistedSystem.Apply(c.AllowlistedSystem)
    cfg.AllowlistedTransient.Apply(c.AllowlistedTransient)
    cfg.ServiceDefault.Apply(c.ServiceDefault)
    cfg.ProtocolDefault.Apply(c.ProtocolDefault)
    cfg.ProtocolPeerDefault.Apply(c.ProtocolPeerDefault)
    cfg.PeerDefault.Apply(c.PeerDefault)
    cfg.Conn.Apply(c.Conn)
    cfg.Stream.Apply(c.Stream)

    // TODO: the following could be solved a lot nicer, if only we could use generics
    for s, l := range cfg.Service {
        r := cfg.ServiceDefault
        if l2, ok := c.Service[s]; ok {
            r = l2
        }
        l.Apply(r)
        cfg.Service[s] = l
    }
    if c.Service != nil && cfg.Service == nil {
        cfg.Service = make(map[string]BaseLimit)
    }
    for s, l := range c.Service {
        if _, ok := cfg.Service[s]; !ok {
            cfg.Service[s] = l
        }
    }

    for s, l := range cfg.ServicePeer {
        r := cfg.ServicePeerDefault
        if l2, ok := c.ServicePeer[s]; ok {
            r = l2
        }
        l.Apply(r)
        cfg.ServicePeer[s] = l
    }
    if c.ServicePeer != nil && cfg.ServicePeer == nil {
        cfg.ServicePeer = make(map[string]BaseLimit)
    }
    for s, l := range c.ServicePeer {
        if _, ok := cfg.ServicePeer[s]; !ok {
            cfg.ServicePeer[s] = l
        }
    }

    for s, l := range cfg.Protocol {
        r := cfg.ProtocolDefault
        if l2, ok := c.Protocol[s]; ok {
            r = l2
        }
        l.Apply(r)
        cfg.Protocol[s] = l
    }
    if c.Protocol != nil && cfg.Protocol == nil {
        cfg.Protocol = make(map[protocol.ID]BaseLimit)
    }
    for s, l := range c.Protocol {
        if _, ok := cfg.Protocol[s]; !ok {
            cfg.Protocol[s] = l
        }
    }

    for s, l := range cfg.ProtocolPeer {
        r := cfg.ProtocolPeerDefault
        if l2, ok := c.ProtocolPeer[s]; ok {
            r = l2
        }
        l.Apply(r)
        cfg.ProtocolPeer[s] = l
    }
    if c.ProtocolPeer != nil && cfg.ProtocolPeer == nil {
        cfg.ProtocolPeer = make(map[protocol.ID]BaseLimit)
    }
    for s, l := range c.ProtocolPeer {
        if _, ok := cfg.ProtocolPeer[s]; !ok {
            cfg.ProtocolPeer[s] = l
        }
    }

    for s, l := range cfg.Peer {
        r := cfg.PeerDefault
        if l2, ok := c.Peer[s]; ok {
            r = l2
        }
        l.Apply(r)
        cfg.Peer[s] = l
    }
    if c.Peer != nil && cfg.Peer == nil {
        cfg.Peer = make(map[peer.ID]BaseLimit)
    }
    for s, l := range c.Peer {
        if _, ok := cfg.Peer[s]; !ok {
            cfg.Peer[s] = l
        }
    }
}
// Scale scales up a limit configuration.
// memory is the amount of memory that the stack is allowed to consume;
// for a full node it's recommended to use 1/8 of the installed system memory.
// If memory is smaller than 128 MB, the base configuration will be used.
func (cfg *ScalingLimitConfig) Scale(memory int64, numFD int) LimitConfig {
var scaleFactor int
if memory > 128<<20 {
scaleFactor = int((memory - 128<<20) >> 20)
}
lc := LimitConfig{
System: scale(cfg.SystemBaseLimit, cfg.SystemLimitIncrease, scaleFactor, numFD),
Transient: scale(cfg.TransientBaseLimit, cfg.TransientLimitIncrease, scaleFactor, numFD),
AllowlistedSystem: scale(cfg.AllowlistedSystemBaseLimit, cfg.AllowlistedSystemLimitIncrease, scaleFactor, numFD),
AllowlistedTransient: scale(cfg.AllowlistedTransientBaseLimit, cfg.AllowlistedTransientLimitIncrease, scaleFactor, numFD),
ServiceDefault: scale(cfg.ServiceBaseLimit, cfg.ServiceLimitIncrease, scaleFactor, numFD),
ServicePeerDefault: scale(cfg.ServicePeerBaseLimit, cfg.ServicePeerLimitIncrease, scaleFactor, numFD),
ProtocolDefault: scale(cfg.ProtocolBaseLimit, cfg.ProtocolLimitIncrease, scaleFactor, numFD),
ProtocolPeerDefault: scale(cfg.ProtocolPeerBaseLimit, cfg.ProtocolPeerLimitIncrease, scaleFactor, numFD),
PeerDefault: scale(cfg.PeerBaseLimit, cfg.PeerLimitIncrease, scaleFactor, numFD),
Conn: scale(cfg.ConnBaseLimit, cfg.ConnLimitIncrease, scaleFactor, numFD),
Stream: scale(cfg.StreamBaseLimit, cfg.StreamLimitIncrease, scaleFactor, numFD),
}
if cfg.ServiceLimits != nil {
lc.Service = make(map[string]BaseLimit)
for svc, l := range cfg.ServiceLimits {
lc.Service[svc] = scale(l.BaseLimit, l.BaseLimitIncrease, scaleFactor, numFD)
}
}
if cfg.ProtocolLimits != nil {
lc.Protocol = make(map[protocol.ID]BaseLimit)
for proto, l := range cfg.ProtocolLimits {
lc.Protocol[proto] = scale(l.BaseLimit, l.BaseLimitIncrease, scaleFactor, numFD)
}
}
if cfg.PeerLimits != nil {
lc.Peer = make(map[peer.ID]BaseLimit)
for p, l := range cfg.PeerLimits {
lc.Peer[p] = scale(l.BaseLimit, l.BaseLimitIncrease, scaleFactor, numFD)
}
}
if cfg.ServicePeerLimits != nil {
lc.ServicePeer = make(map[string]BaseLimit)
for svc, l := range cfg.ServicePeerLimits {
lc.ServicePeer[svc] = scale(l.BaseLimit, l.BaseLimitIncrease, scaleFactor, numFD)
}
}
if cfg.ProtocolPeerLimits != nil {
lc.ProtocolPeer = make(map[protocol.ID]BaseLimit)
for p, l := range cfg.ProtocolPeerLimits {
lc.ProtocolPeer[p] = scale(l.BaseLimit, l.BaseLimitIncrease, scaleFactor, numFD)
}
}
return lc
}
func (cfg *ScalingLimitConfig) AutoScale() LimitConfig {
return cfg.Scale(
int64(memory.TotalMemory())/8,
getNumFDs()/2,
)
}
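// Example (illustrative sketch): scale the defaults by hand for a host that
// should use 2 GB of memory and 4096 file descriptors, then build a limiter
// from the result. NewFixedLimiter and NewResourceManager are defined in this
// package; the numbers are assumptions.
//
//	limits := DefaultLimits.Scale(2<<30, 4096)
//	limiter := NewFixedLimiter(limits)
//	mgr, err := NewResourceManager(limiter)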
// factor is the number of MBs above the minimum (128 MB)
func scale(base BaseLimit, inc BaseLimitIncrease, factor int, numFD int) BaseLimit {
l := BaseLimit{
StreamsInbound: base.StreamsInbound + (inc.StreamsInbound*factor)>>10,
StreamsOutbound: base.StreamsOutbound + (inc.StreamsOutbound*factor)>>10,
Streams: base.Streams + (inc.Streams*factor)>>10,
ConnsInbound: base.ConnsInbound + (inc.ConnsInbound*factor)>>10,
ConnsOutbound: base.ConnsOutbound + (inc.ConnsOutbound*factor)>>10,
Conns: base.Conns + (inc.Conns*factor)>>10,
Memory: base.Memory + (inc.Memory*int64(factor))>>10,
FD: base.FD,
}
if inc.FDFraction > 0 && numFD > 0 {
l.FD = int(inc.FDFraction * float64(numFD))
}
return l
}
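// Worked example (illustrative): for memory = 128 MB + 4 GB, Scale computes
// factor = 4096 (the number of MBs above the minimum), so
// (inc.Streams*factor)>>10 equals inc.Streams*4; i.e. each BaseLimitIncrease
// is applied once per full GB of memory above the 128 MB base.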
// DefaultLimits are the limits used by the default limiter constructors.
var DefaultLimits = ScalingLimitConfig{
SystemBaseLimit: BaseLimit{
ConnsInbound: 64,
ConnsOutbound: 128,
Conns: 128,
StreamsInbound: 64 * 16,
StreamsOutbound: 128 * 16,
Streams: 128 * 16,
Memory: 128 << 20,
FD: 256,
},
SystemLimitIncrease: BaseLimitIncrease{
ConnsInbound: 64,
ConnsOutbound: 128,
Conns: 128,
StreamsInbound: 64 * 16,
StreamsOutbound: 128 * 16,
Streams: 128 * 16,
Memory: 1 << 30,
FDFraction: 1,
},
TransientBaseLimit: BaseLimit{
ConnsInbound: 32,
ConnsOutbound: 64,
Conns: 64,
StreamsInbound: 128,
StreamsOutbound: 256,
Streams: 256,
Memory: 32 << 20,
FD: 64,
},
TransientLimitIncrease: BaseLimitIncrease{
ConnsInbound: 16,
ConnsOutbound: 32,
Conns: 32,
StreamsInbound: 128,
StreamsOutbound: 256,
Streams: 256,
Memory: 128 << 20,
FDFraction: 0.25,
},
// Setting the allowlisted limits to be the same as the normal limits. The
// allowlist only activates when you reach your normal system/transient
// limits. So it's okay if these limits err on the side of being too big,
// since most of the time you won't even use any of these. Tune these down
// if you want to manage your resources against an allowlisted endpoint.
AllowlistedSystemBaseLimit: BaseLimit{
ConnsInbound: 64,
ConnsOutbound: 128,
Conns: 128,
StreamsInbound: 64 * 16,
StreamsOutbound: 128 * 16,
Streams: 128 * 16,
Memory: 128 << 20,
FD: 256,
},
AllowlistedSystemLimitIncrease: BaseLimitIncrease{
ConnsInbound: 64,
ConnsOutbound: 128,
Conns: 128,
StreamsInbound: 64 * 16,
StreamsOutbound: 128 * 16,
Streams: 128 * 16,
Memory: 1 << 30,
FDFraction: 1,
},
AllowlistedTransientBaseLimit: BaseLimit{
ConnsInbound: 32,
ConnsOutbound: 64,
Conns: 64,
StreamsInbound: 128,
StreamsOutbound: 256,
Streams: 256,
Memory: 32 << 20,
FD: 64,
},
AllowlistedTransientLimitIncrease: BaseLimitIncrease{
ConnsInbound: 16,
ConnsOutbound: 32,
Conns: 32,
StreamsInbound: 128,
StreamsOutbound: 256,
Streams: 256,
Memory: 128 << 20,
FDFraction: 0.25,
},
ServiceBaseLimit: BaseLimit{
StreamsInbound: 1024,
StreamsOutbound: 4096,
Streams: 4096,
Memory: 64 << 20,
},
ServiceLimitIncrease: BaseLimitIncrease{
StreamsInbound: 512,
StreamsOutbound: 2048,
Streams: 2048,
Memory: 128 << 20,
},
ServicePeerBaseLimit: BaseLimit{
StreamsInbound: 128,
StreamsOutbound: 256,
Streams: 256,
Memory: 16 << 20,
},
ServicePeerLimitIncrease: BaseLimitIncrease{
StreamsInbound: 4,
StreamsOutbound: 8,
Streams: 8,
Memory: 4 << 20,
},
ProtocolBaseLimit: BaseLimit{
StreamsInbound: 512,
StreamsOutbound: 2048,
Streams: 2048,
Memory: 64 << 20,
},
ProtocolLimitIncrease: BaseLimitIncrease{
StreamsInbound: 256,
StreamsOutbound: 512,
Streams: 512,
Memory: 164 << 20,
},
ProtocolPeerBaseLimit: BaseLimit{
StreamsInbound: 64,
StreamsOutbound: 128,
Streams: 256,
Memory: 16 << 20,
},
ProtocolPeerLimitIncrease: BaseLimitIncrease{
StreamsInbound: 4,
StreamsOutbound: 8,
Streams: 16,
Memory: 4 << 20,
},
PeerBaseLimit: BaseLimit{
ConnsInbound: 4,
ConnsOutbound: 8,
Conns: 8,
StreamsInbound: 256,
StreamsOutbound: 512,
Streams: 512,
Memory: 64 << 20,
FD: 4,
},
PeerLimitIncrease: BaseLimitIncrease{
StreamsInbound: 128,
StreamsOutbound: 256,
Streams: 256,
Memory: 128 << 20,
FDFraction: 1.0 / 64,
},
ConnBaseLimit: BaseLimit{
ConnsInbound: 1,
ConnsOutbound: 1,
Conns: 1,
FD: 1,
Memory: 1 << 20,
},
StreamBaseLimit: BaseLimit{
StreamsInbound: 1,
StreamsOutbound: 1,
Streams: 1,
Memory: 16 << 20,
},
}
var infiniteBaseLimit = BaseLimit{
Streams: math.MaxInt,
StreamsInbound: math.MaxInt,
StreamsOutbound: math.MaxInt,
Conns: math.MaxInt,
ConnsInbound: math.MaxInt,
ConnsOutbound: math.MaxInt,
FD: math.MaxInt,
Memory: math.MaxInt64,
}
// InfiniteLimits is a limit configuration that uses infinite limits, thus effectively not limiting anything.
// Keep in mind that the operating system limits the number of file descriptors that an application can use.
var InfiniteLimits = LimitConfig{
System: infiniteBaseLimit,
Transient: infiniteBaseLimit,
AllowlistedSystem: infiniteBaseLimit,
AllowlistedTransient: infiniteBaseLimit,
ServiceDefault: infiniteBaseLimit,
ServicePeerDefault: infiniteBaseLimit,
ProtocolDefault: infiniteBaseLimit,
ProtocolPeerDefault: infiniteBaseLimit,
PeerDefault: infiniteBaseLimit,
Conn: infiniteBaseLimit,
Stream: infiniteBaseLimit,
}
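// Example (illustrative sketch): InfiniteLimits is handy in tests where
// resource accounting should never block, e.g.
//
//	mgr, err := NewResourceManager(NewFixedLimiter(InfiniteLimits))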

88
p2p/host/resource-manager/limit_test.go

@ -0,0 +1,88 @@
package rcmgr
import (
"runtime"
"testing"
"github.com/stretchr/testify/require"
)
func TestFileDescriptorCounting(t *testing.T) {
if runtime.GOOS == "windows" {
t.Skip("can't read file descriptors on Windows")
}
n := getNumFDs()
require.NotZero(t, n)
require.Less(t, n, int(1e7))
}
func TestScaling(t *testing.T) {
base := BaseLimit{
Streams: 100,
StreamsInbound: 200,
StreamsOutbound: 400,
Conns: 10,
ConnsInbound: 20,
ConnsOutbound: 40,
FD: 1,
Memory: 1 << 20,
}
t.Run("no scaling if no increase is defined", func(t *testing.T) {
cfg := ScalingLimitConfig{ServiceBaseLimit: base}
scaled := cfg.Scale(8<<30, 100)
require.Equal(t, base, scaled.ServiceDefault)
})
t.Run("scaling", func(t *testing.T) {
cfg := ScalingLimitConfig{
TransientBaseLimit: base,
TransientLimitIncrease: BaseLimitIncrease{
Streams: 1,
StreamsInbound: 2,
StreamsOutbound: 3,
Conns: 4,
ConnsInbound: 5,
ConnsOutbound: 6,
Memory: 7,
FDFraction: 0.5,
},
}
scaled := cfg.Scale(128<<20+4<<30, 1000)
require.Equal(t, 500, scaled.Transient.FD)
require.Equal(t, base.Streams+4, scaled.Transient.Streams)
require.Equal(t, base.StreamsInbound+4*2, scaled.Transient.StreamsInbound)
require.Equal(t, base.StreamsOutbound+4*3, scaled.Transient.StreamsOutbound)
require.Equal(t, base.Conns+4*4, scaled.Transient.Conns)
require.Equal(t, base.ConnsInbound+4*5, scaled.Transient.ConnsInbound)
require.Equal(t, base.ConnsOutbound+4*6, scaled.Transient.ConnsOutbound)
require.Equal(t, base.Memory+4*7, scaled.Transient.Memory)
})
t.Run("scaling limits in maps", func(t *testing.T) {
cfg := ScalingLimitConfig{
ServiceLimits: map[string]baseLimitConfig{
"A": {
BaseLimit: BaseLimit{Streams: 10, Memory: 100, FD: 9},
},
"B": {
BaseLimit: BaseLimit{Streams: 20, Memory: 200, FD: 10},
BaseLimitIncrease: BaseLimitIncrease{Streams: 2, Memory: 3, FDFraction: 0.4},
},
},
}
scaled := cfg.Scale(128<<20+4<<30, 1000)
require.Len(t, scaled.Service, 2)
require.Contains(t, scaled.Service, "A")
require.Equal(t, 10, scaled.Service["A"].Streams)
require.Equal(t, int64(100), scaled.Service["A"].Memory)
require.Equal(t, 9, scaled.Service["A"].FD)
require.Contains(t, scaled.Service, "B")
require.Equal(t, 20+4*2, scaled.Service["B"].Streams)
require.Equal(t, int64(200+4*3), scaled.Service["B"].Memory)
require.Equal(t, 400, scaled.Service["B"].FD)
})
}

168
p2p/host/resource-manager/metrics.go

@ -0,0 +1,168 @@
package rcmgr
import (
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
)
// MetricsReporter is an interface for collecting metrics from resource manager actions
type MetricsReporter interface {
// AllowConn is invoked when opening a connection is allowed
AllowConn(dir network.Direction, usefd bool)
// BlockConn is invoked when opening a connection is blocked
BlockConn(dir network.Direction, usefd bool)
// AllowStream is invoked when opening a stream is allowed
AllowStream(p peer.ID, dir network.Direction)
// BlockStream is invoked when opening a stream is blocked
BlockStream(p peer.ID, dir network.Direction)
// AllowPeer is invoked when attaching a connection to a peer is allowed
AllowPeer(p peer.ID)
// BlockPeer is invoked when attaching a connection to a peer is blocked
BlockPeer(p peer.ID)
// AllowProtocol is invoked when setting the protocol for a stream is allowed
AllowProtocol(proto protocol.ID)
// BlockProtocol is invoked when setting the protocol for a stream is blocked
BlockProtocol(proto protocol.ID)
// BlockProtocolPeer is invoked when setting the protocol for a stream is blocked at the per protocol peer scope
BlockProtocolPeer(proto protocol.ID, p peer.ID)
// AllowService is invoked when setting the service for a stream is allowed
AllowService(svc string)
// BlockService is invoked when setting the service for a stream is blocked
BlockService(svc string)
// BlockServicePeer is invoked when setting the service for a stream is blocked at the per service peer scope
BlockServicePeer(svc string, p peer.ID)
// AllowMemory is invoked when a memory reservation is allowed
AllowMemory(size int)
// BlockMemory is invoked when a memory reservation is blocked
BlockMemory(size int)
}
type metrics struct {
reporter MetricsReporter
}
// WithMetrics is a resource manager option to enable metrics collection
func WithMetrics(reporter MetricsReporter) Option {
return func(r *resourceManager) error {
r.metrics = &metrics{reporter: reporter}
return nil
}
}
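// Example (illustrative sketch, names are assumptions): any type implementing
// MetricsReporter can be plugged in via WithMetrics; a minimal reporter might
// count blocked connections and treat every other callback as a no-op.
//
//	type blockCounter struct{ blocked int64 }
//	func (b *blockCounter) BlockConn(network.Direction, bool) { atomic.AddInt64(&b.blocked, 1) }
//	// ... remaining MetricsReporter methods implemented as no-ops ...
//	mgr, err := NewResourceManager(limiter, WithMetrics(&blockCounter{}))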
func (m *metrics) AllowConn(dir network.Direction, usefd bool) {
if m == nil {
return
}
m.reporter.AllowConn(dir, usefd)
}
func (m *metrics) BlockConn(dir network.Direction, usefd bool) {
if m == nil {
return
}
m.reporter.BlockConn(dir, usefd)
}
func (m *metrics) AllowStream(p peer.ID, dir network.Direction) {
if m == nil {
return
}
m.reporter.AllowStream(p, dir)
}
func (m *metrics) BlockStream(p peer.ID, dir network.Direction) {
if m == nil {
return
}
m.reporter.BlockStream(p, dir)
}
func (m *metrics) AllowPeer(p peer.ID) {
if m == nil {
return
}
m.reporter.AllowPeer(p)
}
func (m *metrics) BlockPeer(p peer.ID) {
if m == nil {
return
}
m.reporter.BlockPeer(p)
}
func (m *metrics) AllowProtocol(proto protocol.ID) {
if m == nil {
return
}
m.reporter.AllowProtocol(proto)
}
func (m *metrics) BlockProtocol(proto protocol.ID) {
if m == nil {
return
}
m.reporter.BlockProtocol(proto)
}
func (m *metrics) BlockProtocolPeer(proto protocol.ID, p peer.ID) {
if m == nil {
return
}
m.reporter.BlockProtocolPeer(proto, p)
}
func (m *metrics) AllowService(svc string) {
if m == nil {
return
}
m.reporter.AllowService(svc)
}
func (m *metrics) BlockService(svc string) {
if m == nil {
return
}
m.reporter.BlockService(svc)
}
func (m *metrics) BlockServicePeer(svc string, p peer.ID) {
if m == nil {
return
}
m.reporter.BlockServicePeer(svc, p)
}
func (m *metrics) AllowMemory(size int) {
if m == nil {
return
}
m.reporter.AllowMemory(size)
}
func (m *metrics) BlockMemory(size int) {
if m == nil {
return
}
m.reporter.BlockMemory(size)
}

49
p2p/host/resource-manager/obs/grafana-dashboards/README.md

@ -0,0 +1,49 @@
# Ready-to-go Grafana Dashboards
Here are some prebuilt dashboards that you can add to your Grafana instance. To
import them, follow the Grafana docs [here](https://grafana.com/docs/grafana/latest/dashboards/export-import/#import-dashboard).
## Setup
To make sure you're emitting the correct metrics you'll have to hook up the
OpenCensus views that `stats.go` exports. For Prometheus this looks like:
``` go
import (
	// ...
	ocprom "contrib.go.opencensus.io/exporter/prometheus"
	"github.com/libp2p/go-libp2p-core/network"
	rcmgr "github.com/libp2p/go-libp2p-resource-manager"
	rcmgrObs "github.com/libp2p/go-libp2p-resource-manager/obs"
	"github.com/prometheus/client_golang/prometheus"
	"go.opencensus.io/stats/view"
)

func SetupResourceManager(limiter rcmgr.Limiter) (network.ResourceManager, error) {
	// Hook up the trace reporter metrics. This will expose all OpenCensus
	// stats via the default prometheus registry. See
	// https://opencensus.io/exporters/supported-exporters/go/prometheus/ for other options.
	if err := view.Register(rcmgrObs.DefaultViews...); err != nil {
		return nil, err
	}
	if _, err := ocprom.NewExporter(ocprom.Options{
		Registry: prometheus.DefaultRegisterer.(*prometheus.Registry),
	}); err != nil {
		return nil, err
	}

	str, err := rcmgrObs.NewStatsTraceReporter()
	if err != nil {
		return nil, err
	}
	return rcmgr.NewResourceManager(limiter, rcmgr.WithTraceReporter(str))
}
```
The setup should be fairly similar for other exporters. See the [OpenCensus
docs](https://opencensus.io/exporters/supported-exporters/go/) for how to
export to another backend.
## Updating Dashboard JSON
Use the share functionality on an existing dashboard, and make sure to toggle
"Export for sharing externally". See the [Grafana
Docs](https://grafana.com/docs/grafana/latest/dashboards/export-import/#exporting-a-dashboard)
for more details.

1818
p2p/host/resource-manager/obs/grafana-dashboards/resource-manager.json

File diff suppressed because it is too large

343
p2p/host/resource-manager/obs/stats.go

@ -0,0 +1,343 @@
package obs
import (
"context"
"strings"
rcmgr "github.com/libp2p/go-libp2p-resource-manager"
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
)
var (
metricNamespace = "rcmgr/"
conns = stats.Int64(metricNamespace+"connections", "Number of Connections", stats.UnitDimensionless)
peerConns = stats.Int64(metricNamespace+"peer/connections", "Number of connections this peer has", stats.UnitDimensionless)
peerConnsNegative = stats.Int64(metricNamespace+"peer/connections_negative", "Number of connections this peer had. This is used to get the current connection number per peer histogram by subtracting this from the peer/connections histogram", stats.UnitDimensionless)
streams = stats.Int64(metricNamespace+"streams", "Number of Streams", stats.UnitDimensionless)
peerStreams = stats.Int64(metricNamespace+"peer/streams", "Number of streams this peer has", stats.UnitDimensionless)
peerStreamsNegative = stats.Int64(metricNamespace+"peer/streams_negative", "Number of streams this peer had. This is used to get the current streams number per peer histogram by subtracting this from the peer/streams histogram", stats.UnitDimensionless)
memory = stats.Int64(metricNamespace+"memory", "Amount of memory reserved as reported to the Resource Manager", stats.UnitDimensionless)
peerMemory = stats.Int64(metricNamespace+"peer/memory", "Amount of memory currently reserved for the peer", stats.UnitDimensionless)
peerMemoryNegative = stats.Int64(metricNamespace+"peer/memory_negative", "Amount of memory previously reserved for the peer. This is used to get the current memory per peer histogram by subtracting this from the peer/memory histogram", stats.UnitDimensionless)
connMemory = stats.Int64(metricNamespace+"conn/memory", "Amount of memory currently reserved for the connection", stats.UnitDimensionless)
connMemoryNegative = stats.Int64(metricNamespace+"conn/memory_negative", "Amount of memory previously reserved for the connection. This is used to get the current memory per connection histogram by subtracting this from the conn/memory histogram", stats.UnitDimensionless)
fds = stats.Int64(metricNamespace+"fds", "Number of fds as reported to the Resource Manager", stats.UnitDimensionless)
blockedResources = stats.Int64(metricNamespace+"blocked_resources", "Number of resource requests blocked", stats.UnitDimensionless)
)
var (
directionTag, _ = tag.NewKey("dir")
scopeTag, _ = tag.NewKey("scope")
serviceTag, _ = tag.NewKey("service")
protocolTag, _ = tag.NewKey("protocol")
resourceTag, _ = tag.NewKey("resource")
)
var (
ConnView = &view.View{Measure: conns, Aggregation: view.Sum(), TagKeys: []tag.Key{directionTag, scopeTag}}
oneTenThenExpDistribution = []float64{
1.1, 2.1, 3.1, 4.1, 5.1, 6.1, 7.1, 8.1, 9.1, 10.1, 16.1, 32.1, 64.1, 128.1, 256.1,
}
PeerConnsView = &view.View{
Measure: peerConns,
Aggregation: view.Distribution(oneTenThenExpDistribution...),
TagKeys: []tag.Key{directionTag},
}
PeerConnsNegativeView = &view.View{
Measure: peerConnsNegative,
Aggregation: view.Distribution(oneTenThenExpDistribution...),
TagKeys: []tag.Key{directionTag},
}
StreamView = &view.View{Measure: streams, Aggregation: view.Sum(), TagKeys: []tag.Key{directionTag, scopeTag, serviceTag, protocolTag}}
PeerStreamsView = &view.View{Measure: peerStreams, Aggregation: view.Distribution(oneTenThenExpDistribution...), TagKeys: []tag.Key{directionTag}}
PeerStreamNegativeView = &view.View{Measure: peerStreamsNegative, Aggregation: view.Distribution(oneTenThenExpDistribution...), TagKeys: []tag.Key{directionTag}}
MemoryView = &view.View{Measure: memory, Aggregation: view.Sum(), TagKeys: []tag.Key{scopeTag, serviceTag, protocolTag}}
memDistribution = []float64{
1 << 10, // 1KB
4 << 10, // 4KB
32 << 10, // 32KB
1 << 20, // 1MB
32 << 20, // 32MB
256 << 20, // 256MB
512 << 20, // 512MB
1 << 30, // 1GB
2 << 30, // 2GB
4 << 30, // 4GB
}
PeerMemoryView = &view.View{
Measure: peerMemory,
Aggregation: view.Distribution(memDistribution...),
}
PeerMemoryNegativeView = &view.View{
Measure: peerMemoryNegative,
Aggregation: view.Distribution(memDistribution...),
}
// Not set up yet. Memory isn't attached to a given connection.
ConnMemoryView = &view.View{
Measure: connMemory,
Aggregation: view.Distribution(memDistribution...),
}
ConnMemoryNegativeView = &view.View{
Measure: connMemoryNegative,
Aggregation: view.Distribution(memDistribution...),
}
FDsView = &view.View{Measure: fds, Aggregation: view.Sum(), TagKeys: []tag.Key{scopeTag}}
BlockedResourcesView = &view.View{
Measure: blockedResources,
Aggregation: view.Sum(),
TagKeys: []tag.Key{scopeTag, resourceTag},
}
)
var DefaultViews []*view.View = []*view.View{
ConnView,
PeerConnsView,
PeerConnsNegativeView,
FDsView,
StreamView,
PeerStreamsView,
PeerStreamNegativeView,
MemoryView,
PeerMemoryView,
PeerMemoryNegativeView,
BlockedResourcesView,
}
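// Note (added for clarity): these views must be registered with
// view.Register (see the package README) before any metrics are recorded;
// OpenCensus drops recordings against measures that have no registered view.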
// StatsTraceReporter reports stats on the resource manager using its traces.
type StatsTraceReporter struct{}
func NewStatsTraceReporter() (StatsTraceReporter, error) {
// TODO tell prometheus the system limits
return StatsTraceReporter{}, nil
}
func (r StatsTraceReporter) ConsumeEvent(evt rcmgr.TraceEvt) {
ctx := context.Background()
switch evt.Type {
case rcmgr.TraceAddStreamEvt, rcmgr.TraceRemoveStreamEvt:
if p := rcmgr.ParsePeerScopeName(evt.Name); p.Validate() == nil {
// Aggregated peer stats. Counts how many peers have N streams open.
// Uses two bucket aggregations: one counts how many streams the
// peer has now; the other records the negative value, i.e. how many
// streams the peer used to have. When looking at the data you
// take the difference between the two.
oldStreamsOut := int64(evt.StreamsOut - evt.DeltaOut)
peerStreamsOut := int64(evt.StreamsOut)
if oldStreamsOut != peerStreamsOut {
if oldStreamsOut != 0 {
stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(directionTag, "outbound")}, peerStreamsNegative.M(oldStreamsOut))
}
if peerStreamsOut != 0 {
stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(directionTag, "outbound")}, peerStreams.M(peerStreamsOut))
}
}
oldStreamsIn := int64(evt.StreamsIn - evt.DeltaIn)
peerStreamsIn := int64(evt.StreamsIn)
if oldStreamsIn != peerStreamsIn {
if oldStreamsIn != 0 {
stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(directionTag, "inbound")}, peerStreamsNegative.M(oldStreamsIn))
}
if peerStreamsIn != 0 {
stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(directionTag, "inbound")}, peerStreams.M(peerStreamsIn))
}
}
} else {
var tags []tag.Mutator
if rcmgr.IsSystemScope(evt.Name) || rcmgr.IsTransientScope(evt.Name) {
tags = append(tags, tag.Upsert(scopeTag, evt.Name))
} else if svc := rcmgr.ParseServiceScopeName(evt.Name); svc != "" {
tags = append(tags, tag.Upsert(scopeTag, "service"), tag.Upsert(serviceTag, svc))
} else if proto := rcmgr.ParseProtocolScopeName(evt.Name); proto != "" {
tags = append(tags, tag.Upsert(scopeTag, "protocol"), tag.Upsert(protocolTag, proto))
} else {
// Not measuring connscope, servicepeer and protocolpeer. Lots of data, and
// you can use aggregated peer stats + service stats to infer
// this.
break
}
if evt.DeltaOut != 0 {
stats.RecordWithTags(
ctx,
append([]tag.Mutator{tag.Upsert(directionTag, "outbound")}, tags...),
streams.M(int64(evt.DeltaOut)),
)
}
if evt.DeltaIn != 0 {
stats.RecordWithTags(
ctx,
append([]tag.Mutator{tag.Upsert(directionTag, "inbound")}, tags...),
streams.M(int64(evt.DeltaIn)),
)
}
}
case rcmgr.TraceAddConnEvt, rcmgr.TraceRemoveConnEvt:
if p := rcmgr.ParsePeerScopeName(evt.Name); p.Validate() == nil {
// Aggregated peer stats. Counts how many peers have N connections.
// Uses two bucket aggregations: one counts how many conns the
// peer has now; the other records the negative value, i.e. how many
// conns the peer used to have. When looking at the data you
// take the difference between the two.
oldConnsOut := int64(evt.ConnsOut - evt.DeltaOut)
connsOut := int64(evt.ConnsOut)
if oldConnsOut != connsOut {
if oldConnsOut != 0 {
stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(directionTag, "outbound")}, peerConnsNegative.M(oldConnsOut))
}
if connsOut != 0 {
stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(directionTag, "outbound")}, peerConns.M(connsOut))
}
}
oldConnsIn := int64(evt.ConnsIn - evt.DeltaIn)
connsIn := int64(evt.ConnsIn)
if oldConnsIn != connsIn {
if oldConnsIn != 0 {
stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(directionTag, "inbound")}, peerConnsNegative.M(oldConnsIn))
}
if connsIn != 0 {
stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(directionTag, "inbound")}, peerConns.M(connsIn))
}
}
} else {
var tags []tag.Mutator
if rcmgr.IsSystemScope(evt.Name) || rcmgr.IsTransientScope(evt.Name) {
tags = append(tags, tag.Upsert(scopeTag, evt.Name))
} else if rcmgr.IsConnScope(evt.Name) {
// Not measuring this. I don't think it's useful.
break
} else {
// This could be a span
break
}
if evt.DeltaOut != 0 {
stats.RecordWithTags(
ctx,
append([]tag.Mutator{tag.Upsert(directionTag, "outbound")}, tags...),
conns.M(int64(evt.DeltaOut)),
)
}
if evt.DeltaIn != 0 {
stats.RecordWithTags(
ctx,
append([]tag.Mutator{tag.Upsert(directionTag, "inbound")}, tags...),
conns.M(int64(evt.DeltaIn)),
)
}
// Represents the delta in fds
if evt.Delta != 0 {
stats.RecordWithTags(
ctx,
tags,
fds.M(int64(evt.Delta)),
)
}
}
case rcmgr.TraceReserveMemoryEvt, rcmgr.TraceReleaseMemoryEvt:
if p := rcmgr.ParsePeerScopeName(evt.Name); p.Validate() == nil {
oldMem := evt.Memory - evt.Delta
if oldMem != evt.Memory {
if oldMem != 0 {
stats.Record(ctx, peerMemoryNegative.M(oldMem))
}
if evt.Memory != 0 {
stats.Record(ctx, peerMemory.M(evt.Memory))
}
}
} else if rcmgr.IsConnScope(evt.Name) {
oldMem := evt.Memory - evt.Delta
if oldMem != evt.Memory {
if oldMem != 0 {
stats.Record(ctx, connMemoryNegative.M(oldMem))
}
if evt.Memory != 0 {
stats.Record(ctx, connMemory.M(evt.Memory))
}
}
} else {
var tags []tag.Mutator
if rcmgr.IsSystemScope(evt.Name) || rcmgr.IsTransientScope(evt.Name) {
tags = append(tags, tag.Upsert(scopeTag, evt.Name))
} else if svc := rcmgr.ParseServiceScopeName(evt.Name); svc != "" {
tags = append(tags, tag.Upsert(scopeTag, "service"), tag.Upsert(serviceTag, svc))
} else if proto := rcmgr.ParseProtocolScopeName(evt.Name); proto != "" {
tags = append(tags, tag.Upsert(scopeTag, "protocol"), tag.Upsert(protocolTag, proto))
} else {
// Not measuring connscope, servicepeer and protocolpeer. Lots of data, and
// you can use aggregated peer stats + service stats to infer
// this.
break
}
if evt.Delta != 0 {
stats.RecordWithTags(ctx, tags, memory.M(int64(evt.Delta)))
}
}
case rcmgr.TraceBlockAddConnEvt, rcmgr.TraceBlockAddStreamEvt, rcmgr.TraceBlockReserveMemoryEvt:
var resource string
if evt.Type == rcmgr.TraceBlockAddConnEvt {
resource = "connection"
} else if evt.Type == rcmgr.TraceBlockAddStreamEvt {
resource = "stream"
} else {
resource = "memory"
}
// Only the top scopeName. We don't want to get the peerid here.
scopeName := strings.SplitN(evt.Name, ":", 2)[0]
// Drop the connection or stream id
scopeName = strings.SplitN(scopeName, "-", 2)[0]
// If something else gets added here, make sure to update the size hint
// below when we make `tagsWithDir`.
tags := []tag.Mutator{tag.Upsert(scopeTag, scopeName), tag.Upsert(resourceTag, resource)}
if evt.DeltaIn != 0 {
tagsWithDir := make([]tag.Mutator, 0, 3)
tagsWithDir = append(tagsWithDir, tag.Insert(directionTag, "inbound"))
tagsWithDir = append(tagsWithDir, tags...)
stats.RecordWithTags(ctx, tagsWithDir, blockedResources.M(1))
}
if evt.DeltaOut != 0 {
tagsWithDir := make([]tag.Mutator, 0, 3)
tagsWithDir = append(tagsWithDir, tag.Insert(directionTag, "outbound"))
tagsWithDir = append(tagsWithDir, tags...)
stats.RecordWithTags(ctx, tagsWithDir, blockedResources.M(int64(1)))
}
if evt.Delta != 0 {
stats.RecordWithTags(ctx, tags, blockedResources.M(1))
}
}
}

39
p2p/host/resource-manager/obs/stats_test.go

@ -0,0 +1,39 @@
package obs_test
import (
"testing"
"time"
rcmgr "github.com/libp2p/go-libp2p-resource-manager"
"github.com/libp2p/go-libp2p-resource-manager/obs"
"go.opencensus.io/stats/view"
)
func TestTraceReporterStartAndClose(t *testing.T) {
mgr, err := rcmgr.NewResourceManager(rcmgr.NewFixedLimiter(rcmgr.DefaultLimits.AutoScale()), rcmgr.WithTraceReporter(obs.StatsTraceReporter{}))
if err != nil {
t.Fatal(err)
}
defer mgr.Close()
}
func TestConsumeEvent(t *testing.T) {
evt := rcmgr.TraceEvt{
Type: rcmgr.TraceBlockAddStreamEvt,
Name: "conn-1",
DeltaOut: 1,
Time: time.Now().Format(time.RFC3339Nano),
}
err := view.Register(obs.DefaultViews...)
if err != nil {
t.Fatal(err)
}
str, err := obs.NewStatsTraceReporter()
if err != nil {
t.Fatal(err)
}
str.ConsumeEvent(evt)
}

874
p2p/host/resource-manager/rcmgr.go

@ -0,0 +1,874 @@
package rcmgr
import (
"context"
"fmt"
"strings"
"sync"
"time"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
"github.com/multiformats/go-multiaddr"
logging "github.com/ipfs/go-log/v2"
)
var log = logging.Logger("rcmgr")
type resourceManager struct {
limits Limiter
trace *trace
metrics *metrics
allowlist *Allowlist
system *systemScope
transient *transientScope
allowlistedSystem *systemScope
allowlistedTransient *transientScope
cancelCtx context.Context
cancel func()
wg sync.WaitGroup
mx sync.Mutex
svc map[string]*serviceScope
proto map[protocol.ID]*protocolScope
peer map[peer.ID]*peerScope
stickyProto map[protocol.ID]struct{}
stickyPeer map[peer.ID]struct{}
connId, streamId int64
}
var _ network.ResourceManager = (*resourceManager)(nil)
type systemScope struct {
*resourceScope
}
var _ network.ResourceScope = (*systemScope)(nil)
type transientScope struct {
*resourceScope
system *systemScope
}
var _ network.ResourceScope = (*transientScope)(nil)
type serviceScope struct {
*resourceScope
service string
rcmgr *resourceManager
peers map[peer.ID]*resourceScope
}
var _ network.ServiceScope = (*serviceScope)(nil)
type protocolScope struct {
*resourceScope
proto protocol.ID
rcmgr *resourceManager
peers map[peer.ID]*resourceScope
}
var _ network.ProtocolScope = (*protocolScope)(nil)
type peerScope struct {
*resourceScope
peer peer.ID
rcmgr *resourceManager
}
var _ network.PeerScope = (*peerScope)(nil)
type connectionScope struct {
*resourceScope
dir network.Direction
usefd bool
isAllowlisted bool
rcmgr *resourceManager
peer *peerScope
endpoint multiaddr.Multiaddr
}
var _ network.ConnScope = (*connectionScope)(nil)
var _ network.ConnManagementScope = (*connectionScope)(nil)
type streamScope struct {
*resourceScope
dir network.Direction
rcmgr *resourceManager
peer *peerScope
svc *serviceScope
proto *protocolScope
peerProtoScope *resourceScope
peerSvcScope *resourceScope
}
var _ network.StreamScope = (*streamScope)(nil)
var _ network.StreamManagementScope = (*streamScope)(nil)
type Option func(*resourceManager) error
func NewResourceManager(limits Limiter, opts ...Option) (network.ResourceManager, error) {
allowlist := newAllowlist()
r := &resourceManager{
limits: limits,
allowlist: &allowlist,
svc: make(map[string]*serviceScope),
proto: make(map[protocol.ID]*protocolScope),
peer: make(map[peer.ID]*peerScope),
}
for _, opt := range opts {
if err := opt(r); err != nil {
return nil, err
}
}
if err := r.trace.Start(limits); err != nil {
return nil, err
}
r.system = newSystemScope(limits.GetSystemLimits(), r, "system")
r.system.IncRef()
r.transient = newTransientScope(limits.GetTransientLimits(), r, "transient", r.system.resourceScope)
r.transient.IncRef()
r.allowlistedSystem = newSystemScope(limits.GetAllowlistedSystemLimits(), r, "allowlistedSystem")
r.allowlistedSystem.IncRef()
r.allowlistedTransient = newTransientScope(limits.GetAllowlistedTransientLimits(), r, "allowlistedTransient", r.allowlistedSystem.resourceScope)
r.allowlistedTransient.IncRef()
r.cancelCtx, r.cancel = context.WithCancel(context.Background())
r.wg.Add(1)
go r.background()
return r, nil
}
func (r *resourceManager) GetAllowlist() *Allowlist {
return r.allowlist
}
// GetAllowlist tries to get the allowlist from the given
// network.ResourceManager by checking whether its concrete type is a
// resourceManager. It returns nil if it fails to get the allowlist.
func GetAllowlist(rcmgr network.ResourceManager) *Allowlist {
r, ok := rcmgr.(*resourceManager)
if !ok {
return nil
}
return r.allowlist
}
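// Example (illustrative sketch): granting an address extra headroom at
// runtime; Allowlist.Add and multiaddr.NewMultiaddr exist in this codebase,
// the concrete address is an assumption.
//
//	if al := GetAllowlist(mgr); al != nil {
//		ma, _ := multiaddr.NewMultiaddr("/ip4/192.168.1.1")
//		_ = al.Add(ma)
//	}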
func (r *resourceManager) ViewSystem(f func(network.ResourceScope) error) error {
return f(r.system)
}
func (r *resourceManager) ViewTransient(f func(network.ResourceScope) error) error {
return f(r.transient)
}
func (r *resourceManager) ViewService(srv string, f func(network.ServiceScope) error) error {
s := r.getServiceScope(srv)
defer s.DecRef()
return f(s)
}
func (r *resourceManager) ViewProtocol(proto protocol.ID, f func(network.ProtocolScope) error) error {
s := r.getProtocolScope(proto)
defer s.DecRef()
return f(s)
}
func (r *resourceManager) ViewPeer(p peer.ID, f func(network.PeerScope) error) error {
s := r.getPeerScope(p)
defer s.DecRef()
return f(s)
}
func (r *resourceManager) getServiceScope(svc string) *serviceScope {
r.mx.Lock()
defer r.mx.Unlock()
s, ok := r.svc[svc]
if !ok {
s = newServiceScope(svc, r.limits.GetServiceLimits(svc), r)
r.svc[svc] = s
}
s.IncRef()
return s
}
func (r *resourceManager) getProtocolScope(proto protocol.ID) *protocolScope {
r.mx.Lock()
defer r.mx.Unlock()
s, ok := r.proto[proto]
if !ok {
s = newProtocolScope(proto, r.limits.GetProtocolLimits(proto), r)
r.proto[proto] = s
}
s.IncRef()
return s
}
func (r *resourceManager) setStickyProtocol(proto protocol.ID) {
r.mx.Lock()
defer r.mx.Unlock()
if r.stickyProto == nil {
r.stickyProto = make(map[protocol.ID]struct{})
}
r.stickyProto[proto] = struct{}{}
}
func (r *resourceManager) getPeerScope(p peer.ID) *peerScope {
r.mx.Lock()
defer r.mx.Unlock()
s, ok := r.peer[p]
if !ok {
s = newPeerScope(p, r.limits.GetPeerLimits(p), r)
r.peer[p] = s
}
s.IncRef()
return s
}
func (r *resourceManager) setStickyPeer(p peer.ID) {
r.mx.Lock()
defer r.mx.Unlock()
if r.stickyPeer == nil {
r.stickyPeer = make(map[peer.ID]struct{})
}
r.stickyPeer[p] = struct{}{}
}
func (r *resourceManager) nextConnId() int64 {
r.mx.Lock()
defer r.mx.Unlock()
r.connId++
return r.connId
}
func (r *resourceManager) nextStreamId() int64 {
r.mx.Lock()
defer r.mx.Unlock()
r.streamId++
return r.streamId
}
func (r *resourceManager) OpenConnection(dir network.Direction, usefd bool, endpoint multiaddr.Multiaddr) (network.ConnManagementScope, error) {
conn := newConnectionScope(dir, usefd, r.limits.GetConnLimits(), r, endpoint)
err := conn.AddConn(dir, usefd)
if err != nil {
// Failed to open the connection; check whether the endpoint is
// allowlisted and, if so, try again against the allowlisted scopes.
allowed := r.allowlist.Allowed(endpoint)
if allowed {
conn.Done()
conn = newAllowListedConnectionScope(dir, usefd, r.limits.GetConnLimits(), r, endpoint)
err = conn.AddConn(dir, usefd)
}
}
if err != nil {
conn.Done()
r.metrics.BlockConn(dir, usefd)
return nil, err
}
r.metrics.AllowConn(dir, usefd)
return conn, nil
}
func (r *resourceManager) OpenStream(p peer.ID, dir network.Direction) (network.StreamManagementScope, error) {
peer := r.getPeerScope(p)
stream := newStreamScope(dir, r.limits.GetStreamLimits(p), peer, r)
peer.DecRef() // we have the reference in edges
err := stream.AddStream(dir)
if err != nil {
stream.Done()
r.metrics.BlockStream(p, dir)
return nil, err
}
r.metrics.AllowStream(p, dir)
return stream, nil
}
func (r *resourceManager) Close() error {
r.cancel()
r.wg.Wait()
r.trace.Close()
return nil
}
func (r *resourceManager) background() {
defer r.wg.Done()
// periodically garbage collects unused peer and protocol scopes
ticker := time.NewTicker(time.Minute)
defer ticker.Stop()
for {
select {
case <-ticker.C:
r.gc()
case <-r.cancelCtx.Done():
return
}
}
}
func (r *resourceManager) gc() {
r.mx.Lock()
defer r.mx.Unlock()
for proto, s := range r.proto {
_, sticky := r.stickyProto[proto]
if sticky {
continue
}
if s.IsUnused() {
s.Done()
delete(r.proto, proto)
}
}
var deadPeers []peer.ID
for p, s := range r.peer {
_, sticky := r.stickyPeer[p]
if sticky {
continue
}
if s.IsUnused() {
s.Done()
delete(r.peer, p)
deadPeers = append(deadPeers, p)
}
}
for _, s := range r.svc {
s.Lock()
for _, p := range deadPeers {
ps, ok := s.peers[p]
if ok {
ps.Done()
delete(s.peers, p)
}
}
s.Unlock()
}
for _, s := range r.proto {
s.Lock()
for _, p := range deadPeers {
ps, ok := s.peers[p]
if ok {
ps.Done()
delete(s.peers, p)
}
}
s.Unlock()
}
}
func newSystemScope(limit Limit, rcmgr *resourceManager, name string) *systemScope {
return &systemScope{
resourceScope: newResourceScope(limit, nil, name, rcmgr.trace, rcmgr.metrics),
}
}
func newTransientScope(limit Limit, rcmgr *resourceManager, name string, systemScope *resourceScope) *transientScope {
return &transientScope{
resourceScope: newResourceScope(limit,
[]*resourceScope{systemScope},
name, rcmgr.trace, rcmgr.metrics),
system: rcmgr.system,
}
}
func newServiceScope(service string, limit Limit, rcmgr *resourceManager) *serviceScope {
return &serviceScope{
resourceScope: newResourceScope(limit,
[]*resourceScope{rcmgr.system.resourceScope},
fmt.Sprintf("service:%s", service), rcmgr.trace, rcmgr.metrics),
service: service,
rcmgr: rcmgr,
}
}
func newProtocolScope(proto protocol.ID, limit Limit, rcmgr *resourceManager) *protocolScope {
return &protocolScope{
resourceScope: newResourceScope(limit,
[]*resourceScope{rcmgr.system.resourceScope},
fmt.Sprintf("protocol:%s", proto), rcmgr.trace, rcmgr.metrics),
proto: proto,
rcmgr: rcmgr,
}
}
func newPeerScope(p peer.ID, limit Limit, rcmgr *resourceManager) *peerScope {
return &peerScope{
resourceScope: newResourceScope(limit,
[]*resourceScope{rcmgr.system.resourceScope},
peerScopeName(p), rcmgr.trace, rcmgr.metrics),
peer: p,
rcmgr: rcmgr,
}
}
func newConnectionScope(dir network.Direction, usefd bool, limit Limit, rcmgr *resourceManager, endpoint multiaddr.Multiaddr) *connectionScope {
return &connectionScope{
resourceScope: newResourceScope(limit,
[]*resourceScope{rcmgr.transient.resourceScope, rcmgr.system.resourceScope},
connScopeName(rcmgr.nextConnId()), rcmgr.trace, rcmgr.metrics),
dir: dir,
usefd: usefd,
rcmgr: rcmgr,
endpoint: endpoint,
}
}
func newAllowListedConnectionScope(dir network.Direction, usefd bool, limit Limit, rcmgr *resourceManager, endpoint multiaddr.Multiaddr) *connectionScope {
return &connectionScope{
resourceScope: newResourceScope(limit,
[]*resourceScope{rcmgr.allowlistedTransient.resourceScope, rcmgr.allowlistedSystem.resourceScope},
connScopeName(rcmgr.nextConnId()), rcmgr.trace, rcmgr.metrics),
dir: dir,
usefd: usefd,
rcmgr: rcmgr,
endpoint: endpoint,
isAllowlisted: true,
}
}
func newStreamScope(dir network.Direction, limit Limit, peer *peerScope, rcmgr *resourceManager) *streamScope {
return &streamScope{
resourceScope: newResourceScope(limit,
[]*resourceScope{peer.resourceScope, rcmgr.transient.resourceScope, rcmgr.system.resourceScope},
streamScopeName(rcmgr.nextStreamId()), rcmgr.trace, rcmgr.metrics),
dir: dir,
rcmgr: peer.rcmgr,
peer: peer,
}
}
func IsSystemScope(name string) bool {
return name == "system"
}
func IsTransientScope(name string) bool {
return name == "transient"
}
func streamScopeName(streamId int64) string {
return fmt.Sprintf("stream-%d", streamId)
}
func IsStreamScope(name string) bool {
return strings.HasPrefix(name, "stream-") && !IsSpan(name)
}
func connScopeName(connId int64) string {
return fmt.Sprintf("conn-%d", connId)
}
func IsConnScope(name string) bool {
return strings.HasPrefix(name, "conn-") && !IsSpan(name)
}
func peerScopeName(p peer.ID) string {
return fmt.Sprintf("peer:%s", p)
}
// ParsePeerScopeName returns "" if name is not a peerScopeName
func ParsePeerScopeName(name string) peer.ID {
if !strings.HasPrefix(name, "peer:") || IsSpan(name) {
return ""
}
parts := strings.SplitN(name, "peer:", 2)
if len(parts) != 2 {
return ""
}
p, err := peer.Decode(parts[1])
if err != nil {
return ""
}
return p
}
// ParseServiceScopeName returns the service name if name is a serviceScopeName.
// Otherwise returns ""
func ParseServiceScopeName(name string) string {
if strings.HasPrefix(name, "service:") && !IsSpan(name) {
if strings.Contains(name, "peer:") {
// This is a service peer scope
return ""
}
parts := strings.SplitN(name, ":", 2)
if len(parts) != 2 {
return ""
}
return parts[1]
}
return ""
}
// ParseProtocolScopeName returns the protocol name if name is a protocolScopeName.
// Otherwise returns ""
func ParseProtocolScopeName(name string) string {
if strings.HasPrefix(name, "protocol:") && !IsSpan(name) {
if strings.Contains(name, "peer:") {
// This is a protocol peer scope
return ""
}
parts := strings.SplitN(name, ":", 2)
if len(parts) != 2 {
return ("")
}
return parts[1]
}
return ""
}
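// Illustrative examples of the scope-name grammar handled above (the peer ID
// and service name are assumptions; the peer ID is truncated):
//
//	"system"                             IsSystemScope
//	"conn-17"                            IsConnScope
//	"stream-42"                          IsStreamScope
//	"peer:12D3KooW..."                   ParsePeerScopeName -> peer.ID
//	"service:libp2p.identify"            ParseServiceScopeName -> "libp2p.identify"
//	"protocol:/ipfs/id/1.0.0"            ParseProtocolScopeName -> "/ipfs/id/1.0.0"
//	"service:libp2p.identify.peer:..."   matches neither (a service peer scope)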
func (s *serviceScope) Name() string {
return s.service
}
func (s *serviceScope) getPeerScope(p peer.ID) *resourceScope {
s.Lock()
defer s.Unlock()
ps, ok := s.peers[p]
if ok {
ps.IncRef()
return ps
}
l := s.rcmgr.limits.GetServicePeerLimits(s.service)
if s.peers == nil {
s.peers = make(map[peer.ID]*resourceScope)
}
ps = newResourceScope(l, nil, fmt.Sprintf("%s.peer:%s", s.name, p), s.rcmgr.trace, s.rcmgr.metrics)
s.peers[p] = ps
ps.IncRef()
return ps
}
func (s *protocolScope) Protocol() protocol.ID {
return s.proto
}
func (s *protocolScope) getPeerScope(p peer.ID) *resourceScope {
s.Lock()
defer s.Unlock()
ps, ok := s.peers[p]
if ok {
ps.IncRef()
return ps
}
l := s.rcmgr.limits.GetProtocolPeerLimits(s.proto)
if s.peers == nil {
s.peers = make(map[peer.ID]*resourceScope)
}
ps = newResourceScope(l, nil, fmt.Sprintf("%s.peer:%s", s.name, p), s.rcmgr.trace, s.rcmgr.metrics)
s.peers[p] = ps
ps.IncRef()
return ps
}
func (s *peerScope) Peer() peer.ID {
return s.peer
}
func (s *connectionScope) PeerScope() network.PeerScope {
s.Lock()
defer s.Unlock()
// avoid nil is not nil footgun; go....
if s.peer == nil {
return nil
}
return s.peer
}
// transferAllowedToStandard transfers this connection scope from being part of
// the allowlist set of scopes to being part of the standard set of scopes.
// Happens when we first allowlisted this connection due to its IP, but later
// discovered that the peer ID is not what we expected.
func (s *connectionScope) transferAllowedToStandard() (err error) {
systemScope := s.rcmgr.system.resourceScope
transientScope := s.rcmgr.transient.resourceScope
stat := s.resourceScope.rc.stat()
for _, scope := range s.edges {
scope.ReleaseForChild(stat)
scope.DecRef() // removed from edges
}
s.edges = nil
if err := systemScope.ReserveForChild(stat); err != nil {
return err
}
systemScope.IncRef()
// Undo this if we fail later
defer func() {
if err != nil {
systemScope.ReleaseForChild(stat)
systemScope.DecRef()
}
}()
if err := transientScope.ReserveForChild(stat); err != nil {
return err
}
transientScope.IncRef()
// Update edges
s.edges = []*resourceScope{
systemScope,
transientScope,
}
return nil
}
func (s *connectionScope) SetPeer(p peer.ID) error {
s.Lock()
defer s.Unlock()
if s.peer != nil {
return fmt.Errorf("connection scope already attached to a peer")
}
system := s.rcmgr.system
transient := s.rcmgr.transient
if s.isAllowlisted {
system = s.rcmgr.allowlistedSystem
transient = s.rcmgr.allowlistedTransient
if !s.rcmgr.allowlist.AllowedPeerAndMultiaddr(p, s.endpoint) {
s.isAllowlisted = false
// This is not an allowed peer + multiaddr combination. We need to
// transfer this connection to the general scope. We'll do this first by
// transferring the connection to the system and transient scopes, then
// continue on with this function. The idea is that a connection
// shouldn't get the benefit of evading the transient scope because it
// was _almost_ an allowlisted connection.
if err := s.transferAllowedToStandard(); err != nil {
// Failed to transfer this connection to the standard scopes
return err
}
// set the system and transient scopes to the non-allowlisted ones
system = s.rcmgr.system
transient = s.rcmgr.transient
}
}
s.peer = s.rcmgr.getPeerScope(p)
// juggle resources from transient scope to peer scope
stat := s.resourceScope.rc.stat()
if err := s.peer.ReserveForChild(stat); err != nil {
s.peer.DecRef()
s.peer = nil
s.rcmgr.metrics.BlockPeer(p)
return err
}
transient.ReleaseForChild(stat)
transient.DecRef() // removed from edges
// update edges
edges := []*resourceScope{
s.peer.resourceScope,
system.resourceScope,
}
s.resourceScope.edges = edges
s.rcmgr.metrics.AllowPeer(p)
return nil
}
func (s *streamScope) ProtocolScope() network.ProtocolScope {
s.Lock()
defer s.Unlock()
// avoid nil is not nil footgun; go....
if s.proto == nil {
return nil
}
return s.proto
}
func (s *streamScope) SetProtocol(proto protocol.ID) error {
s.Lock()
defer s.Unlock()
if s.proto != nil {
return fmt.Errorf("stream scope already attached to a protocol")
}
s.proto = s.rcmgr.getProtocolScope(proto)
// juggle resources from transient scope to protocol scope
stat := s.resourceScope.rc.stat()
if err := s.proto.ReserveForChild(stat); err != nil {
s.proto.DecRef()
s.proto = nil
s.rcmgr.metrics.BlockProtocol(proto)
return err
}
s.peerProtoScope = s.proto.getPeerScope(s.peer.peer)
if err := s.peerProtoScope.ReserveForChild(stat); err != nil {
s.proto.ReleaseForChild(stat)
s.proto.DecRef()
s.proto = nil
s.peerProtoScope.DecRef()
s.peerProtoScope = nil
s.rcmgr.metrics.BlockProtocolPeer(proto, s.peer.peer)
return err
}
s.rcmgr.transient.ReleaseForChild(stat)
s.rcmgr.transient.DecRef() // removed from edges
// update edges
edges := []*resourceScope{
s.peer.resourceScope,
s.peerProtoScope,
s.proto.resourceScope,
s.rcmgr.system.resourceScope,
}
s.resourceScope.edges = edges
s.rcmgr.metrics.AllowProtocol(proto)
return nil
}
func (s *streamScope) ServiceScope() network.ServiceScope {
s.Lock()
defer s.Unlock()
// avoid nil is not nil footgun; go....
if s.svc == nil {
return nil
}
return s.svc
}
func (s *streamScope) SetService(svc string) error {
s.Lock()
defer s.Unlock()
if s.svc != nil {
return fmt.Errorf("stream scope already attached to a service")
}
if s.proto == nil {
return fmt.Errorf("stream scope not attached to a protocol")
}
s.svc = s.rcmgr.getServiceScope(svc)
// reserve resources in service
stat := s.resourceScope.rc.stat()
if err := s.svc.ReserveForChild(stat); err != nil {
s.svc.DecRef()
s.svc = nil
s.rcmgr.metrics.BlockService(svc)
return err
}
// get the per peer service scope constraint, if any
s.peerSvcScope = s.svc.getPeerScope(s.peer.peer)
if err := s.peerSvcScope.ReserveForChild(stat); err != nil {
s.svc.ReleaseForChild(stat)
s.svc.DecRef()
s.svc = nil
s.peerSvcScope.DecRef()
s.peerSvcScope = nil
s.rcmgr.metrics.BlockServicePeer(svc, s.peer.peer)
return err
}
// update edges
edges := []*resourceScope{
s.peer.resourceScope,
s.peerProtoScope,
s.peerSvcScope,
s.proto.resourceScope,
s.svc.resourceScope,
s.rcmgr.system.resourceScope,
}
s.resourceScope.edges = edges
s.rcmgr.metrics.AllowService(svc)
return nil
}
func (s *streamScope) PeerScope() network.PeerScope {
s.Lock()
defer s.Unlock()
// avoid nil is not nil footgun; go....
if s.peer == nil {
return nil
}
return s.peer
}

1052
p2p/host/resource-manager/rcmgr_test.go

File diff suppressed because it is too large

771
p2p/host/resource-manager/scope.go

@ -0,0 +1,771 @@
package rcmgr
import (
"fmt"
"strings"
"sync"
"github.com/libp2p/go-libp2p-core/network"
)
// resources tracks the current state of resource consumption
type resources struct {
limit Limit
nconnsIn, nconnsOut int
nstreamsIn, nstreamsOut int
nfd int
memory int64
}
// A resourceScope can be a DAG, where a downstream node is not allowed to outlive an upstream node
// (i.e. cannot call Done in the upstream node before the downstream node) and account for resources
// using a linearized parent set.
// A resourceScope can be a span scope, where it has a specific owner; span scopes create a tree rooted
// at the owner (which can be a DAG scope) and can outlive their parents -- this is important because
// span scopes are the main *user* interface for memory management, and the user may call
// Done in a span scope after the system has closed the root of the span tree in some background
// goroutine.
// If we didn't make this distinction we would have a double release problem in that case.
type resourceScope struct {
sync.Mutex
done bool
refCnt int
spanID int
rc resources
owner *resourceScope // set in span scopes, which define trees
edges []*resourceScope // set in DAG scopes, it's the linearized parent set
name string // for debugging purposes
trace *trace // debug tracing
metrics *metrics // metrics collection
}
var _ network.ResourceScope = (*resourceScope)(nil)
var _ network.ResourceScopeSpan = (*resourceScope)(nil)
func newResourceScope(limit Limit, edges []*resourceScope, name string, trace *trace, metrics *metrics) *resourceScope {
for _, e := range edges {
e.IncRef()
}
r := &resourceScope{
rc: resources{limit: limit},
edges: edges,
name: name,
trace: trace,
metrics: metrics,
}
r.trace.CreateScope(name, limit)
return r
}
func newResourceScopeSpan(owner *resourceScope, id int) *resourceScope {
r := &resourceScope{
rc: resources{limit: owner.rc.limit},
owner: owner,
name: fmt.Sprintf("%s.span-%d", owner.name, id),
trace: owner.trace,
metrics: owner.metrics,
}
r.trace.CreateScope(r.name, r.rc.limit)
return r
}
// IsSpan will return true if this name was created by newResourceScopeSpan
func IsSpan(name string) bool {
return strings.Contains(name, ".span-")
}
// Resources implementation
func (rc *resources) checkMemory(rsvp int64, prio uint8) error {
// overflow check; this also has the side effect that we cannot reserve negative memory.
newmem := rc.memory + rsvp
limit := rc.limit.GetMemoryLimit()
threshold := (1 + int64(prio)) * limit / 256
if newmem > threshold {
return &errMemoryLimitExceeded{
current: rc.memory,
attempted: rsvp,
limit: limit,
priority: prio,
err: network.ErrResourceLimitExceeded,
}
}
return nil
}
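// Worked example (illustrative): the priority threshold scales the limit by
// (1+prio)/256. With a 1 GB memory limit, a reservation at
// network.ReservationPriorityLow (prio=101) is blocked once usage would
// exceed 102/256 of the limit (~408 MB), while
// network.ReservationPriorityAlways (prio=255) may use the full limit.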
func (rc *resources) reserveMemory(size int64, prio uint8) error {
if err := rc.checkMemory(size, prio); err != nil {
return err
}
rc.memory += size
return nil
}
func (rc *resources) releaseMemory(size int64) {
rc.memory -= size
// sanity check for bugs upstream
if rc.memory < 0 {
log.Warn("BUG: too much memory released")
rc.memory = 0
}
}
func (rc *resources) addStream(dir network.Direction) error {
if dir == network.DirInbound {
return rc.addStreams(1, 0)
}
return rc.addStreams(0, 1)
}
func (rc *resources) addStreams(incount, outcount int) error {
if incount > 0 {
limit := rc.limit.GetStreamLimit(network.DirInbound)
if rc.nstreamsIn+incount > limit {
return &errStreamOrConnLimitExceeded{
current: rc.nstreamsIn,
attempted: incount,
limit: limit,
err: fmt.Errorf("cannot reserve inbound stream: %w", network.ErrResourceLimitExceeded),
}
}
}
if outcount > 0 {
limit := rc.limit.GetStreamLimit(network.DirOutbound)
if rc.nstreamsOut+outcount > limit {
return &errStreamOrConnLimitExceeded{
current: rc.nstreamsOut,
attempted: outcount,
limit: limit,
err: fmt.Errorf("cannot reserve outbound stream: %w", network.ErrResourceLimitExceeded),
}
}
}
if limit := rc.limit.GetStreamTotalLimit(); rc.nstreamsIn+incount+rc.nstreamsOut+outcount > limit {
return &errStreamOrConnLimitExceeded{
current: rc.nstreamsIn + rc.nstreamsOut,
attempted: incount + outcount,
limit: limit,
err: fmt.Errorf("cannot reserve stream: %w", network.ErrResourceLimitExceeded),
}
}
rc.nstreamsIn += incount
rc.nstreamsOut += outcount
return nil
}
func (rc *resources) removeStream(dir network.Direction) {
if dir == network.DirInbound {
rc.removeStreams(1, 0)
} else {
rc.removeStreams(0, 1)
}
}
func (rc *resources) removeStreams(incount, outcount int) {
rc.nstreamsIn -= incount
rc.nstreamsOut -= outcount
if rc.nstreamsIn < 0 {
log.Warn("BUG: too many inbound streams released")
rc.nstreamsIn = 0
}
if rc.nstreamsOut < 0 {
log.Warn("BUG: too many outbound streams released")
rc.nstreamsOut = 0
}
}
func (rc *resources) addConn(dir network.Direction, usefd bool) error {
var fd int
if usefd {
fd = 1
}
if dir == network.DirInbound {
return rc.addConns(1, 0, fd)
}
return rc.addConns(0, 1, fd)
}
func (rc *resources) addConns(incount, outcount, fdcount int) error {
if incount > 0 {
limit := rc.limit.GetConnLimit(network.DirInbound)
if rc.nconnsIn+incount > limit {
return &errStreamOrConnLimitExceeded{
current: rc.nconnsIn,
attempted: incount,
limit: limit,
err: fmt.Errorf("cannot reserve inbound connection: %w", network.ErrResourceLimitExceeded),
}
}
}
if outcount > 0 {
limit := rc.limit.GetConnLimit(network.DirOutbound)
if rc.nconnsOut+outcount > limit {
return &errStreamOrConnLimitExceeded{
current: rc.nconnsOut,
attempted: outcount,
limit: limit,
err: fmt.Errorf("cannot reserve outbound connection: %w", network.ErrResourceLimitExceeded),
}
}
}
if connLimit := rc.limit.GetConnTotalLimit(); rc.nconnsIn+incount+rc.nconnsOut+outcount > connLimit {
return &errStreamOrConnLimitExceeded{
current: rc.nconnsIn + rc.nconnsOut,
attempted: incount + outcount,
limit: connLimit,
err: fmt.Errorf("cannot reserve connection: %w", network.ErrResourceLimitExceeded),
}
}
if fdcount > 0 {
limit := rc.limit.GetFDLimit()
if rc.nfd+fdcount > limit {
return &errStreamOrConnLimitExceeded{
current: rc.nfd,
attempted: fdcount,
limit: limit,
err: fmt.Errorf("cannot reserve file descriptor: %w", network.ErrResourceLimitExceeded),
}
}
}
rc.nconnsIn += incount
rc.nconnsOut += outcount
rc.nfd += fdcount
return nil
}
func (rc *resources) removeConn(dir network.Direction, usefd bool) {
var fd int
if usefd {
fd = 1
}
if dir == network.DirInbound {
rc.removeConns(1, 0, fd)
} else {
rc.removeConns(0, 1, fd)
}
}
func (rc *resources) removeConns(incount, outcount, fdcount int) {
rc.nconnsIn -= incount
rc.nconnsOut -= outcount
rc.nfd -= fdcount
if rc.nconnsIn < 0 {
log.Warn("BUG: too many inbound connections released")
rc.nconnsIn = 0
}
if rc.nconnsOut < 0 {
log.Warn("BUG: too many outbound connections released")
rc.nconnsOut = 0
}
if rc.nfd < 0 {
log.Warn("BUG: too many file descriptors released")
rc.nfd = 0
}
}
func (rc *resources) stat() network.ScopeStat {
return network.ScopeStat{
Memory: rc.memory,
NumStreamsInbound: rc.nstreamsIn,
NumStreamsOutbound: rc.nstreamsOut,
NumConnsInbound: rc.nconnsIn,
NumConnsOutbound: rc.nconnsOut,
NumFD: rc.nfd,
}
}
// resourceScope implementation
func (s *resourceScope) wrapError(err error) error {
return fmt.Errorf("%s: %w", s.name, err)
}
func (s *resourceScope) ReserveMemory(size int, prio uint8) error {
s.Lock()
defer s.Unlock()
if s.done {
return s.wrapError(network.ErrResourceScopeClosed)
}
if err := s.rc.reserveMemory(int64(size), prio); err != nil {
log.Debugw("blocked memory reservation", logValuesMemoryLimit(s.name, "", s.rc.stat(), err)...)
s.trace.BlockReserveMemory(s.name, prio, int64(size), s.rc.memory)
s.metrics.BlockMemory(size)
return s.wrapError(err)
}
if err := s.reserveMemoryForEdges(size, prio); err != nil {
s.rc.releaseMemory(int64(size))
s.metrics.BlockMemory(size)
return s.wrapError(err)
}
s.trace.ReserveMemory(s.name, prio, int64(size), s.rc.memory)
s.metrics.AllowMemory(size)
return nil
}
func (s *resourceScope) reserveMemoryForEdges(size int, prio uint8) error {
if s.owner != nil {
return s.owner.ReserveMemory(size, prio)
}
var reserved int
var err error
for _, e := range s.edges {
var stat network.ScopeStat
stat, err = e.ReserveMemoryForChild(int64(size), prio)
if err != nil {
log.Debugw("blocked memory reservation from constraining edge", logValuesMemoryLimit(s.name, e.name, stat, err)...)
break
}
reserved++
}
if err != nil {
// we failed because of a constraint; undo memory reservations
for _, e := range s.edges[:reserved] {
e.ReleaseMemoryForChild(int64(size))
}
}
return err
}
func (s *resourceScope) releaseMemoryForEdges(size int) {
if s.owner != nil {
s.owner.ReleaseMemory(size)
return
}
for _, e := range s.edges {
e.ReleaseMemoryForChild(int64(size))
}
}
func (s *resourceScope) ReserveMemoryForChild(size int64, prio uint8) (network.ScopeStat, error) {
s.Lock()
defer s.Unlock()
if s.done {
return s.rc.stat(), s.wrapError(network.ErrResourceScopeClosed)
}
if err := s.rc.reserveMemory(size, prio); err != nil {
s.trace.BlockReserveMemory(s.name, prio, size, s.rc.memory)
return s.rc.stat(), s.wrapError(err)
}
s.trace.ReserveMemory(s.name, prio, size, s.rc.memory)
return network.ScopeStat{}, nil
}
func (s *resourceScope) ReleaseMemory(size int) {
s.Lock()
defer s.Unlock()
if s.done {
return
}
s.rc.releaseMemory(int64(size))
s.releaseMemoryForEdges(size)
s.trace.ReleaseMemory(s.name, int64(size), s.rc.memory)
}
func (s *resourceScope) ReleaseMemoryForChild(size int64) {
s.Lock()
defer s.Unlock()
if s.done {
return
}
s.rc.releaseMemory(size)
s.trace.ReleaseMemory(s.name, size, s.rc.memory)
}
func (s *resourceScope) AddStream(dir network.Direction) error {
s.Lock()
defer s.Unlock()
if s.done {
return s.wrapError(network.ErrResourceScopeClosed)
}
if err := s.rc.addStream(dir); err != nil {
log.Debugw("blocked stream", logValuesStreamLimit(s.name, "", dir, s.rc.stat(), err)...)
s.trace.BlockAddStream(s.name, dir, s.rc.nstreamsIn, s.rc.nstreamsOut)
return s.wrapError(err)
}
if err := s.addStreamForEdges(dir); err != nil {
s.rc.removeStream(dir)
return s.wrapError(err)
}
s.trace.AddStream(s.name, dir, s.rc.nstreamsIn, s.rc.nstreamsOut)
return nil
}
func (s *resourceScope) addStreamForEdges(dir network.Direction) error {
if s.owner != nil {
return s.owner.AddStream(dir)
}
var err error
var reserved int
for _, e := range s.edges {
var stat network.ScopeStat
stat, err = e.AddStreamForChild(dir)
if err != nil {
log.Debugw("blocked stream from constraining edge", logValuesStreamLimit(s.name, e.name, dir, stat, err)...)
break
}
reserved++
}
if err != nil {
for _, e := range s.edges[:reserved] {
e.RemoveStreamForChild(dir)
}
}
return err
}
func (s *resourceScope) AddStreamForChild(dir network.Direction) (network.ScopeStat, error) {
s.Lock()
defer s.Unlock()
if s.done {
return s.rc.stat(), s.wrapError(network.ErrResourceScopeClosed)
}
if err := s.rc.addStream(dir); err != nil {
s.trace.BlockAddStream(s.name, dir, s.rc.nstreamsIn, s.rc.nstreamsOut)
return s.rc.stat(), s.wrapError(err)
}
s.trace.AddStream(s.name, dir, s.rc.nstreamsIn, s.rc.nstreamsOut)
return network.ScopeStat{}, nil
}
func (s *resourceScope) RemoveStream(dir network.Direction) {
s.Lock()
defer s.Unlock()
if s.done {
return
}
s.rc.removeStream(dir)
s.removeStreamForEdges(dir)
s.trace.RemoveStream(s.name, dir, s.rc.nstreamsIn, s.rc.nstreamsOut)
}
func (s *resourceScope) removeStreamForEdges(dir network.Direction) {
if s.owner != nil {
s.owner.RemoveStream(dir)
return
}
for _, e := range s.edges {
e.RemoveStreamForChild(dir)
}
}
func (s *resourceScope) RemoveStreamForChild(dir network.Direction) {
s.Lock()
defer s.Unlock()
if s.done {
return
}
s.rc.removeStream(dir)
s.trace.RemoveStream(s.name, dir, s.rc.nstreamsIn, s.rc.nstreamsOut)
}
func (s *resourceScope) AddConn(dir network.Direction, usefd bool) error {
s.Lock()
defer s.Unlock()
if s.done {
return s.wrapError(network.ErrResourceScopeClosed)
}
if err := s.rc.addConn(dir, usefd); err != nil {
log.Debugw("blocked connection", logValuesConnLimit(s.name, "", dir, usefd, s.rc.stat(), err)...)
s.trace.BlockAddConn(s.name, dir, usefd, s.rc.nconnsIn, s.rc.nconnsOut, s.rc.nfd)
return s.wrapError(err)
}
if err := s.addConnForEdges(dir, usefd); err != nil {
s.rc.removeConn(dir, usefd)
return s.wrapError(err)
}
s.trace.AddConn(s.name, dir, usefd, s.rc.nconnsIn, s.rc.nconnsOut, s.rc.nfd)
return nil
}
func (s *resourceScope) addConnForEdges(dir network.Direction, usefd bool) error {
if s.owner != nil {
return s.owner.AddConn(dir, usefd)
}
var err error
var reserved int
for _, e := range s.edges {
var stat network.ScopeStat
stat, err = e.AddConnForChild(dir, usefd)
if err != nil {
log.Debugw("blocked connection from constraining edge", logValuesConnLimit(s.name, e.name, dir, usefd, stat, err)...)
break
}
reserved++
}
if err != nil {
for _, e := range s.edges[:reserved] {
e.RemoveConnForChild(dir, usefd)
}
}
return err
}
func (s *resourceScope) AddConnForChild(dir network.Direction, usefd bool) (network.ScopeStat, error) {
s.Lock()
defer s.Unlock()
if s.done {
return s.rc.stat(), s.wrapError(network.ErrResourceScopeClosed)
}
if err := s.rc.addConn(dir, usefd); err != nil {
s.trace.BlockAddConn(s.name, dir, usefd, s.rc.nconnsIn, s.rc.nconnsOut, s.rc.nfd)
return s.rc.stat(), s.wrapError(err)
}
s.trace.AddConn(s.name, dir, usefd, s.rc.nconnsIn, s.rc.nconnsOut, s.rc.nfd)
return network.ScopeStat{}, nil
}
func (s *resourceScope) RemoveConn(dir network.Direction, usefd bool) {
s.Lock()
defer s.Unlock()
if s.done {
return
}
s.rc.removeConn(dir, usefd)
s.removeConnForEdges(dir, usefd)
s.trace.RemoveConn(s.name, dir, usefd, s.rc.nconnsIn, s.rc.nconnsOut, s.rc.nfd)
}
func (s *resourceScope) removeConnForEdges(dir network.Direction, usefd bool) {
if s.owner != nil {
s.owner.RemoveConn(dir, usefd)
return
}
for _, e := range s.edges {
e.RemoveConnForChild(dir, usefd)
}
}
func (s *resourceScope) RemoveConnForChild(dir network.Direction, usefd bool) {
s.Lock()
defer s.Unlock()
if s.done {
return
}
s.rc.removeConn(dir, usefd)
s.trace.RemoveConn(s.name, dir, usefd, s.rc.nconnsIn, s.rc.nconnsOut, s.rc.nfd)
}
// ReserveForChild atomically reserves memory, streams, and connections on
// behalf of a child scope, rolling back partial reservations on failure.
func (s *resourceScope) ReserveForChild(st network.ScopeStat) error {
s.Lock()
defer s.Unlock()
if s.done {
return s.wrapError(network.ErrResourceScopeClosed)
}
if err := s.rc.reserveMemory(st.Memory, network.ReservationPriorityAlways); err != nil {
s.trace.BlockReserveMemory(s.name, 255, st.Memory, s.rc.memory)
return s.wrapError(err)
}
if err := s.rc.addStreams(st.NumStreamsInbound, st.NumStreamsOutbound); err != nil {
s.trace.BlockAddStreams(s.name, st.NumStreamsInbound, st.NumStreamsOutbound, s.rc.nstreamsIn, s.rc.nstreamsOut)
s.rc.releaseMemory(st.Memory)
return s.wrapError(err)
}
if err := s.rc.addConns(st.NumConnsInbound, st.NumConnsOutbound, st.NumFD); err != nil {
s.trace.BlockAddConns(s.name, st.NumConnsInbound, st.NumConnsOutbound, st.NumFD, s.rc.nconnsIn, s.rc.nconnsOut, s.rc.nfd)
s.rc.releaseMemory(st.Memory)
s.rc.removeStreams(st.NumStreamsInbound, st.NumStreamsOutbound)
return s.wrapError(err)
}
s.trace.ReserveMemory(s.name, 255, st.Memory, s.rc.memory)
s.trace.AddStreams(s.name, st.NumStreamsInbound, st.NumStreamsOutbound, s.rc.nstreamsIn, s.rc.nstreamsOut)
s.trace.AddConns(s.name, st.NumConnsInbound, st.NumConnsOutbound, st.NumFD, s.rc.nconnsIn, s.rc.nconnsOut, s.rc.nfd)
return nil
}
func (s *resourceScope) ReleaseForChild(st network.ScopeStat) {
s.Lock()
defer s.Unlock()
if s.done {
return
}
s.rc.releaseMemory(st.Memory)
s.rc.removeStreams(st.NumStreamsInbound, st.NumStreamsOutbound)
s.rc.removeConns(st.NumConnsInbound, st.NumConnsOutbound, st.NumFD)
s.trace.ReleaseMemory(s.name, st.Memory, s.rc.memory)
s.trace.RemoveStreams(s.name, st.NumStreamsInbound, st.NumStreamsOutbound, s.rc.nstreamsIn, s.rc.nstreamsOut)
s.trace.RemoveConns(s.name, st.NumConnsInbound, st.NumConnsOutbound, st.NumFD, s.rc.nconnsIn, s.rc.nconnsOut, s.rc.nfd)
}
func (s *resourceScope) ReleaseResources(st network.ScopeStat) {
s.Lock()
defer s.Unlock()
if s.done {
return
}
s.rc.releaseMemory(st.Memory)
s.rc.removeStreams(st.NumStreamsInbound, st.NumStreamsOutbound)
s.rc.removeConns(st.NumConnsInbound, st.NumConnsOutbound, st.NumFD)
if s.owner != nil {
s.owner.ReleaseResources(st)
} else {
for _, e := range s.edges {
e.ReleaseForChild(st)
}
}
s.trace.ReleaseMemory(s.name, st.Memory, s.rc.memory)
s.trace.RemoveStreams(s.name, st.NumStreamsInbound, st.NumStreamsOutbound, s.rc.nstreamsIn, s.rc.nstreamsOut)
s.trace.RemoveConns(s.name, st.NumConnsInbound, st.NumConnsOutbound, st.NumFD, s.rc.nconnsIn, s.rc.nconnsOut, s.rc.nfd)
}
func (s *resourceScope) nextSpanID() int {
s.spanID++
return s.spanID
}
func (s *resourceScope) BeginSpan() (network.ResourceScopeSpan, error) {
s.Lock()
defer s.Unlock()
if s.done {
return nil, s.wrapError(network.ErrResourceScopeClosed)
}
s.refCnt++
return newResourceScopeSpan(s, s.nextSpanID()), nil
}
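// Illustrative sketch (not part of the original source): a span gives
// transactional semantics on top of its owner scope, so everything reserved
// through it is released in one shot when the span is done. Here scope is
// assumed to be any network.ResourceScope obtained from the manager:
//
//	span, err := scope.BeginSpan()
//	if err != nil {
//		return err
//	}
//	defer span.Done() // releases everything reserved through the span
//	if err := span.ReserveMemory(4096, network.ReservationPriorityAlways); err != nil {
//		return err // over limit: back off instead of allocating
//	}
//	buf := make([]byte, 4096) // safe to allocate now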
// Done releases all resources held by the scope to its owner or edges and
// marks it closed; subsequent calls are no-ops.
func (s *resourceScope) Done() {
s.Lock()
defer s.Unlock()
if s.done {
return
}
stat := s.rc.stat()
if s.owner != nil {
s.owner.ReleaseResources(stat)
s.owner.DecRef()
} else {
for _, e := range s.edges {
e.ReleaseForChild(stat)
e.DecRef()
}
}
s.rc.nstreamsIn = 0
s.rc.nstreamsOut = 0
s.rc.nconnsIn = 0
s.rc.nconnsOut = 0
s.rc.nfd = 0
s.rc.memory = 0
s.done = true
s.trace.DestroyScope(s.name)
}
func (s *resourceScope) Stat() network.ScopeStat {
s.Lock()
defer s.Unlock()
return s.rc.stat()
}
func (s *resourceScope) IncRef() {
s.Lock()
defer s.Unlock()
s.refCnt++
}
func (s *resourceScope) DecRef() {
s.Lock()
defer s.Unlock()
s.refCnt--
}
// IsUnused reports whether the scope holds no references and no resources,
// making it eligible for garbage collection.
func (s *resourceScope) IsUnused() bool {
s.Lock()
defer s.Unlock()
if s.done {
return true
}
if s.refCnt > 0 {
return false
}
st := s.rc.stat()
return st.NumStreamsInbound == 0 &&
st.NumStreamsOutbound == 0 &&
st.NumConnsInbound == 0 &&
st.NumConnsOutbound == 0 &&
st.NumFD == 0
}

1200
p2p/host/resource-manager/scope_test.go

File diff suppressed because it is too large

11
p2p/host/resource-manager/sys_not_unix.go

@@ -0,0 +1,11 @@
//go:build !linux && !darwin && !windows
// +build !linux,!darwin,!windows

package rcmgr
import "runtime"
// TODO: figure out how to get the number of file descriptors on Windows and other systems
func getNumFDs() int {
log.Warnf("cannot determine number of file descriptors on %s", runtime.GOOS)
return 0
}

17
p2p/host/resource-manager/sys_unix.go

@@ -0,0 +1,17 @@
//go:build linux || darwin
// +build linux darwin

package rcmgr
import (
"golang.org/x/sys/unix"
)
// getNumFDs returns the process's soft limit on open file descriptors
// (RLIMIT_NOFILE), not the number currently in use.
func getNumFDs() int {
var l unix.Rlimit
if err := unix.Getrlimit(unix.RLIMIT_NOFILE, &l); err != nil {
log.Errorw("failed to get fd limit", "error", err)
return 0
}
return int(l.Cur)
}
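// Illustrative sketch (not part of the original source): since the value
// above is the soft RLIMIT_NOFILE limit, a process that wants more file
// descriptors can raise the soft limit toward the hard limit before the
// resource manager samples it:
//
//	var l unix.Rlimit
//	if err := unix.Getrlimit(unix.RLIMIT_NOFILE, &l); err == nil {
//		l.Cur = l.Max
//		_ = unix.Setrlimit(unix.RLIMIT_NOFILE, &l)
//	}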

11
p2p/host/resource-manager/sys_windows.go

@@ -0,0 +1,11 @@
//go:build windows
// +build windows

package rcmgr
import (
"math"
)
func getNumFDs() int {
return math.MaxInt
}

698
p2p/host/resource-manager/trace.go

@@ -0,0 +1,698 @@
package rcmgr
import (
"compress/gzip"
"context"
"encoding/json"
"fmt"
"io"
"os"
"strings"
"sync"
"time"
"github.com/libp2p/go-libp2p-core/network"
)
type trace struct {
path string
ctx context.Context
cancel func()
wg sync.WaitGroup
mx sync.Mutex
done bool
pendingWrites []interface{}
reporters []TraceReporter
}
type TraceReporter interface {
// ConsumeEvent consumes a trace event. This is called synchronously;
// implementations should process the event quickly.
ConsumeEvent(TraceEvt)
}
// WithTrace enables tracing of resource manager events, writing the trace
// as gzip-compressed JSON to a file at the given path.
func WithTrace(path string) Option {
return func(r *resourceManager) error {
if r.trace == nil {
r.trace = &trace{path: path}
} else {
r.trace.path = path
}
return nil
}
}
// WithTraceReporter registers a reporter that consumes trace events as they
// are emitted, in addition to (or instead of) file-based tracing.
func WithTraceReporter(reporter TraceReporter) Option {
return func(r *resourceManager) error {
if r.trace == nil {
r.trace = &trace{}
}
r.trace.reporters = append(r.trace.reporters, reporter)
return nil
}
}
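// Illustrative sketch (not part of the original source): a minimal
// TraceReporter that counts blocked-resource events. ConsumeEvent runs
// synchronously on the event path, so it must stay cheap; anything
// expensive belongs behind a channel or, as here, an atomic (assumes
// the sync/atomic import):
//
//	type blockCounter struct{ n int64 }
//
//	func (b *blockCounter) ConsumeEvent(evt rcmgr.TraceEvt) {
//		switch evt.Type {
//		case rcmgr.TraceBlockAddConnEvt, rcmgr.TraceBlockAddStreamEvt,
//			rcmgr.TraceBlockReserveMemoryEvt:
//			atomic.AddInt64(&b.n, 1)
//		}
//	}
//
// wired up via rcmgr.WithTraceReporter(&blockCounter{}) when constructing
// the resource manager.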
type TraceEvtTyp string
const (
TraceStartEvt TraceEvtTyp = "start"
TraceCreateScopeEvt TraceEvtTyp = "create_scope"
TraceDestroyScopeEvt TraceEvtTyp = "destroy_scope"
TraceReserveMemoryEvt TraceEvtTyp = "reserve_memory"
TraceBlockReserveMemoryEvt TraceEvtTyp = "block_reserve_memory"
TraceReleaseMemoryEvt TraceEvtTyp = "release_memory"
TraceAddStreamEvt TraceEvtTyp = "add_stream"
TraceBlockAddStreamEvt TraceEvtTyp = "block_add_stream"
TraceRemoveStreamEvt TraceEvtTyp = "remove_stream"
TraceAddConnEvt TraceEvtTyp = "add_conn"
TraceBlockAddConnEvt TraceEvtTyp = "block_add_conn"
TraceRemoveConnEvt TraceEvtTyp = "remove_conn"
)
type scopeClass struct {
name string
}
func (s scopeClass) MarshalJSON() ([]byte, error) {
name := s.name
var span string
if idx := strings.Index(name, "span:"); idx > -1 {
name = name[:idx-1]
span = name[idx+5:]
}
// System and transient scopes (including their allowlisted variants)
if name == "system" || name == "transient" || name == "allowlistedSystem" || name == "allowlistedTransient" {
return json.Marshal(struct {
Class string
Span string `json:",omitempty"`
}{
Class: name,
Span: span,
})
}
// Connection scope
if strings.HasPrefix(name, "conn-") {
return json.Marshal(struct {
Class string
Conn string
Span string `json:",omitempty"`
}{
Class: "conn",
Conn: name[5:],
Span: span,
})
}
// Stream scope
if strings.HasPrefix(name, "stream-") {
return json.Marshal(struct {
Class string
Stream string
Span string `json:",omitempty"`
}{
Class: "stream",
Stream: name[7:],
Span: span,
})
}
// Peer scope
if strings.HasPrefix(name, "peer:") {
return json.Marshal(struct {
Class string
Peer string
Span string `json:",omitempty"`
}{
Class: "peer",
Peer: name[5:],
Span: span,
})
}
if strings.HasPrefix(name, "service:") {
if idx := strings.Index(name, "peer:"); idx > 0 { // Peer-Service scope
return json.Marshal(struct {
Class string
Service string
Peer string
Span string `json:",omitempty"`
}{
Class: "service-peer",
Service: name[8 : idx-1],
Peer: name[idx+5:],
Span: span,
})
} else { // Service scope
return json.Marshal(struct {
Class string
Service string
Span string `json:",omitempty"`
}{
Class: "service",
Service: name[8:],
Span: span,
})
}
}
if strings.HasPrefix(name, "protocol:") {
if idx := strings.Index(name, "peer:"); idx > -1 { // Peer-Protocol scope
return json.Marshal(struct {
Class string
Protocol string
Peer string
Span string `json:",omitempty"`
}{
Class: "protocol-peer",
Protocol: name[9 : idx-1],
Peer: name[idx+5:],
Span: span,
})
} else { // Protocol scope
return json.Marshal(struct {
Class string
Protocol string
Span string `json:",omitempty"`
}{
Class: "protocol",
Protocol: name[9:],
Span: span,
})
}
}
return nil, fmt.Errorf("unrecognized scope: %s", name)
}
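// Illustrative examples of the mapping implemented above (the span suffix,
// when present, is split off first):
//
//	"system"                      -> {"Class":"system"}
//	"conn-42"                     -> {"Class":"conn","Conn":"42"}
//	"peer:QmFoo span:7"           -> {"Class":"peer","Peer":"QmFoo","Span":"7"}
//	"service:identify peer:QmFoo" -> {"Class":"service-peer","Service":"identify","Peer":"QmFoo"}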
type TraceEvt struct {
Time string
Type TraceEvtTyp
Scope *scopeClass `json:",omitempty"`
Name string `json:",omitempty"`
Limit interface{} `json:",omitempty"`
Priority uint8 `json:",omitempty"`
Delta int64 `json:",omitempty"`
DeltaIn int `json:",omitempty"`
DeltaOut int `json:",omitempty"`
Memory int64 `json:",omitempty"`
StreamsIn int `json:",omitempty"`
StreamsOut int `json:",omitempty"`
ConnsIn int `json:",omitempty"`
ConnsOut int `json:",omitempty"`
FD int `json:",omitempty"`
}
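// Each event is encoded as a single JSON line in the (gzip-compressed)
// trace file; an illustrative example with made-up values, wrapped here
// for readability:
//
//	{"Time":"2022-08-01T12:00:00.000000001Z","Type":"add_stream",
//	 "Scope":{"Class":"peer","Peer":"QmFoo"},"Name":"peer:QmFoo",
//	 "DeltaIn":1,"StreamsIn":3}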
// push timestamps the event, dispatches it synchronously to all reporters,
// and queues it for the background writer when file tracing is enabled.
func (t *trace) push(evt TraceEvt) {
t.mx.Lock()
defer t.mx.Unlock()
if t.done {
return
}
evt.Time = time.Now().Format(time.RFC3339Nano)
if evt.Name != "" {
evt.Scope = &scopeClass{name: evt.Name}
}
for _, reporter := range t.reporters {
reporter.ConsumeEvent(evt)
}
if t.path != "" {
t.pendingWrites = append(t.pendingWrites, evt)
}
}
// backgroundWriter drains queued events once per second, encoding them as
// gzip-compressed JSON until the trace is closed.
func (t *trace) backgroundWriter(out io.WriteCloser) {
defer t.wg.Done()
defer out.Close()
gzOut := gzip.NewWriter(out)
defer gzOut.Close()
jsonOut := json.NewEncoder(gzOut)
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
var pend []interface{}
getEvents := func() {
t.mx.Lock()
tmp := t.pendingWrites
t.pendingWrites = pend[:0]
pend = tmp
t.mx.Unlock()
}
for {
select {
case <-ticker.C:
getEvents()
if len(pend) == 0 {
continue
}
if err := t.writeEvents(pend, jsonOut); err != nil {
log.Warnf("error writing rcmgr trace: %s", err)
t.mx.Lock()
t.done = true
t.mx.Unlock()
return
}
if err := gzOut.Flush(); err != nil {
log.Warnf("error flushing rcmgr trace: %s", err)
t.mx.Lock()
t.done = true
t.mx.Unlock()
return
}
case <-t.ctx.Done():
getEvents()
if len(pend) == 0 {
return
}
if err := t.writeEvents(pend, jsonOut); err != nil {
log.Warnf("error writing rcmgr trace: %s", err)
return
}
if err := gzOut.Flush(); err != nil {
log.Warnf("error flushing rcmgr trace: %s", err)
}
return
}
}
}
func (t *trace) writeEvents(pend []interface{}, jout *json.Encoder) error {
for _, e := range pend {
if err := jout.Encode(e); err != nil {
return err
}
}
return nil
}
func (t *trace) Start(limits Limiter) error {
if t == nil {
return nil
}
t.ctx, t.cancel = context.WithCancel(context.Background())
if t.path != "" {
out, err := os.OpenFile(t.path, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
if err != nil {
return err // surface the error instead of silently disabling tracing
}
t.wg.Add(1)
go t.backgroundWriter(out)
}
t.push(TraceEvt{
Type: TraceStartEvt,
Limit: limits,
})
return nil
}
func (t *trace) Close() error {
if t == nil {
return nil
}
t.mx.Lock()
if t.done {
t.mx.Unlock()
return nil
}
t.cancel()
t.done = true
t.mx.Unlock()
t.wg.Wait()
return nil
}
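// Illustrative usage sketch (not part of the original source; assumes the
// NewFixedLimiter/DefaultLimits APIs from limit_defaults.go and the
// NewResourceManager constructor from rcmgr.go in this diff):
//
//	limiter := rcmgr.NewFixedLimiter(rcmgr.DefaultLimits.AutoScale())
//	mgr, err := rcmgr.NewResourceManager(limiter, rcmgr.WithTrace("rcmgr.trace.gz"))
//	...
//	mgr.Close() // stops the background writer and flushes the trace
//
// The resulting file is newline-delimited JSON, gzip-compressed; inspect it
// with, e.g., gunzip -c rcmgr.trace.gz | head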
func (t *trace) CreateScope(scope string, limit Limit) {
if t == nil {
return
}
t.push(TraceEvt{
Type: TraceCreateScopeEvt,
Name: scope,
Limit: limit,
})
}
func (t *trace) DestroyScope(scope string) {
if t == nil {
return
}
t.push(TraceEvt{
Type: TraceDestroyScopeEvt,
Name: scope,
})
}
func (t *trace) ReserveMemory(scope string, prio uint8, size, mem int64) {
if t == nil {
return
}
if size == 0 {
return
}
t.push(TraceEvt{
Type: TraceReserveMemoryEvt,
Name: scope,
Priority: prio,
Delta: size,
Memory: mem,
})
}
func (t *trace) BlockReserveMemory(scope string, prio uint8, size, mem int64) {
if t == nil {
return
}
if size == 0 {
return
}
t.push(TraceEvt{
Type: TraceBlockReserveMemoryEvt,
Name: scope,
Priority: prio,
Delta: size,
Memory: mem,
})
}
func (t *trace) ReleaseMemory(scope string, size, mem int64) {
if t == nil {
return
}
if size == 0 {
return
}
t.push(TraceEvt{
Type: TraceReleaseMemoryEvt,
Name: scope,
Delta: -size,
Memory: mem,
})
}
func (t *trace) AddStream(scope string, dir network.Direction, nstreamsIn, nstreamsOut int) {
if t == nil {
return
}
var deltaIn, deltaOut int
if dir == network.DirInbound {
deltaIn = 1
} else {
deltaOut = 1
}
t.push(TraceEvt{
Type: TraceAddStreamEvt,
Name: scope,
DeltaIn: deltaIn,
DeltaOut: deltaOut,
StreamsIn: nstreamsIn,
StreamsOut: nstreamsOut,
})
}
func (t *trace) BlockAddStream(scope string, dir network.Direction, nstreamsIn, nstreamsOut int) {
if t == nil {
return
}
var deltaIn, deltaOut int
if dir == network.DirInbound {
deltaIn = 1
} else {
deltaOut = 1
}
t.push(TraceEvt{
Type: TraceBlockAddStreamEvt,
Name: scope,
DeltaIn: deltaIn,
DeltaOut: deltaOut,
StreamsIn: nstreamsIn,
StreamsOut: nstreamsOut,
})
}
func (t *trace) RemoveStream(scope string, dir network.Direction, nstreamsIn, nstreamsOut int) {
if t == nil {
return
}
var deltaIn, deltaOut int
if dir == network.DirInbound {
deltaIn = -1
} else {
deltaOut = -1
}
t.push(TraceEvt{
Type: TraceRemoveStreamEvt,
Name: scope,
DeltaIn: deltaIn,
DeltaOut: deltaOut,
StreamsIn: nstreamsIn,
StreamsOut: nstreamsOut,
})
}
func (t *trace) AddStreams(scope string, deltaIn, deltaOut, nstreamsIn, nstreamsOut int) {
if t == nil {
return
}
if deltaIn == 0 && deltaOut == 0 {
return
}
t.push(TraceEvt{
Type: TraceAddStreamEvt,
Name: scope,
DeltaIn: deltaIn,
DeltaOut: deltaOut,
StreamsIn: nstreamsIn,
StreamsOut: nstreamsOut,
})
}
func (t *trace) BlockAddStreams(scope string, deltaIn, deltaOut, nstreamsIn, nstreamsOut int) {
if t == nil {
return
}
if deltaIn == 0 && deltaOut == 0 {
return
}
t.push(TraceEvt{
Type: TraceBlockAddStreamEvt,
Name: scope,
DeltaIn: deltaIn,
DeltaOut: deltaOut,
StreamsIn: nstreamsIn,
StreamsOut: nstreamsOut,
})
}
func (t *trace) RemoveStreams(scope string, deltaIn, deltaOut, nstreamsIn, nstreamsOut int) {
if t == nil {
return
}
if deltaIn == 0 && deltaOut == 0 {
return
}
t.push(TraceEvt{
Type: TraceRemoveStreamEvt,
Name: scope,
DeltaIn: -deltaIn,
DeltaOut: -deltaOut,
StreamsIn: nstreamsIn,
StreamsOut: nstreamsOut,
})
}
func (t *trace) AddConn(scope string, dir network.Direction, usefd bool, nconnsIn, nconnsOut, nfd int) {
if t == nil {
return
}
var deltaIn, deltaOut, deltafd int
if dir == network.DirInbound {
deltaIn = 1
} else {
deltaOut = 1
}
if usefd {
deltafd = 1
}
t.push(TraceEvt{
Type: TraceAddConnEvt,
Name: scope,
DeltaIn: deltaIn,
DeltaOut: deltaOut,
Delta: int64(deltafd),
ConnsIn: nconnsIn,
ConnsOut: nconnsOut,
FD: nfd,
})
}
func (t *trace) BlockAddConn(scope string, dir network.Direction, usefd bool, nconnsIn, nconnsOut, nfd int) {
if t == nil {
return
}
var deltaIn, deltaOut, deltafd int
if dir == network.DirInbound {
deltaIn = 1
} else {
deltaOut = 1
}
if usefd {
deltafd = 1
}
t.push(TraceEvt{
Type: TraceBlockAddConnEvt,
Name: scope,
DeltaIn: deltaIn,
DeltaOut: deltaOut,
Delta: int64(deltafd),
ConnsIn: nconnsIn,
ConnsOut: nconnsOut,
FD: nfd,
})
}
func (t *trace) RemoveConn(scope string, dir network.Direction, usefd bool, nconnsIn, nconnsOut, nfd int) {
if t == nil {
return
}
var deltaIn, deltaOut, deltafd int
if dir == network.DirInbound {
deltaIn = -1
} else {
deltaOut = -1
}
if usefd {
deltafd = -1
}
t.push(TraceEvt{
Type: TraceRemoveConnEvt,
Name: scope,
DeltaIn: deltaIn,
DeltaOut: deltaOut,
Delta: int64(deltafd),
ConnsIn: nconnsIn,
ConnsOut: nconnsOut,
FD: nfd,
})
}
func (t *trace) AddConns(scope string, deltaIn, deltaOut, deltafd, nconnsIn, nconnsOut, nfd int) {
if t == nil {
return
}
if deltaIn == 0 && deltaOut == 0 && deltafd == 0 {
return
}
t.push(TraceEvt{
Type: TraceAddConnEvt,
Name: scope,
DeltaIn: deltaIn,
DeltaOut: deltaOut,
Delta: int64(deltafd),
ConnsIn: nconnsIn,
ConnsOut: nconnsOut,
FD: nfd,
})
}
func (t *trace) BlockAddConns(scope string, deltaIn, deltaOut, deltafd, nconnsIn, nconnsOut, nfd int) {
if t == nil {
return
}
if deltaIn == 0 && deltaOut == 0 && deltafd == 0 {
return
}
t.push(TraceEvt{
Type: TraceBlockAddConnEvt,
Name: scope,
DeltaIn: deltaIn,
DeltaOut: deltaOut,
Delta: int64(deltafd),
ConnsIn: nconnsIn,
ConnsOut: nconnsOut,
FD: nfd,
})
}
func (t *trace) RemoveConns(scope string, deltaIn, deltaOut, deltafd, nconnsIn, nconnsOut, nfd int) {
if t == nil {
return
}
if deltaIn == 0 && deltaOut == 0 && deltafd == 0 {
return
}
t.push(TraceEvt{
Type: TraceRemoveConnEvt,
Name: scope,
DeltaIn: -deltaIn,
DeltaOut: -deltaOut,
Delta: -int64(deltafd),
ConnsIn: nconnsIn,
ConnsOut: nconnsOut,
FD: nfd,
})
}