conn.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644
  1. package mux
  2. import (
  3. "errors"
  4. "io"
  5. "math"
  6. "net"
  7. "runtime"
  8. "sync"
  9. "sync/atomic"
  10. "time"
  11. "github.com/cnlh/nps/lib/common"
  12. )
// conn is one logical connection carried over a Mux. It embeds net.Conn to
// satisfy the interface; NewConn leaves the embedded value nil, and every
// net.Conn method is overridden in this file.
type conn struct {
	net.Conn
	// The three status channels are created by NewConn; they are not used by
	// the code in this file (presumably the mux signals on them — TODO confirm).
	getStatusCh      chan struct{}
	connStatusOkCh   chan struct{}
	connStatusFailCh chan struct{}
	connId           int32          // identifier passed to the windows and to mux.sendInfo
	isClose          bool           // set by closeProcess; Read and Write fail once it is true
	closeFlag        bool           // close conn flag; Write fails once it is true
	receiveWindow    *ReceiveWindow // inbound flow-control window, source of Read
	sendWindow       *SendWindow    // outbound flow-control window, sink of Write
	once             sync.Once      // makes Close run closeProcess at most once
	//label string
}
  26. func NewConn(connId int32, mux *Mux, label ...string) *conn {
  27. c := &conn{
  28. getStatusCh: make(chan struct{}),
  29. connStatusOkCh: make(chan struct{}),
  30. connStatusFailCh: make(chan struct{}),
  31. connId: connId,
  32. receiveWindow: new(ReceiveWindow),
  33. sendWindow: new(SendWindow),
  34. once: sync.Once{},
  35. }
  36. //if len(label) > 0 {
  37. // c.label = label[0]
  38. //}
  39. c.receiveWindow.New(mux)
  40. c.sendWindow.New(mux)
  41. //logm := &connLog{
  42. // startTime: time.Now(),
  43. // isClose: false,
  44. // logs: []string{c.label + "new conn success"},
  45. //}
  46. //setM(label[0], int(connId), logm)
  47. return c
  48. }
  49. func (s *conn) Read(buf []byte) (n int, err error) {
  50. if s.isClose || buf == nil {
  51. return 0, errors.New("the conn has closed")
  52. }
  53. if len(buf) == 0 {
  54. return 0, nil
  55. }
  56. // waiting for takeout from receive window finish or timeout
  57. //now := time.Now()
  58. n, err = s.receiveWindow.Read(buf, s.connId)
  59. //t := time.Now().Sub(now)
  60. //if t.Seconds() > 0.5 {
  61. //logs.Warn("conn read long", n, t.Seconds())
  62. //}
  63. //var errstr string
  64. //if err == nil {
  65. // errstr = "err:nil"
  66. //} else {
  67. // errstr = err.Error()
  68. //}
  69. //d := getM(s.label, int(s.connId))
  70. //d.logs = append(d.logs, s.label+"read "+strconv.Itoa(n)+" "+errstr+" "+string(buf[:100]))
  71. //setM(s.label, int(s.connId), d)
  72. return
  73. }
  74. func (s *conn) Write(buf []byte) (n int, err error) {
  75. if s.isClose {
  76. return 0, errors.New("the conn has closed")
  77. }
  78. if s.closeFlag {
  79. //s.Close()
  80. return 0, errors.New("io: write on closed conn")
  81. }
  82. if len(buf) == 0 {
  83. return 0, nil
  84. }
  85. //logs.Warn("write buf", len(buf))
  86. //now := time.Now()
  87. n, err = s.sendWindow.WriteFull(buf, s.connId)
  88. //t := time.Now().Sub(now)
  89. //if t.Seconds() > 0.5 {
  90. // logs.Warn("conn write long", n, t.Seconds())
  91. //}
  92. return
  93. }
  94. func (s *conn) Close() (err error) {
  95. s.once.Do(s.closeProcess)
  96. return
  97. }
  98. func (s *conn) closeProcess() {
  99. s.isClose = true
  100. s.receiveWindow.mux.connMap.Delete(s.connId)
  101. if !s.receiveWindow.mux.IsClose {
  102. // if server or user close the conn while reading, will get a io.EOF
  103. // and this Close method will be invoke, send this signal to close other side
  104. s.receiveWindow.mux.sendInfo(common.MUX_CONN_CLOSE, s.connId, nil)
  105. }
  106. s.sendWindow.CloseWindow()
  107. s.receiveWindow.CloseWindow()
  108. //d := getM(s.label, int(s.connId))
  109. //d.isClose = true
  110. //d.logs = append(d.logs, s.label+"close "+time.Now().String())
  111. //setM(s.label, int(s.connId), d)
  112. return
  113. }
