conn.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606
  1. package mux
  2. import (
  3. "errors"
  4. "io"
  5. "net"
  6. "runtime"
  7. "sync"
  8. "sync/atomic"
  9. "time"
  10. "github.com/cnlh/nps/lib/common"
  11. )
  12. type conn struct {
  13. net.Conn
  14. getStatusCh chan struct{}
  15. connStatusOkCh chan struct{}
  16. connStatusFailCh chan struct{}
  17. connId int32
  18. isClose bool
  19. closeFlag bool // close conn flag
  20. receiveWindow *ReceiveWindow
  21. sendWindow *SendWindow
  22. once sync.Once
  23. label string
  24. }
  25. func NewConn(connId int32, mux *Mux, label ...string) *conn {
  26. c := &conn{
  27. getStatusCh: make(chan struct{}),
  28. connStatusOkCh: make(chan struct{}),
  29. connStatusFailCh: make(chan struct{}),
  30. connId: connId,
  31. receiveWindow: new(ReceiveWindow),
  32. sendWindow: new(SendWindow),
  33. once: sync.Once{},
  34. }
  35. if len(label) > 0 {
  36. c.label = label[0]
  37. }
  38. c.receiveWindow.New(mux)
  39. c.sendWindow.New(mux)
  40. //logm := &connLog{
  41. // startTime: time.Now(),
  42. // isClose: false,
  43. // logs: []string{c.label + "new conn success"},
  44. //}
  45. //setM(label[0], int(connId), logm)
  46. return c
  47. }
  48. func (s *conn) Read(buf []byte) (n int, err error) {
  49. if s.isClose || buf == nil {
  50. return 0, errors.New("the conn has closed")
  51. }
  52. if len(buf) == 0 {
  53. return 0, nil
  54. }
  55. // waiting for takeout from receive window finish or timeout
  56. //now := time.Now()
  57. n, err = s.receiveWindow.Read(buf, s.connId)
  58. //t := time.Now().Sub(now)
  59. //if t.Seconds() > 0.5 {
  60. //logs.Warn("conn read long", n, t.Seconds())
  61. //}
  62. //var errstr string
  63. //if err == nil {
  64. // errstr = "err:nil"
  65. //} else {
  66. // errstr = err.Error()
  67. //}
  68. //d := getM(s.label, int(s.connId))
  69. //d.logs = append(d.logs, s.label+"read "+strconv.Itoa(n)+" "+errstr+" "+string(buf[:100]))
  70. //setM(s.label, int(s.connId), d)
  71. return
  72. }
  73. func (s *conn) Write(buf []byte) (n int, err error) {
  74. if s.isClose {
  75. return 0, errors.New("the conn has closed")
  76. }
  77. if s.closeFlag {
  78. //s.Close()
  79. return 0, errors.New("io: write on closed conn")
  80. }
  81. if len(buf) == 0 {
  82. return 0, nil
  83. }
  84. //logs.Warn("write buf", len(buf))
  85. //now := time.Now()
  86. n, err = s.sendWindow.WriteFull(buf, s.connId)
  87. //t := time.Now().Sub(now)
  88. //if t.Seconds() > 0.5 {
  89. // logs.Warn("conn write long", n, t.Seconds())
  90. //}
  91. return
  92. }
  93. func (s *conn) Close() (err error) {
  94. s.once.Do(s.closeProcess)
  95. return
  96. }
  97. func (s *conn) closeProcess() {
  98. s.isClose = true
  99. s.receiveWindow.mux.connMap.Delete(s.connId)
  100. if !s.receiveWindow.mux.IsClose {
  101. // if server or user close the conn while reading, will get a io.EOF
  102. // and this Close method will be invoke, send this signal to close other side
  103. s.receiveWindow.mux.sendInfo(common.MUX_CONN_CLOSE, s.connId, nil)
  104. }
  105. s.sendWindow.CloseWindow()
  106. s.receiveWindow.CloseWindow()
  107. //d := getM(s.label, int(s.connId))
  108. //d.isClose = true
  109. //d.logs = append(d.logs, s.label+"close "+time.Now().String())
  110. //setM(s.label, int(s.connId), d)
  111. return
  112. }
  113. func (s *conn) LocalAddr() net.Addr {
  114. return s.receiveWindow.mux.conn.LocalAddr()
  115. }
  116. func (s *conn) RemoteAddr() net.Addr {
  117. return s.receiveWindow.mux.conn.RemoteAddr()
  118. }
  119. func (s *conn) SetDeadline(t time.Time) error {
  120. _ = s.SetReadDeadline(t)
  121. _ = s.SetWriteDeadline(t)
  122. return nil
  123. }
  124. func (s *conn) SetReadDeadline(t time.Time) error {
  125. s.receiveWindow.SetTimeOut(t)
  126. return nil
  127. }
  128. func (s *conn) SetWriteDeadline(t time.Time) error {
  129. s.sendWindow.SetTimeOut(t)
  130. return nil
  131. }
  132. type window struct {
  133. remainingWait uint64 // 64bit alignment
  134. off uint32
  135. maxSize uint32
  136. closeOp bool
  137. closeOpCh chan struct{}
  138. mux *Mux
  139. }
  140. func (Self *window) unpack(ptrs uint64) (remaining, wait uint32) {
  141. const mask = 1<<dequeueBits - 1
  142. remaining = uint32((ptrs >> dequeueBits) & mask)
  143. wait = uint32(ptrs & mask)
  144. return
  145. }
  146. func (Self *window) pack(remaining, wait uint32) uint64 {
  147. const mask = 1<<dequeueBits - 1
  148. return (uint64(remaining) << dequeueBits) |
  149. uint64(wait&mask)
  150. }
  151. func (Self *window) New() {
  152. Self.closeOpCh = make(chan struct{}, 2)
  153. }
  154. func (Self *window) CloseWindow() {
  155. if !Self.closeOp {
  156. Self.closeOp = true
  157. Self.closeOpCh <- struct{}{}
  158. Self.closeOpCh <- struct{}{}
  159. }
  160. }
  161. type ReceiveWindow struct {
  162. window
  163. bufQueue ReceiveWindowQueue
  164. element *common.ListElement
  165. count int8
  166. once sync.Once
  167. }
  168. func (Self *ReceiveWindow) New(mux *Mux) {
  169. // initial a window for receive
  170. Self.bufQueue.New()
  171. Self.element = common.ListElementPool.Get()
  172. Self.maxSize = 4096
  173. Self.mux = mux
  174. Self.window.New()
  175. }
  176. func (Self *ReceiveWindow) remainingSize(delta uint16) (n uint32) {
  177. // receive window remaining
  178. l := int64(atomic.LoadUint32(&Self.maxSize)) - int64(Self.bufQueue.Len())
  179. l -= int64(delta)
  180. if l > 0 {
  181. n = uint32(l)
  182. }
  183. return
  184. }
  185. func (Self *ReceiveWindow) calcSize() {
  186. // calculating maximum receive window size
  187. if Self.count == 0 {
  188. //logs.Warn("ping, bw", Self.mux.latency, Self.bw.Get())
  189. n := uint32(2 * Self.mux.latency * Self.mux.bw.Get() * 1.5 / float64(Self.mux.connMap.Size()))
  190. if n < 8192 {
  191. n = 8192
  192. }
  193. bufLen := Self.bufQueue.Len()
  194. if n < bufLen {
  195. n = bufLen
  196. }
  197. // set the minimal size
  198. if n > 2*Self.maxSize {
  199. n = 2 * Self.maxSize
  200. }
  201. if n > common.MAXIMUM_WINDOW_SIZE {
  202. n = common.MAXIMUM_WINDOW_SIZE
  203. }
  204. // set the maximum size
  205. //logs.Warn("n", n)
  206. atomic.StoreUint32(&Self.maxSize, n)
  207. Self.count = -10
  208. }
  209. Self.count += 1
  210. return
  211. }
  212. func (Self *ReceiveWindow) Write(buf []byte, l uint16, part bool, id int32) (err error) {
  213. if Self.closeOp {
  214. return errors.New("conn.receiveWindow: write on closed window")
  215. }
  216. element, err := NewListElement(buf, l, part)
  217. //logs.Warn("push the buf", len(buf), l, (&element).l)
  218. if err != nil {
  219. return
  220. }
  221. Self.calcSize() // calculate the max window size
  222. var wait uint32
  223. start:
  224. ptrs := atomic.LoadUint64(&Self.remainingWait)
  225. _, wait = Self.unpack(ptrs)
  226. newRemaining := Self.remainingSize(l)
  227. // calculate the remaining window size now, plus the element we will push
  228. if newRemaining == 0 {
  229. //logs.Warn("window full true", remaining)
  230. wait = 1
  231. }
  232. if !atomic.CompareAndSwapUint64(&Self.remainingWait, ptrs, Self.pack(0, wait)) {
  233. goto start
  234. // another goroutine change the status, make sure shall we need wait
  235. }
  236. Self.bufQueue.Push(element)
  237. // status check finish, now we can push the element into the queue
  238. if wait == 0 {
  239. Self.mux.sendInfo(common.MUX_MSG_SEND_OK, id, Self.maxSize, newRemaining)
  240. // send the remaining window size, not including zero size
  241. }
  242. return nil
  243. }
  244. func (Self *ReceiveWindow) Read(p []byte, id int32) (n int, err error) {
  245. if Self.closeOp {
  246. return 0, io.EOF // receive close signal, returns eof
  247. }
  248. pOff := 0
  249. l := 0
  250. //logs.Warn("receive window read off, element.l", Self.off, Self.element.l)
  251. copyData:
  252. if Self.off == uint32(Self.element.L) {
  253. // on the first Read method invoked, Self.off and Self.element.l
  254. // both zero value
  255. common.ListElementPool.Put(Self.element)
  256. Self.element, err = Self.bufQueue.Pop()
  257. // if the queue is empty, Pop method will wait until one element push
  258. // into the queue successful, or timeout.
  259. // timer start on timeout parameter is set up ,
  260. // reset to 60s if timeout and data still available
  261. Self.off = 0
  262. if err != nil {
  263. return // queue receive stop or time out, break the loop and return
  264. }
  265. //logs.Warn("pop element", Self.element.l, Self.element.part)
  266. }
  267. l = copy(p[pOff:], Self.element.Buf[Self.off:Self.element.L])
  268. pOff += l
  269. Self.off += uint32(l)
  270. //logs.Warn("window read length buf len", Self.readLength, Self.bufQueue.Len())
  271. n += l
  272. l = 0
  273. if Self.off == uint32(Self.element.L) {
  274. //logs.Warn("put the element end ", string(Self.element.buf[:15]))
  275. common.WindowBuff.Put(Self.element.Buf)
  276. Self.sendStatus(id, Self.element.L)
  277. // check the window full status
  278. }
  279. if pOff < len(p) && Self.element.Part {
  280. // element is a part of the segments, trying to fill up buf p
  281. goto copyData
  282. }
  283. return // buf p is full or all of segments in buf, return
  284. }
  285. func (Self *ReceiveWindow) sendStatus(id int32, l uint16) {
  286. var remaining, wait uint32
  287. for {
  288. ptrs := atomic.LoadUint64(&Self.remainingWait)
  289. remaining, wait = Self.unpack(ptrs)
  290. remaining += uint32(l)
  291. if atomic.CompareAndSwapUint64(&Self.remainingWait, ptrs, Self.pack(remaining, 0)) {
  292. break
  293. }
  294. runtime.Gosched()
  295. // another goroutine change remaining or wait status, make sure
  296. // we need acknowledge other side
  297. }
  298. // now we get the current window status success
  299. if wait == 1 {
  300. //logs.Warn("send the wait status", remaining)
  301. Self.mux.sendInfo(common.MUX_MSG_SEND_OK, id, atomic.LoadUint32(&Self.maxSize), remaining)
  302. }
  303. return
  304. }
  305. func (Self *ReceiveWindow) SetTimeOut(t time.Time) {
  306. // waiting for FIFO queue Pop method
  307. Self.bufQueue.SetTimeOut(t)
  308. }
  309. func (Self *ReceiveWindow) Stop() {
  310. // queue has no more data to push, so unblock pop method
  311. Self.once.Do(Self.bufQueue.Stop)
  312. }
  313. func (Self *ReceiveWindow) CloseWindow() {
  314. Self.window.CloseWindow()
  315. Self.Stop()
  316. }
  317. type SendWindow struct {
  318. window
  319. buf []byte
  320. setSizeCh chan struct{}
  321. timeout time.Time
  322. }
  323. func (Self *SendWindow) New(mux *Mux) {
  324. Self.setSizeCh = make(chan struct{})
  325. Self.maxSize = 4096
  326. atomic.AddUint64(&Self.remainingWait, uint64(4096)<<dequeueBits)
  327. Self.mux = mux
  328. Self.window.New()
  329. }
  330. func (Self *SendWindow) SetSendBuf(buf []byte) {
  331. // send window buff from conn write method, set it to send window
  332. Self.buf = buf
  333. Self.off = 0
  334. }
  335. func (Self *SendWindow) SetSize(windowSize, newRemaining uint32) (closed bool) {
  336. // set the window size from receive window
  337. defer func() {
  338. if recover() != nil {
  339. closed = true
  340. }
  341. }()
  342. if Self.closeOp {
  343. close(Self.setSizeCh)
  344. return true
  345. }
  346. //logs.Warn("set send window size to ", windowSize, newRemaining)
  347. var remaining, wait, newWait uint32
  348. for {
  349. ptrs := atomic.LoadUint64(&Self.remainingWait)
  350. remaining, wait = Self.unpack(ptrs)
  351. if remaining == newRemaining {
  352. //logs.Warn("waiting for another window size")
  353. return false // waiting for receive another usable window size
  354. }
  355. if newRemaining == 0 && wait == 1 {
  356. newWait = 1 // keep the wait status,
  357. // also if newRemaining is not zero, change wait to 0
  358. }
  359. if atomic.CompareAndSwapUint64(&Self.remainingWait, ptrs, Self.pack(newRemaining, newWait)) {
  360. break
  361. }
  362. // anther goroutine change wait status or window size
  363. }
  364. if wait == 1 {
  365. // send window into the wait status, need notice the channel
  366. //logs.Warn("send window remaining size is 0")
  367. Self.allow()
  368. }
  369. // send window not into the wait status, so just do slide
  370. return false
  371. }
  372. func (Self *SendWindow) allow() {
  373. select {
  374. case Self.setSizeCh <- struct{}{}:
  375. //logs.Warn("send window remaining size is 0 finish")
  376. return
  377. case <-Self.closeOpCh:
  378. close(Self.setSizeCh)
  379. return
  380. }
  381. }
  382. func (Self *SendWindow) sent(sentSize uint32) {
  383. atomic.AddUint64(&Self.remainingWait, ^(uint64(sentSize)<<dequeueBits - 1))
  384. }
  385. func (Self *SendWindow) WriteTo() (p []byte, sendSize uint32, part bool, err error) {
  386. // returns buf segments, return only one segments, need a loop outside
  387. // until err = io.EOF
  388. if Self.closeOp {
  389. return nil, 0, false, errors.New("conn.writeWindow: window closed")
  390. }
  391. if Self.off == uint32(len(Self.buf)) {
  392. return nil, 0, false, io.EOF
  393. // send window buff is drain, return eof and get another one
  394. }
  395. var remaining uint32
  396. start:
  397. ptrs := atomic.LoadUint64(&Self.remainingWait)
  398. remaining, _ = Self.unpack(ptrs)
  399. if remaining == 0 {
  400. if !atomic.CompareAndSwapUint64(&Self.remainingWait, ptrs, Self.pack(0, 1)) {
  401. goto start // another goroutine change the window, try again
  402. }
  403. // into the wait status
  404. //logs.Warn("send window into wait status")
  405. err = Self.waitReceiveWindow()
  406. if err != nil {
  407. return nil, 0, false, err
  408. }
  409. //logs.Warn("rem into wait finish")
  410. goto start
  411. }
  412. // there are still remaining window
  413. //logs.Warn("rem", remaining)
  414. if len(Self.buf[Self.off:]) > common.MAXIMUM_SEGMENT_SIZE {
  415. sendSize = common.MAXIMUM_SEGMENT_SIZE
  416. //logs.Warn("cut buf by mss")
  417. } else {
  418. sendSize = uint32(len(Self.buf[Self.off:]))
  419. }
  420. if remaining < sendSize {
  421. // usable window size is small than
  422. // window MAXIMUM_SEGMENT_SIZE or send buf left
  423. sendSize = remaining
  424. //logs.Warn("cut buf by remainingsize", sendSize, len(Self.buf[Self.off:]))
  425. }
  426. //logs.Warn("send size", sendSize)
  427. if sendSize < uint32(len(Self.buf[Self.off:])) {
  428. part = true
  429. }
  430. p = Self.buf[Self.off : sendSize+Self.off]
  431. Self.off += sendSize
  432. Self.sent(sendSize)
  433. return
  434. }
  435. func (Self *SendWindow) waitReceiveWindow() (err error) {
  436. t := Self.timeout.Sub(time.Now())
  437. if t < 0 {
  438. t = time.Minute
  439. }
  440. timer := time.NewTimer(t)
  441. defer timer.Stop()
  442. // waiting for receive usable window size, or timeout
  443. select {
  444. case _, ok := <-Self.setSizeCh:
  445. if !ok {
  446. return errors.New("conn.writeWindow: window closed")
  447. }
  448. return nil
  449. case <-timer.C:
  450. return errors.New("conn.writeWindow: write to time out")
  451. case <-Self.closeOpCh:
  452. return errors.New("conn.writeWindow: window closed")
  453. }
  454. }
  455. func (Self *SendWindow) WriteFull(buf []byte, id int32) (n int, err error) {
  456. Self.SetSendBuf(buf) // set the buf to send window
  457. //logs.Warn("set the buf to send window")
  458. var bufSeg []byte
  459. var part bool
  460. var l uint32
  461. for {
  462. bufSeg, l, part, err = Self.WriteTo()
  463. //logs.Warn("buf seg", len(bufSeg), part, err)
  464. // get the buf segments from send window
  465. if bufSeg == nil && part == false && err == io.EOF {
  466. // send window is drain, break the loop
  467. err = nil
  468. break
  469. }
  470. if err != nil {
  471. break
  472. }
  473. n += int(l)
  474. l = 0
  475. if part {
  476. Self.mux.sendInfo(common.MUX_NEW_MSG_PART, id, bufSeg)
  477. } else {
  478. Self.mux.sendInfo(common.MUX_NEW_MSG, id, bufSeg)
  479. //logs.Warn("buf seg sent", len(bufSeg), part, err)
  480. }
  481. // send to other side, not send nil data to other side
  482. }
  483. //logs.Warn("buf seg write success")
  484. return
  485. }
  486. func (Self *SendWindow) SetTimeOut(t time.Time) {
  487. // waiting for receive a receive window size
  488. Self.timeout = t
  489. }
  490. //type bandwidth struct {
  491. // readStart time.Time
  492. // lastReadStart time.Time
  493. // readEnd time.Time
  494. // lastReadEnd time.Time
  495. // bufLength int
  496. // lastBufLength int
  497. // count int8
  498. // readBW float64
  499. // writeBW float64
  500. // readBandwidth float64
  501. //}
  502. //
  503. //func (Self *bandwidth) StartRead() {
  504. // Self.lastReadStart, Self.readStart = Self.readStart, time.Now()
  505. // if !Self.lastReadStart.IsZero() {
  506. // if Self.count == -5 {
  507. // Self.calcBandWidth()
  508. // }
  509. // }
  510. //}
  511. //
  512. //func (Self *bandwidth) EndRead() {
  513. // Self.lastReadEnd, Self.readEnd = Self.readEnd, time.Now()
  514. // if Self.count == -5 {
  515. // Self.calcWriteBandwidth()
  516. // }
  517. // if Self.count == 0 {
  518. // Self.calcReadBandwidth()
  519. // Self.count = -6
  520. // }
  521. // Self.count += 1
  522. //}
  523. //
  524. //func (Self *bandwidth) SetCopySize(n int) {
  525. // // must be invoke between StartRead and EndRead
  526. // Self.lastBufLength, Self.bufLength = Self.bufLength, n
  527. //}
  528. //// calculating
  529. //// start end start end
  530. //// read read
  531. //// write
  532. //
  533. //func (Self *bandwidth) calcBandWidth() {
  534. // t := Self.readStart.Sub(Self.lastReadStart)
  535. // if Self.lastBufLength >= 32768 {
  536. // Self.readBandwidth = float64(Self.lastBufLength) / t.Seconds()
  537. // }
  538. //}
  539. //
  540. //func (Self *bandwidth) calcReadBandwidth() {
  541. // // Bandwidth between nps and npc
  542. // readTime := Self.readEnd.Sub(Self.readStart)
  543. // Self.readBW = float64(Self.bufLength) / readTime.Seconds()
  544. // //logs.Warn("calc read bw", Self.readBW, Self.bufLength, readTime.Seconds())
  545. //}
  546. //
  547. //func (Self *bandwidth) calcWriteBandwidth() {
  548. // // Bandwidth between nps and user, npc and application
  549. // writeTime := Self.readStart.Sub(Self.lastReadEnd)
  550. // Self.writeBW = float64(Self.lastBufLength) / writeTime.Seconds()
  551. // //logs.Warn("calc write bw", Self.writeBW, Self.bufLength, writeTime.Seconds())
  552. //}
  553. //
  554. //func (Self *bandwidth) Get() (bw float64) {
  555. // // The zero value, 0 for numeric types
  556. // if Self.writeBW == 0 && Self.readBW == 0 {
  557. // //logs.Warn("bw both 0")
  558. // return 100
  559. // }
  560. // if Self.writeBW == 0 && Self.readBW != 0 {
  561. // return Self.readBW
  562. // }
  563. // if Self.readBW == 0 && Self.writeBW != 0 {
  564. // return Self.writeBW
  565. // }
  566. // return Self.readBandwidth
  567. //}