From 5c3146fd841a8e5188b3b68687475d594f523fda Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Mon, 2 Feb 2015 10:33:08 -0800 Subject: [PATCH] p2p/peer/addr: manager with expirations --- peer/addr/addr_manager.go | 152 +++++++++++++++++++++++++++ peer/addr/addr_manager_test.go | 182 +++++++++++++++++++++++++++++++++ 2 files changed, 334 insertions(+) create mode 100644 peer/addr/addr_manager.go create mode 100644 peer/addr/addr_manager_test.go diff --git a/peer/addr/addr_manager.go b/peer/addr/addr_manager.go new file mode 100644 index 000000000..b35b02eac --- /dev/null +++ b/peer/addr/addr_manager.go @@ -0,0 +1,152 @@ +// package addr provides useful address utilities for p2p +// applications. It buys into the multi-transport addressing +// scheme Multiaddr, and uses it to build its own p2p addressing. +// All Addrs must have an associated peer.ID. +package addr + +import ( + "sync" + "time" + + ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" + + peer "github.com/jbenet/go-ipfs/p2p/peer" +) + +type expiringAddr struct { + Addr ma.Multiaddr + TTL time.Time +} + +func (e *expiringAddr) ExpiredBy(t time.Time) bool { + return t.After(e.TTL) +} + +type addrSet map[string]expiringAddr + +// Manager manages addresses. +// The zero-value is ready to be used. +type Manager struct { + addrmu sync.Mutex // guards addrs + addrs map[peer.ID]addrSet +} + +// ensures the Manager is initialized. +// So we can use the zero value. +func (mgr *Manager) init() { + if mgr.addrs == nil { + mgr.addrs = make(map[peer.ID]addrSet) + } +} + +// AddAddr calls AddAddrs(p, []ma.Multiaddr{addr}, ttl) +func (mgr *Manager) AddAddr(p peer.ID, addr ma.Multiaddr, ttl time.Duration) { + mgr.AddAddrs(p, []ma.Multiaddr{addr}, ttl) +} + +// AddAddrs gives Manager addresses to use, with a given ttl +// (time-to-live), after which the address is no longer valid. +// If the manager has a longer TTL, the operation is a no-op for that address +func (mgr *Manager) AddAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) { + mgr.addrmu.Lock() + defer mgr.addrmu.Unlock() + + // if ttl is zero, exit. nothing to do. + if ttl <= 0 { + return + } + + // so zero value can be used + mgr.init() + + amap, found := mgr.addrs[p] + if !found { + amap = make(addrSet) + mgr.addrs[p] = amap + } + + // only expand ttls + exp := time.Now().Add(ttl) + for _, addr := range addrs { + addrstr := addr.String() + a, found := amap[addrstr] + if !found || exp.After(a.TTL) { + amap[addrstr] = expiringAddr{Addr: addr, TTL: exp} + } + } +} + +// SetAddr calls mgr.SetAddrs(p, addr, ttl) +func (mgr *Manager) SetAddr(p peer.ID, addr ma.Multiaddr, ttl time.Duration) { + mgr.SetAddrs(p, []ma.Multiaddr{addr}, ttl) +} + +// SetAddrs sets the ttl on addresses. This clears any TTL there previously. +// This is used when we receive the best estimate of the validity of an address. +func (mgr *Manager) SetAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) { + mgr.addrmu.Lock() + defer mgr.addrmu.Unlock() + + // so zero value can be used + mgr.init() + + amap, found := mgr.addrs[p] + if !found { + amap = make(addrSet) + mgr.addrs[p] = amap + } + + exp := time.Now().Add(ttl) + for _, addr := range addrs { + // re-set all of them for new ttl. + addrs := addr.String() + + if ttl > 0 { + amap[addrs] = expiringAddr{Addr: addr, TTL: exp} + } else { + delete(amap, addrs) + } + } +} + +// Addresses returns all known (and valid) addresses for a given peer. +func (mgr *Manager) Addrs(p peer.ID) []ma.Multiaddr { + mgr.addrmu.Lock() + defer mgr.addrmu.Unlock() + + // not initialized? nothing to give. + if mgr.addrs == nil { + return nil + } + + maddrs, found := mgr.addrs[p] + if !found { + return nil + } + + now := time.Now() + good := make([]ma.Multiaddr, 0, len(maddrs)) + var expired []string + for s, m := range maddrs { + if m.ExpiredBy(now) { + expired = append(expired, s) + } else { + good = append(good, m.Addr) + } + } + + // clean up the expired ones. + for _, s := range expired { + delete(maddrs, s) + } + return good +} + +// ClearAddresses removes all previously stored addresses +func (mgr *Manager) ClearAddrs(p peer.ID) { + mgr.addrmu.Lock() + defer mgr.addrmu.Unlock() + mgr.init() + + mgr.addrs[p] = make(addrSet) // clear what was there before +} diff --git a/peer/addr/addr_manager_test.go b/peer/addr/addr_manager_test.go new file mode 100644 index 000000000..be24ff12b --- /dev/null +++ b/peer/addr/addr_manager_test.go @@ -0,0 +1,182 @@ +package addr + +import ( + "testing" + "time" + + ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" + + peer "github.com/jbenet/go-ipfs/p2p/peer" +) + +func IDS(t *testing.T, ids string) peer.ID { + id, err := peer.IDB58Decode(ids) + if err != nil { + t.Fatal(err) + } + return id +} + +func MA(t *testing.T, m string) ma.Multiaddr { + maddr, err := ma.NewMultiaddr(m) + if err != nil { + t.Fatal(err) + } + return maddr +} + +func testHas(t *testing.T, exp, act []ma.Multiaddr) { + if len(exp) != len(act) { + t.Fatal("lengths not the same") + } + + for _, a := range exp { + found := false + + for _, b := range act { + if a.Equal(b) { + found = true + break + } + } + + if !found { + t.Fatal("expected address %s not found", a) + } + } +} + +func TestAddresses(t *testing.T) { + + id1 := IDS(t, "QmcNstKuwBBoVTpSCSDrwzjgrRcaYXK833Psuz2EMHwyQN") + id2 := IDS(t, "QmRmPL3FDZKE3Qiwv1RosLdwdvbvg17b2hB39QPScgWKKZ") + id3 := IDS(t, "QmPhi7vBsChP7sjRoZGgg7bcKqF6MmCcQwvRbDte8aJ6Kn") + id4 := IDS(t, "QmPhi7vBsChP7sjRoZGgg7bcKqF6MmCcQwvRbDte8aJ5Kn") + id5 := IDS(t, "QmPhi7vBsChP7sjRoZGgg7bcKqF6MmCcQwvRbDte8aJ5Km") + + ma11 := MA(t, "/ip4/1.2.3.1/tcp/1111") + ma21 := MA(t, "/ip4/2.2.3.2/tcp/1111") + ma22 := MA(t, "/ip4/2.2.3.2/tcp/2222") + ma31 := MA(t, "/ip4/3.2.3.3/tcp/1111") + ma32 := MA(t, "/ip4/3.2.3.3/tcp/2222") + ma33 := MA(t, "/ip4/3.2.3.3/tcp/3333") + ma41 := MA(t, "/ip4/4.2.3.3/tcp/1111") + ma42 := MA(t, "/ip4/4.2.3.3/tcp/2222") + ma43 := MA(t, "/ip4/4.2.3.3/tcp/3333") + ma44 := MA(t, "/ip4/4.2.3.3/tcp/4444") + ma51 := MA(t, "/ip4/5.2.3.3/tcp/1111") + ma52 := MA(t, "/ip4/5.2.3.3/tcp/2222") + ma53 := MA(t, "/ip4/5.2.3.3/tcp/3333") + ma54 := MA(t, "/ip4/5.2.3.3/tcp/4444") + ma55 := MA(t, "/ip4/5.2.3.3/tcp/5555") + + ttl := time.Hour + m := Manager{} + m.AddAddr(id1, ma11, ttl) + + m.AddAddrs(id2, []ma.Multiaddr{ma21, ma22}, ttl) + m.AddAddrs(id2, []ma.Multiaddr{ma21, ma22}, ttl) // idempotency + + m.AddAddr(id3, ma31, ttl) + m.AddAddr(id3, ma32, ttl) + m.AddAddr(id3, ma33, ttl) + m.AddAddr(id3, ma33, ttl) // idempotency + m.AddAddr(id3, ma33, ttl) + + m.AddAddrs(id4, []ma.Multiaddr{ma41, ma42, ma43, ma44}, ttl) // multiple + + m.AddAddrs(id5, []ma.Multiaddr{ma21, ma22}, ttl) // clearing + m.AddAddrs(id5, []ma.Multiaddr{ma41, ma42, ma43, ma44}, ttl) // clearing + m.ClearAddrs(id5) + m.AddAddrs(id5, []ma.Multiaddr{ma51, ma52, ma53, ma54, ma55}, ttl) // clearing + + // test the Addresses return value + testHas(t, []ma.Multiaddr{ma11}, m.Addrs(id1)) + testHas(t, []ma.Multiaddr{ma21, ma22}, m.Addrs(id2)) + testHas(t, []ma.Multiaddr{ma31, ma32, ma33}, m.Addrs(id3)) + testHas(t, []ma.Multiaddr{ma41, ma42, ma43, ma44}, m.Addrs(id4)) + testHas(t, []ma.Multiaddr{ma51, ma52, ma53, ma54, ma55}, m.Addrs(id5)) + +} + +func TestAddressesExpire(t *testing.T) { + + id1 := IDS(t, "QmcNstKuwBBoVTpSCSDrwzjgrRcaYXK833Psuz2EMHwyQN") + id2 := IDS(t, "QmcNstKuwBBoVTpSCSDrwzjgrRcaYXK833Psuz2EMHwyQM") + ma11 := MA(t, "/ip4/1.2.3.1/tcp/1111") + ma12 := MA(t, "/ip4/2.2.3.2/tcp/2222") + ma13 := MA(t, "/ip4/3.2.3.3/tcp/3333") + ma24 := MA(t, "/ip4/4.2.3.3/tcp/4444") + ma25 := MA(t, "/ip4/5.2.3.3/tcp/5555") + + m := Manager{} + m.AddAddr(id1, ma11, time.Hour) + m.AddAddr(id1, ma12, time.Hour) + m.AddAddr(id1, ma13, time.Hour) + m.AddAddr(id2, ma24, time.Hour) + m.AddAddr(id2, ma25, time.Hour) + + testHas(t, []ma.Multiaddr{ma11, ma12, ma13}, m.Addrs(id1)) + testHas(t, []ma.Multiaddr{ma24, ma25}, m.Addrs(id2)) + + m.SetAddr(id1, ma11, 2*time.Hour) + m.SetAddr(id1, ma12, 2*time.Hour) + m.SetAddr(id1, ma13, 2*time.Hour) + m.SetAddr(id2, ma24, 2*time.Hour) + m.SetAddr(id2, ma25, 2*time.Hour) + + testHas(t, []ma.Multiaddr{ma11, ma12, ma13}, m.Addrs(id1)) + testHas(t, []ma.Multiaddr{ma24, ma25}, m.Addrs(id2)) + + m.SetAddr(id1, ma11, time.Millisecond) + <-time.After(time.Millisecond) + testHas(t, []ma.Multiaddr{ma12, ma13}, m.Addrs(id1)) + testHas(t, []ma.Multiaddr{ma24, ma25}, m.Addrs(id2)) + + m.SetAddr(id1, ma13, time.Millisecond) + <-time.After(time.Millisecond) + testHas(t, []ma.Multiaddr{ma12}, m.Addrs(id1)) + testHas(t, []ma.Multiaddr{ma24, ma25}, m.Addrs(id2)) + + m.SetAddr(id2, ma24, time.Millisecond) + <-time.After(time.Millisecond) + testHas(t, []ma.Multiaddr{ma12}, m.Addrs(id1)) + testHas(t, []ma.Multiaddr{ma25}, m.Addrs(id2)) + + m.SetAddr(id2, ma25, time.Millisecond) + <-time.After(time.Millisecond) + testHas(t, []ma.Multiaddr{ma12}, m.Addrs(id1)) + testHas(t, nil, m.Addrs(id2)) + + m.SetAddr(id1, ma12, time.Millisecond) + <-time.After(time.Millisecond) + testHas(t, nil, m.Addrs(id1)) + testHas(t, nil, m.Addrs(id2)) +} + +func TestClearWorks(t *testing.T) { + + id1 := IDS(t, "QmcNstKuwBBoVTpSCSDrwzjgrRcaYXK833Psuz2EMHwyQN") + id2 := IDS(t, "QmcNstKuwBBoVTpSCSDrwzjgrRcaYXK833Psuz2EMHwyQM") + ma11 := MA(t, "/ip4/1.2.3.1/tcp/1111") + ma12 := MA(t, "/ip4/2.2.3.2/tcp/2222") + ma13 := MA(t, "/ip4/3.2.3.3/tcp/3333") + ma24 := MA(t, "/ip4/4.2.3.3/tcp/4444") + ma25 := MA(t, "/ip4/5.2.3.3/tcp/5555") + + m := Manager{} + m.AddAddr(id1, ma11, time.Hour) + m.AddAddr(id1, ma12, time.Hour) + m.AddAddr(id1, ma13, time.Hour) + m.AddAddr(id2, ma24, time.Hour) + m.AddAddr(id2, ma25, time.Hour) + + testHas(t, []ma.Multiaddr{ma11, ma12, ma13}, m.Addrs(id1)) + testHas(t, []ma.Multiaddr{ma24, ma25}, m.Addrs(id2)) + + m.ClearAddrs(id1) + m.ClearAddrs(id2) + + testHas(t, nil, m.Addrs(id1)) + testHas(t, nil, m.Addrs(id2)) +}