mux.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556
  1. package mux
  2. import (
  3. "errors"
  4. "io"
  5. "math"
  6. "net"
  7. "os"
  8. "sync/atomic"
  9. "syscall"
  10. "time"
  11. "github.com/astaxie/beego/logs"
  12. "github.com/cnlh/nps/lib/common"
  13. )
  14. type Mux struct {
  15. latency uint64 // we store latency in bits, but it's float64
  16. net.Listener
  17. conn net.Conn
  18. connMap *connMap
  19. newConnCh chan *conn
  20. id int32
  21. closeChan chan struct{}
  22. IsClose bool
  23. pingOk uint32
  24. counter *latencyCounter
  25. bw *bandwidth
  26. pingCh chan []byte
  27. pingCheckTime uint32
  28. connType string
  29. writeQueue PriorityQueue
  30. newConnQueue ConnQueue
  31. }
  32. func NewMux(c net.Conn, connType string) *Mux {
  33. //c.(*net.TCPConn).SetReadBuffer(0)
  34. //c.(*net.TCPConn).SetWriteBuffer(0)
  35. _ = c.SetDeadline(time.Time{})
  36. fd, err := getConnFd(c)
  37. if err != nil {
  38. logs.Warn(err)
  39. }
  40. m := &Mux{
  41. conn: c,
  42. connMap: NewConnMap(),
  43. id: 0,
  44. closeChan: make(chan struct{}, 1),
  45. newConnCh: make(chan *conn),
  46. bw: NewBandwidth(fd),
  47. IsClose: false,
  48. connType: connType,
  49. pingCh: make(chan []byte),
  50. counter: newLatencyCounter(),
  51. }
  52. m.writeQueue.New()
  53. m.newConnQueue.New()
  54. //read session by flag
  55. m.readSession()
  56. //ping
  57. m.ping()
  58. m.pingReturn()
  59. m.writeSession()
  60. return m
  61. }
  62. func getConnFd(c net.Conn) (fd *os.File, err error) {
  63. switch c.(type) {
  64. case *net.TCPConn:
  65. fd, err = c.(*net.TCPConn).File()
  66. if err != nil {
  67. return
  68. }
  69. return
  70. case *net.UDPConn:
  71. fd, err = c.(*net.UDPConn).File()
  72. if err != nil {
  73. return
  74. }
  75. return
  76. default:
  77. err = errors.New("mux:unknown conn type, only tcp or kcp")
  78. return
  79. }
  80. }
  81. func (s *Mux) NewConn() (*conn, error) {
  82. if s.IsClose {
  83. return nil, errors.New("the mux has closed")
  84. }
  85. conn := NewConn(s.getId(), s, "nps ")
  86. //it must be set before send
  87. s.connMap.Set(conn.connId, conn)
  88. s.sendInfo(common.MUX_NEW_CONN, conn.connId, nil)
  89. //set a timer timeout 30 second
  90. timer := time.NewTimer(time.Minute * 2)
  91. defer timer.Stop()
  92. select {
  93. case <-conn.connStatusOkCh:
  94. return conn, nil
  95. case <-conn.connStatusFailCh:
  96. case <-timer.C:
  97. }
  98. return nil, errors.New("create connection fail,the server refused the connection")
  99. }
  100. func (s *Mux) Accept() (net.Conn, error) {
  101. if s.IsClose {
  102. return nil, errors.New("accpet error,the mux has closed")
  103. }
  104. conn := <-s.newConnCh
  105. if conn == nil {
  106. return nil, errors.New("accpet error,the conn has closed")
  107. }
  108. return conn, nil
  109. }
  110. func (s *Mux) Addr() net.Addr {
  111. return s.conn.LocalAddr()
  112. }
  113. func (s *Mux) sendInfo(flag uint8, id int32, data ...interface{}) {
  114. if s.IsClose {
  115. return
  116. }
  117. var err error
  118. pack := common.MuxPack.Get()
  119. err = pack.NewPac(flag, id, data...)
  120. if err != nil {
  121. common.MuxPack.Put(pack)
  122. logs.Error("mux: new pack err", err)
  123. s.Close()
  124. return
  125. }
  126. s.writeQueue.Push(pack)
  127. return
  128. }
  129. func (s *Mux) writeSession() {
  130. go s.packBuf()
  131. //go s.writeBuf()
  132. }
  133. func (s *Mux) packBuf() {
  134. //buffer := common.BuffPool.Get()
  135. for {
  136. if s.IsClose {
  137. break
  138. }
  139. //buffer.Reset()
  140. pack := s.writeQueue.Pop()
  141. if s.IsClose {
  142. break
  143. }
  144. //buffer := common.BuffPool.Get()
  145. err := pack.Pack(s.conn)
  146. common.MuxPack.Put(pack)
  147. if err != nil {
  148. logs.Error("mux: pack err", err)
  149. //common.BuffPool.Put(buffer)
  150. s.Close()
  151. break
  152. }
  153. //logs.Warn(buffer.String())
  154. //s.bufQueue.Push(buffer)
  155. //l := buffer.Len()
  156. //n, err := buffer.WriteTo(s.conn)
  157. //common.BuffPool.Put(buffer)
  158. //if err != nil || int(n) != l {
  159. // logs.Error("mux: close from write session fail ", err, n, l)
  160. // s.Close()
  161. // break
  162. //}
  163. }
  164. }
  165. //func (s *Mux) writeBuf() {
  166. // for {
  167. // if s.IsClose {
  168. // break
  169. // }
  170. // buffer, err := s.bufQueue.Pop()
  171. // if err != nil {
  172. // break
  173. // }
  174. // l := buffer.Len()
  175. // n, err := buffer.WriteTo(s.conn)
  176. // common.BuffPool.Put(buffer)
  177. // if err != nil || int(n) != l {
  178. // logs.Warn("close from write session fail ", err, n, l)
  179. // s.Close()
  180. // break
  181. // }
  182. // }
  183. //}
  184. func (s *Mux) ping() {
  185. go func() {
  186. now, _ := time.Now().UTC().MarshalText()
  187. s.sendInfo(common.MUX_PING_FLAG, common.MUX_PING, now)
  188. // send the ping flag and get the latency first
  189. ticker := time.NewTicker(time.Second * 5)
  190. defer ticker.Stop()
  191. for {
  192. if s.IsClose {
  193. break
  194. }
  195. select {
  196. case <-ticker.C:
  197. }
  198. if atomic.LoadUint32(&s.pingCheckTime) >= 60 {
  199. logs.Error("mux: ping time out")
  200. s.Close()
  201. // more than 5 minutes not receive the ping return package,
  202. // mux conn is damaged, maybe a packet drop, close it
  203. break
  204. }
  205. now, _ := time.Now().UTC().MarshalText()
  206. s.sendInfo(common.MUX_PING_FLAG, common.MUX_PING, now)
  207. atomic.AddUint32(&s.pingCheckTime, 1)
  208. if atomic.LoadUint32(&s.pingOk) > 10 && s.connType == "kcp" {
  209. logs.Error("mux: kcp ping err")
  210. s.Close()
  211. break
  212. }
  213. atomic.AddUint32(&s.pingOk, 1)
  214. }
  215. return
  216. }()
  217. }
  218. func (s *Mux) pingReturn() {
  219. go func() {
  220. var now time.Time
  221. var data []byte
  222. for {
  223. if s.IsClose {
  224. break
  225. }
  226. select {
  227. case data = <-s.pingCh:
  228. atomic.StoreUint32(&s.pingCheckTime, 0)
  229. case <-s.closeChan:
  230. break
  231. }
  232. _ = now.UnmarshalText(data)
  233. latency := time.Now().UTC().Sub(now).Seconds() / 2
  234. if latency > 0 {
  235. atomic.StoreUint64(&s.latency, math.Float64bits(s.counter.Latency(latency)))
  236. // convert float64 to bits, store it atomic
  237. }
  238. //logs.Warn("latency", math.Float64frombits(atomic.LoadUint64(&s.latency)))
  239. if cap(data) > 0 {
  240. common.WindowBuff.Put(data)
  241. }
  242. }
  243. }()
  244. }
  245. func (s *Mux) readSession() {
  246. go func() {
  247. var connection *conn
  248. for {
  249. if s.IsClose {
  250. break
  251. }
  252. connection = s.newConnQueue.Pop()
  253. if s.IsClose {
  254. break // make sure that is closed
  255. }
  256. s.connMap.Set(connection.connId, connection) //it has been set before send ok
  257. s.newConnCh <- connection
  258. s.sendInfo(common.MUX_NEW_CONN_OK, connection.connId, nil)
  259. }
  260. }()
  261. go func() {
  262. pack := common.MuxPack.Get()
  263. var l uint16
  264. var err error
  265. for {
  266. if s.IsClose {
  267. break
  268. }
  269. pack = common.MuxPack.Get()
  270. s.bw.StartRead()
  271. if l, err = pack.UnPack(s.conn); err != nil {
  272. logs.Error("mux: read session unpack from connection err", err)
  273. s.Close()
  274. break
  275. }
  276. s.bw.SetCopySize(l)
  277. atomic.StoreUint32(&s.pingOk, 0)
  278. switch pack.Flag {
  279. case common.MUX_NEW_CONN: //new connection
  280. connection := NewConn(pack.Id, s)
  281. s.newConnQueue.Push(connection)
  282. continue
  283. case common.MUX_PING_FLAG: //ping
  284. s.sendInfo(common.MUX_PING_RETURN, common.MUX_PING, pack.Content)
  285. common.WindowBuff.Put(pack.Content)
  286. continue
  287. case common.MUX_PING_RETURN:
  288. //go func(content []byte) {
  289. s.pingCh <- pack.Content
  290. //}(pack.Content)
  291. continue
  292. }
  293. if connection, ok := s.connMap.Get(pack.Id); ok && !connection.isClose {
  294. switch pack.Flag {
  295. case common.MUX_NEW_MSG, common.MUX_NEW_MSG_PART: //new msg from remote connection
  296. err = s.newMsg(connection, pack)
  297. if err != nil {
  298. logs.Error("mux: read session connection new msg err", err)
  299. connection.Close()
  300. }
  301. continue
  302. case common.MUX_NEW_CONN_OK: //connection ok
  303. connection.connStatusOkCh <- struct{}{}
  304. continue
  305. case common.MUX_NEW_CONN_Fail:
  306. connection.connStatusFailCh <- struct{}{}
  307. continue
  308. case common.MUX_MSG_SEND_OK:
  309. if connection.isClose {
  310. continue
  311. }
  312. connection.sendWindow.SetSize(pack.Window, pack.ReadLength)
  313. continue
  314. case common.MUX_CONN_CLOSE: //close the connection
  315. connection.closeFlag = true
  316. //s.connMap.Delete(pack.Id)
  317. //go func(connection *conn) {
  318. connection.receiveWindow.Stop() // close signal to receive window
  319. //}(connection)
  320. continue
  321. }
  322. } else if pack.Flag == common.MUX_CONN_CLOSE {
  323. continue
  324. }
  325. common.MuxPack.Put(pack)
  326. }
  327. common.MuxPack.Put(pack)
  328. s.Close()
  329. }()
  330. }
  331. func (s *Mux) newMsg(connection *conn, pack *common.MuxPackager) (err error) {
  332. if connection.isClose {
  333. err = io.ErrClosedPipe
  334. return
  335. }
  336. //logs.Warn("read session receive new msg", pack.Length)
  337. //go func(connection *conn, pack *common.MuxPackager) { // do not block read session
  338. //insert into queue
  339. if pack.Flag == common.MUX_NEW_MSG_PART {
  340. err = connection.receiveWindow.Write(pack.Content, pack.Length, true, pack.Id)
  341. }
  342. if pack.Flag == common.MUX_NEW_MSG {
  343. err = connection.receiveWindow.Write(pack.Content, pack.Length, false, pack.Id)
  344. }
  345. //logs.Warn("read session write success", pack.Length)
  346. return
  347. }
  348. func (s *Mux) Close() (err error) {
  349. logs.Warn("close mux")
  350. if s.IsClose {
  351. return errors.New("the mux has closed")
  352. }
  353. s.IsClose = true
  354. s.connMap.Close()
  355. s.connMap = nil
  356. //s.bufQueue.Stop()
  357. s.closeChan <- struct{}{}
  358. close(s.newConnCh)
  359. err = s.conn.Close()
  360. s.release()
  361. return
  362. }
  363. func (s *Mux) release() {
  364. for {
  365. pack := s.writeQueue.TryPop()
  366. if pack == nil {
  367. break
  368. }
  369. if pack.BasePackager.Content != nil {
  370. common.WindowBuff.Put(pack.BasePackager.Content)
  371. }
  372. common.MuxPack.Put(pack)
  373. }
  374. for {
  375. connection := s.newConnQueue.TryPop()
  376. if connection == nil {
  377. break
  378. }
  379. connection = nil
  380. }
  381. s.writeQueue.Stop()
  382. s.newConnQueue.Stop()
  383. }
  384. //get new connId as unique flag
  385. func (s *Mux) getId() (id int32) {
  386. //Avoid going beyond the scope
  387. if (math.MaxInt32 - s.id) < 10000 {
  388. atomic.StoreInt32(&s.id, 0)
  389. }
  390. id = atomic.AddInt32(&s.id, 1)
  391. if _, ok := s.connMap.Get(id); ok {
  392. return s.getId()
  393. }
  394. return
  395. }
  396. type bandwidth struct {
  397. readBandwidth uint64 // store in bits, but it's float64
  398. readStart time.Time
  399. lastReadStart time.Time
  400. bufLength uint32
  401. fd *os.File
  402. calcThreshold uint32
  403. }
  404. func NewBandwidth(fd *os.File) *bandwidth {
  405. return &bandwidth{fd: fd}
  406. }
  407. func (Self *bandwidth) StartRead() {
  408. if Self.readStart.IsZero() {
  409. Self.readStart = time.Now()
  410. }
  411. if Self.bufLength >= Self.calcThreshold {
  412. Self.lastReadStart, Self.readStart = Self.readStart, time.Now()
  413. Self.calcBandWidth()
  414. }
  415. }
  416. func (Self *bandwidth) SetCopySize(n uint16) {
  417. Self.bufLength += uint32(n)
  418. }
  419. func (Self *bandwidth) calcBandWidth() {
  420. t := Self.readStart.Sub(Self.lastReadStart)
  421. bufferSize, err := syscall.GetsockoptInt(int(Self.fd.Fd()), syscall.SOL_SOCKET, syscall.SO_RCVBUF)
  422. //logs.Warn(bufferSize)
  423. if err != nil {
  424. logs.Warn(err)
  425. Self.bufLength = 0
  426. return
  427. }
  428. if Self.bufLength >= uint32(bufferSize) {
  429. atomic.StoreUint64(&Self.readBandwidth, math.Float64bits(float64(Self.bufLength)/t.Seconds()))
  430. // calculate the hole socket buffer, the time meaning to fill the buffer
  431. //logs.Warn(Self.Get())
  432. } else {
  433. Self.calcThreshold = uint32(bufferSize)
  434. }
  435. // socket buffer size is bigger than bufLength, so we don't calculate it
  436. Self.bufLength = 0
  437. }
  438. func (Self *bandwidth) Get() (bw float64) {
  439. // The zero value, 0 for numeric types
  440. bw = math.Float64frombits(atomic.LoadUint64(&Self.readBandwidth))
  441. if bw <= 0 {
  442. bw = 100
  443. }
  444. //logs.Warn(bw)
  445. return
  446. }
  447. const counterBits = 4
  448. const counterMask = 1<<counterBits - 1
  449. func newLatencyCounter() *latencyCounter {
  450. return &latencyCounter{
  451. buf: make([]float64, 1<<counterBits, 1<<counterBits),
  452. headMin: 0,
  453. }
  454. }
  455. type latencyCounter struct {
  456. buf []float64 //buf is a fixed length ring buffer,
  457. // if buffer is full, new value will replace the oldest one.
  458. headMin uint8 //head indicate the head in ring buffer,
  459. // in meaning, slot in list will be replaced;
  460. // min indicate this slot value is minimal in list.
  461. }
  462. func (Self *latencyCounter) unpack(idxs uint8) (head, min uint8) {
  463. head = uint8((idxs >> counterBits) & counterMask)
  464. // we set head is 4 bits
  465. min = uint8(idxs & counterMask)
  466. return
  467. }
  468. func (Self *latencyCounter) pack(head, min uint8) uint8 {
  469. return uint8(head<<counterBits) |
  470. uint8(min&counterMask)
  471. }
  472. func (Self *latencyCounter) add(value float64) {
  473. head, min := Self.unpack(Self.headMin)
  474. Self.buf[head] = value
  475. if head == min {
  476. min = Self.minimal()
  477. //if head equals min, means the min slot already be replaced,
  478. // so we need to find another minimal value in the list,
  479. // and change the min indicator
  480. }
  481. if Self.buf[min] > value {
  482. min = head
  483. }
  484. head++
  485. Self.headMin = Self.pack(head, min)
  486. }
  487. func (Self *latencyCounter) minimal() (min uint8) {
  488. var val float64
  489. var i uint8
  490. for i = 0; i < counterMask; i++ {
  491. if Self.buf[i] > 0 {
  492. if val > Self.buf[i] {
  493. val = Self.buf[i]
  494. min = i
  495. }
  496. }
  497. }
  498. return
  499. }
  500. func (Self *latencyCounter) Latency(value float64) (latency float64) {
  501. Self.add(value)
  502. _, min := Self.unpack(Self.headMin)
  503. latency = Self.buf[min] * Self.countSuccess()
  504. return
  505. }
  506. const lossRatio = 1.6
  507. func (Self *latencyCounter) countSuccess() (successRate float64) {
  508. var success, loss, i uint8
  509. _, min := Self.unpack(Self.headMin)
  510. for i = 0; i < counterMask; i++ {
  511. if Self.buf[i] > lossRatio*Self.buf[min] && Self.buf[i] > 0 {
  512. loss++
  513. }
  514. if Self.buf[i] <= lossRatio*Self.buf[min] && Self.buf[i] > 0 {
  515. success++
  516. }
  517. }
  518. // counting all the data in the ring buf, except zero
  519. successRate = float64(success) / float64(loss+success)
  520. return
  521. }