queue.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206
  1. package mux
  2. import (
  3. "container/list"
  4. "errors"
  5. "github.com/cnlh/nps/lib/common"
  6. "io"
  7. "sync"
  8. "time"
  9. )
  10. type QueueOp struct {
  11. readOp chan struct{}
  12. cleanOp chan struct{}
  13. popWait bool
  14. mutex sync.Mutex
  15. }
  16. func (Self *QueueOp) New() {
  17. Self.readOp = make(chan struct{})
  18. Self.cleanOp = make(chan struct{}, 2)
  19. }
  20. func (Self *QueueOp) allowPop() (closed bool) {
  21. Self.mutex.Lock()
  22. Self.popWait = false
  23. Self.mutex.Unlock()
  24. select {
  25. case Self.readOp <- struct{}{}:
  26. return false
  27. case <-Self.cleanOp:
  28. return true
  29. }
  30. }
  31. func (Self *QueueOp) Clean() {
  32. Self.cleanOp <- struct{}{}
  33. Self.cleanOp <- struct{}{}
  34. close(Self.cleanOp)
  35. }
  36. type PriorityQueue struct {
  37. list *list.List
  38. QueueOp
  39. }
  40. func (Self *PriorityQueue) New() {
  41. Self.list = list.New()
  42. Self.QueueOp.New()
  43. }
  44. func (Self *PriorityQueue) Push(packager *common.MuxPackager) {
  45. Self.mutex.Lock()
  46. switch packager.Flag {
  47. case common.MUX_PING_FLAG, common.MUX_PING_RETURN:
  48. Self.list.PushFront(packager)
  49. // the ping package need highest priority
  50. // prevent ping calculation error
  51. case common.MUX_CONN_CLOSE:
  52. Self.insert(packager)
  53. // the close package may need priority too, set second
  54. // prevent wait too long to close conn
  55. default:
  56. Self.list.PushBack(packager)
  57. }
  58. if Self.popWait {
  59. Self.mutex.Unlock()
  60. Self.allowPop()
  61. return
  62. }
  63. Self.mutex.Unlock()
  64. return
  65. }
  66. func (Self *PriorityQueue) insert(packager *common.MuxPackager) {
  67. element := Self.list.Back()
  68. for {
  69. if element == nil { // PriorityQueue dose not have any of msg package with this close package id
  70. element = Self.list.Front()
  71. if element != nil {
  72. Self.list.InsertAfter(packager, element)
  73. // insert close package to second
  74. } else {
  75. Self.list.PushFront(packager)
  76. // list is empty, push to front
  77. }
  78. break
  79. }
  80. if element.Value.(*common.MuxPackager).Flag == common.MUX_NEW_MSG &&
  81. element.Value.(*common.MuxPackager).Id == packager.Id {
  82. Self.list.InsertAfter(packager, element) // PriorityQueue has some msg package
  83. // with this close package id, insert close package after last msg package
  84. break
  85. }
  86. element = element.Prev()
  87. }
  88. }
  89. func (Self *PriorityQueue) Pop() (packager *common.MuxPackager) {
  90. Self.mutex.Lock()
  91. element := Self.list.Front()
  92. if element != nil {
  93. packager = element.Value.(*common.MuxPackager)
  94. Self.list.Remove(element)
  95. Self.mutex.Unlock()
  96. return
  97. }
  98. Self.popWait = true // PriorityQueue is empty, notice Push method
  99. Self.mutex.Unlock()
  100. select {
  101. case <-Self.readOp:
  102. return Self.Pop()
  103. case <-Self.cleanOp:
  104. return nil
  105. }
  106. }
  107. func (Self *PriorityQueue) Len() (n int) {
  108. n = Self.list.Len()
  109. return
  110. }
  111. type ListElement struct {
  112. buf []byte
  113. l uint16
  114. part bool
  115. }
  116. func (Self *ListElement) New(buf []byte, l uint16, part bool) (err error) {
  117. if uint16(len(buf)) != l {
  118. return errors.New("ListElement: buf length not match")
  119. }
  120. Self.buf = buf
  121. Self.l = l
  122. Self.part = part
  123. return nil
  124. }
  125. type FIFOQueue struct {
  126. list []*ListElement
  127. length uint32
  128. stopOp chan struct{}
  129. timeout time.Time
  130. QueueOp
  131. }
  132. func (Self *FIFOQueue) New() {
  133. Self.QueueOp.New()
  134. Self.stopOp = make(chan struct{}, 1)
  135. }
  136. func (Self *FIFOQueue) Push(element *ListElement) {
  137. Self.mutex.Lock()
  138. Self.list = append(Self.list, element)
  139. Self.length += uint32(element.l)
  140. if Self.popWait {
  141. Self.mutex.Unlock()
  142. Self.allowPop()
  143. return
  144. }
  145. Self.mutex.Unlock()
  146. return
  147. }
  148. func (Self *FIFOQueue) Pop() (element *ListElement, err error) {
  149. Self.mutex.Lock()
  150. if len(Self.list) == 0 {
  151. Self.popWait = true
  152. Self.mutex.Unlock()
  153. t := Self.timeout.Sub(time.Now())
  154. if t <= 0 {
  155. t = time.Minute
  156. }
  157. timer := time.NewTimer(t)
  158. defer timer.Stop()
  159. select {
  160. case <-Self.readOp:
  161. Self.mutex.Lock()
  162. case <-Self.cleanOp:
  163. return
  164. case <-Self.stopOp:
  165. err = io.EOF
  166. return
  167. case <-timer.C:
  168. err = errors.New("mux.queue: read time out")
  169. return
  170. }
  171. }
  172. element = Self.list[0]
  173. Self.list = Self.list[1:]
  174. Self.length -= uint32(element.l)
  175. Self.mutex.Unlock()
  176. return
  177. }
  178. func (Self *FIFOQueue) Len() (n uint32) {
  179. return Self.length
  180. }
  181. func (Self *FIFOQueue) Stop() {
  182. Self.stopOp <- struct{}{}
  183. }
// SetTimeOut stores t as the deadline used by a Pop that blocks on an empty
// queue. Pop waits one minute instead when the deadline is not in the future.
// NOTE(review): the field is written here without taking the mutex and Pop
// reads it without the mutex as well; confirm callers never race the two.
func (Self *FIFOQueue) SetTimeOut(t time.Time) {
	Self.timeout = t
}