Browse Source

autonatv2: add server metrics for dial requests (#2848)

marco/bump-gorilla-websocket
sukun 4 months ago
committed by GitHub
parent
commit
ee0ac60d3f
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 506
      dashboards/autonatv2/autonatv2.json
  2. 6
      p2p/host/basic/basic_host.go
  3. 97
      p2p/protocol/autonatv2/metrics.go
  4. 51
      p2p/protocol/autonatv2/metrics_test.go
  5. 8
      p2p/protocol/autonatv2/options.go
  6. 90
      p2p/protocol/autonatv2/server.go

506
dashboards/autonatv2/autonatv2.json

@ -0,0 +1,506 @@
{
"annotations": {
"list": [
{
"builtIn": 1,
"datasource": {
"type": "grafana",
"uid": "-- Grafana --"
},
"enable": true,
"hide": true,
"iconColor": "rgba(0, 211, 255, 1)",
"name": "Annotations & Alerts",
"target": {
"limit": 100,
"matchAny": false,
"tags": [],
"type": "dashboard"
},
"type": "dashboard"
}
]
},
"editable": true,
"fiscalYearStartMonth": 0,
"graphTooltip": 0,
"id": 4,
"links": [],
"panels": [
{
"datasource": {
"type": "prometheus",
"uid": "${data_source}"
},
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic",
"seriesBy": "last"
},
"custom": {
"axisBorderShow": false,
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"insertNulls": false,
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
},
{
"color": "red",
"value": 80
}
]
}
},
"overrides": [
{
"matcher": {
"id": "byName",
"options": "OK"
},
"properties": [
{
"id": "color",
"value": {
"fixedColor": "green",
"mode": "fixed"
}
}
]
},
{
"matcher": {
"id": "byName",
"options": "E_DIAL_REFUSED"
},
"properties": [
{
"id": "color",
"value": {
"fixedColor": "purple",
"mode": "fixed"
}
}
]
}
]
},
"gridPos": {
"h": 8,
"w": 12,
"x": 0,
"y": 0
},
"id": 1,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"maxHeight": 600,
"mode": "single",
"sort": "none"
}
},
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "${data_source}"
},
"editorMode": "code",
"expr": "sum by (response_status) (increase(libp2p_autonatv2_requests_completed_total{instance=~\"$instance\", server_error=\"nil\"}[$__rate_interval]))\n",
"instant": false,
"legendFormat": "__auto",
"range": true,
"refId": "A"
}
],
"title": "Dial Request by Response Status",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
"uid": "${data_source}"
},
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic",
"seriesBy": "last"
},
"custom": {
"axisBorderShow": false,
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"insertNulls": false,
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
},
{
"color": "red",
"value": 80
}
]
}
},
"overrides": [
{
"matcher": {
"id": "byName",
"options": "OK"
},
"properties": [
{
"id": "color",
"value": {
"fixedColor": "green",
"mode": "fixed"
}
}
]
},
{
"matcher": {
"id": "byName",
"options": "E_DIAL_REFUSED"
},
"properties": [
{
"id": "color",
"value": {
"fixedColor": "purple",
"mode": "fixed"
}
}
]
},
{
"matcher": {
"id": "byName",
"options": "E_DIAL_ERROR"
},
"properties": [
{
"id": "color",
"value": {
"fixedColor": "purple",
"mode": "fixed"
}
}
]
}
]
},
"gridPos": {
"h": 8,
"w": 12,
"x": 12,
"y": 0
},
"id": 2,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"maxHeight": 600,
"mode": "single",
"sort": "none"
}
},
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "${data_source}"
},
"editorMode": "code",
"expr": "sum by (ip_or_dns_version, transport, dial_status) (increase(libp2p_autonatv2_requests_completed_total{instance=~\"$instance\", server_error=\"nil\", response_status=\"OK\"}[$__rate_interval]))\n",
"instant": false,
"legendFormat": "__auto",
"range": true,
"refId": "A"
}
],
"title": "Dial Request by Dial Status",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
"uid": "${data_source}"
},
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic",
"seriesBy": "last"
},
"custom": {
"axisBorderShow": false,
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"insertNulls": false,
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
},
{
"color": "red",
"value": 80
}
]
}
},
"overrides": [
{
"matcher": {
"id": "byName",
"options": "OK"
},
"properties": [
{
"id": "color",
"value": {
"fixedColor": "green",
"mode": "fixed"
}
}
]
},
{
"matcher": {
"id": "byName",
"options": "E_DIAL_REFUSED"
},
"properties": [
{
"id": "color",
"value": {
"fixedColor": "purple",
"mode": "fixed"
}
}
]
},
{
"matcher": {
"id": "byName",
"options": "E_DIAL_ERROR"
},
"properties": [
{
"id": "color",
"value": {
"fixedColor": "purple",
"mode": "fixed"
}
}
]
}
]
},
"gridPos": {
"h": 8,
"w": 12,
"x": 5,
"y": 8
},
"id": 3,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"maxHeight": 600,
"mode": "single",
"sort": "none"
}
},
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "${data_source}"
},
"editorMode": "code",
"expr": "sum by (server_error) (increase(libp2p_autonatv2_requests_completed_total{instance=~\"$instance\", server_error!=\"nil\"}[$__rate_interval]))\n",
"instant": false,
"legendFormat": "__auto",
"range": true,
"refId": "A"
}
],
"title": "Dial Request Errors",
"type": "timeseries"
}
],
"refresh": "",
"schemaVersion": 39,
"tags": [],
"templating": {
"list": [
{
"allValue": "",
"current": {
"selected": true,
"text": [
"All"
],
"value": [
"$__all"
]
},
"definition": "label_values(up,instance)",
"hide": 0,
"includeAll": true,
"label": "instance",
"multi": true,
"name": "instance",
"options": [],
"query": {
"qryType": 1,
"query": "label_values(up,instance)",
"refId": "PrometheusVariableQueryEditor-VariableQuery"
},
"refresh": 1,
"regex": "",
"skipUrlSync": false,
"sort": 0,
"type": "query"
},
{
"hide": 0,
"includeAll": false,
"label": "",
"multi": false,
"name": "data_source",
"options": [],
"query": "prometheus",
"queryValue": "",
"refresh": 1,
"regex": "",
"skipUrlSync": false,
"type": "datasource"
}
]
},
"time": {
"from": "now-1h",
"to": "now"
},
"timeRangeUpdatedDuringEditOrView": false,
"timepicker": {},
"timezone": "browser",
"title": "go-libp2p autoNATv2",
"uid": "cdpusyp3xtfcwa",
"version": 1,
"weekStart": ""
}

