queue.go

package mux

import (
	"errors"
	"io"
	"math"
	"runtime"
	"sync"
	"sync/atomic"
	"time"
	"unsafe"

	"github.com/cnlh/nps/lib/common"
)

// PriorityQueue dispatches mux packagers to a consumer at three priority
// levels: ping traffic first, connection management second, data last.
type PriorityQueue struct {
	highestChain *bufChain
	middleChain  *bufChain
	lowestChain  *bufChain
	starving     uint8
	stop         bool
	cond         *sync.Cond
}

func (Self *PriorityQueue) New() {
	Self.highestChain = new(bufChain)
	Self.highestChain.new(4)
	Self.middleChain = new(bufChain)
	Self.middleChain.new(32)
	Self.lowestChain = new(bufChain)
	Self.lowestChain.new(256)
	locker := new(sync.Mutex)
	Self.cond = sync.NewCond(locker)
}

func (Self *PriorityQueue) Push(packager *common.MuxPackager) {
	switch packager.Flag {
	case common.MUX_PING_FLAG, common.MUX_PING_RETURN:
		// Ping packages get the highest priority so queued data does
		// not skew the latency calculation.
		Self.highestChain.pushHead(unsafe.Pointer(packager))
	case common.MUX_NEW_CONN, common.MUX_NEW_CONN_OK, common.MUX_NEW_CONN_Fail:
		// New-connection packages get medium priority.
		Self.middleChain.pushHead(unsafe.Pointer(packager))
	default:
		Self.lowestChain.pushHead(unsafe.Pointer(packager))
	}
	// Signal while holding the lock so the wakeup cannot slip between a
	// waiter's empty check and its Wait.
	Self.cond.L.Lock()
	Self.cond.Signal()
	Self.cond.L.Unlock()
}

// maxStarving caps how many middle-priority packages may be served in a
// row before the lowest chain must get a turn.
const maxStarving uint8 = 8

// Pop blocks until a packager is available or the queue is stopped, in
// which case it returns nil.
func (Self *PriorityQueue) Pop() (packager *common.MuxPackager) {
	var iter bool
	for {
		packager = Self.pop()
		if packager != nil {
			return
		}
		if Self.stop {
			return
		}
		if iter {
			break
		}
		// Spin once before falling back to the condition variable.
		iter = true
		runtime.Gosched()
	}
	Self.cond.L.Lock()
	defer Self.cond.L.Unlock()
	for packager = Self.pop(); packager == nil; {
		if Self.stop {
			return
		}
		// The queue is empty; wait for Push to signal.
		Self.cond.Wait()
		packager = Self.pop()
	}
	return
}

func (Self *PriorityQueue) pop() (packager *common.MuxPackager) {
	// Ping packages always go first.
	ptr, ok := Self.highestChain.popTail()
	if ok {
		packager = (*common.MuxPackager)(ptr)
		return
	}
	// Serve the middle chain until it has starved the lowest chain for
	// maxStarving consecutive pops.
	if Self.starving < maxStarving {
		ptr, ok = Self.middleChain.popTail()
		if ok {
			packager = (*common.MuxPackager)(ptr)
			Self.starving++
			return
		}
	}
	ptr, ok = Self.lowestChain.popTail()
	if ok {
		packager = (*common.MuxPackager)(ptr)
		// Serving the lowest chain relieves the starvation pressure.
		if Self.starving > 0 {
			Self.starving = Self.starving / 2
		}
		return
	}
	// The lowest chain is empty too, so fall back to the middle chain
	// even past the starvation limit rather than return nothing.
	if Self.starving > 0 {
		ptr, ok = Self.middleChain.popTail()
		if ok {
			packager = (*common.MuxPackager)(ptr)
			Self.starving++
			return
		}
	}
	return
}

func (Self *PriorityQueue) Stop() {
	Self.stop = true
	Self.cond.Broadcast()
}
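
// examplePriorityQueueUsage is a hypothetical sketch added for
// illustration; it is not part of the original file. It shows the
// intended pattern: any number of goroutines Push, one consumer loops on
// Pop, and Stop releases a blocked consumer with a nil packager.
func examplePriorityQueueUsage(packagers []*common.MuxPackager) {
	queue := new(PriorityQueue)
	queue.New()
	done := make(chan struct{})
	go func() {
		defer close(done)
		for {
			packager := queue.Pop()
			if packager == nil {
				// Stop was called and nothing is left to serve.
				return
			}
			// Handle packager; ping packages arrive before data.
		}
	}()
	for _, packager := range packagers {
		queue.Push(packager)
	}
	queue.Stop()
	<-done
}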

// ListElement is one received data fragment queued in a ReceiveWindowQueue.
type ListElement struct {
	buf  []byte
	l    uint16
	part bool
}

// New fills the element, rejecting a length that does not match the buffer.
func (Self *ListElement) New(buf []byte, l uint16, part bool) (err error) {
	if uint16(len(buf)) != l {
		return errors.New("ListElement: buf length does not match l")
	}
	Self.buf = buf
	Self.l = l
	Self.part = part
	return nil
}

type ReceiveWindowQueue struct {
	chain   *bufChain
	length  uint32 // total buffered bytes, updated atomically
	stopOp  chan struct{}
	readOp  chan struct{}
	popWait uint32 // 1 while a Pop is blocked waiting for data
	timeout time.Time
}

func (Self *ReceiveWindowQueue) New() {
	Self.readOp = make(chan struct{})
	Self.chain = new(bufChain)
	Self.chain.new(64)
	// Buffered so Stop can post for both a blocked Pop and allowPop.
	Self.stopOp = make(chan struct{}, 2)
}

func (Self *ReceiveWindowQueue) Push(element *ListElement) {
	Self.chain.pushHead(unsafe.Pointer(element))
	atomic.AddUint32(&Self.length, uint32(element.l))
	Self.allowPop()
}

func (Self *ReceiveWindowQueue) pop() (element *ListElement) {
	ptr, ok := Self.chain.popTail()
	if ok {
		element = (*ListElement)(ptr)
		// Subtract element.l: adding ^uint32(x-1) is two's-complement -x.
		atomic.AddUint32(&Self.length, ^uint32(element.l-1))
	}
	return
}

// Pop blocks until an element is available, the deadline set by SetTimeOut
// expires, or the queue is stopped (io.EOF).
func (Self *ReceiveWindowQueue) Pop() (element *ListElement, err error) {
	var iter bool
startPop:
	element = Self.pop()
	if element != nil {
		return
	}
	if !iter {
		// Spin once before arming the blocking path.
		iter = true
		runtime.Gosched()
		goto startPop
	}
	if atomic.CompareAndSwapUint32(&Self.popWait, 0, 1) {
		iter = false
		t := time.Until(Self.timeout)
		if t <= 0 {
			t = time.Minute
		}
		timer := time.NewTimer(t)
		defer timer.Stop()
		select {
		case <-Self.readOp:
			goto startPop
		case <-Self.stopOp:
			err = io.EOF
			return
		case <-timer.C:
			err = errors.New("mux.queue: read timeout")
			return
		}
	}
	goto startPop
}

// allowPop wakes a Pop blocked in the select above and reports whether the
// queue was stopped instead.
func (Self *ReceiveWindowQueue) allowPop() (closed bool) {
	if atomic.CompareAndSwapUint32(&Self.popWait, 1, 0) {
		select {
		case Self.readOp <- struct{}{}:
			return false
		case <-Self.stopOp:
			return true
		}
	}
	return
}

// Len returns the number of buffered bytes, not elements.
func (Self *ReceiveWindowQueue) Len() (n uint32) {
	return atomic.LoadUint32(&Self.length)
}

func (Self *ReceiveWindowQueue) Stop() {
	// Two sends: one for a blocked Pop, one for a concurrent allowPop.
	Self.stopOp <- struct{}{}
	Self.stopOp <- struct{}{}
}

func (Self *ReceiveWindowQueue) SetTimeOut(t time.Time) {
	Self.timeout = t
}
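
// exampleReceiveWindowQueueUsage is a hypothetical sketch added for
// illustration; it is not part of the original file. Push stores a
// fragment and grows Len by its length; Pop blocks until data arrives,
// the SetTimeOut deadline passes (error), or Stop is called (io.EOF).
func exampleReceiveWindowQueueUsage(buf []byte) error {
	queue := new(ReceiveWindowQueue)
	queue.New()
	queue.SetTimeOut(time.Now().Add(30 * time.Second))
	element := new(ListElement)
	if err := element.New(buf, uint16(len(buf)), false); err != nil {
		return err
	}
	queue.Push(element)
	popped, err := queue.Pop()
	if err != nil {
		return err // timeout, or io.EOF after Stop
	}
	_ = popped.buf // consume the fragment; Len dropped by popped.l
	queue.Stop()
	return nil
}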

// Adapted from https://golang.org/src/sync/poolqueue.go
type bufDequeue struct {
	// headTail packs together a 32-bit head index and a 32-bit
	// tail index. Both are indexes into vals modulo len(vals)-1.
	//
	// tail = index of oldest data in queue
	// head = index of next slot to fill
	//
	// Slots in the range [tail, head) are owned by consumers.
	// A consumer continues to own a slot outside this range until
	// it nils the slot, at which point ownership passes to the
	// producer.
	//
	// The head index is stored in the most-significant bits so
	// that we can atomically add to it and the overflow is
	// harmless.
	headTail uint64

	// vals is a ring buffer of unsafe.Pointer values stored in this
	// dequeue. The size of this must be a power of 2.
	//
	// A slot is still in use until *both* the tail index has moved
	// beyond it and the slot has been set to nil. The slot is set to
	// nil atomically by the consumer and read atomically by the
	// producer.
	vals []unsafe.Pointer

	// starving is set when producers keep losing the race to reserve
	// a slot, telling other producers to back off.
	starving uint32
}

const dequeueBits = 32

// dequeueLimit is the maximum size of a bufDequeue.
//
// This must be at most (1<<dequeueBits)/2 because detecting fullness
// depends on wrapping around the ring buffer without wrapping around
// the index. We divide by 4 so this fits in an int on 32-bit.
const dequeueLimit = (1 << dequeueBits) / 4

func (d *bufDequeue) unpack(ptrs uint64) (head, tail uint32) {
	const mask = 1<<dequeueBits - 1
	head = uint32((ptrs >> dequeueBits) & mask)
	tail = uint32(ptrs & mask)
	return
}

func (d *bufDequeue) pack(head, tail uint32) uint64 {
	const mask = 1<<dequeueBits - 1
	return (uint64(head) << dequeueBits) | uint64(tail&mask)
}
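
// Worked example of the packing (added for clarity, not in the original
// file): with head == 7 and tail == 3,
//
//	d.pack(7, 3)                    // == 0x0000_0007_0000_0003
//	head, tail := d.unpack(0x0000_0007_0000_0003) // head == 7, tail == 3
//
// Because head occupies the high 32 bits, head+1 can be re-packed and
// CAS'd in a single 64-bit operation, and overflow of the head index
// simply wraps without corrupting tail.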

// pushHead adds val at the head of the queue. It returns false if the
// queue is full. Unlike the sync.Pool original, slots are reserved with a
// CAS on headTail, so it may be called by any number of producers.
func (d *bufDequeue) pushHead(val unsafe.Pointer) bool {
	var slot *unsafe.Pointer
	var starve uint8
	if atomic.LoadUint32(&d.starving) > 0 {
		// Another producer is starving; yield before competing.
		runtime.Gosched()
	}
	for {
		ptrs := atomic.LoadUint64(&d.headTail)
		head, tail := d.unpack(ptrs)
		if (tail+uint32(len(d.vals)))&(1<<dequeueBits-1) == head {
			// Queue is full.
			return false
		}
		ptrs2 := d.pack(head+1, tail)
		if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
			slot = &d.vals[head&uint32(len(d.vals)-1)]
			if starve >= 3 && atomic.LoadUint32(&d.starving) > 0 {
				atomic.StoreUint32(&d.starving, 0)
			}
			break
		}
		// Lost the CAS race; after three losses flag starvation so
		// other producers back off.
		starve++
		if starve >= 3 {
			atomic.StoreUint32(&d.starving, 1)
		}
	}
	// The CAS reserved the head slot, so we own it.
	*slot = val
	return true
}

// popTail removes and returns the element at the tail of the queue.
// It returns false if the queue is empty. The tail index is advanced
// with a plain add rather than a CAS, so unlike the sync.Pool original
// it is only safe for a single consumer at a time.
func (d *bufDequeue) popTail() (unsafe.Pointer, bool) {
	ptrs := atomic.LoadUint64(&d.headTail)
	head, tail := d.unpack(ptrs)
	if tail == head {
		// Queue is empty.
		return nil, false
	}
	slot := &d.vals[tail&uint32(len(d.vals)-1)]
	var val unsafe.Pointer
	for {
		val = atomic.LoadPointer(slot)
		if val != nil {
			// We now own slot.
			break
		}
		// Another goroutine is still pushing data on the tail.
	}
	// Tell pushHead that we're done with this slot. Zeroing the
	// slot is also important so we don't leave behind references
	// that could keep this object live longer than necessary.
	//
	// We copy val out first and then publish that we're done with
	// the slot by zeroing it.
	atomic.StorePointer(slot, nil)
	// At this point pushHead owns the slot. Advance tail, wrapping
	// the 32-bit index back to zero without touching the head stored
	// in the high bits.
	if tail < math.MaxUint32 {
		atomic.AddUint64(&d.headTail, 1)
	} else {
		atomic.AddUint64(&d.headTail, ^uint64(math.MaxUint32-1))
	}
	return val, true
}
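
// exampleBufDequeueFull is a hypothetical sketch added for illustration;
// it is not part of the original file. With a ring of two slots, the
// third pushHead reports a full queue, which is exactly the condition
// bufChain uses to splice in a bigger dequeue.
func exampleBufDequeueFull() {
	d := new(bufDequeue)
	d.vals = make([]unsafe.Pointer, 2) // size must be a power of 2
	a, b := new(int), new(int)
	d.pushHead(unsafe.Pointer(a))           // ok
	d.pushHead(unsafe.Pointer(b))           // ok
	full := !d.pushHead(unsafe.Pointer(b))  // third push fails: ring is full
	ptr, ok := d.popTail()                  // returns a, the oldest value
	_, _, _ = full, ptr, ok
}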

// bufChain is a dynamically-sized version of bufDequeue.
//
// This is implemented as a doubly-linked list queue of bufDequeues
// where each dequeue is double the size of the previous one. Once a
// dequeue fills up, this allocates a new one and only ever pushes to
// the latest dequeue. Pops happen from the other end of the list and
// once a dequeue is exhausted, it gets removed from the list.
type bufChain struct {
	// head is the bufDequeue to push to. Producers load and store it
	// atomically; growing the chain is serialized by newChain.
	head *bufChainElt

	// tail is the bufDequeue to popTail from. This is accessed
	// by consumers, so reads and writes must be atomic.
	tail *bufChainElt

	// newChain is set while a producer splices in a new, larger
	// dequeue; other producers spin until it clears.
	newChain uint32
}

type bufChainElt struct {
	bufDequeue

	// next and prev link to the adjacent bufChainElts in this
	// bufChain.
	//
	// next is written atomically by the producer and read
	// atomically by the consumer. It only transitions from nil to
	// non-nil.
	//
	// prev is written atomically by the consumer and read
	// atomically by the producer. It only transitions from
	// non-nil to nil.
	next, prev *bufChainElt
}

func storePoolChainElt(pp **bufChainElt, v *bufChainElt) {
	atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(pp)), unsafe.Pointer(v))
}

func loadPoolChainElt(pp **bufChainElt) *bufChainElt {
	return (*bufChainElt)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(pp))))
}

func (c *bufChain) new(initSize int) {
	// Initialize the chain. initSize must be a power of 2.
	d := new(bufChainElt)
	d.vals = make([]unsafe.Pointer, initSize)
	storePoolChainElt(&c.head, d)
	storePoolChainElt(&c.tail, d)
}

func (c *bufChain) pushHead(val unsafe.Pointer) {
startPush:
	for {
		if atomic.LoadUint32(&c.newChain) > 0 {
			// Another producer is splicing in a new dequeue; wait.
			runtime.Gosched()
		} else {
			break
		}
	}
	d := loadPoolChainElt(&c.head)
	if d.pushHead(val) {
		return
	}
	// The current dequeue is full. Allocate a new one of twice
	// the size.
	if atomic.CompareAndSwapUint32(&c.newChain, 0, 1) {
		newSize := len(d.vals) * 2
		if newSize >= dequeueLimit {
			// Can't make it any bigger.
			newSize = dequeueLimit
		}
		d2 := &bufChainElt{prev: d}
		d2.vals = make([]unsafe.Pointer, newSize)
		d2.pushHead(val)
		storePoolChainElt(&c.head, d2)
		storePoolChainElt(&d.next, d2)
		atomic.StoreUint32(&c.newChain, 0)
		return
	}
	// Lost the race to grow the chain; retry from the top.
	goto startPush
}

func (c *bufChain) popTail() (unsafe.Pointer, bool) {
	d := loadPoolChainElt(&c.tail)
	if d == nil {
		return nil, false
	}
	for {
		// It's important that we load the next pointer
		// *before* popping the tail. In general, d may be
		// transiently empty, but if next is non-nil before
		// the pop and the pop fails, then d is permanently
		// empty, which is the only condition under which it's
		// safe to drop d from the chain.
		d2 := loadPoolChainElt(&d.next)
		if val, ok := d.popTail(); ok {
			return val, ok
		}
		if d2 == nil {
			// This is the only dequeue. It's empty right
			// now, but could be pushed to in the future.
			return nil, false
		}
		// The tail of the chain has been drained, so move on
		// to the next dequeue. Try to drop it from the chain
		// so the next pop doesn't have to look at the empty
		// dequeue again.
		if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) {
			// We won the race. Clear the prev pointer so
			// the garbage collector can collect the empty
			// dequeue and so popHead doesn't back up
			// further than necessary.
			storePoolChainElt(&d2.prev, nil)
		}
		d = d2
	}
}
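
// exampleBufChainGrowth is a hypothetical sketch added for illustration;
// it is not part of the original file. Pushing past the initial ring's
// capacity makes pushHead splice in a dequeue of twice the size, while
// popTail drains from the oldest ring and unlinks it once exhausted.
func exampleBufChainGrowth() {
	c := new(bufChain)
	c.new(4) // initial ring holds 4 slots
	for i := 0; i < 10; i++ {
		v := new(int)
		*v = i
		c.pushHead(unsafe.Pointer(v)) // fifth push allocates an 8-slot ring
	}
	for {
		ptr, ok := c.popTail()
		if !ok {
			break // chain drained
		}
		_ = *(*int)(ptr) // values come out oldest first: 0, 1, 2, ...
	}
}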