Преглед изворни кода

fix several race condition, change slide window max size 2G to 32M, add buffer release

ffdfgdfg пре 5 година
родитељ
комит
9bb8230fc1
5 измењених фајлова са 129 додато и 59 уклоњено
  1. 2 1
      lib/common/const.go
  2. 24 1
      lib/mux/conn.go
  3. 32 31
      lib/mux/map.go
  4. 47 10
      lib/mux/mux.go
  5. 24 16
      lib/mux/queue.go

+ 2 - 1
lib/common/const.go

@@ -49,5 +49,6 @@ const (
 	MUX_PING_RETURN
 	MUX_PING             int32 = -1
 	MAXIMUM_SEGMENT_SIZE       = PoolSizeWindow
-	MAXIMUM_WINDOW_SIZE        = 1<<31 - 1
+	MAXIMUM_WINDOW_SIZE        = 1 << 25 // 1<<31-1 TCP slide window size is very large,
+	// we use 32M, reduce memory usage
 )

+ 24 - 1
lib/mux/conn.go

@@ -210,7 +210,7 @@ func (Self *ReceiveWindow) calcSize() {
 	if Self.count == 0 {
 		//logs.Warn("ping, bw", Self.mux.latency, Self.bw.Get())
 		n := uint32(2 * math.Float64frombits(atomic.LoadUint64(&Self.mux.latency)) *
-			Self.mux.bw.Get() * 1.5 / float64(Self.mux.connMap.Size()))
+			Self.mux.bw.Get() / float64(Self.mux.connMap.Size()))
 		if n < 8192 {
 			n = 8192
 		}
@@ -279,6 +279,9 @@ copyData:
 		// on the first Read method invoked, Self.off and Self.element.l
 		// both zero value
 		common.ListElementPool.Put(Self.element)
+		if Self.closeOp {
+			return 0, io.EOF
+		}
 		Self.element, err = Self.bufQueue.Pop()
 		// if the queue is empty, Pop method will wait until one element push
 		// into the queue successful, or timeout.
@@ -343,6 +346,26 @@ func (Self *ReceiveWindow) Stop() {
 func (Self *ReceiveWindow) CloseWindow() {
 	Self.window.CloseWindow()
 	Self.Stop()
+	Self.release()
+}
+
+func (Self *ReceiveWindow) release() {
+	//if Self.element != nil {
+	//	if Self.element.Buf != nil {
+	//		common.WindowBuff.Put(Self.element.Buf)
+	//	}
+	//	common.ListElementPool.Put(Self.element)
+	//}
+	for {
+		Self.element = Self.bufQueue.TryPop()
+		if Self.element == nil {
+			return
+		}
+		if Self.element.Buf != nil {
+			common.WindowBuff.Put(Self.element.Buf)
+		}
+		common.ListElementPool.Put(Self.element)
+	} // release resource
 }
 
 type SendWindow struct {

+ 32 - 31
lib/mux/map.go

@@ -2,32 +2,35 @@ package mux
 
 import (
 	"sync"
-	"time"
 )
 
 type connMap struct {
 	connMap map[int32]*conn
-	closeCh chan struct{}
+	//closeCh chan struct{}
 	sync.RWMutex
 }
 
 func NewConnMap() *connMap {
 	connMap := &connMap{
 		connMap: make(map[int32]*conn),
-		closeCh: make(chan struct{}),
+		//closeCh: make(chan struct{}),
 	}
-	go connMap.clean()
+	//go connMap.clean()
 	return connMap
 }
 
 func (s *connMap) Size() (n int) {
-	return len(s.connMap)
+	s.Lock()
+	n = len(s.connMap)
+	s.Unlock()
+	return
 }
 
 func (s *connMap) Get(id int32) (*conn, bool) {
 	s.Lock()
-	defer s.Unlock()
-	if v, ok := s.connMap[id]; ok && v != nil {
+	v, ok := s.connMap[id]
+	s.Unlock()
+	if ok && v != nil {
 		return v, true
 	}
 	return nil, false
@@ -35,40 +38,38 @@ func (s *connMap) Get(id int32) (*conn, bool) {
 
 func (s *connMap) Set(id int32, v *conn) {
 	s.Lock()
-	defer s.Unlock()
 	s.connMap[id] = v
+	s.Unlock()
 }
 
 func (s *connMap) Close() {
-	s.Lock()
-	defer s.Unlock()
+	//s.closeCh <- struct{}{} // stop the clean goroutine first
 	for _, v := range s.connMap {
-		v.isClose = true
+		v.Close() // close all the connections in the mux
 	}
-	s.closeCh <- struct{}{}
 }
 
 func (s *connMap) Delete(id int32) {
 	s.Lock()
-	defer s.Unlock()
 	delete(s.connMap, id)
+	s.Unlock()
 }
 
-func (s *connMap) clean() {
-	ticker := time.NewTimer(time.Minute * 1)
-	for {
-		select {
-		case <-ticker.C:
-			s.Lock()
-			for _, v := range s.connMap {
-				if v.isClose {
-					delete(s.connMap, v.connId)
-				}
-			}
-			s.Unlock()
-		case <-s.closeCh:
-			ticker.Stop()
-			return
-		}
-	}
-}
+//func (s *connMap) clean() {
+//	ticker := time.NewTimer(time.Minute * 1)
+//	for {
+//		select {
+//		case <-ticker.C:
+//			s.Lock()
+//			for _, v := range s.connMap {
+//				if v.isClose {
+//					delete(s.connMap, v.connId)
+//				}
+//			}
+//			s.Unlock()
+//		case <-s.closeCh:
+//			ticker.Stop()
+//			return
+//		}
+//	}
+//}

+ 47 - 10
lib/mux/mux.go

@@ -122,6 +122,9 @@ func (s *Mux) packBuf() {
 		}
 		buffer.Reset()
 		pack := s.writeQueue.Pop()
+		if s.IsClose {
+			break
+		}
 		//buffer := common.BuffPool.Get()
 		err := pack.Pack(buffer)
 		common.MuxPack.Put(pack)
@@ -218,7 +221,9 @@ func (s *Mux) pingReturn() {
 				// convert float64 to bits, store it atomic
 			}
 			//logs.Warn("latency", math.Float64frombits(atomic.LoadUint64(&s.latency)))
-			common.WindowBuff.Put(data)
+			if cap(data) > 0 {
+				common.WindowBuff.Put(data)
+			}
 		}
 	}()
 }
@@ -227,7 +232,13 @@ func (s *Mux) readSession() {
 	go func() {
 		var connection *conn
 		for {
+			if s.IsClose {
+				break
+			}
 			connection = s.newConnQueue.Pop()
+			if s.IsClose {
+				break // make sure that is closed
+			}
 			s.connMap.Set(connection.connId, connection) //it has been set before send ok
 			s.newConnCh <- connection
 			s.sendInfo(common.MUX_NEW_CONN_OK, connection.connId, nil)
@@ -287,9 +298,9 @@ func (s *Mux) readSession() {
 					connection.sendWindow.SetSize(pack.Window, pack.ReadLength)
 					continue
 				case common.MUX_CONN_CLOSE: //close the connection
-					s.connMap.Delete(pack.Id)
-					//go func(connection *conn) {
 					connection.closeFlag = true
+					//s.connMap.Delete(pack.Id)
+					//go func(connection *conn) {
 					connection.receiveWindow.Stop() // close signal to receive window
 					//}(connection)
 					continue
@@ -322,17 +333,42 @@ func (s *Mux) newMsg(connection *conn, pack *common.MuxPackager) (err error) {
 	return
 }
 
-func (s *Mux) Close() error {
+func (s *Mux) Close() (err error) {
 	logs.Warn("close mux")
 	if s.IsClose {
 		return errors.New("the mux has closed")
 	}
 	s.IsClose = true
 	s.connMap.Close()
+	s.connMap = nil
 	//s.bufQueue.Stop()
 	s.closeChan <- struct{}{}
 	close(s.newConnCh)
-	return s.conn.Close()
+	err = s.conn.Close()
+	s.release()
+	return
+}
+
+func (s *Mux) release() {
+	for {
+		pack := s.writeQueue.TryPop()
+		if pack == nil {
+			break
+		}
+		if pack.BasePackager.Content != nil {
+			common.WindowBuff.Put(pack.BasePackager.Content)
+		}
+		common.MuxPack.Put(pack)
+	}
+	for {
+		connection := s.newConnQueue.TryPop()
+		if connection == nil {
+			break
+		}
+		connection = nil
+	}
+	s.writeQueue.Stop()
+	s.newConnQueue.Stop()
 }
 
 //get new connId as unique flag
@@ -352,7 +388,7 @@ type bandwidth struct {
 	readStart     time.Time
 	lastReadStart time.Time
 	bufLength     uint16
-	readBandwidth float64
+	readBandwidth uint64 // store in bits, but it's float64
 }
 
 func (Self *bandwidth) StartRead() {
@@ -371,16 +407,17 @@ func (Self *bandwidth) SetCopySize(n uint16) {
 
 func (Self *bandwidth) calcBandWidth() {
 	t := Self.readStart.Sub(Self.lastReadStart)
-	Self.readBandwidth = float64(Self.bufLength) / t.Seconds()
+	atomic.StoreUint64(&Self.readBandwidth, math.Float64bits(float64(Self.bufLength)/t.Seconds()))
 	Self.bufLength = 0
 }
 
 func (Self *bandwidth) Get() (bw float64) {
 	// The zero value, 0 for numeric types
-	if Self.readBandwidth <= 0 {
-		Self.readBandwidth = 100
+	bw = math.Float64frombits(atomic.LoadUint64(&Self.readBandwidth))
+	if bw <= 0 {
+		bw = 100
 	}
-	return Self.readBandwidth
+	return
 }
 
 const counterBits = 4

+ 24 - 16
lib/mux/queue.go

@@ -59,7 +59,7 @@ const maxStarving uint8 = 8
 func (Self *PriorityQueue) Pop() (packager *common.MuxPackager) {
 	var iter bool
 	for {
-		packager = Self.pop()
+		packager = Self.TryPop()
 		if packager != nil {
 			return
 		}
@@ -75,20 +75,20 @@ func (Self *PriorityQueue) Pop() (packager *common.MuxPackager) {
 	}
 	Self.cond.L.Lock()
 	defer Self.cond.L.Unlock()
-	for packager = Self.pop(); packager == nil; {
+	for packager = Self.TryPop(); packager == nil; {
 		if Self.stop {
 			return
 		}
 		//logs.Warn("queue into wait")
 		Self.cond.Wait()
 		// wait for it with no more iter
-		packager = Self.pop()
+		packager = Self.TryPop()
 		//logs.Warn("queue wait finish", packager)
 	}
 	return
 }
 
-func (Self *PriorityQueue) pop() (packager *common.MuxPackager) {
+func (Self *PriorityQueue) TryPop() (packager *common.MuxPackager) {
 	ptr, ok := Self.highestChain.popTail()
 	if ok {
 		packager = (*common.MuxPackager)(ptr)
@@ -150,7 +150,7 @@ func (Self *ConnQueue) Push(connection *conn) {
 func (Self *ConnQueue) Pop() (connection *conn) {
 	var iter bool
 	for {
-		connection = Self.pop()
+		connection = Self.TryPop()
 		if connection != nil {
 			return
 		}
@@ -166,20 +166,20 @@ func (Self *ConnQueue) Pop() (connection *conn) {
 	}
 	Self.cond.L.Lock()
 	defer Self.cond.L.Unlock()
-	for connection = Self.pop(); connection == nil; {
+	for connection = Self.TryPop(); connection == nil; {
 		if Self.stop {
 			return
 		}
 		//logs.Warn("queue into wait")
 		Self.cond.Wait()
 		// wait for it with no more iter
-		connection = Self.pop()
+		connection = Self.TryPop()
 		//logs.Warn("queue wait finish", packager)
 	}
 	return
 }
 
-func (Self *ConnQueue) pop() (connection *conn) {
+func (Self *ConnQueue) TryPop() (connection *conn) {
 	ptr, ok := Self.chain.popTail()
 	if ok {
 		connection = (*conn)(ptr)
@@ -261,18 +261,26 @@ startPop:
 	}
 	// length is not zero, so try to pop
 	for {
-		ptr, ok := Self.chain.popTail()
-		if ok {
-			//logs.Warn("window pop before", Self.Len())
-			element = (*common.ListElement)(ptr)
-			atomic.AddUint64(&Self.lengthWait, ^(uint64(element.L)<<dequeueBits - 1))
-			//logs.Warn("window pop", Self.Len(), uint32(element.l))
+		element = Self.TryPop()
+		if element != nil {
 			return
 		}
 		runtime.Gosched() // another goroutine is still pushing
 	}
 }
 
+func (Self *ReceiveWindowQueue) TryPop() (element *common.ListElement) {
+	ptr, ok := Self.chain.popTail()
+	if ok {
+		//logs.Warn("window pop before", Self.Len())
+		element = (*common.ListElement)(ptr)
+		atomic.AddUint64(&Self.lengthWait, ^(uint64(element.L)<<dequeueBits - 1))
+		//logs.Warn("window pop", Self.Len(), uint32(element.l))
+		return
+	}
+	return nil
+}
+
 func (Self *ReceiveWindowQueue) allowPop() (closed bool) {
 	//logs.Warn("allow pop", Self.Len())
 	select {
@@ -539,7 +547,7 @@ func (c *bufChain) popTail() (unsafe.Pointer, bool) {
 		// It's important that we load the next pointer
 		// *before* popping the tail. In general, d may be
 		// transiently empty, but if next is non-nil before
-		// the pop and the pop fails, then d is permanently
+		// the TryPop and the TryPop fails, then d is permanently
 		// empty, which is the only condition under which it's
 		// safe to drop d from the chain.
 		d2 := loadPoolChainElt(&d.next)
@@ -556,7 +564,7 @@ func (c *bufChain) popTail() (unsafe.Pointer, bool) {
 
 		// The tail of the chain has been drained, so move on
 		// to the next dequeue. Try to drop it from the chain
-		// so the next pop doesn't have to look at the empty
+		// so the next TryPop doesn't have to look at the empty
 		// dequeue again.
 		if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) {
 			// We won the race. Clear the prev pointer so