// LocalAddr returns the local address of the connection held by the mux,
// so every conn on the same mux reports the same address.
func (s *conn) LocalAddr() net.Addr {
	return s.receiveWindow.mux.conn.LocalAddr()
}
// RemoteAddr returns the remote address of the connection held by the mux,
// so every conn on the same mux reports the same address.
func (s *conn) RemoteAddr() net.Addr {
	return s.receiveWindow.mux.conn.RemoteAddr()
}
  120. func (s *conn) SetDeadline(t time.Time) error {
  121. _ = s.SetReadDeadline(t)
  122. _ = s.SetWriteDeadline(t)
  123. return nil
  124. }
// SetReadDeadline forwards t to the receive window's SetTimeOut.
// It always returns nil.
func (s *conn) SetReadDeadline(t time.Time) error {
	s.receiveWindow.SetTimeOut(t)
	return nil
}
// SetWriteDeadline forwards t to the send window's SetTimeOut, where it
// bounds how long a write waits for the peer to open its window.
// It always returns nil.
func (s *conn) SetWriteDeadline(t time.Time) error {
	s.sendWindow.SetTimeOut(t)
	return nil
}
// window holds the state shared by ReceiveWindow and SendWindow.
type window struct {
	// remainingWait packs two values into one word so both can be updated
	// with a single atomic operation (see pack/unpack): the remaining window
	// size in the bits above dequeueBits and the wait flag in the low bits.
	remainingWait uint64 // 64bit alignment
	off           uint32 // offset into the element/buffer currently being consumed
	maxSize       uint32 // current maximum window size
	closeOp       bool   // set by CloseWindow; checked before reads and writes
	closeOpCh     chan struct{} // buffered (cap 2); CloseWindow sends two signals on it
	mux           *Mux   // multiplexer used to send control and data messages
}
  141. func (Self *window) unpack(ptrs uint64) (remaining, wait uint32) {
  142. const mask = 1<<dequeueBits - 1
  143. remaining = uint32((ptrs >> dequeueBits) & mask)
  144. wait = uint32(ptrs & mask)
  145. return
  146. }
  147. func (Self *window) pack(remaining, wait uint32) uint64 {
  148. const mask = 1<<dequeueBits - 1
  149. return (uint64(remaining) << dequeueBits) |
  150. uint64(wait&mask)
  151. }
// New initialises the state common to both window kinds.
func (Self *window) New() {
	// Capacity 2 matches the two signals CloseWindow sends, so closing never
	// blocks even when nobody is receiving.
	Self.closeOpCh = make(chan struct{}, 2)
}
  155. func (Self *window) CloseWindow() {
  156. if !Self.closeOp {
  157. Self.closeOp = true
  158. Self.closeOpCh <- struct{}{}
  159. Self.closeOpCh <- struct{}{}
  160. }
  161. }
