client.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152
  1. package client
  2. import (
  3. "github.com/cnlh/easyProxy/utils"
  4. "log"
  5. "net"
  6. "sync"
  7. "sync/atomic"
  8. "time"
  9. )
  // TRPClient is the client side of the proxy. It keeps control connections
  // and tunnel connections open to a single server address.
  type TRPClient struct {
  	// tunnelNum counts tunnel goroutines currently waiting for a request from
  	// the server. It is updated with sync/atomic, so it is kept as the first
  	// field: on 32-bit platforms only the first word of an allocated struct is
  	// guaranteed to be 64-bit aligned, which 64-bit atomic operations require.
  	tunnelNum int64

  	// svrAddr is the server address passed to net.Dial("tcp", ...).
  	svrAddr string
  	// tcpNum is the number of control connections Start launches.
  	tcpNum int
  	// connPoolSize is set to 5 by NewRPClient; nothing in this file reads it.
  	connPoolSize int
  	// tunnel is signalled by a tunnel goroutine each time it has received a
  	// request, so that session can consider starting another tunnel.
  	tunnel chan bool
  	// serverStatus is set to false when NewConn starts dialing and to true
  	// once processor begins handling a control connection.
  	serverStatus bool
  	// Mutex is held by NewConn while it dials the server.
  	sync.Mutex
  	// vKey is the verification key sent to the server on every new connection.
  	vKey string
  }
  20. //new client
  21. func NewRPClient(svraddr string, tcpNum int, vKey string) *TRPClient {
  22. c := new(TRPClient)
  23. c.svrAddr = svraddr
  24. c.tcpNum = tcpNum
  25. c.vKey = vKey
  26. c.tunnel = make(chan bool)
  27. c.connPoolSize = 5
  28. return c
  29. }
  30. //start
  31. func (s *TRPClient) Start() error {
  32. for i := 0; i < s.tcpNum; i++ {
  33. go s.NewConn()
  34. }
  35. for i := 0; i < 5; i++ {
  36. go s.dealChan()
  37. }
  38. go s.session()
  39. return nil
  40. }
  41. //新建
  42. func (s *TRPClient) NewConn() error {
  43. s.Lock()
  44. s.serverStatus = false
  45. conn, err := net.Dial("tcp", s.svrAddr)
  46. if err != nil {
  47. log.Println("连接服务端失败,五秒后将重连")
  48. time.Sleep(time.Second * 5)
  49. s.Unlock()
  50. go s.NewConn()
  51. return err
  52. }
  53. s.Unlock()
  54. return s.processor(utils.NewConn(conn))
  55. }
  56. //处理
  57. func (s *TRPClient) processor(c *utils.Conn) error {
  58. s.serverStatus = true
  59. c.SetAlive()
  60. if _, err := c.Write([]byte(utils.Getverifyval(s.vKey))); err != nil {
  61. return err
  62. }
  63. c.WriteMain()
  64. for {
  65. flags, err := c.ReadFlag()
  66. if err != nil {
  67. log.Println("服务端断开,五秒后将重连", err)
  68. go s.NewConn()
  69. break
  70. }
  71. switch flags {
  72. case utils.VERIFY_EER:
  73. log.Fatalln("vkey:", s.vKey, "不正确,服务端拒绝连接,请检查")
  74. case utils.WORK_CHAN: //隧道模式,每次开启10个,加快连接速度
  75. case utils.RES_CLOSE:
  76. log.Fatal("该vkey被另一客户连接")
  77. case utils.RES_MSG:
  78. log.Println("服务端返回错误。")
  79. default:
  80. log.Println("无法解析该错误。", flags)
  81. }
  82. }
  83. return nil
  84. }
  85. //隧道模式处理
  86. func (s *TRPClient) dealChan() {
  87. var err error
  88. //创建一个tcp连接
  89. conn, err := net.Dial("tcp", s.svrAddr)
  90. if err != nil {
  91. log.Println("connect to ", s.svrAddr, "error:", err)
  92. return
  93. }
  94. //验证
  95. if _, err := conn.Write([]byte(utils.Getverifyval(s.vKey))); err != nil {
  96. log.Println("connect to ", s.svrAddr, "error:", err)
  97. return
  98. }
  99. //默认长连接保持
  100. c := utils.NewConn(conn)
  101. c.SetAlive()
  102. //写标志
  103. c.WriteChan()
  104. re:
  105. atomic.AddInt64(&s.tunnelNum, 1)
  106. //获取连接的host type(tcp or udp)
  107. typeStr, host, en, de, crypt, mux, err := c.GetHostFromConn()
  108. s.tunnel <- true
  109. atomic.AddInt64(&s.tunnelNum, -1)
  110. if err != nil {
  111. c.Close()
  112. return
  113. }
  114. s.ConnectAndCopy(c, typeStr, host, en, de, crypt, mux)
  115. if mux {
  116. utils.FlushConn(conn)
  117. goto re
  118. } else {
  119. c.Close()
  120. }
  121. }
  122. func (s *TRPClient) session() {
  123. t := time.NewTicker(time.Millisecond * 1000)
  124. for {
  125. select {
  126. case <-s.tunnel:
  127. case <-t.C:
  128. }
  129. if s.serverStatus && s.tunnelNum < 5 {
  130. go s.dealChan()
  131. }
  132. }
  133. }
  134. func (s *TRPClient) ConnectAndCopy(c *utils.Conn, typeStr, host string, en, de int, crypt, mux bool) {
  135. //与目标建立连接,超时时间为3
  136. server, err := net.DialTimeout(typeStr, host, time.Second*3)
  137. if err != nil {
  138. log.Println("connect to ", host, "error:", err, mux)
  139. c.WriteFail()
  140. return
  141. }
  142. c.WriteSuccess()
  143. utils.ReplayWaitGroup(c.Conn, server, en, de, crypt, mux, nil)
  144. }