rate.go 1.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081
  1. package rate
  2. import (
  3. "sync/atomic"
  4. "time"
  5. )
  // Rate is a token-bucket rate limiter.
  //
  // The bucket holds at most bucketSize tokens and starts empty. While the
  // refill goroutine launched by Start is running, bucketAddSize tokens are
  // added once per second. Get removes tokens, blocking until enough are
  // available, and ReturnBucket hands unused tokens back.
  //
  // A Rate must be created with NewRate and must not be copied after first use.
  type Rate struct {
  	// The two int64 fields that are accessed through sync/atomic come first
  	// so that they stay 64-bit aligned on 32-bit platforms, as sync/atomic
  	// requires.

  	// bucketSurplusSize is the number of tokens currently in the bucket. It
  	// is shared between callers and the refill goroutine and is therefore
  	// only read and written atomically.
  	bucketSurplusSize int64
  	// NowRate is recomputed on every refill tick from the tokens left in the
  	// bucket: bucketAddSize-left when that is positive, bucketSize-left
  	// otherwise.
  	// NOTE(review): this looks like an estimate of the tokens consumed during
  	// the previous second; confirm the intended meaning with the author.
  	// It is written atomically; concurrent readers should use CurrentRate.
  	NowRate int64

  	// bucketSize is the bucket capacity; immutable after NewRate.
  	bucketSize int64
  	// bucketAddSize is the number of tokens added per second; immutable
  	// after NewRate.
  	bucketAddSize int64
  	// stopChan carries the stop signal from Stop to the refill goroutine.
  	stopChan chan bool
  	// running is 1 while a refill goroutine is active and 0 otherwise. It
  	// makes Start and Stop idempotent.
  	running int32
  }

  // NewRate returns a limiter that refills addSize tokens per second into a
  // bucket whose capacity is 2*addSize. The bucket starts empty and is not
  // refilled until Start is called.
  //
  // addSize should be positive: with a non-positive value the bucket never
  // gains tokens and Get with a positive size blocks forever.
  func NewRate(addSize int64) *Rate {
  	return &Rate{
  		bucketSize:    addSize * 2,
  		bucketAddSize: addSize,
  		stopChan:      make(chan bool),
  	}
  }

  // Start launches the background refill goroutine. Calling Start while a
  // refill goroutine is already running is a no-op, so the refill rate cannot
  // be multiplied by accident.
  func (s *Rate) Start() {
  	if !atomic.CompareAndSwapInt32(&s.running, 0, 1) {
  		return
  	}
  	go s.session()
  }

  // add puts up to size tokens into the bucket without exceeding bucketSize.
  // Non-positive sizes are ignored.
  func (s *Rate) add(size int64) {
  	if size <= 0 {
  		return
  	}
  	for {
  		cur := atomic.LoadInt64(&s.bucketSurplusSize)
  		// Clamp against the free room instead of computing cur+size first,
  		// so a very large size cannot overflow int64.
  		grant := size
  		if room := s.bucketSize - cur; grant > room {
  			grant = room
  		}
  		if grant <= 0 {
  			return // bucket is already full
  		}
  		// Retry when another goroutine changed the counter in between.
  		if atomic.CompareAndSwapInt64(&s.bucketSurplusSize, cur, cur+grant) {
  			return
  		}
  	}
  }

  // ReturnBucket gives size unused tokens back to the bucket. Tokens that do
  // not fit into the remaining capacity are discarded.
  func (s *Rate) ReturnBucket(size int64) {
  	s.add(size)
  }

  // Stop terminates the refill goroutine started by Start and returns once the
  // goroutine has received the stop signal. It is a no-op when no refill
  // goroutine is running, so calling it twice or before Start does not block.
  func (s *Rate) Stop() {
  	if !atomic.CompareAndSwapInt32(&s.running, 1, 0) {
  		return
  	}
  	s.stopChan <- true
  }

  // Get removes size tokens from the bucket. If the bucket does not hold enough
  // tokens it blocks, re-checking every 100ms, until they can be taken.
  //
  // A non-positive size returns immediately without touching the bucket. A
  // size larger than the bucket capacity can never be satisfied and blocks
  // forever.
  func (s *Rate) Get(size int64) {
  	if size <= 0 || s.tryTake(size) {
  		return
  	}
  	ticker := time.NewTicker(100 * time.Millisecond)
  	defer ticker.Stop()
  	for range ticker.C {
  		if s.tryTake(size) {
  			return
  		}
  	}
  }

  // tryTake atomically removes size tokens if that many are available and
  // reports whether it did. The check and the subtraction happen in a single
  // compare-and-swap, so concurrent callers cannot overdraw the bucket.
  func (s *Rate) tryTake(size int64) bool {
  	for {
  		cur := atomic.LoadInt64(&s.bucketSurplusSize)
  		if cur < size {
  			return false
  		}
  		if atomic.CompareAndSwapInt64(&s.bucketSurplusSize, cur, cur-size) {
  			return true
  		}
  	}
  }

  // CurrentRate returns the latest NowRate value using an atomic load, which
  // makes it safe to call while the refill goroutine is running.
  func (s *Rate) CurrentRate() int64 {
  	return atomic.LoadInt64(&s.NowRate)
  }

  // session is the body of the refill goroutine. Once per second it refreshes
  // NowRate from the tokens left in the bucket and then adds bucketAddSize
  // tokens. It returns when a value arrives on stopChan.
  func (s *Rate) session() {
  	ticker := time.NewTicker(time.Second)
  	defer ticker.Stop()
  	for {
  		select {
  		case <-ticker.C:
  			// Take one snapshot so both branches see the same value.
  			left := atomic.LoadInt64(&s.bucketSurplusSize)
  			rate := s.bucketAddSize - left
  			if rate <= 0 {
  				rate = s.bucketSize - left
  			}
  			atomic.StoreInt64(&s.NowRate, rate)
  			s.add(s.bucketAddSize)
  		case <-s.stopChan:
  			return
  		}
  	}
  }