client.go 6.7 KB


  1. package client
  2. import (
  3. "errors"
  4. "github.com/cnlh/nps/lib/common"
  5. "github.com/cnlh/nps/lib/config"
  6. "github.com/cnlh/nps/lib/conn"
  7. "github.com/cnlh/nps/lib/lg"
  8. "github.com/cnlh/nps/lib/pool"
  9. "github.com/cnlh/nps/vender/github.com/xtaci/kcp"
  10. "github.com/cnlh/nps/vender/golang.org/x/net/proxy"
  11. "io/ioutil"
  12. "net"
  13. "net/url"
  14. "path/filepath"
  15. "sync"
  16. "time"
  17. )
  18. type TRPClient struct {
  19. svrAddr string
  20. linkMap map[int]*conn.Link
  21. tunnel *conn.Conn
  22. msgTunnel *conn.Conn
  23. bridgeConnType string
  24. stop chan bool
  25. proxyUrl string
  26. sync.Mutex
  27. vKey string
  28. }
  29. //new client
  30. func NewRPClient(svraddr string, vKey string, bridgeConnType string, proxyUrl string) *TRPClient {
  31. return &TRPClient{
  32. svrAddr: svraddr,
  33. linkMap: make(map[int]*conn.Link),
  34. Mutex: sync.Mutex{},
  35. vKey: vKey,
  36. bridgeConnType: bridgeConnType,
  37. stop: make(chan bool),
  38. proxyUrl: proxyUrl,
  39. }
  40. }
  41. //start
  42. func (s *TRPClient) Start() {
  43. retry:
  44. c, err := NewConn(s.bridgeConnType, s.vKey, s.svrAddr, common.WORK_MAIN, s.proxyUrl)
  45. if err != nil {
  46. lg.Println("The connection server failed and will be reconnected in five seconds")
  47. time.Sleep(time.Second * 5)
  48. goto retry
  49. }
  50. lg.Printf("Successful connection with server %s", s.svrAddr)
  51. s.processor(c)
  52. }
  53. func (s *TRPClient) Close() {
  54. s.tunnel.Close()
  55. s.stop <- true
  56. for _, v := range s.linkMap {
  57. if v.Conn != nil {
  58. v.Conn.Close()
  59. }
  60. }
  61. }
  62. //处理
  63. func (s *TRPClient) processor(c *conn.Conn) {
  64. go s.dealChan()
  65. go s.getMsgStatus()
  66. for {
  67. flags, err := c.ReadFlag()
  68. if err != nil {
  69. lg.Printf("Accept server data error %s, end this service", err.Error())
  70. break
  71. }
  72. switch flags {
  73. case common.VERIFY_EER:
  74. lg.Fatalf("VKey:%s is incorrect, the server refuses to connect, please check", s.vKey)
  75. case common.NEW_CONN:
  76. if link, err := c.GetLinkInfo(); err != nil {
  77. break
  78. } else {
  79. s.Lock()
  80. s.linkMap[link.Id] = link
  81. s.Unlock()
  82. link.MsgConn = s.msgTunnel
  83. go s.linkProcess(link, c)
  84. link.Run(false)
  85. }
  86. case common.RES_CLOSE:
  87. lg.Fatalln("The authentication key is connected by another client or the server closes the client.")
  88. case common.RES_MSG:
  89. lg.Println("Server-side return error")
  90. break
  91. default:
  92. lg.Println("The error could not be resolved")
  93. break
  94. }
  95. }
  96. c.Close()
  97. s.Close()
  98. }
  99. func (s *TRPClient) linkProcess(link *conn.Link, c *conn.Conn) {
  100. link.Host = common.FormatAddress(link.Host)
  101. //与目标建立连接
  102. server, err := net.DialTimeout(link.ConnType, link.Host, time.Second*3)
  103. if err != nil {
  104. c.WriteFail(link.Id)
  105. lg.Println("connect to ", link.Host, "error:", err)
  106. return
  107. }
  108. c.WriteSuccess(link.Id)
  109. link.Conn = conn.NewConn(server)
  110. buf := pool.BufPoolCopy.Get().([]byte)
  111. for {
  112. if n, err := server.Read(buf); err != nil {
  113. s.tunnel.SendMsg([]byte(common.IO_EOF), link)
  114. break
  115. } else {
  116. if _, err := s.tunnel.SendMsg(buf[:n], link); err != nil {
  117. c.Close()
  118. break
  119. }
  120. if link.ConnType == common.CONN_UDP {
  121. break
  122. }
  123. }
  124. <-link.StatusCh
  125. }
  126. pool.PutBufPoolCopy(buf)
  127. s.Lock()
  128. delete(s.linkMap, link.Id)
  129. s.Unlock()
  130. }
  131. func (s *TRPClient) getMsgStatus() {
  132. var err error
  133. s.msgTunnel, err = NewConn(s.bridgeConnType, s.vKey, s.svrAddr, common.WORK_SEND_STATUS, s.proxyUrl)
  134. if err != nil {
  135. lg.Println("connect to ", s.svrAddr, "error:", err)
  136. return
  137. }
  138. go func() {
  139. for {
  140. if id, err := s.msgTunnel.GetLen(); err != nil {
  141. break
  142. } else {
  143. s.Lock()
  144. if v, ok := s.linkMap[id]; ok {
  145. s.Unlock()
  146. v.StatusCh <- true
  147. } else {
  148. s.Unlock()
  149. }
  150. }
  151. }
  152. }()
  153. <-s.stop
  154. }
  155. //隧道模式处理
  156. func (s *TRPClient) dealChan() {
  157. var err error
  158. s.tunnel, err = NewConn(s.bridgeConnType, s.vKey, s.svrAddr, common.WORK_CHAN, s.proxyUrl)
  159. if err != nil {
  160. lg.Println("connect to ", s.svrAddr, "error:", err)
  161. return
  162. }
  163. go func() {
  164. for {
  165. if id, err := s.tunnel.GetLen(); err != nil {
  166. lg.Println("get id error", err, id)
  167. break
  168. } else {
  169. s.Lock()
  170. if v, ok := s.linkMap[id]; ok {
  171. s.Unlock()
  172. if content, err := s.tunnel.GetMsgContent(v); err != nil {
  173. pool.PutBufPoolCopy(content)
  174. break
  175. } else {
  176. v.MsgCh <- content
  177. }
  178. } else {
  179. s.Unlock()
  180. }
  181. }
  182. }
  183. }()
  184. <-s.stop
  185. }
  186. var errAdd = errors.New("The server returned an error, which port or host may have been occupied or not allowed to open.")
  187. func StartFromFile(path string) {
  188. first := true
  189. cnf, err := config.NewConfig(path)
  190. if err != nil {
  191. lg.Fatalln(err)
  192. }
  193. lg.Printf("Loading configuration file %s successfully", path)
  194. re:
  195. if first || cnf.CommonConfig.AutoReconnection {
  196. if !first {
  197. lg.Println("Reconnecting...")
  198. time.Sleep(time.Second * 5)
  199. }
  200. } else {
  201. return
  202. }
  203. first = false
  204. c, err := NewConn(cnf.CommonConfig.Tp, cnf.CommonConfig.VKey, cnf.CommonConfig.Server, common.WORK_CONFIG, cnf.CommonConfig.ProxyUrl)
  205. if err != nil {
  206. lg.Println(err)
  207. goto re
  208. }
  209. if _, err := c.SendConfigInfo(cnf.CommonConfig.Cnf); err != nil {
  210. lg.Println(err)
  211. goto re
  212. }
  213. var b []byte
  214. if b, err = c.ReadLen(16); err != nil {
  215. lg.Println(err)
  216. goto re
  217. } else {
  218. ioutil.WriteFile(filepath.Join(common.GetTmpPath(), "npc_vkey.txt"), []byte(string(b)), 0600)
  219. }
  220. if !c.GetAddStatus() {
  221. lg.Println(errAdd)
  222. goto re
  223. }
  224. for _, v := range cnf.Hosts {
  225. if _, err := c.SendHostInfo(v); err != nil {
  226. lg.Println(err)
  227. goto re
  228. }
  229. if !c.GetAddStatus() {
  230. lg.Println(errAdd, v.Host)
  231. goto re
  232. }
  233. }
  234. for _, v := range cnf.Tasks {
  235. if _, err := c.SendTaskInfo(v); err != nil {
  236. lg.Println(err)
  237. goto re
  238. }
  239. if !c.GetAddStatus() {
  240. lg.Println(errAdd, v.Ports)
  241. goto re
  242. }
  243. }
  244. c.Close()
  245. NewRPClient(cnf.CommonConfig.Server, string(b), cnf.CommonConfig.Tp, cnf.CommonConfig.ProxyUrl).Start()
  246. goto re
  247. }
  248. //Create a new connection with the server and verify it
  249. func NewConn(tp string, vkey string, server string, connType string, proxyUrl string) (*conn.Conn, error) {
  250. var err error
  251. var connection net.Conn
  252. var sess *kcp.UDPSession
  253. if tp == "tcp" {
  254. if proxyUrl != "" {
  255. u, er := url.Parse(proxyUrl)
  256. if er != nil {
  257. return nil, er
  258. }
  259. n, er := proxy.FromURL(u, nil)
  260. if er != nil {
  261. return nil, er
  262. }
  263. connection, err = n.Dial("tcp", server)
  264. } else {
  265. connection, err = net.Dial("tcp", server)
  266. }
  267. } else {
  268. sess, err = kcp.DialWithOptions(server, nil, 10, 3)
  269. conn.SetUdpSession(sess)
  270. connection = sess
  271. }
  272. if err != nil {
  273. return nil, err
  274. }
  275. c := conn.NewConn(connection)
  276. if _, err := c.Write([]byte(common.Getverifyval(vkey))); err != nil {
  277. lg.Println(err)
  278. }
  279. if s, err := c.ReadFlag(); err != nil {
  280. lg.Println(err)
  281. } else if s == common.VERIFY_EER {
  282. lg.Fatalf("Validation key %s incorrect", vkey)
  283. }
  284. if _, err := c.Write([]byte(connType)); err != nil {
  285. lg.Println(err)
  286. }
  287. c.SetAlive(tp)
  288. return c, nil
  289. }