Browse Source

Add trace reporter to expose traces to users

pull/1677/head
Marco Munizaga 2 years ago
parent
commit
d76fbd7993
  1. 119
      p2p/host/resource-manager/trace.go

119
p2p/host/resource-manager/trace.go

@ -21,31 +21,54 @@ type trace struct {
cancel func() cancel func()
closed chan struct{} closed chan struct{}
mx sync.Mutex mx sync.Mutex
done bool done bool
pend []interface{} pendingWrites []interface{}
reporters []TraceReporter
}
type TraceReporter interface {
// ConsumeEvent consumes a trace event. This is called synchronously,
// implementations should process the event quickly.
ConsumeEvent(TraceEvt)
} }
func WithTrace(path string) Option { func WithTrace(path string) Option {
return func(r *resourceManager) error { return func(r *resourceManager) error {
r.trace = &trace{path: path} if r.trace == nil {
r.trace = &trace{path: path}
} else {
r.trace.path = path
}
return nil
}
}
func WithTraceReporter(reporter TraceReporter) Option {
return func(r *resourceManager) error {
if r.trace == nil {
r.trace = &trace{reporters: []TraceReporter{reporter}}
}
r.trace.reporters = append(r.trace.reporters, reporter)
return nil return nil
} }
} }
type TraceEvtTyp string
const ( const (
traceStartEvt = "start" TraceStartEvt TraceEvtTyp = "start"
traceCreateScopeEvt = "create_scope" TraceCreateScopeEvt TraceEvtTyp = "create_scope"
traceDestroyScopeEvt = "destroy_scope" TraceDestroyScopeEvt TraceEvtTyp = "destroy_scope"
traceReserveMemoryEvt = "reserve_memory" TraceReserveMemoryEvt TraceEvtTyp = "reserve_memory"
traceBlockReserveMemoryEvt = "block_reserve_memory" TraceBlockReserveMemoryEvt TraceEvtTyp = "block_reserve_memory"
traceReleaseMemoryEvt = "release_memory" TraceReleaseMemoryEvt TraceEvtTyp = "release_memory"
traceAddStreamEvt = "add_stream" TraceAddStreamEvt TraceEvtTyp = "add_stream"
traceBlockAddStreamEvt = "block_add_stream" TraceBlockAddStreamEvt TraceEvtTyp = "block_add_stream"
traceRemoveStreamEvt = "remove_stream" TraceRemoveStreamEvt TraceEvtTyp = "remove_stream"
traceAddConnEvt = "add_conn" TraceAddConnEvt TraceEvtTyp = "add_conn"
traceBlockAddConnEvt = "block_add_conn" TraceBlockAddConnEvt TraceEvtTyp = "block_add_conn"
traceRemoveConnEvt = "remove_conn" TraceRemoveConnEvt TraceEvtTyp = "remove_conn"
) )
type scopeClass struct { type scopeClass struct {
@ -163,7 +186,7 @@ func (s scopeClass) MarshalJSON() ([]byte, error) {
type TraceEvt struct { type TraceEvt struct {
Time string Time string
Type string Type TraceEvtTyp
Scope *scopeClass `json:",omitempty"` Scope *scopeClass `json:",omitempty"`
Name string `json:",omitempty"` Name string `json:",omitempty"`
@ -199,10 +222,16 @@ func (t *trace) push(evt TraceEvt) {
evt.Scope = &scopeClass{name: evt.Name} evt.Scope = &scopeClass{name: evt.Name}
} }
t.pend = append(t.pend, evt) for _, reporter := range t.reporters {
reporter.ConsumeEvent(evt)
}
if t.path != "" {
t.pendingWrites = append(t.pendingWrites, evt)
}
} }
func (t *trace) background(out io.WriteCloser) { func (t *trace) backgroundWriter(out io.WriteCloser) {
defer close(t.closed) defer close(t.closed)
defer out.Close() defer out.Close()
@ -218,8 +247,8 @@ func (t *trace) background(out io.WriteCloser) {
getEvents := func() { getEvents := func() {
t.mx.Lock() t.mx.Lock()
tmp := t.pend tmp := t.pendingWrites
t.pend = pend[:0] t.pendingWrites = pend[:0]
pend = tmp pend = tmp
t.mx.Unlock() t.mx.Unlock()
} }
@ -288,15 +317,17 @@ func (t *trace) Start(limits Limiter) error {
t.ctx, t.cancel = context.WithCancel(context.Background()) t.ctx, t.cancel = context.WithCancel(context.Background())
t.closed = make(chan struct{}) t.closed = make(chan struct{})
out, err := os.OpenFile(t.path, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644) if t.path != "" {
if err != nil { out, err := os.OpenFile(t.path, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
return nil if err != nil {
} return nil
}
go t.background(out) go t.backgroundWriter(out)
}
t.push(TraceEvt{ t.push(TraceEvt{
Type: traceStartEvt, Type: TraceStartEvt,
Limit: limits, Limit: limits,
}) })
@ -329,7 +360,7 @@ func (t *trace) CreateScope(scope string, limit Limit) {
} }
t.push(TraceEvt{ t.push(TraceEvt{
Type: traceCreateScopeEvt, Type: TraceCreateScopeEvt,
Name: scope, Name: scope,
Limit: limit, Limit: limit,
}) })
@ -341,7 +372,7 @@ func (t *trace) DestroyScope(scope string) {
} }
t.push(TraceEvt{ t.push(TraceEvt{
Type: traceDestroyScopeEvt, Type: TraceDestroyScopeEvt,
Name: scope, Name: scope,
}) })
} }
@ -356,7 +387,7 @@ func (t *trace) ReserveMemory(scope string, prio uint8, size, mem int64) {
} }
t.push(TraceEvt{ t.push(TraceEvt{
Type: traceReserveMemoryEvt, Type: TraceReserveMemoryEvt,
Name: scope, Name: scope,
Priority: prio, Priority: prio,
Delta: size, Delta: size,
@ -374,7 +405,7 @@ func (t *trace) BlockReserveMemory(scope string, prio uint8, size, mem int64) {
} }
t.push(TraceEvt{ t.push(TraceEvt{
Type: traceBlockReserveMemoryEvt, Type: TraceBlockReserveMemoryEvt,
Name: scope, Name: scope,
Priority: prio, Priority: prio,
Delta: size, Delta: size,
@ -392,7 +423,7 @@ func (t *trace) ReleaseMemory(scope string, size, mem int64) {
} }
t.push(TraceEvt{ t.push(TraceEvt{
Type: traceReleaseMemoryEvt, Type: TraceReleaseMemoryEvt,
Name: scope, Name: scope,
Delta: size, Delta: size,
Memory: mem, Memory: mem,
@ -412,7 +443,7 @@ func (t *trace) AddStream(scope string, dir network.Direction, nstreamsIn, nstre
} }
t.push(TraceEvt{ t.push(TraceEvt{
Type: traceAddStreamEvt, Type: TraceAddStreamEvt,
Name: scope, Name: scope,
DeltaIn: deltaIn, DeltaIn: deltaIn,
DeltaOut: deltaOut, DeltaOut: deltaOut,
@ -434,7 +465,7 @@ func (t *trace) BlockAddStream(scope string, dir network.Direction, nstreamsIn,
} }
t.push(TraceEvt{ t.push(TraceEvt{
Type: traceBlockAddStreamEvt, Type: TraceBlockAddStreamEvt,
Name: scope, Name: scope,
DeltaIn: deltaIn, DeltaIn: deltaIn,
DeltaOut: deltaOut, DeltaOut: deltaOut,
@ -456,7 +487,7 @@ func (t *trace) RemoveStream(scope string, dir network.Direction, nstreamsIn, ns
} }
t.push(TraceEvt{ t.push(TraceEvt{
Type: traceRemoveStreamEvt, Type: TraceRemoveStreamEvt,
Name: scope, Name: scope,
DeltaIn: deltaIn, DeltaIn: deltaIn,
DeltaOut: deltaOut, DeltaOut: deltaOut,
@ -475,7 +506,7 @@ func (t *trace) AddStreams(scope string, deltaIn, deltaOut, nstreamsIn, nstreams
} }
t.push(TraceEvt{ t.push(TraceEvt{
Type: traceAddStreamEvt, Type: TraceAddStreamEvt,
Name: scope, Name: scope,
DeltaIn: deltaIn, DeltaIn: deltaIn,
DeltaOut: deltaOut, DeltaOut: deltaOut,
@ -494,7 +525,7 @@ func (t *trace) BlockAddStreams(scope string, deltaIn, deltaOut, nstreamsIn, nst
} }
t.push(TraceEvt{ t.push(TraceEvt{
Type: traceBlockAddStreamEvt, Type: TraceBlockAddStreamEvt,
Name: scope, Name: scope,
DeltaIn: deltaIn, DeltaIn: deltaIn,
DeltaOut: deltaOut, DeltaOut: deltaOut,
@ -513,7 +544,7 @@ func (t *trace) RemoveStreams(scope string, deltaIn, deltaOut, nstreamsIn, nstre
} }
t.push(TraceEvt{ t.push(TraceEvt{
Type: traceRemoveStreamEvt, Type: TraceRemoveStreamEvt,
Name: scope, Name: scope,
DeltaIn: -deltaIn, DeltaIn: -deltaIn,
DeltaOut: -deltaOut, DeltaOut: -deltaOut,
@ -538,7 +569,7 @@ func (t *trace) AddConn(scope string, dir network.Direction, usefd bool, nconnsI
} }
t.push(TraceEvt{ t.push(TraceEvt{
Type: traceAddConnEvt, Type: TraceAddConnEvt,
Name: scope, Name: scope,
DeltaIn: deltaIn, DeltaIn: deltaIn,
DeltaOut: deltaOut, DeltaOut: deltaOut,
@ -565,7 +596,7 @@ func (t *trace) BlockAddConn(scope string, dir network.Direction, usefd bool, nc
} }
t.push(TraceEvt{ t.push(TraceEvt{
Type: traceBlockAddConnEvt, Type: TraceBlockAddConnEvt,
Name: scope, Name: scope,
DeltaIn: deltaIn, DeltaIn: deltaIn,
DeltaOut: deltaOut, DeltaOut: deltaOut,
@ -592,7 +623,7 @@ func (t *trace) RemoveConn(scope string, dir network.Direction, usefd bool, ncon
} }
t.push(TraceEvt{ t.push(TraceEvt{
Type: traceRemoveConnEvt, Type: TraceRemoveConnEvt,
Name: scope, Name: scope,
DeltaIn: deltaIn, DeltaIn: deltaIn,
DeltaOut: deltaOut, DeltaOut: deltaOut,
@ -613,7 +644,7 @@ func (t *trace) AddConns(scope string, deltaIn, deltaOut, deltafd, nconnsIn, nco
} }
t.push(TraceEvt{ t.push(TraceEvt{
Type: traceAddConnEvt, Type: TraceAddConnEvt,
Name: scope, Name: scope,
DeltaIn: deltaIn, DeltaIn: deltaIn,
DeltaOut: deltaOut, DeltaOut: deltaOut,
@ -634,7 +665,7 @@ func (t *trace) BlockAddConns(scope string, deltaIn, deltaOut, deltafd, nconnsIn
} }
t.push(TraceEvt{ t.push(TraceEvt{
Type: traceBlockAddConnEvt, Type: TraceBlockAddConnEvt,
Name: scope, Name: scope,
DeltaIn: deltaIn, DeltaIn: deltaIn,
DeltaOut: deltaOut, DeltaOut: deltaOut,
@ -655,7 +686,7 @@ func (t *trace) RemoveConns(scope string, deltaIn, deltaOut, deltafd, nconnsIn,
} }
t.push(TraceEvt{ t.push(TraceEvt{
Type: traceRemoveConnEvt, Type: TraceRemoveConnEvt,
Name: scope, Name: scope,
DeltaIn: -deltaIn, DeltaIn: -deltaIn,
DeltaOut: -deltaOut, DeltaOut: -deltaOut,

Loading…
Cancel
Save