|
|
@ -20,6 +20,8 @@ var ( |
|
|
|
bytesRcvdDesc *prometheus.Desc |
|
|
|
) |
|
|
|
|
|
|
|
const collectFrequency = 10 * time.Second |
|
|
|
|
|
|
|
var collector *aggregatingCollector |
|
|
|
|
|
|
|
func init() { |
|
|
@ -58,12 +60,14 @@ type aggregatingCollector struct { |
|
|
|
conns map[uint64] /* id */ *tracingConn |
|
|
|
rtts prometheus.Histogram |
|
|
|
connDurations prometheus.Histogram |
|
|
|
segsSent, segsRcvd uint64 |
|
|
|
bytesSent, bytesRcvd uint64 |
|
|
|
} |
|
|
|
|
|
|
|
var _ prometheus.Collector = &aggregatingCollector{} |
|
|
|
|
|
|
|
func newAggregatingCollector() *aggregatingCollector { |
|
|
|
return &aggregatingCollector{ |
|
|
|
c := &aggregatingCollector{ |
|
|
|
conns: make(map[uint64]*tracingConn), |
|
|
|
rtts: prometheus.NewHistogram(prometheus.HistogramOpts{ |
|
|
|
Name: "tcp_rtt", |
|
|
@ -76,6 +80,8 @@ func newAggregatingCollector() *aggregatingCollector { |
|
|
|
Buckets: prometheus.ExponentialBuckets(1, 1.5, 40), // 1s to ~12 weeks
|
|
|
|
}), |
|
|
|
} |
|
|
|
go c.cron() |
|
|
|
return c |
|
|
|
} |
|
|
|
|
|
|
|
func (c *aggregatingCollector) AddConn(t *tracingConn) uint64 { |
|
|
@ -103,11 +109,16 @@ func (c *aggregatingCollector) Describe(descs chan<- *prometheus.Desc) { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
func (c *aggregatingCollector) Collect(metrics chan<- prometheus.Metric) { |
|
|
|
now := time.Now() |
|
|
|
func (c *aggregatingCollector) cron() { |
|
|
|
ticker := time.NewTicker(collectFrequency) |
|
|
|
defer ticker.Stop() |
|
|
|
|
|
|
|
for now := range ticker.C { |
|
|
|
c.mutex.Lock() |
|
|
|
var segsSent, segsRcvd uint64 |
|
|
|
var bytesSent, bytesRcvd uint64 |
|
|
|
c.segsSent = 0 |
|
|
|
c.segsRcvd = 0 |
|
|
|
c.bytesSent = 0 |
|
|
|
c.bytesRcvd = 0 |
|
|
|
for _, conn := range c.conns { |
|
|
|
info, err := conn.getTCPInfo() |
|
|
|
if err != nil { |
|
|
@ -119,12 +130,12 @@ func (c *aggregatingCollector) Collect(metrics chan<- prometheus.Metric) { |
|
|
|
continue |
|
|
|
} |
|
|
|
if hasSegmentCounter { |
|
|
|
segsSent += getSegmentsSent(info) |
|
|
|
segsRcvd += getSegmentsRcvd(info) |
|
|
|
c.segsSent += getSegmentsSent(info) |
|
|
|
c.segsRcvd += getSegmentsRcvd(info) |
|
|
|
} |
|
|
|
if hasByteCounter { |
|
|
|
bytesSent += getBytesSent(info) |
|
|
|
bytesRcvd += getBytesRcvd(info) |
|
|
|
c.bytesSent += getBytesSent(info) |
|
|
|
c.bytesRcvd += getBytesRcvd(info) |
|
|
|
} |
|
|
|
c.rtts.Observe(info.RTT.Seconds()) |
|
|
|
c.connDurations.Observe(now.Sub(conn.startTime).Seconds()) |
|
|
@ -133,15 +144,22 @@ func (c *aggregatingCollector) Collect(metrics chan<- prometheus.Metric) { |
|
|
|
} |
|
|
|
} |
|
|
|
c.mutex.Unlock() |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
func (c *aggregatingCollector) Collect(metrics chan<- prometheus.Metric) { |
|
|
|
c.mutex.Lock() |
|
|
|
defer c.mutex.Unlock() |
|
|
|
|
|
|
|
metrics <- c.rtts |
|
|
|
metrics <- c.connDurations |
|
|
|
if hasSegmentCounter { |
|
|
|
segsSentMetric, err := prometheus.NewConstMetric(segsSentDesc, prometheus.CounterValue, float64(segsSent)) |
|
|
|
segsSentMetric, err := prometheus.NewConstMetric(segsSentDesc, prometheus.CounterValue, float64(c.segsSent)) |
|
|
|
if err != nil { |
|
|
|
log.Errorf("creating tcp_sent_segments_total metric failed: %v", err) |
|
|
|
return |
|
|
|
} |
|
|
|
segsRcvdMetric, err := prometheus.NewConstMetric(segsRcvdDesc, prometheus.CounterValue, float64(segsRcvd)) |
|
|
|
segsRcvdMetric, err := prometheus.NewConstMetric(segsRcvdDesc, prometheus.CounterValue, float64(c.segsRcvd)) |
|
|
|
if err != nil { |
|
|
|
log.Errorf("creating tcp_rcvd_segments_total metric failed: %v", err) |
|
|
|
return |
|
|
@ -150,12 +168,12 @@ func (c *aggregatingCollector) Collect(metrics chan<- prometheus.Metric) { |
|
|
|
metrics <- segsRcvdMetric |
|
|
|
} |
|
|
|
if hasByteCounter { |
|
|
|
bytesSentMetric, err := prometheus.NewConstMetric(bytesSentDesc, prometheus.CounterValue, float64(bytesSent)) |
|
|
|
bytesSentMetric, err := prometheus.NewConstMetric(bytesSentDesc, prometheus.CounterValue, float64(c.bytesSent)) |
|
|
|
if err != nil { |
|
|
|
log.Errorf("creating tcp_sent_bytes metric failed: %v", err) |
|
|
|
return |
|
|
|
} |
|
|
|
bytesRcvdMetric, err := prometheus.NewConstMetric(bytesRcvdDesc, prometheus.CounterValue, float64(bytesRcvd)) |
|
|
|
bytesRcvdMetric, err := prometheus.NewConstMetric(bytesRcvdDesc, prometheus.CounterValue, float64(c.bytesRcvd)) |
|
|
|
if err != nil { |
|
|
|
log.Errorf("creating tcp_rcvd_bytes metric failed: %v", err) |
|
|
|
return |
|
|
|