Przeglądaj źródła

change window calculation

ffdfgdfg 5 lat temu
rodzic
commit
f5fce6d1f4

+ 2 - 2
lib/common/const.go

@@ -49,6 +49,6 @@ const (
 	MUX_PING_RETURN
 	MUX_PING             int32 = -1
 	MAXIMUM_SEGMENT_SIZE       = PoolSizeWindow
-	MAXIMUM_WINDOW_SIZE        = 1 << 25 // 1<<31-1 TCP slide window size is very large,
-	// we use 32M, reduce memory usage
+	MAXIMUM_WINDOW_SIZE        = 1 << 27 // 1<<31-1 TCP slide window size is very large,
+	// we use 128M, reduce memory usage
 )

+ 16 - 85
lib/mux/conn.go

@@ -202,6 +202,7 @@ type ReceiveWindow struct {
 	bufQueue ReceiveWindowQueue
 	element  *common.ListElement
 	count    int8
+	bw       *bandwidth
 	once     sync.Once
 	// receive window send the current max size and read size to send window
 	// means done size actually store the size receive window has read
@@ -211,9 +212,10 @@ func (Self *ReceiveWindow) New(mux *Mux) {
 	// initial a window for receive
 	Self.bufQueue.New()
 	Self.element = common.ListElementPool.Get()
-	Self.maxSizeDone = Self.pack(common.MAXIMUM_SEGMENT_SIZE*10, 0, false)
+	Self.maxSizeDone = Self.pack(common.MAXIMUM_SEGMENT_SIZE*30, 0, false)
 	Self.mux = mux
 	Self.window.New()
+	Self.bw = NewBandwidth(nil)
 }
 
 func (Self *ReceiveWindow) remainingSize(maxSize uint32, delta uint16) (n uint32) {
@@ -232,10 +234,11 @@ func (Self *ReceiveWindow) calcSize() {
 		//logs.Warn("ping, bw", Self.mux.latency, Self.bw.Get())
 		conns := Self.mux.connMap.Size()
 		n := uint32(math.Float64frombits(atomic.LoadUint64(&Self.mux.latency)) *
-			Self.mux.bw.Get() / float64(conns))
+			(Self.mux.bw.Get() + Self.bw.Get()))
 		//logs.Warn(n)
-		if n < common.MAXIMUM_SEGMENT_SIZE*10 {
-			n = common.MAXIMUM_SEGMENT_SIZE * 10
+		if n < common.MAXIMUM_SEGMENT_SIZE*30 {
+			//logs.Warn("window small", n, Self.mux.bw.Get(), Self.bw.Get())
+			n = common.MAXIMUM_SEGMENT_SIZE * 30
 		}
 		for {
 			ptrs := atomic.LoadUint64(&Self.maxSizeDone)
@@ -313,6 +316,13 @@ func (Self *ReceiveWindow) Read(p []byte, id int32) (n int, err error) {
 	if Self.closeOp {
 		return 0, io.EOF // receive close signal, returns eof
 	}
+	Self.bw.StartRead()
+	n, err = Self.readFromQueue(p, id)
+	Self.bw.SetCopySize(uint16(n))
+	return
+}
+
+func (Self *ReceiveWindow) readFromQueue(p []byte, id int32) (n int, err error) {
 	pOff := 0
 	l := 0
 	//logs.Warn("receive window read off, element.l", Self.off, Self.element.L)
@@ -363,7 +373,7 @@ func (Self *ReceiveWindow) sendStatus(id int32, l uint16) {
 		if read <= (read+uint32(l))&mask31 {
 			read += uint32(l)
 			remain := Self.remainingSize(maxSize, 0)
-			if wait && remain > 0 || remain == maxSize {
+			if wait && remain > 0 || read >= maxSize/2 || remain == maxSize {
 				if atomic.CompareAndSwapUint64(&Self.maxSizeDone, ptrs, Self.pack(maxSize, 0, false)) {
 					// now we get the current window status success
 					// receive window free up some space we need acknowledge send window, also reset the read size
@@ -441,7 +451,7 @@ type SendWindow struct {
 
 func (Self *SendWindow) New(mux *Mux) {
 	Self.setSizeCh = make(chan struct{})
-	Self.maxSizeDone = Self.pack(common.MAXIMUM_SEGMENT_SIZE*10, 0, false)
+	Self.maxSizeDone = Self.pack(common.MAXIMUM_SEGMENT_SIZE*30, 0, false)
 	Self.mux = mux
 	Self.window.New()
 }
@@ -654,82 +664,3 @@ func (Self *SendWindow) SetTimeOut(t time.Time) {
 	// waiting for receive a receive window size
 	Self.timeout = t
 }
-
-//type bandwidth struct {
-//	readStart     time.Time
-//	lastReadStart time.Time
-//	readEnd       time.Time
-//	lastReadEnd time.Time
-//	bufLength     int
-//	lastBufLength int
-//	count         int8
-//	readBW        float64
-//	writeBW       float64
-//	readBandwidth float64
-//}
-//
-//func (Self *bandwidth) StartRead() {
-//	Self.lastReadStart, Self.readStart = Self.readStart, time.Now()
-//	if !Self.lastReadStart.IsZero() {
-//		if Self.count == -5 {
-//			Self.calcBandWidth()
-//		}
-//	}
-//}
-//
-//func (Self *bandwidth) EndRead() {
-//	Self.lastReadEnd, Self.readEnd = Self.readEnd, time.Now()
-//	if Self.count == -5 {
-//		Self.calcWriteBandwidth()
-//	}
-//	if Self.count == 0 {
-//		Self.calcReadBandwidth()
-//		Self.count = -6
-//	}
-//	Self.count += 1
-//}
-//
-//func (Self *bandwidth) SetCopySize(n int) {
-//	// must be invoke between StartRead and EndRead
-//	Self.lastBufLength, Self.bufLength = Self.bufLength, n
-//}
-//// calculating
-//// start end start end
-////     read     read
-////        write
-//
-//func (Self *bandwidth) calcBandWidth()  {
-//	t := Self.readStart.Sub(Self.lastReadStart)
-//	if Self.lastBufLength >= 32768 {
-//		Self.readBandwidth = float64(Self.lastBufLength) / t.Seconds()
-//	}
-//}
-//
-//func (Self *bandwidth) calcReadBandwidth() {
-//	// Bandwidth between nps and npc
-//	readTime := Self.readEnd.Sub(Self.readStart)
-//	Self.readBW = float64(Self.bufLength) / readTime.Seconds()
-//	//logs.Warn("calc read bw", Self.readBW, Self.bufLength, readTime.Seconds())
-//}
-//
-//func (Self *bandwidth) calcWriteBandwidth() {
-//	// Bandwidth between nps and user, npc and application
-//	writeTime := Self.readStart.Sub(Self.lastReadEnd)
-//	Self.writeBW = float64(Self.lastBufLength) / writeTime.Seconds()
-//	//logs.Warn("calc write bw", Self.writeBW, Self.bufLength, writeTime.Seconds())
-//}
-//
-//func (Self *bandwidth) Get() (bw float64) {
-//	// The zero value, 0 for numeric types
-//	if Self.writeBW == 0 && Self.readBW == 0 {
-//		//logs.Warn("bw both 0")
-//		return 100
-//	}
-//	if Self.writeBW == 0 && Self.readBW != 0 {
-//		return Self.readBW
-//	}
-//	if Self.readBW == 0 && Self.writeBW != 0 {
-//		return Self.writeBW
-//	}
-//	return Self.readBandwidth
-//}

+ 1 - 1
lib/mux/mux.go

@@ -220,7 +220,7 @@ func (s *Mux) pingReturn() {
 			case data = <-s.pingCh:
 				atomic.StoreUint32(&s.pingCheckTime, 0)
 			case <-s.closeChan:
-				break
+				return
 			}
 			_ = now.UnmarshalText(data)
 			latency := time.Now().UTC().Sub(now).Seconds() / 2

+ 1 - 1
lib/mux/sysGetsock_nowindows.go

@@ -14,7 +14,7 @@ func sysGetSock(fd *os.File) (bufferSize int, err error) {
 	if fd != nil {
 		return syscall.GetsockoptInt(int(fd.Fd()), syscall.SOL_SOCKET, syscall.SO_RCVBUF)
 	} else {
-		return 1400 * 320, nil
+		return 5 * 1024 * 1024, nil
 	}
 }
 

+ 1 - 1
lib/mux/sysGetsock_windows.go

@@ -14,7 +14,7 @@ func sysGetSock(fd *os.File) (bufferSize int, err error) {
 	// not support, WTF???
 	// Todo
 	// return syscall.GetsockoptInt((syscall.Handle)(unsafe.Pointer(fd.Fd())), syscall.SOL_SOCKET, syscall.SO_RCVBUF)
-	bufferSize = 10 * 1024 * 1024
+	bufferSize = 5 * 1024 * 1024
 	return
 }