// paket.go (4.0 KB)

  1. package enet
  2. import (
  3. "bytes"
  4. "ehang.io/nps/lib/common"
  5. "ehang.io/nps/lib/pool"
  6. "github.com/pkg/errors"
  7. "net"
  8. "sync/atomic"
  9. "time"
  10. )
  11. var (
  12. _ net.PacketConn = (*TcpPacketConn)(nil)
  13. _ PacketConn = (*ReaderPacketConn)(nil)
  14. )
  15. type PacketConn interface {
  16. net.PacketConn
  17. SendPacket([]byte, net.Addr) error
  18. FirstPacket() ([]byte, net.Addr, error)
  19. }
  20. var udpBp = pool.NewBufferPool(1500)
  21. // TcpPacketConn is an implement of net.PacketConn by net.Conn
  22. type TcpPacketConn struct {
  23. udpBp []byte
  24. net.Conn
  25. }
  26. // NewTcpPacketConn return a *TcpPacketConn
  27. func NewTcpPacketConn(conn net.Conn) *TcpPacketConn {
  28. return &TcpPacketConn{Conn: conn}
  29. }
  30. // ReadFrom is a implement of net.PacketConn ReadFrom
  31. func (tp *TcpPacketConn) ReadFrom(p []byte) (n int, addr net.Addr, err error) {
  32. b := udpBp.Get()
  33. defer udpBp.Put(b)
  34. n, err = common.ReadLenBytes(tp.Conn, b)
  35. if err != nil {
  36. return
  37. }
  38. rAddr, err := common.ReadAddr(bytes.NewReader(b[:n]))
  39. if err != nil {
  40. return
  41. }
  42. n = copy(p, b[len(rAddr):n])
  43. addr, err = net.ResolveUDPAddr("udp", rAddr.String())
  44. return
  45. }
  46. // WriteTo is a implement of net.PacketConn WriteTo
  47. func (tp *TcpPacketConn) WriteTo(p []byte, addr net.Addr) (n int, err error) {
  48. var pAddr common.Addr
  49. pAddr, err = common.ParseAddr(addr.String())
  50. if err != nil {
  51. return
  52. }
  53. return common.WriteLenBytes(tp.Conn, append(pAddr, p...))
  54. }
  55. // ReaderPacketConn is an implementation of net.PacketConn
  56. type ReaderPacketConn struct {
  57. ch chan *packet
  58. closeCh chan struct{}
  59. closed int32
  60. nowNum int32
  61. addr net.Addr
  62. writePacketConn net.PacketConn
  63. readTimer *time.Timer
  64. firstPacket []byte
  65. }
  66. type packet struct {
  67. b []byte
  68. addr net.Addr
  69. }
  70. // NewReaderPacketConn returns an initialized PacketConn
  71. func NewReaderPacketConn(writePacketConn net.PacketConn, firstPacket []byte, addr net.Addr) *ReaderPacketConn {
  72. return &ReaderPacketConn{
  73. ch: make(chan *packet, 10),
  74. closeCh: make(chan struct{}),
  75. addr: addr,
  76. writePacketConn: writePacketConn,
  77. readTimer: time.NewTimer(time.Hour * 24 * 3650),
  78. firstPacket: firstPacket,
  79. }
  80. }
  81. func (pc *ReaderPacketConn) ReadFrom(p []byte) (n int, addr net.Addr, err error) {
  82. var pt *packet
  83. select {
  84. case pt = <-pc.ch:
  85. case <-pc.readTimer.C:
  86. }
  87. if pt == nil {
  88. return 0, nil, errors.New("the PacketConn is already closed")
  89. }
  90. copy(p, pt.b)
  91. return len(pt.b), pt.addr, nil
  92. }
  93. func (pc *ReaderPacketConn) WriteTo(p []byte, addr net.Addr) (n int, err error) {
  94. return pc.writePacketConn.WriteTo(p, addr)
  95. }
  96. // LocalAddr returns the listener's address
  97. func (pc *ReaderPacketConn) LocalAddr() net.Addr {
  98. return pc.addr
  99. }
  100. func (pc *ReaderPacketConn) SetDeadline(t time.Time) error {
  101. pc.readTimer.Reset(t.Sub(time.Now()))
  102. return pc.writePacketConn.SetWriteDeadline(t)
  103. }
  104. func (pc *ReaderPacketConn) SetReadDeadline(t time.Time) error {
  105. pc.readTimer.Reset(t.Sub(time.Now()))
  106. return nil
  107. }
  108. func (pc *ReaderPacketConn) SetWriteDeadline(t time.Time) error {
  109. return pc.writePacketConn.SetWriteDeadline(t)
  110. }
  111. func (pc *ReaderPacketConn) FirstPacket() ([]byte, net.Addr, error) {
  112. if pc.firstPacket == nil || pc.addr == nil {
  113. return nil, nil, errors.New("not found first packet")
  114. }
  115. return pc.firstPacket, pc.addr, nil
  116. }
  117. // SendPacket is used to add connection to the listener
  118. func (pc *ReaderPacketConn) SendPacket(b []byte, addr net.Addr) error {
  119. if atomic.LoadInt32(&pc.closed) == 1 {
  120. return errors.New("the listener is already closed")
  121. }
  122. atomic.AddInt32(&pc.nowNum, 1)
  123. select {
  124. case pc.ch <- &packet{b: b, addr: addr}:
  125. return nil
  126. case <-pc.closeCh:
  127. case <-pc.readTimer.C:
  128. _ = pc.Close()
  129. }
  130. if atomic.AddInt32(&pc.nowNum, -1) == 0 && atomic.LoadInt32(&pc.closed) == 1 {
  131. close(pc.ch)
  132. }
  133. return errors.New("the packetConn is already closed")
  134. }
  135. // Close is used to close the listener, it will discard all existing connections
  136. func (pc *ReaderPacketConn) Close() error {
  137. if atomic.CompareAndSwapInt32(&pc.closed, 0, 1) {
  138. close(pc.closeCh)
  139. }
  140. return nil
  141. }