Browse Source

fix block problems

ffdfgdfg 5 years ago
parent
commit
6157b1a528
2 changed files with 80 additions and 84 deletions
  1. 58 62
      lib/mux/conn.go
  2. 22 22
      lib/mux/mux.go

+ 58 - 62
lib/mux/conn.go

@@ -17,22 +17,15 @@ type conn struct {
 	connStatusFailCh chan struct{}
 	readTimeOut      time.Time
 	writeTimeOut     time.Time
-	//readBuffer       []byte
-	//startRead        int //now read position
-	//endRead          int //now end read
-	//readFlag         bool
-	//readCh           chan struct{}
-	//readQueue        *sliceEntry
-	//stopWrite        bool
-	connId  int32
-	isClose bool
-	//readWait         bool
-	closeFlag     bool // close conn flag
-	hasWrite      int
-	receiveWindow *window
-	sendWindow    *window
-	mux           *Mux
-	once          sync.Once
+	connId           int32
+	isClose          bool
+	closeFlag        bool // close conn flag
+	receiveWindow    *window
+	sendWindow       *window
+	readCh           waitingCh
+	writeCh          waitingCh
+	mux              *Mux
+	once             sync.Once
 }
 
 func NewConn(connId int32, mux *Mux) *conn {
@@ -48,34 +41,32 @@ func NewConn(connId int32, mux *Mux) *conn {
 	}
 	c.receiveWindow.NewReceive()
 	c.sendWindow.NewSend()
+	c.readCh.new()
+	c.writeCh.new()
 	return c
 }
 
 func (s *conn) Read(buf []byte) (n int, err error) {
-	logs.Warn("starting conn read", s.connId)
+	//logs.Warn("starting conn read", s.connId)
 	if s.isClose || buf == nil {
 		return 0, errors.New("the conn has closed")
 	}
-	nCh := make(chan int)
-	errCh := make(chan error)
-	defer close(nCh)
-	defer close(errCh)
 	// waiting for takeout from receive window finish or timeout
-	go s.readWindow(buf, nCh, errCh)
+	go s.readWindow(buf, s.readCh.nCh, s.readCh.errCh)
 	if t := s.readTimeOut.Sub(time.Now()); t > 0 {
 		timer := time.NewTimer(t)
 		defer timer.Stop()
 		select {
 		case <-timer.C:
 			return 0, errors.New("read timeout")
-		case n = <-nCh:
-			err = <-errCh
+		case n = <-s.readCh.nCh:
+			err = <-s.readCh.errCh
 		}
 	} else {
-		n = <-nCh
-		err = <-errCh
+		n = <-s.readCh.nCh
+		err = <-s.readCh.errCh
 	}
-	logs.Warn("read window finish conn read n err buf", n, err, string(buf[:15]))
+	//logs.Warn("read window finish conn read n err buf", n, err, string(buf[:15]))
 	return
 }
 