6
p2p/host/basic/basic_host.go

@ -316,7 +316,11 @@ func NewHost(n network.Network, opts *HostOpts) (*BasicHost, error) {
}
if opts.EnableAutoNATv2 {
h.autonatv2, err = autonatv2.New(h, opts.AutoNATv2Dialer)
var mt autonatv2.MetricsTracer
if opts.EnableMetrics {
mt = autonatv2.NewMetricsTracer(opts.PrometheusRegisterer)
}
h.autonatv2, err = autonatv2.New(h, opts.AutoNATv2Dialer, autonatv2.WithMetricsTracer(mt))
if err != nil {
return nil, fmt.Errorf("failed to create autonatv2: %w", err)
}

97
p2p/protocol/autonatv2/metrics.go

@ -0,0 +1,97 @@
package autonatv2
import (
"github.com/libp2p/go-libp2p/p2p/metricshelper"
"github.com/libp2p/go-libp2p/p2p/protocol/autonatv2/pb"
ma "github.com/multiformats/go-multiaddr"
"github.com/prometheus/client_golang/prometheus"
)
// MetricsTracer receives metrics events from the AutoNATv2 server.
type MetricsTracer interface {
// CompletedRequest is called with the outcome of a dial request once the
// server has finished serving it.
CompletedRequest(EventDialRequestCompleted)
}
// metricNamespace is the Prometheus namespace prepended to the metric names
// declared below.
const metricNamespace = "libp2p_autonatv2"
var (
// requestsCompleted counts completed dial requests, partitioned by the
// labels listed below. The label order must stay in sync with the order in
// which CompletedRequest appends its label values.
requestsCompleted = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: metricNamespace,
Name: "requests_completed_total",
Help: "Requests Completed",
},
[]string{"server_error", "response_status", "dial_status", "dial_data_required", "ip_or_dns_version", "transport"},
)
)
// metricsTracer is the MetricsTracer returned by NewMetricsTracer. It carries
// no state of its own; its method updates the package-level requestsCompleted
// counter.
type metricsTracer struct {
}
// NewMetricsTracer registers the requestsCompleted collector with reg and
// returns a MetricsTracer that updates it.
func NewMetricsTracer(reg prometheus.Registerer) MetricsTracer {
	metricshelper.RegisterCollectors(reg, requestsCompleted)
	return new(metricsTracer)
}
// CompletedRequest increments the requestsCompleted counter for the label
// combination described by e.
//
// The label values are written into a slice borrowed from metricshelper and
// handed back when the method returns. Values are appended in the same order
// as the label names declared on requestsCompleted.
func (m *metricsTracer) CompletedRequest(e EventDialRequestCompleted) {
	tags := metricshelper.GetStringSlice()
	defer metricshelper.PutStringSlice(tags)

	dialDataRequired := "false"
	if e.DialDataRequired {
		dialDataRequired = "true"
	}

	// Both address-derived labels stay empty when no address was dialed.
	ipVersion, transport := "", ""
	if addr := e.DialedAddr; addr != nil {
		ipVersion = getIPOrDNSVersion(addr)
		transport = metricshelper.GetTransport(addr)
	}

	*tags = append(*tags,
		getErrString(e.Error),
		pb.DialResponse_ResponseStatus_name[int32(e.ResponseStatus)],
		pb.DialStatus_name[int32(e.DialStatus)],
		dialDataRequired,
		ipVersion,
		transport,
	)
	requestsCompleted.WithLabelValues(*tags...).Inc()
}
// getIPOrDNSVersion maps the leading component of a to a short label:
// "ip4", "ip6", "dns", "dns4" or "dns6". It returns "" for a nil address and
// "unknown" when the leading component is none of the above.
func getIPOrDNSVersion(a ma.Multiaddr) string {
	if a == nil {
		return ""
	}
	version := "unknown"
	ma.ForEach(a, func(first ma.Component) bool {
		switch code := first.Protocol().Code; code {
		case ma.P_DNS6:
			version = "dns6"
		case ma.P_DNS4:
			version = "dns4"
		case ma.P_DNSADDR, ma.P_DNS:
			version = "dns"
		case ma.P_IP6:
			version = "ip6"
		case ma.P_IP4:
			version = "ip4"
		}
		// Returning false ends the walk, so only the first component is
		// ever inspected.
		return false
	})
	return version
}
// getErrString converts e into a value for the server_error label.
// A nil error becomes "nil", the known sentinel errors use their own message,
// and every other error is collapsed into "other" so the label set stays
// bounded.
func getErrString(e error) string {
	switch e {
	case nil:
		return "nil"
	case errBadRequest, errDialDataRefused, errResourceLimitExceeded:
		return e.Error()
	}
	return "other"
}

