client.go 4.3 KB


  1. package client
  2. import (
  3. "github.com/cnlh/nps/lib/common"
  4. "github.com/cnlh/nps/lib/conn"
  5. "github.com/cnlh/nps/lib/pool"
  6. "github.com/cnlh/nps/vender/github.com/astaxie/beego/logs"
  7. "net"
  8. "os"
  9. "sync"
  10. "time"
  11. )
  12. type TRPClient struct {
  13. svrAddr string
  14. linkMap map[int]*conn.Link
  15. tunnel *conn.Conn
  16. msgTunnel *conn.Conn
  17. bridgeConnType string
  18. stop chan bool
  19. proxyUrl string
  20. sync.Mutex
  21. vKey string
  22. }
  23. //new client
  24. func NewRPClient(svraddr string, vKey string, bridgeConnType string, proxyUrl string) *TRPClient {
  25. return &TRPClient{
  26. svrAddr: svraddr,
  27. linkMap: make(map[int]*conn.Link),
  28. Mutex: sync.Mutex{},
  29. vKey: vKey,
  30. bridgeConnType: bridgeConnType,
  31. stop: make(chan bool),
  32. proxyUrl: proxyUrl,
  33. }
  34. }
  35. //start
  36. func (s *TRPClient) Start() {
  37. go s.linkCleanSession()
  38. retry:
  39. c, err := NewConn(s.bridgeConnType, s.vKey, s.svrAddr, common.WORK_MAIN, s.proxyUrl)
  40. if err != nil {
  41. logs.Error("The connection server failed and will be reconnected in five seconds")
  42. time.Sleep(time.Second * 5)
  43. goto retry
  44. }
  45. logs.Info("Successful connection with server %s", s.svrAddr)
  46. s.processor(c)
  47. }
  48. func (s *TRPClient) Close() {
  49. s.tunnel.Close()
  50. s.stop <- true
  51. for _, v := range s.linkMap {
  52. if v.Conn != nil {
  53. v.Conn.Close()
  54. }
  55. }
  56. }
  57. //处理
  58. func (s *TRPClient) processor(c *conn.Conn) {
  59. go s.dealChan()
  60. go s.getMsgStatus()
  61. for {
  62. flags, err := c.ReadFlag()
  63. if err != nil {
  64. logs.Error("Accept server data error %s, end this service", err.Error())
  65. break
  66. }
  67. switch flags {
  68. case common.VERIFY_EER:
  69. logs.Error("VKey:%s is incorrect, the server refuses to connect, please check", s.vKey)
  70. os.Exit(0)
  71. case common.NEW_CONN:
  72. if link, err := c.GetLinkInfo(); err != nil {
  73. break
  74. } else {
  75. s.Lock()
  76. s.linkMap[link.Id] = link
  77. s.Unlock()
  78. link.MsgConn = s.msgTunnel
  79. go s.linkProcess(link, c)
  80. link.Run(false)
  81. }
  82. case common.RES_CLOSE:
  83. logs.Error("The authentication key is connected by another client or the server closes the client.")
  84. os.Exit(0)
  85. case common.RES_MSG:
  86. logs.Error("Server-side return error")
  87. break
  88. default:
  89. logs.Warn("The error could not be resolved")
  90. break
  91. }
  92. }
  93. c.Close()
  94. s.Close()
  95. }
  96. func (s *TRPClient) linkProcess(link *conn.Link, c *conn.Conn) {
  97. link.Host = common.FormatAddress(link.Host)
  98. //与目标建立连接
  99. server, err := net.DialTimeout(link.ConnType, link.Host, time.Second*3)
  100. if err != nil {
  101. c.WriteFail(link.Id)
  102. logs.Warn("connect to ", link.Host, "error:", err)
  103. return
  104. }
  105. c.WriteSuccess(link.Id)
  106. link.Conn = conn.NewConn(server)
  107. buf := pool.BufPoolCopy.Get().([]byte)
  108. for {
  109. if n, err := server.Read(buf); err != nil {
  110. s.tunnel.SendMsg([]byte(common.IO_EOF), link)
  111. break
  112. } else {
  113. if _, err := s.tunnel.SendMsg(buf[:n], link); err != nil {
  114. c.Close()
  115. break
  116. }
  117. if link.ConnType == common.CONN_UDP {
  118. break
  119. }
  120. }
  121. <-link.StatusCh
  122. }
  123. pool.PutBufPoolCopy(buf)
  124. s.Lock()
  125. s.Unlock()
  126. }
  127. func (s *TRPClient) getMsgStatus() {
  128. var err error
  129. s.msgTunnel, err = NewConn(s.bridgeConnType, s.vKey, s.svrAddr, common.WORK_SEND_STATUS, s.proxyUrl)
  130. if err != nil {
  131. logs.Error("connect to ", s.svrAddr, "error:", err)
  132. return
  133. }
  134. go func() {
  135. for {
  136. if id, err := s.msgTunnel.GetLen(); err != nil {
  137. break
  138. } else {
  139. s.Lock()
  140. if v, ok := s.linkMap[id]; ok {
  141. s.Unlock()
  142. v.StatusCh <- true
  143. } else {
  144. s.Unlock()
  145. }
  146. }
  147. }
  148. }()
  149. <-s.stop
  150. }
  151. //隧道模式处理
  152. func (s *TRPClient) dealChan() {
  153. var err error
  154. s.tunnel, err = NewConn(s.bridgeConnType, s.vKey, s.svrAddr, common.WORK_CHAN, s.proxyUrl)
  155. if err != nil {
  156. logs.Error("connect to ", s.svrAddr, "error:", err)
  157. return
  158. }
  159. go func() {
  160. for {
  161. if id, err := s.tunnel.GetLen(); err != nil {
  162. break
  163. } else {
  164. s.Lock()
  165. if v, ok := s.linkMap[id]; ok {
  166. s.Unlock()
  167. if content, err := s.tunnel.GetMsgContent(v); err != nil {
  168. pool.PutBufPoolCopy(content)
  169. break
  170. } else {
  171. v.MsgCh <- content
  172. }
  173. } else {
  174. s.Unlock()
  175. }
  176. }
  177. }
  178. }()
  179. <-s.stop
  180. }
  181. func (s *TRPClient) linkCleanSession() {
  182. ticker := time.NewTicker(time.Minute * 5)
  183. for {
  184. select {
  185. case <-ticker.C:
  186. s.Lock()
  187. for _, v := range s.linkMap {
  188. if v.FinishUse {
  189. delete(s.linkMap, v.Id)
  190. }
  191. }
  192. s.Unlock()
  193. }
  194. }
  195. }