Browse Source

autorelay: add metrics (#2185)

* autorelay: add metrics
metrics added:
relay finder status

reservation request outcomes
current reservations

candidate circuit v2 support
current candidates

relay addresses updated
num relay address

scheduled work times

* autorelay: fix refresh reservations bug

* fix max value hack

* improve tracking errors in reservation requests

* fix config-query in grafana

* add candidate loop state panel

* fix logging

* reset metrics on relayfinder stop

* update dashboard

* update dashboard
0270-changelog
Sukun 2 years ago
committed by GitHub
parent
commit
15ec149403
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      config/config.go
  2. 1048
      dashboards/autorelay/autorelay.json
  3. 12
      p2p/host/autorelay/autorelay.go
  4. 373
      p2p/host/autorelay/metrics.go
  5. 59
      p2p/host/autorelay/metrics_noalloc_test.go
  6. 10
      p2p/host/autorelay/options.go
  7. 114
      p2p/host/autorelay/relay_finder.go
  8. 52
      p2p/protocol/circuitv2/client/reservation.go
  9. 23
      p2p/protocol/circuitv2/client/reservation_test.go

6
config/config.go

@ -354,6 +354,12 @@ func (cfg *Config) NewNode() (host.Host, error) {
h.Close()
return nil, fmt.Errorf("cannot enable autorelay; relay is not enabled")
}
if !cfg.DisableMetrics {
mt := autorelay.WithMetricsTracer(
autorelay.NewMetricsTracer(autorelay.WithRegisterer(cfg.PrometheusRegisterer)))
mtOpts := []autorelay.Option{mt}
cfg.AutoRelayOpts = append(mtOpts, cfg.AutoRelayOpts...)
}
ar, err = autorelay.NewAutoRelay(h, cfg.AutoRelayOpts...)
if err != nil {

1048
dashboards/autorelay/autorelay.json

File diff suppressed because it is too large

12
p2p/host/autorelay/autorelay.go

@ -2,6 +2,7 @@ package autorelay
import (
"context"
"errors"
"sync"
"github.com/libp2p/go-libp2p/core/event"
@ -30,6 +31,8 @@ type AutoRelay struct {
host host.Host
addrsF basic.AddrsFactory
metricsTracer MetricsTracer
}
func NewAutoRelay(bhost *basic.BasicHost, opts ...Option) (*AutoRelay, error) {
@ -47,6 +50,7 @@ func NewAutoRelay(bhost *basic.BasicHost, opts ...Option) (*AutoRelay, error) {
r.ctx, r.ctxCancel = context.WithCancel(context.Background())
r.conf = &conf
r.relayFinder = newRelayFinder(bhost, conf.peerSource, &conf)
r.metricsTracer = &wrappedMetricsTracer{conf.metricsTracer}
bhost.AddrsFactory = r.hostAddrs
return r, nil
@ -80,11 +84,17 @@ func (r *AutoRelay) background() {
evt := ev.(event.EvtLocalReachabilityChanged)
switch evt.Reachability {
case network.ReachabilityPrivate, network.ReachabilityUnknown:
if err := r.relayFinder.Start(); err != nil {
err := r.relayFinder.Start()
if errors.Is(err, errAlreadyRunning) {
log.Debug("tried to start already running relay finder")
} else if err != nil {
log.Errorw("failed to start relay finder", "error", err)
} else {
r.metricsTracer.RelayFinderStatus(true)
}
case network.ReachabilityPublic:
r.relayFinder.Stop()
r.metricsTracer.RelayFinderStatus(false)
}
r.mx.Lock()
r.status = evt.Reachability

373
p2p/host/autorelay/metrics.go

@ -0,0 +1,373 @@
package autorelay
import (
"errors"
"github.com/libp2p/go-libp2p/p2p/metricshelper"
"github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/client"
pbv2 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/pb"
"github.com/prometheus/client_golang/prometheus"
)
const metricNamespace = "libp2p_autorelay"
var (
status = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: metricNamespace,
Name: "status",
Help: "relay finder active",
})
reservationsOpenedTotal = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: metricNamespace,
Name: "reservations_opened_total",
Help: "Reservations Opened",
},
)
reservationsClosedTotal = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: metricNamespace,
Name: "reservations_closed_total",
Help: "Reservations Closed",
},
)
reservationRequestsOutcomeTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: metricNamespace,
Name: "reservation_requests_outcome_total",
Help: "Reservation Request Outcome",
},
[]string{"request_type", "outcome"},
)
relayAddressesUpdatedTotal = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: metricNamespace,
Name: "relay_addresses_updated_total",
Help: "Relay Addresses Updated Count",
},
)
relayAddressesCount = prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: metricNamespace,
Name: "relay_addresses_count",
Help: "Relay Addresses Count",
},
)
candidatesCircuitV2SupportTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: metricNamespace,
Name: "candidates_circuit_v2_support_total",
Help: "Candidiates supporting circuit v2",
},
[]string{"support"},
)
candidatesTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: metricNamespace,
Name: "candidates_total",
Help: "Candidates Total",
},
[]string{"type"},
)
candLoopState = prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: metricNamespace,
Name: "candidate_loop_state",
Help: "Candidate Loop State",
},
)
scheduledWorkTime = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: metricNamespace,
Name: "scheduled_work_time",
Help: "Scheduled Work Times",
},
[]string{"work_type"},
)
desiredReservations = prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: metricNamespace,
Name: "desired_reservations",
Help: "Desired Reservations",
},
)
collectors = []prometheus.Collector{
status,
reservationsOpenedTotal,
reservationsClosedTotal,
reservationRequestsOutcomeTotal,
relayAddressesUpdatedTotal,
relayAddressesCount,
candidatesCircuitV2SupportTotal,
candidatesTotal,
candLoopState,
scheduledWorkTime,
desiredReservations,
}
)
type candidateLoopState int
const (
peerSourceRateLimited candidateLoopState = iota
waitingOnPeerChan
waitingForTrigger
stopped
)
// MetricsTracer is the interface for tracking metrics for autorelay
type MetricsTracer interface {
RelayFinderStatus(isActive bool)
ReservationEnded(cnt int)
ReservationOpened(cnt int)
ReservationRequestFinished(isRefresh bool, err error)
RelayAddressCount(int)
RelayAddressUpdated()
CandidateChecked(supportsCircuitV2 bool)
CandidateAdded(cnt int)
CandidateRemoved(cnt int)
CandidateLoopState(state candidateLoopState)
ScheduledWorkUpdated(scheduledWork *scheduledWorkTimes)
DesiredReservations(int)
}
type metricsTracer struct{}
var _ MetricsTracer = &metricsTracer{}
type metricsTracerSetting struct {
reg prometheus.Registerer
}
type MetricsTracerOption func(*metricsTracerSetting)
func WithRegisterer(reg prometheus.Registerer) MetricsTracerOption {
return func(s *metricsTracerSetting) {
if reg != nil {
s.reg = reg
}
}
}
func NewMetricsTracer(opts ...MetricsTracerOption) MetricsTracer {
setting := &metricsTracerSetting{reg: prometheus.DefaultRegisterer}
for _, opt := range opts {
opt(setting)
}
metricshelper.RegisterCollectors(setting.reg, collectors...)
// Initialise these counters to 0 otherwise the first reservation requests aren't handled
// correctly when using promql increse function
reservationRequestsOutcomeTotal.WithLabelValues("refresh", "success")
reservationRequestsOutcomeTotal.WithLabelValues("new", "success")
candidatesCircuitV2SupportTotal.WithLabelValues("yes")
candidatesCircuitV2SupportTotal.WithLabelValues("no")
return &metricsTracer{}
}
func (mt *metricsTracer) RelayFinderStatus(isActive bool) {
if isActive {
status.Set(1)
} else {
status.Set(0)
}
}
func (mt *metricsTracer) ReservationEnded(cnt int) {
reservationsClosedTotal.Add(float64(cnt))
}
func (mt *metricsTracer) ReservationOpened(cnt int) {
reservationsOpenedTotal.Add(float64(cnt))
}
func (mt *metricsTracer) ReservationRequestFinished(isRefresh bool, err error) {
tags := metricshelper.GetStringSlice()
defer metricshelper.PutStringSlice(tags)
if isRefresh {
*tags = append(*tags, "refresh")
} else {
*tags = append(*tags, "new")
}
*tags = append(*tags, getReservationRequestStatus(err))
reservationRequestsOutcomeTotal.WithLabelValues(*tags...).Inc()
if !isRefresh && err == nil {
reservationsOpenedTotal.Inc()
}
}
func (mt *metricsTracer) RelayAddressUpdated() {
relayAddressesUpdatedTotal.Inc()
}
func (mt *metricsTracer) RelayAddressCount(cnt int) {
relayAddressesCount.Set(float64(cnt))
}
func (mt *metricsTracer) CandidateChecked(supportsCircuitV2 bool) {
tags := metricshelper.GetStringSlice()
defer metricshelper.PutStringSlice(tags)
if supportsCircuitV2 {
*tags = append(*tags, "yes")
} else {
*tags = append(*tags, "no")
}
candidatesCircuitV2SupportTotal.WithLabelValues(*tags...).Inc()
}
func (mt *metricsTracer) CandidateAdded(cnt int) {
tags := metricshelper.GetStringSlice()
defer metricshelper.PutStringSlice(tags)
*tags = append(*tags, "added")
candidatesTotal.WithLabelValues(*tags...).Add(float64(cnt))
}
func (mt *metricsTracer) CandidateRemoved(cnt int) {
tags := metricshelper.GetStringSlice()
defer metricshelper.PutStringSlice(tags)
*tags = append(*tags, "removed")
candidatesTotal.WithLabelValues(*tags...).Add(float64(cnt))
}
func (mt *metricsTracer) CandidateLoopState(state candidateLoopState) {
candLoopState.Set(float64(state))
}
func (mt *metricsTracer) ScheduledWorkUpdated(scheduledWork *scheduledWorkTimes) {
tags := metricshelper.GetStringSlice()
defer metricshelper.PutStringSlice(tags)
*tags = append(*tags, "allowed peer source call")
scheduledWorkTime.WithLabelValues(*tags...).Set(float64(scheduledWork.nextAllowedCallToPeerSource.Unix()))
*tags = (*tags)[:0]
*tags = append(*tags, "reservation refresh")
scheduledWorkTime.WithLabelValues(*tags...).Set(float64(scheduledWork.nextRefresh.Unix()))
*tags = (*tags)[:0]
*tags = append(*tags, "clear backoff")
scheduledWorkTime.WithLabelValues(*tags...).Set(float64(scheduledWork.nextBackoff.Unix()))
*tags = (*tags)[:0]
*tags = append(*tags, "old candidate check")
scheduledWorkTime.WithLabelValues(*tags...).Set(float64(scheduledWork.nextOldCandidateCheck.Unix()))
}
func (mt *metricsTracer) DesiredReservations(cnt int) {
desiredReservations.Set(float64(cnt))
}
func getReservationRequestStatus(err error) string {
if err == nil {
return "success"
}
status := "err other"
var re client.ReservationError
if errors.As(err, &re) {
switch re.Status {
case pbv2.Status_CONNECTION_FAILED:
return "connection failed"
case pbv2.Status_MALFORMED_MESSAGE:
return "malformed message"
case pbv2.Status_RESERVATION_REFUSED:
return "reservation refused"
case pbv2.Status_PERMISSION_DENIED:
return "permission denied"
case pbv2.Status_RESOURCE_LIMIT_EXCEEDED:
return "resource limit exceeded"
}
}
return status
}
// wrappedMetricsTracer wraps MetricsTracer and ignores all calls when mt is nil
type wrappedMetricsTracer struct {
mt MetricsTracer
}
var _ MetricsTracer = &wrappedMetricsTracer{}
func (mt *wrappedMetricsTracer) RelayFinderStatus(isActive bool) {
if mt.mt != nil {
mt.mt.RelayFinderStatus(isActive)
}
}
func (mt *wrappedMetricsTracer) ReservationEnded(cnt int) {
if mt.mt != nil {
mt.mt.ReservationEnded(cnt)
}
}
func (mt *wrappedMetricsTracer) ReservationOpened(cnt int) {
if mt.mt != nil {
mt.mt.ReservationOpened(cnt)
}
}
func (mt *wrappedMetricsTracer) ReservationRequestFinished(isRefresh bool, err error) {
if mt.mt != nil {
mt.mt.ReservationRequestFinished(isRefresh, err)
}
}
func (mt *wrappedMetricsTracer) RelayAddressUpdated() {
if mt.mt != nil {
mt.mt.RelayAddressUpdated()
}
}
func (mt *wrappedMetricsTracer) RelayAddressCount(cnt int) {
if mt.mt != nil {
mt.mt.RelayAddressCount(cnt)
}
}
func (mt *wrappedMetricsTracer) CandidateChecked(supportsCircuitV2 bool) {
if mt.mt != nil {
mt.mt.CandidateChecked(supportsCircuitV2)
}
}
func (mt *wrappedMetricsTracer) CandidateAdded(cnt int) {
if mt.mt != nil {
mt.mt.CandidateAdded(cnt)
}
}
func (mt *wrappedMetricsTracer) CandidateRemoved(cnt int) {
if mt.mt != nil {
mt.mt.CandidateRemoved(cnt)
}
}
func (mt *wrappedMetricsTracer) ScheduledWorkUpdated(scheduledWork *scheduledWorkTimes) {
if mt.mt != nil {
mt.mt.ScheduledWorkUpdated(scheduledWork)
}
}
func (mt *wrappedMetricsTracer) DesiredReservations(cnt int) {
if mt.mt != nil {
mt.mt.DesiredReservations(cnt)
}
}
func (mt *wrappedMetricsTracer) CandidateLoopState(state candidateLoopState) {
if mt.mt != nil {
mt.mt.CandidateLoopState(state)
}
}

