Browse Source

Merge pull request #851 from libp2p/feat/848

Signal address change
pull/853/head v0.7.0
Steven Allen 5 years ago
committed by GitHub
parent
commit
6e97b5d817
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      go.mod
  2. 2
      go.sum
  3. 82
      p2p/host/basic/basic_host.go
  4. 48
      p2p/host/basic/basic_host_test.go
  5. 2
      p2p/host/relay/autorelay.go

2
go.mod

@ -7,7 +7,7 @@ require (
github.com/ipfs/go-ipfs-util v0.0.1
github.com/ipfs/go-log v1.0.2
github.com/jbenet/go-cienv v0.1.0
github.com/jbenet/goprocess v0.1.3
github.com/jbenet/goprocess v0.1.4
github.com/libp2p/go-conn-security-multistream v0.1.0
github.com/libp2p/go-eventbus v0.1.0
github.com/libp2p/go-libp2p-autonat v0.2.0

2
go.sum

@ -110,6 +110,8 @@ github.com/jbenet/go-temp-err-catcher v0.0.0-20150120210811-aac704a3f4f2/go.mod
github.com/jbenet/goprocess v0.0.0-20160826012719-b497e2f366b8/go.mod h1:Ly/wlsjFq/qrU3Rar62tu1gASgGw6chQbSh/XgIIXCY=
github.com/jbenet/goprocess v0.1.3 h1:YKyIEECS/XvcfHtBzxtjBBbWK+MbvA6dG8ASiqwvr10=
github.com/jbenet/goprocess v0.1.3/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZlqdZVfqY4=
github.com/jbenet/goprocess v0.1.4 h1:DRGOFReOMqqDNXwW70QkacFW0YN9QnwLV0Vqk+3oU0o=
github.com/jbenet/goprocess v0.1.4/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZlqdZVfqY4=
github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jrick/logrotate v1.0.0/go.mod h1:LNinyqDIJnpAur+b8yyulnQw/wDuN1+BYKlTRt3OuAQ=

82
p2p/host/basic/basic_host.go

@ -4,7 +4,6 @@ import (
"context"
"io"
"net"
"sync"
"time"
"github.com/libp2p/go-libp2p/p2p/protocol/identify"
@ -84,12 +83,12 @@ type BasicHost struct {
proc goprocess.Process
mx sync.Mutex
lastAddrs []ma.Multiaddr
emitters struct {
emitters struct {
evtLocalProtocolsUpdated event.Emitter
evtLocalAddrsUpdated event.Emitter
}
addrChangeChan chan struct{}
}
var _ host.Host = (*BasicHost)(nil)
@ -130,12 +129,13 @@ type HostOpts struct {
// NewHost constructs a new *BasicHost and activates it by attaching its stream and connection handlers to the given inet.Network.
func NewHost(ctx context.Context, net network.Network, opts *HostOpts) (*BasicHost, error) {
h := &BasicHost{
network: net,
mux: msmux.NewMultistreamMuxer(),
negtimeout: DefaultNegotiationTimeout,
AddrsFactory: DefaultAddrsFactory,
maResolver: madns.DefaultResolver,
eventbus: eventbus.NewBus(),
network: net,
mux: msmux.NewMultistreamMuxer(),
negtimeout: DefaultNegotiationTimeout,
AddrsFactory: DefaultAddrsFactory,
maResolver: madns.DefaultResolver,
eventbus: eventbus.NewBus(),
addrChangeChan: make(chan struct{}, 1),
}
var err error
@ -230,6 +230,7 @@ func New(net network.Network, opts ...interface{}) *BasicHost {
}
h, err := NewHost(context.Background(), net, hostopts)
h.Start()
if err != nil {
// this cannot happen with legacy options
// plus we want to keep the (deprecated) legacy interface unchanged
@ -300,24 +301,13 @@ func (h *BasicHost) newStreamHandler(s network.Stream) {
go handle(protoID, s)
}
// CheckForAddressChanges determines whether our listen addresses have recently
// changed and emits an EvtLocalAddressesUpdatedEvent & a Push Identify if so.
// SignalAddressChange signals to the host that it needs to determine whether our listen addresses have recently
// changed.
// Warning: this interface is unstable and may disappear in the future.
func (h *BasicHost) CheckForAddressChanges() {
h.mx.Lock()
addrs := h.Addrs()
changeEvt := makeUpdatedAddrEvent(h.lastAddrs, addrs)
if changeEvt != nil {
h.lastAddrs = addrs
}
h.mx.Unlock()
if changeEvt != nil {
err := h.emitters.evtLocalAddrsUpdated.Emit(*changeEvt)
if err != nil {
log.Warnf("error emitting event for updated addrs: %s", err)
}
h.ids.Push()
func (h *BasicHost) SignalAddressChange() {
select {
case h.addrChangeChan <- struct{}{}:
default:
}
}
@ -360,41 +350,31 @@ func (h *BasicHost) background(p goprocess.Process) {
defer ticker.Stop()
// initialize lastAddrs
h.mx.Lock()
if h.lastAddrs == nil {
h.lastAddrs = h.Addrs()
}
h.mx.Unlock()
lastAddrs := h.Addrs()
for {
select {
case <-ticker.C:
h.CheckForAddressChanges()
case <-h.addrChangeChan:
case <-p.Closing():
return
}
}
}
func sameAddrs(a, b []ma.Multiaddr) bool {
if len(a) != len(b) {
return false
}
bmap := make(map[string]struct{}, len(b))
for _, addr := range b {
bmap[string(addr.Bytes())] = struct{}{}
}
// emit an EvtLocalAddressesUpdatedEvent & a Push Identify if our listen addresses have changed.
addrs := h.Addrs()
changeEvt := makeUpdatedAddrEvent(lastAddrs, addrs)
if changeEvt != nil {
lastAddrs = addrs
}
for _, addr := range a {
_, ok := bmap[string(addr.Bytes())]
if !ok {
return false
if changeEvt != nil {
err := h.emitters.evtLocalAddrsUpdated.Emit(*changeEvt)
if err != nil {
log.Warnf("error emitting event for updated addrs: %s", err)
}
h.ids.Push()
}
}
return true
}
// ID returns the (local) peer.ID associated with this Host

48
p2p/host/basic/basic_host_test.go

@ -6,6 +6,7 @@ import (
"io"
"reflect"
"sort"
"sync"
"testing"
"time"
@ -518,7 +519,7 @@ func TestHostAddrChangeDetection(t *testing.T) {
{ma.StringCast("/ip4/2.3.4.5/tcp/1234"), ma.StringCast("/ip4/3.4.5.6/tcp/4321")},
}
// The events we expect the host to emit when CheckForAddressChanges is called
// The events we expect the host to emit when SignalAddressChange is called
// and the changes between addr sets are detected
expectedEvents := []event.EvtLocalAddressesUpdated{
{
@ -548,8 +549,11 @@ func TestHostAddrChangeDetection(t *testing.T) {
},
}
var lk sync.Mutex
currentAddrSet := 0
addrsFactory := func(addrs []ma.Multiaddr) []ma.Multiaddr {
lk.Lock()
defer lk.Unlock()
return addrSets[currentAddrSet]
}
@ -563,46 +567,40 @@ func TestHostAddrChangeDetection(t *testing.T) {
}
defer sub.Close()
// wait for the host background thread to start
time.Sleep(1 * time.Second)
// host should start with no addrs (addrSet 0)
addrs := h.Addrs()
if len(addrs) != 0 {
t.Fatalf("expected 0 addrs, got %d", len(addrs))
}
// Advance between addrSets
// change addr, signal and assert event
for i := 1; i < len(addrSets); i++ {
lk.Lock()
currentAddrSet = i
h.CheckForAddressChanges() // forces the host to check for changes now, instead of waiting for background update
lk.Unlock()
h.SignalAddressChange()
evt := waitForAddrChangeEvent(ctx, sub, t)
if !updatedAddrEventsEqual(expectedEvents[i-1], evt) {
t.Errorf("change events not equal: \n\texpected: %v \n\tactual: %v", expectedEvents[i], evt)
}
}
}
// drain events from the subscription
var receivedEvents []event.EvtLocalAddressesUpdated
readEvents:
func waitForAddrChangeEvent(ctx context.Context, sub event.Subscription, t *testing.T) event.EvtLocalAddressesUpdated {
for {
select {
case evt, more := <-sub.Out():
if !more {
break readEvents
}
receivedEvents = append(receivedEvents, evt.(event.EvtLocalAddressesUpdated))
if len(receivedEvents) == len(expectedEvents) {
break readEvents
t.Fatal("channel should not be closed")
}
return evt.(event.EvtLocalAddressesUpdated)
case <-ctx.Done():
break readEvents
case <-time.After(1 * time.Second):
break readEvents
}
}
// assert that we received the events we expected
if len(receivedEvents) != len(expectedEvents) {
t.Errorf("expected to receive %d addr change events, got %d", len(expectedEvents), len(receivedEvents))
}
for i, expected := range expectedEvents {
actual := receivedEvents[i]
if !updatedAddrEventsEqual(expected, actual) {
t.Errorf("change events not equal: \n\texpected: %v \n\tactual: %v", expected, actual)
t.Fatal("context should not have cancelled")
case <-time.After(2 * time.Second):
t.Fatal("timed out waiting for address change event")
}
}
}

2
p2p/host/relay/autorelay.go

@ -119,7 +119,7 @@ func (ar *AutoRelay) background(ctx context.Context) {
ar.cachedAddrs = nil
ar.mx.Unlock()
push = false
ar.host.CheckForAddressChanges()
ar.host.SignalAddressChange()
}
}
}

Loading…
Cancel
Save