conn.go 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722
  1. package mux
  2. import (
  3. "ehang.io/nps/lib/common"
  4. "errors"
  5. "github.com/astaxie/beego/logs"
  6. "io"
  7. "math"
  8. "net"
  9. "runtime"
  10. "sync"
  11. "sync/atomic"
  12. "time"
  13. )
  14. type conn struct {
  15. net.Conn
  16. getStatusCh chan struct{}
  17. connStatusOkCh chan struct{}
  18. connStatusFailCh chan struct{}
  19. connId int32
  20. isClose bool
  21. closeFlag bool // close conn flag
  22. receiveWindow *ReceiveWindow
  23. sendWindow *SendWindow
  24. once sync.Once
  25. //label string
  26. }
  27. func NewConn(connId int32, mux *Mux, label ...string) *conn {
  28. c := &conn{
  29. getStatusCh: make(chan struct{}),
  30. connStatusOkCh: make(chan struct{}),
  31. connStatusFailCh: make(chan struct{}),
  32. connId: connId,
  33. receiveWindow: new(ReceiveWindow),
  34. sendWindow: new(SendWindow),
  35. once: sync.Once{},
  36. }
  37. //if len(label) > 0 {
  38. // c.label = label[0]
  39. //}
  40. c.receiveWindow.New(mux)
  41. c.sendWindow.New(mux)
  42. //logm := &connLog{
  43. // startTime: time.Now(),
  44. // isClose: false,
  45. // logs: []string{c.label + "new conn success"},
  46. //}
  47. //setM(label[0], int(connId), logm)
  48. return c
  49. }
  50. func (s *conn) Read(buf []byte) (n int, err error) {
  51. if s.isClose || buf == nil {
  52. return 0, errors.New("the conn has closed")
  53. }
  54. if len(buf) == 0 {
  55. return 0, nil
  56. }
  57. // waiting for takeout from receive window finish or timeout
  58. //now := time.Now()
  59. n, err = s.receiveWindow.Read(buf, s.connId)
  60. //t := time.Now().Sub(now)
  61. //if t.Seconds() > 0.5 {
  62. //logs.Warn("conn read long", n, t.Seconds())
  63. //}
  64. //var errstr string
  65. //if err == nil {
  66. // errstr = "err:nil"
  67. //} else {
  68. // errstr = err.Error()
  69. //}
  70. //d := getM(s.label, int(s.connId))
  71. //d.logs = append(d.logs, s.label+"read "+strconv.Itoa(n)+" "+errstr+" "+string(buf[:100]))
  72. //setM(s.label, int(s.connId), d)
  73. return
  74. }
  75. func (s *conn) Write(buf []byte) (n int, err error) {
  76. if s.isClose {
  77. return 0, errors.New("the conn has closed")
  78. }
  79. if s.closeFlag {
  80. //s.Close()
  81. return 0, errors.New("io: write on closed conn")
  82. }
  83. if len(buf) == 0 {
  84. return 0, nil
  85. }
  86. //logs.Warn("write buf", len(buf))
  87. //now := time.Now()
  88. n, err = s.sendWindow.WriteFull(buf, s.connId)
  89. //t := time.Now().Sub(now)
  90. //if t.Seconds() > 0.5 {
  91. // logs.Warn("conn write long", n, t.Seconds())
  92. //}
  93. return
  94. }
  95. func (s *conn) Close() (err error) {
  96. s.once.Do(s.closeProcess)
  97. return
  98. }
  99. func (s *conn) closeProcess() {
  100. s.isClose = true
  101. s.receiveWindow.mux.connMap.Delete(s.connId)
  102. if !s.receiveWindow.mux.IsClose {
  103. // if server or user close the conn while reading, will get a io.EOF
  104. // and this Close method will be invoke, send this signal to close other side
  105. s.receiveWindow.mux.sendInfo(common.MUX_CONN_CLOSE, s.connId, nil)
  106. }
  107. s.sendWindow.CloseWindow()
  108. s.receiveWindow.CloseWindow()
  109. //d := getM(s.label, int(s.connId))
  110. //d.isClose = true
  111. //d.logs = append(d.logs, s.label+"close "+time.Now().String())
  112. //setM(s.label, int(s.connId), d)
  113. return
  114. }
  115. func (s *conn) LocalAddr() net.Addr {
  116. return s.receiveWindow.mux.conn.LocalAddr()
  117. }
  118. func (s *conn) RemoteAddr() net.Addr {
  119. return s.receiveWindow.mux.conn.RemoteAddr()
  120. }
  121. func (s *conn) SetDeadline(t time.Time) error {
  122. _ = s.SetReadDeadline(t)
  123. _ = s.SetWriteDeadline(t)
  124. return nil
  125. }
  126. func (s *conn) SetReadDeadline(t time.Time) error {
  127. s.receiveWindow.SetTimeOut(t)
  128. return nil
  129. }
  130. func (s *conn) SetWriteDeadline(t time.Time) error {
  131. s.sendWindow.SetTimeOut(t)
  132. return nil
  133. }
  134. type window struct {
  135. maxSizeDone uint64
  136. // 64bit alignment
  137. // maxSizeDone contains 4 parts
  138. // 1 31 1 31
  139. // wait maxSize useless done
  140. // wait zero means false, one means true
  141. off uint32
  142. closeOp bool
  143. closeOpCh chan struct{}
  144. mux *Mux
  145. }
  146. const windowBits = 31
  147. const waitBits = dequeueBits + windowBits
  148. const mask1 = 1
  149. const mask31 = 1<<windowBits - 1
  150. func (Self *window) unpack(ptrs uint64) (maxSize, done uint32, wait bool) {
  151. maxSize = uint32((ptrs >> dequeueBits) & mask31)
  152. done = uint32(ptrs & mask31)
  153. //logs.Warn("unpack", maxSize, done)
  154. if ((ptrs >> waitBits) & mask1) == 1 {
  155. wait = true
  156. return
  157. }
  158. return
  159. }
  160. func (Self *window) pack(maxSize, done uint32, wait bool) uint64 {
  161. //logs.Warn("pack", maxSize, done, wait)
  162. if wait {
  163. return (uint64(1)<<waitBits |
  164. uint64(maxSize&mask31)<<dequeueBits) |
  165. uint64(done&mask31)
  166. }
  167. return (uint64(0)<<waitBits |
  168. uint64(maxSize&mask31)<<dequeueBits) |
  169. uint64(done&mask31)
  170. }
  171. func (Self *window) New() {
  172. Self.closeOpCh = make(chan struct{}, 2)
  173. }
  174. func (Self *window) CloseWindow() {
  175. if !Self.closeOp {
  176. Self.closeOp = true
  177. Self.closeOpCh <- struct{}{}
  178. Self.closeOpCh <- struct{}{}
  179. }
  180. }
  181. type ReceiveWindow struct {
  182. window
  183. bufQueue *ReceiveWindowQueue
  184. element *common.ListElement
  185. count int8
  186. bw *writeBandwidth
  187. once sync.Once
  188. // receive window send the current max size and read size to send window
  189. // means done size actually store the size receive window has read
  190. }
  191. func (Self *ReceiveWindow) New(mux *Mux) {
  192. // initial a window for receive
  193. Self.bufQueue = NewReceiveWindowQueue()
  194. Self.element = common.ListElementPool.Get()
  195. Self.maxSizeDone = Self.pack(common.MAXIMUM_SEGMENT_SIZE*30, 0, false)
  196. Self.mux = mux
  197. Self.window.New()
  198. Self.bw = NewWriteBandwidth()
  199. }
  200. func (Self *ReceiveWindow) remainingSize(maxSize uint32, delta uint16) (n uint32) {
  201. // receive window remaining
  202. l := int64(maxSize) - int64(Self.bufQueue.Len())
  203. l -= int64(delta)
  204. if l > 0 {
  205. n = uint32(l)
  206. }
  207. return
  208. }
  209. func (Self *ReceiveWindow) calcSize() {
  210. // calculating maximum receive window size
  211. if Self.count == 0 {
  212. //logs.Warn("ping, bw", Self.mux.latency, Self.bw.Get())
  213. //conns := Self.mux.connMap.Size()
  214. muxBw := Self.mux.bw.Get()
  215. connBw := Self.bw.Get()
  216. //logs.Warn("muxbw connbw", muxBw, connBw)
  217. var n uint32
  218. if connBw > 0 && muxBw > 0 {
  219. n = uint32(math.Float64frombits(atomic.LoadUint64(&Self.mux.latency)) *
  220. (muxBw + connBw))
  221. }
  222. //logs.Warn(n)
  223. if n < common.MAXIMUM_SEGMENT_SIZE*30 {
  224. //logs.Warn("window small", n, Self.mux.bw.Get(), Self.bw.Get())
  225. n = common.MAXIMUM_SEGMENT_SIZE * 30
  226. }
  227. for {
  228. ptrs := atomic.LoadUint64(&Self.maxSizeDone)
  229. size, read, wait := Self.unpack(ptrs)
  230. if n < size/2 {
  231. n = size / 2
  232. // half reduce
  233. }
  234. // set the minimal size
  235. if n > 2*size {
  236. n = 2 * size
  237. // twice grow
  238. }
  239. if connBw > 0 && muxBw > 0 {
  240. limit := uint32(common.MAXIMUM_WINDOW_SIZE * (connBw / (muxBw + connBw)))
  241. if n > limit {
  242. logs.Warn("window too large, calculated:", n, "limit:", limit, connBw, muxBw)
  243. n = limit
  244. }
  245. }
  246. // set the maximum size
  247. //logs.Warn("n", n)
  248. if atomic.CompareAndSwapUint64(&Self.maxSizeDone, ptrs, Self.pack(n, read, wait)) {
  249. // only change the maxSize
  250. break
  251. }
  252. }
  253. Self.count = -10
  254. }
  255. Self.count += 1
  256. return
  257. }
  258. func (Self *ReceiveWindow) Write(buf []byte, l uint16, part bool, id int32) (err error) {
  259. if Self.closeOp {
  260. return errors.New("conn.receiveWindow: write on closed window")
  261. }
  262. element, err := NewListElement(buf, l, part)
  263. //logs.Warn("push the buf", len(buf), l, element.L)
  264. if err != nil {
  265. return
  266. }
  267. Self.calcSize() // calculate the max window size
  268. var wait bool
  269. var maxSize, read uint32
  270. start:
  271. ptrs := atomic.LoadUint64(&Self.maxSizeDone)
  272. maxSize, read, wait = Self.unpack(ptrs)
  273. remain := Self.remainingSize(maxSize, l)
  274. // calculate the remaining window size now, plus the element we will push
  275. if remain == 0 && !wait {
  276. //logs.Warn("window full true", remaining)
  277. wait = true
  278. if !atomic.CompareAndSwapUint64(&Self.maxSizeDone, ptrs, Self.pack(maxSize, read, wait)) {
  279. // only change the wait status, not send the read size
  280. goto start
  281. // another goroutine change the status, make sure shall we need wait
  282. }
  283. //logs.Warn("receive window full")
  284. } else if !wait {
  285. if !atomic.CompareAndSwapUint64(&Self.maxSizeDone, ptrs, Self.pack(maxSize, 0, wait)) {
  286. // reset read size here, and send the read size directly
  287. goto start
  288. // another goroutine change the status, make sure shall we need wait
  289. }
  290. } // maybe there are still some data received even if window is full, just keep the wait status
  291. // and push into queue. when receive window read enough, send window will be acknowledged.
  292. Self.bufQueue.Push(element)
  293. // status check finish, now we can push the element into the queue
  294. if !wait {
  295. Self.mux.sendInfo(common.MUX_MSG_SEND_OK, id, Self.pack(maxSize, read, false))
  296. // send the current status to send window
  297. }
  298. return nil
  299. }
  300. func (Self *ReceiveWindow) Read(p []byte, id int32) (n int, err error) {
  301. if Self.closeOp {
  302. return 0, io.EOF // receive close signal, returns eof
  303. }
  304. Self.bw.StartRead()
  305. n, err = Self.readFromQueue(p, id)
  306. Self.bw.SetCopySize(uint16(n))
  307. return
  308. }
  309. func (Self *ReceiveWindow) readFromQueue(p []byte, id int32) (n int, err error) {
  310. pOff := 0
  311. l := 0
  312. //logs.Warn("receive window read off, element.l", Self.off, Self.element.L)
  313. copyData:
  314. if Self.off == uint32(Self.element.L) {
  315. // on the first Read method invoked, Self.off and Self.element.l
  316. // both zero value
  317. common.ListElementPool.Put(Self.element)
  318. if Self.closeOp {
  319. return 0, io.EOF
  320. }
  321. Self.element, err = Self.bufQueue.Pop()
  322. // if the queue is empty, Pop method will wait until one element push
  323. // into the queue successful, or timeout.
  324. // timer start on timeout parameter is set up
  325. Self.off = 0
  326. if err != nil {
  327. Self.CloseWindow() // also close the window, to avoid read twice
  328. return // queue receive stop or time out, break the loop and return
  329. }
  330. //logs.Warn("pop element", Self.element.L, Self.element.Part)
  331. }
  332. l = copy(p[pOff:], Self.element.Buf[Self.off:Self.element.L])
  333. pOff += l
  334. Self.off += uint32(l)
  335. //logs.Warn("window read length buf len", Self.readLength, Self.bufQueue.Len())
  336. n += l
  337. l = 0
  338. if Self.off == uint32(Self.element.L) {
  339. //logs.Warn("put the element end ", string(Self.element.buf[:15]))
  340. common.WindowBuff.Put(Self.element.Buf)
  341. Self.sendStatus(id, Self.element.L)
  342. // check the window full status
  343. }
  344. if pOff < len(p) && Self.element.Part {
  345. // element is a part of the segments, trying to fill up buf p
  346. goto copyData
  347. }
  348. return // buf p is full or all of segments in buf, return
  349. }
  350. func (Self *ReceiveWindow) sendStatus(id int32, l uint16) {
  351. var maxSize, read uint32
  352. var wait bool
  353. for {
  354. ptrs := atomic.LoadUint64(&Self.maxSizeDone)
  355. maxSize, read, wait = Self.unpack(ptrs)
  356. if read <= (read+uint32(l))&mask31 {
  357. read += uint32(l)
  358. remain := Self.remainingSize(maxSize, 0)
  359. if wait && remain > 0 || read >= maxSize/2 || remain == maxSize {
  360. if atomic.CompareAndSwapUint64(&Self.maxSizeDone, ptrs, Self.pack(maxSize, 0, false)) {
  361. // now we get the current window status success
  362. // receive window free up some space we need acknowledge send window, also reset the read size
  363. // still having a condition that receive window is empty and not send the status to send window
  364. // so send the status here
  365. //logs.Warn("receive window free up some space", remain)
  366. Self.mux.sendInfo(common.MUX_MSG_SEND_OK, id, Self.pack(maxSize, read, false))
  367. break
  368. }
  369. } else {
  370. if atomic.CompareAndSwapUint64(&Self.maxSizeDone, ptrs, Self.pack(maxSize, read, wait)) {
  371. // receive window not into the wait status, or still not having any space now,
  372. // just change the read size
  373. break
  374. }
  375. }
  376. } else {
  377. //overflow
  378. if atomic.CompareAndSwapUint64(&Self.maxSizeDone, ptrs, Self.pack(maxSize, uint32(l), wait)) {
  379. // reset to l
  380. Self.mux.sendInfo(common.MUX_MSG_SEND_OK, id, Self.pack(maxSize, read, false))
  381. break
  382. }
  383. }
  384. runtime.Gosched()
  385. // another goroutine change remaining or wait status, make sure
  386. }
  387. return
  388. }
  389. func (Self *ReceiveWindow) SetTimeOut(t time.Time) {
  390. // waiting for FIFO queue Pop method
  391. Self.bufQueue.SetTimeOut(t)
  392. }
  393. func (Self *ReceiveWindow) Stop() {
  394. // queue has no more data to push, so unblock pop method
  395. Self.once.Do(Self.bufQueue.Stop)
  396. }
  397. func (Self *ReceiveWindow) CloseWindow() {
  398. Self.window.CloseWindow()
  399. Self.Stop()
  400. Self.release()
  401. }
  402. func (Self *ReceiveWindow) release() {
  403. //if Self.element != nil {
  404. // if Self.element.Buf != nil {
  405. // common.WindowBuff.Put(Self.element.Buf)
  406. // }
  407. // common.ListElementPool.Put(Self.element)
  408. //}
  409. for {
  410. ele := Self.bufQueue.TryPop()
  411. if ele == nil {
  412. return
  413. }
  414. if ele.Buf != nil {
  415. common.WindowBuff.Put(ele.Buf)
  416. }
  417. common.ListElementPool.Put(ele)
  418. } // release resource
  419. }
  420. type SendWindow struct {
  421. window
  422. buf []byte
  423. setSizeCh chan struct{}
  424. timeout time.Time
  425. // send window receive the receive window max size and read size
  426. // done size store the size send window has send, send and read will be totally equal
  427. // so send minus read, send window can get the current window size remaining
  428. }
  429. func (Self *SendWindow) New(mux *Mux) {
  430. Self.setSizeCh = make(chan struct{})
  431. Self.maxSizeDone = Self.pack(common.MAXIMUM_SEGMENT_SIZE*30, 0, false)
  432. Self.mux = mux
  433. Self.window.New()
  434. }
  435. func (Self *SendWindow) SetSendBuf(buf []byte) {
  436. // send window buff from conn write method, set it to send window
  437. Self.buf = buf
  438. Self.off = 0
  439. }
  440. func (Self *SendWindow) remainingSize(maxSize, send uint32) uint32 {
  441. l := int64(maxSize&mask31) - int64(send&mask31)
  442. if l > 0 {
  443. return uint32(l)
  444. }
  445. return 0
  446. }
  447. func (Self *SendWindow) SetSize(currentMaxSizeDone uint64) (closed bool) {
  448. // set the window size from receive window
  449. defer func() {
  450. if recover() != nil {
  451. closed = true
  452. }
  453. }()
  454. if Self.closeOp {
  455. close(Self.setSizeCh)
  456. return true
  457. }
  458. //logs.Warn("set send window size to ", windowSize, newRemaining)
  459. var maxsize, send uint32
  460. var wait, newWait bool
  461. currentMaxSize, read, _ := Self.unpack(currentMaxSizeDone)
  462. for {
  463. ptrs := atomic.LoadUint64(&Self.maxSizeDone)
  464. maxsize, send, wait = Self.unpack(ptrs)
  465. if read > send {
  466. logs.Error("window read > send: max size:", currentMaxSize, "read:", read, "send", send)
  467. return
  468. }
  469. if read == 0 && currentMaxSize == maxsize {
  470. return
  471. }
  472. send -= read
  473. remain := Self.remainingSize(currentMaxSize, send)
  474. if remain == 0 && wait {
  475. // just keep the wait status
  476. newWait = true
  477. }
  478. // remain > 0, change wait to false. or remain == 0, wait is false, just keep it
  479. if atomic.CompareAndSwapUint64(&Self.maxSizeDone, ptrs, Self.pack(currentMaxSize, send, newWait)) {
  480. break
  481. }
  482. // anther goroutine change wait status or window size
  483. }
  484. if wait && !newWait {
  485. // send window into the wait status, need notice the channel
  486. //logs.Warn("send window allow")
  487. Self.allow()
  488. }
  489. // send window not into the wait status, so just do slide
  490. return false
  491. }
  492. func (Self *SendWindow) allow() {
  493. select {
  494. case Self.setSizeCh <- struct{}{}:
  495. //logs.Warn("send window remaining size is 0 finish")
  496. return
  497. case <-Self.closeOpCh:
  498. close(Self.setSizeCh)
  499. return
  500. }
  501. }
  502. func (Self *SendWindow) sent(sentSize uint32) {
  503. var maxSie, send uint32
  504. var wait bool
  505. for {
  506. ptrs := atomic.LoadUint64(&Self.maxSizeDone)
  507. maxSie, send, wait = Self.unpack(ptrs)
  508. if (send+sentSize)&mask31 < send {
  509. // overflow
  510. runtime.Gosched()
  511. continue
  512. }
  513. if atomic.CompareAndSwapUint64(&Self.maxSizeDone, ptrs, Self.pack(maxSie, send+sentSize, wait)) {
  514. // set the send size
  515. //logs.Warn("sent", maxSie, send+sentSize, wait)
  516. break
  517. }
  518. }
  519. }
  520. func (Self *SendWindow) WriteTo() (p []byte, sendSize uint32, part bool, err error) {
  521. // returns buf segments, return only one segments, need a loop outside
  522. // until err = io.EOF
  523. if Self.closeOp {
  524. return nil, 0, false, errors.New("conn.writeWindow: window closed")
  525. }
  526. if Self.off == uint32(len(Self.buf)) {
  527. return nil, 0, false, io.EOF
  528. // send window buff is drain, return eof and get another one
  529. }
  530. var maxSize, send uint32
  531. start:
  532. ptrs := atomic.LoadUint64(&Self.maxSizeDone)
  533. maxSize, send, _ = Self.unpack(ptrs)
  534. remain := Self.remainingSize(maxSize, send)
  535. if remain == 0 {
  536. if !atomic.CompareAndSwapUint64(&Self.maxSizeDone, ptrs, Self.pack(maxSize, send, true)) {
  537. // just change the status wait status
  538. goto start // another goroutine change the window, try again
  539. }
  540. // into the wait status
  541. //logs.Warn("send window into wait status")
  542. err = Self.waitReceiveWindow()
  543. if err != nil {
  544. return nil, 0, false, err
  545. }
  546. //logs.Warn("rem into wait finish")
  547. goto start
  548. }
  549. // there are still remaining window
  550. //logs.Warn("rem", remain, maxSize, send)
  551. if len(Self.buf[Self.off:]) > common.MAXIMUM_SEGMENT_SIZE {
  552. sendSize = common.MAXIMUM_SEGMENT_SIZE
  553. //logs.Warn("cut buf by mss")
  554. } else {
  555. sendSize = uint32(len(Self.buf[Self.off:]))
  556. }
  557. if remain < sendSize {
  558. // usable window size is small than
  559. // window MAXIMUM_SEGMENT_SIZE or send buf left
  560. sendSize = remain
  561. //logs.Warn("cut buf by remainingsize", sendSize, len(Self.buf[Self.off:]))
  562. }
  563. //logs.Warn("send size", sendSize)
  564. if sendSize < uint32(len(Self.buf[Self.off:])) {
  565. part = true
  566. }
  567. p = Self.buf[Self.off : sendSize+Self.off]
  568. Self.off += sendSize
  569. Self.sent(sendSize)
  570. return
  571. }
  572. func (Self *SendWindow) waitReceiveWindow() (err error) {
  573. t := Self.timeout.Sub(time.Now())
  574. if t < 0 { // not set the timeout, wait for it as long as connection close
  575. select {
  576. case _, ok := <-Self.setSizeCh:
  577. if !ok {
  578. return errors.New("conn.writeWindow: window closed")
  579. }
  580. return nil
  581. case <-Self.closeOpCh:
  582. return errors.New("conn.writeWindow: window closed")
  583. }
  584. }
  585. timer := time.NewTimer(t)
  586. defer timer.Stop()
  587. // waiting for receive usable window size, or timeout
  588. select {
  589. case _, ok := <-Self.setSizeCh:
  590. if !ok {
  591. return errors.New("conn.writeWindow: window closed")
  592. }
  593. return nil
  594. case <-timer.C:
  595. return errors.New("conn.writeWindow: write to time out")
  596. case <-Self.closeOpCh:
  597. return errors.New("conn.writeWindow: window closed")
  598. }
  599. }
  600. func (Self *SendWindow) WriteFull(buf []byte, id int32) (n int, err error) {
  601. Self.SetSendBuf(buf) // set the buf to send window
  602. //logs.Warn("set the buf to send window")
  603. var bufSeg []byte
  604. var part bool
  605. var l uint32
  606. for {
  607. bufSeg, l, part, err = Self.WriteTo()
  608. //logs.Warn("buf seg", len(bufSeg), part, err)
  609. // get the buf segments from send window
  610. if bufSeg == nil && part == false && err == io.EOF {
  611. // send window is drain, break the loop
  612. err = nil
  613. break
  614. }
  615. if err != nil {
  616. break
  617. }
  618. n += int(l)
  619. l = 0
  620. if part {
  621. Self.mux.sendInfo(common.MUX_NEW_MSG_PART, id, bufSeg)
  622. } else {
  623. Self.mux.sendInfo(common.MUX_NEW_MSG, id, bufSeg)
  624. //logs.Warn("buf seg sent", len(bufSeg), part, err)
  625. }
  626. // send to other side, not send nil data to other side
  627. }
  628. //logs.Warn("buf seg write success")
  629. return
  630. }
  631. func (Self *SendWindow) SetTimeOut(t time.Time) {
  632. // waiting for receive a receive window size
  633. Self.timeout = t
  634. }
  635. type writeBandwidth struct {
  636. writeBW uint64 // store in bits, but it's float64
  637. readEnd time.Time
  638. duration float64
  639. bufLength uint32
  640. }
  641. const writeCalcThreshold uint32 = 5 * 1024 * 1024
  642. func NewWriteBandwidth() *writeBandwidth {
  643. return &writeBandwidth{}
  644. }
  645. func (Self *writeBandwidth) StartRead() {
  646. if Self.readEnd.IsZero() {
  647. Self.readEnd = time.Now()
  648. }
  649. Self.duration += time.Now().Sub(Self.readEnd).Seconds()
  650. if Self.bufLength >= writeCalcThreshold {
  651. Self.calcBandWidth()
  652. }
  653. }
  654. func (Self *writeBandwidth) SetCopySize(n uint16) {
  655. Self.bufLength += uint32(n)
  656. Self.endRead()
  657. }
  658. func (Self *writeBandwidth) endRead() {
  659. Self.readEnd = time.Now()
  660. }
  661. func (Self *writeBandwidth) calcBandWidth() {
  662. atomic.StoreUint64(&Self.writeBW, math.Float64bits(float64(Self.bufLength)/Self.duration))
  663. Self.bufLength = 0
  664. Self.duration = 0
  665. }
  666. func (Self *writeBandwidth) Get() (bw float64) {
  667. // The zero value, 0 for numeric types
  668. bw = math.Float64frombits(atomic.LoadUint64(&Self.writeBW))
  669. if bw <= 0 {
  670. bw = 0
  671. }
  672. return
  673. }