From e008ec29da3a2f2dde4076602dbed91deba76182 Mon Sep 17 00:00:00 2001 From: Jason Date: Wed, 14 Aug 2019 00:20:18 +0800 Subject: [PATCH] upgrade session with queue --- common/queue/queue.go | 71 +++++++++++++++++++++++++++++++++ common/stats/session/session.go | 31 +++++++------- 2 files changed, 88 insertions(+), 14 deletions(-) create mode 100644 common/queue/queue.go diff --git a/common/queue/queue.go b/common/queue/queue.go new file mode 100644 index 0000000..777b63b --- /dev/null +++ b/common/queue/queue.go @@ -0,0 +1,71 @@ +package queue + +import ( + "sync" +) + +// Queue is a simple concurrent safe queue +type Queue struct { + items []interface{} + lock sync.RWMutex +} + +// Put add the item to the queue. +func (q *Queue) Put(items ...interface{}) { + if len(items) == 0 { + return + } + + q.lock.Lock() + q.items = append(q.items, items...) + q.lock.Unlock() +} + +// Pop returns the head of items. +func (q *Queue) Pop() interface{} { + if len(q.items) == 0 { + return nil + } + + q.lock.Lock() + head := q.items[0] + q.items = q.items[1:] + q.lock.Unlock() + return head +} + +// First returns the head of items without deleting. +func (q *Queue) First() interface{} { + if len(q.items) == 0 { + return nil + } + + q.lock.RLock() + head := q.items[0] + q.lock.RUnlock() + return head +} + +// Copy get the copy of queue. +func (q *Queue) Copy() []interface{} { + items := make([]interface{}, 0) + q.lock.RLock() + items = append(items, q.items...) + q.lock.RUnlock() + return items +} + +// Len returns the number of items in this queue. +func (q *Queue) Len() int64 { + q.lock.Lock() + defer q.lock.Unlock() + + return int64(len(q.items)) +} + +// New is a constructor for a new concurrent safe queue. +func New(hint int64) *Queue { + return &Queue{ + items: make([]interface{}, 0, hint), + } +} diff --git a/common/stats/session/session.go b/common/stats/session/session.go index 1f0357d..3df7ae7 100644 --- a/common/stats/session/session.go +++ b/common/stats/session/session.go @@ -10,6 +10,7 @@ import ( "sync/atomic" "github.com/xjasonlyu/tun2socks/common/log" + "github.com/xjasonlyu/tun2socks/common/queue" "github.com/xjasonlyu/tun2socks/common/stats" ) @@ -23,11 +24,9 @@ var ( ) type simpleSessionStater struct { - sync.Mutex - - server *http.Server - activeSessionMap sync.Map - completedSessions []*stats.Session + server *http.Server + activeSessionMap sync.Map + completedSessionQueue *queue.Queue } func NewSimpleSessionStater() stats.SessionStater { @@ -40,11 +39,17 @@ func (s *simpleSessionStater) Start() { // Make a snapshot. var activeSessions []*stats.Session s.activeSessionMap.Range(func(key, value interface{}) bool { - sess := value.(*stats.Session) - activeSessions = append(activeSessions, sess) + activeSessions = append(activeSessions, value.(*stats.Session)) return true }) + var completedSessions []*stats.Session + for _, item := range s.completedSessionQueue.Copy() { + if sess, ok := item.(*stats.Session); ok { + completedSessions = append(completedSessions, sess) + } + } + tablePrint := func(w io.Writer, sessions []*stats.Session) { // Sort by session start time. sort.Slice(sessions, func(i, j int) bool { @@ -85,8 +90,8 @@ func (s *simpleSessionStater) Start() { _, _ = fmt.Fprintf(w, "

Active sessions %d

", len(activeSessions)) tablePrint(w, activeSessions) _, _ = fmt.Fprintf(w, "

") - _, _ = fmt.Fprintf(w, "

Recently completed sessions %d

", len(s.completedSessions)) - tablePrint(w, s.completedSessions) + _, _ = fmt.Fprintf(w, "

Recently completed sessions %d

", len(completedSessions)) + tablePrint(w, completedSessions) _, _ = fmt.Fprintf(w, "") _ = w.Flush() } @@ -118,12 +123,10 @@ func (s *simpleSessionStater) GetSession(key interface{}) *stats.Session { func (s *simpleSessionStater) RemoveSession(key interface{}) { if sess, ok := s.activeSessionMap.Load(key); ok { // move to completed sessions - s.Lock() - s.completedSessions = append(s.completedSessions, sess.(*stats.Session)) - if len(s.completedSessions) > maxCompletedSessions { - s.completedSessions = s.completedSessions[1:] + s.completedSessionQueue.Put(sess) + if s.completedSessionQueue.Len() > maxCompletedSessions { + s.completedSessionQueue.Pop() } - s.Unlock() // delete s.activeSessionMap.Delete(key) }