瀏覽代碼

add mux slide window

ffdfgdfg 5 年之前
父節點
當前提交
9d3df6be7e
共有 6 個文件被更改,包括 473 次插入241 次删除
  1. 18 2
      lib/common/netpackager.go
  2. 44 16
      lib/common/pool.go
  3. 20 87
      lib/common/util.go
  4. 314 84
      lib/mux/conn.go
  5. 54 36
      lib/mux/mux.go
  6. 23 16
      lib/mux/mux_test.go

+ 18 - 2
lib/common/netpackager.go

@@ -150,8 +150,9 @@ func (Self *ConnPackager) UnPack(reader io.Reader) (err error) {
 }
 
 type MuxPackager struct {
-	Flag uint8
-	Id   int32
+	Flag   uint8
+	Id     int32
+	Window uint16
 	BasePackager
 }
 
@@ -161,6 +162,15 @@ func (Self *MuxPackager) NewPac(flag uint8, id int32, content ...interface{}) (e
 	if flag == MUX_NEW_MSG {
 		err = Self.BasePackager.NewPac(content...)
 	}
+	if flag == MUX_MSG_SEND_OK {
+		// MUX_MSG_SEND_OK only allows one data
+		switch content[0].(type) {
+		case int:
+			Self.Window = uint16(content[0].(int))
+		case uint16:
+			Self.Window = content[0].(uint16)
+		}
+	}
 	return
 }
 
@@ -176,6 +186,9 @@ func (Self *MuxPackager) Pack(writer io.Writer) (err error) {
 	if Self.Flag == MUX_NEW_MSG {
 		err = Self.BasePackager.Pack(writer)
 	}
+	if Self.Flag == MUX_MSG_SEND_OK {
+		err = binary.Write(writer, binary.LittleEndian, Self.Window)
+	}
 	return
 }
 
@@ -192,5 +205,8 @@ func (Self *MuxPackager) UnPack(reader io.Reader) (err error) {
 	if Self.Flag == MUX_NEW_MSG {
 		err = Self.BasePackager.UnPack(reader)
 	}
+	if Self.Flag == MUX_MSG_SEND_OK {
+		err = binary.Read(reader, binary.LittleEndian, &Self.Window)
+	}
 	return
 }

+ 44 - 16
lib/common/pool.go

@@ -9,6 +9,7 @@ const PoolSize = 64 * 1024
 const PoolSizeSmall = 100
 const PoolSizeUdp = 1472
 const PoolSizeCopy = 32 << 10
