123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533 |
- package mux
- import (
- "errors"
- "io"
- "math"
- "net"
- "sync"
- "time"
- "github.com/cnlh/nps/lib/common"
- )
- type conn struct {
- net.Conn
- getStatusCh chan struct{}
- connStatusOkCh chan struct{}
- connStatusFailCh chan struct{}
- connId int32
- isClose bool
- closeFlag bool
- receiveWindow *ReceiveWindow
- sendWindow *SendWindow
- once sync.Once
- }
- func NewConn(connId int32, mux *Mux) *conn {
- c := &conn{
- getStatusCh: make(chan struct{}),
- connStatusOkCh: make(chan struct{}),
- connStatusFailCh: make(chan struct{}),
- connId: connId,
- receiveWindow: new(ReceiveWindow),
- sendWindow: new(SendWindow),
- once: sync.Once{},
- }
- c.receiveWindow.New(mux)
- c.sendWindow.New(mux)
- return c
- }
- func (s *conn) Read(buf []byte) (n int, err error) {
- if s.isClose || buf == nil {
- return 0, errors.New("the conn has closed")
- }
- if len(buf) == 0 {
- return 0, nil
- }
-
- n, err = s.receiveWindow.Read(buf, s.connId)
- return
- }
- func (s *conn) Write(buf []byte) (n int, err error) {
- if s.isClose {
- return 0, errors.New("the conn has closed")
- }
- if s.closeFlag {
-
- return 0, errors.New("io: write on closed conn")
- }
- if len(buf) == 0 {
- return 0, nil
- }
-
- n, err = s.sendWindow.WriteFull(buf, s.connId)
- return
- }
- func (s *conn) Close() (err error) {
- s.once.Do(s.closeProcess)
- return
- }
- func (s *conn) closeProcess() {
- s.isClose = true
- s.receiveWindow.mux.connMap.Delete(s.connId)
- if !s.receiveWindow.mux.IsClose {
-
-
- s.receiveWindow.mux.sendInfo(common.MUX_CONN_CLOSE, s.connId, nil)
- }
- s.sendWindow.CloseWindow()
- s.receiveWindow.CloseWindow()
- return
- }
- func (s *conn) LocalAddr() net.Addr {
- return s.receiveWindow.mux.conn.LocalAddr()
- }
- func (s *conn) RemoteAddr() net.Addr {
- return s.receiveWindow.mux.conn.RemoteAddr()
- }
- func (s *conn) SetDeadline(t time.Time) error {
- _ = s.SetReadDeadline(t)
- _ = s.SetWriteDeadline(t)
- return nil
- }
- func (s *conn) SetReadDeadline(t time.Time) error {
- s.receiveWindow.SetTimeOut(t)
- return nil
- }
- func (s *conn) SetWriteDeadline(t time.Time) error {
- s.sendWindow.SetTimeOut(t)
- return nil
- }
- type window struct {
- off uint32
- maxSize uint32
- closeOp bool
- closeOpCh chan struct{}
- mux *Mux
- }
- func (Self *window) New() {
- Self.closeOpCh = make(chan struct{}, 2)
- }
- func (Self *window) CloseWindow() {
- if !Self.closeOp {
- Self.closeOp = true
- Self.closeOpCh <- struct{}{}
- Self.closeOpCh <- struct{}{}
- }
- }
- type ReceiveWindow struct {
- bufQueue FIFOQueue
- element *ListElement
- readLength uint32
- readOp chan struct{}
- readWait bool
- windowFull bool
- count int8
- bw *bandwidth
- once sync.Once
- window
- }
- func (Self *ReceiveWindow) New(mux *Mux) {
-
- Self.readOp = make(chan struct{})
- Self.bufQueue.New()
- Self.bw = new(bandwidth)
- Self.element = new(ListElement)
- Self.maxSize = 8192
- Self.mux = mux
- Self.window.New()
- }
- func (Self *ReceiveWindow) RemainingSize() (n uint32) {
-
- if Self.maxSize >= Self.bufQueue.Len() {
- n = Self.maxSize - Self.bufQueue.Len()
- }
-
- return
- }
- func (Self *ReceiveWindow) ReadSize() (n uint32) {
-
- Self.bufQueue.mutex.Lock()
- n = Self.readLength
- Self.readLength = 0
- Self.bufQueue.mutex.Unlock()
- return
- }
- func (Self *ReceiveWindow) CalcSize() {
-
- if Self.count == 0 {
-
- n := uint32(2 * Self.mux.latency * Self.bw.Get())
- if n < 8192 {
- n = 8192
- }
- if n < Self.bufQueue.Len() {
- n = Self.bufQueue.Len()
- }
-
- if n > common.MAXIMUM_WINDOW_SIZE {
- n = common.MAXIMUM_WINDOW_SIZE
- }
-
-
- Self.maxSize = n
- Self.count = -5
- }
- Self.count += 1
- }
- func (Self *ReceiveWindow) Write(buf []byte, l uint16, part bool, id int32) (err error) {
- if Self.closeOp {
- return errors.New("conn.receiveWindow: write on closed window")
- }
- element := ListElement{}
- err = element.New(buf, l, part)
-
- if err != nil {
- return
- }
- Self.bufQueue.Push(&element)
-
-
- Self.CalcSize()
-
- if Self.RemainingSize() == 0 {
- Self.windowFull = true
-
- }
- Self.mux.sendInfo(common.MUX_MSG_SEND_OK, id, Self.maxSize, Self.ReadSize())
- return nil
- }
- func (Self *ReceiveWindow) Read(p []byte, id int32) (n int, err error) {
- if Self.closeOp {
- return 0, io.EOF
- }
- pOff := 0
- l := 0
-
- copyData:
- Self.bw.StartRead()
- if Self.off == uint32(Self.element.l) {
-
-
- Self.element, err = Self.bufQueue.Pop()
-
-
-
-
- Self.off = 0
- if err != nil {
- return
- }
-
- }
- l = copy(p[pOff:], Self.element.buf[Self.off:])
- Self.bw.SetCopySize(l)
- pOff += l
- Self.off += uint32(l)
- Self.bufQueue.mutex.Lock()
- Self.readLength += uint32(l)
-
- Self.bufQueue.mutex.Unlock()
- n += l
- l = 0
- Self.bw.EndRead()
- Self.sendStatus(id)
- if Self.off == uint32(Self.element.l) {
-
- common.WindowBuff.Put(Self.element.buf)
- }
- if pOff < len(p) && Self.element.part {
-
- goto copyData
- }
- return
- }
- func (Self *ReceiveWindow) sendStatus(id int32) {
- if Self.windowFull || Self.bufQueue.Len() == 0 {
-
- Self.windowFull = false
- Self.mux.sendInfo(common.MUX_MSG_SEND_OK, id, Self.maxSize, Self.ReadSize())
-
-
- }
- }
- func (Self *ReceiveWindow) SetTimeOut(t time.Time) {
-
- Self.bufQueue.SetTimeOut(t)
- }
- func (Self *ReceiveWindow) Stop() {
-
- Self.once.Do(Self.bufQueue.Stop)
- }
- func (Self *ReceiveWindow) CloseWindow() {
- Self.window.CloseWindow()
- Self.Stop()
- }
- type SendWindow struct {
- buf []byte
- sentLength uint32
- setSizeCh chan struct{}
- setSizeWait bool
- unSlide uint32
- timeout time.Time
- window
- mutex sync.Mutex
- }
- func (Self *SendWindow) New(mux *Mux) {
- Self.setSizeCh = make(chan struct{})
- Self.maxSize = 4096
- Self.mux = mux
- Self.window.New()
- }
- func (Self *SendWindow) SetSendBuf(buf []byte) {
-
- Self.mutex.Lock()
- Self.buf = buf
- Self.off = 0
- Self.mutex.Unlock()
- }
- func (Self *SendWindow) RemainingSize() (n uint32) {
- if Self.maxSize >= Self.sentLength {
- n = Self.maxSize - Self.sentLength
- }
- return
- }
- func (Self *SendWindow) SetSize(windowSize, readLength uint32) (closed bool) {
- defer func() {
- if recover() != nil {
- closed = true
- }
- }()
- if Self.closeOp {
- close(Self.setSizeCh)
- return true
- }
- if readLength == 0 && Self.maxSize == windowSize {
-
- return false
- }
-
- Self.mutex.Lock()
- Self.slide(windowSize, readLength)
- if Self.setSizeWait {
-
-
- if Self.RemainingSize() == 0 {
-
-
- Self.mutex.Unlock()
- return false
- }
- Self.setSizeWait = false
- Self.mutex.Unlock()
-
- select {
- case Self.setSizeCh <- struct{}{}:
-
- return false
- case <-Self.closeOpCh:
- close(Self.setSizeCh)
- return true
- }
- }
-
- Self.mutex.Unlock()
- return false
- }
- func (Self *SendWindow) slide(windowSize, readLength uint32) {
- Self.sentLength -= readLength
- Self.maxSize = windowSize
- }
- func (Self *SendWindow) WriteTo() (p []byte, part bool, err error) {
-
-
- if Self.closeOp {
- return nil, false, errors.New("conn.writeWindow: window closed")
- }
- if Self.off == uint32(len(Self.buf)) {
- return nil, false, io.EOF
-
- }
- Self.mutex.Lock()
- if Self.RemainingSize() == 0 {
- Self.setSizeWait = true
- Self.mutex.Unlock()
-
- err = Self.waitReceiveWindow()
- if err != nil {
- return nil, false, err
- }
- } else {
- Self.mutex.Unlock()
- }
- Self.mutex.Lock()
- var sendSize uint32
- if len(Self.buf[Self.off:]) > common.MAXIMUM_SEGMENT_SIZE {
- sendSize = common.MAXIMUM_SEGMENT_SIZE
- part = true
- } else {
- sendSize = uint32(len(Self.buf[Self.off:]))
- part = false
- }
- if Self.RemainingSize() < sendSize {
-
-
- sendSize = Self.RemainingSize()
- part = true
- }
-
- p = Self.buf[Self.off : sendSize+Self.off]
- Self.off += sendSize
- Self.sentLength += sendSize
- Self.mutex.Unlock()
- return
- }
- func (Self *SendWindow) waitReceiveWindow() (err error) {
- t := Self.timeout.Sub(time.Now())
- if t < 0 {
- t = time.Minute
- }
- timer := time.NewTimer(t)
- defer timer.Stop()
-
- select {
- case _, ok := <-Self.setSizeCh:
- if !ok {
- return errors.New("conn.writeWindow: window closed")
- }
- return nil
- case <-timer.C:
- return errors.New("conn.writeWindow: write to time out")
- case <-Self.closeOpCh:
- return errors.New("conn.writeWindow: window closed")
- }
- }
- func (Self *SendWindow) WriteFull(buf []byte, id int32) (n int, err error) {
- Self.SetSendBuf(buf)
- var bufSeg []byte
- var part bool
- for {
- bufSeg, part, err = Self.WriteTo()
-
-
- if bufSeg == nil && part == false && err == io.EOF {
-
- err = nil
- break
- }
- if err != nil {
- break
- }
- n += len(bufSeg)
- if part {
- Self.mux.sendInfo(common.MUX_NEW_MSG_PART, id, bufSeg)
- } else {
- Self.mux.sendInfo(common.MUX_NEW_MSG, id, bufSeg)
-
- }
-
- }
-
- return
- }
- func (Self *SendWindow) SetTimeOut(t time.Time) {
-
- Self.timeout = t
- }
// bandwidth derives two transfer-rate estimates, in copied bytes per second,
// from timestamps taken around successive reads.
type bandwidth struct {
	lastReadStart time.Time // start of the previous read
	readStart     time.Time // start of the current read
	readEnd       time.Time // end of the most recent read
	bufLength     int       // bytes copied by the current read
	lastBufLength int       // bytes copied by the previous read
	count         int8      // rates are recomputed only when this is zero
	readBW        float64   // bufLength over (readEnd - readStart)
	writeBW       float64   // lastBufLength over (readEnd - lastReadStart)
}

