From ed4646f711b1b3323fcae43e3d236944b47babb7 Mon Sep 17 00:00:00 2001 From: Aarsh Shah Date: Thu, 19 Mar 2020 19:05:15 +0530 Subject: [PATCH 1/2] local addr updated event --- p2p/host/basic/basic_host.go | 56 ++++++++++-- p2p/host/basic/basic_host_test.go | 139 ++++++++++++++++++++++++++++++ p2p/host/relay/autorelay.go | 2 +- 3 files changed, 188 insertions(+), 9 deletions(-) diff --git a/p2p/host/basic/basic_host.go b/p2p/host/basic/basic_host.go index f8bf279cf..cae41a9e4 100644 --- a/p2p/host/basic/basic_host.go +++ b/p2p/host/basic/basic_host.go @@ -88,6 +88,7 @@ type BasicHost struct { lastAddrs []ma.Multiaddr emitters struct { evtLocalProtocolsUpdated event.Emitter + evtLocalAddrsUpdated event.Emitter } } @@ -141,6 +142,9 @@ func NewHost(ctx context.Context, net network.Network, opts *HostOpts) (*BasicHo if h.emitters.evtLocalProtocolsUpdated, err = h.eventbus.Emitter(&event.EvtLocalProtocolsUpdated{}); err != nil { return nil, err } + if h.emitters.evtLocalAddrsUpdated, err = h.eventbus.Emitter(&event.EvtLocalAddressesUpdated{}); err != nil { + return nil, err + } h.proc = goprocessctx.WithContextAndTeardown(ctx, func() error { if h.natmgr != nil { @@ -150,6 +154,7 @@ func NewHost(ctx context.Context, net network.Network, opts *HostOpts) (*BasicHo h.cmgr.Close() } _ = h.emitters.evtLocalProtocolsUpdated.Close() + _ = h.emitters.evtLocalAddrsUpdated.Close() return h.Network().Close() }) @@ -295,24 +300,59 @@ func (h *BasicHost) newStreamHandler(s network.Stream) { go handle(protoID, s) } -// PushIdentify pushes an identify update through the identify push protocol +// CheckForAddressChanges determines whether our listen addresses have recently +// changed and emits an EvtLocalAddressesUpdatedEvent & a Push Identify if so. // Warning: this interface is unstable and may disappear in the future. -func (h *BasicHost) PushIdentify() { - push := false - +func (h *BasicHost) CheckForAddressChanges() { h.mx.Lock() addrs := h.Addrs() - if !sameAddrs(addrs, h.lastAddrs) { - push = true + changeEvt := makeUpdatedAddrEvent(h.lastAddrs, addrs) + if changeEvt != nil { h.lastAddrs = addrs } h.mx.Unlock() - if push { + if changeEvt != nil { + err := h.emitters.evtLocalAddrsUpdated.Emit(*changeEvt) + if err != nil { + log.Warnf("error emitting event for updated addrs: %s", err) + } h.ids.Push() } } +func makeUpdatedAddrEvent(prev, current []ma.Multiaddr) *event.EvtLocalAddressesUpdated { + prevmap := make(map[string]ma.Multiaddr, len(prev)) + evt := event.EvtLocalAddressesUpdated{Diffs: true} + addrsAdded := false + + for _, addr := range prev { + prevmap[string(addr.Bytes())] = addr + } + for _, addr := range current { + _, ok := prevmap[string(addr.Bytes())] + updated := event.UpdatedAddress{Address: addr} + if ok { + updated.Action = event.Maintained + } else { + updated.Action = event.Added + addrsAdded = true + } + evt.Current = append(evt.Current, updated) + delete(prevmap, string(addr.Bytes())) + } + for _, addr := range prevmap { + updated := event.UpdatedAddress{Action: event.Removed, Address: addr} + evt.Removed = append(evt.Removed, updated) + } + + if !addrsAdded && len(evt.Removed) == 0 { + return nil + } + + return &evt +} + func (h *BasicHost) background(p goprocess.Process) { // periodically schedules an IdentifyPush to update our peers for changes // in our address set (if needed) @@ -329,7 +369,7 @@ func (h *BasicHost) background(p goprocess.Process) { for { select { case <-ticker.C: - h.PushIdentify() + h.CheckForAddressChanges() case <-p.Closing(): return diff --git a/p2p/host/basic/basic_host_test.go b/p2p/host/basic/basic_host_test.go index ed0370b8a..dd3593a89 100644 --- a/p2p/host/basic/basic_host_test.go +++ b/p2p/host/basic/basic_host_test.go @@ -507,6 +507,145 @@ func TestAddrResolutionRecursive(t *testing.T) { } } +func TestHostAddrChangeDetection(t *testing.T) { + // This test uses the address factory to provide several + // sets of listen addresses for the host. It advances through + // the sets by changing the currentAddrSet index var below. + addrSets := [][]ma.Multiaddr{ + {}, + {ma.StringCast("/ip4/1.2.3.4/tcp/1234")}, + {ma.StringCast("/ip4/1.2.3.4/tcp/1234"), ma.StringCast("/ip4/2.3.4.5/tcp/1234")}, + {ma.StringCast("/ip4/2.3.4.5/tcp/1234"), ma.StringCast("/ip4/3.4.5.6/tcp/4321")}, + } + + // The events we expect the host to emit when CheckForAddressChanges is called + // and the changes between addr sets are detected + expectedEvents := []event.EvtLocalAddressesUpdated{ + { + Diffs: true, + Current: []event.UpdatedAddress{ + {Action: event.Added, Address: ma.StringCast("/ip4/1.2.3.4/tcp/1234")}, + }, + Removed: []event.UpdatedAddress{}, + }, + { + Diffs: true, + Current: []event.UpdatedAddress{ + {Action: event.Maintained, Address: ma.StringCast("/ip4/1.2.3.4/tcp/1234")}, + {Action: event.Added, Address: ma.StringCast("/ip4/2.3.4.5/tcp/1234")}, + }, + Removed: []event.UpdatedAddress{}, + }, + { + Diffs: true, + Current: []event.UpdatedAddress{ + {Action: event.Added, Address: ma.StringCast("/ip4/3.4.5.6/tcp/4321")}, + {Action: event.Maintained, Address: ma.StringCast("/ip4/2.3.4.5/tcp/1234")}, + }, + Removed: []event.UpdatedAddress{ + {Action: event.Removed, Address: ma.StringCast("/ip4/1.2.3.4/tcp/1234")}, + }, + }, + } + + currentAddrSet := 0 + addrsFactory := func(addrs []ma.Multiaddr) []ma.Multiaddr { + return addrSets[currentAddrSet] + } + + ctx := context.Background() + h := New(swarmt.GenSwarm(t, ctx), AddrsFactory(addrsFactory)) + defer h.Close() + + sub, err := h.EventBus().Subscribe(&event.EvtLocalAddressesUpdated{}, eventbus.BufSize(10)) + if err != nil { + t.Error(err) + } + defer sub.Close() + + // host should start with no addrs (addrSet 0) + addrs := h.Addrs() + if len(addrs) != 0 { + t.Fatalf("expected 0 addrs, got %d", len(addrs)) + } + + // Advance between addrSets + for i := 1; i < len(addrSets); i++ { + currentAddrSet = i + h.CheckForAddressChanges() // forces the host to check for changes now, instead of waiting for background update + } + + // drain events from the subscription + var receivedEvents []event.EvtLocalAddressesUpdated +readEvents: + for { + select { + case evt, more := <-sub.Out(): + if !more { + break readEvents + } + receivedEvents = append(receivedEvents, evt.(event.EvtLocalAddressesUpdated)) + if len(receivedEvents) == len(expectedEvents) { + break readEvents + } + case <-ctx.Done(): + break readEvents + case <-time.After(1 * time.Second): + break readEvents + } + } + + // assert that we received the events we expected + if len(receivedEvents) != len(expectedEvents) { + t.Errorf("expected to receive %d addr change events, got %d", len(expectedEvents), len(receivedEvents)) + } + for i, expected := range expectedEvents { + actual := receivedEvents[i] + if !updatedAddrEventsEqual(expected, actual) { + t.Errorf("change events not equal: \n\texpected: %v \n\tactual: %v", expected, actual) + } + } +} + +// updatedAddrsEqual is a helper to check whether two lists of +// event.UpdatedAddress have the same contents, ignoring ordering. +func updatedAddrsEqual(a, b []event.UpdatedAddress) bool { + if len(a) != len(b) { + return false + } + + // We can't use an UpdatedAddress directly as a map key, since + // Multiaddr is an interface, and go won't know how to compare + // for equality. So we convert to this little struct, which + // stores the multiaddr as a string. + type ua struct { + action event.AddrAction + addrStr string + } + aSet := make(map[ua]struct{}) + for _, addr := range a { + k := ua{action: addr.Action, addrStr: string(addr.Address.Bytes())} + aSet[k] = struct{}{} + } + for _, addr := range b { + k := ua{action: addr.Action, addrStr: string(addr.Address.Bytes())} + _, ok := aSet[k] + if !ok { + return false + } + } + return true +} + +// updatedAddrEventsEqual is a helper to check whether two +// event.EvtLocalAddressesUpdated are equal, ignoring the ordering of +// addresses in the inner lists. +func updatedAddrEventsEqual(a, b event.EvtLocalAddressesUpdated) bool { + return a.Diffs == b.Diffs && + updatedAddrsEqual(a.Current, b.Current) && + updatedAddrsEqual(a.Removed, b.Removed) +} + type sortedMultiaddrs []ma.Multiaddr func (sma sortedMultiaddrs) Len() int { return len(sma) } diff --git a/p2p/host/relay/autorelay.go b/p2p/host/relay/autorelay.go index 0c3f40b53..484c7dbb9 100644 --- a/p2p/host/relay/autorelay.go +++ b/p2p/host/relay/autorelay.go @@ -125,7 +125,7 @@ func (ar *AutoRelay) background(ctx context.Context) { ar.cachedAddrs = nil ar.mx.Unlock() push = false - ar.host.PushIdentify() + ar.host.CheckForAddressChanges() } select { From d49806efc92fdc0f996c047d56f0652e3ef3966e Mon Sep 17 00:00:00 2001 From: Aarsh Shah Date: Fri, 20 Mar 2020 10:04:13 +0530 Subject: [PATCH 2/2] emit address change evt --- p2p/host/basic/basic_host.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/p2p/host/basic/basic_host.go b/p2p/host/basic/basic_host.go index cae41a9e4..bce9e580e 100644 --- a/p2p/host/basic/basic_host.go +++ b/p2p/host/basic/basic_host.go @@ -356,7 +356,7 @@ func makeUpdatedAddrEvent(prev, current []ma.Multiaddr) *event.EvtLocalAddresses func (h *BasicHost) background(p goprocess.Process) { // periodically schedules an IdentifyPush to update our peers for changes // in our address set (if needed) - ticker := time.NewTicker(1 * time.Minute) + ticker := time.NewTicker(10 * time.Second) defer ticker.Stop() // initialize lastAddrs