mux.go 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272
  1. package mux
  2. import (
  3. "bytes"
  4. "errors"
  5. "math"
  6. "net"
  7. "sync"
  8. "sync/atomic"
  9. "time"
  10. "github.com/astaxie/beego/logs"
  11. "github.com/cnlh/nps/lib/common"
  12. )
  13. type Mux struct {
  14. net.Listener
  15. conn net.Conn
  16. connMap *connMap
  17. newConnCh chan *conn
  18. id int32
  19. closeChan chan struct{}
  20. IsClose bool
  21. pingOk int
  22. connType string
  23. writeQueue Queue
  24. bufCh chan *bytes.Buffer
  25. sync.Mutex
  26. }
  27. func NewMux(c net.Conn, connType string) *Mux {
  28. m := &Mux{
  29. conn: c,
  30. connMap: NewConnMap(),
  31. id: 0,
  32. closeChan: make(chan struct{}),
  33. newConnCh: make(chan *conn),
  34. IsClose: false,
  35. connType: connType,
  36. bufCh: make(chan *bytes.Buffer),
  37. }
  38. m.writeQueue.New()
  39. //read session by flag
  40. go m.readSession()
  41. //ping
  42. go m.ping()
  43. go m.writeSession()
  44. return m
  45. }
  46. func (s *Mux) NewConn() (*conn, error) {
  47. if s.IsClose {
  48. return nil, errors.New("the mux has closed")
  49. }
  50. conn := NewConn(s.getId(), s)
  51. //it must be set before send
  52. s.connMap.Set(conn.connId, conn)
  53. s.sendInfo(common.MUX_NEW_CONN, conn.connId, nil)
  54. //set a timer timeout 30 second
  55. timer := time.NewTimer(time.Minute * 2)
  56. defer timer.Stop()
  57. select {
  58. case <-conn.connStatusOkCh:
  59. return conn, nil
  60. case <-conn.connStatusFailCh:
  61. case <-timer.C:
  62. }
  63. return nil, errors.New("create connection fail,the server refused the connection")
  64. }
  65. func (s *Mux) Accept() (net.Conn, error) {
  66. if s.IsClose {
  67. return nil, errors.New("accpet error,the mux has closed")
  68. }
  69. conn := <-s.newConnCh
  70. if conn == nil {
  71. return nil, errors.New("accpet error,the conn has closed")
  72. }
  73. return conn, nil
  74. }
  75. func (s *Mux) Addr() net.Addr {
  76. return s.conn.LocalAddr()
  77. }
  78. func (s *Mux) sendInfo(flag uint8, id int32, data interface{}) {
  79. var err error
  80. pack := common.MuxPack.Get()
  81. err = pack.NewPac(flag, id, data)
  82. if err != nil {
  83. common.MuxPack.Put(pack)
  84. return
  85. }
  86. s.writeQueue.Push(pack)
  87. return
  88. }
  89. func (s *Mux) writeSession() {
  90. go s.packBuf()
  91. go s.writeBuf()
  92. <-s.closeChan
  93. }
  94. func (s *Mux) packBuf() {
  95. for {
  96. pack := s.writeQueue.Pop()
  97. buffer := common.BuffPool.Get()
  98. err := pack.Pack(buffer)
  99. common.MuxPack.Put(pack)
  100. if err != nil {
  101. logs.Warn("pack err", err)
  102. common.BuffPool.Put(buffer)
  103. break
  104. }
  105. select {
  106. case s.bufCh <- buffer:
  107. case <-s.closeChan:
  108. break
  109. }
  110. }
  111. }
  112. func (s *Mux) writeBuf() {
  113. for {
  114. select {
  115. case buffer := <-s.bufCh:
  116. l := buffer.Len()
  117. n, err := buffer.WriteTo(s.conn)
  118. common.BuffPool.Put(buffer)
  119. if err != nil || int(n) != l {
  120. logs.Warn("close from write session fail ", err, n, l)
  121. s.Close()
  122. break
  123. }
  124. case <-s.closeChan:
  125. break
  126. }
  127. }
  128. }
  129. func (s *Mux) ping() {
  130. go func() {
  131. ticker := time.NewTicker(time.Second * 1)
  132. for {
  133. select {
  134. case <-ticker.C:
  135. }
  136. //Avoid going beyond the scope
  137. if (math.MaxInt32 - s.id) < 10000 {
  138. s.id = 0
  139. }
  140. s.sendInfo(common.MUX_PING_FLAG, common.MUX_PING, nil)
  141. if s.pingOk > 10 && s.connType == "kcp" {
  142. s.Close()
  143. break
  144. }
  145. s.pingOk++
  146. }
  147. }()
  148. select {
  149. case <-s.closeChan:
  150. }
  151. }
  152. func (s *Mux) readSession() {
  153. go func() {
  154. pack := common.MuxPack.Get()
  155. for {
  156. pack = common.MuxPack.Get()
  157. if pack.UnPack(s.conn) != nil {
  158. break
  159. }
  160. s.pingOk = 0
  161. switch pack.Flag {
  162. case common.MUX_NEW_CONN: //new connection
  163. connection := NewConn(pack.Id, s)
  164. s.connMap.Set(pack.Id, connection) //it has been set before send ok
  165. go func(connection *conn) {
  166. connection.sendWindow.SetAllowSize(512) // set the initial receive window
  167. }(connection)
  168. s.newConnCh <- connection
  169. s.sendInfo(common.MUX_NEW_CONN_OK, connection.connId, nil)
  170. continue
  171. case common.MUX_PING_FLAG: //ping
  172. go s.sendInfo(common.MUX_PING_RETURN, common.MUX_PING, nil)
  173. continue
  174. case common.MUX_PING_RETURN:
  175. continue
  176. }
  177. if connection, ok := s.connMap.Get(pack.Id); ok && !connection.isClose {
  178. switch pack.Flag {
  179. case common.MUX_NEW_MSG: //new msg from remote connection
  180. //insert wait queue
  181. if connection.isClose {
  182. continue
  183. }
  184. connection.receiveWindow.WriteWg.Add(1)
  185. go func(connection *conn, content []byte) { // do not block read session
  186. _, err := connection.receiveWindow.Write(content)
  187. if err != nil {
  188. logs.Warn("mux new msg err close", err)
  189. connection.Close()
  190. }
  191. size := connection.receiveWindow.Size()
  192. if size == 0 {
  193. connection.receiveWindow.WindowFull = true
  194. }
  195. s.sendInfo(common.MUX_MSG_SEND_OK, connection.connId, size)
  196. connection.receiveWindow.WriteWg.Done()
  197. }(connection, pack.Content)
  198. continue
  199. case common.MUX_NEW_CONN_OK: //connection ok
  200. connection.connStatusOkCh <- struct{}{}
  201. go connection.sendWindow.SetAllowSize(512)
  202. // set the initial receive window both side
  203. continue
  204. case common.MUX_NEW_CONN_Fail:
  205. connection.connStatusFailCh <- struct{}{}
  206. continue
  207. case common.MUX_MSG_SEND_OK:
  208. if connection.isClose {
  209. continue
  210. }
  211. go connection.sendWindow.SetAllowSize(pack.Window)
  212. continue
  213. case common.MUX_CONN_CLOSE: //close the connection
  214. s.connMap.Delete(pack.Id)
  215. connection.closeFlag = true
  216. go func(connection *conn) {
  217. connection.receiveWindow.WriteWg.Wait()
  218. connection.receiveWindow.WriteEndOp <- struct{}{} // close signal to receive window
  219. }(connection)
  220. continue
  221. }
  222. } else if pack.Flag == common.MUX_CONN_CLOSE {
  223. continue
  224. }
  225. common.MuxPack.Put(pack)
  226. }
  227. common.MuxPack.Put(pack)
  228. s.Close()
  229. }()
  230. select {
  231. case <-s.closeChan:
  232. }
  233. }
  234. func (s *Mux) Close() error {
  235. logs.Warn("close mux")
  236. if s.IsClose {
  237. return errors.New("the mux has closed")
  238. }
  239. s.IsClose = true
  240. s.connMap.Close()
  241. s.closeChan <- struct{}{}
  242. s.closeChan <- struct{}{}
  243. s.closeChan <- struct{}{}
  244. s.closeChan <- struct{}{}
  245. s.closeChan <- struct{}{}
  246. close(s.newConnCh)
  247. return s.conn.Close()
  248. }
  249. //get new connId as unique flag
  250. func (s *Mux) getId() (id int32) {
  251. id = atomic.AddInt32(&s.id, 1)
  252. if _, ok := s.connMap.Get(id); ok {
  253. s.getId()
  254. }
  255. return
  256. }