conn.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635
  1. package mux
  2. import (
  3. "errors"
  4. "io"
  5. "math"
  6. "net"
  7. "runtime"
  8. "sync"
  9. "sync/atomic"
  10. "time"
  11. "github.com/cnlh/nps/lib/common"
  12. )
  13. type conn struct {
  14. net.Conn
  15. getStatusCh chan struct{}
  16. connStatusOkCh chan struct{}
  17. connStatusFailCh chan struct{}
  18. connId int32
  19. isClose bool
  20. closeFlag bool // close conn flag
  21. receiveWindow *ReceiveWindow
  22. sendWindow *SendWindow
  23. once sync.Once
  24. //label string
  25. }
  26. func NewConn(connId int32, mux *Mux, label ...string) *conn {
  27. c := &conn{
  28. getStatusCh: make(chan struct{}),
  29. connStatusOkCh: make(chan struct{}),
  30. connStatusFailCh: make(chan struct{}),
  31. connId: connId,
  32. receiveWindow: new(ReceiveWindow),
  33. sendWindow: new(SendWindow),
  34. once: sync.Once{},
  35. }
  36. //if len(label) > 0 {
  37. // c.label = label[0]
  38. //}
  39. c.receiveWindow.New(mux)
  40. c.sendWindow.New(mux)
  41. //logm := &connLog{
  42. // startTime: time.Now(),
  43. // isClose: false,
  44. // logs: []string{c.label + "new conn success"},
  45. //}
  46. //setM(label[0], int(connId), logm)
  47. return c
  48. }
  49. func (s *conn) Read(buf []byte) (n int, err error) {
  50. if s.isClose || buf == nil {
  51. return 0, errors.New("the conn has closed")
  52. }
  53. if len(buf) == 0 {
  54. return 0, nil
  55. }
  56. // waiting for takeout from receive window finish or timeout
  57. //now := time.Now()
  58. n, err = s.receiveWindow.Read(buf, s.connId)
  59. //t := time.Now().Sub(now)
  60. //if t.Seconds() > 0.5 {
  61. //logs.Warn("conn read long", n, t.Seconds())
  62. //}
  63. //var errstr string
  64. //if err == nil {
  65. // errstr = "err:nil"
  66. //} else {
  67. // errstr = err.Error()
  68. //}
  69. //d := getM(s.label, int(s.connId))
  70. //d.logs = append(d.logs, s.label+"read "+strconv.Itoa(n)+" "+errstr+" "+string(buf[:100]))
  71. //setM(s.label, int(s.connId), d)
  72. return
  73. }
  74. func (s *conn) Write(buf []byte) (n int, err error) {
  75. if s.isClose {
  76. return 0, errors.New("the conn has closed")
  77. }
  78. if s.closeFlag {
  79. //s.Close()
  80. return 0, errors.New("io: write on closed conn")
  81. }
  82. if len(buf) == 0 {
  83. return 0, nil
  84. }
  85. //logs.Warn("write buf", len(buf))
  86. //now := time.Now()
  87. n, err = s.sendWindow.WriteFull(buf, s.connId)
  88. //t := time.Now().Sub(now)
  89. //if t.Seconds() > 0.5 {
  90. // logs.Warn("conn write long", n, t.Seconds())
  91. //}
  92. return
  93. }
  94. func (s *conn) Close() (err error) {
  95. s.once.Do(s.closeProcess)
  96. return
  97. }
  98. func (s *conn) closeProcess() {
  99. s.isClose = true
  100. s.receiveWindow.mux.connMap.Delete(s.connId)
  101. if !s.receiveWindow.mux.IsClose {
  102. // if server or user close the conn while reading, will get a io.EOF
  103. // and this Close method will be invoke, send this signal to close other side
  104. s.receiveWindow.mux.sendInfo(common.MUX_CONN_CLOSE, s.connId, nil)
  105. }
  106. s.sendWindow.CloseWindow()
  107. s.receiveWindow.CloseWindow()
  108. //d := getM(s.label, int(s.connId))
  109. //d.isClose = true
  110. //d.logs = append(d.logs, s.label+"close "+time.Now().String())
  111. //setM(s.label, int(s.connId), d)
  112. return
  113. }
  114. func (s *conn) LocalAddr() net.Addr {
  115. return s.receiveWindow.mux.conn.LocalAddr()
  116. }
  117. func (s *conn) RemoteAddr() net.Addr {
  118. return s.receiveWindow.mux.conn.RemoteAddr()
  119. }
  120. func (s *conn) SetDeadline(t time.Time) error {
  121. _ = s.SetReadDeadline(t)
  122. _ = s.SetWriteDeadline(t)
  123. return nil
  124. }
  125. func (s *conn) SetReadDeadline(t time.Time) error {
  126. s.receiveWindow.SetTimeOut(t)
  127. return nil
  128. }
  129. func (s *conn) SetWriteDeadline(t time.Time) error {
  130. s.sendWindow.SetTimeOut(t)
  131. return nil
  132. }
  133. type window struct {
  134. remainingWait uint64 // 64bit alignment
  135. off uint32
  136. maxSize uint32
  137. closeOp bool
  138. closeOpCh chan struct{}
  139. mux *Mux
  140. }
  141. func (Self *window) unpack(ptrs uint64) (remaining, wait uint32) {
  142. const mask = 1<<dequeueBits - 1
  143. remaining = uint32((ptrs >> dequeueBits) & mask)
  144. wait = uint32(ptrs & mask)
  145. return
  146. }
  147. func (Self *window) pack(remaining, wait uint32) uint64 {
  148. const mask = 1<<dequeueBits - 1
  149. return (uint64(remaining) << dequeueBits) |
  150. uint64(wait&mask)
  151. }
  152. func (Self *window) New() {
  153. Self.closeOpCh = make(chan struct{}, 2)
  154. }
  155. func (Self *window) CloseWindow() {
  156. if !Self.closeOp {
  157. Self.closeOp = true
  158. Self.closeOpCh <- struct{}{}
  159. Self.closeOpCh <- struct{}{}
  160. }
  161. }
  162. type ReceiveWindow struct {
  163. window
  164. bufQueue ReceiveWindowQueue
  165. element *common.ListElement
  166. count int8
  167. once sync.Once
  168. }
  169. func (Self *ReceiveWindow) New(mux *Mux) {
  170. // initial a window for receive
  171. Self.bufQueue.New()
  172. Self.element = common.ListElementPool.Get()
  173. Self.maxSize = common.MAXIMUM_SEGMENT_SIZE * 10
  174. Self.mux = mux
  175. Self.window.New()
  176. }
  177. func (Self *ReceiveWindow) remainingSize(delta uint16) (n uint32) {
  178. // receive window remaining
  179. l := int64(atomic.LoadUint32(&Self.maxSize)) - int64(Self.bufQueue.Len())
  180. l -= int64(delta)
  181. if l > 0 {
  182. n = uint32(l)
  183. }
  184. return
  185. }
  186. func (Self *ReceiveWindow) calcSize() {
  187. // calculating maximum receive window size
  188. if Self.count == 0 {
  189. //logs.Warn("ping, bw", Self.mux.latency, Self.bw.Get())
  190. conns := Self.mux.connMap.Size()
  191. n := uint32(math.Float64frombits(atomic.LoadUint64(&Self.mux.latency)) *
  192. Self.mux.bw.Get() / float64(conns))
  193. if n < common.MAXIMUM_SEGMENT_SIZE*10 {
  194. n = common.MAXIMUM_SEGMENT_SIZE * 10
  195. }
  196. bufLen := Self.bufQueue.Len()
  197. if n < bufLen {
  198. n = bufLen
  199. }
  200. if n < Self.maxSize/2 {
  201. n = Self.maxSize / 2
  202. }
  203. // set the minimal size
  204. if n > 2*Self.maxSize {
  205. n = 2 * Self.maxSize
  206. }
  207. if n > (common.MAXIMUM_WINDOW_SIZE / uint32(conns)) {
  208. n = common.MAXIMUM_WINDOW_SIZE / uint32(conns)
  209. }
  210. // set the maximum size
  211. //logs.Warn("n", n)
  212. atomic.StoreUint32(&Self.maxSize, n)
  213. Self.count = -10
  214. }
  215. Self.count += 1
  216. return
  217. }
  218. func (Self *ReceiveWindow) Write(buf []byte, l uint16, part bool, id int32) (err error) {
  219. if Self.closeOp {
  220. return errors.New("conn.receiveWindow: write on closed window")
  221. }
  222. element, err := NewListElement(buf, l, part)
  223. //logs.Warn("push the buf", len(buf), l, (&element).l)
  224. if err != nil {
  225. return
  226. }
  227. Self.calcSize() // calculate the max window size
  228. var wait uint32
  229. start:
  230. ptrs := atomic.LoadUint64(&Self.remainingWait)
  231. _, wait = Self.unpack(ptrs)
  232. newRemaining := Self.remainingSize(l)
  233. // calculate the remaining window size now, plus the element we will push
  234. if newRemaining == 0 {
  235. //logs.Warn("window full true", remaining)
  236. wait = 1
  237. }
  238. if !atomic.CompareAndSwapUint64(&Self.remainingWait, ptrs, Self.pack(0, wait)) {
  239. goto start
  240. // another goroutine change the status, make sure shall we need wait
  241. }
  242. Self.bufQueue.Push(element)
  243. // status check finish, now we can push the element into the queue
  244. if wait == 0 {
  245. Self.mux.sendInfo(common.MUX_MSG_SEND_OK, id, Self.maxSize, newRemaining)
  246. // send the remaining window size, not including zero size
  247. }
  248. return nil
  249. }
  250. func (Self *ReceiveWindow) Read(p []byte, id int32) (n int, err error) {
  251. if Self.closeOp {
  252. return 0, io.EOF // receive close signal, returns eof
  253. }
  254. pOff := 0
  255. l := 0
  256. //logs.Warn("receive window read off, element.l", Self.off, Self.element.l)
  257. copyData:
  258. if Self.off == uint32(Self.element.L) {
  259. // on the first Read method invoked, Self.off and Self.element.l
  260. // both zero value
  261. common.ListElementPool.Put(Self.element)
  262. if Self.closeOp {
  263. return 0, io.EOF
  264. }
  265. Self.element, err = Self.bufQueue.Pop()
  266. // if the queue is empty, Pop method will wait until one element push
  267. // into the queue successful, or timeout.
  268. // timer start on timeout parameter is set up ,
  269. // reset to 60s if timeout and data still available
  270. Self.off = 0
  271. if err != nil {
  272. return // queue receive stop or time out, break the loop and return
  273. }
  274. //logs.Warn("pop element", Self.element.l, Self.element.part)
  275. }
  276. l = copy(p[pOff:], Self.element.Buf[Self.off:Self.element.L])
  277. pOff += l
  278. Self.off += uint32(l)
  279. //logs.Warn("window read length buf len", Self.readLength, Self.bufQueue.Len())
  280. n += l
  281. l = 0
  282. if Self.off == uint32(Self.element.L) {
  283. //logs.Warn("put the element end ", string(Self.element.buf[:15]))
  284. common.WindowBuff.Put(Self.element.Buf)
  285. Self.sendStatus(id, Self.element.L)
  286. // check the window full status
  287. }
  288. if pOff < len(p) && Self.element.Part {
  289. // element is a part of the segments, trying to fill up buf p
  290. goto copyData
  291. }
  292. return // buf p is full or all of segments in buf, return
  293. }
  294. func (Self *ReceiveWindow) sendStatus(id int32, l uint16) {
  295. var remaining, wait uint32
  296. for {
  297. ptrs := atomic.LoadUint64(&Self.remainingWait)
  298. remaining, wait = Self.unpack(ptrs)
  299. remaining += uint32(l)
  300. if atomic.CompareAndSwapUint64(&Self.remainingWait, ptrs, Self.pack(remaining, 0)) {
  301. break
  302. }
  303. runtime.Gosched()
  304. // another goroutine change remaining or wait status, make sure
  305. // we need acknowledge other side
  306. }
  307. // now we get the current window status success
  308. if wait == 1 {
  309. //logs.Warn("send the wait status", remaining)
  310. Self.mux.sendInfo(common.MUX_MSG_SEND_OK, id, atomic.LoadUint32(&Self.maxSize), remaining)
  311. }
  312. return
  313. }
  314. func (Self *ReceiveWindow) SetTimeOut(t time.Time) {
  315. // waiting for FIFO queue Pop method
  316. Self.bufQueue.SetTimeOut(t)
  317. }
  318. func (Self *ReceiveWindow) Stop() {
  319. // queue has no more data to push, so unblock pop method
  320. Self.once.Do(Self.bufQueue.Stop)
  321. }
  322. func (Self *ReceiveWindow) CloseWindow() {
  323. Self.window.CloseWindow()
  324. Self.Stop()
  325. Self.release()
  326. }
  327. func (Self *ReceiveWindow) release() {
  328. //if Self.element != nil {
  329. // if Self.element.Buf != nil {
  330. // common.WindowBuff.Put(Self.element.Buf)
  331. // }
  332. // common.ListElementPool.Put(Self.element)
  333. //}
  334. for {
  335. Self.element = Self.bufQueue.TryPop()
  336. if Self.element == nil {
  337. return
  338. }
  339. if Self.element.Buf != nil {
  340. common.WindowBuff.Put(Self.element.Buf)
  341. }
  342. common.ListElementPool.Put(Self.element)
  343. } // release resource
  344. }
  345. type SendWindow struct {
  346. window
  347. buf []byte
  348. setSizeCh chan struct{}
  349. timeout time.Time
  350. }
  351. func (Self *SendWindow) New(mux *Mux) {
  352. Self.setSizeCh = make(chan struct{})
  353. Self.maxSize = common.MAXIMUM_SEGMENT_SIZE * 10
  354. atomic.AddUint64(&Self.remainingWait, uint64(common.MAXIMUM_SEGMENT_SIZE*10)<<dequeueBits)
  355. Self.mux = mux
  356. Self.window.New()
  357. }
  358. func (Self *SendWindow) SetSendBuf(buf []byte) {
  359. // send window buff from conn write method, set it to send window
  360. Self.buf = buf
  361. Self.off = 0
  362. }
  363. func (Self *SendWindow) SetSize(windowSize, newRemaining uint32) (closed bool) {
  364. // set the window size from receive window
  365. defer func() {
  366. if recover() != nil {
  367. closed = true
  368. }
  369. }()
  370. if Self.closeOp {
  371. close(Self.setSizeCh)
  372. return true
  373. }
  374. //logs.Warn("set send window size to ", windowSize, newRemaining)
  375. var remaining, wait, newWait uint32
  376. for {
  377. ptrs := atomic.LoadUint64(&Self.remainingWait)
  378. remaining, wait = Self.unpack(ptrs)
  379. if remaining == newRemaining {
  380. //logs.Warn("waiting for another window size")
  381. return false // waiting for receive another usable window size
  382. }
  383. if newRemaining == 0 && wait == 1 {
  384. newWait = 1 // keep the wait status,
  385. // also if newRemaining is not zero, change wait to 0
  386. }
  387. if atomic.CompareAndSwapUint64(&Self.remainingWait, ptrs, Self.pack(newRemaining, newWait)) {
  388. break
  389. }
  390. // anther goroutine change wait status or window size
  391. }
  392. if wait == 1 {
  393. // send window into the wait status, need notice the channel
  394. //logs.Warn("send window remaining size is 0")
  395. Self.allow()
  396. }
  397. // send window not into the wait status, so just do slide
  398. return false
  399. }
  400. func (Self *SendWindow) allow() {
  401. select {
  402. case Self.setSizeCh <- struct{}{}:
  403. //logs.Warn("send window remaining size is 0 finish")
  404. return
  405. case <-Self.closeOpCh:
  406. close(Self.setSizeCh)
  407. return
  408. }
  409. }
  410. func (Self *SendWindow) sent(sentSize uint32) {
  411. atomic.AddUint64(&Self.remainingWait, ^(uint64(sentSize)<<dequeueBits - 1))
  412. }
  413. func (Self *SendWindow) WriteTo() (p []byte, sendSize uint32, part bool, err error) {
  414. // returns buf segments, return only one segments, need a loop outside
  415. // until err = io.EOF
  416. if Self.closeOp {
  417. return nil, 0, false, errors.New("conn.writeWindow: window closed")
  418. }
  419. if Self.off == uint32(len(Self.buf)) {
  420. return nil, 0, false, io.EOF
  421. // send window buff is drain, return eof and get another one
  422. }
  423. var remaining uint32
  424. start:
  425. ptrs := atomic.LoadUint64(&Self.remainingWait)
  426. remaining, _ = Self.unpack(ptrs)
  427. if remaining == 0 {
  428. if !atomic.CompareAndSwapUint64(&Self.remainingWait, ptrs, Self.pack(0, 1)) {
  429. goto start // another goroutine change the window, try again
  430. }
  431. // into the wait status
  432. //logs.Warn("send window into wait status")
  433. err = Self.waitReceiveWindow()
  434. if err != nil {
  435. return nil, 0, false, err
  436. }
  437. //logs.Warn("rem into wait finish")
  438. goto start
  439. }
  440. // there are still remaining window
  441. //logs.Warn("rem", remaining)
  442. if len(Self.buf[Self.off:]) > common.MAXIMUM_SEGMENT_SIZE {
  443. sendSize = common.MAXIMUM_SEGMENT_SIZE
  444. //logs.Warn("cut buf by mss")
  445. } else {
  446. sendSize = uint32(len(Self.buf[Self.off:]))
  447. }
  448. if remaining < sendSize {
  449. // usable window size is small than
  450. // window MAXIMUM_SEGMENT_SIZE or send buf left
  451. sendSize = remaining
  452. //logs.Warn("cut buf by remainingsize", sendSize, len(Self.buf[Self.off:]))
  453. }
  454. //logs.Warn("send size", sendSize)
  455. if sendSize < uint32(len(Self.buf[Self.off:])) {
  456. part = true
  457. }
  458. p = Self.buf[Self.off : sendSize+Self.off]
  459. Self.off += sendSize
  460. Self.sent(sendSize)
  461. return
  462. }
  463. func (Self *SendWindow) waitReceiveWindow() (err error) {
  464. t := Self.timeout.Sub(time.Now())
  465. if t < 0 {
  466. t = time.Minute * 5
  467. }
  468. timer := time.NewTimer(t)
  469. defer timer.Stop()
  470. // waiting for receive usable window size, or timeout
  471. select {
  472. case _, ok := <-Self.setSizeCh:
  473. if !ok {
  474. return errors.New("conn.writeWindow: window closed")
  475. }
  476. return nil
  477. case <-timer.C:
  478. return errors.New("conn.writeWindow: write to time out")
  479. case <-Self.closeOpCh:
  480. return errors.New("conn.writeWindow: window closed")
  481. }
  482. }
  483. func (Self *SendWindow) WriteFull(buf []byte, id int32) (n int, err error) {
  484. Self.SetSendBuf(buf) // set the buf to send window
  485. //logs.Warn("set the buf to send window")
  486. var bufSeg []byte
  487. var part bool
  488. var l uint32
  489. for {
  490. bufSeg, l, part, err = Self.WriteTo()
  491. //logs.Warn("buf seg", len(bufSeg), part, err)
  492. // get the buf segments from send window
  493. if bufSeg == nil && part == false && err == io.EOF {
  494. // send window is drain, break the loop
  495. err = nil
  496. break
  497. }
  498. if err != nil {
  499. break
  500. }
  501. n += int(l)
  502. l = 0
  503. if part {
  504. Self.mux.sendInfo(common.MUX_NEW_MSG_PART, id, bufSeg)
  505. } else {
  506. Self.mux.sendInfo(common.MUX_NEW_MSG, id, bufSeg)
  507. //logs.Warn("buf seg sent", len(bufSeg), part, err)
  508. }
  509. // send to other side, not send nil data to other side
  510. }
  511. //logs.Warn("buf seg write success")
  512. return
  513. }
  514. func (Self *SendWindow) SetTimeOut(t time.Time) {
  515. // waiting for receive a receive window size
  516. Self.timeout = t
  517. }
  518. //type bandwidth struct {
  519. // readStart time.Time
  520. // lastReadStart time.Time
  521. // readEnd time.Time
  522. // lastReadEnd time.Time
  523. // bufLength int
  524. // lastBufLength int
  525. // count int8
  526. // readBW float64
  527. // writeBW float64
  528. // readBandwidth float64
  529. //}
  530. //
  531. //func (Self *bandwidth) StartRead() {
  532. // Self.lastReadStart, Self.readStart = Self.readStart, time.Now()
  533. // if !Self.lastReadStart.IsZero() {
  534. // if Self.count == -5 {
  535. // Self.calcBandWidth()
  536. // }
  537. // }
  538. //}
  539. //
  540. //func (Self *bandwidth) EndRead() {
  541. // Self.lastReadEnd, Self.readEnd = Self.readEnd, time.Now()
  542. // if Self.count == -5 {
  543. // Self.calcWriteBandwidth()
  544. // }
  545. // if Self.count == 0 {
  546. // Self.calcReadBandwidth()
  547. // Self.count = -6
  548. // }
  549. // Self.count += 1
  550. //}
  551. //
  552. //func (Self *bandwidth) SetCopySize(n int) {
  553. // // must be invoke between StartRead and EndRead
  554. // Self.lastBufLength, Self.bufLength = Self.bufLength, n
  555. //}
  556. //// calculating
  557. //// start end start end
  558. //// read read
  559. //// write
  560. //
  561. //func (Self *bandwidth) calcBandWidth() {
  562. // t := Self.readStart.Sub(Self.lastReadStart)
  563. // if Self.lastBufLength >= 32768 {
  564. // Self.readBandwidth = float64(Self.lastBufLength) / t.Seconds()
  565. // }
  566. //}
  567. //
  568. //func (Self *bandwidth) calcReadBandwidth() {
  569. // // Bandwidth between nps and npc
  570. // readTime := Self.readEnd.Sub(Self.readStart)
  571. // Self.readBW = float64(Self.bufLength) / readTime.Seconds()
  572. // //logs.Warn("calc read bw", Self.readBW, Self.bufLength, readTime.Seconds())
  573. //}
  574. //
  575. //func (Self *bandwidth) calcWriteBandwidth() {
  576. // // Bandwidth between nps and user, npc and application
  577. // writeTime := Self.readStart.Sub(Self.lastReadEnd)
  578. // Self.writeBW = float64(Self.lastBufLength) / writeTime.Seconds()
  579. // //logs.Warn("calc write bw", Self.writeBW, Self.bufLength, writeTime.Seconds())
  580. //}
  581. //
  582. //func (Self *bandwidth) Get() (bw float64) {
  583. // // The zero value, 0 for numeric types
  584. // if Self.writeBW == 0 && Self.readBW == 0 {
  585. // //logs.Warn("bw both 0")
  586. // return 100
  587. // }
  588. // if Self.writeBW == 0 && Self.readBW != 0 {
  589. // return Self.readBW
  590. // }
  591. // if Self.readBW == 0 && Self.writeBW != 0 {
  592. // return Self.writeBW
  593. // }
  594. // return Self.readBandwidth
  595. //}