Browse Source

emit the EvtPeerConnectednessChanged event

pull/1411/head
Marten Seemann 3 years ago
parent
commit
dd821e8db9
  1. 5
      p2p/host/blank/blank.go
  2. 73
      p2p/host/blank/peer_connectedness.go
  3. 46
      p2p/host/blank/peer_connectedness_test.go

5
p2p/host/blank/blank.go

@ -70,6 +70,11 @@ func NewBlankHost(n network.Network, options ...Option) *BlankHost {
if bh.emitters.evtLocalProtocolsUpdated, err = bh.eventbus.Emitter(&event.EvtLocalProtocolsUpdated{}); err != nil { if bh.emitters.evtLocalProtocolsUpdated, err = bh.eventbus.Emitter(&event.EvtLocalProtocolsUpdated{}); err != nil {
return nil return nil
} }
evtPeerConnectednessChanged, err := bh.eventbus.Emitter(&event.EvtPeerConnectednessChanged{})
if err != nil {
return nil
}
n.Notify(newPeerConnectWatcher(evtPeerConnectednessChanged))
n.SetStreamHandler(bh.newStreamHandler) n.SetStreamHandler(bh.newStreamHandler)

73
p2p/host/blank/peer_connectedness.go

@ -0,0 +1,73 @@
package blankhost
import (
"sync"
"github.com/libp2p/go-libp2p-core/event"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
ma "github.com/multiformats/go-multiaddr"
)
type peerConnectWatcher struct {
emitter event.Emitter
mutex sync.Mutex
connected map[peer.ID]struct{}
}
var _ network.Notifiee = &peerConnectWatcher{}
func newPeerConnectWatcher(emitter event.Emitter) *peerConnectWatcher {
return &peerConnectWatcher{
emitter: emitter,
connected: make(map[peer.ID]struct{}),
}
}
func (w *peerConnectWatcher) Listen(network.Network, ma.Multiaddr) {}
func (w *peerConnectWatcher) ListenClose(network.Network, ma.Multiaddr) {}
func (w *peerConnectWatcher) OpenedStream(network.Network, network.Stream) {}
func (w *peerConnectWatcher) ClosedStream(network.Network, network.Stream) {}
func (w *peerConnectWatcher) Connected(n network.Network, conn network.Conn) {
p := conn.RemotePeer()
w.handleTransition(p, n.Connectedness(p))
}
func (w *peerConnectWatcher) Disconnected(n network.Network, conn network.Conn) {
p := conn.RemotePeer()
w.handleTransition(p, n.Connectedness(p))
}
func (w *peerConnectWatcher) handleTransition(p peer.ID, state network.Connectedness) {
if changed := w.checkTransition(p, state); !changed {
return
}
w.emitter.Emit(event.EvtPeerConnectednessChanged{
Peer: p,
Connectedness: state,
})
}
func (w *peerConnectWatcher) checkTransition(p peer.ID, state network.Connectedness) bool {
w.mutex.Lock()
defer w.mutex.Unlock()
switch state {
case network.Connected:
if _, ok := w.connected[p]; ok {
return false
}
w.connected[p] = struct{}{}
return true
case network.NotConnected:
if _, ok := w.connected[p]; ok {
delete(w.connected, p)
return true
}
return false
default:
return false
}
}

46
p2p/host/blank/peer_connectedness_test.go

@ -0,0 +1,46 @@
package blankhost
import (
"context"
"testing"
"time"
"github.com/libp2p/go-libp2p-core/event"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
swarmt "github.com/libp2p/go-libp2p-swarm/testing"
"github.com/stretchr/testify/require"
)
func TestPeerConnectedness(t *testing.T) {
h1 := NewBlankHost(swarmt.GenSwarm(t))
defer h1.Close()
h2 := NewBlankHost(swarmt.GenSwarm(t))
sub1, err := h1.EventBus().Subscribe(&event.EvtPeerConnectednessChanged{})
require.NoError(t, err)
defer sub1.Close()
sub2, err := h2.EventBus().Subscribe(&event.EvtPeerConnectednessChanged{})
require.NoError(t, err)
defer sub2.Close()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
require.NoError(t, h1.Connect(ctx, peer.AddrInfo{ID: h2.ID(), Addrs: h2.Addrs()}))
require.Equal(t, (<-sub1.Out()).(event.EvtPeerConnectednessChanged), event.EvtPeerConnectednessChanged{
Peer: h2.ID(),
Connectedness: network.Connected,
})
require.Equal(t, (<-sub2.Out()).(event.EvtPeerConnectednessChanged), event.EvtPeerConnectednessChanged{
Peer: h1.ID(),
Connectedness: network.Connected,
})
// now close h2. This will disconnect it from h1.
require.NoError(t, h2.Close())
require.Equal(t, (<-sub1.Out()).(event.EvtPeerConnectednessChanged), event.EvtPeerConnectednessChanged{
Peer: h2.ID(),
Connectedness: network.NotConnected,
})
}
Loading…
Cancel
Save