瀏覽代碼

fine mux connection ping calculation, increase connection waiting time

ffdfgdfg 5 年之前
父節點
當前提交
aaf79b21f0
共有 4 個文件被更改,包括 127 次插入24 次删除
  1. 13 0
      README.md
  2. 4 2
      lib/mux/conn.go
  3. 109 21
      lib/mux/mux.go
  4. 1 1
      lib/mux/queue.go

+ 13 - 0
README.md

@@ -824,6 +824,19 @@ nps支持对客户端的隧道数量进行限制,该功能默认是关闭的
 
 nps主要通信默认基于多路复用,无需开启。
 
+多路复用基于TCP滑动窗口原理设计,动态计算延迟以及带宽来算出应该往网络管道中打入的流量。
+由于主要通信大多采用TCP协议,并无法探测其实时丢包情况,对于产生丢包重传的情况,采用较大的宽容度,
+5分钟的等待时间,超时将会关闭当前隧道连接并重新建立,这将会抛弃当前所有的连接。
+在Linux上,可以通过调节内核参数来适应不同应用场景。
+
+对于需求大带宽又有一定的丢包的场景,可以保持默认参数不变,尽可能少抛弃连接
+高并发下可根据[Linux系统限制](#Linux系统限制) 调整
+
+对于延迟敏感而又有一定丢包的场景,可以适当调整TCP重传次数
+`tcp_syn_retries`, `tcp_retries1`, `tcp_retries2`
+高并发同上
+nps会在系统主动关闭连接的时候拿到报错,进而重新建立隧道连接
+
 ### 环境变量渲染
 npc支持环境变量渲染以适应在某些特殊场景下的要求。
 

+ 4 - 2
lib/mux/conn.go

@@ -3,6 +3,7 @@ package mux
 import (
 	"errors"
 	"io"
+	"math"
 	"net"
 	"runtime"
 	"sync"
@@ -208,7 +209,8 @@ func (Self *ReceiveWindow) calcSize() {
 	// calculating maximum receive window size
 	if Self.count == 0 {
 		//logs.Warn("ping, bw", Self.mux.latency, Self.bw.Get())
-		n := uint32(2 * Self.mux.latency * Self.mux.bw.Get() * 1.5 / float64(Self.mux.connMap.Size()))
+		n := uint32(2 * math.Float64frombits(atomic.LoadUint64(&Self.mux.latency)) *
+			Self.mux.bw.Get() * 1.5 / float64(Self.mux.connMap.Size()))
 		if n < 8192 {
 			n = 8192
 		}
@@ -471,7 +473,7 @@ start:
 func (Self *SendWindow) waitReceiveWindow() (err error) {
 	t := Self.timeout.Sub(time.Now())
 	if t < 0 {
-		t = time.Minute
+		t = time.Minute * 5
 	}
 	timer := time.NewTimer(t)
 	defer timer.Stop()

+ 109 - 21
lib/mux/mux.go

@@ -13,21 +13,22 @@ import (
 )
 
 type Mux struct {
+	latency uint64 // we store latency in bits, but it's float64
 	net.Listener
-	conn         net.Conn
-	connMap      *connMap
-	newConnCh    chan *conn
-	id           int32
-	closeChan    chan struct{}
-	IsClose      bool
-	pingOk       int
-	latency      float64
-	bw           *bandwidth
-	pingCh       chan []byte
-	pingCheck    bool
-	connType     string
-	writeQueue   PriorityQueue
-	newConnQueue ConnQueue
+	conn          net.Conn
+	connMap       *connMap
+	newConnCh     chan *conn
+	id            int32
+	closeChan     chan struct{}
+	IsClose       bool
+	pingOk        int
+	counter       *latencyCounter
+	bw            *bandwidth
+	pingCh        chan []byte
+	pingCheckTime uint32
+	connType      string
+	writeQueue    PriorityQueue
+	newConnQueue  ConnQueue
 }
 
 func NewMux(c net.Conn, connType string) *Mux {
@@ -43,6 +44,7 @@ func NewMux(c net.Conn, connType string) *Mux {
 		IsClose:   false,
 		connType:  connType,
 		pingCh:    make(chan []byte),
+		counter:   newLatencyCounter(),
 	}
 	m.writeQueue.New()
 	m.newConnQueue.New()
@@ -175,16 +177,16 @@ func (s *Mux) ping() {
 			select {
 			case <-ticker.C:
 			}
-			if s.pingCheck {
+			if atomic.LoadUint32(&s.pingCheckTime) >= 60 {
 				logs.Error("mux: ping time out")
 				s.Close()
-				// more than 5 seconds not receive the ping return package,
+				// more than 5 minutes not receive the ping return package,
 				// mux conn is damaged, maybe a packet drop, close it
 				break
 			}
 			now, _ := time.Now().UTC().MarshalText()
 			s.sendInfo(common.MUX_PING_FLAG, common.MUX_PING, now)
-			s.pingCheck = true
+			atomic.AddUint32(&s.pingCheckTime, 1)
 			if s.pingOk > 10 && s.connType == "kcp" {
 				logs.Error("mux: kcp ping err")
 				s.Close()
@@ -205,16 +207,17 @@ func (s *Mux) pingReturn() {
 			}
 			select {
 			case data = <-s.pingCh:
-				s.pingCheck = false
+				atomic.StoreUint32(&s.pingCheckTime, 0)
 			case <-s.closeChan:
 				break
 			}
 			_ = now.UnmarshalText(data)
 			latency := time.Now().UTC().Sub(now).Seconds() / 2
-			if latency < 0.5 && latency > 0 {
-				s.latency = latency
+			if latency > 0 {
+				atomic.StoreUint64(&s.latency, math.Float64bits(s.counter.Latency(latency)))
+				// convert float64 to bits, store it atomic
 			}
-			//logs.Warn("latency", s.latency)
+			//logs.Warn("latency", math.Float64frombits(atomic.LoadUint64(&s.latency)))
 			common.WindowBuff.Put(data)
 		}
 	}()
@@ -379,3 +382,88 @@ func (Self *bandwidth) Get() (bw float64) {
 	}
 	return Self.readBandwidth
 }
+
+const counterBits = 4
+const counterMask = 1<<counterBits - 1
+
+func newLatencyCounter() *latencyCounter {
+	return &latencyCounter{
+		buf:     make([]float64, 1<<counterBits, 1<<counterBits),
+		headMin: 0,
+	}
+}
+
+type latencyCounter struct {
+	buf []float64 //buf is a fixed length ring buffer,
+	// if buffer is full, new value will replace the oldest one.
+	headMin uint8 //head indicate the head in ring buffer,
+	// in meaning, slot in list will be replaced;
+	// min indicate this slot value is minimal in list.
+}
+
+func (Self *latencyCounter) unpack(idxs uint8) (head, min uint8) {
+	head = uint8((idxs >> counterBits) & counterMask)
+	// we set head is 4 bits
+	min = uint8(idxs & counterMask)
+	return
+}
+
+func (Self *latencyCounter) pack(head, min uint8) uint8 {
+	return uint8(head<<counterBits) |
+		uint8(min&counterMask)
+}
+
+func (Self *latencyCounter) add(value float64) {
+	head, min := Self.unpack(Self.headMin)
+	Self.buf[head] = value
+	if head == min {
+		min = Self.minimal()
+		//if head equals min, means the min slot already be replaced,
+		// so we need to find another minimal value in the list,
+		// and change the min indicator
+	}
+	if Self.buf[min] > value {
+		min = head
+	}
+	head++
+	Self.headMin = Self.pack(head, min)
+}
+
+func (Self *latencyCounter) minimal() (min uint8) {
+	var val float64
+	var i uint8
+	for i = 0; i < counterMask; i++ {
+		if Self.buf[i] > 0 {
+			if val > Self.buf[i] {
+				val = Self.buf[i]
+				min = i
+			}
+		}
+	}
+	return
+}
+
+func (Self *latencyCounter) Latency(value float64) (latency float64) {
+	Self.add(value)
+	_, min := Self.unpack(Self.headMin)
+	latency = Self.buf[min] * Self.countSuccess()
+	return
+}
+
+const lossRatio = 1.6
+
+func (Self *latencyCounter) countSuccess() (successRate float64) {
+	var success, loss, i uint8
+	_, min := Self.unpack(Self.headMin)
+	for i = 0; i < counterMask; i++ {
+		if Self.buf[i] > lossRatio*Self.buf[min] && Self.buf[i] > 0 {
+			loss++
+		}
+		if Self.buf[i] <= lossRatio*Self.buf[min] && Self.buf[i] > 0 {
+			success++
+		}
+	}
+	// counting all the data in the ring buf, except zero
+	successRate = float64(success) / float64(loss+success)
+	return
+}

+ 1 - 1
lib/mux/queue.go

@@ -288,7 +288,7 @@ func (Self *ReceiveWindowQueue) waitPush() (err error) {
 	//defer logs.Warn("wait push finish")
 	t := Self.timeout.Sub(time.Now())
 	if t <= 0 {
-		t = time.Second * 10
+		t = time.Minute * 5
 	}
 	timer := time.NewTimer(t)
 	defer timer.Stop()