conn.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148
  1. package mux
  2. import (
  3. "errors"
  4. "github.com/cnlh/nps/lib/pool"
  5. "io"
  6. "net"
  7. "time"
  8. )
  9. type conn struct {
  10. net.Conn
  11. readMsgCh chan []byte
  12. getStatusCh chan struct{}
  13. connStatusOkCh chan struct{}
  14. connStatusFailCh chan struct{}
  15. readTimeOut time.Time
  16. writeTimeOut time.Time
  17. sendMsgCh chan *msg //mux
  18. sendStatusCh chan int32 //mux
  19. connId int32
  20. isClose bool
  21. mux *Mux
  22. }
  23. type msg struct {
  24. connId int32
  25. content []byte
  26. }
  27. func NewMsg(connId int32, content []byte) *msg {
  28. return &msg{
  29. connId: connId,
  30. content: content,
  31. }
  32. }
  33. func NewConn(connId int32, mux *Mux, sendMsgCh chan *msg, sendStatusCh chan int32) *conn {
  34. return &conn{
  35. readMsgCh: make(chan []byte),
  36. getStatusCh: make(chan struct{}),
  37. connStatusOkCh: make(chan struct{}),
  38. connStatusFailCh: make(chan struct{}),
  39. readTimeOut: time.Time{},
  40. writeTimeOut: time.Time{},
  41. sendMsgCh: sendMsgCh,
  42. sendStatusCh: sendStatusCh,
  43. connId: connId,
  44. isClose: false,
  45. mux: mux,
  46. }
  47. }
  48. func (s *conn) Read(buf []byte) (int, error) {
  49. if s.isClose {
  50. return 0, errors.New("the conn has closed")
  51. }
  52. var b []byte
  53. if t := s.readTimeOut.Sub(time.Now()); t > 0 {
  54. timer := time.NewTimer(t)
  55. select {
  56. case <-timer.C:
  57. s.Close()
  58. return 0, errors.New("read timeout")
  59. case b = <-s.readMsgCh:
  60. }
  61. } else {
  62. b = <-s.readMsgCh
  63. }
  64. defer pool.PutBufPoolCopy(b)
  65. if s.isClose {
  66. return 0, io.EOF
  67. }
  68. s.sendStatusCh <- s.connId
  69. return copy(buf, b), nil
  70. }
  71. func (s *conn) Write(buf []byte) (int, error) {
  72. if s.isClose {
  73. return 0, errors.New("the conn has closed")
  74. }
  75. if t := s.writeTimeOut.Sub(time.Now()); t > 0 {
  76. timer := time.NewTimer(t)
  77. select {
  78. case <-timer.C:
  79. s.Close()
  80. return 0, errors.New("write timeout")
  81. case s.sendMsgCh <- NewMsg(s.connId, buf):
  82. }
  83. } else {
  84. s.sendMsgCh <- NewMsg(s.connId, buf)
  85. }
  86. if t := s.writeTimeOut.Sub(time.Now()); t > 0 {
  87. timer := time.NewTimer(t)
  88. select {
  89. case <-timer.C:
  90. s.Close()
  91. return 0, errors.New("write timeout")
  92. case <-s.getStatusCh:
  93. }
  94. } else {
  95. <-s.getStatusCh
  96. }
  97. if s.isClose {
  98. return 0, io.EOF
  99. }
  100. return len(buf), nil
  101. }
  102. func (s *conn) Close() error {
  103. if s.isClose {
  104. return errors.New("the conn has closed")
  105. }
  106. s.isClose = true
  107. close(s.getStatusCh)
  108. close(s.readMsgCh)
  109. close(s.connStatusOkCh)
  110. close(s.connStatusFailCh)
  111. s.sendMsgCh <- NewMsg(s.connId, nil)
  112. return nil
  113. }
  114. func (s *conn) LocalAddr() net.Addr {
  115. return s.mux.conn.LocalAddr()
  116. }
  117. func (s *conn) RemoteAddr() net.Addr {
  118. return s.mux.conn.RemoteAddr()
  119. }
  120. func (s *conn) SetDeadline(t time.Time) error {
  121. s.readTimeOut = t
  122. s.writeTimeOut = t
  123. return nil
  124. }
  125. func (s *conn) SetReadDeadline(t time.Time) error {
  126. s.readTimeOut = t
  127. return nil
  128. }
  129. func (s *conn) SetWriteDeadline(t time.Time) error {
  130. s.writeTimeOut = t
  131. return nil
  132. }