Browse Source

use the resource manager in libp2p bundled services, initialize default one

pull/1275/head
vyzo 3 years ago
committed by Marten Seemann
parent
commit
df2301ba95
  1. 3
      config/config.go
  2. 18
      defaults.go
  3. 1
      go.mod
  4. 4
      go.sum
  5. 76
      limits.go
  6. 16
      p2p/host/autonat/client.go
  7. 21
      p2p/host/autonat/svc.go
  8. 7
      p2p/host/basic/basic_host.go
  9. 2
      p2p/net/mock/mock_conn.go
  10. 2
      p2p/net/mock/mock_peernet.go
  11. 2
      p2p/net/mock/mock_stream.go
  12. 109
      p2p/protocol/circuitv1/relay/relay.go
  13. 12
      p2p/protocol/circuitv2/client/dial.go
  14. 4
      p2p/protocol/circuitv2/client/transport.go
  15. 99
      p2p/protocol/circuitv2/relay/relay.go
  16. 37
      p2p/protocol/holepunch/coordination.go
  17. 27
      p2p/protocol/identify/id.go
  18. 17
      p2p/protocol/identify/id_delta.go
  19. 59
      p2p/protocol/ping/ping.go

3
config/config.go

@ -150,6 +150,9 @@ func (cfg *Config) makeSwarm() (*swarm.Swarm, error) {
if cfg.DialTimeout != 0 {
opts = append(opts, swarm.WithDialTimeout(cfg.DialTimeout))
}
if cfg.ResourceManager != nil {
opts = append(opts, swarm.WithResourceManager(cfg.ResourceManager))
}
// TODO: Make the swarm implementation configurable.
return swarm.NewSwarm(pid, cfg.Peerstore, opts...)
}

18
defaults.go

@ -10,6 +10,7 @@ import (
noise "github.com/libp2p/go-libp2p-noise"
"github.com/libp2p/go-libp2p-peerstore/pstoremem"
quic "github.com/libp2p/go-libp2p-quic-transport"
rcmgr "github.com/libp2p/go-libp2p-resource-manager"
tls "github.com/libp2p/go-libp2p-tls"
yamux "github.com/libp2p/go-libp2p-yamux"
"github.com/libp2p/go-tcp-transport"
@ -85,6 +86,19 @@ var DefaultEnableRelay = func(cfg *Config) error {
return cfg.Apply(EnableRelay())
}
// DefaultResourceManager installs a resource manager built from the default
// limiter, with the bundled-service limits applied on top.
var DefaultResourceManager = func(cfg *Config) error {
	// Default memory limit: 1/8th of total memory, minimum 128MB, maximum 1GB
	lim := rcmgr.NewDefaultLimiter()
	SetDefaultServiceLimits(lim)

	rm, err := rcmgr.NewResourceManager(lim)
	if err != nil {
		return err
	}

	return cfg.Apply(ResourceManager(rm))
}
// Complete list of default options and when to fallback on them.
//
// Please *DON'T* specify default options any other way. Putting this all here
@ -121,6 +135,10 @@ var defaults = []struct {
fallback: func(cfg *Config) bool { return !cfg.RelayCustom },
opt: DefaultEnableRelay,
},
{
fallback: func(cfg *Config) bool { return cfg.ResourceManager == nil },
opt: DefaultResourceManager,
},
}
// Defaults configures libp2p to use the default options. Can be combined with

1
go.mod

