conn.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552
  1. package mux
  2. import (
  3. "errors"
  4. "io"
  5. "net"
  6. "strconv"
  7. "sync"
  8. "sync/atomic"
  9. "time"
  10. "github.com/cnlh/nps/lib/common"
  11. )
  12. type conn struct {
  13. net.Conn
  14. getStatusCh chan struct{}
  15. connStatusOkCh chan struct{}
  16. connStatusFailCh chan struct{}
  17. connId int32
  18. isClose bool
  19. closeFlag bool // close conn flag
  20. receiveWindow *ReceiveWindow
  21. sendWindow *SendWindow
  22. once sync.Once
  23. label string
  24. }
  25. func NewConn(connId int32, mux *Mux, label ...string) *conn {
  26. c := &conn{
  27. getStatusCh: make(chan struct{}),
  28. connStatusOkCh: make(chan struct{}),
  29. connStatusFailCh: make(chan struct{}),
  30. connId: connId,
  31. receiveWindow: new(ReceiveWindow),
  32. sendWindow: new(SendWindow),
  33. once: sync.Once{},
  34. }
  35. if len(label) > 0 {
  36. c.label = label[0]
  37. }
  38. c.receiveWindow.New(mux)
  39. c.sendWindow.New(mux)
  40. logm := &connLog{
  41. startTime: time.Now(),
  42. isClose: false,
  43. logs: []string{c.label + "new conn success"},
  44. }
  45. setM(label[0], int(connId), logm)
  46. return c
  47. }
  48. func (s *conn) Read(buf []byte) (n int, err error) {
  49. if s.isClose || buf == nil {
  50. return 0, errors.New("the conn has closed")
  51. }
  52. if len(buf) == 0 {
  53. return 0, nil
  54. }
  55. // waiting for takeout from receive window finish or timeout
  56. n, err = s.receiveWindow.Read(buf, s.connId)
  57. var errstr string
  58. if err == nil {
  59. errstr = "err:nil"
  60. } else {
  61. errstr = err.Error()
  62. }
  63. d := getM(s.label, int(s.connId))
  64. d.logs = append(d.logs, s.label+"read "+strconv.Itoa(n)+" "+errstr+" "+string(buf[:100]))
  65. setM(s.label, int(s.connId), d)
  66. return
  67. }
  68. func (s *conn) Write(buf []byte) (n int, err error) {
  69. if s.isClose {
  70. return 0, errors.New("the conn has closed")
  71. }
  72. if s.closeFlag {
  73. //s.Close()
  74. return 0, errors.New("io: write on closed conn")
  75. }
  76. if len(buf) == 0 {
  77. return 0, nil
  78. }
  79. //logs.Warn("write buf", len(buf))
  80. n, err = s.sendWindow.WriteFull(buf, s.connId)
  81. return
  82. }
  83. func (s *conn) Close() (err error) {
  84. s.once.Do(s.closeProcess)
  85. return
  86. }
  87. func (s *conn) closeProcess() {
  88. s.isClose = true
  89. s.receiveWindow.mux.connMap.Delete(s.connId)
  90. if !s.receiveWindow.mux.IsClose {
  91. // if server or user close the conn while reading, will get a io.EOF
  92. // and this Close method will be invoke, send this signal to close other side
  93. s.receiveWindow.mux.sendInfo(common.MUX_CONN_CLOSE, s.connId, nil)
  94. }
  95. s.sendWindow.CloseWindow()
  96. s.receiveWindow.CloseWindow()
  97. d := getM(s.label, int(s.connId))
  98. d.isClose = true
  99. d.logs = append(d.logs, s.label+"close "+time.Now().String())
  100. setM(s.label, int(s.connId), d)
  101. return
  102. }
  103. func (s *conn) LocalAddr() net.Addr {
  104. return s.receiveWindow.mux.conn.LocalAddr()
  105. }
  106. func (s *conn) RemoteAddr() net.Addr {
  107. return s.receiveWindow.mux.conn.RemoteAddr()
  108. }
  109. func (s *conn) SetDeadline(t time.Time) error {
  110. _ = s.SetReadDeadline(t)
  111. _ = s.SetWriteDeadline(t)
  112. return nil
  113. }
  114. func (s *conn) SetReadDeadline(t time.Time) error {
  115. s.receiveWindow.SetTimeOut(t)
  116. return nil
  117. }
  118. func (s *conn) SetWriteDeadline(t time.Time) error {
  119. s.sendWindow.SetTimeOut(t)
  120. return nil
  121. }
  122. type window struct {
  123. off uint32
  124. maxSize uint32
  125. closeOp bool
  126. closeOpCh chan struct{}
  127. mux *Mux
  128. }
  129. func (Self *window) New() {
  130. Self.closeOpCh = make(chan struct{}, 2)
  131. }
  132. func (Self *window) CloseWindow() {
  133. if !Self.closeOp {
  134. Self.closeOp = true
  135. Self.closeOpCh <- struct{}{}
  136. Self.closeOpCh <- struct{}{}
  137. }
  138. }
  139. type ReceiveWindow struct {
  140. bufQueue FIFOQueue
  141. element *ListElement
  142. readLength uint32
  143. readOp chan struct{}
  144. readWait bool
  145. windowFull bool
  146. count int8
  147. //bw *bandwidth
  148. once sync.Once
  149. window
  150. }
  151. func (Self *ReceiveWindow) New(mux *Mux) {
  152. // initial a window for receive
  153. Self.readOp = make(chan struct{})
  154. Self.bufQueue.New()
  155. //Self.bw = new(bandwidth)
  156. Self.element = new(ListElement)
  157. Self.maxSize = 8192
  158. Self.mux = mux
  159. Self.window.New()
  160. }
  161. func (Self *ReceiveWindow) remainingSize() (n uint32) {
  162. // receive window remaining
  163. return Self.maxSize - Self.bufQueue.Len()
  164. }
  165. func (Self *ReceiveWindow) readSize() (n uint32) {
  166. // acknowledge the size already read
  167. return atomic.SwapUint32(&Self.readLength, 0)
  168. }
  169. func (Self *ReceiveWindow) calcSize() {
  170. // calculating maximum receive window size
  171. if Self.count == 0 {
  172. //logs.Warn("ping, bw", Self.mux.latency, Self.bw.Get())
  173. n := uint32(2 * Self.mux.latency * Self.mux.bw.Get() * 1.5 / float64(Self.mux.connMap.Size()))
  174. if n < 8192 {
  175. n = 8192
  176. }
  177. if n < Self.bufQueue.Len() {
  178. n = Self.bufQueue.Len()
  179. }
  180. // set the minimal size
  181. if n > 2*Self.maxSize {
  182. n = 2 * Self.maxSize
  183. }
  184. if n > common.MAXIMUM_WINDOW_SIZE {
  185. n = common.MAXIMUM_WINDOW_SIZE
  186. }
  187. // set the maximum size
  188. //logs.Warn("n", n)
  189. Self.maxSize = n
  190. Self.count = -10
  191. }
  192. Self.count += 1
  193. }
  194. func (Self *ReceiveWindow) Write(buf []byte, l uint16, part bool, id int32) (err error) {
  195. if Self.closeOp {
  196. return errors.New("conn.receiveWindow: write on closed window")
  197. }
  198. element := new(ListElement)
  199. err = element.New(buf, l, part)
  200. //logs.Warn("push the buf", len(buf), l, (&element).l)
  201. if err != nil {
  202. return
  203. }
  204. Self.bufQueue.Push(element) // must push data before allow read
  205. //logs.Warn("read session calc size ", Self.maxSize)
  206. // calculating the receive window size
  207. Self.calcSize()
  208. //logs.Warn("read session calc size finish", Self.maxSize)
  209. if Self.remainingSize() == 0 {
  210. Self.windowFull = true
  211. //logs.Warn("window full true", Self.windowFull)
  212. }
  213. Self.mux.sendInfo(common.MUX_MSG_SEND_OK, id, Self.maxSize, Self.readSize())
  214. return nil
  215. }
  216. func (Self *ReceiveWindow) Read(p []byte, id int32) (n int, err error) {
  217. if Self.closeOp {
  218. return 0, io.EOF // receive close signal, returns eof
  219. }
  220. pOff := 0
  221. l := 0
  222. //logs.Warn("receive window read off, element.l", Self.off, Self.element.l)
  223. copyData:
  224. //Self.bw.StartRead()
  225. if Self.off == uint32(Self.element.l) {
  226. // on the first Read method invoked, Self.off and Self.element.l
  227. // both zero value
  228. Self.element, err = Self.bufQueue.Pop()
  229. // if the queue is empty, Pop method will wait until one element push
  230. // into the queue successful, or timeout.
  231. // timer start on timeout parameter is set up ,
  232. // reset to 60s if timeout and data still available
  233. Self.off = 0
  234. if err != nil {
  235. return // queue receive stop or time out, break the loop and return
  236. }
  237. //logs.Warn("pop element", Self.element.l, Self.element.part)
  238. }
  239. l = copy(p[pOff:], Self.element.buf[Self.off:])
  240. //Self.bw.SetCopySize(l)
  241. pOff += l
  242. Self.off += uint32(l)
  243. atomic.AddUint32(&Self.readLength, uint32(l))
  244. //logs.Warn("window read length buf len", Self.readLength, Self.bufQueue.Len())
  245. n += l
  246. l = 0
  247. //Self.bw.EndRead()
  248. if Self.off == uint32(Self.element.l) {
  249. //logs.Warn("put the element end ", string(Self.element.buf[:15]))
  250. common.WindowBuff.Put(Self.element.buf)
  251. Self.sendStatus(id)
  252. }
  253. if pOff < len(p) && Self.element.part {
  254. // element is a part of the segments, trying to fill up buf p
  255. goto copyData
  256. }
  257. return // buf p is full or all of segments in buf, return
  258. }
  259. func (Self *ReceiveWindow) sendStatus(id int32) {
  260. if Self.windowFull || Self.bufQueue.Len() == 0 {
  261. // window is full before read or empty now
  262. Self.windowFull = false
  263. Self.mux.sendInfo(common.MUX_MSG_SEND_OK, id, Self.maxSize, Self.readSize())
  264. // acknowledge other side, have empty some receive window space
  265. //}
  266. }
  267. }
  268. func (Self *ReceiveWindow) SetTimeOut(t time.Time) {
  269. // waiting for FIFO queue Pop method
  270. Self.bufQueue.SetTimeOut(t)
  271. }
  272. func (Self *ReceiveWindow) Stop() {
  273. // queue has no more data to push, so unblock pop method
  274. Self.once.Do(Self.bufQueue.Stop)
  275. }
  276. func (Self *ReceiveWindow) CloseWindow() {
  277. Self.window.CloseWindow()
  278. Self.Stop()
  279. }
  280. type SendWindow struct {
  281. buf []byte
  282. sentLength uint32
  283. setSizeCh chan struct{}
  284. setSizeWait int32
  285. unSlide uint32
  286. timeout time.Time
  287. window
  288. }
  289. func (Self *SendWindow) New(mux *Mux) {
  290. Self.setSizeCh = make(chan struct{})
  291. Self.maxSize = 4096
  292. Self.mux = mux
  293. Self.window.New()
  294. }
  295. func (Self *SendWindow) SetSendBuf(buf []byte) {
  296. // send window buff from conn write method, set it to send window
  297. Self.buf = buf
  298. Self.off = 0
  299. }
  300. func (Self *SendWindow) RemainingSize() (n uint32) {
  301. return atomic.LoadUint32(&Self.maxSize) - atomic.LoadUint32(&Self.sentLength)
  302. }
  303. func (Self *SendWindow) SetSize(windowSize, readLength uint32) (closed bool) {
  304. defer func() {
  305. if recover() != nil {
  306. closed = true
  307. }
  308. }()
  309. if Self.closeOp {
  310. close(Self.setSizeCh)
  311. return true
  312. }
  313. if readLength == 0 && atomic.LoadUint32(&Self.maxSize) == windowSize {
  314. //logs.Warn("waiting for another window size")
  315. return false // waiting for receive another usable window size
  316. }
  317. //logs.Warn("set send window size to ", windowSize, readLength)
  318. Self.slide(windowSize, readLength)
  319. if Self.RemainingSize() == 0 {
  320. //logs.Warn("waiting for another window size after slide")
  321. // keep the wait status
  322. atomic.StoreInt32(&Self.setSizeWait, 1)
  323. return false
  324. }
  325. if atomic.CompareAndSwapInt32(&Self.setSizeWait, 1, 0) {
  326. // send window into the wait status, need notice the channel
  327. select {
  328. case Self.setSizeCh <- struct{}{}:
  329. //logs.Warn("send window remaining size is 0 finish")
  330. return false
  331. case <-Self.closeOpCh:
  332. close(Self.setSizeCh)
  333. return true
  334. }
  335. }
  336. // send window not into the wait status, so just do slide
  337. return false
  338. }
  339. func (Self *SendWindow) slide(windowSize, readLength uint32) {
  340. atomic.AddUint32(&Self.sentLength, ^readLength-1)
  341. atomic.SwapUint32(&Self.maxSize, windowSize)
  342. }
  343. func (Self *SendWindow) WriteTo() (p []byte, sendSize uint32, part bool, err error) {
  344. // returns buf segments, return only one segments, need a loop outside
  345. // until err = io.EOF
  346. if Self.closeOp {
  347. return nil, 0, false, errors.New("conn.writeWindow: window closed")
  348. }
  349. if Self.off == uint32(len(Self.buf)) {
  350. return nil, 0, false, io.EOF
  351. // send window buff is drain, return eof and get another one
  352. }
  353. if Self.RemainingSize() == 0 {
  354. atomic.StoreInt32(&Self.setSizeWait, 1)
  355. // into the wait status
  356. err = Self.waitReceiveWindow()
  357. if err != nil {
  358. return nil, 0, false, err
  359. }
  360. }
  361. if len(Self.buf[Self.off:]) > common.MAXIMUM_SEGMENT_SIZE {
  362. sendSize = common.MAXIMUM_SEGMENT_SIZE
  363. part = true
  364. //logs.Warn("cut buf by mss")
  365. } else {
  366. sendSize = uint32(len(Self.buf[Self.off:]))
  367. part = false
  368. }
  369. if Self.RemainingSize() < sendSize {
  370. // usable window size is small than
  371. // window MAXIMUM_SEGMENT_SIZE or send buf left
  372. sendSize = Self.RemainingSize()
  373. //logs.Warn("cut buf by remainingsize", sendSize, len(Self.buf[Self.off:]))
  374. part = true
  375. }
  376. //logs.Warn("send size", sendSize)
  377. p = Self.buf[Self.off : sendSize+Self.off]
  378. Self.off += sendSize
  379. atomic.AddUint32(&Self.sentLength, sendSize)
  380. return
  381. }
  382. func (Self *SendWindow) waitReceiveWindow() (err error) {
  383. t := Self.timeout.Sub(time.Now())
  384. if t < 0 {
  385. t = time.Minute
  386. }
  387. timer := time.NewTimer(t)
  388. defer timer.Stop()
  389. // waiting for receive usable window size, or timeout
  390. select {
  391. case _, ok := <-Self.setSizeCh:
  392. if !ok {
  393. return errors.New("conn.writeWindow: window closed")
  394. }
  395. return nil
  396. case <-timer.C:
  397. return errors.New("conn.writeWindow: write to time out")
  398. case <-Self.closeOpCh:
  399. return errors.New("conn.writeWindow: window closed")
  400. }
  401. }
  402. func (Self *SendWindow) WriteFull(buf []byte, id int32) (n int, err error) {
  403. Self.SetSendBuf(buf) // set the buf to send window
  404. var bufSeg []byte
  405. var part bool
  406. var l uint32
  407. for {
  408. bufSeg, l, part, err = Self.WriteTo()
  409. //logs.Warn("buf seg", len(bufSeg), part, err)
  410. // get the buf segments from send window
  411. if bufSeg == nil && part == false && err == io.EOF {
  412. // send window is drain, break the loop
  413. err = nil
  414. break
  415. }
  416. if err != nil {
  417. break
  418. }
  419. n += int(l)
  420. l = 0
  421. if part {
  422. Self.mux.sendInfo(common.MUX_NEW_MSG_PART, id, bufSeg)
  423. } else {
  424. Self.mux.sendInfo(common.MUX_NEW_MSG, id, bufSeg)
  425. //logs.Warn("buf seg sent", len(bufSeg), part, err)
  426. }
  427. // send to other side, not send nil data to other side
  428. }
  429. //logs.Warn("buf seg write success")
  430. return
  431. }
  432. func (Self *SendWindow) SetTimeOut(t time.Time) {
  433. // waiting for receive a receive window size
  434. Self.timeout = t
  435. }
  436. //type bandwidth struct {
  437. // readStart time.Time
  438. // lastReadStart time.Time
  439. // readEnd time.Time
  440. // lastReadEnd time.Time
  441. // bufLength int
  442. // lastBufLength int
  443. // count int8
  444. // readBW float64
  445. // writeBW float64
  446. // readBandwidth float64
  447. //}
  448. //
  449. //func (Self *bandwidth) StartRead() {
  450. // Self.lastReadStart, Self.readStart = Self.readStart, time.Now()
  451. // if !Self.lastReadStart.IsZero() {
  452. // if Self.count == -5 {
  453. // Self.calcBandWidth()
  454. // }
  455. // }
  456. //}
  457. //
  458. //func (Self *bandwidth) EndRead() {
  459. // Self.lastReadEnd, Self.readEnd = Self.readEnd, time.Now()
  460. // if Self.count == -5 {
  461. // Self.calcWriteBandwidth()
  462. // }
  463. // if Self.count == 0 {
  464. // Self.calcReadBandwidth()
  465. // Self.count = -6
  466. // }
  467. // Self.count += 1
  468. //}
  469. //
  470. //func (Self *bandwidth) SetCopySize(n int) {
  471. // // must be invoke between StartRead and EndRead
  472. // Self.lastBufLength, Self.bufLength = Self.bufLength, n
  473. //}
  474. //// calculating
  475. //// start end start end
  476. //// read read
  477. //// write
  478. //
  479. //func (Self *bandwidth) calcBandWidth() {
  480. // t := Self.readStart.Sub(Self.lastReadStart)
  481. // if Self.lastBufLength >= 32768 {
  482. // Self.readBandwidth = float64(Self.lastBufLength) / t.Seconds()
  483. // }
  484. //}
  485. //
  486. //func (Self *bandwidth) calcReadBandwidth() {
  487. // // Bandwidth between nps and npc
  488. // readTime := Self.readEnd.Sub(Self.readStart)
  489. // Self.readBW = float64(Self.bufLength) / readTime.Seconds()
  490. // //logs.Warn("calc read bw", Self.readBW, Self.bufLength, readTime.Seconds())
  491. //}
  492. //
  493. //func (Self *bandwidth) calcWriteBandwidth() {
  494. // // Bandwidth between nps and user, npc and application
  495. // writeTime := Self.readStart.Sub(Self.lastReadEnd)
  496. // Self.writeBW = float64(Self.lastBufLength) / writeTime.Seconds()
  497. // //logs.Warn("calc write bw", Self.writeBW, Self.bufLength, writeTime.Seconds())
  498. //}
  499. //
  500. //func (Self *bandwidth) Get() (bw float64) {
  501. // // The zero value, 0 for numeric types
  502. // if Self.writeBW == 0 && Self.readBW == 0 {
  503. // //logs.Warn("bw both 0")
  504. // return 100
  505. // }
  506. // if Self.writeBW == 0 && Self.readBW != 0 {
  507. // return Self.readBW
  508. // }
  509. // if Self.readBW == 0 && Self.writeBW != 0 {
  510. // return Self.writeBW
  511. // }
  512. // return Self.readBandwidth
  513. //}