Forráskód Böngészése

Change BuffSizeCopy pool

ffdfgdfg 5 éve
szülő
commit
53c2e472ae
5 módosított fájl, 39 hozzáadás és 10 törlés
  1. 5 3
      lib/common/netpackager.go
  2. 2 2
      lib/mux/conn.go
  3. 3 3
      lib/mux/mux.go
  4. 1 1
      lib/mux/queue.go
  5. 28 1
      lib/pool/pool.go

+ 5 - 3
lib/common/netpackager.go

@@ -20,6 +20,7 @@ type BasePackager struct {
 }
 
 func (Self *BasePackager) NewPac(contents ...interface{}) (err error) {
+	Self.Content = pool.CopyBuff.Get()
 	Self.clean()
 	for _, content := range contents {
 		switch content.(type) {
@@ -45,6 +46,7 @@ func (Self *BasePackager) Pack(writer io.Writer) (err error) {
 		return
 	}
 	err = binary.Write(writer, binary.LittleEndian, Self.Content)
+	pool.CopyBuff.Put(Self.Content)
 	return
 }
 
@@ -56,13 +58,13 @@ func (Self *BasePackager) UnPack(reader io.Reader) (err error) {
 	if err != nil {
 		return
 	}
-	Self.Content = pool.GetBufPoolCopy()
+	Self.Content = pool.CopyBuff.Get()
 	Self.Content = Self.Content[:Self.Length]
 	//n, err := io.ReadFull(reader, Self.Content)
 	//if n != int(Self.Length) {
 	//	err = io.ErrUnexpectedEOF
 	//}
-	err = binary.Read(reader, binary.LittleEndian, &Self.Content)
+	err = binary.Read(reader, binary.LittleEndian, Self.Content)
 	return
 }
 
@@ -160,7 +162,7 @@ func (Self *MuxPackager) Pack(writer io.Writer) (err error) {
 }
 
 func (Self *MuxPackager) UnPack(reader io.Reader) (err error) {
-	Self.Length=0
+	Self.Length = 0
 	err = binary.Read(reader, binary.LittleEndian, &Self.Flag)
 	if err != nil {
 		return

+ 2 - 2
lib/mux/conn.go

@@ -72,7 +72,6 @@ func (s *conn) Read(buf []byte) (n int, err error) {
 			s.Close()
 			return 0, io.EOF
 		} else {
-			//pool.PutBufPoolCopy(s.readBuffer)
 			if node.val == nil {
 				//close
 				s.sendClose = true
@@ -91,6 +90,7 @@ func (s *conn) Read(buf []byte) (n int, err error) {
 	} else {
 		n = copy(buf, s.readBuffer[s.startRead:s.endRead])
 		s.startRead += n
+		pool.CopyBuff.Put(s.readBuffer)
 	}
 	return
 }
@@ -137,7 +137,7 @@ func (s *conn) Close() (err error) {
 		return errors.New("the conn has closed")
 	}
 	s.isClose = true
-	pool.PutBufPoolCopy(s.readBuffer)
+	pool.CopyBuff.Put(s.readBuffer)
 	if s.readWait {
 		s.readCh <- struct{}{}
 	}

+ 3 - 3
lib/mux/mux.go

@@ -162,8 +162,8 @@ func (s *Mux) readSession() {
 				break
 			}
 			if pack.Flag != 0 && pack.Flag != 7 {
-				if pack.Length>10 {
-					logs.Warn(pack.Flag, pack.Id, pack.Length,string(pack.Content[:10]))
+				if pack.Length > 10 {
+					logs.Warn(pack.Flag, pack.Id, pack.Length, string(pack.Content[:10]))
 				}
 			}
 			s.pingOk = 0
@@ -205,7 +205,7 @@ func (s *Mux) readSession() {
 					s.connMap.Delete(pack.Id)
 				}
 			} else if pack.Flag == common.MUX_NEW_MSG {
-				pool.PutBufPoolCopy(pack.Content)
+				pool.CopyBuff.Put(pack.Content)
 			}
 		}
 		s.Close()

+ 1 - 1
lib/mux/queue.go

@@ -63,7 +63,7 @@ func (entry *sliceEntry) Clear() bool {
 		return false
 	}
 	for i := 0; i < entry.Size(); i++ {
-		pool.PutBufPoolCopy(entry.element[i].val)
+		pool.CopyBuff.Put(entry.element[i].val)
 		entry.element[i] = nil
 	}
 	entry.element = nil

+ 28 - 1
lib/pool/pool.go

@@ -59,6 +59,27 @@ func PutBufPoolMax(buf []byte) {
 	}
 }
 
+type CopyBufferPool struct {
+	pool sync.Pool
+}
+
+func (Self *CopyBufferPool) New() {
+	Self.pool = sync.Pool{
+		New: func() interface{} {
+			return make([]byte, PoolSizeCopy)
+		},
+	}
+}
+
+func (Self *CopyBufferPool) Get() []byte {
+	return Self.pool.Get().([]byte)
+}
+
+func (Self *CopyBufferPool) Put(x []byte) {
+	x = x[:0]
+	Self.pool.Put(x)
+}
+
 type BufferPool struct {
 	pool sync.Pool
 }
@@ -82,7 +103,13 @@ func (Self *BufferPool) Put(x *bytes.Buffer) {
 
 var once = sync.Once{}
 var BuffPool = BufferPool{}
+var CopyBuff = CopyBufferPool{}
+
+func newPool() {
+	BuffPool.New()
+	CopyBuff.New()
+}
 
 func init() {
-	once.Do(BuffPool.New)
+	once.Do(newPool)
 }