conn.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437
  1. package mux
  2. import (
  3. "errors"
  4. "github.com/cnlh/nps/lib/common"
  5. "github.com/cnlh/nps/vender/github.com/astaxie/beego/logs"
  6. "io"
  7. "net"
  8. "sync"
  9. "time"
  10. )
  11. type conn struct {
  12. net.Conn
  13. getStatusCh chan struct{}
  14. connStatusOkCh chan struct{}
  15. connStatusFailCh chan struct{}
  16. readTimeOut time.Time
  17. writeTimeOut time.Time
  18. connId int32
  19. isClose bool
  20. closeFlag bool // close conn flag
  21. receiveWindow *window
  22. sendWindow *window
  23. readCh waitingCh
  24. writeCh waitingCh
  25. mux *Mux
  26. once sync.Once
  27. }
  28. func NewConn(connId int32, mux *Mux) *conn {
  29. c := &conn{
  30. getStatusCh: make(chan struct{}),
  31. connStatusOkCh: make(chan struct{}),
  32. connStatusFailCh: make(chan struct{}),
  33. connId: connId,
  34. receiveWindow: new(window),
  35. sendWindow: new(window),
  36. mux: mux,
  37. once: sync.Once{},
  38. }
  39. c.receiveWindow.NewReceive()
  40. c.sendWindow.NewSend()
  41. c.readCh.new()
  42. c.writeCh.new()
  43. return c
  44. }
  45. func (s *conn) Read(buf []byte) (n int, err error) {
  46. //logs.Warn("starting conn read", s.connId)
  47. if s.isClose || buf == nil {
  48. return 0, errors.New("the conn has closed")
  49. }
  50. // waiting for takeout from receive window finish or timeout
  51. go s.readWindow(buf, s.readCh.nCh, s.readCh.errCh)
  52. if t := s.readTimeOut.Sub(time.Now()); t > 0 {
  53. timer := time.NewTimer(t)
  54. defer timer.Stop()
  55. select {
  56. case <-timer.C:
  57. return 0, errors.New("read timeout")
  58. case n = <-s.readCh.nCh:
  59. err = <-s.readCh.errCh
  60. }
  61. } else {
  62. n = <-s.readCh.nCh
  63. err = <-s.readCh.errCh
  64. }
  65. //logs.Warn("read window finish conn read n err buf", n, err, string(buf[:15]), s.connId)
  66. return
  67. }
  68. func (s *conn) readWindow(buf []byte, nCh chan int, errCh chan error) {
  69. n, err := s.receiveWindow.Read(buf)
  70. //logs.Warn("readwindow goroutine status n err buf", n, err, string(buf[:15]))
  71. if s.receiveWindow.WindowFull {
  72. if s.receiveWindow.Size() > 0 {
  73. // window.Read may be invoked before window.Write, and WindowFull flag change to true
  74. // so make sure that receiveWindow is free some space
  75. s.receiveWindow.WindowFull = false
  76. logs.Warn("defer send mux msg send ok size", s.receiveWindow.Size())
  77. s.mux.sendInfo(common.MUX_MSG_SEND_OK, s.connId, s.receiveWindow.Size())
  78. // acknowledge other side, have empty some receive window space
  79. }
  80. }
  81. nCh <- n
  82. errCh <- err
  83. }
  84. func (s *conn) Write(buf []byte) (n int, err error) {
  85. //logs.Warn("write starting", s.connId)
  86. //defer logs.Warn("write end ", s.connId)
  87. if s.isClose {
  88. return 0, errors.New("the conn has closed")
  89. }
  90. if s.closeFlag {
  91. //logs.Warn("conn close by write ", s.connId)
  92. //s.Close()
  93. return 0, errors.New("io: write on closed conn")
  94. }
  95. s.sendWindow.SetSendBuf(buf) // set the buf to send window
  96. //logs.Warn("write set send buf success")
  97. go s.write(s.writeCh.nCh, s.writeCh.errCh)
  98. // waiting for send to other side or timeout
  99. if t := s.writeTimeOut.Sub(time.Now()); t > 0 {
  100. timer := time.NewTimer(t)
  101. defer timer.Stop()
  102. select {
  103. case <-timer.C:
  104. return 0, errors.New("write timeout")
  105. case n = <-s.writeCh.nCh:
  106. err = <-s.writeCh.errCh
  107. }
  108. } else {
  109. n = <-s.writeCh.nCh
  110. err = <-s.writeCh.errCh
  111. }
  112. //logs.Warn("write window finish n err buf id", n, err, string(buf[:15]), s.connId)
  113. return
  114. }
  115. func (s *conn) write(nCh chan int, errCh chan error) {
  116. var n int
  117. var err error
  118. for {
  119. buf, err := s.sendWindow.WriteTo()
  120. // get the usable window size buf from send window
  121. if buf == nil && err == io.EOF {
  122. // send window is drain, break the loop
  123. err = nil
  124. break
  125. }
  126. if err != nil {
  127. break
  128. }
  129. n += len(buf)
  130. //logs.Warn("send window buf len", len(buf))
  131. s.mux.sendInfo(common.MUX_NEW_MSG, s.connId, buf)
  132. // send to other side, not send nil data to other side
  133. }
  134. nCh <- n
  135. errCh <- err
  136. }
  137. func (s *conn) Close() (err error) {
  138. s.once.Do(s.closeProcess)
  139. return
  140. }
  141. func (s *conn) closeProcess() {
  142. s.isClose = true
  143. s.mux.connMap.Delete(s.connId)
  144. if !s.mux.IsClose {
  145. //logs.Warn("conn send close", s.connId)
  146. // if server or user close the conn while reading, will get a io.EOF
  147. // and this Close method will be invoke, send this signal to close other side
  148. s.mux.sendInfo(common.MUX_CONN_CLOSE, s.connId, nil)
  149. }
  150. s.sendWindow.CloseWindow()
  151. s.receiveWindow.CloseWindow()
  152. return
  153. }
  154. func (s *conn) LocalAddr() net.Addr {
  155. return s.mux.conn.LocalAddr()
  156. }
  157. func (s *conn) RemoteAddr() net.Addr {
  158. return s.mux.conn.RemoteAddr()
  159. }
  160. func (s *conn) SetDeadline(t time.Time) error {
  161. s.readTimeOut = t
  162. s.writeTimeOut = t
  163. return nil
  164. }
  165. func (s *conn) SetReadDeadline(t time.Time) error {
  166. s.readTimeOut = t
  167. return nil
  168. }
  169. func (s *conn) SetWriteDeadline(t time.Time) error {
  170. s.writeTimeOut = t
  171. return nil
  172. }
  173. type window struct {
  174. windowBuff []byte
  175. off uint16
  176. readOp chan struct{}
  177. readWait bool
  178. WindowFull bool
  179. usableReceiveWindow chan uint16
  180. WriteWg sync.WaitGroup
  181. closeOp bool
  182. closeOpCh chan struct{}
  183. WriteEndOp chan struct{}
  184. mutex sync.Mutex
  185. }
  186. func (Self *window) NewReceive() {
  187. // initial a window for receive
  188. Self.windowBuff = common.WindowBuff.Get()
  189. Self.readOp = make(chan struct{})
  190. Self.WriteEndOp = make(chan struct{})
  191. Self.closeOpCh = make(chan struct{}, 3)
  192. }
  193. func (Self *window) NewSend() {
  194. // initial a window for send
  195. Self.usableReceiveWindow = make(chan uint16)
  196. Self.closeOpCh = make(chan struct{}, 3)
  197. }
  198. func (Self *window) SetSendBuf(buf []byte) {
  199. // send window buff from conn write method, set it to send window
  200. Self.mutex.Lock()
  201. Self.windowBuff = buf
  202. Self.off = 0
  203. Self.mutex.Unlock()
  204. }
  205. func (Self *window) fullSlide() {
  206. // slide by allocate
  207. newBuf := common.WindowBuff.Get()
  208. Self.liteSlide()
  209. n := copy(newBuf[:Self.len()], Self.windowBuff)
  210. common.WindowBuff.Put(Self.windowBuff)
  211. Self.windowBuff = newBuf[:n]
  212. return
  213. }
  214. func (Self *window) liteSlide() {
  215. // slide by re slice
  216. Self.windowBuff = Self.windowBuff[Self.off:]
  217. Self.off = 0
  218. return
  219. }
  220. func (Self *window) Size() (n int) {
  221. // receive Window remaining
  222. n = common.PoolSizeWindow - Self.len()
  223. return
  224. }
  225. func (Self *window) len() (n int) {
  226. n = len(Self.windowBuff[Self.off:])
  227. return
  228. }
  229. func (Self *window) cap() (n int) {
  230. n = cap(Self.windowBuff[Self.off:])
  231. return
  232. }
  233. func (Self *window) grow(n int) {
  234. Self.windowBuff = Self.windowBuff[:Self.len()+n]
  235. }
  236. func (Self *window) Write(p []byte) (n int, err error) {
  237. if Self.closeOp {
  238. return 0, errors.New("conn.receiveWindow: write on closed window")
  239. }
  240. if len(p) > Self.Size() {
  241. return 0, errors.New("conn.receiveWindow: write too large")
  242. }
  243. Self.mutex.Lock()
  244. // slide the offset
  245. if len(p) > Self.cap()-Self.len() {
  246. // not enough space, need to allocate
  247. Self.fullSlide()
  248. } else {
  249. // have enough space, re slice
  250. Self.liteSlide()
  251. }
  252. length := Self.len() // length before grow
  253. Self.grow(len(p)) // grow for copy
  254. n = copy(Self.windowBuff[length:], p) // must copy data before allow Read
  255. if Self.readWait {
  256. // if there condition is length == 0 and
  257. // Read method just take away all the windowBuff,
  258. // this method will block until windowBuff is empty again
  259. // allow continue read
  260. defer Self.allowRead()
  261. }
  262. Self.mutex.Unlock()
  263. return n, nil
  264. }
  265. func (Self *window) allowRead() (closed bool) {
  266. if Self.closeOp {
  267. close(Self.readOp)
  268. return true
  269. }
  270. Self.mutex.Lock()
  271. Self.readWait = false
  272. Self.mutex.Unlock()
  273. select {
  274. case <-Self.closeOpCh:
  275. close(Self.readOp)
  276. return true
  277. case Self.readOp <- struct{}{}:
  278. return false
  279. }
  280. }
  281. func (Self *window) Read(p []byte) (n int, err error) {
  282. if Self.closeOp {
  283. return 0, io.EOF // Write method receive close signal, returns eof
  284. }
  285. Self.mutex.Lock()
  286. length := Self.len() // protect the length data, it invokes
  287. // before Write lock and after Write unlock
  288. if length == 0 {
  289. // window is empty, waiting for Write method send a success readOp signal
  290. // or get timeout or close
  291. Self.readWait = true
  292. Self.mutex.Unlock()
  293. ticker := time.NewTicker(2 * time.Minute)
  294. defer ticker.Stop()
  295. select {
  296. case _, ok := <-Self.readOp:
  297. if !ok {
  298. return 0, errors.New("conn.receiveWindow: window closed")
  299. }
  300. case <-Self.WriteEndOp:
  301. return 0, io.EOF // receive eof signal, returns eof
  302. case <-ticker.C:
  303. return 0, errors.New("conn.receiveWindow: read time out")
  304. case <-Self.closeOpCh:
  305. close(Self.readOp)
  306. return 0, io.EOF // receive close signal, returns eof
  307. }
  308. } else {
  309. Self.mutex.Unlock()
  310. }
  311. minCopy := 512
  312. for {
  313. Self.mutex.Lock()
  314. if len(p) == n || Self.len() == 0 {
  315. Self.mutex.Unlock()
  316. break
  317. }
  318. if n+minCopy > len(p) {
  319. minCopy = len(p) - n
  320. }
  321. i := copy(p[n:n+minCopy], Self.windowBuff[Self.off:])
  322. Self.off += uint16(i)
  323. n += i
  324. Self.mutex.Unlock()
  325. }
  326. p = p[:n]
  327. return
  328. }
  329. func (Self *window) WriteTo() (p []byte, err error) {
  330. if Self.closeOp {
  331. //logs.Warn("window write to closed")
  332. return nil, errors.New("conn.writeWindow: window closed")
  333. }
  334. if Self.len() == 0 {
  335. return nil, io.EOF
  336. // send window buff is drain, return eof and get another one
  337. }
  338. var windowSize uint16
  339. var ok bool
  340. waiting:
  341. ticker := time.NewTicker(2 * time.Minute)
  342. defer ticker.Stop()
  343. // waiting for receive usable window size, or timeout
  344. select {
  345. case windowSize, ok = <-Self.usableReceiveWindow:
  346. if !ok {
  347. return nil, errors.New("conn.writeWindow: window closed")
  348. }
  349. case <-ticker.C:
  350. return nil, errors.New("conn.writeWindow: write to time out")
  351. case <-Self.closeOpCh:
  352. return nil, errors.New("conn.writeWindow: window closed")
  353. }
  354. if windowSize == 0 {
  355. goto waiting // waiting for another usable window size
  356. }
  357. Self.mutex.Lock()
  358. if windowSize > uint16(Self.len()) {
  359. // usable window size is bigger than window buff size, send the full buff
  360. windowSize = uint16(Self.len())
  361. }
  362. p = Self.windowBuff[Self.off : windowSize+Self.off]
  363. Self.off += windowSize
  364. Self.mutex.Unlock()
  365. return
  366. }
  367. func (Self *window) SetAllowSize(value uint16) (closed bool) {
  368. defer func() {
  369. if recover() != nil {
  370. closed = true
  371. }
  372. }()
  373. if Self.closeOp {
  374. close(Self.usableReceiveWindow)
  375. return true
  376. }
  377. select {
  378. case Self.usableReceiveWindow <- value:
  379. return false
  380. case <-Self.closeOpCh:
  381. close(Self.usableReceiveWindow)
  382. return true
  383. }
  384. }
  385. func (Self *window) CloseWindow() {
  386. Self.closeOp = true
  387. Self.closeOpCh <- struct{}{}
  388. Self.closeOpCh <- struct{}{}
  389. Self.closeOpCh <- struct{}{}
  390. close(Self.closeOpCh)
  391. return
  392. }
  393. type waitingCh struct {
  394. nCh chan int
  395. errCh chan error
  396. }
  397. func (Self *waitingCh) new() {
  398. Self.nCh = make(chan int)
  399. Self.errCh = make(chan error)
  400. }
  401. func (Self *waitingCh) close() {
  402. close(Self.nCh)
  403. close(Self.errCh)
  404. }