Browse Source

fix conn write block, add priority queue support

ffdfgdfg 5 years ago
parent
commit
1bf4cf2347
3 changed files with 148 additions and 97 deletions
  1. 13 2
      lib/mux/conn.go
  2. 65 38
      lib/mux/mux.go
  3. 70 57
      lib/mux/queue.go

+ 13 - 2
lib/mux/conn.go

@@ -187,6 +187,7 @@ type window struct {
 	windowBuff          []byte
 	off                 uint16
 	readOp              chan struct{}
+	readWait            bool
 	WindowFull          bool
 	usableReceiveWindow chan uint16
 	WriteWg             sync.WaitGroup
@@ -274,7 +275,11 @@ func (Self *window) Write(p []byte) (n int, err error) {
 	length := Self.len()                  // length before grow
 	Self.grow(len(p))                     // grow for copy
 	n = copy(Self.windowBuff[length:], p) // must copy data before allow Read
-	if length == 0 {
+	if Self.readWait {
+		// if there condition is length == 0 and
+		// Read method just take away all the windowBuff,
+		// this method will block until windowBuff is empty again
+
 		// allow continue read
 		defer Self.allowRead()
 	}
@@ -287,6 +292,9 @@ func (Self *window) allowRead() (closed bool) {
 		close(Self.readOp)
 		return true
 	}
+	Self.mutex.Lock()
+	Self.readWait = false
+	Self.mutex.Unlock()
 	select {
 	case <-Self.closeOpCh:
 		close(Self.readOp)
@@ -303,10 +311,11 @@ func (Self *window) Read(p []byte) (n int, err error) {
 	Self.mutex.Lock()
 	length := Self.len() // protect the length data, it invokes
 	// before Write lock and after Write unlock
-	Self.mutex.Unlock()
 	if length == 0 {
 		// window is empty, waiting for Write method send a success readOp signal
 		// or get timeout or close
+		Self.readWait = true
+		Self.mutex.Unlock()
 		ticker := time.NewTicker(2 * time.Minute)
 		defer ticker.Stop()
 		select {
@@ -322,6 +331,8 @@ func (Self *window) Read(p []byte) (n int, err error) {
 			close(Self.readOp)
 			return 0, io.EOF // receive close signal, returns eof
 		}
+	} else {
+		Self.mutex.Unlock()
 	}
 	Self.mutex.Lock()
 	n = copy(p, Self.windowBuff[Self.off:])

+ 65 - 38
lib/mux/mux.go

@@ -22,21 +22,23 @@ type Mux struct {
 	IsClose    bool
 	pingOk     int
 	connType   string
-	writeQueue chan *bytes.Buffer
+	writeQueue Queue
+	bufCh      chan *bytes.Buffer
 	sync.Mutex
 }
 
 func NewMux(c net.Conn, connType string) *Mux {
 	m := &Mux{
-		conn:       c,
-		connMap:    NewConnMap(),
-		id:         0,
-		closeChan:  make(chan struct{}),
-		newConnCh:  make(chan *conn),
-		IsClose:    false,
-		connType:   connType,
-		writeQueue: make(chan *bytes.Buffer, 20),
+		conn:      c,
+		connMap:   NewConnMap(),
+		id:        0,
+		closeChan: make(chan struct{}),
+		newConnCh: make(chan *conn),
+		IsClose:   false,
+		connType:  connType,
+		bufCh:     make(chan *bytes.Buffer),
 	}
+	m.writeQueue.New()
 	//read session by flag
 	go m.readSession()
 	//ping
@@ -88,26 +90,27 @@ func (s *Mux) sendInfo(flag uint8, id int32, data interface{}) {
 			//logs.Warn("send info content is nil")
 		}
 	}
-	buf := common.BuffPool.Get()
+	//buf := common.BuffPool.Get()
 	//defer pool.BuffPool.Put(buf)
 	pack := common.MuxPack.Get()
-	defer common.MuxPack.Put(pack)
+
 	err = pack.NewPac(flag, id, data)
 	if err != nil {
 		//logs.Warn("new pack err", err)
-		common.BuffPool.Put(buf)
-		return
-	}
-	err = pack.Pack(buf)
-	if err != nil {
-		//logs.Warn("pack err", err)
-		common.BuffPool.Put(buf)
+		common.MuxPack.Put(pack)
 		return
 	}
-	if pack.Flag == common.MUX_NEW_CONN {
-		//logs.Warn("sendinfo mux new conn, insert to write queue", pack.Id)
-	}
-	s.writeQueue <- buf
+	s.writeQueue.Push(pack)
+	//err = pack.Pack(buf)
+	//if err != nil {
+	//	//logs.Warn("pack err", err)
+	//	common.BuffPool.Put(buf)
+	//	return
+	//}
+	//if pack.Flag == common.MUX_NEW_CONN {
+	//	//logs.Warn("sendinfo mux new conn, insert to write queue", pack.Id)
+	//}
+	//s.writeQueue <- buf
 	//_, err = buf.WriteTo(s.conn)
 	//if err != nil {
 	//	s.Close()
@@ -118,20 +121,47 @@ func (s *Mux) sendInfo(flag uint8, id int32, data interface{}) {
 }
 
 func (s *Mux) writeSession() {
-	go func() {
-		for {
-			buf := <-s.writeQueue
-			l := buf.Len()
-			n, err := buf.WriteTo(s.conn)
-			common.BuffPool.Put(buf)
+	go s.packBuf()
+	go s.writeBuf()
+	<-s.closeChan
+}
+
+func (s *Mux) packBuf() {
+	for {
+		pack := s.writeQueue.Pop()
+		buffer := common.BuffPool.Get()
+		err := pack.Pack(buffer)
+		common.MuxPack.Put(pack)
+		if err != nil {
+			logs.Warn("pack err", err)
+			common.BuffPool.Put(buffer)
+			break
+		}
+		select {
+		case s.bufCh <- buffer:
+		case <-s.closeChan:
+			break
+		}
+
+	}
+}
+
+func (s *Mux) writeBuf() {
+	for {
+		select {
+		case buffer := <-s.bufCh:
+			l := buffer.Len()
+			n, err := buffer.WriteTo(s.conn)
+			common.BuffPool.Put(buffer)
 			if err != nil || int(n) != l {
-				//logs.Warn("close from write session fail ", err, n, l)
+				logs.Warn("close from write session fail ", err, n, l)
 				s.Close()
 				break
 			}
+		case <-s.closeChan:
+			break
 		}
-	}()
-	<-s.closeChan
+	}
 }
 
 func (s *Mux) ping() {
@@ -273,14 +303,11 @@ func (s *Mux) Close() error {
 	}
 	s.IsClose = true
 	s.connMap.Close()
-	select {
-	case s.closeChan <- struct{}{}:
-	}
-	select {
-	case s.closeChan <- struct{}{}:
-	}
 	s.closeChan <- struct{}{}
-	close(s.writeQueue)
+	s.closeChan <- struct{}{}
+	s.closeChan <- struct{}{}
+	s.closeChan <- struct{}{}
+	s.closeChan <- struct{}{}
 	close(s.newConnCh)
 	return s.conn.Close()
 }

+ 70 - 57
lib/mux/queue.go

@@ -1,82 +1,95 @@
 package mux
 
 import (
-	"errors"
+	"container/list"
 	"github.com/cnlh/nps/lib/common"
 	"sync"
 )
 
-type Element *bufNode
-
-type bufNode struct {
-	val []byte //buf value
-	l   int    //length
-}
-
-func NewBufNode(buf []byte, l int) *bufNode {
-	return &bufNode{
-		val: buf,
-		l:   l,
-	}
+type Queue struct {
+	list    *list.List
+	readOp  chan struct{}
+	cleanOp chan struct{}
+	popWait bool
+	mutex   sync.Mutex
 }
 
-type Queue interface {
-	Push(e Element) //向队列中添加元素
-	Pop() Element   //移除队列中最前面的元素
-	Clear() bool    //清空队列
-	Size() int      //获取队列的元素个数
-	IsEmpty() bool  //判断队列是否是空
+func (Self *Queue) New() {
+	Self.list = list.New()
+	Self.readOp = make(chan struct{})
+	Self.cleanOp = make(chan struct{}, 2)
 }
 
-type sliceEntry struct {
-	element []Element
-	sync.Mutex
-}
-
-func NewQueue() *sliceEntry {
-	return &sliceEntry{}
+func (Self *Queue) Push(packager *common.MuxPackager) {
+	Self.mutex.Lock()
+	if Self.popWait {
+		defer Self.allowPop()
+	}
+	if packager.Flag == common.MUX_CONN_CLOSE {
+		Self.insert(packager) // the close package may need priority,
+		// prevent wait too long to close
+	} else {
+		Self.list.PushBack(packager)
+	}
+	Self.mutex.Unlock()
+	return
 }
 
-//向队列中添加元素
-func (entry *sliceEntry) Push(e Element) {
-	entry.Lock()
-	defer entry.Unlock()
-	entry.element = append(entry.element, e)
+func (Self *Queue) allowPop() (closed bool) {
+	Self.mutex.Lock()
+	Self.popWait = false
+	Self.mutex.Unlock()
+	select {
+	case Self.readOp <- struct{}{}:
+		return false
+	case <-Self.cleanOp:
+		return true
+	}
 }
 
-//移除队列中最前面的额元素
-func (entry *sliceEntry) Pop() (Element, error) {
-	if entry.IsEmpty() {
-		return nil, errors.New("queue is empty!")
+func (Self *Queue) insert(packager *common.MuxPackager) {
+	element := Self.list.Back()
+	for {
+		if element == nil { // Queue dose not have any of msg package with this close package id
+			Self.list.PushFront(packager) // insert close package to first
+			break
+		}
+		if element.Value.(*common.MuxPackager).Flag == common.MUX_NEW_MSG &&
+			element.Value.(*common.MuxPackager).Id == packager.Id {
+			Self.list.InsertAfter(packager, element) // Queue has some msg package
+			// with this close package id, insert close package after last msg package
+			break
+		}
+		element = element.Prev()
 	}
-	entry.Lock()
-	defer entry.Unlock()
-	firstElement := entry.element[0]
-	entry.element = entry.element[1:]
-	return firstElement, nil
 }
 
-func (entry *sliceEntry) Clear() bool {
-	entry.Lock()
-	defer entry.Unlock()
-	if entry.IsEmpty() {
-		return false
+func (Self *Queue) Pop() (packager *common.MuxPackager) {
+	Self.mutex.Lock()
+	element := Self.list.Front()
+	if element != nil {
+		packager = element.Value.(*common.MuxPackager)
+		Self.list.Remove(element)
+		Self.mutex.Unlock()
+		return
 	}
-	for i := 0; i < entry.Size(); i++ {
-		common.CopyBuff.Put(entry.element[i].val)
-		entry.element[i] = nil
+	Self.popWait = true // Queue is empty, notice Push method
+	Self.mutex.Unlock()
+	select {
+	case <-Self.readOp:
+		return Self.Pop()
+	case <-Self.cleanOp:
+		return nil
 	}
-	entry.element = nil
-	return true
 }
 
-func (entry *sliceEntry) Size() int {
-	return len(entry.element)
+func (Self *Queue) Len() (n int) {
+	n = Self.list.Len()
+	return
 }
 
-func (entry *sliceEntry) IsEmpty() bool {
-	if len(entry.element) == 0 {
-		return true
-	}
-	return false
+func (Self *Queue) Clean() {
+	Self.cleanOp <- struct{}{}
+	Self.cleanOp <- struct{}{}
+	close(Self.cleanOp)
 }