// ReceiveWindow is the inbound half of a conn's flow control: Write queues
// data arriving from the peer, Read hands it to the caller, and the window
// size is reported back to the peer through mux.sendInfo.
type ReceiveWindow struct {
	window
	bufQueue ReceiveWindowQueue  // elements pushed by Write and popped by Read
	element  *common.ListElement // element Read is currently copying from
	count    int8                // calcSize recomputes maxSize only when this is 0
	once     sync.Once           // makes Stop stop the queue at most once
}
  169. func (Self *ReceiveWindow) New(mux *Mux) {
  170. // initial a window for receive
  171. Self.bufQueue.New()
  172. Self.element = common.ListElementPool.Get()
  173. Self.maxSize = common.MAXIMUM_SEGMENT_SIZE * 10
  174. Self.mux = mux
  175. Self.window.New()
  176. }
  177. func (Self *ReceiveWindow) remainingSize(delta uint16) (n uint32) {
  178. // receive window remaining
  179. l := int64(atomic.LoadUint32(&Self.maxSize)) - int64(Self.bufQueue.Len())
  180. l -= int64(delta)
  181. if l > 0 {
  182. n = uint32(l)
  183. }
  184. return
  185. }
  186. func (Self *ReceiveWindow) calcSize() {
  187. // calculating maximum receive window size
  188. if Self.count == 0 {
  189. //logs.Warn("ping, bw", Self.mux.latency, Self.bw.Get())
  190. conns := Self.mux.connMap.Size()
  191. n := uint32(math.Float64frombits(atomic.LoadUint64(&Self.mux.latency)) *
  192. Self.mux.bw.Get() / float64(conns))
  193. if n < common.MAXIMUM_SEGMENT_SIZE*10 {
  194. n = common.MAXIMUM_SEGMENT_SIZE * 10
  195. }
  196. bufLen := Self.bufQueue.Len()
  197. if n < bufLen {
  198. n = bufLen
  199. }
  200. if n < Self.maxSize/2 {
  201. n = Self.maxSize / 2
  202. }
  203. // set the minimal size
  204. if n > 2*Self.maxSize {
  205. n = 2 * Self.maxSize
  206. }
  207. if n > (common.MAXIMUM_WINDOW_SIZE / uint32(conns)) {
  208. n = common.MAXIMUM_WINDOW_SIZE / uint32(conns)
  209. }
  210. // set the maximum size
  211. //logs.Warn("n", n)
  212. atomic.StoreUint32(&Self.maxSize, n)
  213. Self.count = -10
  214. }
  215. Self.count += 1
  216. return
  217. }
  218. func (Self *ReceiveWindow) Write(buf []byte, l uint16, part bool, id int32) (err error) {
  219. if Self.closeOp {
  220. return errors.New("conn.receiveWindow: write on closed window")
  221. }
  222. element, err := NewListElement(buf, l, part)
  223. //logs.Warn("push the buf", len(buf), l, (&element).l)
  224. if err != nil {
  225. return
  226. }
  227. Self.calcSize() // calculate the max window size
  228. var wait uint32
  229. start:
  230. ptrs := atomic.LoadUint64(&Self.remainingWait)
  231. _, wait = Self.unpack(ptrs)
  232. newRemaining := Self.remainingSize(l)
  233. // calculate the remaining window size now, plus the element we will push
  234. if newRemaining == 0 {
  235. //logs.Warn("window full true", remaining)
  236. wait = 1
  237. }
  238. if !atomic.CompareAndSwapUint64(&Self.remainingWait, ptrs, Self.pack(0, wait)) {
  239. goto start
  240. // another goroutine change the status, make sure shall we need wait
  241. }
  242. Self.bufQueue.Push(element)
  243. // status check finish, now we can push the element into the queue
  244. if wait == 0 {
  245. Self.mux.sendInfo(common.MUX_MSG_SEND_OK, id, newRemaining)
  246. // send the remaining window size, not including zero size
  247. }
  248. return nil
  249. }
