소스 검색

change slide window design

ffdfgdfg 5 년 전
부모
커밋
aad7ed8f24
4개의 변경된 파일195개의 추가작업 그리고 142개의 파일을 삭제
  1. 6 28
      lib/common/netpackager.go
  2. 178 102
      lib/mux/conn.go
  3. 1 1
      lib/mux/mux.go
  4. 10 11
      lib/mux/mux_test.go

+ 6 - 28
lib/common/netpackager.go

@@ -162,10 +162,9 @@ func (Self *ConnPackager) UnPack(reader io.Reader) (n uint16, err error) {
 }
 
 type MuxPackager struct {
-	Flag       uint8
-	Id         int32
-	Window     uint32
-	ReadLength uint32
+	Flag   uint8
+	Id     int32
+	Window uint64
 	BasePackager
 }
 
@@ -178,19 +177,8 @@ func (Self *MuxPackager) NewPac(flag uint8, id int32, content ...interface{}) (e
 		err = Self.BasePackager.NewPac(content...)
 		//logs.Warn(Self.Length, string(Self.Content))
 	case MUX_MSG_SEND_OK:
-		// MUX_MSG_SEND_OK contains two data
-		switch content[0].(type) {
-		case int:
-			Self.Window = uint32(content[0].(int))
-		case uint32:
-			Self.Window = content[0].(uint32)
-		}
-		switch content[1].(type) {
-		case int:
-			Self.ReadLength = uint32(content[1].(int))
-		case uint32:
-			Self.ReadLength = content[1].(uint32)
-		}
+		// MUX_MSG_SEND_OK contains one data
+		Self.Window = content[0].(uint64)
 	}
 	return
 }
@@ -210,10 +198,6 @@ func (Self *MuxPackager) Pack(writer io.Writer) (err error) {
 		WindowBuff.Put(Self.Content)
 	case MUX_MSG_SEND_OK:
 		err = binary.Write(writer, binary.LittleEndian, Self.Window)
-		if err != nil {
-			return
-		}
-		err = binary.Write(writer, binary.LittleEndian, Self.ReadLength)
 	}
 	return
 }
@@ -235,12 +219,7 @@ func (Self *MuxPackager) UnPack(reader io.Reader) (n uint16, err error) {
 		//logs.Warn("unpack", Self.Length, string(Self.Content))
 	case MUX_MSG_SEND_OK:
 		err = binary.Read(reader, binary.LittleEndian, &Self.Window)
-		if err != nil {
-			return
-		}
-		n += 4 // uint32
-		err = binary.Read(reader, binary.LittleEndian, &Self.ReadLength)
-		n += 4 // uint32
+		n += 8 // uint64
 	}
 	n += 5 //uint8 int32
 	return
@@ -251,7 +230,6 @@ func (Self *MuxPackager) reset() {
 	Self.Flag = 0
 	Self.Length = 0
 	Self.Content = nil
-	Self.ReadLength = 0
 	Self.Window = 0
 }
 

+ 178 - 102
lib/mux/conn.go

