rate.go 1.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374
  1. package lib
  2. import (
  3. "sync/atomic"
  4. "time"
  5. )
  6. type Rate struct {
  7. bucketSize int64 //木桶容量
  8. bucketSurplusSize int64 //当前桶中体积
  9. bucketAddSize int64 //每次加水大小
  10. stopChan chan bool //停止
  11. }
  12. func NewRate(addSize int64) *Rate {
  13. return &Rate{
  14. bucketSize: addSize * 2,
  15. bucketSurplusSize: 0,
  16. bucketAddSize: addSize,
  17. stopChan: make(chan bool),
  18. }
  19. }
  20. func (s *Rate) Start() {
  21. go s.session()
  22. }
  23. func (s *Rate) add(size int64) {
  24. if (s.bucketSize - s.bucketSurplusSize) < s.bucketAddSize {
  25. return
  26. }
  27. atomic.AddInt64(&s.bucketSurplusSize, size)
  28. }
  29. //回桶
  30. func (s *Rate) ReturnBucket(size int64) {
  31. s.add(size)
  32. }
  33. //停止
  34. func (s *Rate) Stop() {
  35. s.stopChan <- true
  36. }
  37. func (s *Rate) Get(size int64) {
  38. if s.bucketSurplusSize >= size {
  39. atomic.AddInt64(&s.bucketSurplusSize, -size)
  40. return
  41. }
  42. ticker := time.NewTicker(time.Millisecond * 100)
  43. for {
  44. select {
  45. case <-ticker.C:
  46. if s.bucketSurplusSize >= size {
  47. atomic.AddInt64(&s.bucketSurplusSize, -size)
  48. ticker.Stop()
  49. return
  50. }
  51. }
  52. }
  53. }
  54. func (s *Rate) session() {
  55. ticker := time.NewTicker(time.Second * 1)
  56. for {
  57. select {
  58. case <-ticker.C:
  59. s.add(s.bucketAddSize)
  60. case <-s.stopChan:
  61. ticker.Stop()
  62. return
  63. }
  64. }
  65. }