conn.go 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735
  1. package mux
  2. import (
  3. "errors"
  4. "github.com/astaxie/beego/logs"
  5. "github.com/cnlh/nps/lib/common"
  6. "io"
  7. "math"
  8. "net"
  9. "runtime"
  10. "sync"
  11. "sync/atomic"
  12. "time"
  13. )
  14. type conn struct {
  15. net.Conn
  16. getStatusCh chan struct{}
  17. connStatusOkCh chan struct{}
  18. connStatusFailCh chan struct{}
  19. connId int32
  20. isClose bool
  21. closeFlag bool // close conn flag
  22. receiveWindow *ReceiveWindow
  23. sendWindow *SendWindow
  24. once sync.Once
  25. //label string
  26. }
  27. func NewConn(connId int32, mux *Mux, label ...string) *conn {
  28. c := &conn{
  29. getStatusCh: make(chan struct{}),
  30. connStatusOkCh: make(chan struct{}),
  31. connStatusFailCh: make(chan struct{}),
  32. connId: connId,
  33. receiveWindow: new(ReceiveWindow),
  34. sendWindow: new(SendWindow),
  35. once: sync.Once{},
  36. }
  37. //if len(label) > 0 {
  38. // c.label = label[0]
  39. //}
  40. c.receiveWindow.New(mux)
  41. c.sendWindow.New(mux)
  42. //logm := &connLog{
  43. // startTime: time.Now(),
  44. // isClose: false,
  45. // logs: []string{c.label + "new conn success"},
  46. //}
  47. //setM(label[0], int(connId), logm)
  48. return c
  49. }
  50. func (s *conn) Read(buf []byte) (n int, err error) {
  51. if s.isClose || buf == nil {
  52. return 0, errors.New("the conn has closed")
  53. }
  54. if len(buf) == 0 {
  55. return 0, nil
  56. }
  57. // waiting for takeout from receive window finish or timeout
  58. //now := time.Now()
  59. n, err = s.receiveWindow.Read(buf, s.connId)
  60. //t := time.Now().Sub(now)
  61. //if t.Seconds() > 0.5 {
  62. //logs.Warn("conn read long", n, t.Seconds())
  63. //}
  64. //var errstr string
  65. //if err == nil {
  66. // errstr = "err:nil"
  67. //} else {
  68. // errstr = err.Error()
  69. //}
  70. //d := getM(s.label, int(s.connId))
  71. //d.logs = append(d.logs, s.label+"read "+strconv.Itoa(n)+" "+errstr+" "+string(buf[:100]))
  72. //setM(s.label, int(s.connId), d)
  73. return
  74. }
  75. func (s *conn) Write(buf []byte) (n int, err error) {
  76. if s.isClose {
  77. return 0, errors.New("the conn has closed")
  78. }
  79. if s.closeFlag {
  80. //s.Close()
  81. return 0, errors.New("io: write on closed conn")
  82. }
  83. if len(buf) == 0 {
  84. return 0, nil
  85. }
  86. //logs.Warn("write buf", len(buf))
  87. //now := time.Now()
  88. n, err = s.sendWindow.WriteFull(buf, s.connId)
  89. //t := time.Now().Sub(now)
  90. //if t.Seconds() > 0.5 {
  91. // logs.Warn("conn write long", n, t.Seconds())
  92. //}
  93. return
  94. }
  95. func (s *conn) Close() (err error) {
  96. s.once.Do(s.closeProcess)
  97. return
  98. }
  99. func (s *conn) closeProcess() {
  100. s.isClose = true
  101. s.receiveWindow.mux.connMap.Delete(s.connId)
  102. if !s.receiveWindow.mux.IsClose {
  103. // if server or user close the conn while reading, will get a io.EOF
  104. // and this Close method will be invoke, send this signal to close other side
  105. s.receiveWindow.mux.sendInfo(common.MUX_CONN_CLOSE, s.connId, nil)
  106. }
  107. s.sendWindow.CloseWindow()
  108. s.receiveWindow.CloseWindow()
  109. //d := getM(s.label, int(s.connId))
  110. //d.isClose = true
  111. //d.logs = append(d.logs, s.label+"close "+time.Now().String())
  112. //setM(s.label, int(s.connId), d)
  113. return
  114. }
  115. func (s *conn) LocalAddr() net.Addr {
  116. return s.receiveWindow.mux.conn.LocalAddr()
  117. }
  118. func (s *conn) RemoteAddr() net.Addr {
  119. return s.receiveWindow.mux.conn.RemoteAddr()
  120. }
  121. func (s *conn) SetDeadline(t time.Time) error {
  122. _ = s.SetReadDeadline(t)
  123. _ = s.SetWriteDeadline(t)
  124. return nil
  125. }
  126. func (s *conn) SetReadDeadline(t time.Time) error {
  127. s.receiveWindow.SetTimeOut(t)
  128. return nil
  129. }
  130. func (s *conn) SetWriteDeadline(t time.Time) error {
  131. s.sendWindow.SetTimeOut(t)
  132. return nil
  133. }
  134. type window struct {
  135. maxSizeDone uint64
  136. // 64bit alignment
  137. // maxSizeDone contains 4 parts
  138. // 1 31 1 31
  139. // wait maxSize useless done
  140. // wait zero means false, one means true
  141. off uint32
  142. closeOp bool
  143. closeOpCh chan struct{}
  144. mux *Mux
  145. }
  146. const windowBits = 31
  147. const waitBits = dequeueBits + windowBits
  148. const mask1 = 1
  149. const mask31 = 1<<windowBits - 1
  150. func (Self *window) unpack(ptrs uint64) (maxSize, done uint32, wait bool) {
  151. maxSize = uint32((ptrs >> dequeueBits) & mask31)
  152. done = uint32(ptrs & mask31)
  153. //logs.Warn("unpack", maxSize, done)
  154. if ((ptrs >> waitBits) & mask1) == 1 {
  155. wait = true
  156. return
  157. }
  158. return
  159. }
  160. func (Self *window) pack(maxSize, done uint32, wait bool) uint64 {
  161. //logs.Warn("pack", maxSize, done, wait)
  162. if wait {
  163. return (uint64(1)<<waitBits |
  164. uint64(maxSize&mask31)<<dequeueBits) |
  165. uint64(done&mask31)
  166. }
  167. return (uint64(0)<<waitBits |
  168. uint64(maxSize&mask31)<<dequeueBits) |
  169. uint64(done&mask31)
  170. }
  171. func (Self *window) New() {
  172. Self.closeOpCh = make(chan struct{}, 2)
  173. }
  174. func (Self *window) CloseWindow() {
  175. if !Self.closeOp {
  176. Self.closeOp = true
  177. Self.closeOpCh <- struct{}{}
  178. Self.closeOpCh <- struct{}{}
  179. }
  180. }
  181. type ReceiveWindow struct {
  182. window
  183. bufQueue ReceiveWindowQueue
  184. element *common.ListElement
  185. count int8
  186. once sync.Once
  187. // receive window send the current max size and read size to send window
  188. // means done size actually store the size receive window has read
  189. }
  190. func (Self *ReceiveWindow) New(mux *Mux) {
  191. // initial a window for receive
  192. Self.bufQueue.New()
  193. Self.element = common.ListElementPool.Get()
  194. Self.maxSizeDone = Self.pack(common.MAXIMUM_SEGMENT_SIZE*10, 0, false)
  195. Self.mux = mux
  196. Self.window.New()
  197. }
  198. func (Self *ReceiveWindow) remainingSize(maxSize uint32, delta uint16) (n uint32) {
  199. // receive window remaining
  200. l := int64(maxSize) - int64(Self.bufQueue.Len())
  201. l -= int64(delta)
  202. if l > 0 {
  203. n = uint32(l)
  204. }
  205. return
  206. }
  207. func (Self *ReceiveWindow) calcSize() {
  208. // calculating maximum receive window size
  209. if Self.count == 0 {
  210. //logs.Warn("ping, bw", Self.mux.latency, Self.bw.Get())
  211. conns := Self.mux.connMap.Size()
  212. n := uint32(math.Float64frombits(atomic.LoadUint64(&Self.mux.latency)) *
  213. Self.mux.bw.Get() / float64(conns))
  214. //logs.Warn(n)
  215. if n < common.MAXIMUM_SEGMENT_SIZE*10 {
  216. n = common.MAXIMUM_SEGMENT_SIZE * 10
  217. }
  218. for {
  219. ptrs := atomic.LoadUint64(&Self.maxSizeDone)
  220. size, read, wait := Self.unpack(ptrs)
  221. if n < size/2 {
  222. n = size / 2
  223. // half reduce
  224. }
  225. // set the minimal size
  226. if n > 2*size {
  227. n = 2 * size
  228. // twice grow
  229. }
  230. if n > (common.MAXIMUM_WINDOW_SIZE / uint32(conns)) {
  231. //logs.Warn("window too large", n)
  232. n = common.MAXIMUM_WINDOW_SIZE / uint32(conns)
  233. }
  234. // set the maximum size
  235. //logs.Warn("n", n)
  236. if atomic.CompareAndSwapUint64(&Self.maxSizeDone, ptrs, Self.pack(n, read, wait)) {
  237. // only change the maxSize
  238. break
  239. }
  240. }
  241. Self.count = -10
  242. }
  243. Self.count += 1
  244. return
  245. }
  246. func (Self *ReceiveWindow) Write(buf []byte, l uint16, part bool, id int32) (err error) {
  247. if Self.closeOp {
  248. return errors.New("conn.receiveWindow: write on closed window")
  249. }
  250. element, err := NewListElement(buf, l, part)
  251. //logs.Warn("push the buf", len(buf), l, element.L)
  252. if err != nil {
  253. return
  254. }
  255. Self.calcSize() // calculate the max window size
  256. var wait bool
  257. var maxSize, read uint32
  258. start:
  259. ptrs := atomic.LoadUint64(&Self.maxSizeDone)
  260. maxSize, read, wait = Self.unpack(ptrs)
  261. remain := Self.remainingSize(maxSize, l)
  262. // calculate the remaining window size now, plus the element we will push
  263. if remain == 0 && !wait {
  264. //logs.Warn("window full true", remaining)
  265. wait = true
  266. if !atomic.CompareAndSwapUint64(&Self.maxSizeDone, ptrs, Self.pack(maxSize, read, wait)) {
  267. // only change the wait status, not send the read size
  268. goto start
  269. // another goroutine change the status, make sure shall we need wait
  270. }
  271. //logs.Warn("receive window full")
  272. } else if !wait {
  273. if !atomic.CompareAndSwapUint64(&Self.maxSizeDone, ptrs, Self.pack(maxSize, 0, wait)) {
  274. // reset read size here, and send the read size directly
  275. goto start
  276. // another goroutine change the status, make sure shall we need wait
  277. }
  278. } // maybe there are still some data received even if window is full, just keep the wait status
  279. // and push into queue. when receive window read enough, send window will be acknowledged.
  280. Self.bufQueue.Push(element)
  281. // status check finish, now we can push the element into the queue
  282. if !wait {
  283. Self.mux.sendInfo(common.MUX_MSG_SEND_OK, id, Self.pack(maxSize, read, false))
  284. // send the current status to send window
  285. }
  286. return nil
  287. }
  288. func (Self *ReceiveWindow) Read(p []byte, id int32) (n int, err error) {
  289. if Self.closeOp {
  290. return 0, io.EOF // receive close signal, returns eof
  291. }
  292. pOff := 0
  293. l := 0
  294. //logs.Warn("receive window read off, element.l", Self.off, Self.element.L)
  295. copyData:
  296. if Self.off == uint32(Self.element.L) {
  297. // on the first Read method invoked, Self.off and Self.element.l
  298. // both zero value
  299. common.ListElementPool.Put(Self.element)
  300. if Self.closeOp {
  301. return 0, io.EOF
  302. }
  303. Self.element, err = Self.bufQueue.Pop()
  304. // if the queue is empty, Pop method will wait until one element push
  305. // into the queue successful, or timeout.
  306. // timer start on timeout parameter is set up
  307. Self.off = 0
  308. if err != nil {
  309. Self.CloseWindow() // also close the window, to avoid read twice
  310. return // queue receive stop or time out, break the loop and return
  311. }
  312. //logs.Warn("pop element", Self.element.L, Self.element.Part)
  313. }
  314. l = copy(p[pOff:], Self.element.Buf[Self.off:Self.element.L])
  315. pOff += l
  316. Self.off += uint32(l)
  317. //logs.Warn("window read length buf len", Self.readLength, Self.bufQueue.Len())
  318. n += l
  319. l = 0
  320. if Self.off == uint32(Self.element.L) {
  321. //logs.Warn("put the element end ", string(Self.element.buf[:15]))
  322. common.WindowBuff.Put(Self.element.Buf)
  323. Self.sendStatus(id, Self.element.L)
  324. // check the window full status
  325. }
  326. if pOff < len(p) && Self.element.Part {
  327. // element is a part of the segments, trying to fill up buf p
  328. goto copyData
  329. }
  330. return // buf p is full or all of segments in buf, return
  331. }
  332. func (Self *ReceiveWindow) sendStatus(id int32, l uint16) {
  333. var maxSize, read uint32
  334. var wait bool
  335. for {
  336. ptrs := atomic.LoadUint64(&Self.maxSizeDone)
  337. maxSize, read, wait = Self.unpack(ptrs)
  338. if read <= (read+uint32(l))&mask31 {
  339. read += uint32(l)
  340. remain := Self.remainingSize(maxSize, 0)
  341. if wait && remain > 0 || remain == maxSize {
  342. if atomic.CompareAndSwapUint64(&Self.maxSizeDone, ptrs, Self.pack(maxSize, 0, false)) {
  343. // now we get the current window status success
  344. // receive window free up some space we need acknowledge send window, also reset the read size
  345. // still having a condition that receive window is empty and not send the status to send window
  346. // so send the status here
  347. //logs.Warn("receive window free up some space", remain)
  348. Self.mux.sendInfo(common.MUX_MSG_SEND_OK, id, Self.pack(maxSize, read, false))
  349. break
  350. }
  351. } else {
  352. if atomic.CompareAndSwapUint64(&Self.maxSizeDone, ptrs, Self.pack(maxSize, read, wait)) {
  353. // receive window not into the wait status, or still not having any space now,
  354. // just change the read size
  355. break
  356. }
  357. }
  358. } else {
  359. //overflow
  360. if atomic.CompareAndSwapUint64(&Self.maxSizeDone, ptrs, Self.pack(maxSize, uint32(l), wait)) {
  361. // reset to l
  362. Self.mux.sendInfo(common.MUX_MSG_SEND_OK, id, Self.pack(maxSize, read, false))
  363. break
  364. }
  365. }
  366. runtime.Gosched()
  367. // another goroutine change remaining or wait status, make sure
  368. }
  369. return
  370. }
  371. func (Self *ReceiveWindow) SetTimeOut(t time.Time) {
  372. // waiting for FIFO queue Pop method
  373. Self.bufQueue.SetTimeOut(t)
  374. }
  375. func (Self *ReceiveWindow) Stop() {
  376. // queue has no more data to push, so unblock pop method
  377. Self.once.Do(Self.bufQueue.Stop)
  378. }
  379. func (Self *ReceiveWindow) CloseWindow() {
  380. Self.window.CloseWindow()
  381. Self.Stop()
  382. Self.release()
  383. }
  384. func (Self *ReceiveWindow) release() {
  385. //if Self.element != nil {
  386. // if Self.element.Buf != nil {
  387. // common.WindowBuff.Put(Self.element.Buf)
  388. // }
  389. // common.ListElementPool.Put(Self.element)
  390. //}
  391. for {
  392. ele := Self.bufQueue.TryPop()
  393. if ele == nil {
  394. return
  395. }
  396. if ele.Buf != nil {
  397. common.WindowBuff.Put(ele.Buf)
  398. }
  399. common.ListElementPool.Put(ele)
  400. } // release resource
  401. }
  402. type SendWindow struct {
  403. window
  404. buf []byte
  405. setSizeCh chan struct{}
  406. timeout time.Time
  407. // send window receive the receive window max size and read size
  408. // done size store the size send window has send, send and read will be totally equal
  409. // so send minus read, send window can get the current window size remaining
  410. }
  411. func (Self *SendWindow) New(mux *Mux) {
  412. Self.setSizeCh = make(chan struct{})
  413. Self.maxSizeDone = Self.pack(common.MAXIMUM_SEGMENT_SIZE*10, 0, false)
  414. Self.mux = mux
  415. Self.window.New()
  416. }
  417. func (Self *SendWindow) SetSendBuf(buf []byte) {
  418. // send window buff from conn write method, set it to send window
  419. Self.buf = buf
  420. Self.off = 0
  421. }
  422. func (Self *SendWindow) remainingSize(maxSize, send uint32) uint32 {
  423. l := int64(maxSize&mask31) - int64(send&mask31)
  424. if l > 0 {
  425. return uint32(l)
  426. }
  427. return 0
  428. }
  429. func (Self *SendWindow) SetSize(currentMaxSizeDone uint64) (closed bool) {
  430. // set the window size from receive window
  431. defer func() {
  432. if recover() != nil {
  433. closed = true
  434. }
  435. }()
  436. if Self.closeOp {
  437. close(Self.setSizeCh)
  438. return true
  439. }
  440. //logs.Warn("set send window size to ", windowSize, newRemaining)
  441. var maxsize, send uint32
  442. var wait, newWait bool
  443. currentMaxSize, read, _ := Self.unpack(currentMaxSizeDone)
  444. for {
  445. ptrs := atomic.LoadUint64(&Self.maxSizeDone)
  446. maxsize, send, wait = Self.unpack(ptrs)
  447. if read > send {
  448. logs.Error("read > send")
  449. return
  450. }
  451. if read == 0 && currentMaxSize == maxsize {
  452. return
  453. }
  454. send -= read
  455. remain := Self.remainingSize(currentMaxSize, send)
  456. if remain == 0 && wait {
  457. // just keep the wait status
  458. newWait = true
  459. }
  460. // remain > 0, change wait to false. or remain == 0, wait is false, just keep it
  461. if atomic.CompareAndSwapUint64(&Self.maxSizeDone, ptrs, Self.pack(currentMaxSize, send, newWait)) {
  462. break
  463. }
  464. // anther goroutine change wait status or window size
  465. }
  466. if wait && !newWait {
  467. // send window into the wait status, need notice the channel
  468. //logs.Warn("send window allow")
  469. Self.allow()
  470. }
  471. // send window not into the wait status, so just do slide
  472. return false
  473. }
  474. func (Self *SendWindow) allow() {
  475. select {
  476. case Self.setSizeCh <- struct{}{}:
  477. //logs.Warn("send window remaining size is 0 finish")
  478. return
  479. case <-Self.closeOpCh:
  480. close(Self.setSizeCh)
  481. return
  482. }
  483. }
  484. func (Self *SendWindow) sent(sentSize uint32) {
  485. var maxSie, send uint32
  486. var wait bool
  487. for {
  488. ptrs := atomic.LoadUint64(&Self.maxSizeDone)
  489. maxSie, send, wait = Self.unpack(ptrs)
  490. if (send+sentSize)&mask31 < send {
  491. // overflow
  492. runtime.Gosched()
  493. continue
  494. }
  495. if atomic.CompareAndSwapUint64(&Self.maxSizeDone, ptrs, Self.pack(maxSie, send+sentSize, wait)) {
  496. // set the send size
  497. //logs.Warn("sent", maxSie, send+sentSize, wait)
  498. break
  499. }
  500. }
  501. }
  502. func (Self *SendWindow) WriteTo() (p []byte, sendSize uint32, part bool, err error) {
  503. // returns buf segments, return only one segments, need a loop outside
  504. // until err = io.EOF
  505. if Self.closeOp {
  506. return nil, 0, false, errors.New("conn.writeWindow: window closed")
  507. }
  508. if Self.off == uint32(len(Self.buf)) {
  509. return nil, 0, false, io.EOF
  510. // send window buff is drain, return eof and get another one
  511. }
  512. var maxSize, send uint32
  513. start:
  514. ptrs := atomic.LoadUint64(&Self.maxSizeDone)
  515. maxSize, send, _ = Self.unpack(ptrs)
  516. remain := Self.remainingSize(maxSize, send)
  517. if remain == 0 {
  518. if !atomic.CompareAndSwapUint64(&Self.maxSizeDone, ptrs, Self.pack(maxSize, send, true)) {
  519. // just change the status wait status
  520. goto start // another goroutine change the window, try again
  521. }
  522. // into the wait status
  523. //logs.Warn("send window into wait status")
  524. err = Self.waitReceiveWindow()
  525. if err != nil {
  526. return nil, 0, false, err
  527. }
  528. //logs.Warn("rem into wait finish")
  529. goto start
  530. }
  531. // there are still remaining window
  532. //logs.Warn("rem", remain, maxSize, send)
  533. if len(Self.buf[Self.off:]) > common.MAXIMUM_SEGMENT_SIZE {
  534. sendSize = common.MAXIMUM_SEGMENT_SIZE
  535. //logs.Warn("cut buf by mss")
  536. } else {
  537. sendSize = uint32(len(Self.buf[Self.off:]))
  538. }
  539. if remain < sendSize {
  540. // usable window size is small than
  541. // window MAXIMUM_SEGMENT_SIZE or send buf left
  542. sendSize = remain
  543. //logs.Warn("cut buf by remainingsize", sendSize, len(Self.buf[Self.off:]))
  544. }
  545. //logs.Warn("send size", sendSize)
  546. if sendSize < uint32(len(Self.buf[Self.off:])) {
  547. part = true
  548. }
  549. p = Self.buf[Self.off : sendSize+Self.off]
  550. Self.off += sendSize
  551. Self.sent(sendSize)
  552. return
  553. }
  554. func (Self *SendWindow) waitReceiveWindow() (err error) {
  555. t := Self.timeout.Sub(time.Now())
  556. if t < 0 { // not set the timeout, wait for it as long as connection close
  557. select {
  558. case _, ok := <-Self.setSizeCh:
  559. if !ok {
  560. return errors.New("conn.writeWindow: window closed")
  561. }
  562. return nil
  563. case <-Self.closeOpCh:
  564. return errors.New("conn.writeWindow: window closed")
  565. }
  566. }
  567. timer := time.NewTimer(t)
  568. defer timer.Stop()
  569. // waiting for receive usable window size, or timeout
  570. select {
  571. case _, ok := <-Self.setSizeCh:
  572. if !ok {
  573. return errors.New("conn.writeWindow: window closed")
  574. }
  575. return nil
  576. case <-timer.C:
  577. return errors.New("conn.writeWindow: write to time out")
  578. case <-Self.closeOpCh:
  579. return errors.New("conn.writeWindow: window closed")
  580. }
  581. }
  582. func (Self *SendWindow) WriteFull(buf []byte, id int32) (n int, err error) {
  583. Self.SetSendBuf(buf) // set the buf to send window
  584. //logs.Warn("set the buf to send window")
  585. var bufSeg []byte
  586. var part bool
  587. var l uint32
  588. for {
  589. bufSeg, l, part, err = Self.WriteTo()
  590. //logs.Warn("buf seg", len(bufSeg), part, err)
  591. // get the buf segments from send window
  592. if bufSeg == nil && part == false && err == io.EOF {
  593. // send window is drain, break the loop
  594. err = nil
  595. break
  596. }
  597. if err != nil {
  598. break
  599. }
  600. n += int(l)
  601. l = 0
  602. if part {
  603. Self.mux.sendInfo(common.MUX_NEW_MSG_PART, id, bufSeg)
  604. } else {
  605. Self.mux.sendInfo(common.MUX_NEW_MSG, id, bufSeg)
  606. //logs.Warn("buf seg sent", len(bufSeg), part, err)
  607. }
  608. // send to other side, not send nil data to other side
  609. }
  610. //logs.Warn("buf seg write success")
  611. return
  612. }
  613. func (Self *SendWindow) SetTimeOut(t time.Time) {
  614. // waiting for receive a receive window size
  615. Self.timeout = t
  616. }
  617. //type bandwidth struct {
  618. // readStart time.Time
  619. // lastReadStart time.Time
  620. // readEnd time.Time
  621. // lastReadEnd time.Time
  622. // bufLength int
  623. // lastBufLength int
  624. // count int8
  625. // readBW float64
  626. // writeBW float64
  627. // readBandwidth float64
  628. //}
  629. //
  630. //func (Self *bandwidth) StartRead() {
  631. // Self.lastReadStart, Self.readStart = Self.readStart, time.Now()
  632. // if !Self.lastReadStart.IsZero() {
  633. // if Self.count == -5 {
  634. // Self.calcBandWidth()
  635. // }
  636. // }
  637. //}
  638. //
  639. //func (Self *bandwidth) EndRead() {
  640. // Self.lastReadEnd, Self.readEnd = Self.readEnd, time.Now()
  641. // if Self.count == -5 {
  642. // Self.calcWriteBandwidth()
  643. // }
  644. // if Self.count == 0 {
  645. // Self.calcReadBandwidth()
  646. // Self.count = -6
  647. // }
  648. // Self.count += 1
  649. //}
  650. //
  651. //func (Self *bandwidth) SetCopySize(n int) {
  652. // // must be invoke between StartRead and EndRead
  653. // Self.lastBufLength, Self.bufLength = Self.bufLength, n
  654. //}
  655. //// calculating
  656. //// start end start end
  657. //// read read
  658. //// write
  659. //
  660. //func (Self *bandwidth) calcBandWidth() {
  661. // t := Self.readStart.Sub(Self.lastReadStart)
  662. // if Self.lastBufLength >= 32768 {
  663. // Self.readBandwidth = float64(Self.lastBufLength) / t.Seconds()
  664. // }
  665. //}
  666. //
  667. //func (Self *bandwidth) calcReadBandwidth() {
  668. // // Bandwidth between nps and npc
  669. // readTime := Self.readEnd.Sub(Self.readStart)
  670. // Self.readBW = float64(Self.bufLength) / readTime.Seconds()
  671. // //logs.Warn("calc read bw", Self.readBW, Self.bufLength, readTime.Seconds())
  672. //}
  673. //
  674. //func (Self *bandwidth) calcWriteBandwidth() {
  675. // // Bandwidth between nps and user, npc and application
  676. // writeTime := Self.readStart.Sub(Self.lastReadEnd)
  677. // Self.writeBW = float64(Self.lastBufLength) / writeTime.Seconds()
  678. // //logs.Warn("calc write bw", Self.writeBW, Self.bufLength, writeTime.Seconds())
  679. //}
  680. //
  681. //func (Self *bandwidth) Get() (bw float64) {
  682. // // The zero value, 0 for numeric types
  683. // if Self.writeBW == 0 && Self.readBW == 0 {
  684. // //logs.Warn("bw both 0")
  685. // return 100
  686. // }
  687. // if Self.writeBW == 0 && Self.readBW != 0 {
  688. // return Self.readBW
  689. // }
  690. // if Self.readBW == 0 && Self.writeBW != 0 {
  691. // return Self.writeBW
  692. // }
  693. // return Self.readBandwidth
  694. //}