queue.go

package mux

import (
	"errors"
	"io"
	"math"
	"runtime"
	"sync"
	"sync/atomic"
	"time"
	"unsafe"

	"ehang.io/nps/lib/common"
)

// PriorityQueue dispatches MuxPackagers across three chains by packet flag:
// ping traffic first, connection-control traffic second, everything else last.
type PriorityQueue struct {
	highestChain *bufChain
	middleChain  *bufChain
	lowestChain  *bufChain
	starving     uint8
	stop         bool
	cond         *sync.Cond
}

// New initializes the three priority chains and the condition variable used
// to block consumers while all chains are empty.
func (Self *PriorityQueue) New() {
	Self.highestChain = new(bufChain)
	Self.highestChain.new(4)
	Self.middleChain = new(bufChain)
	Self.middleChain.new(32)
	Self.lowestChain = new(bufChain)
	Self.lowestChain.new(256)
	locker := new(sync.Mutex)
	Self.cond = sync.NewCond(locker)
}

// Push enqueues a packager and wakes any goroutine blocked in Pop.
func (Self *PriorityQueue) Push(packager *common.MuxPackager) {
	Self.push(packager)
	Self.cond.Broadcast()
}

func (Self *PriorityQueue) push(packager *common.MuxPackager) {
	switch packager.Flag {
	case common.MUX_PING_FLAG, common.MUX_PING_RETURN:
		// Ping packets need the highest priority so queueing delay
		// does not skew the ping (RTT) calculation.
		Self.highestChain.pushHead(unsafe.Pointer(packager))
	case common.MUX_NEW_CONN, common.MUX_NEW_CONN_OK, common.MUX_NEW_CONN_Fail:
		// Connection-setup packets get elevated priority too.
		Self.middleChain.pushHead(unsafe.Pointer(packager))
	default:
		Self.lowestChain.pushHead(unsafe.Pointer(packager))
	}
}

// maxStarving bounds how many middle-chain pops may run ahead of the lowest
// chain before the lowest chain is given a turn.
const maxStarving uint8 = 8

// Pop returns the next packager by priority, blocking until one is available
// or Stop is called (in which case it returns nil).
func (Self *PriorityQueue) Pop() (packager *common.MuxPackager) {
	var iter bool
	for {
		packager = Self.TryPop()
		if packager != nil {
			return
		}
		if Self.stop {
			return
		}
		if iter {
			// Already tried to pop twice; fall through to the
			// blocking path.
			break
		}
		iter = true
		runtime.Gosched()
	}
	Self.cond.L.Lock()
	defer Self.cond.L.Unlock()
	for packager = Self.TryPop(); packager == nil; {
		if Self.stop {
			return
		}
		Self.cond.Wait()
		// Woken by Push or Stop; try again without spinning.
		packager = Self.TryPop()
	}
	return
}

// TryPop returns the next packager without blocking, or nil if every chain
// is empty. The starving counter keeps the middle chain from shutting out
// the lowest chain indefinitely.
func (Self *PriorityQueue) TryPop() (packager *common.MuxPackager) {
	ptr, ok := Self.highestChain.popTail()
	if ok {
		packager = (*common.MuxPackager)(ptr)
		return
	}
	if Self.starving < maxStarving {
		// Do not pop the middle chain too often, or the lowest
		// chain will wait too long.
		ptr, ok = Self.middleChain.popTail()
		if ok {
			packager = (*common.MuxPackager)(ptr)
			Self.starving++
			return
		}
	}
	ptr, ok = Self.lowestChain.popTail()
	if ok {
		packager = (*common.MuxPackager)(ptr)
		if Self.starving > 0 {
			Self.starving = Self.starving / 2
		}
		return
	}
	if Self.starving > 0 {
		ptr, ok = Self.middleChain.popTail()
		if ok {
			packager = (*common.MuxPackager)(ptr)
			Self.starving++
			return
		}
	}
	return
}

// Stop marks the queue as closed and wakes all blocked consumers.
func (Self *PriorityQueue) Stop() {
	Self.stop = true
	Self.cond.Broadcast()
}
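
// A minimal usage sketch (illustrative, not part of the original file):
// one producer pushes packagers while a consumer drains them until Stop
// is called. The incoming channel and handle function are hypothetical
// stand-ins for whatever feeds and consumes the queue.
//
//	queue := new(PriorityQueue)
//	queue.New()
//	go func() {
//		for packager := range incoming { // hypothetical source
//			queue.Push(packager)
//		}
//		queue.Stop()
//	}()
//	for packager := queue.Pop(); packager != nil; packager = queue.Pop() {
//		handle(packager) // hypothetical consumer
//	}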

// ConnQueue is a single-chain variant of PriorityQueue that carries *conn
// values instead of packagers.
type ConnQueue struct {
	chain    *bufChain
	starving uint8
	stop     bool
	cond     *sync.Cond
}

func (Self *ConnQueue) New() {
	Self.chain = new(bufChain)
	Self.chain.new(32)
	locker := new(sync.Mutex)
	Self.cond = sync.NewCond(locker)
}

// Push enqueues a connection and wakes any goroutine blocked in Pop.
func (Self *ConnQueue) Push(connection *conn) {
	Self.chain.pushHead(unsafe.Pointer(connection))
	Self.cond.Broadcast()
}

// Pop returns the next connection, blocking until one is available or
// Stop is called (in which case it returns nil).
func (Self *ConnQueue) Pop() (connection *conn) {
	var iter bool
	for {
		connection = Self.TryPop()
		if connection != nil {
			return
		}
		if Self.stop {
			return
		}
		if iter {
			// Already tried to pop twice; fall through to the
			// blocking path.
			break
		}
		iter = true
		runtime.Gosched()
	}
	Self.cond.L.Lock()
	defer Self.cond.L.Unlock()
	for connection = Self.TryPop(); connection == nil; {
		if Self.stop {
			return
		}
		Self.cond.Wait()
		// Woken by Push or Stop; try again without spinning.
		connection = Self.TryPop()
	}
	return
}

// TryPop returns the next connection without blocking, or nil if the
// queue is empty.
func (Self *ConnQueue) TryPop() (connection *conn) {
	ptr, ok := Self.chain.popTail()
	if ok {
		connection = (*conn)(ptr)
	}
	return
}

// Stop marks the queue as closed and wakes all blocked consumers.
func (Self *ConnQueue) Stop() {
	Self.stop = true
	Self.cond.Broadcast()
}

// NewListElement wraps buf in a pooled ListElement. l must equal len(buf);
// part marks the element as a partial read.
func NewListElement(buf []byte, l uint16, part bool) (element *common.ListElement, err error) {
	if uint16(len(buf)) != l {
		err = errors.New("ListElement: buf length not match")
		return
	}
	element = common.ListElementPool.Get()
	element.Buf = buf
	element.L = l
	element.Part = part
	return
}
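
// A minimal sketch of the length contract (illustrative, not part of the
// original file): the declared length must match the slice exactly.
//
//	payload := []byte("hello")
//	element, err := NewListElement(payload, uint16(len(payload)), false)
//	if err != nil {
//		// only reachable when the declared length and len(payload) differ
//	}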

// ReceiveWindowQueue buffers received ListElements and tracks the total
// buffered byte length together with a consumer-waiting flag in a single
// atomically updated word.
type ReceiveWindowQueue struct {
	// lengthWait packs the buffered byte length (high 32 bits) and a
	// wait flag (low 32 bits) so both can be updated with one CAS.
	//
	// It must stay the first field: per
	// https://golang.org/pkg/sync/atomic/#pkg-note-BUG, on ARM, x86-32,
	// and 32-bit MIPS it is the caller's responsibility to arrange for
	// 64-bit alignment of 64-bit words accessed atomically, and only the
	// first word in an allocated struct can be relied upon to be
	// 64-bit aligned.
	lengthWait uint64
	chain      *bufChain
	stopOp     chan struct{}
	readOp     chan struct{}
	timeout    time.Time
}

func NewReceiveWindowQueue() *ReceiveWindowQueue {
	queue := ReceiveWindowQueue{
		chain:  new(bufChain),
		stopOp: make(chan struct{}, 2),
		readOp: make(chan struct{}),
	}
	queue.chain.new(64)
	return &queue
}

// Push adds element to the queue, atomically adding its length to the
// packed counter, and wakes the consumer if it was waiting.
func (Self *ReceiveWindowQueue) Push(element *common.ListElement) {
	var length, wait uint32
	for {
		ptrs := atomic.LoadUint64(&Self.lengthWait)
		length, wait = Self.chain.head.unpack(ptrs)
		length += uint32(element.L)
		if atomic.CompareAndSwapUint64(&Self.lengthWait, ptrs, Self.chain.head.pack(length, 0)) {
			break
		}
		// Another goroutine changed the length or set the wait flag;
		// reload and retry.
	}
	Self.chain.pushHead(unsafe.Pointer(element))
	if wait == 1 {
		Self.allowPop()
	}
}

// Pop returns the next element, blocking (subject to the optional timeout)
// while the queue is empty. It returns io.EOF after Stop.
func (Self *ReceiveWindowQueue) Pop() (element *common.ListElement, err error) {
	var length uint32
startPop:
	ptrs := atomic.LoadUint64(&Self.lengthWait)
	length, _ = Self.chain.head.unpack(ptrs)
	if length == 0 {
		if !atomic.CompareAndSwapUint64(&Self.lengthWait, ptrs, Self.chain.head.pack(0, 1)) {
			goto startPop // another goroutine is pushing
		}
		// There is no more data in the queue; wait for a push.
		err = Self.waitPush()
		if err != nil {
			return
		}
		goto startPop // wait finished; re-read the new state
	}
	// length is not zero, so an element must arrive shortly.
	for {
		element = Self.TryPop()
		if element != nil {
			return
		}
		// The counter is updated before the element lands, so the
		// pushing goroutine may still be mid-flight.
		runtime.Gosched()
	}
}
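
// A worked sketch of the lengthWait handshake (illustrative, not part of
// the original file): the high 32 bits carry the buffered byte count and
// the low bits the consumer's wait flag. With 100 bytes queued and no
// waiter the word is 100<<32; an empty queue with a parked consumer is
// 0<<32 | 1, which Push observes (wait == 1) and answers via allowPop.
//
//	q := NewReceiveWindowQueue()
//	element, _ := NewListElement([]byte("data"), 4, false)
//	q.Push(element)        // lengthWait becomes 4<<32
//	popped, err := q.Pop() // lengthWait returns to 0
//	_, _ = popped, err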

// TryPop returns the next element without blocking, or nil if none is
// visible yet, subtracting its length from the packed counter on success.
func (Self *ReceiveWindowQueue) TryPop() (element *common.ListElement) {
	ptr, ok := Self.chain.popTail()
	if ok {
		element = (*common.ListElement)(ptr)
		// Adding ^(n<<32 - 1) is the two's-complement form of
		// subtracting n<<32: it decrements the high-order length
		// field by element.L without touching the wait flag.
		atomic.AddUint64(&Self.lengthWait, ^(uint64(element.L)<<dequeueBits - 1))
		return
	}
	return nil
}

// allowPop releases a consumer parked in waitPush, or reports that the
// queue has been stopped.
func (Self *ReceiveWindowQueue) allowPop() (closed bool) {
	select {
	case Self.readOp <- struct{}{}:
		return false
	case <-Self.stopOp:
		return true
	}
}

// waitPush blocks until a producer signals readOp, the queue is stopped
// (io.EOF), or the optional deadline passes.
func (Self *ReceiveWindowQueue) waitPush() (err error) {
	t := time.Until(Self.timeout)
	if t <= 0 {
		// No deadline set (or already past): wait indefinitely,
		// like a TCP connection without a read deadline.
		select {
		case <-Self.readOp:
			return nil
		case <-Self.stopOp:
			err = io.EOF
			return
		}
	}
	timer := time.NewTimer(t)
	defer timer.Stop()
	select {
	case <-Self.readOp:
		return nil
	case <-Self.stopOp:
		err = io.EOF
		return
	case <-timer.C:
		err = errors.New("mux.queue: read time out")
		return
	}
}

// Len returns the number of buffered bytes.
func (Self *ReceiveWindowQueue) Len() (n uint32) {
	ptrs := atomic.LoadUint64(&Self.lengthWait)
	n, _ = Self.chain.head.unpack(ptrs)
	return
}

// Stop signals both allowPop and waitPush (hence the two buffered sends)
// that the queue is closed.
func (Self *ReceiveWindowQueue) Stop() {
	Self.stopOp <- struct{}{}
	Self.stopOp <- struct{}{}
}

// SetTimeOut sets the absolute deadline used by waitPush.
func (Self *ReceiveWindowQueue) SetTimeOut(t time.Time) {
	Self.timeout = t
}
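
// A minimal deadline sketch (illustrative, not part of the original file):
// SetTimeOut takes an absolute time, so a relative read timeout is
// expressed with time.Now().Add.
//
//	q := NewReceiveWindowQueue()
//	q.SetTimeOut(time.Now().Add(30 * time.Second))
//	if _, err := q.Pop(); err != nil {
//		// io.EOF after Stop, or "mux.queue: read time out" on deadline
//	}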

// Adapted from https://golang.org/src/sync/poolqueue.go
type bufDequeue struct {
	// headTail packs together a 32-bit head index and a 32-bit
	// tail index. Both are indexes into vals modulo len(vals)-1.
	//
	// tail = index of oldest data in queue
	// head = index of next slot to fill
	//
	// Slots in the range [tail, head) are owned by consumers.
	// A consumer continues to own a slot outside this range until
	// it nils the slot, at which point ownership passes to the
	// producer.
	//
	// The head index is stored in the most-significant bits so
	// that we can atomically add to it and the overflow is
	// harmless.
	headTail uint64

	// vals is a ring buffer of unsafe.Pointer values stored in
	// this dequeue. The size of this must be a power of 2.
	//
	// A slot is still in use until *both* the tail index has moved
	// beyond it and the slot has been set to nil. The slot is set
	// to nil atomically by the consumer and read atomically by the
	// producer.
	vals     []unsafe.Pointer
	starving uint32
}

const dequeueBits = 32

// dequeueLimit is the maximum size of a bufDequeue.
//
// This must be at most (1<<dequeueBits)/2 because detecting fullness
// depends on wrapping around the ring buffer without wrapping around
// the index. We divide by 4 so this fits in an int on 32-bit.
const dequeueLimit = (1 << dequeueBits) / 4

// unpack splits a packed headTail word into its head and tail indexes.
func (d *bufDequeue) unpack(ptrs uint64) (head, tail uint32) {
	const mask = 1<<dequeueBits - 1
	head = uint32((ptrs >> dequeueBits) & mask)
	tail = uint32(ptrs & mask)
	return
}

// pack combines head and tail indexes into a single headTail word.
func (d *bufDequeue) pack(head, tail uint32) uint64 {
	const mask = 1<<dequeueBits - 1
	return (uint64(head) << dequeueBits) | uint64(tail&mask)
}
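
// A worked example of the packing (illustrative, not part of the original
// file): with head = 5 and tail = 3 the packed word is 5<<32 | 3. Adding
// 1<<32 advances head without disturbing tail, which is why pushHead can
// claim a slot with a single CAS on headTail.
//
//	var d bufDequeue
//	ptrs := d.pack(5, 3) // 0x0000000500000003
//	head, tail := d.unpack(ptrs)
//	_, _ = head, tail // head == 5, tail == 3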

// pushHead adds val at the head of the queue. It returns false if the
// queue is full. Producers that repeatedly lose the CAS race raise the
// starving flag, which makes competing producers yield first.
func (d *bufDequeue) pushHead(val unsafe.Pointer) bool {
	var slot *unsafe.Pointer
	var starve uint8
	if atomic.LoadUint32(&d.starving) > 0 {
		// A starving producer is waiting; yield to it.
		runtime.Gosched()
	}
	for {
		ptrs := atomic.LoadUint64(&d.headTail)
		head, tail := d.unpack(ptrs)
		if (tail+uint32(len(d.vals)))&(1<<dequeueBits-1) == head {
			// Queue is full.
			return false
		}
		ptrs2 := d.pack(head+1, tail)
		if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
			slot = &d.vals[head&uint32(len(d.vals)-1)]
			if starve >= 3 && atomic.LoadUint32(&d.starving) > 0 {
				// We finally won after starving; clear the flag.
				atomic.StoreUint32(&d.starving, 0)
			}
			break
		}
		starve++
		if starve >= 3 {
			atomic.StoreUint32(&d.starving, 1)
		}
	}
	// The head slot is reserved, so we own it until we publish val.
	*slot = val
	return true
}

// popTail removes and returns the element at the tail of the queue.
// It returns false if the queue is empty. It may be called by any
// number of consumers.
func (d *bufDequeue) popTail() (unsafe.Pointer, bool) {
	ptrs := atomic.LoadUint64(&d.headTail)
	head, tail := d.unpack(ptrs)
	if tail == head {
		// Queue is empty.
		return nil, false
	}
	slot := &d.vals[tail&uint32(len(d.vals)-1)]
	var val unsafe.Pointer
	for {
		val = atomic.LoadPointer(slot)
		if val != nil {
			// We now own slot.
			break
		}
		// Another goroutine is still pushing data on the tail.
	}
	// Tell pushHead that we're done with this slot. Zeroing the
	// slot is also important so we don't leave behind references
	// that could keep this object live longer than necessary.
	atomic.StorePointer(slot, nil)
	// At this point pushHead owns the slot.
	if tail < math.MaxUint32 {
		// Advance tail by one.
		atomic.AddUint64(&d.headTail, 1)
	} else {
		// tail is about to overflow into the head bits; adding
		// -math.MaxUint32 wraps it back to zero instead.
		atomic.AddUint64(&d.headTail, ^uint64(math.MaxUint32-1))
	}
	return val, true
}
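
// A worked example of the tail wraparound (illustrative, not part of the
// original file): ^uint64(math.MaxUint32-1) equals -uint64(math.MaxUint32)
// in two's complement, so when tail == math.MaxUint32 the addition drops
// tail to 0 while leaving the head half of the word untouched.
//
//	var d bufDequeue
//	ptrs := d.pack(7, math.MaxUint32)
//	ptrs += ^uint64(math.MaxUint32 - 1)
//	head, tail := d.unpack(ptrs) // head == 7, tail == 0
//	_, _ = head, tail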

// bufChain is a dynamically-sized version of bufDequeue.
//
// This is implemented as a doubly-linked list queue of bufDequeues
// where each dequeue is double the size of the previous one. Once a
// dequeue fills up, this allocates a new one and only ever pushes to
// the latest dequeue. Pops happen from the other end of the list and
// once a dequeue is exhausted, it gets removed from the list.
type bufChain struct {
	// head is the bufDequeue to push to. This is only accessed
	// by the producer, so doesn't need to be synchronized.
	head *bufChainElt

	// tail is the bufDequeue to popTail from. This is accessed
	// by consumers, so reads and writes must be atomic.
	tail *bufChainElt

	// newChain is set to 1 while a producer splices in a new,
	// larger dequeue; other producers spin until it clears.
	newChain uint32
}

type bufChainElt struct {
	bufDequeue

	// next and prev link to the adjacent bufChainElts in this
	// bufChain.
	//
	// next is written atomically by the producer and read
	// atomically by the consumer. It only transitions from nil to
	// non-nil.
	//
	// prev is written atomically by the consumer and read
	// atomically by the producer. It only transitions from
	// non-nil to nil.
	next, prev *bufChainElt
}

func storePoolChainElt(pp **bufChainElt, v *bufChainElt) {
	atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(pp)), unsafe.Pointer(v))
}

func loadPoolChainElt(pp **bufChainElt) *bufChainElt {
	return (*bufChainElt)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(pp))))
}

// new initializes the chain with a single dequeue of initSize slots.
// initSize must be a power of 2.
func (c *bufChain) new(initSize int) {
	d := new(bufChainElt)
	d.vals = make([]unsafe.Pointer, initSize)
	storePoolChainElt(&c.head, d)
	storePoolChainElt(&c.tail, d)
}
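
// A growth sketch (illustrative, not part of the original file): starting
// from 4 slots, each time the head dequeue fills, pushHead splices in a
// dequeue of twice the size (4 -> 8 -> 16 -> ...), capped at dequeueLimit,
// so pushes never fail on capacity.
//
//	var c bufChain
//	c.new(4)
//	for i := 0; i < 30; i++ {
//		v := i // illustrative payload
//		c.pushHead(unsafe.Pointer(&v))
//	}
//	// the chain now holds dequeues of 4, 8, 16, and 32 slots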

// pushHead adds val to the head dequeue, growing the chain when the
// current dequeue is full. It never fails; at worst it spins while
// another producer is splicing in a new dequeue.
func (c *bufChain) pushHead(val unsafe.Pointer) {
startPush:
	for {
		if atomic.LoadUint32(&c.newChain) > 0 {
			// Another producer is growing the chain; yield and
			// check again.
			runtime.Gosched()
		} else {
			break
		}
	}
	d := loadPoolChainElt(&c.head)
	if d.pushHead(val) {
		return
	}
	// The current dequeue is full. Allocate a new one of twice
	// the size.
	if atomic.CompareAndSwapUint32(&c.newChain, 0, 1) {
		newSize := len(d.vals) * 2
		if newSize >= dequeueLimit {
			// Can't make it any bigger.
			newSize = dequeueLimit
		}
		d2 := &bufChainElt{prev: d}
		d2.vals = make([]unsafe.Pointer, newSize)
		d2.pushHead(val)
		storePoolChainElt(&c.head, d2)
		storePoolChainElt(&d.next, d2)
		atomic.StoreUint32(&c.newChain, 0)
		return
	}
	goto startPush
}

// popTail pops from the tail dequeue, dropping exhausted dequeues from
// the chain as it goes.
func (c *bufChain) popTail() (unsafe.Pointer, bool) {
	d := loadPoolChainElt(&c.tail)
	if d == nil {
		return nil, false
	}
	for {
		// It's important that we load the next pointer
		// *before* popping the tail. In general, d may be
		// transiently empty, but if next is non-nil before
		// the pop and the pop fails, then d is permanently
		// empty, which is the only condition under which it's
		// safe to drop d from the chain.
		d2 := loadPoolChainElt(&d.next)
		if val, ok := d.popTail(); ok {
			return val, ok
		}
		if d2 == nil {
			// This is the only dequeue. It's empty right
			// now, but could be pushed to in the future.
			return nil, false
		}
		// The tail of the chain has been drained, so move on
		// to the next dequeue. Try to drop it from the chain
		// so the next pop doesn't have to look at the empty
		// dequeue again.
		if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) {
			// We won the race. Clear the prev pointer so
			// the garbage collector can collect the empty
			// dequeue.
			storePoolChainElt(&d2.prev, nil)
		}
		d = d2
	}
}