Browse Source

remove all circuit v1 related code (#2107)

* autorelay: remove support for circuit v1 nodes

* circuitv2: remove v1 backwards compatibility

* remove circuitv1 implementation only used for testing

* remove circuitv1 protocol implementation
pull/2114/head
Marten Seemann 2 years ago
committed by GitHub
parent
commit
cec8c6584b
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 15
      limits.go
  2. 90
      p2p/host/autorelay/autorelay_test.go
  3. 9
      p2p/host/autorelay/options.go
  4. 51
      p2p/host/autorelay/relay_finder.go
  5. 448
      p2p/protocol/circuitv1/pb/circuitv1.pb.go
  6. 44
      p2p/protocol/circuitv1/pb/circuitv1.proto
  7. 3
      p2p/protocol/circuitv1/proto.go
  8. 46
      p2p/protocol/circuitv1/relay/options.go
  9. 452
      p2p/protocol/circuitv1/relay/relay.go
  10. 2
      p2p/protocol/circuitv2/client/client.go
  11. 67
      p2p/protocol/circuitv2/client/dial.go
  12. 83
      p2p/protocol/circuitv2/client/handlers.go
  13. 1
      p2p/protocol/circuitv2/proto/protocol.go
  14. 179
      p2p/protocol/circuitv2/relay/compat_test.go
  15. 50
      p2p/protocol/circuitv2/util/pbconv.go
  16. 21
      p2p/protocol/holepunch/holepunch_test.go
  17. 124
      p2p/protocol/internal/circuitv1-deprecated/conn.go
  18. 57
      p2p/protocol/internal/circuitv1-deprecated/dial.go
  19. 61
      p2p/protocol/internal/circuitv1-deprecated/listen.go
  20. 447
      p2p/protocol/internal/circuitv1-deprecated/pb/relay.pb.go
  21. 44
      p2p/protocol/internal/circuitv1-deprecated/pb/relay.proto
  22. 507
      p2p/protocol/internal/circuitv1-deprecated/relay.go
  23. 467
      p2p/protocol/internal/circuitv1-deprecated/relay_test.go
  24. 74
      p2p/protocol/internal/circuitv1-deprecated/transport.go
  25. 156
      p2p/protocol/internal/circuitv1-deprecated/transport_test.go
  26. 119
      p2p/protocol/internal/circuitv1-deprecated/util.go

15
limits.go

@ -4,7 +4,6 @@ import (
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/p2p/host/autonat"
rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
relayv1 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv1/relay"
circuit "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/proto"
relayv2 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/relay"
"github.com/libp2p/go-libp2p/p2p/protocol/holepunch"
@ -76,18 +75,6 @@ func SetDefaultServiceLimits(config *rcmgr.ScalingLimitConfig) {
rcmgr.BaseLimitIncrease{},
)
// relay/v1
config.AddServiceLimit(
relayv1.ServiceName,
rcmgr.BaseLimit{StreamsInbound: 256, StreamsOutbound: 256, Streams: 256, Memory: 16 << 20},
rcmgr.BaseLimitIncrease{StreamsInbound: 256, StreamsOutbound: 256, Streams: 256, Memory: 16 << 20},
)
config.AddServicePeerLimit(
relayv1.ServiceName,
rcmgr.BaseLimit{StreamsInbound: 64, StreamsOutbound: 64, Streams: 64, Memory: 1 << 20},
rcmgr.BaseLimitIncrease{},
)
// relay/v2
config.AddServiceLimit(
relayv2.ServiceName,
@ -101,7 +88,7 @@ func SetDefaultServiceLimits(config *rcmgr.ScalingLimitConfig) {
)
// circuit protocols, both client and service
for _, proto := range [...]protocol.ID{circuit.ProtoIDv1, circuit.ProtoIDv2Hop, circuit.ProtoIDv2Stop} {
for _, proto := range [...]protocol.ID{circuit.ProtoIDv2Hop, circuit.ProtoIDv2Stop} {
config.AddProtocolLimit(
proto,
rcmgr.BaseLimit{StreamsInbound: 640, StreamsOutbound: 640, Streams: 640, Memory: 16 << 20},

90
p2p/host/autorelay/autorelay_test.go

@ -14,7 +14,6 @@ import (
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/p2p/host/autorelay"
relayv1 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv1/relay"
circuitv2_proto "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/proto"
"github.com/benbjohnson/clock"
@ -102,29 +101,6 @@ func newRelay(t *testing.T) host.Host {
return h
}
func newRelayV1(t *testing.T) host.Host {
t.Helper()
h, err := libp2p.New(
libp2p.DisableRelay(),
libp2p.ForceReachabilityPublic(),
libp2p.AddrsFactory(func(addrs []ma.Multiaddr) []ma.Multiaddr {
for i, addr := range addrs {
saddr := addr.String()
if strings.HasPrefix(saddr, "/ip4/127.0.0.1/") {
addrNoIP := strings.TrimPrefix(saddr, "/ip4/127.0.0.1")
addrs[i] = ma.StringCast("/dns4/localhost" + addrNoIP)
}
}
return addrs
}),
)
require.NoError(t, err)
r, err := relayv1.NewRelay(h)
require.NoError(t, err)
t.Cleanup(func() { r.Close() })
return h
}
func TestSingleCandidate(t *testing.T) {
var counter int
h := newPrivateNode(t,
@ -180,32 +156,6 @@ func TestSingleRelay(t *testing.T) {
// test that we don't add any more relays
require.Never(t, func() bool { return numRelays(h) > 1 }, 200*time.Millisecond, 50*time.Millisecond)
}
func TestPreferRelayV2(t *testing.T) {
r := newRelay(t)
defer r.Close()
// The relay supports both v1 and v2. The v1 stream handler should never be called,
// if we prefer v2 relays.
r.SetStreamHandler(relayv1.ProtoID, func(str network.Stream) {
str.Reset()
t.Fatal("used relay v1")
})
h := newPrivateNode(t,
func(context.Context, int) <-chan peer.AddrInfo {
peerChan := make(chan peer.AddrInfo, 1)
defer close(peerChan)
peerChan <- peer.AddrInfo{ID: r.ID(), Addrs: r.Addrs()}
return peerChan
},
autorelay.WithMaxCandidates(1),
autorelay.WithNumRelays(99999),
autorelay.WithBootDelay(0),
autorelay.WithMinInterval(time.Hour),
)
defer h.Close()
require.Eventually(t, func() bool { return numRelays(h) > 0 }, 3*time.Second, 100*time.Millisecond)
}
func TestWaitForCandidates(t *testing.T) {
peerChan := make(chan peer.AddrInfo)
@ -305,46 +255,6 @@ func TestStaticRelays(t *testing.T) {
require.Eventually(t, func() bool { return numRelays(h) > 0 }, 2*time.Second, 50*time.Millisecond)
}
func TestRelayV1(t *testing.T) {
t.Run("relay v1 support disabled", func(t *testing.T) {
peerChan := make(chan peer.AddrInfo, 1)
r := newRelayV1(t)
t.Cleanup(func() { r.Close() })
peerChan <- peer.AddrInfo{ID: r.ID(), Addrs: r.Addrs()}
close(peerChan)
h := newPrivateNode(t,
func(context.Context, int) <-chan peer.AddrInfo { return peerChan },
autorelay.WithBootDelay(0),
autorelay.WithMinInterval(time.Hour),
)
defer h.Close()
require.Never(t, func() bool { return numRelays(h) > 0 }, 250*time.Millisecond, 100*time.Millisecond)
})
t.Run("relay v1 support enabled", func(t *testing.T) {
peerChan := make(chan peer.AddrInfo, 1)
r := newRelayV1(t)
t.Cleanup(func() { r.Close() })
peerChan <- peer.AddrInfo{ID: r.ID(), Addrs: r.Addrs()}
close(peerChan)
h := newPrivateNode(t,
func(context.Context, int) <-chan peer.AddrInfo { return peerChan },
autorelay.WithBootDelay(0),
autorelay.WithCircuitV1Support(),
autorelay.WithMinInterval(time.Hour),
)
defer h.Close()
addrUpdated, err := h.EventBus().Subscribe(new(event.EvtLocalAddressesUpdated))
require.NoError(t, err)
expectDeltaInAddrUpdated(t, addrUpdated, 1)
})
}
func TestConnectOnDisconnect(t *testing.T) {
const num = 3
peerChan := make(chan peer.AddrInfo, num)

9
p2p/host/autorelay/options.go

@ -42,7 +42,6 @@ type config struct {
// see WithMaxCandidateAge
maxCandidateAge time.Duration
setMinCandidates bool
enableCircuitV1 bool
}
var defaultConfig = config{
@ -151,14 +150,6 @@ func WithBackoff(d time.Duration) Option {
}
}
// WithCircuitV1Support enables support for circuit v1 relays.
func WithCircuitV1Support() Option {
return func(c *config) error {
c.enableCircuitV1 = true
return nil
}
}
// WithMaxCandidateAge sets the maximum age of a candidate.
// When we are connected to the desired number of relays, we don't ask the peer source for new candidates.
// This can lead to AutoRelay's candidate list becoming outdated, and means we won't be able

51
p2p/host/autorelay/relay_finder.go

@ -15,7 +15,6 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
basic "github.com/libp2p/go-libp2p/p2p/host/basic"
"github.com/libp2p/go-libp2p/p2p/host/eventbus"
relayv1 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv1/relay"
circuitv2 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/client"
circuitv2_proto "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/proto"
@ -23,13 +22,10 @@ import (
manet "github.com/multiformats/go-multiaddr/net"
)
const (
protoIDv1 = relayv1.ProtoID
protoIDv2 = circuitv2_proto.ProtoIDv2Hop
)
const protoIDv2 = circuitv2_proto.ProtoIDv2Hop
// Terminology:
// Candidate: Once we connect to a node and it supports (v1 / v2) relay protocol,
// Candidate: Once we connect to a node and it supports relay protocol,
// we call it a candidate, and consider using it as a relay.
// Relay: Out of the list of candidates, we select a relay to connect to.
// Currently, we just randomly select a candidate, but we can employ more sophisticated
@ -77,7 +73,7 @@ type relayFinder struct {
relayUpdated chan struct{}
relayMx sync.Mutex
relays map[peer.ID]*circuitv2.Reservation // rsvp will be nil if it is a v1 relay
relays map[peer.ID]*circuitv2.Reservation
cachedAddrs []ma.Multiaddr
cachedAddrsExpiry time.Time
@ -288,7 +284,7 @@ func (rf *relayFinder) notifyNewCandidate() {
}
}
// handleNewNode tests if a peer supports circuit v1 or v2.
// handleNewNode tests if a peer supports circuit v2.
// This method is only run on private nodes.
// If a peer does, it is added to the candidates map.
// Note that just supporting the protocol doesn't guarantee that we can also obtain a reservation.
@ -322,7 +318,7 @@ func (rf *relayFinder) handleNewNode(ctx context.Context, pi peer.AddrInfo) (add
return true
}
// tryNode checks if a peer actually supports either circuit v1 or circuit v2.
// tryNode checks if a peer actually supports either circuit v2.
// It does not modify any internal state.
func (rf *relayFinder) tryNode(ctx context.Context, pi peer.AddrInfo) (supportsRelayV2 bool, err error) {
if err := rf.host.Connect(ctx, pi); err != nil {
@ -357,42 +353,14 @@ func (rf *relayFinder) tryNode(ctx context.Context, pi peer.AddrInfo) (supportsR
return false, ctx.Err()
}
protos, err := rf.host.Peerstore().SupportsProtocols(pi.ID, protoIDv1, protoIDv2)
protos, err := rf.host.Peerstore().SupportsProtocols(pi.ID, protoIDv2)
if err != nil {
return false, fmt.Errorf("error checking relay protocol support for peer %s: %w", pi.ID, err)
}
// If the node speaks both, prefer circuit v2
var maybeSupportsV1, supportsV2 bool
for _, proto := range protos {
switch proto {
case protoIDv1:
maybeSupportsV1 = true
case protoIDv2:
supportsV2 = true
}
}
if supportsV2 {
return true, nil
}
if !rf.conf.enableCircuitV1 && !supportsV2 {
if len(protos) == 0 {
return false, errors.New("doesn't speak circuit v2")
}
if !maybeSupportsV1 && !supportsV2 {
return false, errors.New("doesn't speak circuit v1 or v2")
}
// The node *may* support circuit v1.
supportsV1, err := relayv1.CanHop(ctx, rf.host, pi.ID)
if err != nil {
return false, fmt.Errorf("CanHop failed: %w", err)
}
if !supportsV1 {
return false, errors.New("doesn't speak circuit v1 or v2")
}
return false, nil
return true, nil
}
// When a new node that could be a relay is found, we receive a notification on the maybeConnectToRelayTrigger chan.
@ -520,9 +488,6 @@ func (rf *relayFinder) refreshReservations(ctx context.Context, now time.Time) b
// find reservations about to expire and refresh them in parallel
g := new(errgroup.Group)
for p, rsvp := range rf.relays {
if rsvp == nil { // this is a circuit v1 relay, there is no reservation
continue
}
if now.Add(rsvpExpirationSlack).Before(rsvp.Expiration) {
continue
}

448
p2p/protocol/circuitv1/pb/circuitv1.pb.go

@ -1,448 +0,0 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.28.1
// protoc v3.21.12
// source: pb/circuitv1.proto
package pb
import (
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
)
const (
// Verify that this generated code is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
// Verify that runtime/protoimpl is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
type CircuitRelay_Status int32
const (
CircuitRelay_SUCCESS CircuitRelay_Status = 100
CircuitRelay_HOP_SRC_ADDR_TOO_LONG CircuitRelay_Status = 220
CircuitRelay_HOP_DST_ADDR_TOO_LONG CircuitRelay_Status = 221
CircuitRelay_HOP_SRC_MULTIADDR_INVALID CircuitRelay_Status = 250
CircuitRelay_HOP_DST_MULTIADDR_INVALID CircuitRelay_Status = 251
CircuitRelay_HOP_NO_CONN_TO_DST CircuitRelay_Status = 260
CircuitRelay_HOP_CANT_DIAL_DST CircuitRelay_Status = 261
CircuitRelay_HOP_CANT_OPEN_DST_STREAM CircuitRelay_Status = 262
CircuitRelay_HOP_CANT_SPEAK_RELAY CircuitRelay_Status = 270
CircuitRelay_HOP_CANT_RELAY_TO_SELF CircuitRelay_Status = 280
CircuitRelay_STOP_SRC_ADDR_TOO_LONG CircuitRelay_Status = 320
CircuitRelay_STOP_DST_ADDR_TOO_LONG CircuitRelay_Status = 321
CircuitRelay_STOP_SRC_MULTIADDR_INVALID CircuitRelay_Status = 350
CircuitRelay_STOP_DST_MULTIADDR_INVALID CircuitRelay_Status = 351
CircuitRelay_STOP_RELAY_REFUSED CircuitRelay_Status = 390
CircuitRelay_MALFORMED_MESSAGE CircuitRelay_Status = 400
)
// Enum value maps for CircuitRelay_Status.
var (
CircuitRelay_Status_name = map[int32]string{
100: "SUCCESS",
220: "HOP_SRC_ADDR_TOO_LONG",
221: "HOP_DST_ADDR_TOO_LONG",
250: "HOP_SRC_MULTIADDR_INVALID",
251: "HOP_DST_MULTIADDR_INVALID",
260: "HOP_NO_CONN_TO_DST",
261: "HOP_CANT_DIAL_DST",
262: "HOP_CANT_OPEN_DST_STREAM",
270: "HOP_CANT_SPEAK_RELAY",
280: "HOP_CANT_RELAY_TO_SELF",
320: "STOP_SRC_ADDR_TOO_LONG",
321: "STOP_DST_ADDR_TOO_LONG",
350: "STOP_SRC_MULTIADDR_INVALID",
351: "STOP_DST_MULTIADDR_INVALID",
390: "STOP_RELAY_REFUSED",
400: "MALFORMED_MESSAGE",
}
CircuitRelay_Status_value = map[string]int32{
"SUCCESS": 100,
"HOP_SRC_ADDR_TOO_LONG": 220,
"HOP_DST_ADDR_TOO_LONG": 221,
"HOP_SRC_MULTIADDR_INVALID": 250,
"HOP_DST_MULTIADDR_INVALID": 251,
"HOP_NO_CONN_TO_DST": 260,
"HOP_CANT_DIAL_DST": 261,
"HOP_CANT_OPEN_DST_STREAM": 262,
"HOP_CANT_SPEAK_RELAY": 270,
"HOP_CANT_RELAY_TO_SELF": 280,
"STOP_SRC_ADDR_TOO_LONG": 320,
"STOP_DST_ADDR_TOO_LONG": 321,
"STOP_SRC_MULTIADDR_INVALID": 350,
"STOP_DST_MULTIADDR_INVALID": 351,
"STOP_RELAY_REFUSED": 390,
"MALFORMED_MESSAGE": 400,
}
)
func (x CircuitRelay_Status) Enum() *CircuitRelay_Status {
p := new(CircuitRelay_Status)
*p = x
return p
}
func (x CircuitRelay_Status) String() string {
return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x))
}
func (CircuitRelay_Status) Descriptor() protoreflect.EnumDescriptor {
return file_pb_circuitv1_proto_enumTypes[0].Descriptor()
}
func (CircuitRelay_Status) Type() protoreflect.EnumType {
return &file_pb_circuitv1_proto_enumTypes[0]
}
func (x CircuitRelay_Status) Number() protoreflect.EnumNumber {
return protoreflect.EnumNumber(x)
}
// Deprecated: Do not use.
func (x *CircuitRelay_Status) UnmarshalJSON(b []byte) error {
num, err := protoimpl.X.UnmarshalJSONEnum(x.Descriptor(), b)
if err != nil {
return err
}
*x = CircuitRelay_Status(num)
return nil
}
// Deprecated: Use CircuitRelay_Status.Descriptor instead.
func (CircuitRelay_Status) EnumDescriptor() ([]byte, []int) {
return file_pb_circuitv1_proto_rawDescGZIP(), []int{0, 0}
}
type CircuitRelay_Type int32
const (
CircuitRelay_HOP CircuitRelay_Type = 1
CircuitRelay_STOP CircuitRelay_Type = 2
CircuitRelay_STATUS CircuitRelay_Type = 3
CircuitRelay_CAN_HOP CircuitRelay_Type = 4
)
// Enum value maps for CircuitRelay_Type.
var (
CircuitRelay_Type_name = map[int32]string{
1: "HOP",
2: "STOP",
3: "STATUS",
4: "CAN_HOP",
}
CircuitRelay_Type_value = map[string]int32{
"HOP": 1,
"STOP": 2,
"STATUS": 3,
"CAN_HOP": 4,
}
)
func (x CircuitRelay_Type) Enum() *CircuitRelay_Type {
p := new(CircuitRelay_Type)
*p = x
return p
}
func (x CircuitRelay_Type) String() string {
return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x))
}
func (CircuitRelay_Type) Descriptor() protoreflect.EnumDescriptor {
return file_pb_circuitv1_proto_enumTypes[1].Descriptor()
}
func (CircuitRelay_Type) Type() protoreflect.EnumType {
return &file_pb_circuitv1_proto_enumTypes[1]
}
func (x CircuitRelay_Type) Number() protoreflect.EnumNumber {
return protoreflect.EnumNumber(x)
}
// Deprecated: Do not use.
func (x *CircuitRelay_Type) UnmarshalJSON(b []byte) error {
num, err := protoimpl.X.UnmarshalJSONEnum(x.Descriptor(), b)
if err != nil {
return err
}
*x = CircuitRelay_Type(num)
return nil
}
// Deprecated: Use CircuitRelay_Type.Descriptor instead.
func (CircuitRelay_Type) EnumDescriptor() ([]byte, []int) {
return file_pb_circuitv1_proto_rawDescGZIP(), []int{0, 1}
}
type CircuitRelay struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Type *CircuitRelay_Type `protobuf:"varint,1,opt,name=type,enum=circuitv1.pb.CircuitRelay_Type" json:"type,omitempty"` // Type of the message
SrcPeer *CircuitRelay_Peer `protobuf:"bytes,2,opt,name=srcPeer" json:"srcPeer,omitempty"` // srcPeer and dstPeer are used when Type is HOP or STOP
DstPeer *CircuitRelay_Peer `protobuf:"bytes,3,opt,name=dstPeer" json:"dstPeer,omitempty"`
Code *CircuitRelay_Status `protobuf:"varint,4,opt,name=code,enum=circuitv1.pb.CircuitRelay_Status" json:"code,omitempty"` // Status code, used when Type is STATUS
}
func (x *CircuitRelay) Reset() {
*x = CircuitRelay{}
if protoimpl.UnsafeEnabled {
mi := &file_pb_circuitv1_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *CircuitRelay) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*CircuitRelay) ProtoMessage() {}
func (x *CircuitRelay) ProtoReflect() protoreflect.Message {
mi := &file_pb_circuitv1_proto_msgTypes[0]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use CircuitRelay.ProtoReflect.Descriptor instead.
func (*CircuitRelay) Descriptor() ([]byte, []int) {
return file_pb_circuitv1_proto_rawDescGZIP(), []int{0}
}
func (x *CircuitRelay) GetType() CircuitRelay_Type {
if x != nil && x.Type != nil {
return *x.Type
}
return CircuitRelay_HOP
}
func (x *CircuitRelay) GetSrcPeer() *CircuitRelay_Peer {
if x != nil {
return x.SrcPeer
}
return nil
}
func (x *CircuitRelay) GetDstPeer() *CircuitRelay_Peer {
if x != nil {
return x.DstPeer
}
return nil
}
func (x *CircuitRelay) GetCode() CircuitRelay_Status {
if x != nil && x.Code != nil {
return *x.Code
}
return CircuitRelay_SUCCESS
}
type CircuitRelay_Peer struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Id []byte `protobuf:"bytes,1,req,name=id" json:"id,omitempty"` // peer id
Addrs [][]byte `protobuf:"bytes,2,rep,name=addrs" json:"addrs,omitempty"` // peer's known addresses
}
func (x *CircuitRelay_Peer) Reset() {
*x = CircuitRelay_Peer{}
if protoimpl.UnsafeEnabled {
mi := &file_pb_circuitv1_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *CircuitRelay_Peer) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*CircuitRelay_Peer) ProtoMessage() {}
func (x *CircuitRelay_Peer) ProtoReflect() protoreflect.Message {
mi := &file_pb_circuitv1_proto_msgTypes[1]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use CircuitRelay_Peer.ProtoReflect.Descriptor instead.
func (*CircuitRelay_Peer) Descriptor() ([]byte, []int) {
return file_pb_circuitv1_proto_rawDescGZIP(), []int{0, 0}
}
func (x *CircuitRelay_Peer) GetId() []byte {
if x != nil {
return x.Id
}
return nil
}
func (x *CircuitRelay_Peer) GetAddrs() [][]byte {
if x != nil {
return x.Addrs
}
return nil
}
var File_pb_circuitv1_proto protoreflect.FileDescriptor
var file_pb_circuitv1_proto_rawDesc = []byte{
0x0a, 0x12, 0x70, 0x62, 0x2f, 0x63, 0x69, 0x72, 0x63, 0x75, 0x69, 0x74, 0x76, 0x31, 0x2e, 0x70,
0x72, 0x6f, 0x74, 0x6f, 0x12, 0x0c, 0x63, 0x69, 0x72, 0x63, 0x75, 0x69, 0x74, 0x76, 0x31, 0x2e,
0x70, 0x62, 0x22, 0x97, 0x06, 0x0a, 0x0c, 0x43, 0x69, 0x72, 0x63, 0x75, 0x69, 0x74, 0x52, 0x65,
0x6c, 0x61, 0x79, 0x12, 0x33, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28,
0x0e, 0x32, 0x1f, 0x2e, 0x63, 0x69, 0x72, 0x63, 0x75, 0x69, 0x74, 0x76, 0x31, 0x2e, 0x70, 0x62,
0x2e, 0x43, 0x69, 0x72, 0x63, 0x75, 0x69, 0x74, 0x52, 0x65, 0x6c, 0x61, 0x79, 0x2e, 0x54, 0x79,
0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x39, 0x0a, 0x07, 0x73, 0x72, 0x63, 0x50,
0x65, 0x65, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1f, 0x2e, 0x63, 0x69, 0x72, 0x63,
0x75, 0x69, 0x74, 0x76, 0x31, 0x2e, 0x70, 0x62, 0x2e, 0x43, 0x69, 0x72, 0x63, 0x75, 0x69, 0x74,
0x52, 0x65, 0x6c, 0x61, 0x79, 0x2e, 0x50, 0x65, 0x65, 0x72, 0x52, 0x07, 0x73, 0x72, 0x63, 0x50,
0x65, 0x65, 0x72, 0x12, 0x39, 0x0a, 0x07, 0x64, 0x73, 0x74, 0x50, 0x65, 0x65, 0x72, 0x18, 0x03,
0x20, 0x01, 0x28, 0x0b, 0x32, 0x1f, 0x2e, 0x63, 0x69, 0x72, 0x63, 0x75, 0x69, 0x74, 0x76, 0x31,
0x2e, 0x70, 0x62, 0x2e, 0x43, 0x69, 0x72, 0x63, 0x75, 0x69, 0x74, 0x52, 0x65, 0x6c, 0x61, 0x79,
0x2e, 0x50, 0x65, 0x65, 0x72, 0x52, 0x07, 0x64, 0x73, 0x74, 0x50, 0x65, 0x65, 0x72, 0x12, 0x35,
0x0a, 0x04, 0x63, 0x6f, 0x64, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x21, 0x2e, 0x63,
0x69, 0x72, 0x63, 0x75, 0x69, 0x74, 0x76, 0x31, 0x2e, 0x70, 0x62, 0x2e, 0x43, 0x69, 0x72, 0x63,
0x75, 0x69, 0x74, 0x52, 0x65, 0x6c, 0x61, 0x79, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52,
0x04, 0x63, 0x6f, 0x64, 0x65, 0x1a, 0x2c, 0x0a, 0x04, 0x50, 0x65, 0x65, 0x72, 0x12, 0x0e, 0x0a,
0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x02, 0x28, 0x0c, 0x52, 0x02, 0x69, 0x64, 0x12, 0x14, 0x0a,
0x05, 0x61, 0x64, 0x64, 0x72, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0c, 0x52, 0x05, 0x61, 0x64,
0x64, 0x72, 0x73, 0x22, 0xc2, 0x03, 0x0a, 0x06, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x0b,
0x0a, 0x07, 0x53, 0x55, 0x43, 0x43, 0x45, 0x53, 0x53, 0x10, 0x64, 0x12, 0x1a, 0x0a, 0x15, 0x48,
0x4f, 0x50, 0x5f, 0x53, 0x52, 0x43, 0x5f, 0x41, 0x44, 0x44, 0x52, 0x5f, 0x54, 0x4f, 0x4f, 0x5f,
0x4c, 0x4f, 0x4e, 0x47, 0x10, 0xdc, 0x01, 0x12, 0x1a, 0x0a, 0x15, 0x48, 0x4f, 0x50, 0x5f, 0x44,
0x53, 0x54, 0x5f, 0x41, 0x44, 0x44, 0x52, 0x5f, 0x54, 0x4f, 0x4f, 0x5f, 0x4c, 0x4f, 0x4e, 0x47,
0x10, 0xdd, 0x01, 0x12, 0x1e, 0x0a, 0x19, 0x48, 0x4f, 0x50, 0x5f, 0x53, 0x52, 0x43, 0x5f, 0x4d,
0x55, 0x4c, 0x54, 0x49, 0x41, 0x44, 0x44, 0x52, 0x5f, 0x49, 0x4e, 0x56, 0x41, 0x4c, 0x49, 0x44,
0x10, 0xfa, 0x01, 0x12, 0x1e, 0x0a, 0x19, 0x48, 0x4f, 0x50, 0x5f, 0x44, 0x53, 0x54, 0x5f, 0x4d,
0x55, 0x4c, 0x54, 0x49, 0x41, 0x44, 0x44, 0x52, 0x5f, 0x49, 0x4e, 0x56, 0x41, 0x4c, 0x49, 0x44,
0x10, 0xfb, 0x01, 0x12, 0x17, 0x0a, 0x12, 0x48, 0x4f, 0x50, 0x5f, 0x4e, 0x4f, 0x5f, 0x43, 0x4f,
0x4e, 0x4e, 0x5f, 0x54, 0x4f, 0x5f, 0x44, 0x53, 0x54, 0x10, 0x84, 0x02, 0x12, 0x16, 0x0a, 0x11,
0x48, 0x4f, 0x50, 0x5f, 0x43, 0x41, 0x4e, 0x54, 0x5f, 0x44, 0x49, 0x41, 0x4c, 0x5f, 0x44, 0x53,
0x54, 0x10, 0x85, 0x02, 0x12, 0x1d, 0x0a, 0x18, 0x48, 0x4f, 0x50, 0x5f, 0x43, 0x41, 0x4e, 0x54,
0x5f, 0x4f, 0x50, 0x45, 0x4e, 0x5f, 0x44, 0x53, 0x54, 0x5f, 0x53, 0x54, 0x52, 0x45, 0x41, 0x4d,
0x10, 0x86, 0x02, 0x12, 0x19, 0x0a, 0x14, 0x48, 0x4f, 0x50, 0x5f, 0x43, 0x41, 0x4e, 0x54, 0x5f,
0x53, 0x50, 0x45, 0x41, 0x4b, 0x5f, 0x52, 0x45, 0x4c, 0x41, 0x59, 0x10, 0x8e, 0x02, 0x12, 0x1b,
0x0a, 0x16, 0x48, 0x4f, 0x50, 0x5f, 0x43, 0x41, 0x4e, 0x54, 0x5f, 0x52, 0x45, 0x4c, 0x41, 0x59,
0x5f, 0x54, 0x4f, 0x5f, 0x53, 0x45, 0x4c, 0x46, 0x10, 0x98, 0x02, 0x12, 0x1b, 0x0a, 0x16, 0x53,
0x54, 0x4f, 0x50, 0x5f, 0x53, 0x52, 0x43, 0x5f, 0x41, 0x44, 0x44, 0x52, 0x5f, 0x54, 0x4f, 0x4f,
0x5f, 0x4c, 0x4f, 0x4e, 0x47, 0x10, 0xc0, 0x02, 0x12, 0x1b, 0x0a, 0x16, 0x53, 0x54, 0x4f, 0x50,
0x5f, 0x44, 0x53, 0x54, 0x5f, 0x41, 0x44, 0x44, 0x52, 0x5f, 0x54, 0x4f, 0x4f, 0x5f, 0x4c, 0x4f,
0x4e, 0x47, 0x10, 0xc1, 0x02, 0x12, 0x1f, 0x0a, 0x1a, 0x53, 0x54, 0x4f, 0x50, 0x5f, 0x53, 0x52,
0x43, 0x5f, 0x4d, 0x55, 0x4c, 0x54, 0x49, 0x41, 0x44, 0x44, 0x52, 0x5f, 0x49, 0x4e, 0x56, 0x41,
0x4c, 0x49, 0x44, 0x10, 0xde, 0x02, 0x12, 0x1f, 0x0a, 0x1a, 0x53, 0x54, 0x4f, 0x50, 0x5f, 0x44,
0x53, 0x54, 0x5f, 0x4d, 0x55, 0x4c, 0x54, 0x49, 0x41, 0x44, 0x44, 0x52, 0x5f, 0x49, 0x4e, 0x56,
0x41, 0x4c, 0x49, 0x44, 0x10, 0xdf, 0x02, 0x12, 0x17, 0x0a, 0x12, 0x53, 0x54, 0x4f, 0x50, 0x5f,
0x52, 0x45, 0x4c, 0x41, 0x59, 0x5f, 0x52, 0x45, 0x46, 0x55, 0x53, 0x45, 0x44, 0x10, 0x86, 0x03,
0x12, 0x16, 0x0a, 0x11, 0x4d, 0x41, 0x4c, 0x46, 0x4f, 0x52, 0x4d, 0x45, 0x44, 0x5f, 0x4d, 0x45,
0x53, 0x53, 0x41, 0x47, 0x45, 0x10, 0x90, 0x03, 0x22, 0x32, 0x0a, 0x04, 0x54, 0x79, 0x70, 0x65,
0x12, 0x07, 0x0a, 0x03, 0x48, 0x4f, 0x50, 0x10, 0x01, 0x12, 0x08, 0x0a, 0x04, 0x53, 0x54, 0x4f,
0x50, 0x10, 0x02, 0x12, 0x0a, 0x0a, 0x06, 0x53, 0x54, 0x41, 0x54, 0x55, 0x53, 0x10, 0x03, 0x12,
0x0b, 0x0a, 0x07, 0x43, 0x41, 0x4e, 0x5f, 0x48, 0x4f, 0x50, 0x10, 0x04,
}
var (
file_pb_circuitv1_proto_rawDescOnce sync.Once
file_pb_circuitv1_proto_rawDescData = file_pb_circuitv1_proto_rawDesc
)
func file_pb_circuitv1_proto_rawDescGZIP() []byte {
file_pb_circuitv1_proto_rawDescOnce.Do(func() {
file_pb_circuitv1_proto_rawDescData = protoimpl.X.CompressGZIP(file_pb_circuitv1_proto_rawDescData)
})
return file_pb_circuitv1_proto_rawDescData
}
var file_pb_circuitv1_proto_enumTypes = make([]protoimpl.EnumInfo, 2)
var file_pb_circuitv1_proto_msgTypes = make([]protoimpl.MessageInfo, 2)
var file_pb_circuitv1_proto_goTypes = []interface{}{
(CircuitRelay_Status)(0), // 0: circuitv1.pb.CircuitRelay.Status
(CircuitRelay_Type)(0), // 1: circuitv1.pb.CircuitRelay.Type
(*CircuitRelay)(nil), // 2: circuitv1.pb.CircuitRelay
(*CircuitRelay_Peer)(nil), // 3: circuitv1.pb.CircuitRelay.Peer
}
var file_pb_circuitv1_proto_depIdxs = []int32{
1, // 0: circuitv1.pb.CircuitRelay.type:type_name -> circuitv1.pb.CircuitRelay.Type
3, // 1: circuitv1.pb.CircuitRelay.srcPeer:type_name -> circuitv1.pb.CircuitRelay.Peer
3, // 2: circuitv1.pb.CircuitRelay.dstPeer:type_name -> circuitv1.pb.CircuitRelay.Peer
0, // 3: circuitv1.pb.CircuitRelay.code:type_name -> circuitv1.pb.CircuitRelay.Status
4, // [4:4] is the sub-list for method output_type
4, // [4:4] is the sub-list for method input_type
4, // [4:4] is the sub-list for extension type_name
4, // [4:4] is the sub-list for extension extendee
0, // [0:4] is the sub-list for field type_name
}
func init() { file_pb_circuitv1_proto_init() }
func file_pb_circuitv1_proto_init() {
if File_pb_circuitv1_proto != nil {
return
}
if !protoimpl.UnsafeEnabled {
file_pb_circuitv1_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*CircuitRelay); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_pb_circuitv1_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*CircuitRelay_Peer); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_pb_circuitv1_proto_rawDesc,
NumEnums: 2,
NumMessages: 2,
NumExtensions: 0,
NumServices: 0,
},
GoTypes: file_pb_circuitv1_proto_goTypes,
DependencyIndexes: file_pb_circuitv1_proto_depIdxs,
EnumInfos: file_pb_circuitv1_proto_enumTypes,
MessageInfos: file_pb_circuitv1_proto_msgTypes,
}.Build()
File_pb_circuitv1_proto = out.File
file_pb_circuitv1_proto_rawDesc = nil
file_pb_circuitv1_proto_goTypes = nil
file_pb_circuitv1_proto_depIdxs = nil
}

44
p2p/protocol/circuitv1/pb/circuitv1.proto

@ -1,44 +0,0 @@
syntax = "proto2";
package circuitv1.pb;
message CircuitRelay {
enum Status {
SUCCESS = 100;
HOP_SRC_ADDR_TOO_LONG = 220;
HOP_DST_ADDR_TOO_LONG = 221;
HOP_SRC_MULTIADDR_INVALID = 250;
HOP_DST_MULTIADDR_INVALID = 251;
HOP_NO_CONN_TO_DST = 260;
HOP_CANT_DIAL_DST = 261;
HOP_CANT_OPEN_DST_STREAM = 262;
HOP_CANT_SPEAK_RELAY = 270;
HOP_CANT_RELAY_TO_SELF = 280;
STOP_SRC_ADDR_TOO_LONG = 320;
STOP_DST_ADDR_TOO_LONG = 321;
STOP_SRC_MULTIADDR_INVALID = 350;
STOP_DST_MULTIADDR_INVALID = 351;
STOP_RELAY_REFUSED = 390;
MALFORMED_MESSAGE = 400;
}
enum Type { // RPC identifier, either HOP, STOP or STATUS
HOP = 1;
STOP = 2;
STATUS = 3;
CAN_HOP = 4;
}
message Peer {
required bytes id = 1; // peer id
repeated bytes addrs = 2; // peer's known addresses
}
optional Type type = 1; // Type of the message
optional Peer srcPeer = 2; // srcPeer and dstPeer are used when Type is HOP or STOP
optional Peer dstPeer = 3;
optional Status code = 4; // Status code, used when Type is STATUS
}

3
p2p/protocol/circuitv1/proto.go

@ -1,3 +0,0 @@
package circuitv1
//go:generate protoc --proto_path=$PWD:$PWD/../../.. --go_out=. --go_opt=Mpb/circuitv1.proto=./pb pb/circuitv1.proto

46
p2p/protocol/circuitv1/relay/options.go

@ -1,46 +0,0 @@
package relay
import (
"github.com/libp2p/go-libp2p/core/peer"
)
type Resources struct {
// MaxCircuits is the maximum number of active relay connections
MaxCircuits int
// MaxCircuitsPerPeer is the maximum number of active relay connections per peer
MaxCircuitsPerPeer int
// BufferSize is the buffer size for relaying in each direction
BufferSize int
}
func DefaultResources() Resources {
return Resources{
MaxCircuits: 1024,
MaxCircuitsPerPeer: 64,
BufferSize: 4096,
}
}
type ACLFilter interface {
AllowHop(src, dest peer.ID) bool
}
type Option func(r *Relay) error
// WithResources specifies resource limits for the relay
func WithResources(rc Resources) Option {
return func(r *Relay) error {
r.rc = rc
return nil
}
}
// WithACL specifies an ACLFilter for access control
func WithACL(acl ACLFilter) Option {
return func(r *Relay) error {
r.acl = acl
return nil
}
}

452
p2p/protocol/circuitv1/relay/relay.go

@ -1,452 +0,0 @@
package relay
import (
"context"
"fmt"
"io"
"sync"
"sync/atomic"
"time"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
pb "github.com/libp2p/go-libp2p/p2p/protocol/circuitv1/pb"
"github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/util"
logging "github.com/ipfs/go-log/v2"
pool "github.com/libp2p/go-buffer-pool"
ma "github.com/multiformats/go-multiaddr"
)
var log = logging.Logger("relay")
const (
ProtoID = "/libp2p/circuit/relay/0.1.0"
ServiceName = "libp2p.relay/v1"
StreamTimeout = time.Minute
ConnectTimeout = 30 * time.Second
HandshakeTimeout = time.Minute
relayHopTag = "relay-v1-hop"
relayHopTagValue = 2
maxMessageSize = 4096
)
type Relay struct {
closed atomic.Bool
ctx context.Context
cancel context.CancelFunc
host host.Host
rc Resources
acl ACLFilter
scope network.ResourceScopeSpan
mx sync.Mutex
conns map[peer.ID]int
active int
}
func NewRelay(h host.Host, opts ...Option) (*Relay, error) {
r := &Relay{
host: h,
rc: DefaultResources(),
conns: make(map[peer.ID]int),
}
r.ctx, r.cancel = context.WithCancel(context.Background())
for _, opt := range opts {
err := opt(r)
if err != nil {
return nil, fmt.Errorf("error applying relay option: %w", err)
}
}
// get a scope for memory reservations at service level
err := h.Network().ResourceManager().ViewService(ServiceName,
func(s network.ServiceScope) error {
var err error
r.scope, err = s.BeginSpan()
return err
})
if err != nil {
return nil, err
}
h.SetStreamHandler(ProtoID, r.handleStream)
return r, nil
}
func (r *Relay) Close() error {
if r.closed.CompareAndSwap(false, true) {
r.host.RemoveStreamHandler(ProtoID)
r.scope.Done()
r.cancel()
}
return nil
}
func (r *Relay) handleStream(s network.Stream) {
log.Debugf("new relay stream from: %s", s.Conn().RemotePeer())
if err := s.Scope().SetService(ServiceName); err != nil {
log.Debugf("error attaching stream to relay service: %s", err)
s.Reset()
return
}
if err := s.Scope().ReserveMemory(maxMessageSize, network.ReservationPriorityAlways); err != nil {
log.Debugf("error reserving memory for stream: %s", err)
s.Reset()
return
}
defer s.Scope().ReleaseMemory(maxMessageSize)
rd := util.NewDelimitedReader(s, maxMessageSize)
defer rd.Close()
s.SetReadDeadline(time.Now().Add(StreamTimeout))
var msg pb.CircuitRelay
err := rd.ReadMsg(&msg)
if err != nil {
r.handleError(s, pb.CircuitRelay_MALFORMED_MESSAGE)
return
}
s.SetReadDeadline(time.Time{})
switch msg.GetType() {
case pb.CircuitRelay_HOP:
r.handleHopStream(s, &msg)
case pb.CircuitRelay_CAN_HOP:
r.handleCanHop(s, &msg)
case pb.CircuitRelay_STOP:
r.handleError(s, pb.CircuitRelay_STOP_RELAY_REFUSED)
default:
log.Warnf("unexpected relay handshake: %d", msg.GetType())
r.handleError(s, pb.CircuitRelay_MALFORMED_MESSAGE)
}
}
func (r *Relay) handleHopStream(s network.Stream, msg *pb.CircuitRelay) {
span, err := r.scope.BeginSpan()
if err != nil {
log.Debugf("failed to begin relay transaction: %s", err)
r.handleError(s, pb.CircuitRelay_HOP_CANT_SPEAK_RELAY)
return
}
fail := func(code pb.CircuitRelay_Status) {
span.Done()
r.handleError(s, code)
}
// reserve buffers for the relay
if err := span.ReserveMemory(2*r.rc.BufferSize, network.ReservationPriorityHigh); err != nil {
log.Debugf("error reserving memory for relay: %s", err)
fail(pb.CircuitRelay_HOP_CANT_SPEAK_RELAY)
return
}
src, err := peerToPeerInfo(msg.GetSrcPeer())
if err != nil {
fail(pb.CircuitRelay_HOP_SRC_MULTIADDR_INVALID)
return
}
if src.ID != s.Conn().RemotePeer() {
fail(pb.CircuitRelay_HOP_SRC_MULTIADDR_INVALID)
return
}
dest, err := peerToPeerInfo(msg.GetDstPeer())
if err != nil {
fail(pb.CircuitRelay_HOP_DST_MULTIADDR_INVALID)
return
}
if dest.ID == r.host.ID() {
fail(pb.CircuitRelay_HOP_CANT_RELAY_TO_SELF)
return
}
if r.acl != nil && !r.acl.AllowHop(src.ID, dest.ID) {
log.Debugf("refusing hop from %s to %s; ACL refused", src.ID, dest.ID)
fail(pb.CircuitRelay_HOP_CANT_SPEAK_RELAY)
return
}
r.mx.Lock()
if r.active >= r.rc.MaxCircuits {
r.mx.Unlock()
log.Debugf("refusing connection from %s to %s; too many active circuits", src.ID, dest.ID)
fail(pb.CircuitRelay_HOP_CANT_SPEAK_RELAY)
return
}
srcConns := r.conns[src.ID]
if srcConns >= r.rc.MaxCircuitsPerPeer {
r.mx.Unlock()
log.Debugf("refusing connection from %s to %s; too many connections from %s", src.ID, dest.ID, src)
fail(pb.CircuitRelay_HOP_CANT_SPEAK_RELAY)
return
}
destConns := r.conns[dest.ID]
if destConns >= r.rc.MaxCircuitsPerPeer {
r.mx.Unlock()
log.Debugf("refusing connection from %s to %s; too many connecitons to %s", src.ID, dest.ID, dest.ID)
fail(pb.CircuitRelay_HOP_CANT_SPEAK_RELAY)
return
}
r.active++
r.addConn(src.ID)
r.addConn(src.ID)
r.mx.Unlock()
cleanup := func() {
span.Done()
r.mx.Lock()
r.active--
r.rmConn(src.ID)
r.rmConn(dest.ID)
r.mx.Unlock()
}
// open stream
ctx, cancel := context.WithTimeout(r.ctx, ConnectTimeout)
defer cancel()
ctx = network.WithNoDial(ctx, "relay hop")
bs, err := r.host.NewStream(ctx, dest.ID, ProtoID)
if err != nil {
log.Debugf("error opening relay stream to %s: %s", dest.ID.Pretty(), err.Error())
if err == network.ErrNoConn {
r.handleError(s, pb.CircuitRelay_HOP_NO_CONN_TO_DST)
} else {
r.handleError(s, pb.CircuitRelay_HOP_CANT_DIAL_DST)
}
cleanup()
return
}
fail = func(code pb.CircuitRelay_Status) {
bs.Reset()
cleanup()
r.handleError(s, code)
}
if err := bs.Scope().SetService(ServiceName); err != nil {
log.Debugf("error attaching stream to relay service: %s", err)
fail(pb.CircuitRelay_HOP_CANT_SPEAK_RELAY)
return
}
// stop handshake
if err := bs.Scope().ReserveMemory(maxMessageSize, network.ReservationPriorityAlways); err != nil {
log.Debugf("failed to reserve memory for stream: %s", err)
fail(pb.CircuitRelay_HOP_CANT_SPEAK_RELAY)
return
}
defer bs.Scope().ReleaseMemory(maxMessageSize)
rd := util.NewDelimitedReader(bs, maxMessageSize)
wr := util.NewDelimitedWriter(bs)
defer rd.Close()
// set handshake deadline
bs.SetDeadline(time.Now().Add(HandshakeTimeout))
msg.Type = pb.CircuitRelay_STOP.Enum()
err = wr.WriteMsg(msg)
if err != nil {
log.Debugf("error writing stop handshake: %s", err.Error())
fail(pb.CircuitRelay_HOP_CANT_OPEN_DST_STREAM)
return
}
msg.Reset()
err = rd.ReadMsg(msg)
if err != nil {
log.Debugf("error reading stop response: %s", err.Error())
fail(pb.CircuitRelay_HOP_CANT_OPEN_DST_STREAM)
return
}
if msg.GetType() != pb.CircuitRelay_STATUS {
log.Debugf("unexpected relay stop response: not a status message (%d)", msg.GetType())
fail(pb.CircuitRelay_HOP_CANT_OPEN_DST_STREAM)
return
}
if msg.GetCode() != pb.CircuitRelay_SUCCESS {
log.Debugf("relay stop failure: %d", msg.GetCode())
fail(msg.GetCode())
return
}
err = r.writeResponse(s, pb.CircuitRelay_SUCCESS)
if err != nil {
log.Debugf("error writing relay response: %s", err.Error())
bs.Reset()
s.Reset()
cleanup()
return
}
// relay connection
log.Infof("relaying connection between %s and %s", src.ID.Pretty(), dest.ID.Pretty())
// reset deadline
bs.SetDeadline(time.Time{})
var goroutines atomic.Int32
goroutines.Store(2)
done := func() {
if goroutines.Add(-1) == 0 {
s.Close()
bs.Close()
cleanup()
}
}
go r.relayConn(s, bs, src.ID, dest.ID, done)
go r.relayConn(bs, s, dest.ID, src.ID, done)
}
func (r *Relay) addConn(p peer.ID) {
conns := r.conns[p]
conns++
r.conns[p] = conns
if conns == 1 {
r.host.ConnManager().TagPeer(p, relayHopTag, relayHopTagValue)
}
}
func (r *Relay) rmConn(p peer.ID) {
conns := r.conns[p]
conns--
if conns > 0 {
r.conns[p] = conns
} else {
delete(r.conns, p)
r.host.ConnManager().UntagPeer(p, relayHopTag)
}
}
func (r *Relay) relayConn(src, dest network.Stream, srcID, destID peer.ID, done func()) {
defer done()
buf := pool.Get(r.rc.BufferSize)
defer pool.Put(buf)
count, err := io.CopyBuffer(dest, src, buf)
if err != nil {
log.Debugf("relay copy error: %s", err)
// Reset both.
src.Reset()
dest.Reset()
} else {
// propagate the close
dest.CloseWrite()
}
log.Debugf("relayed %d bytes from %s to %s", count, srcID, destID)
}
func (r *Relay) handleCanHop(s network.Stream, msg *pb.CircuitRelay) {
err := r.writeResponse(s, pb.CircuitRelay_SUCCESS)
if err != nil {
s.Reset()
log.Debugf("error writing relay response: %s", err.Error())
} else {
s.Close()
}
}
func (r *Relay) handleError(s network.Stream, code pb.CircuitRelay_Status) {
log.Warnf("relay error: %s", code)
err := r.writeResponse(s, code)
if err != nil {
s.Reset()
log.Debugf("error writing relay response: %s", err.Error())
} else {
s.Close()
}
}
// Queries a peer for support of hop relay
func CanHop(ctx context.Context, host host.Host, id peer.ID) (bool, error) {
s, err := host.NewStream(ctx, id, ProtoID)
if err != nil {
return false, err
}
defer s.Close()
rd := util.NewDelimitedReader(s, maxMessageSize)
wr := util.NewDelimitedWriter(s)
defer rd.Close()
var msg pb.CircuitRelay
msg.Type = pb.CircuitRelay_CAN_HOP.Enum()
if err := wr.WriteMsg(&msg); err != nil {
s.Reset()
return false, err
}
msg.Reset()
if err := rd.ReadMsg(&msg); err != nil {
s.Reset()
return false, err
}
if msg.GetType() != pb.CircuitRelay_STATUS {
return false, fmt.Errorf("unexpected relay response; not a status message (%d)", msg.GetType())
}
return msg.GetCode() == pb.CircuitRelay_SUCCESS, nil
}
func (r *Relay) writeResponse(s network.Stream, code pb.CircuitRelay_Status) error {
wr := util.NewDelimitedWriter(s)
var msg pb.CircuitRelay
msg.Type = pb.CircuitRelay_STATUS.Enum()
msg.Code = code.Enum()
return wr.WriteMsg(&msg)
}
func peerToPeerInfo(p *pb.CircuitRelay_Peer) (peer.AddrInfo, error) {
if p == nil {
return peer.AddrInfo{}, fmt.Errorf("nil peer")
}
id, err := peer.IDFromBytes(p.Id)
if err != nil {
return peer.AddrInfo{}, err
}
addrs := make([]ma.Multiaddr, 0, len(p.Addrs))
for _, addrBytes := range p.Addrs {
a, err := ma.NewMultiaddrBytes(addrBytes)
if err == nil {
addrs = append(addrs, a)
}
}
return peer.AddrInfo{ID: id, Addrs: addrs}, nil
}

2
p2p/protocol/circuitv2/client/client.go

@ -66,13 +66,11 @@ func New(h host.Host, upgrader transport.Upgrader) (*Client, error) {
// Start registers the circuit (client) protocol stream handlers
func (c *Client) Start() {
c.host.SetStreamHandler(proto.ProtoIDv1, c.handleStreamV1)
c.host.SetStreamHandler(proto.ProtoIDv2Stop, c.handleStreamV2)
}
func (c *Client) Close() error {
c.ctxCancel()
c.host.RemoveStreamHandler(proto.ProtoIDv1)
c.host.RemoveStreamHandler(proto.ProtoIDv2Stop)
return nil
}

67
p2p/protocol/circuitv2/client/dial.go

@ -8,7 +8,6 @@ import (
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
pbv1 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv1/pb"
pbv2 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/pb"
"github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/proto"
"github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/util"
@ -124,25 +123,14 @@ func (c *Client) dialPeer(ctx context.Context, relay, dest peer.AddrInfo) (*Conn
dialCtx, cancel := context.WithTimeout(ctx, DialRelayTimeout)
defer cancel()
s, err := c.host.NewStream(dialCtx, relay.ID, proto.ProtoIDv2Hop, proto.ProtoIDv1)
s, err := c.host.NewStream(dialCtx, relay.ID, proto.ProtoIDv2Hop)
if err != nil {
return nil, fmt.Errorf("error opening hop stream to relay: %w", err)
}
switch s.Protocol() {
case proto.ProtoIDv2Hop:
return c.connectV2(s, dest)
case proto.ProtoIDv1:
return c.connectV1(s, dest)
default:
s.Reset()
return nil, fmt.Errorf("unexpected stream protocol: %s", s.Protocol())
}
return c.connect(s, dest)
}
func (c *Client) connectV2(s network.Stream, dest peer.AddrInfo) (*Conn, error) {
func (c *Client) connect(s network.Stream, dest peer.AddrInfo) (*Conn, error) {
if err := s.Scope().ReserveMemory(maxMessageSize, network.ReservationPriorityAlways); err != nil {
s.Reset()
return nil, err
@ -199,52 +187,3 @@ func (c *Client) connectV2(s network.Stream, dest peer.AddrInfo) (*Conn, error)
return &Conn{stream: s, remote: dest, stat: stat, client: c}, nil
}
func (c *Client) connectV1(s network.Stream, dest peer.AddrInfo) (*Conn, error) {
if err := s.Scope().ReserveMemory(maxMessageSize, network.ReservationPriorityAlways); err != nil {
s.Reset()
return nil, err
}
defer s.Scope().ReleaseMemory(maxMessageSize)
rd := util.NewDelimitedReader(s, maxMessageSize)
wr := util.NewDelimitedWriter(s)
defer rd.Close()
var msg pbv1.CircuitRelay
msg.Type = pbv1.CircuitRelay_HOP.Enum()
msg.SrcPeer = util.PeerInfoToPeerV1(c.host.Peerstore().PeerInfo(c.host.ID()))
msg.DstPeer = util.PeerInfoToPeerV1(dest)
s.SetDeadline(time.Now().Add(DialTimeout))
err := wr.WriteMsg(&msg)
if err != nil {
s.Reset()
return nil, err
}
msg.Reset()
err = rd.ReadMsg(&msg)
if err != nil {
s.Reset()
return nil, err
}
s.SetDeadline(time.Time{})
if msg.GetType() != pbv1.CircuitRelay_STATUS {
s.Reset()
return nil, newRelayError("unexpected relay response; not a status message (%d)", msg.GetType())
}
status := msg.GetCode()
if status != pbv1.CircuitRelay_SUCCESS {
s.Reset()
return nil, newRelayError("error opening relay circuit: %s (%d)", pbv1.CircuitRelay_Status_name[int32(status)], status)
}
return &Conn{stream: s, remote: dest, client: c}, nil
}

83
p2p/protocol/circuitv2/client/handlers.go

@ -4,7 +4,6 @@ import (
"time"
"github.com/libp2p/go-libp2p/core/network"
pbv1 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv1/pb"
pbv2 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/pb"
"github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/util"
)
@ -87,85 +86,3 @@ func (c *Client) handleStreamV2(s network.Stream) {
handleError(pbv2.Status_CONNECTION_FAILED)
}
}
func (c *Client) handleStreamV1(s network.Stream) {
log.Debugf("new relay/v1 stream from: %s", s.Conn().RemotePeer())
s.SetReadDeadline(time.Now().Add(StreamTimeout))
rd := util.NewDelimitedReader(s, maxMessageSize)
defer rd.Close()
writeResponse := func(status pbv1.CircuitRelay_Status) error {
wr := util.NewDelimitedWriter(s)
var msg pbv1.CircuitRelay
msg.Type = pbv1.CircuitRelay_STATUS.Enum()
msg.Code = status.Enum()
return wr.WriteMsg(&msg)
}
handleError := func(status pbv1.CircuitRelay_Status) {
log.Debugf("protocol error: %s (%d)", pbv1.CircuitRelay_Status_name[int32(status)], status)
err := writeResponse(status)
if err != nil {
s.Reset()
log.Debugf("error writing circuit response: %s", err.Error())
} else {
s.Close()
}
}
var msg pbv1.CircuitRelay
err := rd.ReadMsg(&msg)
if err != nil {
handleError(pbv1.CircuitRelay_MALFORMED_MESSAGE)
return
}
// reset stream deadline as message has been read
s.SetReadDeadline(time.Time{})
switch msg.GetType() {
case pbv1.CircuitRelay_STOP:
case pbv1.CircuitRelay_HOP:
handleError(pbv1.CircuitRelay_HOP_CANT_SPEAK_RELAY)
return
case pbv1.CircuitRelay_CAN_HOP:
handleError(pbv1.CircuitRelay_HOP_CANT_SPEAK_RELAY)
return
default:
log.Debugf("unexpected relay handshake: %d", msg.GetType())
handleError(pbv1.CircuitRelay_MALFORMED_MESSAGE)
return
}
src, err := util.PeerToPeerInfoV1(msg.GetSrcPeer())
if err != nil {
handleError(pbv1.CircuitRelay_STOP_SRC_MULTIADDR_INVALID)
return
}
dst, err := util.PeerToPeerInfoV1(msg.GetDstPeer())
if err != nil || dst.ID != c.host.ID() {
handleError(pbv1.CircuitRelay_STOP_DST_MULTIADDR_INVALID)
return
}
log.Debugf("incoming relay connection from: %s", src.ID)
select {
case c.incoming <- accept{
conn: &Conn{stream: s, remote: src, client: c},
writeResponse: func() error {
return writeResponse(pbv1.CircuitRelay_SUCCESS)
},
}:
case <-time.After(AcceptTimeout):
handleError(pbv1.CircuitRelay_STOP_RELAY_REFUSED)
}
}

1
p2p/protocol/circuitv2/proto/protocol.go

@ -1,7 +1,6 @@
package proto
const (
ProtoIDv1 = "/libp2p/circuit/relay/0.1.0"
ProtoIDv2Hop = "/libp2p/circuit/relay/0.2.0/hop"
ProtoIDv2Stop = "/libp2p/circuit/relay/0.2.0/stop"
)

179
p2p/protocol/circuitv2/relay/compat_test.go

@ -1,179 +0,0 @@
package relay_test
import (
"bytes"
"context"
"fmt"
"io"
"testing"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/transport"
relayv1 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv1/relay"
compatv1 "github.com/libp2p/go-libp2p/p2p/protocol/internal/circuitv1-deprecated"
ma "github.com/multiformats/go-multiaddr"
)
func addTransportV1(t *testing.T, h host.Host, upgrader transport.Upgrader) {
err := compatv1.AddRelayTransport(h, upgrader)
if err != nil {
t.Fatal(err)
}
}
func TestRelayCompatV2DialV1(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts, upgraders := getNetHosts(t, ctx, 3)
addTransportV1(t, hosts[0], upgraders[0])
addTransport(t, hosts[2], upgraders[2])
rch := make(chan []byte, 1)
hosts[0].SetStreamHandler("test", func(s network.Stream) {
defer s.Close()
defer close(rch)
buf := make([]byte, 1024)
nread := 0
for nread < len(buf) {
n, err := s.Read(buf[nread:])
nread += n
if err != nil {
if err == io.EOF {
break
}
t.Fatal(err)
}
}
rch <- buf[:nread]
})
r, err := relayv1.NewRelay(hosts[1])
if err != nil {
t.Fatal(err)
}
defer r.Close()
connect(t, hosts[0], hosts[1])
connect(t, hosts[1], hosts[2])
raddr, err := ma.NewMultiaddr(fmt.Sprintf("/p2p/%s/p2p-circuit/p2p/%s", hosts[1].ID(), hosts[0].ID()))
if err != nil {
t.Fatal(err)
}
err = hosts[2].Connect(ctx, peer.AddrInfo{ID: hosts[0].ID(), Addrs: []ma.Multiaddr{raddr}})
if err != nil {
t.Fatal(err)
}
conns := hosts[2].Network().ConnsToPeer(hosts[0].ID())
if len(conns) != 1 {
t.Fatalf("expected 1 connection, but got %d", len(conns))
}
if conns[0].Stat().Transient {
t.Fatal("expected non transient connection")
}
s, err := hosts[2].NewStream(ctx, hosts[0].ID(), "test")
if err != nil {
t.Fatal(err)
}
msg := []byte("relay works!")
nwritten, err := s.Write(msg)
if err != nil {
t.Fatal(err)
}
if nwritten != len(msg) {
t.Fatalf("expected to write %d bytes, but wrote %d instead", len(msg), nwritten)
}
s.CloseWrite()
got := <-rch
if !bytes.Equal(msg, got) {
t.Fatalf("Wrong echo; expected %s but got %s", string(msg), string(got))
}
}
func TestRelayCompatV1DialV2(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts, upgraders := getNetHosts(t, ctx, 3)
addTransport(t, hosts[0], upgraders[0])
addTransportV1(t, hosts[2], upgraders[2])
rch := make(chan []byte, 1)
hosts[0].SetStreamHandler("test", func(s network.Stream) {
defer s.Close()
defer close(rch)
buf := make([]byte, 1024)
nread := 0
for nread < len(buf) {
n, err := s.Read(buf[nread:])
nread += n
if err != nil {
if err == io.EOF {
break
}
t.Fatal(err)
}
}
rch <- buf[:nread]
})
r, err := relayv1.NewRelay(hosts[1])
if err != nil {
t.Fatal(err)
}
defer r.Close()
connect(t, hosts[0], hosts[1])
connect(t, hosts[1], hosts[2])
raddr, err := ma.NewMultiaddr(fmt.Sprintf("/p2p/%s/p2p-circuit/p2p/%s", hosts[1].ID(), hosts[0].ID()))
if err != nil {
t.Fatal(err)
}
err = hosts[2].Connect(ctx, peer.AddrInfo{ID: hosts[0].ID(), Addrs: []ma.Multiaddr{raddr}})
if err != nil {
t.Fatal(err)
}
conns := hosts[2].Network().ConnsToPeer(hosts[0].ID())
if len(conns) != 1 {
t.Fatalf("expected 1 connection, but got %d", len(conns))
}
if conns[0].Stat().Transient {
t.Fatal("expected non transient connection")
}
s, err := hosts[2].NewStream(ctx, hosts[0].ID(), "test")
if err != nil {
t.Fatal(err)
}
msg := []byte("relay works!")
nwritten, err := s.Write(msg)
if err != nil {
t.Fatal(err)
}
if nwritten != len(msg) {
t.Fatalf("expected to write %d bytes, but wrote %d instead", len(msg), nwritten)
}
s.CloseWrite()
got := <-rch
if !bytes.Equal(msg, got) {
t.Fatalf("Wrong echo; expected %s but got %s", string(msg), string(got))
}
}

50
p2p/protocol/circuitv2/util/pbconv.go

@ -4,51 +4,11 @@ import (
"errors"
"github.com/libp2p/go-libp2p/core/peer"
pbv1 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv1/pb"
pbv2 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/pb"
ma "github.com/multiformats/go-multiaddr"
)
func PeerToPeerInfoV1(p *pbv1.CircuitRelay_Peer) (peer.AddrInfo, error) {
if p == nil {
return peer.AddrInfo{}, errors.New("nil peer")
}
id, err := peer.IDFromBytes(p.Id)
if err != nil {
return peer.AddrInfo{}, err
}
var addrs []ma.Multiaddr
if len(p.Addrs) > 0 {
addrs = make([]ma.Multiaddr, 0, len(p.Addrs))
}
for _, addrBytes := range p.Addrs {
a, err := ma.NewMultiaddrBytes(addrBytes)
if err == nil {
addrs = append(addrs, a)
}
}
return peer.AddrInfo{ID: id, Addrs: addrs}, nil
}
func PeerInfoToPeerV1(pi peer.AddrInfo) *pbv1.CircuitRelay_Peer {
addrs := make([][]byte, 0, len(pi.Addrs))
for _, addr := range pi.Addrs {
addrs = append(addrs, addr.Bytes())
}
p := new(pbv1.CircuitRelay_Peer)
p.Id = []byte(pi.ID)
p.Addrs = addrs
return p
}
func PeerToPeerInfoV2(p *pbv2.Peer) (peer.AddrInfo, error) {
if p == nil {
return peer.AddrInfo{}, errors.New("nil peer")
@ -73,14 +33,12 @@ func PeerToPeerInfoV2(p *pbv2.Peer) (peer.AddrInfo, error) {
func PeerInfoToPeerV2(pi peer.AddrInfo) *pbv2.Peer {
addrs := make([][]byte, 0, len(pi.Addrs))
for _, addr := range pi.Addrs {
addrs = append(addrs, addr.Bytes())
}
p := new(pbv2.Peer)
p.Id = []byte(pi.ID)
p.Addrs = addrs
return p
return &pbv2.Peer{
Id: []byte(pi.ID),
Addrs: addrs,
}
}

21
p2p/protocol/holepunch/holepunch_test.go

@ -7,21 +7,19 @@ import (
"testing"
"time"
"github.com/libp2p/go-libp2p/p2p/transport/tcp"
"github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/proto"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p-testing/race"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/p2p/host/autorelay"
relayv1 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv1/relay"
"github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/proto"
relayv2 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/relay"
"github.com/libp2p/go-libp2p/p2p/protocol/holepunch"
holepunch_pb "github.com/libp2p/go-libp2p/p2p/protocol/holepunch/pb"
"github.com/libp2p/go-libp2p/p2p/protocol/identify"
"github.com/libp2p/go-libp2p/p2p/transport/tcp"
"github.com/libp2p/go-msgio/pbio"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
@ -338,7 +336,7 @@ func TestFailuresOnResponder(t *testing.T) {
defer h2.Close()
defer relay.Close()
s, err := h2.NewStream(context.Background(), h1.ID(), holepunch.Protocol)
s, err := h2.NewStream(network.WithUseTransient(context.Background(), "holepunch"), h1.ID(), holepunch.Protocol)
require.NoError(t, err)
go tc.initiator(s)
@ -423,10 +421,7 @@ func mkHostWithStaticAutoRelay(t *testing.T, relay host.Host) host.Host {
h, err := libp2p.New(
libp2p.ListenAddrs(ma.StringCast("/ip4/127.0.0.1/tcp/0")),
libp2p.EnableRelay(),
libp2p.EnableAutoRelayWithStaticRelays(
[]peer.AddrInfo{pi},
autorelay.WithCircuitV1Support(),
),
libp2p.EnableAutoRelayWithStaticRelays([]peer.AddrInfo{pi}),
libp2p.ForceReachabilityPrivate(),
libp2p.ResourceManager(&network.NullResourceManager{}),
)
@ -454,7 +449,7 @@ func makeRelayedHosts(t *testing.T, h1opt, h2opt []holepunch.Option, addHolePunc
libp2p.ResourceManager(&network.NullResourceManager{}),
)
require.NoError(t, err)
_, err = relayv1.NewRelay(relay)
_, err = relayv2.New(relay)
require.NoError(t, err)
// make sure the relay service is started and advertised by Identify
@ -467,7 +462,7 @@ func makeRelayedHosts(t *testing.T, h1opt, h2opt []holepunch.Option, addHolePunc
defer h.Close()
require.NoError(t, h.Connect(context.Background(), peer.AddrInfo{ID: relay.ID(), Addrs: relay.Addrs()}))
require.Eventually(t, func() bool {
supported, err := h.Peerstore().SupportsProtocols(relay.ID(), proto.ProtoIDv2Hop, relayv1.ProtoID)
supported, err := h.Peerstore().SupportsProtocols(relay.ID(), proto.ProtoIDv2Hop)
return err == nil && len(supported) > 0
}, 3*time.Second, 100*time.Millisecond)

124
p2p/protocol/internal/circuitv1-deprecated/conn.go

@ -1,124 +0,0 @@
package relay
import (
"fmt"
"net"
"time"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
)
// HopTagWeight is the connection manager weight for connections carrying relay hop streams
var HopTagWeight = 5
type Conn struct {
stream network.Stream
remote peer.AddrInfo
host host.Host
relay *Relay
}
type NetAddr struct {
Relay string
Remote string
}
func (n *NetAddr) Network() string {
return "libp2p-circuit-relay"
}
func (n *NetAddr) String() string {
return fmt.Sprintf("relay[%s-%s]", n.Remote, n.Relay)
}
func (c *Conn) Close() error {
c.untagHop()
return c.stream.Reset()
}
func (c *Conn) Read(buf []byte) (int, error) {
return c.stream.Read(buf)
}
func (c *Conn) Write(buf []byte) (int, error) {
return c.stream.Write(buf)
}
func (c *Conn) SetDeadline(t time.Time) error {
return c.stream.SetDeadline(t)
}
func (c *Conn) SetReadDeadline(t time.Time) error {
return c.stream.SetReadDeadline(t)
}
func (c *Conn) SetWriteDeadline(t time.Time) error {
return c.stream.SetWriteDeadline(t)
}
func (c *Conn) RemoteAddr() net.Addr {
return &NetAddr{
Relay: c.stream.Conn().RemotePeer().Pretty(),
Remote: c.remote.ID.Pretty(),
}
}
// Increment the underlying relay connection tag by 1, thus increasing its protection from
// connection pruning. This ensures that connections to relays are not accidentally closed,
// by the connection manager, taking with them all the relayed connections (that may themselves
// be protected).
func (c *Conn) tagHop() {
c.relay.mx.Lock()
defer c.relay.mx.Unlock()
p := c.stream.Conn().RemotePeer()
c.relay.hopCount[p]++
if c.relay.hopCount[p] == 1 {
c.host.ConnManager().TagPeer(p, "relay-hop-stream", HopTagWeight)
}
}
// Decrement the underlying relay connection tag by 1; this is performed when we close the
// relayed connection.
func (c *Conn) untagHop() {
c.relay.mx.Lock()
defer c.relay.mx.Unlock()
p := c.stream.Conn().RemotePeer()
c.relay.hopCount[p]--
if c.relay.hopCount[p] == 0 {
c.host.ConnManager().UntagPeer(p, "relay-hop-stream")
delete(c.relay.hopCount, p)
}
}
// TODO: is it okay to cast c.Conn().RemotePeer() into a multiaddr? might be "user input"
func (c *Conn) RemoteMultiaddr() ma.Multiaddr {
// TODO: We should be able to do this directly without converting to/from a string.
relayAddr, err := ma.NewComponent(
ma.ProtocolWithCode(ma.P_P2P).Name,
c.stream.Conn().RemotePeer().Pretty(),
)
if err != nil {
panic(err)
}
return ma.Join(c.stream.Conn().RemoteMultiaddr(), relayAddr, circuitAddr)
}
func (c *Conn) LocalMultiaddr() ma.Multiaddr {
return c.stream.Conn().LocalMultiaddr()
}
func (c *Conn) LocalAddr() net.Addr {
na, err := manet.ToNetAddr(c.stream.Conn().LocalMultiaddr())
if err != nil {
log.Error("failed to convert local multiaddr to net addr:", err)
return nil
}
return na
}

57
p2p/protocol/internal/circuitv1-deprecated/dial.go

@ -1,57 +0,0 @@
package relay
import (
"context"
"fmt"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/transport"
ma "github.com/multiformats/go-multiaddr"
)
func (d *RelayTransport) Dial(ctx context.Context, a ma.Multiaddr, p peer.ID) (transport.CapableConn, error) {
c, err := d.Relay().Dial(ctx, a, p)
if err != nil {
return nil, err
}
c.tagHop()
scope, _ := (&network.NullResourceManager{}).OpenConnection(network.DirOutbound, false, a)
return d.upgrader.Upgrade(ctx, d, c, network.DirOutbound, p, scope)
}
func (r *Relay) Dial(ctx context.Context, a ma.Multiaddr, p peer.ID) (*Conn, error) {
// split /a/p2p-circuit/b into (/a, /p2p-circuit/b)
relayaddr, destaddr := ma.SplitFunc(a, func(c ma.Component) bool {
return c.Protocol().Code == ma.P_CIRCUIT
})
// If the address contained no /p2p-circuit part, the second part is nil.
if destaddr == nil {
return nil, fmt.Errorf("%s is not a relay address", a)
}
if relayaddr == nil {
return nil, fmt.Errorf(
"can't dial a p2p-circuit without specifying a relay: %s",
a,
)
}
// Strip the /p2p-circuit prefix from the destaddr.
_, destaddr = ma.SplitFirst(destaddr)
dinfo := &peer.AddrInfo{ID: p, Addrs: []ma.Multiaddr{}}
if destaddr != nil {
dinfo.Addrs = append(dinfo.Addrs, destaddr)
}
var rinfo *peer.AddrInfo
rinfo, err := peer.AddrInfoFromP2pAddr(relayaddr)
if err != nil {
return nil, fmt.Errorf("error parsing multiaddr '%s': %s", relayaddr.String(), err)
}
return r.DialPeer(ctx, *rinfo, *dinfo)
}

61
p2p/protocol/internal/circuitv1-deprecated/listen.go

@ -1,61 +0,0 @@
package relay
import (
"net"
pb "github.com/libp2p/go-libp2p/p2p/protocol/internal/circuitv1-deprecated/pb"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
)
var _ manet.Listener = (*RelayListener)(nil)
type RelayListener Relay
func (l *RelayListener) Relay() *Relay {
return (*Relay)(l)
}
func (r *Relay) Listener() *RelayListener {
// TODO: Only allow one!
return (*RelayListener)(r)
}
func (l *RelayListener) Accept() (manet.Conn, error) {
for {
select {
case c := <-l.incoming:
err := l.Relay().writeResponse(c.stream, pb.CircuitRelay_SUCCESS)
if err != nil {
log.Debugf("error writing relay response: %s", err.Error())
c.stream.Reset()
continue
}
// TODO: Pretty print.
log.Infof("accepted relay connection: %q", c)
c.tagHop()
return c, nil
case <-l.ctx.Done():
return nil, l.ctx.Err()
}
}
}
func (l *RelayListener) Addr() net.Addr {
return &NetAddr{
Relay: "any",
Remote: "any",
}
}
func (l *RelayListener) Multiaddr() ma.Multiaddr {
return circuitAddr
}
func (l *RelayListener) Close() error {
// TODO: noop?
return nil
}

447
p2p/protocol/internal/circuitv1-deprecated/pb/relay.pb.go

@ -1,447 +0,0 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.28.1
// protoc v3.21.12
// source: pb/relay.proto
package pb
import (
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
)
const (
// Verify that this generated code is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
// Verify that runtime/protoimpl is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
type CircuitRelay_Status int32
const (
CircuitRelay_SUCCESS CircuitRelay_Status = 100
CircuitRelay_HOP_SRC_ADDR_TOO_LONG CircuitRelay_Status = 220
CircuitRelay_HOP_DST_ADDR_TOO_LONG CircuitRelay_Status = 221
CircuitRelay_HOP_SRC_MULTIADDR_INVALID CircuitRelay_Status = 250
CircuitRelay_HOP_DST_MULTIADDR_INVALID CircuitRelay_Status = 251
CircuitRelay_HOP_NO_CONN_TO_DST CircuitRelay_Status = 260
CircuitRelay_HOP_CANT_DIAL_DST CircuitRelay_Status = 261
CircuitRelay_HOP_CANT_OPEN_DST_STREAM CircuitRelay_Status = 262
CircuitRelay_HOP_CANT_SPEAK_RELAY CircuitRelay_Status = 270
CircuitRelay_HOP_CANT_RELAY_TO_SELF CircuitRelay_Status = 280
CircuitRelay_STOP_SRC_ADDR_TOO_LONG CircuitRelay_Status = 320
CircuitRelay_STOP_DST_ADDR_TOO_LONG CircuitRelay_Status = 321
CircuitRelay_STOP_SRC_MULTIADDR_INVALID CircuitRelay_Status = 350
CircuitRelay_STOP_DST_MULTIADDR_INVALID CircuitRelay_Status = 351
CircuitRelay_STOP_RELAY_REFUSED CircuitRelay_Status = 390
CircuitRelay_MALFORMED_MESSAGE CircuitRelay_Status = 400
)
// Enum value maps for CircuitRelay_Status.
var (
CircuitRelay_Status_name = map[int32]string{
100: "SUCCESS",
220: "HOP_SRC_ADDR_TOO_LONG",
221: "HOP_DST_ADDR_TOO_LONG",
250: "HOP_SRC_MULTIADDR_INVALID",
251: "HOP_DST_MULTIADDR_INVALID",
260: "HOP_NO_CONN_TO_DST",
261: "HOP_CANT_DIAL_DST",
262: "HOP_CANT_OPEN_DST_STREAM",
270: "HOP_CANT_SPEAK_RELAY",
280: "HOP_CANT_RELAY_TO_SELF",
320: "STOP_SRC_ADDR_TOO_LONG",
321: "STOP_DST_ADDR_TOO_LONG",
350: "STOP_SRC_MULTIADDR_INVALID",
351: "STOP_DST_MULTIADDR_INVALID",
390: "STOP_RELAY_REFUSED",
400: "MALFORMED_MESSAGE",
}
CircuitRelay_Status_value = map[string]int32{
"SUCCESS": 100,
"HOP_SRC_ADDR_TOO_LONG": 220,
"HOP_DST_ADDR_TOO_LONG": 221,
"HOP_SRC_MULTIADDR_INVALID": 250,
"HOP_DST_MULTIADDR_INVALID": 251,
"HOP_NO_CONN_TO_DST": 260,
"HOP_CANT_DIAL_DST": 261,
"HOP_CANT_OPEN_DST_STREAM": 262,
"HOP_CANT_SPEAK_RELAY": 270,
"HOP_CANT_RELAY_TO_SELF": 280,
"STOP_SRC_ADDR_TOO_LONG": 320,
"STOP_DST_ADDR_TOO_LONG": 321,
"STOP_SRC_MULTIADDR_INVALID": 350,
"STOP_DST_MULTIADDR_INVALID": 351,
"STOP_RELAY_REFUSED": 390,
"MALFORMED_MESSAGE": 400,
}
)
func (x CircuitRelay_Status) Enum() *CircuitRelay_Status {
p := new(CircuitRelay_Status)
*p = x
return p
}
func (x CircuitRelay_Status) String() string {
return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x))
}
func (CircuitRelay_Status) Descriptor() protoreflect.EnumDescriptor {
return file_pb_relay_proto_enumTypes[0].Descriptor()
}
func (CircuitRelay_Status) Type() protoreflect.EnumType {
return &file_pb_relay_proto_enumTypes[0]
}
func (x CircuitRelay_Status) Number() protoreflect.EnumNumber {
return protoreflect.EnumNumber(x)
}
// Deprecated: Do not use.
func (x *CircuitRelay_Status) UnmarshalJSON(b []byte) error {
num, err := protoimpl.X.UnmarshalJSONEnum(x.Descriptor(), b)
if err != nil {
return err
}
*x = CircuitRelay_Status(num)
return nil
}
// Deprecated: Use CircuitRelay_Status.Descriptor instead.
func (CircuitRelay_Status) EnumDescriptor() ([]byte, []int) {
return file_pb_relay_proto_rawDescGZIP(), []int{0, 0}
}
type CircuitRelay_Type int32
const (
CircuitRelay_HOP CircuitRelay_Type = 1
CircuitRelay_STOP CircuitRelay_Type = 2
CircuitRelay_STATUS CircuitRelay_Type = 3
CircuitRelay_CAN_HOP CircuitRelay_Type = 4
)
// Enum value maps for CircuitRelay_Type.
var (
CircuitRelay_Type_name = map[int32]string{
1: "HOP",
2: "STOP",
3: "STATUS",
4: "CAN_HOP",
}
CircuitRelay_Type_value = map[string]int32{
"HOP": 1,
"STOP": 2,
"STATUS": 3,
"CAN_HOP": 4,
}
)
func (x CircuitRelay_Type) Enum() *CircuitRelay_Type {
p := new(CircuitRelay_Type)
*p = x
return p
}
func (x CircuitRelay_Type) String() string {
return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x))
}
func (CircuitRelay_Type) Descriptor() protoreflect.EnumDescriptor {
return file_pb_relay_proto_enumTypes[1].Descriptor()
}
func (CircuitRelay_Type) Type() protoreflect.EnumType {
return &file_pb_relay_proto_enumTypes[1]
}
func (x CircuitRelay_Type) Number() protoreflect.EnumNumber {
return protoreflect.EnumNumber(x)
}
// Deprecated: Do not use.
func (x *CircuitRelay_Type) UnmarshalJSON(b []byte) error {
num, err := protoimpl.X.UnmarshalJSONEnum(x.Descriptor(), b)
if err != nil {
return err
}
*x = CircuitRelay_Type(num)
return nil
}
// Deprecated: Use CircuitRelay_Type.Descriptor instead.
func (CircuitRelay_Type) EnumDescriptor() ([]byte, []int) {
return file_pb_relay_proto_rawDescGZIP(), []int{0, 1}
}
type CircuitRelay struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Type *CircuitRelay_Type `protobuf:"varint,1,opt,name=type,enum=relay.pb.CircuitRelay_Type" json:"type,omitempty"` // Type of the message
SrcPeer *CircuitRelay_Peer `protobuf:"bytes,2,opt,name=srcPeer" json:"srcPeer,omitempty"` // srcPeer and dstPeer are used when Type is HOP or STOP
DstPeer *CircuitRelay_Peer `protobuf:"bytes,3,opt,name=dstPeer" json:"dstPeer,omitempty"`
Code *CircuitRelay_Status `protobuf:"varint,4,opt,name=code,enum=relay.pb.CircuitRelay_Status" json:"code,omitempty"` // Status code, used when Type is STATUS
}
func (x *CircuitRelay) Reset() {
*x = CircuitRelay{}
if protoimpl.UnsafeEnabled {
mi := &file_pb_relay_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *CircuitRelay) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*CircuitRelay) ProtoMessage() {}
func (x *CircuitRelay) ProtoReflect() protoreflect.Message {
mi := &file_pb_relay_proto_msgTypes[0]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use CircuitRelay.ProtoReflect.Descriptor instead.
func (*CircuitRelay) Descriptor() ([]byte, []int) {
return file_pb_relay_proto_rawDescGZIP(), []int{0}
}
func (x *CircuitRelay) GetType() CircuitRelay_Type {
if x != nil && x.Type != nil {
return *x.Type
}
return CircuitRelay_HOP
}
func (x *CircuitRelay) GetSrcPeer() *CircuitRelay_Peer {
if x != nil {
return x.SrcPeer
}
return nil
}
func (x *CircuitRelay) GetDstPeer() *CircuitRelay_Peer {
if x != nil {
return x.DstPeer
}
return nil
}
func (x *CircuitRelay) GetCode() CircuitRelay_Status {
if x != nil && x.Code != nil {
return *x.Code
}
return CircuitRelay_SUCCESS
}
type CircuitRelay_Peer struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Id []byte `protobuf:"bytes,1,req,name=id" json:"id,omitempty"` // peer id
Addrs [][]byte `protobuf:"bytes,2,rep,name=addrs" json:"addrs,omitempty"` // peer's known addresses
}
func (x *CircuitRelay_Peer) Reset() {
*x = CircuitRelay_Peer{}
if protoimpl.UnsafeEnabled {
mi := &file_pb_relay_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *CircuitRelay_Peer) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*CircuitRelay_Peer) ProtoMessage() {}
func (x *CircuitRelay_Peer) ProtoReflect() protoreflect.Message {
mi := &file_pb_relay_proto_msgTypes[1]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use CircuitRelay_Peer.ProtoReflect.Descriptor instead.
func (*CircuitRelay_Peer) Descriptor() ([]byte, []int) {
return file_pb_relay_proto_rawDescGZIP(), []int{0, 0}
}
func (x *CircuitRelay_Peer) GetId() []byte {
if x != nil {
return x.Id
}
return nil
}
func (x *CircuitRelay_Peer) GetAddrs() [][]byte {
if x != nil {
return x.Addrs
}
return nil
}
var File_pb_relay_proto protoreflect.FileDescriptor
var file_pb_relay_proto_rawDesc = []byte{
0x0a, 0x0e, 0x70, 0x62, 0x2f, 0x72, 0x65, 0x6c, 0x61, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f,
0x12, 0x08, 0x72, 0x65, 0x6c, 0x61, 0x79, 0x2e, 0x70, 0x62, 0x22, 0x87, 0x06, 0x0a, 0x0c, 0x43,
0x69, 0x72, 0x63, 0x75, 0x69, 0x74, 0x52, 0x65, 0x6c, 0x61, 0x79, 0x12, 0x2f, 0x0a, 0x04, 0x74,
0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x1b, 0x2e, 0x72, 0x65, 0x6c, 0x61,
0x79, 0x2e, 0x70, 0x62, 0x2e, 0x43, 0x69, 0x72, 0x63, 0x75, 0x69, 0x74, 0x52, 0x65, 0x6c, 0x61,
0x79, 0x2e, 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x35, 0x0a, 0x07,
0x73, 0x72, 0x63, 0x50, 0x65, 0x65, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1b, 0x2e,
0x72, 0x65, 0x6c, 0x61, 0x79, 0x2e, 0x70, 0x62, 0x2e, 0x43, 0x69, 0x72, 0x63, 0x75, 0x69, 0x74,
0x52, 0x65, 0x6c, 0x61, 0x79, 0x2e, 0x50, 0x65, 0x65, 0x72, 0x52, 0x07, 0x73, 0x72, 0x63, 0x50,
0x65, 0x65, 0x72, 0x12, 0x35, 0x0a, 0x07, 0x64, 0x73, 0x74, 0x50, 0x65, 0x65, 0x72, 0x18, 0x03,
0x20, 0x01, 0x28, 0x0b, 0x32, 0x1b, 0x2e, 0x72, 0x65, 0x6c, 0x61, 0x79, 0x2e, 0x70, 0x62, 0x2e,
0x43, 0x69, 0x72, 0x63, 0x75, 0x69, 0x74, 0x52, 0x65, 0x6c, 0x61, 0x79, 0x2e, 0x50, 0x65, 0x65,
0x72, 0x52, 0x07, 0x64, 0x73, 0x74, 0x50, 0x65, 0x65, 0x72, 0x12, 0x31, 0x0a, 0x04, 0x63, 0x6f,
0x64, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x1d, 0x2e, 0x72, 0x65, 0x6c, 0x61, 0x79,
0x2e, 0x70, 0x62, 0x2e, 0x43, 0x69, 0x72, 0x63, 0x75, 0x69, 0x74, 0x52, 0x65, 0x6c, 0x61, 0x79,
0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x04, 0x63, 0x6f, 0x64, 0x65, 0x1a, 0x2c, 0x0a,
0x04, 0x50, 0x65, 0x65, 0x72, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x02, 0x28,
0x0c, 0x52, 0x02, 0x69, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x61, 0x64, 0x64, 0x72, 0x73, 0x18, 0x02,
0x20, 0x03, 0x28, 0x0c, 0x52, 0x05, 0x61, 0x64, 0x64, 0x72, 0x73, 0x22, 0xc2, 0x03, 0x0a, 0x06,
0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x0b, 0x0a, 0x07, 0x53, 0x55, 0x43, 0x43, 0x45, 0x53,
0x53, 0x10, 0x64, 0x12, 0x1a, 0x0a, 0x15, 0x48, 0x4f, 0x50, 0x5f, 0x53, 0x52, 0x43, 0x5f, 0x41,
0x44, 0x44, 0x52, 0x5f, 0x54, 0x4f, 0x4f, 0x5f, 0x4c, 0x4f, 0x4e, 0x47, 0x10, 0xdc, 0x01, 0x12,
0x1a, 0x0a, 0x15, 0x48, 0x4f, 0x50, 0x5f, 0x44, 0x53, 0x54, 0x5f, 0x41, 0x44, 0x44, 0x52, 0x5f,
0x54, 0x4f, 0x4f, 0x5f, 0x4c, 0x4f, 0x4e, 0x47, 0x10, 0xdd, 0x01, 0x12, 0x1e, 0x0a, 0x19, 0x48,
0x4f, 0x50, 0x5f, 0x53, 0x52, 0x43, 0x5f, 0x4d, 0x55, 0x4c, 0x54, 0x49, 0x41, 0x44, 0x44, 0x52,
0x5f, 0x49, 0x4e, 0x56, 0x41, 0x4c, 0x49, 0x44, 0x10, 0xfa, 0x01, 0x12, 0x1e, 0x0a, 0x19, 0x48,
0x4f, 0x50, 0x5f, 0x44, 0x53, 0x54, 0x5f, 0x4d, 0x55, 0x4c, 0x54, 0x49, 0x41, 0x44, 0x44, 0x52,
0x5f, 0x49, 0x4e, 0x56, 0x41, 0x4c, 0x49, 0x44, 0x10, 0xfb, 0x01, 0x12, 0x17, 0x0a, 0x12, 0x48,
0x4f, 0x50, 0x5f, 0x4e, 0x4f, 0x5f, 0x43, 0x4f, 0x4e, 0x4e, 0x5f, 0x54, 0x4f, 0x5f, 0x44, 0x53,
0x54, 0x10, 0x84, 0x02, 0x12, 0x16, 0x0a, 0x11, 0x48, 0x4f, 0x50, 0x5f, 0x43, 0x41, 0x4e, 0x54,
0x5f, 0x44, 0x49, 0x41, 0x4c, 0x5f, 0x44, 0x53, 0x54, 0x10, 0x85, 0x02, 0x12, 0x1d, 0x0a, 0x18,
0x48, 0x4f, 0x50, 0x5f, 0x43, 0x41, 0x4e, 0x54, 0x5f, 0x4f, 0x50, 0x45, 0x4e, 0x5f, 0x44, 0x53,
0x54, 0x5f, 0x53, 0x54, 0x52, 0x45, 0x41, 0x4d, 0x10, 0x86, 0x02, 0x12, 0x19, 0x0a, 0x14, 0x48,
0x4f, 0x50, 0x5f, 0x43, 0x41, 0x4e, 0x54, 0x5f, 0x53, 0x50, 0x45, 0x41, 0x4b, 0x5f, 0x52, 0x45,
0x4c, 0x41, 0x59, 0x10, 0x8e, 0x02, 0x12, 0x1b, 0x0a, 0x16, 0x48, 0x4f, 0x50, 0x5f, 0x43, 0x41,
0x4e, 0x54, 0x5f, 0x52, 0x45, 0x4c, 0x41, 0x59, 0x5f, 0x54, 0x4f, 0x5f, 0x53, 0x45, 0x4c, 0x46,
0x10, 0x98, 0x02, 0x12, 0x1b, 0x0a, 0x16, 0x53, 0x54, 0x4f, 0x50, 0x5f, 0x53, 0x52, 0x43, 0x5f,
0x41, 0x44, 0x44, 0x52, 0x5f, 0x54, 0x4f, 0x4f, 0x5f, 0x4c, 0x4f, 0x4e, 0x47, 0x10, 0xc0, 0x02,
0x12, 0x1b, 0x0a, 0x16, 0x53, 0x54, 0x4f, 0x50, 0x5f, 0x44, 0x53, 0x54, 0x5f, 0x41, 0x44, 0x44,
0x52, 0x5f, 0x54, 0x4f, 0x4f, 0x5f, 0x4c, 0x4f, 0x4e, 0x47, 0x10, 0xc1, 0x02, 0x12, 0x1f, 0x0a,
0x1a, 0x53, 0x54, 0x4f, 0x50, 0x5f, 0x53, 0x52, 0x43, 0x5f, 0x4d, 0x55, 0x4c, 0x54, 0x49, 0x41,
0x44, 0x44, 0x52, 0x5f, 0x49, 0x4e, 0x56, 0x41, 0x4c, 0x49, 0x44, 0x10, 0xde, 0x02, 0x12, 0x1f,
0x0a, 0x1a, 0x53, 0x54, 0x4f, 0x50, 0x5f, 0x44, 0x53, 0x54, 0x5f, 0x4d, 0x55, 0x4c, 0x54, 0x49,
0x41, 0x44, 0x44, 0x52, 0x5f, 0x49, 0x4e, 0x56, 0x41, 0x4c, 0x49, 0x44, 0x10, 0xdf, 0x02, 0x12,
0x17, 0x0a, 0x12, 0x53, 0x54, 0x4f, 0x50, 0x5f, 0x52, 0x45, 0x4c, 0x41, 0x59, 0x5f, 0x52, 0x45,
0x46, 0x55, 0x53, 0x45, 0x44, 0x10, 0x86, 0x03, 0x12, 0x16, 0x0a, 0x11, 0x4d, 0x41, 0x4c, 0x46,
0x4f, 0x52, 0x4d, 0x45, 0x44, 0x5f, 0x4d, 0x45, 0x53, 0x53, 0x41, 0x47, 0x45, 0x10, 0x90, 0x03,
0x22, 0x32, 0x0a, 0x04, 0x54, 0x79, 0x70, 0x65, 0x12, 0x07, 0x0a, 0x03, 0x48, 0x4f, 0x50, 0x10,
0x01, 0x12, 0x08, 0x0a, 0x04, 0x53, 0x54, 0x4f, 0x50, 0x10, 0x02, 0x12, 0x0a, 0x0a, 0x06, 0x53,
0x54, 0x41, 0x54, 0x55, 0x53, 0x10, 0x03, 0x12, 0x0b, 0x0a, 0x07, 0x43, 0x41, 0x4e, 0x5f, 0x48,
0x4f, 0x50, 0x10, 0x04,
}
var (
file_pb_relay_proto_rawDescOnce sync.Once
file_pb_relay_proto_rawDescData = file_pb_relay_proto_rawDesc
)
func file_pb_relay_proto_rawDescGZIP() []byte {
file_pb_relay_proto_rawDescOnce.Do(func() {
file_pb_relay_proto_rawDescData = protoimpl.X.CompressGZIP(file_pb_relay_proto_rawDescData)
})
return file_pb_relay_proto_rawDescData
}
var file_pb_relay_proto_enumTypes = make([]protoimpl.EnumInfo, 2)
var file_pb_relay_proto_msgTypes = make([]protoimpl.MessageInfo, 2)
var file_pb_relay_proto_goTypes = []interface{}{
(CircuitRelay_Status)(0), // 0: relay.pb.CircuitRelay.Status
(CircuitRelay_Type)(0), // 1: relay.pb.CircuitRelay.Type
(*CircuitRelay)(nil), // 2: relay.pb.CircuitRelay
(*CircuitRelay_Peer)(nil), // 3: relay.pb.CircuitRelay.Peer
}
var file_pb_relay_proto_depIdxs = []int32{
1, // 0: relay.pb.CircuitRelay.type:type_name -> relay.pb.CircuitRelay.Type
3, // 1: relay.pb.CircuitRelay.srcPeer:type_name -> relay.pb.CircuitRelay.Peer
3, // 2: relay.pb.CircuitRelay.dstPeer:type_name -> relay.pb.CircuitRelay.Peer
0, // 3: relay.pb.CircuitRelay.code:type_name -> relay.pb.CircuitRelay.Status
4, // [4:4] is the sub-list for method output_type
4, // [4:4] is the sub-list for method input_type
4, // [4:4] is the sub-list for extension type_name
4, // [4:4] is the sub-list for extension extendee
0, // [0:4] is the sub-list for field type_name
}
func init() { file_pb_relay_proto_init() }
func file_pb_relay_proto_init() {
if File_pb_relay_proto != nil {
return
}
if !protoimpl.UnsafeEnabled {
file_pb_relay_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*CircuitRelay); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_pb_relay_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*CircuitRelay_Peer); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_pb_relay_proto_rawDesc,
NumEnums: 2,
NumMessages: 2,
NumExtensions: 0,
NumServices: 0,
},
GoTypes: file_pb_relay_proto_goTypes,
DependencyIndexes: file_pb_relay_proto_depIdxs,
EnumInfos: file_pb_relay_proto_enumTypes,
MessageInfos: file_pb_relay_proto_msgTypes,
}.Build()
File_pb_relay_proto = out.File
file_pb_relay_proto_rawDesc = nil
file_pb_relay_proto_goTypes = nil
file_pb_relay_proto_depIdxs = nil
}

44
p2p/protocol/internal/circuitv1-deprecated/pb/relay.proto

@ -1,44 +0,0 @@
syntax = "proto2";
package relay.pb;
message CircuitRelay {
enum Status {
SUCCESS = 100;
HOP_SRC_ADDR_TOO_LONG = 220;
HOP_DST_ADDR_TOO_LONG = 221;
HOP_SRC_MULTIADDR_INVALID = 250;
HOP_DST_MULTIADDR_INVALID = 251;
HOP_NO_CONN_TO_DST = 260;
HOP_CANT_DIAL_DST = 261;
HOP_CANT_OPEN_DST_STREAM = 262;
HOP_CANT_SPEAK_RELAY = 270;
HOP_CANT_RELAY_TO_SELF = 280;
STOP_SRC_ADDR_TOO_LONG = 320;
STOP_DST_ADDR_TOO_LONG = 321;
STOP_SRC_MULTIADDR_INVALID = 350;
STOP_DST_MULTIADDR_INVALID = 351;
STOP_RELAY_REFUSED = 390;
MALFORMED_MESSAGE = 400;
}
enum Type { // RPC identifier, either HOP, STOP or STATUS
HOP = 1;
STOP = 2;
STATUS = 3;
CAN_HOP = 4;
}
message Peer {
required bytes id = 1; // peer id
repeated bytes addrs = 2; // peer's known addresses
}
optional Type type = 1; // Type of the message
optional Peer srcPeer = 2; // srcPeer and dstPeer are used when Type is HOP or STOP
optional Peer dstPeer = 3;
optional Status code = 4; // Status code, used when Type is STATUS
}

507
p2p/protocol/internal/circuitv1-deprecated/relay.go

@ -1,507 +0,0 @@
package relay
import (
"context"
"fmt"
"io"
"sync"
"sync/atomic"
"time"
pb "github.com/libp2p/go-libp2p/p2p/protocol/internal/circuitv1-deprecated/pb"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/core/transport"
pool "github.com/libp2p/go-buffer-pool"
logging "github.com/ipfs/go-log/v2"
ma "github.com/multiformats/go-multiaddr"
)
var log = logging.Logger("relay")
const ProtoID = "/libp2p/circuit/relay/0.1.0"
const maxMessageSize = 4096
var (
RelayAcceptTimeout = 10 * time.Second
HopConnectTimeout = 30 * time.Second
StopHandshakeTimeout = 1 * time.Minute
HopStreamBufferSize = 4096
HopStreamLimit = 1 << 19 // 512K hops for 1M goroutines
streamTimeout = 1 * time.Minute
)
// Relay is the relay transport and service.
type Relay struct {
host host.Host
upgrader transport.Upgrader
ctx context.Context
ctxCancel context.CancelFunc
self peer.ID
active bool
hop bool
incoming chan *Conn
// atomic counters
streamCount atomic.Int32
liveHopCount atomic.Int32
// per peer hop counters
mx sync.Mutex
hopCount map[peer.ID]int
}
// RelayOpts are options for configuring the relay transport.
type RelayOpt int
var (
// OptActive configures the relay transport to actively establish
// outbound connections on behalf of clients. You probably don't want to
// enable this unless you know what you're doing.
OptActive = RelayOpt(0)
// OptHop configures the relay transport to accept requests to relay
// traffic on behalf of third-parties. Unless OptActive is specified,
// this will only relay traffic between peers already connected to this
// node.
OptHop = RelayOpt(1)
// OptDiscovery is a no-op. It was introduced as a way to probe new
// peers to see if they were willing to act as a relays. However, in
// practice, it's useless. While it does test to see if these peers are
// relays, it doesn't (and can't), check to see if these peers are
// _active_ relays (i.e., will actively dial the target peer).
//
// This option may be re-enabled in the future but for now you shouldn't
// use it.
OptDiscovery = RelayOpt(2)
)
type RelayError struct {
Code pb.CircuitRelay_Status
}
func (e RelayError) Error() string {
return fmt.Sprintf("error opening relay circuit: %s (%d)", pb.CircuitRelay_Status_name[int32(e.Code)], e.Code)
}
// NewRelay constructs a new relay.
func NewRelay(h host.Host, upgrader transport.Upgrader, opts ...RelayOpt) (*Relay, error) {
r := &Relay{
upgrader: upgrader,
host: h,
self: h.ID(),
incoming: make(chan *Conn),
hopCount: make(map[peer.ID]int),
}
r.ctx, r.ctxCancel = context.WithCancel(context.Background())
for _, opt := range opts {
switch opt {
case OptActive:
r.active = true
case OptHop:
r.hop = true
case OptDiscovery:
log.Errorf(
"circuit.OptDiscovery is now a no-op: %s",
"dialing peers with a random relay is no longer supported",
)
default:
return nil, fmt.Errorf("unrecognized option: %d", opt)
}
}
h.SetStreamHandler(ProtoID, r.handleNewStream)
return r, nil
}
// Increment the live hop count and increment the connection manager tags by 1 for the two
// sides of the hop stream. This ensures that connections with many hop streams will be protected
// from pruning, thus minimizing disruption from connection trimming in a relay node.
func (r *Relay) addLiveHop(from, to peer.ID) {
r.liveHopCount.Add(1)
r.host.ConnManager().UpsertTag(from, "relay-hop-stream", incrementTag)
r.host.ConnManager().UpsertTag(to, "relay-hop-stream", incrementTag)
}
// Decrement the live hpo count and decrement the connection manager tags for the two sides
// of the hop stream.
func (r *Relay) rmLiveHop(from, to peer.ID) {
r.liveHopCount.Add(-1)
r.host.ConnManager().UpsertTag(from, "relay-hop-stream", decrementTag)
r.host.ConnManager().UpsertTag(to, "relay-hop-stream", decrementTag)
}
func (r *Relay) GetActiveHops() int32 {
return r.liveHopCount.Load()
}
func (r *Relay) DialPeer(ctx context.Context, relay peer.AddrInfo, dest peer.AddrInfo) (*Conn, error) {
log.Debugf("dialing peer %s through relay %s", dest.ID, relay.ID)
if len(relay.Addrs) > 0 {
r.host.Peerstore().AddAddrs(relay.ID, relay.Addrs, peerstore.TempAddrTTL)
}
s, err := r.host.NewStream(ctx, relay.ID, ProtoID)
if err != nil {
return nil, err
}
rd := newDelimitedReader(s, maxMessageSize)
wr := newDelimitedWriter(s)
defer rd.Close()
var msg pb.CircuitRelay
msg.Type = pb.CircuitRelay_HOP.Enum()
msg.SrcPeer = peerInfoToPeer(r.host.Peerstore().PeerInfo(r.self))
msg.DstPeer = peerInfoToPeer(dest)
err = wr.WriteMsg(&msg)
if err != nil {
s.Reset()
return nil, err
}
msg.Reset()
err = rd.ReadMsg(&msg)
if err != nil {
s.Reset()
return nil, err
}
if msg.GetType() != pb.CircuitRelay_STATUS {
s.Reset()
return nil, fmt.Errorf("unexpected relay response; not a status message (%d)", msg.GetType())
}
if msg.GetCode() != pb.CircuitRelay_SUCCESS {
s.Reset()
return nil, RelayError{msg.GetCode()}
}
return &Conn{stream: s, remote: dest, host: r.host, relay: r}, nil
}
func (r *Relay) Matches(addr ma.Multiaddr) bool {
// TODO: Look at the prefix transport as well.
_, err := addr.ValueForProtocol(ma.P_CIRCUIT)
return err == nil
}
// Queries a peer for support of hop relay
func CanHop(ctx context.Context, host host.Host, id peer.ID) (bool, error) {
s, err := host.NewStream(ctx, id, ProtoID)
if err != nil {
return false, err
}
defer s.Close()
rd := newDelimitedReader(s, maxMessageSize)
wr := newDelimitedWriter(s)
defer rd.Close()
var msg pb.CircuitRelay
msg.Type = pb.CircuitRelay_CAN_HOP.Enum()
if err := wr.WriteMsg(&msg); err != nil {
s.Reset()
return false, err
}
msg.Reset()
if err := rd.ReadMsg(&msg); err != nil {
s.Reset()
return false, err
}
if msg.GetType() != pb.CircuitRelay_STATUS {
return false, fmt.Errorf("unexpected relay response; not a status message (%d)", msg.GetType())
}
return msg.GetCode() == pb.CircuitRelay_SUCCESS, nil
}
func (r *Relay) CanHop(ctx context.Context, id peer.ID) (bool, error) {
return CanHop(ctx, r.host, id)
}
func (r *Relay) handleNewStream(s network.Stream) {
s.SetReadDeadline(time.Now().Add(streamTimeout))
log.Infof("new relay stream from: %s", s.Conn().RemotePeer())
rd := newDelimitedReader(s, maxMessageSize)
defer rd.Close()
var msg pb.CircuitRelay
err := rd.ReadMsg(&msg)
if err != nil {
r.handleError(s, pb.CircuitRelay_MALFORMED_MESSAGE)
return
}
// reset stream deadline as message has been read
s.SetReadDeadline(time.Time{})
switch msg.GetType() {
case pb.CircuitRelay_HOP:
r.handleHopStream(s, &msg)
case pb.CircuitRelay_STOP:
r.handleStopStream(s, &msg)
case pb.CircuitRelay_CAN_HOP:
r.handleCanHop(s, &msg)
default:
log.Warnf("unexpected relay handshake: %d", msg.GetType())
r.handleError(s, pb.CircuitRelay_MALFORMED_MESSAGE)
}
}
func (r *Relay) handleHopStream(s network.Stream, msg *pb.CircuitRelay) {
if !r.hop {
r.handleError(s, pb.CircuitRelay_HOP_CANT_SPEAK_RELAY)
return
}
streamCount := r.streamCount.Add(1)
liveHopCount := r.liveHopCount.Load()
defer r.streamCount.Add(-1)
if (streamCount + liveHopCount) > int32(HopStreamLimit) {
log.Warn("hop stream limit exceeded; resetting stream")
s.Reset()
return
}
src, err := peerToPeerInfo(msg.GetSrcPeer())
if err != nil {
r.handleError(s, pb.CircuitRelay_HOP_SRC_MULTIADDR_INVALID)
return
}
if src.ID != s.Conn().RemotePeer() {
r.handleError(s, pb.CircuitRelay_HOP_SRC_MULTIADDR_INVALID)
return
}
dst, err := peerToPeerInfo(msg.GetDstPeer())
if err != nil {
r.handleError(s, pb.CircuitRelay_HOP_DST_MULTIADDR_INVALID)
return
}
if dst.ID == r.self {
r.handleError(s, pb.CircuitRelay_HOP_CANT_RELAY_TO_SELF)
return
}
// open stream
ctx, cancel := context.WithTimeout(r.ctx, HopConnectTimeout)
defer cancel()
if !r.active {
ctx = network.WithNoDial(ctx, "relay hop")
} else if len(dst.Addrs) > 0 {
r.host.Peerstore().AddAddrs(dst.ID, dst.Addrs, peerstore.TempAddrTTL)
}
bs, err := r.host.NewStream(ctx, dst.ID, ProtoID)
if err != nil {
log.Debugf("error opening relay stream to %s: %s", dst.ID.Pretty(), err.Error())
if err == network.ErrNoConn {
r.handleError(s, pb.CircuitRelay_HOP_NO_CONN_TO_DST)
} else {
r.handleError(s, pb.CircuitRelay_HOP_CANT_DIAL_DST)
}
return
}
// stop handshake
rd := newDelimitedReader(bs, maxMessageSize)
wr := newDelimitedWriter(bs)
defer rd.Close()
// set handshake deadline
bs.SetDeadline(time.Now().Add(StopHandshakeTimeout))
msg.Type = pb.CircuitRelay_STOP.Enum()
err = wr.WriteMsg(msg)
if err != nil {
log.Debugf("error writing stop handshake: %s", err.Error())
bs.Reset()
r.handleError(s, pb.CircuitRelay_HOP_CANT_OPEN_DST_STREAM)
return
}
msg.Reset()
err = rd.ReadMsg(msg)
if err != nil {
log.Debugf("error reading stop response: %s", err.Error())
bs.Reset()
r.handleError(s, pb.CircuitRelay_HOP_CANT_OPEN_DST_STREAM)
return
}
if msg.GetType() != pb.CircuitRelay_STATUS {
log.Debugf("unexpected relay stop response: not a status message (%d)", msg.GetType())
bs.Reset()
r.handleError(s, pb.CircuitRelay_HOP_CANT_OPEN_DST_STREAM)
return
}
if msg.GetCode() != pb.CircuitRelay_SUCCESS {
log.Debugf("relay stop failure: %d", msg.GetCode())
bs.Reset()
r.handleError(s, msg.GetCode())
return
}
err = r.writeResponse(s, pb.CircuitRelay_SUCCESS)
if err != nil {
log.Debugf("error writing relay response: %s", err.Error())
bs.Reset()
s.Reset()
return
}
// relay connection
log.Infof("relaying connection between %s and %s", src.ID.Pretty(), dst.ID.Pretty())
// reset deadline
bs.SetDeadline(time.Time{})
r.addLiveHop(src.ID, dst.ID)
var goroutines atomic.Int32
goroutines.Store(2)
done := func() {
if goroutines.Add(-1) == 0 {
s.Close()
bs.Close()
r.rmLiveHop(src.ID, dst.ID)
}
}
// Don't reset streams after finishing or the other side will get an
// error, not an EOF.
go func() {
defer done()
buf := pool.Get(HopStreamBufferSize)
defer pool.Put(buf)
count, err := io.CopyBuffer(s, bs, buf)
if err != nil {
log.Debugf("relay copy error: %s", err)
// Reset both.
s.Reset()
bs.Reset()
} else {
// propagate the close
s.CloseWrite()
}
log.Debugf("relayed %d bytes from %s to %s", count, dst.ID.Pretty(), src.ID.Pretty())
}()
go func() {
defer done()
buf := pool.Get(HopStreamBufferSize)
defer pool.Put(buf)
count, err := io.CopyBuffer(bs, s, buf)
if err != nil {
log.Debugf("relay copy error: %s", err)
// Reset both.
bs.Reset()
s.Reset()
} else {
// propagate the close
bs.CloseWrite()
}
log.Debugf("relayed %d bytes from %s to %s", count, src.ID.Pretty(), dst.ID.Pretty())
}()
}
func (r *Relay) handleStopStream(s network.Stream, msg *pb.CircuitRelay) {
src, err := peerToPeerInfo(msg.GetSrcPeer())
if err != nil {
r.handleError(s, pb.CircuitRelay_STOP_SRC_MULTIADDR_INVALID)
return
}
dst, err := peerToPeerInfo(msg.GetDstPeer())
if err != nil || dst.ID != r.self {
r.handleError(s, pb.CircuitRelay_STOP_DST_MULTIADDR_INVALID)
return
}
log.Infof("relay connection from: %s", src.ID)
if len(src.Addrs) > 0 {
r.host.Peerstore().AddAddrs(src.ID, src.Addrs, peerstore.TempAddrTTL)
}
select {
case r.incoming <- &Conn{stream: s, remote: src, host: r.host, relay: r}:
case <-time.After(RelayAcceptTimeout):
r.handleError(s, pb.CircuitRelay_STOP_RELAY_REFUSED)
}
}
func (r *Relay) handleCanHop(s network.Stream, msg *pb.CircuitRelay) {
var err error
if r.hop {
err = r.writeResponse(s, pb.CircuitRelay_SUCCESS)
} else {
err = r.writeResponse(s, pb.CircuitRelay_HOP_CANT_SPEAK_RELAY)
}
if err != nil {
s.Reset()
log.Debugf("error writing relay response: %s", err.Error())
} else {
s.Close()
}
}
func (r *Relay) handleError(s network.Stream, code pb.CircuitRelay_Status) {
log.Warnf("relay error: %s (%d)", pb.CircuitRelay_Status_name[int32(code)], code)
err := r.writeResponse(s, code)
if err != nil {
s.Reset()
log.Debugf("error writing relay response: %s", err.Error())
} else {
s.Close()
}
}
func (r *Relay) writeResponse(s network.Stream, code pb.CircuitRelay_Status) error {
wr := newDelimitedWriter(s)
var msg pb.CircuitRelay
msg.Type = pb.CircuitRelay_STATUS.Enum()
msg.Code = code.Enum()
return wr.WriteMsg(&msg)
}

467
p2p/protocol/internal/circuitv1-deprecated/relay_test.go

@ -1,467 +0,0 @@
//lint:file-ignore U1000 Ignore all unused code, we're not running any tests.
package relay_test
import (
"bytes"
"context"
"fmt"
"io"
"net"
"testing"
"time"
bhost "github.com/libp2p/go-libp2p/p2p/host/blank"
"github.com/libp2p/go-libp2p/p2p/net/swarm"
swarmt "github.com/libp2p/go-libp2p/p2p/net/swarm/testing"
. "github.com/libp2p/go-libp2p/p2p/protocol/internal/circuitv1-deprecated"
pb "github.com/libp2p/go-libp2p/p2p/protocol/internal/circuitv1-deprecated/pb"
"github.com/libp2p/go-libp2p/core/host"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
)
/* TODO: add tests
- simple A -[R]-> B
- A tries to relay through R, R doesnt support relay
- A tries to relay through R to B, B doesnt support relay
- A sends too long multiaddr
- R drops stream mid-message
- A relays through R, R has no connection to B
*/
func getNetHosts(t *testing.T, n int) []host.Host {
var out []host.Host
for i := 0; i < n; i++ {
netw := swarmt.GenSwarm(t)
h := bhost.NewBlankHost(netw)
out = append(out, h)
}
return out
}
func newTestRelay(t *testing.T, host host.Host, opts ...RelayOpt) *Relay {
r, err := NewRelay(host, swarmt.GenUpgrader(t, host.Network().(*swarm.Swarm), nil), opts...)
if err != nil {
t.Fatal(err)
}
return r
}
func connect(t *testing.T, a, b host.Host) {
pinfo := a.Peerstore().PeerInfo(a.ID())
err := b.Connect(context.Background(), pinfo)
if err != nil {
t.Fatal(err)
}
}
func TestBasicRelay(t *testing.T) {
t.Skip("This package is legacy code we only keep around for testing purposes.")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getNetHosts(t, 3)
connect(t, hosts[0], hosts[1])
connect(t, hosts[1], hosts[2])
time.Sleep(10 * time.Millisecond)
r1 := newTestRelay(t, hosts[0])
newTestRelay(t, hosts[1], OptHop)
r3 := newTestRelay(t, hosts[2])
var (
conn1, conn2 net.Conn
done = make(chan struct{})
)
defer func() {
<-done
if conn1 != nil {
conn1.Close()
}
if conn2 != nil {
conn2.Close()
}
}()
msg := []byte("relay works!")
go func() {
defer close(done)
list := r3.Listener()
var err error
conn1, err = list.Accept()
if err != nil {
t.Error(err)
return
}
_, err = conn1.Write(msg)
if err != nil {
t.Error(err)
return
}
}()
rinfo := hosts[1].Peerstore().PeerInfo(hosts[1].ID())
dinfo := hosts[2].Peerstore().PeerInfo(hosts[2].ID())
rctx, rcancel := context.WithTimeout(ctx, time.Second)
defer rcancel()
var err error
conn2, err = r1.DialPeer(rctx, rinfo, dinfo)
if err != nil {
t.Fatal(err)
}
result := make([]byte, len(msg))
_, err = io.ReadFull(conn2, result)
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(result, msg) {
t.Fatal("message was incorrect:", string(result))
}
}
func TestRelayReset(t *testing.T) {
t.Skip("This package is legacy code we only keep around for testing purposes.")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getNetHosts(t, 3)
connect(t, hosts[0], hosts[1])
connect(t, hosts[1], hosts[2])
time.Sleep(10 * time.Millisecond)
r1 := newTestRelay(t, hosts[0])
newTestRelay(t, hosts[1], OptHop)
r3 := newTestRelay(t, hosts[2])
ready := make(chan struct{})
msg := []byte("relay works!")
go func() {
list := r3.Listener()
con, err := list.Accept()
if err != nil {
t.Error(err)
return
}
<-ready
_, err = con.Write(msg)
if err != nil {
t.Error(err)
return
}
hosts[2].Network().ClosePeer(hosts[1].ID())
}()
rinfo := hosts[1].Peerstore().PeerInfo(hosts[1].ID())
dinfo := hosts[2].Peerstore().PeerInfo(hosts[2].ID())
rctx, rcancel := context.WithTimeout(ctx, time.Second)
defer rcancel()
con, err := r1.DialPeer(rctx, rinfo, dinfo)
if err != nil {
t.Fatal(err)
}
close(ready)
_, err = io.ReadAll(con)
if err == nil {
t.Fatal("expected error for reset relayed connection")
}
}
func TestBasicRelayDial(t *testing.T) {
t.Skip("This package is legacy code we only keep around for testing purposes.")
ctx, cancel := context.WithCancel(context.Background())
hosts := getNetHosts(t, 3)
connect(t, hosts[0], hosts[1])
connect(t, hosts[1], hosts[2])
time.Sleep(10 * time.Millisecond)
r1 := newTestRelay(t, hosts[0])
_ = newTestRelay(t, hosts[1], OptHop)
r3 := newTestRelay(t, hosts[2])
var (
conn1, conn2 net.Conn
done = make(chan struct{})
)
defer func() {
cancel()
<-done
if conn1 != nil {
conn1.Close()
}
if conn2 != nil {
conn2.Close()
}
}()
msg := []byte("relay works!")
go func() {
defer close(done)
list := r3.Listener()
var err error
conn1, err = list.Accept()
if err != nil {
t.Error(err)
return
}
_, err = conn1.Write(msg)
if err != nil {
t.Error(err)
return
}
}()
addr := ma.StringCast(fmt.Sprintf("/ipfs/%s/p2p-circuit", hosts[1].ID().Pretty()))
rctx, rcancel := context.WithTimeout(ctx, time.Second)
defer rcancel()
var err error
conn2, err = r1.Dial(rctx, addr, hosts[2].ID())
if err != nil {
t.Fatal(err)
}
data := make([]byte, len(msg))
_, err = io.ReadFull(conn2, data)
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(data, msg) {
t.Fatal("message was incorrect:", string(data))
}
}
func TestUnspecificRelayDialFails(t *testing.T) {
t.Skip("This package is legacy code we only keep around for testing purposes.")
hosts := getNetHosts(t, 3)
r1 := newTestRelay(t, hosts[0])
newTestRelay(t, hosts[1], OptHop)
r3 := newTestRelay(t, hosts[2])
connect(t, hosts[0], hosts[1])
connect(t, hosts[1], hosts[2])
time.Sleep(100 * time.Millisecond)
go func() {
if _, err := r3.Listener().Accept(); err == nil {
t.Error("should not have received relay connection")
}
}()
addr := ma.StringCast("/p2p-circuit")
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
if _, err := r1.Dial(ctx, addr, hosts[2].ID()); err == nil {
t.Fatal("expected dial with unspecified relay address to fail, even if we're connected to a relay")
}
}
func TestRelayThroughNonHop(t *testing.T) {
t.Skip("This package is legacy code we only keep around for testing purposes.")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getNetHosts(t, 3)
connect(t, hosts[0], hosts[1])
connect(t, hosts[1], hosts[2])
time.Sleep(10 * time.Millisecond)
r1 := newTestRelay(t, hosts[0])
newTestRelay(t, hosts[1])
newTestRelay(t, hosts[2])
rinfo := hosts[1].Peerstore().PeerInfo(hosts[1].ID())
dinfo := hosts[2].Peerstore().PeerInfo(hosts[2].ID())
rctx, rcancel := context.WithTimeout(ctx, time.Second)
defer rcancel()
_, err := r1.DialPeer(rctx, rinfo, dinfo)
if err == nil {
t.Fatal("expected error")
}
rerr, ok := err.(RelayError)
if !ok {
t.Fatalf("expected RelayError: %#v", err)
}
if rerr.Code != pb.CircuitRelay_HOP_CANT_SPEAK_RELAY {
t.Fatal("expected 'HOP_CANT_SPEAK_RELAY' error")
}
}
func TestRelayNoDestConnection(t *testing.T) {
t.Skip("This package is legacy code we only keep around for testing purposes.")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getNetHosts(t, 3)
connect(t, hosts[0], hosts[1])
time.Sleep(10 * time.Millisecond)
r1 := newTestRelay(t, hosts[0])
newTestRelay(t, hosts[1], OptHop)
rinfo := hosts[1].Peerstore().PeerInfo(hosts[1].ID())
dinfo := hosts[2].Peerstore().PeerInfo(hosts[2].ID())
rctx, rcancel := context.WithTimeout(ctx, time.Second)
defer rcancel()
_, err := r1.DialPeer(rctx, rinfo, dinfo)
if err == nil {
t.Fatal("expected error")
}
rerr, ok := err.(RelayError)
if !ok {
t.Fatalf("expected RelayError: %#v", err)
}
if rerr.Code != pb.CircuitRelay_HOP_NO_CONN_TO_DST {
t.Fatal("expected 'HOP_NO_CONN_TO_DST' error")
}
}
func TestActiveRelay(t *testing.T) {
t.Skip("This package is legacy code we only keep around for testing purposes.")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getNetHosts(t, 3)
connect(t, hosts[0], hosts[1])
time.Sleep(10 * time.Millisecond)
r1 := newTestRelay(t, hosts[0])
newTestRelay(t, hosts[1], OptHop, OptActive)
r3 := newTestRelay(t, hosts[2])
connChan := make(chan manet.Conn)
msg := []byte("relay works!")
go func() {
defer close(connChan)
list := r3.Listener()
conn1, err := list.Accept()
if err != nil {
t.Error(err)
return
}
if _, err := conn1.Write(msg); err != nil {
t.Error(err)
return
}
connChan <- conn1
}()
rinfo := hosts[1].Peerstore().PeerInfo(hosts[1].ID())
dinfo := hosts[2].Peerstore().PeerInfo(hosts[2].ID())
rctx, rcancel := context.WithTimeout(ctx, time.Second)
defer rcancel()
conn2, err := r1.DialPeer(rctx, rinfo, dinfo)
if err != nil {
t.Fatal(err)
}
defer conn2.Close()
data := make([]byte, len(msg))
_, err = io.ReadFull(conn2, data)
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(data, msg) {
t.Fatal("message was incorrect:", string(data))
}
conn1, ok := <-connChan
if !ok {
t.Fatal("listener didn't accept a connection")
}
conn1.Close()
}
func TestRelayCanHop(t *testing.T) {
t.Skip("This package is legacy code we only keep around for testing purposes.")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getNetHosts(t, 2)
connect(t, hosts[0], hosts[1])
time.Sleep(10 * time.Millisecond)
r1 := newTestRelay(t, hosts[0])
newTestRelay(t, hosts[1], OptHop)
canhop, err := r1.CanHop(ctx, hosts[1].ID())
if err != nil {
t.Fatal(err)
}
if !canhop {
t.Fatal("Relay can't hop")
}
}

74
p2p/protocol/internal/circuitv1-deprecated/transport.go

@ -1,74 +0,0 @@
package relay
import (
"fmt"
"io"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/transport"
ma "github.com/multiformats/go-multiaddr"
)
var circuitAddr = ma.Cast(ma.ProtocolWithCode(ma.P_CIRCUIT).VCode)
var _ transport.Transport = (*RelayTransport)(nil)
var _ io.Closer = (*RelayTransport)(nil)
type RelayTransport Relay
func (t *RelayTransport) Relay() *Relay {
return (*Relay)(t)
}
func (r *Relay) Transport() *RelayTransport {
return (*RelayTransport)(r)
}
func (t *RelayTransport) Listen(laddr ma.Multiaddr) (transport.Listener, error) {
// TODO: Ensure we have a connection to the relay, if specified. Also,
// make sure the multiaddr makes sense.
if !t.Relay().Matches(laddr) {
return nil, fmt.Errorf("%s is not a relay address", laddr)
}
return t.upgrader.UpgradeListener(t, t.Relay().Listener()), nil
}
func (t *RelayTransport) CanDial(raddr ma.Multiaddr) bool {
return t.Relay().Matches(raddr)
}
func (t *RelayTransport) Proxy() bool {
return true
}
func (t *RelayTransport) Protocols() []int {
return []int{ma.P_CIRCUIT}
}
func (r *RelayTransport) Close() error {
r.ctxCancel()
return nil
}
// AddRelayTransport constructs a relay and adds it as a transport to the host network.
func AddRelayTransport(h host.Host, upgrader transport.Upgrader, opts ...RelayOpt) error {
n, ok := h.Network().(transport.TransportNetwork)
if !ok {
return fmt.Errorf("%v is not a transport network", h.Network())
}
r, err := NewRelay(h, upgrader, opts...)
if err != nil {
return err
}
// There's no nice way to handle these errors as we have no way to tear
// down the relay.
// TODO
if err := n.AddTransport(r.Transport()); err != nil {
log.Error("failed to add relay transport:", err)
} else if err := n.Listen(r.Listener().Multiaddr()); err != nil {
log.Error("failed to listen on relay transport:", err)
}
return nil
}

156
p2p/protocol/internal/circuitv1-deprecated/transport_test.go

@ -1,156 +0,0 @@
//lint:file-ignore U1000 Ignore all unused code, we're not running any tests.
package relay_test
import (
"bytes"
"context"
"fmt"
"io"
"testing"
"time"
"github.com/libp2p/go-libp2p/p2p/net/swarm"
swarmt "github.com/libp2p/go-libp2p/p2p/net/swarm/testing"
. "github.com/libp2p/go-libp2p/p2p/protocol/internal/circuitv1-deprecated"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peerstore"
ma "github.com/multiformats/go-multiaddr"
)
const TestProto = "test/relay-transport"
var msg = []byte("relay works!")
func testSetupRelay(t *testing.T) []host.Host {
hosts := getNetHosts(t, 3)
err := AddRelayTransport(hosts[0], swarmt.GenUpgrader(t, hosts[0].Network().(*swarm.Swarm), nil))
if err != nil {
t.Fatal(err)
}
err = AddRelayTransport(hosts[1], swarmt.GenUpgrader(t, hosts[1].Network().(*swarm.Swarm), nil), OptHop)
if err != nil {
t.Fatal(err)
}
err = AddRelayTransport(hosts[2], swarmt.GenUpgrader(t, hosts[2].Network().(*swarm.Swarm), nil))
if err != nil {
t.Fatal(err)
}
connect(t, hosts[0], hosts[1])
connect(t, hosts[1], hosts[2])
time.Sleep(100 * time.Millisecond)
handler := func(s network.Stream) {
_, err := s.Write(msg)
if err != nil {
t.Error(err)
}
s.Close()
}
hosts[2].SetStreamHandler(TestProto, handler)
return hosts
}
func TestFullAddressTransportDial(t *testing.T) {
t.Skip("This package is legacy code we only keep around for testing purposes.")
hosts := testSetupRelay(t)
var relayAddr ma.Multiaddr
for _, addr := range hosts[1].Addrs() {
// skip relay addrs.
if _, err := addr.ValueForProtocol(ma.P_CIRCUIT); err != nil {
relayAddr = addr
}
}
addr, err := ma.NewMultiaddr(fmt.Sprintf("%s/p2p/%s/p2p-circuit/p2p/%s", relayAddr.String(), hosts[1].ID().Pretty(), hosts[2].ID().Pretty()))
if err != nil {
t.Fatal(err)
}
hosts[0].Peerstore().AddAddrs(hosts[2].ID(), []ma.Multiaddr{addr}, peerstore.TempAddrTTL)
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
s, err := hosts[0].NewStream(ctx, hosts[2].ID(), TestProto)
if err != nil {
t.Fatal(err)
}
data, err := io.ReadAll(s)
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(data, msg) {
t.Fatal("message was incorrect:", string(data))
}
}
func TestSpecificRelayTransportDial(t *testing.T) {
t.Skip("This package is legacy code we only keep around for testing purposes.")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := testSetupRelay(t)
addr, err := ma.NewMultiaddr(fmt.Sprintf("/ipfs/%s/p2p-circuit/ipfs/%s", hosts[1].ID().Pretty(), hosts[2].ID().Pretty()))
if err != nil {
t.Fatal(err)
}
rctx, rcancel := context.WithTimeout(ctx, time.Second)
defer rcancel()
hosts[0].Peerstore().AddAddrs(hosts[2].ID(), []ma.Multiaddr{addr}, peerstore.TempAddrTTL)
s, err := hosts[0].NewStream(rctx, hosts[2].ID(), TestProto)
if err != nil {
t.Fatal(err)
}
data, err := io.ReadAll(s)
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(data, msg) {
t.Fatal("message was incorrect:", string(data))
}
}
func TestUnspecificRelayTransportDialFails(t *testing.T) {
t.Skip("This package is legacy code we only keep around for testing purposes.")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := testSetupRelay(t)
addr, err := ma.NewMultiaddr(fmt.Sprintf("/p2p-circuit/ipfs/%s", hosts[2].ID().Pretty()))
if err != nil {
t.Fatal(err)
}
rctx, rcancel := context.WithTimeout(ctx, time.Second)
defer rcancel()
hosts[0].Peerstore().AddAddrs(hosts[2].ID(), []ma.Multiaddr{addr}, peerstore.TempAddrTTL)
_, err = hosts[0].NewStream(rctx, hosts[2].ID(), TestProto)
if err == nil {
t.Fatal("dial to unspecified address should have failed")
}
}

119
p2p/protocol/internal/circuitv1-deprecated/util.go

@ -1,119 +0,0 @@
package relay
import (
"errors"
"io"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/p2p/protocol/internal/circuitv1-deprecated/pb"
pool "github.com/libp2p/go-buffer-pool"
"github.com/libp2p/go-msgio/pbio"
ma "github.com/multiformats/go-multiaddr"
"github.com/multiformats/go-varint"
"google.golang.org/protobuf/proto"
)
//go:generate protoc --proto_path=$PWD:$PWD/../../../.. --go_out=. --go_opt=Mpb/relay.proto=./pb pb/relay.proto
func peerToPeerInfo(p *pb.CircuitRelay_Peer) (peer.AddrInfo, error) {
if p == nil {
return peer.AddrInfo{}, errors.New("nil peer")
}
id, err := peer.IDFromBytes(p.Id)
if err != nil {
return peer.AddrInfo{}, err
}
addrs := make([]ma.Multiaddr, 0, len(p.Addrs))
for _, addrBytes := range p.Addrs {
a, err := ma.NewMultiaddrBytes(addrBytes)
if err == nil {
addrs = append(addrs, a)
}
}
return peer.AddrInfo{ID: id, Addrs: addrs}, nil
}
func peerInfoToPeer(pi peer.AddrInfo) *pb.CircuitRelay_Peer {
addrs := make([][]byte, len(pi.Addrs))
for i, addr := range pi.Addrs {
addrs[i] = addr.Bytes()
}
p := new(pb.CircuitRelay_Peer)
p.Id = []byte(pi.ID)
p.Addrs = addrs
return p
}
func incrementTag(v int) int {
return v + 1
}
func decrementTag(v int) int {
if v > 0 {
return v - 1
} else {
return v
}
}
type delimitedReader struct {
r io.Reader
buf []byte
}
// The protobuf NewDelimitedReader is buffered, which may eat up stream data.
// So we need to implement a compatible delimited reader that reads unbuffered.
// There is a slowdown from unbuffered reading: when reading the message
// it can take multiple single byte Reads to read the length and another Read
// to read the message payload.
// However, this is not critical performance degradation as
// - the reader is utilized to read one (dialer, stop) or two messages (hop) during
// the handshake, so it's a drop in the water for the connection lifetime.
// - messages are small (max 4k) and the length fits in a couple of bytes,
// so overall we have at most three reads per message.
func newDelimitedReader(r io.Reader, maxSize int) *delimitedReader {
return &delimitedReader{r: r, buf: pool.Get(maxSize)}
}
func (d *delimitedReader) Close() {
if d.buf != nil {
pool.Put(d.buf)
d.buf = nil
}
}
func (d *delimitedReader) ReadByte() (byte, error) {
buf := d.buf[:1]
_, err := d.r.Read(buf)
return buf[0], err
}
func (d *delimitedReader) ReadMsg(msg proto.Message) error {
mlen, err := varint.ReadUvarint(d)
if err != nil {
return err
}
if uint64(len(d.buf)) < mlen {
return errors.New("message too large")
}
buf := d.buf[:mlen]
_, err = io.ReadFull(d.r, buf)
if err != nil {
return err
}
return proto.Unmarshal(buf, msg)
}
func newDelimitedWriter(w io.Writer) pbio.WriteCloser {
return pbio.NewDelimitedWriter(w)
}
Loading…
Cancel
Save