From a5858912d6da151ea5c95ba62742bbdbb9ad3ac6 Mon Sep 17 00:00:00 2001 From: vyzo Date: Wed, 17 Oct 2018 17:05:15 +0300 Subject: [PATCH] autorelay hosts --- p2p/host/relay/autorelay.go | 252 ++++++++++++++++++++++++++++++++++++ p2p/host/relay/log.go | 7 + p2p/host/relay/relay.go | 23 ++++ 3 files changed, 282 insertions(+) create mode 100644 p2p/host/relay/autorelay.go create mode 100644 p2p/host/relay/log.go create mode 100644 p2p/host/relay/relay.go diff --git a/p2p/host/relay/autorelay.go b/p2p/host/relay/autorelay.go new file mode 100644 index 000000000..09a9e9884 --- /dev/null +++ b/p2p/host/relay/autorelay.go @@ -0,0 +1,252 @@ +package relay + +import ( + "context" + "fmt" + "math/rand" + "sync" + "time" + + basic "github.com/libp2p/go-libp2p/p2p/host/basic" + + autonat "github.com/libp2p/go-libp2p-autonat" + discovery "github.com/libp2p/go-libp2p-discovery" + host "github.com/libp2p/go-libp2p-host" + inet "github.com/libp2p/go-libp2p-net" + peer "github.com/libp2p/go-libp2p-peer" + pstore "github.com/libp2p/go-libp2p-peerstore" + ma "github.com/multiformats/go-multiaddr" + manet "github.com/multiformats/go-multiaddr-net" +) + +var DesiredRelays = 3 + +// AutoRelayHost is a Host that uses relays for connectivity when a NAT is detected. +type AutoRelayHost struct { + *basic.BasicHost + discover discovery.Discoverer + autonat autonat.AutoNAT + addrsF basic.AddrsFactory + + disconnect chan struct{} + + mx sync.Mutex + relays map[peer.ID]pstore.PeerInfo + addrs []ma.Multiaddr +} + +func NewAutoRelayHost(ctx context.Context, bhost *basic.BasicHost, discover discovery.Discoverer) *AutoRelayHost { + autonat := autonat.NewAutoNAT(ctx, bhost) + h := &AutoRelayHost{ + BasicHost: bhost, + discover: discover, + autonat: autonat, + addrsF: bhost.AddrsFactory, + relays: make(map[peer.ID]pstore.PeerInfo), + disconnect: make(chan struct{}, 1), + } + bhost.AddrsFactory = h.hostAddrs + bhost.Network().Notify(h) + go h.background(ctx) + return h +} + +func (h *AutoRelayHost) hostAddrs(addrs []ma.Multiaddr) []ma.Multiaddr { + h.mx.Lock() + defer h.mx.Unlock() + if h.addrs != nil && h.autonat.Status() == autonat.NATStatusPrivate { + return h.addrs + } else { + return h.addrsF(addrs) + } +} + +func (h *AutoRelayHost) background(ctx context.Context) { + select { + case <-time.After(autonat.AutoNATBootDelay + 30*time.Second): + case <-ctx.Done(): + return + } + + for { + wait := autonat.AutoNATRefreshInterval + switch h.autonat.Status() { + case autonat.NATStatusUnknown: + wait = autonat.AutoNATRetryInterval + case autonat.NATStatusPublic: + case autonat.NATStatusPrivate: + h.findRelays(ctx) + } + + select { + case <-h.disconnect: + // invalidate addrs + h.mx.Lock() + h.addrs = nil + h.mx.Unlock() + case <-time.After(wait): + case <-ctx.Done(): + return + } + } +} + +func (h *AutoRelayHost) findRelays(ctx context.Context) { + h.mx.Lock() + if len(h.relays) >= DesiredRelays { + h.mx.Unlock() + return + } + need := DesiredRelays - len(h.relays) + h.mx.Unlock() + + limit := 20 + for ; need > limit; limit *= 2 { + } + + dctx, cancel := context.WithTimeout(ctx, 60*time.Second) + pis, err := discovery.FindPeers(dctx, h.discover, "/libp2p/relay", limit) + cancel() + if err != nil { + log.Debugf("error discovering relays: %s", err.Error()) + return + } + + // TODO better relay selection strategy; this just selects random relays + // but we should probably use ping latency as the selection metric + shuffleRelays(pis) + + update := 0 + + for _, pi := range pis { + h.mx.Lock() + if _, ok := h.relays[pi.ID]; ok { + h.mx.Unlock() + continue + } + h.mx.Unlock() + + cctx, cancel := context.WithTimeout(ctx, 60*time.Second) + err = h.Connect(cctx, pi) + cancel() + if err != nil { + log.Debugf("error connecting to relay %s: %s", pi.ID, err.Error()) + continue + } + + log.Debugf("connected to relay %s", pi.ID) + h.mx.Lock() + h.relays[pi.ID] = pi + h.mx.Unlock() + + update++ + need-- + if need == 0 { + break + } + } + + if update > 0 || h.addrs == nil { + h.updateAddrs() + } +} + +func (h *AutoRelayHost) updateAddrs() { + h.doUpdateAddrs() + h.PushIdentify() +} + +func (h *AutoRelayHost) doUpdateAddrs() { + h.mx.Lock() + defer h.mx.Unlock() + + addrs := h.addrsF(h.AllAddrs()) + raddrs := make([]ma.Multiaddr, 0, len(addrs)+len(h.relays)) + + // remove our public addresses from the list and replace them by just the public IP + for _, addr := range addrs { + if manet.IsPublicAddr(addr) { + ip, err := addr.ValueForProtocol(ma.P_IP4) + if err == nil { + pub, err := ma.NewMultiaddr(fmt.Sprintf("/ip4/%s", ip)) + if err != nil { + panic(err) + } + + if !containsAddr(raddrs, pub) { + raddrs = append(raddrs, pub) + } + continue + } + + ip, err = addr.ValueForProtocol(ma.P_IP6) + if err == nil { + pub, err := ma.NewMultiaddr(fmt.Sprintf("/ip6/%s", ip)) + if err != nil { + panic(err) + } + if !containsAddr(raddrs, pub) { + raddrs = append(raddrs, pub) + } + continue + } + } else { + raddrs = append(raddrs, addr) + } + } + + circuit, err := ma.NewMultiaddr("/p2p-circuit") + if err != nil { + panic(err) + } + + for _, pi := range h.relays { + for _, addr := range pi.Addrs { + if !manet.IsPrivateAddr(addr) { + pub := addr.Encapsulate(circuit) + raddrs = append(raddrs, pub) + } + } + } + + h.addrs = raddrs +} + +func shuffleRelays(pis []pstore.PeerInfo) { + for i := range pis { + j := rand.Intn(i + 1) + pis[i], pis[j] = pis[j], pis[i] + } +} + +func containsAddr(lst []ma.Multiaddr, addr ma.Multiaddr) bool { + for _, xaddr := range lst { + if xaddr.Equal(addr) { + return true + } + } + return false +} + +// notify +func (h *AutoRelayHost) Listen(inet.Network, ma.Multiaddr) {} +func (h *AutoRelayHost) ListenClose(inet.Network, ma.Multiaddr) {} +func (h *AutoRelayHost) Connected(inet.Network, inet.Conn) {} + +func (h *AutoRelayHost) Disconnected(_ inet.Network, c inet.Conn) { + p := c.RemotePeer() + h.mx.Lock() + defer h.mx.Unlock() + if _, ok := h.relays[p]; ok { + delete(h.relays, p) + select { + case h.disconnect <- struct{}{}: + default: + } + } +} + +func (h *AutoRelayHost) OpenedStream(inet.Network, inet.Stream) {} +func (h *AutoRelayHost) ClosedStream(inet.Network, inet.Stream) {} + +var _ host.Host = (*AutoRelayHost)(nil) diff --git a/p2p/host/relay/log.go b/p2p/host/relay/log.go new file mode 100644 index 000000000..9671dc760 --- /dev/null +++ b/p2p/host/relay/log.go @@ -0,0 +1,7 @@ +package relay + +import ( + logging "github.com/ipfs/go-log" +) + +var log = logging.Logger("relay") diff --git a/p2p/host/relay/relay.go b/p2p/host/relay/relay.go new file mode 100644 index 000000000..95b5079c4 --- /dev/null +++ b/p2p/host/relay/relay.go @@ -0,0 +1,23 @@ +package relay + +import ( + "context" + + discovery "github.com/libp2p/go-libp2p-discovery" + host "github.com/libp2p/go-libp2p-host" +) + +// RelayHost is a Host that provides Relay services. +type RelayHost struct { + host.Host + advertise discovery.Advertiser +} + +// New constructs a new RelayHost +func NewRelayHost(ctx context.Context, host host.Host, advertise discovery.Advertiser) *RelayHost { + h := &RelayHost{Host: host, advertise: advertise} + discovery.Advertise(ctx, advertise, "/libp2p/relay") + return h +} + +var _ host.Host = (*RelayHost)(nil)