// Read copies queued data into p and returns the number of bytes copied.
//
// It returns io.EOF once the window is closed. If popping the next element
// from the queue fails (queue stopped or timed out), the window is closed
// and that error is returned. When the current element is flagged as part
// of a larger message and p still has room, Read keeps pulling further
// elements; otherwise it returns after the current element (or when p is
// full). Each fully consumed element triggers sendStatus so the peer can
// learn about the freed window space.
func (Self *ReceiveWindow) Read(p []byte, id int32) (n int, err error) {
	if Self.closeOp {
		return 0, io.EOF // receive close signal, returns eof
	}
	pOff := 0 // write offset into p
	l := 0    // bytes copied in the current iteration
	//logs.Warn("receive window read off, element.l", Self.off, Self.element.l)
copyData:
	if Self.off == uint32(Self.element.L) {
		// The current element is fully consumed. On the first Read both
		// Self.off and Self.element.L are zero, so this branch is taken too.
		// The element is returned to the pool before the next one is popped.
		common.ListElementPool.Put(Self.element)
		if Self.closeOp {
			return 0, io.EOF
		}
		Self.element, err = Self.bufQueue.Pop()
		// if the queue is empty, Pop method will wait until one element push
		// into the queue successful, or timeout.
		// timer start on timeout parameter is set up ,
		// reset to 60s if timeout and data still available
		Self.off = 0
		if err != nil {
			Self.CloseWindow() // also close the window, to avoid read twice
			return             // queue receive stop or time out, break the loop and return
		}
		//logs.Warn("pop element", Self.element.l, Self.element.part)
	}
	// copy as much of the unread part of the element as fits into p
	l = copy(p[pOff:], Self.element.Buf[Self.off:Self.element.L])
	pOff += l
	Self.off += uint32(l)
	//logs.Warn("window read length buf len", Self.readLength, Self.bufQueue.Len())
	n += l
	l = 0
	if Self.off == uint32(Self.element.L) {
		// element drained: recycle its buffer and account for the freed space
		//logs.Warn("put the element end ", string(Self.element.buf[:15]))
		common.WindowBuff.Put(Self.element.Buf)
		Self.sendStatus(id, Self.element.L)
		// check the window full status
	}
	if pOff < len(p) && Self.element.Part {
		// element is a part of the segments, trying to fill up buf p
		goto copyData
	}
	return // buf p is full or all of segments in buf, return
}
  295. func (Self *ReceiveWindow) sendStatus(id int32, l uint16) {
  296. var remaining, wait uint32
  297. for {
  298. ptrs := atomic.LoadUint64(&Self.remainingWait)
  299. remaining, wait = Self.unpack(ptrs)
  300. remaining += uint32(l)
  301. if atomic.CompareAndSwapUint64(&Self.remainingWait, ptrs, Self.pack(remaining, 0)) {
  302. break
  303. }
  304. runtime.Gosched()
  305. // another goroutine change remaining or wait status, make sure
  306. // we need acknowledge other side
  307. }
  308. // now we get the current window status success
  309. if wait == 1 {
  310. //logs.Warn("send the wait status", remaining)
  311. Self.mux.sendInfo(common.MUX_MSG_SEND_OK, id, remaining)
  312. }
  313. return
  314. }
// SetTimeOut forwards the deadline t to the buffer queue, whose Pop method
// is what Read blocks on.
func (Self *ReceiveWindow) SetTimeOut(t time.Time) {
	// waiting for FIFO queue Pop method
	Self.bufQueue.SetTimeOut(t)
}
// Stop stops the buffer queue so that a blocked Pop returns. The sync.Once
// guard ensures the queue's Stop runs at most once.
func (Self *ReceiveWindow) Stop() {
	// queue has no more data to push, so unblock pop method
	Self.once.Do(Self.bufQueue.Stop)
}
// CloseWindow closes the receive window: it marks the embedded window
// closed, stops the buffer queue, and releases any elements still queued.
func (Self *ReceiveWindow) CloseWindow() {
	Self.window.CloseWindow()
	Self.Stop()
	Self.release()
}
  328. func (Self *ReceiveWindow) release() {
  329. //if Self.element != nil {
  330. // if Self.element.Buf != nil {
  331. // common.WindowBuff.Put(Self.element.Buf)
  332. // }
  333. // common.ListElementPool.Put(Self.element)
  334. //}
  335. for {
  336. ele := Self.bufQueue.TryPop()
  337. if ele == nil {
  338. return
  339. }
  340. if ele.Buf != nil {
  341. common.WindowBuff.Put(ele.Buf)
  342. }
  343. common.ListElementPool.Put(ele)
  344. } // release resource
  345. }
