Browse Source

simplify reuse gc

pull/1424/head
Marten Seemann 3 years ago
parent
commit
2750bef9b5
  1. 50
      p2p/transport/quic/reuse.go
  2. 69
      p2p/transport/quic/reuse_test.go

50
p2p/transport/quic/reuse.go

@ -51,10 +51,8 @@ func (c *reuseConn) ShouldGarbageCollect(now time.Time) bool {
type reuse struct {
mutex sync.Mutex
garbageCollectorRunning bool
closeChan chan struct{}
garbageCollectorStopChan chan struct{}
closeChan chan struct{}
gcStopChan chan struct{}
unicast map[string] /* IP.String() */ map[int] /* port */ *reuseConn
// global contains connections that are listening on 0.0.0.0 / ::
@ -62,15 +60,18 @@ type reuse struct {
}
func newReuse() *reuse {
return &reuse{
unicast: make(map[string]map[int]*reuseConn),
global: make(map[int]*reuseConn),
closeChan: make(chan struct{}),
r := &reuse{
unicast: make(map[string]map[int]*reuseConn),
global: make(map[int]*reuseConn),
closeChan: make(chan struct{}),
gcStopChan: make(chan struct{}),
}
go r.gc()
return r
}
func (r *reuse) runGarbageCollector() {
defer close(r.garbageCollectorStopChan)
func (r *reuse) gc() {
defer close(r.gcStopChan)
ticker := time.NewTicker(garbageCollectInterval)
defer ticker.Stop()
@ -79,7 +80,6 @@ func (r *reuse) runGarbageCollector() {
case <-r.closeChan:
return
case now := <-ticker.C:
var shouldExit bool
r.mutex.Lock()
for key, conn := range r.global {
if conn.ShouldGarbageCollect(now) {
@ -98,29 +98,11 @@ func (r *reuse) runGarbageCollector() {
delete(r.unicast, ukey)
}
}
// stop the garbage collector if we're not tracking any connections
if len(r.global) == 0 && len(r.unicast) == 0 {
r.garbageCollectorRunning = false
shouldExit = true
}
r.mutex.Unlock()
if shouldExit {
return
}
}
}
}
// must be called while holding the mutex
func (r *reuse) maybeStartGarbageCollector() {
if !r.garbageCollectorRunning {
r.garbageCollectorRunning = true
r.garbageCollectorStopChan = make(chan struct{})
go r.runGarbageCollector()
}
}
func (r *reuse) Dial(network string, raddr *net.UDPAddr) (*reuseConn, error) {
var ip *net.IP
if router, err := netroute.New(); err == nil {
@ -138,7 +120,6 @@ func (r *reuse) Dial(network string, raddr *net.UDPAddr) (*reuseConn, error) {
return nil, err
}
conn.IncreaseCount()
r.maybeStartGarbageCollector()
return conn, nil
}
@ -190,8 +171,6 @@ func (r *reuse) Listen(network string, laddr *net.UDPAddr) (*reuseConn, error) {
r.mutex.Lock()
defer r.mutex.Unlock()
r.maybeStartGarbageCollector()
// Deal with listen on a global address
if localAddr.IP.IsUnspecified() {
// The kernel already checked that the laddr is not already listen
@ -212,12 +191,7 @@ func (r *reuse) Listen(network string, laddr *net.UDPAddr) (*reuseConn, error) {
}
func (r *reuse) Close() error {
r.mutex.Lock()
defer r.mutex.Unlock()
close(r.closeChan)
if r.garbageCollectorRunning {
<-r.garbageCollectorStopChan
r.garbageCollectorRunning = false
}
<-r.gcStopChan
return nil
}

69
p2p/transport/quic/reuse_test.go

@ -51,15 +51,16 @@ var _ = Describe("Reuse", func() {
})
AfterEach(func() {
isGarbageCollectorRunning := func() bool {
var b bytes.Buffer
pprof.Lookup("goroutine").WriteTo(&b, 1)
return strings.Contains(b.String(), "go-libp2p-quic-transport.(*reuse).gc")
}
Expect(reuse.Close()).To(Succeed())
Expect(isGarbageCollectorRunning()).To(BeFalse())
})
isGarbageCollectorRunning := func() bool {
var b bytes.Buffer
pprof.Lookup("goroutine").WriteTo(&b, 1)
return strings.Contains(b.String(), "go-libp2p-quic-transport.(*reuse).runGarbageCollector")
}
Context("creating and reusing connections", func() {
AfterEach(func() { closeAllConns(reuse) })
@ -126,54 +127,32 @@ var _ = Describe("Reuse", func() {
})
})
Context("garbage-collecting connections", func() {
It("garbage collects connections once they're not used any more for a certain time", func() {
numGlobals := func() int {
reuse.mutex.Lock()
defer reuse.mutex.Unlock()
return len(reuse.global)
}
BeforeEach(func() {
maxUnusedDuration = 100 * time.Millisecond
})
maxUnusedDuration = 100 * time.Millisecond
It("garbage collects connections once they're not used any more for a certain time", func() {
addr, err := net.ResolveUDPAddr("udp4", "0.0.0.0:0")
Expect(err).ToNot(HaveOccurred())
lconn, err := reuse.Listen("udp4", addr)
Expect(err).ToNot(HaveOccurred())
Expect(lconn.GetCount()).To(Equal(1))
addr, err := net.ResolveUDPAddr("udp4", "0.0.0.0:0")
Expect(err).ToNot(HaveOccurred())
lconn, err := reuse.Listen("udp4", addr)
Expect(err).ToNot(HaveOccurred())
Expect(lconn.GetCount()).To(Equal(1))
closeTime := time.Now()
lconn.DecreaseCount()
closeTime := time.Now()
lconn.DecreaseCount()
for {
num := numGlobals()
if closeTime.Add(maxUnusedDuration).Before(time.Now()) {
break
}
Expect(num).To(Equal(1))
time.Sleep(2 * time.Millisecond)
for {
num := numGlobals()
if closeTime.Add(maxUnusedDuration).Before(time.Now()) {
break
}
Eventually(numGlobals).Should(BeZero())
})
It("only stops the garbage collector when there are no more connections", func() {
addr1, err := net.ResolveUDPAddr("udp4", "0.0.0.0:0")
Expect(err).ToNot(HaveOccurred())
conn1, err := reuse.Listen("udp4", addr1)
Expect(err).ToNot(HaveOccurred())
addr2, err := net.ResolveUDPAddr("udp4", "0.0.0.0:0")
Expect(err).ToNot(HaveOccurred())
conn2, err := reuse.Listen("udp4", addr2)
Expect(err).ToNot(HaveOccurred())
Eventually(isGarbageCollectorRunning).Should(BeTrue())
conn1.DecreaseCount()
Consistently(isGarbageCollectorRunning, 2*maxUnusedDuration).Should(BeTrue())
conn2.DecreaseCount()
Eventually(isGarbageCollectorRunning, 2*maxUnusedDuration).Should(BeFalse())
})
Expect(num).To(Equal(1))
time.Sleep(2 * time.Millisecond)
}
Eventually(numGlobals).Should(BeZero())
})
})

Loading…
Cancel
Save