client.go 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187
  1. package client
  2. import (
  3. "github.com/cnlh/nps/lib/common"
  4. "github.com/cnlh/nps/lib/conn"
  5. "github.com/cnlh/nps/lib/lg"
  6. "github.com/cnlh/nps/lib/pool"
  7. "net"
  8. "sync"
  9. "time"
  10. )
  11. type TRPClient struct {
  12. svrAddr string
  13. linkMap map[int]*conn.Link
  14. tunnel *conn.Conn
  15. msgTunnel *conn.Conn
  16. bridgeConnType string
  17. stop chan bool
  18. proxyUrl string
  19. sync.Mutex
  20. vKey string
  21. }
  22. //new client
  23. func NewRPClient(svraddr string, vKey string, bridgeConnType string, proxyUrl string) *TRPClient {
  24. return &TRPClient{
  25. svrAddr: svraddr,
  26. linkMap: make(map[int]*conn.Link),
  27. Mutex: sync.Mutex{},
  28. vKey: vKey,
  29. bridgeConnType: bridgeConnType,
  30. stop: make(chan bool),
  31. proxyUrl: proxyUrl,
  32. }
  33. }
  34. //start
  35. func (s *TRPClient) Start() {
  36. retry:
  37. c, err := NewConn(s.bridgeConnType, s.vKey, s.svrAddr, common.WORK_MAIN, s.proxyUrl)
  38. if err != nil {
  39. lg.Println("The connection server failed and will be reconnected in five seconds")
  40. time.Sleep(time.Second * 5)
  41. goto retry
  42. }
  43. lg.Printf("Successful connection with server %s", s.svrAddr)
  44. s.processor(c)
  45. }
  46. func (s *TRPClient) Close() {
  47. s.tunnel.Close()
  48. s.stop <- true
  49. for _, v := range s.linkMap {
  50. if v.Conn != nil {
  51. v.Conn.Close()
  52. }
  53. }
  54. }
  55. //处理
  56. func (s *TRPClient) processor(c *conn.Conn) {
  57. go s.dealChan()
  58. go s.getMsgStatus()
  59. for {
  60. flags, err := c.ReadFlag()
  61. if err != nil {
  62. lg.Printf("Accept server data error %s, end this service", err.Error())
  63. break
  64. }
  65. switch flags {
  66. case common.VERIFY_EER:
  67. lg.Fatalf("VKey:%s is incorrect, the server refuses to connect, please check", s.vKey)
  68. case common.NEW_CONN:
  69. if link, err := c.GetLinkInfo(); err != nil {
  70. break
  71. } else {
  72. s.Lock()
  73. s.linkMap[link.Id] = link
  74. s.Unlock()
  75. link.MsgConn = s.msgTunnel
  76. go s.linkProcess(link, c)
  77. link.Run(false)
  78. }
  79. case common.RES_CLOSE:
  80. lg.Fatalln("The authentication key is connected by another client or the server closes the client.")
  81. case common.RES_MSG:
  82. lg.Println("Server-side return error")
  83. break
  84. default:
  85. lg.Println("The error could not be resolved")
  86. break
  87. }
  88. }
  89. c.Close()
  90. s.Close()
  91. }
  92. func (s *TRPClient) linkProcess(link *conn.Link, c *conn.Conn) {
  93. link.Host = common.FormatAddress(link.Host)
  94. //与目标建立连接
  95. server, err := net.DialTimeout(link.ConnType, link.Host, time.Second*3)
  96. if err != nil {
  97. c.WriteFail(link.Id)
  98. lg.Println("connect to ", link.Host, "error:", err)
  99. return
  100. }
  101. c.WriteSuccess(link.Id)
  102. link.Conn = conn.NewConn(server)
  103. buf := pool.BufPoolCopy.Get().([]byte)
  104. for {
  105. if n, err := server.Read(buf); err != nil {
  106. s.tunnel.SendMsg([]byte(common.IO_EOF), link)
  107. break
  108. } else {
  109. if _, err := s.tunnel.SendMsg(buf[:n], link); err != nil {
  110. c.Close()
  111. break
  112. }
  113. if link.ConnType == common.CONN_UDP {
  114. break
  115. }
  116. }
  117. <-link.StatusCh
  118. }
  119. pool.PutBufPoolCopy(buf)
  120. s.Lock()
  121. //TODO 删除map
  122. s.Unlock()
  123. }
  124. func (s *TRPClient) getMsgStatus() {
  125. var err error
  126. s.msgTunnel, err = NewConn(s.bridgeConnType, s.vKey, s.svrAddr, common.WORK_SEND_STATUS, s.proxyUrl)
  127. if err != nil {
  128. lg.Println("connect to ", s.svrAddr, "error:", err)
  129. return
  130. }
  131. go func() {
  132. for {
  133. if id, err := s.msgTunnel.GetLen(); err != nil {
  134. break
  135. } else {
  136. s.Lock()
  137. if v, ok := s.linkMap[id]; ok {
  138. s.Unlock()
  139. v.StatusCh <- true
  140. } else {
  141. s.Unlock()
  142. }
  143. }
  144. }
  145. }()
  146. <-s.stop
  147. }
  148. //隧道模式处理
  149. func (s *TRPClient) dealChan() {
  150. var err error
  151. s.tunnel, err = NewConn(s.bridgeConnType, s.vKey, s.svrAddr, common.WORK_CHAN, s.proxyUrl)
  152. if err != nil {
  153. lg.Println("connect to ", s.svrAddr, "error:", err)
  154. return
  155. }
  156. go func() {
  157. for {
  158. if id, err := s.tunnel.GetLen(); err != nil {
  159. break
  160. } else {
  161. s.Lock()
  162. if v, ok := s.linkMap[id]; ok {
  163. s.Unlock()
  164. if content, err := s.tunnel.GetMsgContent(v); err != nil {
  165. pool.PutBufPoolCopy(content)
  166. break
  167. } else {
  168. v.MsgCh <- content
  169. }
  170. } else {
  171. s.Unlock()
  172. }
  173. }
  174. }
  175. }()
  176. <-s.stop
  177. }