conn.go 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171
  1. package mux
  2. import (
  3. "errors"
  4. "github.com/cnlh/nps/lib/pool"
  5. "io"
  6. "net"
  7. "time"
  8. )
  9. type conn struct {
  10. net.Conn
  11. getStatusCh chan struct{}
  12. connStatusOkCh chan struct{}
  13. connStatusFailCh chan struct{}
  14. readTimeOut time.Time
  15. writeTimeOut time.Time
  16. sendMsgCh chan *msg //mux
  17. sendStatusCh chan int32 //mux
  18. readBuffer []byte
  19. startRead int //now read position
  20. endRead int //now end read
  21. readFlag bool
  22. readCh chan struct{}
  23. connId int32
  24. isClose bool
  25. readWait bool
  26. mux *Mux
  27. }
  // msg is the unit placed on a stream's outgoing message channel.
  type msg struct {
  	// connId identifies the stream the message belongs to.
  	connId int32
  	// content is the payload; Close in this file sends a msg whose content
  	// is nil.
  	content []byte
  }
  32. func NewMsg(connId int32, content []byte) *msg {
  33. return &msg{
  34. connId: connId,
  35. content: content,
  36. }
  37. }
  38. func NewConn(connId int32, mux *Mux, sendMsgCh chan *msg, sendStatusCh chan int32) *conn {
  39. return &conn{
  40. readCh: make(chan struct{}),
  41. readBuffer: pool.BufPoolCopy.Get().([]byte),
  42. getStatusCh: make(chan struct{}),
  43. connStatusOkCh: make(chan struct{}),
  44. connStatusFailCh: make(chan struct{}),
  45. readTimeOut: time.Time{},
  46. writeTimeOut: time.Time{},
  47. sendMsgCh: sendMsgCh,
  48. sendStatusCh: sendStatusCh,
  49. connId: connId,
  50. isClose: false,
  51. mux: mux,
  52. }
  53. }
  54. func (s *conn) Read(buf []byte) (n int, err error) {
  55. if s.isClose {
  56. return 0, errors.New("the conn has closed")
  57. }
  58. if s.endRead-s.startRead == 0 {
  59. s.readWait = true
  60. if t := s.readTimeOut.Sub(time.Now()); t > 0 {
  61. timer := time.NewTimer(t)
  62. select {
  63. case <-timer.C:
  64. s.readWait = false
  65. return 0, errors.New("read timeout")
  66. case <-s.readCh:
  67. }
  68. } else {
  69. <-s.readCh
  70. }
  71. }
  72. s.readWait = false
  73. if s.isClose {
  74. return 0, io.EOF
  75. }
  76. if len(buf) < s.endRead-s.startRead {
  77. n = copy(buf, s.readBuffer[s.startRead:s.startRead+len(buf)])
  78. s.startRead += n
  79. } else {
  80. n = copy(buf, s.readBuffer[s.startRead:s.endRead])
  81. s.startRead = 0
  82. s.endRead = 0
  83. s.sendStatusCh <- s.connId
  84. }
  85. return
  86. }
  87. func (s *conn) Write(buf []byte) (int, error) {
  88. if s.isClose {
  89. return 0, errors.New("the conn has closed")
  90. }
  91. ch := make(chan struct{})
  92. go s.write(buf, ch)
  93. if t := s.writeTimeOut.Sub(time.Now()); t > 0 {
  94. timer := time.NewTimer(t)
  95. select {
  96. case <-timer.C:
  97. return 0, errors.New("write timeout")
  98. case <-ch:
  99. }
  100. } else {
  101. <-ch
  102. }
  103. if s.isClose {
  104. return 0, io.EOF
  105. }
  106. return len(buf), nil
  107. }
  108. func (s *conn) write(buf []byte, ch chan struct{}) {
  109. start := 0
  110. l := len(buf)
  111. for {
  112. if l-start > pool.PoolSizeCopy {
  113. s.sendMsgCh <- NewMsg(s.connId, buf[start:start+pool.PoolSizeCopy])
  114. start += pool.PoolSizeCopy
  115. <-s.getStatusCh
  116. } else {
  117. s.sendMsgCh <- NewMsg(s.connId, buf[start:l])
  118. <-s.getStatusCh
  119. break
  120. }
  121. }
  122. ch <- struct{}{}
  123. }
  124. func (s *conn) Close() error {
  125. if s.isClose {
  126. return errors.New("the conn has closed")
  127. }
  128. s.isClose = true
  129. pool.PutBufPoolCopy(s.readBuffer)
  130. close(s.getStatusCh)
  131. close(s.connStatusOkCh)
  132. close(s.connStatusFailCh)
  133. close(s.readCh)
  134. if !s.mux.isClose {
  135. s.sendMsgCh <- NewMsg(s.connId, nil)
  136. }
  137. return nil
  138. }
  139. func (s *conn) LocalAddr() net.Addr {
  140. return s.mux.conn.LocalAddr()
  141. }
  142. func (s *conn) RemoteAddr() net.Addr {
  143. return s.mux.conn.RemoteAddr()
  144. }
  145. func (s *conn) SetDeadline(t time.Time) error {
  146. s.readTimeOut = t
  147. s.writeTimeOut = t
  148. return nil
  149. }
  150. func (s *conn) SetReadDeadline(t time.Time) error {
  151. s.readTimeOut = t
  152. return nil
  153. }
  154. func (s *conn) SetWriteDeadline(t time.Time) error {
  155. s.writeTimeOut = t
  156. return nil
  157. }