// StartRead records the start of a read, keeping the previous start time.
func (b *bandwidth) StartRead() {
	b.lastReadStart, b.readStart = b.readStart, time.Now()
}

// EndRead records the end of a read. When the counter is zero it refreshes
// both rates and parks the counter at -3, so the rates are refreshed on
// every third call. The write rate is skipped until a previous read start
// exists, and is computed before readEnd is updated.
func (b *bandwidth) EndRead() {
	if b.count == 0 && !b.lastReadStart.IsZero() {
		b.calcWriteBandwidth()
	}
	b.readEnd = time.Now()
	if b.count == 0 {
		b.calcReadBandwidth()
		b.count = -3
	}
	b.count++
}

// SetCopySize records n as the size of the current read, keeping the
// previous size.
func (b *bandwidth) SetCopySize(n int) {
	b.lastBufLength, b.bufLength = b.bufLength, n
}

// calcReadBandwidth updates readBW from the current read. A non-positive
// elapsed time leaves readBW unchanged: dividing by it would store +Inf or
// NaN, which poisons every later Get.
func (b *bandwidth) calcReadBandwidth() {
	elapsed := b.readEnd.Sub(b.readStart)
	if elapsed <= 0 {
		return
	}
	b.readBW = float64(b.bufLength) / elapsed.Seconds()
}

// calcWriteBandwidth updates writeBW from the previous read's size and the
// time between lastReadStart and readEnd. A non-positive elapsed time leaves
// writeBW unchanged, for the same reason as in calcReadBandwidth.
func (b *bandwidth) calcWriteBandwidth() {
	elapsed := b.readEnd.Sub(b.lastReadStart)
	if elapsed <= 0 {
		return
	}
	b.writeBW = float64(b.lastBufLength) / elapsed.Seconds()
}

// Get returns the smaller of the two rates. If only one rate is non-zero it
// returns that one; if neither has been measured it returns 100.
func (b *bandwidth) Get() (bw float64) {
	switch {
	case b.writeBW == 0 && b.readBW == 0:
		return 100
	case b.writeBW == 0:
		return b.readBW
	case b.readBW == 0:
		return b.writeBW
	default:
		return math.Min(b.readBW, b.writeBW)
	}
}
|