queue.go

package mux

import (
	"errors"
	"io"
	"math"
	"sync/atomic"
	"time"
	"unsafe"

	"github.com/cnlh/nps/lib/common"
)

// QueueOp holds the wait/wake state shared by PriorityQueue and FIFOQueue:
// a Pop that finds the queue empty sets popWait to 1 and blocks on readOp;
// the next Push calls allowPop to hand it a wake-up token.
type QueueOp struct {
	readOp  chan struct{}
	cleanOp chan struct{}
	popWait int32
}

func (Self *QueueOp) New() {
	Self.readOp = make(chan struct{})
	Self.cleanOp = make(chan struct{}, 2)
}

// allowPop wakes a Pop that is blocked waiting for data. It reports whether
// the queue was cleaned while the wake-up was being delivered.
func (Self *QueueOp) allowPop() (closed bool) {
	if atomic.CompareAndSwapInt32(&Self.popWait, 1, 0) {
		select {
		case Self.readOp <- struct{}{}:
			return false
		case <-Self.cleanOp:
			return true
		}
	}
	return
}

// Clean sends two tokens before closing cleanOp: one may be consumed by a
// blocked Pop and one by a concurrent allowPop. The close makes every later
// receive return immediately.
func (Self *QueueOp) Clean() {
	Self.cleanOp <- struct{}{}
	Self.cleanOp <- struct{}{}
	close(Self.cleanOp)
}

type PriorityQueue struct {
	QueueOp
	highestChain *bufChain
	middleChain  *bufChain
	lowestChain  *bufChain
	// hunger counts consecutive pops served from the middle chain,
	// so the lowest chain cannot be starved forever.
	hunger uint8
}

func (Self *PriorityQueue) New() {
	Self.highestChain = new(bufChain)
	Self.highestChain.new(4)
	Self.middleChain = new(bufChain)
	Self.middleChain.new(32)
	Self.lowestChain = new(bufChain)
	Self.lowestChain.new(256)
	Self.QueueOp.New()
}

func (Self *PriorityQueue) Push(packager *common.MuxPackager) {
	switch packager.Flag {
	case common.MUX_PING_FLAG, common.MUX_PING_RETURN:
		// Ping packets need the highest priority, otherwise queueing
		// delay would skew the latency calculation.
		Self.highestChain.pushHead(unsafe.Pointer(packager))
	case common.MUX_NEW_CONN, common.MUX_NEW_CONN_OK, common.MUX_NEW_CONN_Fail:
		// New-connection packets need some priority too.
		Self.middleChain.pushHead(unsafe.Pointer(packager))
	default:
		Self.lowestChain.pushHead(unsafe.Pointer(packager))
	}
	Self.allowPop()
	return
}

const maxHunger uint8 = 10

func (Self *PriorityQueue) Pop() (packager *common.MuxPackager) {
startPop:
	ptr, ok := Self.highestChain.popTail()
	if ok {
		packager = (*common.MuxPackager)(ptr)
		return
	}
	if Self.hunger < maxHunger {
		ptr, ok = Self.middleChain.popTail()
		if ok {
			packager = (*common.MuxPackager)(ptr)
			Self.hunger++
			return
		}
	}
	ptr, ok = Self.lowestChain.popTail()
	if ok {
		packager = (*common.MuxPackager)(ptr)
		if Self.hunger > 0 {
			Self.hunger /= 2
		}
		return
	}
	if Self.hunger > 0 {
		// The middle chain may have been skipped above once hunger
		// reached maxHunger; give it one more chance before blocking.
		ptr, ok = Self.middleChain.popTail()
		if ok {
			packager = (*common.MuxPackager)(ptr)
			return
		}
	}
	// The PriorityQueue is empty; ask Push to wake us when data arrives.
	if atomic.CompareAndSwapInt32(&Self.popWait, 0, 1) {
		select {
		case <-Self.readOp:
			goto startPop
		case <-Self.cleanOp:
			return nil
		}
	}
	goto startPop
}

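// Usage sketch (illustrative, not part of the original file): it assumes a
// common.MuxPackager can be built with just its Flag field set. Pop blocks
// on an empty queue until Push wakes it, and a ping packager pushed after a
// new-conn packager is still popped first.
func examplePriorityQueue() {
	queue := new(PriorityQueue)
	queue.New()
	queue.Push(&common.MuxPackager{Flag: common.MUX_NEW_CONN})  // middle chain
	queue.Push(&common.MuxPackager{Flag: common.MUX_PING_FLAG}) // highest chain
	first := queue.Pop() // the ping packager, despite being pushed second
	_ = first
	queue.Clean() // wakes and terminates any Pop blocked on an empty queue
}
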
type ListElement struct {
	buf  []byte
	l    uint16
	part bool
}

func (Self *ListElement) New(buf []byte, l uint16, part bool) (err error) {
	if uint16(len(buf)) != l {
		return errors.New("ListElement: buf length does not match l")
	}
	Self.buf = buf
	Self.l = l
	Self.part = part
	return nil
}

type FIFOQueue struct {
	QueueOp
	chain   *bufChain
	length  uint32
	stopOp  chan struct{}
	timeout time.Time
}

func (Self *FIFOQueue) New() {
	Self.QueueOp.New()
	Self.chain = new(bufChain)
	Self.chain.new(64)
	Self.stopOp = make(chan struct{}, 1)
}

func (Self *FIFOQueue) Push(element *ListElement) {
	Self.chain.pushHead(unsafe.Pointer(element))
	atomic.AddUint32(&Self.length, uint32(element.l))
	Self.allowPop()
	return
}

func (Self *FIFOQueue) Pop() (element *ListElement, err error) {
startPop:
	ptr, ok := Self.chain.popTail()
	if ok {
		element = (*ListElement)(ptr)
		// Atomically subtract element.l from length: adding ^(x-1) is
		// the standard idiom for subtracting x. The conversion to
		// uint32 must happen before the -1 so that l == 0 does not
		// underflow in uint16.
		atomic.AddUint32(&Self.length, ^(uint32(element.l) - 1))
		return
	}
	if atomic.CompareAndSwapInt32(&Self.popWait, 0, 1) {
		t := time.Until(Self.timeout)
		if t <= 0 {
			t = time.Minute
		}
		timer := time.NewTimer(t)
		defer timer.Stop()
		select {
		case <-Self.readOp:
			goto startPop
		case <-Self.cleanOp:
			return
		case <-Self.stopOp:
			err = io.EOF
			return
		case <-timer.C:
			err = errors.New("mux.queue: read timeout")
			return
		}
	}
	goto startPop
}

func (Self *FIFOQueue) Len() (n uint32) {
	return atomic.LoadUint32(&Self.length)
}

func (Self *FIFOQueue) Stop() {
	Self.stopOp <- struct{}{}
}

func (Self *FIFOQueue) SetTimeOut(t time.Time) {
	Self.timeout = t
}

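// Usage sketch (illustrative, not part of the original file): push one
// element and pop it back, then stop the queue. A Pop on an empty queue
// blocks until Push, Stop (io.EOF), Clean, or the deadline set through
// SetTimeOut fires.
func exampleFIFOQueue() {
	queue := new(FIFOQueue)
	queue.New()
	queue.SetTimeOut(time.Now().Add(5 * time.Second))
	element := new(ListElement)
	if err := element.New([]byte("hello"), 5, false); err != nil {
		return
	}
	queue.Push(element) // Len() now reports 5 buffered bytes
	if popped, err := queue.Pop(); err == nil {
		_ = popped.buf[:popped.l]
	}
	queue.Stop() // a later Pop on an empty queue returns io.EOF
}
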
// Adapted from https://golang.org/src/sync/poolqueue.go

type bufDequeue struct {
	// headTail packs together a 32-bit head index and a 32-bit
	// tail index. Both are indexes into vals modulo len(vals)-1.
	//
	// tail = index of oldest data in queue
	// head = index of next slot to fill
	//
	// Slots in the range [tail, head) are owned by consumers.
	// A consumer continues to own a slot outside this range until
	// it nils the slot, at which point ownership passes to the
	// producer.
	//
	// The head index is stored in the most-significant bits so
	// that we can atomically add to it and the overflow is
	// harmless.
	headTail uint64

	// vals is a ring buffer of unsafe.Pointer values stored in this
	// dequeue. The size of this must be a power of 2.
	//
	// A slot is still in use until *both* the tail index has moved
	// beyond it and the slot has been set to nil. The slot is set to
	// nil atomically by the consumer and read atomically by the
	// producer.
	vals []unsafe.Pointer
}

const dequeueBits = 32

// dequeueLimit is the maximum size of a bufDequeue.
//
// This must be at most (1<<dequeueBits)/2 because detecting fullness
// depends on wrapping around the ring buffer without wrapping around
// the index. We divide by 4 so this fits in an int on 32-bit.
const dequeueLimit = (1 << dequeueBits) / 4

func (d *bufDequeue) unpack(ptrs uint64) (head, tail uint32) {
	const mask = 1<<dequeueBits - 1
	head = uint32((ptrs >> dequeueBits) & mask)
	tail = uint32(ptrs & mask)
	return
}

func (d *bufDequeue) pack(head, tail uint32) uint64 {
	const mask = 1<<dequeueBits - 1
	return (uint64(head) << dequeueBits) |
		uint64(tail&mask)
}

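// Sketch (not part of the original file): a pack/unpack round-trip.
// pack(3, 1) == 3<<32|1, so a single atomic add on headTail can move head
// (high bits) or tail (low bits) without touching the other index.
func examplePackUnpack() {
	var d bufDequeue
	ptrs := d.pack(3, 1)
	head, tail := d.unpack(ptrs) // head == 3, tail == 1
	_, _ = head, tail
}
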
// pushHead adds val at the head of the queue. It returns false if the
// queue is full.
func (d *bufDequeue) pushHead(val unsafe.Pointer) bool {
	var slot *unsafe.Pointer
	for {
		ptrs := atomic.LoadUint64(&d.headTail)
		head, tail := d.unpack(ptrs)
		if (tail+uint32(len(d.vals)))&(1<<dequeueBits-1) == head {
			// Queue is full.
			return false
		}
		ptrs2 := d.pack(head+1, tail)
		if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
			slot = &d.vals[head&uint32(len(d.vals)-1)]
			break
		}
	}
	// The head slot is free, so we own it.
	*slot = val
	return true
}

// popTail removes and returns the element at the tail of the queue.
// It returns false if the queue is empty. It may be called by any
// number of consumers.
func (d *bufDequeue) popTail() (unsafe.Pointer, bool) {
	ptrs := atomic.LoadUint64(&d.headTail)
	head, tail := d.unpack(ptrs)
	if tail == head {
		// Queue is empty.
		return nil, false
	}
	slot := &d.vals[tail&uint32(len(d.vals)-1)]
	var val unsafe.Pointer
	for {
		val = atomic.LoadPointer(slot)
		if val != nil {
			// We now own slot.
			break
		}
		// Another goroutine is still pushing data on the tail,
		// so spin until the slot has been published.
	}
	// Tell pushHead that we're done with this slot. Zeroing the
	// slot is also important so we don't leave behind references
	// that could keep this object live longer than necessary.
	//
	// We read val first and then publish that we're done with
	// the slot by atomically storing nil into it.
	atomic.StorePointer(slot, nil)
	// At this point pushHead owns the slot.
	if tail < math.MaxUint32 {
		atomic.AddUint64(&d.headTail, 1)
	} else {
		// tail is about to wrap to 0: adding ^uint64(math.MaxUint32-1),
		// i.e. 2^64-2^32+1, increments tail modulo 2^32 without
		// carrying into the head bits.
		atomic.AddUint64(&d.headTail, ^uint64(math.MaxUint32-1))
	}
	return val, true
}

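// Usage sketch (illustrative, not part of the original file): a fixed-size
// ring. pushHead reports false once the ring is full instead of blocking,
// which is exactly what bufChain uses to decide when to grow.
func exampleBufDequeue() {
	d := &bufDequeue{vals: make([]unsafe.Pointer, 4)} // size must be a power of 2
	v := 42
	if d.pushHead(unsafe.Pointer(&v)) {
		if ptr, ok := d.popTail(); ok {
			_ = *(*int)(ptr) // 42
		}
	}
}
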
// bufChain is a dynamically-sized version of bufDequeue.
//
// This is implemented as a doubly-linked list queue of bufDequeues
// where each dequeue is double the size of the previous one. Once a
// dequeue fills up, this allocates a new one and only ever pushes to
// the latest dequeue. Pops happen from the other end of the list and
// once a dequeue is exhausted, it gets removed from the list.
type bufChain struct {
	// head is the bufDequeue to push to. This is only accessed
	// by the producer, so doesn't need to be synchronized.
	head *bufChainElt

	// tail is the bufDequeue to popTail from. This is accessed
	// by consumers, so reads and writes must be atomic.
	tail *bufChainElt

	// chainStatus is set to 1 while a producer is allocating and
	// linking a new, larger dequeue, so only one goroutine grows
	// the chain at a time.
	chainStatus int32
}

type bufChainElt struct {
	bufDequeue

	// next and prev link to the adjacent bufChainElts in this
	// bufChain.
	//
	// next is written atomically by the producer and read
	// atomically by the consumer. It only transitions from nil to
	// non-nil.
	//
	// prev is written atomically by the consumer and read
	// atomically by the producer. It only transitions from
	// non-nil to nil.
	next, prev *bufChainElt
}

func storePoolChainElt(pp **bufChainElt, v *bufChainElt) {
	atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(pp)), unsafe.Pointer(v))
}

func loadPoolChainElt(pp **bufChainElt) *bufChainElt {
	return (*bufChainElt)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(pp))))
}

func (c *bufChain) new(initSize int) {
	// Initialize the chain. initSize must be a power of 2.
	d := new(bufChainElt)
	d.vals = make([]unsafe.Pointer, initSize)
	storePoolChainElt(&c.head, d)
	storePoolChainElt(&c.tail, d)
}

func (c *bufChain) pushHead(val unsafe.Pointer) {
	for {
		d := loadPoolChainElt(&c.head)
		if d.pushHead(val) {
			return
		}
		// The current dequeue is full. Allocate a new one of twice
		// the size. chainStatus makes sure only one producer grows
		// the chain; losers of the CAS retry the loop until the new
		// head is linked in.
		if atomic.CompareAndSwapInt32(&c.chainStatus, 0, 1) {
			newSize := len(d.vals) * 2
			if newSize >= dequeueLimit {
				// Can't make it any bigger.
				newSize = dequeueLimit
			}
			d2 := &bufChainElt{prev: d}
			d2.vals = make([]unsafe.Pointer, newSize)
			d2.pushHead(val)
			storePoolChainElt(&d.next, d2)
			storePoolChainElt(&c.head, d2)
			atomic.StoreInt32(&c.chainStatus, 0)
			// val has already been pushed into the new dequeue;
			// returning here prevents pushing it a second time on
			// the next loop iteration.
			return
		}
	}
}

func (c *bufChain) popTail() (unsafe.Pointer, bool) {
	d := loadPoolChainElt(&c.tail)
	if d == nil {
		return nil, false
	}
	for {
		// It's important that we load the next pointer
		// *before* popping the tail. In general, d may be
		// transiently empty, but if next is non-nil before
		// the pop and the pop fails, then d is permanently
		// empty, which is the only condition under which it's
		// safe to drop d from the chain.
		d2 := loadPoolChainElt(&d.next)
		if val, ok := d.popTail(); ok {
			return val, ok
		}
		if d2 == nil {
			// This is the only dequeue. It's empty right
			// now, but could be pushed to in the future.
			return nil, false
		}
		// The tail of the chain has been drained, so move on
		// to the next dequeue. Try to drop it from the chain
		// so the next pop doesn't have to look at the empty
		// dequeue again.
		if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) {
			// We won the race. Clear the prev pointer so
			// the garbage collector can collect the empty
			// dequeue and so popHead doesn't back up
			// further than necessary.
			storePoolChainElt(&d2.prev, nil)
		}
		d = d2
	}
}

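// Usage sketch (illustrative, not part of the original file): pushing more
// values than the initial ring holds forces the chain to grow by doubling,
// and popTail drains the rings oldest-first.
func exampleBufChain() {
	chain := new(bufChain)
	chain.new(4) // initial ring size, must be a power of 2
	values := make([]int, 16)
	for i := range values { // 16 pushes overflow the 4-slot ring
		values[i] = i
		chain.pushHead(unsafe.Pointer(&values[i]))
	}
	for {
		ptr, ok := chain.popTail()
		if !ok {
			break // chain drained
		}
		_ = *(*int)(ptr) // 0, 1, 2, ... in push order
	}
}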