tcp.go 2.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697
  1. package server
  2. import (
  3. "ehang.io/nps/core/handler"
  4. "ehang.io/nps/lib/enet"
  5. "ehang.io/nps/lib/logger"
  6. "ehang.io/nps/lib/pool"
  7. "github.com/panjf2000/ants/v2"
  8. "go.uber.org/zap"
  9. "io"
  10. "net"
  11. )
  12. var bp = pool.NewBufferPool(1500)
  13. type TcpServer struct {
  14. BaseServer
  15. ServerAddr string `json:"server_addr" required:"true" placeholder:"0.0.0.0:8080 or :8080" zh_name:"监听地址"`
  16. listener net.Listener
  17. gp *ants.PoolWithFunc
  18. }
  19. func (cm *TcpServer) GetServerAddr() string {
  20. if cm.listener == nil {
  21. return cm.ServerAddr
  22. }
  23. return cm.listener.Addr().String()
  24. }
  25. func (cm *TcpServer) Init() error {
  26. var err error
  27. cm.handlers = make(map[string]handler.Handler, 0)
  28. if err = cm.listen(); err != nil {
  29. return err
  30. }
  31. cm.gp, err = ants.NewPoolWithFunc(1000000, func(i interface{}) {
  32. rc := enet.NewReaderConn(i.(net.Conn))
  33. buf := bp.Get()
  34. defer bp.Put(buf)
  35. if _, err := io.ReadAtLeast(rc, buf, 3); err != nil {
  36. logger.Warn("read handle type fom connection failed", zap.String("remote addr", rc.RemoteAddr().String()))
  37. _ = rc.Close()
  38. return
  39. }
  40. logger.Debug("read handle type", zap.Uint8("type 1", buf[0]), zap.Uint8("type 2", buf[1]),
  41. zap.Uint8("type 3", buf[2]), zap.String("remote addr", rc.RemoteAddr().String()))
  42. for _, h := range cm.handlers {
  43. err = rc.Reset(0)
  44. if err != nil {
  45. logger.Warn("reset connection error", zap.Error(err), zap.String("remote addr", rc.RemoteAddr().String()))
  46. _ = rc.Close()
  47. return
  48. }
  49. ok, err := h.HandleConn(buf, rc)
  50. if err != nil {
  51. logger.Warn("handle connection error", zap.Error(err), zap.String("remote addr", rc.RemoteAddr().String()))
  52. return
  53. }
  54. if ok {
  55. logger.Debug("handle connection success", zap.String("remote addr", rc.RemoteAddr().String()))
  56. return
  57. }
  58. }
  59. })
  60. return nil
  61. }
  62. func (cm *TcpServer) GetName() string {
  63. return "tcp"
  64. }
  65. func (cm *TcpServer) GetZhName() string {
  66. return "tcp服务"
  67. }
  68. // create a listener accept user and npc
  69. func (cm *TcpServer) listen() error {
  70. var err error
  71. cm.listener, err = net.Listen("tcp", cm.ServerAddr)
  72. if err != nil {
  73. return err
  74. }
  75. return nil
  76. }
  77. func (cm *TcpServer) Serve() {
  78. for {
  79. c, err := cm.listener.Accept()
  80. if err != nil {
  81. logger.Error("accept enet error", zap.Error(err))
  82. break
  83. }
  84. _ = cm.gp.Invoke(c)
  85. }
  86. }