Browse Source

upgrade session with queue

pull/15/head
Jason 5 years ago
parent
commit
e008ec29da
  1. 71
      common/queue/queue.go
  2. 31
      common/stats/session/session.go

71
common/queue/queue.go

@ -0,0 +1,71 @@
package queue
import (
"sync"
)
// Queue is a simple concurrent safe queue
type Queue struct {
items []interface{}
lock sync.RWMutex
}
// Put add the item to the queue.
func (q *Queue) Put(items ...interface{}) {
if len(items) == 0 {
return
}
q.lock.Lock()
q.items = append(q.items, items...)
q.lock.Unlock()
}
// Pop returns the head of items.
func (q *Queue) Pop() interface{} {
if len(q.items) == 0 {
return nil
}
q.lock.Lock()
head := q.items[0]
q.items = q.items[1:]
q.lock.Unlock()
return head
}
// First returns the head of items without deleting.
func (q *Queue) First() interface{} {
if len(q.items) == 0 {
return nil
}
q.lock.RLock()
head := q.items[0]
q.lock.RUnlock()
return head
}
// Copy get the copy of queue.
func (q *Queue) Copy() []interface{} {
items := make([]interface{}, 0)
q.lock.RLock()
items = append(items, q.items...)
q.lock.RUnlock()
return items
}
// Len returns the number of items in this queue.
func (q *Queue) Len() int64 {
q.lock.Lock()
defer q.lock.Unlock()
return int64(len(q.items))
}
// New is a constructor for a new concurrent safe queue.
func New(hint int64) *Queue {
return &Queue{
items: make([]interface{}, 0, hint),
}
}

31
common/stats/session/session.go

@ -10,6 +10,7 @@ import (
"sync/atomic"
"github.com/xjasonlyu/tun2socks/common/log"
"github.com/xjasonlyu/tun2socks/common/queue"
"github.com/xjasonlyu/tun2socks/common/stats"
)
@ -23,11 +24,9 @@ var (
)
type simpleSessionStater struct {
sync.Mutex
server *http.Server
activeSessionMap sync.Map
completedSessions []*stats.Session
server *http.Server
activeSessionMap sync.Map
completedSessionQueue *queue.Queue
}
func NewSimpleSessionStater() stats.SessionStater {
@ -40,11 +39,17 @@ func (s *simpleSessionStater) Start() {
// Make a snapshot.
var activeSessions []*stats.Session
s.activeSessionMap.Range(func(key, value interface{}) bool {
sess := value.(*stats.Session)
activeSessions = append(activeSessions, sess)
activeSessions = append(activeSessions, value.(*stats.Session))
return true
})
var completedSessions []*stats.Session
for _, item := range s.completedSessionQueue.Copy() {
if sess, ok := item.(*stats.Session); ok {
completedSessions = append(completedSessions, sess)
}
}
tablePrint := func(w io.Writer, sessions []*stats.Session) {
// Sort by session start time.
sort.Slice(sessions, func(i, j int) bool {
@ -85,8 +90,8 @@ func (s *simpleSessionStater) Start() {
_, _ = fmt.Fprintf(w, "<p>Active sessions %d</p>", len(activeSessions))
tablePrint(w, activeSessions)
_, _ = fmt.Fprintf(w, "<br/><br/>")
_, _ = fmt.Fprintf(w, "<p>Recently completed sessions %d</p>", len(s.completedSessions))
tablePrint(w, s.completedSessions)
_, _ = fmt.Fprintf(w, "<p>Recently completed sessions %d</p>", len(completedSessions))
tablePrint(w, completedSessions)
_, _ = fmt.Fprintf(w, "</html>")
_ = w.Flush()
}
@ -118,12 +123,10 @@ func (s *simpleSessionStater) GetSession(key interface{}) *stats.Session {
func (s *simpleSessionStater) RemoveSession(key interface{}) {
if sess, ok := s.activeSessionMap.Load(key); ok {
// move to completed sessions
s.Lock()
s.completedSessions = append(s.completedSessions, sess.(*stats.Session))
if len(s.completedSessions) > maxCompletedSessions {
s.completedSessions = s.completedSessions[1:]
s.completedSessionQueue.Put(sess)
if s.completedSessionQueue.Len() > maxCompletedSessions {
s.completedSessionQueue.Pop()
}
s.Unlock()
// delete
s.activeSessionMap.Delete(key)
}

Loading…
Cancel
Save