Browse Source

p2p-circuit v2 (#125)

* v2 client scaffolding

* gomod: go-libp2p-core and go-libp2p-transport-upgrader feature dependencies

* Conn implements network.ConnStat

* add reservation stub

* utilities

* dial scaffolding and v1 compat dialing

* stream handling scaffolding and v1 incoming connection handling

* implement hop tagging

* export timeout variables

* v2 protobuf

* v2 client protocol implementation

* implement Reserve

* go get go-libp2p-swarm@feat/transient-conns

* implement client.New

* rework pb status codes

* client responds with UNEXPECTED_MESSAGE when it's actually an unexpected message

* relay scaffolding, reservation implementation

* implement relaying

* implement missing details

* add options for resources/limit

* gc idle conn counts

* fix clown shoes in cancellation check

* end to end relay test

* untag peers with expired reservations

* add time limit test

* better debug log for accepted conns

* add data limit test

* add v2-v1 compatibility tests

* godocs

* add WithACL relay option

* only return public relay addrs in reservation record

* remove the refresh restriction madness

* set default limit Data to 128K

* fix typo in AllowReserve godoc

* fix some small issues

- remove context from constructor
- remove stream handler when closing the host
- remove the awkward cancellation check from handleStream

* fix tests

* address review comments

- Add deadline for Reserve calls
- Add deadline for dials
- Add some comments for things that confuse aarsh.

* humor aarsh and add initializers for slices

* comment nitpicks

* fix bug in slice pre-allocations

* add deadline to connectV1

* make Relay.Close thread-safe

* untag peers with reservations when closing the relay

* gomod: get go-libp2p-asn-util

* add IP/ASN reservation constraints

* gomod: update deps

* fix e2e test

* increase default limit duration to 2min

* update protocol for vouched relay addrs; provide absolute expiration time instead of TTL

* update for reservation changes

* add voucher to the reservation pb

* TODO about reservation vouchers

* deduplicate protocol ID definitions between relay and client

* add reservation vouchers

* emit and consume reservation vouchers

* improve limit data test

* deduplicate concurrent relay dials to the samke peer

* improve dialer deduplication

* add a short timeout to dialing the relay in order to aid deduplication

* gomod: fix go1.16 madness

* spec compliance: don't include p2p-circuit in reservation addrs

* spec compliance: refuse reservation and connection attempts over relayed connections

* test shim: add empty file in test directory

* spec compliance: update protobuf

* spec compliance: use libp2p envelopes for reservation vouchers

* fix staticcheck

Co-authored-by: Marten Seemann <martenseemann@gmail.com>
vyzo 3 years ago
committed by Marten Seemann
parent
commit
7872bd5a44
  1. 3
      examples/go.sum
  2. 3
      examples/ipfs-camp-2019/go.sum
  3. 2
      examples/pubsub/chat/go.sum
  4. 4
      go.mod
  5. 4
      go.sum
  6. 66
      p2p/host/circuitv2/client/client.go
  7. 145
      p2p/host/circuitv2/client/conn.go
  8. 239
      p2p/host/circuitv2/client/dial.go
  9. 172
      p2p/host/circuitv2/client/handlers.go
  10. 54
      p2p/host/circuitv2/client/listen.go
  11. 122
      p2p/host/circuitv2/client/reservation.go
  12. 80
      p2p/host/circuitv2/client/transport.go
  13. 11
      p2p/host/circuitv2/pb/Makefile
  14. 1766
      p2p/host/circuitv2/pb/circuit.pb.go
  15. 60
      p2p/host/circuitv2/pb/circuit.proto
  16. 438
      p2p/host/circuitv2/pb/voucher.pb.go
  17. 9
      p2p/host/circuitv2/pb/voucher.proto
  18. 7
      p2p/host/circuitv2/proto/protocol.go
  19. 72
      p2p/host/circuitv2/proto/voucher.go
  20. 68
      p2p/host/circuitv2/proto/voucher_test.go
  21. 17
      p2p/host/circuitv2/relay/acl.go
  22. 110
      p2p/host/circuitv2/relay/ipcs.go
  23. 27
      p2p/host/circuitv2/relay/options.go
  24. 522
      p2p/host/circuitv2/relay/relay.go
  25. 62
      p2p/host/circuitv2/relay/resources.go
  26. 177
      p2p/host/circuitv2/test/compat_test.go
  27. 361
      p2p/host/circuitv2/test/e2e_test.go
  28. 1
      p2p/host/circuitv2/test/empty.go
  29. 69
      p2p/host/circuitv2/test/ipcs_test.go
  30. 67
      p2p/host/circuitv2/util/io.go
  31. 10
      p2p/host/circuitv2/util/ma.go
  32. 97
      p2p/host/circuitv2/util/pbconv.go

3
examples/go.sum

@ -429,8 +429,9 @@ github.com/libp2p/go-flow-metrics v0.0.1/go.mod h1:Iv1GH0sG8DtYN3SVJ2eG221wMiNpZ
github.com/libp2p/go-flow-metrics v0.0.2/go.mod h1:HeoSNUrOJVK1jEpDqVEiUOIXqhbnS27omG0uWU5slZs=
github.com/libp2p/go-flow-metrics v0.0.3 h1:8tAs/hSdNvUiLgtlSy3mxwxWP4I9y/jlkPFT7epKdeM=
github.com/libp2p/go-flow-metrics v0.0.3/go.mod h1:HeoSNUrOJVK1jEpDqVEiUOIXqhbnS27omG0uWU5slZs=
github.com/libp2p/go-libp2p-asn-util v0.0.0-20200825225859-85005c6cf052 h1:BM7aaOF7RpmNn9+9g6uTjGJ0cTzWr5j9i9IKeun2M8U=
github.com/libp2p/go-libp2p-asn-util v0.0.0-20200825225859-85005c6cf052/go.mod h1:nRMRTab+kZuk0LnKZpxhOVH/ndsdr2Nr//Zltc/vwgo=
github.com/libp2p/go-libp2p-asn-util v0.0.0-20210818120414-1f382a4aa43a h1:6yEuCOY31elgeJ2KA2JiREZjIznvH6lOWCdHRuhgEgc=
github.com/libp2p/go-libp2p-asn-util v0.0.0-20210818120414-1f382a4aa43a/go.mod h1:wu+AnM9Ii2KgO5jMmS1rz9dvzTdj8BXqsPR9HR0XB7I=
github.com/libp2p/go-libp2p-autonat v0.4.2 h1:YMp7StMi2dof+baaxkbxaizXjY1RPvU71CXfxExzcUU=
github.com/libp2p/go-libp2p-autonat v0.4.2/go.mod h1:YxaJlpr81FhdOv3W3BTconZPfhaYivRdf53g+S2wobk=
github.com/libp2p/go-libp2p-blankhost v0.2.0 h1:3EsGAi0CBGcZ33GwRuXEYJLLPoVWyXJ1bcJzAJjINkk=

3
examples/ipfs-camp-2019/go.sum

@ -429,8 +429,9 @@ github.com/libp2p/go-flow-metrics v0.0.1/go.mod h1:Iv1GH0sG8DtYN3SVJ2eG221wMiNpZ
github.com/libp2p/go-flow-metrics v0.0.2/go.mod h1:HeoSNUrOJVK1jEpDqVEiUOIXqhbnS27omG0uWU5slZs=
github.com/libp2p/go-flow-metrics v0.0.3 h1:8tAs/hSdNvUiLgtlSy3mxwxWP4I9y/jlkPFT7epKdeM=
github.com/libp2p/go-flow-metrics v0.0.3/go.mod h1:HeoSNUrOJVK1jEpDqVEiUOIXqhbnS27omG0uWU5slZs=
github.com/libp2p/go-libp2p-asn-util v0.0.0-20200825225859-85005c6cf052 h1:BM7aaOF7RpmNn9+9g6uTjGJ0cTzWr5j9i9IKeun2M8U=
github.com/libp2p/go-libp2p-asn-util v0.0.0-20200825225859-85005c6cf052/go.mod h1:nRMRTab+kZuk0LnKZpxhOVH/ndsdr2Nr//Zltc/vwgo=
github.com/libp2p/go-libp2p-asn-util v0.0.0-20210818120414-1f382a4aa43a h1:6yEuCOY31elgeJ2KA2JiREZjIznvH6lOWCdHRuhgEgc=
github.com/libp2p/go-libp2p-asn-util v0.0.0-20210818120414-1f382a4aa43a/go.mod h1:wu+AnM9Ii2KgO5jMmS1rz9dvzTdj8BXqsPR9HR0XB7I=
github.com/libp2p/go-libp2p-autonat v0.4.2 h1:YMp7StMi2dof+baaxkbxaizXjY1RPvU71CXfxExzcUU=
github.com/libp2p/go-libp2p-autonat v0.4.2/go.mod h1:YxaJlpr81FhdOv3W3BTconZPfhaYivRdf53g+S2wobk=
github.com/libp2p/go-libp2p-blankhost v0.2.0 h1:3EsGAi0CBGcZ33GwRuXEYJLLPoVWyXJ1bcJzAJjINkk=

2
examples/pubsub/chat/go.sum

@ -401,6 +401,7 @@ github.com/libp2p/go-addr-util v0.1.0/go.mod h1:6I3ZYuFr2O/9D+SoyM0zEw0EF3YkldtT
github.com/libp2p/go-buffer-pool v0.0.1/go.mod h1:xtyIz9PMobb13WaxR6Zo1Pd1zXJKYg0a8KiIvDp3TzQ=
github.com/libp2p/go-buffer-pool v0.0.2 h1:QNK2iAFa8gjAe1SPz6mHSMuCcjs+X1wlHzeOSqcmlfs=
github.com/libp2p/go-buffer-pool v0.0.2/go.mod h1:MvaB6xw5vOrDl8rYZGLFdKAuk/hRoRZd1Vi32+RXyFM=
github.com/libp2p/go-cidranger v1.1.0/go.mod h1:KWZTfSr+r9qEo9OkI9/SIEeAtw+NNoU0dXIXt15Okic=
github.com/libp2p/go-conn-security-multistream v0.2.0/go.mod h1:hZN4MjlNetKD3Rq5Jb/P5ohUnFLNzEAR4DLSzpn2QLU=
github.com/libp2p/go-conn-security-multistream v0.2.1 h1:ft6/POSK7F+vl/2qzegnHDaXFU0iWB4yVTYrioC6Zy0=
github.com/libp2p/go-conn-security-multistream v0.2.1/go.mod h1:cR1d8gA0Hr59Fj6NhaTpFhJZrjSYuNmhpT2r25zYR70=
@ -409,6 +410,7 @@ github.com/libp2p/go-eventbus v0.2.1/go.mod h1:jc2S4SoEVPP48H9Wpzm5aiGwUCBMfGhVh
github.com/libp2p/go-flow-metrics v0.0.1/go.mod h1:Iv1GH0sG8DtYN3SVJ2eG221wMiNpZxBdp967ls1g+k8=
github.com/libp2p/go-flow-metrics v0.0.3 h1:8tAs/hSdNvUiLgtlSy3mxwxWP4I9y/jlkPFT7epKdeM=
github.com/libp2p/go-flow-metrics v0.0.3/go.mod h1:HeoSNUrOJVK1jEpDqVEiUOIXqhbnS27omG0uWU5slZs=
github.com/libp2p/go-libp2p-asn-util v0.0.0-20210818120414-1f382a4aa43a/go.mod h1:wu+AnM9Ii2KgO5jMmS1rz9dvzTdj8BXqsPR9HR0XB7I=
github.com/libp2p/go-libp2p-autonat v0.4.2 h1:YMp7StMi2dof+baaxkbxaizXjY1RPvU71CXfxExzcUU=
github.com/libp2p/go-libp2p-autonat v0.4.2/go.mod h1:YxaJlpr81FhdOv3W3BTconZPfhaYivRdf53g+S2wobk=
github.com/libp2p/go-libp2p-blankhost v0.2.0 h1:3EsGAi0CBGcZ33GwRuXEYJLLPoVWyXJ1bcJzAJjINkk=

4
go.mod

@ -12,14 +12,17 @@ require (
github.com/ipfs/go-datastore v0.4.6
github.com/ipfs/go-detect-race v0.0.1
github.com/ipfs/go-ipfs-util v0.0.2
github.com/ipfs/go-log v1.0.5
github.com/ipfs/go-log/v2 v2.3.0
github.com/jbenet/go-cienv v0.1.0
github.com/jbenet/goprocess v0.1.4
github.com/klauspost/cpuid/v2 v2.0.9 // indirect
github.com/koron/go-ssdp v0.0.2 // indirect
github.com/libp2p/go-addr-util v0.1.0
github.com/libp2p/go-buffer-pool v0.0.2
github.com/libp2p/go-conn-security-multistream v0.2.1
github.com/libp2p/go-eventbus v0.2.1
github.com/libp2p/go-libp2p-asn-util v0.0.0-20210818120414-1f382a4aa43a
github.com/libp2p/go-libp2p-autonat v0.4.2
github.com/libp2p/go-libp2p-blankhost v0.2.0
github.com/libp2p/go-libp2p-circuit v0.4.0
@ -45,6 +48,7 @@ require (
github.com/multiformats/go-multiaddr v0.4.0
github.com/multiformats/go-multiaddr-dns v0.3.1
github.com/multiformats/go-multistream v0.2.2
github.com/multiformats/go-varint v0.0.6
github.com/prometheus/common v0.30.0 // indirect
github.com/prometheus/procfs v0.7.3 // indirect
github.com/stretchr/testify v1.7.0

4
go.sum

@ -418,6 +418,8 @@ github.com/libp2p/go-addr-util v0.1.0/go.mod h1:6I3ZYuFr2O/9D+SoyM0zEw0EF3YkldtT
github.com/libp2p/go-buffer-pool v0.0.1/go.mod h1:xtyIz9PMobb13WaxR6Zo1Pd1zXJKYg0a8KiIvDp3TzQ=
github.com/libp2p/go-buffer-pool v0.0.2 h1:QNK2iAFa8gjAe1SPz6mHSMuCcjs+X1wlHzeOSqcmlfs=
github.com/libp2p/go-buffer-pool v0.0.2/go.mod h1:MvaB6xw5vOrDl8rYZGLFdKAuk/hRoRZd1Vi32+RXyFM=
github.com/libp2p/go-cidranger v1.1.0 h1:ewPN8EZ0dd1LSnrtuwd4709PXVcITVeuwbag38yPW7c=
github.com/libp2p/go-cidranger v1.1.0/go.mod h1:KWZTfSr+r9qEo9OkI9/SIEeAtw+NNoU0dXIXt15Okic=
github.com/libp2p/go-conn-security-multistream v0.1.0/go.mod h1:aw6eD7LOsHEX7+2hJkDxw1MteijaVcI+/eP2/x3J1xc=
github.com/libp2p/go-conn-security-multistream v0.2.0/go.mod h1:hZN4MjlNetKD3Rq5Jb/P5ohUnFLNzEAR4DLSzpn2QLU=
github.com/libp2p/go-conn-security-multistream v0.2.1 h1:ft6/POSK7F+vl/2qzegnHDaXFU0iWB4yVTYrioC6Zy0=
@ -432,6 +434,8 @@ github.com/libp2p/go-libp2p v0.6.1/go.mod h1:CTFnWXogryAHjXAKEbOf1OWY+VeAP3lDMZk
github.com/libp2p/go-libp2p v0.7.0/go.mod h1:hZJf8txWeCduQRDC/WSqBGMxaTHCOYHt2xSU1ivxn0k=
github.com/libp2p/go-libp2p v0.7.4/go.mod h1:oXsBlTLF1q7pxr+9w6lqzS1ILpyHsaBPniVO7zIHGMw=
github.com/libp2p/go-libp2p v0.8.1/go.mod h1:QRNH9pwdbEBpx5DTJYg+qxcVaDMAz3Ee/qDKwXujH5o=
github.com/libp2p/go-libp2p-asn-util v0.0.0-20210818120414-1f382a4aa43a h1:6yEuCOY31elgeJ2KA2JiREZjIznvH6lOWCdHRuhgEgc=
github.com/libp2p/go-libp2p-asn-util v0.0.0-20210818120414-1f382a4aa43a/go.mod h1:wu+AnM9Ii2KgO5jMmS1rz9dvzTdj8BXqsPR9HR0XB7I=
github.com/libp2p/go-libp2p-autonat v0.1.1/go.mod h1:OXqkeGOY2xJVWKAGV2inNF5aKN/djNA3fdpCWloIudE=
github.com/libp2p/go-libp2p-autonat v0.2.0/go.mod h1:DX+9teU4pEEoZUqR1PiMlqliONQdNbfzE1C718tcViI=
github.com/libp2p/go-libp2p-autonat v0.2.1/go.mod h1:MWtAhV5Ko1l6QBsHQNSuM6b1sRkXrpk0/LqCr+vCVxI=

66
p2p/host/circuitv2/client/client.go

@ -0,0 +1,66 @@
package client
import (
"context"
"sync"
"github.com/libp2p/go-libp2p/p2p/host/circuitv2/proto"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
logging "github.com/ipfs/go-log"
tptu "github.com/libp2p/go-libp2p-transport-upgrader"
)
var log = logging.Logger("p2p-circuit")
// Client implements the client-side of the p2p-circuit/v2 protocol:
// - it implements dialing through v2 relays
// - it listens for incoming connections through v2 relays.
//
// For backwards compatibility with v1 relays and older nodes, the client will
// also accept relay connections through v1 relays and fallback dial peers using p2p-circuit/v1.
// This allows us to use the v2 code as drop in replacement for v1 in a host without breaking
// existing code and interoperability with older nodes.
type Client struct {
ctx context.Context
host host.Host
upgrader *tptu.Upgrader
incoming chan accept
mx sync.Mutex
activeDials map[peer.ID]*completion
hopCount map[peer.ID]int
}
type accept struct {
conn *Conn
writeResponse func() error
}
type completion struct {
ch chan struct{}
relay peer.ID
err error
}
// New constructs a new p2p-circuit/v2 client, attached to the given host and using the given
// upgrader to perform connection upgrades.
func New(ctx context.Context, h host.Host, upgrader *tptu.Upgrader) (*Client, error) {
return &Client{
ctx: ctx,
host: h,
upgrader: upgrader,
incoming: make(chan accept),
activeDials: make(map[peer.ID]*completion),
hopCount: make(map[peer.ID]int),
}, nil
}
// Start registers the circuit (client) protocol stream handlers
func (c *Client) Start() {
c.host.SetStreamHandler(proto.ProtoIDv1, c.handleStreamV1)
c.host.SetStreamHandler(proto.ProtoIDv2Stop, c.handleStreamV2)
}

145
p2p/host/circuitv2/client/conn.go

@ -0,0 +1,145 @@
package client
import (
"fmt"
"net"
"time"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
)
// HopTagWeight is the connection manager weight for connections carrying relay hop streams
var HopTagWeight = 5
type statLimitDuration struct{}
type statLimitData struct{}
var (
StatLimitDuration = statLimitDuration{}
StatLimitData = statLimitData{}
)
type Conn struct {
stream network.Stream
remote peer.AddrInfo
stat network.Stat
client *Client
}
type NetAddr struct {
Relay string
Remote string
}
var _ net.Addr = (*NetAddr)(nil)
func (n *NetAddr) Network() string {
return "libp2p-circuit-relay"
}
func (n *NetAddr) String() string {
return fmt.Sprintf("relay[%s-%s]", n.Remote, n.Relay)
}
// Conn interface
var _ manet.Conn = (*Conn)(nil)
func (c *Conn) Close() error {
c.untagHop()
return c.stream.Reset()
}
func (c *Conn) Read(buf []byte) (int, error) {
return c.stream.Read(buf)
}
func (c *Conn) Write(buf []byte) (int, error) {
return c.stream.Write(buf)
}
func (c *Conn) SetDeadline(t time.Time) error {
return c.stream.SetDeadline(t)
}
func (c *Conn) SetReadDeadline(t time.Time) error {
return c.stream.SetReadDeadline(t)
}
func (c *Conn) SetWriteDeadline(t time.Time) error {
return c.stream.SetWriteDeadline(t)
}
// TODO: is it okay to cast c.Conn().RemotePeer() into a multiaddr? might be "user input"
func (c *Conn) RemoteMultiaddr() ma.Multiaddr {
// TODO: We should be able to do this directly without converting to/from a string.
relayAddr, err := ma.NewComponent(
ma.ProtocolWithCode(ma.P_P2P).Name,
c.stream.Conn().RemotePeer().Pretty(),
)
if err != nil {
panic(err)
}
return ma.Join(c.stream.Conn().RemoteMultiaddr(), relayAddr, circuitAddr)
}
func (c *Conn) LocalMultiaddr() ma.Multiaddr {
return c.stream.Conn().LocalMultiaddr()
}
func (c *Conn) LocalAddr() net.Addr {
na, err := manet.ToNetAddr(c.stream.Conn().LocalMultiaddr())
if err != nil {
log.Error("failed to convert local multiaddr to net addr:", err)
return nil
}
return na
}
func (c *Conn) RemoteAddr() net.Addr {
return &NetAddr{
Relay: c.stream.Conn().RemotePeer().Pretty(),
Remote: c.remote.ID.Pretty(),
}
}
// ConnStat interface
var _ network.ConnStat = (*Conn)(nil)
func (c *Conn) Stat() network.Stat {
return c.stat
}
// tagHop tags the underlying relay connection so that it can be (somewhat) protected from the
// connection manager as it is an important connection that proxies other connections.
// This is handled here so that the user code doesnt need to bother with this and avoid
// clown shoes situations where a high value peer connection is behind a relayed connection and it is
// implicitly because the connection manager closed the underlying relay connection.
func (c *Conn) tagHop() {
c.client.mx.Lock()
defer c.client.mx.Unlock()
p := c.stream.Conn().RemotePeer()
c.client.hopCount[p]++
if c.client.hopCount[p] == 1 {
c.client.host.ConnManager().TagPeer(p, "relay-hop-stream", HopTagWeight)
}
}
// untagHop removes the relay-hop-stream tag if necessary; it is invoked when a relayed connection
// is closed.
func (c *Conn) untagHop() {
c.client.mx.Lock()
defer c.client.mx.Unlock()
p := c.stream.Conn().RemotePeer()
c.client.hopCount[p]--
if c.client.hopCount[p] == 0 {
c.client.host.ConnManager().UntagPeer(p, "relay-hop-stream")
delete(c.client.hopCount, p)
}
}

239
p2p/host/circuitv2/client/dial.go

@ -0,0 +1,239 @@
package client
import (
"context"
"fmt"
"time"
pbv1 "github.com/libp2p/go-libp2p-circuit/pb"
pbv2 "github.com/libp2p/go-libp2p/p2p/host/circuitv2/pb"
"github.com/libp2p/go-libp2p/p2p/host/circuitv2/proto"
"github.com/libp2p/go-libp2p/p2p/host/circuitv2/util"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
ma "github.com/multiformats/go-multiaddr"
)
const maxMessageSize = 4096
var DialTimeout = time.Minute
var DialRelayTimeout = 5 * time.Second
// relay protocol errors; used for signalling deduplication
type relayError struct {
err string
}
func (e relayError) Error() string {
return e.err
}
func newRelayError(t string, args ...interface{}) error {
return relayError{err: fmt.Sprintf(t, args...)}
}
func isRelayError(err error) bool {
_, ok := err.(relayError)
return ok
}
// dialer
func (c *Client) dial(ctx context.Context, a ma.Multiaddr, p peer.ID) (*Conn, error) {
// split /a/p2p-circuit/b into (/a, /p2p-circuit/b)
relayaddr, destaddr := ma.SplitFunc(a, func(c ma.Component) bool {
return c.Protocol().Code == ma.P_CIRCUIT
})
// If the address contained no /p2p-circuit part, the second part is nil.
if destaddr == nil {
return nil, fmt.Errorf("%s is not a relay address", a)
}
if relayaddr == nil {
return nil, fmt.Errorf("can't dial a p2p-circuit without specifying a relay: %s", a)
}
dinfo := peer.AddrInfo{ID: p}
// Strip the /p2p-circuit prefix from the destaddr so that we can pass the destination address
// (if present) for active relays
_, destaddr = ma.SplitFirst(destaddr)
if destaddr != nil {
dinfo.Addrs = append(dinfo.Addrs, destaddr)
}
rinfo, err := peer.AddrInfoFromP2pAddr(relayaddr)
if err != nil {
return nil, fmt.Errorf("error parsing relay multiaddr '%s': %w", relayaddr, err)
}
// deduplicate active relay dials to the same peer
retry:
c.mx.Lock()
dedup, active := c.activeDials[p]
if !active {
dedup = &completion{ch: make(chan struct{}), relay: rinfo.ID}
c.activeDials[p] = dedup
}
c.mx.Unlock()
if active {
select {
case <-dedup.ch:
if dedup.err != nil {
if dedup.relay != rinfo.ID {
// different relay, retry
goto retry
}
if !isRelayError(dedup.err) {
// not a relay protocol error, retry
goto retry
}
// don't try the same relay if it failed to connect with a protocol error
return nil, fmt.Errorf("concurrent active dial through the same relay failed with a protocol error")
}
return nil, fmt.Errorf("concurrent active dial succeeded")
case <-ctx.Done():
return nil, ctx.Err()
}
}
conn, err := c.dialPeer(ctx, *rinfo, dinfo)
c.mx.Lock()
dedup.err = err
close(dedup.ch)
delete(c.activeDials, p)
c.mx.Unlock()
return conn, err
}
func (c *Client) dialPeer(ctx context.Context, relay, dest peer.AddrInfo) (*Conn, error) {
log.Debugf("dialing peer %s through relay %s", dest.ID, relay.ID)
if len(relay.Addrs) > 0 {
c.host.Peerstore().AddAddrs(relay.ID, relay.Addrs, peerstore.TempAddrTTL)
}
dialCtx, cancel := context.WithTimeout(ctx, DialRelayTimeout)
defer cancel()
s, err := c.host.NewStream(dialCtx, relay.ID, proto.ProtoIDv2Hop, proto.ProtoIDv1)
if err != nil {
return nil, fmt.Errorf("error opening hop stream to relay: %w", err)
}
switch s.Protocol() {
case proto.ProtoIDv2Hop:
return c.connectV2(s, dest)
case proto.ProtoIDv1:
return c.connectV1(s, dest)
default:
s.Reset()
return nil, fmt.Errorf("unexpected stream protocol: %s", s.Protocol())
}
}
func (c *Client) connectV2(s network.Stream, dest peer.AddrInfo) (*Conn, error) {
rd := util.NewDelimitedReader(s, maxMessageSize)
wr := util.NewDelimitedWriter(s)
defer rd.Close()
var msg pbv2.HopMessage
msg.Type = pbv2.HopMessage_CONNECT.Enum()
msg.Peer = util.PeerInfoToPeerV2(dest)
s.SetDeadline(time.Now().Add(DialTimeout))
err := wr.WriteMsg(&msg)
if err != nil {
s.Reset()
return nil, err
}
msg.Reset()
err = rd.ReadMsg(&msg)
if err != nil {
s.Reset()
return nil, err
}
s.SetDeadline(time.Time{})
if msg.GetType() != pbv2.HopMessage_STATUS {
s.Reset()
return nil, newRelayError("unexpected relay response; not a status message (%d)", msg.GetType())
}
status := msg.GetStatus()
if status != pbv2.Status_OK {
s.Reset()
return nil, newRelayError("error opening relay circuit: %s (%d)", pbv2.Status_name[int32(status)], status)
}
// check for a limit provided by the relay; if the limit is not nil, then this is a limited
// relay connection and we mark the connection as transient.
var stat network.Stat
if limit := msg.GetLimit(); limit != nil {
stat.Transient = true
stat.Extra = make(map[interface{}]interface{})
stat.Extra[StatLimitDuration] = time.Duration(limit.GetDuration()) * time.Second
stat.Extra[StatLimitData] = limit.GetData()
}
return &Conn{stream: s, remote: dest, stat: stat, client: c}, nil
}
func (c *Client) connectV1(s network.Stream, dest peer.AddrInfo) (*Conn, error) {
rd := util.NewDelimitedReader(s, maxMessageSize)
wr := util.NewDelimitedWriter(s)
defer rd.Close()
var msg pbv1.CircuitRelay
msg.Type = pbv1.CircuitRelay_HOP.Enum()
msg.SrcPeer = util.PeerInfoToPeerV1(c.host.Peerstore().PeerInfo(c.host.ID()))
msg.DstPeer = util.PeerInfoToPeerV1(dest)
s.SetDeadline(time.Now().Add(DialTimeout))
err := wr.WriteMsg(&msg)
if err != nil {
s.Reset()
return nil, err
}
msg.Reset()
err = rd.ReadMsg(&msg)
if err != nil {
s.Reset()
return nil, err
}
s.SetDeadline(time.Time{})
if msg.GetType() != pbv1.CircuitRelay_STATUS {
s.Reset()
return nil, newRelayError("unexpected relay response; not a status message (%d)", msg.GetType())
}
status := msg.GetCode()
if status != pbv1.CircuitRelay_SUCCESS {
s.Reset()
return nil, newRelayError("error opening relay circuit: %s (%d)", pbv1.CircuitRelay_Status_name[int32(status)], status)
}
return &Conn{stream: s, remote: dest, client: c}, nil
}

172
p2p/host/circuitv2/client/handlers.go

@ -0,0 +1,172 @@
package client
import (
"time"
pbv1 "github.com/libp2p/go-libp2p-circuit/pb"
pbv2 "github.com/libp2p/go-libp2p/p2p/host/circuitv2/pb"
"github.com/libp2p/go-libp2p/p2p/host/circuitv2/util"
"github.com/libp2p/go-libp2p-core/network"
)
var (
StreamTimeout = 1 * time.Minute
AcceptTimeout = 10 * time.Second
)
func (c *Client) handleStreamV2(s network.Stream) {
log.Debugf("new relay/v2 stream from: %s", s.Conn().RemotePeer())
s.SetReadDeadline(time.Now().Add(StreamTimeout))
rd := util.NewDelimitedReader(s, maxMessageSize)
defer rd.Close()
writeResponse := func(status pbv2.Status) error {
wr := util.NewDelimitedWriter(s)
var msg pbv2.StopMessage
msg.Type = pbv2.StopMessage_STATUS.Enum()
msg.Status = status.Enum()
return wr.WriteMsg(&msg)
}
handleError := func(status pbv2.Status) {
log.Debugf("protocol error: %s (%d)", pbv2.Status_name[int32(status)], status)
err := writeResponse(status)
if err != nil {
s.Reset()
log.Debugf("error writing circuit response: %s", err.Error())
} else {
s.Close()
}
}
var msg pbv2.StopMessage
err := rd.ReadMsg(&msg)
if err != nil {
handleError(pbv2.Status_MALFORMED_MESSAGE)
return
}
// reset stream deadline as message has been read
s.SetReadDeadline(time.Time{})
if msg.GetType() != pbv2.StopMessage_CONNECT {
handleError(pbv2.Status_UNEXPECTED_MESSAGE)
return
}
src, err := util.PeerToPeerInfoV2(msg.GetPeer())
if err != nil {
handleError(pbv2.Status_MALFORMED_MESSAGE)
return
}
// check for a limit provided by the relay; if the limit is not nil, then this is a limited
// relay connection and we mark the connection as transient.
var stat network.Stat
if limit := msg.GetLimit(); limit != nil {
stat.Transient = true
stat.Extra = make(map[interface{}]interface{})
stat.Extra[StatLimitDuration] = time.Duration(limit.GetDuration()) * time.Second
stat.Extra[StatLimitData] = limit.GetData()
}
log.Debugf("incoming relay connection from: %s", src.ID)
select {
case c.incoming <- accept{
conn: &Conn{stream: s, remote: src, stat: stat, client: c},
writeResponse: func() error {
return writeResponse(pbv2.Status_OK)
},
}:
case <-time.After(AcceptTimeout):
handleError(pbv2.Status_CONNECTION_FAILED)
}
}
func (c *Client) handleStreamV1(s network.Stream) {
log.Debugf("new relay/v1 stream from: %s", s.Conn().RemotePeer())
s.SetReadDeadline(time.Now().Add(StreamTimeout))
rd := util.NewDelimitedReader(s, maxMessageSize)
defer rd.Close()
writeResponse := func(status pbv1.CircuitRelay_Status) error {
wr := util.NewDelimitedWriter(s)
var msg pbv1.CircuitRelay
msg.Type = pbv1.CircuitRelay_STATUS.Enum()
msg.Code = status.Enum()
return wr.WriteMsg(&msg)
}
handleError := func(status pbv1.CircuitRelay_Status) {
log.Debugf("protocol error: %s (%d)", pbv1.CircuitRelay_Status_name[int32(status)], status)
err := writeResponse(status)
if err != nil {
s.Reset()
log.Debugf("error writing circuit response: %s", err.Error())
} else {
s.Close()
}
}
var msg pbv1.CircuitRelay
err := rd.ReadMsg(&msg)
if err != nil {
handleError(pbv1.CircuitRelay_MALFORMED_MESSAGE)
return
}
// reset stream deadline as message has been read
s.SetReadDeadline(time.Time{})
switch msg.GetType() {
case pbv1.CircuitRelay_STOP:
case pbv1.CircuitRelay_HOP:
handleError(pbv1.CircuitRelay_HOP_CANT_SPEAK_RELAY)
return
case pbv1.CircuitRelay_CAN_HOP:
handleError(pbv1.CircuitRelay_HOP_CANT_SPEAK_RELAY)
return
default:
log.Debugf("unexpected relay handshake: %d", msg.GetType())
handleError(pbv1.CircuitRelay_MALFORMED_MESSAGE)
return
}
src, err := util.PeerToPeerInfoV1(msg.GetSrcPeer())
if err != nil {
handleError(pbv1.CircuitRelay_STOP_SRC_MULTIADDR_INVALID)
return
}
dst, err := util.PeerToPeerInfoV1(msg.GetDstPeer())
if err != nil || dst.ID != c.host.ID() {
handleError(pbv1.CircuitRelay_STOP_DST_MULTIADDR_INVALID)
return
}
log.Debugf("incoming relay connection from: %s", src.ID)
select {
case c.incoming <- accept{
conn: &Conn{stream: s, remote: src, client: c},
writeResponse: func() error {
return writeResponse(pbv1.CircuitRelay_SUCCESS)
},
}:
case <-time.After(AcceptTimeout):
handleError(pbv1.CircuitRelay_STOP_RELAY_REFUSED)
}
}

54
p2p/host/circuitv2/client/listen.go

@ -0,0 +1,54 @@
package client
import (
"net"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
)
var _ manet.Listener = (*Listener)(nil)
type Listener Client
func (c *Client) Listener() *Listener {
return (*Listener)(c)
}
func (l *Listener) Accept() (manet.Conn, error) {
for {
select {
case evt := <-l.incoming:
err := evt.writeResponse()
if err != nil {
log.Debugf("error writing relay response: %s", err.Error())
evt.conn.stream.Reset()
continue
}
log.Debugf("accepted relay connection from %s through %s", evt.conn.remote.ID, evt.conn.RemoteMultiaddr())
evt.conn.tagHop()
return evt.conn, nil
case <-l.ctx.Done():
return nil, l.ctx.Err()
}
}
}
func (l *Listener) Addr() net.Addr {
return &NetAddr{
Relay: "any",
Remote: "any",
}
}
func (l *Listener) Multiaddr() ma.Multiaddr {
return circuitAddr
}
func (l *Listener) Close() error {
// noop for now
return nil
}

122
p2p/host/circuitv2/client/reservation.go

@ -0,0 +1,122 @@
package client
import (
"context"
"fmt"
"time"
pbv2 "github.com/libp2p/go-libp2p/p2p/host/circuitv2/pb"
"github.com/libp2p/go-libp2p/p2p/host/circuitv2/proto"
"github.com/libp2p/go-libp2p/p2p/host/circuitv2/util"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
"github.com/libp2p/go-libp2p-core/record"
ma "github.com/multiformats/go-multiaddr"
)
var ReserveTimeout = time.Minute
// Reservation is a struct carrying information about a relay/v2 slot reservation.
type Reservation struct {
// Expiration is the expiration time of the reservation
Expiration time.Time
// Addrs contains the vouched public addresses of the reserving peer, which can be
// announced to the network
Addrs []ma.Multiaddr
// LimitDuration is the time limit for which the relay will keep a relayed connection
// open. If 0, there is no limit.
LimitDuration time.Duration
// LimitData is the number of bytes that the relay will relay in each direction before
// resetting a relayed connection.
LimitData uint64
// Voucher is a signed reservation voucher provided by the relay
Voucher *proto.ReservationVoucher
}
// Reserve reserves a slot in a relay and returns the reservation information.
// Clients must reserve slots in order for the relay to relay connections to them.
func Reserve(ctx context.Context, h host.Host, ai peer.AddrInfo) (*Reservation, error) {
if len(ai.Addrs) > 0 {
h.Peerstore().AddAddrs(ai.ID, ai.Addrs, peerstore.TempAddrTTL)
}
s, err := h.NewStream(ctx, ai.ID, proto.ProtoIDv2Hop)
if err != nil {
return nil, err
}
defer s.Close()
rd := util.NewDelimitedReader(s, maxMessageSize)
wr := util.NewDelimitedWriter(s)
defer rd.Close()
var msg pbv2.HopMessage
msg.Type = pbv2.HopMessage_RESERVE.Enum()
s.SetDeadline(time.Now().Add(ReserveTimeout))
if err := wr.WriteMsg(&msg); err != nil {
s.Reset()
return nil, fmt.Errorf("error writing reservation message: %w", err)
}
msg.Reset()
if err := rd.ReadMsg(&msg); err != nil {
s.Reset()
return nil, fmt.Errorf("error reading reservation response message: %w", err)
}
if msg.GetType() != pbv2.HopMessage_STATUS {
return nil, fmt.Errorf("unexpected relay response: not a status message (%d)", msg.GetType())
}
if status := msg.GetStatus(); status != pbv2.Status_OK {
return nil, fmt.Errorf("reservation failed: %s (%d)", pbv2.Status_name[int32(status)], status)
}
rsvp := msg.GetReservation()
if rsvp == nil {
return nil, fmt.Errorf("missing reservation info")
}
result := &Reservation{}
result.Expiration = time.Unix(int64(rsvp.GetExpire()), 0)
for _, ab := range rsvp.GetAddrs() {
a, err := ma.NewMultiaddrBytes(ab)
if err != nil {
log.Warnf("ignoring unparsable relay address: %s", err)
continue
}
result.Addrs = append(result.Addrs, a)
}
voucherBytes := rsvp.GetVoucher()
if voucherBytes != nil {
_, rec, err := record.ConsumeEnvelope(voucherBytes, proto.RecordDomain)
if err != nil {
return nil, fmt.Errorf("error consuming voucher envelope: %w", err)
}
voucher, ok := rec.(*proto.ReservationVoucher)
if !ok {
return nil, fmt.Errorf("unexpected voucher record type: %+T", rec)
}
result.Voucher = voucher
}
limit := msg.GetLimit()
if limit != nil {
result.LimitDuration = time.Duration(limit.GetDuration()) * time.Second
result.LimitData = limit.GetData()
}
return result, nil
}

80
p2p/host/circuitv2/client/transport.go

@ -0,0 +1,80 @@
package client
import (
"context"
"fmt"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/transport"
tptu "github.com/libp2p/go-libp2p-transport-upgrader"
ma "github.com/multiformats/go-multiaddr"
)
var circuitProtocol = ma.ProtocolWithCode(ma.P_CIRCUIT)
var circuitAddr = ma.Cast(circuitProtocol.VCode)
// AddTransport constructs a new p2p-circuit/v2 client and adds it as a transport to the
// host network
func AddTransport(ctx context.Context, h host.Host, upgrader *tptu.Upgrader) error {
n, ok := h.Network().(transport.TransportNetwork)
if !ok {
return fmt.Errorf("%v is not a transport network", h.Network())
}
c, err := New(ctx, h, upgrader)
if err != nil {
return fmt.Errorf("error constructing circuit client: %w", err)
}
err = n.AddTransport(c)
if err != nil {
return fmt.Errorf("error adding circuit transport: %w", err)
}
err = n.Listen(circuitAddr)
if err != nil {
return fmt.Errorf("error listening to circuit addr: %w", err)
}
c.Start()
return nil
}
// Transport interface
var _ transport.Transport = (*Client)(nil)
func (c *Client) Dial(ctx context.Context, a ma.Multiaddr, p peer.ID) (transport.CapableConn, error) {
conn, err := c.dial(ctx, a, p)
if err != nil {
return nil, err
}
conn.tagHop()
return c.upgrader.UpgradeOutbound(ctx, c, conn, p)
}
func (c *Client) CanDial(addr ma.Multiaddr) bool {
_, err := addr.ValueForProtocol(ma.P_CIRCUIT)
return err == nil
}
func (c *Client) Listen(addr ma.Multiaddr) (transport.Listener, error) {
// TODO connect to the relay and reserve slot if specified
if _, err := addr.ValueForProtocol(ma.P_CIRCUIT); err != nil {
return nil, err
}
return c.upgrader.UpgradeListener(c, c.Listener()), nil
}
func (c *Client) Protocols() []int {
return []int{ma.P_CIRCUIT}
}
func (c *Client) Proxy() bool {
return true
}

11
p2p/host/circuitv2/pb/Makefile

@ -0,0 +1,11 @@
PB = $(wildcard *.proto)
GO = $(PB:.proto=.pb.go)
all: $(GO)
%.pb.go: %.proto
protoc --gogofast_out=. $<
clean:
rm -f *.pb.go
rm -f *.go

1766
p2p/host/circuitv2/pb/circuit.pb.go

File diff suppressed because it is too large

60
p2p/host/circuitv2/pb/circuit.proto

@ -0,0 +1,60 @@
syntax = "proto2";
package circuit.pb;
message HopMessage {
enum Type {
RESERVE = 0;
CONNECT = 1;
STATUS = 2;
}
required Type type = 1;
optional Peer peer = 2;
optional Reservation reservation = 3;
optional Limit limit = 4;
optional Status status = 5;
}
message StopMessage {
enum Type {
CONNECT = 0;
STATUS = 1;
}
required Type type = 1;
optional Peer peer = 2;
optional Limit limit = 3;
optional Status status = 4;
}
message Peer {
required bytes id = 1;
repeated bytes addrs = 2;
}
message Reservation {
optional uint64 expire = 1; // Unix expiration time (UTC)
repeated bytes addrs = 2; // relay addrs for reserving peer
optional bytes voucher = 3; // reservation voucher
}
message Limit {
optional uint32 duration = 1; // seconds
optional uint64 data = 2; // bytes
}
enum Status {
OK = 100;
RESERVATION_REFUSED = 200;
RESOURCE_LIMIT_EXCEEDED = 201;
PERMISSION_DENIED = 202;
CONNECTION_FAILED = 203;
NO_RESERVATION = 204;
MALFORMED_MESSAGE = 400;
UNEXPECTED_MESSAGE = 401;
}

438
p2p/host/circuitv2/pb/voucher.pb.go

@ -0,0 +1,438 @@
// Code generated by protoc-gen-gogo. DO NOT EDIT.
// source: voucher.proto
package circuit_pb
import (
fmt "fmt"
github_com_gogo_protobuf_proto "github.com/gogo/protobuf/proto"
proto "github.com/gogo/protobuf/proto"
io "io"
math "math"
math_bits "math/bits"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package
type ReservationVoucher struct {
Relay []byte `protobuf:"bytes,1,req,name=relay" json:"relay,omitempty"`
Peer []byte `protobuf:"bytes,2,req,name=peer" json:"peer,omitempty"`
Expiration *uint64 `protobuf:"varint,3,req,name=expiration" json:"expiration,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *ReservationVoucher) Reset() { *m = ReservationVoucher{} }
func (m *ReservationVoucher) String() string { return proto.CompactTextString(m) }
func (*ReservationVoucher) ProtoMessage() {}
func (*ReservationVoucher) Descriptor() ([]byte, []int) {
return fileDescriptor_a22a9b0d3335ba25, []int{0}
}
func (m *ReservationVoucher) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *ReservationVoucher) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_ReservationVoucher.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (m *ReservationVoucher) XXX_Merge(src proto.Message) {
xxx_messageInfo_ReservationVoucher.Merge(m, src)
}
func (m *ReservationVoucher) XXX_Size() int {
return m.Size()
}
func (m *ReservationVoucher) XXX_DiscardUnknown() {
xxx_messageInfo_ReservationVoucher.DiscardUnknown(m)
}
var xxx_messageInfo_ReservationVoucher proto.InternalMessageInfo
func (m *ReservationVoucher) GetRelay() []byte {
if m != nil {
return m.Relay
}
return nil
}
func (m *ReservationVoucher) GetPeer() []byte {
if m != nil {
return m.Peer
}
return nil
}
func (m *ReservationVoucher) GetExpiration() uint64 {
if m != nil && m.Expiration != nil {
return *m.Expiration
}
return 0
}
func init() {
proto.RegisterType((*ReservationVoucher)(nil), "circuit.pb.ReservationVoucher")
}
func init() { proto.RegisterFile("voucher.proto", fileDescriptor_a22a9b0d3335ba25) }
var fileDescriptor_a22a9b0d3335ba25 = []byte{
// 135 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x2d, 0xcb, 0x2f, 0x4d,
0xce, 0x48, 0x2d, 0xd2, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x4a, 0xce, 0x2c, 0x4a, 0x2e,
0xcd, 0x2c, 0xd1, 0x2b, 0x48, 0x52, 0x8a, 0xe3, 0x12, 0x0a, 0x4a, 0x2d, 0x4e, 0x2d, 0x2a, 0x4b,
0x2c, 0xc9, 0xcc, 0xcf, 0x0b, 0x83, 0xa8, 0x13, 0x12, 0xe1, 0x62, 0x2d, 0x4a, 0xcd, 0x49, 0xac,
0x94, 0x60, 0x54, 0x60, 0xd2, 0xe0, 0x09, 0x82, 0x70, 0x84, 0x84, 0xb8, 0x58, 0x0a, 0x52, 0x53,
0x8b, 0x24, 0x98, 0xc0, 0x82, 0x60, 0xb6, 0x90, 0x1c, 0x17, 0x57, 0x6a, 0x45, 0x41, 0x66, 0x11,
0x58, 0xbb, 0x04, 0xb3, 0x02, 0x93, 0x06, 0x4b, 0x10, 0x92, 0x88, 0x13, 0xcf, 0x89, 0x47, 0x72,
0x8c, 0x17, 0x1e, 0xc9, 0x31, 0x3e, 0x78, 0x24, 0xc7, 0x08, 0x08, 0x00, 0x00, 0xff, 0xff, 0xc0,
0x81, 0x3a, 0xee, 0x89, 0x00, 0x00, 0x00,
}
func (m *ReservationVoucher) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *ReservationVoucher) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *ReservationVoucher) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if m.XXX_unrecognized != nil {
i -= len(m.XXX_unrecognized)
copy(dAtA[i:], m.XXX_unrecognized)
}
if m.Expiration == nil {
return 0, github_com_gogo_protobuf_proto.NewRequiredNotSetError("expiration")
} else {
i = encodeVarintVoucher(dAtA, i, uint64(*m.Expiration))
i--
dAtA[i] = 0x18
}
if m.Peer == nil {
return 0, github_com_gogo_protobuf_proto.NewRequiredNotSetError("peer")
} else {
i -= len(m.Peer)
copy(dAtA[i:], m.Peer)
i = encodeVarintVoucher(dAtA, i, uint64(len(m.Peer)))
i--
dAtA[i] = 0x12
}
if m.Relay == nil {
return 0, github_com_gogo_protobuf_proto.NewRequiredNotSetError("relay")
} else {
i -= len(m.Relay)
copy(dAtA[i:], m.Relay)
i = encodeVarintVoucher(dAtA, i, uint64(len(m.Relay)))
i--
dAtA[i] = 0xa
}
return len(dAtA) - i, nil
}
func encodeVarintVoucher(dAtA []byte, offset int, v uint64) int {
offset -= sovVoucher(v)
base := offset
for v >= 1<<7 {
dAtA[offset] = uint8(v&0x7f | 0x80)
v >>= 7
offset++
}
dAtA[offset] = uint8(v)
return base
}
func (m *ReservationVoucher) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
if m.Relay != nil {
l = len(m.Relay)
n += 1 + l + sovVoucher(uint64(l))
}
if m.Peer != nil {
l = len(m.Peer)
n += 1 + l + sovVoucher(uint64(l))
}
if m.Expiration != nil {
n += 1 + sovVoucher(uint64(*m.Expiration))
}
if m.XXX_unrecognized != nil {
n += len(m.XXX_unrecognized)
}
return n
}
func sovVoucher(x uint64) (n int) {
return (math_bits.Len64(x|1) + 6) / 7
}
func sozVoucher(x uint64) (n int) {
return sovVoucher(uint64((x << 1) ^ uint64((int64(x) >> 63))))
}
func (m *ReservationVoucher) Unmarshal(dAtA []byte) error {
var hasFields [1]uint64
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowVoucher
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: ReservationVoucher: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: ReservationVoucher: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Relay", wireType)
}
var byteLen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowVoucher
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
byteLen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if byteLen < 0 {
return ErrInvalidLengthVoucher
}
postIndex := iNdEx + byteLen
if postIndex < 0 {
return ErrInvalidLengthVoucher
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Relay = append(m.Relay[:0], dAtA[iNdEx:postIndex]...)
if m.Relay == nil {
m.Relay = []byte{}
}
iNdEx = postIndex
hasFields[0] |= uint64(0x00000001)
case 2:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Peer", wireType)
}
var byteLen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowVoucher
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
byteLen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if byteLen < 0 {
return ErrInvalidLengthVoucher
}
postIndex := iNdEx + byteLen
if postIndex < 0 {
return ErrInvalidLengthVoucher
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Peer = append(m.Peer[:0], dAtA[iNdEx:postIndex]...)
if m.Peer == nil {
m.Peer = []byte{}
}
iNdEx = postIndex
hasFields[0] |= uint64(0x00000002)
case 3:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field Expiration", wireType)
}
var v uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowVoucher
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
v |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
m.Expiration = &v
hasFields[0] |= uint64(0x00000004)
default:
iNdEx = preIndex
skippy, err := skipVoucher(dAtA[iNdEx:])
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthVoucher
}
if (iNdEx + skippy) < 0 {
return ErrInvalidLengthVoucher
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...)
iNdEx += skippy
}
}
if hasFields[0]&uint64(0x00000001) == 0 {
return github_com_gogo_protobuf_proto.NewRequiredNotSetError("relay")
}
if hasFields[0]&uint64(0x00000002) == 0 {
return github_com_gogo_protobuf_proto.NewRequiredNotSetError("peer")
}
if hasFields[0]&uint64(0x00000004) == 0 {
return github_com_gogo_protobuf_proto.NewRequiredNotSetError("expiration")
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func skipVoucher(dAtA []byte) (n int, err error) {
l := len(dAtA)
iNdEx := 0
depth := 0
for iNdEx < l {
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowVoucher
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
wireType := int(wire & 0x7)
switch wireType {
case 0:
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowVoucher
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
iNdEx++
if dAtA[iNdEx-1] < 0x80 {
break
}
}
case 1:
iNdEx += 8
case 2:
var length int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowVoucher
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
length |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
if length < 0 {
return 0, ErrInvalidLengthVoucher
}
iNdEx += length
case 3:
depth++
case 4:
if depth == 0 {
return 0, ErrUnexpectedEndOfGroupVoucher
}
depth--
case 5:
iNdEx += 4
default:
return 0, fmt.Errorf("proto: illegal wireType %d", wireType)
}
if iNdEx < 0 {
return 0, ErrInvalidLengthVoucher
}
if depth == 0 {
return iNdEx, nil
}
}
return 0, io.ErrUnexpectedEOF
}
var (
ErrInvalidLengthVoucher = fmt.Errorf("proto: negative length found during unmarshaling")
ErrIntOverflowVoucher = fmt.Errorf("proto: integer overflow")
ErrUnexpectedEndOfGroupVoucher = fmt.Errorf("proto: unexpected end of group")
)

9
p2p/host/circuitv2/pb/voucher.proto

@ -0,0 +1,9 @@
syntax = "proto2";
package circuit.pb;
message ReservationVoucher {
required bytes relay = 1;
required bytes peer = 2;
required uint64 expiration = 3;
}

7
p2p/host/circuitv2/proto/protocol.go

@ -0,0 +1,7 @@
package proto
const (
ProtoIDv1 = "/libp2p/circuit/relay/0.1.0"
ProtoIDv2Hop = "/libp2p/circuit/relay/0.2.0/hop"
ProtoIDv2Stop = "/libp2p/circuit/relay/0.2.0/stop"
)

72
p2p/host/circuitv2/proto/voucher.go

@ -0,0 +1,72 @@
package proto
import (
"time"
pbv2 "github.com/libp2p/go-libp2p/p2p/host/circuitv2/pb"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/record"
)
const RecordDomain = "libp2p-relay-rsvp"
// TODO: register in multicodec table in https://github.com/multiformats/multicodec
var RecordCodec = []byte{0x03, 0x02}
func init() {
record.RegisterType(&ReservationVoucher{})
}
type ReservationVoucher struct {
// Relay is the ID of the peer providing relay service
Relay peer.ID
// Peer is the ID of the peer receiving relay service through Relay
Peer peer.ID
// Expiration is the expiration time of the reservation
Expiration time.Time
}
var _ record.Record = (*ReservationVoucher)(nil)
func (rv *ReservationVoucher) Domain() string {
return RecordDomain
}
func (rv *ReservationVoucher) Codec() []byte {
return RecordCodec
}
func (rv *ReservationVoucher) MarshalRecord() ([]byte, error) {
relay := []byte(rv.Relay)
peer := []byte(rv.Peer)
expiration := uint64(rv.Expiration.Unix())
pbrv := &pbv2.ReservationVoucher{
Relay: relay,
Peer: peer,
Expiration: &expiration,
}
return pbrv.Marshal()
}
func (rv *ReservationVoucher) UnmarshalRecord(blob []byte) error {
pbrv := pbv2.ReservationVoucher{}
err := pbrv.Unmarshal(blob)
if err != nil {
return err
}
rv.Relay, err = peer.IDFromBytes(pbrv.GetRelay())
if err != nil {
return err
}
rv.Peer, err = peer.IDFromBytes(pbrv.GetPeer())
if err != nil {
return err
}
rv.Expiration = time.Unix(int64(pbrv.GetExpiration()), 0)
return nil
}

68
p2p/host/circuitv2/proto/voucher_test.go

@ -0,0 +1,68 @@
package proto
import (
"testing"
"time"
"github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/record"
)
func TestReservationVoucher(t *testing.T) {
relayPrivk, relayPubk, err := crypto.GenerateKeyPair(crypto.Ed25519, 0)
if err != nil {
t.Fatal(err)
}
_, peerPubk, err := crypto.GenerateKeyPair(crypto.Ed25519, 0)
if err != nil {
t.Fatal(err)
}
relayID, err := peer.IDFromPublicKey(relayPubk)
if err != nil {
t.Fatal(err)
}
peerID, err := peer.IDFromPublicKey(peerPubk)
if err != nil {
t.Fatal(err)
}
rsvp := &ReservationVoucher{
Relay: relayID,
Peer: peerID,
Expiration: time.Now().Add(time.Hour),
}
envelope, err := record.Seal(rsvp, relayPrivk)
if err != nil {
t.Fatal(err)
}
blob, err := envelope.Marshal()
if err != nil {
t.Fatal(err)
}
_, rec, err := record.ConsumeEnvelope(blob, RecordDomain)
if err != nil {
t.Fatal(err)
}
rsvp2, ok := rec.(*ReservationVoucher)
if !ok {
t.Fatalf("invalid record type %+T", rec)
}
if rsvp.Relay != rsvp2.Relay {
t.Fatal("relay IDs don't match")
}
if rsvp.Peer != rsvp2.Peer {
t.Fatal("peer IDs don't match")
}
if rsvp.Expiration.Unix() != rsvp2.Expiration.Unix() {
t.Fatal("expirations don't match")
}
}

17
p2p/host/circuitv2/relay/acl.go

@ -0,0 +1,17 @@
package relay
import (
"github.com/libp2p/go-libp2p-core/peer"
ma "github.com/multiformats/go-multiaddr"
)
// ACLFilter is an Access Control mechanism for relayed connect.
type ACLFilter interface {
// AllowReserve returns true if a reservation from a peer with the given peer ID and multiaddr
// is allowed.
AllowReserve(p peer.ID, a ma.Multiaddr) bool
// AllowConnect returns true if a source peer, with a given multiaddr is allowed to connect
// to a destination peer.
AllowConnect(src peer.ID, srcAddr ma.Multiaddr, dest peer.ID) bool
}

110
p2p/host/circuitv2/relay/ipcs.go

@ -0,0 +1,110 @@
package relay
import (
"errors"
"net"
"github.com/libp2p/go-libp2p-core/peer"
asnutil "github.com/libp2p/go-libp2p-asn-util"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
)
var (
ErrNoIP = errors.New("no IP address associated with peer")
ErrTooManyPeersInIP = errors.New("too many peers in IP address")
ErrTooManyPeersInASN = errors.New("too many peers in ASN")
)
// IPConstraints implements reservation constraints per IP
type IPConstraints struct {
iplimit, asnlimit int
peers map[peer.ID]net.IP
ips map[string]map[peer.ID]struct{}
asns map[string]map[peer.ID]struct{}
}
// NewIPConstraints creates a new IPConstraints object.
// The methods are *not* thread-safe; an external lock must be held if synchronization
// is required.
func NewIPConstraints(rc Resources) *IPConstraints {
return &IPConstraints{
iplimit: rc.MaxReservationsPerIP,
asnlimit: rc.MaxReservationsPerASN,
peers: make(map[peer.ID]net.IP),
ips: make(map[string]map[peer.ID]struct{}),
asns: make(map[string]map[peer.ID]struct{}),
}
}
// AddReservation adds a reservation for a given peer with a given multiaddr.
// If adding this reservation violates IP constraints, an error is returned.
func (ipcs *IPConstraints) AddReservation(p peer.ID, a ma.Multiaddr) error {
ip, err := manet.ToIP(a)
if err != nil {
return ErrNoIP
}
ips := ip.String()
peersInIP := ipcs.ips[ips]
if len(peersInIP) >= ipcs.iplimit {
return ErrTooManyPeersInIP
}
var peersInAsn map[peer.ID]struct{}
asn, _ := asnutil.Store.AsnForIPv6(ip)
peersInAsn = ipcs.asns[asn]
if len(peersInAsn) >= ipcs.asnlimit {
return ErrTooManyPeersInASN
}
ipcs.peers[p] = ip
if peersInIP == nil {
peersInIP = make(map[peer.ID]struct{})
ipcs.ips[ips] = peersInIP
}
peersInIP[p] = struct{}{}
if asn != "" {
if peersInAsn == nil {
peersInAsn = make(map[peer.ID]struct{})
ipcs.asns[asn] = peersInAsn
}
peersInAsn[p] = struct{}{}
}
return nil
}
// RemoveReservation removes a peer from the constraints.
func (ipcs *IPConstraints) RemoveReservation(p peer.ID) {
ip, ok := ipcs.peers[p]
if !ok {
return
}
ips := ip.String()
asn, _ := asnutil.Store.AsnForIPv6(ip)
delete(ipcs.peers, p)
peersInIP, ok := ipcs.ips[ips]
if ok {
delete(peersInIP, p)
if len(peersInIP) == 0 {
delete(ipcs.ips, ips)
}
}
peersInAsn, ok := ipcs.asns[asn]
if ok {
delete(peersInAsn, p)
if len(peersInAsn) == 0 {
delete(ipcs.asns, asn)
}
}
}

27
p2p/host/circuitv2/relay/options.go

@ -0,0 +1,27 @@
package relay
type Option func(*Relay) error
// WithResources is a Relay option that sets specific relay resources for the relay.
func WithResources(rc Resources) Option {
return func(r *Relay) error {
r.rc = rc
return nil
}
}
// WithLimit is a Relay option that sets only the relayed connection limits for the relay.
func WithLimit(limit *RelayLimit) Option {
return func(r *Relay) error {
r.rc.Limit = limit
return nil
}
}
// WithACL is a Relay option that supplies an ACLFilter for access control.
func WithACL(acl ACLFilter) Option {
return func(r *Relay) error {
r.acl = acl
return nil
}
}

522
p2p/host/circuitv2/relay/relay.go

@ -0,0 +1,522 @@
package relay
import (
"context"
"fmt"
"io"
"sync"
"sync/atomic"
"time"
pbv2 "github.com/libp2p/go-libp2p/p2p/host/circuitv2/pb"
"github.com/libp2p/go-libp2p/p2p/host/circuitv2/proto"
"github.com/libp2p/go-libp2p/p2p/host/circuitv2/util"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/record"
logging "github.com/ipfs/go-log"
pool "github.com/libp2p/go-buffer-pool"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
)
const (
ReservationTagWeight = 10
StreamTimeout = time.Minute
ConnectTimeout = 30 * time.Second
HandshakeTimeout = time.Minute
maxMessageSize = 4096
)
var log = logging.Logger("relay")
// Relay is the (limited) relay service object.
type Relay struct {
closed uint32
ctx context.Context
cancel func()
host host.Host
rc Resources
acl ACLFilter
ipcs *IPConstraints
mx sync.Mutex
rsvp map[peer.ID]time.Time
conns map[peer.ID]int
selfAddr ma.Multiaddr
}
// New constructs a new limited relay that can provide relay services in the given host.
func New(h host.Host, opts ...Option) (*Relay, error) {
ctx, cancel := context.WithCancel(context.Background())
r := &Relay{
ctx: ctx,
cancel: cancel,
host: h,
rc: DefaultResources(),
acl: nil,
rsvp: make(map[peer.ID]time.Time),
conns: make(map[peer.ID]int),
}
for _, opt := range opts {
err := opt(r)
if err != nil {
return nil, fmt.Errorf("error applying relay option: %w", err)
}
}
r.ipcs = NewIPConstraints(r.rc)
r.selfAddr = ma.StringCast(fmt.Sprintf("/p2p/%s", h.ID()))
h.SetStreamHandler(proto.ProtoIDv2Hop, r.handleStream)
h.Network().Notify(
&network.NotifyBundle{
DisconnectedF: r.disconnected,
})
go r.background()
return r, nil
}
func (r *Relay) Close() error {
if atomic.CompareAndSwapUint32(&r.closed, 0, 1) {
r.host.RemoveStreamHandler(proto.ProtoIDv2Hop)
r.cancel()
r.mx.Lock()
for p := range r.rsvp {
r.host.ConnManager().UntagPeer(p, "relay-reservation")
}
r.mx.Unlock()
}
return nil
}
func (r *Relay) handleStream(s network.Stream) {
s.SetReadDeadline(time.Now().Add(StreamTimeout))
log.Infof("new relay stream from: %s", s.Conn().RemotePeer())
rd := util.NewDelimitedReader(s, maxMessageSize)
defer rd.Close()
var msg pbv2.HopMessage
err := rd.ReadMsg(&msg)
if err != nil {
r.handleError(s, pbv2.Status_MALFORMED_MESSAGE)
return
}
// reset stream deadline as message has been read
s.SetReadDeadline(time.Time{})
switch msg.GetType() {
case pbv2.HopMessage_RESERVE:
r.handleReserve(s, &msg)
case pbv2.HopMessage_CONNECT:
r.handleConnect(s, &msg)
default:
r.handleError(s, pbv2.Status_MALFORMED_MESSAGE)
}
}
func (r *Relay) handleReserve(s network.Stream, msg *pbv2.HopMessage) {
defer s.Close()
p := s.Conn().RemotePeer()
a := s.Conn().RemoteMultiaddr()
if util.IsRelayAddr(a) {
log.Debugf("refusing relay reservation for %s; reservation attempt over relay connection")
r.handleError(s, pbv2.Status_PERMISSION_DENIED)
return
}
if r.acl != nil && !r.acl.AllowReserve(p, a) {
log.Debugf("refusing relay reservation for %s; permission denied", p)
r.handleError(s, pbv2.Status_PERMISSION_DENIED)
return
}
r.mx.Lock()
now := time.Now()
_, exists := r.rsvp[p]
if !exists {
active := len(r.rsvp)
if active >= r.rc.MaxReservations {
r.mx.Unlock()
log.Debugf("refusing relay reservation for %s; too many reservations", p)
r.handleError(s, pbv2.Status_RESERVATION_REFUSED)
return
}
if err := r.ipcs.AddReservation(p, a); err != nil {
r.mx.Unlock()
log.Debugf("refusing relay reservation for %s; IP constraint violation: %s", p, err)
r.handleError(s, pbv2.Status_RESERVATION_REFUSED)
return
}
}
expire := now.Add(r.rc.ReservationTTL)
r.rsvp[p] = expire
r.host.ConnManager().TagPeer(p, "relay-reservation", ReservationTagWeight)
r.mx.Unlock()
log.Debugf("reserving relay slot for %s", p)
err := r.writeResponse(s, pbv2.Status_OK, r.makeReservationMsg(p, expire), r.makeLimitMsg(p))
if err != nil {
s.Reset()
log.Debugf("error writing reservation response; retracting reservation for %s", p)
r.mx.Lock()
delete(r.rsvp, p)
r.ipcs.RemoveReservation(p)
r.host.ConnManager().UntagPeer(p, "relay-reservation")
r.mx.Unlock()
}
}
func (r *Relay) handleConnect(s network.Stream, msg *pbv2.HopMessage) {
src := s.Conn().RemotePeer()
a := s.Conn().RemoteMultiaddr()
if util.IsRelayAddr(a) {
log.Debugf("refusing connection from %s; connection attempt over relay connection")
r.handleError(s, pbv2.Status_PERMISSION_DENIED)
return
}
dest, err := util.PeerToPeerInfoV2(msg.GetPeer())
if err != nil {
r.handleError(s, pbv2.Status_MALFORMED_MESSAGE)
return
}
if r.acl != nil && !r.acl.AllowConnect(src, s.Conn().RemoteMultiaddr(), dest.ID) {
log.Debugf("refusing connection from %s to %s; permission denied", src, dest.ID)
r.handleError(s, pbv2.Status_PERMISSION_DENIED)
return
}
r.mx.Lock()
_, rsvp := r.rsvp[dest.ID]
if !rsvp {
r.mx.Unlock()
log.Debugf("refusing connection from %s to %s; no reservation", src, dest.ID)
r.handleError(s, pbv2.Status_NO_RESERVATION)
return
}
srcConns := r.conns[src]
if srcConns >= r.rc.MaxCircuits {
r.mx.Unlock()
log.Debugf("refusing connection from %s to %s; too many connections from %s", src, dest.ID, src)
r.handleError(s, pbv2.Status_RESOURCE_LIMIT_EXCEEDED)
return
}
r.conns[src]++
destConns := r.conns[dest.ID]
if destConns >= r.rc.MaxCircuits {
r.conns[src]--
r.mx.Unlock()
log.Debugf("refusing connection from %s to %s; too many connecitons to %s", src, dest.ID, dest.ID)
r.handleError(s, pbv2.Status_RESOURCE_LIMIT_EXCEEDED)
return
}
r.conns[dest.ID]++
r.mx.Unlock()
cleanup := func() {
r.mx.Lock()
r.conns[src]--
r.conns[dest.ID]--
r.mx.Unlock()
}
ctx, cancel := context.WithTimeout(r.ctx, ConnectTimeout)
defer cancel()
ctx = network.WithNoDial(ctx, "relay connect")
bs, err := r.host.NewStream(ctx, dest.ID, proto.ProtoIDv2Stop)
if err != nil {
log.Debugf("error opening relay stream to %s: %s", dest.ID, err)
cleanup()
r.handleError(s, pbv2.Status_CONNECTION_FAILED)
return
}
// handshake
rd := util.NewDelimitedReader(bs, maxMessageSize)
wr := util.NewDelimitedWriter(bs)
defer rd.Close()
var stopmsg pbv2.StopMessage
stopmsg.Type = pbv2.StopMessage_CONNECT.Enum()
stopmsg.Peer = util.PeerInfoToPeerV2(peer.AddrInfo{ID: src})
stopmsg.Limit = r.makeLimitMsg(dest.ID)
bs.SetDeadline(time.Now().Add(HandshakeTimeout))
err = wr.WriteMsg(&stopmsg)
if err != nil {
log.Debugf("error writing stop handshake")
bs.Reset()
cleanup()
r.handleError(s, pbv2.Status_CONNECTION_FAILED)
return
}
stopmsg.Reset()
err = rd.ReadMsg(&stopmsg)
if err != nil {
log.Debugf("error reading stop response: %s", err.Error())
bs.Reset()
cleanup()
r.handleError(s, pbv2.Status_CONNECTION_FAILED)
return
}
if t := stopmsg.GetType(); t != pbv2.StopMessage_STATUS {
log.Debugf("unexpected stop response; not a status message (%d)", t)
bs.Reset()
cleanup()
r.handleError(s, pbv2.Status_CONNECTION_FAILED)
return
}
if status := stopmsg.GetStatus(); status != pbv2.Status_OK {
log.Debugf("relay stop failure: %d", status)
bs.Reset()
cleanup()
r.handleError(s, pbv2.Status_CONNECTION_FAILED)
return
}
var response pbv2.HopMessage
response.Type = pbv2.HopMessage_STATUS.Enum()
response.Status = pbv2.Status_OK.Enum()
response.Limit = r.makeLimitMsg(dest.ID)
wr = util.NewDelimitedWriter(s)
err = wr.WriteMsg(&response)
if err != nil {
log.Debugf("error writing relay response: %s", err)
bs.Reset()
s.Reset()
cleanup()
return
}
// reset deadline
bs.SetDeadline(time.Time{})
log.Infof("relaying connection from %s to %s", src, dest.ID)
goroutines := new(int32)
*goroutines = 2
done := func() {
if atomic.AddInt32(goroutines, -1) == 0 {
s.Close()
bs.Close()
cleanup()
}
}
if r.rc.Limit != nil {
deadline := time.Now().Add(r.rc.Limit.Duration)
s.SetDeadline(deadline)
bs.SetDeadline(deadline)
go r.relayLimited(s, bs, src, dest.ID, r.rc.Limit.Data, done)
go r.relayLimited(bs, s, dest.ID, src, r.rc.Limit.Data, done)
} else {
go r.relayUnlimited(s, bs, src, dest.ID, done)
go r.relayUnlimited(bs, s, dest.ID, src, done)
}
}
func (r *Relay) relayLimited(src, dest network.Stream, srcID, destID peer.ID, limit int64, done func()) {
defer done()
buf := pool.Get(r.rc.BufferSize)
defer pool.Put(buf)
limitedSrc := io.LimitReader(src, limit)
count, err := io.CopyBuffer(dest, limitedSrc, buf)
if err != nil {
log.Debugf("relay copy error: %s", err)
// Reset both.
src.Reset()
dest.Reset()
} else {
// propagate the close
dest.CloseWrite()
if count == limit {
// we've reached the limit, discard further input
src.CloseRead()
}
}
log.Debugf("relayed %d bytes from %s to %s", count, srcID, destID)
}
func (r *Relay) relayUnlimited(src, dest network.Stream, srcID, destID peer.ID, done func()) {
defer done()
buf := pool.Get(r.rc.BufferSize)
defer pool.Put(buf)
count, err := io.CopyBuffer(dest, src, buf)
if err != nil {
log.Debugf("relay copy error: %s", err)
// Reset both.
src.Reset()
dest.Reset()
} else {
// propagate the close
dest.CloseWrite()
}
log.Debugf("relayed %d bytes from %s to %s", count, srcID, destID)
}
func (r *Relay) handleError(s network.Stream, status pbv2.Status) {
log.Debugf("relay error: %s (%d)", pbv2.Status_name[int32(status)], status)
err := r.writeResponse(s, status, nil, nil)
if err != nil {
s.Reset()
log.Debugf("error writing relay response: %s", err.Error())
} else {
s.Close()
}
}
func (r *Relay) writeResponse(s network.Stream, status pbv2.Status, rsvp *pbv2.Reservation, limit *pbv2.Limit) error {
wr := util.NewDelimitedWriter(s)
var msg pbv2.HopMessage
msg.Type = pbv2.HopMessage_STATUS.Enum()
msg.Status = status.Enum()
msg.Reservation = rsvp
msg.Limit = limit
return wr.WriteMsg(&msg)
}
func (r *Relay) makeReservationMsg(p peer.ID, expire time.Time) *pbv2.Reservation {
expireUnix := uint64(expire.Unix())
var addrBytes [][]byte
for _, addr := range r.host.Addrs() {
if !manet.IsPublicAddr(addr) {
continue
}
addr = addr.Encapsulate(r.selfAddr)
addrBytes = append(addrBytes, addr.Bytes())
}
rsvp := &pbv2.Reservation{
Expire: &expireUnix,
Addrs: addrBytes,
}
voucher := &proto.ReservationVoucher{
Relay: r.host.ID(),
Peer: p,
Expiration: expire,
}
envelope, err := record.Seal(voucher, r.host.Peerstore().PrivKey(r.host.ID()))
if err != nil {
log.Errorf("error sealing voucher for %s: %s", p, err)
return rsvp
}
blob, err := envelope.Marshal()
if err != nil {
log.Errorf("error marshalling voucher for %s: %s", p, err)
return rsvp
}
rsvp.Voucher = blob
return rsvp
}
func (r *Relay) makeLimitMsg(p peer.ID) *pbv2.Limit {
if r.rc.Limit == nil {
return nil
}
duration := uint32(r.rc.Limit.Duration / time.Second)
data := uint64(r.rc.Limit.Data)
return &pbv2.Limit{
Duration: &duration,
Data: &data,
}
}
func (r *Relay) background() {
ticker := time.NewTicker(time.Minute)
defer ticker.Stop()
for {
select {
case <-ticker.C:
r.gc()
case <-r.ctx.Done():
return
}
}
}
func (r *Relay) gc() {
r.mx.Lock()
defer r.mx.Unlock()
now := time.Now()
for p, expire := range r.rsvp {
if expire.Before(now) {
delete(r.rsvp, p)
r.ipcs.RemoveReservation(p)
r.host.ConnManager().UntagPeer(p, "relay-reservation")
}
}
for p, count := range r.conns {
if count == 0 {
delete(r.conns, p)
}
}
}
func (r *Relay) disconnected(n network.Network, c network.Conn) {
p := c.RemotePeer()
if n.Connectedness(p) == network.Connected {
return
}
r.mx.Lock()
defer r.mx.Unlock()
delete(r.rsvp, p)
r.ipcs.RemoveReservation(p)
}

62
p2p/host/circuitv2/relay/resources.go

@ -0,0 +1,62 @@
package relay
import (
"time"
)
// Resources are the resource limits associated with the relay service.
type Resources struct {
// Limit is the (optional) relayed connection limits.
Limit *RelayLimit
// ReservationTTL is the duration of a new (or refreshed reservation).
// Defaults to 1hr.
ReservationTTL time.Duration
// MaxReservations is the maximum number of active relay slots; defaults to 128.
MaxReservations int
// MaxCircuits is the maximum number of open relay connections for each peer; defaults to 16.
MaxCircuits int
// BufferSize is the size of the relayed connection buffers; defaults to 2048.
BufferSize int
// MaxReservationsPerIP is the maximum number of reservations originating from the same
// IP address; default is 4.
MaxReservationsPerIP int
// MaxReservationsPerASN is the maximum number of reservations origination from the same
// ASN; default is 32
MaxReservationsPerASN int
}
// RelayLimit are the per relayed connection resource limits.
type RelayLimit struct {
// Duration is the time limit before resetting a relayed connection; defaults to 2min.
Duration time.Duration
// Data is the limit of data relayed (on each direction) before resetting the connection.
// Defaults to 128KB
Data int64
}
// DefaultResources returns a Resources object with the default filled in.
func DefaultResources() Resources {
return Resources{
Limit: DefaultLimit(),
ReservationTTL: time.Hour,
MaxReservations: 128,
MaxCircuits: 16,
BufferSize: 2048,
MaxReservationsPerIP: 4,
MaxReservationsPerASN: 32,
}
}
// DefaultLimit returns a RelayLimit object with the defaults filled in.
func DefaultLimit() *RelayLimit {
return &RelayLimit{
Duration: 2 * time.Minute,
Data: 1 << 17, // 128K
}
}

177
p2p/host/circuitv2/test/compat_test.go

@ -0,0 +1,177 @@
package test
import (
"bytes"
"context"
"fmt"
"io"
"testing"
v1 "github.com/libp2p/go-libp2p-circuit"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
tptu "github.com/libp2p/go-libp2p-transport-upgrader"
ma "github.com/multiformats/go-multiaddr"
)
func addTransportV1(t *testing.T, ctx context.Context, h host.Host, upgrader *tptu.Upgrader) {
err := v1.AddRelayTransport(ctx, h, upgrader)
if err != nil {
t.Fatal(err)
}
}
func TestRelayCompatV2DialV1(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts, upgraders := getNetHosts(t, ctx, 3)
addTransportV1(t, ctx, hosts[0], upgraders[0])
addTransport(t, ctx, hosts[2], upgraders[2])
rch := make(chan []byte, 1)
hosts[0].SetStreamHandler("test", func(s network.Stream) {
defer s.Close()
defer close(rch)
buf := make([]byte, 1024)
nread := 0
for nread < len(buf) {
n, err := s.Read(buf[nread:])
nread += n
if err != nil {
if err == io.EOF {
break
}
t.Fatal(err)
}
}
rch <- buf[:nread]
})
_, err := v1.NewRelay(ctx, hosts[1], upgraders[1], v1.OptHop)
if err != nil {
t.Fatal(err)
}
connect(t, hosts[0], hosts[1])
connect(t, hosts[1], hosts[2])
raddr, err := ma.NewMultiaddr(fmt.Sprintf("/p2p/%s/p2p-circuit/p2p/%s", hosts[1].ID(), hosts[0].ID()))
if err != nil {
t.Fatal(err)
}
err = hosts[2].Connect(ctx, peer.AddrInfo{ID: hosts[0].ID(), Addrs: []ma.Multiaddr{raddr}})
if err != nil {
t.Fatal(err)
}
conns := hosts[2].Network().ConnsToPeer(hosts[0].ID())
if len(conns) != 1 {
t.Fatalf("expected 1 connection, but got %d", len(conns))
}
if conns[0].Stat().Transient {
t.Fatal("expected non transient connection")
}
s, err := hosts[2].NewStream(ctx, hosts[0].ID(), "test")
if err != nil {
t.Fatal(err)
}
msg := []byte("relay works!")
nwritten, err := s.Write(msg)
if err != nil {
t.Fatal(err)
}
if nwritten != len(msg) {
t.Fatalf("expected to write %d bytes, but wrote %d instead", len(msg), nwritten)
}
s.CloseWrite()
got := <-rch
if !bytes.Equal(msg, got) {
t.Fatalf("Wrong echo; expected %s but got %s", string(msg), string(got))
}
}
func TestRelayCompatV1DialV2(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts, upgraders := getNetHosts(t, ctx, 3)
addTransport(t, ctx, hosts[0], upgraders[0])
addTransportV1(t, ctx, hosts[2], upgraders[2])
rch := make(chan []byte, 1)
hosts[0].SetStreamHandler("test", func(s network.Stream) {
defer s.Close()
defer close(rch)
buf := make([]byte, 1024)
nread := 0
for nread < len(buf) {
n, err := s.Read(buf[nread:])
nread += n
if err != nil {
if err == io.EOF {
break
}
t.Fatal(err)
}
}
rch <- buf[:nread]
})
_, err := v1.NewRelay(ctx, hosts[1], upgraders[1], v1.OptHop)
if err != nil {
t.Fatal(err)
}
connect(t, hosts[0], hosts[1])
connect(t, hosts[1], hosts[2])
raddr, err := ma.NewMultiaddr(fmt.Sprintf("/p2p/%s/p2p-circuit/p2p/%s", hosts[1].ID(), hosts[0].ID()))
if err != nil {
t.Fatal(err)
}
err = hosts[2].Connect(ctx, peer.AddrInfo{ID: hosts[0].ID(), Addrs: []ma.Multiaddr{raddr}})
if err != nil {
t.Fatal(err)
}
conns := hosts[2].Network().ConnsToPeer(hosts[0].ID())
if len(conns) != 1 {
t.Fatalf("expected 1 connection, but got %d", len(conns))
}
if conns[0].Stat().Transient {
t.Fatal("expected non transient connection")
}
s, err := hosts[2].NewStream(ctx, hosts[0].ID(), "test")
if err != nil {
t.Fatal(err)
}
msg := []byte("relay works!")
nwritten, err := s.Write(msg)
if err != nil {
t.Fatal(err)
}
if nwritten != len(msg) {
t.Fatalf("expected to write %d bytes, but wrote %d instead", len(msg), nwritten)
}
s.CloseWrite()
got := <-rch
if !bytes.Equal(msg, got) {
t.Fatalf("Wrong echo; expected %s but got %s", string(msg), string(got))
}
}

361
p2p/host/circuitv2/test/e2e_test.go

@ -0,0 +1,361 @@
package test
import (
"bytes"
"context"
"fmt"
"io"
"math/rand"
"testing"
"time"
"github.com/libp2p/go-libp2p/p2p/host/circuitv2/client"
"github.com/libp2p/go-libp2p/p2p/host/circuitv2/relay"
"github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/mux"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
logging "github.com/ipfs/go-log"
bhost "github.com/libp2p/go-libp2p-blankhost"
metrics "github.com/libp2p/go-libp2p-core/metrics"
pstoremem "github.com/libp2p/go-libp2p-peerstore/pstoremem"
swarm "github.com/libp2p/go-libp2p-swarm"
swarmt "github.com/libp2p/go-libp2p-swarm/testing"
tptu "github.com/libp2p/go-libp2p-transport-upgrader"
tcp "github.com/libp2p/go-tcp-transport"
ma "github.com/multiformats/go-multiaddr"
)
func init() {
// TODO temporary for debugging purposes; to be removed for merge.
logging.SetLogLevel("relay", "DEBUG")
logging.SetLogLevel("p2p-circuit", "DEBUG")
}
func getNetHosts(t *testing.T, ctx context.Context, n int) (hosts []host.Host, upgraders []*tptu.Upgrader) {
for i := 0; i < n; i++ {
privk, pubk, err := crypto.GenerateKeyPair(crypto.Ed25519, 0)
if err != nil {
t.Fatal(err)
}
p, err := peer.IDFromPublicKey(pubk)
if err != nil {
t.Fatal(err)
}
ps := pstoremem.NewPeerstore()
err = ps.AddPrivKey(p, privk)
if err != nil {
t.Fatal(err)
}
bwr := metrics.NewBandwidthCounter()
netw := swarm.NewSwarm(ctx, p, ps, bwr)
upgrader := swarmt.GenUpgrader(netw)
upgraders = append(upgraders, upgrader)
err = netw.AddTransport(tcp.NewTCPTransport(upgrader))
if err != nil {
t.Fatal(err)
}
err = netw.Listen(ma.StringCast("/ip4/127.0.0.1/tcp/0"))
if err != nil {
t.Fatal(err)
}
h := bhost.NewBlankHost(netw)
hosts = append(hosts, h)
}
return hosts, upgraders
}
func connect(t *testing.T, a, b host.Host) {
pi := peer.AddrInfo{ID: a.ID(), Addrs: a.Addrs()}
err := b.Connect(context.Background(), pi)
if err != nil {
t.Fatal(err)
}
}
func addTransport(t *testing.T, ctx context.Context, h host.Host, upgrader *tptu.Upgrader) {
err := client.AddTransport(ctx, h, upgrader)
if err != nil {
t.Fatal(err)
}
}
func TestBasicRelay(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts, upgraders := getNetHosts(t, ctx, 3)
addTransport(t, ctx, hosts[0], upgraders[0])
addTransport(t, ctx, hosts[2], upgraders[2])
rch := make(chan []byte, 1)
hosts[0].SetStreamHandler("test", func(s network.Stream) {
defer s.Close()
defer close(rch)
buf := make([]byte, 1024)
nread := 0
for nread < len(buf) {
n, err := s.Read(buf[nread:])
nread += n
if err != nil {
if err == io.EOF {
break
}
t.Fatal(err)
}
}
rch <- buf[:nread]
})
r, err := relay.New(hosts[1])
if err != nil {
t.Fatal(err)
}
defer r.Close()
connect(t, hosts[0], hosts[1])
connect(t, hosts[1], hosts[2])
rinfo := hosts[1].Peerstore().PeerInfo(hosts[1].ID())
rsvp, err := client.Reserve(ctx, hosts[0], rinfo)
if err != nil {
t.Fatal(err)
}
if rsvp.Voucher == nil {
t.Fatal("no reservation voucher")
}
raddr, err := ma.NewMultiaddr(fmt.Sprintf("/p2p/%s/p2p-circuit/p2p/%s", hosts[1].ID(), hosts[0].ID()))
if err != nil {
t.Fatal(err)
}
err = hosts[2].Connect(ctx, peer.AddrInfo{ID: hosts[0].ID(), Addrs: []ma.Multiaddr{raddr}})
if err != nil {
t.Fatal(err)
}
conns := hosts[2].Network().ConnsToPeer(hosts[0].ID())
if len(conns) != 1 {
t.Fatalf("expected 1 connection, but got %d", len(conns))
}
if !conns[0].Stat().Transient {
t.Fatal("expected transient connection")
}
s, err := hosts[2].NewStream(network.WithUseTransient(ctx, "test"), hosts[0].ID(), "test")
if err != nil {
t.Fatal(err)
}
msg := []byte("relay works!")
nwritten, err := s.Write(msg)
if err != nil {
t.Fatal(err)
}
if nwritten != len(msg) {
t.Fatalf("expected to write %d bytes, but wrote %d instead", len(msg), nwritten)
}
s.CloseWrite()
got := <-rch
if !bytes.Equal(msg, got) {
t.Fatalf("Wrong echo; expected %s but got %s", string(msg), string(got))
}
}
func TestRelayLimitTime(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts, upgraders := getNetHosts(t, ctx, 3)
addTransport(t, ctx, hosts[0], upgraders[0])
addTransport(t, ctx, hosts[2], upgraders[2])
rch := make(chan error, 1)
hosts[0].SetStreamHandler("test", func(s network.Stream) {
defer s.Close()
defer close(rch)
buf := make([]byte, 1024)
_, err := s.Read(buf)
rch <- err
})
rc := relay.DefaultResources()
rc.Limit.Duration = time.Second
r, err := relay.New(hosts[1], relay.WithResources(rc))
if err != nil {
t.Fatal(err)
}
defer r.Close()
connect(t, hosts[0], hosts[1])
connect(t, hosts[1], hosts[2])
rinfo := hosts[1].Peerstore().PeerInfo(hosts[1].ID())
_, err = client.Reserve(ctx, hosts[0], rinfo)
if err != nil {
t.Fatal(err)
}
raddr, err := ma.NewMultiaddr(fmt.Sprintf("/p2p/%s/p2p-circuit/p2p/%s", hosts[1].ID(), hosts[0].ID()))
if err != nil {
t.Fatal(err)
}
err = hosts[2].Connect(ctx, peer.AddrInfo{ID: hosts[0].ID(), Addrs: []ma.Multiaddr{raddr}})
if err != nil {
t.Fatal(err)
}
conns := hosts[2].Network().ConnsToPeer(hosts[0].ID())
if len(conns) != 1 {
t.Fatalf("expected 1 connection, but got %d", len(conns))
}
if !conns[0].Stat().Transient {
t.Fatal("expected transient connection")
}
s, err := hosts[2].NewStream(network.WithUseTransient(ctx, "test"), hosts[0].ID(), "test")
if err != nil {
t.Fatal(err)
}
time.Sleep(2 * time.Second)
n, err := s.Write([]byte("should be closed"))
if n > 0 {
t.Fatalf("expected to write 0 bytes, wrote %d", n)
}
if err != mux.ErrReset {
t.Fatalf("expected reset, but got %s", err)
}
err = <-rch
if err != mux.ErrReset {
t.Fatalf("expected reset, but got %s", err)
}
}
func TestRelayLimitData(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts, upgraders := getNetHosts(t, ctx, 3)
addTransport(t, ctx, hosts[0], upgraders[0])
addTransport(t, ctx, hosts[2], upgraders[2])
rch := make(chan int, 1)
hosts[0].SetStreamHandler("test", func(s network.Stream) {
defer s.Close()
defer close(rch)
buf := make([]byte, 1024)
for i := 0; i < 3; i++ {
n, err := s.Read(buf)
if err != nil {
t.Fatal(err)
}
rch <- n
}
n, err := s.Read(buf)
if err != mux.ErrReset {
t.Fatalf("expected reset but got %s", err)
}
rch <- n
})
rc := relay.DefaultResources()
rc.Limit.Duration = time.Second
rc.Limit.Data = 4096
r, err := relay.New(hosts[1], relay.WithResources(rc))
if err != nil {
t.Fatal(err)
}
defer r.Close()
connect(t, hosts[0], hosts[1])
connect(t, hosts[1], hosts[2])
rinfo := hosts[1].Peerstore().PeerInfo(hosts[1].ID())
_, err = client.Reserve(ctx, hosts[0], rinfo)
if err != nil {
t.Fatal(err)
}
raddr, err := ma.NewMultiaddr(fmt.Sprintf("/p2p/%s/p2p-circuit/p2p/%s", hosts[1].ID(), hosts[0].ID()))
if err != nil {
t.Fatal(err)
}
err = hosts[2].Connect(ctx, peer.AddrInfo{ID: hosts[0].ID(), Addrs: []ma.Multiaddr{raddr}})
if err != nil {
t.Fatal(err)
}
conns := hosts[2].Network().ConnsToPeer(hosts[0].ID())
if len(conns) != 1 {
t.Fatalf("expected 1 connection, but got %d", len(conns))
}
if !conns[0].Stat().Transient {
t.Fatal("expected transient connection")
}
s, err := hosts[2].NewStream(network.WithUseTransient(ctx, "test"), hosts[0].ID(), "test")
if err != nil {
t.Fatal(err)
}
buf := make([]byte, 1024)
for i := 0; i < 3; i++ {
_, err = rand.Read(buf)
if err != nil {
t.Fatal(err)
}
n, err := s.Write(buf)
if err != nil {
t.Fatal(err)
}
if n != len(buf) {
t.Fatalf("expected to write %d bytes but wrote %d", len(buf), n)
}
n = <-rch
if n != len(buf) {
t.Fatalf("expected to read %d bytes but read %d", len(buf), n)
}
}
buf = make([]byte, 4096)
_, err = rand.Read(buf)
if err != nil {
t.Fatal(err)
}
s.Write(buf)
n := <-rch
if n != 0 {
t.Fatalf("expected to read 0 bytes but read %d", n)
}
}

1
p2p/host/circuitv2/test/empty.go

@ -0,0 +1 @@
package test

69
p2p/host/circuitv2/test/ipcs_test.go

@ -0,0 +1,69 @@
package test
import (
"fmt"
"net"
"testing"
"github.com/libp2p/go-libp2p/p2p/host/circuitv2/relay"
"github.com/libp2p/go-libp2p-core/peer"
ma "github.com/multiformats/go-multiaddr"
)
func TestIPConstraints(t *testing.T) {
ipcs := relay.NewIPConstraints(relay.Resources{
MaxReservationsPerIP: 1,
MaxReservationsPerASN: 2,
})
peerA := peer.ID("A")
peerB := peer.ID("B")
peerC := peer.ID("C")
peerD := peer.ID("D")
peerE := peer.ID("E")
ipA := net.ParseIP("1.2.3.4")
ipB := ipA
ipC := net.ParseIP("2001:200::1")
ipD := net.ParseIP("2001:200::2")
ipE := net.ParseIP("2001:200::3")
err := ipcs.AddReservation(peerA, ma.StringCast(fmt.Sprintf("/ip4/%s/tcp/1234", ipA)))
if err != nil {
t.Fatal(err)
}
err = ipcs.AddReservation(peerB, ma.StringCast(fmt.Sprintf("/ip4/%s/tcp/1234", ipB)))
if err != relay.ErrTooManyPeersInIP {
t.Fatalf("unexpected error: %s", err)
}
ipcs.RemoveReservation(peerA)
err = ipcs.AddReservation(peerB, ma.StringCast(fmt.Sprintf("/ip4/%s/tcp/1234", ipB)))
if err != nil {
t.Fatal(err)
}
err = ipcs.AddReservation(peerC, ma.StringCast(fmt.Sprintf("/ip6/%s/tcp/1234", ipC)))
if err != nil {
t.Fatal(err)
}
err = ipcs.AddReservation(peerD, ma.StringCast(fmt.Sprintf("/ip6/%s/tcp/1234", ipD)))
if err != nil {
t.Fatal(err)
}
err = ipcs.AddReservation(peerE, ma.StringCast(fmt.Sprintf("/ip6/%s/tcp/1234", ipE)))
if err != relay.ErrTooManyPeersInASN {
t.Fatalf("unexpected error: %s", err)
}
ipcs.RemoveReservation(peerD)
err = ipcs.AddReservation(peerE, ma.StringCast(fmt.Sprintf("/ip6/%s/tcp/1234", ipE)))
if err != nil {
t.Fatal(err)
}
}

67
p2p/host/circuitv2/util/io.go

@ -0,0 +1,67 @@
package util
import (
"errors"
"io"
pool "github.com/libp2p/go-buffer-pool"
"github.com/libp2p/go-msgio/protoio"
"github.com/gogo/protobuf/proto"
"github.com/multiformats/go-varint"
)
type DelimitedReader struct {
r io.Reader
buf []byte
}
// The gogo protobuf NewDelimitedReader is buffered, which may eat up stream data.
// So we need to implement a compatible delimited reader that reads unbuffered.
// There is a slowdown from unbuffered reading: when reading the message
// it can take multiple single byte Reads to read the length and another Read
// to read the message payload.
// However, this is not critical performance degradation as
// - the reader is utilized to read one (dialer, stop) or two messages (hop) during
// the handshake, so it's a drop in the water for the connection lifetime.
// - messages are small (max 4k) and the length fits in a couple of bytes,
// so overall we have at most three reads per message.
func NewDelimitedReader(r io.Reader, maxSize int) *DelimitedReader {
return &DelimitedReader{r: r, buf: pool.Get(maxSize)}
}
func (d *DelimitedReader) Close() {
if d.buf != nil {
pool.Put(d.buf)
d.buf = nil
}
}
func (d *DelimitedReader) ReadByte() (byte, error) {
buf := d.buf[:1]
_, err := d.r.Read(buf)
return buf[0], err
}
func (d *DelimitedReader) ReadMsg(msg proto.Message) error {
mlen, err := varint.ReadUvarint(d)
if err != nil {
return err
}
if uint64(len(d.buf)) < mlen {
return errors.New("message too large")
}
buf := d.buf[:mlen]
_, err = io.ReadFull(d.r, buf)
if err != nil {
return err
}
return proto.Unmarshal(buf, msg)
}
func NewDelimitedWriter(w io.Writer) protoio.WriteCloser {
return protoio.NewDelimitedWriter(w)
}

10
p2p/host/circuitv2/util/ma.go

@ -0,0 +1,10 @@
package util
import (
ma "github.com/multiformats/go-multiaddr"
)
func IsRelayAddr(a ma.Multiaddr) bool {
_, err := a.ValueForProtocol(ma.P_CIRCUIT)
return err == nil
}

97
p2p/host/circuitv2/util/pbconv.go

@ -0,0 +1,97 @@
package util
import (
"errors"
pbv1 "github.com/libp2p/go-libp2p-circuit/pb"
pbv2 "github.com/libp2p/go-libp2p/p2p/host/circuitv2/pb"
"github.com/libp2p/go-libp2p-core/peer"
ma "github.com/multiformats/go-multiaddr"
)
func PeerToPeerInfoV1(p *pbv1.CircuitRelay_Peer) (peer.AddrInfo, error) {
if p == nil {
return peer.AddrInfo{}, errors.New("nil peer")
}
id, err := peer.IDFromBytes(p.Id)
if err != nil {
return peer.AddrInfo{}, err
}
var addrs []ma.Multiaddr
if len(p.Addrs) > 0 {
addrs = make([]ma.Multiaddr, 0, len(p.Addrs))
}
for _, addrBytes := range p.Addrs {
a, err := ma.NewMultiaddrBytes(addrBytes)
if err == nil {
addrs = append(addrs, a)
}
}
return peer.AddrInfo{ID: id, Addrs: addrs}, nil
}
func PeerInfoToPeerV1(pi peer.AddrInfo) *pbv1.CircuitRelay_Peer {
var addrs [][]byte
if len(pi.Addrs) > 0 {
addrs = make([][]byte, 0, len(pi.Addrs))
}
for _, addr := range pi.Addrs {
addrs = append(addrs, addr.Bytes())
}
p := new(pbv1.CircuitRelay_Peer)
p.Id = []byte(pi.ID)
p.Addrs = addrs
return p
}
func PeerToPeerInfoV2(p *pbv2.Peer) (peer.AddrInfo, error) {
if p == nil {
return peer.AddrInfo{}, errors.New("nil peer")
}
id, err := peer.IDFromBytes(p.Id)
if err != nil {
return peer.AddrInfo{}, err
}
var addrs []ma.Multiaddr
if len(p.Addrs) > 0 {
addrs = make([]ma.Multiaddr, 0, len(p.Addrs))
}
for _, addrBytes := range p.Addrs {
a, err := ma.NewMultiaddrBytes(addrBytes)
if err == nil {
addrs = append(addrs, a)
}
}
return peer.AddrInfo{ID: id, Addrs: addrs}, nil
}
func PeerInfoToPeerV2(pi peer.AddrInfo) *pbv2.Peer {
var addrs [][]byte
if len(pi.Addrs) > 0 {
addrs = make([][]byte, 0, len(pi.Addrs))
}
for _, addr := range pi.Addrs {
addrs = append(addrs, addr.Bytes())
}
p := new(pbv2.Peer)
p.Id = []byte(pi.ID)
p.Addrs = addrs
return p
}
Loading…
Cancel
Save