queue.go 1.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283
  1. package mux
  2. import (
  3. "errors"
  4. "sync"
  5. "github.com/cnlh/nps/lib/pool"
  6. )
  7. type Element *bufNode
  8. type bufNode struct {
  9. val []byte //buf value
  10. l int //length
  11. }
  12. func NewBufNode(buf []byte, l int) *bufNode {
  13. return &bufNode{
  14. val: buf,
  15. l: l,
  16. }
  17. }
  18. type Queue interface {
  19. Push(e Element) //向队列中添加元素
  20. Pop() Element //移除队列中最前面的元素
  21. Clear() bool //清空队列
  22. Size() int //获取队列的元素个数
  23. IsEmpty() bool //判断队列是否是空
  24. }
  25. type sliceEntry struct {
  26. element []Element
  27. sync.Mutex
  28. }
  29. func NewQueue() *sliceEntry {
  30. return &sliceEntry{}
  31. }
  32. //向队列中添加元素
  33. func (entry *sliceEntry) Push(e Element) {
  34. entry.Lock()
  35. defer entry.Unlock()
  36. entry.element = append(entry.element, e)
  37. }
  38. //移除队列中最前面的额元素
  39. func (entry *sliceEntry) Pop() (Element, error) {
  40. if entry.IsEmpty() {
  41. return nil, errors.New("queue is empty!")
  42. }
  43. entry.Lock()
  44. defer entry.Unlock()
  45. firstElement := entry.element[0]
  46. entry.element = entry.element[1:]
  47. return firstElement, nil
  48. }
  49. func (entry *sliceEntry) Clear() bool {
  50. entry.Lock()
  51. defer entry.Unlock()
  52. if entry.IsEmpty() {
  53. return false
  54. }
  55. for i := 0; i < entry.Size(); i++ {
  56. pool.PutBufPoolCopy(entry.element[i].val)
  57. entry.element[i] = nil
  58. }
  59. entry.element = nil
  60. return true
  61. }
  62. func (entry *sliceEntry) Size() int {
  63. return len(entry.element)
  64. }
  65. func (entry *sliceEntry) IsEmpty() bool {
  66. if len(entry.element) == 0 {
  67. return true
  68. }
  69. return false
  70. }