queue.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418
  1. package mux
  2. import (
  3. "errors"
  4. "github.com/cnlh/nps/lib/common"
  5. "io"
  6. "math"
  7. "sync/atomic"
  8. "time"
  9. "unsafe"
  10. )
  11. type QueueOp struct {
  12. readOp chan struct{}
  13. cleanOp chan struct{}
  14. popWait int32
  15. }
  16. func (Self *QueueOp) New() {
  17. Self.readOp = make(chan struct{})
  18. Self.cleanOp = make(chan struct{}, 2)
  19. }
  20. func (Self *QueueOp) allowPop() (closed bool) {
  21. if atomic.CompareAndSwapInt32(&Self.popWait, 1, 0) {
  22. select {
  23. case Self.readOp <- struct{}{}:
  24. return false
  25. case <-Self.cleanOp:
  26. return true
  27. }
  28. }
  29. return
  30. }
  31. func (Self *QueueOp) Clean() {
  32. Self.cleanOp <- struct{}{}
  33. Self.cleanOp <- struct{}{}
  34. close(Self.cleanOp)
  35. }
// PriorityQueue dispatches mux frames across three urgency levels:
// ping frames first, connection-management frames next, data last.
type PriorityQueue struct {
	QueueOp
	highestChain *bufChain // ping / ping-return frames
	middleChain  *bufChain // connection open/accept/fail frames
	lowestChain  *bufChain // ordinary data frames
	hunger       uint8     // counts middle-chain pops so the lowest chain is not starved
}
  43. func (Self *PriorityQueue) New() {
  44. Self.highestChain = new(bufChain)
  45. Self.highestChain.new(4)
  46. Self.middleChain = new(bufChain)
  47. Self.middleChain.new(32)
  48. Self.lowestChain = new(bufChain)
  49. Self.lowestChain.new(256)
  50. Self.QueueOp.New()
  51. }
  52. func (Self *PriorityQueue) Push(packager *common.MuxPackager) {
  53. switch packager.Flag {
  54. case common.MUX_PING_FLAG, common.MUX_PING_RETURN:
  55. Self.highestChain.pushHead(unsafe.Pointer(packager))
  56. // the ping package need highest priority
  57. // prevent ping calculation error
  58. case common.MUX_NEW_CONN, common.MUX_NEW_CONN_OK, common.MUX_NEW_CONN_Fail:
  59. // the new conn package need some priority too
  60. Self.middleChain.pushHead(unsafe.Pointer(packager))
  61. default:
  62. Self.lowestChain.pushHead(unsafe.Pointer(packager))
  63. }
  64. Self.allowPop()
  65. return
  66. }
// Pop returns the next packager by priority: the highest chain always
// wins; the middle chain is served before the lowest only until hunger
// reaches 100, which forces a lowest-chain check so data frames cannot be
// starved by a steady control-frame stream. When every chain is empty,
// Pop parks on readOp until Push signals new data, or returns nil once
// Clean has been called.
//
// NOTE(review): hunger is read and written without synchronization, so
// this appears to assume a single consumer goroutine — confirm callers.
func (Self *PriorityQueue) Pop() (packager *common.MuxPackager) {
startPop:
	ptr, ok := Self.highestChain.popTail()
	if ok {
		packager = (*common.MuxPackager)(ptr)
		return
	}
	if Self.hunger < 100 {
		ptr, ok = Self.middleChain.popTail()
		if ok {
			packager = (*common.MuxPackager)(ptr)
			Self.hunger++
			return
		}
	}
	ptr, ok = Self.lowestChain.popTail()
	if ok {
		packager = (*common.MuxPackager)(ptr)
		if Self.hunger > 0 {
			// Serving the low chain relieves starvation pressure.
			Self.hunger = uint8(Self.hunger / 2)
		}
		return
	}
	// PriorityQueue is empty, notice Push method: advertise a parked
	// consumer, then block until allowPop or Clean releases us.
	if atomic.CompareAndSwapInt32(&Self.popWait, 0, 1) {
		select {
		case <-Self.readOp:
			goto startPop
		case <-Self.cleanOp:
			return nil
		}
	}
	// CAS failed (popWait already 1); retry the chains immediately.
	goto startPop
}
  101. type ListElement struct {
  102. buf []byte
  103. l uint16
  104. part bool
  105. }
  106. func (Self *ListElement) New(buf []byte, l uint16, part bool) (err error) {
  107. if uint16(len(buf)) != l {
  108. return errors.New("ListElement: buf length not match")
  109. }
  110. Self.buf = buf
  111. Self.l = l
  112. Self.part = part
  113. return nil
  114. }
