conn.go 18 KB

  1. package mux
  2. import (
  3. "errors"
  4. ""
  5. ""
  6. "io"
  7. "math"
  8. "net"
  9. "runtime"
  10. "sync"
  11. "sync/atomic"
  12. "time"
  13. )
  14. type conn struct {
  15. net.Conn
  16. getStatusCh chan struct{}
  17. connStatusOkCh chan struct{}
  18. connStatusFailCh chan struct{}
  19. connId int32
  20. isClose bool
  21. closeFlag bool // close conn flag
  22. receiveWindow *ReceiveWindow
  23. sendWindow *SendWindow
  24. once sync.Once
  25. //label string
  26. }
  27. func NewConn(connId int32, mux *Mux, label ...string) *conn {
  28. c := &conn{
  29. getStatusCh: make(chan struct{}),
  30. connStatusOkCh: make(chan struct{}),
  31. connStatusFailCh: make(chan struct{}),
  32. connId: connId,
  33. receiveWindow: new(ReceiveWindow),
  34. sendWindow: new(SendWindow),
  35. once: sync.Once{},
  36. }
  37. //if len(label) > 0 {
  38. // c.label = label[0]
  39. //}
  40. c.receiveWindow.New(mux)
  41. c.sendWindow.New(mux)
  42. //logm := &connLog{
  43. // startTime: time.Now(),
  44. // isClose: false,
  45. // logs: []string{c.label + "new conn success"},
  46. //}
  47. //setM(label[0], int(connId), logm)
  48. return c
  49. }
  50. func (s *conn) Read(buf []byte) (n int, err error) {
  51. if s.isClose || buf == nil {
  52. return 0, errors.New("the conn has closed")
  53. }
  54. if len(buf) == 0 {
  55. return 0, nil
  56. }
  57. // waiting for takeout from receive window finish or timeout
  58. //now := time.Now()
  59. n, err = s.receiveWindow.Read(buf, s.connId)
  60. //t := time.Now().Sub(now)
  61. //if t.Seconds() > 0.5 {
  62. //logs.Warn("conn read long", n, t.Seconds())
  63. //}
  64. //var errstr string
  65. //if err == nil {
  66. // errstr = "err:nil"
  67. //} else {
  68. // errstr = err.Error()
  69. //}
  70. //d := getM(s.label, int(s.connId))
  71. //d.logs = append(d.logs, s.label+"read "+strconv.Itoa(n)+" "+errstr+" "+string(buf[:100]))
  72. //setM(s.label, int(s.connId), d)
  73. return
  74. }
  75. func (s *conn) Write(buf []byte) (n int, err error) {
  76. if s.isClose {
  77. return 0, errors.New("the conn has closed")
  78. }
  79. if s.closeFlag {
  80. //s.Close()
  81. return 0, errors.New("io: write on closed conn")
  82. }
  83. if len(buf) == 0 {
  84. return 0, nil
  85. }
  86. //logs.Warn("write buf", len(buf))
  87. //now := time.Now()
  88. n, err = s.sendWindow.WriteFull(buf, s.connId)
  89. //t := time.Now().Sub(now)
  90. //if t.Seconds() > 0.5 {
  91. // logs.Warn("conn write long", n, t.Seconds())
  92. //}
  93. return
  94. }
  95. func (s *conn) Close() (err error) {
  96. s.once.Do(s.closeProcess)
  97. return
  98. }
  99. func (s *conn) closeProcess() {
  100. s.isClose = true
  101. s.receiveWindow.mux.connMap.Delete(s.connId)
  102. if !s.receiveWindow.mux.IsClose {
  103. // if server or user close the conn while reading, will get a io.EOF
  104. // and this Close method will be invoke, send this signal to close other side
  105. s.receiveWindow.mux.sendInfo(common.MUX_CONN_CLOSE, s.connId, nil)
  106. }
  107. s.sendWindow.CloseWindow()
  108. s.receiveWindow.CloseWindow()
  109. //d := getM(s.label, int(s.connId))
  110. //d.isClose = true
  111. //d.logs = append(d.logs, s.label+"close "+time.Now().String())
  112. //setM(s.label, int(s.connId), d)
  113. return
  114. }
  115. func (s *conn) LocalAddr() net.Addr {
  116. return s.receiveWindow.mux.conn.LocalAddr()
  117. }
  118. func (s *conn) RemoteAddr() net.Addr {
  119. return s.receiveWindow.mux.conn.RemoteAddr()
  120. }
  121. func (s *conn) SetDeadline(t time.Time) error {
  122. _ = s.SetReadDeadline(t)
  123. _ = s.SetWriteDeadline(t)
  124. return nil
  125. }
  126. func (s *conn) SetReadDeadline(t time.Time) error {
  127. s.receiveWindow.SetTimeOut(t)
  128. return nil
  129. }
  130. func (s *conn) SetWriteDeadline(t time.Time) error {
  131. s.sendWindow.SetTimeOut(t)
  132. return nil
  133. }
  134. type window struct {
  135. maxSizeDone uint64
  136. // 64bit alignment
  137. // maxSizeDone contains 4 parts
  138. // 1 31 1 31
  139. // wait maxSize useless done
  140. // wait zero means false, one means true
  141. off uint32
  142. closeOp bool
  143. closeOpCh chan struct{}
  144. mux *Mux
  145. }
  146. const windowBits = 31
  147. const waitBits = dequeueBits + windowBits
  148. const mask1 = 1
  149. const mask31 = 1<<windowBits - 1
  150. func (Self *window) unpack(ptrs uint64) (maxSize, done uint32, wait bool) {
  151. maxSize = uint32((ptrs >> dequeueBits) & mask31)
  152. done = uint32(ptrs & mask31)
  153. //logs.Warn("unpack", maxSize, done)
  154. if ((ptrs >> waitBits) & mask1) == 1 {
  155. wait = true
  156. return
  157. }
  158. return
  159. }
  160. func (Self *window) pack(maxSize, done uint32, wait bool) uint64 {
  161. //logs.Warn("pack", maxSize, done, wait)
  162. if wait {
  163. return (uint64(1)<<waitBits |
  164. uint64(maxSize&mask31)<<dequeueBits) |
  165. uint64(done&mask31)
  166. }
  167. return (uint64(0)<<waitBits |
  168. uint64(maxSize&mask31)<<dequeueBits) |
  169. uint64(done&mask31)
  170. }
  171. func (Self *window) New() {
  172. Self.closeOpCh = make(chan struct{}, 2)
  173. }
  174. func (Self *window) CloseWindow() {
  175. if !Self.closeOp {
  176. Self.closeOp = true
  177. Self.closeOpCh <- struct{}{}
  178. Self.closeOpCh <- struct{}{}
  179. }
  180. }
  181. type ReceiveWindow struct {
  182. window
  183. bufQueue ReceiveWindowQueue
  184. element *common.ListElement
  185. count int8
  186. bw *bandwidth
  187. once sync.Once
  188. // receive window send the current max size and read size to send window
  189. // means done size actually store the size receive window has read
  190. }
  191. func (Self *ReceiveWindow) New(mux *Mux) {
  192. // initial a window for receive
  193. Self.bufQueue.New()
  194. Self.element = common.ListElementPool.Get()
  195. Self.maxSizeDone = Self.pack(common.MAXIMUM_SEGMENT_SIZE*30, 0, false)
  196. Self.mux = mux
  197. Self.window.New()
  198. = NewBandwidth(nil)
  199. }
  200. func (Self *ReceiveWindow) remainingSize(maxSize uint32, delta uint16) (n uint32) {
  201. // receive window remaining
  202. l := int64(maxSize) - int64(Self.bufQueue.Len())
  203. l -= int64(delta)
  204. if l > 0 {
  205. n = uint32(l)
  206. }
  207. return
  208. }
  209. func (Self *ReceiveWindow) calcSize() {
  210. // calculating maximum receive window size
  211. if Self.count == 0 {
  212. //logs.Warn("ping, bw", Self.mux.latency,
  213. conns := Self.mux.connMap.Size()
  214. n := uint32(math.Float64frombits(atomic.LoadUint64(&Self.mux.latency)) *
  215. ( +
  216. //logs.Warn(n)
  217. if n < common.MAXIMUM_SEGMENT_SIZE*30 {
  218. //logs.Warn("window small", n,,
  219. n = common.MAXIMUM_SEGMENT_SIZE * 30
  220. }
  221. for {
  222. ptrs := atomic.LoadUint64(&Self.maxSizeDone)
  223. size, read, wait := Self.unpack(ptrs)
  224. if n < size/2 {
  225. n = size / 2
  226. // half reduce
  227. }
  228. // set the minimal size
  229. if n > 2*size {
  230. n = 2 * size
  231. // twice grow
  232. }
  233. if n > (common.MAXIMUM_WINDOW_SIZE / uint32(conns)) {
  234. logs.Warn("window too large, calculated:", n, "limit:", common.MAXIMUM_WINDOW_SIZE/uint32(conns))
  235. n = common.MAXIMUM_WINDOW_SIZE / uint32(conns)
  236. }
  237. // set the maximum size
  238. //logs.Warn("n", n)
  239. if atomic.CompareAndSwapUint64(&Self.maxSizeDone, ptrs, Self.pack(n, read, wait)) {
  240. // only change the maxSize
  241. break
  242. }
  243. }
  244. Self.count = -10
  245. }
  246. Self.count += 1
  247. return
  248. }
  249. func (Self *ReceiveWindow) Write(buf []byte, l uint16, part bool, id int32) (err error) {
  250. if Self.closeOp {
  251. return errors.New("conn.receiveWindow: write on closed window")
  252. }
  253. element, err := NewListElement(buf, l, part)
  254. //logs.Warn("push the buf", len(buf), l, element.L)
  255. if err != nil {
  256. return
  257. }
  258. Self.calcSize() // calculate the max window size
  259. var wait bool
  260. var maxSize, read uint32
  261. start:
  262. ptrs := atomic.LoadUint64(&Self.maxSizeDone)
  263. maxSize, read, wait = Self.unpack(ptrs)
  264. remain := Self.remainingSize(maxSize, l)
  265. // calculate the remaining window size now, plus the element we will push
  266. if remain == 0 && !wait {
  267. //logs.Warn("window full true", remaining)
  268. wait = true
  269. if !atomic.CompareAndSwapUint64(&Self.maxSizeDone, ptrs, Self.pack(maxSize, read, wait)) {
  270. // only change the wait status, not send the read size
  271. goto start
  272. // another goroutine change the status, make sure shall we need wait
  273. }
  274. //logs.Warn("receive window full")
  275. } else if !wait {
  276. if !atomic.CompareAndSwapUint64(&Self.maxSizeDone, ptrs, Self.pack(maxSize, 0, wait)) {
  277. // reset read size here, and send the read size directly
  278. goto start
  279. // another goroutine change the status, make sure shall we need wait
  280. }
  281. } // maybe there are still some data received even if window is full, just keep the wait status
  282. // and push into queue. when receive window read enough, send window will be acknowledged.
  283. Self.bufQueue.Push(element)
  284. // status check finish, now we can push the element into the queue
  285. if !wait {
  286. Self.mux.sendInfo(common.MUX_MSG_SEND_OK, id, Self.pack(maxSize, read, false))
  287. // send the current status to send window
  288. }
  289. return nil
  290. }
  291. func (Self *ReceiveWindow) Read(p []byte, id int32) (n int, err error) {
  292. if Self.closeOp {
  293. return 0, io.EOF // receive close signal, returns eof
  294. }
  296. n, err = Self.readFromQueue(p, id)
  298. return
  299. }
  300. func (Self *ReceiveWindow) readFromQueue(p []byte, id int32) (n int, err error) {
  301. pOff := 0
  302. l := 0
  303. //logs.Warn("receive window read off, element.l",, Self.element.L)
  304. copyData:
  305. if == uint32(Self.element.L) {
  306. // on the first Read method invoked, and Self.element.l
  307. // both zero value
  308. common.ListElementPool.Put(Self.element)
  309. if Self.closeOp {
  310. return 0, io.EOF
  311. }
  312. Self.element, err = Self.bufQueue.Pop()
  313. // if the queue is empty, Pop method will wait until one element push
  314. // into the queue successful, or timeout.
  315. // timer start on timeout parameter is set up
  316. = 0
  317. if err != nil {
  318. Self.CloseWindow() // also close the window, to avoid read twice
  319. return // queue receive stop or time out, break the loop and return
  320. }
  321. //logs.Warn("pop element", Self.element.L, Self.element.Part)
  322. }
  323. l = copy(p[pOff:], Self.element.Buf[])
  324. pOff += l
  325. += uint32(l)
  326. //logs.Warn("window read length buf len", Self.readLength, Self.bufQueue.Len())
  327. n += l
  328. l = 0
  329. if == uint32(Self.element.L) {
  330. //logs.Warn("put the element end ", string(Self.element.buf[:15]))
  331. common.WindowBuff.Put(Self.element.Buf)
  332. Self.sendStatus(id, Self.element.L)
  333. // check the window full status
  334. }
  335. if pOff < len(p) && Self.element.Part {
  336. // element is a part of the segments, trying to fill up buf p
  337. goto copyData
  338. }
  339. return // buf p is full or all of segments in buf, return
  340. }
  341. func (Self *ReceiveWindow) sendStatus(id int32, l uint16) {
  342. var maxSize, read uint32
  343. var wait bool
  344. for {
  345. ptrs := atomic.LoadUint64(&Self.maxSizeDone)
  346. maxSize, read, wait = Self.unpack(ptrs)
  347. if read <= (read+uint32(l))&mask31 {
  348. read += uint32(l)
  349. remain := Self.remainingSize(maxSize, 0)
  350. if wait && remain > 0 || read >= maxSize/2 || remain == maxSize {
  351. if atomic.CompareAndSwapUint64(&Self.maxSizeDone, ptrs, Self.pack(maxSize, 0, false)) {
  352. // now we get the current window status success
  353. // receive window free up some space we need acknowledge send window, also reset the read size
  354. // still having a condition that receive window is empty and not send the status to send window
  355. // so send the status here
  356. //logs.Warn("receive window free up some space", remain)
  357. Self.mux.sendInfo(common.MUX_MSG_SEND_OK, id, Self.pack(maxSize, read, false))
  358. break
  359. }
  360. } else {
  361. if atomic.CompareAndSwapUint64(&Self.maxSizeDone, ptrs, Self.pack(maxSize, read, wait)) {
  362. // receive window not into the wait status, or still not having any space now,
  363. // just change the read size
  364. break
  365. }
  366. }
  367. } else {
  368. //overflow
  369. if atomic.CompareAndSwapUint64(&Self.maxSizeDone, ptrs, Self.pack(maxSize, uint32(l), wait)) {
  370. // reset to l
  371. Self.mux.sendInfo(common.MUX_MSG_SEND_OK, id, Self.pack(maxSize, read, false))
  372. break
  373. }
  374. }
  375. runtime.Gosched()
  376. // another goroutine change remaining or wait status, make sure
  377. }
  378. return
  379. }
  380. func (Self *ReceiveWindow) SetTimeOut(t time.Time) {
  381. // waiting for FIFO queue Pop method
  382. Self.bufQueue.SetTimeOut(t)
  383. }
  384. func (Self *ReceiveWindow) Stop() {
  385. // queue has no more data to push, so unblock pop method
  386. Self.once.Do(Self.bufQueue.Stop)
  387. }
  388. func (Self *ReceiveWindow) CloseWindow() {
  389. Self.window.CloseWindow()
  390. Self.Stop()
  391. Self.release()
  392. }
  393. func (Self *ReceiveWindow) release() {
  394. //if Self.element != nil {
  395. // if Self.element.Buf != nil {
  396. // common.WindowBuff.Put(Self.element.Buf)
  397. // }
  398. // common.ListElementPool.Put(Self.element)
  399. //}
  400. for {
  401. ele := Self.bufQueue.TryPop()
  402. if ele == nil {
  403. return
  404. }
  405. if ele.Buf != nil {
  406. common.WindowBuff.Put(ele.Buf)
  407. }
  408. common.ListElementPool.Put(ele)
  409. } // release resource
  410. }
  411. type SendWindow struct {
  412. window
  413. buf []byte
  414. setSizeCh chan struct{}
  415. timeout time.Time
  416. // send window receive the receive window max size and read size
  417. // done size store the size send window has send, send and read will be totally equal
  418. // so send minus read, send window can get the current window size remaining
  419. }
  420. func (Self *SendWindow) New(mux *Mux) {
  421. Self.setSizeCh = make(chan struct{})
  422. Self.maxSizeDone = Self.pack(common.MAXIMUM_SEGMENT_SIZE*30, 0, false)
  423. Self.mux = mux
  424. Self.window.New()
  425. }
  426. func (Self *SendWindow) SetSendBuf(buf []byte) {
  427. // send window buff from conn write method, set it to send window
  428. Self.buf = buf
  429. = 0
  430. }
  431. func (Self *SendWindow) remainingSize(maxSize, send uint32) uint32 {
  432. l := int64(maxSize&mask31) - int64(send&mask31)
  433. if l > 0 {
  434. return uint32(l)
  435. }
  436. return 0
  437. }
  438. func (Self *SendWindow) SetSize(currentMaxSizeDone uint64) (closed bool) {
  439. // set the window size from receive window
  440. defer func() {
  441. if recover() != nil {
  442. closed = true
  443. }
  444. }()
  445. if Self.closeOp {
  446. close(Self.setSizeCh)
  447. return true
  448. }
  449. //logs.Warn("set send window size to ", windowSize, newRemaining)
  450. var maxsize, send uint32
  451. var wait, newWait bool
  452. currentMaxSize, read, _ := Self.unpack(currentMaxSizeDone)
  453. for {
  454. ptrs := atomic.LoadUint64(&Self.maxSizeDone)
  455. maxsize, send, wait = Self.unpack(ptrs)
  456. if read > send {
  457. logs.Error("window read > send: max size:", currentMaxSize, "read:", read, "send", send)
  458. return
  459. }
  460. if read == 0 && currentMaxSize == maxsize {
  461. return
  462. }
  463. send -= read
  464. remain := Self.remainingSize(currentMaxSize, send)
  465. if remain == 0 && wait {
  466. // just keep the wait status
  467. newWait = true
  468. }
  469. // remain > 0, change wait to false. or remain == 0, wait is false, just keep it
  470. if atomic.CompareAndSwapUint64(&Self.maxSizeDone, ptrs, Self.pack(currentMaxSize, send, newWait)) {
  471. break
  472. }
  473. // anther goroutine change wait status or window size
  474. }
  475. if wait && !newWait {
  476. // send window into the wait status, need notice the channel
  477. //logs.Warn("send window allow")
  478. Self.allow()
  479. }
  480. // send window not into the wait status, so just do slide
  481. return false
  482. }
  483. func (Self *SendWindow) allow() {
  484. select {
  485. case Self.setSizeCh <- struct{}{}:
  486. //logs.Warn("send window remaining size is 0 finish")
  487. return
  488. case <-Self.closeOpCh:
  489. close(Self.setSizeCh)
  490. return
  491. }
  492. }
  493. func (Self *SendWindow) sent(sentSize uint32) {
  494. var maxSie, send uint32
  495. var wait bool
  496. for {
  497. ptrs := atomic.LoadUint64(&Self.maxSizeDone)
  498. maxSie, send, wait = Self.unpack(ptrs)
  499. if (send+sentSize)&mask31 < send {
  500. // overflow
  501. runtime.Gosched()
  502. continue
  503. }
  504. if atomic.CompareAndSwapUint64(&Self.maxSizeDone, ptrs, Self.pack(maxSie, send+sentSize, wait)) {
  505. // set the send size
  506. //logs.Warn("sent", maxSie, send+sentSize, wait)
  507. break
  508. }
  509. }
  510. }
  511. func (Self *SendWindow) WriteTo() (p []byte, sendSize uint32, part bool, err error) {
  512. // returns buf segments, return only one segments, need a loop outside
  513. // until err = io.EOF
  514. if Self.closeOp {
  515. return nil, 0, false, errors.New("conn.writeWindow: window closed")
  516. }
  517. if == uint32(len(Self.buf)) {
  518. return nil, 0, false, io.EOF
  519. // send window buff is drain, return eof and get another one
  520. }
  521. var maxSize, send uint32
  522. start:
  523. ptrs := atomic.LoadUint64(&Self.maxSizeDone)
  524. maxSize, send, _ = Self.unpack(ptrs)
  525. remain := Self.remainingSize(maxSize, send)
  526. if remain == 0 {
  527. if !atomic.CompareAndSwapUint64(&Self.maxSizeDone, ptrs, Self.pack(maxSize, send, true)) {
  528. // just change the status wait status
  529. goto start // another goroutine change the window, try again
  530. }
  531. // into the wait status
  532. //logs.Warn("send window into wait status")
  533. err = Self.waitReceiveWindow()
  534. if err != nil {
  535. return nil, 0, false, err
  536. }
  537. //logs.Warn("rem into wait finish")
  538. goto start
  539. }
  540. // there are still remaining window
  541. //logs.Warn("rem", remain, maxSize, send)
  542. if len(Self.buf[]) > common.MAXIMUM_SEGMENT_SIZE {
  543. sendSize = common.MAXIMUM_SEGMENT_SIZE
  544. //logs.Warn("cut buf by mss")
  545. } else {
  546. sendSize = uint32(len(Self.buf[]))
  547. }
  548. if remain < sendSize {
  549. // usable window size is small than
  550. // window MAXIMUM_SEGMENT_SIZE or send buf left
  551. sendSize = remain
  552. //logs.Warn("cut buf by remainingsize", sendSize, len(Self.buf[]))
  553. }
  554. //logs.Warn("send size", sendSize)
  555. if sendSize < uint32(len(Self.buf[])) {
  556. part = true
  557. }
  558. p = Self.buf[ :]
  559. += sendSize
  560. Self.sent(sendSize)
  561. return
  562. }
  563. func (Self *SendWindow) waitReceiveWindow() (err error) {
  564. t := Self.timeout.Sub(time.Now())
  565. if t < 0 { // not set the timeout, wait for it as long as connection close
  566. select {
  567. case _, ok := <-Self.setSizeCh:
  568. if !ok {
  569. return errors.New("conn.writeWindow: window closed")
  570. }
  571. return nil
  572. case <-Self.closeOpCh:
  573. return errors.New("conn.writeWindow: window closed")
  574. }
  575. }
  576. timer := time.NewTimer(t)
  577. defer timer.Stop()
  578. // waiting for receive usable window size, or timeout
  579. select {
  580. case _, ok := <-Self.setSizeCh:
  581. if !ok {
  582. return errors.New("conn.writeWindow: window closed")
  583. }
  584. return nil
  585. case <-timer.C:
  586. return errors.New("conn.writeWindow: write to time out")
  587. case <-Self.closeOpCh:
  588. return errors.New("conn.writeWindow: window closed")
  589. }
  590. }
  591. func (Self *SendWindow) WriteFull(buf []byte, id int32) (n int, err error) {
  592. Self.SetSendBuf(buf) // set the buf to send window
  593. //logs.Warn("set the buf to send window")
  594. var bufSeg []byte
  595. var part bool
  596. var l uint32
  597. for {
  598. bufSeg, l, part, err = Self.WriteTo()
  599. //logs.Warn("buf seg", len(bufSeg), part, err)
  600. // get the buf segments from send window
  601. if bufSeg == nil && part == false && err == io.EOF {
  602. // send window is drain, break the loop
  603. err = nil
  604. break
  605. }
  606. if err != nil {
  607. break
  608. }
  609. n += int(l)
  610. l = 0
  611. if part {
  612. Self.mux.sendInfo(common.MUX_NEW_MSG_PART, id, bufSeg)
  613. } else {
  614. Self.mux.sendInfo(common.MUX_NEW_MSG, id, bufSeg)
  615. //logs.Warn("buf seg sent", len(bufSeg), part, err)
  616. }
  617. // send to other side, not send nil data to other side
  618. }
  619. //logs.Warn("buf seg write success")
  620. return
  621. }
  622. func (Self *SendWindow) SetTimeOut(t time.Time) {
  623. // waiting for receive a receive window size
  624. Self.timeout = t
  625. }