59
p2p/host/autorelay/metrics_noalloc_test.go

@ -0,0 +1,59 @@
//go:build nocover
package autorelay
import (
"math/rand"
"testing"
"time"
"github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/client"
pbv2 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/pb"
)
func getRandScheduledWork() scheduledWorkTimes {
randTime := func() time.Time {
return time.Now().Add(time.Duration(rand.Intn(10)) * time.Second)
}
return scheduledWorkTimes{
leastFrequentInterval: 0,
nextRefresh: randTime(),
nextBackoff: randTime(),
nextOldCandidateCheck: randTime(),
nextAllowedCallToPeerSource: randTime(),
}
}
func TestMetricsNoAllocNoCover(t *testing.T) {
scheduledWork := []scheduledWorkTimes{}
for i := 0; i < 10; i++ {
scheduledWork = append(scheduledWork, getRandScheduledWork())
}
errs := []error{
client.ReservationError{Status: pbv2.Status_MALFORMED_MESSAGE},
client.ReservationError{Status: pbv2.Status_MALFORMED_MESSAGE},
nil,
}
tr := NewMetricsTracer()
tests := map[string]func(){
"RelayFinderStatus": func() { tr.RelayFinderStatus(rand.Intn(2) == 1) },
"ReservationEnded": func() { tr.ReservationEnded(rand.Intn(10)) },
"ReservationRequestFinished": func() { tr.ReservationRequestFinished(rand.Intn(2) == 1, errs[rand.Intn(len(errs))]) },
"RelayAddressCount": func() { tr.RelayAddressCount(rand.Intn(10)) },
"RelayAddressUpdated": func() { tr.RelayAddressUpdated() },
"ReservationOpened": func() { tr.ReservationOpened(rand.Intn(10)) },
"CandidateChecked": func() { tr.CandidateChecked(rand.Intn(2) == 1) },
"CandidateAdded": func() { tr.CandidateAdded(rand.Intn(10)) },
"CandidateRemoved": func() { tr.CandidateRemoved(rand.Intn(10)) },
"ScheduledWorkUpdated": func() { tr.ScheduledWorkUpdated(&scheduledWork[rand.Intn(len(scheduledWork))]) },
"DesiredReservations": func() { tr.DesiredReservations(rand.Intn(10)) },
"CandidateLoopState": func() { tr.CandidateLoopState(candidateLoopState(rand.Intn(10))) },
}
for method, f := range tests {
allocs := testing.AllocsPerRun(1000, f)
if allocs > 0 {
t.Fatalf("Alloc Test: %s, got: %0.2f, expected: 0 allocs", method, allocs)
}
}
}

