Browse Source

rcmgr: Add conn_limiter to limit number of conns per ip cidr (#2788)

* Add conn_limiter to limit number of conns per ip cidr

* Handle the case where we want to call OpenConnection without an IP address

* Delete key when count==0
gammazero/previous-well-known-resource
Marco Munizaga 6 months ago
committed by GitHub
parent
commit
5d547cf46a
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 141
      p2p/host/resource-manager/conn_limiter.go
  2. 158
      p2p/host/resource-manager/conn_limiter_test.go
  3. 46
      p2p/host/resource-manager/rcmgr.go
  4. 18
      p2p/protocol/circuitv2/client/transport.go

141
p2p/host/resource-manager/conn_limiter.go

@ -0,0 +1,141 @@
package rcmgr
import (
"net/netip"
"sync"
)
// ConnLimitPerCIDR caps how many connections are accepted from any single
// network prefix of a given length.
type ConnLimitPerCIDR struct {
	// BitMask is the prefix length, i.e. the number of leading 1 bits in
	// the network mask applied to an address before counting it.
	BitMask int
	// ConnCount is the maximum number of connections counted against one
	// prefix of that length before further connections are refused.
	ConnCount int
}
var (
	// defaultMaxConcurrentConns is the per-prefix connection budget used by
	// the default limits below. It is 8 for now so that it matches the
	// number of concurrent dials we may do in swarm_dial.go. With future
	// smart dialing work we should bring this down.
	defaultMaxConcurrentConns = 8

	// defaultIP4Limit applies the base budget to each individual IPv4
	// address (a /32 prefix).
	defaultIP4Limit = ConnLimitPerCIDR{BitMask: 32, ConnCount: defaultMaxConcurrentConns}

	// defaultIP6Limits stacks two IPv6 limits: the base budget per /56 and
	// eight times the base budget per /48.
	defaultIP6Limits = []ConnLimitPerCIDR{
		{BitMask: 56, ConnCount: defaultMaxConcurrentConns},
		{BitMask: 48, ConnCount: 8 * defaultMaxConcurrentConns},
	}
)
// WithLimitPeersPerCIDR returns an Option that replaces the per-CIDR
// connection limits held by the resource manager's connection limiter.
//
// A nil slice leaves the limits of that address family untouched; any non-nil
// slice (including an empty one) replaces them. The option itself never
// returns an error.
func WithLimitPeersPerCIDR(ipv4 []ConnLimitPerCIDR, ipv6 []ConnLimitPerCIDR) Option {
	replaceIP4, replaceIP6 := ipv4 != nil, ipv6 != nil
	return func(r *resourceManager) error {
		if replaceIP4 {
			r.connLimiter.connLimitPerCIDRIP4 = ipv4
		}
		if replaceIP6 {
			r.connLimiter.connLimitPerCIDRIP6 = ipv6
		}
		return nil
	}
}
// connLimiter tracks how many connections are currently counted against each
// configured network prefix, separately for IPv4 and IPv6.
type connLimiter struct {
	// mu is held by addConn and rmConn while they read or update the fields below.
	mu sync.Mutex

	// Configured limits per address family.
	connLimitPerCIDRIP4, connLimitPerCIDRIP6 []ConnLimitPerCIDR

	// Connection counts per address family. Index i corresponds to the
	// limit at index i of the matching limits slice; each map is keyed by
	// the string form of the masked prefix.
	ip4connsPerLimit, ip6connsPerLimit []map[string]int
}
// newConnLimiter returns a connLimiter configured with the default IPv4 and
// IPv6 limits and with no connections counted yet.
//
// Both limit slices are freshly allocated, so adjusting one limiter's limits
// in place cannot change the package-level defaults or any other limiter.
func newConnLimiter() *connLimiter {
	return &connLimiter{
		connLimitPerCIDRIP4: []ConnLimitPerCIDR{defaultIP4Limit},
		// Copy rather than alias defaultIP6Limits: sharing the backing array
		// would let an in-place edit leak into every other limiter.
		connLimitPerCIDRIP6: append([]ConnLimitPerCIDR(nil), defaultIP6Limits...),
	}
}
// addConn tries to count a new connection from ip and reports whether the
// connection is allowed.
//
// The address is checked against every limit configured for its family
// (IPv6 limits when ip.Is6() is true, IPv4 limits otherwise). The connection
// is allowed only if it fits under all of them, and only then are the
// per-prefix counts incremented. A rejected connection leaves the counts
// untouched: no map entry is created for it. It also returns false when a
// prefix cannot be derived from ip for a configured BitMask.
func (cl *connLimiter) addConn(ip netip.Addr) bool {
	cl.mu.Lock()
	defer cl.mu.Unlock()

	limits := cl.connLimitPerCIDRIP4
	countsPerLimit := cl.ip4connsPerLimit
	isIP6 := ip.Is6()
	if isIP6 {
		limits = cl.connLimitPerCIDRIP6
		countsPerLimit = cl.ip6connsPerLimit
	}

	// Lazily allocate one count map slot per configured limit.
	if len(countsPerLimit) == 0 && len(limits) > 0 {
		countsPerLimit = make([]map[string]int, len(limits))
		if isIP6 {
			cl.ip6connsPerLimit = countsPerLimit
		} else {
			cl.ip4connsPerLimit = countsPerLimit
		}
	}

	// Check pass: read-only, so bailing out part-way cannot leave stray
	// zero-count entries behind. Indexing a nil map or a missing key yields 0.
	keys := make([]string, len(limits))
	for i, limit := range limits {
		prefix, err := ip.Prefix(limit.BitMask)
		if err != nil {
			return false
		}
		keys[i] = prefix.String()
		if countsPerLimit[i][keys[i]] >= limit.ConnCount {
			return false
		}
	}

	// Commit pass: all limit checks passed, now we update the counts.
	for i, key := range keys {
		if countsPerLimit[i] == nil {
			countsPerLimit[i] = make(map[string]int)
		}
		countsPerLimit[i][key]++
	}
	return true
}
// rmConn releases one connection previously counted for ip by addConn.
//
// For every limit configured for the address family, the count of the prefix
// containing ip is decremented, and the map entry is deleted once it reaches
// zero. Inconsistent state (no count recorded for the prefix, or no counts
// tracked at all for this family) is logged and skipped rather than causing
// a panic.
func (cl *connLimiter) rmConn(ip netip.Addr) {
	cl.mu.Lock()
	defer cl.mu.Unlock()

	limits := cl.connLimitPerCIDRIP4
	countsPerLimit := cl.ip4connsPerLimit
	if ip.Is6() {
		limits = cl.connLimitPerCIDRIP6
		countsPerLimit = cl.ip6connsPerLimit
	}

	for i, limit := range limits {
		prefix, err := ip.Prefix(limit.BitMask)
		if err != nil {
			// Unexpected since we should have seen this IP before in addConn
			log.Errorf("unexpected error getting prefix: %v", err)
			continue
		}
		masked := prefix.String()

		// The count slots are allocated lazily by addConn, so guard the
		// index: without it, removing an address of a family that was never
		// added would panic with an out-of-range access.
		var (
			counts int
			ok     bool
		)
		if i < len(countsPerLimit) {
			counts, ok = countsPerLimit[i][masked]
		}
		if !ok || counts == 0 {
			// Unexpected, but don't panic
			log.Errorf("unexpected conn count for %s ok=%v count=%v", masked, ok, counts)
			continue
		}

		if counts == 1 {
			// Drop the key instead of keeping a zero entry around.
			delete(countsPerLimit[i], masked)
		} else {
			countsPerLimit[i][masked] = counts - 1
		}
	}
}

158
p2p/host/resource-manager/conn_limiter_test.go

@ -0,0 +1,158 @@
package rcmgr
import (
"encoding/binary"
"fmt"
"net"
"net/netip"
"testing"
"github.com/stretchr/testify/require"
)
// TestItLimits checks that connLimiter.addConn enforces the per-prefix
// connection limits for IPv4, for IPv6, and for the two stacked default IPv6
// limits.
func TestItLimits(t *testing.T) {
t.Run("IPv4", func(t *testing.T) {
ip, err := netip.ParseAddr("1.2.3.4")
require.NoError(t, err)
cl := newConnLimiter()
// Tighten the first IPv4 limit to a single connection per prefix.
cl.connLimitPerCIDRIP4[0].ConnCount = 1
require.True(t, cl.addConn(ip))
// A second connection from the same address exceeds the limit of 1.
require.False(t, cl.addConn(ip))
// A different address falls under a different /32 and is counted separately.
otherIP, err := netip.ParseAddr("1.2.3.5")
require.NoError(t, err)
require.True(t, cl.addConn(otherIP))
})
t.Run("IPv6", func(t *testing.T) {
ip, err := netip.ParseAddr("1:2:3:4::1")
require.NoError(t, err)
cl := newConnLimiter()
// Tighten the first IPv6 limit to one connection, and restore the original
// value on exit in case the limiter's slice shares storage with the
// package-level defaults used by other tests.
original := cl.connLimitPerCIDRIP6[0].ConnCount
cl.connLimitPerCIDRIP6[0].ConnCount = 1
defer func() {
cl.connLimitPerCIDRIP6[0].ConnCount = original
}()
require.True(t, cl.addConn(ip))
// A second connection from the same address exceeds the limit of 1.
require.False(t, cl.addConn(ip))
// Another address inside the same prefix shares the exhausted count.
otherIPSameSubnet := netip.MustParseAddr("1:2:3:4::2")
require.False(t, cl.addConn(otherIPSameSubnet))
// An address in an unrelated prefix is still accepted.
otherIP := netip.MustParseAddr("2:2:3:4::2")
require.True(t, cl.addConn(otherIP))
})
t.Run("IPv6 with multiple limits", func(t *testing.T) {
cl := newConnLimiter()
// Fill the first (narrower) limit: defaultMaxConcurrentConns addresses that
// differ only in their last two bytes and so share one prefix.
for i := 0; i < defaultMaxConcurrentConns; i++ {
ip := net.ParseIP("ff:2:3:4::1")
binary.BigEndian.PutUint16(ip[14:], uint16(i))
ipAddr := netip.MustParseAddr(ip.String())
require.True(t, cl.addConn(ipAddr))
}
// Next one should fail
ip := net.ParseIP("ff:2:3:4::1")
binary.BigEndian.PutUint16(ip[14:], uint16(defaultMaxConcurrentConns+1))
require.False(t, cl.addConn(netip.MustParseAddr(ip.String())))
// But on a different root subnet should work
otherIP := netip.MustParseAddr("ffef:2:3::1")
require.True(t, cl.addConn(otherIP))
// But too many on the next subnet limit will fail too.
// PutUint16 on ip[5:7] overwrites bytes 5 and 6 of the address: for the
// values used here the high byte is 0, so byte 5 becomes 0 and byte 6 takes
// the loop value. Each address therefore lands in its own /56 while all of
// them share a single /48.
for i := 0; i < defaultMaxConcurrentConns*8; i++ {
ip := net.ParseIP("ffef:2:3:4::1")
binary.BigEndian.PutUint16(ip[5:7], uint16(i))
fmt.Println(ip.String())
ipAddr := netip.MustParseAddr(ip.String())
require.True(t, cl.addConn(ipAddr))
}
// One more address in that same /48 must now be rejected by the wider limit.
ip = net.ParseIP("ffef:2:3:4::1")
binary.BigEndian.PutUint16(ip[5:7], uint16(defaultMaxConcurrentConns*8+1))
ipAddr := netip.MustParseAddr(ip.String())
require.False(t, cl.addConn(ipAddr))
})
}
// genIP decodes one IP address from the front of *data for use by the fuzzer.
//
// The lowest bit of the first byte selects the family: 1 means IPv6 (16
// address bytes follow), 0 means IPv4 (4 address bytes follow). On success the
// consumed bytes are removed from *data. If there are not enough bytes left,
// it returns false and leaves *data unchanged.
func genIP(data *[]byte) (netip.Addr, bool) {
	buf := *data
	if len(buf) == 0 {
		return netip.Addr{}, false
	}

	addrLen := 4
	if buf[0]&0x01 == 1 {
		addrLen = 16
	}

	payload := buf[1:]
	if len(payload) < addrLen {
		return netip.Addr{}, false
	}

	addr, ok := netip.AddrFromSlice(payload[:addrLen])
	*data = payload[addrLen:]
	return addr, ok
}
func FuzzConnLimiter(f *testing.F) {
// The goal is to try to enter a state where the count is incorrectly 0
f.Fuzz(func(t *testing.T, data []byte) {
ips := make([]netip.Addr, 0, len(data)/5)
for {
ip, ok := genIP(&data)
if !ok {
break
}
ips = append(ips, ip)
}
cl := newConnLimiter()
addedConns := make([]netip.Addr, 0, len(ips))
for _, ip := range ips {
if cl.addConn(ip) {
addedConns = append(addedConns, ip)
}
}
addedCount := 0
for _, ip := range cl.ip4connsPerLimit {
for _, count := range ip {
addedCount += count
}
}
for _, ip := range cl.ip6connsPerLimit {
for _, count := range ip {
addedCount += count
}
}
if addedCount == 0 && len(addedConns) > 0 {
t.Fatalf("added count: %d", addedCount)
}
for _, ip := range addedConns {
cl.rmConn(ip)
}
leftoverCount := 0
for _, ip := range cl.ip4connsPerLimit {
for _, count := range ip {
leftoverCount += count
}
}
for _, ip := range cl.ip6connsPerLimit {
for _, count := range ip {
leftoverCount += count
}
}
if leftoverCount != 0 {
t.Fatalf("leftover count: %d", leftoverCount)
}
})
}

46
p2p/host/resource-manager/rcmgr.go

@ -3,6 +3,7 @@ package rcmgr
import ( import (
"context" "context"
"fmt" "fmt"
"net/netip"
"strings" "strings"
"sync" "sync"
"time" "time"
@ -13,6 +14,7 @@ import (
logging "github.com/ipfs/go-log/v2" logging "github.com/ipfs/go-log/v2"
"github.com/multiformats/go-multiaddr" "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
) )
var log = logging.Logger("rcmgr") var log = logging.Logger("rcmgr")
@ -20,6 +22,8 @@ var log = logging.Logger("rcmgr")
type resourceManager struct { type resourceManager struct {
limits Limiter limits Limiter
connLimiter *connLimiter
trace *trace trace *trace
metrics *metrics metrics *metrics
disableMetrics bool disableMetrics bool
@ -103,6 +107,7 @@ type connectionScope struct {
rcmgr *resourceManager rcmgr *resourceManager
peer *peerScope peer *peerScope
endpoint multiaddr.Multiaddr endpoint multiaddr.Multiaddr
ip netip.Addr
} }
var _ network.ConnScope = (*connectionScope)(nil) var _ network.ConnScope = (*connectionScope)(nil)
@ -130,6 +135,7 @@ func NewResourceManager(limits Limiter, opts ...Option) (network.ResourceManager
allowlist := newAllowlist() allowlist := newAllowlist()
r := &resourceManager{ r := &resourceManager{
limits: limits, limits: limits,
connLimiter: newConnLimiter(),
allowlist: &allowlist, allowlist: &allowlist,
svc: make(map[string]*serviceScope), svc: make(map[string]*serviceScope),
proto: make(map[protocol.ID]*protocolScope), proto: make(map[protocol.ID]*protocolScope),
@ -310,12 +316,38 @@ func (r *resourceManager) nextStreamId() int64 {
return r.streamId return r.streamId
} }
// OpenConnectionNoIP is like OpenConnection, but does not use IP information.
// Used when we still want to limit the connection by other scopes, but don't
// have IP information like when we are relaying.
func (r *resourceManager) OpenConnectionNoIP(dir network.Direction, usefd bool, endpoint multiaddr.Multiaddr) (network.ConnManagementScope, error) {
return r.openConnection(dir, usefd, endpoint, netip.Addr{})
}
func (r *resourceManager) OpenConnection(dir network.Direction, usefd bool, endpoint multiaddr.Multiaddr) (network.ConnManagementScope, error) { func (r *resourceManager) OpenConnection(dir network.Direction, usefd bool, endpoint multiaddr.Multiaddr) (network.ConnManagementScope, error) {
ip, err := manet.ToIP(endpoint)
if err != nil {
return nil, err
}
ipAddr, ok := netip.AddrFromSlice(ip)
if !ok {
return nil, fmt.Errorf("failed to convert ip to netip.Addr")
}
return r.openConnection(dir, usefd, endpoint, ipAddr)
}
func (r *resourceManager) openConnection(dir network.Direction, usefd bool, endpoint multiaddr.Multiaddr, ip netip.Addr) (network.ConnManagementScope, error) {
if ip.IsValid() {
if ok := r.connLimiter.addConn(ip); !ok {
return nil, fmt.Errorf("connections per ip limit exceeded for %s", endpoint)
}
}
var conn *connectionScope var conn *connectionScope
conn = newConnectionScope(dir, usefd, r.limits.GetConnLimits(), r, endpoint) conn = newConnectionScope(dir, usefd, r.limits.GetConnLimits(), r, endpoint, ip)
err := conn.AddConn(dir, usefd) err := conn.AddConn(dir, usefd)
if err != nil { if err != nil && ip.IsValid() {
// Try again if this is an allowlisted connection // Try again if this is an allowlisted connection
// Failed to open connection, let's see if this was allowlisted and try again // Failed to open connection, let's see if this was allowlisted and try again
allowed := r.allowlist.Allowed(endpoint) allowed := r.allowlist.Allowed(endpoint)
@ -476,7 +508,7 @@ func newPeerScope(p peer.ID, limit Limit, rcmgr *resourceManager) *peerScope {
} }
} }
func newConnectionScope(dir network.Direction, usefd bool, limit Limit, rcmgr *resourceManager, endpoint multiaddr.Multiaddr) *connectionScope { func newConnectionScope(dir network.Direction, usefd bool, limit Limit, rcmgr *resourceManager, endpoint multiaddr.Multiaddr, ip netip.Addr) *connectionScope {
return &connectionScope{ return &connectionScope{
resourceScope: newResourceScope(limit, resourceScope: newResourceScope(limit,
[]*resourceScope{rcmgr.transient.resourceScope, rcmgr.system.resourceScope}, []*resourceScope{rcmgr.transient.resourceScope, rcmgr.system.resourceScope},
@ -485,6 +517,7 @@ func newConnectionScope(dir network.Direction, usefd bool, limit Limit, rcmgr *r
usefd: usefd, usefd: usefd,
rcmgr: rcmgr, rcmgr: rcmgr,
endpoint: endpoint, endpoint: endpoint,
ip: ip,
} }
} }
@ -643,6 +676,13 @@ func (s *connectionScope) PeerScope() network.PeerScope {
return s.peer return s.peer
} }
func (s *connectionScope) Done() {
if s.ip.IsValid() {
s.rcmgr.connLimiter.rmConn(s.ip)
}
s.resourceScope.Done()
}
// transferAllowedToStandard transfers this connection scope from being part of // transferAllowedToStandard transfers this connection scope from being part of
// the allowlist set of scopes to being part of the standard set of scopes. // the allowlist set of scopes to being part of the standard set of scopes.
// Happens when we first allowlisted this connection due to its IP, but later // Happens when we first allowlisted this connection due to its IP, but later

18
p2p/protocol/circuitv2/client/transport.go

@ -48,8 +48,24 @@ func AddTransport(h host.Host, upgrader transport.Upgrader) error {
var _ transport.Transport = (*Client)(nil) var _ transport.Transport = (*Client)(nil)
var _ io.Closer = (*Client)(nil) var _ io.Closer = (*Client)(nil)
// If the resource manager supports OpenConnectionNoIP, we'll use it to open connections.
// That's because the swarm is already limiting by IP address at the swarm
// level, and here we want to limit by everything but the IP.
// Some ResourceManager implementations may not care about IP addresses, so we
// do our own interface check to see if they provide this option.
type rcmgrOpenConnectionNoIPer interface {
OpenConnectionNoIP(network.Direction, bool, ma.Multiaddr) (network.ConnManagementScope, error)
}
func (c *Client) Dial(ctx context.Context, a ma.Multiaddr, p peer.ID) (transport.CapableConn, error) { func (c *Client) Dial(ctx context.Context, a ma.Multiaddr, p peer.ID) (transport.CapableConn, error) {
connScope, err := c.host.Network().ResourceManager().OpenConnection(network.DirOutbound, false, a) var connScope network.ConnManagementScope
var err error
if rcmgr, ok := c.host.Network().ResourceManager().(rcmgrOpenConnectionNoIPer); ok {
connScope, err = rcmgr.OpenConnectionNoIP(network.DirOutbound, false, a)
} else {
connScope, err = c.host.Network().ResourceManager().OpenConnection(network.DirOutbound, false, a)
}
if err != nil { if err != nil {
return nil, err return nil, err
} }

Loading…
Cancel
Save