ffdfgdfg 5 年之前
父节点
当前提交
f35a73f734
共有 5 个文件被更改,包括 89 次插入22 次删除
  1. 1 1
      lib/common/netpackager.go
  2. 3 0
      lib/common/util.go
  3. 27 8
      lib/mux/conn.go
  4. 45 6
      lib/mux/mux.go
  5. 13 7
      lib/mux/mux_test.go

+ 1 - 1
lib/common/netpackager.go

@@ -45,13 +45,13 @@ func (Self *BasePackager) Pack(writer io.Writer) (err error) {
 		return
 	}
 	err = binary.Write(writer, binary.LittleEndian, Self.Content)
-	//logs.Warn(Self.Length, string(Self.Content))
 	return
 }
 
 //Unpack 会导致传入的数字类型转化成float64!!
 //主要原因是json unmarshal并未传入正确的数据类型
 func (Self *BasePackager) UnPack(reader io.Reader) (err error) {
+	Self.clean()
 	err = binary.Read(reader, binary.LittleEndian, &Self.Length)
 	if err != nil {
 		return

+ 3 - 0
lib/common/util.go

@@ -6,6 +6,7 @@ import (
 	"encoding/binary"
 	"github.com/cnlh/nps/lib/crypt"
 	"github.com/cnlh/nps/lib/pool"
+	"github.com/cnlh/nps/vender/github.com/astaxie/beego/logs"
 	"html/template"
 	"io"
 	"io/ioutil"
@@ -268,8 +269,10 @@ func CopyBuffer(dst io.Writer, src io.Reader) (written int64, err error) {
 	defer pool.PutBufPoolCopy(buf)
 	for {
 		nr, er := src.Read(buf)
+		logs.Warn("read finish", nr, er)
 		if nr > 0 {
 			nw, ew := dst.Write(buf[0:nr])
+			logs.Warn("write finish", nw, ew)
 			if nw > 0 {
 				written += int64(nw)
 			}

+ 27 - 8
lib/mux/conn.go

@@ -22,11 +22,12 @@ type conn struct {
 	endRead          int //now end read
 	readFlag         bool
 	readCh           chan struct{}
-	waitQueue        *sliceEntry
+	readQueue        *sliceEntry
 	stopWrite        bool
 	connId           int32
 	isClose          bool
 	readWait         bool
+	sendClose        bool
 	hasWrite         int
 	mux              *Mux
 }
@@ -37,7 +38,7 @@ func NewConn(connId int32, mux *Mux) *conn {
 		getStatusCh:      make(chan struct{}),
 		connStatusOkCh:   make(chan struct{}),
 		connStatusFailCh: make(chan struct{}),
-		waitQueue:        NewQueue(),
+		readQueue:        NewQueue(),
 		connId:           connId,
 		mux:              mux,
 	}
@@ -45,11 +46,12 @@ func NewConn(connId int32, mux *Mux) *conn {
 }
 
 func (s *conn) Read(buf []byte) (n int, err error) {
+	logs.Warn("starting read ", s.connId)
 	if s.isClose || buf == nil {
 		return 0, errors.New("the conn has closed")
 	}
 	if s.endRead-s.startRead == 0 { //read finish or start
-		if s.waitQueue.Size() == 0 {
+		if s.readQueue.Size() == 0 {
 			s.readWait = true
 			if t := s.readTimeOut.Sub(time.Now()); t > 0 {
 				timer := time.NewTimer(t)
@@ -67,19 +69,22 @@ func (s *conn) Read(buf []byte) (n int, err error) {
 		if s.isClose { //If the connection is closed instead of  continuing command
 			return 0, errors.New("the conn has closed")
 		}
-		if node, err := s.waitQueue.Pop(); err != nil {
+		if node, err := s.readQueue.Pop(); err != nil {
 			s.Close()
 			return 0, io.EOF
 		} else {
 			pool.PutBufPoolCopy(s.readBuffer)
 			if node.val == nil {
 				//close
+				s.sendClose = true
 				s.Close()
 				logs.Warn("close from read msg ", s.connId)
+				return 0, io.EOF
 			} else {
 				s.readBuffer = node.val
 				s.endRead = node.l
 				s.startRead = 0
+				logs.Warn("get a new data buffer ", s.connId)
 			}
 		}
 	}
@@ -90,10 +95,12 @@ func (s *conn) Read(buf []byte) (n int, err error) {
 		n = copy(buf, s.readBuffer[s.startRead:s.endRead])
 		s.startRead += n
 	}
+	logs.Warn("end read ", s.connId)
 	return
 }
 
 func (s *conn) Write(buf []byte) (n int, err error) {
+	logs.Warn("trying write", s.connId)
 	if s.isClose {
 		return 0, errors.New("the conn has closed")
 	}
@@ -113,6 +120,7 @@ func (s *conn) Write(buf []byte) (n int, err error) {
 	if s.isClose {
 		return 0, io.EOF
 	}
+	logs.Warn("write success ", s.connId)
 	return len(buf), nil
 }
 func (s *conn) write(buf []byte, ch chan struct{}) {
@@ -130,7 +138,8 @@ func (s *conn) write(buf []byte, ch chan struct{}) {
 	ch <- struct{}{}
 }
 
-func (s *conn) Close() error {
+func (s *conn) Close() (err error) {
+	logs.Warn("start closing ", s.connId)
 	if s.isClose {
 		logs.Warn("already closed", s.connId)
 		return errors.New("the conn has closed")
@@ -140,12 +149,22 @@ func (s *conn) Close() error {
 	if s.readWait {
 		s.readCh <- struct{}{}
 	}
-	s.waitQueue.Clear()
+	s.readQueue.Clear()
 	s.mux.connMap.Delete(s.connId)
 	if !s.mux.IsClose {
-		s.mux.sendInfo(common.MUX_CONN_CLOSE, s.connId, nil)
+		if !s.sendClose {
+			logs.Warn("start send closing msg", s.connId)
+			err = s.mux.sendInfo(common.MUX_CONN_CLOSE, s.connId, nil)
+			logs.Warn("send closing msg ok ", s.connId)
+			if err != nil {
+				logs.Warn(err)
+				return
+			}
+		} else {
+			logs.Warn("send mux conn close pass ", s.connId)
+		}
 	}
-	return nil
+	return
 }
 
 func (s *conn) LocalAddr() net.Addr {

+ 45 - 6
lib/mux/mux.go

@@ -1,6 +1,7 @@
 package mux
 
 import (
+	"bytes"
 	"errors"
 	"github.com/cnlh/nps/lib/common"
 	"github.com/cnlh/nps/lib/pool"
@@ -22,7 +23,7 @@ type Mux struct {
 	IsClose    bool
 	pingOk     int
 	connType   string
-	writeQueue *sliceEntry
+	writeQueue chan *bytes.Buffer
 	sync.Mutex
 }
 
@@ -35,12 +36,13 @@ func NewMux(c net.Conn, connType string) *Mux {
 		newConnCh:  make(chan *conn),
 		IsClose:    false,
 		connType:   connType,
-		writeQueue: NewQueue(),
+		writeQueue: make(chan *bytes.Buffer, 20),
 	}
 	//read session by flag
 	go m.readSession()
 	//ping
 	go m.ping()
+	//go m.writeSession()
 	return m
 }
 
@@ -82,8 +84,10 @@ func (s *Mux) Addr() net.Addr {
 }
 
 func (s *Mux) sendInfo(flag uint8, id int32, content []byte) (err error) {
+	if flag == common.MUX_NEW_MSG {
+		logs.Warn("trying write to mux new msg", id)
+	}
 	buf := pool.BuffPool.Get()
-	defer pool.BuffPool.Put(buf)
 	pack := common.MuxPackager{}
 	err = pack.NewPac(flag, id, content)
 	if err != nil {
@@ -97,14 +101,39 @@ func (s *Mux) sendInfo(flag uint8, id int32, content []byte) (err error) {
 		logs.Warn("pack err", err)
 		return
 	}
+	//s.writeQueue <- buf
 	_, err = buf.WriteTo(s.conn)
 	if err != nil {
 		s.Close()
 		logs.Warn("write err", err)
 	}
+	pool.BuffPool.Put(buf)
+	if flag == common.MUX_CONN_CLOSE {
+		logs.Warn("write to mux conn close success", id)
+	}
+	if flag == common.MUX_NEW_MSG {
+		logs.Warn("write to mux new msg success", id)
+	}
 	return
 }
 
+func (s *Mux) writeSession() {
+	go func() {
+		for {
+			buf := <-s.writeQueue
+			l := buf.Len()
+			n, err := buf.WriteTo(s.conn)
+			pool.BuffPool.Put(buf)
+			if err != nil || int(n) != l {
+				logs.Warn("close from write to ", err, n, l)
+				s.Close()
+				break
+			}
+		}
+	}()
+	<-s.closeChan
+}
+
 func (s *Mux) ping() {
 	go func() {
 		ticker := time.NewTicker(time.Second * 1)
@@ -138,7 +167,7 @@ func (s *Mux) readSession() {
 			s.pingOk = 0
 			switch pack.Flag {
 			case common.MUX_NEW_CONN: //new conn
-				logs.Warn("mux new conn", pack.Id)
+				//logs.Warn("mux new conn", pack.Id)
 				conn := NewConn(pack.Id, s)
 				s.connMap.Set(pack.Id, conn) //it has been set before send ok
 				s.newConnCh <- conn
@@ -151,22 +180,30 @@ func (s *Mux) readSession() {
 				continue
 			}
 			if conn, ok := s.connMap.Get(pack.Id); ok && !conn.isClose {
+				logs.Warn("read session flag id", pack.Flag, pack.Id)
 				switch pack.Flag {
 				case common.MUX_NEW_MSG: //new msg from remote conn
 					//insert wait queue
-					conn.waitQueue.Push(NewBufNode(pack.Content, int(pack.Length)))
+					conn.readQueue.Push(NewBufNode(pack.Content, int(pack.Length)))
 					//judge len if >xxx ,send stop
 					if conn.readWait {
 						conn.readWait = false
 						conn.readCh <- struct{}{}
 					}
+					logs.Warn("push a read buffer ", conn.connId, pack.Id)
 				case common.MUX_NEW_CONN_OK: //conn ok
 					conn.connStatusOkCh <- struct{}{}
 				case common.MUX_NEW_CONN_Fail:
 					conn.connStatusFailCh <- struct{}{}
 				case common.MUX_CONN_CLOSE: //close the connection
-					conn.waitQueue.Push(NewBufNode(nil, 0))
+					conn.readQueue.Push(NewBufNode(nil, 0))
+					if conn.readWait {
+						logs.Warn("close read wait", pack.Id)
+						conn.readWait = false
+						conn.readCh <- struct{}{}
+					}
 					s.connMap.Delete(pack.Id)
+					logs.Warn("read session mux conn close finish", pack.Id)
 				}
 			} else if pack.Flag == common.MUX_NEW_MSG {
 				pool.PutBufPoolCopy(pack.Content)
@@ -192,6 +229,8 @@ func (s *Mux) Close() error {
 	select {
 	case s.closeChan <- struct{}{}:
 	}
+	s.closeChan <- struct{}{}
+	close(s.writeQueue)
 	close(s.newConnCh)
 	return s.conn.Close()
 }

+ 13 - 7
lib/mux/mux_test.go

@@ -26,22 +26,25 @@ func TestNewMux(t *testing.T) {
 	time.Sleep(time.Second * 3)
 	go func() {
 		m2 := NewMux(conn2, "tcp")
+		connCh := make(chan bool, 1)
 		for {
 			c, err := m2.Accept()
 			if err != nil {
 				log.Fatalln(err)
 			}
-			go func(c net.Conn) {
+			connCh <- true
+			go func(c net.Conn, ch chan bool) {
 				c2, err := net.Dial("tcp", "127.0.0.1:80")
 				if err != nil {
 					log.Fatalln(err)
 				}
 				go common.CopyBuffer(c2, c)
 				common.CopyBuffer(c, c2)
-				c.Close()
-				//logs.Warn("close from out npc ")
 				c2.Close()
-			}(c)
+				c.Close()
+				logs.Warn("close npc")
+				<-ch
+			}(c, connCh)
 		}
 	}()
 
@@ -51,12 +54,14 @@ func TestNewMux(t *testing.T) {
 		if err != nil {
 			log.Fatalln(err)
 		}
+		connCh := make(chan bool, 1)
 		for {
 			conn, err := l.Accept()
 			if err != nil {
 				log.Fatalln(err)
 			}
-			go func(conn net.Conn) {
+			connCh <- true
+			go func(conn net.Conn, ch chan bool) {
 				tmpCpnn, err := m1.NewConn()
 				if err != nil {
 					log.Fatalln(err)
@@ -64,9 +69,10 @@ func TestNewMux(t *testing.T) {
 				go common.CopyBuffer(tmpCpnn, conn)
 				common.CopyBuffer(conn, tmpCpnn)
 				conn.Close()
-				tmpCpnn.Close()
+				//tmpCpnn.Close()
 				logs.Warn("close from out nps ", tmpCpnn.connId)
-			}(conn)
+				<-ch
+			}(conn, connCh)
 		}
 	}()