mux.go 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226
  1. package mux
  2. import (
  3. "bytes"
  4. "encoding/binary"
  5. "errors"
  6. "github.com/cnlh/nps/lib/pool"
  7. "math"
  8. "net"
  9. "sync"
  10. "sync/atomic"
  11. "time"
  12. )
  // Frame flags. Every frame on the underlying connection starts with one of
  // these values, written as a little-endian int32 and followed by a second
  // int32 (a connection id, or MUX_PING for ping frames).
  const (
  	// MUX_PING_FLAG marks a keep-alive frame; the reader skips it.
  	MUX_PING_FLAG int32 = iota
  	// MUX_NEW_CONN_OK answers MUX_NEW_CONN: the connection was accepted.
  	MUX_NEW_CONN_OK
  	// MUX_NEW_CONN_Fail tells the opener that the connection was refused.
  	MUX_NEW_CONN_Fail
  	// MUX_NEW_MSG carries data: an int32 payload length and the payload follow
  	// the connection id.
  	MUX_NEW_MSG
  	// MUX_MSG_SEND_OK acknowledges data for the given connection id.
  	MUX_MSG_SEND_OK
  	// MUX_NEW_CONN asks the remote side to open a connection with the given id.
  	MUX_NEW_CONN
  	// MUX_PING is the value sent after MUX_PING_FLAG in a ping frame.
  	MUX_PING
  	// MUX_CONN_CLOSE asks the remote side to close the given connection.
  	MUX_CONN_CLOSE
  )
  23. type Mux struct {
  24. net.Listener
  25. conn net.Conn
  26. connMap *connMap
  27. sendMsgCh chan *msg //write msg chan
  28. sendStatusCh chan int32 //write read ok chan
  29. newConnCh chan *conn
  30. id int32
  31. closeChan chan struct{}
  32. isClose bool
  33. sync.Mutex
  34. }
  35. func NewMux(c net.Conn) *Mux {
  36. m := &Mux{
  37. conn: c,
  38. connMap: NewConnMap(),
  39. sendMsgCh: make(chan *msg),
  40. sendStatusCh: make(chan int32),
  41. id: 0,
  42. closeChan: make(chan struct{}),
  43. newConnCh: make(chan *conn),
  44. isClose: false,
  45. }
  46. //read session by flag
  47. go m.readSession()
  48. //write session
  49. go m.writeSession()
  50. //ping
  51. go m.ping()
  52. return m
  53. }
  54. func (s *Mux) NewConn() (*conn, error) {
  55. if s.isClose {
  56. return nil, errors.New("the mux has closed")
  57. }
  58. conn := NewConn(s.getId(), s, s.sendMsgCh, s.sendStatusCh)
  59. raw := bytes.NewBuffer([]byte{})
  60. if err := binary.Write(raw, binary.LittleEndian, MUX_NEW_CONN); err != nil {
  61. return nil, err
  62. }
  63. if err := binary.Write(raw, binary.LittleEndian, conn.connId); err != nil {
  64. return nil, err
  65. }
  66. //it must be set before send
  67. s.connMap.Set(conn.connId, conn)
  68. if _, err := s.conn.Write(raw.Bytes()); err != nil {
  69. return nil, err
  70. }
  71. select {
  72. case <-conn.connStatusOkCh:
  73. return conn, nil
  74. case <-conn.connStatusFailCh:
  75. }
  76. return nil, errors.New("create connection fail,the server refused the connection")
  77. }
  78. func (s *Mux) Accept() (net.Conn, error) {
  79. if s.isClose {
  80. return nil, errors.New("accpet error,the conn has closed")
  81. }
  82. return <-s.newConnCh, nil
  83. }
  84. func (s *Mux) Addr() net.Addr {
  85. return s.conn.LocalAddr()
  86. }
  87. func (s *Mux) ping() {
  88. go func() {
  89. ticker := time.NewTicker(time.Second * 5)
  90. raw := bytes.NewBuffer([]byte{})
  91. for {
  92. select {
  93. case <-ticker.C:
  94. }
  95. //Avoid going beyond the scope
  96. if (math.MaxInt32 - s.id) < 10000 {
  97. s.id = 0
  98. }
  99. raw.Reset()
  100. binary.Write(raw, binary.LittleEndian, MUX_PING_FLAG)
  101. binary.Write(raw, binary.LittleEndian, MUX_PING)
  102. if _, err := s.conn.Write(raw.Bytes()); err != nil {
  103. s.Close()
  104. break
  105. }
  106. }
  107. }()
  108. select {
  109. case <-s.closeChan:
  110. }
  111. }
  112. func (s *Mux) writeSession() {
  113. go func() {
  114. raw := bytes.NewBuffer([]byte{})
  115. for {
  116. raw.Reset()
  117. select {
  118. case msg := <-s.sendMsgCh:
  119. if msg.content == nil { //close
  120. binary.Write(raw, binary.LittleEndian, MUX_CONN_CLOSE)
  121. binary.Write(raw, binary.LittleEndian, msg.connId)
  122. break
  123. }
  124. binary.Write(raw, binary.LittleEndian, MUX_NEW_MSG)
  125. binary.Write(raw, binary.LittleEndian, msg.connId)
  126. binary.Write(raw, binary.LittleEndian, int32(len(msg.content)))
  127. binary.Write(raw, binary.LittleEndian, msg.content)
  128. case connId := <-s.sendStatusCh:
  129. binary.Write(raw, binary.LittleEndian, MUX_MSG_SEND_OK)
  130. binary.Write(raw, binary.LittleEndian, connId)
  131. }
  132. if _, err := s.conn.Write(raw.Bytes()); err != nil {
  133. s.Close()
  134. break
  135. }
  136. }
  137. }()
  138. select {
  139. case <-s.closeChan:
  140. }
  141. }
  142. func (s *Mux) readSession() {
  143. go func() {
  144. raw := bytes.NewBuffer([]byte{})
  145. for {
  146. var flag, i int32
  147. if binary.Read(s.conn, binary.LittleEndian, &flag) == nil {
  148. if binary.Read(s.conn, binary.LittleEndian, &i) != nil {
  149. break
  150. }
  151. switch flag {
  152. case MUX_NEW_CONN: //new conn
  153. conn := NewConn(i, s, s.sendMsgCh, s.sendStatusCh)
  154. s.connMap.Set(i, conn) //it has been set before send ok
  155. s.newConnCh <- conn
  156. raw.Reset()
  157. binary.Write(raw, binary.LittleEndian, MUX_NEW_CONN_OK)
  158. binary.Write(raw, binary.LittleEndian, i)
  159. s.conn.Write(raw.Bytes())
  160. continue
  161. case MUX_PING_FLAG: //ping
  162. continue
  163. }
  164. if conn, ok := s.connMap.Get(i); ok {
  165. switch flag {
  166. case MUX_NEW_MSG: //new msg from remote conn
  167. buf := pool.BufPoolCopy.Get().([]byte)
  168. if n, err := ReadLenBytes(buf, s.conn); err == nil {
  169. if !conn.isClose {
  170. conn.readMsgCh <- buf[:n]
  171. } else {
  172. pool.PutBufPoolCopy(buf)
  173. }
  174. } else { //read len bytes error,the mux has broken
  175. break
  176. }
  177. case MUX_MSG_SEND_OK: //the remote has read
  178. conn.getStatusCh <- struct{}{}
  179. case MUX_NEW_CONN_OK: //conn ok
  180. conn.connStatusOkCh <- struct{}{}
  181. case MUX_NEW_CONN_Fail:
  182. conn.connStatusFailCh <- struct{}{}
  183. case MUX_CONN_CLOSE: //close the connection
  184. conn.Close()
  185. }
  186. }
  187. } else {
  188. break
  189. }
  190. }
  191. s.Close()
  192. }()
  193. select {
  194. case <-s.closeChan:
  195. }
  196. }
  197. func (s *Mux) Close() error {
  198. if s.isClose {
  199. return errors.New("the mux has closed")
  200. }
  201. s.isClose = true
  202. s.connMap.Close()
  203. s.closeChan <- struct{}{}
  204. s.closeChan <- struct{}{}
  205. s.closeChan <- struct{}{}
  206. close(s.closeChan)
  207. close(s.sendMsgCh)
  208. close(s.sendStatusCh)
  209. return s.conn.Close()
  210. }
  211. //get new connId as unique flag
  212. func (s *Mux) getId() int32 {
  213. return atomic.AddInt32(&s.id, 1)
  214. }