// FIFOQueue buffers ListElements in arrival order, tracking the total
// number of bytes currently queued.
type FIFOQueue struct {
	QueueOp
	chain   *bufChain     // FIFO storage: push at head, pop at tail
	length  uint32        // total bytes queued across all elements
	stopOp  chan struct{} // Stop signals a parked Pop to return io.EOF
	timeout time.Time     // absolute deadline applied while Pop is parked
}
  122. func (Self *FIFOQueue) New() {
  123. Self.QueueOp.New()
  124. Self.chain = new(bufChain)
  125. Self.chain.new(64)
  126. Self.stopOp = make(chan struct{}, 1)
  127. }
  128. func (Self *FIFOQueue) Push(element *ListElement) {
  129. Self.chain.pushHead(unsafe.Pointer(element))
  130. Self.length += uint32(element.l)
  131. Self.allowPop()
  132. return
  133. }
  134. func (Self *FIFOQueue) Pop() (element *ListElement, err error) {
  135. startPop:
  136. ptr, ok := Self.chain.popTail()
  137. if ok {
  138. element = (*ListElement)(ptr)
  139. Self.length -= uint32(element.l)
  140. return
  141. }
  142. if atomic.CompareAndSwapInt32(&Self.popWait, 0, 1) {
  143. t := Self.timeout.Sub(time.Now())
  144. if t <= 0 {
  145. t = time.Minute
  146. }
  147. timer := time.NewTimer(t)
  148. defer timer.Stop()
  149. select {
  150. case <-Self.readOp:
  151. goto startPop
  152. case <-Self.cleanOp:
  153. return
  154. case <-Self.stopOp:
  155. err = io.EOF
  156. return
  157. case <-timer.C:
  158. err = errors.New("mux.queue: read time out")
  159. return
  160. }
  161. }
  162. goto startPop
  163. }
  164. func (Self *FIFOQueue) Len() (n uint32) {
  165. return Self.length
  166. }
  167. func (Self *FIFOQueue) Stop() {
  168. Self.stopOp <- struct{}{}
  169. }
  170. func (Self *FIFOQueue) SetTimeOut(t time.Time) {
  171. Self.timeout = t
  172. }
// Adapted from the Go runtime's lock-free pool queue:
// https://golang.org/src/sync/poolqueue.go
type bufDequeue struct {
	// headTail packs together a 32-bit head index and a 32-bit
	// tail index. Both are indexes into vals modulo len(vals)-1.
	//
	// tail = index of oldest data in queue
	// head = index of next slot to fill
	//
	// Slots in the range [tail, head) are owned by consumers.
	// A consumer continues to own a slot outside this range until
	// it nils the slot, at which point ownership passes to the
	// producer.
	//
	// The head index is stored in the most-significant bits so
	// that we can atomically add to it and the overflow is
	// harmless.
	headTail uint64

	// vals is a ring buffer of pointer values stored in this
	// dequeue. The size of this must be a power of 2.
	//
	// A slot is still in use until *both* the tail index has moved
	// beyond it and the slot has been set to nil. This is set to
	// nil atomically by the consumer and read atomically by the
	// producer.
	vals []unsafe.Pointer
}
// dequeueBits is the width, in bits, of each index packed into headTail.
const dequeueBits = 32