51
p2p/protocol/autonatv2/metrics_test.go

@ -0,0 +1,51 @@
package autonatv2
import (
"errors"
"math/rand"
"testing"
"github.com/libp2p/go-libp2p/p2p/protocol/autonatv2/pb"
ma "github.com/multiformats/go-multiaddr"
"github.com/prometheus/client_golang/prometheus"
)
// TestMetricsNoAllocNoCover checks that recording a completed request through
// the metrics tracer performs no heap allocations, across randomly chosen
// combinations of error, response status, dial status and dialed address.
// NOTE(review): the "NoCover" suffix presumably lets the test be excluded from
// coverage-instrumented runs — confirm against the CI configuration.
func TestMetricsNoAllocNoCover(t *testing.T) {
mt := NewMetricsTracer(prometheus.DefaultRegisterer)
respStatuses := []pb.DialResponse_ResponseStatus{
pb.DialResponse_E_DIAL_REFUSED,
pb.DialResponse_OK,
}
dialStatuses := []pb.DialStatus{
pb.DialStatus_OK,
pb.DialStatus_E_DIAL_BACK_ERROR,
}
// Covers the nil error, two sentinel errors, and an arbitrary error that is
// not one of the sentinels.
errs := []error{
nil,
errBadRequest,
errDialDataRefused,
errors.New("write failed"),
}
// Includes a nil address to exercise the path where no address was dialed.
addrs := []ma.Multiaddr{
nil,
ma.StringCast("/ip4/1.2.3.4/udp/1/quic-v1"),
ma.StringCast("/ip4/1.1.1.1/tcp/1/"),
}
// One entry per tracer method, keyed by method name for the failure message.
tests := map[string]func(){
"CompletedRequest": func() {
mt.CompletedRequest(EventDialRequestCompleted{
Error: errs[rand.Intn(len(errs))],
ResponseStatus: respStatuses[rand.Intn(len(respStatuses))],
DialStatus: dialStatuses[rand.Intn(len(dialStatuses))],
DialDataRequired: rand.Intn(2) == 1,
DialedAddr: addrs[rand.Intn(len(addrs))],
})
},
}
for method, f := range tests {
// Any non-zero average allocation count over the runs fails the test.
allocs := testing.AllocsPerRun(10000, f)
if allocs > 0 {
t.Fatalf("%s alloc test failed expected 0 received %0.2f", method, allocs)
}
}
}

