conn.go 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431
  1. package conn
  2. import (
  3. "bufio"
  4. "bytes"
  5. "ehang.io/nps/lib/goroutine"
  6. "encoding/binary"
  7. "encoding/json"
  8. "errors"
  9. "github.com/astaxie/beego/logs"
  10. "io"
  11. "net"
  12. "net/http"
  13. "net/url"
  14. "strconv"
  15. "strings"
  16. "sync"
  17. "time"
  18. "ehang.io/nps/lib/common"
  19. "ehang.io/nps/lib/crypt"
  20. "ehang.io/nps/lib/file"
  21. "ehang.io/nps/lib/pmux"
  22. "ehang.io/nps/lib/rate"
  23. "github.com/xtaci/kcp-go"
  24. )
  // Conn wraps a net.Conn and carries the framed read/write helpers declared
  // on it in this file.
  type Conn struct {
  	// Conn is the underlying network connection.
  	Conn net.Conn
  	// Rb holds bytes that Read returns before it reads from Conn again.
  	Rb []byte
  }

  // NewConn returns a Conn that wraps conn; Rb is left nil.
  func NewConn(conn net.Conn) *Conn {
  	wrapped := new(Conn)
  	wrapped.Conn = conn
  	return wrapped
  }
  33. func (s *Conn) readRequest(buf []byte) (n int, err error) {
  34. var rd int
  35. for {
  36. rd, err = s.Read(buf[n:])
  37. if err != nil {
  38. return
  39. }
  40. n += rd
  41. if n < 4 {
  42. continue
  43. }
  44. if string(buf[n-4:n]) == "\r\n\r\n" {
  45. return
  46. }
  47. // buf is full, can't contain the request
  48. if n == cap(buf) {
  49. err = io.ErrUnexpectedEOF
  50. return
  51. }
  52. }
  53. }
  54. //get host 、connection type、method...from connection
  55. func (s *Conn) GetHost() (method, address string, rb []byte, err error, r *http.Request) {
  56. var b [32 * 1024]byte
  57. var n int
  58. if n, err = s.readRequest(b[:]); err != nil {
  59. return
  60. }
  61. rb = b[:n]
  62. r, err = http.ReadRequest(bufio.NewReader(bytes.NewReader(rb)))
  63. if err != nil {
  64. return
  65. }
  66. hostPortURL, err := url.Parse(r.Host)
  67. if err != nil {
  68. address = r.Host
  69. err = nil
  70. return
  71. }
  72. if hostPortURL.Opaque == "443" {
  73. if strings.Index(r.Host, ":") == -1 {
  74. address = r.Host + ":443"
  75. } else {
  76. address = r.Host
  77. }
  78. } else {
  79. if strings.Index(r.Host, ":") == -1 {
  80. address = r.Host + ":80"
  81. } else {
  82. address = r.Host
  83. }
  84. }
  85. return
  86. }
  87. func (s *Conn) GetShortLenContent() (b []byte, err error) {
  88. var l int
  89. if l, err = s.GetLen(); err != nil {
  90. return
  91. }
  92. if l < 0 || l > 32<<10 {
  93. err = errors.New("read length error")
  94. return
  95. }
  96. return s.GetShortContent(l)
  97. }
  98. func (s *Conn) GetShortContent(l int) (b []byte, err error) {
  99. buf := make([]byte, l)
  100. return buf, binary.Read(s, binary.LittleEndian, &buf)
  101. }
  102. //读取指定长度内容
  103. func (s *Conn) ReadLen(cLen int, buf []byte) (int, error) {
  104. if cLen > len(buf) || cLen <= 0 {
  105. return 0, errors.New("长度错误" + strconv.Itoa(cLen))
  106. }
  107. if n, err := io.ReadFull(s, buf[:cLen]); err != nil || n != cLen {
  108. return n, errors.New("Error reading specified length " + err.Error())
  109. }
  110. return cLen, nil
  111. }
  112. func (s *Conn) GetLen() (int, error) {
  113. var l int32
  114. err := binary.Read(s, binary.LittleEndian, &l)
  115. return int(l), err
  116. }
  117. func (s *Conn) WriteLenContent(buf []byte) (err error) {
  118. var b []byte
  119. if b, err = GetLenBytes(buf); err != nil {
  120. return
  121. }
  122. return binary.Write(s.Conn, binary.LittleEndian, b)
  123. }
  124. //read flag
  125. func (s *Conn) ReadFlag() (string, error) {
  126. buf := make([]byte, 4)
  127. return string(buf), binary.Read(s, binary.LittleEndian, &buf)
  128. }
  129. //set alive
  130. func (s *Conn) SetAlive(tp string) {
  131. switch s.Conn.(type) {
  132. case *kcp.UDPSession:
  133. s.Conn.(*kcp.UDPSession).SetReadDeadline(time.Time{})
  134. case *net.TCPConn:
  135. conn := s.Conn.(*net.TCPConn)
  136. conn.SetReadDeadline(time.Time{})
  137. //conn.SetKeepAlive(false)
  138. //conn.SetKeepAlivePeriod(time.Duration(2 * time.Second))
  139. case *pmux.PortConn:
  140. s.Conn.(*pmux.PortConn).SetReadDeadline(time.Time{})
  141. }
  142. }
  143. //set read deadline
  144. func (s *Conn) SetReadDeadlineBySecond(t time.Duration) {
  145. switch s.Conn.(type) {
  146. case *kcp.UDPSession:
  147. s.Conn.(*kcp.UDPSession).SetReadDeadline(time.Now().Add(time.Duration(t) * time.Second))
  148. case *net.TCPConn:
  149. s.Conn.(*net.TCPConn).SetReadDeadline(time.Now().Add(time.Duration(t) * time.Second))
  150. case *pmux.PortConn:
  151. s.Conn.(*pmux.PortConn).SetReadDeadline(time.Now().Add(time.Duration(t) * time.Second))
  152. }
  153. }
  154. //get link info from conn
  155. func (s *Conn) GetLinkInfo() (lk *Link, err error) {
  156. err = s.getInfo(&lk)
  157. return
  158. }
  159. //send info for link
  160. func (s *Conn) SendHealthInfo(info, status string) (int, error) {
  161. raw := bytes.NewBuffer([]byte{})
  162. common.BinaryWrite(raw, info, status)
  163. return s.Write(raw.Bytes())
  164. }
  165. //get health info from conn
  166. func (s *Conn) GetHealthInfo() (info string, status bool, err error) {
  167. var l int
  168. buf := common.BufPoolMax.Get().([]byte)
  169. defer common.PutBufPoolMax(buf)
  170. if l, err = s.GetLen(); err != nil {
  171. return
  172. } else if _, err = s.ReadLen(l, buf); err != nil {
  173. return
  174. } else {
  175. arr := strings.Split(string(buf[:l]), common.CONN_DATA_SEQ)
  176. if len(arr) >= 2 {
  177. return arr[0], common.GetBoolByStr(arr[1]), nil
  178. }
  179. }
  180. return "", false, errors.New("receive health info error")
  181. }
  182. //get task info
  183. func (s *Conn) GetHostInfo() (h *file.Host, err error) {
  184. err = s.getInfo(&h)
  185. h.Id = int(file.GetDb().JsonDb.GetHostId())
  186. h.Flow = new(file.Flow)
  187. h.NoStore = true
  188. return
  189. }
  190. //get task info
  191. func (s *Conn) GetConfigInfo() (c *file.Client, err error) {
  192. err = s.getInfo(&c)
  193. c.NoStore = true
  194. c.Status = true
  195. if c.Flow == nil {
  196. c.Flow = new(file.Flow)
  197. }
  198. c.NoDisplay = false
  199. return
  200. }
  201. //get task info
  202. func (s *Conn) GetTaskInfo() (t *file.Tunnel, err error) {
  203. err = s.getInfo(&t)
  204. t.Id = int(file.GetDb().JsonDb.GetTaskId())
  205. t.NoStore = true
  206. t.Flow = new(file.Flow)
  207. return
  208. }
  209. //send info
  210. func (s *Conn) SendInfo(t interface{}, flag string) (int, error) {
  211. /*
  212. The task info is formed as follows:
  213. +----+-----+---------+
  214. |type| len | content |
  215. +----+---------------+
  216. | 4 | 4 | ... |
  217. +----+---------------+
  218. */
  219. raw := bytes.NewBuffer([]byte{})
  220. if flag != "" {
  221. binary.Write(raw, binary.LittleEndian, []byte(flag))
  222. }
  223. b, err := json.Marshal(t)
  224. if err != nil {
  225. return 0, err
  226. }
  227. lenBytes, err := GetLenBytes(b)
  228. if err != nil {
  229. return 0, err
  230. }
  231. binary.Write(raw, binary.LittleEndian, lenBytes)
  232. return s.Write(raw.Bytes())
  233. }
  234. //get task info
  235. func (s *Conn) getInfo(t interface{}) (err error) {
  236. var l int
  237. buf := common.BufPoolMax.Get().([]byte)
  238. defer common.PutBufPoolMax(buf)
  239. if l, err = s.GetLen(); err != nil {
  240. return
  241. } else if _, err = s.ReadLen(l, buf); err != nil {
  242. return
  243. } else {
  244. json.Unmarshal(buf[:l], &t)
  245. }
  246. return
  247. }
  248. //close
  249. func (s *Conn) Close() error {
  250. return s.Conn.Close()
  251. }
  252. //write
  253. func (s *Conn) Write(b []byte) (int, error) {
  254. return s.Conn.Write(b)
  255. }
  256. //read
  257. func (s *Conn) Read(b []byte) (n int, err error) {
  258. if s.Rb != nil {
  259. //if the rb is not nil ,read rb first
  260. if len(s.Rb) > 0 {
  261. n = copy(b, s.Rb)
  262. s.Rb = s.Rb[n:]
  263. return
  264. }
  265. s.Rb = nil
  266. }
  267. return s.Conn.Read(b)
  268. }
  269. //write sign flag
  270. func (s *Conn) WriteClose() (int, error) {
  271. return s.Write([]byte(common.RES_CLOSE))
  272. }
  273. //write main
  274. func (s *Conn) WriteMain() (int, error) {
  275. return s.Write([]byte(common.WORK_MAIN))
  276. }
  277. //write main
  278. func (s *Conn) WriteConfig() (int, error) {
  279. return s.Write([]byte(common.WORK_CONFIG))
  280. }
  281. //write chan
  282. func (s *Conn) WriteChan() (int, error) {
  283. return s.Write([]byte(common.WORK_CHAN))
  284. }
  285. //get task or host result of add
  286. func (s *Conn) GetAddStatus() (b bool) {
  287. binary.Read(s.Conn, binary.LittleEndian, &b)
  288. return
  289. }
  290. func (s *Conn) WriteAddOk() error {
  291. return binary.Write(s.Conn, binary.LittleEndian, true)
  292. }
  293. func (s *Conn) WriteAddFail() error {
  294. defer s.Close()
  295. return binary.Write(s.Conn, binary.LittleEndian, false)
  296. }
  297. func (s *Conn) LocalAddr() net.Addr {
  298. return s.Conn.LocalAddr()
  299. }
  300. func (s *Conn) RemoteAddr() net.Addr {
  301. return s.Conn.RemoteAddr()
  302. }
  303. func (s *Conn) SetDeadline(t time.Time) error {
  304. return s.Conn.SetDeadline(t)
  305. }
  306. func (s *Conn) SetWriteDeadline(t time.Time) error {
  307. return s.Conn.SetWriteDeadline(t)
  308. }
  309. func (s *Conn) SetReadDeadline(t time.Time) error {
  310. return s.Conn.SetReadDeadline(t)
  311. }
  // GetLenBytes returns buf framed for the wire: a 4-byte little-endian length
  // followed by the content itself.
  //
  // The length field is a signed 32-bit value, so content longer than
  // 2^31-1 bytes is rejected with an error instead of being framed with a
  // truncated length.
  func GetLenBytes(buf []byte) (b []byte, err error) {
  	const maxContentLen = 1<<31 - 1
  	if len(buf) > maxContentLen {
  		return nil, errors.New("content too long for a 4-byte length prefix")
  	}
  	b = make([]byte, 4+len(buf))
  	binary.LittleEndian.PutUint32(b, uint32(len(buf)))
  	copy(b[4:], buf)
  	return b, nil
  }
  324. //udp connection setting
  325. func SetUdpSession(sess *kcp.UDPSession) {
  326. sess.SetStreamMode(true)
  327. sess.SetWindowSize(1024, 1024)
  328. sess.SetReadBuffer(64 * 1024)
  329. sess.SetWriteBuffer(64 * 1024)
  330. sess.SetNoDelay(1, 10, 2, 1)
  331. sess.SetMtu(1600)
  332. sess.SetACKNoDelay(true)
  333. sess.SetWriteDelay(false)
  334. }
  335. //conn1 mux conn
  336. func CopyWaitGroup(conn1, conn2 net.Conn, crypt bool, snappy bool, rate *rate.Rate, flow *file.Flow, isServer bool, rb []byte) {
  337. //var in, out int64
  338. //var wg sync.WaitGroup
  339. connHandle := GetConn(conn1, crypt, snappy, rate, isServer)
  340. if rb != nil {
  341. connHandle.Write(rb)
  342. }
  343. //go func(in *int64) {
  344. // wg.Add(1)
  345. // *in, _ = common.CopyBuffer(connHandle, conn2)
  346. // connHandle.Close()
  347. // conn2.Close()
  348. // wg.Done()
  349. //}(&in)
  350. //out, _ = common.CopyBuffer(conn2, connHandle)
  351. //connHandle.Close()
  352. //conn2.Close()
  353. //wg.Wait()
  354. //if flow != nil {
  355. // flow.Add(in, out)
  356. //}
  357. wg := new(sync.WaitGroup)
  358. wg.Add(1)
  359. err := goroutine.CopyConnsPool.Invoke(goroutine.NewConns(connHandle, conn2, flow, wg))
  360. wg.Wait()
  361. if err != nil {
  362. logs.Error(err)
  363. }
  364. }
  365. //get crypt or snappy conn
  366. func GetConn(conn net.Conn, cpt, snappy bool, rt *rate.Rate, isServer bool) io.ReadWriteCloser {
  367. if cpt {
  368. if isServer {
  369. return rate.NewRateConn(crypt.NewTlsServerConn(conn), rt)
  370. }
  371. return rate.NewRateConn(crypt.NewTlsClientConn(conn), rt)
  372. } else if snappy {
  373. return rate.NewRateConn(NewSnappyConn(conn), rt)
  374. }
  375. return rate.NewRateConn(conn, rt)
  376. }
  // LenConn is an io.Writer wrapper that counts the bytes written through it.
  type LenConn struct {
  	// conn is the writer every Write is forwarded to.
  	conn io.Writer
  	// Len is the running total of bytes the wrapped writer reported as written.
  	Len int
  }

  // NewLenConn returns a LenConn that forwards to conn with Len starting at 0.
  func NewLenConn(conn io.Writer) *LenConn {
  	counter := new(LenConn)
  	counter.conn = conn
  	return counter
  }

  // Write forwards p to the wrapped writer, adds the number of bytes it reports
  // to Len (even when it also reports an error) and returns its result.
  func (c *LenConn) Write(p []byte) (n int, err error) {
  	written, writeErr := c.conn.Write(p)
  	c.Len += written
  	return written, writeErr
  }