// dequeueLimit is the maximum size of a bufDequeue.
//
// This must be at most (1<<dequeueBits)/2 because detecting fullness
// depends on wrapping around the ring buffer without wrapping around
// the index. We divide by 4 so this fits in an int on 32-bit.
const dequeueLimit = (1 << dequeueBits) / 4
  206. func (d *bufDequeue) unpack(ptrs uint64) (head, tail uint32) {
  207. const mask = 1<<dequeueBits - 1
  208. head = uint32((ptrs >> dequeueBits) & mask)
  209. tail = uint32(ptrs & mask)
  210. return
  211. }
  212. func (d *bufDequeue) pack(head, tail uint32) uint64 {
  213. const mask = 1<<dequeueBits - 1
  214. return (uint64(head) << dequeueBits) |
  215. uint64(tail&mask)
  216. }
// pushHead adds val at the head of the queue. It returns false if the
// queue is full. Head reservation is done with a CAS loop on headTail,
// so concurrent pushers each claim a distinct slot; the value itself is
// written after the reservation (popTail spins until it appears).
func (d *bufDequeue) pushHead(val unsafe.Pointer) bool {
	var slot *unsafe.Pointer
	for {
		ptrs := atomic.LoadUint64(&d.headTail)
		head, tail := d.unpack(ptrs)
		if (tail+uint32(len(d.vals)))&(1<<dequeueBits-1) == head {
			// Queue is full.
			return false
		}
		// Claim the head slot by advancing head first.
		ptrs2 := d.pack(head+1, tail)
		if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
			slot = &d.vals[head&uint32(len(d.vals)-1)]
			break
		}
	}
	// The head slot is free, so we own it.
	// NOTE(review): this store is plain while popTail loads the slot
	// atomically — likely flagged under -race; confirm intent.
	*slot = val
	return true
}
// popTail removes and returns the element at the tail of the queue.
// It returns false if the queue is empty. It may be called by any
// number of consumers.
//
// NOTE(review): the "any number of consumers" claim is inherited from
// the runtime source; here the tail is advanced only after the slot is
// read, with no reservation CAS, so two concurrent consumers could read
// the same slot — confirm a single consumer per dequeue.
func (d *bufDequeue) popTail() (unsafe.Pointer, bool) {
	ptrs := atomic.LoadUint64(&d.headTail)
	head, tail := d.unpack(ptrs)
	if tail == head {
		// Queue is empty.
		return nil, false
	}
	slot := &d.vals[tail&uint32(len(d.vals)-1)]
	// Busy-wait until the producer publishes the value: pushHead
	// advances head before writing the slot, so nil means the write is
	// still in flight.
	for {
		typ := atomic.LoadPointer(slot)
		if typ != nil {
			break
		}
		// Another goroutine is still pushing data on the tail.
	}
	// We now own slot.
	val := *slot
	// Tell pushHead that we're done with this slot. Zeroing the
	// slot is also important so we don't leave behind references
	// that could keep this object live longer than necessary.
	//
	// We read val first and then publish that we're done with the slot
	// by storing nil.
	atomic.StorePointer(slot, nil)
	// At this point pushHead owns the slot.
	// Advance tail by one. At the 32-bit boundary an ordinary +1 would
	// carry into the head half of headTail, so a wrapping addend is
	// used instead — presumably to reset tail while preserving head;
	// TODO(review) confirm the else-branch constant is correct.
	if tail < math.MaxUint32 {
		atomic.AddUint64(&d.headTail, 1)
	} else {
		atomic.AddUint64(&d.headTail, ^uint64(math.MaxUint32-1))
	}
	return val, true
}
// bufChain is a dynamically-sized version of bufDequeue.
//
// This is implemented as a doubly-linked list queue of poolDequeues
// where each dequeue is double the size of the previous one. Once a
// dequeue fills up, this allocates a new one and only ever pushes to
// the latest dequeue. Pops happen from the other end of the list and
// once a dequeue is exhausted, it gets removed from the list.
type bufChain struct {
	// head is the bufDequeue to push to. This is only accessed
	// by the producer, so doesn't need to be synchronized.
	head *bufChainElt

	// tail is the bufDequeue to popTail from. This is accessed
	// by consumers, so reads and writes must be atomic.
	tail *bufChainElt

	// chainStatus is a CAS-guarded flag (0 free / 1 growing) ensuring
	// only one pusher allocates and links a new dequeue at a time.
	chainStatus int32
}
// bufChainElt is one link of a bufChain: a bufDequeue plus its neighbors
// in the doubly-linked list.
type bufChainElt struct {
	bufDequeue

	// next and prev link to the adjacent poolChainElts in this
	// bufChain.
	//
	// next is written atomically by the producer and read
	// atomically by the consumer. It only transitions from nil to
	// non-nil.
	//
	// prev is written atomically by the consumer and read
	// atomically by the producer. It only transitions from
	// non-nil to nil.
	next, prev *bufChainElt
}
  302. func storePoolChainElt(pp **bufChainElt, v *bufChainElt) {
  303. atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(pp)), unsafe.Pointer(v))
  304. }
  305. func loadPoolChainElt(pp **bufChainElt) *bufChainElt {
  306. return (*bufChainElt)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(pp))))
  307. }
  308. func (c *bufChain) new(initSize int) {
  309. // Initialize the chain.
  310. // initSize must be a power of 2
  311. d := new(bufChainElt)
  312. d.vals = make([]unsafe.Pointer, initSize)
  313. storePoolChainElt(&c.head, d)
  314. storePoolChainElt(&c.tail, d)
  315. }
  316. func (c *bufChain) pushHead(val unsafe.Pointer) {
  317. for {
  318. d := loadPoolChainElt(&c.head)
  319. if d.pushHead(val) {
  320. return
  321. }
  322. // The current dequeue is full. Allocate a new one of twice
  323. // the size.
  324. if atomic.CompareAndSwapInt32(&c.chainStatus, 0, 1) {
  325. newSize := len(d.vals) * 2
  326. if newSize >= dequeueLimit {
  327. // Can't make it any bigger.
  328. newSize = dequeueLimit
  329. }
  330. d2 := &bufChainElt{prev: d}
  331. d2.vals = make([]unsafe.Pointer, newSize)
  332. storePoolChainElt(&c.head, d2)
  333. storePoolChainElt(&d.next, d2)
  334. d2.pushHead(val)
  335. atomic.SwapInt32(&c.chainStatus, 0)
  336. }
  337. }
  338. }
// popTail pops from the oldest dequeue in the chain. Drained dequeues can
// never refill (pushes only go to the newest link), so they are unlinked
// as they are discovered, letting later pops skip them.
func (c *bufChain) popTail() (unsafe.Pointer, bool) {
	d := loadPoolChainElt(&c.tail)
	if d == nil {
		return nil, false
	}
	for {
		// It's important that we load the next pointer
		// *before* popping the tail. In general, d may be
		// transiently empty, but if next is non-nil before
		// the pop and the pop fails, then d is permanently
		// empty, which is the only condition under which it's
		// safe to drop d from the chain.
		d2 := loadPoolChainElt(&d.next)
		if val, ok := d.popTail(); ok {
			return val, ok
		}
		if d2 == nil {
			// This is the only dequeue. It's empty right
			// now, but could be pushed to in the future.
			return nil, false
		}
		// The tail of the chain has been drained, so move on
		// to the next dequeue. Try to drop it from the chain
		// so the next pop doesn't have to look at the empty
		// dequeue again.
		if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) {
			// We won the race. Clear the prev pointer so
			// the garbage collector can collect the empty
			// dequeue and so popHead doesn't back up
			// further than necessary.
			storePoolChainElt(&d2.prev, nil)
		}
		d = d2
	}
}