Browse Source

remove mux write queue, add connection close once

ffdfgdfg 5 years ago
parent
commit
51a3787708
2 changed files with 69 additions and 54 deletions
  1. 25 21
      lib/mux/conn.go
  2. 44 33
      lib/mux/mux.go

+ 25 - 21
lib/mux/conn.go

@@ -6,6 +6,7 @@ import (
 	"github.com/cnlh/nps/vender/github.com/astaxie/beego/logs"
 	"io"
 	"net"
+	"sync"
 	"time"
 )
 
@@ -27,9 +28,10 @@ type conn struct {
 	isClose          bool
 	readWait         bool
 	sendClose        bool // MUX_CONN_CLOSE already send
-	writeClose       bool // close conn Write
+	closeFlag        bool // close conn flag
 	hasWrite         int
 	mux              *Mux
+	once             sync.Once
 }
 
 func NewConn(connId int32, mux *Mux) *conn {
@@ -41,6 +43,7 @@ func NewConn(connId int32, mux *Mux) *conn {
 		readQueue:        NewQueue(),
 		connId:           connId,
 		mux:              mux,
+		once:             sync.Once{},
 	}
 	return c
 }
@@ -72,18 +75,14 @@ func (s *conn) Read(buf []byte) (n int, err error) {
 			logs.Warn("conn close by read pop err", s.connId, err)
 			s.Close()
 			return 0, io.EOF
+		} else if node.val == nil {
+			s.sendClose = true
+			logs.Warn("conn close by read ", s.connId)
+			s.Close()
 		} else {
-			if node.val == nil {
-				//close
-				s.sendClose = true
-				logs.Warn("conn close by read ", s.connId)
-				s.Close()
-				return 0, io.EOF
-			} else {
-				s.readBuffer = node.val
-				s.endRead = node.l
-				s.startRead = 0
-			}
+			s.readBuffer = node.val
+			s.endRead = node.l
+			s.startRead = 0
 		}
 	}
 	if len(buf) < s.endRead-s.startRead {
@@ -101,7 +100,7 @@ func (s *conn) Write(buf []byte) (n int, err error) {
 	if s.isClose {
 		return 0, errors.New("the conn has closed")
 	}
-	if s.writeClose {
+	if s.closeFlag {
 		s.sendClose = true
 		logs.Warn("conn close by write ", s.connId)
 		s.Close()
@@ -131,11 +130,11 @@ func (s *conn) write(buf []byte, ch chan struct{}) {
 	l := len(buf)
 	for {
 		if l-start > common.PoolSizeCopy {
-			logs.Warn("conn write > poolsizecopy")
+			//logs.Warn("conn write > poolsizecopy")
 			s.mux.sendInfo(common.MUX_NEW_MSG, s.connId, buf[start:start+common.PoolSizeCopy])
 			start += common.PoolSizeCopy
 		} else {
-			logs.Warn("conn write <= poolsizecopy, start, len", start, l)
+			//logs.Warn("conn write <= poolsizecopy, start, len", start, l)
 			s.mux.sendInfo(common.MUX_NEW_MSG, s.connId, buf[start:l])
 			break
 		}
@@ -144,20 +143,25 @@ func (s *conn) write(buf []byte, ch chan struct{}) {
 }
 
 func (s *conn) Close() (err error) {
+	s.once.Do(s.closeProcess)
+	return
+}
+
+func (s *conn) closeProcess() {
 	if s.isClose {
-		return errors.New("the conn has closed")
+		logs.Warn("has closed ", s.connId)
+		return
 	}
 	s.isClose = true
+	s.readWait = false
 	s.mux.connMap.Delete(s.connId)
 	common.CopyBuff.Put(s.readBuffer)
-	if s.readWait {
-		s.readCh <- struct{}{}
-	}
+	close(s.readCh)
 	s.readQueue.Clear()
 	if !s.mux.IsClose {
 		if !s.sendClose {
-			logs.Warn("conn send close")
-			go s.mux.sendInfo(common.MUX_CONN_CLOSE, s.connId, nil)
+			logs.Warn("conn send close", s.connId)
+			s.mux.sendInfo(common.MUX_CONN_CLOSE, s.connId, nil)
 		}
 	}
 	return

+ 44 - 33
lib/mux/mux.go

@@ -41,7 +41,7 @@ func NewMux(c net.Conn, connType string) *Mux {
 	go m.readSession()
 	//ping
 	go m.ping()
-	go m.writeSession()
+	//go m.writeSession()
 	return m
 }
 
@@ -68,7 +68,7 @@ func (s *Mux) NewConn() (*conn, error) {
 
 func (s *Mux) Accept() (net.Conn, error) {
 	if s.IsClose {
-		return nil, errors.New("accpet error,the conn has closed")
+		return nil, errors.New("accpet error,the mux has closed")
 	}
 	conn := <-s.newConnCh
 	if conn == nil {
@@ -91,31 +91,29 @@ func (s *Mux) sendInfo(flag uint8, id int32, content []byte) {
 	buf := common.BuffPool.Get()
 	//defer pool.BuffPool.Put(buf)
 	pack := common.MuxPack.Get()
+	defer common.MuxPack.Put(pack)
 	err = pack.NewPac(flag, id, content)
 	if err != nil {
-		s.Close()
 		logs.Warn("new pack err", err)
 		common.BuffPool.Put(buf)
 		return
 	}
 	err = pack.Pack(buf)
 	if err != nil {
-		s.Close()
 		logs.Warn("pack err", err)
 		common.BuffPool.Put(buf)
 		return
 	}
-	s.writeQueue <- buf
-	common.MuxPack.Put(pack)
-	//_, err = buf.WriteTo(s.conn)
-	//if err != nil {
-	//	s.Close()
-	//	logs.Warn("write err, close mux", err)
-	//}
-	//if flag == common.MUX_CONN_CLOSE {
-	//}
-	//if flag == common.MUX_NEW_MSG {
-	//}
+	if pack.Flag == common.MUX_NEW_CONN {
+		logs.Warn("sendinfo mux new conn, insert to write queue", pack.Id)
+	}
+	//s.writeQueue <- buf
+	_, err = buf.WriteTo(s.conn)
+	if err != nil {
+		s.Close()
+		logs.Warn("write err, close mux", err)
+	}
+	common.BuffPool.Put(buf)
 	return
 }
 
@@ -127,7 +125,7 @@ func (s *Mux) writeSession() {
 			n, err := buf.WriteTo(s.conn)
 			common.BuffPool.Put(buf)
 			if err != nil || int(n) != l {
-				logs.Warn("close from write to ", err, n, l)
+				logs.Warn("close from write session fail ", err, n, l)
 				s.Close()
 				break
 			}
@@ -163,8 +161,8 @@ func (s *Mux) ping() {
 
 func (s *Mux) readSession() {
 	go func() {
+		pack := common.MuxPack.Get()
 		for {
-			pack := common.MuxPack.Get()
 			if pack.UnPack(s.conn) != nil {
 				break
 			}
@@ -173,10 +171,13 @@ func (s *Mux) readSession() {
 					//logs.Warn(pack.Flag, pack.Id, pack.Length, string(pack.Content[:10]))
 				}
 			}
+			if pack.Flag == common.MUX_NEW_CONN {
+				logs.Warn("unpack mux new conn", pack.Id)
+			}
 			s.pingOk = 0
 			switch pack.Flag {
 			case common.MUX_NEW_CONN: //new conn
-				logs.Warn("mux new conn", pack.Id)
+				logs.Warn("rec mux new conn", pack.Id)
 				conn := NewConn(pack.Id, s)
 				s.connMap.Set(pack.Id, conn) //it has been set before send ok
 				s.newConnCh <- conn
@@ -194,38 +195,48 @@ func (s *Mux) readSession() {
 				switch pack.Flag {
 				case common.MUX_NEW_MSG: //new msg from remote conn
 					//insert wait queue
-					logs.Warn("mux new msg ", pack.Id)
-					conn.readQueue.Push(NewBufNode(pack.Content, int(pack.Length)))
+					buf := common.CopyBuff.Get()
+					buf = pack.Content
+					logs.Warn("rec mux new msg ", pack.Id, string(buf[0:15]))
+					conn.readQueue.Push(NewBufNode(buf, int(pack.Length)))
 					//judge len if >xxx ,send stop
 					if conn.readWait {
 						conn.readWait = false
 						conn.readCh <- struct{}{}
 					}
+					continue
 				case common.MUX_NEW_CONN_OK: //conn ok
-					logs.Warn("mux new conn ok ", pack.Id)
+					logs.Warn("rec mux new conn ok ", pack.Id)
 					conn.connStatusOkCh <- struct{}{}
+					continue
 				case common.MUX_NEW_CONN_Fail:
-					logs.Warn("mux new conn fail", pack.Id)
+					logs.Warn("rec mux new conn fail", pack.Id)
 					conn.connStatusFailCh <- struct{}{}
+					continue
 				case common.MUX_CONN_CLOSE: //close the connection
-					logs.Warn("mux conn close", pack.Id)
+					logs.Warn("rec mux conn close", pack.Id)
 					s.connMap.Delete(pack.Id)
-					conn.writeClose = true
-					conn.readQueue.Push(NewBufNode(nil, 0))
-					if conn.readWait {
-						logs.Warn("close read wait", pack.Id)
-						conn.readWait = false
-						conn.readCh <- struct{}{}
+					conn.closeFlag = true
+					conn.sendClose = true
+					if !conn.isClose {
+						conn.readQueue.Push(NewBufNode(nil, 0))
+						if conn.readWait {
+							logs.Warn("mux conn close read wait", pack.Id)
+							conn.readWait = false
+							conn.readCh <- struct{}{}
+							logs.Warn("mux conn close read wait pass", pack.Id)
+						}
 					}
 					logs.Warn("receive mux conn close, finish", conn.connId)
+					continue
 				}
-			} else if pack.Flag == common.MUX_NEW_MSG {
-				common.CopyBuff.Put(pack.Content)
 			} else if pack.Flag == common.MUX_CONN_CLOSE {
-				logs.Warn("mux conn close no id ", pack.Id)
+				logs.Warn("rec mux conn close no id ", pack.Id)
+				continue
 			}
-			common.MuxPack.Put(pack)
 		}
+		common.MuxPack.Put(pack)
+		logs.Warn("read session put pack ", pack.Id)
 		s.Close()
 	}()
 	select {