conn.go 7.5 KB


  1. package utils
  2. import (
  3. "bufio"
  4. "bytes"
  5. "encoding/binary"
  6. "errors"
  7. "github.com/golang/snappy"
  8. "io"
  9. "log"
  10. "net"
  11. "net/http"
  12. "net/url"
  13. "strconv"
  14. "strings"
  15. "time"
  16. )
  17. const cryptKey = "1234567812345678"
  18. type CryptConn struct {
  19. conn net.Conn
  20. crypt bool
  21. }
  22. func NewCryptConn(conn net.Conn, crypt bool) *CryptConn {
  23. c := new(CryptConn)
  24. c.conn = conn
  25. c.crypt = crypt
  26. return c
  27. }
  28. //加密写
  29. func (s *CryptConn) Write(b []byte) (n int, err error) {
  30. n = len(b)
  31. if s.crypt {
  32. if b, err = AesEncrypt(b, []byte(cryptKey)); err != nil {
  33. return
  34. }
  35. }
  36. if b, err = GetLenBytes(b); err != nil {
  37. return
  38. }
  39. _, err = s.conn.Write(b)
  40. return
  41. }
  42. //解密读
  43. func (s *CryptConn) Read(b []byte) (n int, err error) {
  44. defer func() {
  45. if err == nil && n == len(IO_EOF) && string(b[:n]) == IO_EOF {
  46. err = io.EOF
  47. n = 0
  48. }
  49. }()
  50. var lens int
  51. var buf []byte
  52. var rb []byte
  53. c := NewConn(s.conn)
  54. if lens, err = c.GetLen(); err != nil {
  55. return
  56. }
  57. if buf, err = c.ReadLen(lens); err != nil {
  58. return
  59. }
  60. if s.crypt {
  61. if rb, err = AesDecrypt(buf, []byte(cryptKey)); err != nil {
  62. return
  63. }
  64. } else {
  65. rb = buf
  66. }
  67. copy(b, rb)
  68. n = len(rb)
  69. return
  70. }
  71. type SnappyConn struct {
  72. w *snappy.Writer
  73. r *snappy.Reader
  74. crypt bool
  75. }
  76. func NewSnappyConn(conn net.Conn, crypt bool) *SnappyConn {
  77. c := new(SnappyConn)
  78. c.w = snappy.NewBufferedWriter(conn)
  79. c.r = snappy.NewReader(conn)
  80. c.crypt = crypt
  81. return c
  82. }
  83. //snappy压缩写 包含加密
  84. func (s *SnappyConn) Write(b []byte) (n int, err error) {
  85. n = len(b)
  86. if s.crypt {
  87. if b, err = AesEncrypt(b, []byte(cryptKey)); err != nil {
  88. log.Println("encode crypt error:", err)
  89. return
  90. }
  91. }
  92. if _, err = s.w.Write(b); err != nil {
  93. return
  94. }
  95. err = s.w.Flush()
  96. return
  97. }
  98. //snappy压缩读 包含解密
  99. func (s *SnappyConn) Read(b []byte) (n int, err error) {
  100. defer func() {
  101. if err == nil && n == len(IO_EOF) && string(b[:n]) == IO_EOF {
  102. err = io.EOF
  103. n = 0
  104. }
  105. }()
  106. buf := bufPool.Get().([]byte)
  107. defer bufPool.Put(buf)
  108. if n, err = s.r.Read(buf); err != nil {
  109. return
  110. }
  111. var bs []byte
  112. if s.crypt {
  113. if bs, err = AesDecrypt(buf[:n], []byte(cryptKey)); err != nil {
  114. log.Println("decode crypt error:", err)
  115. return
  116. }
  117. } else {
  118. bs = buf[:n]
  119. }
  120. n = len(bs)
  121. copy(b, bs)
  122. return
  123. }
  124. type Conn struct {
  125. Conn net.Conn
  126. }
  127. //new conn
  128. func NewConn(conn net.Conn) *Conn {
  129. c := new(Conn)
  130. c.Conn = conn
  131. return c
  132. }
  133. //读取指定长度内容
  134. func (s *Conn) ReadLen(cLen int) ([]byte, error) {
  135. if cLen > poolSize {
  136. return nil, errors.New("长度错误" + strconv.Itoa(cLen))
  137. }
  138. var buf []byte
  139. if cLen <= poolSizeSmall {
  140. buf = bufPoolSmall.Get().([]byte)[:cLen]
  141. defer bufPoolSmall.Put(buf)
  142. } else {
  143. buf = bufPool.Get().([]byte)[:cLen]
  144. defer bufPool.Put(buf)
  145. }
  146. if n, err := io.ReadFull(s, buf); err != nil || n != cLen {
  147. return buf, errors.New("读取指定长度错误" + err.Error())
  148. }
  149. return buf, nil
  150. }
  151. //获取长度
  152. func (s *Conn) GetLen() (int, error) {
  153. val, err := s.ReadLen(4)
  154. if err != nil {
  155. return 0, err
  156. }
  157. return GetLenByBytes(val)
  158. }
  159. //写入长度+内容 粘包
  160. func (s *Conn) WriteLen(buf []byte) (int, error) {
  161. var b []byte
  162. var err error
  163. if b, err = GetLenBytes(buf); err != nil {
  164. return 0, err
  165. }
  166. return s.Write(b)
  167. }
  168. //读取flag
  169. func (s *Conn) ReadFlag() (string, error) {
  170. val, err := s.ReadLen(4)
  171. if err != nil {
  172. return "", err
  173. }
  174. return string(val), err
  175. }
  176. //读取host 连接地址 压缩类型
  177. func (s *Conn) GetHostFromConn() (typeStr string, host string, en, de int, crypt, mux bool, err error) {
  178. retry:
  179. lType, err := s.ReadLen(3)
  180. if err != nil {
  181. return
  182. }
  183. if typeStr = string(lType); typeStr == TEST_FLAG {
  184. en, de, crypt, mux = s.GetConnInfoFromConn()
  185. goto retry
  186. } else if typeStr != CONN_TCP && typeStr != CONN_UDP {
  187. err = errors.New("unknown conn type")
  188. return
  189. }
  190. cLen, err := s.GetLen()
  191. if err != nil || cLen > poolSize {
  192. return
  193. }
  194. hostByte, err := s.ReadLen(cLen)
  195. if err != nil {
  196. return
  197. }
  198. host = string(hostByte)
  199. return
  200. }
  201. //写连接类型 和 host地址
  202. func (s *Conn) WriteHost(ltype string, host string) (int, error) {
  203. raw := bytes.NewBuffer([]byte{})
  204. binary.Write(raw, binary.LittleEndian, []byte(ltype))
  205. binary.Write(raw, binary.LittleEndian, int32(len([]byte(host))))
  206. binary.Write(raw, binary.LittleEndian, []byte(host))
  207. return s.Write(raw.Bytes())
  208. }
  209. //设置连接为长连接
  210. func (s *Conn) SetAlive() {
  211. conn := s.Conn.(*net.TCPConn)
  212. conn.SetReadDeadline(time.Time{})
  213. conn.SetKeepAlive(true)
  214. conn.SetKeepAlivePeriod(time.Duration(2 * time.Second))
  215. }
  216. //从tcp报文中解析出host,连接类型等 TODO 多种情况
  217. func (s *Conn) GetHost() (method, address string, rb []byte, err error, r *http.Request) {
  218. var b [32 * 1024]byte
  219. var n int
  220. if n, err = s.Read(b[:]); err != nil {
  221. return
  222. }
  223. rb = b[:n]
  224. r, err = http.ReadRequest(bufio.NewReader(bytes.NewReader(rb)))
  225. if err != nil {
  226. return
  227. }
  228. hostPortURL, err := url.Parse(r.Host)
  229. if err != nil {
  230. address = r.Host
  231. err = nil
  232. return
  233. }
  234. if hostPortURL.Opaque == "443" { //https访问
  235. address = r.Host + ":443"
  236. } else { //http访问
  237. if strings.Index(hostPortURL.Host, ":") == -1 { //host不带端口, 默认80
  238. address = r.Host + ":80"
  239. } else {
  240. address = r.Host
  241. }
  242. }
  243. return
  244. }
  245. //单独读(加密|压缩)
  246. func (s *Conn) ReadFrom(b []byte, compress int, crypt bool) (int, error) {
  247. if COMPRESS_SNAPY_DECODE == compress {
  248. return NewSnappyConn(s.Conn, crypt).Read(b)
  249. }
  250. return NewCryptConn(s.Conn, crypt).Read(b)
  251. }
  252. //单独写(加密|压缩)
  253. func (s *Conn) WriteTo(b []byte, compress int, crypt bool) (n int, err error) {
  254. if COMPRESS_SNAPY_ENCODE == compress {
  255. return NewSnappyConn(s.Conn, crypt).Write(b)
  256. }
  257. return NewCryptConn(s.Conn, crypt).Write(b)
  258. }
  259. //写压缩方式,加密
  260. func (s *Conn) WriteConnInfo(en, de int, crypt, mux bool) {
  261. s.Write([]byte(strconv.Itoa(en) + strconv.Itoa(de) + GetStrByBool(crypt) + GetStrByBool(mux)))
  262. }
  263. //获取压缩方式,是否加密
  264. func (s *Conn) GetConnInfoFromConn() (en, de int, crypt, mux bool) {
  265. buf, err := s.ReadLen(4)
  266. if err != nil {
  267. return
  268. }
  269. en, _ = strconv.Atoi(string(buf[0]))
  270. de, _ = strconv.Atoi(string(buf[1]))
  271. crypt = GetBoolByStr(string(buf[2]))
  272. mux = GetBoolByStr(string(buf[3]))
  273. return
  274. }
  275. //close
  276. func (s *Conn) Close() error {
  277. return s.Conn.Close()
  278. }
  279. //write
  280. func (s *Conn) Write(b []byte) (int, error) {
  281. return s.Conn.Write(b)
  282. }
  283. //read
  284. func (s *Conn) Read(b []byte) (int, error) {
  285. return s.Conn.Read(b)
  286. }
  287. //write error
  288. func (s *Conn) WriteError() (int, error) {
  289. return s.Write([]byte(RES_MSG))
  290. }
  291. //write sign flag
  292. func (s *Conn) WriteSign() (int, error) {
  293. return s.Write([]byte(RES_SIGN))
  294. }
  295. //write main
  296. func (s *Conn) WriteMain() (int, error) {
  297. return s.Write([]byte(WORK_MAIN))
  298. }
  299. //write chan
  300. func (s *Conn) WriteChan() (int, error) {
  301. return s.Write([]byte(WORK_CHAN))
  302. }
  303. //write test
  304. func (s *Conn) WriteTest() (int, error) {
  305. return s.Write([]byte(TEST_FLAG))
  306. }
  307. //write test
  308. func (s *Conn) WriteSuccess() (int, error) {
  309. return s.Write([]byte(CONN_SUCCESS))
  310. }
  311. //write test
  312. func (s *Conn) WriteFail() (int, error) {
  313. return s.Write([]byte(CONN_ERROR))
  314. }
  315. //获取长度+内容
  316. func GetLenBytes(buf []byte) (b []byte, err error) {
  317. raw := bytes.NewBuffer([]byte{})
  318. if err = binary.Write(raw, binary.LittleEndian, int32(len(buf))); err != nil {
  319. return
  320. }
  321. if err = binary.Write(raw, binary.LittleEndian, buf); err != nil {
  322. return
  323. }
  324. b = raw.Bytes()
  325. return
  326. }
  327. //解析出长度
  328. func GetLenByBytes(buf []byte) (int, error) {
  329. nlen := binary.LittleEndian.Uint32(buf)
  330. if nlen <= 0 {
  331. return 0, errors.New("数据长度错误")
  332. }
  333. return int(nlen), nil
  334. }