8
p2p/protocol/autonatv2/options.go

@ -11,6 +11,7 @@ type autoNATSettings struct {
dataRequestPolicy dataRequestPolicyFunc
now func() time.Time
amplificatonAttackPreventionDialWait time.Duration
metricsTracer MetricsTracer
}
func defaultSettings() *autoNATSettings {
@ -36,6 +37,13 @@ func WithServerRateLimit(rpm, perPeerRPM, dialDataRPM int) AutoNATOption {
}
}
// WithMetricsTracer returns an option that stores m as the metrics tracer in
// the autonat settings.
func WithMetricsTracer(m MetricsTracer) AutoNATOption {
	setTracer := func(settings *autoNATSettings) error {
		settings.metricsTracer = m
		return nil
	}
	return setTracer
}
func withDataRequestPolicy(drp dataRequestPolicyFunc) AutoNATOption {
return func(s *autoNATSettings) error {
s.dataRequestPolicy = drp

90
p2p/protocol/autonatv2/server.go

@ -2,6 +2,7 @@ package autonatv2
import (
"context"
"errors"
"fmt"
"io"
"sync"
@ -21,8 +22,22 @@ import (
manet "github.com/multiformats/go-multiaddr/net"
)
// Sentinel errors reported in EventDialRequestCompleted.Error. getErrString
// uses their messages verbatim as server_error label values, so changing a
// message changes the emitted metric label.
var (
errResourceLimitExceeded = errors.New("resource limit exceeded")
errBadRequest = errors.New("bad request")
errDialDataRefused = errors.New("dial data refused")
)
// dataRequestPolicyFunc is the signature of the server's dial data request
// policy, evaluated for a stream and the address to be dialed.
// NOTE(review): presumably a true result means the client must provide dial
// data before dialAddr is dialed — confirm against the call site.
type dataRequestPolicyFunc = func(s network.Stream, dialAddr ma.Multiaddr) bool
// EventDialRequestCompleted describes the outcome of a single dial request
// served by the AutoNATv2 server. Fields that do not apply to a given outcome
// are left at their zero value.
type EventDialRequestCompleted struct {
// Error is the error that ended the request, or nil if none occurred.
Error error
// ResponseStatus is the response status the server sent, or tried to send,
// to the client.
ResponseStatus pb.DialResponse_ResponseStatus
// DialStatus is the dial status reported alongside an OK response.
DialStatus pb.DialStatus
// DialDataRequired records whether dial data was required for the request.
DialDataRequired bool
// DialedAddr is the address selected for dialing; it may be nil.
DialedAddr ma.Multiaddr
}
// server implements the AutoNATv2 server.
// It can ask client to provide dial data before attempting the requested dial.
// It rate limits requests on a global level, per peer level and on whether the request requires dial data.
@ -35,6 +50,7 @@ type server struct {
// dial data. It is set to amplification attack prevention by default.
dialDataRequestPolicy dataRequestPolicyFunc
amplificatonAttackPreventionDialWait time.Duration
metricsTracer MetricsTracer
// for tests
now func() time.Time
@ -54,7 +70,8 @@ func newServer(host, dialer host.Host, s *autoNATSettings) *server {
DialDataRPM: s.serverDialDataRPM,
now: s.now,
},
now: s.now,
now: s.now,
metricsTracer: s.metricsTracer,
}
}
@ -71,16 +88,27 @@ func (as *server) Close() {
// handleDialRequest is the dial-request protocol stream handler.
// It hands the stream to serveDialRequest, logs the resulting event and, when
// a metrics tracer is configured, reports the completed request to it.
func (as *server) handleDialRequest(s network.Stream) {
	evt := as.serveDialRequest(s)
	// The error is formatted with %v rather than %s: evt.Error is nil for
	// requests that completed without an error, and %s renders a nil error as
	// "%!s(<nil>)" instead of "<nil>".
	log.Debugf("completed dial-request from %s, response status: %s, dial status: %s, err: %v",
		s.Conn().RemotePeer(), evt.ResponseStatus, evt.DialStatus, evt.Error)
	if as.metricsTracer != nil {
		as.metricsTracer.CompletedRequest(evt)
	}
}
func (as *server) serveDialRequest(s network.Stream) EventDialRequestCompleted {
if err := s.Scope().SetService(ServiceName); err != nil {
s.Reset()
log.Debugf("failed to attach stream to service %s: %w", ServiceName, err)
return
log.Debugf("failed to attach stream to %s service: %w", ServiceName, err)
return EventDialRequestCompleted{
Error: errors.New("failed to attach stream to autonat-v2"),
}
}
if err := s.Scope().ReserveMemory(maxMsgSize, network.ReservationPriorityAlways); err != nil {
s.Reset()
log.Debugf("failed to reserve memory for stream %s: %w", DialProtocol, err)
return
return EventDialRequestCompleted{Error: errResourceLimitExceeded}
}
defer s.Scope().ReleaseMemory(maxMsgSize)
@ -106,10 +134,13 @@ func (as *server) handleDialRequest(s network.Stream) {
if err := w.WriteMsg(&msg); err != nil {
s.Reset()
log.Debugf("failed to write request rejected response to %s: %s", p, err)
return
return EventDialRequestCompleted{
ResponseStatus: pb.DialResponse_E_REQUEST_REJECTED,
Error: fmt.Errorf("write failed: %w", err),
}
}
log.Debugf("rejected request from %s: rate limit exceeded", p)
return
return EventDialRequestCompleted{ResponseStatus: pb.DialResponse_E_REQUEST_REJECTED}
}
defer as.limiter.CompleteRequest(p)
@ -117,12 +148,12 @@ func (as *server) handleDialRequest(s network.Stream) {
if err := r.ReadMsg(&msg); err != nil {
s.Reset()
log.Debugf("failed to read request from %s: %s", p, err)
return
return EventDialRequestCompleted{Error: fmt.Errorf("read failed: %w", err)}
}
if msg.GetDialRequest() == nil {
s.Reset()
log.Debugf("invalid message type from %s: %T expected: DialRequest", p, msg.Msg)
return
return EventDialRequestCompleted{Error: errBadRequest}
}
// parse peer's addresses
@ -158,9 +189,14 @@ func (as *server) handleDialRequest(s network.Stream) {
if err := w.WriteMsg(&msg); err != nil {
s.Reset()
log.Debugf("failed to write dial refused response to %s: %s", p, err)
return
return EventDialRequestCompleted{
ResponseStatus: pb.DialResponse_E_DIAL_REFUSED,
Error: fmt.Errorf("write failed: %w", err),
}
}
return EventDialRequestCompleted{
ResponseStatus: pb.DialResponse_E_DIAL_REFUSED,
}
return
}
nonce := msg.GetDialRequest().Nonce
@ -177,17 +213,28 @@ func (as *server) handleDialRequest(s network.Stream) {
if err := w.WriteMsg(&msg); err != nil {
s.Reset()
log.Debugf("failed to write request rejected response to %s: %s", p, err)
return
return EventDialRequestCompleted{
ResponseStatus: pb.DialResponse_E_REQUEST_REJECTED,
Error: fmt.Errorf("write failed: %w", err),
DialDataRequired: true,
}
}
log.Debugf("rejected request from %s: rate limit exceeded", p)
return
return EventDialRequestCompleted{
ResponseStatus: pb.DialResponse_E_REQUEST_REJECTED,
DialDataRequired: true,
}
}
if isDialDataRequired {
if err := getDialData(w, s, &msg, addrIdx); err != nil {
s.Reset()
log.Debugf("%s refused dial data request: %s", p, err)
return
return EventDialRequestCompleted{
Error: errDialDataRefused,
DialDataRequired: true,
DialedAddr: dialAddr,
}
}
// wait for a bit to prevent thundering herd style attacks on a victim
waitTime := time.Duration(rand.Intn(int(as.amplificatonAttackPreventionDialWait) + 1)) // the range is [0, n)
@ -197,7 +244,7 @@ func (as *server) handleDialRequest(s network.Stream) {
case <-ctx.Done():
s.Reset()
log.Debugf("rejecting request without dialing: %s %p ", p, ctx.Err())
return
return EventDialRequestCompleted{Error: ctx.Err(), DialDataRequired: true, DialedAddr: dialAddr}
case <-t.C:
}
}
@ -215,7 +262,20 @@ func (as *server) handleDialRequest(s network.Stream) {
if err := w.WriteMsg(&msg); err != nil {
s.Reset()
log.Debugf("failed to write response to %s: %s", p, err)
return
return EventDialRequestCompleted{
ResponseStatus: pb.DialResponse_OK,
DialStatus: dialStatus,
Error: fmt.Errorf("write failed: %w", err),
DialDataRequired: isDialDataRequired,
DialedAddr: dialAddr,
}
}
return EventDialRequestCompleted{
ResponseStatus: pb.DialResponse_OK,
DialStatus: dialStatus,
Error: nil,
DialDataRequired: isDialDataRequired,
DialedAddr: dialAddr,
}
}

Loading…
Cancel
Save