// SendWindow is the outbound half of a conn's flow control: it slices the
// caller's buffer into segments and only sends as much as the peer's
// receive window (reported through SetSize) allows.
type SendWindow struct {
	window
	buf       []byte        // buffer currently being sent, set by SetSendBuf
	setSizeCh chan struct{} // unbuffered; allow signals it, waitReceiveWindow waits on it
	timeout   time.Time     // deadline used by waitReceiveWindow, set by SetTimeOut
}
  352. func (Self *SendWindow) New(mux *Mux) {
  353. Self.setSizeCh = make(chan struct{})
  354. Self.maxSize = common.MAXIMUM_SEGMENT_SIZE * 10
  355. atomic.AddUint64(&Self.remainingWait, uint64(common.MAXIMUM_SEGMENT_SIZE*10)<<dequeueBits)
  356. Self.mux = mux
  357. Self.window.New()
  358. }
// SetSendBuf installs buf as the data to send and rewinds the send offset,
// so the next WriteTo starts at the beginning of buf. The slice is stored
// as-is, not copied.
func (Self *SendWindow) SetSendBuf(buf []byte) {
	// send window buff from conn write method, set it to send window
	Self.buf = buf
	Self.off = 0
}
// SetSize records the remaining window size reported by the peer's receive
// window and wakes a writer that is waiting for window space.
//
// It returns true when the send window is closed (closed), false otherwise.
func (Self *SendWindow) SetSize(newRemaining uint32) (closed bool) {
	// set the window size from receive window
	defer func() {
		// close(Self.setSizeCh) below and in allow panics if the channel is
		// already closed; that panic is turned into closed = true here.
		if recover() != nil {
			closed = true
		}
	}()
	if Self.closeOp {
		close(Self.setSizeCh)
		return true
	}
	//logs.Warn("set send window size to ", windowSize, newRemaining)
	var remaining, wait, newWait uint32
	for {
		ptrs := atomic.LoadUint64(&Self.remainingWait)
		remaining, wait = Self.unpack(ptrs)
		if remaining == newRemaining {
			//logs.Warn("waiting for another window size")
			return false // waiting for receive another usable window size
		}
		if newRemaining == 0 && wait == 1 {
			newWait = 1 // keep the wait status,
			// also if newRemaining is not zero, change wait to 0
		}
		// NOTE(review): newWait is not reset between retries, so a value of 1
		// set on an earlier iteration is kept even if wait is 0 on this one.
		if atomic.CompareAndSwapUint64(&Self.remainingWait, ptrs, Self.pack(newRemaining, newWait)) {
			break
		}
		// another goroutine changed the wait status or window size
	}
	if wait == 1 {
		// send window into the wait status, need notice the channel
		//logs.Warn("send window remaining size is 0")
		Self.allow()
	}
	// send window not into the wait status, so just do slide
	return false
}
// allow wakes the writer blocked in waitReceiveWindow by sending on
// setSizeCh. It blocks until either that send is received or a close signal
// arrives on closeOpCh; in the latter case setSizeCh is closed instead.
func (Self *SendWindow) allow() {
	select {
	case Self.setSizeCh <- struct{}{}:
		//logs.Warn("send window remaining size is 0 finish")
		return
	case <-Self.closeOpCh:
		// window closed while signalling: closing the channel makes
		// waitReceiveWindow see ok == false
		close(Self.setSizeCh)
		return
	}
}
// sent atomically deducts sentSize from the remaining-size half of
// remainingWait.
func (Self *SendWindow) sent(sentSize uint32) {
	// ^(x - 1) is the two's-complement negation of x, so adding it subtracts
	// sentSize<<dequeueBits. Because that amount is a multiple of
	// 1<<dequeueBits, the low (wait) bits are left unchanged.
	atomic.AddUint64(&Self.remainingWait, ^(uint64(sentSize)<<dequeueBits - 1))
}
  414. func (Self *SendWindow) WriteTo() (p []byte, sendSize uint32, part bool, err error) {
  415. // returns buf segments, return only one segments, need a loop outside
  416. // until err = io.EOF
  417. if Self.closeOp {
  418. return nil, 0, false, errors.New("conn.writeWindow: window closed")
  419. }
  420. if Self.off == uint32(len(Self.buf)) {
  421. return nil, 0, false, io.EOF
  422. // send window buff is drain, return eof and get another one
  423. }
  424. var remaining uint32
  425. start:
  426. ptrs := atomic.LoadUint64(&Self.remainingWait)
  427. remaining, _ = Self.unpack(ptrs)
  428. if remaining == 0 {
  429. if !atomic.CompareAndSwapUint64(&Self.remainingWait, ptrs, Self.pack(0, 1)) {
  430. goto start // another goroutine change the window, try again
  431. }
  432. // into the wait status
  433. //logs.Warn("send window into wait status")
  434. err = Self.waitReceiveWindow()
  435. if err != nil {
  436. return nil, 0, false, err
  437. }
  438. //logs.Warn("rem into wait finish")
  439. goto start
  440. }
  441. // there are still remaining window
  442. //logs.Warn("rem", remaining)
  443. if len(Self.buf[Self.off:]) > common.MAXIMUM_SEGMENT_SIZE {
  444. sendSize = common.MAXIMUM_SEGMENT_SIZE
  445. //logs.Warn("cut buf by mss")
  446. } else {
  447. sendSize = uint32(len(Self.buf[Self.off:]))
  448. }
  449. if remaining < sendSize {
  450. // usable window size is small than
  451. // window MAXIMUM_SEGMENT_SIZE or send buf left
  452. sendSize = remaining
  453. //logs.Warn("cut buf by remainingsize", sendSize, len(Self.buf[Self.off:]))
  454. }
  455. //logs.Warn("send size", sendSize)
  456. if sendSize < uint32(len(Self.buf[Self.off:])) {
  457. part = true
  458. }
  459. p = Self.buf[Self.off : sendSize+Self.off]
  460. Self.off += sendSize
  461. Self.sent(sendSize)
  462. return
  463. }
  464. func (Self *SendWindow) waitReceiveWindow() (err error) {
  465. t := Self.timeout.Sub(time.Now())
  466. if t < 0 { // not set the timeout, wait for it as long as connection close
  467. select {
  468. case _, ok := <-Self.setSizeCh:
  469. if !ok {
  470. return errors.New("conn.writeWindow: window closed")
  471. }
  472. return nil
  473. case <-Self.closeOpCh:
  474. return errors.New("conn.writeWindow: window closed")
  475. }
  476. }
  477. timer := time.NewTimer(t)
  478. defer timer.Stop()
  479. // waiting for receive usable window size, or timeout
  480. select {
  481. case _, ok := <-Self.setSizeCh:
  482. if !ok {
  483. return errors.New("conn.writeWindow: window closed")
  484. }
  485. return nil
  486. case <-timer.C:
  487. return errors.New("conn.writeWindow: write to time out")
  488. case <-Self.closeOpCh:
  489. return errors.New("conn.writeWindow: window closed")
  490. }
  491. }
  492. func (Self *SendWindow) WriteFull(buf []byte, id int32) (n int, err error) {
  493. Self.SetSendBuf(buf) // set the buf to send window
  494. //logs.Warn("set the buf to send window")
  495. var bufSeg []byte
  496. var part bool
  497. var l uint32
  498. for {
  499. bufSeg, l, part, err = Self.WriteTo()
  500. //logs.Warn("buf seg", len(bufSeg), part, err)
  501. // get the buf segments from send window
  502. if bufSeg == nil && part == false && err == io.EOF {
  503. // send window is drain, break the loop
  504. err = nil
  505. break
  506. }
  507. if err != nil {
  508. break
  509. }
  510. n += int(l)
  511. l = 0
  512. if part {
  513. Self.mux.sendInfo(common.MUX_NEW_MSG_PART, id, bufSeg)
  514. } else {
  515. Self.mux.sendInfo(common.MUX_NEW_MSG, id, bufSeg)
  516. //logs.Warn("buf seg sent", len(bufSeg), part, err)
  517. }
  518. // send to other side, not send nil data to other side
  519. }
  520. //logs.Warn("buf seg write success")
  521. return
  522. }
