1
0

conn.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575
  1. package mux
  2. import (
  3. "errors"
  4. "io"
  5. "net"
  6. "strconv"
  7. "sync"
  8. "time"
  9. "github.com/cnlh/nps/lib/common"
  10. )
  11. type conn struct {
  12. net.Conn
  13. getStatusCh chan struct{}
  14. connStatusOkCh chan struct{}
  15. connStatusFailCh chan struct{}
  16. connId int32
  17. isClose bool
  18. closeFlag bool // close conn flag
  19. receiveWindow *ReceiveWindow
  20. sendWindow *SendWindow
  21. once sync.Once
  22. label string
  23. }
  24. func NewConn(connId int32, mux *Mux, label ...string) *conn {
  25. c := &conn{
  26. getStatusCh: make(chan struct{}),
  27. connStatusOkCh: make(chan struct{}),
  28. connStatusFailCh: make(chan struct{}),
  29. connId: connId,
  30. receiveWindow: new(ReceiveWindow),
  31. sendWindow: new(SendWindow),
  32. once: sync.Once{},
  33. }
  34. if len(label) > 0 {
  35. c.label = label[0]
  36. }
  37. c.receiveWindow.New(mux)
  38. c.sendWindow.New(mux)
  39. logm := &connLog{
  40. startTime: time.Now(),
  41. isClose: false,
  42. logs: []string{c.label + "new conn success"},
  43. }
  44. setM(label[0], int(connId), logm)
  45. return c
  46. }
  47. func (s *conn) Read(buf []byte) (n int, err error) {
  48. if s.isClose || buf == nil {
  49. return 0, errors.New("the conn has closed")
  50. }
  51. if len(buf) == 0 {
  52. return 0, nil
  53. }
  54. // waiting for takeout from receive window finish or timeout
  55. n, err = s.receiveWindow.Read(buf, s.connId)
  56. var errstr string
  57. if err == nil {
  58. errstr = "err:nil"
  59. } else {
  60. errstr = err.Error()
  61. }
  62. d := getM(s.label, int(s.connId))
  63. d.logs = append(d.logs, s.label+"read "+strconv.Itoa(n)+" "+errstr)
  64. setM(s.label, int(s.connId), d)
  65. return
  66. }
  67. func (s *conn) Write(buf []byte) (n int, err error) {
  68. if s.isClose {
  69. return 0, errors.New("the conn has closed")
  70. }
  71. if s.closeFlag {
  72. //s.Close()
  73. return 0, errors.New("io: write on closed conn")
  74. }
  75. if len(buf) == 0 {
  76. return 0, nil
  77. }
  78. //logs.Warn("write buf", len(buf))
  79. n, err = s.sendWindow.WriteFull(buf, s.connId)
  80. return
  81. }
  82. func (s *conn) Close() (err error) {
  83. s.once.Do(s.closeProcess)
  84. return
  85. }
  86. func (s *conn) closeProcess() {
  87. s.isClose = true
  88. s.receiveWindow.mux.connMap.Delete(s.connId)
  89. if !s.receiveWindow.mux.IsClose {
  90. // if server or user close the conn while reading, will get a io.EOF
  91. // and this Close method will be invoke, send this signal to close other side
  92. s.receiveWindow.mux.sendInfo(common.MUX_CONN_CLOSE, s.connId, nil)
  93. }
  94. s.sendWindow.CloseWindow()
  95. s.receiveWindow.CloseWindow()
  96. d := getM(s.label, int(s.connId))
  97. d.isClose = true
  98. d.logs = append(d.logs, s.label+"close "+time.Now().String())
  99. setM(s.label, int(s.connId), d)
  100. return
  101. }
  102. func (s *conn) LocalAddr() net.Addr {
  103. return s.receiveWindow.mux.conn.LocalAddr()
  104. }
  105. func (s *conn) RemoteAddr() net.Addr {
  106. return s.receiveWindow.mux.conn.RemoteAddr()
  107. }
  108. func (s *conn) SetDeadline(t time.Time) error {
  109. _ = s.SetReadDeadline(t)
  110. _ = s.SetWriteDeadline(t)
  111. return nil
  112. }
  113. func (s *conn) SetReadDeadline(t time.Time) error {
  114. s.receiveWindow.SetTimeOut(t)
  115. return nil
  116. }
  117. func (s *conn) SetWriteDeadline(t time.Time) error {
  118. s.sendWindow.SetTimeOut(t)
  119. return nil
  120. }
  121. type window struct {
  122. off uint32
  123. maxSize uint32
  124. closeOp bool
  125. closeOpCh chan struct{}
  126. mux *Mux
  127. }
  128. func (Self *window) New() {
  129. Self.closeOpCh = make(chan struct{}, 2)
  130. }
  131. func (Self *window) CloseWindow() {
  132. if !Self.closeOp {
  133. Self.closeOp = true
  134. Self.closeOpCh <- struct{}{}
  135. Self.closeOpCh <- struct{}{}
  136. }
  137. }
  138. type ReceiveWindow struct {
  139. bufQueue FIFOQueue
  140. element *ListElement
  141. readLength uint32
  142. readOp chan struct{}
  143. readWait bool
  144. windowFull bool
  145. count int8
  146. //bw *bandwidth
  147. once sync.Once
  148. window
  149. }
  150. func (Self *ReceiveWindow) New(mux *Mux) {
  151. // initial a window for receive
  152. Self.readOp = make(chan struct{})
  153. Self.bufQueue.New()
  154. //Self.bw = new(bandwidth)
  155. Self.element = new(ListElement)
  156. Self.maxSize = 8192
  157. Self.mux = mux
  158. Self.window.New()
  159. }
  160. func (Self *ReceiveWindow) RemainingSize() (n uint32) {
  161. // receive window remaining
  162. if Self.maxSize >= Self.bufQueue.Len() {
  163. n = Self.maxSize - Self.bufQueue.Len()
  164. }
  165. // if maxSize is small than bufQueue length, return 0
  166. return
  167. }
  168. func (Self *ReceiveWindow) ReadSize() (n uint32) {
  169. // acknowledge the size already read
  170. Self.bufQueue.mutex.Lock()
  171. n = Self.readLength
  172. Self.readLength = 0
  173. Self.bufQueue.mutex.Unlock()
  174. return
  175. }
  176. func (Self *ReceiveWindow) CalcSize() {
  177. // calculating maximum receive window size
  178. if Self.count == 0 {
  179. //logs.Warn("ping, bw", Self.mux.latency, Self.bw.Get())
  180. n := uint32(2 * Self.mux.latency * Self.mux.bw.Get() * 1.5 / float64(Self.mux.connMap.Size()))
  181. if n < 8192 {
  182. n = 8192
  183. }
  184. if n < Self.bufQueue.Len() {
  185. n = Self.bufQueue.Len()
  186. }
  187. // set the minimal size
  188. if n > 2*Self.maxSize {
  189. n = 2 * Self.maxSize
  190. }
  191. if n > common.MAXIMUM_WINDOW_SIZE {
  192. n = common.MAXIMUM_WINDOW_SIZE
  193. }
  194. // set the maximum size
  195. //logs.Warn("n", n)
  196. Self.maxSize = n
  197. Self.count = -10
  198. }
  199. Self.count += 1
  200. }
  201. func (Self *ReceiveWindow) Write(buf []byte, l uint16, part bool, id int32) (err error) {
  202. if Self.closeOp {
  203. return errors.New("conn.receiveWindow: write on closed window")
  204. }
  205. element := ListElement{}
  206. err = element.New(buf, l, part)
  207. //logs.Warn("push the buf", len(buf), l, (&element).l)
  208. if err != nil {
  209. return
  210. }
  211. Self.bufQueue.Push(&element) // must push data before allow read
  212. //logs.Warn("read session calc size ", Self.maxSize)
  213. // calculating the receive window size
  214. Self.CalcSize()
  215. //logs.Warn("read session calc size finish", Self.maxSize)
  216. if Self.RemainingSize() == 0 {
  217. Self.windowFull = true
  218. //logs.Warn("window full true", Self.windowFull)
  219. }
  220. Self.mux.sendInfo(common.MUX_MSG_SEND_OK, id, Self.maxSize, Self.ReadSize())
  221. return nil
  222. }
  223. func (Self *ReceiveWindow) Read(p []byte, id int32) (n int, err error) {
  224. if Self.closeOp {
  225. return 0, io.EOF // receive close signal, returns eof
  226. }
  227. pOff := 0
  228. l := 0
  229. //logs.Warn("receive window read off, element.l", Self.off, Self.element.l)
  230. copyData:
  231. //Self.bw.StartRead()
  232. if Self.off == uint32(Self.element.l) {
  233. // on the first Read method invoked, Self.off and Self.element.l
  234. // both zero value
  235. Self.element, err = Self.bufQueue.Pop()
  236. // if the queue is empty, Pop method will wait until one element push
  237. // into the queue successful, or timeout.
  238. // timer start on timeout parameter is set up ,
  239. // reset to 60s if timeout and data still available
  240. Self.off = 0
  241. if err != nil {
  242. return // queue receive stop or time out, break the loop and return
  243. }
  244. //logs.Warn("pop element", Self.element.l, Self.element.part)
  245. }
  246. l = copy(p[pOff:], Self.element.buf[Self.off:])
  247. //Self.bw.SetCopySize(l)
  248. pOff += l
  249. Self.off += uint32(l)
  250. Self.bufQueue.mutex.Lock()
  251. Self.readLength += uint32(l)
  252. //logs.Warn("window read length buf len", Self.readLength, Self.bufQueue.Len())
  253. Self.bufQueue.mutex.Unlock()
  254. n += l
  255. l = 0
  256. //Self.bw.EndRead()
  257. Self.sendStatus(id)
  258. if Self.off == uint32(Self.element.l) {
  259. //logs.Warn("put the element end ", string(Self.element.buf[:15]))
  260. common.WindowBuff.Put(Self.element.buf)
  261. }
  262. if pOff < len(p) && Self.element.part {
  263. // element is a part of the segments, trying to fill up buf p
  264. goto copyData
  265. }
  266. return // buf p is full or all of segments in buf, return
  267. }
  268. func (Self *ReceiveWindow) sendStatus(id int32) {
  269. if Self.windowFull || Self.bufQueue.Len() == 0 {
  270. // window is full before read or empty now
  271. Self.windowFull = false
  272. Self.mux.sendInfo(common.MUX_MSG_SEND_OK, id, Self.maxSize, Self.ReadSize())
  273. // acknowledge other side, have empty some receive window space
  274. //}
  275. }
  276. }
  277. func (Self *ReceiveWindow) SetTimeOut(t time.Time) {
  278. // waiting for FIFO queue Pop method
  279. Self.bufQueue.SetTimeOut(t)
  280. }
  281. func (Self *ReceiveWindow) Stop() {
  282. // queue has no more data to push, so unblock pop method
  283. Self.once.Do(Self.bufQueue.Stop)
  284. }
  285. func (Self *ReceiveWindow) CloseWindow() {
  286. Self.window.CloseWindow()
  287. Self.Stop()
  288. }
  289. type SendWindow struct {
  290. buf []byte
  291. sentLength uint32
  292. setSizeCh chan struct{}
  293. setSizeWait bool
  294. unSlide uint32
  295. timeout time.Time
  296. window
  297. mutex sync.Mutex
  298. }
  299. func (Self *SendWindow) New(mux *Mux) {
  300. Self.setSizeCh = make(chan struct{})
  301. Self.maxSize = 4096
  302. Self.mux = mux
  303. Self.window.New()
  304. }
  305. func (Self *SendWindow) SetSendBuf(buf []byte) {
  306. // send window buff from conn write method, set it to send window
  307. Self.mutex.Lock()
  308. Self.buf = buf
  309. Self.off = 0
  310. Self.mutex.Unlock()
  311. }
  312. func (Self *SendWindow) RemainingSize() (n uint32) {
  313. if Self.maxSize >= Self.sentLength {
  314. n = Self.maxSize - Self.sentLength
  315. }
  316. return
  317. }
  318. func (Self *SendWindow) SetSize(windowSize, readLength uint32) (closed bool) {
  319. defer func() {
  320. if recover() != nil {
  321. closed = true
  322. }
  323. }()
  324. if Self.closeOp {
  325. close(Self.setSizeCh)
  326. return true
  327. }
  328. if readLength == 0 && Self.maxSize == windowSize {
  329. //logs.Warn("waiting for another window size")
  330. return false // waiting for receive another usable window size
  331. }
  332. //logs.Warn("set send window size to ", windowSize, readLength)
  333. Self.mutex.Lock()
  334. Self.slide(windowSize, readLength)
  335. if Self.setSizeWait {
  336. // send window into the wait status, need notice the channel
  337. //logs.Warn("send window remaining size is 0 , wait")
  338. if Self.RemainingSize() == 0 {
  339. //logs.Warn("waiting for another window size after slide")
  340. // keep the wait status
  341. Self.mutex.Unlock()
  342. return false
  343. }
  344. Self.setSizeWait = false
  345. Self.mutex.Unlock()
  346. //logs.Warn("send window remaining size is 0 starting wait")
  347. select {
  348. case Self.setSizeCh <- struct{}{}:
  349. //logs.Warn("send window remaining size is 0 finish")
  350. return false
  351. case <-Self.closeOpCh:
  352. close(Self.setSizeCh)
  353. return true
  354. }
  355. }
  356. // send window not into the wait status, so just do slide
  357. Self.mutex.Unlock()
  358. return false
  359. }
  360. func (Self *SendWindow) slide(windowSize, readLength uint32) {
  361. Self.sentLength -= readLength
  362. Self.maxSize = windowSize
  363. }
  364. func (Self *SendWindow) WriteTo() (p []byte, part bool, err error) {
  365. // returns buf segments, return only one segments, need a loop outside
  366. // until err = io.EOF
  367. if Self.closeOp {
  368. return nil, false, errors.New("conn.writeWindow: window closed")
  369. }
  370. if Self.off == uint32(len(Self.buf)) {
  371. return nil, false, io.EOF
  372. // send window buff is drain, return eof and get another one
  373. }
  374. Self.mutex.Lock()
  375. if Self.RemainingSize() == 0 {
  376. Self.setSizeWait = true
  377. Self.mutex.Unlock()
  378. // into the wait status
  379. err = Self.waitReceiveWindow()
  380. if err != nil {
  381. return nil, false, err
  382. }
  383. } else {
  384. Self.mutex.Unlock()
  385. }
  386. Self.mutex.Lock()
  387. var sendSize uint32
  388. if len(Self.buf[Self.off:]) > common.MAXIMUM_SEGMENT_SIZE {
  389. sendSize = common.MAXIMUM_SEGMENT_SIZE
  390. part = true
  391. } else {
  392. sendSize = uint32(len(Self.buf[Self.off:]))
  393. part = false
  394. }
  395. if Self.RemainingSize() < sendSize {
  396. // usable window size is small than
  397. // window MAXIMUM_SEGMENT_SIZE or send buf left
  398. sendSize = Self.RemainingSize()
  399. part = true
  400. }
  401. //logs.Warn("send size", sendSize)
  402. p = Self.buf[Self.off : sendSize+Self.off]
  403. Self.off += sendSize
  404. Self.sentLength += sendSize
  405. Self.mutex.Unlock()
  406. return
  407. }
  408. func (Self *SendWindow) waitReceiveWindow() (err error) {
  409. t := Self.timeout.Sub(time.Now())
  410. if t < 0 {
  411. t = time.Minute
  412. }
  413. timer := time.NewTimer(t)
  414. defer timer.Stop()
  415. // waiting for receive usable window size, or timeout
  416. select {
  417. case _, ok := <-Self.setSizeCh:
  418. if !ok {
  419. return errors.New("conn.writeWindow: window closed")
  420. }
  421. return nil
  422. case <-timer.C:
  423. return errors.New("conn.writeWindow: write to time out")
  424. case <-Self.closeOpCh:
  425. return errors.New("conn.writeWindow: window closed")
  426. }
  427. }
  428. func (Self *SendWindow) WriteFull(buf []byte, id int32) (n int, err error) {
  429. Self.SetSendBuf(buf) // set the buf to send window
  430. var bufSeg []byte
  431. var part bool
  432. for {
  433. bufSeg, part, err = Self.WriteTo()
  434. //logs.Warn("buf seg", len(bufSeg), part, err)
  435. // get the buf segments from send window
  436. if bufSeg == nil && part == false && err == io.EOF {
  437. // send window is drain, break the loop
  438. err = nil
  439. break
  440. }
  441. if err != nil {
  442. break
  443. }
  444. n += len(bufSeg)
  445. if part {
  446. Self.mux.sendInfo(common.MUX_NEW_MSG_PART, id, bufSeg)
  447. } else {
  448. Self.mux.sendInfo(common.MUX_NEW_MSG, id, bufSeg)
  449. //logs.Warn("buf seg sent", len(bufSeg), part, err)
  450. }
  451. // send to other side, not send nil data to other side
  452. }
  453. //logs.Warn("buf seg write success")
  454. return
  455. }
  456. func (Self *SendWindow) SetTimeOut(t time.Time) {
  457. // waiting for receive a receive window size
  458. Self.timeout = t
  459. }
  460. //type bandwidth struct {
  461. // readStart time.Time
  462. // lastReadStart time.Time
  463. // readEnd time.Time
  464. // lastReadEnd time.Time
  465. // bufLength int
  466. // lastBufLength int
  467. // count int8
  468. // readBW float64
  469. // writeBW float64
  470. // readBandwidth float64
  471. //}
  472. //
  473. //func (Self *bandwidth) StartRead() {
  474. // Self.lastReadStart, Self.readStart = Self.readStart, time.Now()
  475. // if !Self.lastReadStart.IsZero() {
  476. // if Self.count == -5 {
  477. // Self.calcBandWidth()
  478. // }
  479. // }
  480. //}
  481. //
  482. //func (Self *bandwidth) EndRead() {
  483. // Self.lastReadEnd, Self.readEnd = Self.readEnd, time.Now()
  484. // if Self.count == -5 {
  485. // Self.calcWriteBandwidth()
  486. // }
  487. // if Self.count == 0 {
  488. // Self.calcReadBandwidth()
  489. // Self.count = -6
  490. // }
  491. // Self.count += 1
  492. //}
  493. //
  494. //func (Self *bandwidth) SetCopySize(n int) {
  495. // // must be invoke between StartRead and EndRead
  496. // Self.lastBufLength, Self.bufLength = Self.bufLength, n
  497. //}
  498. //// calculating
  499. //// start end start end
  500. //// read read
  501. //// write
  502. //
  503. //func (Self *bandwidth) calcBandWidth() {
  504. // t := Self.readStart.Sub(Self.lastReadStart)
  505. // if Self.lastBufLength >= 32768 {
  506. // Self.readBandwidth = float64(Self.lastBufLength) / t.Seconds()
  507. // }
  508. //}
  509. //
  510. //func (Self *bandwidth) calcReadBandwidth() {
  511. // // Bandwidth between nps and npc
  512. // readTime := Self.readEnd.Sub(Self.readStart)
  513. // Self.readBW = float64(Self.bufLength) / readTime.Seconds()
  514. // //logs.Warn("calc read bw", Self.readBW, Self.bufLength, readTime.Seconds())
  515. //}
  516. //
  517. //func (Self *bandwidth) calcWriteBandwidth() {
  518. // // Bandwidth between nps and user, npc and application
  519. // writeTime := Self.readStart.Sub(Self.lastReadEnd)
  520. // Self.writeBW = float64(Self.lastBufLength) / writeTime.Seconds()
  521. // //logs.Warn("calc write bw", Self.writeBW, Self.bufLength, writeTime.Seconds())
  522. //}
  523. //
  524. //func (Self *bandwidth) Get() (bw float64) {
  525. // // The zero value, 0 for numeric types
  526. // if Self.writeBW == 0 && Self.readBW == 0 {
  527. // //logs.Warn("bw both 0")
  528. // return 100
  529. // }
  530. // if Self.writeBW == 0 && Self.readBW != 0 {
  531. // return Self.readBW
  532. // }
  533. // if Self.readBW == 0 && Self.writeBW != 0 {
  534. // return Self.writeBW
  535. // }
  536. // return Self.readBandwidth
  537. //}