conn.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659
  1. package mux
  2. import (
  3. "errors"
  4. "github.com/astaxie/beego/logs"
  5. "io"
  6. "math"
  7. "net"
  8. "runtime"
  9. "sync"
  10. "sync/atomic"
  11. "time"
  12. "github.com/cnlh/nps/lib/common"
  13. )
  14. type conn struct {
  15. net.Conn
  16. getStatusCh chan struct{}
  17. connStatusOkCh chan struct{}
  18. connStatusFailCh chan struct{}
  19. connId int32
  20. isClose bool
  21. closeFlag bool // close conn flag
  22. receiveWindow *ReceiveWindow
  23. sendWindow *SendWindow
  24. once sync.Once
  25. //label string
  26. }
  27. func NewConn(connId int32, mux *Mux, label ...string) *conn {
  28. c := &conn{
  29. getStatusCh: make(chan struct{}),
  30. connStatusOkCh: make(chan struct{}),
  31. connStatusFailCh: make(chan struct{}),
  32. connId: connId,
  33. receiveWindow: new(ReceiveWindow),
  34. sendWindow: new(SendWindow),
  35. once: sync.Once{},
  36. }
  37. //if len(label) > 0 {
  38. // c.label = label[0]
  39. //}
  40. c.receiveWindow.New(mux)
  41. c.sendWindow.New(mux)
  42. //logm := &connLog{
  43. // startTime: time.Now(),
  44. // isClose: false,
  45. // logs: []string{c.label + "new conn success"},
  46. //}
  47. //setM(label[0], int(connId), logm)
  48. return c
  49. }
  50. func (s *conn) Read(buf []byte) (n int, err error) {
  51. if s.isClose || buf == nil {
  52. return 0, errors.New("the conn has closed")
  53. }
  54. if len(buf) == 0 {
  55. return 0, nil
  56. }
  57. // waiting for takeout from receive window finish or timeout
  58. //now := time.Now()
  59. n, err = s.receiveWindow.Read(buf, s.connId)
  60. //t := time.Now().Sub(now)
  61. //if t.Seconds() > 0.5 {
  62. //logs.Warn("conn read long", n, t.Seconds())
  63. //}
  64. //var errstr string
  65. //if err == nil {
  66. // errstr = "err:nil"
  67. //} else {
  68. // errstr = err.Error()
  69. //}
  70. //d := getM(s.label, int(s.connId))
  71. //d.logs = append(d.logs, s.label+"read "+strconv.Itoa(n)+" "+errstr+" "+string(buf[:100]))
  72. //setM(s.label, int(s.connId), d)
  73. return
  74. }
  75. func (s *conn) Write(buf []byte) (n int, err error) {
  76. if s.isClose {
  77. return 0, errors.New("the conn has closed")
  78. }
  79. if s.closeFlag {
  80. //s.Close()
  81. return 0, errors.New("io: write on closed conn")
  82. }
  83. if len(buf) == 0 {
  84. return 0, nil
  85. }
  86. //logs.Warn("write buf", len(buf))
  87. //now := time.Now()
  88. n, err = s.sendWindow.WriteFull(buf, s.connId)
  89. //t := time.Now().Sub(now)
  90. //if t.Seconds() > 0.5 {
  91. // logs.Warn("conn write long", n, t.Seconds())
  92. //}
  93. return
  94. }
  95. func (s *conn) Close() (err error) {
  96. s.once.Do(s.closeProcess)
  97. return
  98. }
  99. func (s *conn) closeProcess() {
  100. s.isClose = true
  101. s.receiveWindow.mux.connMap.Delete(s.connId)
  102. if !s.receiveWindow.mux.IsClose {
  103. // if server or user close the conn while reading, will get a io.EOF
  104. // and this Close method will be invoke, send this signal to close other side
  105. s.receiveWindow.mux.sendInfo(common.MUX_CONN_CLOSE, s.connId, nil)
  106. }
  107. s.sendWindow.CloseWindow()
  108. s.receiveWindow.CloseWindow()
  109. //d := getM(s.label, int(s.connId))
  110. //d.isClose = true
  111. //d.logs = append(d.logs, s.label+"close "+time.Now().String())
  112. //setM(s.label, int(s.connId), d)
  113. return
  114. }
  115. func (s *conn) LocalAddr() net.Addr {
  116. return s.receiveWindow.mux.conn.LocalAddr()
  117. }
  118. func (s *conn) RemoteAddr() net.Addr {
  119. return s.receiveWindow.mux.conn.RemoteAddr()
  120. }
  121. func (s *conn) SetDeadline(t time.Time) error {
  122. _ = s.SetReadDeadline(t)
  123. _ = s.SetWriteDeadline(t)
  124. return nil
  125. }
  126. func (s *conn) SetReadDeadline(t time.Time) error {
  127. s.receiveWindow.SetTimeOut(t)
  128. return nil
  129. }
  130. func (s *conn) SetWriteDeadline(t time.Time) error {
  131. s.sendWindow.SetTimeOut(t)
  132. return nil
  133. }
  134. type window struct {
  135. remainingWait uint64 // 64bit alignment
  136. off uint32
  137. maxSize uint32
  138. closeOp bool
  139. closeOpCh chan struct{}
  140. mux *Mux
  141. }
  142. func (Self *window) unpack(ptrs uint64) (remaining, wait uint32) {
  143. const mask = 1<<dequeueBits - 1
  144. remaining = uint32((ptrs >> dequeueBits) & mask)
  145. wait = uint32(ptrs & mask)
  146. return
  147. }
  148. func (Self *window) pack(remaining, wait uint32) uint64 {
  149. const mask = 1<<dequeueBits - 1
  150. return (uint64(remaining) << dequeueBits) |
  151. uint64(wait&mask)
  152. }
  153. func (Self *window) New() {
  154. Self.closeOpCh = make(chan struct{}, 2)
  155. }
  156. func (Self *window) CloseWindow() {
  157. if !Self.closeOp {
  158. Self.closeOp = true
  159. Self.closeOpCh <- struct{}{}
  160. Self.closeOpCh <- struct{}{}
  161. }
  162. }
  163. type ReceiveWindow struct {
  164. window
  165. bufQueue ReceiveWindowQueue
  166. element *common.ListElement
  167. count int8
  168. once sync.Once
  169. }
  170. func (Self *ReceiveWindow) New(mux *Mux) {
  171. // initial a window for receive
  172. Self.bufQueue.New()
  173. Self.element = common.ListElementPool.Get()
  174. Self.maxSize = common.MAXIMUM_SEGMENT_SIZE * 10
  175. Self.mux = mux
  176. Self.window.New()
  177. }
  178. func (Self *ReceiveWindow) remainingSize(delta uint16) (n uint32) {
  179. // receive window remaining
  180. l := int64(atomic.LoadUint32(&Self.maxSize)) - int64(Self.bufQueue.Len())
  181. l -= int64(delta)
  182. if l > 0 {
  183. n = uint32(l)
  184. }
  185. return
  186. }
  187. func (Self *ReceiveWindow) calcSize() {
  188. // calculating maximum receive window size
  189. if Self.count == 0 {
  190. //logs.Warn("ping, bw", Self.mux.latency, Self.bw.Get())
  191. conns := Self.mux.connMap.Size()
  192. n := uint32(math.Float64frombits(atomic.LoadUint64(&Self.mux.latency)) *
  193. Self.mux.bw.Get() / float64(conns))
  194. if n < common.MAXIMUM_SEGMENT_SIZE*10 {
  195. n = common.MAXIMUM_SEGMENT_SIZE * 10
  196. }
  197. //bufLen := Self.bufQueue.Len()
  198. //if n < bufLen {
  199. // n = bufLen
  200. //}
  201. if n < Self.maxSize/2 {
  202. n = Self.maxSize / 2
  203. }
  204. // set the minimal size
  205. if n > 2*Self.maxSize {
  206. n = 2 * Self.maxSize
  207. }
  208. if n > (common.MAXIMUM_WINDOW_SIZE / uint32(conns)) {
  209. logs.Warn("window too large", n)
  210. n = common.MAXIMUM_WINDOW_SIZE / uint32(conns)
  211. }
  212. // set the maximum size
  213. //logs.Warn("n", n)
  214. atomic.StoreUint32(&Self.maxSize, n)
  215. Self.count = -10
  216. }
  217. Self.count += 1
  218. return
  219. }
  220. func (Self *ReceiveWindow) Write(buf []byte, l uint16, part bool, id int32) (err error) {
  221. if Self.closeOp {
  222. return errors.New("conn.receiveWindow: write on closed window")
  223. }
  224. element, err := NewListElement(buf, l, part)
  225. //logs.Warn("push the buf", len(buf), l, (&element).l)
  226. if err != nil {
  227. return
  228. }
  229. Self.calcSize() // calculate the max window size
  230. var wait uint32
  231. start:
  232. ptrs := atomic.LoadUint64(&Self.remainingWait)
  233. _, wait = Self.unpack(ptrs)
  234. newRemaining := Self.remainingSize(l)
  235. // calculate the remaining window size now, plus the element we will push
  236. if newRemaining == 0 {
  237. //logs.Warn("window full true", remaining)
  238. wait = 1
  239. }
  240. if !atomic.CompareAndSwapUint64(&Self.remainingWait, ptrs, Self.pack(0, wait)) {
  241. goto start
  242. // another goroutine change the status, make sure shall we need wait
  243. }
  244. Self.bufQueue.Push(element)
  245. // status check finish, now we can push the element into the queue
  246. if wait == 0 {
  247. Self.mux.sendInfo(common.MUX_MSG_SEND_OK, id, Self.maxSize, newRemaining)
  248. // send the remaining window size, not including zero size
  249. }
  250. return nil
  251. }
  252. func (Self *ReceiveWindow) Read(p []byte, id int32) (n int, err error) {
  253. if Self.closeOp {
  254. return 0, io.EOF // receive close signal, returns eof
  255. }
  256. pOff := 0
  257. l := 0
  258. //logs.Warn("receive window read off, element.l", Self.off, Self.element.l)
  259. copyData:
  260. if Self.off == uint32(Self.element.L) {
  261. // on the first Read method invoked, Self.off and Self.element.l
  262. // both zero value
  263. common.ListElementPool.Put(Self.element)
  264. if Self.closeOp {
  265. return 0, io.EOF
  266. }
  267. Self.element, err = Self.bufQueue.Pop()
  268. // if the queue is empty, Pop method will wait until one element push
  269. // into the queue successful, or timeout.
  270. // timer start on timeout parameter is set up ,
  271. // reset to 60s if timeout and data still available
  272. Self.off = 0
  273. if err != nil {
  274. Self.CloseWindow() // also close the window, to avoid read twice
  275. return // queue receive stop or time out, break the loop and return
  276. }
  277. //logs.Warn("pop element", Self.element.l, Self.element.part)
  278. }
  279. l = copy(p[pOff:], Self.element.Buf[Self.off:Self.element.L])
  280. pOff += l
  281. Self.off += uint32(l)
  282. //logs.Warn("window read length buf len", Self.readLength, Self.bufQueue.Len())
  283. n += l
  284. l = 0
  285. if Self.off == uint32(Self.element.L) {
  286. //logs.Warn("put the element end ", string(Self.element.buf[:15]))
  287. common.WindowBuff.Put(Self.element.Buf)
  288. Self.sendStatus(id, Self.element.L)
  289. // check the window full status
  290. }
  291. if pOff < len(p) && Self.element.Part {
  292. // element is a part of the segments, trying to fill up buf p
  293. goto copyData
  294. }
  295. return // buf p is full or all of segments in buf, return
  296. }
  297. func (Self *ReceiveWindow) sendStatus(id int32, l uint16) {
  298. var remaining, wait uint32
  299. for {
  300. ptrs := atomic.LoadUint64(&Self.remainingWait)
  301. remaining, wait = Self.unpack(ptrs)
  302. remaining += uint32(l)
  303. if atomic.CompareAndSwapUint64(&Self.remainingWait, ptrs, Self.pack(remaining, 0)) {
  304. break
  305. }
  306. runtime.Gosched()
  307. // another goroutine change remaining or wait status, make sure
  308. // we need acknowledge other side
  309. }
  310. // now we get the current window status success
  311. if wait == 1 {
  312. //logs.Warn("send the wait status", remaining)
  313. Self.mux.sendInfo(common.MUX_MSG_SEND_OK, id, atomic.LoadUint32(&Self.maxSize), remaining)
  314. }
  315. return
  316. }
  317. func (Self *ReceiveWindow) SetTimeOut(t time.Time) {
  318. // waiting for FIFO queue Pop method
  319. Self.bufQueue.SetTimeOut(t)
  320. }
  321. func (Self *ReceiveWindow) Stop() {
  322. // queue has no more data to push, so unblock pop method
  323. Self.once.Do(Self.bufQueue.Stop)
  324. }
  325. func (Self *ReceiveWindow) CloseWindow() {
  326. Self.window.CloseWindow()
  327. Self.Stop()
  328. Self.release()
  329. }
  330. func (Self *ReceiveWindow) release() {
  331. //if Self.element != nil {
  332. // if Self.element.Buf != nil {
  333. // common.WindowBuff.Put(Self.element.Buf)
  334. // }
  335. // common.ListElementPool.Put(Self.element)
  336. //}
  337. for {
  338. ele := Self.bufQueue.TryPop()
  339. if ele == nil {
  340. return
  341. }
  342. if ele.Buf != nil {
  343. common.WindowBuff.Put(ele.Buf)
  344. }
  345. common.ListElementPool.Put(ele)
  346. } // release resource
  347. }
  348. type SendWindow struct {
  349. window
  350. buf []byte
  351. setSizeCh chan struct{}
  352. timeout time.Time
  353. }
  354. func (Self *SendWindow) New(mux *Mux) {
  355. Self.setSizeCh = make(chan struct{})
  356. Self.maxSize = common.MAXIMUM_SEGMENT_SIZE * 10
  357. atomic.AddUint64(&Self.remainingWait, uint64(common.MAXIMUM_SEGMENT_SIZE*10)<<dequeueBits)
  358. Self.mux = mux
  359. Self.window.New()
  360. }
  361. func (Self *SendWindow) SetSendBuf(buf []byte) {
  362. // send window buff from conn write method, set it to send window
  363. Self.buf = buf
  364. Self.off = 0
  365. }
  366. func (Self *SendWindow) SetSize(windowSize, newRemaining uint32) (closed bool) {
  367. // set the window size from receive window
  368. defer func() {
  369. if recover() != nil {
  370. closed = true
  371. }
  372. }()
  373. if Self.closeOp {
  374. close(Self.setSizeCh)
  375. return true
  376. }
  377. //logs.Warn("set send window size to ", windowSize, newRemaining)
  378. var remaining, wait, newWait uint32
  379. for {
  380. ptrs := atomic.LoadUint64(&Self.remainingWait)
  381. remaining, wait = Self.unpack(ptrs)
  382. if remaining == newRemaining {
  383. //logs.Warn("waiting for another window size")
  384. return false // waiting for receive another usable window size
  385. }
  386. if newRemaining == 0 && wait == 1 {
  387. newWait = 1 // keep the wait status,
  388. // also if newRemaining is not zero, change wait to 0
  389. }
  390. if atomic.CompareAndSwapUint64(&Self.remainingWait, ptrs, Self.pack(newRemaining, newWait)) {
  391. break
  392. }
  393. // anther goroutine change wait status or window size
  394. }
  395. if wait == 1 {
  396. // send window into the wait status, need notice the channel
  397. //logs.Warn("send window remaining size is 0")
  398. Self.allow()
  399. }
  400. // send window not into the wait status, so just do slide
  401. return false
  402. }
  403. func (Self *SendWindow) allow() {
  404. select {
  405. case Self.setSizeCh <- struct{}{}:
  406. //logs.Warn("send window remaining size is 0 finish")
  407. return
  408. case <-Self.closeOpCh:
  409. close(Self.setSizeCh)
  410. return
  411. }
  412. }
  413. func (Self *SendWindow) sent(sentSize uint32) {
  414. var remaining, wait uint32
  415. for {
  416. ptrs := atomic.LoadUint64(&Self.remainingWait)
  417. remaining, wait = Self.unpack(ptrs)
  418. if remaining >= sentSize {
  419. atomic.AddUint64(&Self.remainingWait, ^(uint64(sentSize)<<dequeueBits - 1))
  420. break
  421. } else {
  422. if atomic.CompareAndSwapUint64(&Self.remainingWait, ptrs, Self.pack(0, wait)) {
  423. // just keep the wait status, it will be wait in the next loop
  424. break
  425. }
  426. }
  427. }
  428. }
  429. func (Self *SendWindow) WriteTo() (p []byte, sendSize uint32, part bool, err error) {
  430. // returns buf segments, return only one segments, need a loop outside
  431. // until err = io.EOF
  432. if Self.closeOp {
  433. return nil, 0, false, errors.New("conn.writeWindow: window closed")
  434. }
  435. if Self.off == uint32(len(Self.buf)) {
  436. return nil, 0, false, io.EOF
  437. // send window buff is drain, return eof and get another one
  438. }
  439. var remaining uint32
  440. start:
  441. ptrs := atomic.LoadUint64(&Self.remainingWait)
  442. remaining, _ = Self.unpack(ptrs)
  443. if remaining == 0 {
  444. if !atomic.CompareAndSwapUint64(&Self.remainingWait, ptrs, Self.pack(0, 1)) {
  445. goto start // another goroutine change the window, try again
  446. }
  447. // into the wait status
  448. //logs.Warn("send window into wait status")
  449. err = Self.waitReceiveWindow()
  450. if err != nil {
  451. return nil, 0, false, err
  452. }
  453. //logs.Warn("rem into wait finish")
  454. goto start
  455. }
  456. // there are still remaining window
  457. //logs.Warn("rem", remaining)
  458. if len(Self.buf[Self.off:]) > common.MAXIMUM_SEGMENT_SIZE {
  459. sendSize = common.MAXIMUM_SEGMENT_SIZE
  460. //logs.Warn("cut buf by mss")
  461. } else {
  462. sendSize = uint32(len(Self.buf[Self.off:]))
  463. }
  464. if remaining < sendSize {
  465. // usable window size is small than
  466. // window MAXIMUM_SEGMENT_SIZE or send buf left
  467. sendSize = remaining
  468. //logs.Warn("cut buf by remainingsize", sendSize, len(Self.buf[Self.off:]))
  469. }
  470. //logs.Warn("send size", sendSize)
  471. if sendSize < uint32(len(Self.buf[Self.off:])) {
  472. part = true
  473. }
  474. p = Self.buf[Self.off : sendSize+Self.off]
  475. Self.off += sendSize
  476. Self.sent(sendSize)
  477. return
  478. }
  479. func (Self *SendWindow) waitReceiveWindow() (err error) {
  480. t := Self.timeout.Sub(time.Now())
  481. if t < 0 { // not set the timeout, wait for it as long as connection close
  482. select {
  483. case _, ok := <-Self.setSizeCh:
  484. if !ok {
  485. return errors.New("conn.writeWindow: window closed")
  486. }
  487. return nil
  488. case <-Self.closeOpCh:
  489. return errors.New("conn.writeWindow: window closed")
  490. }
  491. }
  492. timer := time.NewTimer(t)
  493. defer timer.Stop()
  494. // waiting for receive usable window size, or timeout
  495. select {
  496. case _, ok := <-Self.setSizeCh:
  497. if !ok {
  498. return errors.New("conn.writeWindow: window closed")
  499. }
  500. return nil
  501. case <-timer.C:
  502. return errors.New("conn.writeWindow: write to time out")
  503. case <-Self.closeOpCh:
  504. return errors.New("conn.writeWindow: window closed")
  505. }
  506. }
  507. func (Self *SendWindow) WriteFull(buf []byte, id int32) (n int, err error) {
  508. Self.SetSendBuf(buf) // set the buf to send window
  509. //logs.Warn("set the buf to send window")
  510. var bufSeg []byte
  511. var part bool
  512. var l uint32
  513. for {
  514. bufSeg, l, part, err = Self.WriteTo()
  515. //logs.Warn("buf seg", len(bufSeg), part, err)
  516. // get the buf segments from send window
  517. if bufSeg == nil && part == false && err == io.EOF {
  518. // send window is drain, break the loop
  519. err = nil
  520. break
  521. }
  522. if err != nil {
  523. break
  524. }
  525. n += int(l)
  526. l = 0
  527. if part {
  528. Self.mux.sendInfo(common.MUX_NEW_MSG_PART, id, bufSeg)
  529. } else {
  530. Self.mux.sendInfo(common.MUX_NEW_MSG, id, bufSeg)
  531. //logs.Warn("buf seg sent", len(bufSeg), part, err)
  532. }
  533. // send to other side, not send nil data to other side
  534. }
  535. //logs.Warn("buf seg write success")
  536. return
  537. }
  538. func (Self *SendWindow) SetTimeOut(t time.Time) {
  539. // waiting for receive a receive window size
  540. Self.timeout = t
  541. }
  542. //type bandwidth struct {
  543. // readStart time.Time
  544. // lastReadStart time.Time
  545. // readEnd time.Time
  546. // lastReadEnd time.Time
  547. // bufLength int
  548. // lastBufLength int
  549. // count int8
  550. // readBW float64
  551. // writeBW float64
  552. // readBandwidth float64
  553. //}
  554. //
  555. //func (Self *bandwidth) StartRead() {
  556. // Self.lastReadStart, Self.readStart = Self.readStart, time.Now()
  557. // if !Self.lastReadStart.IsZero() {
  558. // if Self.count == -5 {
  559. // Self.calcBandWidth()
  560. // }
  561. // }
  562. //}
  563. //
  564. //func (Self *bandwidth) EndRead() {
  565. // Self.lastReadEnd, Self.readEnd = Self.readEnd, time.Now()
  566. // if Self.count == -5 {
  567. // Self.calcWriteBandwidth()
  568. // }
  569. // if Self.count == 0 {
  570. // Self.calcReadBandwidth()
  571. // Self.count = -6
  572. // }
  573. // Self.count += 1
  574. //}
  575. //
  576. //func (Self *bandwidth) SetCopySize(n int) {
  577. // // must be invoke between StartRead and EndRead
  578. // Self.lastBufLength, Self.bufLength = Self.bufLength, n
  579. //}
  580. //// calculating
  581. //// start end start end
  582. //// read read
  583. //// write
  584. //
  585. //func (Self *bandwidth) calcBandWidth() {
  586. // t := Self.readStart.Sub(Self.lastReadStart)
  587. // if Self.lastBufLength >= 32768 {
  588. // Self.readBandwidth = float64(Self.lastBufLength) / t.Seconds()
  589. // }
  590. //}
  591. //
  592. //func (Self *bandwidth) calcReadBandwidth() {
  593. // // Bandwidth between nps and npc
  594. // readTime := Self.readEnd.Sub(Self.readStart)
  595. // Self.readBW = float64(Self.bufLength) / readTime.Seconds()
  596. // //logs.Warn("calc read bw", Self.readBW, Self.bufLength, readTime.Seconds())
  597. //}
  598. //
  599. //func (Self *bandwidth) calcWriteBandwidth() {
  600. // // Bandwidth between nps and user, npc and application
  601. // writeTime := Self.readStart.Sub(Self.lastReadEnd)
  602. // Self.writeBW = float64(Self.lastBufLength) / writeTime.Seconds()
  603. // //logs.Warn("calc write bw", Self.writeBW, Self.bufLength, writeTime.Seconds())
  604. //}
  605. //
  606. //func (Self *bandwidth) Get() (bw float64) {
  607. // // The zero value, 0 for numeric types
  608. // if Self.writeBW == 0 && Self.readBW == 0 {
  609. // //logs.Warn("bw both 0")
  610. // return 100
  611. // }
  612. // if Self.writeBW == 0 && Self.readBW != 0 {
  613. // return Self.readBW
  614. // }
  615. // if Self.readBW == 0 && Self.writeBW != 0 {
  616. // return Self.writeBW
  617. // }
  618. // return Self.readBandwidth
  619. //}