فهرست منبع

change slide window bandwidth calculation

ffdfgdfg 5 سال پیش
والد
کامیت
f5d5f63366
4فایلهای تغییر یافته به همراه163 افزوده شده و 84 حذف شده
  1. 11 5
      lib/common/netpackager.go
  2. 89 71
      lib/mux/conn.go
  3. 4 0
      lib/mux/map.go
  4. 59 8
      lib/mux/mux.go

+ 11 - 5
lib/common/netpackager.go

@@ -65,8 +65,9 @@ func (Self *BasePackager) Pack(writer io.Writer) (err error) {
 
 //Unpack 会导致传入的数字类型转化成float64!!
 //主要原因是json unmarshal并未传入正确的数据类型
-func (Self *BasePackager) UnPack(reader io.Reader) (err error) {
+func (Self *BasePackager) UnPack(reader io.Reader) (n uint16, err error) {
 	Self.clean()
+	n += 2 // uint16
 	err = binary.Read(reader, binary.LittleEndian, &Self.Length)
 	if err != nil {
 		return
@@ -80,6 +81,7 @@ func (Self *BasePackager) UnPack(reader io.Reader) (err error) {
 	//	err = io.ErrUnexpectedEOF
 	//}
 	err = binary.Read(reader, binary.LittleEndian, Self.Content)
+	n += Self.Length
 	return
 }
 
@@ -137,12 +139,13 @@ func (Self *ConnPackager) Pack(writer io.Writer) (err error) {
 	return
 }
 
-func (Self *ConnPackager) UnPack(reader io.Reader) (err error) {
+func (Self *ConnPackager) UnPack(reader io.Reader) (n uint16, err error) {
 	err = binary.Read(reader, binary.LittleEndian, &Self.ConnType)
 	if err != nil && err != io.EOF {
 		return
 	}
-	err = Self.BasePackager.UnPack(reader)
+	n, err = Self.BasePackager.UnPack(reader)
+	n += 2
 	return
 }
 
@@ -203,7 +206,7 @@ func (Self *MuxPackager) Pack(writer io.Writer) (err error) {
 	return
 }
 
-func (Self *MuxPackager) UnPack(reader io.Reader) (err error) {
+func (Self *MuxPackager) UnPack(reader io.Reader) (n uint16, err error) {
 	err = binary.Read(reader, binary.LittleEndian, &Self.Flag)
 	if err != nil {
 		return
@@ -216,14 +219,17 @@ func (Self *MuxPackager) UnPack(reader io.Reader) (err error) {
 	case MUX_NEW_MSG, MUX_NEW_MSG_PART, MUX_PING_FLAG, MUX_PING_RETURN:
 		Self.Content = WindowBuff.Get() // need get a window buf from pool
 		Self.BasePackager.clean()       // also clean the content
-		err = Self.BasePackager.UnPack(reader)
+		n, err = Self.BasePackager.UnPack(reader)
 		//logs.Warn("unpack", Self.Length, string(Self.Content))
 	case MUX_MSG_SEND_OK:
 		err = binary.Read(reader, binary.LittleEndian, &Self.Window)
 		if err != nil {
 			return
 		}
+		n += 4 // uint32
 		err = binary.Read(reader, binary.LittleEndian, &Self.ReadLength)
+		n += 4 // uint32
 	}
+	n += 5 //uint8 int32
 	return
 }

+ 89 - 71
lib/mux/conn.go

@@ -3,7 +3,6 @@ package mux
 import (
 	"errors"
 	"io"
-	"math"
 	"net"
 	"sync"
 	"time"
@@ -137,8 +136,8 @@ type ReceiveWindow struct {
 	readWait   bool
 	windowFull bool
 	count      int8
-	bw         *bandwidth
-	once       sync.Once
+	//bw         *bandwidth
+	once sync.Once
 	window
 }
 
@@ -146,7 +145,7 @@ func (Self *ReceiveWindow) New(mux *Mux) {
 	// initial a window for receive
 	Self.readOp = make(chan struct{})
 	Self.bufQueue.New()
-	Self.bw = new(bandwidth)
+	//Self.bw = new(bandwidth)
 	Self.element = new(ListElement)
 	Self.maxSize = 8192
 	Self.mux = mux
@@ -175,7 +174,7 @@ func (Self *ReceiveWindow) CalcSize() {
 	// calculating maximum receive window size
 	if Self.count == 0 {
 		//logs.Warn("ping, bw", Self.mux.latency, Self.bw.Get())
-		n := uint32(2 * Self.mux.latency * Self.bw.Get())
+		n := uint32(2 * Self.mux.latency * Self.mux.bw.Get() * 1.5 / float64(Self.mux.connMap.Size()))
 		if n < 8192 {
 			n = 8192
 		}
@@ -183,13 +182,16 @@ func (Self *ReceiveWindow) CalcSize() {
 			n = Self.bufQueue.Len()
 		}
 		// set the minimal size
+		if n > 2*Self.maxSize {
+			n = 2 * Self.maxSize
+		}
 		if n > common.MAXIMUM_WINDOW_SIZE {
 			n = common.MAXIMUM_WINDOW_SIZE
 		}
 		// set the maximum size
 		//logs.Warn("n", n)
 		Self.maxSize = n
-		Self.count = -5
+		Self.count = -10
 	}
 	Self.count += 1
 }
@@ -225,7 +227,7 @@ func (Self *ReceiveWindow) Read(p []byte, id int32) (n int, err error) {
 	l := 0
 	//logs.Warn("receive window read off, element.l", Self.off, Self.element.l)
 copyData:
-	Self.bw.StartRead()
+	//Self.bw.StartRead()
 	if Self.off == uint32(Self.element.l) {
 		// on the first Read method invoked, Self.off and Self.element.l
 		// both zero value
@@ -241,7 +243,7 @@ copyData:
 		//logs.Warn("pop element", Self.element.l, Self.element.part)
 	}
 	l = copy(p[pOff:], Self.element.buf[Self.off:])
-	Self.bw.SetCopySize(l)
+	//Self.bw.SetCopySize(l)
 	pOff += l
 	Self.off += uint32(l)
 	Self.bufQueue.mutex.Lock()
@@ -250,7 +252,7 @@ copyData:
 	Self.bufQueue.mutex.Unlock()
 	n += l
 	l = 0
-	Self.bw.EndRead()
+	//Self.bw.EndRead()
 	Self.sendStatus(id)
 	if Self.off == uint32(Self.element.l) {
 		//logs.Warn("put the element end ", string(Self.element.buf[:15]))
@@ -469,65 +471,81 @@ func (Self *SendWindow) SetTimeOut(t time.Time) {
 	Self.timeout = t
 }
 
-type bandwidth struct {
-	lastReadStart time.Time
-	readStart     time.Time
-	readEnd       time.Time
-	bufLength     int
-	lastBufLength int
-	count         int8
-	readBW        float64
-	writeBW       float64
-}
-
-func (Self *bandwidth) StartRead() {
-	Self.lastReadStart, Self.readStart = Self.readStart, time.Now()
-}
-
-func (Self *bandwidth) EndRead() {
-	if !Self.lastReadStart.IsZero() {
-		if Self.count == 0 {
-			Self.calcWriteBandwidth()
-		}
-	}
-	Self.readEnd = time.Now()
-	if Self.count == 0 {
-		Self.calcReadBandwidth()
-		Self.count = -3
-	}
-	Self.count += 1
-}
-
-func (Self *bandwidth) SetCopySize(n int) {
-	// must be invoke between StartRead and EndRead
-	Self.lastBufLength, Self.bufLength = Self.bufLength, n
-}
-
-func (Self *bandwidth) calcReadBandwidth() {
-	// Bandwidth between nps and npc
-	readTime := Self.readEnd.Sub(Self.readStart)
-	Self.readBW = float64(Self.bufLength) / readTime.Seconds()
-	//logs.Warn("calc read bw", Self.bufLength, readTime.Seconds())
-}
-
-func (Self *bandwidth) calcWriteBandwidth() {
-	// Bandwidth between nps and user, npc and application
-	//logs.Warn("calc write bw")
-	writeTime := Self.readEnd.Sub(Self.lastReadStart)
-	Self.writeBW = float64(Self.lastBufLength) / writeTime.Seconds()
-}
-
-func (Self *bandwidth) Get() (bw float64) {
-	// The zero value, 0 for numeric types
-	if Self.writeBW == 0 && Self.readBW == 0 {
-		//logs.Warn("bw both 0")
-		return 100
-	}
-	if Self.writeBW == 0 && Self.readBW != 0 {
-		return Self.readBW
-	}
-	if Self.readBW == 0 && Self.writeBW != 0 {
-		return Self.writeBW
-	}
-	return math.Min(Self.readBW, Self.writeBW)
-}
+//type bandwidth struct {
+//	readStart     time.Time
+//	lastReadStart time.Time
+//	readEnd       time.Time
+//	lastReadEnd time.Time
+//	bufLength     int
+//	lastBufLength int
+//	count         int8
+//	readBW        float64
+//	writeBW       float64
+//	readBandwidth float64
+//}
+//
+//func (Self *bandwidth) StartRead() {
+//	Self.lastReadStart, Self.readStart = Self.readStart, time.Now()
+//	if !Self.lastReadStart.IsZero() {
+//		if Self.count == -5 {
+//			Self.calcBandWidth()
+//		}
+//	}
+//}
+//
+//func (Self *bandwidth) EndRead() {
+//	Self.lastReadEnd, Self.readEnd = Self.readEnd, time.Now()
+//	if Self.count == -5 {
+//		Self.calcWriteBandwidth()
+//	}
+//	if Self.count == 0 {
+//		Self.calcReadBandwidth()
+//		Self.count = -6
+//	}
+//	Self.count += 1
+//}
+//
+//func (Self *bandwidth) SetCopySize(n int) {
+//	// must be invoke between StartRead and EndRead
+//	Self.lastBufLength, Self.bufLength = Self.bufLength, n
+//}
+//// calculating
+//// start end start end
+////     read     read
+////        write
+//
+//func (Self *bandwidth) calcBandWidth()  {
+//	t := Self.readStart.Sub(Self.lastReadStart)
+//	if Self.lastBufLength >= 32768 {
+//		Self.readBandwidth = float64(Self.lastBufLength) / t.Seconds()
+//	}
+//}
+//
+//func (Self *bandwidth) calcReadBandwidth() {
+//	// Bandwidth between nps and npc
+//	readTime := Self.readEnd.Sub(Self.readStart)
+//	Self.readBW = float64(Self.bufLength) / readTime.Seconds()
+//	//logs.Warn("calc read bw", Self.readBW, Self.bufLength, readTime.Seconds())
+//}
+//
+//func (Self *bandwidth) calcWriteBandwidth() {
+//	// Bandwidth between nps and user, npc and application
+//	writeTime := Self.readStart.Sub(Self.lastReadEnd)
+//	Self.writeBW = float64(Self.lastBufLength) / writeTime.Seconds()
+//	//logs.Warn("calc write bw", Self.writeBW, Self.bufLength, writeTime.Seconds())
+//}
+//
+//func (Self *bandwidth) Get() (bw float64) {
+//	// The zero value, 0 for numeric types
+//	if Self.writeBW == 0 && Self.readBW == 0 {
+//		//logs.Warn("bw both 0")
+//		return 100
+//	}
+//	if Self.writeBW == 0 && Self.readBW != 0 {
+//		return Self.readBW
+//	}
+//	if Self.readBW == 0 && Self.writeBW != 0 {
+//		return Self.writeBW
+//	}
+//	return Self.readBandwidth
+//}

+ 4 - 0
lib/mux/map.go

@@ -20,6 +20,10 @@ func NewConnMap() *connMap {
 	return connMap
 }
 
+func (s *connMap) Size() (n int) {
+	return len(s.connMap)
+}
+
 func (s *connMap) Get(id int32) (*conn, bool) {
 	s.Lock()
 	defer s.Unlock()

+ 59 - 8
lib/mux/mux.go

@@ -24,6 +24,7 @@ type Mux struct {
 	IsClose    bool
 	pingOk     int
 	latency    float64
+	bw         *bandwidth
 	pingCh     chan []byte
 	pingTimer  *time.Timer
 	connType   string
@@ -37,8 +38,9 @@ func NewMux(c net.Conn, connType string) *Mux {
 		conn:      c,
 		connMap:   NewConnMap(),
 		id:        0,
-		closeChan: make(chan struct{}),
+		closeChan: make(chan struct{}, 3),
 		newConnCh: make(chan *conn),
+		bw:        new(bandwidth),
 		IsClose:   false,
 		connType:  connType,
 		bufCh:     make(chan *bytes.Buffer),
@@ -91,6 +93,9 @@ func (s *Mux) Addr() net.Addr {
 }
 
 func (s *Mux) sendInfo(flag uint8, id int32, data ...interface{}) {
+	if s.IsClose {
+		return
+	}
 	var err error
 	pack := common.MuxPack.Get()
 	err = pack.NewPac(flag, id, data...)
@@ -160,6 +165,9 @@ func (s *Mux) ping() {
 		for {
 			if s.IsClose {
 				ticker.Stop()
+				if !s.pingTimer.Stop() {
+					<-s.pingTimer.C
+				}
 				break
 			}
 			select {
@@ -189,6 +197,9 @@ func (s *Mux) pingReturn() {
 		var now time.Time
 		var data []byte
 		for {
+			if s.IsClose {
+				break
+			}
 			select {
 			case data = <-s.pingCh:
 			case <-s.closeChan:
@@ -199,12 +210,12 @@ func (s *Mux) pingReturn() {
 				break
 			}
 			_ = now.UnmarshalText(data)
-			s.latency = time.Now().UTC().Sub(now).Seconds() / 2
-			logs.Warn("latency", s.latency)
-			common.WindowBuff.Put(data)
-			if s.latency <= 0 {
-				logs.Warn("latency err", s.latency)
+			latency := time.Now().UTC().Sub(now).Seconds() / 2
+			if latency < 0.5 && latency > 0 {
+				s.latency = latency
 			}
+			//logs.Warn("latency", s.latency)
+			common.WindowBuff.Put(data)
 		}
 	}()
 }
@@ -212,14 +223,18 @@ func (s *Mux) pingReturn() {
 func (s *Mux) readSession() {
 	go func() {
 		pack := common.MuxPack.Get()
+		var l uint16
+		var err error
 		for {
 			if s.IsClose {
 				break
 			}
 			pack = common.MuxPack.Get()
-			if pack.UnPack(s.conn) != nil {
+			s.bw.StartRead()
+			if l, err = pack.UnPack(s.conn); err != nil {
 				break
 			}
+			s.bw.SetCopySize(l)
 			s.pingOk = 0
 			switch pack.Flag {
 			case common.MUX_NEW_CONN: //new connection
@@ -239,7 +254,7 @@ func (s *Mux) readSession() {
 			if connection, ok := s.connMap.Get(pack.Id); ok && !connection.isClose {
 				switch pack.Flag {
 				case common.MUX_NEW_MSG, common.MUX_NEW_MSG_PART: //new msg from remote connection
-					err := s.newMsg(connection, pack)
+					err = s.newMsg(connection, pack)
 					if err != nil {
 						connection.Close()
 					}
@@ -299,6 +314,7 @@ func (s *Mux) Close() error {
 	s.connMap.Close()
 	s.closeChan <- struct{}{}
 	s.closeChan <- struct{}{}
+	s.closeChan <- struct{}{}
 	close(s.newConnCh)
 	return s.conn.Close()
 }
@@ -311,3 +327,38 @@ func (s *Mux) getId() (id int32) {
 	}
 	return
 }
+
+type bandwidth struct {
+	readStart     time.Time
+	lastReadStart time.Time
+	bufLength     uint16
+	readBandwidth float64
+}
+
+func (Self *bandwidth) StartRead() {
+	if Self.readStart.IsZero() {
+		Self.readStart = time.Now()
+	}
+	if Self.bufLength >= 16384 {
+		Self.lastReadStart, Self.readStart = Self.readStart, time.Now()
+		Self.calcBandWidth()
+	}
+}
+
+func (Self *bandwidth) SetCopySize(n uint16) {
+	Self.bufLength += n
+}
+
+func (Self *bandwidth) calcBandWidth() {
+	t := Self.readStart.Sub(Self.lastReadStart)
+	Self.readBandwidth = float64(Self.bufLength) / t.Seconds()
+	Self.bufLength = 0
+}
+
+func (Self *bandwidth) Get() (bw float64) {
+	// The zero value, 0 for numeric types
+	if Self.readBandwidth <= 0 {
+		Self.readBandwidth = 100
+	}
+	return Self.readBandwidth
+}