@@ -97,21 +88,19 @@ func (s *conn) readWindow(buf []byte, nCh chan int, errCh chan error) {
 }
 
 func (s *conn) Write(buf []byte) (n int, err error) {
+	//logs.Warn("write starting", s.connId)
+	//defer logs.Warn("write end ", s.connId)
 	if s.isClose {
 		return 0, errors.New("the conn has closed")
 	}
 	if s.closeFlag {
-		logs.Warn("conn close by write ", s.connId)
+		//logs.Warn("conn close by write ", s.connId)
 		//s.Close()
 		return 0, errors.New("io: write on closed conn")
 	}
-
-	nCh := make(chan int)
-	errCh := make(chan error)
-	defer close(nCh)
-	defer close(errCh)
 	s.sendWindow.SetSendBuf(buf) // set the buf to send window
-	go s.write(nCh, errCh)
+	//logs.Warn("write set send buf success")
+	go s.write(s.writeCh.nCh, s.writeCh.errCh)
 	// waiting for send to other side or timeout
 	if t := s.writeTimeOut.Sub(time.Now()); t > 0 {
 		timer := time.NewTimer(t)
@@ -119,12 +108,12 @@ func (s *conn) Write(buf []byte) (n int, err error) {
 		select {
 		case <-timer.C:
 			return 0, errors.New("write timeout")
-		case n = <-nCh:
-			err = <-errCh
+		case n = <-s.writeCh.nCh:
+			err = <-s.writeCh.errCh
 		}
 	} else {
-		n = <-nCh
-		err = <-errCh
+		n = <-s.writeCh.nCh
+		err = <-s.writeCh.errCh
 	}
 	return
 }
@@ -160,7 +149,7 @@ func (s *conn) closeProcess() {
 	s.isClose = true
 	s.mux.connMap.Delete(s.connId)
 	if !s.mux.IsClose {
-		logs.Warn("conn send close", s.connId)
+		//logs.Warn("conn send close", s.connId)
 		// if server or user close the conn while reading, will get a io.EOF
 		// and this Close method will be invoke, send this signal to close other side
 		s.mux.sendInfo(common.MUX_CONN_CLOSE, s.connId, nil)
@@ -198,7 +187,6 @@ type window struct {
 	windowBuff          []byte
 	off                 uint16
 	readOp              chan struct{}
-	readWait            bool
 	WindowFull          bool
 	usableReceiveWindow chan uint16
 	WriteWg             sync.WaitGroup
@@ -213,13 +201,13 @@ func (Self *window) NewReceive() {
 	Self.windowBuff = common.WindowBuff.Get()
 	Self.readOp = make(chan struct{})
 	Self.WriteEndOp = make(chan struct{})
-	Self.closeOpCh = make(chan struct{}, 2)
+	Self.closeOpCh = make(chan struct{}, 3)
 }
 
 func (Self *window) NewSend() {
 	// initial a window for send
 	Self.usableReceiveWindow = make(chan uint16)
-	Self.closeOpCh = make(chan struct{}, 2)
+	Self.closeOpCh = make(chan struct{}, 3)
 }
 
 func (Self *window) SetSendBuf(buf []byte) {
@@ -269,38 +257,32 @@ func (Self *window) grow(n int) {
 
 func (Self *window) Write(p []byte) (n int, err error) {
 	if Self.closeOp {
-		logs.Warn("window write closed len p", len(p))
 		return 0, errors.New("conn.receiveWindow: write on closed window")
 	}
 	if len(p) > Self.Size() {
 		return 0, errors.New("conn.receiveWindow: write too large")
 	}
-	if Self.readWait {
-		defer Self.allowRead()
-	}
-	//logs.Warn("window write p string", len(p), string(p[:15]))
 	Self.mutex.Lock()
 	// slide the offset
 	if len(p) > Self.cap()-Self.len() {
 		// not enough space, need to allocate
 		Self.fullSlide()
-		//logs.Warn("window write full slide len cap", Self.len(), Self.cap())
 	} else {
 		// have enough space, re slice
 		Self.liteSlide()
-		//logs.Warn("window write lite slide len cap", Self.len(), Self.cap())
 	}
 	length := Self.len()                  // length before grow
 	Self.grow(len(p))                     // grow for copy
 	n = copy(Self.windowBuff[length:], p) // must copy data before allow Read
-	//logs.Warn("window write copy n len cap buf", n, Self.len(), Self.cap(), string(Self.windowBuff[Self.len()-n:Self.len()+15-n]))
+	if length == 0 {
+		// allow continue read
+		defer Self.allowRead()
+	}
 	Self.mutex.Unlock()
 	return n, nil
 }
 
 func (Self *window) allowRead() (closed bool) {
-	//logs.Warn("length 0 read op")
-	Self.readWait = false
 	if Self.closeOp {
 		close(Self.readOp)
 		return true
@@ -310,25 +292,25 @@ func (Self *window) allowRead() (closed bool) {
 		close(Self.readOp)
 		return true
 	case Self.readOp <- struct{}{}:
-		//logs.Warn("length 0 read op finish")
 		return false
 	}
 }
 
 func (Self *window) Read(p []byte) (n int, err error) {
-	logs.Warn("starting window read method len ", Self.len())
 	if Self.closeOp {
 		return 0, io.EOF // Write method receive close signal, returns eof
 	}
-	if Self.len() == 0 {
+	Self.mutex.Lock()
+	length := Self.len() // protect the length data, it invokes
+	// before Write lock and after Write unlock
+	Self.mutex.Unlock()
+	if length == 0 {
 		// window is empty, waiting for Write method send a success readOp signal
 		// or get timeout or close
-		Self.readWait = true
 		ticker := time.NewTicker(2 * time.Minute)
 		defer ticker.Stop()
 		select {
 		case _, ok := <-Self.readOp:
-			//logs.Warn("read window read op len cap", Self.len(), Self.cap())
 			if !ok {
 				return 0, errors.New("conn.receiveWindow: window closed")
 			}
@@ -341,19 +323,17 @@ func (Self *window) Read(p []byte) (n int, err error) {
 			return 0, io.EOF // receive close signal, returns eof
 		}
 	}
-	//logs.Warn("window read start len window buff", Self.len(), string(Self.windowBuff[Self.off:Self.off+15]))
 	Self.mutex.Lock()
 	n = copy(p, Self.windowBuff[Self.off:])
 	Self.off += uint16(n)
 	p = p[:n]
-	//logs.Warn("window read finish n len p p", n, len(p), string(p[:15]))
 	Self.mutex.Unlock()
 	return
 }
 
 func (Self *window) WriteTo() (p []byte, err error) {
 	if Self.closeOp {
-		logs.Warn("window write to closed")
+		//logs.Warn("window write to closed")
 		return nil, errors.New("conn.writeWindow: window closed")
 	}
 	if Self.len() == 0 {
@@ -373,6 +353,8 @@ waiting:
 		}
 	case <-ticker.C:
 		return nil, errors.New("conn.writeWindow: write to time out")
+	case <-Self.closeOpCh:
+		return nil, errors.New("conn.writeWindow: window closed")
 	}
 	if windowSize == 0 {
 		goto waiting // waiting for another usable window size
@@ -380,10 +362,8 @@ waiting:
 	Self.mutex.Lock()
 	if windowSize > uint16(Self.len()) {
 		// usable window size is bigger than window buff size, send the full buff
-		//logs.Warn("window size overflow windowSize len()", windowSize, Self.len())
 		windowSize = uint16(Self.len())
 	}
-	//logs.Warn("window buff off windowSize", Self.off, windowSize)
 	p = Self.windowBuff[Self.off : windowSize+Self.off]
 	Self.off += windowSize
 	Self.mutex.Unlock()
@@ -413,6 +393,22 @@ func (Self *window) CloseWindow() {
 	Self.closeOp = true
 	Self.closeOpCh <- struct{}{}
 	Self.closeOpCh <- struct{}{}
+	Self.closeOpCh <- struct{}{}
 	close(Self.closeOpCh)
 	return
 }
+
+type waitingCh struct {
+	nCh   chan int
+	errCh chan error
+}
+
+func (Self *waitingCh) new() {
+	Self.nCh = make(chan int)
+	Self.errCh = make(chan error)
+}
+
+func (Self *waitingCh) close() {
+	close(Self.nCh)
+	close(Self.errCh)
+}

+ 22 - 22
lib/mux/mux.go

@@ -53,7 +53,7 @@ func (s *Mux) NewConn() (*conn, error) {
 	//it must be set before send
 	s.connMap.Set(conn.connId, conn)
 	s.sendInfo(common.MUX_NEW_CONN, conn.connId, nil)
-	logs.Warn("send mux new conn ", conn.connId)
+	//logs.Warn("send mux new conn ", conn.connId)
 	//set a timer timeout 30 second
 	timer := time.NewTimer(time.Minute * 2)
 	defer timer.Stop()
@@ -85,7 +85,7 @@ func (s *Mux) sendInfo(flag uint8, id int32, data interface{}) {
 	var err error
 	if flag == common.MUX_NEW_MSG {
 		if len(data.([]byte)) == 0 {
-			logs.Warn("send info content is nil")
+			//logs.Warn("send info content is nil")
 		}
 	}
 	buf := common.BuffPool.Get()
@@ -94,18 +94,18 @@ func (s *Mux) sendInfo(flag uint8, id int32, data interface{}) {
 	defer common.MuxPack.Put(pack)
 	err = pack.NewPac(flag, id, data)
 	if err != nil {
-		logs.Warn("new pack err", err)
+		//logs.Warn("new pack err", err)
 		common.BuffPool.Put(buf)
 		return
 	}
 	err = pack.Pack(buf)
 	if err != nil {
-		logs.Warn("pack err", err)
+		//logs.Warn("pack err", err)
 		common.BuffPool.Put(buf)
 		return
 	}
 	if pack.Flag == common.MUX_NEW_CONN {
-		logs.Warn("sendinfo mux new conn, insert to write queue", pack.Id)
+		//logs.Warn("sendinfo mux new conn, insert to write queue", pack.Id)
 	}
 	s.writeQueue <- buf
 	//_, err = buf.WriteTo(s.conn)
@@ -125,7 +125,7 @@ func (s *Mux) writeSession() {
 			n, err := buf.WriteTo(s.conn)
 			common.BuffPool.Put(buf)
 			if err != nil || int(n) != l {
-				logs.Warn("close from write session fail ", err, n, l)
+				//logs.Warn("close from write session fail ", err, n, l)
 				s.Close()
 				break
 			}
@@ -173,12 +173,12 @@ func (s *Mux) readSession() {
 				}
 			}
 			if pack.Flag == common.MUX_NEW_CONN {
-				logs.Warn("unpack mux new connection", pack.Id)
+				//logs.Warn("unpack mux new connection", pack.Id)
 			}
 			s.pingOk = 0
 			switch pack.Flag {
 			case common.MUX_NEW_CONN: //new connection
-				logs.Warn("rec mux new connection", pack.Id)
+				//logs.Warn("rec mux new connection", pack.Id)
 				connection := NewConn(pack.Id, s)
 				s.connMap.Set(pack.Id, connection) //it has been set before send ok
 				go func(connection *conn) {
@@ -186,7 +186,7 @@ func (s *Mux) readSession() {
 				}(connection)
 				s.newConnCh <- connection
 				s.sendInfo(common.MUX_NEW_CONN_OK, connection.connId, nil)
-				logs.Warn("send mux new connection ok", connection.connId)
+				//logs.Warn("send mux new connection ok", connection.connId)
 				continue
 			case common.MUX_PING_FLAG: //ping
 				//logs.Warn("send mux ping return")
@@ -204,61 +204,61 @@ func (s *Mux) readSession() {
 						continue
 					}
 					connection.receiveWindow.WriteWg.Add(1)
-					logs.Warn("rec mux new msg ", connection.connId, string(pack.Content[0:15]))
+					//logs.Warn("rec mux new msg ", connection.connId, string(pack.Content[0:15]))
 					go func(connection *conn, content []byte) { // do not block read session
 						_, err := connection.receiveWindow.Write(content)
 						if err != nil {
 							logs.Warn("mux new msg err close", err)
-							s.Close()
+							connection.Close()
 						}
 						size := connection.receiveWindow.Size()
 						if size == 0 {
 							connection.receiveWindow.WindowFull = true
 						}
 						s.sendInfo(common.MUX_MSG_SEND_OK, connection.connId, size)
-						logs.Warn("send mux new msg ok", connection.connId, size)
+						//logs.Warn("send mux new msg ok", connection.connId, size)
 						connection.receiveWindow.WriteWg.Done()
 					}(connection, pack.Content)
 					continue
 				case common.MUX_NEW_CONN_OK: //connection ok
-					logs.Warn("rec mux new connection ok ", pack.Id)
+					//logs.Warn("rec mux new connection ok ", pack.Id)
 					connection.connStatusOkCh <- struct{}{}
 					go connection.sendWindow.SetAllowSize(512)
 					// set the initial receive window both side
 					continue
 				case common.MUX_NEW_CONN_Fail:
-					logs.Warn("rec mux new connection fail", pack.Id)
+					//logs.Warn("rec mux new connection fail", pack.Id)
 					connection.connStatusFailCh <- struct{}{}
 					continue
 				case common.MUX_MSG_SEND_OK:
 					if connection.isClose {
-						logs.Warn("rec mux msg send ok id window closed!", pack.Id, pack.Window)
+						//logs.Warn("rec mux msg send ok id window closed!", pack.Id, pack.Window)
 						continue
 					}
-					logs.Warn("rec mux msg send ok id window", pack.Id, pack.Window)
+					//logs.Warn("rec mux msg send ok id window", pack.Id, pack.Window)
 					go connection.sendWindow.SetAllowSize(pack.Window)
 					continue
 				case common.MUX_CONN_CLOSE: //close the connection
-					logs.Warn("rec mux connection close", pack.Id)
+					//logs.Warn("rec mux connection close", pack.Id)
 					s.connMap.Delete(pack.Id)
 					connection.closeFlag = true
 					go func(connection *conn) {
-						logs.Warn("receive mux connection close, wg waiting", connection.connId)
+						//logs.Warn("receive mux connection close, wg waiting", connection.connId)
 						connection.receiveWindow.WriteWg.Wait()
-						logs.Warn("receive mux connection close, wg waited", connection.connId)
+						//logs.Warn("receive mux connection close, wg waited", connection.connId)
 						connection.receiveWindow.WriteEndOp <- struct{}{} // close signal to receive window
-						logs.Warn("receive mux connection close, finish", connection.connId)
+						//logs.Warn("receive mux connection close, finish", connection.connId)
 					}(connection)
 					continue
 				}
 			} else if pack.Flag == common.MUX_CONN_CLOSE {
-				logs.Warn("rec mux connection close no id ", pack.Id)
+				//logs.Warn("rec mux connection close no id ", pack.Id)
 				continue
 			}
 			common.MuxPack.Put(pack)
 		}
 		common.MuxPack.Put(pack)
-		logs.Warn("read session put pack ", pack.Id)
+		//logs.Warn("read session put pack ", pack.Id)
 		s.Close()
 	}()
 	select {