mux.go 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244
  1. package mux
  2. import (
  3. "bytes"
  4. "errors"
  5. "github.com/cnlh/nps/lib/common"
  6. "github.com/cnlh/nps/lib/pool"
  7. "github.com/cnlh/nps/vender/github.com/astaxie/beego/logs"
  8. "math"
  9. "net"
  10. "sync"
  11. "sync/atomic"
  12. "time"
  13. )
  14. type Mux struct {
  15. net.Listener
  16. conn net.Conn
  17. connMap *connMap
  18. newConnCh chan *conn
  19. id int32
  20. closeChan chan struct{}
  21. IsClose bool
  22. pingOk int
  23. connType string
  24. writeQueue chan *bytes.Buffer
  25. sync.Mutex
  26. }
  27. func NewMux(c net.Conn, connType string) *Mux {
  28. m := &Mux{
  29. conn: c,
  30. connMap: NewConnMap(),
  31. id: 0,
  32. closeChan: make(chan struct{}),
  33. newConnCh: make(chan *conn),
  34. IsClose: false,
  35. connType: connType,
  36. writeQueue: make(chan *bytes.Buffer, 20),
  37. }
  38. //read session by flag
  39. go m.readSession()
  40. //ping
  41. go m.ping()
  42. //go m.writeSession()
  43. return m
  44. }
  45. func (s *Mux) NewConn() (*conn, error) {
  46. if s.IsClose {
  47. return nil, errors.New("the mux has closed")
  48. }
  49. conn := NewConn(s.getId(), s)
  50. //it must be set before send
  51. s.connMap.Set(conn.connId, conn)
  52. if err := s.sendInfo(common.MUX_NEW_CONN, conn.connId, nil); err != nil {
  53. return nil, err
  54. }
  55. //set a timer timeout 30 second
  56. timer := time.NewTimer(time.Minute * 2)
  57. defer timer.Stop()
  58. select {
  59. case <-conn.connStatusOkCh:
  60. return conn, nil
  61. case <-conn.connStatusFailCh:
  62. case <-timer.C:
  63. }
  64. return nil, errors.New("create connection fail,the server refused the connection")
  65. }
  66. func (s *Mux) Accept() (net.Conn, error) {
  67. if s.IsClose {
  68. return nil, errors.New("accpet error,the conn has closed")
  69. }
  70. conn := <-s.newConnCh
  71. if conn == nil {
  72. return nil, errors.New("accpet error,the conn has closed")
  73. }
  74. return conn, nil
  75. }
  76. func (s *Mux) Addr() net.Addr {
  77. return s.conn.LocalAddr()
  78. }
  79. func (s *Mux) sendInfo(flag uint8, id int32, content []byte) (err error) {
  80. if flag == common.MUX_NEW_MSG {
  81. }
  82. buf := pool.BuffPool.Get()
  83. pack := common.MuxPackager{}
  84. err = pack.NewPac(flag, id, content)
  85. if err != nil {
  86. s.Close()
  87. logs.Warn("new pack err", err)
  88. return
  89. }
  90. err = pack.Pack(buf)
  91. if err != nil {
  92. s.Close()
  93. logs.Warn("pack err", err)
  94. return
  95. }
  96. //s.writeQueue <- buf
  97. _, err = buf.WriteTo(s.conn)
  98. if err != nil {
  99. s.Close()
  100. logs.Warn("write err", err)
  101. }
  102. pool.BuffPool.Put(buf)
  103. if flag == common.MUX_CONN_CLOSE {
  104. }
  105. if flag == common.MUX_NEW_MSG {
  106. }
  107. return
  108. }
  109. func (s *Mux) writeSession() {
  110. go func() {
  111. for {
  112. buf := <-s.writeQueue
  113. l := buf.Len()
  114. n, err := buf.WriteTo(s.conn)
  115. pool.BuffPool.Put(buf)
  116. if err != nil || int(n) != l {
  117. logs.Warn("close from write to ", err, n, l)
  118. s.Close()
  119. break
  120. }
  121. }
  122. }()
  123. <-s.closeChan
  124. }
  125. func (s *Mux) ping() {
  126. go func() {
  127. ticker := time.NewTicker(time.Second * 1)
  128. for {
  129. select {
  130. case <-ticker.C:
  131. }
  132. //Avoid going beyond the scope
  133. if (math.MaxInt32 - s.id) < 10000 {
  134. s.id = 0
  135. }
  136. if err := s.sendInfo(common.MUX_PING_FLAG, common.MUX_PING, nil); err != nil || (s.pingOk > 10 && s.connType == "kcp") {
  137. s.Close()
  138. break
  139. }
  140. s.pingOk++
  141. }
  142. }()
  143. select {
  144. case <-s.closeChan:
  145. }
  146. }
  147. func (s *Mux) readSession() {
  148. var pack common.MuxPackager
  149. go func() {
  150. for {
  151. if pack.UnPack(s.conn) != nil {
  152. break
  153. }
  154. if pack.Flag != 0 && pack.Flag != 7 {
  155. if pack.Length > 10 {
  156. logs.Warn(pack.Flag, pack.Id, pack.Length, string(pack.Content[:10]))
  157. }
  158. }
  159. s.pingOk = 0
  160. switch pack.Flag {
  161. case common.MUX_NEW_CONN: //new conn
  162. //logs.Warn("mux new conn", pack.Id)
  163. conn := NewConn(pack.Id, s)
  164. s.connMap.Set(pack.Id, conn) //it has been set before send ok
  165. s.newConnCh <- conn
  166. s.sendInfo(common.MUX_NEW_CONN_OK, pack.Id, nil)
  167. continue
  168. case common.MUX_PING_FLAG: //ping
  169. s.sendInfo(common.MUX_PING_RETURN, common.MUX_PING, nil)
  170. continue
  171. case common.MUX_PING_RETURN:
  172. continue
  173. }
  174. if conn, ok := s.connMap.Get(pack.Id); ok && !conn.isClose {
  175. switch pack.Flag {
  176. case common.MUX_NEW_MSG: //new msg from remote conn
  177. //insert wait queue
  178. conn.readQueue.Push(NewBufNode(pack.Content, int(pack.Length)))
  179. //judge len if >xxx ,send stop
  180. if conn.readWait {
  181. conn.readWait = false
  182. conn.readCh <- struct{}{}
  183. }
  184. case common.MUX_NEW_CONN_OK: //conn ok
  185. conn.connStatusOkCh <- struct{}{}
  186. case common.MUX_NEW_CONN_Fail:
  187. conn.connStatusFailCh <- struct{}{}
  188. case common.MUX_CONN_CLOSE: //close the connection
  189. conn.readQueue.Push(NewBufNode(nil, 0))
  190. if conn.readWait {
  191. logs.Warn("close read wait", pack.Id)
  192. conn.readWait = false
  193. conn.readCh <- struct{}{}
  194. }
  195. s.connMap.Delete(pack.Id)
  196. }
  197. } else if pack.Flag == common.MUX_NEW_MSG {
  198. pool.CopyBuff.Put(pack.Content)
  199. }
  200. }
  201. s.Close()
  202. }()
  203. select {
  204. case <-s.closeChan:
  205. }
  206. }
  207. func (s *Mux) Close() error {
  208. logs.Warn("close mux")
  209. if s.IsClose {
  210. return errors.New("the mux has closed")
  211. }
  212. s.IsClose = true
  213. s.connMap.Close()
  214. select {
  215. case s.closeChan <- struct{}{}:
  216. }
  217. select {
  218. case s.closeChan <- struct{}{}:
  219. }
  220. //s.closeChan <- struct{}{}
  221. close(s.writeQueue)
  222. close(s.newConnCh)
  223. return s.conn.Close()
  224. }
  225. //get new connId as unique flag
  226. func (s *Mux) getId() (id int32) {
  227. id = atomic.AddInt32(&s.id, 1)
  228. if _, ok := s.connMap.Get(id); ok {
  229. s.getId()
  230. }
  231. return
  232. }