Browse Source

Merge pull request #93 from libp2p/period-config

introduce WithGracePeriod and WithSilencePeriod configuration options
pull/1297/head^2
Marten Seemann 3 years ago
committed by GitHub
parent
commit
1dc5466eef
  1. 2
      p2p/net/connmgr/bench_test.go
  2. 23
      p2p/net/connmgr/connmgr.go
  3. 126
      p2p/net/connmgr/connmgr_test.go
  4. 2
      p2p/net/connmgr/decay_test.go
  5. 31
      p2p/net/connmgr/options.go

2
p2p/net/connmgr/bench_test.go

@ -19,7 +19,7 @@ func randomConns(tb testing.TB) (c [5000]network.Conn) {
func BenchmarkLockContention(b *testing.B) {
conns := randomConns(b)
cm, err := NewConnManager(1000, 1000, 0)
cm, err := NewConnManager(1000, 1000, WithGracePeriod(0))
require.NoError(b, err)
not := cm.Notifee()

23
p2p/net/connmgr/connmgr.go

@ -15,10 +15,6 @@ import (
ma "github.com/multiformats/go-multiaddr"
)
var SilencePeriod = 10 * time.Second
var minCleanupInterval = 10 * time.Second
var log = logging.Logger("connmgr")
// BasicConnMgr is a ConnManager that trims connections whenever the count exceeds the
@ -94,17 +90,15 @@ func (s *segment) tagInfoFor(p peer.ID) *peerInfo {
}
// NewConnManager creates a new BasicConnMgr with the provided params:
// * lo and hi are watermarks governing the number of connections that'll be maintained.
// When the peer count exceeds the 'high watermark', as many peers will be pruned (and
// their connections terminated) until 'low watermark' peers remain.
// * grace is the amount of time a newly opened connection is given before it becomes
// subject to pruning.
func NewConnManager(low, hi int, grace time.Duration, opts ...Option) (*BasicConnMgr, error) {
// lo and hi are watermarks governing the number of connections that'll be maintained.
// When the peer count exceeds the 'high watermark', as many peers will be pruned (and
// their connections terminated) until 'low watermark' peers remain.
func NewConnManager(low, hi int, opts ...Option) (*BasicConnMgr, error) {
cfg := &config{
highWater: hi,
lowWater: low,
gracePeriod: grace,
silencePeriod: SilencePeriod,
gracePeriod: time.Minute,
silencePeriod: 10 * time.Second,
}
for _, o := range opts {
if err := o(cfg); err != nil {
@ -242,12 +236,9 @@ func (cm *BasicConnMgr) background() {
defer cm.refCount.Done()
interval := cm.cfg.gracePeriod / 2
if interval < cm.cfg.silencePeriod {
if cm.cfg.silencePeriod != 0 {
interval = cm.cfg.silencePeriod
}
if interval < minCleanupInterval {
interval = minCleanupInterval
}
ticker := time.NewTicker(interval)
defer ticker.Stop()

126
p2p/net/connmgr/connmgr_test.go

@ -3,6 +3,7 @@ package connmgr
import (
"context"
"sync"
"sync/atomic"
"testing"
"time"
@ -12,7 +13,6 @@ import (
tu "github.com/libp2p/go-libp2p-core/test"
ma "github.com/multiformats/go-multiaddr"
detectrace "github.com/ipfs/go-detect-race"
"github.com/stretchr/testify/require"
)
@ -20,18 +20,22 @@ type tconn struct {
network.Conn
peer peer.ID
closed bool
closed uint32 // to be used atomically. Closed if 1
disconnectNotify func(net network.Network, conn network.Conn)
}
// Close marks this test connection as closed and, when a disconnect
// callback was registered, invokes it with the connection itself.
// It always reports success.
func (c *tconn) Close() error {
	atomic.StoreUint32(&c.closed, 1)
	if notify := c.disconnectNotify; notify != nil {
		notify(nil, c)
	}
	return nil
}
// isClosed reports whether Close has been called on this connection.
// It reads the flag atomically so tests may poll it from other goroutines.
func (c *tconn) isClosed() bool {
	state := atomic.LoadUint32(&c.closed)
	return state == 1
}
// RemotePeer returns the peer ID this test connection was created with.
func (c *tconn) RemotePeer() peer.ID {
	return c.peer
}
@ -51,7 +55,7 @@ func randConn(t testing.TB, discNotify func(network.Network, network.Conn)) netw
// Make sure multiple trim calls block.
func TestTrimBlocks(t *testing.T) {
cm, err := NewConnManager(200, 300, 0)
cm, err := NewConnManager(200, 300, WithGracePeriod(0))
require.NoError(t, err)
defer cm.Close()
@ -80,7 +84,7 @@ func TestTrimBlocks(t *testing.T) {
// Make sure we return from trim when the context is canceled.
func TestTrimCancels(t *testing.T) {
cm, err := NewConnManager(200, 300, 0)
cm, err := NewConnManager(200, 300, WithGracePeriod(0))
require.NoError(t, err)
defer cm.Close()
ctx, cancel := context.WithCancel(context.Background())
@ -100,7 +104,7 @@ func TestTrimCancels(t *testing.T) {
// Make sure trim returns when closed.
func TestTrimClosed(t *testing.T) {
cm, err := NewConnManager(200, 300, 0)
cm, err := NewConnManager(200, 300, WithGracePeriod(0))
require.NoError(t, err)
require.NoError(t, cm.Close())
cm.TrimOpenConns(context.Background())
@ -108,7 +112,7 @@ func TestTrimClosed(t *testing.T) {
// Make sure joining an existing trim works.
func TestTrimJoin(t *testing.T) {
cm, err := NewConnManager(200, 300, 0)
cm, err := NewConnManager(200, 300, WithGracePeriod(0))
require.NoError(t, err)
defer cm.Close()
@ -134,7 +138,7 @@ func TestTrimJoin(t *testing.T) {
}
func TestConnTrimming(t *testing.T) {
cm, err := NewConnManager(200, 300, 0)
cm, err := NewConnManager(200, 300, WithGracePeriod(0))
require.NoError(t, err)
defer cm.Close()
not := cm.Notifee()
@ -147,7 +151,7 @@ func TestConnTrimming(t *testing.T) {
}
for _, c := range conns {
if c.(*tconn).closed {
if c.(*tconn).isClosed() {
t.Fatal("nothing should be closed yet")
}
}
@ -162,12 +166,12 @@ func TestConnTrimming(t *testing.T) {
for i := 0; i < 100; i++ {
c := conns[i]
if c.(*tconn).closed {
if c.(*tconn).isClosed() {
t.Fatal("these shouldnt be closed")
}
}
if !conns[299].(*tconn).closed {
if !conns[299].(*tconn).isClosed() {
t.Fatal("conn with bad tag should have gotten closed")
}
}
@ -182,7 +186,7 @@ func TestConnsToClose(t *testing.T) {
}
t.Run("below hi limit", func(t *testing.T) {
cm, err := NewConnManager(0, 10, 0)
cm, err := NewConnManager(0, 10, WithGracePeriod(0))
require.NoError(t, err)
defer cm.Close()
addConns(cm, 5)
@ -190,7 +194,7 @@ func TestConnsToClose(t *testing.T) {
})
t.Run("below low limit", func(t *testing.T) {
cm, err := NewConnManager(10, 0, 0)
cm, err := NewConnManager(10, 0, WithGracePeriod(0))
require.NoError(t, err)
defer cm.Close()
addConns(cm, 5)
@ -198,7 +202,7 @@ func TestConnsToClose(t *testing.T) {
})
t.Run("below low and hi limit", func(t *testing.T) {
cm, err := NewConnManager(1, 1, 0)
cm, err := NewConnManager(1, 1, WithGracePeriod(0))
require.NoError(t, err)
defer cm.Close()
addConns(cm, 1)
@ -206,7 +210,7 @@ func TestConnsToClose(t *testing.T) {
})
t.Run("within silence period", func(t *testing.T) {
cm, err := NewConnManager(1, 1, time.Duration(10*time.Minute))
cm, err := NewConnManager(1, 1, WithGracePeriod(10*time.Minute))
require.NoError(t, err)
defer cm.Close()
addConns(cm, 1)
@ -216,7 +220,7 @@ func TestConnsToClose(t *testing.T) {
func TestGetTagInfo(t *testing.T) {
start := time.Now()
cm, err := NewConnManager(1, 1, time.Duration(10*time.Minute))
cm, err := NewConnManager(1, 1, WithGracePeriod(10*time.Minute))
require.NoError(t, err)
defer cm.Close()
@ -289,7 +293,7 @@ func TestGetTagInfo(t *testing.T) {
}
func TestTagPeerNonExistant(t *testing.T) {
cm, err := NewConnManager(1, 1, time.Duration(10*time.Minute))
cm, err := NewConnManager(1, 1, WithGracePeriod(10*time.Minute))
require.NoError(t, err)
defer cm.Close()
@ -302,7 +306,7 @@ func TestTagPeerNonExistant(t *testing.T) {
}
func TestUntagPeer(t *testing.T) {
cm, err := NewConnManager(1, 1, time.Duration(10*time.Minute))
cm, err := NewConnManager(1, 1, WithGracePeriod(10*time.Minute))
require.NoError(t, err)
defer cm.Close()
not := cm.Notifee()
@ -336,7 +340,7 @@ func TestUntagPeer(t *testing.T) {
func TestGetInfo(t *testing.T) {
start := time.Now()
const gp = 10 * time.Minute
cm, err := NewConnManager(1, 5, gp)
cm, err := NewConnManager(1, 5, WithGracePeriod(gp))
require.NoError(t, err)
defer cm.Close()
not := cm.Notifee()
@ -365,7 +369,7 @@ func TestGetInfo(t *testing.T) {
func TestDoubleConnection(t *testing.T) {
const gp = 10 * time.Minute
cm, err := NewConnManager(1, 5, gp)
cm, err := NewConnManager(1, 5, WithGracePeriod(gp))
require.NoError(t, err)
defer cm.Close()
not := cm.Notifee()
@ -383,7 +387,7 @@ func TestDoubleConnection(t *testing.T) {
func TestDisconnected(t *testing.T) {
const gp = 10 * time.Minute
cm, err := NewConnManager(1, 5, gp)
cm, err := NewConnManager(1, 5, WithGracePeriod(gp))
require.NoError(t, err)
defer cm.Close()
not := cm.Notifee()
@ -417,15 +421,10 @@ func TestDisconnected(t *testing.T) {
}
func TestGracePeriod(t *testing.T) {
if detectrace.WithRace() {
t.Skip("race detector is unhappy with this test")
}
SilencePeriod = 0
cm, err := NewConnManager(10, 20, 100*time.Millisecond)
const gp = 100 * time.Millisecond
cm, err := NewConnManager(10, 20, WithGracePeriod(gp), WithSilencePeriod(time.Millisecond))
require.NoError(t, err)
defer cm.Close()
SilencePeriod = 10 * time.Second
not := cm.Notifee()
@ -437,9 +436,9 @@ func TestGracePeriod(t *testing.T) {
conns = append(conns, rc)
not.Connected(nil, rc)
time.Sleep(200 * time.Millisecond)
time.Sleep(2 * gp)
if rc.(*tconn).closed {
if rc.(*tconn).isClosed() {
t.Fatal("expected conn to remain open")
}
}
@ -454,7 +453,7 @@ func TestGracePeriod(t *testing.T) {
cm.TrimOpenConns(context.Background())
for _, c := range conns {
if c.(*tconn).closed {
if c.(*tconn).isClosed() {
t.Fatal("expected no conns to be closed")
}
}
@ -465,7 +464,7 @@ func TestGracePeriod(t *testing.T) {
closed := 0
for _, c := range conns {
if c.(*tconn).closed {
if c.(*tconn).isClosed() {
closed++
}
}
@ -477,11 +476,7 @@ func TestGracePeriod(t *testing.T) {
// see https://github.com/libp2p/go-libp2p-connmgr/issues/23
func TestQuickBurstRespectsSilencePeriod(t *testing.T) {
if detectrace.WithRace() {
t.Skip("race detector is unhappy with this test")
}
cm, err := NewConnManager(10, 20, 0)
cm, err := NewConnManager(10, 20, WithGracePeriod(0))
require.NoError(t, err)
defer cm.Close()
not := cm.Notifee()
@ -501,7 +496,7 @@ func TestQuickBurstRespectsSilencePeriod(t *testing.T) {
// only the first trim is allowed in; make sure we close at most 20 connections, not all of them.
var closed int
for _, c := range conns {
if c.(*tconn).closed {
if c.(*tconn).isClosed() {
closed++
}
}
@ -514,16 +509,9 @@ func TestQuickBurstRespectsSilencePeriod(t *testing.T) {
}
func TestPeerProtectionSingleTag(t *testing.T) {
if detectrace.WithRace() {
t.Skip("race detector is unhappy with this test")
}
SilencePeriod = 0
cm, err := NewConnManager(19, 20, 0)
cm, err := NewConnManager(19, 20, WithGracePeriod(0), WithSilencePeriod(time.Millisecond))
require.NoError(t, err)
defer cm.Close()
SilencePeriod = 10 * time.Second
not := cm.Notifee()
var conns []network.Conn
@ -551,10 +539,11 @@ func TestPeerProtectionSingleTag(t *testing.T) {
// add 1 more conn, this shouldn't send us over the limit as protected conns don't count
addConn(20)
time.Sleep(100 * time.Millisecond)
cm.TrimOpenConns(context.Background())
for _, c := range conns {
if c.(*tconn).closed {
if c.(*tconn).isClosed() {
t.Error("connection was closed by connection manager")
}
}
@ -567,14 +556,14 @@ func TestPeerProtectionSingleTag(t *testing.T) {
cm.TrimOpenConns(context.Background())
for _, c := range protected {
if c.(*tconn).closed {
if c.(*tconn).isClosed() {
t.Error("protected connection was closed by connection manager")
}
}
closed := 0
for _, c := range conns {
if c.(*tconn).closed {
if c.(*tconn).isClosed() {
closed++
}
}
@ -592,27 +581,20 @@ func TestPeerProtectionSingleTag(t *testing.T) {
cm.TrimOpenConns(context.Background())
if !protected[0].(*tconn).closed {
if !protected[0].(*tconn).isClosed() {
t.Error("unprotected connection was kept open by connection manager")
}
for _, c := range protected[1:] {
if c.(*tconn).closed {
if c.(*tconn).isClosed() {
t.Error("protected connection was closed by connection manager")
}
}
}
func TestPeerProtectionMultipleTags(t *testing.T) {
if detectrace.WithRace() {
t.Skip("race detector is unhappy with this test")
}
SilencePeriod = 0
cm, err := NewConnManager(19, 20, 0)
cm, err := NewConnManager(19, 20, WithGracePeriod(0), WithSilencePeriod(time.Millisecond))
require.NoError(t, err)
defer cm.Close()
SilencePeriod = 10 * time.Second
not := cm.Notifee()
// produce 20 connections with unique peers.
@ -640,7 +622,7 @@ func TestPeerProtectionMultipleTags(t *testing.T) {
cm.TrimOpenConns(context.Background())
for _, c := range protected {
if c.(*tconn).closed {
if c.(*tconn).isClosed() {
t.Error("protected connection was closed by connection manager")
}
}
@ -663,7 +645,7 @@ func TestPeerProtectionMultipleTags(t *testing.T) {
// connections should still remain open, as they were protected.
for _, c := range protected[0:] {
if c.(*tconn).closed {
if c.(*tconn).isClosed() {
t.Error("protected connection was closed by connection manager")
}
}
@ -680,11 +662,11 @@ func TestPeerProtectionMultipleTags(t *testing.T) {
cm.TrimOpenConns(context.Background())
if !protected[0].(*tconn).closed {
if !protected[0].(*tconn).isClosed() {
t.Error("unprotected connection was kept open by connection manager")
}
for _, c := range protected[1:] {
if c.(*tconn).closed {
if c.(*tconn).isClosed() {
t.Error("protected connection was closed by connection manager")
}
}
@ -692,11 +674,9 @@ func TestPeerProtectionMultipleTags(t *testing.T) {
}
func TestPeerProtectionIdempotent(t *testing.T) {
SilencePeriod = 0
cm, err := NewConnManager(10, 20, 0)
cm, err := NewConnManager(10, 20, WithGracePeriod(0), WithSilencePeriod(time.Millisecond))
require.NoError(t, err)
defer cm.Close()
SilencePeriod = 10 * time.Second
id, _ := tu.RandPeerID()
cm.Protect(id, "global")
@ -726,7 +706,7 @@ func TestPeerProtectionIdempotent(t *testing.T) {
}
func TestUpsertTag(t *testing.T) {
cm, err := NewConnManager(1, 1, time.Duration(10*time.Minute))
cm, err := NewConnManager(1, 1, WithGracePeriod(10*time.Minute))
require.NoError(t, err)
defer cm.Close()
not := cm.Notifee()
@ -763,7 +743,7 @@ func TestUpsertTag(t *testing.T) {
}
func TestTemporaryEntriesClearedFirst(t *testing.T) {
cm, err := NewConnManager(1, 1, 0)
cm, err := NewConnManager(1, 1, WithGracePeriod(0))
require.NoError(t, err)
id := tu.RandPeerIDFatal(t)
@ -785,7 +765,7 @@ func TestTemporaryEntriesClearedFirst(t *testing.T) {
}
func TestTemporaryEntryConvertedOnConnection(t *testing.T) {
cm, err := NewConnManager(1, 1, 0)
cm, err := NewConnManager(1, 1, WithGracePeriod(0))
require.NoError(t, err)
defer cm.Close()
@ -808,15 +788,9 @@ func TestTemporaryEntryConvertedOnConnection(t *testing.T) {
// see https://github.com/libp2p/go-libp2p-connmgr/issues/82
func TestConcurrentCleanupAndTagging(t *testing.T) {
origMinCleanupInterval := minCleanupInterval
t.Cleanup(func() { minCleanupInterval = origMinCleanupInterval })
minCleanupInterval = time.Millisecond
SilencePeriod = 0
cm, err := NewConnManager(1, 1, 0)
cm, err := NewConnManager(1, 1, WithGracePeriod(0), WithSilencePeriod(time.Millisecond))
require.NoError(t, err)
defer cm.Close()
SilencePeriod = 10 * time.Second
for i := 0; i < 1000; i++ {
conn := randConn(t, nil)

2
p2p/net/connmgr/decay_test.go

@ -406,7 +406,7 @@ func testDecayTracker(tb testing.TB) (*BasicConnMgr, connmgr.Decayer, *clock.Moc
Clock: mockClock,
}
mgr, err := NewConnManager(10, 10, 1*time.Second, DecayerConfig(cfg))
mgr, err := NewConnManager(10, 10, WithGracePeriod(time.Second), DecayerConfig(cfg))
require.NoError(tb, err)
decay, ok := connmgr.SupportsDecay(mgr)
if !ok {

31
p2p/net/connmgr/options.go

@ -1,6 +1,9 @@
package connmgr
import "time"
import (
"errors"
"time"
)
// config is the configuration struct for the basic connection manager.
type config struct {
@ -21,3 +24,29 @@ func DecayerConfig(opts *DecayerCfg) Option {
return nil
}
}
// WithGracePeriod sets the grace period.
// The grace period is the time a newly opened connection is given before it
// becomes subject to pruning.
func WithGracePeriod(p time.Duration) Option {
	return func(cfg *config) error {
		// Zero is a valid value (no grace at all); only reject negatives.
		if p >= 0 {
			cfg.gracePeriod = p
			return nil
		}
		return errors.New("grace period must be non-negative")
	}
}
// WithSilencePeriod sets the silence period.
// The connection manager will perform a cleanup once per silence period
// if the number of connections surpasses the high watermark.
func WithSilencePeriod(p time.Duration) Option {
	return func(cfg *config) error {
		// A zero period would make the cleanup ticker invalid, and a
		// negative one is meaningless, so require a strictly positive
		// duration. (The old message said "non-zero", which did not
		// explain why negative values were rejected too.)
		if p <= 0 {
			return errors.New("silence period must be positive")
		}
		cfg.silencePeriod = p
		return nil
	}
}

Loading…
Cancel
Save