// SetTimeOut stores t as the deadline that waitReceiveWindow uses while a
// write waits for the peer to report usable window space.
func (Self *SendWindow) SetTimeOut(t time.Time) {
	// waiting for receive a receive window size
	Self.timeout = t
}
  527. //type bandwidth struct {
  528. // readStart time.Time
  529. // lastReadStart time.Time
  530. // readEnd time.Time
  531. // lastReadEnd time.Time
  532. // bufLength int
  533. // lastBufLength int
  534. // count int8
  535. // readBW float64
  536. // writeBW float64
  537. // readBandwidth float64
  538. //}
  539. //
  540. //func (Self *bandwidth) StartRead() {
  541. // Self.lastReadStart, Self.readStart = Self.readStart, time.Now()
  542. // if !Self.lastReadStart.IsZero() {
  543. // if Self.count == -5 {
  544. // Self.calcBandWidth()
  545. // }
  546. // }
  547. //}
  548. //
  549. //func (Self *bandwidth) EndRead() {
  550. // Self.lastReadEnd, Self.readEnd = Self.readEnd, time.Now()
  551. // if Self.count == -5 {
  552. // Self.calcWriteBandwidth()
  553. // }
  554. // if Self.count == 0 {
  555. // Self.calcReadBandwidth()
  556. // Self.count = -6
  557. // }
  558. // Self.count += 1
  559. //}
  560. //
  561. //func (Self *bandwidth) SetCopySize(n int) {
  562. // // must be invoke between StartRead and EndRead
  563. // Self.lastBufLength, Self.bufLength = Self.bufLength, n
  564. //}
  565. //// calculating
  566. //// start end start end
  567. //// read read
  568. //// write
  569. //
  570. //func (Self *bandwidth) calcBandWidth() {
  571. // t := Self.readStart.Sub(Self.lastReadStart)
  572. // if Self.lastBufLength >= 32768 {
  573. // Self.readBandwidth = float64(Self.lastBufLength) / t.Seconds()
  574. // }
  575. //}
  576. //
  577. //func (Self *bandwidth) calcReadBandwidth() {
  578. // // Bandwidth between nps and npc
  579. // readTime := Self.readEnd.Sub(Self.readStart)
  580. // Self.readBW = float64(Self.bufLength) / readTime.Seconds()
  581. // //logs.Warn("calc read bw", Self.readBW, Self.bufLength, readTime.Seconds())
  582. //}
  583. //
  584. //func (Self *bandwidth) calcWriteBandwidth() {
  585. // // Bandwidth between nps and user, npc and application
  586. // writeTime := Self.readStart.Sub(Self.lastReadEnd)
  587. // Self.writeBW = float64(Self.lastBufLength) / writeTime.Seconds()
  588. // //logs.Warn("calc write bw", Self.writeBW, Self.bufLength, writeTime.Seconds())
  589. //}
  590. //
  591. //func (Self *bandwidth) Get() (bw float64) {
  592. // // The zero value, 0 for numeric types
  593. // if Self.writeBW == 0 && Self.readBW == 0 {
  594. // //logs.Warn("bw both 0")
  595. // return 100
  596. // }
  597. // if Self.writeBW == 0 && Self.readBW != 0 {
  598. // return Self.readBW
  599. // }
  600. // if Self.readBW == 0 && Self.writeBW != 0 {
  601. // return Self.writeBW
  602. // }
  603. // return Self.readBandwidth
  604. //}