1
0
Эх сурвалжийг харах

change slide window design

ffdfgdfg 5 жил өмнө
parent
commit
b3ed822c72
5 өөрчлөгдсөн 590 нэмэгдсэн , 354 устгасан
  1. 3 1
      lib/common/const.go
  2. 26 11
      lib/common/netpackager.go
  3. 369 268
      lib/mux/conn.go
  4. 71 47
      lib/mux/mux.go
  5. 121 27
      lib/mux/queue.go

+ 3 - 1
lib/common/const.go

@@ -42,9 +42,11 @@ const (
 	MUX_NEW_CONN_OK
 	MUX_NEW_CONN_Fail
 	MUX_NEW_MSG
+	MUX_NEW_MSG_PART
 	MUX_MSG_SEND_OK
 	MUX_NEW_CONN
 	MUX_CONN_CLOSE
 	MUX_PING_RETURN
-	MUX_PING int32 = -1
+	MUX_PING             int32 = -1
+	MAXIMUM_SEGMENT_SIZE       = 4096 - 16 - 32 - 32 - 8
 )

+ 26 - 11
lib/common/netpackager.go

@@ -15,7 +15,7 @@ type NetPackager interface {
 }
 
 type BasePackager struct {
-	Length  uint32
+	Length  uint16
 	Content []byte
 }
 
@@ -101,7 +101,7 @@ func (Self *BasePackager) Unmarshal(content interface{}) (err error) {
 }
 
 func (Self *BasePackager) setLength() {
-	Self.Length = uint32(len(Self.Content))
+	Self.Length = uint16(len(Self.Content))
 	return
 }
 
