Browse Source

autorelay hosts

pull/454/head
vyzo 6 years ago
parent
commit
a5858912d6
  1. 252
      p2p/host/relay/autorelay.go
  2. 7
      p2p/host/relay/log.go
  3. 23
      p2p/host/relay/relay.go

252
p2p/host/relay/autorelay.go

@ -0,0 +1,252 @@
package relay
import (
"context"
"fmt"
"math/rand"
"sync"
"time"
basic "github.com/libp2p/go-libp2p/p2p/host/basic"
autonat "github.com/libp2p/go-libp2p-autonat"
discovery "github.com/libp2p/go-libp2p-discovery"
host "github.com/libp2p/go-libp2p-host"
inet "github.com/libp2p/go-libp2p-net"
peer "github.com/libp2p/go-libp2p-peer"
pstore "github.com/libp2p/go-libp2p-peerstore"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr-net"
)
var DesiredRelays = 3
// AutoRelayHost is a Host that uses relays for connectivity when a NAT is detected.
type AutoRelayHost struct {
*basic.BasicHost
discover discovery.Discoverer
autonat autonat.AutoNAT
addrsF basic.AddrsFactory
disconnect chan struct{}
mx sync.Mutex
relays map[peer.ID]pstore.PeerInfo
addrs []ma.Multiaddr
}
func NewAutoRelayHost(ctx context.Context, bhost *basic.BasicHost, discover discovery.Discoverer) *AutoRelayHost {
autonat := autonat.NewAutoNAT(ctx, bhost)
h := &AutoRelayHost{
BasicHost: bhost,
discover: discover,
autonat: autonat,
addrsF: bhost.AddrsFactory,
relays: make(map[peer.ID]pstore.PeerInfo),
disconnect: make(chan struct{}, 1),
}
bhost.AddrsFactory = h.hostAddrs
bhost.Network().Notify(h)
go h.background(ctx)
return h
}
func (h *AutoRelayHost) hostAddrs(addrs []ma.Multiaddr) []ma.Multiaddr {
h.mx.Lock()
defer h.mx.Unlock()
if h.addrs != nil && h.autonat.Status() == autonat.NATStatusPrivate {
return h.addrs
} else {
return h.addrsF(addrs)
}
}
func (h *AutoRelayHost) background(ctx context.Context) {
select {
case <-time.After(autonat.AutoNATBootDelay + 30*time.Second):
case <-ctx.Done():
return
}
for {
wait := autonat.AutoNATRefreshInterval
switch h.autonat.Status() {
case autonat.NATStatusUnknown:
wait = autonat.AutoNATRetryInterval
case autonat.NATStatusPublic:
case autonat.NATStatusPrivate:
h.findRelays(ctx)
}
select {
case <-h.disconnect:
// invalidate addrs
h.mx.Lock()
h.addrs = nil
h.mx.Unlock()
case <-time.After(wait):
case <-ctx.Done():
return
}
}
}
func (h *AutoRelayHost) findRelays(ctx context.Context) {
h.mx.Lock()
if len(h.relays) >= DesiredRelays {
h.mx.Unlock()
return
}
need := DesiredRelays - len(h.relays)
h.mx.Unlock()
limit := 20
for ; need > limit; limit *= 2 {
}
dctx, cancel := context.WithTimeout(ctx, 60*time.Second)
pis, err := discovery.FindPeers(dctx, h.discover, "/libp2p/relay", limit)
cancel()
if err != nil {
log.Debugf("error discovering relays: %s", err.Error())
return
}
// TODO better relay selection strategy; this just selects random relays
// but we should probably use ping latency as the selection metric
shuffleRelays(pis)
update := 0
for _, pi := range pis {
h.mx.Lock()
if _, ok := h.relays[pi.ID]; ok {
h.mx.Unlock()
continue
}
h.mx.Unlock()
cctx, cancel := context.WithTimeout(ctx, 60*time.Second)
err = h.Connect(cctx, pi)
cancel()
if err != nil {
log.Debugf("error connecting to relay %s: %s", pi.ID, err.Error())
continue
}
log.Debugf("connected to relay %s", pi.ID)
h.mx.Lock()
h.relays[pi.ID] = pi
h.mx.Unlock()
update++
need--
if need == 0 {
break
}
}
if update > 0 || h.addrs == nil {
h.updateAddrs()
}
}
func (h *AutoRelayHost) updateAddrs() {
h.doUpdateAddrs()
h.PushIdentify()
}
func (h *AutoRelayHost) doUpdateAddrs() {
h.mx.Lock()
defer h.mx.Unlock()
addrs := h.addrsF(h.AllAddrs())
raddrs := make([]ma.Multiaddr, 0, len(addrs)+len(h.relays))
// remove our public addresses from the list and replace them by just the public IP
for _, addr := range addrs {
if manet.IsPublicAddr(addr) {
ip, err := addr.ValueForProtocol(ma.P_IP4)
if err == nil {
pub, err := ma.NewMultiaddr(fmt.Sprintf("/ip4/%s", ip))
if err != nil {
panic(err)
}
if !containsAddr(raddrs, pub) {
raddrs = append(raddrs, pub)
}
continue
}
ip, err = addr.ValueForProtocol(ma.P_IP6)
if err == nil {
pub, err := ma.NewMultiaddr(fmt.Sprintf("/ip6/%s", ip))
if err != nil {
panic(err)
}
if !containsAddr(raddrs, pub) {
raddrs = append(raddrs, pub)
}
continue
}
} else {
raddrs = append(raddrs, addr)
}
}
circuit, err := ma.NewMultiaddr("/p2p-circuit")
if err != nil {
panic(err)
}
for _, pi := range h.relays {
for _, addr := range pi.Addrs {
if !manet.IsPrivateAddr(addr) {
pub := addr.Encapsulate(circuit)
raddrs = append(raddrs, pub)
}
}
}
h.addrs = raddrs
}
func shuffleRelays(pis []pstore.PeerInfo) {
for i := range pis {
j := rand.Intn(i + 1)
pis[i], pis[j] = pis[j], pis[i]
}
}
func containsAddr(lst []ma.Multiaddr, addr ma.Multiaddr) bool {
for _, xaddr := range lst {
if xaddr.Equal(addr) {
return true
}
}
return false
}
// notify
func (h *AutoRelayHost) Listen(inet.Network, ma.Multiaddr) {}
func (h *AutoRelayHost) ListenClose(inet.Network, ma.Multiaddr) {}
func (h *AutoRelayHost) Connected(inet.Network, inet.Conn) {}
func (h *AutoRelayHost) Disconnected(_ inet.Network, c inet.Conn) {
p := c.RemotePeer()
h.mx.Lock()
defer h.mx.Unlock()
if _, ok := h.relays[p]; ok {
delete(h.relays, p)
select {
case h.disconnect <- struct{}{}:
default:
}
}
}
func (h *AutoRelayHost) OpenedStream(inet.Network, inet.Stream) {}
func (h *AutoRelayHost) ClosedStream(inet.Network, inet.Stream) {}
var _ host.Host = (*AutoRelayHost)(nil)

7
p2p/host/relay/log.go

@ -0,0 +1,7 @@
package relay
import (
logging "github.com/ipfs/go-log"
)
var log = logging.Logger("relay")

23
p2p/host/relay/relay.go

@ -0,0 +1,23 @@
package relay
import (
"context"
discovery "github.com/libp2p/go-libp2p-discovery"
host "github.com/libp2p/go-libp2p-host"
)
// RelayHost is a Host that provides Relay services.
type RelayHost struct {
host.Host
advertise discovery.Advertiser
}
// New constructs a new RelayHost
func NewRelayHost(ctx context.Context, host host.Host, advertise discovery.Advertiser) *RelayHost {
h := &RelayHost{Host: host, advertise: advertise}
discovery.Advertise(ctx, advertise, "/libp2p/relay")
return h
}
var _ host.Host = (*RelayHost)(nil)
Loading…
Cancel
Save