base.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100
  1. package proxy
  2. import (
  3. "errors"
  4. "net"
  5. "net/http"
  6. "sync"
  7. "ehang.io/nps/bridge"
  8. "ehang.io/nps/lib/common"
  9. "ehang.io/nps/lib/conn"
  10. "ehang.io/nps/lib/file"
  11. "github.com/astaxie/beego/logs"
  12. )
  // Service is the lifecycle contract of a proxy server: something that can
  // be started and closed, with each operation reporting failure through its
  // error result.
  type Service interface {
  	// Start starts the service and returns an error if it could not start.
  	Start() error
  	// Close stops the service and returns an error if closing failed.
  	Close() error
  }
  17. type NetBridge interface {
  18. SendLinkInfo(clientId int, link *conn.Link, t *file.Tunnel) (target net.Conn, err error)
  19. }
  20. //BaseServer struct
  21. type BaseServer struct {
  22. id int
  23. bridge NetBridge
  24. task *file.Tunnel
  25. errorContent []byte
  26. sync.Mutex
  27. }
  28. func NewBaseServer(bridge *bridge.Bridge, task *file.Tunnel) *BaseServer {
  29. return &BaseServer{
  30. bridge: bridge,
  31. task: task,
  32. errorContent: nil,
  33. Mutex: sync.Mutex{},
  34. }
  35. }
  36. //add the flow
  37. func (s *BaseServer) FlowAdd(in, out int64) {
  38. s.Lock()
  39. defer s.Unlock()
  40. s.task.Flow.ExportFlow += out
  41. s.task.Flow.InletFlow += in
  42. }
  43. //change the flow
  44. func (s *BaseServer) FlowAddHost(host *file.Host, in, out int64) {
  45. s.Lock()
  46. defer s.Unlock()
  47. host.Flow.ExportFlow += out
  48. host.Flow.InletFlow += in
  49. }
  50. //write fail bytes to the connection
  51. func (s *BaseServer) writeConnFail(c net.Conn) {
  52. c.Write([]byte(common.ConnectionFailBytes))
  53. c.Write(s.errorContent)
  54. }
  55. //auth check
  56. func (s *BaseServer) auth(r *http.Request, c *conn.Conn, u, p string) error {
  57. if u != "" && p != "" && !common.CheckAuth(r, u, p) {
  58. c.Write([]byte(common.UnauthorizedBytes))
  59. c.Close()
  60. return errors.New("401 Unauthorized")
  61. }
  62. return nil
  63. }
  64. //check flow limit of the client ,and decrease the allow num of client
  65. func (s *BaseServer) CheckFlowAndConnNum(client *file.Client) error {
  66. if client.Flow.FlowLimit > 0 && (client.Flow.FlowLimit<<20) < (client.Flow.ExportFlow+client.Flow.InletFlow) {
  67. return errors.New("Traffic exceeded")
  68. }
  69. if !client.GetConn() {
  70. return errors.New("Connections exceed the current client limit")
  71. }
  72. return nil
  73. }
  74. //create a new connection and start bytes copying
  75. func (s *BaseServer) DealClient(c *conn.Conn, client *file.Client, addr string, rb []byte, tp string, f func(), flow *file.Flow, localProxy bool) error {
  76. link := conn.NewLink(tp, addr, client.Cnf.Crypt, client.Cnf.Compress, c.Conn.RemoteAddr().String(), localProxy)
  77. if target, err := s.bridge.SendLinkInfo(client.Id, link, s.task); err != nil {
  78. logs.Warn("get connection from client id %d error %s", client.Id, err.Error())
  79. c.Close()
  80. return err
  81. } else {
  82. if f != nil {
  83. f()
  84. }
  85. conn.CopyWaitGroup(target, c.Conn, link.Crypt, link.Compress, client.Rate, flow, true, rb)
  86. }
  87. return nil
  88. }