@ -29,6 +29,7 @@ require (
github.com/libp2p/go-libp2p-noise v0.3.0
github.com/libp2p/go-libp2p-peerstore v0.6.0
github.com/libp2p/go-libp2p-quic-transport v0.16.0
github.com/libp2p/go-libp2p-resource-manager v0.1.0
github.com/libp2p/go-libp2p-swarm v0.10.0
github.com/libp2p/go-libp2p-testing v0.7.0
github.com/libp2p/go-libp2p-tls v0.3.1

4
go.sum

@ -435,6 +435,8 @@ github.com/libp2p/go-libp2p-pnet v0.2.0/go.mod h1:Qqvq6JH/oMZGwqs3N1Fqhv8NVhrdYc
github.com/libp2p/go-libp2p-quic-transport v0.13.0/go.mod h1:39/ZWJ1TW/jx1iFkKzzUg00W6tDJh73FC0xYudjr7Hc=
github.com/libp2p/go-libp2p-quic-transport v0.16.0 h1:aVg9/jr+R2esov5sH7wkXrmYmqJiUjtLMLYX3L9KYdY=
github.com/libp2p/go-libp2p-quic-transport v0.16.0/go.mod h1:1BXjVMzr+w7EkPfiHkKnwsWjPjtfaNT0q8RS3tGDvEQ=
github.com/libp2p/go-libp2p-resource-manager v0.1.0 h1:tYpbhLPVC4egLavupAi9jGKKLeMemyGq5tnfBc8taBs=
github.com/libp2p/go-libp2p-resource-manager v0.1.0/go.mod h1:wJPNjeE4XQlxeidwqVY5G6DLOKqFK33u2n8blpl0I6Y=
github.com/libp2p/go-libp2p-swarm v0.8.0/go.mod h1:sOMp6dPuqco0r0GHTzfVheVBh6UEL0L1lXUZ5ot2Fvc=
github.com/libp2p/go-libp2p-swarm v0.10.0 h1:1yr7UCwxCN92cw9g9Q+fnJSlk7lOB1RetoEewxhGVL0=
github.com/libp2p/go-libp2p-swarm v0.10.0/go.mod h1:71ceMcV6Rg/0rIQ97rsZWMzto1l9LnNquef+efcRbmA=
@ -649,6 +651,8 @@ github.com/openzipkin/zipkin-go v0.2.1/go.mod h1:NaW6tEwdmWMaCDZzg8sh+IBNOxHMPnh
github.com/openzipkin/zipkin-go v0.2.2/go.mod h1:NaW6tEwdmWMaCDZzg8sh+IBNOxHMPnhQw8ySjnjRyN4=
github.com/pact-foundation/pact-go v1.0.4/go.mod h1:uExwJY4kCzNPcHRj+hCR/HBbOOIwwtUjcrb0b5/5kLM=
github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 h1:onHthvaw9LFnH4t2DcNVpwGmV9E1BkGknEliJkfwQj0=
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58/go.mod h1:DXv8WO4yhMYhSNPKjeNKa5WY9YCIEBRbNzFFPJbWO6Y=
github.com/pborman/uuid v1.2.0/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k=
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
github.com/performancecopilot/speed v3.0.0+incompatible/go.mod h1:/CLtqpZ5gBg1M9iaPbIdPPGyKcA8hKdoy6hAWba7Yac=

76
limits.go

@ -0,0 +1,76 @@
package libp2p
import (
"github.com/libp2p/go-libp2p/p2p/host/autonat"
circuitv1 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv1/relay"
circuitv2 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/relay"
"github.com/libp2p/go-libp2p/p2p/protocol/holepunch"
"github.com/libp2p/go-libp2p/p2p/protocol/identify"
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
rcmgr "github.com/libp2p/go-libp2p-resource-manager"
)
// SetDefaultServiceLimits sets the default limits for bundled libp2p services
// (identify, ping, autonat, holepunch, relay v1/v2) on the given limiter.
// Limits the user has already configured are left untouched: a service's
// defaults are only installed when no entry for its name exists yet.
func SetDefaultServiceLimits(limiter *rcmgr.BasicLimiter) {
	// peerSvcLimit builds a per-peer limit for a service with the given
	// stream counts; memory is sized proportionally to the total stream count.
	peerSvcLimit := func(numStreamsIn, numStreamsOut, numStreamsTotal int) rcmgr.Limit {
		return &rcmgr.StaticLimit{
			// memory: 256kb for window buffers plus some change for message buffers per stream
			Memory: int64(numStreamsTotal * (256<<10 + 16384)),
			BaseLimit: rcmgr.BaseLimit{
				StreamsInbound:  numStreamsIn,
				StreamsOutbound: numStreamsOut,
				Streams:         numStreamsTotal,
			},
		}
	}

	if limiter.ServiceLimits == nil {
		limiter.ServiceLimits = make(map[string]rcmgr.Limit)
	}
	if limiter.ServicePeerLimits == nil {
		limiter.ServicePeerLimits = make(map[string]rcmgr.Limit)
	}

	// identify
	if _, ok := limiter.ServiceLimits[identify.ServiceName]; !ok {
		limiter.ServiceLimits[identify.ServiceName] = limiter.DefaultServiceLimits.
			WithMemoryLimit(1, 4<<20, 64<<20). // max 64MB service memory
			WithStreamLimit(128, 128, 256)     // max 256 streams -- symmetric
		limiter.ServicePeerLimits[identify.ServiceName] = peerSvcLimit(16, 16, 32)
	}

	// ping
	if _, ok := limiter.ServiceLimits[ping.ServiceName]; !ok {
		limiter.ServiceLimits[ping.ServiceName] = limiter.DefaultServiceLimits.
			WithMemoryLimit(1, 4<<20, 64<<20). // max 64MB service memory
			WithStreamLimit(128, 128, 128)     // max 128 streams - asymmetric
		limiter.ServicePeerLimits[ping.ServiceName] = peerSvcLimit(2, 3, 4)
	}

	// autonat
	if _, ok := limiter.ServiceLimits[autonat.ServiceName]; !ok {
		limiter.ServiceLimits[autonat.ServiceName] = limiter.DefaultServiceLimits.
			WithMemoryLimit(1, 4<<20, 64<<20). // max 64MB service memory
			WithStreamLimit(128, 128, 128)     // max 128 streams - asymmetric
		limiter.ServicePeerLimits[autonat.ServiceName] = peerSvcLimit(2, 2, 2)
	}

	// holepunch
	if _, ok := limiter.ServiceLimits[holepunch.ServiceName]; !ok {
		limiter.ServiceLimits[holepunch.ServiceName] = limiter.DefaultServiceLimits.
			WithMemoryLimit(1, 4<<20, 64<<20). // max 64MB service memory
			WithStreamLimit(128, 128, 256)     // max 256 streams - symmetric
		// BUG FIX: this per-peer limit must be keyed by holepunch.ServiceName.
		// The original keyed it by autonat.ServiceName, which clobbered the
		// autonat per-peer entry set above and left holepunch without one.
		limiter.ServicePeerLimits[holepunch.ServiceName] = peerSvcLimit(2, 2, 2)
	}

	// relay/v1
	if _, ok := limiter.ServiceLimits[circuitv1.ServiceName]; !ok {
		limiter.ServiceLimits[circuitv1.ServiceName] = limiter.DefaultServiceLimits.
			WithMemoryLimit(1, 4<<20, 64<<20). // max 64MB service memory
			WithStreamLimit(1024, 1024, 1024)  // max 1024 streams - asymmetric
		limiter.ServicePeerLimits[circuitv1.ServiceName] = peerSvcLimit(128, 128, 128)
	}

	// relay/v2
	if _, ok := limiter.ServiceLimits[circuitv2.ServiceName]; !ok {
		limiter.ServiceLimits[circuitv2.ServiceName] = limiter.DefaultServiceLimits.
			WithMemoryLimit(1, 4<<20, 64<<20). // max 64MB service memory
			WithStreamLimit(1024, 1024, 1024)  // max 1024 streams - asymmetric
		limiter.ServicePeerLimits[circuitv2.ServiceName] = peerSvcLimit(128, 128, 128)
	}
}

16
p2p/host/autonat/client.go

@ -36,12 +36,26 @@ func (c *client) DialBack(ctx context.Context, p peer.ID) (ma.Multiaddr, error)
if err != nil {
return nil, err
}
if err := s.Scope().SetService(ServiceName); err != nil {
log.Debugf("error attaching stream to autonat service: %s", err)
s.Reset()
return nil, err
}
if err := s.Scope().ReserveMemory(maxMsgSize, network.ReservationPriorityAlways); err != nil {
log.Debugf("error reserving memory for autonat stream: %s", err)
s.Reset()
return nil, err
}
defer s.Scope().ReleaseMemory(maxMsgSize)
s.SetDeadline(time.Now().Add(streamTimeout))
// Might as well just reset the stream. Once we get to this point, we
// don't care about being nice.
defer s.Close()
r := protoio.NewDelimitedReader(s, network.MessageSizeMax)
r := protoio.NewDelimitedReader(s, maxMsgSize)
w := protoio.NewDelimitedWriter(s)
req := newDialMessage(peer.AddrInfo{ID: c.h.ID(), Addrs: c.addrFunc()})

21
p2p/host/autonat/svc.go

@ -19,6 +19,12 @@ import (
var streamTimeout = 60 * time.Second
const (
ServiceName = "libp2p.autonat"
maxMsgSize = 4096
)
// AutoNATService provides NAT autodetection services to other peers
type autoNATService struct {
instanceLock sync.Mutex
@ -45,13 +51,26 @@ func newAutoNATService(c *config) (*autoNATService, error) {
}
func (as *autoNATService) handleStream(s network.Stream) {
if err := s.Scope().SetService(ServiceName); err != nil {
log.Debugf("error attaching stream to autonat service: %s", err)
s.Reset()
return
}
if err := s.Scope().ReserveMemory(maxMsgSize, network.ReservationPriorityAlways); err != nil {
log.Debugf("error reserving memory for autonat stream: %s", err)
s.Reset()
return
}
defer s.Scope().ReleaseMemory(maxMsgSize)
s.SetDeadline(time.Now().Add(streamTimeout))
defer s.Close()
pid := s.Conn().RemotePeer()
log.Debugf("New stream from %s", pid.Pretty())
r := protoio.NewDelimitedReader(s, network.MessageSizeMax)
r := protoio.NewDelimitedReader(s, maxMsgSize)
w := protoio.NewDelimitedWriter(s)
var req pb.Message

7
p2p/host/basic/basic_host.go

@ -405,7 +405,12 @@ func (h *BasicHost) newStreamHandler(s network.Stream) {
}
}
s.SetProtocol(protocol.ID(protoID))
if err := s.SetProtocol(protocol.ID(protoID)); err != nil {
log.Debugf("error setting stream protocol: %s", err)
s.Reset()
return
}
log.Debugf("protocol negotiation took %s", took)
go handle(protoID, s)

2
p2p/net/mock/mock_conn.go

@ -197,5 +197,5 @@ func (c *conn) Stat() network.ConnStats {
}
func (c *conn) Scope() network.ConnScope {
return nil
return network.NullScope
}

2
p2p/net/mock/mock_peernet.go

@ -370,5 +370,5 @@ func (pn *peernet) notifyAll(notification func(f network.Notifiee)) {
}
func (pn *peernet) ResourceManager() network.ResourceManager {
return nil
return network.NullResourceManager
}

2
p2p/net/mock/mock_stream.go

@ -290,7 +290,7 @@ func (s *stream) transport() {
}
func (s *stream) Scope() network.StreamScope {
return nil
return network.NullScope
}
func (s *stream) cancelWrite(err error) {

109
p2p/protocol/circuitv1/relay/relay.go

@ -25,6 +25,8 @@ var log = logging.Logger("relay")
const (
ProtoID = "/libp2p/circuit/relay/0.1.0"
ServiceName = "libp2p.relay/v1"
StreamTimeout = time.Minute
ConnectTimeout = 30 * time.Second
HandshakeTimeout = time.Minute
@ -40,9 +42,10 @@ type Relay struct {
ctx context.Context
cancel context.CancelFunc
host host.Host
rc Resources
acl ACLFilter
host host.Host
rc Resources
acl ACLFilter
scope network.ResourceScopeSpan
mx sync.Mutex
conns map[peer.ID]int
@ -64,6 +67,17 @@ func NewRelay(h host.Host, opts ...Option) (*Relay, error) {
}
}
// get a scope for memory reservations at service level
err := h.Network().ResourceManager().ViewService(ServiceName,
func(s network.ServiceScope) error {
var err error
r.scope, err = s.BeginSpan()
return err
})
if err != nil {
return nil, err
}
h.SetStreamHandler(ProtoID, r.handleStream)
return r, nil
@ -72,19 +86,33 @@ func NewRelay(h host.Host, opts ...Option) (*Relay, error) {
func (r *Relay) Close() error {
if atomic.CompareAndSwapInt32(&r.closed, 0, 1) {
r.host.RemoveStreamHandler(ProtoID)
r.scope.Done()
r.cancel()
}
return nil
}
func (r *Relay) handleStream(s network.Stream) {
s.SetReadDeadline(time.Now().Add(StreamTimeout))
log.Debugf("new relay stream from: %s", s.Conn().RemotePeer())
if err := s.Scope().SetService(ServiceName); err != nil {
log.Debugf("error attaching stream to relay service: %s", err)
s.Reset()
return
}
if err := s.Scope().ReserveMemory(maxMessageSize, network.ReservationPriorityAlways); err != nil {
log.Debugf("error reserving memory for stream: %s", err)
s.Reset()
return
}
defer s.Scope().ReleaseMemory(maxMessageSize)
rd := util.NewDelimitedReader(s, maxMessageSize)
defer rd.Close()
s.SetReadDeadline(time.Now().Add(StreamTimeout))
var msg pb.CircuitRelay
err := rd.ReadMsg(&msg)
@ -108,31 +136,50 @@ func (r *Relay) handleStream(s network.Stream) {
}
func (r *Relay) handleHopStream(s network.Stream, msg *pb.CircuitRelay) {
span, err := r.scope.BeginSpan()
if err != nil {
log.Debugf("failed to begin relay transaction: %s", err)
r.handleError(s, pb.CircuitRelay_HOP_CANT_SPEAK_RELAY)
return
}
fail := func(code pb.CircuitRelay_Status) {
span.Done()
r.handleError(s, code)
}
// reserve buffers for the relay
if err := span.ReserveMemory(2*r.rc.BufferSize, network.ReservationPriorityHigh); err != nil {
log.Debugf("error reserving memory for relay: %s", err)
fail(pb.CircuitRelay_HOP_CANT_SPEAK_RELAY)
return
}
src, err := peerToPeerInfo(msg.GetSrcPeer())
if err != nil {
r.handleError(s, pb.CircuitRelay_HOP_SRC_MULTIADDR_INVALID)
fail(pb.CircuitRelay_HOP_SRC_MULTIADDR_INVALID)
return
}
if src.ID != s.Conn().RemotePeer() {
r.handleError(s, pb.CircuitRelay_HOP_SRC_MULTIADDR_INVALID)
fail(pb.CircuitRelay_HOP_SRC_MULTIADDR_INVALID)
return
}
dest, err := peerToPeerInfo(msg.GetDstPeer())
if err != nil {
r.handleError(s, pb.CircuitRelay_HOP_DST_MULTIADDR_INVALID)
fail(pb.CircuitRelay_HOP_DST_MULTIADDR_INVALID)
return
}
if dest.ID == r.host.ID() {
r.handleError(s, pb.CircuitRelay_HOP_CANT_RELAY_TO_SELF)
fail(pb.CircuitRelay_HOP_CANT_RELAY_TO_SELF)
return
}
if r.acl != nil && !r.acl.AllowHop(src.ID, dest.ID) {
log.Debugf("refusing hop from %s to %s; ACL refused", src.ID, dest.ID)
r.handleError(s, pb.CircuitRelay_HOP_CANT_SPEAK_RELAY)
fail(pb.CircuitRelay_HOP_CANT_SPEAK_RELAY)
return
}
@ -140,7 +187,7 @@ func (r *Relay) handleHopStream(s network.Stream, msg *pb.CircuitRelay) {
if r.active >= r.rc.MaxCircuits {
r.mx.Unlock()
log.Debugf("refusing connection from %s to %s; too many active circuits", src.ID, dest.ID)
r.handleError(s, pb.CircuitRelay_HOP_CANT_SPEAK_RELAY)
fail(pb.CircuitRelay_HOP_CANT_SPEAK_RELAY)
return
}
@ -148,7 +195,7 @@ func (r *Relay) handleHopStream(s network.Stream, msg *pb.CircuitRelay) {
if srcConns >= r.rc.MaxCircuitsPerPeer {
r.mx.Unlock()
log.Debugf("refusing connection from %s to %s; too many connections from %s", src.ID, dest.ID, src)
r.handleError(s, pb.CircuitRelay_HOP_CANT_SPEAK_RELAY)
fail(pb.CircuitRelay_HOP_CANT_SPEAK_RELAY)
return
}
@ -156,7 +203,7 @@ func (r *Relay) handleHopStream(s network.Stream, msg *pb.CircuitRelay) {
if destConns >= r.rc.MaxCircuitsPerPeer {
r.mx.Unlock()
log.Debugf("refusing connection from %s to %s; too many connecitons to %s", src.ID, dest.ID, dest.ID)
r.handleError(s, pb.CircuitRelay_HOP_CANT_SPEAK_RELAY)
fail(pb.CircuitRelay_HOP_CANT_SPEAK_RELAY)
return
}
@ -166,6 +213,7 @@ func (r *Relay) handleHopStream(s network.Stream, msg *pb.CircuitRelay) {
r.mx.Unlock()
cleanup := func() {
span.Done()
r.mx.Lock()
r.active--
r.rmConn(src.ID)
@ -190,7 +238,26 @@ func (r *Relay) handleHopStream(s network.Stream, msg *pb.CircuitRelay) {
return
}
fail = func(code pb.CircuitRelay_Status) {
bs.Reset()
cleanup()
r.handleError(s, code)
}
if err := bs.Scope().SetService(ServiceName); err != nil {
log.Debugf("error attaching stream to relay service: %s", err)
fail(pb.CircuitRelay_HOP_CANT_SPEAK_RELAY)
return
}
// stop handshake
if err := bs.Scope().ReserveMemory(maxMessageSize, network.ReservationPriorityAlways); err != nil {
log.Debugf("failed to reserve memory for stream: %s", err)
fail(pb.CircuitRelay_HOP_CANT_SPEAK_RELAY)
return
}
defer bs.Scope().ReleaseMemory(maxMessageSize)
rd := util.NewDelimitedReader(bs, maxMessageSize)
wr := util.NewDelimitedWriter(bs)
defer rd.Close()
@ -203,9 +270,7 @@ func (r *Relay) handleHopStream(s network.Stream, msg *pb.CircuitRelay) {
err = wr.WriteMsg(msg)
if err != nil {
log.Debugf("error writing stop handshake: %s", err.Error())
bs.Reset()
r.handleError(s, pb.CircuitRelay_HOP_CANT_OPEN_DST_STREAM)
cleanup()
fail(pb.CircuitRelay_HOP_CANT_OPEN_DST_STREAM)
return
}
@ -214,25 +279,19 @@ func (r *Relay) handleHopStream(s network.Stream, msg *pb.CircuitRelay) {
err = rd.ReadMsg(msg)
if err != nil {
log.Debugf("error reading stop response: %s", err.Error())
bs.Reset()
r.handleError(s, pb.CircuitRelay_HOP_CANT_OPEN_DST_STREAM)
cleanup()
fail(pb.CircuitRelay_HOP_CANT_OPEN_DST_STREAM)
return
}
if msg.GetType() != pb.CircuitRelay_STATUS {
log.Debugf("unexpected relay stop response: not a status message (%d)", msg.GetType())
bs.Reset()
r.handleError(s, pb.CircuitRelay_HOP_CANT_OPEN_DST_STREAM)
cleanup()
fail(pb.CircuitRelay_HOP_CANT_OPEN_DST_STREAM)
return
}
if msg.GetCode() != pb.CircuitRelay_SUCCESS {
log.Debugf("relay stop failure: %d", msg.GetCode())
bs.Reset()
r.handleError(s, msg.GetCode())
cleanup()
fail(msg.GetCode())
return
}

12
p2p/protocol/circuitv2/client/dial.go

@ -144,6 +144,12 @@ func (c *Client) dialPeer(ctx context.Context, relay, dest peer.AddrInfo) (*Conn
}
func (c *Client) connectV2(s network.Stream, dest peer.AddrInfo) (*Conn, error) {
if err := s.Scope().ReserveMemory(maxMessageSize, network.ReservationPriorityAlways); err != nil {
s.Reset()
return nil, err
}
defer s.Scope().ReleaseMemory(maxMessageSize)
rd := util.NewDelimitedReader(s, maxMessageSize)
wr := util.NewDelimitedWriter(s)
defer rd.Close()
@ -196,6 +202,12 @@ func (c *Client) connectV2(s network.Stream, dest peer.AddrInfo) (*Conn, error)
}
func (c *Client) connectV1(s network.Stream, dest peer.AddrInfo) (*Conn, error) {
if err := s.Scope().ReserveMemory(maxMessageSize, network.ReservationPriorityAlways); err != nil {
s.Reset()
return nil, err
}
defer s.Scope().ReleaseMemory(maxMessageSize)
rd := util.NewDelimitedReader(s, maxMessageSize)
wr := util.NewDelimitedWriter(s)
defer rd.Close()

4
p2p/protocol/circuitv2/client/transport.go

@ -54,6 +54,10 @@ func (c *Client) Dial(ctx context.Context, a ma.Multiaddr, p peer.ID) (transport
if err != nil {
return nil, err
}
if err := connScope.SetPeer(p); err != nil {
connScope.Done()
return nil, err
}
conn, err := c.dial(ctx, a, p)
if err != nil {
connScope.Done()

99
p2p/protocol/circuitv2/relay/relay.go

@ -24,6 +24,8 @@ import (
)
const (
ServiceName = "libp2p.relay/v2"
ReservationTagWeight = 10
StreamTimeout = time.Minute
@ -48,6 +50,7 @@ type Relay struct {
rc Resources
acl ACLFilter
constraints *constraints
scope network.ResourceScopeSpan
mx sync.Mutex
rsvp map[peer.ID]time.Time
@ -77,6 +80,17 @@ func New(h host.Host, opts ...Option) (*Relay, error) {
}
}
// get a scope for memory reservations at service level
err := h.Network().ResourceManager().ViewService(ServiceName,
func(s network.ServiceScope) error {
var err error
r.scope, err = s.BeginSpan()
return err
})
if err != nil {
return nil, err
}
r.constraints = newConstraints(&r.rc)
r.selfAddr = ma.StringCast(fmt.Sprintf("/p2p/%s", h.ID()))
@ -93,6 +107,7 @@ func New(h host.Host, opts ...Option) (*Relay, error) {
func (r *Relay) Close() error {
if atomic.CompareAndSwapUint32(&r.closed, 0, 1) {
r.host.RemoveStreamHandler(proto.ProtoIDv2Hop)
r.scope.Done()
r.cancel()
r.mx.Lock()
for p := range r.rsvp {
@ -104,13 +119,26 @@ func (r *Relay) Close() error {
}
func (r *Relay) handleStream(s network.Stream) {
s.SetReadDeadline(time.Now().Add(StreamTimeout))
log.Infof("new relay stream from: %s", s.Conn().RemotePeer())
if err := s.Scope().SetService(ServiceName); err != nil {
log.Debugf("error attaching stream to relay service: %s", err)
s.Reset()
return
}
if err := s.Scope().ReserveMemory(maxMessageSize, network.ReservationPriorityAlways); err != nil {
log.Debugf("error reserving memory for stream: %s", err)
s.Reset()
return
}
defer s.Scope().ReleaseMemory(maxMessageSize)
rd := util.NewDelimitedReader(s, maxMessageSize)
defer rd.Close()
s.SetReadDeadline(time.Now().Add(StreamTimeout))
var msg pbv2.HopMessage
err := rd.ReadMsg(&msg)
@ -184,21 +212,40 @@ func (r *Relay) handleConnect(s network.Stream, msg *pbv2.HopMessage) {
src := s.Conn().RemotePeer()
a := s.Conn().RemoteMultiaddr()
span, err := r.scope.BeginSpan()
if err != nil {
log.Debugf("failed to begin relay transaction: %s", err)
r.handleError(s, pbv2.Status_RESOURCE_LIMIT_EXCEEDED)
return
}
fail := func(status pbv2.Status) {
span.Done()
r.handleError(s, status)
}
// reserve buffers for the relay
if err := span.ReserveMemory(2*r.rc.BufferSize, network.ReservationPriorityHigh); err != nil {
log.Debugf("error reserving memory for relay: %s", err)
fail(pbv2.Status_RESOURCE_LIMIT_EXCEEDED)
return
}
if isRelayAddr(a) {
log.Debugf("refusing connection from %s; connection attempt over relay connection")
r.handleError(s, pbv2.Status_PERMISSION_DENIED)
fail(pbv2.Status_PERMISSION_DENIED)
return
}
dest, err := util.PeerToPeerInfoV2(msg.GetPeer())
if err != nil {
r.handleError(s, pbv2.Status_MALFORMED_MESSAGE)
fail(pbv2.Status_MALFORMED_MESSAGE)
return
}
if r.acl != nil && !r.acl.AllowConnect(src, s.Conn().RemoteMultiaddr(), dest.ID) {
log.Debugf("refusing connection from %s to %s; permission denied", src, dest.ID)
r.handleError(s, pbv2.Status_PERMISSION_DENIED)
fail(pbv2.Status_PERMISSION_DENIED)
return
}
@ -207,7 +254,7 @@ func (r *Relay) handleConnect(s network.Stream, msg *pbv2.HopMessage) {
if !rsvp {
r.mx.Unlock()
log.Debugf("refusing connection from %s to %s; no reservation", src, dest.ID)
r.handleError(s, pbv2.Status_NO_RESERVATION)
fail(pbv2.Status_NO_RESERVATION)
return
}
@ -215,7 +262,7 @@ func (r *Relay) handleConnect(s network.Stream, msg *pbv2.HopMessage) {
if srcConns >= r.rc.MaxCircuits {
r.mx.Unlock()
log.Debugf("refusing connection from %s to %s; too many connections from %s", src, dest.ID, src)
r.handleError(s, pbv2.Status_RESOURCE_LIMIT_EXCEEDED)
fail(pbv2.Status_RESOURCE_LIMIT_EXCEEDED)
return
}
@ -223,7 +270,7 @@ func (r *Relay) handleConnect(s network.Stream, msg *pbv2.HopMessage) {
if destConns >= r.rc.MaxCircuits {
r.mx.Unlock()
log.Debugf("refusing connection from %s to %s; too many connecitons to %s", src, dest.ID, dest.ID)
r.handleError(s, pbv2.Status_RESOURCE_LIMIT_EXCEEDED)
fail(pbv2.Status_RESOURCE_LIMIT_EXCEEDED)
return
}
@ -232,6 +279,7 @@ func (r *Relay) handleConnect(s network.Stream, msg *pbv2.HopMessage) {
r.mx.Unlock()
cleanup := func() {
span.Done()
r.mx.Lock()
r.rmConn(src)
r.rmConn(dest.ID)
@ -251,7 +299,26 @@ func (r *Relay) handleConnect(s network.Stream, msg *pbv2.HopMessage) {
return
}
fail = func(status pbv2.Status) {
bs.Reset()
cleanup()
r.handleError(s, status)
}
if err := bs.Scope().SetService(ServiceName); err != nil {
log.Debugf("error attaching stream to relay service: %s", err)
fail(pbv2.Status_RESOURCE_LIMIT_EXCEEDED)
return
}
// handshake
if err := bs.Scope().ReserveMemory(maxMessageSize, network.ReservationPriorityAlways); err != nil {
log.Debugf("erro reserving memory for stream: %s", err)
fail(pbv2.Status_RESOURCE_LIMIT_EXCEEDED)
return
}
defer bs.Scope().ReleaseMemory(maxMessageSize)
rd := util.NewDelimitedReader(bs, maxMessageSize)
wr := util.NewDelimitedWriter(bs)
defer rd.Close()
@ -266,9 +333,7 @@ func (r *Relay) handleConnect(s network.Stream, msg *pbv2.HopMessage) {
err = wr.WriteMsg(&stopmsg)
if err != nil {
log.Debugf("error writing stop handshake")
bs.Reset()
cleanup()
r.handleError(s, pbv2.Status_CONNECTION_FAILED)
fail(pbv2.Status_CONNECTION_FAILED)
return
}
@ -277,25 +342,19 @@ func (r *Relay) handleConnect(s network.Stream, msg *pbv2.HopMessage) {
err = rd.ReadMsg(&stopmsg)
if err != nil {
log.Debugf("error reading stop response: %s", err.Error())
bs.Reset()
cleanup()
r.handleError(s, pbv2.Status_CONNECTION_FAILED)
fail(pbv2.Status_CONNECTION_FAILED)
return
}
if t := stopmsg.GetType(); t != pbv2.StopMessage_STATUS {
log.Debugf("unexpected stop response; not a status message (%d)", t)
bs.Reset()
cleanup()
r.handleError(s, pbv2.Status_CONNECTION_FAILED)
fail(pbv2.Status_CONNECTION_FAILED)
return
}
if status := stopmsg.GetStatus(); status != pbv2.Status_OK {
log.Debugf("relay stop failure: %d", status)
bs.Reset()
cleanup()
r.handleError(s, pbv2.Status_CONNECTION_FAILED)
fail(pbv2.Status_CONNECTION_FAILED)
return
}

37
p2p/protocol/holepunch/coordination.go

@ -28,6 +28,8 @@ var StreamTimeout = 1 * time.Minute
// TODO Should we have options for these ?
const (
ServiceName = "libp2p.holepunch"
maxMsgSize = 4 * 1024 // 4K
dialTimeout = 5 * time.Second
maxRetries = 3
@ -146,15 +148,30 @@ func (hs *Service) Close() error {
func (hs *Service) initiateHolePunch(rp peer.ID) ([]ma.Multiaddr, time.Duration, error) {
hpCtx := network.WithUseTransient(hs.ctx, "hole-punch")
sCtx := network.WithNoDial(hpCtx, "hole-punch")
str, err := hs.host.NewStream(sCtx, rp, Protocol)
if err != nil {
return nil, 0, fmt.Errorf("failed to open hole-punching stream: %w", err)
}
defer str.Close()
str.SetDeadline(time.Now().Add(StreamTimeout))
if err := str.Scope().SetService(ServiceName); err != nil {
log.Debugf("error attaching stream to holepunch service: %s", err)
str.Reset()
return nil, 0, err
}
if err := str.Scope().ReserveMemory(maxMsgSize, network.ReservationPriorityAlways); err != nil {
log.Debugf("error reserving memory for stream: %s, err")
str.Reset()
return nil, 0, err
}
defer str.Scope().ReleaseMemory(maxMsgSize)
w := protoio.NewDelimitedWriter(str)
rd := protoio.NewDelimitedReader(str, maxMsgSize)
str.SetDeadline(time.Now().Add(StreamTimeout))
// send a CONNECT and start RTT measurement.
msg := &pb.HolePunch{
Type: pb.HolePunch_CONNECT.Enum(),
@ -168,7 +185,6 @@ func (hs *Service) initiateHolePunch(rp peer.ID) ([]ma.Multiaddr, time.Duration,
}
// wait for a CONNECT message from the remote peer
rd := protoio.NewDelimitedReader(str, maxMsgSize)
msg.Reset()
if err := rd.ReadMsg(msg); err != nil {
str.Reset()
@ -318,12 +334,20 @@ func (hs *Service) incomingHolePunch(s network.Stream) (rtt time.Duration, addrs
return 0, nil, errors.New("rejecting hole punch request, as we don't have any public addresses")
}
s.SetDeadline(time.Now().Add(StreamTimeout))
if err := s.Scope().ReserveMemory(maxMsgSize, network.ReservationPriorityAlways); err != nil {
log.Debugf("error reserving memory for stream: %s, err")
return 0, nil, err
}
defer s.Scope().ReleaseMemory(maxMsgSize)
wr := protoio.NewDelimitedWriter(s)
rd := protoio.NewDelimitedReader(s, maxMsgSize)
// Read Connect message
msg := new(pb.HolePunch)
s.SetDeadline(time.Now().Add(StreamTimeout))
if err := rd.ReadMsg(msg); err != nil {
return 0, nil, fmt.Errorf("failed to read message from initator: %w", err)
}
@ -366,6 +390,13 @@ func (hs *Service) handleNewStream(s network.Stream) {
s.Reset()
return
}
if err := s.Scope().SetService(ServiceName); err != nil {
log.Debugf("error attaching stream to holepunch service: %s", err)
s.Reset()
return
}
rp := s.Conn().RemotePeer()
rtt, addrs, err := hs.incomingHolePunch(s)
if err != nil {

27
p2p/protocol/identify/id.go

@ -41,6 +41,8 @@ const ID = "/ipfs/id/1.0.0"
// 0.4.17 which asserted an exact version match.
const LibP2PVersion = "ipfs/0.1.0"
const ServiceName = "libp2p.identify"
// StreamReadTimeout is the read timeout on all incoming Identify family streams.
var StreamReadTimeout = 60 * time.Second
@ -357,7 +359,11 @@ func (ids *idService) identifyConn(c network.Conn) error {
ids.removeConn(c)
return err
}
s.SetProtocol(ID)
if err := s.SetProtocol(ID); err != nil {
log.Warnf("error setting identify protocol for stream: %s", err)
s.Reset()
}
// ok give the response to our handler.
if err := msmux.SelectProtoOrFail(ID, s); err != nil {
@ -370,6 +376,12 @@ func (ids *idService) identifyConn(c network.Conn) error {
}
func (ids *idService) sendIdentifyResp(s network.Stream) {
if err := s.Scope().SetService(ServiceName); err != nil {
log.Warnf("error attaching stream to identify service: %s", err)
s.Reset()
return
}
defer s.Close()
c := s.Conn()
@ -402,6 +414,19 @@ func (ids *idService) sendIdentifyResp(s network.Stream) {
}
func (ids *idService) handleIdentifyResponse(s network.Stream) error {
if err := s.Scope().SetService(ServiceName); err != nil {
log.Warnf("error attaching stream to identify service: %s", err)
s.Reset()
return err
}
if err := s.Scope().ReserveMemory(signedIDSize, network.ReservationPriorityAlways); err != nil {
log.Warnf("error reserving memory for identify stream: %s", err)
s.Reset()
return err
}
defer s.Scope().ReleaseMemory(signedIDSize)
_ = s.SetReadDeadline(time.Now().Add(StreamReadTimeout))
c := s.Conn()

17
p2p/protocol/identify/id_delta.go

@ -15,13 +15,28 @@ import (
const IDDelta = "/p2p/id/delta/1.0.0"
const deltaMsgSize = 2048
// deltaHandler handles incoming delta updates from peers.
func (ids *idService) deltaHandler(s network.Stream) {
if err := s.Scope().SetService(ServiceName); err != nil {
log.Warnf("error attaching stream to identify service: %s", err)
s.Reset()
return
}
if err := s.Scope().ReserveMemory(deltaMsgSize, network.ReservationPriorityAlways); err != nil {
log.Warnf("error reserving memory for identify stream: %s", err)
s.Reset()
return
}
defer s.Scope().ReleaseMemory(deltaMsgSize)
_ = s.SetReadDeadline(time.Now().Add(StreamReadTimeout))
c := s.Conn()
r := protoio.NewDelimitedReader(s, 2048)
r := protoio.NewDelimitedReader(s, deltaMsgSize)
mes := pb.Identify{}
if err := r.ReadMsg(&mes); err != nil {
log.Warn("error reading identify message: ", err)

59
p2p/protocol/ping/ping.go

@ -9,6 +9,7 @@ import (
u "github.com/ipfs/go-ipfs-util"
logging "github.com/ipfs/go-log/v2"
pool "github.com/libp2p/go-buffer-pool"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
@ -16,11 +17,14 @@ import (
var log = logging.Logger("ping")
const PingSize = 32
const (
PingSize = 32
pingTimeout = time.Second * 60
const ID = "/ipfs/ping/1.0.0"
ID = "/ipfs/ping/1.0.0"
const pingTimeout = time.Second * 60
ServiceName = "libp2p.ping"
)
type PingService struct {
Host host.Host
@ -33,7 +37,21 @@ func NewPingService(h host.Host) *PingService {
}
func (p *PingService) PingHandler(s network.Stream) {
buf := make([]byte, PingSize)
if err := s.Scope().SetService(ServiceName); err != nil {
log.Debugf("error attaching stream to ping service: %s", err)
s.Reset()
return
}
if err := s.Scope().ReserveMemory(PingSize, network.ReservationPriorityAlways); err != nil {
log.Debugf("error reserving memory for ping stream: %s", err)
s.Reset()
return
}
defer s.Scope().ReleaseMemory(PingSize)
buf := pool.Get(PingSize)
defer pool.Put(buf)
errCh := make(chan error, 1)
defer close(errCh)
@ -81,15 +99,25 @@ func (ps *PingService) Ping(ctx context.Context, p peer.ID) <-chan Result {
return Ping(ctx, ps.Host, p)
}
// pingError returns an already-closed result channel carrying the given error.
func pingError(err error) chan Result {
	res := make(chan Result, 1)
	res <- Result{Error: err}
	close(res)
	return res
}
// Ping pings the remote peer until the context is canceled, returning a stream
// of RTTs or errors.
func Ping(ctx context.Context, h host.Host, p peer.ID) <-chan Result {
s, err := h.NewStream(network.WithUseTransient(ctx, "ping"), p, ID)
if err != nil {
ch := make(chan Result, 1)
ch <- Result{Error: err}
close(ch)
return ch
return pingError(err)
}
if err := s.Scope().SetService(ServiceName); err != nil {
log.Debugf("error attaching stream to ping service: %s", err)
s.Reset()
return pingError(err)
}
ctx, cancel := context.WithCancel(ctx)
@ -130,7 +158,16 @@ func Ping(ctx context.Context, h host.Host, p peer.ID) <-chan Result {
}
func ping(s network.Stream) (time.Duration, error) {
buf := make([]byte, PingSize)
if err := s.Scope().ReserveMemory(2*PingSize, network.ReservationPriorityAlways); err != nil {
log.Debugf("error reserving memory for ping stream: %s", err)
s.Reset()
return 0, err
}
defer s.Scope().ReleaseMemory(2 * PingSize)
buf := pool.Get(PingSize)
defer pool.Put(buf)
u.NewTimeSeededRand().Read(buf)
before := time.Now()
@ -139,7 +176,9 @@ func ping(s network.Stream) (time.Duration, error) {
return 0, err
}
rbuf := make([]byte, PingSize)
rbuf := pool.Get(PingSize)
defer pool.Put(rbuf)
_, err = io.ReadFull(s, rbuf)
if err != nil {
return 0, err

Loading…
Cancel
Save