mux.go 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381
  1. package mux
  2. import (
  3. "errors"
  4. "io"
  5. "math"
  6. "net"
  7. "sync/atomic"
  8. "time"
  9. "github.com/astaxie/beego/logs"
  10. "github.com/cnlh/nps/lib/common"
  11. )
  12. type Mux struct {
  13. net.Listener
  14. conn net.Conn
  15. connMap *connMap
  16. newConnCh chan *conn
  17. id int32
  18. closeChan chan struct{}
  19. IsClose bool
  20. pingOk int
  21. latency float64
  22. bw *bandwidth
  23. pingCh chan []byte
  24. pingCheck bool
  25. connType string
  26. writeQueue PriorityQueue
  27. newConnQueue ConnQueue
  28. }
  29. func NewMux(c net.Conn, connType string) *Mux {
  30. //c.(*net.TCPConn).SetReadBuffer(0)
  31. //c.(*net.TCPConn).SetWriteBuffer(0)
  32. m := &Mux{
  33. conn: c,
  34. connMap: NewConnMap(),
  35. id: 0,
  36. closeChan: make(chan struct{}, 1),
  37. newConnCh: make(chan *conn),
  38. bw: new(bandwidth),
  39. IsClose: false,
  40. connType: connType,
  41. pingCh: make(chan []byte),
  42. }
  43. m.writeQueue.New()
  44. m.newConnQueue.New()
  45. //read session by flag
  46. m.readSession()
  47. //ping
  48. m.ping()
  49. m.pingReturn()
  50. m.writeSession()
  51. return m
  52. }
  53. func (s *Mux) NewConn() (*conn, error) {
  54. if s.IsClose {
  55. return nil, errors.New("the mux has closed")
  56. }
  57. conn := NewConn(s.getId(), s, "nps ")
  58. //it must be set before send
  59. s.connMap.Set(conn.connId, conn)
  60. s.sendInfo(common.MUX_NEW_CONN, conn.connId, nil)
  61. //set a timer timeout 30 second
  62. timer := time.NewTimer(time.Minute * 2)
  63. defer timer.Stop()
  64. select {
  65. case <-conn.connStatusOkCh:
  66. return conn, nil
  67. case <-conn.connStatusFailCh:
  68. case <-timer.C:
  69. }
  70. return nil, errors.New("create connection fail,the server refused the connection")
  71. }
  72. func (s *Mux) Accept() (net.Conn, error) {
  73. if s.IsClose {
  74. return nil, errors.New("accpet error,the mux has closed")
  75. }
  76. conn := <-s.newConnCh
  77. if conn == nil {
  78. return nil, errors.New("accpet error,the conn has closed")
  79. }
  80. return conn, nil
  81. }
  82. func (s *Mux) Addr() net.Addr {
  83. return s.conn.LocalAddr()
  84. }
  85. func (s *Mux) sendInfo(flag uint8, id int32, data ...interface{}) {
  86. if s.IsClose {
  87. return
  88. }
  89. var err error
  90. pack := common.MuxPack.Get()
  91. err = pack.NewPac(flag, id, data...)
  92. if err != nil {
  93. common.MuxPack.Put(pack)
  94. logs.Error("mux: new pack err")
  95. s.Close()
  96. return
  97. }
  98. s.writeQueue.Push(pack)
  99. return
  100. }
  101. func (s *Mux) writeSession() {
  102. go s.packBuf()
  103. //go s.writeBuf()
  104. }
  105. func (s *Mux) packBuf() {
  106. buffer := common.BuffPool.Get()
  107. for {
  108. if s.IsClose {
  109. break
  110. }
  111. buffer.Reset()
  112. pack := s.writeQueue.Pop()
  113. //buffer := common.BuffPool.Get()
  114. err := pack.Pack(buffer)
  115. common.MuxPack.Put(pack)
  116. if err != nil {
  117. logs.Error("mux: pack err", err)
  118. common.BuffPool.Put(buffer)
  119. break
  120. }
  121. //logs.Warn(buffer.String())
  122. //s.bufQueue.Push(buffer)
  123. l := buffer.Len()
  124. n, err := buffer.WriteTo(s.conn)
  125. //common.BuffPool.Put(buffer)
  126. if err != nil || int(n) != l {
  127. logs.Error("mux: close from write session fail ", err, n, l)
  128. s.Close()
  129. break
  130. }
  131. }
  132. }
  133. //func (s *Mux) writeBuf() {
  134. // for {
  135. // if s.IsClose {
  136. // break
  137. // }
  138. // buffer, err := s.bufQueue.Pop()
  139. // if err != nil {
  140. // break
  141. // }
  142. // l := buffer.Len()
  143. // n, err := buffer.WriteTo(s.conn)
  144. // common.BuffPool.Put(buffer)
  145. // if err != nil || int(n) != l {
  146. // logs.Warn("close from write session fail ", err, n, l)
  147. // s.Close()
  148. // break
  149. // }
  150. // }
  151. //}
  152. func (s *Mux) ping() {
  153. go func() {
  154. now, _ := time.Now().UTC().MarshalText()
  155. s.sendInfo(common.MUX_PING_FLAG, common.MUX_PING, now)
  156. // send the ping flag and get the latency first
  157. ticker := time.NewTicker(time.Second * 5)
  158. for {
  159. if s.IsClose {
  160. ticker.Stop()
  161. break
  162. }
  163. select {
  164. case <-ticker.C:
  165. }
  166. if s.pingCheck {
  167. logs.Error("mux: ping time out")
  168. s.Close()
  169. // more than 5 seconds not receive the ping return package,
  170. // mux conn is damaged, maybe a packet drop, close it
  171. break
  172. }
  173. now, _ := time.Now().UTC().MarshalText()
  174. s.sendInfo(common.MUX_PING_FLAG, common.MUX_PING, now)
  175. s.pingCheck = true
  176. if s.pingOk > 10 && s.connType == "kcp" {
  177. logs.Error("mux: kcp ping err")
  178. s.Close()
  179. break
  180. }
  181. s.pingOk++
  182. }
  183. }()
  184. }
  185. func (s *Mux) pingReturn() {
  186. go func() {
  187. var now time.Time
  188. var data []byte
  189. for {
  190. if s.IsClose {
  191. break
  192. }
  193. select {
  194. case data = <-s.pingCh:
  195. s.pingCheck = false
  196. case <-s.closeChan:
  197. break
  198. }
  199. _ = now.UnmarshalText(data)
  200. latency := time.Now().UTC().Sub(now).Seconds() / 2
  201. if latency < 0.5 && latency > 0 {
  202. s.latency = latency
  203. }
  204. //logs.Warn("latency", s.latency)
  205. common.WindowBuff.Put(data)
  206. }
  207. }()
  208. }
  209. func (s *Mux) readSession() {
  210. go func() {
  211. var connection *conn
  212. for {
  213. connection = s.newConnQueue.Pop()
  214. s.connMap.Set(connection.connId, connection) //it has been set before send ok
  215. s.newConnCh <- connection
  216. s.sendInfo(common.MUX_NEW_CONN_OK, connection.connId, nil)
  217. }
  218. }()
  219. go func() {
  220. pack := common.MuxPack.Get()
  221. var l uint16
  222. var err error
  223. for {
  224. if s.IsClose {
  225. break
  226. }
  227. pack = common.MuxPack.Get()
  228. s.bw.StartRead()
  229. if l, err = pack.UnPack(s.conn); err != nil {
  230. logs.Error("mux: read session unpack from connection err")
  231. s.Close()
  232. break
  233. }
  234. s.bw.SetCopySize(l)
  235. s.pingOk = 0
  236. switch pack.Flag {
  237. case common.MUX_NEW_CONN: //new connection
  238. connection := NewConn(pack.Id, s)
  239. s.newConnQueue.Push(connection)
  240. continue
  241. case common.MUX_PING_FLAG: //ping
  242. s.sendInfo(common.MUX_PING_RETURN, common.MUX_PING, pack.Content)
  243. common.WindowBuff.Put(pack.Content)
  244. continue
  245. case common.MUX_PING_RETURN:
  246. //go func(content []byte) {
  247. s.pingCh <- pack.Content
  248. //}(pack.Content)
  249. continue
  250. }
  251. if connection, ok := s.connMap.Get(pack.Id); ok && !connection.isClose {
  252. switch pack.Flag {
  253. case common.MUX_NEW_MSG, common.MUX_NEW_MSG_PART: //new msg from remote connection
  254. err = s.newMsg(connection, pack)
  255. if err != nil {
  256. logs.Error("mux: read session connection new msg err")
  257. connection.Close()
  258. }
  259. continue
  260. case common.MUX_NEW_CONN_OK: //connection ok
  261. connection.connStatusOkCh <- struct{}{}
  262. continue
  263. case common.MUX_NEW_CONN_Fail:
  264. connection.connStatusFailCh <- struct{}{}
  265. continue
  266. case common.MUX_MSG_SEND_OK:
  267. if connection.isClose {
  268. continue
  269. }
  270. connection.sendWindow.SetSize(pack.Window, pack.ReadLength)
  271. continue
  272. case common.MUX_CONN_CLOSE: //close the connection
  273. s.connMap.Delete(pack.Id)
  274. //go func(connection *conn) {
  275. connection.closeFlag = true
  276. connection.receiveWindow.Stop() // close signal to receive window
  277. //}(connection)
  278. continue
  279. }
  280. } else if pack.Flag == common.MUX_CONN_CLOSE {
  281. continue
  282. }
  283. common.MuxPack.Put(pack)
  284. }
  285. common.MuxPack.Put(pack)
  286. s.Close()
  287. }()
  288. }
  289. func (s *Mux) newMsg(connection *conn, pack *common.MuxPackager) (err error) {
  290. if connection.isClose {
  291. err = io.ErrClosedPipe
  292. return
  293. }
  294. //logs.Warn("read session receive new msg", pack.Length)
  295. //go func(connection *conn, pack *common.MuxPackager) { // do not block read session
  296. //insert into queue
  297. if pack.Flag == common.MUX_NEW_MSG_PART {
  298. err = connection.receiveWindow.Write(pack.Content, pack.Length, true, pack.Id)
  299. }
  300. if pack.Flag == common.MUX_NEW_MSG {
  301. err = connection.receiveWindow.Write(pack.Content, pack.Length, false, pack.Id)
  302. }
  303. //logs.Warn("read session write success", pack.Length)
  304. return
  305. }
  306. func (s *Mux) Close() error {
  307. logs.Warn("close mux")
  308. if s.IsClose {
  309. return errors.New("the mux has closed")
  310. }
  311. s.IsClose = true
  312. s.connMap.Close()
  313. //s.bufQueue.Stop()
  314. s.closeChan <- struct{}{}
  315. close(s.newConnCh)
  316. return s.conn.Close()
  317. }
  318. //get new connId as unique flag
  319. func (s *Mux) getId() (id int32) {
  320. //Avoid going beyond the scope
  321. if (math.MaxInt32 - s.id) < 10000 {
  322. atomic.StoreInt32(&s.id, 0)
  323. }
  324. id = atomic.AddInt32(&s.id, 1)
  325. if _, ok := s.connMap.Get(id); ok {
  326. return s.getId()
  327. }
  328. return
  329. }
  330. type bandwidth struct {
  331. readStart time.Time
  332. lastReadStart time.Time
  333. bufLength uint16
  334. readBandwidth float64
  335. }
  336. func (Self *bandwidth) StartRead() {
  337. if Self.readStart.IsZero() {
  338. Self.readStart = time.Now()
  339. }
  340. if Self.bufLength >= 16384 {
  341. Self.lastReadStart, Self.readStart = Self.readStart, time.Now()
  342. Self.calcBandWidth()
  343. }
  344. }
  345. func (Self *bandwidth) SetCopySize(n uint16) {
  346. Self.bufLength += n
  347. }
  348. func (Self *bandwidth) calcBandWidth() {
  349. t := Self.readStart.Sub(Self.lastReadStart)
  350. Self.readBandwidth = float64(Self.bufLength) / t.Seconds()
  351. Self.bufLength = 0
  352. }
  353. func (Self *bandwidth) Get() (bw float64) {
  354. // The zero value, 0 for numeric types
  355. if Self.readBandwidth <= 0 {
  356. Self.readBandwidth = 100
  357. }
  358. return Self.readBandwidth
  359. }