client.go 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190
  1. package client
  2. import (
  3. "github.com/cnlh/nps/lib/common"
  4. "github.com/cnlh/nps/lib/conn"
  5. "github.com/cnlh/nps/lib/pool"
  6. "github.com/cnlh/nps/vender/github.com/astaxie/beego/logs"
  7. "net"
  8. "os"
  9. "sync"
  10. "time"
  11. )
  12. type TRPClient struct {
  13. svrAddr string
  14. linkMap map[int]*conn.Link
  15. tunnel *conn.Conn
  16. msgTunnel *conn.Conn
  17. bridgeConnType string
  18. stop chan bool
  19. proxyUrl string
  20. sync.Mutex
  21. vKey string
  22. }
  23. //new client
  24. func NewRPClient(svraddr string, vKey string, bridgeConnType string, proxyUrl string) *TRPClient {
  25. return &TRPClient{
  26. svrAddr: svraddr,
  27. linkMap: make(map[int]*conn.Link),
  28. Mutex: sync.Mutex{},
  29. vKey: vKey,
  30. bridgeConnType: bridgeConnType,
  31. stop: make(chan bool),
  32. proxyUrl: proxyUrl,
  33. }
  34. }
  35. //start
  36. func (s *TRPClient) Start() {
  37. retry:
  38. c, err := NewConn(s.bridgeConnType, s.vKey, s.svrAddr, common.WORK_MAIN, s.proxyUrl)
  39. if err != nil {
  40. logs.Error("The connection server failed and will be reconnected in five seconds")
  41. time.Sleep(time.Second * 5)
  42. goto retry
  43. }
  44. logs.Info("Successful connection with server %s", s.svrAddr)
  45. s.processor(c)
  46. }
  47. func (s *TRPClient) Close() {
  48. s.tunnel.Close()
  49. s.stop <- true
  50. for _, v := range s.linkMap {
  51. if v.Conn != nil {
  52. v.Conn.Close()
  53. }
  54. }
  55. }
  56. //处理
  57. func (s *TRPClient) processor(c *conn.Conn) {
  58. go s.dealChan()
  59. go s.getMsgStatus()
  60. for {
  61. flags, err := c.ReadFlag()
  62. if err != nil {
  63. logs.Error("Accept server data error %s, end this service", err.Error())
  64. break
  65. }
  66. switch flags {
  67. case common.VERIFY_EER:
  68. logs.Error("VKey:%s is incorrect, the server refuses to connect, please check", s.vKey)
  69. os.Exit(0)
  70. case common.NEW_CONN:
  71. if link, err := c.GetLinkInfo(); err != nil {
  72. break
  73. } else {
  74. s.Lock()
  75. s.linkMap[link.Id] = link
  76. s.Unlock()
  77. link.MsgConn = s.msgTunnel
  78. go s.linkProcess(link, c)
  79. link.Run(false)
  80. }
  81. case common.RES_CLOSE:
  82. logs.Error("The authentication key is connected by another client or the server closes the client.")
  83. os.Exit(0)
  84. case common.RES_MSG:
  85. logs.Error("Server-side return error")
  86. break
  87. default:
  88. logs.Warn("The error could not be resolved")
  89. break
  90. }
  91. }
  92. c.Close()
  93. s.Close()
  94. }
  95. func (s *TRPClient) linkProcess(link *conn.Link, c *conn.Conn) {
  96. link.Host = common.FormatAddress(link.Host)
  97. //与目标建立连接
  98. server, err := net.DialTimeout(link.ConnType, link.Host, time.Second*3)
  99. if err != nil {
  100. c.WriteFail(link.Id)
  101. logs.Warn("connect to ", link.Host, "error:", err)
  102. return
  103. }
  104. c.WriteSuccess(link.Id)
  105. link.Conn = conn.NewConn(server)
  106. buf := pool.BufPoolCopy.Get().([]byte)
  107. for {
  108. if n, err := server.Read(buf); err != nil {
  109. s.tunnel.SendMsg([]byte(common.IO_EOF), link)
  110. break
  111. } else {
  112. if _, err := s.tunnel.SendMsg(buf[:n], link); err != nil {
  113. c.Close()
  114. break
  115. }
  116. if link.ConnType == common.CONN_UDP {
  117. break
  118. }
  119. }
  120. <-link.StatusCh
  121. }
  122. pool.PutBufPoolCopy(buf)
  123. s.Lock()
  124. //TODO 删除map
  125. s.Unlock()
  126. }
  127. func (s *TRPClient) getMsgStatus() {
  128. var err error
  129. s.msgTunnel, err = NewConn(s.bridgeConnType, s.vKey, s.svrAddr, common.WORK_SEND_STATUS, s.proxyUrl)
  130. if err != nil {
  131. logs.Error("connect to ", s.svrAddr, "error:", err)
  132. return
  133. }
  134. go func() {
  135. for {
  136. if id, err := s.msgTunnel.GetLen(); err != nil {
  137. break
  138. } else {
  139. s.Lock()
  140. if v, ok := s.linkMap[id]; ok {
  141. s.Unlock()
  142. v.StatusCh <- true
  143. } else {
  144. s.Unlock()
  145. }
  146. }
  147. }
  148. }()
  149. <-s.stop
  150. }
  151. //隧道模式处理
  152. func (s *TRPClient) dealChan() {
  153. var err error
  154. s.tunnel, err = NewConn(s.bridgeConnType, s.vKey, s.svrAddr, common.WORK_CHAN, s.proxyUrl)
  155. if err != nil {
  156. logs.Error("connect to ", s.svrAddr, "error:", err)
  157. return
  158. }
  159. go func() {
  160. for {
  161. if id, err := s.tunnel.GetLen(); err != nil {
  162. break
  163. } else {
  164. s.Lock()
  165. if v, ok := s.linkMap[id]; ok {
  166. s.Unlock()
  167. if content, err := s.tunnel.GetMsgContent(v); err != nil {
  168. pool.PutBufPoolCopy(content)
  169. break
  170. } else {
  171. v.MsgCh <- content
  172. }
  173. } else {
  174. s.Unlock()
  175. }
  176. }
  177. }
  178. }()
  179. <-s.stop
  180. }