queue.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189
  1. package mux
  2. import (
  3. "container/list"
  4. "errors"
  5. "github.com/cnlh/nps/lib/common"
  6. "io"
  7. "sync"
  8. "time"
  9. )
  // QueueOp carries the signalling state shared by the queues in this file:
  // a wake-up channel for a consumer blocked on an empty queue and a
  // shutdown channel.
  type QueueOp struct {
  	readOp    chan struct{} // a producer sends here to wake a blocked consumer
  	cleanOp   chan struct{} // filled and closed by Clean; a successful receive means "shut down"
  	popWait   bool          // set by a consumer that found the queue empty; guarded by mutex
  	mutex     sync.Mutex    // guards popWait
  	cleanOnce sync.Once     // lets Clean be called more than once without panicking
  }

  // New allocates the signalling channels and re-arms Clean. Call it before
  // using the other methods, since they all operate on these channels.
  func (Self *QueueOp) New() {
  	Self.readOp = make(chan struct{})
  	Self.cleanOp = make(chan struct{}, 2)
  	Self.cleanOnce = sync.Once{}
  }

  // allowPop clears the popWait flag and then blocks until one of two things
  // happens: a consumer receives the wake-up signal on readOp (closed is
  // false), or the shutdown channel yields a value (closed is true).
  func (Self *QueueOp) allowPop() (closed bool) {
  	Self.mutex.Lock()
  	Self.popWait = false
  	Self.mutex.Unlock()
  	select {
  	case Self.readOp <- struct{}{}:
  		return false
  	case <-Self.cleanOp:
  		return true
  	}
  }

  // Clean shuts the queue down by filling and closing cleanOp, which releases
  // every goroutine currently selecting on it and any that select on it later.
  //
  // Only the first call has an effect. Sending on or closing an already
  // closed channel panics, so repeated calls are turned into no-ops.
  func (Self *QueueOp) Clean() {
  	Self.cleanOnce.Do(func() {
  		// The buffer has room for both values, so these sends cannot block.
  		Self.cleanOp <- struct{}{}
  		Self.cleanOp <- struct{}{}
  		close(Self.cleanOp)
  	})
  }
  36. type PriorityQueue struct {
  37. list *list.List
  38. QueueOp
  39. }
  40. func (Self *PriorityQueue) New() {
  41. Self.list = list.New()
  42. Self.QueueOp.New()
  43. }
  44. func (Self *PriorityQueue) Push(packager *common.MuxPackager) {
  45. Self.mutex.Lock()
  46. if Self.popWait {
  47. defer Self.allowPop()
  48. }
  49. if packager.Flag == common.MUX_CONN_CLOSE {
  50. Self.insert(packager) // the close package may need priority,
  51. // prevent wait too long to close
  52. } else {
  53. Self.list.PushBack(packager)
  54. }
  55. Self.mutex.Unlock()
  56. return
  57. }
  58. func (Self *PriorityQueue) insert(packager *common.MuxPackager) {
  59. element := Self.list.Back()
  60. for {
  61. if element == nil { // PriorityQueue dose not have any of msg package with this close package id
  62. Self.list.PushFront(packager) // insert close package to first
  63. break
  64. }
  65. if element.Value.(*common.MuxPackager).Flag == common.MUX_NEW_MSG &&
  66. element.Value.(*common.MuxPackager).Id == packager.Id {
  67. Self.list.InsertAfter(packager, element) // PriorityQueue has some msg package
  68. // with this close package id, insert close package after last msg package
  69. break
  70. }
  71. element = element.Prev()
  72. }
  73. }
  74. func (Self *PriorityQueue) Pop() (packager *common.MuxPackager) {
  75. Self.mutex.Lock()
  76. element := Self.list.Front()
  77. if element != nil {
  78. packager = element.Value.(*common.MuxPackager)
  79. Self.list.Remove(element)
  80. Self.mutex.Unlock()
  81. return
  82. }
  83. Self.popWait = true // PriorityQueue is empty, notice Push method
  84. Self.mutex.Unlock()
  85. select {
  86. case <-Self.readOp:
  87. return Self.Pop()
  88. case <-Self.cleanOp:
  89. return nil
  90. }
  91. }
  92. func (Self *PriorityQueue) Len() (n int) {
  93. n = Self.list.Len()
  94. return
  95. }
  // ListElement is one entry of a FIFOQueue: a byte buffer, its length and a
  // flag.
  type ListElement struct {
  	buf  []byte // payload
  	l    uint16 // length of buf, checked by New
  	part bool   // stored as given; NOTE(review): presumably marks a partial chunk, confirm with callers
  }

  // New fills the element with buf, l and part. It returns an error and leaves
  // the element untouched when l is not the length of buf.
  func (Self *ListElement) New(buf []byte, l uint16, part bool) (err error) {
  	// Compare as int: converting len(buf) to uint16 would wrap, letting a
  	// buffer of 65536+l bytes pass the check.
  	if len(buf) != int(l) {
  		return errors.New("ListElement: buf length not match")
  	}
  	Self.buf = buf
  	Self.l = l
  	Self.part = part
  	return nil
  }
  110. type FIFOQueue struct {
  111. list []*ListElement
  112. length uint32
  113. stopOp chan struct{}
  114. timeout time.Time
  115. QueueOp
  116. }
  117. func (Self *FIFOQueue) New() {
  118. Self.QueueOp.New()
  119. Self.stopOp = make(chan struct{}, 1)
  120. }
  121. func (Self *FIFOQueue) Push(element *ListElement) {
  122. Self.mutex.Lock()
  123. if Self.popWait {
  124. defer Self.allowPop()
  125. }
  126. Self.list = append(Self.list, element)
  127. Self.length += uint32(element.l)
  128. Self.mutex.Unlock()
  129. return
  130. }
  131. func (Self *FIFOQueue) Pop() (element *ListElement, err error) {
  132. Self.mutex.Lock()
  133. if len(Self.list) == 0 {
  134. Self.popWait = true
  135. Self.mutex.Unlock()
  136. t := Self.timeout.Sub(time.Now())
  137. if t <= 0 {
  138. t = time.Minute
  139. }
  140. timer := time.NewTimer(t)
  141. defer timer.Stop()
  142. select {
  143. case <-Self.readOp:
  144. Self.mutex.Lock()
  145. case <-Self.cleanOp:
  146. return
  147. case <-Self.stopOp:
  148. err = io.EOF
  149. return
  150. case <-timer.C:
  151. err = errors.New("mux.queue: read time out")
  152. return
  153. }
  154. }
  155. element = Self.list[0]
  156. Self.list = Self.list[1:]
  157. Self.length -= uint32(element.l)
  158. Self.mutex.Unlock()
  159. return
  160. }
  161. func (Self *FIFOQueue) Len() (n uint32) {
  162. return Self.length
  163. }
  164. func (Self *FIFOQueue) Stop() {
  165. Self.stopOp <- struct{}{}
  166. }
  167. func (Self *FIFOQueue) SetTimeOut(t time.Time) {
  168. Self.timeout = t
  169. }