Browse Source

metrics: add options to disable metrics and to set Prometheus registerer (#2116)

* provided a WithRegisterer option for metrics

* provide a libp2p.Option to setup metrics
update-quic-go-v033
Sukun 2 years ago
committed by GitHub
parent
commit
a491074d89
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 37
      config/config.go
  2. 10
      defaults.go
  3. 26
      options.go
  4. 51
      p2p/host/autonat/metrics.go
  5. 49
      p2p/host/basic/basic_host.go
  6. 28
      p2p/host/eventbus/basic_metrics.go
  7. 20
      p2p/metricshelper/registerer.go
  8. 32
      p2p/metricshelper/registerer_test.go
  9. 37
      p2p/net/swarm/swarm_metrics.go
  10. 35
      p2p/protocol/identify/metrics.go

37
config/config.go

@ -31,6 +31,7 @@ import (
relayv2 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/relay" relayv2 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/relay"
"github.com/libp2p/go-libp2p/p2p/protocol/holepunch" "github.com/libp2p/go-libp2p/p2p/protocol/holepunch"
"github.com/libp2p/go-libp2p/p2p/transport/quicreuse" "github.com/libp2p/go-libp2p/p2p/transport/quicreuse"
"github.com/prometheus/client_golang/prometheus"
ma "github.com/multiformats/go-multiaddr" ma "github.com/multiformats/go-multiaddr"
madns "github.com/multiformats/go-multiaddr-dns" madns "github.com/multiformats/go-multiaddr-dns"
@ -117,6 +118,9 @@ type Config struct {
EnableHolePunching bool EnableHolePunching bool
HolePunchingOptions []holepunch.Option HolePunchingOptions []holepunch.Option
DisableMetrics bool
PrometheusRegisterer prometheus.Registerer
} }
func (cfg *Config) makeSwarm(enableMetrics bool) (*swarm.Swarm, error) { func (cfg *Config) makeSwarm(enableMetrics bool) (*swarm.Swarm, error) {
@ -168,7 +172,8 @@ func (cfg *Config) makeSwarm(enableMetrics bool) (*swarm.Swarm, error) {
opts = append(opts, swarm.WithMultiaddrResolver(cfg.MultiaddrResolver)) opts = append(opts, swarm.WithMultiaddrResolver(cfg.MultiaddrResolver))
} }
if enableMetrics { if enableMetrics {
opts = append(opts, swarm.WithMetricsTracer(swarm.NewMetricsTracer())) opts = append(opts,
swarm.WithMetricsTracer(swarm.NewMetricsTracer(swarm.WithRegisterer(cfg.PrometheusRegisterer))))
} }
// TODO: Make the swarm implementation configurable. // TODO: Make the swarm implementation configurable.
return swarm.NewSwarm(pid, cfg.Peerstore, opts...) return swarm.NewSwarm(pid, cfg.Peerstore, opts...)
@ -279,22 +284,24 @@ func (cfg *Config) addTransports(h host.Host) error {
// //
// This function consumes the config. Do not reuse it (really!). // This function consumes the config. Do not reuse it (really!).
func (cfg *Config) NewNode() (host.Host, error) { func (cfg *Config) NewNode() (host.Host, error) {
swrm, err := cfg.makeSwarm(true) swrm, err := cfg.makeSwarm(!cfg.DisableMetrics)
if err != nil { if err != nil {
return nil, err return nil, err
} }
h, err := bhost.NewHost(swrm, &bhost.HostOpts{ h, err := bhost.NewHost(swrm, &bhost.HostOpts{
ConnManager: cfg.ConnManager, ConnManager: cfg.ConnManager,
AddrsFactory: cfg.AddrsFactory, AddrsFactory: cfg.AddrsFactory,
NATManager: cfg.NATManager, NATManager: cfg.NATManager,
EnablePing: !cfg.DisablePing, EnablePing: !cfg.DisablePing,
UserAgent: cfg.UserAgent, UserAgent: cfg.UserAgent,
ProtocolVersion: cfg.ProtocolVersion, ProtocolVersion: cfg.ProtocolVersion,
EnableHolePunching: cfg.EnableHolePunching, EnableHolePunching: cfg.EnableHolePunching,
HolePunchingOptions: cfg.HolePunchingOptions, HolePunchingOptions: cfg.HolePunchingOptions,
EnableRelayService: cfg.EnableRelayService, EnableRelayService: cfg.EnableRelayService,
RelayServiceOpts: cfg.RelayServiceOpts, RelayServiceOpts: cfg.RelayServiceOpts,
EnableMetrics: !cfg.DisableMetrics,
PrometheusRegisterer: cfg.PrometheusRegisterer,
}) })
if err != nil { if err != nil {
swrm.Close() swrm.Close()
@ -354,7 +361,11 @@ func (cfg *Config) NewNode() (host.Host, error) {
autonat.UsingAddresses(func() []ma.Multiaddr { autonat.UsingAddresses(func() []ma.Multiaddr {
return addrF(h.AllAddrs()) return addrF(h.AllAddrs())
}), }),
autonat.WithMetricsTracer(autonat.NewMetricsTracer()), }
if !cfg.DisableMetrics {
autonatOpts = append(autonatOpts,
autonat.WithMetricsTracer(
autonat.NewMetricsTracer(autonat.WithRegisterer(cfg.PrometheusRegisterer))))
} }
if cfg.AutoNATConfig.ThrottleInterval != 0 { if cfg.AutoNATConfig.ThrottleInterval != 0 {
autonatOpts = append(autonatOpts, autonatOpts = append(autonatOpts,

10
defaults.go

@ -16,6 +16,7 @@ import (
"github.com/libp2p/go-libp2p/p2p/transport/tcp" "github.com/libp2p/go-libp2p/p2p/transport/tcp"
ws "github.com/libp2p/go-libp2p/p2p/transport/websocket" ws "github.com/libp2p/go-libp2p/p2p/transport/websocket"
webtransport "github.com/libp2p/go-libp2p/p2p/transport/webtransport" webtransport "github.com/libp2p/go-libp2p/p2p/transport/webtransport"
"github.com/prometheus/client_golang/prometheus"
"github.com/multiformats/go-multiaddr" "github.com/multiformats/go-multiaddr"
madns "github.com/multiformats/go-multiaddr-dns" madns "github.com/multiformats/go-multiaddr-dns"
@ -129,6 +130,11 @@ var DefaultMultiaddrResolver = func(cfg *Config) error {
return cfg.Apply(MultiaddrResolver(madns.DefaultResolver)) return cfg.Apply(MultiaddrResolver(madns.DefaultResolver))
} }
// DefaultPrometheusRegisterer configures libp2p to use the default registerer
var DefaultPrometheusRegisterer = func(cfg *Config) error {
return cfg.Apply(PrometheusRegisterer(prometheus.DefaultRegisterer))
}
// Complete list of default options and when to fallback on them. // Complete list of default options and when to fallback on them.
// //
// Please *DON'T* specify default options any other way. Putting this all here // Please *DON'T* specify default options any other way. Putting this all here
@ -181,6 +187,10 @@ var defaults = []struct {
fallback: func(cfg *Config) bool { return cfg.MultiaddrResolver == nil }, fallback: func(cfg *Config) bool { return cfg.MultiaddrResolver == nil },
opt: DefaultMultiaddrResolver, opt: DefaultMultiaddrResolver,
}, },
{
fallback: func(cfg *Config) bool { return !cfg.DisableMetrics && cfg.PrometheusRegisterer == nil },
opt: DefaultPrometheusRegisterer,
},
} }
// Defaults configures libp2p to use the default options. Can be combined with // Defaults configures libp2p to use the default options. Can be combined with

26
options.go

@ -27,6 +27,7 @@ import (
relayv2 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/relay" relayv2 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/relay"
"github.com/libp2p/go-libp2p/p2p/protocol/holepunch" "github.com/libp2p/go-libp2p/p2p/protocol/holepunch"
"github.com/libp2p/go-libp2p/p2p/transport/quicreuse" "github.com/libp2p/go-libp2p/p2p/transport/quicreuse"
"github.com/prometheus/client_golang/prometheus"
ma "github.com/multiformats/go-multiaddr" ma "github.com/multiformats/go-multiaddr"
madns "github.com/multiformats/go-multiaddr-dns" madns "github.com/multiformats/go-multiaddr-dns"
@ -548,3 +549,28 @@ func WithDialTimeout(t time.Duration) Option {
return nil return nil
} }
} }
// DisableMetrics configures libp2p to disable prometheus metrics
func DisableMetrics() Option {
return func(cfg *Config) error {
cfg.DisableMetrics = true
return nil
}
}
// PrometheusRegisterer configures libp2p to use reg as the Registerer for all metrics subsystems
func PrometheusRegisterer(reg prometheus.Registerer) Option {
return func(cfg *Config) error {
if cfg.DisableMetrics {
return errors.New("cannot set registerer when metrics are disabled")
}
if cfg.PrometheusRegisterer != nil {
return errors.New("registerer already set")
}
if reg == nil {
return errors.New("registerer cannot be nil")
}
cfg.PrometheusRegisterer = reg
return nil
}
}

51
p2p/host/autonat/metrics.go

@ -1,7 +1,6 @@
package autonat package autonat
import ( import (
"sync"
"time" "time"
"github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/network"
@ -58,20 +57,15 @@ var (
Help: "Time of next probe", Help: "Time of next probe",
}, },
) )
) collectors = []prometheus.Collector{
var initMetricsOnce sync.Once
func initMetrics(reg prometheus.Registerer) {
reg.MustRegister(
reachabilityStatus, reachabilityStatus,
reachabilityStatusConfidence, reachabilityStatusConfidence,
receivedDialResponseTotal, receivedDialResponseTotal,
outgoingDialResponseTotal, outgoingDialResponseTotal,
outgoingDialRefusedTotal, outgoingDialRefusedTotal,
nextProbeTimestamp, nextProbeTimestamp,
) }
} )
type MetricsTracer interface { type MetricsTracer interface {
ReachabilityStatus(status network.Reachability) ReachabilityStatus(status network.Reachability)
@ -107,14 +101,32 @@ const (
no_valid_address = "no valid address" no_valid_address = "no valid address"
) )
type metricsTracer struct{}
var _ MetricsTracer = &metricsTracer{}
type metricsTracerSetting struct { type metricsTracerSetting struct {
reg prometheus.Registerer reg prometheus.Registerer
} }
type metricsTracer struct { type MetricsTracerOption func(*metricsTracerSetting)
func WithRegisterer(reg prometheus.Registerer) MetricsTracerOption {
return func(s *metricsTracerSetting) {
if reg != nil {
s.reg = reg
}
}
} }
var _ MetricsTracer = &metricsTracer{} func NewMetricsTracer(opts ...MetricsTracerOption) MetricsTracer {
setting := &metricsTracerSetting{reg: prometheus.DefaultRegisterer}
for _, opt := range opts {
opt(setting)
}
metricshelper.RegisterCollectors(setting.reg, collectors...)
return &metricsTracer{}
}
func (mt *metricsTracer) ReachabilityStatus(status network.Reachability) { func (mt *metricsTracer) ReachabilityStatus(status network.Reachability) {
reachabilityStatus.Set(float64(status)) reachabilityStatus.Set(float64(status))
@ -148,20 +160,3 @@ func (mt *metricsTracer) OutgoingDialRefused(reason string) {
func (mt *metricsTracer) NextProbeTime(t time.Time) { func (mt *metricsTracer) NextProbeTime(t time.Time) {
nextProbeTimestamp.Set(float64(t.Unix())) nextProbeTimestamp.Set(float64(t.Unix()))
} }
type MetricsTracerOption = func(*metricsTracerSetting)
func MustRegisterWith(reg prometheus.Registerer) MetricsTracerOption {
return func(s *metricsTracerSetting) {
s.reg = reg
}
}
func NewMetricsTracer(opts ...MetricsTracerOption) MetricsTracer {
settings := &metricsTracerSetting{reg: prometheus.DefaultRegisterer}
for _, opt := range opts {
opt(settings)
}
initMetricsOnce.Do(func() { initMetrics(settings.reg) })
return &metricsTracer{}
}

49
p2p/host/basic/basic_host.go

@ -27,6 +27,7 @@ import (
"github.com/libp2p/go-libp2p/p2p/protocol/holepunch" "github.com/libp2p/go-libp2p/p2p/protocol/holepunch"
"github.com/libp2p/go-libp2p/p2p/protocol/identify" "github.com/libp2p/go-libp2p/p2p/protocol/identify"
"github.com/libp2p/go-libp2p/p2p/protocol/ping" "github.com/libp2p/go-libp2p/p2p/protocol/ping"
"github.com/prometheus/client_golang/prometheus"
"github.com/libp2p/go-netroute" "github.com/libp2p/go-netroute"
@ -151,19 +152,32 @@ type HostOpts struct {
EnableHolePunching bool EnableHolePunching bool
// HolePunchingOptions are options for the hole punching service // HolePunchingOptions are options for the hole punching service
HolePunchingOptions []holepunch.Option HolePunchingOptions []holepunch.Option
// EnableMetrics enables the metrics subsystems
EnableMetrics bool
// PrometheusRegisterer is the PrometheusRegisterer used for metrics
PrometheusRegisterer prometheus.Registerer
} }
// NewHost constructs a new *BasicHost and activates it by attaching its stream and connection handlers to the given inet.Network. // NewHost constructs a new *BasicHost and activates it by attaching its stream and connection handlers to the given inet.Network.
func NewHost(n network.Network, opts *HostOpts) (*BasicHost, error) { func NewHost(n network.Network, opts *HostOpts) (*BasicHost, error) {
eventBus := eventbus.NewBus(eventbus.WithMetricsTracer(eventbus.NewMetricsTracer())) if opts == nil {
opts = &HostOpts{}
}
var eventBus event.Bus
if opts.EnableMetrics {
eventBus = eventbus.NewBus(
eventbus.WithMetricsTracer(eventbus.NewMetricsTracer(eventbus.WithRegisterer(opts.PrometheusRegisterer))))
} else {
eventBus = eventbus.NewBus()
}
psManager, err := pstoremanager.NewPeerstoreManager(n.Peerstore(), eventBus) psManager, err := pstoremanager.NewPeerstoreManager(n.Peerstore(), eventBus)
if err != nil { if err != nil {
return nil, err return nil, err
} }
hostCtx, cancel := context.WithCancel(context.Background()) hostCtx, cancel := context.WithCancel(context.Background())
if opts == nil {
opts = &HostOpts{}
}
h := &BasicHost{ h := &BasicHost{
network: n, network: n,
@ -223,23 +237,22 @@ func NewHost(n network.Network, opts *HostOpts) (*BasicHost, error) {
h.mux = opts.MultistreamMuxer h.mux = opts.MultistreamMuxer
} }
idOpts := []identify.Option{
identify.UserAgent(opts.UserAgent),
identify.ProtocolVersion(opts.ProtocolVersion),
}
// we can't set this as a default above because it depends on the *BasicHost. // we can't set this as a default above because it depends on the *BasicHost.
if h.disableSignedPeerRecord { if h.disableSignedPeerRecord {
h.ids, err = identify.NewIDService( idOpts = append(idOpts, identify.DisableSignedPeerRecord())
h,
identify.UserAgent(opts.UserAgent),
identify.ProtocolVersion(opts.ProtocolVersion),
identify.DisableSignedPeerRecord(),
identify.WithMetricsTracer(identify.NewMetricsTracer()),
)
} else {
h.ids, err = identify.NewIDService(
h,
identify.UserAgent(opts.UserAgent),
identify.ProtocolVersion(opts.ProtocolVersion),
identify.WithMetricsTracer(identify.NewMetricsTracer()),
)
} }
if opts.EnableMetrics {
idOpts = append(idOpts,
identify.WithMetricsTracer(
identify.NewMetricsTracer(identify.WithRegisterer(opts.PrometheusRegisterer))))
}
h.ids, err = identify.NewIDService(h, idOpts...)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to create Identify service: %s", err) return nil, fmt.Errorf("failed to create Identify service: %s", err)
} }

28
p2p/host/eventbus/basic_metrics.go

@ -3,7 +3,6 @@ package eventbus
import ( import (
"reflect" "reflect"
"strings" "strings"
"sync"
"github.com/libp2p/go-libp2p/p2p/metricshelper" "github.com/libp2p/go-libp2p/p2p/metricshelper"
@ -53,6 +52,13 @@ var (
}, },
[]string{"subscriber_name"}, []string{"subscriber_name"},
) )
collectors = []prometheus.Collector{
eventsEmitted,
totalSubscribers,
subscriberQueueLength,
subscriberQueueFull,
subscriberEventQueued,
}
) )
// MetricsTracer tracks metrics for the eventbus subsystem // MetricsTracer tracks metrics for the eventbus subsystem
@ -81,30 +87,26 @@ type metricsTracer struct{}
var _ MetricsTracer = &metricsTracer{} var _ MetricsTracer = &metricsTracer{}
type MetricsTracerOption = func(*metricsTracerSetting)
type metricsTracerSetting struct { type metricsTracerSetting struct {
reg prometheus.Registerer reg prometheus.Registerer
} }
var initMetricsOnce sync.Once type MetricsTracerOption func(*metricsTracerSetting)
func initMetrics(reg prometheus.Registerer) {
reg.MustRegister(eventsEmitted, totalSubscribers, subscriberQueueLength, subscriberQueueFull, subscriberEventQueued)
}
func MustRegisterWith(reg prometheus.Registerer) MetricsTracerOption { func WithRegisterer(reg prometheus.Registerer) MetricsTracerOption {
return func(s *metricsTracerSetting) { return func(s *metricsTracerSetting) {
s.reg = reg if reg != nil {
s.reg = reg
}
} }
} }
func NewMetricsTracer(opts ...MetricsTracerOption) MetricsTracer { func NewMetricsTracer(opts ...MetricsTracerOption) MetricsTracer {
settings := &metricsTracerSetting{reg: prometheus.DefaultRegisterer} setting := &metricsTracerSetting{reg: prometheus.DefaultRegisterer}
for _, opt := range opts { for _, opt := range opts {
opt(settings) opt(setting)
} }
initMetricsOnce.Do(func() { initMetrics(settings.reg) }) metricshelper.RegisterCollectors(setting.reg, collectors...)
return &metricsTracer{} return &metricsTracer{}
} }

20
p2p/metricshelper/registerer.go

@ -0,0 +1,20 @@
package metricshelper
import (
"errors"
"github.com/prometheus/client_golang/prometheus"
)
// RegisterCollectors registers the collectors with reg ignoring
// reregistration error and panics on any other error
func RegisterCollectors(reg prometheus.Registerer, collectors ...prometheus.Collector) {
for _, c := range collectors {
err := reg.Register(c)
if err != nil {
if ok := errors.As(err, &prometheus.AlreadyRegisteredError{}); !ok {
panic(err)
}
}
}
}

32
p2p/metricshelper/registerer_test.go

@ -0,0 +1,32 @@
package metricshelper
import (
"testing"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
)
func TestRegisterCollectors(t *testing.T) {
reg := prometheus.NewRegistry()
c1 := prometheus.NewCounter(
prometheus.CounterOpts{
Name: "counter",
},
)
c2 := prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: "test",
Name: "gauge",
},
)
// c3 == c1
c3 := prometheus.NewCounter(
prometheus.CounterOpts{
Name: "counter",
},
)
require.NotPanics(t, func() { RegisterCollectors(reg, c1, c2) })
require.NotPanics(t, func() { RegisterCollectors(reg, c3) }, "should not panic on duplicate registration")
}

37
p2p/net/swarm/swarm_metrics.go

@ -5,7 +5,6 @@ import (
"errors" "errors"
"net" "net"
"strings" "strings"
"sync"
"time" "time"
"github.com/libp2p/go-libp2p/core/crypto" "github.com/libp2p/go-libp2p/core/crypto"
@ -70,14 +69,16 @@ var (
}, },
[]string{"transport", "security", "muxer", "ip_version"}, []string{"transport", "security", "muxer", "ip_version"},
) )
collectors = []prometheus.Collector{
connsOpened,
keyTypes,
connsClosed,
dialError,
connDuration,
connHandshakeLatency,
}
) )
var initMetricsOnce sync.Once
func initMetrics() {
prometheus.MustRegister(connsOpened, keyTypes, connsClosed, dialError, connDuration, connHandshakeLatency)
}
type MetricsTracer interface { type MetricsTracer interface {
OpenedConnection(network.Direction, crypto.PubKey, network.ConnectionState, ma.Multiaddr) OpenedConnection(network.Direction, crypto.PubKey, network.ConnectionState, ma.Multiaddr)
ClosedConnection(network.Direction, time.Duration, network.ConnectionState, ma.Multiaddr) ClosedConnection(network.Direction, time.Duration, network.ConnectionState, ma.Multiaddr)
@ -89,8 +90,26 @@ type metricsTracer struct{}
var _ MetricsTracer = &metricsTracer{} var _ MetricsTracer = &metricsTracer{}
func NewMetricsTracer() *metricsTracer { type metricsTracerSetting struct {
initMetricsOnce.Do(initMetrics) reg prometheus.Registerer
}
type MetricsTracerOption func(*metricsTracerSetting)
func WithRegisterer(reg prometheus.Registerer) MetricsTracerOption {
return func(s *metricsTracerSetting) {
if reg != nil {
s.reg = reg
}
}
}
func NewMetricsTracer(opts ...MetricsTracerOption) MetricsTracer {
setting := &metricsTracerSetting{reg: prometheus.DefaultRegisterer}
for _, opt := range opts {
opt(setting)
}
metricshelper.RegisterCollectors(setting.reg, collectors...)
return &metricsTracer{} return &metricsTracer{}
} }

35
p2p/protocol/identify/metrics.go

@ -1,8 +1,6 @@
package identify package identify
import ( import (
"sync"
"github.com/libp2p/go-libp2p/core/event" "github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/p2p/metricshelper" "github.com/libp2p/go-libp2p/p2p/metricshelper"
@ -37,14 +35,13 @@ var (
}, },
[]string{"dir"}, []string{"dir"},
) )
collectors = []prometheus.Collector{
pushesTriggered,
identify,
identifyPush,
}
) )
var initMetricsOnce sync.Once
func initMetrics() {
prometheus.MustRegister(pushesTriggered, identify, identifyPush)
}
type MetricsTracer interface { type MetricsTracer interface {
TriggeredPushes(event any) TriggeredPushes(event any)
Identify(network.Direction) Identify(network.Direction)
@ -55,8 +52,26 @@ type metricsTracer struct{}
var _ MetricsTracer = &metricsTracer{} var _ MetricsTracer = &metricsTracer{}
func NewMetricsTracer() MetricsTracer { type metricsTracerSetting struct {
initMetricsOnce.Do(initMetrics) reg prometheus.Registerer
}
type MetricsTracerOption func(*metricsTracerSetting)
func WithRegisterer(reg prometheus.Registerer) MetricsTracerOption {
return func(s *metricsTracerSetting) {
if reg != nil {
s.reg = reg
}
}
}
func NewMetricsTracer(opts ...MetricsTracerOption) MetricsTracer {
setting := &metricsTracerSetting{reg: prometheus.DefaultRegisterer}
for _, opt := range opts {
opt(setting)
}
metricshelper.RegisterCollectors(setting.reg, collectors...)
return &metricsTracer{} return &metricsTracer{}
} }

Loading…
Cancel
Save