Browse Source

quicreuse: remove QUIC metrics tracer (#2582)

pull/2595/head
Marten Seemann 1 year ago
committed by GitHub
parent
commit
e6f21e8b67
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 30
      p2p/transport/quicreuse/connmgr.go
  2. 20
      p2p/transport/quicreuse/reuse.go
  3. 16
      p2p/transport/quicreuse/reuse_test.go
  4. 372
      p2p/transport/quicreuse/tracer_metrics.go

30
p2p/transport/quicreuse/connmgr.go

@ -26,7 +26,6 @@ type ConnManager struct {
quicListeners map[string]quicListenerEntry
srk quic.StatelessResetKey
mt *metricsTracer
}
type quicListenerEntry struct {
@ -48,26 +47,20 @@ func NewConnManager(statelessResetKey quic.StatelessResetKey, opts ...Option) (*
quicConf := quicConfig.Clone()
if cm.enableMetrics {
cm.mt = newMetricsTracer()
}
quicConf.Tracer = func(ctx context.Context, p quiclogging.Perspective, ci quic.ConnectionID) quiclogging.ConnectionTracer {
tracers := make([]quiclogging.ConnectionTracer, 0, 2)
var tracer quiclogging.ConnectionTracer
if qlogTracerDir != "" {
tracers = append(tracers, qloggerForDir(qlogTracerDir, p, ci))
}
if cm.mt != nil {
tracers = append(tracers, cm.mt.TracerForConnection(ctx, p, ci))
tracer = qloggerForDir(qlogTracerDir, p, ci)
}
return quiclogging.NewMultiplexedConnectionTracer(tracers...)
return tracer
}
serverConfig := quicConf.Clone()
cm.clientConfig = quicConf
cm.serverConfig = serverConfig
if cm.enableReuseport {
cm.reuseUDP4 = newReuse(&statelessResetKey, cm.mt)
cm.reuseUDP6 = newReuse(&statelessResetKey, cm.mt)
cm.reuseUDP4 = newReuse(&statelessResetKey)
cm.reuseUDP6 = newReuse(&statelessResetKey)
}
return cm, nil
}
@ -149,11 +142,7 @@ func (c *ConnManager) transportForListen(network string, laddr *net.UDPAddr) (re
if err != nil {
return nil, err
}
tr := &singleOwnerTransport{Transport: quic.Transport{Conn: conn, StatelessResetKey: &c.srk}, packetConn: conn}
if c.mt != nil {
tr.Transport.Tracer = c.mt
}
return tr, nil
return &singleOwnerTransport{Transport: quic.Transport{Conn: conn, StatelessResetKey: &c.srk}, packetConn: conn}, nil
}
func (c *ConnManager) DialQUIC(ctx context.Context, raddr ma.Multiaddr, tlsConf *tls.Config, allowWindowIncrease func(conn quic.Connection, delta uint64) bool) (quic.Connection, error) {
@ -208,12 +197,7 @@ func (c *ConnManager) TransportForDial(network string, raddr *net.UDPAddr) (refC
if err != nil {
return nil, err
}
tr := &singleOwnerTransport{Transport: quic.Transport{Conn: conn, StatelessResetKey: &c.srk}, packetConn: conn}
if c.mt != nil {
tr.Transport.Tracer = c.mt
}
return tr, nil
return &singleOwnerTransport{Transport: quic.Transport{Conn: conn, StatelessResetKey: &c.srk}, packetConn: conn}, nil
}
func (c *ConnManager) Protocols() []int {

20
p2p/transport/quicreuse/reuse.go

@ -123,10 +123,9 @@ type reuse struct {
globalDialers map[int]*refcountedTransport
statelessResetKey *quic.StatelessResetKey
metricsTracer *metricsTracer
}
func newReuse(srk *quic.StatelessResetKey, mt *metricsTracer) *reuse {
func newReuse(srk *quic.StatelessResetKey) *reuse {
r := &reuse{
unicast: make(map[string]map[int]*refcountedTransport),
globalListeners: make(map[int]*refcountedTransport),
@ -134,7 +133,6 @@ func newReuse(srk *quic.StatelessResetKey, mt *metricsTracer) *reuse {
closeChan: make(chan struct{}),
gcStopChan: make(chan struct{}),
statelessResetKey: srk,
metricsTracer: mt,
}
go r.gc()
return r
@ -271,9 +269,6 @@ func (r *reuse) transportForDialLocked(network string, source *net.IP) (*refcoun
Conn: conn,
StatelessResetKey: r.statelessResetKey,
}, packetConn: conn}
if r.metricsTracer != nil {
tr.Transport.Tracer = r.metricsTracer
}
r.globalDialers[conn.LocalAddr().(*net.UDPAddr).Port] = tr
return tr, nil
}
@ -317,14 +312,13 @@ func (r *reuse) TransportForListen(network string, laddr *net.UDPAddr) (*refcoun
return nil, err
}
localAddr := conn.LocalAddr().(*net.UDPAddr)
tr := &refcountedTransport{Transport: quic.Transport{
Conn: conn,
StatelessResetKey: r.statelessResetKey,
}, packetConn: conn}
if r.metricsTracer != nil {
tr.Transport.Tracer = r.metricsTracer
tr := &refcountedTransport{
Transport: quic.Transport{
Conn: conn,
StatelessResetKey: r.statelessResetKey,
},
packetConn: conn,
}
tr.IncreaseCount()
// Deal with listen on a global address

16
p2p/transport/quicreuse/reuse_test.go

@ -61,7 +61,7 @@ func cleanup(t *testing.T, reuse *reuse) {
}
func TestReuseListenOnAllIPv4(t *testing.T) {
reuse := newReuse(nil, nil)
reuse := newReuse(nil)
require.Eventually(t, isGarbageCollectorRunning, 500*time.Millisecond, 50*time.Millisecond, "expected garbage collector to be running")
cleanup(t, reuse)
@ -73,7 +73,7 @@ func TestReuseListenOnAllIPv4(t *testing.T) {
}
func TestReuseListenOnAllIPv6(t *testing.T) {
reuse := newReuse(nil, nil)
reuse := newReuse(nil)
require.Eventually(t, isGarbageCollectorRunning, 500*time.Millisecond, 50*time.Millisecond, "expected garbage collector to be running")
cleanup(t, reuse)
@ -86,7 +86,7 @@ func TestReuseListenOnAllIPv6(t *testing.T) {
}
func TestReuseCreateNewGlobalConnOnDial(t *testing.T) {
reuse := newReuse(nil, nil)
reuse := newReuse(nil)
cleanup(t, reuse)
addr, err := net.ResolveUDPAddr("udp4", "1.1.1.1:1234")
@ -100,7 +100,7 @@ func TestReuseCreateNewGlobalConnOnDial(t *testing.T) {
}
func TestReuseConnectionWhenDialing(t *testing.T) {
reuse := newReuse(nil, nil)
reuse := newReuse(nil)
cleanup(t, reuse)
addr, err := net.ResolveUDPAddr("udp4", "0.0.0.0:0")
@ -117,7 +117,7 @@ func TestReuseConnectionWhenDialing(t *testing.T) {
}
func TestReuseConnectionWhenListening(t *testing.T) {
reuse := newReuse(nil, nil)
reuse := newReuse(nil)
cleanup(t, reuse)
raddr, err := net.ResolveUDPAddr("udp4", "1.1.1.1:1234")
@ -132,7 +132,7 @@ func TestReuseConnectionWhenListening(t *testing.T) {
}
func TestReuseConnectionWhenDialBeforeListen(t *testing.T) {
reuse := newReuse(nil, nil)
reuse := newReuse(nil)
cleanup(t, reuse)
// dial any address
@ -166,7 +166,7 @@ func TestReuseListenOnSpecificInterface(t *testing.T) {
if platformHasRoutingTables() {
t.Skip("this test only works on platforms that support routing tables")
}
reuse := newReuse(nil, nil)
reuse := newReuse(nil)
cleanup(t, reuse)
router, err := netroute.New()
@ -203,7 +203,7 @@ func TestReuseGarbageCollect(t *testing.T) {
maxUnusedDuration = 10 * maxUnusedDuration
}
reuse := newReuse(nil, nil)
reuse := newReuse(nil)
cleanup(t, reuse)
numGlobals := func() int {

372
p2p/transport/quicreuse/tracer_metrics.go

@ -1,372 +0,0 @@
package quicreuse
import (
"context"
"errors"
"fmt"
"net"
"sync"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/quic-go/quic-go"
"github.com/quic-go/quic-go/logging"
)
var (
bytesTransferred *prometheus.CounterVec
newConns *prometheus.CounterVec
closedConns *prometheus.CounterVec
sentPackets *prometheus.CounterVec
rcvdPackets *prometheus.CounterVec
bufferedPackets *prometheus.CounterVec
droppedPackets *prometheus.CounterVec
lostPackets *prometheus.CounterVec
connErrors *prometheus.CounterVec
)
type aggregatingCollector struct {
mutex sync.Mutex
conns map[string] /* conn ID */ *metricsConnTracer
rtts prometheus.Histogram
connDurations prometheus.Histogram
}
func newAggregatingCollector() *aggregatingCollector {
return &aggregatingCollector{
conns: make(map[string]*metricsConnTracer),
rtts: prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "quic_smoothed_rtt",
Help: "Smoothed RTT",
Buckets: prometheus.ExponentialBuckets(0.001, 1.25, 40), // 1ms to ~6000ms
}),
connDurations: prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "quic_connection_duration",
Help: "Connection Duration",
Buckets: prometheus.ExponentialBuckets(1, 1.5, 40), // 1s to ~12 weeks
}),
}
}
var _ prometheus.Collector = &aggregatingCollector{}
func (c *aggregatingCollector) Describe(descs chan<- *prometheus.Desc) {
descs <- c.rtts.Desc()
descs <- c.connDurations.Desc()
}
func (c *aggregatingCollector) Collect(metrics chan<- prometheus.Metric) {
now := time.Now()
c.mutex.Lock()
for _, conn := range c.conns {
if rtt, valid := conn.getSmoothedRTT(); valid {
c.rtts.Observe(rtt.Seconds())
}
c.connDurations.Observe(now.Sub(conn.startTime).Seconds())
}
c.mutex.Unlock()
metrics <- c.rtts
metrics <- c.connDurations
}
func (c *aggregatingCollector) AddConn(id string, t *metricsConnTracer) {
c.mutex.Lock()
c.conns[id] = t
c.mutex.Unlock()
}
func (c *aggregatingCollector) RemoveConn(id string) {
c.mutex.Lock()
delete(c.conns, id)
c.mutex.Unlock()
}
var collector *aggregatingCollector
var initMetricsOnce sync.Once
func initMetrics() {
const (
direction = "direction"
encLevel = "encryption_level"
)
closedConns = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "quic_connections_closed_total",
Help: "closed QUIC connection",
},
[]string{direction},
)
prometheus.MustRegister(closedConns)
newConns = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "quic_connections_new_total",
Help: "new QUIC connection",
},
[]string{direction, "handshake_successful"},
)
prometheus.MustRegister(newConns)
bytesTransferred = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "quic_transferred_bytes",
Help: "QUIC bytes transferred",
},
[]string{direction}, // TODO: this is confusing. Other times, we use direction for the perspective
)
prometheus.MustRegister(bytesTransferred)
sentPackets = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "quic_packets_sent_total",
Help: "QUIC packets sent",
},
[]string{encLevel},
)
prometheus.MustRegister(sentPackets)
rcvdPackets = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "quic_packets_rcvd_total",
Help: "QUIC packets received",
},
[]string{encLevel},
)
prometheus.MustRegister(rcvdPackets)
bufferedPackets = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "quic_packets_buffered_total",
Help: "Buffered packets",
},
[]string{"packet_type"},
)
prometheus.MustRegister(bufferedPackets)
droppedPackets = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "quic_packets_dropped_total",
Help: "Dropped packets",
},
[]string{"packet_type", "reason"},
)
prometheus.MustRegister(droppedPackets)
connErrors = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "quic_connection_errors_total",
Help: "QUIC connection errors",
},
[]string{"side", "error_code"},
)
prometheus.MustRegister(connErrors)
lostPackets = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "quic_packets_lost_total",
Help: "QUIC lost received",
},
[]string{encLevel, "reason"},
)
prometheus.MustRegister(lostPackets)
collector = newAggregatingCollector()
prometheus.MustRegister(collector)
}
type metricsTracer struct {
logging.NullTracer
}
var _ logging.Tracer = &metricsTracer{}
func newMetricsTracer() *metricsTracer {
initMetricsOnce.Do(func() { initMetrics() })
return &metricsTracer{}
}
func (m *metricsTracer) TracerForConnection(_ context.Context, p logging.Perspective, connID logging.ConnectionID) logging.ConnectionTracer {
return &metricsConnTracer{perspective: p, connID: connID}
}
func (m *metricsTracer) SentPacket(_ net.Addr, _ *logging.Header, size logging.ByteCount, _ []logging.Frame) {
bytesTransferred.WithLabelValues("sent").Add(float64(size))
}
type metricsConnTracer struct {
logging.NullConnectionTracer
perspective logging.Perspective
startTime time.Time
connID logging.ConnectionID
handshakeComplete bool
mutex sync.Mutex
numRTTMeasurements int
rtt time.Duration
}
var _ logging.ConnectionTracer = &metricsConnTracer{}
func (m *metricsConnTracer) getDirection() string {
if m.perspective == logging.PerspectiveClient {
return "outgoing"
}
return "incoming"
}
func (m *metricsConnTracer) getEncLevel(packetType logging.PacketType) string {
switch packetType {
case logging.PacketType0RTT:
return "0-RTT"
case logging.PacketTypeInitial:
return "Initial"
case logging.PacketTypeHandshake:
return "Handshake"
case logging.PacketTypeRetry:
return "Retry"
case logging.PacketType1RTT:
return "1-RTT"
default:
return "unknown"
}
}
func (m *metricsConnTracer) StartedConnection(net.Addr, net.Addr, logging.ConnectionID, logging.ConnectionID) {
m.startTime = time.Now()
collector.AddConn(m.connID.String(), m)
}
func (m *metricsConnTracer) ClosedConnection(e error) {
var (
applicationErr *quic.ApplicationError
transportErr *quic.TransportError
statelessResetErr *quic.StatelessResetError
vnErr *quic.VersionNegotiationError
idleTimeoutErr *quic.IdleTimeoutError
handshakeTimeoutErr *quic.HandshakeTimeoutError
remote bool
desc string
)
switch {
case errors.As(e, &applicationErr):
return
case errors.As(e, &transportErr):
remote = transportErr.Remote
desc = transportErr.ErrorCode.String()
case errors.As(e, &statelessResetErr):
remote = true
desc = "stateless_reset"
case errors.As(e, &vnErr):
desc = "version_negotiation"
case errors.As(e, &idleTimeoutErr):
desc = "idle_timeout"
case errors.As(e, &handshakeTimeoutErr):
desc = "handshake_timeout"
default:
desc = fmt.Sprintf("unknown error: %v", e)
}
side := "local"
if remote {
side = "remote"
}
connErrors.WithLabelValues(side, desc).Inc()
}
func (m *metricsConnTracer) SentPacket(hdr *logging.ExtendedHeader, size logging.ByteCount, _ *logging.AckFrame, _ []logging.Frame) {
bytesTransferred.WithLabelValues("sent").Add(float64(size))
sentPackets.WithLabelValues(m.getEncLevel(logging.PacketTypeFromHeader(&hdr.Header))).Inc()
}
func (m *metricsConnTracer) ReceivedVersionNegotiationPacket(dst, src logging.ArbitraryLenConnectionID, v []logging.VersionNumber) {
bytesTransferred.WithLabelValues("rcvd").Add(1 /* header form byte */ + 4 /* version number */ + 2 /* src and dest conn id length fields */ + float64(dst.Len()+src.Len()) + float64(4*len(v)))
rcvdPackets.WithLabelValues("Version Negotiation").Inc()
}
func (m *metricsConnTracer) ReceivedRetry(*logging.Header) {
rcvdPackets.WithLabelValues("Retry").Inc()
}
func (m *metricsConnTracer) ReceivedPacket(hdr *logging.ExtendedHeader, size logging.ByteCount, _ []logging.Frame) {
bytesTransferred.WithLabelValues("rcvd").Add(float64(size))
rcvdPackets.WithLabelValues(m.getEncLevel(logging.PacketTypeFromHeader(&hdr.Header))).Inc()
}
func (m *metricsConnTracer) BufferedPacket(packetType logging.PacketType, _ logging.ByteCount) {
bufferedPackets.WithLabelValues(m.getEncLevel(packetType)).Inc()
}
func (m *metricsConnTracer) DroppedPacket(packetType logging.PacketType, size logging.ByteCount, r logging.PacketDropReason) {
bytesTransferred.WithLabelValues("rcvd").Add(float64(size))
var reason string
switch r {
case logging.PacketDropKeyUnavailable:
reason = "key_unavailable"
case logging.PacketDropUnknownConnectionID:
reason = "unknown_connection_id"
case logging.PacketDropHeaderParseError:
reason = "header_parse_error"
case logging.PacketDropPayloadDecryptError:
reason = "payload_decrypt_error"
case logging.PacketDropProtocolViolation:
reason = "protocol_violation"
case logging.PacketDropDOSPrevention:
reason = "dos_prevention"
case logging.PacketDropUnsupportedVersion:
reason = "unsupported_version"
case logging.PacketDropUnexpectedPacket:
reason = "unexpected_packet"
case logging.PacketDropUnexpectedSourceConnectionID:
reason = "unexpected_source_connection_id"
case logging.PacketDropUnexpectedVersion:
reason = "unexpected_version"
case logging.PacketDropDuplicate:
reason = "duplicate"
default:
reason = "unknown"
}
droppedPackets.WithLabelValues(m.getEncLevel(packetType), reason).Inc()
}
func (m *metricsConnTracer) UpdatedMetrics(rttStats *logging.RTTStats, cwnd, bytesInFlight logging.ByteCount, packetsInFlight int) {
m.mutex.Lock()
m.rtt = rttStats.SmoothedRTT()
m.numRTTMeasurements++
m.mutex.Unlock()
}
func (m *metricsConnTracer) LostPacket(level logging.EncryptionLevel, _ logging.PacketNumber, r logging.PacketLossReason) {
var reason string
switch r {
case logging.PacketLossReorderingThreshold:
reason = "reordering_threshold"
case logging.PacketLossTimeThreshold:
reason = "time_threshold"
default:
reason = "unknown"
}
lostPackets.WithLabelValues(level.String(), reason).Inc()
}
func (m *metricsConnTracer) DroppedEncryptionLevel(level logging.EncryptionLevel) {
if level == logging.EncryptionHandshake {
m.handleHandshakeComplete()
}
}
func (m *metricsConnTracer) Close() {
if m.handshakeComplete {
closedConns.WithLabelValues(m.getDirection()).Inc()
} else {
newConns.WithLabelValues(m.getDirection(), "false").Inc()
}
collector.RemoveConn(m.connID.String())
}
func (m *metricsConnTracer) handleHandshakeComplete() {
m.handshakeComplete = true
newConns.WithLabelValues(m.getDirection(), "true").Inc()
}
func (m *metricsConnTracer) getSmoothedRTT() (rtt time.Duration, valid bool) {
m.mutex.Lock()
rtt = m.rtt
valid = m.numRTTMeasurements > 10
m.mutex.Unlock()
return
}
Loading…
Cancel
Save