@@ -147,25 +147,32 @@ func (Self *ConnPackager) UnPack(reader io.Reader) (err error) {
 }
 
 type MuxPackager struct {
-	Flag   uint8
-	Id     int32
-	Window uint16
+	Flag       uint8
+	Id         int32
+	Window     uint32
+	ReadLength uint32
 	BasePackager
 }
 
 func (Self *MuxPackager) NewPac(flag uint8, id int32, content ...interface{}) (err error) {
 	Self.Flag = flag
 	Self.Id = id
-	if flag == MUX_NEW_MSG {
+	if flag == MUX_NEW_MSG || flag == MUX_NEW_MSG_PART || flag == MUX_PING_FLAG {
 		err = Self.BasePackager.NewPac(content...)
 	}
 	if flag == MUX_MSG_SEND_OK {
 		// MUX_MSG_SEND_OK only allows one data
 		switch content[0].(type) {
 		case int:
-			Self.Window = uint16(content[0].(int))
-		case uint16:
-			Self.Window = content[0].(uint16)
+			Self.Window = uint32(content[0].(int))
+		case uint32:
+			Self.Window = content[0].(uint32)
+		}
+		switch content[1].(type) {
+		case int:
+			Self.ReadLength = uint32(content[1].(int))
+		case uint32:
+			Self.ReadLength = content[1].(uint32)
 		}
 	}
 	return
@@ -180,11 +187,15 @@ func (Self *MuxPackager) Pack(writer io.Writer) (err error) {
 	if err != nil {
 		return
 	}
-	if Self.Flag == MUX_NEW_MSG {
+	if Self.Flag == MUX_NEW_MSG || Self.Flag == MUX_NEW_MSG_PART || Self.Flag == MUX_PING_FLAG {
 		err = Self.BasePackager.Pack(writer)
 	}
 	if Self.Flag == MUX_MSG_SEND_OK {
 		err = binary.Write(writer, binary.LittleEndian, Self.Window)
+		if err != nil {
+			return
+		}
+		err = binary.Write(writer, binary.LittleEndian, Self.ReadLength)
 	}
 	return
 }
@@ -199,11 +210,15 @@ func (Self *MuxPackager) UnPack(reader io.Reader) (err error) {
 	if err != nil {
 		return
 	}
-	if Self.Flag == MUX_NEW_MSG {
+	if Self.Flag == MUX_NEW_MSG || Self.Flag == MUX_NEW_MSG_PART || Self.Flag == MUX_PING_FLAG {
 		err = Self.BasePackager.UnPack(reader)
 	}
 	if Self.Flag == MUX_MSG_SEND_OK {
 		err = binary.Read(reader, binary.LittleEndian, &Self.Window)
+		if err != nil {
+			return
+		}
+		err = binary.Read(reader, binary.LittleEndian, &Self.ReadLength)
 	}
 	return
 }

+ 369 - 268
lib/mux/conn.go

@@ -2,7 +2,9 @@ package mux
 
 import (
 	"errors"
+	"github.com/astaxie/beego/logs"
 	"io"
+	"math"
 	"net"
 	"sync"
 	"time"
@@ -15,16 +17,11 @@ type conn struct {
 	getStatusCh      chan struct{}
 	connStatusOkCh   chan struct{}
 	connStatusFailCh chan struct{}
-	readTimeOut      time.Time
-	writeTimeOut     time.Time
 	connId           int32
 	isClose          bool
 	closeFlag        bool // close conn flag
-	receiveWindow    *window
-	sendWindow       *window
-	readCh           waitingCh
-	writeCh          waitingCh
-	mux              *Mux
+	receiveWindow    *ReceiveWindow
+	sendWindow       *SendWindow
 	once             sync.Once
 }
 
@@ -34,15 +31,12 @@ func NewConn(connId int32, mux *Mux) *conn {
 		connStatusOkCh:   make(chan struct{}),
 		connStatusFailCh: make(chan struct{}),
 		connId:           connId,
-		receiveWindow:    new(window),
-		sendWindow:       new(window),
-		mux:              mux,
+		receiveWindow:    new(ReceiveWindow),
+		sendWindow:       new(SendWindow),
 		once:             sync.Once{},
 	}
-	c.receiveWindow.NewReceive()
-	c.sendWindow.NewSend()
-	c.readCh.new()
-	c.writeCh.new()
+	c.receiveWindow.New(mux)
+	c.sendWindow.New(mux)
 	return c
 }
 
@@ -50,39 +44,14 @@ func (s *conn) Read(buf []byte) (n int, err error) {
 	if s.isClose || buf == nil {
 		return 0, errors.New("the conn has closed")
 	}
-	// waiting for takeout from receive window finish or timeout
-	go s.readWindow(buf, s.readCh.nCh, s.readCh.errCh)
-	if t := s.readTimeOut.Sub(time.Now()); t > 0 {
-		timer := time.NewTimer(t)
-		defer timer.Stop()
-		select {
-		case <-timer.C:
-			return 0, errors.New("read timeout")
-		case n = <-s.readCh.nCh:
-			err = <-s.readCh.errCh
-		}
-	} else {
-		n = <-s.readCh.nCh
-		err = <-s.readCh.errCh
+	if len(buf) == 0 {
+		return 0, nil
 	}
+	// waiting for takeout from receive window finish or timeout
+	n, err = s.receiveWindow.Read(buf, s.connId)
 	return
 }
 
-func (s *conn) readWindow(buf []byte, nCh chan int, errCh chan error) {
-	n, err := s.receiveWindow.Read(buf)
-	if s.receiveWindow.WindowFull {
-		if s.receiveWindow.Size() > 0 {
-			// window.Read may be invoked before window.Write, and WindowFull flag change to true
-			// so make sure that receiveWindow is free some space
-			s.receiveWindow.WindowFull = false
-			s.mux.sendInfo(common.MUX_MSG_SEND_OK, s.connId, s.receiveWindow.Size())
-			// acknowledge other side, have empty some receive window space
-		}
-	}
-	nCh <- n
-	errCh <- err
-}
-
 func (s *conn) Write(buf []byte) (n int, err error) {
 	if s.isClose {
 		return 0, errors.New("the conn has closed")
@@ -91,45 +60,13 @@ func (s *conn) Write(buf []byte) (n int, err error) {
 		//s.Close()
 		return 0, errors.New("io: write on closed conn")
 	}
-	s.sendWindow.SetSendBuf(buf) // set the buf to send window
-	go s.write(s.writeCh.nCh, s.writeCh.errCh)
-	// waiting for send to other side or timeout
-	if t := s.writeTimeOut.Sub(time.Now()); t > 0 {
-		timer := time.NewTimer(t)
-		defer timer.Stop()
-		select {
-		case <-timer.C:
-			return 0, errors.New("write timeout")
-		case n = <-s.writeCh.nCh:
-			err = <-s.writeCh.errCh
-		}
-	} else {
-		n = <-s.writeCh.nCh
-		err = <-s.writeCh.errCh
+	if len(buf) == 0 {
+		return 0, nil
 	}
+	//logs.Warn("write buf", len(buf))
+	n, err = s.sendWindow.WriteFull(buf, s.connId)
 	return
 }
-func (s *conn) write(nCh chan int, errCh chan error) {
-	var n int
-	var err error
-	for {
-		buf, err := s.sendWindow.WriteTo()
-		// get the usable window size buf from send window
-		if buf == nil && err == io.EOF {
-			// send window is drain, break the loop
-			err = nil
-			break
-		}
-		if err != nil {
-			break
-		}
-		n += len(buf)
-		s.mux.sendInfo(common.MUX_NEW_MSG, s.connId, buf)
-		// send to other side, not send nil data to other side
-	}
-	nCh <- n
-	errCh <- err
-}
 
 func (s *conn) Close() (err error) {
 	s.once.Do(s.closeProcess)
@@ -138,11 +75,11 @@ func (s *conn) Close() (err error) {
 
 func (s *conn) closeProcess() {
 	s.isClose = true
-	s.mux.connMap.Delete(s.connId)
-	if !s.mux.IsClose {
+	s.receiveWindow.mux.connMap.Delete(s.connId)
+	if !s.receiveWindow.mux.IsClose {
 		// if server or user close the conn while reading, will get a io.EOF
 		// and this Close method will be invoke, send this signal to close other side
-		s.mux.sendInfo(common.MUX_CONN_CLOSE, s.connId, nil)
+		s.receiveWindow.mux.sendInfo(common.MUX_CONN_CLOSE, s.connId, nil)
 	}
 	s.sendWindow.CloseWindow()
 	s.receiveWindow.CloseWindow()
@@ -150,276 +87,440 @@ func (s *conn) closeProcess() {
 }
 
 func (s *conn) LocalAddr() net.Addr {
-	return s.mux.conn.LocalAddr()
+	return s.receiveWindow.mux.conn.LocalAddr()
 }
 
 func (s *conn) RemoteAddr() net.Addr {
-	return s.mux.conn.RemoteAddr()
+	return s.receiveWindow.mux.conn.RemoteAddr()
 }
 
 func (s *conn) SetDeadline(t time.Time) error {
-	s.readTimeOut = t
-	s.writeTimeOut = t
+	_ = s.SetReadDeadline(t)
+	_ = s.SetWriteDeadline(t)
 	return nil
 }
 
 func (s *conn) SetReadDeadline(t time.Time) error {
-	s.readTimeOut = t
+	s.receiveWindow.SetTimeOut(t)
 	return nil
 }
 
 func (s *conn) SetWriteDeadline(t time.Time) error {
-	s.writeTimeOut = t
+	s.sendWindow.SetTimeOut(t)
 	return nil
 }
 
 type window struct {
-	windowBuff          []byte
-	off                 uint16
-	readOp              chan struct{}
-	readWait            bool
-	WindowFull          bool
-	usableReceiveWindow chan uint16
-	WriteWg             sync.WaitGroup
-	closeOp             bool
-	closeOpCh           chan struct{}
-	WriteEndOp          chan struct{}
-	mutex               sync.Mutex
-}
-
-func (Self *window) NewReceive() {
-	// initial a window for receive
-	Self.windowBuff = common.WindowBuff.Get()
-	Self.readOp = make(chan struct{})
-	Self.WriteEndOp = make(chan struct{})
-	Self.closeOpCh = make(chan struct{}, 3)
-}
-
-func (Self *window) NewSend() {
-	// initial a window for send
-	Self.usableReceiveWindow = make(chan uint16)
-	Self.closeOpCh = make(chan struct{}, 3)
+	off       uint32
+	maxSize   uint32
+	closeOp   bool
+	closeOpCh chan struct{}
+	mux       *Mux
 }
 
-func (Self *window) SetSendBuf(buf []byte) {
-	// send window buff from conn write method, set it to send window
-	Self.mutex.Lock()
-	Self.windowBuff = buf
-	Self.off = 0
-	Self.mutex.Unlock()
+func (Self *window) New() {
+	Self.closeOpCh = make(chan struct{}, 2)
 }
 
-func (Self *window) fullSlide() {
-	// slide by allocate
-	newBuf := common.WindowBuff.Get()
-	Self.liteSlide()
-	n := copy(newBuf[:Self.len()], Self.windowBuff)
-	common.WindowBuff.Put(Self.windowBuff)
-	Self.windowBuff = newBuf[:n]
-	return
+func (Self *window) CloseWindow() {
+	if !Self.closeOp {
+		Self.closeOp = true
+		Self.closeOpCh <- struct{}{}
+		Self.closeOpCh <- struct{}{}
+	}
 }
 
-func (Self *window) liteSlide() {
-	// slide by re slice
-	Self.windowBuff = Self.windowBuff[Self.off:]
-	Self.off = 0
-	return
+type ReceiveWindow struct {
+	bufQueue   FIFOQueue
+	element    *ListElement
+	readLength uint32
+	readOp     chan struct{}
+	readWait   bool
+	windowFull bool
+	count      int8
+	bw         *bandwidth
+	once       sync.Once
+	window
 }
 
-func (Self *window) Size() (n int) {
-	// receive Window remaining
-	n = common.PoolSizeWindow - Self.len()
+func (Self *ReceiveWindow) New(mux *Mux) {
+	// initial a window for receive
+	Self.readOp = make(chan struct{})
+	Self.bufQueue.New()
+	Self.bw = new(bandwidth)
+	Self.element = new(ListElement)
+	Self.maxSize = 8192
+	Self.mux = mux
+	Self.window.New()
+}
+
+func (Self *ReceiveWindow) RemainingSize() (n uint32) {
+	// receive window remaining
+	if Self.maxSize >= Self.bufQueue.Len() {
+		n = Self.maxSize - Self.bufQueue.Len()
+	}
+	// if maxSize is small than bufQueue length, return 0
 	return
 }
 
-func (Self *window) len() (n int) {
-	n = len(Self.windowBuff[Self.off:])
+func (Self *ReceiveWindow) ReadSize() (n uint32) {
+	// acknowledge the size already read
+	Self.bufQueue.mutex.Lock()
+	n = Self.readLength
+	Self.readLength = 0
+	Self.bufQueue.mutex.Unlock()
+	Self.count += 1
 	return
 }
 
-func (Self *window) cap() (n int) {
-	n = cap(Self.windowBuff[Self.off:])
-	return
+func (Self *ReceiveWindow) CalcSize() {
+	// calculating maximum receive window size
+	if Self.count == 0 {
+		logs.Warn("ping, bw", Self.mux.latency, Self.bw.Get())
+		n := uint32(2 * Self.mux.latency * Self.bw.Get())
+		if n < 8192 {
+			n = 8192
+		}
+		if n < Self.bufQueue.Len() {
+			n = Self.bufQueue.Len()
+		}
+		// set the minimal size
+		logs.Warn("n", n)
+		Self.maxSize = n
+		Self.count = -5
+	}
 }
 
-func (Self *window) grow(n int) {
-	Self.windowBuff = Self.windowBuff[:Self.len()+n]
+func (Self *ReceiveWindow) Write(buf []byte, l uint16, part bool, id int32) (err error) {
+	if Self.closeOp {
+		return errors.New("conn.receiveWindow: write on closed window")
+	}
+	element := ListElement{}
+	err = element.New(buf, l, part)
+	//logs.Warn("push the buf", len(buf), l, (&element).l)
+	if err != nil {
+		return
+	}
+	Self.bufQueue.Push(&element) // must push data before allow read
+	//logs.Warn("read session calc size ", Self.maxSize)
+	// calculating the receive window size
+	Self.CalcSize()
+	logs.Warn("read session calc size finish", Self.maxSize)
+	if Self.RemainingSize() == 0 {
+		Self.windowFull = true
+		//logs.Warn("window full true", Self.windowFull)
+	}
+	Self.mux.sendInfo(common.MUX_MSG_SEND_OK, id, Self.maxSize, Self.ReadSize())
+	return nil
 }
 
-func (Self *window) Write(p []byte) (n int, err error) {
+func (Self *ReceiveWindow) Read(p []byte, id int32) (n int, err error) {
 	if Self.closeOp {
-		return 0, errors.New("conn.receiveWindow: write on closed window")
+		return 0, io.EOF // receive close signal, returns eof
 	}
-	if len(p) > Self.Size() {
-		return 0, errors.New("conn.receiveWindow: write too large")
+	pOff := 0
+	l := 0
+	//logs.Warn("receive window read off, element.l", Self.off, Self.element.l)
+copyData:
+	Self.bw.StartRead()
+	if Self.off == uint32(Self.element.l) {
+		// on the first Read method invoked, Self.off and Self.element.l
+		// both zero value
+		Self.element, err = Self.bufQueue.Pop()
+		// if the queue is empty, Pop method will wait until one element push
+		// into the queue successful, or timeout.
+		// timer start on timeout parameter is set up ,
+		// reset to 60s if timeout and data still available
+		Self.off = 0
+		if err != nil {
+			return // queue receive stop or time out, break the loop and return
+		}
+		//logs.Warn("pop element", Self.element.l, Self.element.part)
 	}
-	Self.mutex.Lock()
-	// slide the offset
-	if len(p) > Self.cap()-Self.len() {
-		// not enough space, need to allocate
-		Self.fullSlide()
-	} else {
-		// have enough space, re slice
-		Self.liteSlide()
+	l = copy(p[pOff:], Self.element.buf[Self.off:])
+	Self.bw.SetCopySize(l)
+	pOff += l
+	Self.off += uint32(l)
+	Self.bufQueue.mutex.Lock()
+	Self.readLength += uint32(l)
+	//logs.Warn("window read length buf len", Self.readLength, Self.bufQueue.Len())
+	Self.bufQueue.mutex.Unlock()
+	n += l
+	l = 0
+	Self.bw.EndRead()
+	Self.sendStatus(id)
+	if pOff < len(p) && Self.element.part {
+		// element is a part of the segments, trying to fill up buf p
+		goto copyData
 	}
-	length := Self.len()                  // length before grow
-	Self.grow(len(p))                     // grow for copy
-	n = copy(Self.windowBuff[length:], p) // must copy data before allow Read
-	if Self.readWait {
-		// if there condition is length == 0 and
-		// Read method just take away all the windowBuff,
-		// this method will block until windowBuff is empty again
+	return // buf p is full or all of segments in buf, return
+}
 
-		// allow continue read
-		defer Self.allowRead()
+func (Self *ReceiveWindow) sendStatus(id int32) {
+	if Self.windowFull || Self.bufQueue.Len() == 0 {
+		// window is full before read or empty now
+		Self.windowFull = false
+		Self.mux.sendInfo(common.MUX_MSG_SEND_OK, id, Self.maxSize, Self.ReadSize())
+		// acknowledge other side, have empty some receive window space
+		//}
 	}
+}
+
+func (Self *ReceiveWindow) SetTimeOut(t time.Time) {
+	// waiting for FIFO queue Pop method
+	Self.bufQueue.SetTimeOut(t)
+}
+
+func (Self *ReceiveWindow) Stop() {
+	// queue has no more data to push, so unblock pop method
+	Self.once.Do(Self.bufQueue.Stop)
+}
+
+func (Self *ReceiveWindow) CloseWindow() {
+	Self.window.CloseWindow()
+	Self.Stop()
+}
+
+type SendWindow struct {
+	buf         []byte
+	sentLength  uint32
+	setSizeCh   chan struct{}
+	setSizeWait bool
+	unSlide     uint32
+	timeout     time.Time
+	window
+	mutex sync.Mutex
+}
+
+func (Self *SendWindow) New(mux *Mux) {
+	Self.setSizeCh = make(chan struct{})
+	Self.maxSize = 4096
+	Self.mux = mux
+	Self.window.New()
+}
+
+func (Self *SendWindow) SetSendBuf(buf []byte) {
+	// send window buff from conn write method, set it to send window
+	Self.mutex.Lock()
+	Self.buf = buf
+	Self.off = 0
 	Self.mutex.Unlock()
-	return n, nil
 }
 
-func (Self *window) allowRead() (closed bool) {
+func (Self *SendWindow) RemainingSize() (n uint32) {
+	if Self.maxSize >= Self.sentLength {
+		n = Self.maxSize - Self.sentLength
+	}
+	return
+}
+
+func (Self *SendWindow) SetSize(windowSize, readLength uint32) (closed bool) {
+	defer func() {
+		if recover() != nil {
+			closed = true
+		}
+	}()
 	if Self.closeOp {
-		close(Self.readOp)
+		close(Self.setSizeCh)
 		return true
 	}
+	if readLength == 0 && Self.maxSize == windowSize {
+		logs.Warn("waiting for another window size")
+		return false // waiting for receive another usable window size
+	}
+	logs.Warn("set send window size to ", windowSize, readLength)
 	Self.mutex.Lock()
-	Self.readWait = false
-	Self.mutex.Unlock()
-	select {
-	case <-Self.closeOpCh:
-		close(Self.readOp)
-		return true
-	case Self.readOp <- struct{}{}:
-		return false
+	Self.slide(windowSize, readLength)
+	if Self.setSizeWait {
+		// send window into the wait status, need notice the channel
+		//logs.Warn("send window remaining size is 0 , wait")
+		if Self.RemainingSize() == 0 {
+			//logs.Warn("waiting for another window size after slide")
+			// keep the wait status
+			Self.mutex.Unlock()
+			return false
+		}
+		Self.setSizeWait = false
+		Self.mutex.Unlock()
+		//logs.Warn("send window remaining size is 0 starting wait")
+		select {
+		case Self.setSizeCh <- struct{}{}:
+			//logs.Warn("send window remaining size is 0 finish")
+			return false
+		case <-Self.closeOpCh:
+			close(Self.setSizeCh)
+			return true
+		}
 	}
+	// send window not into the wait status, so just do slide
+	Self.mutex.Unlock()
+	return false
+}
+
+func (Self *SendWindow) slide(windowSize, readLength uint32) {
+	Self.sentLength -= readLength
+	Self.maxSize = windowSize
 }
 
-func (Self *window) Read(p []byte) (n int, err error) {
+func (Self *SendWindow) WriteTo() (p []byte, part bool, err error) {
+	// returns buf segments, return only one segments, need a loop outside
+	// until err = io.EOF
 	if Self.closeOp {
-		return 0, io.EOF // Write method receive close signal, returns eof
+		return nil, false, errors.New("conn.writeWindow: window closed")
+	}
+	if Self.off == uint32(len(Self.buf)) {
+		return nil, false, io.EOF
+		// send window buff is drain, return eof and get another one
 	}
 	Self.mutex.Lock()
-	length := Self.len() // protect the length data, it invokes
-	// before Write lock and after Write unlock
-	if length == 0 {
-		// window is empty, waiting for Write method send a success readOp signal
-		// or get timeout or close
-		Self.readWait = true
+	if Self.RemainingSize() == 0 {
+		Self.setSizeWait = true
 		Self.mutex.Unlock()
-		ticker := time.NewTicker(2 * time.Minute)
-		defer ticker.Stop()
-		select {
-		case _, ok := <-Self.readOp:
-			if !ok {
-				return 0, errors.New("conn.receiveWindow: window closed")
-			}
-		case <-Self.WriteEndOp:
-			return 0, io.EOF // receive eof signal, returns eof
-		case <-ticker.C:
-			return 0, errors.New("conn.receiveWindow: read time out")
-		case <-Self.closeOpCh:
-			close(Self.readOp)
-			return 0, io.EOF // receive close signal, returns eof
+		// into the wait status
+		err = Self.waitReceiveWindow()
+		if err != nil {
+			return nil, false, err
 		}
 	} else {
 		Self.mutex.Unlock()
 	}
-	minCopy := 512
-	for {
-		Self.mutex.Lock()
-		if len(p) == n || Self.len() == 0 {
-			Self.mutex.Unlock()
-			break
-		}
-		if n+minCopy > len(p) {
-			minCopy = len(p) - n
-		}
-		i := copy(p[n:n+minCopy], Self.windowBuff[Self.off:])
-		Self.off += uint16(i)
-		n += i
-		Self.mutex.Unlock()
+	Self.mutex.Lock()
+	var sendSize uint32
+	if len(Self.buf[Self.off:]) > common.MAXIMUM_SEGMENT_SIZE {
+		sendSize = common.MAXIMUM_SEGMENT_SIZE
+		part = true
+	} else {
+		sendSize = uint32(len(Self.buf[Self.off:]))
+		part = false
 	}
-	p = p[:n]
+	if Self.RemainingSize() < sendSize {
+		// usable window size is small than
+		// window MAXIMUM_SEGMENT_SIZE or send buf left
+		sendSize = Self.RemainingSize()
+		part = true
+	}
+	//logs.Warn("send size", sendSize)
+	p = Self.buf[Self.off : sendSize+Self.off]
+	Self.off += sendSize
+	Self.sentLength += sendSize
+	Self.mutex.Unlock()
 	return
 }
 
-func (Self *window) WriteTo() (p []byte, err error) {
-	if Self.closeOp {
-		return nil, errors.New("conn.writeWindow: window closed")
-	}
-	if Self.len() == 0 {
-		return nil, io.EOF
-		// send window buff is drain, return eof and get another one
+func (Self *SendWindow) waitReceiveWindow() (err error) {
+	t := Self.timeout.Sub(time.Now())
+	if t < 0 {
+		t = time.Minute
 	}
-	var windowSize uint16
-	var ok bool
-waiting:
-	ticker := time.NewTicker(2 * time.Minute)
-	defer ticker.Stop()
+	timer := time.NewTimer(t)
+	defer timer.Stop()
 	// waiting for receive usable window size, or timeout
 	select {
-	case windowSize, ok = <-Self.usableReceiveWindow:
+	case _, ok := <-Self.setSizeCh:
 		if !ok {
-			return nil, errors.New("conn.writeWindow: window closed")
+			return errors.New("conn.writeWindow: window closed")
 		}
-	case <-ticker.C:
-		return nil, errors.New("conn.writeWindow: write to time out")
+		return nil
+	case <-timer.C:
+		return errors.New("conn.writeWindow: write to time out")
 	case <-Self.closeOpCh:
-		return nil, errors.New("conn.writeWindow: window closed")
-	}
-	if windowSize == 0 {
-		goto waiting // waiting for another usable window size
+		return errors.New("conn.writeWindow: window closed")
 	}
-	Self.mutex.Lock()
-	if windowSize > uint16(Self.len()) {
-		// usable window size is bigger than window buff size, send the full buff
-		windowSize = uint16(Self.len())
+}
+
+func (Self *SendWindow) WriteFull(buf []byte, id int32) (n int, err error) {
+	Self.SetSendBuf(buf) // set the buf to send window
+	var bufSeg []byte
+	var part bool
+	for {
+		bufSeg, part, err = Self.WriteTo()
+		//logs.Warn("buf seg", len(bufSeg), part, err)
+		// get the buf segments from send window
+		if bufSeg == nil && part == false && err == io.EOF {
+			// send window is drain, break the loop
+			err = nil
+			break
+		}
+		if err != nil {
+			break
+		}
+		n += len(bufSeg)
+		if part {
+			Self.mux.sendInfo(common.MUX_NEW_MSG_PART, id, bufSeg)
+		} else {
+			Self.mux.sendInfo(common.MUX_NEW_MSG, id, bufSeg)
+			//logs.Warn("buf seg sent", len(bufSeg), part, err)
+		}
+		// send to other side, not send nil data to other side
 	}
-	p = Self.windowBuff[Self.off : windowSize+Self.off]
-	Self.off += windowSize
-	Self.mutex.Unlock()
+	//logs.Warn("buf seg write success")
 	return
 }
 
-func (Self *window) SetAllowSize(value uint16) (closed bool) {
-	defer func() {
-		if recover() != nil {
-			closed = true
+func (Self *SendWindow) SetTimeOut(t time.Time) {
+	// waiting for receive a receive window size
+	Self.timeout = t
+}
+
+type bandwidth struct {
+	lastReadStart time.Time
+	readStart     time.Time
+	readEnd       time.Time
+	bufLength     int
+	lastBufLength int
+	count         int8
+	readBW        float64
+	writeBW       float64
+}
+
+func (Self *bandwidth) StartRead() {
+	Self.lastReadStart, Self.readStart = Self.readStart, time.Now()
+}
+
+func (Self *bandwidth) EndRead() {
+	if !Self.lastReadStart.IsZero() {
+		if Self.count == 0 {
+			Self.calcWriteBandwidth()
 		}
-	}()
-	if Self.closeOp {
-		close(Self.usableReceiveWindow)
-		return true
 	}
-	select {
-	case Self.usableReceiveWindow <- value:
-		return false
-	case <-Self.closeOpCh:
-		close(Self.usableReceiveWindow)
-		return true
+	Self.readEnd = time.Now()
+	if Self.count == 0 {
+		Self.calcReadBandwidth()
+		Self.count = -3
 	}
+	Self.count += 1
 }
 
-func (Self *window) CloseWindow() {
-	Self.closeOp = true
-	Self.closeOpCh <- struct{}{}
-	Self.closeOpCh <- struct{}{}
-	Self.closeOpCh <- struct{}{}
-	close(Self.closeOpCh)
-	return
+func (Self *bandwidth) SetCopySize(n int) {
+	// must be invoke between StartRead and EndRead
+	Self.lastBufLength, Self.bufLength = Self.bufLength, n
 }
 
-type waitingCh struct {
-	nCh   chan int
-	errCh chan error
+func (Self *bandwidth) calcReadBandwidth() {
+	// Bandwidth between nps and npc
+	readTime := Self.readEnd.Sub(Self.readStart)
+	Self.readBW = float64(Self.bufLength) / readTime.Seconds()
+	//logs.Warn("calc read bw", Self.bufLength, readTime.Seconds())
 }
 
-func (Self *waitingCh) new() {
-	Self.nCh = make(chan int)
-	Self.errCh = make(chan error)
+func (Self *bandwidth) calcWriteBandwidth() {
+	// Bandwidth between nps and user, npc and application
+	//logs.Warn("calc write bw")
+	writeTime := Self.readEnd.Sub(Self.lastReadStart)
+	Self.writeBW = float64(Self.lastBufLength) / writeTime.Seconds()
 }
 
-func (Self *waitingCh) close() {
-	close(Self.nCh)
-	close(Self.errCh)
+func (Self *bandwidth) Get() (bw float64) {
+	// The zero value, 0 for numeric types
+	if Self.writeBW == 0 && Self.readBW == 0 {
+		logs.Warn("bw both 0")
+		return 100
+	}
+	if Self.writeBW == 0 && Self.readBW != 0 {
+		return Self.readBW
+	}
+	if Self.readBW == 0 && Self.writeBW != 0 {
+		return Self.writeBW
+	}
+	return math.Min(Self.readBW, Self.writeBW)
 }

+ 71 - 47
lib/mux/mux.go

@@ -3,6 +3,7 @@ package mux
 import (
 	"bytes"
 	"errors"
+	"io"
 	"math"
 	"net"
 	"sync"
@@ -22,8 +23,10 @@ type Mux struct {
 	closeChan  chan struct{}
 	IsClose    bool
 	pingOk     int
+	latency    float64
+	pingCh     chan []byte
 	connType   string
-	writeQueue Queue
+	writeQueue PriorityQueue
 	bufCh      chan *bytes.Buffer
 	sync.Mutex
 }
@@ -38,13 +41,15 @@ func NewMux(c net.Conn, connType string) *Mux {
 		IsClose:   false,
 		connType:  connType,
 		bufCh:     make(chan *bytes.Buffer),
+		pingCh:    make(chan []byte),
 	}
 	m.writeQueue.New()
 	//read session by flag
-	go m.readSession()
+	m.readSession()
 	//ping
-	go m.ping()
-	go m.writeSession()
+	m.ping()
+	m.pingReturn()
+	m.writeSession()
 	return m
 }
 
@@ -83,10 +88,10 @@ func (s *Mux) Addr() net.Addr {
 	return s.conn.LocalAddr()
 }
 
-func (s *Mux) sendInfo(flag uint8, id int32, data interface{}) {
+func (s *Mux) sendInfo(flag uint8, id int32, data ...interface{}) {
 	var err error
 	pack := common.MuxPack.Get()
-	err = pack.NewPac(flag, id, data)
+	err = pack.NewPac(flag, id, data...)
 	if err != nil {
 		common.MuxPack.Put(pack)
 		return
@@ -98,11 +103,13 @@ func (s *Mux) sendInfo(flag uint8, id int32, data interface{}) {
 func (s *Mux) writeSession() {
 	go s.packBuf()
 	go s.writeBuf()
-	<-s.closeChan
 }
 
 func (s *Mux) packBuf() {
 	for {
+		if s.IsClose {
+			break
+		}
 		pack := s.writeQueue.Pop()
 		buffer := common.BuffPool.Get()
 		err := pack.Pack(buffer)
@@ -117,12 +124,14 @@ func (s *Mux) packBuf() {
 		case <-s.closeChan:
 			break
 		}
-
 	}
 }
 
 func (s *Mux) writeBuf() {
 	for {
+		if s.IsClose {
+			break
+		}
 		select {
 		case buffer := <-s.bufCh:
 			l := buffer.Len()
@@ -141,8 +150,15 @@ func (s *Mux) writeBuf() {
 
 func (s *Mux) ping() {
 	go func() {
-		ticker := time.NewTicker(time.Second * 1)
+		now, _ := time.Now().MarshalText()
+		s.sendInfo(common.MUX_PING_FLAG, common.MUX_PING, now)
+		// send the ping flag and get the latency first
+		ticker := time.NewTicker(time.Second * 15)
 		for {
+			if s.IsClose {
+				ticker.Stop()
+				break
+			}
 			select {
 			case <-ticker.C:
 			}
@@ -150,7 +166,8 @@ func (s *Mux) ping() {
 			if (math.MaxInt32 - s.id) < 10000 {
 				s.id = 0
 			}
-			s.sendInfo(common.MUX_PING_FLAG, common.MUX_PING, nil)
+			now, _ := time.Now().MarshalText()
+			s.sendInfo(common.MUX_PING_FLAG, common.MUX_PING, now)
 			if s.pingOk > 10 && s.connType == "kcp" {
 				s.Close()
 				break
@@ -158,15 +175,32 @@ func (s *Mux) ping() {
 			s.pingOk++
 		}
 	}()
-	select {
-	case <-s.closeChan:
-	}
+}
+
+func (s *Mux) pingReturn() {
+	go func() {
+		var now time.Time
+		var data []byte
+		for {
+			select {
+			case data = <-s.pingCh:
+			case <-s.closeChan:
+				break
+			}
+			_ = now.UnmarshalText(data)
+			s.latency = time.Since(now).Seconds()
+			s.sendInfo(common.MUX_PING_RETURN, common.MUX_PING, nil)
+		}
+	}()
 }
 
 func (s *Mux) readSession() {
 	go func() {
 		pack := common.MuxPack.Get()
 		for {
+			if s.IsClose {
+				break
+			}
 			pack = common.MuxPack.Get()
 			if pack.UnPack(s.conn) != nil {
 				break
@@ -176,44 +210,25 @@ func (s *Mux) readSession() {
 			case common.MUX_NEW_CONN: //new connection
 				connection := NewConn(pack.Id, s)
 				s.connMap.Set(pack.Id, connection) //it has been set before send ok
-				go func(connection *conn) {
-					connection.sendWindow.SetAllowSize(512) // set the initial receive window
-				}(connection)
 				s.newConnCh <- connection
 				s.sendInfo(common.MUX_NEW_CONN_OK, connection.connId, nil)
 				continue
 			case common.MUX_PING_FLAG: //ping
-				go s.sendInfo(common.MUX_PING_RETURN, common.MUX_PING, nil)
+				s.pingCh <- pack.Content
 				continue
 			case common.MUX_PING_RETURN:
 				continue
 			}
 			if connection, ok := s.connMap.Get(pack.Id); ok && !connection.isClose {
 				switch pack.Flag {
-				case common.MUX_NEW_MSG: //new msg from remote connection
-					//insert wait queue
-					if connection.isClose {
-						continue
+				case common.MUX_NEW_MSG, common.MUX_NEW_MSG_PART: //new msg from remote connection
+					err := s.newMsg(connection, pack)
+					if err != nil {
+						connection.Close()
 					}
-					connection.receiveWindow.WriteWg.Add(1)
-					go func(connection *conn, content []byte) { // do not block read session
-						_, err := connection.receiveWindow.Write(content)
-						if err != nil {
-							logs.Warn("mux new msg err close", err)
-							connection.Close()
-						}
-						size := connection.receiveWindow.Size()
-						if size == 0 {
-							connection.receiveWindow.WindowFull = true
-						}
-						s.sendInfo(common.MUX_MSG_SEND_OK, connection.connId, size)
-						connection.receiveWindow.WriteWg.Done()
-					}(connection, pack.Content)
 					continue
 				case common.MUX_NEW_CONN_OK: //connection ok
 					connection.connStatusOkCh <- struct{}{}
-					go connection.sendWindow.SetAllowSize(512)
-					// set the initial receive window both side
 					continue
 				case common.MUX_NEW_CONN_Fail:
 					connection.connStatusFailCh <- struct{}{}
@@ -222,15 +237,12 @@ func (s *Mux) readSession() {
 					if connection.isClose {
 						continue
 					}
-					go connection.sendWindow.SetAllowSize(pack.Window)
+					connection.sendWindow.SetSize(pack.Window, pack.ReadLength)
 					continue
 				case common.MUX_CONN_CLOSE: //close the connection
 					s.connMap.Delete(pack.Id)
 					connection.closeFlag = true
-					go func(connection *conn) {
-						connection.receiveWindow.WriteWg.Wait()
-						connection.receiveWindow.WriteEndOp <- struct{}{} // close signal to receive window
-					}(connection)
+					connection.receiveWindow.Stop() // close signal to receive window
 					continue
 				}
 			} else if pack.Flag == common.MUX_CONN_CLOSE {
@@ -241,9 +253,24 @@ func (s *Mux) readSession() {
 		common.MuxPack.Put(pack)
 		s.Close()
 	}()
-	select {
-	case <-s.closeChan:
+}
+
+func (s *Mux) newMsg(connection *conn, pack *common.MuxPackager) (err error) {
+	if connection.isClose {
+		err = io.ErrClosedPipe
+		return
+	}
+	//logs.Warn("read session receive new msg", pack.Length)
+	//go func(connection *conn, pack *common.MuxPackager) { // do not block read session
+	//insert into queue
+	if pack.Flag == common.MUX_NEW_MSG_PART {
+		err = connection.receiveWindow.Write(pack.Content, pack.Length, true, pack.Id)
+	}
+	if pack.Flag == common.MUX_NEW_MSG {
+		err = connection.receiveWindow.Write(pack.Content, pack.Length, false, pack.Id)
 	}
+	//logs.Warn("read session write success", pack.Length)
+	return
 }
 
 func (s *Mux) Close() error {
@@ -255,9 +282,6 @@ func (s *Mux) Close() error {
 	s.connMap.Close()
 	s.closeChan <- struct{}{}
 	s.closeChan <- struct{}{}
-	s.closeChan <- struct{}{}
-	s.closeChan <- struct{}{}
-	s.closeChan <- struct{}{}
 	close(s.newConnCh)
 	return s.conn.Close()
 }

+ 121 - 27
lib/mux/queue.go

@@ -2,25 +2,54 @@ package mux
 
 import (
 	"container/list"
+	"errors"
 	"github.com/cnlh/nps/lib/common"
+	"io"
 	"sync"
+	"time"
 )
 
-type Queue struct {
-	list    *list.List
+type QueueOp struct {
 	readOp  chan struct{}
 	cleanOp chan struct{}
 	popWait bool
 	mutex   sync.Mutex
 }
 
-func (Self *Queue) New() {
-	Self.list = list.New()
+func (Self *QueueOp) New() {
 	Self.readOp = make(chan struct{})
 	Self.cleanOp = make(chan struct{}, 2)
 }
 
-func (Self *Queue) Push(packager *common.MuxPackager) {
+func (Self *QueueOp) allowPop() (closed bool) {
+	Self.mutex.Lock()
+	Self.popWait = false
+	Self.mutex.Unlock()
+	select {
+	case Self.readOp <- struct{}{}:
+		return false
+	case <-Self.cleanOp:
+		return true
+	}
+}
+
+func (Self *QueueOp) Clean() {
+	Self.cleanOp <- struct{}{}
+	Self.cleanOp <- struct{}{}
+	close(Self.cleanOp)
+}
+
+type PriorityQueue struct {
+	list *list.List
+	QueueOp
+}
+
+func (Self *PriorityQueue) New() {
+	Self.list = list.New()
+	Self.QueueOp.New()
+}
+
+func (Self *PriorityQueue) Push(packager *common.MuxPackager) {
 	Self.mutex.Lock()
 	if Self.popWait {
 		defer Self.allowPop()
@@ -35,28 +64,16 @@ func (Self *Queue) Push(packager *common.MuxPackager) {
 	return
 }
 
-func (Self *Queue) allowPop() (closed bool) {
-	Self.mutex.Lock()
-	Self.popWait = false
-	Self.mutex.Unlock()
-	select {
-	case Self.readOp <- struct{}{}:
-		return false
-	case <-Self.cleanOp:
-		return true
-	}
-}
-
-func (Self *Queue) insert(packager *common.MuxPackager) {
+func (Self *PriorityQueue) insert(packager *common.MuxPackager) {
 	element := Self.list.Back()
 	for {
-		if element == nil { // Queue dose not have any of msg package with this close package id
+		if element == nil { // PriorityQueue dose not have any of msg package with this close package id
 			Self.list.PushFront(packager) // insert close package to first
 			break
 		}
 		if element.Value.(*common.MuxPackager).Flag == common.MUX_NEW_MSG &&
 			element.Value.(*common.MuxPackager).Id == packager.Id {
-			Self.list.InsertAfter(packager, element) // Queue has some msg package
+			Self.list.InsertAfter(packager, element) // PriorityQueue has some msg package
 			// with this close package id, insert close package after last msg package
 			break
 		}
@@ -64,7 +81,7 @@ func (Self *Queue) insert(packager *common.MuxPackager) {
 	}
 }
 
-func (Self *Queue) Pop() (packager *common.MuxPackager) {
+func (Self *PriorityQueue) Pop() (packager *common.MuxPackager) {
 	Self.mutex.Lock()
 	element := Self.list.Front()
 	if element != nil {
@@ -73,7 +90,7 @@ func (Self *Queue) Pop() (packager *common.MuxPackager) {
 		Self.mutex.Unlock()
 		return
 	}
-	Self.popWait = true // Queue is empty, notice Push method
+	Self.popWait = true // PriorityQueue is empty, notice Push method
 	Self.mutex.Unlock()
 	select {
 	case <-Self.readOp:
@@ -83,13 +100,90 @@ func (Self *Queue) Pop() (packager *common.MuxPackager) {
 	}
 }
 
-func (Self *Queue) Len() (n int) {
+func (Self *PriorityQueue) Len() (n int) {
 	n = Self.list.Len()
 	return
 }
 
-func (Self *Queue) Clean() {
-	Self.cleanOp <- struct{}{}
-	Self.cleanOp <- struct{}{}
-	close(Self.cleanOp)
+type ListElement struct {
+	buf  []byte
+	l    uint16
+	part bool
+}
+
+func (Self *ListElement) New(buf []byte, l uint16, part bool) (err error) {
+	if uint16(len(buf)) != l {
+		return errors.New("ListElement: buf length not match")
+	}
+	Self.buf = buf
+	Self.l = l
+	Self.part = part
+	return nil
+}
+
+type FIFOQueue struct {
+	list    []*ListElement
+	length  uint32
+	stopOp  chan struct{}
+	timeout time.Time
+	QueueOp
+}
+
+func (Self *FIFOQueue) New() {
+	Self.QueueOp.New()
+	Self.stopOp = make(chan struct{}, 1)
+}
+
+func (Self *FIFOQueue) Push(element *ListElement) {
+	Self.mutex.Lock()
+	if Self.popWait {
+		defer Self.allowPop()
+	}
+	Self.list = append(Self.list, element)
+	Self.length += uint32(element.l)
+	Self.mutex.Unlock()
+	return
+}
+
+func (Self *FIFOQueue) Pop() (element *ListElement, err error) {
+	Self.mutex.Lock()
+	if len(Self.list) == 0 {
+		Self.popWait = true
+		Self.mutex.Unlock()
+		t := Self.timeout.Sub(time.Now())
+		if t <= 0 {
+			t = time.Minute
+		}
+		timer := time.NewTimer(t)
+		defer timer.Stop()
+		select {
+		case <-Self.readOp:
+			Self.mutex.Lock()
+		case <-Self.cleanOp:
+			return
+		case <-Self.stopOp:
+			err = io.EOF
+			return
+		case <-timer.C:
+			err = errors.New("mux.queue: read time out")
+			return
+		}
+	}
+	element = Self.list[0]
+	Self.list = Self.list[1:]
+	Self.length -= uint32(element.l)
+	Self.mutex.Unlock()
+	return
+}
+
+func (Self *FIFOQueue) Len() (n uint32) {
+	return Self.length
+}
+
+func (Self *FIFOQueue) Stop() {
+	Self.stopOp <- struct{}{}
+}
+
+func (Self *FIFOQueue) SetTimeOut(t time.Time) {
+	Self.timeout = t
 }