mux.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220
  1. package mux
  2. import (
  3. "bytes"
  4. "encoding/binary"
  5. "errors"
  6. "math"
  7. "net"
  8. "sync"
  9. "sync/atomic"
  10. "time"
  11. "github.com/cnlh/nps/lib/pool"
  12. )
  13. const (
  14. MUX_PING_FLAG int32 = iota
  15. MUX_NEW_CONN_OK
  16. MUX_NEW_CONN_Fail
  17. MUX_NEW_MSG
  18. MUX_MSG_SEND_OK
  19. MUX_NEW_CONN
  20. MUX_PING
  21. MUX_CONN_CLOSE
  22. MUX_PING_RETURN
  23. )
  24. type Mux struct {
  25. net.Listener
  26. conn net.Conn
  27. connMap *connMap
  28. newConnCh chan *conn
  29. id int32
  30. closeChan chan struct{}
  31. IsClose bool
  32. pingOk int
  33. connType string
  34. sync.Mutex
  35. }
  36. func NewMux(c net.Conn, connType string) *Mux {
  37. m := &Mux{
  38. conn: c,
  39. connMap: NewConnMap(),
  40. id: 0,
  41. closeChan: make(chan struct{}),
  42. newConnCh: make(chan *conn),
  43. IsClose: false,
  44. connType: connType,
  45. }
  46. //read session by flag
  47. go m.readSession()
  48. //ping
  49. go m.ping()
  50. return m
  51. }
  52. func (s *Mux) NewConn() (*conn, error) {
  53. if s.IsClose {
  54. return nil, errors.New("the mux has closed")
  55. }
  56. conn := NewConn(s.getId(), s)
  57. //it must be set before send
  58. s.connMap.Set(conn.connId, conn)
  59. if err := s.sendInfo(MUX_NEW_CONN, conn.connId, nil); err != nil {
  60. return nil, err
  61. }
  62. //set a timer timeout 30 second
  63. timer := time.NewTimer(time.Minute * 2)
  64. defer timer.Stop()
  65. select {
  66. case <-conn.connStatusOkCh:
  67. return conn, nil
  68. case <-conn.connStatusFailCh:
  69. case <-timer.C:
  70. }
  71. return nil, errors.New("create connection fail,the server refused the connection")
  72. }
  73. func (s *Mux) Accept() (net.Conn, error) {
  74. if s.IsClose {
  75. return nil, errors.New("accpet error,the conn has closed")
  76. }
  77. conn := <-s.newConnCh
  78. if conn == nil {
  79. return nil, errors.New("accpet error,the conn has closed")
  80. }
  81. return conn, nil
  82. }
  83. func (s *Mux) Addr() net.Addr {
  84. return s.conn.LocalAddr()
  85. }
  86. func (s *Mux) sendInfo(flag int32, id int32, content []byte) error {
  87. raw := bytes.NewBuffer([]byte{})
  88. binary.Write(raw, binary.LittleEndian, flag)
  89. binary.Write(raw, binary.LittleEndian, id)
  90. if content != nil && len(content) > 0 {
  91. binary.Write(raw, binary.LittleEndian, int32(len(content)))
  92. binary.Write(raw, binary.LittleEndian, content)
  93. }
  94. if _, err := s.conn.Write(raw.Bytes()); err != nil {
  95. s.Close()
  96. return err
  97. }
  98. return nil
  99. }
  100. func (s *Mux) ping() {
  101. go func() {
  102. ticker := time.NewTicker(time.Second * 1)
  103. for {
  104. select {
  105. case <-ticker.C:
  106. }
  107. //Avoid going beyond the scope
  108. if (math.MaxInt32 - s.id) < 10000 {
  109. s.id = 0
  110. }
  111. if err := s.sendInfo(MUX_PING_FLAG, MUX_PING, nil); err != nil || (s.pingOk > 10 && s.connType == "kcp") {
  112. s.Close()
  113. break
  114. }
  115. s.pingOk++
  116. }
  117. }()
  118. select {
  119. case <-s.closeChan:
  120. }
  121. }
  122. func (s *Mux) readSession() {
  123. var buf []byte
  124. go func() {
  125. for {
  126. var flag, i int32
  127. var n int
  128. var err error
  129. if binary.Read(s.conn, binary.LittleEndian, &flag) == nil {
  130. if binary.Read(s.conn, binary.LittleEndian, &i) != nil {
  131. break
  132. }
  133. s.pingOk = 0
  134. switch flag {
  135. case MUX_NEW_CONN: //new conn
  136. conn := NewConn(i, s)
  137. s.connMap.Set(i, conn) //it has been set before send ok
  138. s.newConnCh <- conn
  139. s.sendInfo(MUX_NEW_CONN_OK, i, nil)
  140. continue
  141. case MUX_PING_FLAG: //ping
  142. s.sendInfo(MUX_PING_RETURN, MUX_PING, nil)
  143. continue
  144. case MUX_PING_RETURN:
  145. continue
  146. case MUX_NEW_MSG:
  147. buf = pool.GetBufPoolCopy()
  148. if n, err = ReadLenBytes(buf, s.conn); err != nil {
  149. break
  150. }
  151. }
  152. if conn, ok := s.connMap.Get(i); ok && !conn.isClose {
  153. switch flag {
  154. case MUX_NEW_MSG: //new msg from remote conn
  155. //insert wait queue
  156. conn.waitQueue.Push(NewBufNode(buf, n))
  157. //judge len if >xxx ,send stop
  158. if conn.readWait {
  159. conn.readWait = false
  160. conn.readCh <- struct{}{}
  161. }
  162. case MUX_MSG_SEND_OK: //the remote has read
  163. select {
  164. case conn.getStatusCh <- struct{}{}:
  165. default:
  166. }
  167. conn.hasWrite--
  168. case MUX_NEW_CONN_OK: //conn ok
  169. conn.connStatusOkCh <- struct{}{}
  170. case MUX_NEW_CONN_Fail:
  171. conn.connStatusFailCh <- struct{}{}
  172. case MUX_CONN_CLOSE: //close the connection
  173. go conn.Close()
  174. s.connMap.Delete(i)
  175. }
  176. } else if flag == MUX_NEW_MSG {
  177. pool.PutBufPoolCopy(buf)
  178. }
  179. } else {
  180. break
  181. }
  182. }
  183. s.Close()
  184. }()
  185. select {
  186. case <-s.closeChan:
  187. }
  188. }
  189. func (s *Mux) Close() error {
  190. if s.IsClose {
  191. return errors.New("the mux has closed")
  192. }
  193. s.IsClose = true
  194. s.connMap.Close()
  195. select {
  196. case s.closeChan <- struct{}{}:
  197. }
  198. select {
  199. case s.closeChan <- struct{}{}:
  200. }
  201. close(s.newConnCh)
  202. return s.conn.Close()
  203. }
  204. //get new connId as unique flag
  205. func (s *Mux) getId() int32 {
  206. return atomic.AddInt32(&s.id, 1)
  207. }