Przeglądaj źródła

change some bandwidth calculation

ffdfgdfg 5 lat temu
rodzic
commit
405f41f87f
3 zmienionych plików z 31 dodań i 25 usunięć
  1. 1 1
      lib/common/pool.go
  2. 13 9
      lib/mux/conn.go
  3. 17 15
      lib/mux/mux.go

+ 1 - 1
lib/common/pool.go

@@ -10,7 +10,7 @@ const PoolSizeSmall = 100
 const PoolSizeUdp = 1472
 const PoolSizeCopy = 32 << 10
 const PoolSizeBuffer = 4096
-const PoolSizeWindow = PoolSizeBuffer - 16 - 32 - 32 - 8
+const PoolSizeWindow = PoolSizeBuffer - 2 - 4 - 4 - 1
 
 var BufPool = sync.Pool{
 	New: func() interface{} {

+ 13 - 9
lib/mux/conn.go

@@ -190,7 +190,7 @@ func (Self *ReceiveWindow) New(mux *Mux) {
 	// initial a window for receive
 	Self.bufQueue.New()
 	Self.element = common.ListElementPool.Get()
-	Self.maxSize = 4096
+	Self.maxSize = common.MAXIMUM_SEGMENT_SIZE
 	Self.mux = mux
 	Self.window.New()
 }
@@ -209,21 +209,25 @@ func (Self *ReceiveWindow) calcSize() {
 	// calculating maximum receive window size
 	if Self.count == 0 {
 		//logs.Warn("ping, bw", Self.mux.latency, Self.bw.Get())
-		n := uint32(2 * math.Float64frombits(atomic.LoadUint64(&Self.mux.latency)) *
-			Self.mux.bw.Get() / float64(Self.mux.connMap.Size()))
-		if n < 8192 {
-			n = 8192
+		conns := Self.mux.connMap.Size()
+		n := uint32(math.Float64frombits(atomic.LoadUint64(&Self.mux.latency)) *
+			Self.mux.bw.Get() / float64(conns))
+		if n < common.MAXIMUM_SEGMENT_SIZE*2 {
+			n = common.MAXIMUM_SEGMENT_SIZE * 2
 		}
 		bufLen := Self.bufQueue.Len()
 		if n < bufLen {
 			n = bufLen
 		}
+		if n < Self.maxSize/2 {
+			n = Self.maxSize / 2
+		}
 		// set the minimal size
 		if n > 2*Self.maxSize {
 			n = 2 * Self.maxSize
 		}
-		if n > common.MAXIMUM_WINDOW_SIZE {
-			n = common.MAXIMUM_WINDOW_SIZE
+		if n > (common.MAXIMUM_WINDOW_SIZE / uint32(conns)) {
+			n = common.MAXIMUM_WINDOW_SIZE / uint32(conns)
 		}
 		// set the maximum size
 		//logs.Warn("n", n)
@@ -377,8 +381,8 @@ type SendWindow struct {
 
 func (Self *SendWindow) New(mux *Mux) {
 	Self.setSizeCh = make(chan struct{})
-	Self.maxSize = 4096
-	atomic.AddUint64(&Self.remainingWait, uint64(4096)<<dequeueBits)
+	Self.maxSize = common.MAXIMUM_SEGMENT_SIZE
+	atomic.AddUint64(&Self.remainingWait, uint64(common.MAXIMUM_SEGMENT_SIZE)<<dequeueBits)
 	Self.mux = mux
 	Self.window.New()
 }

+ 17 - 15
lib/mux/mux.go

@@ -115,34 +115,35 @@ func (s *Mux) writeSession() {
 }
 
 func (s *Mux) packBuf() {
-	buffer := common.BuffPool.Get()
+	//buffer := common.BuffPool.Get()
 	for {
 		if s.IsClose {
 			break
 		}
-		buffer.Reset()
+		//buffer.Reset()
 		pack := s.writeQueue.Pop()
 		if s.IsClose {
 			break
 		}
 		//buffer := common.BuffPool.Get()
-		err := pack.Pack(buffer)
+		err := pack.Pack(s.conn)
 		common.MuxPack.Put(pack)
 		if err != nil {
 			logs.Error("mux: pack err", err)
-			common.BuffPool.Put(buffer)
+			//common.BuffPool.Put(buffer)
+			s.Close()
 			break
 		}
 		//logs.Warn(buffer.String())
 		//s.bufQueue.Push(buffer)
-		l := buffer.Len()
-		n, err := buffer.WriteTo(s.conn)
+		//l := buffer.Len()
+		//n, err := buffer.WriteTo(s.conn)
 		//common.BuffPool.Put(buffer)
-		if err != nil || int(n) != l {
-			logs.Error("mux: close from write session fail ", err, n, l)
-			s.Close()
-			break
-		}
+		//if err != nil || int(n) != l {
+		//	logs.Error("mux: close from write session fail ", err, n, l)
+		//	s.Close()
+		//	break
+		//}
 	}
 }
 
@@ -385,24 +386,24 @@ func (s *Mux) getId() (id int32) {
 }
 
 type bandwidth struct {
+	readBandwidth uint64 // store in bits, but it's float64
 	readStart     time.Time
 	lastReadStart time.Time
-	bufLength     uint16
-	readBandwidth uint64 // store in bits, but it's float64
+	bufLength     uint32
 }
 
 func (Self *bandwidth) StartRead() {
 	if Self.readStart.IsZero() {
 		Self.readStart = time.Now()
 	}
-	if Self.bufLength >= 16384 {
+	if Self.bufLength >= 3072000 {
 		Self.lastReadStart, Self.readStart = Self.readStart, time.Now()
 		Self.calcBandWidth()
 	}
 }
 
 func (Self *bandwidth) SetCopySize(n uint16) {
-	Self.bufLength += n
+	Self.bufLength += uint32(n)
 }
 
 func (Self *bandwidth) calcBandWidth() {
@@ -417,6 +418,7 @@ func (Self *bandwidth) Get() (bw float64) {
 	if bw <= 0 {
 		bw = 100
 	}
+	//logs.Warn(bw)
 	return
 }