conn.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533
  1. package mux
  2. import (
  3. "errors"
  4. "io"
  5. "math"
  6. "net"
  7. "sync"
  8. "time"
  9. "github.com/cnlh/nps/lib/common"
  10. )
  11. type conn struct {
  12. net.Conn
  13. getStatusCh chan struct{}
  14. connStatusOkCh chan struct{}
  15. connStatusFailCh chan struct{}
  16. connId int32
  17. isClose bool
  18. closeFlag bool // close conn flag
  19. receiveWindow *ReceiveWindow
  20. sendWindow *SendWindow
  21. once sync.Once
  22. }
  23. func NewConn(connId int32, mux *Mux) *conn {
  24. c := &conn{
  25. getStatusCh: make(chan struct{}),
  26. connStatusOkCh: make(chan struct{}),
  27. connStatusFailCh: make(chan struct{}),
  28. connId: connId,
  29. receiveWindow: new(ReceiveWindow),
  30. sendWindow: new(SendWindow),
  31. once: sync.Once{},
  32. }
  33. c.receiveWindow.New(mux)
  34. c.sendWindow.New(mux)
  35. return c
  36. }
  37. func (s *conn) Read(buf []byte) (n int, err error) {
  38. if s.isClose || buf == nil {
  39. return 0, errors.New("the conn has closed")
  40. }
  41. if len(buf) == 0 {
  42. return 0, nil
  43. }
  44. // waiting for takeout from receive window finish or timeout
  45. n, err = s.receiveWindow.Read(buf, s.connId)
  46. return
  47. }
  48. func (s *conn) Write(buf []byte) (n int, err error) {
  49. if s.isClose {
  50. return 0, errors.New("the conn has closed")
  51. }
  52. if s.closeFlag {
  53. //s.Close()
  54. return 0, errors.New("io: write on closed conn")
  55. }
  56. if len(buf) == 0 {
  57. return 0, nil
  58. }
  59. //logs.Warn("write buf", len(buf))
  60. n, err = s.sendWindow.WriteFull(buf, s.connId)
  61. return
  62. }
  63. func (s *conn) Close() (err error) {
  64. s.once.Do(s.closeProcess)
  65. return
  66. }
  67. func (s *conn) closeProcess() {
  68. s.isClose = true
  69. s.receiveWindow.mux.connMap.Delete(s.connId)
  70. if !s.receiveWindow.mux.IsClose {
  71. // if server or user close the conn while reading, will get a io.EOF
  72. // and this Close method will be invoke, send this signal to close other side
  73. s.receiveWindow.mux.sendInfo(common.MUX_CONN_CLOSE, s.connId, nil)
  74. }
  75. s.sendWindow.CloseWindow()
  76. s.receiveWindow.CloseWindow()
  77. return
  78. }
  79. func (s *conn) LocalAddr() net.Addr {
  80. return s.receiveWindow.mux.conn.LocalAddr()
  81. }
  82. func (s *conn) RemoteAddr() net.Addr {
  83. return s.receiveWindow.mux.conn.RemoteAddr()
  84. }
  85. func (s *conn) SetDeadline(t time.Time) error {
  86. _ = s.SetReadDeadline(t)
  87. _ = s.SetWriteDeadline(t)
  88. return nil
  89. }
  90. func (s *conn) SetReadDeadline(t time.Time) error {
  91. s.receiveWindow.SetTimeOut(t)
  92. return nil
  93. }
  94. func (s *conn) SetWriteDeadline(t time.Time) error {
  95. s.sendWindow.SetTimeOut(t)
  96. return nil
  97. }
  98. type window struct {
  99. off uint32
  100. maxSize uint32
  101. closeOp bool
  102. closeOpCh chan struct{}
  103. mux *Mux
  104. }
  105. func (Self *window) New() {
  106. Self.closeOpCh = make(chan struct{}, 2)
  107. }
  108. func (Self *window) CloseWindow() {
  109. if !Self.closeOp {
  110. Self.closeOp = true
  111. Self.closeOpCh <- struct{}{}
  112. Self.closeOpCh <- struct{}{}
  113. }
  114. }
  115. type ReceiveWindow struct {
  116. bufQueue FIFOQueue
  117. element *ListElement
  118. readLength uint32
  119. readOp chan struct{}
  120. readWait bool
  121. windowFull bool
  122. count int8
  123. bw *bandwidth
  124. once sync.Once
  125. window
  126. }
  127. func (Self *ReceiveWindow) New(mux *Mux) {
  128. // initial a window for receive
  129. Self.readOp = make(chan struct{})
  130. Self.bufQueue.New()
  131. Self.bw = new(bandwidth)
  132. Self.element = new(ListElement)
  133. Self.maxSize = 8192
  134. Self.mux = mux
  135. Self.window.New()
  136. }
  137. func (Self *ReceiveWindow) RemainingSize() (n uint32) {
  138. // receive window remaining
  139. if Self.maxSize >= Self.bufQueue.Len() {
  140. n = Self.maxSize - Self.bufQueue.Len()
  141. }
  142. // if maxSize is small than bufQueue length, return 0
  143. return
  144. }
  145. func (Self *ReceiveWindow) ReadSize() (n uint32) {
  146. // acknowledge the size already read
  147. Self.bufQueue.mutex.Lock()
  148. n = Self.readLength
  149. Self.readLength = 0
  150. Self.bufQueue.mutex.Unlock()
  151. return
  152. }
  153. func (Self *ReceiveWindow) CalcSize() {
  154. // calculating maximum receive window size
  155. if Self.count == 0 {
  156. //logs.Warn("ping, bw", Self.mux.latency, Self.bw.Get())
  157. n := uint32(2 * Self.mux.latency * Self.bw.Get())
  158. if n < 8192 {
  159. n = 8192
  160. }
  161. if n < Self.bufQueue.Len() {
  162. n = Self.bufQueue.Len()
  163. }
  164. // set the minimal size
  165. if n > common.MAXIMUM_WINDOW_SIZE {
  166. n = common.MAXIMUM_WINDOW_SIZE
  167. }
  168. // set the maximum size
  169. //logs.Warn("n", n)
  170. Self.maxSize = n
  171. Self.count = -5
  172. }
  173. Self.count += 1
  174. }
  175. func (Self *ReceiveWindow) Write(buf []byte, l uint16, part bool, id int32) (err error) {
  176. if Self.closeOp {
  177. return errors.New("conn.receiveWindow: write on closed window")
  178. }
  179. element := ListElement{}
  180. err = element.New(buf, l, part)
  181. //logs.Warn("push the buf", len(buf), l, (&element).l)
  182. if err != nil {
  183. return
  184. }
  185. Self.bufQueue.Push(&element) // must push data before allow read
  186. //logs.Warn("read session calc size ", Self.maxSize)
  187. // calculating the receive window size
  188. Self.CalcSize()
  189. //logs.Warn("read session calc size finish", Self.maxSize)
  190. if Self.RemainingSize() == 0 {
  191. Self.windowFull = true
  192. //logs.Warn("window full true", Self.windowFull)
  193. }
  194. Self.mux.sendInfo(common.MUX_MSG_SEND_OK, id, Self.maxSize, Self.ReadSize())
  195. return nil
  196. }
  197. func (Self *ReceiveWindow) Read(p []byte, id int32) (n int, err error) {
  198. if Self.closeOp {
  199. return 0, io.EOF // receive close signal, returns eof
  200. }
  201. pOff := 0
  202. l := 0
  203. //logs.Warn("receive window read off, element.l", Self.off, Self.element.l)
  204. copyData:
  205. Self.bw.StartRead()
  206. if Self.off == uint32(Self.element.l) {
  207. // on the first Read method invoked, Self.off and Self.element.l
  208. // both zero value
  209. Self.element, err = Self.bufQueue.Pop()
  210. // if the queue is empty, Pop method will wait until one element push
  211. // into the queue successful, or timeout.
  212. // timer start on timeout parameter is set up ,
  213. // reset to 60s if timeout and data still available
  214. Self.off = 0
  215. if err != nil {
  216. return // queue receive stop or time out, break the loop and return
  217. }
  218. //logs.Warn("pop element", Self.element.l, Self.element.part)
  219. }
  220. l = copy(p[pOff:], Self.element.buf[Self.off:])
  221. Self.bw.SetCopySize(l)
  222. pOff += l
  223. Self.off += uint32(l)
  224. Self.bufQueue.mutex.Lock()
  225. Self.readLength += uint32(l)
  226. //logs.Warn("window read length buf len", Self.readLength, Self.bufQueue.Len())
  227. Self.bufQueue.mutex.Unlock()
  228. n += l
  229. l = 0
  230. Self.bw.EndRead()
  231. Self.sendStatus(id)
  232. if Self.off == uint32(Self.element.l) {
  233. //logs.Warn("put the element end ", string(Self.element.buf[:15]))
  234. common.WindowBuff.Put(Self.element.buf)
  235. }
  236. if pOff < len(p) && Self.element.part {
  237. // element is a part of the segments, trying to fill up buf p
  238. goto copyData
  239. }
  240. return // buf p is full or all of segments in buf, return
  241. }
  242. func (Self *ReceiveWindow) sendStatus(id int32) {
  243. if Self.windowFull || Self.bufQueue.Len() == 0 {
  244. // window is full before read or empty now
  245. Self.windowFull = false
  246. Self.mux.sendInfo(common.MUX_MSG_SEND_OK, id, Self.maxSize, Self.ReadSize())
  247. // acknowledge other side, have empty some receive window space
  248. //}
  249. }
  250. }
  251. func (Self *ReceiveWindow) SetTimeOut(t time.Time) {
  252. // waiting for FIFO queue Pop method
  253. Self.bufQueue.SetTimeOut(t)
  254. }
  255. func (Self *ReceiveWindow) Stop() {
  256. // queue has no more data to push, so unblock pop method
  257. Self.once.Do(Self.bufQueue.Stop)
  258. }
  259. func (Self *ReceiveWindow) CloseWindow() {
  260. Self.window.CloseWindow()
  261. Self.Stop()
  262. }
  263. type SendWindow struct {
  264. buf []byte
  265. sentLength uint32
  266. setSizeCh chan struct{}
  267. setSizeWait bool
  268. unSlide uint32
  269. timeout time.Time
  270. window
  271. mutex sync.Mutex
  272. }
  273. func (Self *SendWindow) New(mux *Mux) {
  274. Self.setSizeCh = make(chan struct{})
  275. Self.maxSize = 4096
  276. Self.mux = mux
  277. Self.window.New()
  278. }
  279. func (Self *SendWindow) SetSendBuf(buf []byte) {
  280. // send window buff from conn write method, set it to send window
  281. Self.mutex.Lock()
  282. Self.buf = buf
  283. Self.off = 0
  284. Self.mutex.Unlock()
  285. }
  286. func (Self *SendWindow) RemainingSize() (n uint32) {
  287. if Self.maxSize >= Self.sentLength {
  288. n = Self.maxSize - Self.sentLength
  289. }
  290. return
  291. }
  292. func (Self *SendWindow) SetSize(windowSize, readLength uint32) (closed bool) {
  293. defer func() {
  294. if recover() != nil {
  295. closed = true
  296. }
  297. }()
  298. if Self.closeOp {
  299. close(Self.setSizeCh)
  300. return true
  301. }
  302. if readLength == 0 && Self.maxSize == windowSize {
  303. //logs.Warn("waiting for another window size")
  304. return false // waiting for receive another usable window size
  305. }
  306. //logs.Warn("set send window size to ", windowSize, readLength)
  307. Self.mutex.Lock()
  308. Self.slide(windowSize, readLength)
  309. if Self.setSizeWait {
  310. // send window into the wait status, need notice the channel
  311. //logs.Warn("send window remaining size is 0 , wait")
  312. if Self.RemainingSize() == 0 {
  313. //logs.Warn("waiting for another window size after slide")
  314. // keep the wait status
  315. Self.mutex.Unlock()
  316. return false
  317. }
  318. Self.setSizeWait = false
  319. Self.mutex.Unlock()
  320. //logs.Warn("send window remaining size is 0 starting wait")
  321. select {
  322. case Self.setSizeCh <- struct{}{}:
  323. //logs.Warn("send window remaining size is 0 finish")
  324. return false
  325. case <-Self.closeOpCh:
  326. close(Self.setSizeCh)
  327. return true
  328. }
  329. }
  330. // send window not into the wait status, so just do slide
  331. Self.mutex.Unlock()
  332. return false
  333. }
  334. func (Self *SendWindow) slide(windowSize, readLength uint32) {
  335. Self.sentLength -= readLength
  336. Self.maxSize = windowSize
  337. }
  338. func (Self *SendWindow) WriteTo() (p []byte, part bool, err error) {
  339. // returns buf segments, return only one segments, need a loop outside
  340. // until err = io.EOF
  341. if Self.closeOp {
  342. return nil, false, errors.New("conn.writeWindow: window closed")
  343. }
  344. if Self.off == uint32(len(Self.buf)) {
  345. return nil, false, io.EOF
  346. // send window buff is drain, return eof and get another one
  347. }
  348. Self.mutex.Lock()
  349. if Self.RemainingSize() == 0 {
  350. Self.setSizeWait = true
  351. Self.mutex.Unlock()
  352. // into the wait status
  353. err = Self.waitReceiveWindow()
  354. if err != nil {
  355. return nil, false, err
  356. }
  357. } else {
  358. Self.mutex.Unlock()
  359. }
  360. Self.mutex.Lock()
  361. var sendSize uint32
  362. if len(Self.buf[Self.off:]) > common.MAXIMUM_SEGMENT_SIZE {
  363. sendSize = common.MAXIMUM_SEGMENT_SIZE
  364. part = true
  365. } else {
  366. sendSize = uint32(len(Self.buf[Self.off:]))
  367. part = false
  368. }
  369. if Self.RemainingSize() < sendSize {
  370. // usable window size is small than
  371. // window MAXIMUM_SEGMENT_SIZE or send buf left
  372. sendSize = Self.RemainingSize()
  373. part = true
  374. }
  375. //logs.Warn("send size", sendSize)
  376. p = Self.buf[Self.off : sendSize+Self.off]
  377. Self.off += sendSize
  378. Self.sentLength += sendSize
  379. Self.mutex.Unlock()
  380. return
  381. }
  382. func (Self *SendWindow) waitReceiveWindow() (err error) {
  383. t := Self.timeout.Sub(time.Now())
  384. if t < 0 {
  385. t = time.Minute
  386. }
  387. timer := time.NewTimer(t)
  388. defer timer.Stop()
  389. // waiting for receive usable window size, or timeout
  390. select {
  391. case _, ok := <-Self.setSizeCh:
  392. if !ok {
  393. return errors.New("conn.writeWindow: window closed")
  394. }
  395. return nil
  396. case <-timer.C:
  397. return errors.New("conn.writeWindow: write to time out")
  398. case <-Self.closeOpCh:
  399. return errors.New("conn.writeWindow: window closed")
  400. }
  401. }
  402. func (Self *SendWindow) WriteFull(buf []byte, id int32) (n int, err error) {
  403. Self.SetSendBuf(buf) // set the buf to send window
  404. var bufSeg []byte
  405. var part bool
  406. for {
  407. bufSeg, part, err = Self.WriteTo()
  408. //logs.Warn("buf seg", len(bufSeg), part, err)
  409. // get the buf segments from send window
  410. if bufSeg == nil && part == false && err == io.EOF {
  411. // send window is drain, break the loop
  412. err = nil
  413. break
  414. }
  415. if err != nil {
  416. break
  417. }
  418. n += len(bufSeg)
  419. if part {
  420. Self.mux.sendInfo(common.MUX_NEW_MSG_PART, id, bufSeg)
  421. } else {
  422. Self.mux.sendInfo(common.MUX_NEW_MSG, id, bufSeg)
  423. //logs.Warn("buf seg sent", len(bufSeg), part, err)
  424. }
  425. // send to other side, not send nil data to other side
  426. }
  427. //logs.Warn("buf seg write success")
  428. return
  429. }
  430. func (Self *SendWindow) SetTimeOut(t time.Time) {
  431. // waiting for receive a receive window size
  432. Self.timeout = t
  433. }
  434. type bandwidth struct {
  435. lastReadStart time.Time
  436. readStart time.Time
  437. readEnd time.Time
  438. bufLength int
  439. lastBufLength int
  440. count int8
  441. readBW float64
  442. writeBW float64
  443. }
  444. func (Self *bandwidth) StartRead() {
  445. Self.lastReadStart, Self.readStart = Self.readStart, time.Now()
  446. }
  447. func (Self *bandwidth) EndRead() {
  448. if !Self.lastReadStart.IsZero() {
  449. if Self.count == 0 {
  450. Self.calcWriteBandwidth()
  451. }
  452. }
  453. Self.readEnd = time.Now()
  454. if Self.count == 0 {
  455. Self.calcReadBandwidth()
  456. Self.count = -3
  457. }
  458. Self.count += 1
  459. }
  460. func (Self *bandwidth) SetCopySize(n int) {
  461. // must be invoke between StartRead and EndRead
  462. Self.lastBufLength, Self.bufLength = Self.bufLength, n
  463. }
  464. func (Self *bandwidth) calcReadBandwidth() {
  465. // Bandwidth between nps and npc
  466. readTime := Self.readEnd.Sub(Self.readStart)
  467. Self.readBW = float64(Self.bufLength) / readTime.Seconds()
  468. //logs.Warn("calc read bw", Self.bufLength, readTime.Seconds())
  469. }
  470. func (Self *bandwidth) calcWriteBandwidth() {
  471. // Bandwidth between nps and user, npc and application
  472. //logs.Warn("calc write bw")
  473. writeTime := Self.readEnd.Sub(Self.lastReadStart)
  474. Self.writeBW = float64(Self.lastBufLength) / writeTime.Seconds()
  475. }
  476. func (Self *bandwidth) Get() (bw float64) {
  477. // The zero value, 0 for numeric types
  478. if Self.writeBW == 0 && Self.readBW == 0 {
  479. //logs.Warn("bw both 0")
  480. return 100
  481. }
  482. if Self.writeBW == 0 && Self.readBW != 0 {
  483. return Self.readBW
  484. }
  485. if Self.readBW == 0 && Self.writeBW != 0 {
  486. return Self.writeBW
  487. }
  488. return math.Min(Self.readBW, Self.writeBW)
  489. }