|
|
@ -89,10 +89,17 @@ func (b *bus) Subscribe(typedChan interface{}, opts ...SubOption) (c CancelFunc, |
|
|
|
err = b.withNode(typ.Elem(), func(n *node) { |
|
|
|
// when all subs are waiting on this channel, setting this to 1 doesn't
|
|
|
|
// really affect benchmarks
|
|
|
|
i := n.sub(refCh) |
|
|
|
n.sub(refCh) |
|
|
|
|
|
|
|
c = func() { |
|
|
|
n.lk.Lock() |
|
|
|
delete(n.sinks, i) |
|
|
|
for i := 0; i < len(n.sinks); i++ { |
|
|
|
if n.sinks[i] == refCh { |
|
|
|
n.sinks[i] = n.sinks[len(n.sinks)-1] |
|
|
|
n.sinks = n.sinks[:len(n.sinks)-1] |
|
|
|
break |
|
|
|
} |
|
|
|
} |
|
|
|
tryDrop := len(n.sinks) == 0 && n.nEmitters == 0 |
|
|
|
n.lk.Unlock() |
|
|
|
if tryDrop { |
|
|
@ -143,30 +150,20 @@ type node struct { |
|
|
|
// emitter ref count
|
|
|
|
nEmitters int32 |
|
|
|
|
|
|
|
// sink index counter
|
|
|
|
sinkC int |
|
|
|
|
|
|
|
// TODO: we could make emit a bit faster by making this into an array, but
|
|
|
|
// it doesn't seem needed for now
|
|
|
|
sinks map[int]reflect.Value |
|
|
|
|
|
|
|
keepLast bool |
|
|
|
last reflect.Value |
|
|
|
|
|
|
|
sinks []reflect.Value |
|
|
|
} |
|
|
|
|
|
|
|
func newNode(typ reflect.Type) *node { |
|
|
|
return &node{ |
|
|
|
typ: typ, |
|
|
|
|
|
|
|
sinks: map[int]reflect.Value{}, |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
func (n *node) sub(outChan reflect.Value) int { |
|
|
|
i := n.sinkC |
|
|
|
n.sinkC++ |
|
|
|
n.sinks[i] = outChan |
|
|
|
return i |
|
|
|
func (n *node) sub(outChan reflect.Value) { |
|
|
|
n.sinks = append(n.sinks, outChan) |
|
|
|
} |
|
|
|
|
|
|
|
func (n *node) emit(event interface{}) { |
|
|
|