Browse Source

eventbus abstraction + initial events (#17)

pull/1683/head
Whyrusleeping 5 years ago
committed by Raúl Kripalani
parent
commit
d962316f81
  1. 48
      core/event/bus.go
  2. 11
      core/event/doc.go
  3. 26
      core/event/protocol.go
  4. 4
      core/host/host.go
  5. 20
      core/protocol/id.go

48
core/event/bus.go

@ -0,0 +1,48 @@
package event
import "io"
// SubscriptionOpt represents a subscriber option. Use the options exposed by the implementation of choice.
type SubscriptionOpt = func(interface{}) error
// EmitterOpt represents an emitter option. Use the options exposed by the implementation of choice.
type EmitterOpt = func(interface{}) error
// CancelFunc closes a subscriber.
type CancelFunc = func()
// Emitter represents an actor that emits events onto the eventbus.
type Emitter interface {
io.Closer
// Emit emits an event onto the eventbus. If any channel subscribed to the topic is blocked,
// calls to Emit will block.
//
// Calling this function with wrong event type will cause a panic.
Emit(evt interface{})
}
// Bus is an interface for a type-based event delivery system.
type Bus interface {
// Subscribe creates a new subscription.
//
// Failing to drain the channel may cause publishers to block. CancelFunc must return after
// last send to the channel.
//
// Example:
// ch := make(chan EventT, 10)
// defer close(ch)
// cancel, err := eventbus.Subscribe(ch)
// defer cancel()
Subscribe(typedChan interface{}, opts ...SubscriptionOpt) (CancelFunc, error)
// Emitter creates a new event emitter.
//
// eventType accepts typed nil pointers, and uses the type information for wiring purposes.
//
// Example:
// em, err := eventbus.Emitter(new(EventT))
// defer em.Close() // MUST call this after being done with the emitter
// em.Emit(EventT{})
Emitter(eventType interface{}, opts ...EmitterOpt) (Emitter, error)
}

11
core/event/doc.go

@ -0,0 +1,11 @@
// Package event contains the abstractions for a local event bus, along with the standard events
// that libp2p subsystems may emit.
//
// Source code is arranged as follows:
// * doc.go: this file.
// * bus.go: abstractions for the event bus.
// * rest: event structs, sensibly categorised in files by entity, and following this naming convention:
// Evt[Entity (noun)][Event (verb past tense / gerund)]
// The past tense is used to convey that something happened, whereas the gerund form of the verb (-ing)
// expresses that a process is in progress. Examples: EvtConnEstablishing, EvtConnEstablished.
package event

26
core/event/protocol.go

@ -0,0 +1,26 @@
package event
import (
peer "github.com/libp2p/go-libp2p-core/peer"
protocol "github.com/libp2p/go-libp2p-core/protocol"
)
// EvtPeerProtocolsUpdated should be emitted when a peer we're connected to adds or removes protocols from their stack.
type EvtPeerProtocolsUpdated struct {
// Peer is the peer whose protocols were updated.
Peer peer.ID
// Added enumerates the protocols that were added by this peer.
Added []protocol.ID
// Removed enumerates the protocols that were removed by this peer.
Removed []protocol.ID
}
// EvtLocalProtocolsUpdated should be emitted when stream handlers are attached or detached from the local host.
// For handlers attached with a matcher predicate (host.SetStreamHandlerMatch()), only the protocol ID will be
// included in this event.
type EvtLocalProtocolsUpdated struct {
// Added enumerates the protocols that were added locally.
Added []protocol.ID
// Removed enumerates the protocols that were removed locally.
Removed []protocol.ID
}

4
core/host/host.go

@ -7,6 +7,7 @@ import (
"context"
"github.com/libp2p/go-libp2p-core/connmgr"
"github.com/libp2p/go-libp2p-core/event"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
@ -68,4 +69,7 @@ type Host interface {
// ConnManager returns this hosts connection manager
ConnManager() connmgr.ConnManager
// EventBus returns the hosts eventbus
EventBus() event.Bus
}

20
core/protocol/id.go

@ -7,3 +7,23 @@ type ID string
const (
TestingID ID = "/p2p/_testing"
)
// ConvertFromStrings is a convenience function that takes a slice of strings and
// converts it to a slice of protocol.ID.
func ConvertFromStrings(ids []string) (res []ID) {
res = make([]ID, 0, len(ids))
for _, id := range ids {
res = append(res, ID(id))
}
return res
}
// ConvertToStrings is a convenience function that takes a slice of protocol.ID and
// converts it to a slice of strings.
func ConvertToStrings(ids []ID) (res []string) {
res = make([]string, 0, len(ids))
for _, id := range ids {
res = append(res, string(id))
}
return res
}

Loading…
Cancel
Save