mux.go 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226
  1. package mux
  2. import (
  3. "bytes"
  4. "encoding/binary"
  5. "errors"
  6. "github.com/cnlh/nps/lib/pool"
  7. "math"
  8. "net"
  9. "sync"
  10. "sync/atomic"
  11. "time"
  12. )
  13. const (
  14. MUX_PING_FLAG int32 = iota
  15. MUX_NEW_CONN_OK
  16. MUX_NEW_CONN_Fail
  17. MUX_NEW_MSG
  18. MUX_MSG_SEND_OK
  19. MUX_NEW_CONN
  20. MUX_PING
  21. MUX_CONN_CLOSE
  22. MUX_PING_RETURN
  23. MUX_STOP_WRITE
  24. RETRY_TIME = 2 //Heart beat allowed fault tolerance times
  25. )
  26. type Mux struct {
  27. net.Listener
  28. conn net.Conn
  29. connMap *connMap
  30. newConnCh chan *conn
  31. id int32
  32. closeChan chan struct{}
  33. IsClose bool
  34. pingOk int
  35. waitQueueSize int
  36. sync.Mutex
  37. }
  38. func NewMux(c net.Conn) *Mux {
  39. m := &Mux{
  40. conn: c,
  41. connMap: NewConnMap(),
  42. id: 0,
  43. closeChan: make(chan struct{}),
  44. newConnCh: make(chan *conn),
  45. IsClose: false,
  46. waitQueueSize: 10, //TODO :In order to be more efficient, this value can be dynamically generated according to the delay algorithm.
  47. }
  48. //read session by flag
  49. go m.readSession()
  50. //ping
  51. go m.ping()
  52. return m
  53. }
  54. func (s *Mux) NewConn() (*conn, error) {
  55. if s.IsClose {
  56. return nil, errors.New("the mux has closed")
  57. }
  58. conn := NewConn(s.getId(), s)
  59. raw := bytes.NewBuffer([]byte{})
  60. if err := binary.Write(raw, binary.LittleEndian, MUX_NEW_CONN); err != nil {
  61. return nil, err
  62. }
  63. if err := binary.Write(raw, binary.LittleEndian, conn.connId); err != nil {
  64. return nil, err
  65. }
  66. //it must be set before send
  67. s.connMap.Set(conn.connId, conn)
  68. if _, err := s.conn.Write(raw.Bytes()); err != nil {
  69. return nil, err
  70. }
  71. //set a timer timeout 30 second
  72. timer := time.NewTimer(time.Second * 30)
  73. defer timer.Stop()
  74. select {
  75. case <-conn.connStatusOkCh:
  76. return conn, nil
  77. case <-conn.connStatusFailCh:
  78. case <-timer.C:
  79. }
  80. return nil, errors.New("create connection fail,the server refused the connection")
  81. }
  82. func (s *Mux) Accept() (net.Conn, error) {
  83. if s.IsClose {
  84. return nil, errors.New("accpet error,the conn has closed")
  85. }
  86. return <-s.newConnCh, nil
  87. }
  88. func (s *Mux) Addr() net.Addr {
  89. return s.conn.LocalAddr()
  90. }
  91. func (s *Mux) sendInfo(flag int32, id int32, content []byte) error {
  92. raw := bytes.NewBuffer([]byte{})
  93. binary.Write(raw, binary.LittleEndian, flag)
  94. binary.Write(raw, binary.LittleEndian, id)
  95. if content != nil && len(content) > 0 {
  96. binary.Write(raw, binary.LittleEndian, int32(len(content)))
  97. binary.Write(raw, binary.LittleEndian, content)
  98. }
  99. if _, err := s.conn.Write(raw.Bytes()); err != nil || s.pingOk > RETRY_TIME {
  100. s.Close()
  101. return err
  102. }
  103. return nil
  104. }
  105. func (s *Mux) ping() {
  106. go func() {
  107. ticker := time.NewTicker(time.Second * 5)
  108. for {
  109. select {
  110. case <-ticker.C:
  111. }
  112. //Avoid going beyond the scope
  113. if (math.MaxInt32 - s.id) < 10000 {
  114. s.id = 0
  115. }
  116. if err := s.sendInfo(MUX_PING_FLAG, MUX_PING, nil); err != nil || s.pingOk > RETRY_TIME {
  117. break
  118. }
  119. s.pingOk += 1
  120. }
  121. }()
  122. select {
  123. case <-s.closeChan:
  124. }
  125. }
  126. func (s *Mux) readSession() {
  127. var buf []byte
  128. go func() {
  129. for {
  130. var flag, i int32
  131. var n int
  132. var err error
  133. if binary.Read(s.conn, binary.LittleEndian, &flag) == nil {
  134. if binary.Read(s.conn, binary.LittleEndian, &i) != nil {
  135. break
  136. }
  137. switch flag {
  138. case MUX_NEW_CONN: //new conn
  139. conn := NewConn(i, s)
  140. s.connMap.Set(i, conn) //it has been set before send ok
  141. s.newConnCh <- conn
  142. s.sendInfo(MUX_NEW_CONN_OK, i, nil)
  143. continue
  144. case MUX_PING_FLAG: //ping
  145. s.sendInfo(MUX_PING_RETURN, MUX_PING, nil)
  146. continue
  147. case MUX_PING_RETURN:
  148. s.pingOk -= 1
  149. continue
  150. case MUX_NEW_MSG:
  151. buf = pool.GetBufPoolCopy()
  152. if n, err = ReadLenBytes(buf, s.conn); err != nil {
  153. break
  154. }
  155. }
  156. if conn, ok := s.connMap.Get(i); ok && !conn.isClose {
  157. switch flag {
  158. case MUX_NEW_MSG: //new msg from remote conn
  159. //insert wait queue
  160. conn.waitQueue.Push(NewBufNode(buf, n))
  161. //judge len if >xxx ,send stop
  162. if conn.readWait {
  163. conn.readWait = false
  164. conn.readCh <- struct{}{}
  165. }
  166. if conn.waitQueue.Size() > s.waitQueueSize {
  167. s.sendInfo(MUX_STOP_WRITE, conn.connId, nil)
  168. }
  169. case MUX_STOP_WRITE:
  170. conn.stopWrite = true
  171. case MUX_MSG_SEND_OK: //the remote has read
  172. if conn.stopWrite {
  173. conn.stopWrite = false
  174. select {
  175. case conn.getStatusCh <- struct{}{}:
  176. default:
  177. }
  178. }
  179. case MUX_NEW_CONN_OK: //conn ok
  180. conn.connStatusOkCh <- struct{}{}
  181. case MUX_NEW_CONN_Fail:
  182. conn.connStatusFailCh <- struct{}{}
  183. case MUX_CONN_CLOSE: //close the connection
  184. go conn.Close()
  185. s.connMap.Delete(i)
  186. }
  187. } else if flag == MUX_NEW_MSG {
  188. pool.PutBufPoolCopy(buf)
  189. }
  190. } else {
  191. break
  192. }
  193. }
  194. s.Close()
  195. }()
  196. select {
  197. case <-s.closeChan:
  198. }
  199. }
  200. func (s *Mux) Close() error {
  201. if s.IsClose {
  202. return errors.New("the mux has closed")
  203. }
  204. s.IsClose = true
  205. s.connMap.Close()
  206. s.closeChan <- struct{}{}
  207. s.closeChan <- struct{}{}
  208. s.closeChan <- struct{}{}
  209. return s.conn.Close()
  210. }
  211. //get new connId as unique flag
  212. func (s *Mux) getId() int32 {
  213. return atomic.AddInt32(&s.id, 1)
  214. }