1
0

mux.go 4.7 KB


  1. package mux
  2. import (
  3. "bytes"
  4. "encoding/binary"
  5. "errors"
  6. "github.com/cnlh/nps/lib/pool"
  7. "math"
  8. "net"
  9. "sync"
  10. "sync/atomic"
  11. "time"
  12. )
  // Frame flags. Every frame on the wire begins with one of these values,
  // encoded as a little-endian int32, followed by a little-endian int32 id.
  const (
  	// MUX_PING_FLAG marks a keep-alive ping frame.
  	MUX_PING_FLAG = int32(iota)
  	// MUX_NEW_CONN_OK acknowledges a MUX_NEW_CONN request.
  	MUX_NEW_CONN_OK
  	// MUX_NEW_CONN_Fail rejects a MUX_NEW_CONN request.
  	MUX_NEW_CONN_Fail
  	// MUX_NEW_MSG carries a length-prefixed payload for a connection.
  	MUX_NEW_MSG
  	// MUX_MSG_SEND_OK reports that the remote side has read a message.
  	MUX_MSG_SEND_OK
  	// MUX_NEW_CONN asks the peer to open a new logical connection.
  	MUX_NEW_CONN
  	// MUX_PING is the value sent in the id field of ping frames.
  	MUX_PING
  	// MUX_CONN_CLOSE asks the peer to close a logical connection.
  	MUX_CONN_CLOSE
  	// MUX_PING_RETURN marks the reply to a ping frame.
  	MUX_PING_RETURN
  )
  24. type Mux struct {
  25. net.Listener
  26. conn net.Conn
  27. connMap *connMap
  28. newConnCh chan *conn
  29. id int32
  30. closeChan chan struct{}
  31. IsClose bool
  32. pingOk int
  33. connType string
  34. sync.Mutex
  35. }
  36. func NewMux(c net.Conn, connType string) *Mux {
  37. m := &Mux{
  38. conn: c,
  39. connMap: NewConnMap(),
  40. id: 0,
  41. closeChan: make(chan struct{}),
  42. newConnCh: make(chan *conn),
  43. IsClose: false,
  44. connType: connType,
  45. }
  46. //read session by flag
  47. go m.readSession()
  48. //ping
  49. go m.ping()
  50. return m
  51. }
  52. func (s *Mux) NewConn() (*conn, error) {
  53. if s.IsClose {
  54. return nil, errors.New("the mux has closed")
  55. }
  56. conn := NewConn(s.getId(), s)
  57. raw := bytes.NewBuffer([]byte{})
  58. if err := binary.Write(raw, binary.LittleEndian, MUX_NEW_CONN); err != nil {
  59. return nil, err
  60. }
  61. if err := binary.Write(raw, binary.LittleEndian, conn.connId); err != nil {
  62. return nil, err
  63. }
  64. //it must be set before send
  65. s.connMap.Set(conn.connId, conn)
  66. if _, err := s.conn.Write(raw.Bytes()); err != nil {
  67. return nil, err
  68. }
  69. //set a timer timeout 30 second
  70. timer := time.NewTimer(time.Second * 30)
  71. defer timer.Stop()
  72. select {
  73. case <-conn.connStatusOkCh:
  74. return conn, nil
  75. case <-conn.connStatusFailCh:
  76. case <-timer.C:
  77. }
  78. return nil, errors.New("create connection fail,the server refused the connection")
  79. }
  80. func (s *Mux) Accept() (net.Conn, error) {
  81. if s.IsClose {
  82. return nil, errors.New("accpet error,the conn has closed")
  83. }
  84. conn := <-s.newConnCh
  85. if conn == nil {
  86. return nil, errors.New("accpet error,the conn has closed")
  87. }
  88. return conn, nil
  89. }
  90. func (s *Mux) Addr() net.Addr {
  91. return s.conn.LocalAddr()
  92. }
  93. func (s *Mux) sendInfo(flag int32, id int32, content []byte) error {
  94. raw := bytes.NewBuffer([]byte{})
  95. binary.Write(raw, binary.LittleEndian, flag)
  96. binary.Write(raw, binary.LittleEndian, id)
  97. if content != nil && len(content) > 0 {
  98. binary.Write(raw, binary.LittleEndian, int32(len(content)))
  99. binary.Write(raw, binary.LittleEndian, content)
  100. }
  101. if _, err := s.conn.Write(raw.Bytes()); err != nil {
  102. s.Close()
  103. return err
  104. }
  105. return nil
  106. }
  107. func (s *Mux) ping() {
  108. go func() {
  109. ticker := time.NewTicker(time.Second * 1)
  110. for {
  111. select {
  112. case <-ticker.C:
  113. }
  114. //Avoid going beyond the scope
  115. if (math.MaxInt32 - s.id) < 10000 {
  116. s.id = 0
  117. }
  118. if err := s.sendInfo(MUX_PING_FLAG, MUX_PING, nil); err != nil || (s.pingOk > 10 && s.connType == "kcp") {
  119. s.Close()
  120. break
  121. }
  122. s.pingOk++
  123. }
  124. }()
  125. select {
  126. case <-s.closeChan:
  127. }
  128. }
  129. func (s *Mux) readSession() {
  130. var buf []byte
  131. go func() {
  132. for {
  133. var flag, i int32
  134. var n int
  135. var err error
  136. if binary.Read(s.conn, binary.LittleEndian, &flag) == nil {
  137. if binary.Read(s.conn, binary.LittleEndian, &i) != nil {
  138. break
  139. }
  140. s.pingOk = 0
  141. switch flag {
  142. case MUX_NEW_CONN: //new conn
  143. conn := NewConn(i, s)
  144. s.connMap.Set(i, conn) //it has been set before send ok
  145. s.newConnCh <- conn
  146. s.sendInfo(MUX_NEW_CONN_OK, i, nil)
  147. continue
  148. case MUX_PING_FLAG: //ping
  149. s.sendInfo(MUX_PING_RETURN, MUX_PING, nil)
  150. continue
  151. case MUX_PING_RETURN:
  152. continue
  153. case MUX_NEW_MSG:
  154. buf = pool.GetBufPoolCopy()
  155. if n, err = ReadLenBytes(buf, s.conn); err != nil {
  156. break
  157. }
  158. }
  159. if conn, ok := s.connMap.Get(i); ok && !conn.isClose {
  160. switch flag {
  161. case MUX_NEW_MSG: //new msg from remote conn
  162. //insert wait queue
  163. conn.waitQueue.Push(NewBufNode(buf, n))
  164. //judge len if >xxx ,send stop
  165. if conn.readWait {
  166. conn.readWait = false
  167. conn.readCh <- struct{}{}
  168. }
  169. case MUX_MSG_SEND_OK: //the remote has read
  170. select {
  171. case conn.getStatusCh <- struct{}{}:
  172. default:
  173. }
  174. conn.hasWrite --
  175. case MUX_NEW_CONN_OK: //conn ok
  176. conn.connStatusOkCh <- struct{}{}
  177. case MUX_NEW_CONN_Fail:
  178. conn.connStatusFailCh <- struct{}{}
  179. case MUX_CONN_CLOSE: //close the connection
  180. go conn.Close()
  181. s.connMap.Delete(i)
  182. }
  183. } else if flag == MUX_NEW_MSG {
  184. pool.PutBufPoolCopy(buf)
  185. }
  186. } else {
  187. break
  188. }
  189. }
  190. s.Close()
  191. }()
  192. select {
  193. case <-s.closeChan:
  194. }
  195. }
  196. func (s *Mux) Close() error {
  197. if s.IsClose {
  198. return errors.New("the mux has closed")
  199. }
  200. s.IsClose = true
  201. s.connMap.Close()
  202. select {
  203. case s.closeChan <- struct{}{}:
  204. }
  205. select {
  206. case s.closeChan <- struct{}{}:
  207. }
  208. close(s.newConnCh)
  209. return s.conn.Close()
  210. }
  211. //get new connId as unique flag
  212. func (s *Mux) getId() int32 {
  213. return atomic.AddInt32(&s.id, 1)
  214. }