conn.go 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405
  1. package conn
  2. import (
  3. "bufio"
  4. "bytes"
  5. "encoding/binary"
  6. "encoding/json"
  7. "errors"
  8. "github.com/astaxie/beego/logs"
  9. "github.com/cnlh/nps/lib/goroutine"
  10. "io"
  11. "net"
  12. "net/http"
  13. "net/url"
  14. "strconv"
  15. "strings"
  16. "time"
  17. "github.com/cnlh/nps/lib/common"
  18. "github.com/cnlh/nps/lib/crypt"
  19. "github.com/cnlh/nps/lib/file"
  20. "github.com/cnlh/nps/lib/mux"
  21. "github.com/cnlh/nps/lib/rate"
  22. "github.com/xtaci/kcp-go"
  23. )
  24. type Conn struct {
  25. Conn net.Conn
  26. Rb []byte
  27. }
  28. //new conn
  29. func NewConn(conn net.Conn) *Conn {
  30. return &Conn{Conn: conn}
  31. }
  32. //get host 、connection type、method...from connection
  33. func (s *Conn) GetHost() (method, address string, rb []byte, err error, r *http.Request) {
  34. var b [32 * 1024]byte
  35. var n int
  36. if n, err = s.Read(b[:]); err != nil {
  37. return
  38. }
  39. rb = b[:n]
  40. r, err = http.ReadRequest(bufio.NewReader(bytes.NewReader(rb)))
  41. if err != nil {
  42. return
  43. }
  44. hostPortURL, err := url.Parse(r.Host)
  45. if err != nil {
  46. address = r.Host
  47. err = nil
  48. return
  49. }
  50. if hostPortURL.Opaque == "443" {
  51. if strings.Index(r.Host, ":") == -1 {
  52. address = r.Host + ":443"
  53. } else {
  54. address = r.Host
  55. }
  56. } else {
  57. if strings.Index(r.Host, ":") == -1 {
  58. address = r.Host + ":80"
  59. } else {
  60. address = r.Host
  61. }
  62. }
  63. return
  64. }
  65. func (s *Conn) GetShortLenContent() (b []byte, err error) {
  66. var l int
  67. if l, err = s.GetLen(); err != nil {
  68. return
  69. }
  70. if l < 0 || l > 32<<10 {
  71. err = errors.New("read length error")
  72. return
  73. }
  74. return s.GetShortContent(l)
  75. }
  76. func (s *Conn) GetShortContent(l int) (b []byte, err error) {
  77. buf := make([]byte, l)
  78. return buf, binary.Read(s, binary.LittleEndian, &buf)
  79. }
  80. //读取指定长度内容
  81. func (s *Conn) ReadLen(cLen int, buf []byte) (int, error) {
  82. if cLen > len(buf) || cLen <= 0 {
  83. return 0, errors.New("长度错误" + strconv.Itoa(cLen))
  84. }
  85. if n, err := io.ReadFull(s, buf[:cLen]); err != nil || n != cLen {
  86. return n, errors.New("Error reading specified length " + err.Error())
  87. }
  88. return cLen, nil
  89. }
  90. func (s *Conn) GetLen() (int, error) {
  91. var l int32
  92. err := binary.Read(s, binary.LittleEndian, &l)
  93. return int(l), err
  94. }
  95. func (s *Conn) WriteLenContent(buf []byte) (err error) {
  96. var b []byte
  97. if b, err = GetLenBytes(buf); err != nil {
  98. return
  99. }
  100. return binary.Write(s.Conn, binary.LittleEndian, b)
  101. }
  102. //read flag
  103. func (s *Conn) ReadFlag() (string, error) {
  104. buf := make([]byte, 4)
  105. return string(buf), binary.Read(s, binary.LittleEndian, &buf)
  106. }
  107. //set alive
  108. func (s *Conn) SetAlive(tp string) {
  109. switch s.Conn.(type) {
  110. case *kcp.UDPSession:
  111. s.Conn.(*kcp.UDPSession).SetReadDeadline(time.Time{})
  112. case *net.TCPConn:
  113. conn := s.Conn.(*net.TCPConn)
  114. conn.SetReadDeadline(time.Time{})
  115. //conn.SetKeepAlive(false)
  116. //conn.SetKeepAlivePeriod(time.Duration(2 * time.Second))
  117. case *mux.PortConn:
  118. s.Conn.(*mux.PortConn).SetReadDeadline(time.Time{})
  119. }
  120. }
  121. //set read deadline
  122. func (s *Conn) SetReadDeadlineBySecond(t time.Duration) {
  123. switch s.Conn.(type) {
  124. case *kcp.UDPSession:
  125. s.Conn.(*kcp.UDPSession).SetReadDeadline(time.Now().Add(time.Duration(t) * time.Second))
  126. case *net.TCPConn:
  127. s.Conn.(*net.TCPConn).SetReadDeadline(time.Now().Add(time.Duration(t) * time.Second))
  128. case *mux.PortConn:
  129. s.Conn.(*mux.PortConn).SetReadDeadline(time.Now().Add(time.Duration(t) * time.Second))
  130. }
  131. }
  132. //get link info from conn
  133. func (s *Conn) GetLinkInfo() (lk *Link, err error) {
  134. err = s.getInfo(&lk)
  135. return
  136. }
  137. //send info for link
  138. func (s *Conn) SendHealthInfo(info, status string) (int, error) {
  139. raw := bytes.NewBuffer([]byte{})
  140. common.BinaryWrite(raw, info, status)
  141. return s.Write(raw.Bytes())
  142. }
  143. //get health info from conn
  144. func (s *Conn) GetHealthInfo() (info string, status bool, err error) {
  145. var l int
  146. buf := common.BufPoolMax.Get().([]byte)
  147. defer common.PutBufPoolMax(buf)
  148. if l, err = s.GetLen(); err != nil {
  149. return
  150. } else if _, err = s.ReadLen(l, buf); err != nil {
  151. return
  152. } else {
  153. arr := strings.Split(string(buf[:l]), common.CONN_DATA_SEQ)
  154. if len(arr) >= 2 {
  155. return arr[0], common.GetBoolByStr(arr[1]), nil
  156. }
  157. }
  158. return "", false, errors.New("receive health info error")
  159. }
  160. //get task info
  161. func (s *Conn) GetHostInfo() (h *file.Host, err error) {
  162. err = s.getInfo(&h)
  163. h.Id = int(file.GetDb().JsonDb.GetHostId())
  164. h.Flow = new(file.Flow)
  165. h.NoStore = true
  166. return
  167. }
  168. //get task info
  169. func (s *Conn) GetConfigInfo() (c *file.Client, err error) {
  170. err = s.getInfo(&c)
  171. c.NoStore = true
  172. c.Status = true
  173. if c.Flow == nil {
  174. c.Flow = new(file.Flow)
  175. }
  176. c.NoDisplay = false
  177. return
  178. }
  179. //get task info
  180. func (s *Conn) GetTaskInfo() (t *file.Tunnel, err error) {
  181. err = s.getInfo(&t)
  182. t.Id = int(file.GetDb().JsonDb.GetTaskId())
  183. t.NoStore = true
  184. t.Flow = new(file.Flow)
  185. return
  186. }
  187. //send info
  188. func (s *Conn) SendInfo(t interface{}, flag string) (int, error) {
  189. /*
  190. The task info is formed as follows:
  191. +----+-----+---------+
  192. |type| len | content |
  193. +----+---------------+
  194. | 4 | 4 | ... |
  195. +----+---------------+
  196. */
  197. raw := bytes.NewBuffer([]byte{})
  198. if flag != "" {
  199. binary.Write(raw, binary.LittleEndian, []byte(flag))
  200. }
  201. b, err := json.Marshal(t)
  202. if err != nil {
  203. return 0, err
  204. }
  205. lenBytes, err := GetLenBytes(b)
  206. if err != nil {
  207. return 0, err
  208. }
  209. binary.Write(raw, binary.LittleEndian, lenBytes)
  210. return s.Write(raw.Bytes())
  211. }
  212. //get task info
  213. func (s *Conn) getInfo(t interface{}) (err error) {
  214. var l int
  215. buf := common.BufPoolMax.Get().([]byte)
  216. defer common.PutBufPoolMax(buf)
  217. if l, err = s.GetLen(); err != nil {
  218. return
  219. } else if _, err = s.ReadLen(l, buf); err != nil {
  220. return
  221. } else {
  222. json.Unmarshal(buf[:l], &t)
  223. }
  224. return
  225. }
  226. //close
  227. func (s *Conn) Close() error {
  228. return s.Conn.Close()
  229. }
  230. //write
  231. func (s *Conn) Write(b []byte) (int, error) {
  232. return s.Conn.Write(b)
  233. }
  234. //read
  235. func (s *Conn) Read(b []byte) (n int, err error) {
  236. if s.Rb != nil {
  237. //if the rb is not nil ,read rb first
  238. if len(s.Rb) > 0 {
  239. n = copy(b, s.Rb)
  240. s.Rb = s.Rb[n:]
  241. return
  242. }
  243. s.Rb = nil
  244. }
  245. return s.Conn.Read(b)
  246. }
  247. //write sign flag
  248. func (s *Conn) WriteClose() (int, error) {
  249. return s.Write([]byte(common.RES_CLOSE))
  250. }
  251. //write main
  252. func (s *Conn) WriteMain() (int, error) {
  253. return s.Write([]byte(common.WORK_MAIN))
  254. }
  255. //write main
  256. func (s *Conn) WriteConfig() (int, error) {
  257. return s.Write([]byte(common.WORK_CONFIG))
  258. }
  259. //write chan
  260. func (s *Conn) WriteChan() (int, error) {
  261. return s.Write([]byte(common.WORK_CHAN))
  262. }
  263. //get task or host result of add
  264. func (s *Conn) GetAddStatus() (b bool) {
  265. binary.Read(s.Conn, binary.LittleEndian, &b)
  266. return
  267. }
  268. func (s *Conn) WriteAddOk() error {
  269. return binary.Write(s.Conn, binary.LittleEndian, true)
  270. }
  271. func (s *Conn) WriteAddFail() error {
  272. defer s.Close()
  273. return binary.Write(s.Conn, binary.LittleEndian, false)
  274. }
  275. func (s *Conn) LocalAddr() net.Addr {
  276. return s.Conn.LocalAddr()
  277. }
  278. func (s *Conn) RemoteAddr() net.Addr {
  279. return s.Conn.RemoteAddr()
  280. }
  281. func (s *Conn) SetDeadline(t time.Time) error {
  282. return s.Conn.SetDeadline(t)
  283. }
  284. func (s *Conn) SetWriteDeadline(t time.Time) error {
  285. return s.Conn.SetWriteDeadline(t)
  286. }
  287. func (s *Conn) SetReadDeadline(t time.Time) error {
  288. return s.Conn.SetReadDeadline(t)
  289. }
  290. //get the assembled amount data(len 4 and content)
  291. func GetLenBytes(buf []byte) (b []byte, err error) {
  292. raw := bytes.NewBuffer([]byte{})
  293. if err = binary.Write(raw, binary.LittleEndian, int32(len(buf))); err != nil {
  294. return
  295. }
  296. if err = binary.Write(raw, binary.LittleEndian, buf); err != nil {
  297. return
  298. }
  299. b = raw.Bytes()
  300. return
  301. }
  302. //udp connection setting
  303. func SetUdpSession(sess *kcp.UDPSession) {
  304. sess.SetStreamMode(true)
  305. sess.SetWindowSize(1024, 1024)
  306. sess.SetReadBuffer(64 * 1024)
  307. sess.SetWriteBuffer(64 * 1024)
  308. sess.SetNoDelay(1, 10, 2, 1)
  309. sess.SetMtu(1600)
  310. sess.SetACKNoDelay(true)
  311. sess.SetWriteDelay(false)
  312. }
  313. //conn1 mux conn
  314. func CopyWaitGroup(conn1, conn2 net.Conn, crypt bool, snappy bool, rate *rate.Rate, flow *file.Flow, isServer bool, rb []byte) {
  315. //var in, out int64
  316. //var wg sync.WaitGroup
  317. connHandle := GetConn(conn1, crypt, snappy, rate, isServer)
  318. if rb != nil {
  319. connHandle.Write(rb)
  320. }
  321. //go func(in *int64) {
  322. // wg.Add(1)
  323. // *in, _ = common.CopyBuffer(connHandle, conn2)
  324. // connHandle.Close()
  325. // conn2.Close()
  326. // wg.Done()
  327. //}(&in)
  328. //out, _ = common.CopyBuffer(conn2, connHandle)
  329. //connHandle.Close()
  330. //conn2.Close()
  331. //wg.Wait()
  332. //if flow != nil {
  333. // flow.Add(in, out)
  334. //}
  335. err := goroutine.CopyConnsPool.Invoke(goroutine.NewConns(connHandle, conn2, flow))
  336. if err != nil {
  337. logs.Error(err)
  338. }
  339. }
  340. //get crypt or snappy conn
  341. func GetConn(conn net.Conn, cpt, snappy bool, rt *rate.Rate, isServer bool) io.ReadWriteCloser {
  342. if cpt {
  343. if isServer {
  344. return rate.NewRateConn(crypt.NewTlsServerConn(conn), rt)
  345. }
  346. return rate.NewRateConn(crypt.NewTlsClientConn(conn), rt)
  347. } else if snappy {
  348. return rate.NewRateConn(NewSnappyConn(conn), rt)
  349. }
  350. return rate.NewRateConn(conn, rt)
  351. }
  352. type LenConn struct {
  353. conn io.Writer
  354. Len int
  355. }
  356. func NewLenConn(conn io.Writer) *LenConn {
  357. return &LenConn{conn: conn}
  358. }
  359. func (c *LenConn) Write(p []byte) (n int, err error) {
  360. n, err = c.conn.Write(p)
  361. c.Len += n
  362. return
  363. }