瀏覽代碼

Revert "change mux data struct, fix #250"

This reverts commit ae28d41231c68c226a0a0c82f3341c42cef6c620.
ffdfgdfg 5 年之前
父節點
當前提交
2629078988
共有 4 個文件被更改,包括 80 次插入72 次删除
  1. 71 63
      lib/common/netpackager.go
  2. 3 3
      lib/mux/conn.go
  3. 5 5
      lib/mux/mux.go
  4. 1 1
      lib/version/version.go

+ 71 - 63
lib/common/netpackager.go

@@ -22,34 +22,24 @@ type BasePackager struct {
 	Content []byte
 }
 
-func (Self *BasePackager) NewPac(content []byte) (err error) {
+func (Self *BasePackager) NewPac(contents ...interface{}) (err error) {
 	Self.clean()
-	if content != nil {
-		n := len(content)
-		if n > MAXIMUM_SEGMENT_SIZE {
-			err = errors.New("mux:packer: newpack content segment too large")
+	for _, content := range contents {
+		switch content.(type) {
+		case nil:
+			Self.Content = Self.Content[:0]
+		case []byte:
+			err = Self.appendByte(content.([]byte))
+		case string:
+			err = Self.appendByte([]byte(content.(string)))
+			if err != nil {
+				return
+			}
+			err = Self.appendByte([]byte(CONN_DATA_SEQ))
+		default:
+			err = Self.marshal(content)
 		}
-		Self.Content = Self.Content[:n]
-		copy(Self.Content, content)
-	} else {
-		Self.Content = Self.Content[:0]
-	}
-	//for _, content := range contents {
-	//	switch content.(type) {
-	//	case nil:
-	//		Self.Content = Self.Content[:0]
-	//	case []byte:
-	//		err = Self.appendByte(content.([]byte))
-	//	case string:
-	//		err = Self.appendByte([]byte(content.(string)))
-	//		if err != nil {
-	//			return
-	//		}
-	//		err = Self.appendByte([]byte(CONN_DATA_SEQ))
-	//	default:
-	//		err = Self.marshal(content)
-	//	}
-	//}
+	}
 	Self.setLength()
 	return
 }
@@ -88,9 +78,6 @@ func (Self *BasePackager) UnPack(reader io.Reader) (n uint16, err error) {
 	if int(Self.Length) > cap(Self.Content) {
 		err = errors.New("unpack err, content length too large")
 	}
-	if Self.Length > MAXIMUM_SEGMENT_SIZE {
-		err = errors.New("mux:packer: unpack content segment too large")
-	}
 	Self.Content = Self.Content[:int(Self.Length)]
 	//n, err := io.ReadFull(reader, Self.Content)
 	//if n != int(Self.Length) {
@@ -141,49 +128,61 @@ type ConnPackager struct {
 	BasePackager
 }
 
-//func (Self *ConnPackager) NewPac(connType uint8, content ...interface{}) (err error) {
-//	Self.ConnType = connType
-//	err = Self.BasePackager.NewPac(content...)
-//	return
-//}
-//
-//func (Self *ConnPackager) Pack(writer io.Writer) (err error) {
-//	err = binary.Write(writer, binary.LittleEndian, Self.ConnType)
-//	if err != nil {
-//		return
-//	}
-//	err = Self.BasePackager.Pack(writer)
-//	return
-//}
-//
-//func (Self *ConnPackager) UnPack(reader io.Reader) (n uint16, err error) {
-//	err = binary.Read(reader, binary.LittleEndian, &Self.ConnType)
-//	if err != nil && err != io.EOF {
-//		return
-//	}
-//	n, err = Self.BasePackager.UnPack(reader)
-//	n += 2
-//	return
-//}
+func (Self *ConnPackager) NewPac(connType uint8, content ...interface{}) (err error) {
+	Self.ConnType = connType
+	err = Self.BasePackager.NewPac(content...)
+	return
+}
+
+func (Self *ConnPackager) Pack(writer io.Writer) (err error) {
+	err = binary.Write(writer, binary.LittleEndian, Self.ConnType)
+	if err != nil {
+		return
+	}
+	err = Self.BasePackager.Pack(writer)
+	return
+}
+
+func (Self *ConnPackager) UnPack(reader io.Reader) (n uint16, err error) {
+	err = binary.Read(reader, binary.LittleEndian, &Self.ConnType)
+	if err != nil && err != io.EOF {
+		return
+	}
+	n, err = Self.BasePackager.UnPack(reader)
+	n += 2
+	return
+}
 
 type MuxPackager struct {
-	Flag         uint8
-	Id           int32
-	RemainLength uint32
+	Flag       uint8
+	Id         int32
+	Window     uint32
+	ReadLength uint32
 	BasePackager
 }
 
-func (Self *MuxPackager) NewPac(flag uint8, id int32, content interface{}) (err error) {
+func (Self *MuxPackager) NewPac(flag uint8, id int32, content ...interface{}) (err error) {
 	Self.Flag = flag
 	Self.Id = id
 	switch flag {
 	case MUX_PING_FLAG, MUX_PING_RETURN, MUX_NEW_MSG, MUX_NEW_MSG_PART:
 		Self.Content = WindowBuff.Get()
-		err = Self.BasePackager.NewPac(content.([]byte))
+		err = Self.BasePackager.NewPac(content...)
 		//logs.Warn(Self.Length, string(Self.Content))
 	case MUX_MSG_SEND_OK:
 		// MUX_MSG_SEND_OK contains two data
-		Self.RemainLength = content.(uint32)
+		switch content[0].(type) {
+		case int:
+			Self.Window = uint32(content[0].(int))
+		case uint32:
+			Self.Window = content[0].(uint32)
+		}
+		switch content[1].(type) {
+		case int:
+			Self.ReadLength = uint32(content[1].(int))
+		case uint32:
+			Self.ReadLength = content[1].(uint32)
+		}
 	}
 	return
 }
@@ -202,7 +201,11 @@ func (Self *MuxPackager) Pack(writer io.Writer) (err error) {
 		err = Self.BasePackager.Pack(writer)
 		WindowBuff.Put(Self.Content)
 	case MUX_MSG_SEND_OK:
-		err = binary.Write(writer, binary.LittleEndian, Self.RemainLength)
+		err = binary.Write(writer, binary.LittleEndian, Self.Window)
+		if err != nil {
+			return
+		}
+		err = binary.Write(writer, binary.LittleEndian, Self.ReadLength)
 	}
 	return
 }
@@ -223,7 +226,12 @@ func (Self *MuxPackager) UnPack(reader io.Reader) (n uint16, err error) {
 		n, err = Self.BasePackager.UnPack(reader)
 		//logs.Warn("unpack", Self.Length, string(Self.Content))
 	case MUX_MSG_SEND_OK:
-		err = binary.Read(reader, binary.LittleEndian, &Self.RemainLength)
+		err = binary.Read(reader, binary.LittleEndian, &Self.Window)
+		if err != nil {
+			return
+		}
+		n += 4 // uint32
+		err = binary.Read(reader, binary.LittleEndian, &Self.ReadLength)
 		n += 4 // uint32
 	}
 	n += 5 //uint8 int32
@@ -265,10 +273,10 @@ func (addr *Addr) Decode(b []byte) error {
 	pos := 1
 	switch addr.Type {
 	case ipV4:
-		addr.Host = net.IP(b[pos : pos+net.IPv4len]).String()
+		addr.Host = net.IP(b[pos:pos+net.IPv4len]).String()
 		pos += net.IPv4len
 	case ipV6:
-		addr.Host = net.IP(b[pos : pos+net.IPv6len]).String()
+		addr.Host = net.IP(b[pos:pos+net.IPv6len]).String()
 		pos += net.IPv6len
 	case domainName:
 		addrlen := int(b[pos])

+ 3 - 3
lib/mux/conn.go

@@ -265,7 +265,7 @@ start:
 	Self.bufQueue.Push(element)
 	// status check finish, now we can push the element into the queue
 	if wait == 0 {
-		Self.mux.sendInfo(common.MUX_MSG_SEND_OK, id, newRemaining)
+		Self.mux.sendInfo(common.MUX_MSG_SEND_OK, id, Self.maxSize, newRemaining)
 		// send the remaining window size, not including zero size
 	}
 	return nil
@@ -333,7 +333,7 @@ func (Self *ReceiveWindow) sendStatus(id int32, l uint16) {
 	// now we get the current window status success
 	if wait == 1 {
 		//logs.Warn("send the wait status", remaining)
-		Self.mux.sendInfo(common.MUX_MSG_SEND_OK, id, remaining)
+		Self.mux.sendInfo(common.MUX_MSG_SEND_OK, id, atomic.LoadUint32(&Self.maxSize), remaining)
 	}
 	return
 }
@@ -394,7 +394,7 @@ func (Self *SendWindow) SetSendBuf(buf []byte) {
 	Self.off = 0
 }
 
-func (Self *SendWindow) SetSize(newRemaining uint32) (closed bool) {
+func (Self *SendWindow) SetSize(windowSize, newRemaining uint32) (closed bool) {
 	// set the window size from receive window
 	defer func() {
 		if recover() != nil {

+ 5 - 5
lib/mux/mux.go

@@ -92,13 +92,13 @@ func (s *Mux) Addr() net.Addr {
 	return s.conn.LocalAddr()
 }
 
-func (s *Mux) sendInfo(flag uint8, id int32, data interface{}) {
+func (s *Mux) sendInfo(flag uint8, id int32, data ...interface{}) {
 	if s.IsClose {
 		return
 	}
 	var err error
 	pack := common.MuxPack.Get()
-	err = pack.NewPac(flag, id, data)
+	err = pack.NewPac(flag, id, data...)
 	if err != nil {
 		common.MuxPack.Put(pack)
 		logs.Error("mux: new pack err", err)
@@ -173,7 +173,7 @@ func (s *Mux) ping() {
 		s.sendInfo(common.MUX_PING_FLAG, common.MUX_PING, now)
 		// send the ping flag and get the latency first
 		ticker := time.NewTicker(time.Second * 5)
-		defer ticker.Stop()
+    defer ticker.Stop()
 		for {
 			if s.IsClose {
 				break
@@ -198,7 +198,7 @@ func (s *Mux) ping() {
 			}
 			atomic.AddUint32(&s.pingOk, 1)
 		}
-		return
+    return
 	}()
 }
 
@@ -297,7 +297,7 @@ func (s *Mux) readSession() {
 					if connection.isClose {
 						continue
 					}
-					connection.sendWindow.SetSize(pack.RemainLength)
+					connection.sendWindow.SetSize(pack.Window, pack.ReadLength)
 					continue
 				case common.MUX_CONN_CLOSE: //close the connection
 					connection.closeFlag = true

+ 1 - 1
lib/version/version.go

@@ -4,5 +4,5 @@ const VERSION = "0.25.2"
 
 // Compulsory minimum version, Minimum downward compatibility to this version
 func GetVersion() string {
-	return "0.25.2"
+	return "0.25.0"
 }