Browse Source

change ping calculate, fix window size calculate

ffdfgdfg 5 years ago
parent
commit
d9f9dc6acb
3 changed files with 26 additions and 23 deletions
  1. 10 10
      lib/common/netpackager.go
  2. 7 8
      lib/mux/conn.go
  3. 9 5
      lib/mux/mux.go

+ 10 - 10
lib/common/netpackager.go

@@ -157,11 +157,11 @@ type MuxPackager struct {
 func (Self *MuxPackager) NewPac(flag uint8, id int32, content ...interface{}) (err error) {
 	Self.Flag = flag
 	Self.Id = id
-	if flag == MUX_NEW_MSG || flag == MUX_NEW_MSG_PART || flag == MUX_PING_FLAG {
+	switch flag {
+	case MUX_NEW_MSG, MUX_NEW_MSG_PART, MUX_PING_FLAG, MUX_PING_RETURN:
 		err = Self.BasePackager.NewPac(content...)
-	}
-	if flag == MUX_MSG_SEND_OK {
-		// MUX_MSG_SEND_OK only allows one data
+	case MUX_MSG_SEND_OK:
+		// MUX_MSG_SEND_OK contains two data
 		switch content[0].(type) {
 		case int:
 			Self.Window = uint32(content[0].(int))
@@ -187,10 +187,10 @@ func (Self *MuxPackager) Pack(writer io.Writer) (err error) {
 	if err != nil {
 		return
 	}
-	if Self.Flag == MUX_NEW_MSG || Self.Flag == MUX_NEW_MSG_PART || Self.Flag == MUX_PING_FLAG {
+	switch Self.Flag {
+	case MUX_NEW_MSG, MUX_NEW_MSG_PART, MUX_PING_FLAG, MUX_PING_RETURN:
 		err = Self.BasePackager.Pack(writer)
-	}
-	if Self.Flag == MUX_MSG_SEND_OK {
+	case MUX_MSG_SEND_OK:
 		err = binary.Write(writer, binary.LittleEndian, Self.Window)
 		if err != nil {
 			return
@@ -210,10 +210,10 @@ func (Self *MuxPackager) UnPack(reader io.Reader) (err error) {
 	if err != nil {
 		return
 	}
-	if Self.Flag == MUX_NEW_MSG || Self.Flag == MUX_NEW_MSG_PART || Self.Flag == MUX_PING_FLAG {
+	switch Self.Flag {
+	case MUX_NEW_MSG, MUX_NEW_MSG_PART, MUX_PING_FLAG, MUX_PING_RETURN:
 		err = Self.BasePackager.UnPack(reader)
-	}
-	if Self.Flag == MUX_MSG_SEND_OK {
+	case MUX_MSG_SEND_OK:
 		err = binary.Read(reader, binary.LittleEndian, &Self.Window)
 		if err != nil {
 			return

+ 7 - 8
lib/mux/conn.go

@@ -2,7 +2,6 @@ package mux
 
 import (
 	"errors"
-	"github.com/astaxie/beego/logs"
 	"io"
 	"math"
 	"net"
@@ -169,14 +168,13 @@ func (Self *ReceiveWindow) ReadSize() (n uint32) {
 	n = Self.readLength
 	Self.readLength = 0
 	Self.bufQueue.mutex.Unlock()
-	Self.count += 1
 	return
 }
 
 func (Self *ReceiveWindow) CalcSize() {
 	// calculating maximum receive window size
 	if Self.count == 0 {
-		logs.Warn("ping, bw", Self.mux.latency, Self.bw.Get())
+		//logs.Warn("ping, bw", Self.mux.latency, Self.bw.Get())
 		n := uint32(2 * Self.mux.latency * Self.bw.Get())
 		if n < 8192 {
 			n = 8192
@@ -185,10 +183,11 @@ func (Self *ReceiveWindow) CalcSize() {
 			n = Self.bufQueue.Len()
 		}
 		// set the minimal size
-		logs.Warn("n", n)
+		//logs.Warn("n", n)
 		Self.maxSize = n
 		Self.count = -5
 	}
+	Self.count += 1
 }
 
 func (Self *ReceiveWindow) Write(buf []byte, l uint16, part bool, id int32) (err error) {
@@ -205,7 +204,7 @@ func (Self *ReceiveWindow) Write(buf []byte, l uint16, part bool, id int32) (err
 	//logs.Warn("read session calc size ", Self.maxSize)
 	// calculating the receive window size
 	Self.CalcSize()
-	logs.Warn("read session calc size finish", Self.maxSize)
+	//logs.Warn("read session calc size finish", Self.maxSize)
 	if Self.RemainingSize() == 0 {
 		Self.windowFull = true
 		//logs.Warn("window full true", Self.windowFull)
@@ -325,10 +324,10 @@ func (Self *SendWindow) SetSize(windowSize, readLength uint32) (closed bool) {
 		return true
 	}
 	if readLength == 0 && Self.maxSize == windowSize {
-		logs.Warn("waiting for another window size")
+		//logs.Warn("waiting for another window size")
 		return false // waiting for receive another usable window size
 	}
-	logs.Warn("set send window size to ", windowSize, readLength)
+	//logs.Warn("set send window size to ", windowSize, readLength)
 	Self.mutex.Lock()
 	Self.slide(windowSize, readLength)
 	if Self.setSizeWait {
@@ -513,7 +512,7 @@ func (Self *bandwidth) calcWriteBandwidth() {
 func (Self *bandwidth) Get() (bw float64) {
 	// The zero value, 0 for numeric types
 	if Self.writeBW == 0 && Self.readBW == 0 {
-		logs.Warn("bw both 0")
+		//logs.Warn("bw both 0")
 		return 100
 	}
 	if Self.writeBW == 0 && Self.readBW != 0 {

+ 9 - 5
lib/mux/mux.go

@@ -150,7 +150,7 @@ func (s *Mux) writeBuf() {
 
 func (s *Mux) ping() {
 	go func() {
-		now, _ := time.Now().MarshalText()
+		now, _ := time.Now().UTC().MarshalText()
 		s.sendInfo(common.MUX_PING_FLAG, common.MUX_PING, now)
 		// send the ping flag and get the latency first
 		ticker := time.NewTicker(time.Second * 15)
@@ -166,7 +166,7 @@ func (s *Mux) ping() {
 			if (math.MaxInt32 - s.id) < 10000 {
 				s.id = 0
 			}
-			now, _ := time.Now().MarshalText()
+			now, _ := time.Now().UTC().MarshalText()
 			s.sendInfo(common.MUX_PING_FLAG, common.MUX_PING, now)
 			if s.pingOk > 10 && s.connType == "kcp" {
 				s.Close()
@@ -188,8 +188,11 @@ func (s *Mux) pingReturn() {
 				break
 			}
 			_ = now.UnmarshalText(data)
-			s.latency = time.Since(now).Seconds()
-			s.sendInfo(common.MUX_PING_RETURN, common.MUX_PING, nil)
+			s.latency = time.Now().UTC().Sub(now).Seconds() / 2
+			//logs.Warn("latency", s.latency)
+			if s.latency <= 0 {
+				logs.Warn("latency err", s.latency)
+			}
 		}
 	}()
 }
@@ -214,9 +217,10 @@ func (s *Mux) readSession() {
 				s.sendInfo(common.MUX_NEW_CONN_OK, connection.connId, nil)
 				continue
 			case common.MUX_PING_FLAG: //ping
-				s.pingCh <- pack.Content
+				s.sendInfo(common.MUX_PING_RETURN, common.MUX_PING, pack.Content)
 				continue
 			case common.MUX_PING_RETURN:
+				s.pingCh <- pack.Content
 				continue
 			}
 			if connection, ok := s.connMap.Get(pack.Id); ok && !connection.isClose {