pool.go 1.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576
  1. package goroutine
  2. import (
  3. "ehang.io/nps/lib/common"
  4. "ehang.io/nps/lib/file"
  5. "github.com/panjf2000/ants/v2"
  6. "io"
  7. "net"
  8. "sync"
  9. )
  10. type connGroup struct {
  11. src io.ReadWriteCloser
  12. dst io.ReadWriteCloser
  13. wg *sync.WaitGroup
  14. n *int64
  15. }
  16. func newConnGroup(dst, src io.ReadWriteCloser, wg *sync.WaitGroup, n *int64) connGroup {
  17. return connGroup{
  18. src: src,
  19. dst: dst,
  20. wg: wg,
  21. n: n,
  22. }
  23. }
  24. func copyConnGroup(group interface{}) {
  25. cg, ok := group.(connGroup)
  26. if !ok {
  27. return
  28. }
  29. var err error
  30. *cg.n, err = common.CopyBuffer(cg.dst, cg.src)
  31. if err != nil {
  32. cg.src.Close()
  33. cg.dst.Close()
  34. //logs.Warn("close npc by copy from nps", err, c.connId)
  35. }
  36. cg.wg.Done()
  37. }
  38. type Conns struct {
  39. conn1 io.ReadWriteCloser // mux connection
  40. conn2 net.Conn // outside connection
  41. flow *file.Flow
  42. wg *sync.WaitGroup
  43. }
  44. func NewConns(c1 io.ReadWriteCloser, c2 net.Conn, flow *file.Flow, wg *sync.WaitGroup) Conns {
  45. return Conns{
  46. conn1: c1,
  47. conn2: c2,
  48. flow: flow,
  49. wg: wg,
  50. }
  51. }
  52. func copyConns(group interface{}) {
  53. conns := group.(Conns)
  54. wg := new(sync.WaitGroup)
  55. wg.Add(2)
  56. var in, out int64
  57. _ = connCopyPool.Invoke(newConnGroup(conns.conn1, conns.conn2, wg, &in))
  58. // outside to mux : incoming
  59. _ = connCopyPool.Invoke(newConnGroup(conns.conn2, conns.conn1, wg, &out))
  60. // mux to outside : outgoing
  61. wg.Wait()
  62. if conns.flow != nil {
  63. conns.flow.Add(in, out)
  64. }
  65. conns.wg.Done()
  66. }
  67. var connCopyPool, _ = ants.NewPoolWithFunc(200000, copyConnGroup, ants.WithNonblocking(false))
  68. var CopyConnsPool, _ = ants.NewPoolWithFunc(100000, copyConns, ants.WithNonblocking(false))