Pārlūkot izejas kodu

fix mux auto disconnect, add mux new connection queue

ffdfgdfg 5 gadi atpakaļ
vecāks
revīzija
2ca84c912b
3 mainītis faili ar 149 papildinājumiem un 59 dzēšanām
  1. 4 4
      lib/mux/conn.go
  2. 44 37
      lib/mux/mux.go
  3. 101 18
      lib/mux/queue.go

+ 4 - 4
lib/mux/conn.go

@@ -23,7 +23,7 @@ type conn struct {
 	receiveWindow    *ReceiveWindow
 	sendWindow       *SendWindow
 	once             sync.Once
-	label            string
+	//label            string
 }
 
 func NewConn(connId int32, mux *Mux, label ...string) *conn {
@@ -36,9 +36,9 @@ func NewConn(connId int32, mux *Mux, label ...string) *conn {
 		sendWindow:       new(SendWindow),
 		once:             sync.Once{},
 	}
-	if len(label) > 0 {
-		c.label = label[0]
-	}
+	//if len(label) > 0 {
+	//	c.label = label[0]
+	//}
 	c.receiveWindow.New(mux)
 	c.sendWindow.New(mux)
 	//logm := &connLog{

+ 44 - 37
lib/mux/mux.go

@@ -14,21 +14,20 @@ import (
 
 type Mux struct {
 	net.Listener
-	conn       net.Conn
-	connMap    *connMap
-	newConnCh  chan *conn
-	id         int32
-	closeChan  chan struct{}
-	IsClose    bool
-	pingOk     int
-	latency    float64
-	bw         *bandwidth
-	pingCh     chan []byte
-	pingTimer  *time.Timer
-	connType   string
-	writeQueue PriorityQueue
-	//bufQueue      BytesQueue
-	//sync.Mutex
+	conn         net.Conn
+	connMap      *connMap
+	newConnCh    chan *conn
+	id           int32
+	closeChan    chan struct{}
+	IsClose      bool
+	pingOk       int
+	latency      float64
+	bw           *bandwidth
+	pingCh       chan []byte
+	pingCheck    bool
+	connType     string
+	writeQueue   PriorityQueue
+	newConnQueue ConnQueue
 }
 
 func NewMux(c net.Conn, connType string) *Mux {
@@ -39,15 +38,14 @@ func NewMux(c net.Conn, connType string) *Mux {
 		connMap:   NewConnMap(),
 		id:        0,
 		closeChan: make(chan struct{}, 1),
-		newConnCh: make(chan *conn, 10),
+		newConnCh: make(chan *conn),
 		bw:        new(bandwidth),
 		IsClose:   false,
 		connType:  connType,
 		pingCh:    make(chan []byte),
-		pingTimer: time.NewTimer(15 * time.Second),
 	}
 	m.writeQueue.New()
-	//m.bufQueue.New()
+	m.newConnQueue.New()
 	//read session by flag
 	m.readSession()
 	//ping
@@ -101,6 +99,8 @@ func (s *Mux) sendInfo(flag uint8, id int32, data ...interface{}) {
 	err = pack.NewPac(flag, id, data...)
 	if err != nil {
 		common.MuxPack.Put(pack)
+		logs.Error("mux: new pack err")
+		s.Close()
 		return
 	}
 	s.writeQueue.Push(pack)
@@ -124,7 +124,7 @@ func (s *Mux) packBuf() {
 		err := pack.Pack(buffer)
 		common.MuxPack.Put(pack)
 		if err != nil {
-			logs.Warn("pack err", err)
+			logs.Error("mux: pack err", err)
 			common.BuffPool.Put(buffer)
 			break
 		}
@@ -134,7 +134,7 @@ func (s *Mux) packBuf() {
 		n, err := buffer.WriteTo(s.conn)
 		//common.BuffPool.Put(buffer)
 		if err != nil || int(n) != l {
-			logs.Warn("close from write session fail ", err, n, l)
+			logs.Error("mux: close from write session fail ", err, n, l)
 			s.Close()
 			break
 		}
@@ -170,21 +170,23 @@ func (s *Mux) ping() {
 		for {
 			if s.IsClose {
 				ticker.Stop()
-				if !s.pingTimer.Stop() {
-					<-s.pingTimer.C
-				}
 				break
 			}
 			select {
 			case <-ticker.C:
 			}
+			if s.pingCheck {
+				logs.Error("mux: ping time out")
+				s.Close()
+				// more than 5 seconds not receive the ping return package,
+				// mux conn is damaged, maybe a packet drop, close it
+				break
+			}
 			now, _ := time.Now().UTC().MarshalText()
 			s.sendInfo(common.MUX_PING_FLAG, common.MUX_PING, now)
-			if !s.pingTimer.Stop() {
-				<-s.pingTimer.C
-			}
-			s.pingTimer.Reset(15 * time.Second)
+			s.pingCheck = true
 			if s.pingOk > 10 && s.connType == "kcp" {
+				logs.Error("mux: kcp ping err")
 				s.Close()
 				break
 			}
@@ -203,12 +205,9 @@ func (s *Mux) pingReturn() {
 			}
 			select {
 			case data = <-s.pingCh:
+				s.pingCheck = false
 			case <-s.closeChan:
 				break
-			case <-s.pingTimer.C:
-				logs.Error("mux: ping time out")
-				s.Close()
-				break
 			}
 			_ = now.UnmarshalText(data)
 			latency := time.Now().UTC().Sub(now).Seconds() / 2
@@ -222,6 +221,15 @@ func (s *Mux) pingReturn() {
 }
 
 func (s *Mux) readSession() {
+	go func() {
+		var connection *conn
+		for {
+			connection = s.newConnQueue.Pop()
+			s.connMap.Set(connection.connId, connection) //it has been set before send ok
+			s.newConnCh <- connection
+			s.sendInfo(common.MUX_NEW_CONN_OK, connection.connId, nil)
+		}
+	}()
 	go func() {
 		pack := common.MuxPack.Get()
 		var l uint16
@@ -233,18 +241,16 @@ func (s *Mux) readSession() {
 			pack = common.MuxPack.Get()
 			s.bw.StartRead()
 			if l, err = pack.UnPack(s.conn); err != nil {
+				logs.Error("mux: read session unpack from connection err")
+				s.Close()
 				break
 			}
 			s.bw.SetCopySize(l)
 			s.pingOk = 0
 			switch pack.Flag {
 			case common.MUX_NEW_CONN: //new connection
-				connection := NewConn(pack.Id, s, "npc ")
-				s.connMap.Set(pack.Id, connection) //it has been set before send ok
-				//go func(connection *conn) {
-				s.newConnCh <- connection
-				s.sendInfo(common.MUX_NEW_CONN_OK, connection.connId, nil)
-				//}(connection)
+				connection := NewConn(pack.Id, s)
+				s.newConnQueue.Push(connection)
 				continue
 			case common.MUX_PING_FLAG: //ping
 				s.sendInfo(common.MUX_PING_RETURN, common.MUX_PING, pack.Content)
@@ -261,6 +267,7 @@ func (s *Mux) readSession() {
 				case common.MUX_NEW_MSG, common.MUX_NEW_MSG_PART: //new msg from remote connection
 					err = s.newMsg(connection, pack)
 					if err != nil {
+						logs.Error("mux: read session connection new msg err")
 						connection.Close()
 					}
 					continue

+ 101 - 18
lib/mux/queue.go

@@ -33,6 +33,14 @@ func (Self *PriorityQueue) New() {
 }
 
 func (Self *PriorityQueue) Push(packager *common.MuxPackager) {
+	//logs.Warn("push start")
+	Self.push(packager)
+	Self.cond.Broadcast()
+	//logs.Warn("push finish")
+	return
+}
+
+func (Self *PriorityQueue) push(packager *common.MuxPackager) {
 	switch packager.Flag {
 	case common.MUX_PING_FLAG, common.MUX_PING_RETURN:
 		Self.highestChain.pushHead(unsafe.Pointer(packager))
@@ -44,8 +52,6 @@ func (Self *PriorityQueue) Push(packager *common.MuxPackager) {
 	default:
 		Self.lowestChain.pushHead(unsafe.Pointer(packager))
 	}
-	Self.cond.Signal()
-	return
 }
 
 const maxStarving uint8 = 8
@@ -121,6 +127,72 @@ func (Self *PriorityQueue) Stop() {
 	Self.cond.Broadcast()
 }
 
+type ConnQueue struct {
+	chain    *bufChain
+	starving uint8
+	stop     bool
+	cond     *sync.Cond
+}
+
+func (Self *ConnQueue) New() {
+	Self.chain = new(bufChain)
+	Self.chain.new(32)
+	locker := new(sync.Mutex)
+	Self.cond = sync.NewCond(locker)
+}
+
+func (Self *ConnQueue) Push(connection *conn) {
+	Self.chain.pushHead(unsafe.Pointer(connection))
+	Self.cond.Broadcast()
+	return
+}
+
+func (Self *ConnQueue) Pop() (connection *conn) {
+	var iter bool
+	for {
+		connection = Self.pop()
+		if connection != nil {
+			return
+		}
+		if Self.stop {
+			return
+		}
+		if iter {
+			break
+			// trying to pop twice
+		}
+		iter = true
+		runtime.Gosched()
+	}
+	Self.cond.L.Lock()
+	defer Self.cond.L.Unlock()
+	for connection = Self.pop(); connection == nil; {
+		if Self.stop {
+			return
+		}
+		//logs.Warn("queue into wait")
+		Self.cond.Wait()
+		// wait for it with no more iter
+		connection = Self.pop()
+		//logs.Warn("queue wait finish", packager)
+	}
+	return
+}
+
+func (Self *ConnQueue) pop() (connection *conn) {
+	ptr, ok := Self.chain.popTail()
+	if ok {
+		connection = (*conn)(ptr)
+		return
+	}
+	return
+}
+
+func (Self *ConnQueue) Stop() {
+	Self.stop = true
+	Self.cond.Broadcast()
+}
+
 func NewListElement(buf []byte, l uint16, part bool) (element *common.ListElement, err error) {
 	if uint16(len(buf)) != l {
 		err = errors.New("ListElement: buf length not match")
@@ -180,24 +252,12 @@ startPop:
 		if !atomic.CompareAndSwapUint64(&Self.lengthWait, ptrs, Self.chain.head.pack(0, 1)) {
 			goto startPop // another goroutine is pushing
 		}
-		t := Self.timeout.Sub(time.Now())
-		if t <= 0 {
-			t = time.Minute
-		}
-		timer := time.NewTimer(t)
-		defer timer.Stop()
-		//logs.Warn("queue into wait")
-		select {
-		case <-Self.readOp:
-			//logs.Warn("queue wait finish")
-			goto startPop
-		case <-Self.stopOp:
-			err = io.EOF
-			return
-		case <-timer.C:
-			err = errors.New("mux.queue: read time out")
+		err = Self.waitPush()
+		// there is no more data in queue, wait for it
+		if err != nil {
 			return
 		}
+		goto startPop // wait finish, trying to get the new status
 	}
 	// length is not zero, so try to pop
 	for {
@@ -223,6 +283,29 @@ func (Self *ReceiveWindowQueue) allowPop() (closed bool) {
 	}
 }
 
+func (Self *ReceiveWindowQueue) waitPush() (err error) {
+	//logs.Warn("wait push")
+	//defer logs.Warn("wait push finish")
+	t := Self.timeout.Sub(time.Now())
+	if t <= 0 {
+		t = time.Second * 10
+	}
+	timer := time.NewTimer(t)
+	defer timer.Stop()
+	//logs.Warn("queue into wait")
+	select {
+	case <-Self.readOp:
+		//logs.Warn("queue wait finish")
+		return nil
+	case <-Self.stopOp:
+		err = io.EOF
+		return
+	case <-timer.C:
+		err = errors.New("mux.queue: read time out")
+		return
+	}
+}
+
 func (Self *ReceiveWindowQueue) Len() (n uint32) {
 	ptrs := atomic.LoadUint64(&Self.lengthWait)
 	n, _ = Self.chain.head.unpack(ptrs)