+const PoolSizeWindow = 1<<16 - 1
 
 var BufPool = sync.Pool{
 	New: func() interface{} {
@@ -59,11 +60,11 @@ func PutBufPoolMax(buf []byte) {
 	}
 }
 
-type CopyBufferPool struct {
+type copyBufferPool struct {
 	pool sync.Pool
 }
 
-func (Self *CopyBufferPool) New() {
+func (Self *copyBufferPool) New() {
 	Self.pool = sync.Pool{
 		New: func() interface{} {
 			return make([]byte, PoolSizeCopy, PoolSizeCopy)
@@ -71,24 +72,49 @@ func (Self *CopyBufferPool) New() {
 	}
 }
 
-func (Self *CopyBufferPool) Get() []byte {
+func (Self *copyBufferPool) Get() []byte {
 	buf := Self.pool.Get().([]byte)
 	return buf[:PoolSizeCopy] // just like make a new slice, but data may not be 0
 }
 
-func (Self *CopyBufferPool) Put(x []byte) {
+func (Self *copyBufferPool) Put(x []byte) {
 	if len(x) == PoolSizeCopy {
 		Self.pool.Put(x)
 	} else {
-		x = nil // buf is not full, maybe truncated by gc in pool, not allowed
+		x = nil // buf is not full, not allowed, New method returns a full buf
 	}
 }
 
-type BufferPool struct {
+type windowBufferPool struct {
 	pool sync.Pool
 }
 
-func (Self *BufferPool) New() {
+func (Self *windowBufferPool) New() {
+	Self.pool = sync.Pool{
+		New: func() interface{} {
+			return make([]byte, 0, PoolSizeWindow)
+		},
+	}
+}
+
+func (Self *windowBufferPool) Get() (buf []byte) {
+	buf = Self.pool.Get().([]byte)
+	return buf[:0]
+}
+
+func (Self *windowBufferPool) Put(x []byte) {
+	if cap(x) == PoolSizeWindow {
+		Self.pool.Put(x[:PoolSizeWindow]) // make buf to full
+	} else {
+		x = nil
+	}
+}
+
+type bufferPool struct {
+	pool sync.Pool
+}
+
+func (Self *bufferPool) New() {
 	Self.pool = sync.Pool{
 		New: func() interface{} {
 			return new(bytes.Buffer)
@@ -96,20 +122,20 @@ func (Self *BufferPool) New() {
 	}
 }
 
-func (Self *BufferPool) Get() *bytes.Buffer {
+func (Self *bufferPool) Get() *bytes.Buffer {
 	return Self.pool.Get().(*bytes.Buffer)
 }
 
-func (Self *BufferPool) Put(x *bytes.Buffer) {
+func (Self *bufferPool) Put(x *bytes.Buffer) {
 	x.Reset()
 	Self.pool.Put(x)
 }
 
-type MuxPackagerPool struct {
+type muxPackagerPool struct {
 	pool sync.Pool
 }
 
-func (Self *MuxPackagerPool) New() {
+func (Self *muxPackagerPool) New() {
 	Self.pool = sync.Pool{
 		New: func() interface{} {
 			pack := MuxPackager{}
@@ -118,27 +144,29 @@ func (Self *MuxPackagerPool) New() {
 	}
 }
 
-func (Self *MuxPackagerPool) Get() *MuxPackager {
+func (Self *muxPackagerPool) Get() *MuxPackager {
 	pack := Self.pool.Get().(*MuxPackager)
 	buf := CopyBuff.Get()
 	pack.Content = buf
 	return pack
 }
 
-func (Self *MuxPackagerPool) Put(pack *MuxPackager) {
+func (Self *muxPackagerPool) Put(pack *MuxPackager) {
 	CopyBuff.Put(pack.Content)
 	Self.pool.Put(pack)
 }
 
 var once = sync.Once{}
-var BuffPool = BufferPool{}
-var CopyBuff = CopyBufferPool{}
-var MuxPack = MuxPackagerPool{}
+var BuffPool = bufferPool{}
+var CopyBuff = copyBufferPool{}
+var MuxPack = muxPackagerPool{}
+var WindowBuff = windowBufferPool{}
 
 func newPool() {
 	BuffPool.New()
 	CopyBuff.New()
 	MuxPack.New()
+	WindowBuff.New()
 }
 
 func init() {

+ 20 - 87
lib/common/util.go

@@ -4,9 +4,7 @@ import (
 	"bytes"
 	"encoding/base64"
 	"encoding/binary"
-	"errors"
 	"github.com/cnlh/nps/lib/crypt"
-	"github.com/cnlh/nps/vender/github.com/astaxie/beego/logs"
 	"html/template"
 	"io"
 	"io/ioutil"
@@ -264,96 +262,31 @@ func GetPortByAddr(addr string) int {
 	return p
 }
 
-type ConnCopy struct {
-	dst    net.Conn
-	src    net.Conn
-	buf    []byte
-	connId int32
-}
-
-func (Self *ConnCopy) New(dst net.Conn, src net.Conn, connId int32) {
-	Self.dst = dst
-	Self.src = src
-	Self.buf = CopyBuff.Get()
-	Self.connId = connId
-}
-
-func (Self *ConnCopy) copyBufferOnce() (written int64, err error) {
-	nr, er := Self.src.Read(Self.buf)
-	if nr > 0 {
-		//logs.Warn("write", Self.connId, nr, string(buf[0:10]))
-		nw, ew := Self.dst.Write(Self.buf[0:nr])
-		if nw > 0 {
-			written = int64(nw)
-		}
-		if ew != nil {
-			//logs.Warn("write err ew id nw", ew, Self.connId, nw)
-			err = ew
-			return
-		}
-		if nr != nw {
-			err = io.ErrShortWrite
-			return
-		}
-		if nw == 0 {
-			err = errors.New("io: write on closed pipe")
-			//logs.Warn("write buffer", err)
-			return
-		}
-	}
-	if nr == 0 && er == nil {
-		err = errors.New("io: read on closed pipe")
-		//logs.Warn("read buffer", err)
-		return
-	}
-	if er != nil {
-		err = er
-		return
-	}
-	return
-}
-
-func (Self *ConnCopy) copyBuffer() (written int64, err error) {
-	var write int64
-	write, err = Self.copyBufferOnce() // first copy, if written is zero and err is io.EOF
-	// means conn already closed, so need to close all the conn
-	written += write
-	if err == io.EOF && written == 0 {
-		err = errors.New("io: read on closed pipe")
-		return
-	} else if err == io.EOF && written > 0 {
-		err = nil
-		return
-	}
+func CopyBuffer(dst io.Writer, src io.Reader) (written int64, err error) {
+	buf := CopyBuff.Get()
+	defer CopyBuff.Put(buf)
 	for {
-		write, err = Self.copyBufferOnce()
-		written += write
-		if err != nil {
-			if err == io.EOF {
-				err = nil
+		nr, er := src.Read(buf)
+		if nr > 0 {
+			nw, ew := dst.Write(buf[0:nr])
+			if nw > 0 {
+				written += int64(nw)
+			}
+			if ew != nil {
+				err = ew
+				break
+			}
+			if nr != nw {
+				err = io.ErrShortWrite
+				break
 			}
-			return
-		}
-	}
-}
-
-func (Self *ConnCopy) CopyConn() (written int64, err error) {
-	defer CopyBuff.Put(Self.buf)
-	if Self.dst != nil && Self.src != nil {
-		written, err = Self.copyBuffer()
-	} else {
-		return 0, errors.New("copy conn nil src or dst")
-	}
-	if err != nil { // copyBuffer do not return io.EOF ,close all conn
-		logs.Warn("close by copy conn ", Self.connId, err)
-		if Self.dst != nil {
-			Self.dst.Close()
 		}
-		if Self.src != nil {
-			Self.src.Close()
+		if er != nil {
+			err = er
+			break
 		}
 	}
-	return
+	return written, err
 }
 
 //send this ip forget to get a local udp port

+ 314 - 84
lib/mux/conn.go

@@ -17,34 +17,37 @@ type conn struct {
 	connStatusFailCh chan struct{}
 	readTimeOut      time.Time
 	writeTimeOut     time.Time
-	readBuffer       []byte
-	startRead        int //now read position
-	endRead          int //now end read
-	readFlag         bool
-	readCh           chan struct{}
-	readQueue        *sliceEntry
-	stopWrite        bool
-	connId           int32
-	isClose          bool
-	readWait         bool
-	sendClose        bool // MUX_CONN_CLOSE already send
-	closeFlag        bool // close conn flag
-	hasWrite         int
-	mux              *Mux
-	once             sync.Once
+	//readBuffer       []byte
+	//startRead        int //now read position
+	//endRead          int //now end read
+	//readFlag         bool
+	//readCh           chan struct{}
+	//readQueue        *sliceEntry
+	//stopWrite        bool
+	connId  int32
+	isClose bool
+	//readWait         bool
+	closeFlag     bool // close conn flag
+	hasWrite      int
+	receiveWindow *window
+	sendWindow    *window
+	mux           *Mux
+	once          sync.Once
 }
 
 func NewConn(connId int32, mux *Mux) *conn {
 	c := &conn{
-		readCh:           make(chan struct{}),
 		getStatusCh:      make(chan struct{}),
 		connStatusOkCh:   make(chan struct{}),
 		connStatusFailCh: make(chan struct{}),
-		readQueue:        NewQueue(),
 		connId:           connId,
+		receiveWindow:    new(window),
+		sendWindow:       new(window),
 		mux:              mux,
 		once:             sync.Once{},
 	}
+	c.receiveWindow.NewReceive()
+	c.sendWindow.NewSend()
 	return c
 }
 
@@ -52,94 +55,99 @@ func (s *conn) Read(buf []byte) (n int, err error) {
 	if s.isClose || buf == nil {
 		return 0, errors.New("the conn has closed")
 	}
-	if s.endRead-s.startRead == 0 { //read finish or start
-		if s.readQueue.Size() == 0 {
-			s.readWait = true
-			if t := s.readTimeOut.Sub(time.Now()); t > 0 {
-				timer := time.NewTimer(t)
-				defer timer.Stop()
-				select {
-				case <-timer.C:
-					s.readWait = false
-					return 0, errors.New("read timeout")
-				case <-s.readCh:
-				}
-			} else {
-				<-s.readCh
-			}
-		}
-		if s.isClose { //If the connection is closed instead of  continuing command
-			return 0, errors.New("the conn has closed")
-		}
-		if node, err := s.readQueue.Pop(); err != nil {
-			logs.Warn("conn close by read pop err", s.connId, err)
-			s.Close()
-			return 0, io.EOF
-		} else if node.val == nil {
-			s.sendClose = true
-			logs.Warn("conn close by read ", s.connId)
-			s.Close()
-		} else {
-			s.readBuffer = node.val
-			s.endRead = node.l
-			s.startRead = 0
+	nCh := make(chan int)
+	errCh := make(chan error)
+	defer close(nCh)
+	defer close(errCh)
+	// waiting for takeout from receive window finish or timeout
+	go s.readWindow(buf, nCh, errCh)
+	if t := s.readTimeOut.Sub(time.Now()); t > 0 {
+		timer := time.NewTimer(t)
+		defer timer.Stop()
+		select {
+		case <-timer.C:
+			return 0, errors.New("read timeout")
+		case n = <-nCh:
+			err = <-errCh
 		}
-	}
-	if len(buf) < s.endRead-s.startRead {
-		n = copy(buf, s.readBuffer[s.startRead:s.startRead+len(buf)])
-		s.startRead += n
 	} else {
-		n = copy(buf, s.readBuffer[s.startRead:s.endRead])
-		s.startRead += n
-		common.CopyBuff.Put(s.readBuffer)
+		n = <-nCh
+		err = <-errCh
 	}
+	logs.Warn("read window finish conn read n err buf", n, err, string(buf[:15]))
 	return
 }
 
+func (s *conn) readWindow(buf []byte, nCh chan int, errCh chan error) {
+	n, err := s.receiveWindow.Read(buf)
+	//logs.Warn("readwindow goroutine status n err buf", n, err, string(buf[:15]))
+	if s.receiveWindow.WindowFull {
+		if s.receiveWindow.Size() > 0 {
+			// window.Read may be invoked before window.Write, and WindowFull flag change to true
+			// so make sure that receiveWindow is free some space
+			s.receiveWindow.WindowFull = false
+			logs.Warn("defer send mux msg send ok size", s.receiveWindow.Size())
+			s.mux.sendInfo(common.MUX_MSG_SEND_OK, s.connId, s.receiveWindow.Size())
+			// acknowledge other side, have empty some receive window space
+		}
+	}
+	nCh <- n
+	errCh <- err
+}
+
 func (s *conn) Write(buf []byte) (n int, err error) {
 	if s.isClose {
 		return 0, errors.New("the conn has closed")
 	}
 	if s.closeFlag {
-		s.sendClose = true
 		logs.Warn("conn close by write ", s.connId)
-		s.Close()
+		//s.Close()
 		return 0, errors.New("io: write on closed conn")
 	}
-	ch := make(chan struct{})
-	go s.write(buf, ch)
+
+	nCh := make(chan int)
+	errCh := make(chan error)
+	defer close(nCh)
+	defer close(errCh)
+	s.sendWindow.SetSendBuf(buf) // set the buf to send window
+	go s.write(nCh, errCh)
+	// waiting for send to other side or timeout
 	if t := s.writeTimeOut.Sub(time.Now()); t > 0 {
 		timer := time.NewTimer(t)
 		defer timer.Stop()
 		select {
 		case <-timer.C:
 			return 0, errors.New("write timeout")
-		case <-ch:
+		case n = <-nCh:
+			err = <-errCh
 		}
 	} else {
-		<-ch
+		n = <-nCh
+		err = <-errCh
 	}
-	close(ch)
-	//if s.isClose {
-	//	return 0, io.ErrClosedPipe
-	//}
-	return len(buf), nil
+	return
 }
-func (s *conn) write(buf []byte, ch chan struct{}) {
-	start := 0
-	l := len(buf)
+func (s *conn) write(nCh chan int, errCh chan error) {
+	var n int
+	var err error
 	for {
-		if l-start > common.PoolSizeCopy {
-			//logs.Warn("conn write > poolsizecopy")
-			s.mux.sendInfo(common.MUX_NEW_MSG, s.connId, buf[start:start+common.PoolSizeCopy])
-			start += common.PoolSizeCopy
-		} else {
-			//logs.Warn("conn write <= poolsizecopy, start, len", start, l)
-			s.mux.sendInfo(common.MUX_NEW_MSG, s.connId, buf[start:l])
+		buf, err := s.sendWindow.WriteTo()
+		// get the usable window size buf from send window
+		if buf == nil && err == io.EOF {
+			// send window is drain, break the loop
+			err = nil
+			break
+		}
+		if err != nil {
 			break
 		}
+		n += len(buf)
+		//logs.Warn("send window buf len", len(buf))
+		s.mux.sendInfo(common.MUX_NEW_MSG, s.connId, buf)
+		// send to other side, not send nil data to other side
 	}
-	ch <- struct{}{}
+	nCh <- n
+	errCh <- err
 }
 
 func (s *conn) Close() (err error) {
@@ -150,15 +158,14 @@ func (s *conn) Close() (err error) {
 func (s *conn) closeProcess() {
 	s.isClose = true
 	s.mux.connMap.Delete(s.connId)
-	common.CopyBuff.Put(s.readBuffer)
-	close(s.readCh)
-	s.readQueue.Clear()
 	if !s.mux.IsClose {
-		if !s.sendClose {
-			logs.Warn("conn send close", s.connId)
-			s.mux.sendInfo(common.MUX_CONN_CLOSE, s.connId, nil)
-		}
+		logs.Warn("conn send close", s.connId)
+		// if server or user close the conn while reading, will get a io.EOF
+		// and this Close method will be invoke, send this signal to close other side
+		s.mux.sendInfo(common.MUX_CONN_CLOSE, s.connId, nil)
 	}
+	s.sendWindow.CloseWindow()
+	s.receiveWindow.CloseWindow()
 	return
 }
 
@@ -185,3 +192,226 @@ func (s *conn) SetWriteDeadline(t time.Time) error {
 	s.writeTimeOut = t
 	return nil
 }
+
+type window struct {
+	windowBuff          []byte
+	off                 uint16
+	readOp              chan struct{}
+	readWait            bool
+	WindowFull          bool
+	usableReceiveWindow chan uint16
+	WriteWg             sync.WaitGroup
+	closeOp             bool
+	closeOpCh           chan struct{}
+	WriteEndOp          chan struct{}
+	mutex               sync.Mutex
+}
+
+func (Self *window) NewReceive() {
+	// initial a window for receive
+	Self.windowBuff = common.WindowBuff.Get()
+	Self.readOp = make(chan struct{})
+	Self.WriteEndOp = make(chan struct{})
+	Self.closeOpCh = make(chan struct{}, 2)
+}
+
+func (Self *window) NewSend() {
+	// initial a window for send
+	Self.usableReceiveWindow = make(chan uint16)
+	Self.closeOpCh = make(chan struct{}, 2)
+}
+
+func (Self *window) SetSendBuf(buf []byte) {
+	// send window buff from conn write method, set it to send window
+	Self.mutex.Lock()
+	Self.windowBuff = buf
+	Self.off = 0
+	Self.mutex.Unlock()
+}
+
+func (Self *window) fullSlide() {
+	// slide by allocate
+	newBuf := common.WindowBuff.Get()
+	copy(newBuf[0:Self.len()], Self.windowBuff[Self.off:])
+	Self.off = 0
+	common.WindowBuff.Put(Self.windowBuff)
+	Self.windowBuff = newBuf
+	return
+}
+
+func (Self *window) liteSlide() {
+	// slide by re slice
+	Self.windowBuff = Self.windowBuff[Self.off:]
+	Self.off = 0
+	return
+}
+
+func (Self *window) Size() (n int) {
+	// receive Window remaining
+	n = common.PoolSizeWindow - Self.len()
+	return
+}
+
+func (Self *window) len() (n int) {
+	n = len(Self.windowBuff[Self.off:])
+	return
+}
+
+func (Self *window) cap() (n int) {
+	n = cap(Self.windowBuff[Self.off:])
+	return
+}
+
+func (Self *window) grow(n int) {
+	Self.windowBuff = Self.windowBuff[:Self.len()+n]
+}
+
+func (Self *window) Write(p []byte) (n int, err error) {
+	if Self.closeOp {
+		logs.Warn("window write closed len p", len(p))
+		return 0, errors.New("conn.receiveWindow: write on closed window")
+	}
+	if len(p) > Self.Size() {
+		return 0, errors.New("conn.receiveWindow: write too large")
+	}
+	if Self.readWait {
+		defer Self.allowRead()
+	}
+	//logs.Warn("window write p string", len(p), string(p[:15]))
+	Self.mutex.Lock()
+	// slide the offset
+	if len(p) > Self.cap()-Self.len() {
+		// not enough space, need to allocate
+		Self.fullSlide()
+		//logs.Warn("window write full slide len cap", Self.len(), Self.cap())
+	} else {
+		// have enough space, re slice
+		Self.liteSlide()
+		//logs.Warn("window write lite slide len cap", Self.len(), Self.cap())
+	}
+	length := Self.len()                  // length before grow
+	Self.grow(len(p))                     // grow for copy
+	n = copy(Self.windowBuff[length:], p) // must copy data before allow Read
+	//logs.Warn("window write copy n len cap buf", n, Self.len(), Self.cap(), string(Self.windowBuff[Self.len()-n:Self.len()+15-n]))
+	Self.mutex.Unlock()
+	return n, nil
+}
+
+func (Self *window) allowRead() (closed bool) {
+	//logs.Warn("length 0 read op")
+	Self.readWait = false
+	if Self.closeOp {
+		close(Self.readOp)
+		return true
+	}
+	select {
+	case <-Self.closeOpCh:
+		close(Self.readOp)
+		return true
+	case Self.readOp <- struct{}{}:
+		//logs.Warn("length 0 read op finish")
+		return false
+	}
+}
+
+func (Self *window) Read(p []byte) (n int, err error) {
+	//logs.Warn("starting window read method len ", Self.len())
+	if Self.closeOp {
+		return 0, io.EOF // Write method receive close signal, returns eof
+	}
+	if Self.len() == 0 {
+		// window is empty, waiting for Write method send a success readOp signal
+		// or get timeout or close
+		Self.readWait = true
+		ticker := time.NewTicker(2 * time.Minute)
+		defer ticker.Stop()
+		select {
+		case _, ok := <-Self.readOp:
+			//logs.Warn("read window read op len cap", Self.len(), Self.cap())
+			if !ok {
+				return 0, errors.New("conn.receiveWindow: window closed")
+			}
+		case <-Self.WriteEndOp:
+			return 0, io.EOF // receive eof signal, returns eof
+		case <-ticker.C:
+			return 0, errors.New("conn.receiveWindow: read time out")
+		case <-Self.closeOpCh:
+			close(Self.readOp)
+			return 0, io.EOF // receive close signal, returns eof
+		}
+	}
+	//logs.Warn("window read start len window buff", Self.len(), string(Self.windowBuff[Self.off:Self.off+15]))
+	Self.mutex.Lock()
+	n = copy(p, Self.windowBuff[Self.off:])
+	Self.off += uint16(n)
+	p = p[:n]
+	//logs.Warn("window read finish n len p p", n, len(p), string(p[:15]))
+	Self.mutex.Unlock()
+	return
+}
+
+func (Self *window) WriteTo() (p []byte, err error) {
+	if Self.closeOp {
+		logs.Warn("window write to closed")
+		return nil, errors.New("conn.writeWindow: window closed")
+	}
+	if Self.len() == 0 {
+		return nil, io.EOF
+		// send window buff is drain, return eof and get another one
+	}
+	var windowSize uint16
+	var ok bool
+waiting:
+	ticker := time.NewTicker(2 * time.Minute)
+	defer ticker.Stop()
+	// waiting for receive usable window size, or timeout
+	select {
+	case windowSize, ok = <-Self.usableReceiveWindow:
+		if !ok {
+			return nil, errors.New("conn.writeWindow: window closed")
+		}
+	case <-ticker.C:
+		return nil, errors.New("conn.writeWindow: write to time out")
+	}
+	if windowSize == 0 {
+		goto waiting // waiting for another usable window size
+	}
+	Self.mutex.Lock()
+	if windowSize > uint16(Self.len()) {
+		// usable window size is bigger than window buff size, send the full buff
+		//logs.Warn("window size overflow windowSize len()", windowSize, Self.len())
+		windowSize = uint16(Self.len())
+	}
+	//logs.Warn("window buff off windowSize", Self.off, windowSize)
+	p = Self.windowBuff[Self.off : windowSize+Self.off]
+	Self.off += windowSize
+	Self.mutex.Unlock()
+	return
+}
+
+func (Self *window) SetAllowSize(value uint16) (closed bool) {
+	defer func() {
+		if recover() != nil {
+			closed = true
+		}
+	}()
+	if Self.closeOp {
+		close(Self.usableReceiveWindow)
+		return true
+	}
+	select {
+	case Self.usableReceiveWindow <- value:
+		return false
+	case <-Self.closeOpCh:
+		close(Self.usableReceiveWindow)
+		return true
+	}
+}
+
+func (Self *window) CloseWindow() {
+	Self.closeOp = true
+	Self.closeOpCh <- struct{}{}
+	Self.closeOpCh <- struct{}{}
+	close(Self.closeOpCh)
+	return
+}

+ 54 - 36
lib/mux/mux.go

@@ -81,10 +81,10 @@ func (s *Mux) Addr() net.Addr {
 	return s.conn.LocalAddr()
 }
 
-func (s *Mux) sendInfo(flag uint8, id int32, content []byte) {
+func (s *Mux) sendInfo(flag uint8, id int32, data interface{}) {
 	var err error
 	if flag == common.MUX_NEW_MSG {
-		if len(content) == 0 {
+		if len(data.([]byte)) == 0 {
 			logs.Warn("send info content is nil")
 		}
 	}
@@ -92,7 +92,7 @@ func (s *Mux) sendInfo(flag uint8, id int32, content []byte) {
 	//defer pool.BuffPool.Put(buf)
 	pack := common.MuxPack.Get()
 	defer common.MuxPack.Put(pack)
-	err = pack.NewPac(flag, id, content)
+	err = pack.NewPac(flag, id, data)
 	if err != nil {
 		logs.Warn("new pack err", err)
 		common.BuffPool.Put(buf)
@@ -163,6 +163,7 @@ func (s *Mux) readSession() {
 	go func() {
 		pack := common.MuxPack.Get()
 		for {
+			pack = common.MuxPack.Get()
 			if pack.UnPack(s.conn) != nil {
 				break
 			}
@@ -172,17 +173,18 @@ func (s *Mux) readSession() {
 				}
 			}
 			if pack.Flag == common.MUX_NEW_CONN {
-				logs.Warn("unpack mux new conn", pack.Id)
+				logs.Warn("unpack mux new connection", pack.Id)
 			}
 			s.pingOk = 0
 			switch pack.Flag {
-			case common.MUX_NEW_CONN: //new conn
-				logs.Warn("rec mux new conn", pack.Id)
+			case common.MUX_NEW_CONN: //new connection
+				logs.Warn("rec mux new connection", pack.Id)
 				conn := NewConn(pack.Id, s)
 				s.connMap.Set(pack.Id, conn) //it has been set before send ok
 				s.newConnCh <- conn
+				go conn.sendWindow.SetAllowSize(512) // set the initial receive window
 				s.sendInfo(common.MUX_NEW_CONN_OK, pack.Id, nil)
-				logs.Warn("send mux new conn ok", pack.Id)
+				logs.Warn("send mux new connection ok", pack.Id)
 				continue
 			case common.MUX_PING_FLAG: //ping
 				//logs.Warn("send mux ping return")
@@ -191,49 +193,65 @@ func (s *Mux) readSession() {
 			case common.MUX_PING_RETURN:
 				continue
 			}
-			if conn, ok := s.connMap.Get(pack.Id); ok && !conn.isClose {
+			if connection, ok := s.connMap.Get(pack.Id); ok && !connection.isClose {
 				switch pack.Flag {
-				case common.MUX_NEW_MSG: //new msg from remote conn
+				case common.MUX_NEW_MSG: //new msg from remote connection
 					//insert wait queue
-					buf := common.CopyBuff.Get()
-					buf = pack.Content
-					logs.Warn("rec mux new msg ", pack.Id, string(buf[0:15]))
-					conn.readQueue.Push(NewBufNode(buf, int(pack.Length)))
-					//judge len if >xxx ,send stop
-					if conn.readWait {
-						conn.readWait = false
-						conn.readCh <- struct{}{}
+					if connection.isClose {
+						logs.Warn("rec mux new msg closed", pack.Id, string(pack.Content[0:15]))
+						continue
 					}
+					connection.receiveWindow.WriteWg.Add(1)
+					logs.Warn("rec mux new msg ", pack.Id, string(pack.Content[0:15]))
+					go func(connection *conn, content []byte) { // do not block read session
+						_, err := connection.receiveWindow.Write(content)
+						if err != nil {
+							logs.Warn("mux new msg err close", err)
+							s.Close()
+						}
+						size := connection.receiveWindow.Size()
+						if size == 0 {
+							connection.receiveWindow.WindowFull = true
+						}
+						s.sendInfo(common.MUX_MSG_SEND_OK, connection.connId, size)
+						logs.Warn("send mux new msg ok", pack.Id, size)
+						connection.receiveWindow.WriteWg.Done()
+					}(connection, pack.Content)
 					continue
-				case common.MUX_NEW_CONN_OK: //conn ok
-					logs.Warn("rec mux new conn ok ", pack.Id)
-					conn.connStatusOkCh <- struct{}{}
+				case common.MUX_NEW_CONN_OK: //connection ok
+					logs.Warn("rec mux new connection ok ", pack.Id)
+					connection.connStatusOkCh <- struct{}{}
+					go connection.sendWindow.SetAllowSize(512)
+					// set the initial receive window both side
 					continue
 				case common.MUX_NEW_CONN_Fail:
-					logs.Warn("rec mux new conn fail", pack.Id)
-					conn.connStatusFailCh <- struct{}{}
+					logs.Warn("rec mux new connection fail", pack.Id)
+					connection.connStatusFailCh <- struct{}{}
+					continue
+				case common.MUX_MSG_SEND_OK:
+					if connection.isClose {
+						logs.Warn("rec mux msg send ok id window closed!", pack.Id, pack.Window)
+						continue
+					}
+					logs.Warn("rec mux msg send ok id window", pack.Id, pack.Window)
+					go connection.sendWindow.SetAllowSize(pack.Window)
 					continue
 				case common.MUX_CONN_CLOSE: //close the connection
-					logs.Warn("rec mux conn close", pack.Id)
+					logs.Warn("rec mux connection close", pack.Id)
 					s.connMap.Delete(pack.Id)
-					conn.closeFlag = true
-					conn.sendClose = true
-					if !conn.isClose {
-						conn.readQueue.Push(NewBufNode(nil, 0))
-						if conn.readWait {
-							logs.Warn("mux conn close read wait", pack.Id)
-							conn.readWait = false
-							conn.readCh <- struct{}{}
-							logs.Warn("mux conn close read wait pass", pack.Id)
-						}
-					}
-					logs.Warn("receive mux conn close, finish", conn.connId)
+					connection.closeFlag = true
+					go func(connection *conn) {
+						connection.receiveWindow.WriteWg.Wait()
+						connection.receiveWindow.WriteEndOp <- struct{}{} // close signal to receive window
+						logs.Warn("receive mux connection close, finish", connection.connId)
+					}(connection)
 					continue
 				}
 			} else if pack.Flag == common.MUX_CONN_CLOSE {
-				logs.Warn("rec mux conn close no id ", pack.Id)
+				logs.Warn("rec mux connection close no id ", pack.Id)
 				continue
 			}
+			common.MuxPack.Put(pack)
 		}
 		common.MuxPack.Put(pack)
 		logs.Warn("read session put pack ", pack.Id)

+ 23 - 16
lib/mux/mux_test.go

@@ -37,14 +37,17 @@ func TestNewMux(t *testing.T) {
 				logs.Warn(err)
 				continue
 			}
-			var npcToServer common.ConnCopy
-			npcToServer.New(c2, c, 0)
-			go npcToServer.CopyConn()
-			var serverToNpc common.ConnCopy
-			serverToNpc.New(c, c2, 10000)
-			_, err = serverToNpc.CopyConn()
-			if err == nil {
-				logs.Warn("close npc")
+			go func() {
+				_, err = common.CopyBuffer(c2, c)
+				if err != nil {
+					logs.Warn("close npc by copy from nps", err)
+					c2.Close()
+					c.Close()
+				}
+			}()
+			_, err = common.CopyBuffer(c, c2)
+			if err != nil {
+				logs.Warn("close npc by copy from server", err)
 				c2.Close()
 				c.Close()
 			}
@@ -71,14 +74,18 @@ func TestNewMux(t *testing.T) {
 				continue
 			}
 			logs.Warn("nps new conn success ", tmpCpnn.connId)
-			var userToNps common.ConnCopy
-			userToNps.New(tmpCpnn, conn, tmpCpnn.connId)
-			go userToNps.CopyConn()
-			var npsToUser common.ConnCopy
-			npsToUser.New(conn, tmpCpnn, tmpCpnn.connId+10000)
-			_, err = npsToUser.CopyConn()
-			if err == nil {
-				logs.Warn("close from out nps ", tmpCpnn.connId)
+			go func() {
+				_, err := common.CopyBuffer(tmpCpnn, conn)
+				if err != nil {
+					logs.Warn("close nps by copy from user", tmpCpnn.connId)
+					conn.Close()
+					tmpCpnn.Close()
+				}
+			}()
+			//time.Sleep(time.Second)
+			_, err = common.CopyBuffer(conn, tmpCpnn)
+			if err != nil {
+				logs.Warn("close nps by copy from npc ", tmpCpnn.connId)
 				conn.Close()
 				tmpCpnn.Close()
 			}