10
p2p/host/autorelay/options.go

@ -40,6 +40,8 @@ type config struct {
// see WithMaxCandidateAge
maxCandidateAge time.Duration
setMinCandidates bool
// see WithMetricsTracer
metricsTracer MetricsTracer
}
var defaultConfig = config{
@ -221,3 +223,11 @@ func WithMinInterval(interval time.Duration) Option {
return nil
}
}
// WithMetricsTracer configures autorelay to use mt to track metrics
func WithMetricsTracer(mt MetricsTracer) Option {
return func(c *config) error {
c.metricsTracer = mt
return nil
}
}

114
p2p/host/autorelay/relay_finder.go

@ -80,8 +80,11 @@ type relayFinder struct {
// A channel that triggers a run of `runScheduledWork`.
triggerRunScheduledWork chan struct{}
metricsTracer MetricsTracer
}
var errAlreadyRunning = errors.New("relayFinder already running")
func newRelayFinder(host *basic.BasicHost, peerSource PeerSource, conf *config) *relayFinder {
if peerSource == nil {
panic("Can not create a new relayFinder. Need a Peer Source fn or a list of static relays. Refer to the documentation around `libp2p.EnableAutoRelay`")
@ -100,6 +103,7 @@ func newRelayFinder(host *basic.BasicHost, peerSource PeerSource, conf *config)
triggerRunScheduledWork: make(chan struct{}, 1),
relays: make(map[peer.ID]*circuitv2.Reservation),
relayUpdated: make(chan struct{}, 1),
metricsTracer: &wrappedMetricsTracer{conf.metricsTracer},
}
}
@ -184,6 +188,7 @@ func (rf *relayFinder) background(ctx context.Context) {
if push {
rf.clearCachedAddrsAndSignalAddressChange()
rf.metricsTracer.ReservationEnded(1)
}
case <-rf.candidateFound:
rf.notifyMaybeConnectToRelay()
@ -211,6 +216,8 @@ func (rf *relayFinder) clearCachedAddrsAndSignalAddressChange() {
rf.cachedAddrs = nil
rf.relayMx.Unlock()
rf.host.SignalAddressChange()
rf.metricsTracer.RelayAddressUpdated()
}
func (rf *relayFinder) runScheduledWork(ctx context.Context, now time.Time, scheduledWork *scheduledWorkTimes, peerSourceRateLimiter chan<- struct{}) time.Time {
@ -262,6 +269,8 @@ func (rf *relayFinder) runScheduledWork(ctx context.Context, now time.Time, sche
nextTime = nextTime.Add(1) // avoids an infinite loop
}
rf.metricsTracer.ScheduledWorkUpdated(scheduledWork)
return nextTime
}
@ -281,10 +290,9 @@ func (rf *relayFinder) clearOldCandidates(now time.Time) time.Time {
nextTime = expiry
}
} else {
deleted = true
log.Debugw("deleting candidate due to age", "id", id)
delete(rf.candidates, id)
deleted = true
rf.removeCandidate(id)
}
}
if deleted {
@ -330,6 +338,8 @@ func (rf *relayFinder) findNodes(ctx context.Context, peerSourceRateLimiter <-ch
rf.candidateMx.Unlock()
if peerChan == nil && numCandidates < rf.conf.minCandidates {
rf.metricsTracer.CandidateLoopState(peerSourceRateLimited)
select {
case <-peerSourceRateLimiter:
peerChan = rf.peerSource(ctx, rf.conf.maxCandidates)
@ -342,6 +352,12 @@ func (rf *relayFinder) findNodes(ctx context.Context, peerSourceRateLimiter <-ch
}
}
if peerChan == nil {
rf.metricsTracer.CandidateLoopState(waitingForTrigger)
} else {
rf.metricsTracer.CandidateLoopState(waitingOnPeerChan)
}
select {
case <-rf.maybeRequestNewCandidates:
continue
@ -374,6 +390,7 @@ func (rf *relayFinder) findNodes(ctx context.Context, peerSourceRateLimiter <-ch
}
}()
case <-ctx.Done():
rf.metricsTracer.CandidateLoopState(stopped)
return
}
}
@ -417,23 +434,30 @@ func (rf *relayFinder) handleNewNode(ctx context.Context, pi peer.AddrInfo) (add
supportsV2, err := rf.tryNode(ctx, pi)
if err != nil {
log.Debugf("node %s not accepted as a candidate: %s", pi.ID, err)
if err == errProtocolNotSupported {
rf.metricsTracer.CandidateChecked(false)
}
return false
}
rf.metricsTracer.CandidateChecked(true)
rf.candidateMx.Lock()
if len(rf.candidates) > rf.conf.maxCandidates {
rf.candidateMx.Unlock()
return false
}
log.Debugw("node supports relay protocol", "peer", pi.ID, "supports circuit v2", supportsV2)
rf.candidates[pi.ID] = &candidate{
rf.addCandidate(&candidate{
added: rf.conf.clock.Now(),
ai: pi,
supportsRelayV2: supportsV2,
}
})
rf.candidateMx.Unlock()
return true
}
var errProtocolNotSupported = errors.New("doesn't speak circuit v2")
// tryNode checks if a peer actually supports either circuit v2.
// It does not modify any internal state.
func (rf *relayFinder) tryNode(ctx context.Context, pi peer.AddrInfo) (supportsRelayV2 bool, err error) {
@ -474,7 +498,7 @@ func (rf *relayFinder) tryNode(ctx context.Context, pi peer.AddrInfo) (supportsR
return false, fmt.Errorf("error checking relay protocol support for peer %s: %w", pi.ID, err)
}
if len(protos) == 0 {
return false, errors.New("doesn't speak circuit v2")
return false, errProtocolNotSupported
}
return true, nil
}
@ -526,7 +550,7 @@ func (rf *relayFinder) maybeConnectToRelay(ctx context.Context) {
rf.relayMx.Unlock()
if usingRelay {
rf.candidateMx.Lock()
delete(rf.candidates, id)
rf.removeCandidate(id)
rf.candidateMx.Unlock()
rf.notifyMaybeNeedNewCandidates()
continue
@ -535,6 +559,7 @@ func (rf *relayFinder) maybeConnectToRelay(ctx context.Context) {
if err != nil {
log.Debugw("failed to connect to relay", "peer", id, "error", err)
rf.notifyMaybeNeedNewCandidates()
rf.metricsTracer.ReservationRequestFinished(false, err)
continue
}
log.Debugw("adding new relay", "id", id)
@ -551,6 +576,8 @@ func (rf *relayFinder) maybeConnectToRelay(ctx context.Context) {
default:
}
rf.metricsTracer.ReservationRequestFinished(false, nil)
if numRelays >= rf.conf.desiredRelays {
break
}
@ -569,7 +596,7 @@ func (rf *relayFinder) connectToRelay(ctx context.Context, cand *candidate) (*ci
if rf.host.Network().Connectedness(id) != network.Connected {
if err := rf.host.Connect(ctx, cand.ai); err != nil {
rf.candidateMx.Lock()
delete(rf.candidates, cand.ai.ID)
rf.removeCandidate(cand.ai.ID)
rf.candidateMx.Unlock()
return nil, fmt.Errorf("failed to connect: %w", err)
}
@ -586,7 +613,7 @@ func (rf *relayFinder) connectToRelay(ctx context.Context, cand *candidate) (*ci
}
}
rf.candidateMx.Lock()
delete(rf.candidates, id)
rf.removeCandidate(id)
rf.candidateMx.Unlock()
return rsvp, err
}
@ -602,7 +629,12 @@ func (rf *relayFinder) refreshReservations(ctx context.Context, now time.Time) b
}
p := p
g.Go(func() error { return rf.refreshRelayReservation(ctx, p) })
g.Go(func() error {
err := rf.refreshRelayReservation(ctx, p)
rf.metricsTracer.ReservationRequestFinished(true, err)
return err
})
}
rf.relayMx.Unlock()
@ -614,19 +646,22 @@ func (rf *relayFinder) refreshRelayReservation(ctx context.Context, p peer.ID) e
rsvp, err := circuitv2.Reserve(ctx, rf.host, peer.AddrInfo{ID: p})
rf.relayMx.Lock()
defer rf.relayMx.Unlock()
if err != nil {
log.Debugw("failed to refresh relay slot reservation", "relay", p, "error", err)
_, exists := rf.relays[p]
delete(rf.relays, p)
// unprotect the connection
rf.host.ConnManager().Unprotect(p, autorelayTag)
rf.relayMx.Unlock()
if exists {
rf.metricsTracer.ReservationEnded(1)
}
return err
}
log.Debugw("refreshed relay slot reservation", "relay", p)
rf.relays[p] = rsvp
rf.relayMx.Unlock()
return nil
}
@ -636,6 +671,23 @@ func (rf *relayFinder) usingRelay(p peer.ID) bool {
return ok
}
// addCandidates adds a candidate to the candidates set. Assumes caller holds candidateMx mutex
func (rf *relayFinder) addCandidate(cand *candidate) {
_, exists := rf.candidates[cand.ai.ID]
rf.candidates[cand.ai.ID] = cand
if !exists {
rf.metricsTracer.CandidateAdded(1)
}
}
func (rf *relayFinder) removeCandidate(id peer.ID) {
_, exists := rf.candidates[id]
if exists {
delete(rf.candidates, id)
rf.metricsTracer.CandidateRemoved(1)
}
}
// selectCandidates returns an ordered slice of relay candidates.
// Callers should attempt to obtain reservations with the candidates in this order.
func (rf *relayFinder) selectCandidates() []*candidate {
@ -680,9 +732,10 @@ func (rf *relayFinder) relayAddrs(addrs []ma.Multiaddr) []ma.Multiaddr {
}
// add relay specific addrs to the list
relayAddrCnt := 0
for p := range rf.relays {
addrs := cleanupAddressSet(rf.host.Peerstore().Addrs(p))
relayAddrCnt += len(addrs)
circuit := ma.StringCast(fmt.Sprintf("/p2p/%s/p2p-circuit", p.Pretty()))
for _, addr := range addrs {
pub := addr.Encapsulate(circuit)
@ -693,6 +746,7 @@ func (rf *relayFinder) relayAddrs(addrs []ma.Multiaddr) []ma.Multiaddr {
rf.cachedAddrs = raddrs
rf.cachedAddrsExpiry = rf.conf.clock.Now().Add(30 * time.Second)
rf.metricsTracer.RelayAddressCount(relayAddrCnt)
return raddrs
}
@ -700,9 +754,12 @@ func (rf *relayFinder) Start() error {
rf.ctxCancelMx.Lock()
defer rf.ctxCancelMx.Unlock()
if rf.ctxCancel != nil {
return errors.New("relayFinder already running")
return errAlreadyRunning
}
log.Debug("starting relay finder")
rf.initMetrics()
ctx, cancel := context.WithCancel(context.Background())
rf.ctxCancel = cancel
rf.refCount.Add(1)
@ -722,5 +779,32 @@ func (rf *relayFinder) Stop() error {
}
rf.refCount.Wait()
rf.ctxCancel = nil
rf.resetMetrics()
return nil
}
func (rf *relayFinder) initMetrics() {
rf.metricsTracer.DesiredReservations(rf.conf.desiredRelays)
rf.relayMx.Lock()
rf.metricsTracer.ReservationOpened(len(rf.relays))
rf.relayMx.Unlock()
rf.candidateMx.Lock()
rf.metricsTracer.CandidateAdded(len(rf.candidates))
rf.candidateMx.Unlock()
}
func (rf *relayFinder) resetMetrics() {
rf.relayMx.Lock()
rf.metricsTracer.ReservationEnded(len(rf.relays))
rf.relayMx.Unlock()
rf.candidateMx.Lock()
rf.metricsTracer.CandidateRemoved(len(rf.candidates))
rf.candidateMx.Unlock()
rf.metricsTracer.RelayAddressCount(0)
rf.metricsTracer.ScheduledWorkUpdated(&scheduledWorkTimes{})
}

52
p2p/protocol/circuitv2/client/reservation.go

@ -37,6 +37,27 @@ type Reservation struct {
Voucher *proto.ReservationVoucher
}
// ReservationError is the error returned on failure to reserve a slot in the relay
type ReservationError struct {
// Status is the status returned by the relay for rejecting the reservation
// request. It is set to pbv2.Status_CONNECTION_FAILED on other failures
Status pbv2.Status
// Reason is the reason for reservation failure
Reason string
err error
}
func (re ReservationError) Error() string {
return fmt.Sprintf("reservation error: status: %s reason: %s err: %s", pbv2.Status_name[int32(re.Status)], re.Reason, re.err)
}
func (re ReservationError) Unwrap() error {
return re.err
}
// Reserve reserves a slot in a relay and returns the reservation information.
// Clients must reserve slots in order for the relay to relay connections to them.
func Reserve(ctx context.Context, h host.Host, ai peer.AddrInfo) (*Reservation, error) {
@ -46,7 +67,7 @@ func Reserve(ctx context.Context, h host.Host, ai peer.AddrInfo) (*Reservation,
s, err := h.NewStream(ctx, ai.ID, proto.ProtoIDv2Hop)
if err != nil {
return nil, err
return nil, ReservationError{Status: pbv2.Status_CONNECTION_FAILED, Reason: "failed to open stream", err: err}
}
defer s.Close()
@ -61,33 +82,39 @@ func Reserve(ctx context.Context, h host.Host, ai peer.AddrInfo) (*Reservation,
if err := wr.WriteMsg(&msg); err != nil {
s.Reset()
return nil, fmt.Errorf("error writing reservation message: %w", err)
return nil, ReservationError{Status: pbv2.Status_CONNECTION_FAILED, Reason: "error writing reservation message", err: err}
}
msg.Reset()
if err := rd.ReadMsg(&msg); err != nil {
s.Reset()
return nil, fmt.Errorf("error reading reservation response message: %w", err)
return nil, ReservationError{Status: pbv2.Status_CONNECTION_FAILED, Reason: "error reading reservation response message: %w", err: err}
}
if msg.GetType() != pbv2.HopMessage_STATUS {
return nil, fmt.Errorf("unexpected relay response: not a status message (%d)", msg.GetType())
return nil, ReservationError{
Status: pbv2.Status_MALFORMED_MESSAGE,
Reason: fmt.Sprintf("unexpected relay response: not a status message (%d)", msg.GetType()),
err: err}
}
if status := msg.GetStatus(); status != pbv2.Status_OK {
return nil, fmt.Errorf("reservation failed: %s (%d)", pbv2.Status_name[int32(status)], status)
return nil, ReservationError{Status: msg.GetStatus(), Reason: "reservation failed"}
}
rsvp := msg.GetReservation()
if rsvp == nil {
return nil, fmt.Errorf("missing reservation info")
return nil, ReservationError{Status: pbv2.Status_MALFORMED_MESSAGE, Reason: "missing reservation info"}
}
result := &Reservation{}
result.Expiration = time.Unix(int64(rsvp.GetExpire()), 0)
if result.Expiration.Before(time.Now()) {
return nil, fmt.Errorf("received reservation with expiration date in the past: %s", result.Expiration)
return nil, ReservationError{
Status: pbv2.Status_MALFORMED_MESSAGE,
Reason: fmt.Sprintf("received reservation with expiration date in the past: %s", result.Expiration),
}
}
addrs := rsvp.GetAddrs()
@ -105,12 +132,19 @@ func Reserve(ctx context.Context, h host.Host, ai peer.AddrInfo) (*Reservation,
if voucherBytes != nil {
_, rec, err := record.ConsumeEnvelope(voucherBytes, proto.RecordDomain)
if err != nil {
return nil, fmt.Errorf("error consuming voucher envelope: %w", err)
return nil, ReservationError{
Status: pbv2.Status_MALFORMED_MESSAGE,
Reason: fmt.Sprintf("error consuming voucher envelope: %s", err),
err: err,
}
}
voucher, ok := rec.(*proto.ReservationVoucher)
if !ok {
return nil, fmt.Errorf("unexpected voucher record type: %+T", rec)
return nil, ReservationError{
Status: pbv2.Status_MALFORMED_MESSAGE,
Reason: fmt.Sprintf("unexpected voucher record type: %+T", rec),
}
}
result.Voucher = voucher
}

23
p2p/protocol/circuitv2/client/reservation_test.go

@ -2,6 +2,7 @@ package client_test
import (
"context"
"errors"
"math"
"testing"
"time"
@ -22,6 +23,7 @@ func TestReservationFailures(t *testing.T) {
name string
streamHandler network.StreamHandler
err string
status pbv2.Status
}
testcases := []testcase{
{
@ -36,7 +38,8 @@ func TestReservationFailures(t *testing.T) {
Type: pbv2.HopMessage_RESERVE.Enum(),
})
},
err: "unexpected relay response: not a status message",
err: "unexpected relay response: not a status message",
status: pbv2.Status_MALFORMED_MESSAGE,
},
{
name: "unknown status",
@ -47,7 +50,8 @@ func TestReservationFailures(t *testing.T) {
Status: &status,
})
},
err: "reservation failed",
err: "reservation failed",
status: pbv2.Status(1337),
},
{
name: "invalid time",
@ -60,7 +64,8 @@ func TestReservationFailures(t *testing.T) {
Reservation: &pbv2.Reservation{Expire: &expire},
})
},
err: "received reservation with expiration date in the past",
err: "received reservation with expiration date in the past",
status: pbv2.Status_MALFORMED_MESSAGE,
},
{
name: "invalid voucher",
@ -76,7 +81,8 @@ func TestReservationFailures(t *testing.T) {
},
})
},
err: "error consuming voucher envelope: failed when unmarshalling the envelope",
err: "error consuming voucher envelope: failed when unmarshalling the envelope",
status: pbv2.Status_MALFORMED_MESSAGE,
},
}
@ -98,6 +104,15 @@ func TestReservationFailures(t *testing.T) {
} else {
require.Error(t, err)
require.Contains(t, err.Error(), tc.err)
if tc.status != 0 {
var re client.ReservationError
if !errors.As(err, &re) {
t.Errorf("expected error to be of type %T", re)
}
if re.Status != tc.status {
t.Errorf("expected status %d got %d", tc.status, re.Status)
}
}
}
})
}

Loading…
Cancel
Save