Преглед изворни кода

change window calculation, bandwidth calculation

ffdfgdfg пре 5 година
родитељ
комит
6bbe276b18
2 измењених фајлова са 65 додато и 9 уклоњено
  1. 64 8
      lib/mux/conn.go
  2. 1 1
      lib/mux/mux.go

+ 64 - 8
lib/mux/conn.go

@@ -202,7 +202,7 @@ type ReceiveWindow struct {
 	bufQueue ReceiveWindowQueue
 	element  *common.ListElement
 	count    int8
-	bw       *bandwidth
+	bw       *writeBandwidth
 	once     sync.Once
 	// receive window send the current max size and read size to send window
 	// means done size actually store the size receive window has read
@@ -215,7 +215,7 @@ func (Self *ReceiveWindow) New(mux *Mux) {
 	Self.maxSizeDone = Self.pack(common.MAXIMUM_SEGMENT_SIZE*30, 0, false)
 	Self.mux = mux
 	Self.window.New()
-	Self.bw = NewBandwidth(nil)
+	Self.bw = NewWriteBandwidth()
 }
 
 func (Self *ReceiveWindow) remainingSize(maxSize uint32, delta uint16) (n uint32) {
@@ -232,9 +232,15 @@ func (Self *ReceiveWindow) calcSize() {
 	// calculating maximum receive window size
 	if Self.count == 0 {
 		//logs.Warn("ping, bw", Self.mux.latency, Self.bw.Get())
-		conns := Self.mux.connMap.Size()
-		n := uint32(math.Float64frombits(atomic.LoadUint64(&Self.mux.latency)) *
-			(Self.mux.bw.Get() + Self.bw.Get()))
+		//conns := Self.mux.connMap.Size()
+		muxBw := Self.mux.bw.Get()
+		connBw := Self.bw.Get()
+		//logs.Warn("muxbw connbw", muxBw, connBw)
+		var n uint32
+		if connBw > 0 && muxBw > 0 {
+			n = uint32(math.Float64frombits(atomic.LoadUint64(&Self.mux.latency)) *
+				(muxBw + connBw))
+		}
 		//logs.Warn(n)
 		if n < common.MAXIMUM_SEGMENT_SIZE*30 {
 			//logs.Warn("window small", n, Self.mux.bw.Get(), Self.bw.Get())
@@ -252,9 +258,12 @@ func (Self *ReceiveWindow) calcSize() {
 				n = 2 * size
 				// twice grow
 			}
-			if n > (common.MAXIMUM_WINDOW_SIZE / uint32(conns)) {
-				logs.Warn("window too large, calculated:", n, "limit:", common.MAXIMUM_WINDOW_SIZE/uint32(conns))
-				n = common.MAXIMUM_WINDOW_SIZE / uint32(conns)
+			if connBw > 0 && muxBw > 0 {
+				limit := uint32(common.MAXIMUM_WINDOW_SIZE * (connBw / (muxBw + connBw)))
+				if n > limit {
+					logs.Warn("window too large, calculated:", n, "limit:", limit, connBw, muxBw)
+					n = limit
+				}
 			}
 			// set the maximum size
 			//logs.Warn("n", n)
@@ -664,3 +673,50 @@ func (Self *SendWindow) SetTimeOut(t time.Time) {
 	// waiting for receive a receive window size
 	Self.timeout = t
 }
+
+type writeBandwidth struct {
+	writeBW   uint64 // store in bits, but it's float64
+	readEnd   time.Time
+	duration  float64
+	bufLength uint32
+}
+
+const writeCalcThreshold uint32 = 5 * 1024 * 1024
+
+func NewWriteBandwidth() *writeBandwidth {
+	return &writeBandwidth{}
+}
+
+func (Self *writeBandwidth) StartRead() {
+	if Self.readEnd.IsZero() {
+		Self.readEnd = time.Now()
+	}
+	Self.duration += time.Now().Sub(Self.readEnd).Seconds()
+	if Self.bufLength >= writeCalcThreshold {
+		Self.calcBandWidth()
+	}
+}
+
+func (Self *writeBandwidth) SetCopySize(n uint16) {
+	Self.bufLength += uint32(n)
+	Self.endRead()
+}
+
+func (Self *writeBandwidth) endRead() {
+	Self.readEnd = time.Now()
+}
+
+func (Self *writeBandwidth) calcBandWidth() {
+	atomic.StoreUint64(&Self.writeBW, math.Float64bits(float64(Self.bufLength)/Self.duration))
+	Self.bufLength = 0
+	Self.duration = 0
+}
+
+func (Self *writeBandwidth) Get() (bw float64) {
+	// The zero value, 0 for numeric types
+	bw = math.Float64frombits(atomic.LoadUint64(&Self.writeBW))
+	if bw <= 0 {
+		bw = 0
+	}
+	return
+}

+ 1 - 1
lib/mux/mux.go

@@ -443,7 +443,7 @@ func (Self *bandwidth) Get() (bw float64) {
 	// The zero value, 0 for numeric types
 	bw = math.Float64frombits(atomic.LoadUint64(&Self.readBandwidth))
 	if bw <= 0 {
-		bw = 100
+		bw = 0
 	}
 	//logs.Warn(bw)
 	return