diff --git a/p2p/host/eventbus/basic.go b/p2p/host/eventbus/basic.go index c5133ed7b..9ebde0d2f 100644 --- a/p2p/host/eventbus/basic.go +++ b/p2p/host/eventbus/basic.go @@ -89,10 +89,17 @@ func (b *bus) Subscribe(typedChan interface{}, opts ...SubOption) (c CancelFunc, err = b.withNode(typ.Elem(), func(n *node) { // when all subs are waiting on this channel, setting this to 1 doesn't // really affect benchmarks - i := n.sub(refCh) + n.sub(refCh) + c = func() { n.lk.Lock() - delete(n.sinks, i) + for i := 0; i < len(n.sinks); i++ { + if n.sinks[i] == refCh { + n.sinks[i] = n.sinks[len(n.sinks)-1] + n.sinks = n.sinks[:len(n.sinks)-1] + break + } + } tryDrop := len(n.sinks) == 0 && n.nEmitters == 0 n.lk.Unlock() if tryDrop { @@ -143,30 +150,20 @@ type node struct { // emitter ref count nEmitters int32 - // sink index counter - sinkC int - - // TODO: we could make emit a bit faster by making this into an array, but - // it doesn't seem needed for now - sinks map[int]reflect.Value - keepLast bool last reflect.Value + + sinks []reflect.Value } func newNode(typ reflect.Type) *node { return &node{ typ: typ, - - sinks: map[int]reflect.Value{}, } } -func (n *node) sub(outChan reflect.Value) int { - i := n.sinkC - n.sinkC++ - n.sinks[i] = outChan - return i +func (n *node) sub(outChan reflect.Value) { + n.sinks = append(n.sinks, outChan) } func (n *node) emit(event interface{}) {