client.go 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311
  1. package client
  2. import (
  3. "bufio"
  4. "bytes"
  5. "ehang.io/nps-mux"
  6. "net"
  7. "net/http"
  8. "strconv"
  9. "sync"
  10. "time"
  11. "github.com/astaxie/beego/logs"
  12. "github.com/xtaci/kcp-go"
  13. "ehang.io/nps/lib/common"
  14. "ehang.io/nps/lib/config"
  15. "ehang.io/nps/lib/conn"
  16. "ehang.io/nps/lib/crypt"
  17. )
  18. type TRPClient struct {
  19. svrAddr string
  20. bridgeConnType string
  21. proxyUrl string
  22. vKey string
  23. p2pAddr map[string]string
  24. tunnel *nps_mux.Mux
  25. signal *conn.Conn
  26. ticker *time.Ticker
  27. cnf *config.Config
  28. disconnectTime int
  29. once sync.Once
  30. }
  31. //new client
  32. func NewRPClient(svraddr string, vKey string, bridgeConnType string, proxyUrl string, cnf *config.Config, disconnectTime int) *TRPClient {
  33. return &TRPClient{
  34. svrAddr: svraddr,
  35. p2pAddr: make(map[string]string, 0),
  36. vKey: vKey,
  37. bridgeConnType: bridgeConnType,
  38. proxyUrl: proxyUrl,
  39. cnf: cnf,
  40. disconnectTime: disconnectTime,
  41. once: sync.Once{},
  42. }
  43. }
  44. var NowStatus int
  45. var CloseClient bool
  46. //start
  47. func (s *TRPClient) Start() {
  48. CloseClient = false
  49. retry:
  50. if CloseClient {
  51. return
  52. }
  53. NowStatus = 0
  54. c, err := NewConn(s.bridgeConnType, s.vKey, s.svrAddr, common.WORK_MAIN, s.proxyUrl)
  55. if err != nil {
  56. logs.Error("The connection server failed and will be reconnected in five seconds, error", err.Error())
  57. time.Sleep(time.Second * 5)
  58. goto retry
  59. }
  60. if c == nil {
  61. logs.Error("Error data from server, and will be reconnected in five seconds")
  62. time.Sleep(time.Second * 5)
  63. goto retry
  64. }
  65. logs.Info("Successful connection with server %s", s.svrAddr)
  66. //monitor the connection
  67. go s.ping()
  68. s.signal = c
  69. //start a channel connection
  70. go s.newChan()
  71. //start health check if the it's open
  72. if s.cnf != nil && len(s.cnf.Healths) > 0 {
  73. go heathCheck(s.cnf.Healths, s.signal)
  74. }
  75. NowStatus = 1
  76. //msg connection, eg udp
  77. s.handleMain()
  78. }
  79. //handle main connection
  80. func (s *TRPClient) handleMain() {
  81. for {
  82. flags, err := s.signal.ReadFlag()
  83. if err != nil {
  84. logs.Error("Accept server data error %s, end this service", err.Error())
  85. break
  86. }
  87. switch flags {
  88. case common.NEW_UDP_CONN:
  89. //read server udp addr and password
  90. if lAddr, err := s.signal.GetShortLenContent(); err != nil {
  91. logs.Warn(err)
  92. return
  93. } else if pwd, err := s.signal.GetShortLenContent(); err == nil {
  94. var localAddr string
  95. //The local port remains unchanged for a certain period of time
  96. if v, ok := s.p2pAddr[crypt.Md5(string(pwd)+strconv.Itoa(int(time.Now().Unix()/100)))]; !ok {
  97. tmpConn, err := common.GetLocalUdpAddr()
  98. if err != nil {
  99. logs.Error(err)
  100. return
  101. }
  102. localAddr = tmpConn.LocalAddr().String()
  103. } else {
  104. localAddr = v
  105. }
  106. go s.newUdpConn(localAddr, string(lAddr), string(pwd))
  107. }
  108. }
  109. }
  110. s.Close()
  111. }
  112. func (s *TRPClient) newUdpConn(localAddr, rAddr string, md5Password string) {
  113. var localConn net.PacketConn
  114. var err error
  115. var remoteAddress string
  116. if remoteAddress, localConn, err = handleP2PUdp(localAddr, rAddr, md5Password, common.WORK_P2P_PROVIDER); err != nil {
  117. logs.Error(err)
  118. return
  119. }
  120. l, err := kcp.ServeConn(nil, 150, 3, localConn)
  121. if err != nil {
  122. logs.Error(err)
  123. return
  124. }
  125. logs.Trace("start local p2p udp listen, local address", localConn.LocalAddr().String())
  126. for {
  127. udpTunnel, err := l.AcceptKCP()
  128. if err != nil {
  129. logs.Error(err)
  130. l.Close()
  131. return
  132. }
  133. if udpTunnel.RemoteAddr().String() == string(remoteAddress) {
  134. conn.SetUdpSession(udpTunnel)
  135. logs.Trace("successful connection with client ,address %s", udpTunnel.RemoteAddr().String())
  136. //read link info from remote
  137. conn.Accept(nps_mux.NewMux(udpTunnel, s.bridgeConnType, s.disconnectTime), func(c net.Conn) {
  138. go s.handleChan(c)
  139. })
  140. break
  141. }
  142. }
  143. }
  144. //pmux tunnel
  145. func (s *TRPClient) newChan() {
  146. tunnel, err := NewConn(s.bridgeConnType, s.vKey, s.svrAddr, common.WORK_CHAN, s.proxyUrl)
  147. if err != nil {
  148. logs.Error("connect to ", s.svrAddr, "error:", err)
  149. return
  150. }
  151. s.tunnel = nps_mux.NewMux(tunnel.Conn, s.bridgeConnType, s.disconnectTime)
  152. for {
  153. src, err := s.tunnel.Accept()
  154. if err != nil {
  155. logs.Warn(err)
  156. s.Close()
  157. break
  158. }
  159. go s.handleChan(src)
  160. }
  161. }
  162. func (s *TRPClient) handleChan(src net.Conn) {
  163. lk, err := conn.NewConn(src).GetLinkInfo()
  164. if err != nil || lk == nil {
  165. src.Close()
  166. logs.Error("get connection info from server error ", err)
  167. return
  168. }
  169. //host for target processing
  170. lk.Host = common.FormatAddress(lk.Host)
  171. //if Conn type is http, read the request and log
  172. if lk.ConnType == "http" {
  173. if targetConn, err := net.DialTimeout(common.CONN_TCP, lk.Host, lk.Option.Timeout); err != nil {
  174. logs.Warn("connect to %s error %s", lk.Host, err.Error())
  175. src.Close()
  176. } else {
  177. srcConn := conn.GetConn(src, lk.Crypt, lk.Compress, nil, false)
  178. go func() {
  179. common.CopyBuffer(srcConn, targetConn)
  180. srcConn.Close()
  181. targetConn.Close()
  182. }()
  183. for {
  184. if r, err := http.ReadRequest(bufio.NewReader(srcConn)); err != nil {
  185. srcConn.Close()
  186. targetConn.Close()
  187. break
  188. } else {
  189. logs.Trace("http request, method %s, host %s, url %s, remote address %s", r.Method, r.Host, r.URL.Path, r.RemoteAddr)
  190. r.Write(targetConn)
  191. }
  192. }
  193. }
  194. return
  195. }
  196. if lk.ConnType == "udp5" {
  197. logs.Trace("new %s connection with the goal of %s, remote address:%s", lk.ConnType, lk.Host, lk.RemoteAddr)
  198. s.handleUdp(src)
  199. }
  200. //connect to target if conn type is tcp or udp
  201. if targetConn, err := net.DialTimeout(lk.ConnType, lk.Host, lk.Option.Timeout); err != nil {
  202. logs.Warn("connect to %s error %s", lk.Host, err.Error())
  203. src.Close()
  204. } else {
  205. logs.Trace("new %s connection with the goal of %s, remote address:%s", lk.ConnType, lk.Host, lk.RemoteAddr)
  206. conn.CopyWaitGroup(src, targetConn, lk.Crypt, lk.Compress, nil, nil, false, nil)
  207. }
  208. }
  209. func (s *TRPClient) handleUdp(serverConn net.Conn) {
  210. // bind a local udp port
  211. local, err := net.ListenUDP("udp", nil)
  212. defer serverConn.Close()
  213. if err != nil {
  214. logs.Error("bind local udp port error ", err.Error())
  215. return
  216. }
  217. defer local.Close()
  218. go func() {
  219. defer serverConn.Close()
  220. b := common.BufPoolUdp.Get().([]byte)
  221. defer common.BufPoolUdp.Put(b)
  222. for {
  223. n, raddr, err := local.ReadFrom(b)
  224. if err != nil {
  225. logs.Error("read data from remote server error", err.Error())
  226. }
  227. buf := bytes.Buffer{}
  228. dgram := common.NewUDPDatagram(common.NewUDPHeader(0, 0, common.ToSocksAddr(raddr)), b[:n])
  229. dgram.Write(&buf)
  230. b, err := conn.GetLenBytes(buf.Bytes())
  231. if err != nil {
  232. logs.Warn("get len bytes error", err.Error())
  233. continue
  234. }
  235. if _, err := serverConn.Write(b); err != nil {
  236. logs.Error("write data to remote error", err.Error())
  237. return
  238. }
  239. }
  240. }()
  241. b := common.BufPoolUdp.Get().([]byte)
  242. defer common.BufPoolUdp.Put(b)
  243. for {
  244. n, err := serverConn.Read(b)
  245. if err != nil {
  246. logs.Error("read udp data from server error ", err.Error())
  247. return
  248. }
  249. udpData, err := common.ReadUDPDatagram(bytes.NewReader(b[:n]))
  250. if err != nil {
  251. logs.Error("unpack data error", err.Error())
  252. return
  253. }
  254. raddr, err := net.ResolveUDPAddr("udp", udpData.Header.Addr.String())
  255. if err != nil {
  256. logs.Error("build remote addr err", err.Error())
  257. continue // drop silently
  258. }
  259. _, err = local.WriteTo(udpData.Data, raddr)
  260. if err != nil {
  261. logs.Error("write data to remote ", raddr.String(), "error", err.Error())
  262. return
  263. }
  264. }
  265. }
  266. // Whether the monitor channel is closed
  267. func (s *TRPClient) ping() {
  268. s.ticker = time.NewTicker(time.Second * 5)
  269. loop:
  270. for {
  271. select {
  272. case <-s.ticker.C:
  273. if s.tunnel != nil && s.tunnel.IsClose {
  274. s.Close()
  275. break loop
  276. }
  277. }
  278. }
  279. }
  280. func (s *TRPClient) Close() {
  281. s.once.Do(s.closing)
  282. }
  283. func (s *TRPClient) closing() {
  284. CloseClient = true
  285. NowStatus = 0
  286. if s.tunnel != nil {
  287. _ = s.tunnel.Close()
  288. }
  289. if s.signal != nil {
  290. _ = s.signal.Close()
  291. }
  292. if s.ticker != nil {
  293. s.ticker.Stop()
  294. }
  295. }