|
|
@ -14,9 +14,6 @@ import ( |
|
|
|
var log = logging.Logger("rcmgrObs") |
|
|
|
|
|
|
|
var ( |
|
|
|
systemOutboundConns = stats.Int64("system/outbound/conn", "Number of outbound Connections", stats.UnitDimensionless) |
|
|
|
systemInboundConns = stats.Int64("system/inbound/conn", "Number of inbound Connections", stats.UnitDimensionless) |
|
|
|
|
|
|
|
conns = stats.Int64("connections", "Number of Connections", stats.UnitDimensionless) |
|
|
|
|
|
|
|
peerConns = stats.Int64("peer/connections", "Number of connections this peer has", stats.UnitDimensionless) |
|
|
@ -40,19 +37,15 @@ var ( |
|
|
|
) |
|
|
|
|
|
|
|
var ( |
|
|
|
LessThanEq, _ = tag.NewKey("le") |
|
|
|
Direction, _ = tag.NewKey("dir") |
|
|
|
Scope, _ = tag.NewKey("scope") |
|
|
|
Service, _ = tag.NewKey("service") |
|
|
|
Protocol, _ = tag.NewKey("protocol") |
|
|
|
Resource, _ = tag.NewKey("resource") |
|
|
|
directionTag, _ = tag.NewKey("dir") |
|
|
|
scopeTag, _ = tag.NewKey("scope") |
|
|
|
serviceTag, _ = tag.NewKey("service") |
|
|
|
protocolTag, _ = tag.NewKey("protocol") |
|
|
|
resourceTag, _ = tag.NewKey("resource") |
|
|
|
) |
|
|
|
|
|
|
|
var ( |
|
|
|
SystemOutboundConnsView = &view.View{Measure: systemOutboundConns, Aggregation: view.Sum()} |
|
|
|
SystemInboundConnsView = &view.View{Measure: systemInboundConns, Aggregation: view.Sum()} |
|
|
|
|
|
|
|
ConnView = &view.View{Measure: conns, Aggregation: view.Sum(), TagKeys: []tag.Key{Direction, Scope}} |
|
|
|
ConnView = &view.View{Measure: conns, Aggregation: view.Sum(), TagKeys: []tag.Key{directionTag, scopeTag}} |
|
|
|
|
|
|
|
oneTenThenExpDistribution = []float64{ |
|
|
|
1.1, 2.1, 3.1, 4.1, 5.1, 6.1, 7.1, 8.1, 9.1, 10.1, 16.1, 32.1, 64.1, 128.1, 256.1, |
|
|
@ -61,19 +54,19 @@ var ( |
|
|
|
PeerConnsView = &view.View{ |
|
|
|
Measure: peerConns, |
|
|
|
Aggregation: view.Distribution(oneTenThenExpDistribution...), |
|
|
|
TagKeys: []tag.Key{Direction}, |
|
|
|
TagKeys: []tag.Key{directionTag}, |
|
|
|
} |
|
|
|
PeerConnsNegativeView = &view.View{ |
|
|
|
Measure: peerConnsNegative, |
|
|
|
Aggregation: view.Distribution(oneTenThenExpDistribution...), |
|
|
|
TagKeys: []tag.Key{Direction}, |
|
|
|
TagKeys: []tag.Key{directionTag}, |
|
|
|
} |
|
|
|
|
|
|
|
StreamView = &view.View{Measure: streams, Aggregation: view.Sum(), TagKeys: []tag.Key{Direction, Scope, Service, Protocol}} |
|
|
|
PeerStreamsView = &view.View{Measure: peerStreams, Aggregation: view.Distribution(oneTenThenExpDistribution...), TagKeys: []tag.Key{Direction}} |
|
|
|
PeerStreamNegativeView = &view.View{Measure: peerStreamsNegative, Aggregation: view.Distribution(oneTenThenExpDistribution...), TagKeys: []tag.Key{Direction}} |
|
|
|
StreamView = &view.View{Measure: streams, Aggregation: view.Sum(), TagKeys: []tag.Key{directionTag, scopeTag, serviceTag, protocolTag}} |
|
|
|
PeerStreamsView = &view.View{Measure: peerStreams, Aggregation: view.Distribution(oneTenThenExpDistribution...), TagKeys: []tag.Key{directionTag}} |
|
|
|
PeerStreamNegativeView = &view.View{Measure: peerStreamsNegative, Aggregation: view.Distribution(oneTenThenExpDistribution...), TagKeys: []tag.Key{directionTag}} |
|
|
|
|
|
|
|
MemoryView = &view.View{Measure: memory, Aggregation: view.Sum(), TagKeys: []tag.Key{Scope, Service, Protocol}} |
|
|
|
MemoryView = &view.View{Measure: memory, Aggregation: view.Sum(), TagKeys: []tag.Key{scopeTag, serviceTag, protocolTag}} |
|
|
|
|
|
|
|
memDistribution = []float64{ |
|
|
|
1 << 10, // 1KB
|
|
|
@ -106,12 +99,12 @@ var ( |
|
|
|
Aggregation: view.Distribution(memDistribution...), |
|
|
|
} |
|
|
|
|
|
|
|
FDsView = &view.View{Measure: fds, Aggregation: view.Sum(), TagKeys: []tag.Key{Scope}} |
|
|
|
FDsView = &view.View{Measure: fds, Aggregation: view.Sum(), TagKeys: []tag.Key{scopeTag}} |
|
|
|
|
|
|
|
BlockedResourcesView = &view.View{ |
|
|
|
Measure: blockedResources, |
|
|
|
Aggregation: view.Sum(), |
|
|
|
TagKeys: []tag.Key{Scope, Resource}, |
|
|
|
TagKeys: []tag.Key{scopeTag, resourceTag}, |
|
|
|
} |
|
|
|
) |
|
|
|
|
|
|
@ -156,10 +149,10 @@ func (r StatsTraceReporter) ConsumeEvent(evt rcmgr.TraceEvt) { |
|
|
|
peerStreamsOut := int64(evt.StreamsOut) |
|
|
|
if oldStreamsOut != peerStreamsOut { |
|
|
|
if oldStreamsOut != 0 { |
|
|
|
stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(Direction, "outbound")}, peerStreamsNegative.M(oldStreamsOut)) |
|
|
|
stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(directionTag, "outbound")}, peerStreamsNegative.M(oldStreamsOut)) |
|
|
|
} |
|
|
|
if peerStreamsOut != 0 { |
|
|
|
stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(Direction, "outbound")}, peerStreams.M(peerStreamsOut)) |
|
|
|
stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(directionTag, "outbound")}, peerStreams.M(peerStreamsOut)) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
@ -167,20 +160,20 @@ func (r StatsTraceReporter) ConsumeEvent(evt rcmgr.TraceEvt) { |
|
|
|
peerStreamsIn := int64(evt.StreamsIn) |
|
|
|
if oldStreamsIn != peerStreamsIn { |
|
|
|
if oldStreamsIn != 0 { |
|
|
|
stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(Direction, "inbound")}, peerStreamsNegative.M(oldStreamsIn)) |
|
|
|
stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(directionTag, "inbound")}, peerStreamsNegative.M(oldStreamsIn)) |
|
|
|
} |
|
|
|
if peerStreamsIn != 0 { |
|
|
|
stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(Direction, "inbound")}, peerStreams.M(peerStreamsIn)) |
|
|
|
stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(directionTag, "inbound")}, peerStreams.M(peerStreamsIn)) |
|
|
|
} |
|
|
|
} |
|
|
|
} else { |
|
|
|
var tags []tag.Mutator |
|
|
|
if rcmgr.IsSystemScope(evt.Name) || rcmgr.IsTransientScope(evt.Name) { |
|
|
|
tags = append(tags, tag.Upsert(Scope, evt.Name)) |
|
|
|
tags = append(tags, tag.Upsert(scopeTag, evt.Name)) |
|
|
|
} else if svc := rcmgr.ParseServiceScopeName(evt.Name); svc != "" { |
|
|
|
tags = append(tags, tag.Upsert(Scope, "service"), tag.Upsert(Service, svc)) |
|
|
|
tags = append(tags, tag.Upsert(scopeTag, "service"), tag.Upsert(serviceTag, svc)) |
|
|
|
} else if proto := rcmgr.ParseProtocolScopeName(evt.Name); proto != "" { |
|
|
|
tags = append(tags, tag.Upsert(Scope, "protocol"), tag.Upsert(Protocol, proto)) |
|
|
|
tags = append(tags, tag.Upsert(scopeTag, "protocol"), tag.Upsert(protocolTag, proto)) |
|
|
|
} else { |
|
|
|
// Not measuring connscope, servicepeer and protocolpeer. Lots of data, and
|
|
|
|
// you can use aggregated peer stats + service stats to infer
|
|
|
@ -191,7 +184,7 @@ func (r StatsTraceReporter) ConsumeEvent(evt rcmgr.TraceEvt) { |
|
|
|
if evt.DeltaOut != 0 { |
|
|
|
stats.RecordWithTags( |
|
|
|
ctx, |
|
|
|
append([]tag.Mutator{tag.Upsert(Direction, "outbound")}, tags...), |
|
|
|
append([]tag.Mutator{tag.Upsert(directionTag, "outbound")}, tags...), |
|
|
|
streams.M(int64(evt.DeltaOut)), |
|
|
|
) |
|
|
|
} |
|
|
@ -199,7 +192,7 @@ func (r StatsTraceReporter) ConsumeEvent(evt rcmgr.TraceEvt) { |
|
|
|
if evt.DeltaIn != 0 { |
|
|
|
stats.RecordWithTags( |
|
|
|
ctx, |
|
|
|
append([]tag.Mutator{tag.Upsert(Direction, "inbound")}, tags...), |
|
|
|
append([]tag.Mutator{tag.Upsert(directionTag, "inbound")}, tags...), |
|
|
|
streams.M(int64(evt.DeltaIn)), |
|
|
|
) |
|
|
|
} |
|
|
@ -217,10 +210,10 @@ func (r StatsTraceReporter) ConsumeEvent(evt rcmgr.TraceEvt) { |
|
|
|
connsOut := int64(evt.ConnsOut) |
|
|
|
if oldConnsOut != connsOut { |
|
|
|
if oldConnsOut != 0 { |
|
|
|
stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(Direction, "outbound")}, peerConnsNegative.M(oldConnsOut)) |
|
|
|
stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(directionTag, "outbound")}, peerConnsNegative.M(oldConnsOut)) |
|
|
|
} |
|
|
|
if connsOut != 0 { |
|
|
|
stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(Direction, "outbound")}, peerConns.M(connsOut)) |
|
|
|
stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(directionTag, "outbound")}, peerConns.M(connsOut)) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
@ -228,16 +221,16 @@ func (r StatsTraceReporter) ConsumeEvent(evt rcmgr.TraceEvt) { |
|
|
|
connsIn := int64(evt.ConnsIn) |
|
|
|
if oldConnsIn != connsIn { |
|
|
|
if oldConnsIn != 0 { |
|
|
|
stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(Direction, "inbound")}, peerConnsNegative.M(oldConnsIn)) |
|
|
|
stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(directionTag, "inbound")}, peerConnsNegative.M(oldConnsIn)) |
|
|
|
} |
|
|
|
if connsIn != 0 { |
|
|
|
stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(Direction, "inbound")}, peerConns.M(connsIn)) |
|
|
|
stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(directionTag, "inbound")}, peerConns.M(connsIn)) |
|
|
|
} |
|
|
|
} |
|
|
|
} else { |
|
|
|
var tags []tag.Mutator |
|
|
|
if rcmgr.IsSystemScope(evt.Name) || rcmgr.IsTransientScope(evt.Name) { |
|
|
|
tags = append(tags, tag.Upsert(Scope, evt.Name)) |
|
|
|
tags = append(tags, tag.Upsert(scopeTag, evt.Name)) |
|
|
|
} else if rcmgr.IsConnScope(evt.Name) { |
|
|
|
// Not measuring this. I don't think it's useful.
|
|
|
|
break |
|
|
@ -249,7 +242,7 @@ func (r StatsTraceReporter) ConsumeEvent(evt rcmgr.TraceEvt) { |
|
|
|
if evt.DeltaOut != 0 { |
|
|
|
stats.RecordWithTags( |
|
|
|
ctx, |
|
|
|
append([]tag.Mutator{tag.Upsert(Direction, "outbound")}, tags...), |
|
|
|
append([]tag.Mutator{tag.Upsert(directionTag, "outbound")}, tags...), |
|
|
|
conns.M(int64(evt.DeltaOut)), |
|
|
|
) |
|
|
|
} |
|
|
@ -257,7 +250,7 @@ func (r StatsTraceReporter) ConsumeEvent(evt rcmgr.TraceEvt) { |
|
|
|
if evt.DeltaIn != 0 { |
|
|
|
stats.RecordWithTags( |
|
|
|
ctx, |
|
|
|
append([]tag.Mutator{tag.Upsert(Direction, "inbound")}, tags...), |
|
|
|
append([]tag.Mutator{tag.Upsert(directionTag, "inbound")}, tags...), |
|
|
|
conns.M(int64(evt.DeltaIn)), |
|
|
|
) |
|
|
|
} |
|
|
@ -295,11 +288,11 @@ func (r StatsTraceReporter) ConsumeEvent(evt rcmgr.TraceEvt) { |
|
|
|
} else { |
|
|
|
var tags []tag.Mutator |
|
|
|
if rcmgr.IsSystemScope(evt.Name) || rcmgr.IsTransientScope(evt.Name) { |
|
|
|
tags = append(tags, tag.Upsert(Scope, evt.Name)) |
|
|
|
tags = append(tags, tag.Upsert(scopeTag, evt.Name)) |
|
|
|
} else if svc := rcmgr.ParseServiceScopeName(evt.Name); svc != "" { |
|
|
|
tags = append(tags, tag.Upsert(Scope, "service"), tag.Upsert(Service, svc)) |
|
|
|
tags = append(tags, tag.Upsert(scopeTag, "service"), tag.Upsert(serviceTag, svc)) |
|
|
|
} else if proto := rcmgr.ParseProtocolScopeName(evt.Name); proto != "" { |
|
|
|
tags = append(tags, tag.Upsert(Scope, "protocol"), tag.Upsert(Protocol, proto)) |
|
|
|
tags = append(tags, tag.Upsert(scopeTag, "protocol"), tag.Upsert(protocolTag, proto)) |
|
|
|
} else { |
|
|
|
// Not measuring connscope, servicepeer and protocolpeer. Lots of data, and
|
|
|
|
// you can use aggregated peer stats + service stats to infer
|
|
|
@ -322,12 +315,12 @@ func (r StatsTraceReporter) ConsumeEvent(evt rcmgr.TraceEvt) { |
|
|
|
resource = "memory" |
|
|
|
} |
|
|
|
|
|
|
|
// Only the top scope. We don't want to get the peerid here.
|
|
|
|
scope := strings.SplitN(evt.Name, ":", 2)[0] |
|
|
|
// Only the top scopeName. We don't want to get the peerid here.
|
|
|
|
scopeName := strings.SplitN(evt.Name, ":", 2)[0] |
|
|
|
// Drop the connection or stream id
|
|
|
|
scope = strings.SplitN(scope, "-", 2)[0] |
|
|
|
scopeName = strings.SplitN(scopeName, "-", 2)[0] |
|
|
|
|
|
|
|
tags := []tag.Mutator{tag.Upsert(Scope, scope), tag.Upsert(Resource, resource)} |
|
|
|
tags := []tag.Mutator{tag.Upsert(scopeTag, scopeName), tag.Upsert(resourceTag, resource)} |
|
|
|
|
|
|
|
if evt.DeltaIn != 0 { |
|
|
|
stats.RecordWithTags(ctx, tags, blockedResources.M(int64(1))) |
|
|
|