conn.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551
  1. package mux
  2. import (
  3. "errors"
  4. "io"
  5. "net"
  6. "sync"
  7. "time"
  8. "github.com/cnlh/nps/lib/common"
  9. )
  10. type conn struct {
  11. net.Conn
  12. getStatusCh chan struct{}
  13. connStatusOkCh chan struct{}
  14. connStatusFailCh chan struct{}
  15. connId int32
  16. isClose bool
  17. closeFlag bool // close conn flag
  18. receiveWindow *ReceiveWindow
  19. sendWindow *SendWindow
  20. once sync.Once
  21. }
  22. func NewConn(connId int32, mux *Mux) *conn {
  23. c := &conn{
  24. getStatusCh: make(chan struct{}),
  25. connStatusOkCh: make(chan struct{}),
  26. connStatusFailCh: make(chan struct{}),
  27. connId: connId,
  28. receiveWindow: new(ReceiveWindow),
  29. sendWindow: new(SendWindow),
  30. once: sync.Once{},
  31. }
  32. c.receiveWindow.New(mux)
  33. c.sendWindow.New(mux)
  34. return c
  35. }
  36. func (s *conn) Read(buf []byte) (n int, err error) {
  37. if s.isClose || buf == nil {
  38. return 0, errors.New("the conn has closed")
  39. }
  40. if len(buf) == 0 {
  41. return 0, nil
  42. }
  43. // waiting for takeout from receive window finish or timeout
  44. n, err = s.receiveWindow.Read(buf, s.connId)
  45. return
  46. }
  47. func (s *conn) Write(buf []byte) (n int, err error) {
  48. if s.isClose {
  49. return 0, errors.New("the conn has closed")
  50. }
  51. if s.closeFlag {
  52. //s.Close()
  53. return 0, errors.New("io: write on closed conn")
  54. }
  55. if len(buf) == 0 {
  56. return 0, nil
  57. }
  58. //logs.Warn("write buf", len(buf))
  59. n, err = s.sendWindow.WriteFull(buf, s.connId)
  60. return
  61. }
  62. func (s *conn) Close() (err error) {
  63. s.once.Do(s.closeProcess)
  64. return
  65. }
  66. func (s *conn) closeProcess() {
  67. s.isClose = true
  68. s.receiveWindow.mux.connMap.Delete(s.connId)
  69. if !s.receiveWindow.mux.IsClose {
  70. // if server or user close the conn while reading, will get a io.EOF
  71. // and this Close method will be invoke, send this signal to close other side
  72. s.receiveWindow.mux.sendInfo(common.MUX_CONN_CLOSE, s.connId, nil)
  73. }
  74. s.sendWindow.CloseWindow()
  75. s.receiveWindow.CloseWindow()
  76. return
  77. }
  78. func (s *conn) LocalAddr() net.Addr {
  79. return s.receiveWindow.mux.conn.LocalAddr()
  80. }
  81. func (s *conn) RemoteAddr() net.Addr {
  82. return s.receiveWindow.mux.conn.RemoteAddr()
  83. }
  84. func (s *conn) SetDeadline(t time.Time) error {
  85. _ = s.SetReadDeadline(t)
  86. _ = s.SetWriteDeadline(t)
  87. return nil
  88. }
  89. func (s *conn) SetReadDeadline(t time.Time) error {
  90. s.receiveWindow.SetTimeOut(t)
  91. return nil
  92. }
  93. func (s *conn) SetWriteDeadline(t time.Time) error {
  94. s.sendWindow.SetTimeOut(t)
  95. return nil
  96. }
  97. type window struct {
  98. off uint32
  99. maxSize uint32
  100. closeOp bool
  101. closeOpCh chan struct{}
  102. mux *Mux
  103. }
  104. func (Self *window) New() {
  105. Self.closeOpCh = make(chan struct{}, 2)
  106. }
  107. func (Self *window) CloseWindow() {
  108. if !Self.closeOp {
  109. Self.closeOp = true
  110. Self.closeOpCh <- struct{}{}
  111. Self.closeOpCh <- struct{}{}
  112. }
  113. }
  114. type ReceiveWindow struct {
  115. bufQueue FIFOQueue
  116. element *ListElement
  117. readLength uint32
  118. readOp chan struct{}
  119. readWait bool
  120. windowFull bool
  121. count int8
  122. //bw *bandwidth
  123. once sync.Once
  124. window
  125. }
  126. func (Self *ReceiveWindow) New(mux *Mux) {
  127. // initial a window for receive
  128. Self.readOp = make(chan struct{})
  129. Self.bufQueue.New()
  130. //Self.bw = new(bandwidth)
  131. Self.element = new(ListElement)
  132. Self.maxSize = 8192
  133. Self.mux = mux
  134. Self.window.New()
  135. }
  136. func (Self *ReceiveWindow) RemainingSize() (n uint32) {
  137. // receive window remaining
  138. if Self.maxSize >= Self.bufQueue.Len() {
  139. n = Self.maxSize - Self.bufQueue.Len()
  140. }
  141. // if maxSize is small than bufQueue length, return 0
  142. return
  143. }
  144. func (Self *ReceiveWindow) ReadSize() (n uint32) {
  145. // acknowledge the size already read
  146. Self.bufQueue.mutex.Lock()
  147. n = Self.readLength
  148. Self.readLength = 0
  149. Self.bufQueue.mutex.Unlock()
  150. return
  151. }
  152. func (Self *ReceiveWindow) CalcSize() {
  153. // calculating maximum receive window size
  154. if Self.count == 0 {
  155. //logs.Warn("ping, bw", Self.mux.latency, Self.bw.Get())
  156. n := uint32(2 * Self.mux.latency * Self.mux.bw.Get() * 1.5 / float64(Self.mux.connMap.Size()))
  157. if n < 8192 {
  158. n = 8192
  159. }
  160. if n < Self.bufQueue.Len() {
  161. n = Self.bufQueue.Len()
  162. }
  163. // set the minimal size
  164. if n > 2*Self.maxSize {
  165. n = 2 * Self.maxSize
  166. }
  167. if n > common.MAXIMUM_WINDOW_SIZE {
  168. n = common.MAXIMUM_WINDOW_SIZE
  169. }
  170. // set the maximum size
  171. //logs.Warn("n", n)
  172. Self.maxSize = n
  173. Self.count = -10
  174. }
  175. Self.count += 1
  176. }
  177. func (Self *ReceiveWindow) Write(buf []byte, l uint16, part bool, id int32) (err error) {
  178. if Self.closeOp {
  179. return errors.New("conn.receiveWindow: write on closed window")
  180. }
  181. element := ListElement{}
  182. err = element.New(buf, l, part)
  183. //logs.Warn("push the buf", len(buf), l, (&element).l)
  184. if err != nil {
  185. return
  186. }
  187. Self.bufQueue.Push(&element) // must push data before allow read
  188. //logs.Warn("read session calc size ", Self.maxSize)
  189. // calculating the receive window size
  190. Self.CalcSize()
  191. //logs.Warn("read session calc size finish", Self.maxSize)
  192. if Self.RemainingSize() == 0 {
  193. Self.windowFull = true
  194. //logs.Warn("window full true", Self.windowFull)
  195. }
  196. Self.mux.sendInfo(common.MUX_MSG_SEND_OK, id, Self.maxSize, Self.ReadSize())
  197. return nil
  198. }
  199. func (Self *ReceiveWindow) Read(p []byte, id int32) (n int, err error) {
  200. if Self.closeOp {
  201. return 0, io.EOF // receive close signal, returns eof
  202. }
  203. pOff := 0
  204. l := 0
  205. //logs.Warn("receive window read off, element.l", Self.off, Self.element.l)
  206. copyData:
  207. //Self.bw.StartRead()
  208. if Self.off == uint32(Self.element.l) {
  209. // on the first Read method invoked, Self.off and Self.element.l
  210. // both zero value
  211. Self.element, err = Self.bufQueue.Pop()
  212. // if the queue is empty, Pop method will wait until one element push
  213. // into the queue successful, or timeout.
  214. // timer start on timeout parameter is set up ,
  215. // reset to 60s if timeout and data still available
  216. Self.off = 0
  217. if err != nil {
  218. return // queue receive stop or time out, break the loop and return
  219. }
  220. //logs.Warn("pop element", Self.element.l, Self.element.part)
  221. }
  222. l = copy(p[pOff:], Self.element.buf[Self.off:])
  223. //Self.bw.SetCopySize(l)
  224. pOff += l
  225. Self.off += uint32(l)
  226. Self.bufQueue.mutex.Lock()
  227. Self.readLength += uint32(l)
  228. //logs.Warn("window read length buf len", Self.readLength, Self.bufQueue.Len())
  229. Self.bufQueue.mutex.Unlock()
  230. n += l
  231. l = 0
  232. //Self.bw.EndRead()
  233. Self.sendStatus(id)
  234. if Self.off == uint32(Self.element.l) {
  235. //logs.Warn("put the element end ", string(Self.element.buf[:15]))
  236. common.WindowBuff.Put(Self.element.buf)
  237. }
  238. if pOff < len(p) && Self.element.part {
  239. // element is a part of the segments, trying to fill up buf p
  240. goto copyData
  241. }
  242. return // buf p is full or all of segments in buf, return
  243. }
  244. func (Self *ReceiveWindow) sendStatus(id int32) {
  245. if Self.windowFull || Self.bufQueue.Len() == 0 {
  246. // window is full before read or empty now
  247. Self.windowFull = false
  248. Self.mux.sendInfo(common.MUX_MSG_SEND_OK, id, Self.maxSize, Self.ReadSize())
  249. // acknowledge other side, have empty some receive window space
  250. //}
  251. }
  252. }
  253. func (Self *ReceiveWindow) SetTimeOut(t time.Time) {
  254. // waiting for FIFO queue Pop method
  255. Self.bufQueue.SetTimeOut(t)
  256. }
  257. func (Self *ReceiveWindow) Stop() {
  258. // queue has no more data to push, so unblock pop method
  259. Self.once.Do(Self.bufQueue.Stop)
  260. }
  261. func (Self *ReceiveWindow) CloseWindow() {
  262. Self.window.CloseWindow()
  263. Self.Stop()
  264. }
  265. type SendWindow struct {
  266. buf []byte
  267. sentLength uint32
  268. setSizeCh chan struct{}
  269. setSizeWait bool
  270. unSlide uint32
  271. timeout time.Time
  272. window
  273. mutex sync.Mutex
  274. }
  275. func (Self *SendWindow) New(mux *Mux) {
  276. Self.setSizeCh = make(chan struct{})
  277. Self.maxSize = 4096
  278. Self.mux = mux
  279. Self.window.New()
  280. }
  281. func (Self *SendWindow) SetSendBuf(buf []byte) {
  282. // send window buff from conn write method, set it to send window
  283. Self.mutex.Lock()
  284. Self.buf = buf
  285. Self.off = 0
  286. Self.mutex.Unlock()
  287. }
  288. func (Self *SendWindow) RemainingSize() (n uint32) {
  289. if Self.maxSize >= Self.sentLength {
  290. n = Self.maxSize - Self.sentLength
  291. }
  292. return
  293. }
  294. func (Self *SendWindow) SetSize(windowSize, readLength uint32) (closed bool) {
  295. defer func() {
  296. if recover() != nil {
  297. closed = true
  298. }
  299. }()
  300. if Self.closeOp {
  301. close(Self.setSizeCh)
  302. return true
  303. }
  304. if readLength == 0 && Self.maxSize == windowSize {
  305. //logs.Warn("waiting for another window size")
  306. return false // waiting for receive another usable window size
  307. }
  308. //logs.Warn("set send window size to ", windowSize, readLength)
  309. Self.mutex.Lock()
  310. Self.slide(windowSize, readLength)
  311. if Self.setSizeWait {
  312. // send window into the wait status, need notice the channel
  313. //logs.Warn("send window remaining size is 0 , wait")
  314. if Self.RemainingSize() == 0 {
  315. //logs.Warn("waiting for another window size after slide")
  316. // keep the wait status
  317. Self.mutex.Unlock()
  318. return false
  319. }
  320. Self.setSizeWait = false
  321. Self.mutex.Unlock()
  322. //logs.Warn("send window remaining size is 0 starting wait")
  323. select {
  324. case Self.setSizeCh <- struct{}{}:
  325. //logs.Warn("send window remaining size is 0 finish")
  326. return false
  327. case <-Self.closeOpCh:
  328. close(Self.setSizeCh)
  329. return true
  330. }
  331. }
  332. // send window not into the wait status, so just do slide
  333. Self.mutex.Unlock()
  334. return false
  335. }
  336. func (Self *SendWindow) slide(windowSize, readLength uint32) {
  337. Self.sentLength -= readLength
  338. Self.maxSize = windowSize
  339. }
  340. func (Self *SendWindow) WriteTo() (p []byte, part bool, err error) {
  341. // returns buf segments, return only one segments, need a loop outside
  342. // until err = io.EOF
  343. if Self.closeOp {
  344. return nil, false, errors.New("conn.writeWindow: window closed")
  345. }
  346. if Self.off == uint32(len(Self.buf)) {
  347. return nil, false, io.EOF
  348. // send window buff is drain, return eof and get another one
  349. }
  350. Self.mutex.Lock()
  351. if Self.RemainingSize() == 0 {
  352. Self.setSizeWait = true
  353. Self.mutex.Unlock()
  354. // into the wait status
  355. err = Self.waitReceiveWindow()
  356. if err != nil {
  357. return nil, false, err
  358. }
  359. } else {
  360. Self.mutex.Unlock()
  361. }
  362. Self.mutex.Lock()
  363. var sendSize uint32
  364. if len(Self.buf[Self.off:]) > common.MAXIMUM_SEGMENT_SIZE {
  365. sendSize = common.MAXIMUM_SEGMENT_SIZE
  366. part = true
  367. } else {
  368. sendSize = uint32(len(Self.buf[Self.off:]))
  369. part = false
  370. }
  371. if Self.RemainingSize() < sendSize {
  372. // usable window size is small than
  373. // window MAXIMUM_SEGMENT_SIZE or send buf left
  374. sendSize = Self.RemainingSize()
  375. part = true
  376. }
  377. //logs.Warn("send size", sendSize)
  378. p = Self.buf[Self.off : sendSize+Self.off]
  379. Self.off += sendSize
  380. Self.sentLength += sendSize
  381. Self.mutex.Unlock()
  382. return
  383. }
  384. func (Self *SendWindow) waitReceiveWindow() (err error) {
  385. t := Self.timeout.Sub(time.Now())
  386. if t < 0 {
  387. t = time.Minute
  388. }
  389. timer := time.NewTimer(t)
  390. defer timer.Stop()
  391. // waiting for receive usable window size, or timeout
  392. select {
  393. case _, ok := <-Self.setSizeCh:
  394. if !ok {
  395. return errors.New("conn.writeWindow: window closed")
  396. }
  397. return nil
  398. case <-timer.C:
  399. return errors.New("conn.writeWindow: write to time out")
  400. case <-Self.closeOpCh:
  401. return errors.New("conn.writeWindow: window closed")
  402. }
  403. }
  404. func (Self *SendWindow) WriteFull(buf []byte, id int32) (n int, err error) {
  405. Self.SetSendBuf(buf) // set the buf to send window
  406. var bufSeg []byte
  407. var part bool
  408. for {
  409. bufSeg, part, err = Self.WriteTo()
  410. //logs.Warn("buf seg", len(bufSeg), part, err)
  411. // get the buf segments from send window
  412. if bufSeg == nil && part == false && err == io.EOF {
  413. // send window is drain, break the loop
  414. err = nil
  415. break
  416. }
  417. if err != nil {
  418. break
  419. }
  420. n += len(bufSeg)
  421. if part {
  422. Self.mux.sendInfo(common.MUX_NEW_MSG_PART, id, bufSeg)
  423. } else {
  424. Self.mux.sendInfo(common.MUX_NEW_MSG, id, bufSeg)
  425. //logs.Warn("buf seg sent", len(bufSeg), part, err)
  426. }
  427. // send to other side, not send nil data to other side
  428. }
  429. //logs.Warn("buf seg write success")
  430. return
  431. }
  432. func (Self *SendWindow) SetTimeOut(t time.Time) {
  433. // waiting for receive a receive window size
  434. Self.timeout = t
  435. }
  436. //type bandwidth struct {
  437. // readStart time.Time
  438. // lastReadStart time.Time
  439. // readEnd time.Time
  440. // lastReadEnd time.Time
  441. // bufLength int
  442. // lastBufLength int
  443. // count int8
  444. // readBW float64
  445. // writeBW float64
  446. // readBandwidth float64
  447. //}
  448. //
  449. //func (Self *bandwidth) StartRead() {
  450. // Self.lastReadStart, Self.readStart = Self.readStart, time.Now()
  451. // if !Self.lastReadStart.IsZero() {
  452. // if Self.count == -5 {
  453. // Self.calcBandWidth()
  454. // }
  455. // }
  456. //}
  457. //
  458. //func (Self *bandwidth) EndRead() {
  459. // Self.lastReadEnd, Self.readEnd = Self.readEnd, time.Now()
  460. // if Self.count == -5 {
  461. // Self.calcWriteBandwidth()
  462. // }
  463. // if Self.count == 0 {
  464. // Self.calcReadBandwidth()
  465. // Self.count = -6
  466. // }
  467. // Self.count += 1
  468. //}
  469. //
  470. //func (Self *bandwidth) SetCopySize(n int) {
  471. // // must be invoke between StartRead and EndRead
  472. // Self.lastBufLength, Self.bufLength = Self.bufLength, n
  473. //}
  474. //// calculating
  475. //// start end start end
  476. //// read read
  477. //// write
  478. //
  479. //func (Self *bandwidth) calcBandWidth() {
  480. // t := Self.readStart.Sub(Self.lastReadStart)
  481. // if Self.lastBufLength >= 32768 {
  482. // Self.readBandwidth = float64(Self.lastBufLength) / t.Seconds()
  483. // }
  484. //}
  485. //
  486. //func (Self *bandwidth) calcReadBandwidth() {
  487. // // Bandwidth between nps and npc
  488. // readTime := Self.readEnd.Sub(Self.readStart)
  489. // Self.readBW = float64(Self.bufLength) / readTime.Seconds()
  490. // //logs.Warn("calc read bw", Self.readBW, Self.bufLength, readTime.Seconds())
  491. //}
  492. //
  493. //func (Self *bandwidth) calcWriteBandwidth() {
  494. // // Bandwidth between nps and user, npc and application
  495. // writeTime := Self.readStart.Sub(Self.lastReadEnd)
  496. // Self.writeBW = float64(Self.lastBufLength) / writeTime.Seconds()
  497. // //logs.Warn("calc write bw", Self.writeBW, Self.bufLength, writeTime.Seconds())
  498. //}
  499. //
  500. //func (Self *bandwidth) Get() (bw float64) {
  501. // // The zero value, 0 for numeric types
  502. // if Self.writeBW == 0 && Self.readBW == 0 {
  503. // //logs.Warn("bw both 0")
  504. // return 100
  505. // }
  506. // if Self.writeBW == 0 && Self.readBW != 0 {
  507. // return Self.readBW
  508. // }
  509. // if Self.readBW == 0 && Self.writeBW != 0 {
  510. // return Self.writeBW
  511. // }
  512. // return Self.readBandwidth
  513. //}