@@ -3,6 +3,7 @@ package mux
 import (
 	"errors"
 	"github.com/astaxie/beego/logs"
+	"github.com/cnlh/nps/lib/common"
 	"io"
 	"math"
 	"net"
@@ -10,8 +11,6 @@ import (
 	"sync"
 	"sync/atomic"
 	"time"
-
-	"github.com/cnlh/nps/lib/common"
 )
 
 type conn struct {
@@ -146,25 +145,44 @@ func (s *conn) SetWriteDeadline(t time.Time) error {
 }
 
 type window struct {
-	remainingWait uint64 // 64bit alignment
-	off           uint32
-	maxSize       uint32
-	closeOp       bool
-	closeOpCh     chan struct{}
-	mux           *Mux
-}
-
-func (Self *window) unpack(ptrs uint64) (remaining, wait uint32) {
-	const mask = 1<<dequeueBits - 1
-	remaining = uint32((ptrs >> dequeueBits) & mask)
-	wait = uint32(ptrs & mask)
+	maxSizeDone uint64
+	// 64bit alignment
+	// maxSizeDone contains 4 parts
+	//   1       31       1      31
+	// wait   maxSize  useless  done
+	// wait zero means false, one means true
+	off       uint32
+	closeOp   bool
+	closeOpCh chan struct{}
+	mux       *Mux
+}
+
+const windowBits = 31
+const waitBits = dequeueBits + windowBits
+const mask1 = 1
+const mask31 = 1<<windowBits - 1
+
+func (Self *window) unpack(ptrs uint64) (maxSize, done uint32, wait bool) {
+	maxSize = uint32((ptrs >> dequeueBits) & mask31)
+	done = uint32(ptrs & mask31)
+	//logs.Warn("unpack", maxSize, done)
+	if ((ptrs >> waitBits) & mask1) == 1 {
+		wait = true
+		return
+	}
 	return
 }
 
-func (Self *window) pack(remaining, wait uint32) uint64 {
-	const mask = 1<<dequeueBits - 1
-	return (uint64(remaining) << dequeueBits) |
-		uint64(wait&mask)
+func (Self *window) pack(maxSize, done uint32, wait bool) uint64 {
+	//logs.Warn("pack", maxSize, done, wait)
+	if wait {
+		return (uint64(1)<<waitBits |
+			uint64(maxSize&mask31)<<dequeueBits) |
+			uint64(done&mask31)
+	}
+	return (uint64(0)<<waitBits |
+		uint64(maxSize&mask31)<<dequeueBits) |
+		uint64(done&mask31)
 }
 
 func (Self *window) New() {
@@ -185,20 +203,22 @@ type ReceiveWindow struct {
 	element  *common.ListElement
 	count    int8
 	once     sync.Once
+	// receive window send the current max size and read size to send window
+	// means done size actually store the size receive window has read
 }
 
 func (Self *ReceiveWindow) New(mux *Mux) {
 	// initial a window for receive
 	Self.bufQueue.New()
 	Self.element = common.ListElementPool.Get()
-	Self.maxSize = common.MAXIMUM_SEGMENT_SIZE * 10
+	Self.maxSizeDone = Self.pack(common.MAXIMUM_SEGMENT_SIZE*10, 0, false)
 	Self.mux = mux
 	Self.window.New()
 }
 
-func (Self *ReceiveWindow) remainingSize(delta uint16) (n uint32) {
+func (Self *ReceiveWindow) remainingSize(maxSize uint32, delta uint16) (n uint32) {
 	// receive window remaining
-	l := int64(atomic.LoadUint32(&Self.maxSize)) - int64(Self.bufQueue.Len())
+	l := int64(maxSize) - int64(Self.bufQueue.Len())
 	l -= int64(delta)
 	if l > 0 {
 		n = uint32(l)
@@ -213,27 +233,33 @@ func (Self *ReceiveWindow) calcSize() {
 		conns := Self.mux.connMap.Size()
 		n := uint32(math.Float64frombits(atomic.LoadUint64(&Self.mux.latency)) *
 			Self.mux.bw.Get() / float64(conns))
+		//logs.Warn(n)
 		if n < common.MAXIMUM_SEGMENT_SIZE*10 {
 			n = common.MAXIMUM_SEGMENT_SIZE * 10
 		}
-		//bufLen := Self.bufQueue.Len()
-		//if n < bufLen {
-		//	n = bufLen
-		//}
-		if n < Self.maxSize/2 {
-			n = Self.maxSize / 2
-		}
-		// set the minimal size
-		if n > 2*Self.maxSize {
-			n = 2 * Self.maxSize
-		}
-		if n > (common.MAXIMUM_WINDOW_SIZE / uint32(conns)) {
-			logs.Warn("window too large", n)
-			n = common.MAXIMUM_WINDOW_SIZE / uint32(conns)
+		for {
+			ptrs := atomic.LoadUint64(&Self.maxSizeDone)
+			size, read, wait := Self.unpack(ptrs)
+			if n < size/2 {
+				n = size / 2
+				// half reduce
+			}
+			// set the minimal size
+			if n > 2*size {
+				n = 2 * size
+				// twice grow
+			}
+			if n > (common.MAXIMUM_WINDOW_SIZE / uint32(conns)) {
+				//logs.Warn("window too large", n)
+				n = common.MAXIMUM_WINDOW_SIZE / uint32(conns)
+			}
+			// set the maximum size
+			//logs.Warn("n", n)
+			if atomic.CompareAndSwapUint64(&Self.maxSizeDone, ptrs, Self.pack(n, read, wait)) {
+				// only change the maxSize
+				break
+			}
 		}
-		// set the maximum size
-		//logs.Warn("n", n)
-		atomic.StoreUint32(&Self.maxSize, n)
 		Self.count = -10
 	}
 	Self.count += 1
@@ -245,30 +271,40 @@ func (Self *ReceiveWindow) Write(buf []byte, l uint16, part bool, id int32) (err
 		return errors.New("conn.receiveWindow: write on closed window")
 	}
 	element, err := NewListElement(buf, l, part)
-	//logs.Warn("push the buf", len(buf), l, (&element).l)
+	//logs.Warn("push the buf", len(buf), l, element.L)
 	if err != nil {
 		return
 	}
 	Self.calcSize() // calculate the max window size
-	var wait uint32
+	var wait bool
+	var maxSize, read uint32
 start:
-	ptrs := atomic.LoadUint64(&Self.remainingWait)
-	_, wait = Self.unpack(ptrs)
-	newRemaining := Self.remainingSize(l)
+	ptrs := atomic.LoadUint64(&Self.maxSizeDone)
+	maxSize, read, wait = Self.unpack(ptrs)
+	remain := Self.remainingSize(maxSize, l)
 	// calculate the remaining window size now, plus the element we will push
-	if newRemaining == 0 {
+	if remain == 0 && !wait {
 		//logs.Warn("window full true", remaining)
-		wait = 1
-	}
-	if !atomic.CompareAndSwapUint64(&Self.remainingWait, ptrs, Self.pack(0, wait)) {
-		goto start
-		// another goroutine change the status, make sure shall we need wait
-	}
+		wait = true
+		if !atomic.CompareAndSwapUint64(&Self.maxSizeDone, ptrs, Self.pack(maxSize, read, wait)) {
+			// only change the wait status, not send the read size
+			goto start
+			// another goroutine change the status, make sure shall we need wait
+		}
+		//logs.Warn("receive window full")
+	} else if !wait {
+		if !atomic.CompareAndSwapUint64(&Self.maxSizeDone, ptrs, Self.pack(maxSize, 0, wait)) {
+			// reset read size here, and send the read size directly
+			goto start
+			// another goroutine change the status, make sure shall we need wait
+		}
+	} // maybe there are still some data received even if window is full, just keep the wait status
+	// and push into queue. when receive window read enough, send window will be acknowledged.
 	Self.bufQueue.Push(element)
 	// status check finish, now we can push the element into the queue
-	if wait == 0 {
-		Self.mux.sendInfo(common.MUX_MSG_SEND_OK, id, Self.maxSize, newRemaining)
-		// send the remaining window size, not including zero size
+	if !wait {
+		Self.mux.sendInfo(common.MUX_MSG_SEND_OK, id, Self.pack(maxSize, read, false))
+		// send the current status to send window
 	}
 	return nil
 }
@@ -279,7 +315,7 @@ func (Self *ReceiveWindow) Read(p []byte, id int32) (n int, err error) {
 	}
 	pOff := 0
 	l := 0
-	//logs.Warn("receive window read off, element.l", Self.off, Self.element.l)
+	//logs.Warn("receive window read off, element.l", Self.off, Self.element.L)
 copyData:
 	if Self.off == uint32(Self.element.L) {
 		// on the first Read method invoked, Self.off and Self.element.l
@@ -291,14 +327,13 @@ copyData:
 		Self.element, err = Self.bufQueue.Pop()
 		// if the queue is empty, Pop method will wait until one element push
 		// into the queue successful, or timeout.
-		// timer start on timeout parameter is set up ,
-		// reset to 60s if timeout and data still available
+		// timer start on timeout parameter is set up
 		Self.off = 0
 		if err != nil {
 			Self.CloseWindow() // also close the window, to avoid read twice
 			return             // queue receive stop or time out, break the loop and return
 		}
-		//logs.Warn("pop element", Self.element.l, Self.element.part)
+		//logs.Warn("pop element", Self.element.L, Self.element.Part)
 	}
 	l = copy(p[pOff:], Self.element.Buf[Self.off:Self.element.L])
 	pOff += l
@@ -320,22 +355,41 @@ copyData:
 }
 
 func (Self *ReceiveWindow) sendStatus(id int32, l uint16) {
-	var remaining, wait uint32
+	var maxSize, read uint32
+	var wait bool
 	for {
-		ptrs := atomic.LoadUint64(&Self.remainingWait)
-		remaining, wait = Self.unpack(ptrs)
-		remaining += uint32(l)
-		if atomic.CompareAndSwapUint64(&Self.remainingWait, ptrs, Self.pack(remaining, 0)) {
-			break
+		ptrs := atomic.LoadUint64(&Self.maxSizeDone)
+		maxSize, read, wait = Self.unpack(ptrs)
+		if read <= (read+uint32(l))&mask31 {
+			read += uint32(l)
+			remain := Self.remainingSize(maxSize, 0)
+			if wait && remain > 0 || remain == maxSize {
+				if atomic.CompareAndSwapUint64(&Self.maxSizeDone, ptrs, Self.pack(maxSize, 0, false)) {
+					// now we get the current window status success
+					// receive window free up some space we need acknowledge send window, also reset the read size
+					// still having a condition that receive window is empty and not send the status to send window
+					// so send the status here
+					//logs.Warn("receive window free up some space", remain)
+					Self.mux.sendInfo(common.MUX_MSG_SEND_OK, id, Self.pack(maxSize, read, false))
+					break
+				}
+			} else {
+				if atomic.CompareAndSwapUint64(&Self.maxSizeDone, ptrs, Self.pack(maxSize, read, wait)) {
+					// receive window not into the wait status, or still not having any space now,
+					// just change the read size
+					break
+				}
+			}
+		} else {
+			//overflow
+			if atomic.CompareAndSwapUint64(&Self.maxSizeDone, ptrs, Self.pack(maxSize, uint32(l), wait)) {
+				// reset to l
+				Self.mux.sendInfo(common.MUX_MSG_SEND_OK, id, Self.pack(maxSize, read, false))
+				break
+			}
 		}
 		runtime.Gosched()
 		// another goroutine change remaining or wait status, make sure
-		// we need acknowledge other side
-	}
-	// now we get the current window status success
-	if wait == 1 {
-		//logs.Warn("send the wait status", remaining)
-		Self.mux.sendInfo(common.MUX_MSG_SEND_OK, id, atomic.LoadUint32(&Self.maxSize), remaining)
 	}
 	return
 }
@@ -380,12 +434,14 @@ type SendWindow struct {
 	buf       []byte
 	setSizeCh chan struct{}
 	timeout   time.Time
+	// send window receive the receive window max size and read size
+	// done size store the size send window has send, send and read will be totally equal
+	// so send minus read, send window can get the current window size remaining
 }
 
 func (Self *SendWindow) New(mux *Mux) {
 	Self.setSizeCh = make(chan struct{})
-	Self.maxSize = common.MAXIMUM_SEGMENT_SIZE * 10
-	atomic.AddUint64(&Self.remainingWait, uint64(common.MAXIMUM_SEGMENT_SIZE*10)<<dequeueBits)
+	Self.maxSizeDone = Self.pack(common.MAXIMUM_SEGMENT_SIZE*10, 0, false)
 	Self.mux = mux
 	Self.window.New()
 }
@@ -396,7 +452,15 @@ func (Self *SendWindow) SetSendBuf(buf []byte) {
 	Self.off = 0
 }
 
-func (Self *SendWindow) SetSize(windowSize, newRemaining uint32) (closed bool) {
+func (Self *SendWindow) remainingSize(maxSize, send uint32) uint32 {
+	l := int64(maxSize&mask31) - int64(send&mask31)
+	if l > 0 {
+		return uint32(l)
+	}
+	return 0
+}
+
+func (Self *SendWindow) SetSize(currentMaxSizeDone uint64) (closed bool) {
 	// set the window size from receive window
 	defer func() {
 		if recover() != nil {
@@ -408,26 +472,34 @@ func (Self *SendWindow) SetSize(windowSize, newRemaining uint32) (closed bool) {
 		return true
 	}
 	//logs.Warn("set send window size to ", windowSize, newRemaining)
-	var remaining, wait, newWait uint32
+	var maxsize, send uint32
+	var wait, newWait bool
+	currentMaxSize, read, _ := Self.unpack(currentMaxSizeDone)
 	for {
-		ptrs := atomic.LoadUint64(&Self.remainingWait)
-		remaining, wait = Self.unpack(ptrs)
-		if remaining == newRemaining {
-			//logs.Warn("waiting for another window size")
-			return false // waiting for receive another usable window size
+		ptrs := atomic.LoadUint64(&Self.maxSizeDone)
+		maxsize, send, wait = Self.unpack(ptrs)
+		if read > send {
+			logs.Error("read > send")
+			return
+		}
+		if read == 0 && currentMaxSize == maxsize {
+			return
 		}
-		if newRemaining == 0 && wait == 1 {
-			newWait = 1 // keep the wait status,
-			// also if newRemaining is not zero, change wait to 0
+		send -= read
+		remain := Self.remainingSize(currentMaxSize, send)
+		if remain == 0 && wait {
+			// just keep the wait status
+			newWait = true
 		}
-		if atomic.CompareAndSwapUint64(&Self.remainingWait, ptrs, Self.pack(newRemaining, newWait)) {
+		// remain > 0, change wait to false. or remain == 0, wait is false, just keep it
+		if atomic.CompareAndSwapUint64(&Self.maxSizeDone, ptrs, Self.pack(currentMaxSize, send, newWait)) {
 			break
 		}
 		// anther goroutine change wait status or window size
 	}
-	if wait == 1 {
+	if wait && !newWait {
 		// send window into the wait status, need notice the channel
-		//logs.Warn("send window remaining size is 0")
+		//logs.Warn("send window allow")
 		Self.allow()
 	}
 	// send window not into the wait status, so just do slide
@@ -446,18 +518,20 @@ func (Self *SendWindow) allow() {
 }
 
 func (Self *SendWindow) sent(sentSize uint32) {
-	var remaining, wait uint32
+	var maxSie, send uint32
+	var wait bool
 	for {
-		ptrs := atomic.LoadUint64(&Self.remainingWait)
-		remaining, wait = Self.unpack(ptrs)
-		if remaining >= sentSize {
-			atomic.AddUint64(&Self.remainingWait, ^(uint64(sentSize)<<dequeueBits - 1))
+		ptrs := atomic.LoadUint64(&Self.maxSizeDone)
+		maxSie, send, wait = Self.unpack(ptrs)
+		if (send+sentSize)&mask31 < send {
+			// overflow
+			runtime.Gosched()
+			continue
+		}
+		if atomic.CompareAndSwapUint64(&Self.maxSizeDone, ptrs, Self.pack(maxSie, send+sentSize, wait)) {
+			// set the send size
+			//logs.Warn("sent", maxSie, send+sentSize, wait)
 			break
-		} else {
-			if atomic.CompareAndSwapUint64(&Self.remainingWait, ptrs, Self.pack(0, wait)) {
-				// just keep the wait status, it will be wait in the next loop
-				break
-			}
 		}
 	}
 }
@@ -472,12 +546,14 @@ func (Self *SendWindow) WriteTo() (p []byte, sendSize uint32, part bool, err err
 		return nil, 0, false, io.EOF
 		// send window buff is drain, return eof and get another one
 	}
-	var remaining uint32
+	var maxSize, send uint32
 start:
-	ptrs := atomic.LoadUint64(&Self.remainingWait)
-	remaining, _ = Self.unpack(ptrs)
-	if remaining == 0 {
-		if !atomic.CompareAndSwapUint64(&Self.remainingWait, ptrs, Self.pack(0, 1)) {
+	ptrs := atomic.LoadUint64(&Self.maxSizeDone)
+	maxSize, send, _ = Self.unpack(ptrs)
+	remain := Self.remainingSize(maxSize, send)
+	if remain == 0 {
+		if !atomic.CompareAndSwapUint64(&Self.maxSizeDone, ptrs, Self.pack(maxSize, send, true)) {
+			// just change the status wait status
 			goto start // another goroutine change the window, try again
 		}
 		// into the wait status
@@ -490,17 +566,17 @@ start:
 		goto start
 	}
 	// there are still remaining window
-	//logs.Warn("rem", remaining)
+	//logs.Warn("rem", remain, maxSize, send)
 	if len(Self.buf[Self.off:]) > common.MAXIMUM_SEGMENT_SIZE {
 		sendSize = common.MAXIMUM_SEGMENT_SIZE
 		//logs.Warn("cut buf by mss")
 	} else {
 		sendSize = uint32(len(Self.buf[Self.off:]))
 	}
-	if remaining < sendSize {
+	if remain < sendSize {
 		// usable window size is small than
 		// window MAXIMUM_SEGMENT_SIZE or send buf left
-		sendSize = remaining
+		sendSize = remain
 		//logs.Warn("cut buf by remainingsize", sendSize, len(Self.buf[Self.off:]))
 	}
 	//logs.Warn("send size", sendSize)

+ 1 - 1
lib/mux/mux.go

@@ -303,7 +303,7 @@ func (s *Mux) readSession() {
 					if connection.isClose {
 						continue
 					}
-					connection.sendWindow.SetSize(pack.Window, pack.ReadLength)
+					connection.sendWindow.SetSize(pack.Window)
 					continue
 				case common.MUX_CONN_CLOSE: //close the connection
 					connection.closeFlag = true

+ 10 - 11
lib/mux/mux_test.go

@@ -5,7 +5,6 @@ import (
 	"fmt"
 	"github.com/cnlh/nps/lib/common"
 	"github.com/cnlh/nps/lib/goroutine"
-	"github.com/xtaci/kcp-go"
 	"io"
 	"log"
 	"net"
@@ -34,8 +33,8 @@ func TestNewMux(t *testing.T) {
 	//poolConnCopy, _ := ants.NewPoolWithFunc(200000, common.copyConn, ants.WithNonblocking(false))
 	time.Sleep(time.Second * 3)
 	go func() {
-		//m2 := NewMux(conn2, "tcp")
-		m2 := NewMux(conn2, "kcp")
+		m2 := NewMux(conn2, "tcp")
+		//m2 := NewMux(conn2, "kcp")
 		for {
 			//logs.Warn("npc starting accept")
 			c, err := m2.Accept()
@@ -84,8 +83,8 @@ func TestNewMux(t *testing.T) {
 	}()
 
 	go func() {
-		//m1 := NewMux(conn1, "tcp")
-		m1 := NewMux(conn1, "kcp")
+		m1 := NewMux(conn1, "tcp")
+		//m1 := NewMux(conn1, "kcp")
 		l, err := net.Listen("tcp", "127.0.0.1:7777")
 		if err != nil {
 			logs.Warn(err)
@@ -147,14 +146,14 @@ func TestNewMux(t *testing.T) {
 
 func server() {
 	var err error
-	//l, err := net.Listen("tcp", "127.0.0.1:9999")
-	l, err := kcp.Listen("127.0.0.1:9999")
+	l, err := net.Listen("tcp", "127.0.0.1:9999")
+	//l, err := kcp.Listen("127.0.0.1:9999")
 	if err != nil {
 		logs.Warn(err)
 	}
 	go func() {
 		conn1, err = l.Accept()
-		logs.Info("accept", conn1)
+		//logs.Info("accept", conn1)
 		if err != nil {
 			logs.Warn(err)
 		}
@@ -164,9 +163,9 @@ func server() {
 
 func client() {
 	var err error
-	//conn2, err = net.Dial("tcp", "127.0.0.1:9999")
-	logs.Warn("dial")
-	conn2, err = kcp.Dial("127.0.0.1:9999")
+	conn2, err = net.Dial("tcp", "127.0.0.1:9999")
+	//logs.Warn("dial")
+	//conn2, err = kcp.Dial("127.0.0.1:9999")
 	if err != nil {
 		logs.Warn(err)
 	}