goroutine.go 1.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576
  1. package pool
  2. import (
  3. "ehang.io/nps/lib/common"
  4. "github.com/panjf2000/ants/v2"
  5. "io"
  6. "net"
  7. "sync"
  8. )
  9. var connBp = NewBufferPool(MaxReadSize)
  10. var packetBp = NewBufferPool(1500)
  11. const MaxReadSize = 32 * 1024
  12. var CopyConnGoroutinePool *ants.PoolWithFunc
  13. var CopyPacketGoroutinePool *ants.PoolWithFunc
  14. type CopyConnGpParams struct {
  15. Writer io.Writer
  16. Reader io.Reader
  17. Wg *sync.WaitGroup
  18. }
  19. type CopyPacketGpParams struct {
  20. RPacket net.PacketConn
  21. WPacket net.PacketConn
  22. Wg *sync.WaitGroup
  23. }
  24. func init() {
  25. var err error
  26. CopyConnGoroutinePool, err = ants.NewPoolWithFunc(1000000, func(i interface{}) {
  27. gpp, ok := i.(*CopyConnGpParams)
  28. if !ok {
  29. return
  30. }
  31. buf := connBp.Get()
  32. _, _ = common.CopyBuffer(gpp.Writer, gpp.Reader, buf)
  33. connBp.Put(buf)
  34. gpp.Wg.Done()
  35. if v, ok := gpp.Reader.(*net.TCPConn); ok {
  36. _ = v.CloseWrite()
  37. }
  38. if v, ok := gpp.Writer.(*net.TCPConn); ok {
  39. _ = v.CloseRead()
  40. }
  41. })
  42. if err != nil {
  43. panic(err)
  44. }
  45. CopyPacketGoroutinePool, err = ants.NewPoolWithFunc(1000000, func(i interface{}) {
  46. cpp, ok := i.(*CopyPacketGpParams)
  47. if !ok {
  48. return
  49. }
  50. buf := connBp.Get()
  51. for {
  52. n, addr, err := cpp.RPacket.ReadFrom(buf)
  53. if err != nil {
  54. break
  55. }
  56. _, err = cpp.WPacket.WriteTo(buf[:n], addr)
  57. if err != nil {
  58. break
  59. }
  60. }
  61. connBp.Put(buf)
  62. _ = cpp.RPacket.Close()
  63. _ = cpp.WPacket.Close()
  64. cpp.Wg.Done()
  65. })
  66. if err != nil {
  67. panic(err)
  68. }
  69. }