flow.go 2.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687
  1. package limiter
  2. import (
  3. "ehang.io/nps/lib/enet"
  4. "errors"
  5. "sync/atomic"
  6. )
  7. // FlowStore is an interface to store or get the flow now
  8. type FlowStore interface {
  9. GetOutIn() (uint32, uint32)
  10. AddOut(out uint32) uint32
  11. AddIn(in uint32) uint32
  12. }
  13. // memStore is an implement of FlowStore
  14. type memStore struct {
  15. nowOut uint32
  16. nowIn uint32
  17. }
  18. // GetOutIn return out and in num 0
  19. func (m *memStore) GetOutIn() (uint32, uint32) {
  20. return m.nowOut, m.nowIn
  21. }
  22. // AddOut is used to add out now
  23. func (m *memStore) AddOut(out uint32) uint32 {
  24. return atomic.AddUint32(&m.nowOut, out)
  25. }
  26. // AddIn is used to add in now
  27. func (m *memStore) AddIn(in uint32) uint32 {
  28. return atomic.AddUint32(&m.nowIn, in)
  29. }
  30. // FlowLimiter is used to limit the flow of a service
  31. type FlowLimiter struct {
  32. Store FlowStore
  33. OutLimit uint32 `json:"out_limit" required:"true" placeholder:"1024(kb)" zh_name:"出口最大流量"` //unit: kb, 0 means not limit
  34. InLimit uint32 `json:"in_limit" required:"true" placeholder:"1024(kb)" zh_name:"入口最大流量"` //unit: kb, 0 means not limit
  35. }
  36. func (f *FlowLimiter) GetName() string {
  37. return "flow"
  38. }
  39. func (f *FlowLimiter) GetZhName() string {
  40. return "流量限制"
  41. }
  42. // DoLimit return a flow limited enet.Conn
  43. func (f *FlowLimiter) DoLimit(c enet.Conn) (enet.Conn, error) {
  44. return &flowConn{fl: f, Conn: c}, nil
  45. }
  46. // Init is used to set out or in num now
  47. func (f *FlowLimiter) Init() error {
  48. if f.Store == nil {
  49. f.Store = &memStore{}
  50. }
  51. return nil
  52. }
  53. // flowConn is an implement of
  54. type flowConn struct {
  55. enet.Conn
  56. fl *FlowLimiter
  57. }
  58. // Read add the in flow num of the service
  59. func (fs *flowConn) Read(b []byte) (n int, err error) {
  60. n, err = fs.Conn.Read(b)
  61. if fs.fl.InLimit > 0 && fs.fl.Store.AddIn(uint32(n)) > fs.fl.InLimit {
  62. err = errors.New("exceed the in flow limit")
  63. }
  64. return
  65. }
  66. // Write add the out flow num of the service
  67. func (fs *flowConn) Write(b []byte) (n int, err error) {
  68. n, err = fs.Conn.Write(b)
  69. if fs.fl.OutLimit > 0 && fs.fl.Store.AddOut(uint32(n)) > fs.fl.OutLimit {
  70. err = errors.New("exceed the out flow limit")
  71. }
  72. return
  73. }