Browse Source

rcmgr: Use prometheus SDK for rcmgr metrics (#2044)

* Replace OpenCensus with Prometheus Go SDK

Allows for much more efficient metric collecting

* PR comments

* Try a different way of testing noallocs

* CI fiddling

* CI fiddling

* Undo debug change

* Return early

* Debug

* Try AllocsPerRun

* Try gosched in noop

* Use AllocsPerRun

* Try without cover

* Use tag

* Revert "Try without cover"

This reverts commit e91b1e8f0d0ffbe6842a37925eb72e0177bf0773.

* Cleanup debug code

* Use global string slice pool
rcmgr-metrics-prefix
Marco Munizaga 2 years ago
committed by GitHub
parent
commit
97af39a2ed
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      .github/actions/go-test-setup/action.yml
  2. 1
      go.mod
  3. 7
      go.sum
  4. 7
      p2p/host/resource-manager/README.md
  5. 18
      p2p/host/resource-manager/obs/grafana-dashboards/README.md
  6. 300
      p2p/host/resource-manager/obs/grafana-dashboards/resource-manager.json
  7. 112
      p2p/host/resource-manager/obs/noalloc_test.go
  8. 415
      p2p/host/resource-manager/obs/stats.go
  9. 13
      p2p/host/resource-manager/obs/stats_test.go
  10. 43
      p2p/host/resource-manager/rcmgr.go

4
.github/actions/go-test-setup/action.yml

@ -5,3 +5,7 @@ runs:
shell: bash
run: sysctl -w net.core.rmem_max=2500000
if: ${{ matrix.os == 'ubuntu' }}
- name: Run nocover tests. These are tests that require the coverage analysis to be off # See https://github.com/protocol/.github/issues/460
shell: bash
# This matches only tests with "NoCover" in their test name to avoid running all tests again.
run: go test -tags nocover -run NoCover -v ./...

1
go.mod

@ -48,7 +48,6 @@ require (
github.com/quic-go/webtransport-go v0.5.0
github.com/raulk/go-watchdog v1.3.0
github.com/stretchr/testify v1.8.1
go.opencensus.io v0.24.0
go.uber.org/fx v1.18.2
go.uber.org/goleak v1.1.12
golang.org/x/crypto v0.4.0

7
go.sum

@ -150,7 +150,6 @@ github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e h1:1r7pUrabqp18hOBcwBwiTsbnFeTZHV9eER/QT5JVZxY=
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/lint v0.0.0-20180702182130-06c8688daad7/go.mod h1:tluoj9z5200jBnyusfRPU2LqT6J+DAorxEvtC7LHB+E=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
@ -191,7 +190,6 @@ github.com/google/go-cmp v0.4.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
@ -214,7 +212,6 @@ github.com/google/pprof v0.0.0-20221203041831-ce31453925ec h1:fR20TYVVwhK4O7r7y+
github.com/google/pprof v0.0.0-20221203041831-ce31453925ec/go.mod h1:dDKJzRmX4S37WGHujM7tX//fmj1uioxKzKxz3lo4HJo=
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/gax-go v2.0.0+incompatible/go.mod h1:SFVmujtThgffbyetf+mdk2eWhX2bMyUtNHzFKcPA9HY=
@ -526,8 +523,6 @@ go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8=
go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0=
go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo=
go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/atomic v1.10.0 h1:9qC72Qh0+3MqyJbAn8YU5xVq1frD8bn3JtD2oXtafVQ=
@ -636,7 +631,6 @@ golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81R
golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20210119194325-5f4716e94777/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
@ -881,7 +875,6 @@ google.golang.org/grpc v1.28.0/go.mod h1:rpkK4SK4GF4Ach/+MFLZUBavHOvF2JJB5uozKKa
google.golang.org/grpc v1.29.1/go.mod h1:itym6AZVZYACWQqET3MqgPpjcuV5QH3BxFS3IjizoKk=
google.golang.org/grpc v1.30.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak=
google.golang.org/grpc v1.31.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak=
google.golang.org/grpc v1.33.2/go.mod h1:JMHMWHQWaTccqQQlmk3MJZS+GWXOdAesneDmEnv2fbc=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=

7
p2p/host/resource-manager/README.md

@ -35,7 +35,8 @@ limits := scalingLimits.AutoScale()
// The resource manager expects a limiter, so we create one from our limits.
limiter := rcmgr.NewFixedLimiter(limits)
// (Optional if you want metrics) Construct the OpenCensus metrics reporter.
// (Optional if you want metrics)
rcmgrObs.MustRegisterWith(prometheus.DefaultRegisterer)
str, err := rcmgrObs.NewStatsTraceReporter()
if err != nil {
panic(err)
@ -427,10 +428,10 @@ your limits often. This could be a sign that you need to raise your limits
(your process is more intensive than you originally thought) or that you need
to fix something in your application (surely you don't need over 1000 streams?).
There are OpenCensus metrics that can be hooked up to the resource manager. See
There are Prometheus metrics that can be hooked up to the resource manager. See
`obs/stats_test.go` for an example on how to enable this, and `DefaultViews` in
`stats.go` for recommended views. These metrics can be hooked up to Prometheus
or any other OpenCensus supported platform.
or any other platform that can scrape a Prometheus endpoint.
There is also an included Grafana dashboard to help kickstart your
observability into the resource manager. Find more information about it at

18
p2p/host/resource-manager/obs/grafana-dashboards/README.md

@ -5,28 +5,20 @@ import follow the Grafana docs [here](https://grafana.com/docs/grafana/latest/da
## Setup
To make sure you're emitting the correct metrics you'll have to hook up the
Opencensus views that `stats.go` exports. For Prometheus this looks like:
To make sure you're emitting the correct metrics you'll have to register the
metrics with a Prometheus Registerer. For example:
``` go
import (
// ...
ocprom "contrib.go.opencensus.io/exporter/prometheus"
rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
rcmgrObs "github.com/libp2p/go-libp2p/p2p/host/resource-manager/obs"
"github.com/prometheus/client_golang/prometheus"
"go.opencensus.io/stats/view"
)
func SetupResourceManager() (network.ResourceManager, error) {
// Hook up the trace reporter metrics. This will expose all opencensus
// stats via the default prometheus registry. See https://opencensus.io/exporters/supported-exporters/go/prometheus/ for other options.
view.Register(rcmgrObs.DefaultViews...)
ocprom.NewExporter(ocprom.Options{
Registry: prometheus.DefaultRegisterer.(*prometheus.Registry),
})
rcmgrObs.MustRegisterWith(prometheus.DefaultRegisterer)
str, err := rcmgrObs.NewStatsTraceReporter()
if err != nil {
@ -37,10 +29,6 @@ import (
}
```
It should be fairly similar for other exporters. See the [OpenCensus
docs](https://opencensus.io/exporters/supported-exporters/go/) to see how to
export to another exporter.
## Updating Dashboard json
Use the share functionality on an existing dashboard, and make sure to toggle

300
p2p/host/resource-manager/obs/grafana-dashboards/resource-manager.json

@ -9,7 +9,7 @@
"pluginName": "Prometheus"
}
],
"__elements": [],
"__elements": {},
"__requires": [
{
"type": "panel",
@ -21,7 +21,7 @@
"type": "grafana",
"id": "grafana",
"name": "Grafana",
"version": "8.5.6"
"version": "9.3.6"
},
{
"type": "datasource",
@ -96,6 +96,8 @@
"mode": "palette-classic"
},
"custom": {
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
@ -151,7 +153,8 @@
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom"
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"mode": "single",
@ -198,6 +201,8 @@
"mode": "palette-classic"
},
"custom": {
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
@ -253,7 +258,8 @@
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom"
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"mode": "single",
@ -287,6 +293,8 @@
"mode": "palette-classic"
},
"custom": {
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
@ -342,7 +350,8 @@
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom"
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"mode": "single",
@ -365,96 +374,6 @@
"title": "Transient Streams",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"description": "How many streams does each service have open",
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
},
{
"color": "red",
"value": 80
}
]
}
},
"overrides": []
},
"gridPos": {
"h": 9,
"w": 24,
"x": 0,
"y": 18
},
"id": 43,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom"
},
"tooltip": {
"mode": "single",
"sort": "none"
}
},
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"exemplar": true,
"expr": "rcmgr_streams{scope=\"service\"}",
"interval": "",
"legendFormat": "{{dir}} {{service}} {{instance}}",
"refId": "A"
}
],
"title": "Streams by service",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
@ -467,6 +386,8 @@
"mode": "palette-classic"
},
"custom": {
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
@ -515,14 +436,15 @@
"h": 9,
"w": 24,
"x": 0,
"y": 27
"y": 18
},
"id": 52,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom"
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"mode": "single",
@ -557,6 +479,8 @@
"mode": "palette-classic"
},
"custom": {
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
@ -605,14 +529,15 @@
"h": 10,
"w": 24,
"x": 0,
"y": 36
"y": 27
},
"id": 35,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom"
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"mode": "single",
@ -626,7 +551,7 @@
"uid": "${DS_PROMETHEUS}"
},
"exemplar": true,
"expr": "histogram_quantile(0.50, (rcmgr_peer_streams_bucket - rcmgr_peer_streams_negative_bucket)) - 0.1",
"expr": "histogram_quantile(0.50, (rcmgr_peer_streams_bucket - rcmgr_previous_peer_streams_bucket)) - 0.1",
"interval": "",
"legendFormat": "p50 {{dir}} streams per peer – {{instance}}",
"refId": "A"
@ -637,7 +562,7 @@
"uid": "${DS_PROMETHEUS}"
},
"exemplar": true,
"expr": "histogram_quantile(0.90, (rcmgr_peer_streams_bucket - rcmgr_peer_streams_negative_bucket)) - 0.1",
"expr": "histogram_quantile(0.90, (rcmgr_peer_streams_bucket - rcmgr_previous_peer_streams_bucket)) - 0.1",
"hide": false,
"interval": "",
"legendFormat": "p90 {{dir}} streams per peer – {{instance}}",
@ -649,7 +574,7 @@
"uid": "${DS_PROMETHEUS}"
},
"exemplar": true,
"expr": "histogram_quantile(1, (rcmgr_peer_streams_bucket - rcmgr_peer_streams_negative_bucket)) - 0.1",
"expr": "histogram_quantile(1, (rcmgr_peer_streams_bucket - rcmgr_previous_peer_streams_bucket)) - 0.1",
"hide": false,
"interval": "",
"legendFormat": "max {{dir}} streams per peer – {{instance}}",
@ -664,7 +589,7 @@
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"description": "How many peers have N-0.1 streams open",
"description": "How many peers have N streams open",
"fieldConfig": {
"defaults": {
"color": {
@ -675,7 +600,8 @@
"mode": "absolute",
"steps": [
{
"color": "green"
"color": "green",
"value": null
},
{
"color": "red",
@ -690,11 +616,13 @@
"h": 8,
"w": 12,
"x": 0,
"y": 46
"y": 37
},
"id": 46,
"options": {
"displayMode": "gradient",
"minVizHeight": 10,
"minVizWidth": 0,
"orientation": "auto",
"reduceOptions": {
"calcs": [
@ -705,7 +633,7 @@
},
"showUnfilled": true
},
"pluginVersion": "8.4.5",
"pluginVersion": "9.3.6",
"targets": [
{
"datasource": {
@ -713,7 +641,7 @@
"uid": "${DS_PROMETHEUS}"
},
"exemplar": false,
"expr": "sum without (instance) (rcmgr_peer_streams_bucket{dir=\"inbound\"}-rcmgr_peer_streams_negative_bucket{dir=\"inbound\"})",
"expr": "sum without (instance) (rcmgr_peer_streams_bucket{dir=\"inbound\"}-rcmgr_previous_peer_streams_bucket{dir=\"inbound\"})",
"format": "heatmap",
"hide": false,
"interval": "",
@ -729,7 +657,7 @@
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"description": "How many peers have N-0.1 streams open",
"description": "How many peers have N streams open",
"fieldConfig": {
"defaults": {
"color": {
@ -740,7 +668,8 @@
"mode": "absolute",
"steps": [
{
"color": "green"
"color": "green",
"value": null
},
{
"color": "red",
@ -755,11 +684,13 @@
"h": 8,
"w": 12,
"x": 12,
"y": 46
"y": 37
},
"id": 47,
"options": {
"displayMode": "gradient",
"minVizHeight": 10,
"minVizWidth": 0,
"orientation": "auto",
"reduceOptions": {
"calcs": [
@ -770,7 +701,7 @@
},
"showUnfilled": true
},
"pluginVersion": "8.4.5",
"pluginVersion": "9.3.6",
"targets": [
{
"datasource": {
@ -778,7 +709,7 @@
"uid": "${DS_PROMETHEUS}"
},
"exemplar": false,
"expr": "sum without (instance) (rcmgr_peer_streams_bucket{dir=\"outbound\"}-rcmgr_peer_streams_negative_bucket{dir=\"outbound\"})",
"expr": "sum without (instance) (rcmgr_peer_streams_bucket{dir=\"outbound\"}-rcmgr_previous_peer_streams_bucket{dir=\"outbound\"})",
"format": "heatmap",
"hide": false,
"interval": "",
@ -795,7 +726,7 @@
"h": 1,
"w": 24,
"x": 0,
"y": 54
"y": 45
},
"id": 29,
"panels": [],
@ -811,14 +742,19 @@
"h": 9,
"w": 24,
"x": 0,
"y": 55
"y": 46
},
"id": 31,
"options": {
"code": {
"language": "plaintext",
"showLineNumbers": false,
"showMiniMap": false
},
"content": "# Libp2p Connections\n\nBroken down by [Resource Scope](https://github.com/libp2p/go-libp2p/blob/master/p2p/host/resource-manager/README.md#resource-scopes). \nScopes represent what is imposing limits on this resource. For connections, we have three main scopes:\n\n1. System. The total number of connections owned by the process. Includes both application usable connections + the number of transient connections.\n2. Transient. The total number of connections that are being upgraded into usable connections in the process.\n3. Peer. The total number of connections associated with this peer. When a connection has this scope it is usable by the application.\n\nAn example of a System connection is a connection you can open a libp2p stream on and send data.\nA transient connection is not yet usable for application data since it may be negotiating \na security handshake or a multiplexer.\n\nConnections start in the transient scope and move over to the System and Peer scopes once they are ready to be used.\n\nIt would be unusual to see a lot of transient connections. It would also be unusual to see a peer with a lot of connections.",
"mode": "markdown"
},
"pluginVersion": "8.4.5",
"pluginVersion": "9.3.6",
"title": "libp2p Connections",
"type": "text"
},
@ -833,6 +769,8 @@
"mode": "palette-classic"
},
"custom": {
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
@ -865,7 +803,8 @@
"mode": "absolute",
"steps": [
{
"color": "green"
"color": "green",
"value": null
},
{
"color": "red",
@ -880,14 +819,15 @@
"h": 8,
"w": 12,
"x": 0,
"y": 64
"y": 55
},
"id": 33,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom"
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"mode": "single",
@ -921,6 +861,8 @@
"mode": "palette-classic"
},
"custom": {
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
@ -953,7 +895,8 @@
"mode": "absolute",
"steps": [
{
"color": "green"
"color": "green",
"value": null
},
{
"color": "red",
@ -968,14 +911,15 @@
"h": 8,
"w": 12,
"x": 12,
"y": 64
"y": 55
},
"id": 36,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom"
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"mode": "single",
@ -1007,14 +951,19 @@
"h": 3,
"w": 24,
"x": 0,
"y": 72
"y": 63
},
"id": 38,
"options": {
"content": "These are aggregated stats. They are grouped by buckets. Each bucket represents how many peers have N number of connections.\n\nDue to a quirk in [opencensus](https://github.com/census-instrumentation/opencensus-go/blob/v0.23.0/stats/view/aggregation_data.go#L195) the bucket values have to be a bit bigger than the integer values.\nSo subtract 0.1 from the number to get the true number of connections. e.g. If a peer has 3 connections, it'll be put in the 3.1 bucket. \n",
"code": {
"language": "plaintext",
"showLineNumbers": false,
"showMiniMap": false
},
"content": "These are aggregated stats. They are grouped by buckets. Each bucket represents how many peers have N number of connections.",
"mode": "markdown"
},
"pluginVersion": "8.4.5",
"pluginVersion": "9.3.6",
"title": "Connections per Peer",
"type": "text"
},
@ -1030,6 +979,8 @@
"mode": "palette-classic"
},
"custom": {
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
@ -1062,7 +1013,8 @@
"mode": "absolute",
"steps": [
{
"color": "green"
"color": "green",
"value": null
},
{
"color": "red",
@ -1077,14 +1029,15 @@
"h": 10,
"w": 24,
"x": 0,
"y": 75
"y": 66
},
"id": 45,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom"
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"mode": "single",
@ -1098,7 +1051,7 @@
"uid": "${DS_PROMETHEUS}"
},
"exemplar": true,
"expr": "histogram_quantile(0.50, (rcmgr_peer_connections_bucket - rcmgr_peer_connections_negative_bucket)) - 0.1",
"expr": "histogram_quantile(0.50, (rcmgr_peer_connections_bucket - rcmgr_previous_peer_connections_bucket)) - 0.1",
"interval": "",
"legendFormat": "p50 {{dir}} connections per peer – {{instance}}",
"refId": "A"
@ -1109,7 +1062,7 @@
"uid": "${DS_PROMETHEUS}"
},
"exemplar": true,
"expr": "histogram_quantile(0.90, (rcmgr_peer_connections_bucket - rcmgr_peer_connections_negative_bucket)) - 0.1",
"expr": "histogram_quantile(0.90, (rcmgr_peer_connections_bucket - rcmgr_previous_peer_connections_bucket)) - 0.1",
"hide": false,
"interval": "",
"legendFormat": "p90 {{dir}} connections per peer – {{instance}}",
@ -1121,7 +1074,7 @@
"uid": "${DS_PROMETHEUS}"
},
"exemplar": true,
"expr": "histogram_quantile(1, (rcmgr_peer_connections_bucket - rcmgr_peer_connections_negative_bucket)) - 0.1",
"expr": "histogram_quantile(1, (rcmgr_peer_connections_bucket - rcmgr_previous_peer_connections_bucket)) - 0.1",
"hide": false,
"interval": "",
"legendFormat": "max {{dir}} connections per peer – {{instance}}",
@ -1147,7 +1100,8 @@
"mode": "absolute",
"steps": [
{
"color": "green"
"color": "green",
"value": null
},
{
"color": "red",
@ -1162,11 +1116,13 @@
"h": 8,
"w": 12,
"x": 0,
"y": 85
"y": 76
},
"id": 39,
"options": {
"displayMode": "gradient",
"minVizHeight": 10,
"minVizWidth": 0,
"orientation": "auto",
"reduceOptions": {
"calcs": [
@ -1177,7 +1133,7 @@
},
"showUnfilled": true
},
"pluginVersion": "8.4.5",
"pluginVersion": "9.3.6",
"targets": [
{
"datasource": {
@ -1185,7 +1141,7 @@
"uid": "${DS_PROMETHEUS}"
},
"exemplar": false,
"expr": "sum without (instance) (rcmgr_peer_connections_bucket{dir=\"inbound\"}-rcmgr_peer_connections_negative_bucket{dir=\"inbound\"})",
"expr": "sum without (instance) (rcmgr_peer_connections_bucket{dir=\"inbound\"}-rcmgr_previous_peer_connections_bucket{dir=\"inbound\"})",
"format": "heatmap",
"hide": false,
"interval": "",
@ -1212,7 +1168,8 @@
"mode": "absolute",
"steps": [
{
"color": "green"
"color": "green",
"value": null
},
{
"color": "red",
@ -1227,11 +1184,13 @@
"h": 8,
"w": 12,
"x": 12,
"y": 85
"y": 76
},
"id": 40,
"options": {
"displayMode": "gradient",
"minVizHeight": 10,
"minVizWidth": 0,
"orientation": "auto",
"reduceOptions": {
"calcs": [
@ -1242,7 +1201,7 @@
},
"showUnfilled": true
},
"pluginVersion": "8.4.5",
"pluginVersion": "9.3.6",
"targets": [
{
"datasource": {
@ -1250,7 +1209,7 @@
"uid": "${DS_PROMETHEUS}"
},
"exemplar": false,
"expr": "sum without (instance) (rcmgr_peer_connections_bucket{dir=\"outbound\"}-rcmgr_peer_connections_negative_bucket{dir=\"outbound\"})",
"expr": "sum without (instance) (rcmgr_peer_connections_bucket{dir=\"outbound\"}-rcmgr_previous_peer_connections_bucket{dir=\"outbound\"})",
"format": "heatmap",
"hide": false,
"interval": "",
@ -1267,7 +1226,7 @@
"h": 1,
"w": 24,
"x": 0,
"y": 93
"y": 84
},
"id": 54,
"panels": [],
@ -1286,6 +1245,8 @@
"mode": "palette-classic"
},
"custom": {
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
@ -1318,7 +1279,8 @@
"mode": "absolute",
"steps": [
{
"color": "green"
"color": "green",
"value": null
},
{
"color": "red",
@ -1334,14 +1296,15 @@
"h": 9,
"w": 24,
"x": 0,
"y": 94
"y": 85
},
"id": 56,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom"
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"mode": "single",
@ -1376,6 +1339,8 @@
"mode": "palette-classic"
},
"custom": {
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
@ -1408,7 +1373,8 @@
"mode": "absolute",
"steps": [
{
"color": "green"
"color": "green",
"value": null
},
{
"color": "red",
@ -1424,14 +1390,15 @@
"h": 8,
"w": 12,
"x": 0,
"y": 103
"y": 94
},
"id": 57,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom"
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"mode": "single",
@ -1466,6 +1433,8 @@
"mode": "palette-classic"
},
"custom": {
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
@ -1498,7 +1467,8 @@
"mode": "absolute",
"steps": [
{
"color": "green"
"color": "green",
"value": null
},
{
"color": "red",
@ -1514,14 +1484,15 @@
"h": 8,
"w": 12,
"x": 12,
"y": 103
"y": 94
},
"id": 58,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom"
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"mode": "single",
@ -1556,6 +1527,8 @@
"mode": "palette-classic"
},
"custom": {
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
@ -1625,14 +1598,15 @@
"h": 10,
"w": 24,
"x": 0,
"y": 111
"y": 102
},
"id": 59,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom"
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"mode": "single",
@ -1646,7 +1620,7 @@
"uid": "${DS_PROMETHEUS}"
},
"exemplar": true,
"expr": "histogram_quantile(0.50, sum by (le) (rcmgr_peer_memory_bucket - rcmgr_peer_memory_negative_bucket)) - 0.1",
"expr": "histogram_quantile(0.50, sum by (le) (rcmgr_peer_memory_bucket - rcmgr_previous_peer_memory_bucket)) - 0.1",
"hide": false,
"interval": "",
"legendFormat": "p50 memory usage per peer",
@ -1658,7 +1632,7 @@
"uid": "${DS_PROMETHEUS}"
},
"exemplar": true,
"expr": "histogram_quantile(0.90, sum by (le) (rcmgr_peer_memory_bucket - rcmgr_peer_memory_negative_bucket)) - 0.1",
"expr": "histogram_quantile(0.90, sum by (le) (rcmgr_peer_memory_bucket - rcmgr_previous_peer_memory_bucket)) - 0.1",
"hide": false,
"interval": "",
"legendFormat": "p90 memory usage per peer",
@ -1670,7 +1644,7 @@
"uid": "${DS_PROMETHEUS}"
},
"exemplar": true,
"expr": "histogram_quantile(1, sum by (le) (rcmgr_peer_memory_bucket - rcmgr_peer_memory_negative_bucket)) - 0.1",
"expr": "histogram_quantile(1, sum by (le) (rcmgr_peer_memory_bucket - rcmgr_previous_peer_memory_bucket)) - 0.1",
"hide": false,
"interval": "",
"legendFormat": "max memory usage per peer",
@ -1683,7 +1657,7 @@
},
"editorMode": "code",
"exemplar": true,
"expr": "sum(rcmgr_peer_memory_count-rcmgr_peer_memory_negative_count)",
"expr": "sum(rcmgr_peer_memory_count-rcmgr_previous_peer_memory_count)",
"hide": false,
"instant": false,
"interval": "",
@ -1701,7 +1675,7 @@
"h": 1,
"w": 24,
"x": 0,
"y": 121
"y": 112
},
"id": 62,
"panels": [],
@ -1720,6 +1694,8 @@
"mode": "palette-classic"
},
"custom": {
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
@ -1752,7 +1728,8 @@
"mode": "absolute",
"steps": [
{
"color": "green"
"color": "green",
"value": null
},
{
"color": "red",
@ -1767,14 +1744,15 @@
"h": 8,
"w": 12,
"x": 0,
"y": 122
"y": 113
},
"id": 60,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom"
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"mode": "single",
@ -1799,7 +1777,7 @@
}
],
"refresh": false,
"schemaVersion": 36,
"schemaVersion": 37,
"style": "dark",
"tags": [],
"templating": {
@ -1812,7 +1790,7 @@
"timepicker": {},
"timezone": "",
"title": "Resource Manager",
"uid": "MgmGIjjnk",
"uid": "MgmGIjjnj",
"version": 1,
"weekStart": ""
}
}

112
p2p/host/resource-manager/obs/noalloc_test.go

@ -0,0 +1,112 @@
//go:build nocover
package obs
import (
"math/rand"
"sync"
"testing"
"time"
rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
)
// randomTraceEvt builds a pseudo-random trace event using rng.
//
// The event type and the scope name are drawn independently of each other,
// so the resulting combination is not guaranteed to be one the resource
// manager would actually emit.
func randomTraceEvt(rng *rand.Rand) rcmgr.TraceEvt {
	eventTypes := [...]rcmgr.TraceEvtTyp{
		rcmgr.TraceStartEvt,
		rcmgr.TraceCreateScopeEvt,
		rcmgr.TraceDestroyScopeEvt,
		rcmgr.TraceReserveMemoryEvt,
		rcmgr.TraceBlockReserveMemoryEvt,
		rcmgr.TraceReleaseMemoryEvt,
		rcmgr.TraceAddStreamEvt,
		rcmgr.TraceBlockAddStreamEvt,
		rcmgr.TraceRemoveStreamEvt,
		rcmgr.TraceAddConnEvt,
		rcmgr.TraceBlockAddConnEvt,
		rcmgr.TraceRemoveConnEvt,
	}
	scopeNames := [...]string{
		"conn-1",
		"stream-2",
		"peer:abc",
		"system",
		"transient",
		"peer:12D3Koo",
		"protocol:/libp2p/autonat/1.0.0",
		"protocol:/libp2p/autonat/1.0.0.peer:12D3Koo",
		"service:libp2p.autonat",
		"service:libp2p.autonat.peer:12D3Koo",
	}

	// The draws below are made in a fixed order so that a given seed always
	// produces the same sequence of events.
	var evt rcmgr.TraceEvt
	evt.Type = eventTypes[rng.Intn(len(eventTypes))]
	evt.Name = scopeNames[rng.Intn(len(scopeNames))]
	evt.DeltaOut = rng.Intn(5)
	evt.DeltaIn = rng.Intn(5)
	evt.Delta = int64(rng.Intn(5))
	evt.Memory = int64(rng.Intn(10000))
	evt.StreamsIn = rng.Intn(100)
	evt.StreamsOut = rng.Intn(100)
	evt.ConnsIn = rng.Intn(100)
	evt.ConnsOut = rng.Intn(100)
	evt.FD = rng.Intn(100)
	evt.Time = time.Now().Format(time.RFC3339Nano)
	return evt
}
var registerOnce sync.Once
// BenchmarkMetricsRecording measures how long it takes a StatsTraceReporter
// to consume pre-generated trace events, and reports allocations per
// operation.
func BenchmarkMetricsRecording(b *testing.B) {
	b.ReportAllocs()

	// Guarded by registerOnce so the metrics are registered a single time
	// even though the benchmark function is invoked repeatedly.
	registerOnce.Do(func() { MustRegisterWith(prometheus.DefaultRegisterer) })

	// Generate the events up front so that only ConsumeEvent is timed.
	const evtCount = 10000
	rng := rand.New(rand.NewSource(int64(b.N)))
	evts := make([]rcmgr.TraceEvt, 0, evtCount)
	for len(evts) < evtCount {
		evts = append(evts, randomTraceEvt(rng))
	}

	reporter, err := NewStatsTraceReporter()
	if err != nil {
		b.Fatal(err)
	}

	b.ResetTimer()
	// Cycle through the fixed event set for as many iterations as requested.
	for n := 0; n < b.N; n++ {
		reporter.ConsumeEvent(evts[n%evtCount])
	}
}
// TestNoAllocsNoCover checks that consuming trace events stays (almost)
// allocation free. Each measured run feeds all evtCount events through the
// reporter, so the threshold applies to a whole batch and not to a single
// event.
//
// The "NoCover" suffix lets CI select this test with `-run NoCover` in the
// separate nocover build, as allocation counts are only meaningful with
// coverage analysis turned off.
func TestNoAllocsNoCover(t *testing.T) {
	const (
		evtCount  = 10_000
		maxAllocs = 10 // tolerated average allocations per batch of evtCount events
	)

	str, err := NewStatsTraceReporter()
	require.NoError(t, err)

	// A fixed seed keeps the generated event sequence reproducible.
	rng := rand.New(rand.NewSource(1))
	evts := make([]rcmgr.TraceEvt, 0, evtCount)
	for i := 0; i < evtCount; i++ {
		evts = append(evts, randomTraceEvt(rng))
	}

	// The label slice is allocated once here and reused for every event, so
	// it does not count against the measured function.
	tagSlice := make([]string, 0, 10)
	allocs := testing.AllocsPerRun(100, func() {
		for i := 0; i < evtCount; i++ {
			str.consumeEventWithLabelSlice(evts[i], &tagSlice)
		}
	})

	// AllocsPerRun reports an average number of allocations, not bytes.
	if allocs > maxAllocs {
		t.Fatalf("expected at most %d allocations per run, got %f", maxAllocs, allocs)
	}
}

415
p2p/host/resource-manager/obs/stats.go

@ -1,72 +1,131 @@
package obs
import (
"context"
"strings"
rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
"github.com/libp2p/go-libp2p/p2p/metricshelper"
"github.com/prometheus/client_golang/prometheus"
)
var (
metricNamespace = "rcmgr/"
conns = stats.Int64(metricNamespace+"connections", "Number of Connections", stats.UnitDimensionless)
peerConns = stats.Int64(metricNamespace+"peer/connections", "Number of connections this peer has", stats.UnitDimensionless)
peerConnsNegative = stats.Int64(metricNamespace+"peer/connections_negative", "Number of connections this peer had. This is used to get the current connection number per peer histogram by subtracting this from the peer/connections histogram", stats.UnitDimensionless)
streams = stats.Int64(metricNamespace+"streams", "Number of Streams", stats.UnitDimensionless)
peerStreams = stats.Int64(metricNamespace+"peer/streams", "Number of streams this peer has", stats.UnitDimensionless)
peerStreamsNegative = stats.Int64(metricNamespace+"peer/streams_negative", "Number of streams this peer had. This is used to get the current streams number per peer histogram by subtracting this from the peer/streams histogram", stats.UnitDimensionless)
memory = stats.Int64(metricNamespace+"memory", "Amount of memory reserved as reported to the Resource Manager", stats.UnitDimensionless)
peerMemory = stats.Int64(metricNamespace+"peer/memory", "Amount of memory currently reseved for peer", stats.UnitDimensionless)
peerMemoryNegative = stats.Int64(metricNamespace+"peer/memory_negative", "Amount of memory previously reseved for peer. This is used to get the current memory per peer histogram by subtracting this from the peer/memory histogram", stats.UnitDimensionless)
connMemory = stats.Int64(metricNamespace+"conn/memory", "Amount of memory currently reseved for the connection", stats.UnitDimensionless)
connMemoryNegative = stats.Int64(metricNamespace+"conn/memory_negative", "Amount of memory previously reseved for the connection. This is used to get the current memory per connection histogram by subtracting this from the conn/memory histogram", stats.UnitDimensionless)
fds = stats.Int64(metricNamespace+"fds", "Number of fds as reported to the Resource Manager", stats.UnitDimensionless)
blockedResources = stats.Int64(metricNamespace+"blocked_resources", "Number of resource requests blocked", stats.UnitDimensionless)
)
metricNamespace = "rcmgr"
// Conns
conns = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: metricNamespace,
Name: "connections",
Help: "Number of Connections",
}, []string{"dir", "scope"})
connsInboundSystem = conns.With(prometheus.Labels{"dir": "inbound", "scope": "system"})
connsInboundTransient = conns.With(prometheus.Labels{"dir": "inbound", "scope": "transient"})
connsOutboundSystem = conns.With(prometheus.Labels{"dir": "outbound", "scope": "system"})
connsOutboundTransient = conns.With(prometheus.Labels{"dir": "outbound", "scope": "transient"})
oneTenThenExpDistributionBuckets = []float64{
1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 16, 32, 64, 128, 256,
}
var (
directionTag, _ = tag.NewKey("dir")
scopeTag, _ = tag.NewKey("scope")
serviceTag, _ = tag.NewKey("service")
protocolTag, _ = tag.NewKey("protocol")
resourceTag, _ = tag.NewKey("resource")
// PeerConns
peerConns = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: metricNamespace,
Name: "peer_connections",
Buckets: oneTenThenExpDistributionBuckets,
Help: "Number of connections this peer has",
}, []string{"dir"})
peerConnsInbound = peerConns.With(prometheus.Labels{"dir": "inbound"})
peerConnsOutbound = peerConns.With(prometheus.Labels{"dir": "outbound"})
// Lets us build a histogram of our current state. See https://github.com/libp2p/go-libp2p-resource-manager/pull/54#discussion_r911244757 for more information.
previousPeerConns = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: metricNamespace,
Name: "previous_peer_connections",
Buckets: oneTenThenExpDistributionBuckets,
Help: "Number of connections this peer previously had. This is used to get the current connection number per peer histogram by subtracting this from the peer_connections histogram",
}, []string{"dir"})
previousPeerConnsInbound = previousPeerConns.With(prometheus.Labels{"dir": "inbound"})
previousPeerConnsOutbound = previousPeerConns.With(prometheus.Labels{"dir": "outbound"})
// Streams
streams = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: metricNamespace,
Name: "streams",
Help: "Number of Streams",
}, []string{"dir", "scope", "protocol"})
peerStreams = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: metricNamespace,
Name: "peer_streams",
Buckets: oneTenThenExpDistributionBuckets,
Help: "Number of streams this peer has",
}, []string{"dir"})
peerStreamsInbound = peerStreams.With(prometheus.Labels{"dir": "inbound"})
peerStreamsOutbound = peerStreams.With(prometheus.Labels{"dir": "outbound"})
// previousPeerStreams records the stream count a peer had *before* an
// update (the reporter observes the old value here and the new value in
// peerStreams). Subtracting this histogram from peer_streams yields the
// distribution of the current number of streams per peer.
previousPeerStreams = prometheus.NewHistogramVec(prometheus.HistogramOpts{
	Namespace: metricNamespace,
	Name:      "previous_peer_streams",
	Buckets:   oneTenThenExpDistributionBuckets,
	// The help text must describe the "previous" semantics; it used to be a
	// verbatim copy of the peer_streams help string.
	Help: "Number of streams this peer previously had. This is used to get the current streams number per peer histogram by subtracting this from the peer_streams histogram",
}, []string{"dir"})
previousPeerStreamsInbound = previousPeerStreams.With(prometheus.Labels{"dir": "inbound"})
previousPeerStreamsOutbound = previousPeerStreams.With(prometheus.Labels{"dir": "outbound"})
// Memory
memory = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: metricNamespace,
Name: "memory",
Help: "Amount of memory reserved as reported to the Resource Manager",
}, []string{"scope", "protocol"})
// PeerMemory
peerMemory = prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: metricNamespace,
Name: "peer_memory",
Buckets: memDistribution,
Help: "How many peers have reserved this bucket of memory, as reported to the Resource Manager",
})
previousPeerMemory = prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: metricNamespace,
Name: "previous_peer_memory",
Buckets: memDistribution,
Help: "How many peers have previously reserved this bucket of memory, as reported to the Resource Manager",
})
// ConnMemory
connMemory = prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: metricNamespace,
Name: "conn_memory",
Buckets: memDistribution,
Help: "How many conns have reserved this bucket of memory, as reported to the Resource Manager",
})
previousConnMemory = prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: metricNamespace,
Name: "previous_conn_memory",
Buckets: memDistribution,
Help: "How many conns have previously reserved this bucket of memory, as reported to the Resource Manager",
})
// FDs
fds = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: metricNamespace,
Name: "fds",
Help: "Number of file descriptors reserved as reported to the Resource Manager",
}, []string{"scope"})
fdsSystem = fds.With(prometheus.Labels{"scope": "system"})
fdsTransient = fds.With(prometheus.Labels{"scope": "transient"})
// Blocked resources
blockedResources = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: metricNamespace,
Name: "blocked_resources",
Help: "Number of blocked resources",
}, []string{"dir", "scope", "resource"})
)
var (
ConnView = &view.View{Measure: conns, Aggregation: view.Sum(), TagKeys: []tag.Key{directionTag, scopeTag}}
oneTenThenExpDistribution = []float64{
1.1, 2.1, 3.1, 4.1, 5.1, 6.1, 7.1, 8.1, 9.1, 10.1, 16.1, 32.1, 64.1, 128.1, 256.1,
}
PeerConnsView = &view.View{
Measure: peerConns,
Aggregation: view.Distribution(oneTenThenExpDistribution...),
TagKeys: []tag.Key{directionTag},
}
PeerConnsNegativeView = &view.View{
Measure: peerConnsNegative,
Aggregation: view.Distribution(oneTenThenExpDistribution...),
TagKeys: []tag.Key{directionTag},
}
StreamView = &view.View{Measure: streams, Aggregation: view.Sum(), TagKeys: []tag.Key{directionTag, scopeTag, serviceTag, protocolTag}}
PeerStreamsView = &view.View{Measure: peerStreams, Aggregation: view.Distribution(oneTenThenExpDistribution...), TagKeys: []tag.Key{directionTag}}
PeerStreamNegativeView = &view.View{Measure: peerStreamsNegative, Aggregation: view.Distribution(oneTenThenExpDistribution...), TagKeys: []tag.Key{directionTag}}
MemoryView = &view.View{Measure: memory, Aggregation: view.Sum(), TagKeys: []tag.Key{scopeTag, serviceTag, protocolTag}}
memDistribution = []float64{
1 << 10, // 1KB
4 << 10, // 4KB
@ -79,49 +138,26 @@ var (
2 << 30, // 2GB
4 << 30, // 4GB
}
PeerMemoryView = &view.View{
Measure: peerMemory,
Aggregation: view.Distribution(memDistribution...),
}
PeerMemoryNegativeView = &view.View{
Measure: peerMemoryNegative,
Aggregation: view.Distribution(memDistribution...),
}
// Not setup yet. Memory isn't attached to a given connection.
ConnMemoryView = &view.View{
Measure: connMemory,
Aggregation: view.Distribution(memDistribution...),
}
ConnMemoryNegativeView = &view.View{
Measure: connMemoryNegative,
Aggregation: view.Distribution(memDistribution...),
}
FDsView = &view.View{Measure: fds, Aggregation: view.Sum(), TagKeys: []tag.Key{scopeTag}}
BlockedResourcesView = &view.View{
Measure: blockedResources,
Aggregation: view.Sum(),
TagKeys: []tag.Key{scopeTag, resourceTag},
}
)
var DefaultViews []*view.View = []*view.View{
ConnView,
PeerConnsView,
PeerConnsNegativeView,
FDsView,
StreamView,
PeerStreamsView,
PeerStreamNegativeView,
MemoryView,
PeerMemoryView,
PeerMemoryNegativeView,
BlockedResourcesView,
// MustRegisterWith registers every resource manager metric collector declared
// in this package with reg, in a single reg.MustRegister call.
func MustRegisterWith(reg prometheus.Registerer) {
	// Keep the order stable: connections, streams, memory, fds, blocked resources.
	collectors := []prometheus.Collector{
		conns,
		peerConns,
		previousPeerConns,
		streams,
		peerStreams,
		previousPeerStreams,
		memory,
		peerMemory,
		previousPeerMemory,
		connMemory,
		previousConnMemory,
		fds,
		blockedResources,
	}
	reg.MustRegister(collectors...)
}
// StatsTraceReporter reports stats on the resource manager using its traces.
@ -133,11 +169,17 @@ func NewStatsTraceReporter() (StatsTraceReporter, error) {
}
func (r StatsTraceReporter) ConsumeEvent(evt rcmgr.TraceEvt) {
ctx := context.Background()
tags := metricshelper.GetStringSlice()
defer metricshelper.PutStringSlice(tags)
r.consumeEventWithLabelSlice(evt, tags)
}
// Separate func so that we can test that this function does not allocate. The syncPool may allocate.
func (r StatsTraceReporter) consumeEventWithLabelSlice(evt rcmgr.TraceEvt, tags *[]string) {
switch evt.Type {
case rcmgr.TraceAddStreamEvt, rcmgr.TraceRemoveStreamEvt:
if p := rcmgr.ParsePeerScopeName(evt.Name); p.Validate() == nil {
if p := rcmgr.PeerStrInScopeName(evt.Name); p != "" {
// Aggregated peer stats. Counts how many peers have N number of streams open.
// Uses two buckets aggregations. One to count how many streams the
// peer has now. The other to count the negative value, or how many
@ -148,10 +190,10 @@ func (r StatsTraceReporter) ConsumeEvent(evt rcmgr.TraceEvt) {
peerStreamsOut := int64(evt.StreamsOut)
if oldStreamsOut != peerStreamsOut {
if oldStreamsOut != 0 {
stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(directionTag, "outbound")}, peerStreamsNegative.M(oldStreamsOut))
previousPeerStreamsOutbound.Observe(float64(oldStreamsOut))
}
if peerStreamsOut != 0 {
stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(directionTag, "outbound")}, peerStreams.M(peerStreamsOut))
peerStreamsOutbound.Observe(float64(peerStreamsOut))
}
}
@ -159,46 +201,50 @@ func (r StatsTraceReporter) ConsumeEvent(evt rcmgr.TraceEvt) {
peerStreamsIn := int64(evt.StreamsIn)
if oldStreamsIn != peerStreamsIn {
if oldStreamsIn != 0 {
stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(directionTag, "inbound")}, peerStreamsNegative.M(oldStreamsIn))
previousPeerStreamsInbound.Observe(float64(oldStreamsIn))
}
if peerStreamsIn != 0 {
stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(directionTag, "inbound")}, peerStreams.M(peerStreamsIn))
peerStreamsInbound.Observe(float64(peerStreamsIn))
}
}
} else {
var tags []tag.Mutator
if rcmgr.IsSystemScope(evt.Name) || rcmgr.IsTransientScope(evt.Name) {
tags = append(tags, tag.Upsert(scopeTag, evt.Name))
} else if svc := rcmgr.ParseServiceScopeName(evt.Name); svc != "" {
tags = append(tags, tag.Upsert(scopeTag, "service"), tag.Upsert(serviceTag, svc))
} else if proto := rcmgr.ParseProtocolScopeName(evt.Name); proto != "" {
tags = append(tags, tag.Upsert(scopeTag, "protocol"), tag.Upsert(protocolTag, proto))
} else {
// Not measuring connscope, servicepeer and protocolpeer. Lots of data, and
// you can use aggregated peer stats + service stats to infer
// this.
break
}
if evt.DeltaOut != 0 {
stats.RecordWithTags(
ctx,
append([]tag.Mutator{tag.Upsert(directionTag, "outbound")}, tags...),
streams.M(int64(evt.DeltaOut)),
)
if rcmgr.IsSystemScope(evt.Name) || rcmgr.IsTransientScope(evt.Name) {
*tags = (*tags)[:0]
*tags = append(*tags, "outbound", evt.Name, "")
streams.WithLabelValues(*tags...).Set(float64(evt.StreamsOut))
} else if proto := rcmgr.ParseProtocolScopeName(evt.Name); proto != "" {
*tags = (*tags)[:0]
*tags = append(*tags, "outbound", "protocol", proto)
streams.WithLabelValues(*tags...).Set(float64(evt.StreamsOut))
} else {
// Not measuring service scope, connscope, servicepeer and protocolpeer. Lots of data, and
// you can use aggregated peer stats + service stats to infer
// this.
break
}
}
if evt.DeltaIn != 0 {
stats.RecordWithTags(
ctx,
append([]tag.Mutator{tag.Upsert(directionTag, "inbound")}, tags...),
streams.M(int64(evt.DeltaIn)),
)
if rcmgr.IsSystemScope(evt.Name) || rcmgr.IsTransientScope(evt.Name) {
*tags = (*tags)[:0]
*tags = append(*tags, "inbound", evt.Name, "")
streams.WithLabelValues(*tags...).Set(float64(evt.StreamsIn))
} else if proto := rcmgr.ParseProtocolScopeName(evt.Name); proto != "" {
*tags = (*tags)[:0]
*tags = append(*tags, "inbound", "protocol", proto)
streams.WithLabelValues(*tags...).Set(float64(evt.StreamsIn))
} else {
// Not measuring service scope, connscope, servicepeer and protocolpeer. Lots of data, and
// you can use aggregated peer stats + service stats to infer
// this.
break
}
}
}
case rcmgr.TraceAddConnEvt, rcmgr.TraceRemoveConnEvt:
if p := rcmgr.ParsePeerScopeName(evt.Name); p.Validate() == nil {
if p := rcmgr.PeerStrInScopeName(evt.Name); p != "" {
// Aggregated peer stats. Counts how many peers have N number of connections.
// Uses two bucket aggregations. One to count how many connections the
// peer has now. The other to count the negative value, or how many
@ -209,10 +255,10 @@ func (r StatsTraceReporter) ConsumeEvent(evt rcmgr.TraceEvt) {
connsOut := int64(evt.ConnsOut)
if oldConnsOut != connsOut {
if oldConnsOut != 0 {
stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(directionTag, "outbound")}, peerConnsNegative.M(oldConnsOut))
previousPeerConnsOutbound.Observe(float64(oldConnsOut))
}
if connsOut != 0 {
stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(directionTag, "outbound")}, peerConns.M(connsOut))
peerConnsOutbound.Observe(float64(connsOut))
}
}
@ -220,88 +266,72 @@ func (r StatsTraceReporter) ConsumeEvent(evt rcmgr.TraceEvt) {
connsIn := int64(evt.ConnsIn)
if oldConnsIn != connsIn {
if oldConnsIn != 0 {
stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(directionTag, "inbound")}, peerConnsNegative.M(oldConnsIn))
previousPeerConnsInbound.Observe(float64(oldConnsIn))
}
if connsIn != 0 {
stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(directionTag, "inbound")}, peerConns.M(connsIn))
peerConnsInbound.Observe(float64(connsIn))
}
}
} else {
var tags []tag.Mutator
if rcmgr.IsSystemScope(evt.Name) || rcmgr.IsTransientScope(evt.Name) {
tags = append(tags, tag.Upsert(scopeTag, evt.Name))
} else if rcmgr.IsConnScope(evt.Name) {
if rcmgr.IsConnScope(evt.Name) {
// Not measuring this. I don't think it's useful.
break
} else {
// This could be a span
break
}
if evt.DeltaOut != 0 {
stats.RecordWithTags(
ctx,
append([]tag.Mutator{tag.Upsert(directionTag, "outbound")}, tags...),
conns.M(int64(evt.DeltaOut)),
)
}
if evt.DeltaIn != 0 {
stats.RecordWithTags(
ctx,
append([]tag.Mutator{tag.Upsert(directionTag, "inbound")}, tags...),
conns.M(int64(evt.DeltaIn)),
)
if rcmgr.IsSystemScope(evt.Name) {
connsInboundSystem.Set(float64(evt.ConnsIn))
connsOutboundSystem.Set(float64(evt.ConnsOut))
} else if rcmgr.IsTransientScope(evt.Name) {
connsInboundTransient.Set(float64(evt.ConnsIn))
connsOutboundTransient.Set(float64(evt.ConnsOut))
}
// Represents the delta in fds
if evt.Delta != 0 {
stats.RecordWithTags(
ctx,
tags,
fds.M(int64(evt.Delta)),
)
if rcmgr.IsSystemScope(evt.Name) {
fdsSystem.Set(float64(evt.FD))
} else if rcmgr.IsTransientScope(evt.Name) {
fdsTransient.Set(float64(evt.FD))
}
}
}
case rcmgr.TraceReserveMemoryEvt, rcmgr.TraceReleaseMemoryEvt:
if p := rcmgr.ParsePeerScopeName(evt.Name); p.Validate() == nil {
if p := rcmgr.PeerStrInScopeName(evt.Name); p != "" {
oldMem := evt.Memory - evt.Delta
if oldMem != evt.Memory {
if oldMem != 0 {
stats.Record(ctx, peerMemoryNegative.M(oldMem))
previousPeerMemory.Observe(float64(oldMem))
}
if evt.Memory != 0 {
stats.Record(ctx, peerMemory.M(evt.Memory))
peerMemory.Observe(float64(evt.Memory))
}
}
} else if rcmgr.IsConnScope(evt.Name) {
oldMem := evt.Memory - evt.Delta
if oldMem != evt.Memory {
if oldMem != 0 {
stats.Record(ctx, connMemoryNegative.M(oldMem))
previousConnMemory.Observe(float64(oldMem))
}
if evt.Memory != 0 {
stats.Record(ctx, connMemory.M(evt.Memory))
connMemory.Observe(float64(evt.Memory))
}
}
} else {
var tags []tag.Mutator
if rcmgr.IsSystemScope(evt.Name) || rcmgr.IsTransientScope(evt.Name) {
tags = append(tags, tag.Upsert(scopeTag, evt.Name))
} else if svc := rcmgr.ParseServiceScopeName(evt.Name); svc != "" {
tags = append(tags, tag.Upsert(scopeTag, "service"), tag.Upsert(serviceTag, svc))
*tags = (*tags)[:0]
*tags = append(*tags, evt.Name, "")
memory.WithLabelValues(*tags...).Set(float64(evt.Memory))
} else if proto := rcmgr.ParseProtocolScopeName(evt.Name); proto != "" {
tags = append(tags, tag.Upsert(scopeTag, "protocol"), tag.Upsert(protocolTag, proto))
*tags = (*tags)[:0]
*tags = append(*tags, "protocol", proto)
memory.WithLabelValues(*tags...).Set(float64(evt.Memory))
} else {
// Not measuring connscope, servicepeer and protocolpeer. Lots of data, and
// you can use aggregated peer stats + service stats to infer
// this.
break
}
if evt.Delta != 0 {
stats.RecordWithTags(ctx, tags, memory.M(int64(evt.Delta)))
}
}
case rcmgr.TraceBlockAddConnEvt, rcmgr.TraceBlockAddStreamEvt, rcmgr.TraceBlockReserveMemoryEvt:
@ -314,31 +344,40 @@ func (r StatsTraceReporter) ConsumeEvent(evt rcmgr.TraceEvt) {
resource = "memory"
}
scopeName := evt.Name
// Only the top scopeName. We don't want to get the peerid here.
scopeName := strings.SplitN(evt.Name, ":", 2)[0]
// Using indexes and slices to avoid allocating.
scopeSplitIdx := strings.IndexByte(scopeName, ':')
if scopeSplitIdx != -1 {
scopeName = evt.Name[0:scopeSplitIdx]
}
// Drop the connection or stream id
scopeName = strings.SplitN(scopeName, "-", 2)[0]
// If something else gets added here, make sure to update the size hint
// below when we make `tagsWithDir`.
tags := []tag.Mutator{tag.Upsert(scopeTag, scopeName), tag.Upsert(resourceTag, resource)}
idSplitIdx := strings.IndexByte(scopeName, '-')
if idSplitIdx != -1 {
scopeName = scopeName[0:idSplitIdx]
}
if evt.DeltaIn != 0 {
tagsWithDir := make([]tag.Mutator, 0, 3)
tagsWithDir = append(tagsWithDir, tag.Insert(directionTag, "inbound"))
tagsWithDir = append(tagsWithDir, tags...)
stats.RecordWithTags(ctx, tagsWithDir[0:], blockedResources.M(int64(1)))
*tags = (*tags)[:0]
*tags = append(*tags, "inbound", scopeName, resource)
blockedResources.WithLabelValues(*tags...).Add(float64(evt.DeltaIn))
}
if evt.DeltaOut != 0 {
tagsWithDir := make([]tag.Mutator, 0, 3)
tagsWithDir = append(tagsWithDir, tag.Insert(directionTag, "outbound"))
tagsWithDir = append(tagsWithDir, tags...)
stats.RecordWithTags(ctx, tagsWithDir, blockedResources.M(int64(1)))
*tags = (*tags)[:0]
*tags = append(*tags, "outbound", scopeName, resource)
blockedResources.WithLabelValues(*tags...).Add(float64(evt.DeltaOut))
}
if evt.Delta != 0 {
stats.RecordWithTags(ctx, tags, blockedResources.M(1))
if evt.Delta != 0 && resource == "connection" {
// This represents fds blocked
*tags = (*tags)[:0]
*tags = append(*tags, "", scopeName, "fd")
blockedResources.WithLabelValues(*tags...).Add(float64(evt.Delta))
} else if evt.Delta != 0 {
*tags = (*tags)[:0]
*tags = append(*tags, "", scopeName, resource)
blockedResources.WithLabelValues(*tags...).Add(float64(evt.Delta))
}
}
}

13
p2p/host/resource-manager/obs/stats_test.go

@ -1,15 +1,17 @@
package obs_test
import (
"sync"
"testing"
"time"
rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
"github.com/libp2p/go-libp2p/p2p/host/resource-manager/obs"
"go.opencensus.io/stats/view"
"github.com/prometheus/client_golang/prometheus"
)
var registerOnce sync.Once
func TestTraceReporterStartAndClose(t *testing.T) {
rcmgr, err := rcmgr.NewResourceManager(rcmgr.NewFixedLimiter(rcmgr.DefaultLimits.AutoScale()), rcmgr.WithTraceReporter(obs.StatsTraceReporter{}))
if err != nil {
@ -26,10 +28,9 @@ func TestConsumeEvent(t *testing.T) {
Time: time.Now().Format(time.RFC3339Nano),
}
err := view.Register(obs.DefaultViews...)
if err != nil {
t.Fatal(err)
}
registerOnce.Do(func() {
obs.MustRegisterWith(prometheus.DefaultRegisterer)
})
str, err := obs.NewStatsTraceReporter()
if err != nil {

43
p2p/host/resource-manager/rcmgr.go

@ -517,40 +517,20 @@ func peerScopeName(p peer.ID) string {
return fmt.Sprintf("peer:%s", p)
}
// ParsePeerScopeName returns "" if name is not a peerScopeName
func ParsePeerScopeName(name string) peer.ID {
// PeerStrInScopeName returns "" if name is not a peerScopeName. Returns a string to avoid allocating a peer ID object
func PeerStrInScopeName(name string) string {
if !strings.HasPrefix(name, "peer:") || IsSpan(name) {
return ""
}
parts := strings.SplitN(name, "peer:", 2)
if len(parts) != 2 {
return ""
}
p, err := peer.Decode(parts[1])
if err != nil {
// Index to avoid allocating a new string
peerSplitIdx := strings.Index(name, "peer:")
if peerSplitIdx == -1 {
return ""
}
p := (name[peerSplitIdx+len("peer:"):])
return p
}
// ParseServiceScopeName extracts the service name from a scope name of the
// form "service:<name>".
// It returns "" when name lacks the "service:" prefix, when IsSpan reports
// true for it, or when it also contains "peer:" (a service peer scope).
func ParseServiceScopeName(name string) string {
	const servicePrefix = "service:"
	if !strings.HasPrefix(name, servicePrefix) || IsSpan(name) {
		return ""
	}
	// A name that also mentions a peer is a service peer scope, not a plain service scope.
	if strings.Contains(name, "peer:") {
		return ""
	}
	// The prefix ends at the first ':', so everything after it is the service name.
	return name[len(servicePrefix):]
}
// ParseProtocolScopeName returns the protocol name if name is a protocolScopeName.
// Otherwise returns ""
func ParseProtocolScopeName(name string) string {
@ -559,12 +539,13 @@ func ParseProtocolScopeName(name string) string {
// This is a protocol peer scope
return ""
}
parts := strings.SplitN(name, ":", 2)
if len(parts) != 2 {
return ("")
}
return parts[1]
// Index to avoid allocating a new string
separatorIdx := strings.Index(name, ":")
if separatorIdx == -1 {
return ""
}
return name[separatorIdx+1:]
}
return ""
}

Loading…
Cancel
Save