1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495 |
- package mux
- import (
- "container/list"
- "github.com/cnlh/nps/lib/common"
- "sync"
- )
- type Queue struct {
- list *list.List
- readOp chan struct{}
- cleanOp chan struct{}
- popWait bool
- mutex sync.Mutex
- }
- func (Self *Queue) New() {
- Self.list = list.New()
- Self.readOp = make(chan struct{})
- Self.cleanOp = make(chan struct{}, 2)
- }
- func (Self *Queue) Push(packager *common.MuxPackager) {
- Self.mutex.Lock()
- if Self.popWait {
- defer Self.allowPop()
- }
- if packager.Flag == common.MUX_CONN_CLOSE {
- Self.insert(packager) // the close package may need priority,
- // prevent wait too long to close
- } else {
- Self.list.PushBack(packager)
- }
- Self.mutex.Unlock()
- return
- }
- func (Self *Queue) allowPop() (closed bool) {
- Self.mutex.Lock()
- Self.popWait = false
- Self.mutex.Unlock()
- select {
- case Self.readOp <- struct{}{}:
- return false
- case <-Self.cleanOp:
- return true
- }
- }
- func (Self *Queue) insert(packager *common.MuxPackager) {
- element := Self.list.Back()
- for {
- if element == nil { // Queue dose not have any of msg package with this close package id
- Self.list.PushFront(packager) // insert close package to first
- break
- }
- if element.Value.(*common.MuxPackager).Flag == common.MUX_NEW_MSG &&
- element.Value.(*common.MuxPackager).Id == packager.Id {
- Self.list.InsertAfter(packager, element) // Queue has some msg package
- // with this close package id, insert close package after last msg package
- break
- }
- element = element.Prev()
- }
- }
- func (Self *Queue) Pop() (packager *common.MuxPackager) {
- Self.mutex.Lock()
- element := Self.list.Front()
- if element != nil {
- packager = element.Value.(*common.MuxPackager)
- Self.list.Remove(element)
- Self.mutex.Unlock()
- return
- }
- Self.popWait = true // Queue is empty, notice Push method
- Self.mutex.Unlock()
- select {
- case <-Self.readOp:
- return Self.Pop()
- case <-Self.cleanOp:
- return nil
- }
- }
- func (Self *Queue) Len() (n int) {
- n = Self.list.Len()
- return
- }
- func (Self *Queue) Clean() {
- Self.cleanOp <- struct{}{}
- Self.cleanOp <- struct{}{}
- close(Self.cleanOp)
- }
|