pool.go 1.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. package goroutine
  2. import (
  3. "ehang.io/nps/lib/common"
  4. "ehang.io/nps/lib/file"
  5. "github.com/panjf2000/ants/v2"
  6. "io"
  7. "net"
  8. "sync"
  9. )
  // connGroup is the payload handed to copyConnGroup: it describes a single
  // direction of a relay between two connections.
  type connGroup struct {
  	// src and dst are passed to common.CopyBuffer as the source and the
  	// destination of the copy; both are closed if the copy returns an error.
  	src, dst io.ReadWriteCloser
  	// wg is marked Done by copyConnGroup when this direction has finished.
  	wg *sync.WaitGroup
  	// n receives the count returned by common.CopyBuffer for this direction.
  	n *int64
  }
  16. func newConnGroup(dst, src io.ReadWriteCloser, wg *sync.WaitGroup, n *int64) connGroup {
  17. return connGroup{
  18. src: src,
  19. dst: dst,
  20. wg: wg,
  21. n: n,
  22. }
  23. }
  24. func copyConnGroup(group interface{}) {
  25. cg, ok := group.(connGroup)
  26. if !ok {
  27. return
  28. }
  29. var err error
  30. *cg.n, err = common.CopyBuffer(cg.dst, cg.src)
  31. if err != nil {
  32. cg.src.Close()
  33. cg.dst.Close()
  34. //logs.Warn("close npc by copy from nps", err, c.connId)
  35. }
  36. cg.wg.Done()
  37. }
  38. type Conns struct {
  39. conn1 io.ReadWriteCloser // mux connection
  40. conn2 net.Conn // outside connection
  41. flow *file.Flow
  42. }
  43. func NewConns(c1 io.ReadWriteCloser, c2 net.Conn, flow *file.Flow) Conns {
  44. return Conns{
  45. conn1: c1,
  46. conn2: c2,
  47. flow: flow,
  48. }
  49. }
  50. func copyConns(group interface{}) {
  51. conns := group.(Conns)
  52. wg := new(sync.WaitGroup)
  53. wg.Add(2)
  54. var in, out int64
  55. _ = connCopyPool.Invoke(newConnGroup(conns.conn1, conns.conn2, wg, &in))
  56. // outside to mux : incoming
  57. _ = connCopyPool.Invoke(newConnGroup(conns.conn2, conns.conn1, wg, &out))
  58. // mux to outside : outgoing
  59. wg.Wait()
  60. if conns.flow != nil {
  61. conns.flow.Add(in, out)
  62. }
  63. }
  64. var connCopyPool, _ = ants.NewPoolWithFunc(200000, copyConnGroup, ants.WithNonblocking(false))
  65. var CopyConnsPool, _ = ants.NewPoolWithFunc(100000, copyConns, ants.WithNonblocking(false))