1
0

mux.go 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243
  1. package mux
  2. import (
  3. "bytes"
  4. "encoding/binary"
  5. "errors"
  6. "github.com/cnlh/nps/lib/pool"
  7. "math"
  8. "net"
  9. "sync"
  10. "sync/atomic"
  11. "time"
  12. )
  13. const (
  14. MUX_PING_FLAG int32 = iota
  15. MUX_NEW_CONN_OK
  16. MUX_NEW_CONN_Fail
  17. MUX_NEW_MSG
  18. MUX_MSG_SEND_OK
  19. MUX_NEW_CONN
  20. MUX_PING
  21. MUX_CONN_CLOSE
  22. MUX_PING_RETURN
  23. RETRY_TIME = 2 //Heart beat allowed fault tolerance times
  24. )
  25. type Mux struct {
  26. net.Listener
  27. conn net.Conn
  28. connMap *connMap
  29. sendMsgCh chan *msg //write msg chan
  30. sendStatusCh chan int32 //write read ok chan
  31. newConnCh chan *conn
  32. id int32
  33. closeChan chan struct{}
  34. IsClose bool
  35. pingOk int
  36. sync.Mutex
  37. }
  38. func NewMux(c net.Conn) *Mux {
  39. m := &Mux{
  40. conn: c,
  41. connMap: NewConnMap(),
  42. sendMsgCh: make(chan *msg),
  43. sendStatusCh: make(chan int32),
  44. id: 0,
  45. closeChan: make(chan struct{}),
  46. newConnCh: make(chan *conn),
  47. IsClose: false,
  48. }
  49. //read session by flag
  50. go m.readSession()
  51. //write session
  52. go m.writeSession()
  53. //ping
  54. go m.ping()
  55. return m
  56. }
  57. func (s *Mux) NewConn() (*conn, error) {
  58. if s.IsClose {
  59. return nil, errors.New("the mux has closed")
  60. }
  61. conn := NewConn(s.getId(), s, s.sendMsgCh, s.sendStatusCh)
  62. raw := bytes.NewBuffer([]byte{})
  63. if err := binary.Write(raw, binary.LittleEndian, MUX_NEW_CONN); err != nil {
  64. return nil, err
  65. }
  66. if err := binary.Write(raw, binary.LittleEndian, conn.connId); err != nil {
  67. return nil, err
  68. }
  69. //it must be set before send
  70. s.connMap.Set(conn.connId, conn)
  71. if _, err := s.conn.Write(raw.Bytes()); err != nil {
  72. return nil, err
  73. }
  74. select {
  75. case <-conn.connStatusOkCh:
  76. return conn, nil
  77. case <-conn.connStatusFailCh:
  78. }
  79. return nil, errors.New("create connection fail,the server refused the connection")
  80. }
  81. func (s *Mux) Accept() (net.Conn, error) {
  82. if s.IsClose {
  83. return nil, errors.New("accpet error,the conn has closed")
  84. }
  85. return <-s.newConnCh, nil
  86. }
  87. func (s *Mux) Addr() net.Addr {
  88. return s.conn.LocalAddr()
  89. }
  90. func (s *Mux) ping() {
  91. go func() {
  92. ticker := time.NewTicker(time.Second * 5)
  93. raw := bytes.NewBuffer([]byte{})
  94. for {
  95. select {
  96. case <-ticker.C:
  97. }
  98. //Avoid going beyond the scope
  99. if (math.MaxInt32 - s.id) < 10000 {
  100. s.id = 0
  101. }
  102. raw.Reset()
  103. binary.Write(raw, binary.LittleEndian, MUX_PING_FLAG)
  104. binary.Write(raw, binary.LittleEndian, MUX_PING)
  105. if _, err := s.conn.Write(raw.Bytes()); err != nil || s.pingOk > RETRY_TIME {
  106. s.Close()
  107. break
  108. }
  109. s.pingOk += 1
  110. }
  111. }()
  112. select {
  113. case <-s.closeChan:
  114. }
  115. }
  116. func (s *Mux) writeSession() {
  117. go func() {
  118. raw := bytes.NewBuffer([]byte{})
  119. for {
  120. raw.Reset()
  121. select {
  122. case msg := <-s.sendMsgCh:
  123. if msg == nil {
  124. break
  125. }
  126. if msg.content == nil { //close
  127. binary.Write(raw, binary.LittleEndian, MUX_CONN_CLOSE)
  128. binary.Write(raw, binary.LittleEndian, msg.connId)
  129. break
  130. }
  131. binary.Write(raw, binary.LittleEndian, MUX_NEW_MSG)
  132. binary.Write(raw, binary.LittleEndian, msg.connId)
  133. binary.Write(raw, binary.LittleEndian, int32(len(msg.content)))
  134. binary.Write(raw, binary.LittleEndian, msg.content)
  135. case connId := <-s.sendStatusCh:
  136. binary.Write(raw, binary.LittleEndian, MUX_MSG_SEND_OK)
  137. binary.Write(raw, binary.LittleEndian, connId)
  138. }
  139. if _, err := s.conn.Write(raw.Bytes()); err != nil {
  140. s.Close()
  141. break
  142. }
  143. }
  144. }()
  145. select {
  146. case <-s.closeChan:
  147. }
  148. }
  149. func (s *Mux) readSession() {
  150. go func() {
  151. raw := bytes.NewBuffer([]byte{})
  152. buf := pool.BufPoolCopy.Get().([]byte)
  153. defer pool.PutBufPoolCopy(buf)
  154. for {
  155. var flag, i int32
  156. var n int
  157. var err error
  158. if binary.Read(s.conn, binary.LittleEndian, &flag) == nil {
  159. if binary.Read(s.conn, binary.LittleEndian, &i) != nil {
  160. break
  161. }
  162. switch flag {
  163. case MUX_NEW_CONN: //new conn
  164. conn := NewConn(i, s, s.sendMsgCh, s.sendStatusCh)
  165. s.connMap.Set(i, conn) //it has been set before send ok
  166. s.newConnCh <- conn
  167. raw.Reset()
  168. binary.Write(raw, binary.LittleEndian, MUX_NEW_CONN_OK)
  169. binary.Write(raw, binary.LittleEndian, i)
  170. s.conn.Write(raw.Bytes())
  171. continue
  172. case MUX_PING_FLAG: //ping
  173. raw.Reset()
  174. binary.Write(raw, binary.LittleEndian, MUX_PING_RETURN)
  175. binary.Write(raw, binary.LittleEndian, MUX_PING)
  176. s.conn.Write(raw.Bytes())
  177. continue
  178. case MUX_PING_RETURN:
  179. s.pingOk -= 1
  180. continue
  181. case MUX_NEW_MSG:
  182. if n, err = ReadLenBytes(buf, s.conn); err != nil {
  183. break
  184. }
  185. }
  186. if conn, ok := s.connMap.Get(i); ok && !conn.isClose {
  187. switch flag {
  188. case MUX_NEW_MSG: //new msg from remote conn
  189. copy(conn.readBuffer, buf[:n])
  190. conn.endRead = n
  191. if conn.readWait {
  192. conn.readCh <- struct{}{}
  193. }
  194. case MUX_MSG_SEND_OK: //the remote has read
  195. conn.getStatusCh <- struct{}{}
  196. case MUX_NEW_CONN_OK: //conn ok
  197. conn.connStatusOkCh <- struct{}{}
  198. case MUX_NEW_CONN_Fail:
  199. conn.connStatusFailCh <- struct{}{}
  200. case MUX_CONN_CLOSE: //close the connection
  201. conn.Close()
  202. }
  203. }
  204. } else {
  205. break
  206. }
  207. }
  208. s.Close()
  209. }()
  210. select {
  211. case <-s.closeChan:
  212. }
  213. }
  214. func (s *Mux) Close() error {
  215. if s.IsClose {
  216. return errors.New("the mux has closed")
  217. }
  218. s.IsClose = true
  219. s.connMap.Close()
  220. s.closeChan <- struct{}{}
  221. s.closeChan <- struct{}{}
  222. s.closeChan <- struct{}{}
  223. close(s.closeChan)
  224. close(s.sendMsgCh)
  225. close(s.sendStatusCh)
  226. return s.conn.Close()
  227. }
  228. //get new connId as unique flag
  229. func (s *Mux) getId() int32 {
  230. return atomic.AddInt32(&s.id, 1)
  231. }