From 70a8c2c575cb4ea3af608da880f4b7161a92ff29 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Thu, 14 May 2020 15:54:39 +0100 Subject: [PATCH] connmgr: introduce abstractions and functions for decaying tags. (#104) --- core/connmgr/decay.go | 93 +++++++++++++++++++++++++++++++++++++++++ core/connmgr/manager.go | 21 +++++++--- core/connmgr/presets.go | 67 +++++++++++++++++++++++++++++ 3 files changed, 175 insertions(+), 6 deletions(-) create mode 100644 core/connmgr/decay.go create mode 100644 core/connmgr/presets.go diff --git a/core/connmgr/decay.go b/core/connmgr/decay.go new file mode 100644 index 000000000..62b772569 --- /dev/null +++ b/core/connmgr/decay.go @@ -0,0 +1,93 @@ +package connmgr + +import ( + "io" + "time" + + "github.com/libp2p/go-libp2p-core/peer" +) + +// Decayer is implemented by connection managers supporting decaying tags. A +// decaying tag is one whose value automatically decays over time. +// +// The actual application of the decay behaviour is encapsulated in a +// user-provided decaying function (DecayFn). The function is called on every +// tick (determined by the interval parameter), and returns either the new value +// of the tag, or whether it should be erased altogether. +// +// We do not set values on a decaying tag. Rather, we "bump" decaying tags by a +// delta. This calls the BumpFn with the old value and the delta, to determine +// the new value. +// +// Such a pluggable design affords a great deal of flexibility and versatility. +// Behaviours that are straightfoward to implement include: +// +// * Decay a tag by -1, or by half its current value, on every tick. +// * Every time a value is bumped, sum it to its current value. +// * Exponentially boost a score with every bump. +// * Sum the incoming score, but keep it within min, max bounds. +// +// Commonly used DecayFns and BumpFns are provided in the go-libp2p-connmgr +// module. +type Decayer interface { + io.Closer + + // RegisterDecayingTag creates and registers a new decaying tag, if and only + // if a tag with the supplied name doesn't exist yet. Otherwise, an error is + // returned. + // + // The caller provides the interval at which the tag is refreshed, as well + // as the decay function and the bump function. Refer to godocs on DecayFn + // and BumpFn for more info. + RegisterDecayingTag(name string, interval time.Duration, decayFn DecayFn, bumpFn BumpFn) (DecayingTag, error) +} + +// DecayFn applies a decay to the peer's score. The implementation must call +// DecayFn at the interval supplied when registering the tag. +// +// It receives a copy of the decaying value, and returns the score after +// applying the decay, as well as a flag to signal if the tag should be erased. +type DecayFn func(value DecayingValue) (after int, rm bool) + +// BumpFn applies a delta onto an existing score, and returns the new score. +// +// Non-trivial bump functions include exponential boosting, moving averages, +// ceilings, etc. +type BumpFn func(value DecayingValue, delta int) (after int) + +// DecayingTag represents a decaying tag. The tag is a long-lived general +// object, used to operate on tag values for peers. +type DecayingTag interface { + // Name returns the name of the tag. + Name() string + + // Interval is the effective interval at which this tag will tick. Upon + // registration, the desired interval may be overwritten depending on the + // decayer's resolution, and this method allows you to obtain the effective + // interval. + Interval() time.Duration + + // Bump applies a delta to a tag value, calling its bump function. The bump + // may be applied asynchronously, in which case the returned error is used + // to indicate an anomaly when queuing. + Bump(peer peer.ID, delta int) error +} + +// DecayingValue represents a value for a decaying tag. +type DecayingValue struct { + // Tag points to the tag this value belongs to. + Tag DecayingTag + + // Peer is the peer ID to whom this value is associated. + Peer peer.ID + + // Added is the timestamp when this value was added for the first time for + // a tag and a peer. + Added time.Time + + // LastVisit is the timestamp of the last visit. + LastVisit time.Time + + // Value is the current value of the tag. + Value int +} diff --git a/core/connmgr/manager.go b/core/connmgr/manager.go index 4e3186ff1..1e64e7943 100644 --- a/core/connmgr/manager.go +++ b/core/connmgr/manager.go @@ -14,14 +14,23 @@ import ( "github.com/libp2p/go-libp2p-core/peer" ) -// ConnManager tracks connections to peers, and allows consumers to associate metadata -// with each peer. +// SupportsDecay evaluates if the provided ConnManager supports decay, and if +// so, it returns the Decayer object. Refer to godocs on Decayer for more info. +func SupportsDecay(mgr ConnManager) (Decayer, bool) { + d, ok := mgr.(Decayer) + return d, ok +} + +// ConnManager tracks connections to peers, and allows consumers to associate +// metadata with each peer. // -// It enables connections to be trimmed based on implementation-defined heuristics. -// The ConnManager allows libp2p to enforce an upper bound on the total number of -// open connections. +// It enables connections to be trimmed based on implementation-defined +// heuristics. The ConnManager allows libp2p to enforce an upper bound on the +// total number of open connections. +// +// ConnManagers supporting decaying tags implement Decayer. Use the +// SupportsDecay function to safely cast an instance to Decayer, if supported. type ConnManager interface { - // TagPeer tags a peer with a string, associating a weight with the tag. TagPeer(peer.ID, string, int) diff --git a/core/connmgr/presets.go b/core/connmgr/presets.go new file mode 100644 index 000000000..2bda032d9 --- /dev/null +++ b/core/connmgr/presets.go @@ -0,0 +1,67 @@ +package connmgr + +import ( + "math" + "time" +) + +// DecayNone applies no decay. +func DecayNone() DecayFn { + return func(value DecayingValue) (_ int, rm bool) { + return value.Value, false + } +} + +// DecayFixed subtracts from by the provided minuend, and deletes the tag when +// first reaching 0 or negative. +func DecayFixed(minuend int) DecayFn { + return func(value DecayingValue) (_ int, rm bool) { + v := value.Value - minuend + return v, v <= 0 + } +} + +// DecayLinear applies a fractional coefficient to the value of the current tag, +// rounding down via math.Floor. It erases the tag when the result is zero. +func DecayLinear(coef float64) DecayFn { + return func(value DecayingValue) (after int, rm bool) { + v := math.Floor(float64(value.Value) * coef) + return int(v), v <= 0 + } +} + +// DecayExpireWhenInactive expires a tag after a certain period of no bumps. +func DecayExpireWhenInactive(after time.Duration) DecayFn { + return func(value DecayingValue) (_ int, rm bool) { + rm = value.LastVisit.Sub(time.Now()) >= after + return 0, rm + } +} + +// BumpSumUnbounded adds the incoming value to the peer's score. +func BumpSumUnbounded() BumpFn { + return func(value DecayingValue, delta int) (after int) { + return value.Value + delta + } +} + +// BumpSumBounded keeps summing the incoming score, keeping it within a +// [min, max] range. +func BumpSumBounded(min, max int) BumpFn { + return func(value DecayingValue, delta int) (after int) { + v := value.Value + delta + if v >= max { + return max + } else if v <= min { + return min + } + return v + } +} + +// BumpOverwrite replaces the current value of the tag with the incoming one. +func BumpOverwrite() BumpFn { + return func(value DecayingValue, delta int) (after int) { + return delta + } +}