Browse Source

autorelay support for circuitv2 relays (#1198)

* move host/relay to host/autorelay

* move autorelay wrapper host to package

* support v2 relays in autorelay

* test autorelay with both v1 and v2 relays

* fix test race

* go mod tidy examples/pubsub/chat

static checker complains; sigh.

* refactor reservation refresh loop

* merge background and refresh goroutines

* handle pushes synchronously from reservation refresh failures

* make connmanager tag a package level constant

* dont sleep to wait for identify, use IdentifyWait

* make relay protocol ids package-level constants

* add comment about v1 relays not having reservations

* use errgrp instead of WaitGroup with atomic int

* fix variable capture bug

* go get x/sync
pull/1201/head
vyzo 3 years ago
committed by GitHub
parent
commit
22b62cfd65
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 24
      config/config.go
  2. 2
      examples/pubsub/chat/go.sum
  3. 1
      go.mod
  4. 2
      options.go
  5. 2
      p2p/host/autorelay/addrsplosion.go
  6. 2
      p2p/host/autorelay/addrsplosion_test.go
  7. 160
      p2p/host/autorelay/autorelay.go
  8. 54
      p2p/host/autorelay/autorelay_test.go
  9. 2
      p2p/host/autorelay/doc.go
  10. 19
      p2p/host/autorelay/host.go
  11. 2
      p2p/host/autorelay/log.go
  12. 2
      p2p/host/autorelay/relay.go

24
config/config.go

@ -17,8 +17,8 @@ import (
"github.com/libp2p/go-libp2p-core/transport"
"github.com/libp2p/go-libp2p-peerstore/pstoremem"
"github.com/libp2p/go-libp2p/p2p/host/autorelay"
bhost "github.com/libp2p/go-libp2p/p2p/host/basic"
"github.com/libp2p/go-libp2p/p2p/host/relay"
routed "github.com/libp2p/go-libp2p/p2p/host/routed"
circuitv2 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/client"
"github.com/libp2p/go-libp2p/p2p/protocol/holepunch"
@ -209,7 +209,7 @@ func (cfg *Config) NewNode() (host.Host, error) {
// TODO: We shouldn't be doing this here.
oldFactory := h.AddrsFactory
h.AddrsFactory = func(addrs []ma.Multiaddr) []ma.Multiaddr {
return oldFactory(relay.Filter(addrs))
return oldFactory(autorelay.Filter(addrs))
}
}
@ -237,7 +237,7 @@ func (cfg *Config) NewNode() (host.Host, error) {
// Note: h.AddrsFactory may be changed by AutoRelay, but non-relay version is
// used by AutoNAT below.
var autorelay *relay.AutoRelay
var ar *autorelay.AutoRelay
addrF := h.AddrsFactory
if cfg.EnableAutoRelay {
if !cfg.Relay {
@ -246,7 +246,7 @@ func (cfg *Config) NewNode() (host.Host, error) {
}
if len(cfg.StaticRelays) > 0 {
autorelay = relay.NewAutoRelay(h, nil, router, cfg.StaticRelays)
ar = autorelay.NewAutoRelay(h, nil, router, cfg.StaticRelays)
} else {
if router == nil {
h.Close()
@ -259,7 +259,7 @@ func (cfg *Config) NewNode() (host.Host, error) {
}
discovery := discovery.NewRoutingDiscovery(crouter)
autorelay = relay.NewAutoRelay(h, discovery, router, cfg.StaticRelays)
ar = autorelay.NewAutoRelay(h, discovery, router, cfg.StaticRelays)
}
}
@ -330,22 +330,12 @@ func (cfg *Config) NewNode() (host.Host, error) {
if router != nil {
ho = routed.Wrap(h, router)
}
if autorelay != nil {
return &autoRelayHost{Host: ho, autoRelay: autorelay}, nil
if ar != nil {
return autorelay.NewAutoRelayHost(ho, ar), nil
}
return ho, nil
}
type autoRelayHost struct {
host.Host
autoRelay *relay.AutoRelay
}
func (h *autoRelayHost) Close() error {
_ = h.autoRelay.Close()
return h.Host.Close()
}
// Option is a libp2p config option that can be given to the libp2p constructor
// (`libp2p.New`).
type Option func(cfg *Config) error

2
examples/pubsub/chat/go.sum

@ -404,6 +404,7 @@ github.com/libp2p/go-addr-util v0.1.0/go.mod h1:6I3ZYuFr2O/9D+SoyM0zEw0EF3YkldtT
github.com/libp2p/go-buffer-pool v0.0.1/go.mod h1:xtyIz9PMobb13WaxR6Zo1Pd1zXJKYg0a8KiIvDp3TzQ=
github.com/libp2p/go-buffer-pool v0.0.2 h1:QNK2iAFa8gjAe1SPz6mHSMuCcjs+X1wlHzeOSqcmlfs=
github.com/libp2p/go-buffer-pool v0.0.2/go.mod h1:MvaB6xw5vOrDl8rYZGLFdKAuk/hRoRZd1Vi32+RXyFM=
github.com/libp2p/go-cidranger v1.1.0 h1:ewPN8EZ0dd1LSnrtuwd4709PXVcITVeuwbag38yPW7c=
github.com/libp2p/go-cidranger v1.1.0/go.mod h1:KWZTfSr+r9qEo9OkI9/SIEeAtw+NNoU0dXIXt15Okic=
github.com/libp2p/go-conn-security-multistream v0.2.0/go.mod h1:hZN4MjlNetKD3Rq5Jb/P5ohUnFLNzEAR4DLSzpn2QLU=
github.com/libp2p/go-conn-security-multistream v0.2.1/go.mod h1:cR1d8gA0Hr59Fj6NhaTpFhJZrjSYuNmhpT2r25zYR70=
@ -414,6 +415,7 @@ github.com/libp2p/go-eventbus v0.2.1/go.mod h1:jc2S4SoEVPP48H9Wpzm5aiGwUCBMfGhVh
github.com/libp2p/go-flow-metrics v0.0.1/go.mod h1:Iv1GH0sG8DtYN3SVJ2eG221wMiNpZxBdp967ls1g+k8=
github.com/libp2p/go-flow-metrics v0.0.3 h1:8tAs/hSdNvUiLgtlSy3mxwxWP4I9y/jlkPFT7epKdeM=
github.com/libp2p/go-flow-metrics v0.0.3/go.mod h1:HeoSNUrOJVK1jEpDqVEiUOIXqhbnS27omG0uWU5slZs=
github.com/libp2p/go-libp2p-asn-util v0.0.0-20210818120414-1f382a4aa43a h1:6yEuCOY31elgeJ2KA2JiREZjIznvH6lOWCdHRuhgEgc=
github.com/libp2p/go-libp2p-asn-util v0.0.0-20210818120414-1f382a4aa43a/go.mod h1:wu+AnM9Ii2KgO5jMmS1rz9dvzTdj8BXqsPR9HR0XB7I=
github.com/libp2p/go-libp2p-autonat v0.5.0 h1:/+3+4NcQV47DQ/duvRyFDP8oxv6CQTvSKYD5iWoPcYs=
github.com/libp2p/go-libp2p-autonat v0.5.0/go.mod h1:085tmmuXn0nXgFwuF7a2tt4UxgTjuapbuml27v4htKY=

1
go.mod

@ -57,6 +57,7 @@ require (
go.uber.org/zap v1.19.0 // indirect
golang.org/x/crypto v0.0.0-20210813211128-0a44fdfbc16e // indirect
golang.org/x/net v0.0.0-20210813160813-60bc85c4be6d // indirect
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
golang.org/x/sys v0.0.0-20210816183151-1e6c022a8912 // indirect
golang.org/x/text v0.3.7 // indirect
google.golang.org/grpc v1.40.0 // indirect

2
options.go

@ -17,8 +17,8 @@ import (
"github.com/libp2p/go-libp2p-core/pnet"
"github.com/libp2p/go-libp2p/config"
autorelay "github.com/libp2p/go-libp2p/p2p/host/autorelay"
bhost "github.com/libp2p/go-libp2p/p2p/host/basic"
autorelay "github.com/libp2p/go-libp2p/p2p/host/relay"
holepunch "github.com/libp2p/go-libp2p/p2p/protocol/holepunch"
ma "github.com/multiformats/go-multiaddr"

2
p2p/host/relay/addrsplosion.go → p2p/host/autorelay/addrsplosion.go

@ -1,4 +1,4 @@
package relay
package autorelay
import (
"encoding/binary"

2
p2p/host/relay/addrsplosion_test.go → p2p/host/autorelay/addrsplosion_test.go

@ -1,4 +1,4 @@
package relay
package autorelay
import (
"testing"

160
p2p/host/relay/autorelay.go → p2p/host/autorelay/autorelay.go

@ -1,4 +1,4 @@
package relay
package autorelay
import (
"context"
@ -7,6 +7,8 @@ import (
"sync"
"time"
"golang.org/x/sync/errgroup"
"github.com/libp2p/go-libp2p-core/discovery"
"github.com/libp2p/go-libp2p-core/event"
"github.com/libp2p/go-libp2p-core/network"
@ -15,6 +17,8 @@ import (
basic "github.com/libp2p/go-libp2p/p2p/host/basic"
relayv1 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv1/relay"
circuitv2 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/client"
circuitv2_proto "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/proto"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
@ -22,6 +26,14 @@ import (
const (
RelayRendezvous = "/libp2p/relay"
rsvpRefreshInterval = time.Minute
rsvpExpirationSlack = 2 * time.Minute
autorelayTag = "autorelay"
protoIDv1 = string(relayv1.ProtoID)
protoIDv2 = string(circuitv2_proto.ProtoIDv2Hop)
)
var (
@ -30,7 +42,7 @@ var (
BootDelay = 20 * time.Second
)
// These are the known PL-operated relays
// These are the known PL-operated v1 relays; will be decommissioned in 2022.
var DefaultRelays = []string{
"/ip4/147.75.80.110/tcp/4001/p2p/QmbFgm5zan8P6eWWmeyfncR5feYEMPbht5b1FW1C37aQ7y",
"/ip4/147.75.80.110/udp/4001/quic/p2p/QmbFgm5zan8P6eWWmeyfncR5feYEMPbht5b1FW1C37aQ7y",
@ -55,7 +67,7 @@ type AutoRelay struct {
disconnect chan struct{}
mx sync.Mutex
relays map[peer.ID]struct{}
relays map[peer.ID]*circuitv2.Reservation // rsvp will be nil if it is a v1 relay
status network.Reachability
cachedAddrs []ma.Multiaddr
@ -71,7 +83,7 @@ func NewAutoRelay(bhost *basic.BasicHost, discover discovery.Discoverer, router
router: router,
addrsF: bhost.AddrsFactory,
static: static,
relays: make(map[peer.ID]struct{}),
relays: make(map[peer.ID]*circuitv2.Reservation),
disconnect: make(chan struct{}, 1),
status: network.ReachabilityUnknown,
}
@ -92,6 +104,9 @@ func (ar *AutoRelay) background(ctx context.Context) {
subReachability, _ := ar.host.EventBus().Subscribe(new(event.EvtLocalReachabilityChanged))
defer subReachability.Close()
ticker := time.NewTicker(rsvpRefreshInterval)
defer ticker.Stop()
// when true, we need to identify push
push := false
@ -119,8 +134,13 @@ func (ar *AutoRelay) background(ctx context.Context) {
}
ar.status = evt.Reachability
ar.mx.Unlock()
case <-ar.disconnect:
push = true
case now := <-ticker.C:
push = ar.refreshReservations(ctx, now)
case <-ctx.Done():
return
}
@ -135,6 +155,67 @@ func (ar *AutoRelay) background(ctx context.Context) {
}
}
func (ar *AutoRelay) refreshReservations(ctx context.Context, now time.Time) bool {
ar.mx.Lock()
if ar.status == network.ReachabilityPublic {
// we are public, forget about the relays, unprotect peers
for p := range ar.relays {
ar.host.ConnManager().Unprotect(p, autorelayTag)
delete(ar.relays, p)
}
ar.mx.Unlock()
return true
}
if len(ar.relays) == 0 {
ar.mx.Unlock()
return false
}
// find reservations about to expire and refresh them in parallel
g := new(errgroup.Group)
for p, rsvp := range ar.relays {
if rsvp == nil {
// this is a circuitv1 relay, there is no reservation
continue
}
if now.Add(rsvpExpirationSlack).Before(rsvp.Expiration) {
continue
}
p := p
g.Go(func() error {
return ar.refreshRelayReservation(ctx, p)
})
}
ar.mx.Unlock()
err := g.Wait()
return err != nil
}
func (ar *AutoRelay) refreshRelayReservation(ctx context.Context, p peer.ID) error {
rsvp, err := circuitv2.Reserve(ctx, ar.host, peer.AddrInfo{ID: p})
ar.mx.Lock()
defer ar.mx.Unlock()
if err != nil {
log.Debugf("failed to refresh relay slot reservation with %s: %s", p, err)
delete(ar.relays, p)
// unprotect the connection
ar.host.ConnManager().Unprotect(p, autorelayTag)
} else {
log.Debugf("refreshed relay slot reservation with %s", p)
ar.relays[p] = rsvp
}
return err
}
func (ar *AutoRelay) findRelays(ctx context.Context) bool {
if ar.numRelays() >= DesiredRelays {
return false
@ -204,14 +285,46 @@ func (ar *AutoRelay) tryRelay(ctx context.Context, pi peer.AddrInfo) bool {
return false
}
ok, err := relayv1.CanHop(ctx, ar.host, pi.ID)
protos, err := ar.host.Peerstore().SupportsProtocols(pi.ID, protoIDv1, protoIDv2)
if err != nil {
log.Debugf("error querying relay: %s", err.Error())
log.Debugf("error checking relay protocol support for peer %s: %s", pi.ID, err)
return false
}
if !ok {
// not a hop relay
var supportsv1, supportsv2 bool
for _, proto := range protos {
switch proto {
case protoIDv1:
supportsv1 = true
case protoIDv2:
supportsv2 = true
}
}
var rsvp *circuitv2.Reservation
switch {
case supportsv2:
rsvp, err = circuitv2.Reserve(ctx, ar.host, pi)
if err != nil {
log.Debugf("error reserving slot with %s: %s", pi.ID, err)
return false
}
case supportsv1:
ok, err := relayv1.CanHop(ctx, ar.host, pi.ID)
if err != nil {
log.Debugf("error querying relay %s for v1 hop: %s", pi.ID, err)
return false
}
if !ok {
// not a hop relay
return false
}
default:
// supports neither, unusable relay.
return false
}
@ -222,7 +335,11 @@ func (ar *AutoRelay) tryRelay(ctx context.Context, pi peer.AddrInfo) bool {
if ar.host.Network().Connectedness(pi.ID) != network.Connected {
return false
}
ar.relays[pi.ID] = struct{}{}
ar.relays[pi.ID] = rsvp
// protect the connection
ar.host.ConnManager().Protect(pi.ID, autorelayTag)
return true
}
@ -246,8 +363,29 @@ func (ar *AutoRelay) connect(ctx context.Context, pi peer.AddrInfo) bool {
return false
}
// tag the connection as very important
ar.host.ConnManager().TagPeer(pi.ID, "relay", 42)
// wait for identify to complete in at least one conn so that we can check the supported protocols
conns := ar.host.Network().ConnsToPeer(pi.ID)
if len(conns) == 0 {
return false
}
ready := make(chan struct{}, len(conns))
for _, conn := range conns {
go func(conn network.Conn) {
select {
case <-ar.host.IDService().IdentifyWait(conn):
ready <- struct{}{}
case <-ctx.Done():
}
}(conn)
}
select {
case <-ready:
case <-ctx.Done():
return false
}
return true
}

54
p2p/host/relay/autorelay_test.go → p2p/host/autorelay/autorelay_test.go

@ -1,4 +1,4 @@
package relay_test
package autorelay_test
import (
"context"
@ -9,8 +9,9 @@ import (
"time"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/p2p/host/relay"
"github.com/libp2p/go-libp2p/p2p/host/autorelay"
relayv1 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv1/relay"
relayv2 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/relay"
"github.com/libp2p/go-libp2p-core/event"
"github.com/libp2p/go-libp2p-core/host"
@ -27,8 +28,8 @@ import (
// test specific parameters
func init() {
relay.BootDelay = 1 * time.Second
relay.AdvertiseBootDelay = 100 * time.Millisecond
autorelay.BootDelay = 1 * time.Second
autorelay.AdvertiseBootDelay = 100 * time.Millisecond
}
// mock routing
@ -118,6 +119,13 @@ func connect(t *testing.T, a, b host.Host) {
func TestAutoRelay(t *testing.T) {
manet.Private4 = []*net.IPNet{}
t.Log("testing autorelay with circuitv1 relay")
testAutoRelay(t, false)
t.Log("testing autorelay with circuitv2 relay")
testAutoRelay(t, true)
}
func testAutoRelay(t *testing.T, useRelayv2 bool) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@ -133,26 +141,36 @@ func TestAutoRelay(t *testing.T) {
// this is the relay host
// announce dns addrs because filter out private addresses from relays,
// and we consider dns addresses "public".
relayHost, err := libp2p.New(libp2p.DisableRelay(), libp2p.AddrsFactory(func(addrs []ma.Multiaddr) []ma.Multiaddr {
for i, addr := range addrs {
saddr := addr.String()
if strings.HasPrefix(saddr, "/ip4/127.0.0.1/") {
addrNoIP := strings.TrimPrefix(saddr, "/ip4/127.0.0.1")
addrs[i] = ma.StringCast("/dns4/localhost" + addrNoIP)
relayHost, err := libp2p.New(
libp2p.DisableRelay(),
libp2p.AddrsFactory(func(addrs []ma.Multiaddr) []ma.Multiaddr {
for i, addr := range addrs {
saddr := addr.String()
if strings.HasPrefix(saddr, "/ip4/127.0.0.1/") {
addrNoIP := strings.TrimPrefix(saddr, "/ip4/127.0.0.1")
addrs[i] = ma.StringCast("/dns4/localhost" + addrNoIP)
}
}
}
return addrs
}))
return addrs
}))
if err != nil {
t.Fatal(err)
}
// instantiate the relay
r, err := relayv1.NewRelay(relayHost)
if err != nil {
t.Fatal(err)
if useRelayv2 {
r, err := relayv2.New(relayHost)
if err != nil {
t.Fatal(err)
}
defer r.Close()
} else {
r, err := relayv1.NewRelay(relayHost)
if err != nil {
t.Fatal(err)
}
defer r.Close()
}
defer r.Close()
// advertise the relay
relayRouting, err := makeRouting(relayHost)
@ -160,7 +178,7 @@ func TestAutoRelay(t *testing.T) {
t.Fatal(err)
}
relayDiscovery := discovery.NewRoutingDiscovery(relayRouting)
relay.Advertise(ctx, relayDiscovery)
autorelay.Advertise(ctx, relayDiscovery)
// the client hosts
h1, err := libp2p.New(libp2p.EnableRelay())

2
p2p/host/relay/doc.go → p2p/host/autorelay/doc.go

@ -25,4 +25,4 @@ How it works:
advertising relay addresses. The new set of addresses is propagated to
connected peers through the `identify/push` protocol.
*/
package relay
package autorelay

19
p2p/host/autorelay/host.go

@ -0,0 +1,19 @@
package autorelay
import (
"github.com/libp2p/go-libp2p-core/host"
)
type AutoRelayHost struct {
host.Host
ar *AutoRelay
}
func (h *AutoRelayHost) Close() error {
_ = h.ar.Close()
return h.Host.Close()
}
func NewAutoRelayHost(h host.Host, ar *AutoRelay) *AutoRelayHost {
return &AutoRelayHost{Host: h, ar: ar}
}

2
p2p/host/relay/log.go → p2p/host/autorelay/log.go

@ -1,4 +1,4 @@
package relay
package autorelay
import (
logging "github.com/ipfs/go-log/v2"

2
p2p/host/relay/relay.go → p2p/host/autorelay/relay.go

@ -1,4 +1,4 @@
package relay
package autorelay
import (
"context"
Loading…
Cancel
Save