Procházet zdrojové kódy

fine bandwidth calculation, fix slide window calculated size too large, fix #330

ffdfgdfg před 5 roky
rodič
revize
076fc8b2e1
4 změnil soubory, kde provedl 70 přidání a 10 odebrání
  1. 9 0
      lib/common/netpackager.go
  2. 6 3
      lib/common/pool.go
  3. 6 4
      lib/mux/conn.go
  4. 49 3
      lib/mux/mux.go

+ 9 - 0
lib/common/netpackager.go

@@ -246,6 +246,15 @@ func (Self *MuxPackager) UnPack(reader io.Reader) (n uint16, err error) {
 	return
 }
 
+func (Self *MuxPackager) reset() {
+	Self.Id = 0
+	Self.Flag = 0
+	Self.Length = 0
+	Self.Content = nil
+	Self.ReadLength = 0
+	Self.Window = 0
+}
+
 const (
 	ipV4       = 1
 	domainName = 3

+ 6 - 3
lib/common/pool.go

@@ -93,18 +93,20 @@ type windowBufferPool struct {
 func (Self *windowBufferPool) New() {
 	Self.pool = sync.Pool{
 		New: func() interface{} {
-			return make([]byte, PoolSizeWindow, PoolSizeWindow)
+			return make([]byte, PoolSizeWindow)
 		},
 	}
 }
 
 func (Self *windowBufferPool) Get() (buf []byte) {
 	buf = Self.pool.Get().([]byte)
-	return buf[:PoolSizeWindow]
+	buf = buf[:PoolSizeWindow]
+	return buf
 }
 
 func (Self *windowBufferPool) Put(x []byte) {
-	Self.pool.Put(x[:PoolSizeWindow]) // make buf to full
+	x = x[:0] // clean buf
+	Self.pool.Put(x)
 }
 
 type bufferPool struct {
@@ -146,6 +148,7 @@ func (Self *muxPackagerPool) Get() *MuxPackager {
 }
 
 func (Self *muxPackagerPool) Put(pack *MuxPackager) {
+	pack.reset()
 	Self.pool.Put(pack)
 }
 

+ 6 - 4
lib/mux/conn.go

@@ -2,6 +2,7 @@ package mux
 
 import (
 	"errors"
+	"github.com/astaxie/beego/logs"
 	"io"
 	"math"
 	"net"
@@ -215,10 +216,10 @@ func (Self *ReceiveWindow) calcSize() {
 		if n < common.MAXIMUM_SEGMENT_SIZE*10 {
 			n = common.MAXIMUM_SEGMENT_SIZE * 10
 		}
-		bufLen := Self.bufQueue.Len()
-		if n < bufLen {
-			n = bufLen
-		}
+		//bufLen := Self.bufQueue.Len()
+		//if n < bufLen {
+		//	n = bufLen
+		//}
 		if n < Self.maxSize/2 {
 			n = Self.maxSize / 2
 		}
@@ -227,6 +228,7 @@ func (Self *ReceiveWindow) calcSize() {
 			n = 2 * Self.maxSize
 		}
 		if n > (common.MAXIMUM_WINDOW_SIZE / uint32(conns)) {
+			logs.Warn("window too large", n)
 			n = common.MAXIMUM_WINDOW_SIZE / uint32(conns)
 		}
 		// set the maximum size

+ 49 - 3
lib/mux/mux.go

@@ -5,7 +5,9 @@ import (
 	"io"
 	"math"
 	"net"
+	"os"
 	"sync/atomic"
+	"syscall"
 	"time"
 
 	"github.com/astaxie/beego/logs"
@@ -35,13 +37,17 @@ func NewMux(c net.Conn, connType string) *Mux {
 	//c.(*net.TCPConn).SetReadBuffer(0)
 	//c.(*net.TCPConn).SetWriteBuffer(0)
 	_ = c.SetDeadline(time.Time{})
+	fd, err := getConnFd(c)
+	if err != nil {
+		logs.Warn(err)
+	}
 	m := &Mux{
 		conn:      c,
 		connMap:   NewConnMap(),
 		id:        0,
 		closeChan: make(chan struct{}, 1),
 		newConnCh: make(chan *conn),
-		bw:        new(bandwidth),
+		bw:        NewBandwidth(fd),
 		IsClose:   false,
 		connType:  connType,
 		pingCh:    make(chan []byte),
@@ -58,6 +64,26 @@ func NewMux(c net.Conn, connType string) *Mux {
 	return m
 }
 
+func getConnFd(c net.Conn) (fd *os.File, err error) {
+	switch c.(type) {
+	case *net.TCPConn:
+		fd, err = c.(*net.TCPConn).File()
+		if err != nil {
+			return
+		}
+		return
+	case *net.UDPConn:
+		fd, err = c.(*net.UDPConn).File()
+		if err != nil {
+			return
+		}
+		return
+	default:
+		err = errors.New("mux:unknown conn type, only tcp or kcp")
+		return
+	}
+}
+
 func (s *Mux) NewConn() (*conn, error) {
 	if s.IsClose {
 		return nil, errors.New("the mux has closed")
@@ -392,13 +418,19 @@ type bandwidth struct {
 	readStart     time.Time
 	lastReadStart time.Time
 	bufLength     uint32
+	fd            *os.File
+	calcThreshold uint32
+}
+
+func NewBandwidth(fd *os.File) *bandwidth {
+	return &bandwidth{fd: fd}
 }
 
 func (Self *bandwidth) StartRead() {
 	if Self.readStart.IsZero() {
 		Self.readStart = time.Now()
 	}
-	if Self.bufLength >= common.MAXIMUM_SEGMENT_SIZE*300 {
+	if Self.bufLength >= Self.calcThreshold {
 		Self.lastReadStart, Self.readStart = Self.readStart, time.Now()
 		Self.calcBandWidth()
 	}
@@ -410,7 +442,21 @@ func (Self *bandwidth) SetCopySize(n uint16) {
 
 func (Self *bandwidth) calcBandWidth() {
 	t := Self.readStart.Sub(Self.lastReadStart)
-	atomic.StoreUint64(&Self.readBandwidth, math.Float64bits(float64(Self.bufLength)/t.Seconds()))
+	bufferSize, err := syscall.GetsockoptInt(int(Self.fd.Fd()), syscall.SOL_SOCKET, syscall.SO_RCVBUF)
+	//logs.Warn(bufferSize)
+	if err != nil {
+		logs.Warn(err)
+		Self.bufLength = 0
+		return
+	}
+	if Self.bufLength >= uint32(bufferSize) {
+		atomic.StoreUint64(&Self.readBandwidth, math.Float64bits(float64(Self.bufLength)/t.Seconds()))
+		// calculate the hole socket buffer, the time meaning to fill the buffer
+		//logs.Warn(Self.Get())
+	} else {
+		Self.calcThreshold = uint32(bufferSize)
+	}
+	// socket buffer size is bigger than bufLength, so we don't calculate it
 	Self.bufLength = 0
 }