client.go 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179
  1. package client
  2. import (
  3. "github.com/cnlh/nps/lib"
  4. "net"
  5. "sync"
  6. "time"
  7. )
  8. type TRPClient struct {
  9. svrAddr string
  10. linkMap map[int]*lib.Link
  11. stop chan bool
  12. tunnel *lib.Conn
  13. sync.Mutex
  14. vKey string
  15. }
  16. //new client
  17. func NewRPClient(svraddr string, vKey string) *TRPClient {
  18. return &TRPClient{
  19. svrAddr: svraddr,
  20. linkMap: make(map[int]*lib.Link),
  21. stop: make(chan bool),
  22. tunnel: nil,
  23. Mutex: sync.Mutex{},
  24. vKey: vKey,
  25. }
  26. }
  27. //start
  28. func (s *TRPClient) Start() error {
  29. s.NewConn()
  30. return nil
  31. }
  32. //新建
  33. func (s *TRPClient) NewConn() {
  34. retry:
  35. conn, err := net.Dial("tcp", s.svrAddr)
  36. if err != nil {
  37. lib.Println("连接服务端失败,五秒后将重连")
  38. time.Sleep(time.Second * 5)
  39. goto retry
  40. return
  41. }
  42. s.processor(lib.NewConn(conn))
  43. }
  44. //处理
  45. func (s *TRPClient) processor(c *lib.Conn) {
  46. c.SetAlive()
  47. if _, err := c.Write([]byte(lib.Getverifyval(s.vKey))); err != nil {
  48. return
  49. }
  50. c.WriteMain()
  51. go s.dealChan()
  52. for {
  53. flags, err := c.ReadFlag()
  54. if err != nil {
  55. lib.Println("服务端断开,正在重新连接")
  56. break
  57. }
  58. switch flags {
  59. case lib.VERIFY_EER:
  60. lib.Fatalf("vKey:%s不正确,服务端拒绝连接,请检查", s.vKey)
  61. case lib.NEW_CONN:
  62. if link, err := c.GetLinkInfo(); err != nil {
  63. break
  64. } else {
  65. s.Lock()
  66. s.linkMap[link.Id] = link
  67. s.Unlock()
  68. go s.linkProcess(link, c)
  69. }
  70. case lib.RES_CLOSE:
  71. lib.Fatalln("该vkey被另一客户连接")
  72. case lib.RES_MSG:
  73. lib.Println("服务端返回错误,重新连接")
  74. break
  75. default:
  76. lib.Println("无法解析该错误,重新连接")
  77. break
  78. }
  79. }
  80. s.stop <- true
  81. s.linkMap = make(map[int]*lib.Link)
  82. go s.NewConn()
  83. }
  84. func (s *TRPClient) linkProcess(link *lib.Link, c *lib.Conn) {
  85. //与目标建立连接
  86. server, err := net.DialTimeout(link.ConnType, link.Host, time.Second*3)
  87. if err != nil {
  88. c.WriteFail(link.Id)
  89. lib.Println("connect to ", link.Host, "error:", err)
  90. return
  91. }
  92. c.WriteSuccess(link.Id)
  93. link.Conn = lib.NewConn(server)
  94. for {
  95. buf := lib.BufPoolCopy.Get().([]byte)
  96. if n, err := server.Read(buf); err != nil {
  97. lib.PutBufPoolCopy(buf)
  98. s.tunnel.SendMsg([]byte(lib.IO_EOF), link)
  99. break
  100. } else {
  101. if _, err := s.tunnel.SendMsg(buf[:n], link); err != nil {
  102. lib.PutBufPoolCopy(buf)
  103. c.Close()
  104. break
  105. }
  106. lib.PutBufPoolCopy(buf)
  107. //if link.ConnType == utils.CONN_UDP {
  108. // c.Close()
  109. // break
  110. //}
  111. }
  112. }
  113. s.Lock()
  114. delete(s.linkMap, link.Id)
  115. s.Unlock()
  116. }
  117. //隧道模式处理
  118. func (s *TRPClient) dealChan() {
  119. var err error
  120. //创建一个tcp连接
  121. conn, err := net.Dial("tcp", s.svrAddr)
  122. if err != nil {
  123. lib.Println("connect to ", s.svrAddr, "error:", err)
  124. return
  125. }
  126. //验证
  127. if _, err := conn.Write([]byte(lib.Getverifyval(s.vKey))); err != nil {
  128. lib.Println("connect to ", s.svrAddr, "error:", err)
  129. return
  130. }
  131. //默认长连接保持
  132. s.tunnel = lib.NewConn(conn)
  133. s.tunnel.SetAlive()
  134. //写标志
  135. s.tunnel.WriteChan()
  136. go func() {
  137. for {
  138. if id, err := s.tunnel.GetLen(); err != nil {
  139. lib.Println("get msg id error")
  140. break
  141. } else {
  142. s.Lock()
  143. if v, ok := s.linkMap[id]; ok {
  144. s.Unlock()
  145. if content, err := s.tunnel.GetMsgContent(v); err != nil {
  146. lib.Println("get msg content error:", err, id)
  147. break
  148. } else {
  149. if len(content) == len(lib.IO_EOF) && string(content) == lib.IO_EOF {
  150. v.Conn.Close()
  151. } else if v.Conn != nil {
  152. v.Conn.Write(content)
  153. }
  154. }
  155. } else {
  156. s.Unlock()
  157. }
  158. }
  159. }
  160. }()
  161. select {
  162. case <-s.stop:
  163. }
  164. }