瀏覽代碼

add test code

ffdfgdfg 5 年之前
父節點
當前提交
442354db17
共有 4 個文件被更改,包括 300 次插入161 次删除
  1. 36 57
      lib/mux/conn.go
  2. 3 3
      lib/mux/mux.go
  3. 241 89
      lib/mux/mux_test.go
  4. 20 12
      lib/mux/queue.go

+ 36 - 57
lib/mux/conn.go

@@ -2,7 +2,6 @@ package mux
 
 import (
 	"errors"
-	"github.com/astaxie/beego/logs"
 	"io"
 	"net"
 	"strconv"
@@ -178,21 +177,17 @@ func (Self *ReceiveWindow) New(mux *Mux) {
 	Self.window.New()
 }
 
-func (Self *ReceiveWindow) RemainingSize() (n uint32) {
+func (Self *ReceiveWindow) remainingSize() (n uint32) {
 	// receive window remaining
-	if Self.maxSize >= Self.bufQueue.Len() {
-		n = Self.maxSize - Self.bufQueue.Len()
-	}
-	// if maxSize is small than bufQueue length, return 0
-	return
+	return Self.maxSize - Self.bufQueue.Len()
 }
 
-func (Self *ReceiveWindow) ReadSize() (n uint32) {
+func (Self *ReceiveWindow) readSize() (n uint32) {
 	// acknowledge the size already read
 	return atomic.SwapUint32(&Self.readLength, 0)
 }
 
-func (Self *ReceiveWindow) CalcSize() {
+func (Self *ReceiveWindow) calcSize() {
 	// calculating maximum receive window size
 	if Self.count == 0 {
 		//logs.Warn("ping, bw", Self.mux.latency, Self.bw.Get())
@@ -222,22 +217,22 @@ func (Self *ReceiveWindow) Write(buf []byte, l uint16, part bool, id int32) (err
 	if Self.closeOp {
 		return errors.New("conn.receiveWindow: write on closed window")
 	}
-	element := ListElement{}
+	element := new(ListElement)
 	err = element.New(buf, l, part)
 	//logs.Warn("push the buf", len(buf), l, (&element).l)
 	if err != nil {
 		return
 	}
-	Self.bufQueue.Push(&element) // must push data before allow read
+	Self.bufQueue.Push(element) // must push data before allow read
 	//logs.Warn("read session calc size ", Self.maxSize)
 	// calculating the receive window size
-	Self.CalcSize()
+	Self.calcSize()
 	//logs.Warn("read session calc size finish", Self.maxSize)
-	if Self.RemainingSize() == 0 {
+	if Self.remainingSize() == 0 {
 		Self.windowFull = true
 		//logs.Warn("window full true", Self.windowFull)
 	}
-	Self.mux.sendInfo(common.MUX_MSG_SEND_OK, id, Self.maxSize, Self.ReadSize())
+	Self.mux.sendInfo(common.MUX_MSG_SEND_OK, id, Self.maxSize, Self.readSize())
 	return nil
 }
 
@@ -273,10 +268,10 @@ copyData:
 	n += l
 	l = 0
 	//Self.bw.EndRead()
-	Self.sendStatus(id)
 	if Self.off == uint32(Self.element.l) {
 		//logs.Warn("put the element end ", string(Self.element.buf[:15]))
 		common.WindowBuff.Put(Self.element.buf)
+		Self.sendStatus(id)
 	}
 	if pOff < len(p) && Self.element.part {
 		// element is a part of the segments, trying to fill up buf p
@@ -289,7 +284,7 @@ func (Self *ReceiveWindow) sendStatus(id int32) {
 	if Self.windowFull || Self.bufQueue.Len() == 0 {
 		// window is full before read or empty now
 		Self.windowFull = false
-		Self.mux.sendInfo(common.MUX_MSG_SEND_OK, id, Self.maxSize, Self.ReadSize())
+		Self.mux.sendInfo(common.MUX_MSG_SEND_OK, id, Self.maxSize, Self.readSize())
 		// acknowledge other side, have empty some receive window space
 		//}
 	}
@@ -314,11 +309,10 @@ type SendWindow struct {
 	buf         []byte
 	sentLength  uint32
 	setSizeCh   chan struct{}
-	setSizeWait bool
+	setSizeWait int32
 	unSlide     uint32
 	timeout     time.Time
 	window
-	mutex sync.Mutex
 }
 
 func (Self *SendWindow) New(mux *Mux) {
@@ -330,17 +324,12 @@ func (Self *SendWindow) New(mux *Mux) {
 
 func (Self *SendWindow) SetSendBuf(buf []byte) {
 	// send window buff from conn write method, set it to send window
-	Self.mutex.Lock()
 	Self.buf = buf
 	Self.off = 0
-	Self.mutex.Unlock()
 }
 
 func (Self *SendWindow) RemainingSize() (n uint32) {
-	if Self.maxSize >= Self.sentLength {
-		n = Self.maxSize - Self.sentLength
-	}
-	return
+	return atomic.LoadUint32(&Self.maxSize) - atomic.LoadUint32(&Self.sentLength)
 }
 
 func (Self *SendWindow) SetSize(windowSize, readLength uint32) (closed bool) {
@@ -353,25 +342,21 @@ func (Self *SendWindow) SetSize(windowSize, readLength uint32) (closed bool) {
 		close(Self.setSizeCh)
 		return true
 	}
-	if readLength == 0 && Self.maxSize == windowSize {
+
+	if readLength == 0 && atomic.LoadUint32(&Self.maxSize) == windowSize {
 		//logs.Warn("waiting for another window size")
 		return false // waiting for receive another usable window size
 	}
 	//logs.Warn("set send window size to ", windowSize, readLength)
-	Self.mutex.Lock()
 	Self.slide(windowSize, readLength)
-	if Self.setSizeWait {
+	if Self.RemainingSize() == 0 {
+		//logs.Warn("waiting for another window size after slide")
+		// keep the wait status
+		atomic.StoreInt32(&Self.setSizeWait, 1)
+		return false
+	}
+	if atomic.CompareAndSwapInt32(&Self.setSizeWait, 1, 0) {
 		// send window into the wait status, need notice the channel
-		//logs.Warn("send window remaining size is 0 , wait")
-		if Self.RemainingSize() == 0 {
-			//logs.Warn("waiting for another window size after slide")
-			// keep the wait status
-			Self.mutex.Unlock()
-			return false
-		}
-		Self.setSizeWait = false
-		Self.mutex.Unlock()
-		//logs.Warn("send window remaining size is 0 starting wait")
 		select {
 		case Self.setSizeCh <- struct{}{}:
 			//logs.Warn("send window remaining size is 0 finish")
@@ -382,43 +367,36 @@ func (Self *SendWindow) SetSize(windowSize, readLength uint32) (closed bool) {
 		}
 	}
 	// send window not into the wait status, so just do slide
-	Self.mutex.Unlock()
 	return false
 }
 
 func (Self *SendWindow) slide(windowSize, readLength uint32) {
-	Self.sentLength -= readLength
-	Self.maxSize = windowSize
+	atomic.AddUint32(&Self.sentLength, ^readLength-1)
+	atomic.SwapUint32(&Self.maxSize, windowSize)
 }
 
-func (Self *SendWindow) WriteTo() (p []byte, part bool, err error) {
+func (Self *SendWindow) WriteTo() (p []byte, sendSize uint32, part bool, err error) {
 	// returns buf segments, return only one segments, need a loop outside
 	// until err = io.EOF
 	if Self.closeOp {
-		return nil, false, errors.New("conn.writeWindow: window closed")
+		return nil, 0, false, errors.New("conn.writeWindow: window closed")
 	}
 	if Self.off == uint32(len(Self.buf)) {
-		return nil, false, io.EOF
+		return nil, 0, false, io.EOF
 		// send window buff is drain, return eof and get another one
 	}
-	Self.mutex.Lock()
 	if Self.RemainingSize() == 0 {
-		Self.setSizeWait = true
-		Self.mutex.Unlock()
+		atomic.StoreInt32(&Self.setSizeWait, 1)
 		// into the wait status
 		err = Self.waitReceiveWindow()
 		if err != nil {
-			return nil, false, err
+			return nil, 0, false, err
 		}
-	} else {
-		Self.mutex.Unlock()
 	}
-	Self.mutex.Lock()
-	var sendSize uint32
 	if len(Self.buf[Self.off:]) > common.MAXIMUM_SEGMENT_SIZE {
 		sendSize = common.MAXIMUM_SEGMENT_SIZE
 		part = true
-		logs.Warn("cut buf by mss")
+		//logs.Warn("cut buf by mss")
 	} else {
 		sendSize = uint32(len(Self.buf[Self.off:]))
 		part = false
@@ -427,14 +405,13 @@ func (Self *SendWindow) WriteTo() (p []byte, part bool, err error) {
 		// usable window size is small than
 		// window MAXIMUM_SEGMENT_SIZE or send buf left
 		sendSize = Self.RemainingSize()
-		logs.Warn("cut buf by remainingsize", sendSize, len(Self.buf[Self.off:]))
+		//logs.Warn("cut buf by remainingsize", sendSize, len(Self.buf[Self.off:]))
 		part = true
 	}
 	//logs.Warn("send size", sendSize)
 	p = Self.buf[Self.off : sendSize+Self.off]
 	Self.off += sendSize
-	Self.sentLength += sendSize
-	Self.mutex.Unlock()
+	atomic.AddUint32(&Self.sentLength, sendSize)
 	return
 }
 
@@ -463,8 +440,9 @@ func (Self *SendWindow) WriteFull(buf []byte, id int32) (n int, err error) {
 	Self.SetSendBuf(buf) // set the buf to send window
 	var bufSeg []byte
 	var part bool
+	var l uint32
 	for {
-		bufSeg, part, err = Self.WriteTo()
+		bufSeg, l, part, err = Self.WriteTo()
 		//logs.Warn("buf seg", len(bufSeg), part, err)
 		// get the buf segments from send window
 		if bufSeg == nil && part == false && err == io.EOF {
@@ -475,7 +453,8 @@ func (Self *SendWindow) WriteFull(buf []byte, id int32) (n int, err error) {
 		if err != nil {
 			break
 		}
-		n += len(bufSeg)
+		n += int(l)
+		l = 0
 		if part {
 			Self.mux.sendInfo(common.MUX_NEW_MSG_PART, id, bufSeg)
 		} else {

+ 3 - 3
lib/mux/mux.go

@@ -41,7 +41,7 @@ func NewMux(c net.Conn, connType string) *Mux {
 		connMap:   NewConnMap(),
 		id:        0,
 		closeChan: make(chan struct{}, 3),
-		newConnCh: make(chan *conn),
+		newConnCh: make(chan *conn, 10),
 		bw:        new(bandwidth),
 		IsClose:   false,
 		connType:  connType,
@@ -321,11 +321,11 @@ func (s *Mux) Close() error {
 func (s *Mux) getId() (id int32) {
 	//Avoid going beyond the scope
 	if (math.MaxInt32 - s.id) < 10000 {
-		atomic.SwapInt32(&s.id, 0)
+		atomic.StoreInt32(&s.id, 0)
 	}
 	id = atomic.AddInt32(&s.id, 1)
 	if _, ok := s.connMap.Get(id); ok {
-		s.getId()
+		return s.getId()
 	}
 	return
 }

+ 241 - 89
lib/mux/mux_test.go

@@ -3,19 +3,18 @@ package mux
 import (
 	"bufio"
 	"fmt"
-	"io"
+	"github.com/cnlh/nps/lib/common"
+	"log"
 	"net"
 	"net/http"
 	"net/http/httputil"
 	_ "net/http/pprof"
 	"strconv"
-	"sync"
 	"testing"
 	"time"
 	"unsafe"
 
 	"github.com/astaxie/beego/logs"
-	"github.com/cnlh/nps/lib/common"
 )
 
 var conn1 net.Conn
@@ -49,42 +48,42 @@ func TestNewMux(t *testing.T) {
 			//c2.(*net.TCPConn).SetReadBuffer(0)
 			//c2.(*net.TCPConn).SetReadBuffer(0)
 			go func(c2 net.Conn, c *conn) {
-				wg := sync.WaitGroup{}
-				wg.Add(1)
-				go func() {
-					_, err = common.CopyBuffer(c2, c)
-					if err != nil {
-						c2.Close()
-						c.Close()
-						logs.Warn("close npc by copy from nps", err, c.connId)
-					}
-					wg.Done()
-				}()
-				wg.Add(1)
-				go func() {
-					_, err = common.CopyBuffer(c, c2)
-					if err != nil {
-						c2.Close()
-						c.Close()
-						logs.Warn("close npc by copy from server", err, c.connId)
-					}
-					wg.Done()
-				}()
-				//logs.Warn("npc wait")
-				wg.Wait()
+				//wg := sync.WaitGroup{}
+				//wg.Add(1)
+				//go func() {
+				//	_, err = common.CopyBuffer(c2, c)
+				//	if err != nil {
+				//		c2.Close()
+				//		c.Close()
+				//		logs.Warn("close npc by copy from nps", err, c.connId)
+				//	}
+				//	wg.Done()
+				//}()
+				//wg.Add(1)
+				//go func() {
+				//	_, err = common.CopyBuffer(c, c2)
+				//	if err != nil {
+				//		c2.Close()
+				//		c.Close()
+				//		logs.Warn("close npc by copy from server", err, c.connId)
+				//	}
+				//	wg.Done()
+				//}()
+				////logs.Warn("npc wait")
+				//wg.Wait()
 			}(c2, c.(*conn))
 		}
 	}()
 
 	go func() {
-		m1 := NewMux(conn1, "tcp")
+		//m1 := NewMux(conn1, "tcp")
 		l, err := net.Listen("tcp", "127.0.0.1:7777")
 		if err != nil {
 			logs.Warn(err)
 		}
 		for {
 			//logs.Warn("nps starting accept")
-			conns, err := l.Accept()
+			_, err := l.Accept()
 			if err != nil {
 				logs.Warn(err)
 				continue
@@ -92,37 +91,37 @@ func TestNewMux(t *testing.T) {
 			//conns.(*net.TCPConn).SetReadBuffer(0)
 			//conns.(*net.TCPConn).SetReadBuffer(0)
 			//logs.Warn("nps accept success starting new conn")
-			tmpCpnn, err := m1.NewConn()
-			if err != nil {
-				logs.Warn("nps new conn err ", err)
-				continue
-			}
-			logs.Warn("nps new conn success ", tmpCpnn.connId)
-			go func(tmpCpnn *conn, conns net.Conn) {
-				go func() {
-					_, err := common.CopyBuffer(tmpCpnn, conns)
-					if err != nil {
-						conns.Close()
-						tmpCpnn.Close()
-						logs.Warn("close nps by copy from user", tmpCpnn.connId, err)
-					}
-				}()
-				//time.Sleep(time.Second)
-				_, err = common.CopyBuffer(conns, tmpCpnn)
-				if err != nil {
-					conns.Close()
-					tmpCpnn.Close()
-					logs.Warn("close nps by copy from npc ", tmpCpnn.connId, err)
-				}
-			}(tmpCpnn, conns)
+			//tmpCpnn, err := m1.NewConn()
+			//if err != nil {
+			//	logs.Warn("nps new conn err ", err)
+			//	continue
+			//}
+			////logs.Warn("nps new conn success ", tmpCpnn.connId)
+			//go func(tmpCpnn *conn, conns net.Conn) {
+			//	//go func() {
+			//	//	_, err := common.CopyBuffer(tmpCpnn, conns)
+			//	//	if err != nil {
+			//	//		conns.Close()
+			//	//		tmpCpnn.Close()
+			//	//		logs.Warn("close nps by copy from user", tmpCpnn.connId, err)
+			//	//	}
+			//	//}()
+			//	////time.Sleep(time.Second)
+			//	//_, err = common.CopyBuffer(conns, tmpCpnn)
+			//	//if err != nil {
+			//	//	conns.Close()
+			//	//	tmpCpnn.Close()
+			//	//	logs.Warn("close nps by copy from npc ", tmpCpnn.connId, err)
+			//	//}
+			//}(tmpCpnn, conns)
 		}
 	}()
 
 	go NewLogServer()
 	time.Sleep(time.Second * 5)
-	//for i:=0;i<1000;i++ {
-	//	go test_raw(i)
-	//}
+	for i := 0; i < 1000; i++ {
+		go test_raw(i)
+	}
 
 	for {
 		time.Sleep(time.Second * 5)
@@ -180,37 +179,37 @@ Connection: keep-alive
 func test_raw(k int) {
 	for i := 0; i < 1; i++ {
 		ti := time.Now()
-		conn, _ := net.Dial("tcp", "127.0.0.1:7777")
+		_, _ = net.Dial("tcp", "127.0.0.1:7777")
 		tid := time.Now()
-		conn.Write([]byte(`GET / HTTP/1.1
-Host: 127.0.0.1:7777
-
-
-`))
-		tiw := time.Now()
-		buf := make([]byte, 3572)
-		n, err := io.ReadFull(conn, buf)
-		//n, err := conn.Read(buf)
-		if err != nil {
-			logs.Warn("close by read response err", err)
-			break
-		}
-		//logs.Warn(n, string(buf[:50]), "\n--------------\n", string(buf[n-50:n]))
-		//time.Sleep(time.Second)
-		err = conn.Close()
-		if err != nil {
-			logs.Warn("close conn err ", err)
-		}
+		//		conn.Write([]byte(`GET / HTTP/1.1
+		//Host: 127.0.0.1:7777
+		//
+		//
+		//`))
+		//		tiw := time.Now()
+		//buf := make([]byte, 3572)
+		//n, err := io.ReadFull(conn, buf)
+		////n, err := conn.Read(buf)
+		//if err != nil {
+		//	logs.Warn("close by read response err", err)
+		//	break
+		//}
+		////logs.Warn(n, string(buf[:50]), "\n--------------\n", string(buf[n-50:n]))
+		////time.Sleep(time.Second)
+		//err = conn.Close()
+		//if err != nil {
+		//	logs.Warn("close conn err ", err)
+		//}
 		now := time.Now()
 		du := now.Sub(ti).Seconds()
 		dud := now.Sub(tid).Seconds()
-		duw := now.Sub(tiw).Seconds()
-		if du > 1 {
-			logs.Warn("duration long", du, dud, duw, k, i)
-		}
-		if n != 3572 {
-			logs.Warn("n loss", n, string(buf))
-		}
+		//duw := now.Sub(tiw).Seconds()
+		//if du > 1 {
+		logs.Warn("duration long", du, dud, k, i)
+		//}
+		//if n != 3572 {
+		//	logs.Warn("n loss", n, string(buf))
+		//}
 	}
 }
 
@@ -249,29 +248,182 @@ func TestDQueue(t *testing.T) {
 }
 
 func TestChain(t *testing.T) {
+	go func() {
+		log.Println(http.ListenAndServe("0.0.0.0:8889", nil))
+	}()
 	logs.EnableFuncCallDepth(true)
 	logs.SetLogFuncCallDepth(3)
+	time.Sleep(time.Second * 5)
 	d := new(bufChain)
 	d.new(256)
 	go func() {
 		time.Sleep(time.Second)
-		for i := 0; i < 1000; i++ {
+		for i := 0; i < 30000; i++ {
 			unsa, ok := d.popTail()
 			str := (*string)(unsa)
 			if ok {
-				logs.Warn(i, str, *str, ok)
+				fmt.Println(i, str, *str, ok)
+				//logs.Warn(i, str, *str, ok)
+			} else {
+				fmt.Println("nil", i, ok)
+				//logs.Warn("nil", i, ok)
+			}
+		}
+	}()
+	go func() {
+		time.Sleep(time.Second)
+		for i := 0; i < 3000; i++ {
+			go func(i int) {
+				for n := 0; n < 10; n++ {
+					data := "test " + strconv.Itoa(i) + strconv.Itoa(n)
+					fmt.Println(data, unsafe.Pointer(&data))
+					//logs.Warn(data, unsafe.Pointer(&data))
+					d.pushHead(unsafe.Pointer(&data))
+				}
+			}(i)
+		}
+	}()
+	time.Sleep(time.Second * 100000)
+}
+
+func TestFIFO(t *testing.T) {
+	go func() {
+		log.Println(http.ListenAndServe("0.0.0.0:8889", nil))
+	}()
+	logs.EnableFuncCallDepth(true)
+	logs.SetLogFuncCallDepth(3)
+	time.Sleep(time.Second * 5)
+	d := new(FIFOQueue)
+	d.New()
+	go func() {
+		time.Sleep(time.Second)
+		for i := 0; i < 30000; i++ {
+			data, err := d.Pop()
+			if err == nil {
+				//fmt.Println(i, string(data.buf), err)
+				logs.Warn(i, string(data.buf), err)
 			} else {
-				logs.Warn("nil", i, ok)
+				//fmt.Println("err", err)
+				logs.Warn("err", err)
 			}
 		}
 	}()
+	go func() {
+		time.Sleep(time.Second * 10)
+		for i := 0; i < 3000; i++ {
+			go func(i int) {
+				for n := 0; n < 10; n++ {
+					data := new(ListElement)
+					by := []byte("test " + strconv.Itoa(i) + strconv.Itoa(n))
+					_ = data.New(by, uint16(len(by)), true)
+					//fmt.Println(string((*data).buf), data)
+					logs.Warn(string((*data).buf), data)
+					d.Push(data)
+				}
+			}(i)
+		}
+	}()
+	time.Sleep(time.Second * 100000)
+}
+
+func TestPriority(t *testing.T) {
+	go func() {
+		log.Println(http.ListenAndServe("0.0.0.0:8889", nil))
+	}()
+	logs.EnableFuncCallDepth(true)
+	logs.SetLogFuncCallDepth(3)
+	time.Sleep(time.Second * 5)
+	d := new(PriorityQueue)
+	d.New()
 	go func() {
 		time.Sleep(time.Second)
-		for i := 0; i < 1000; i++ {
-			data := "test " + strconv.Itoa(i)
-			logs.Warn(data, unsafe.Pointer(&data))
-			go d.pushHead(unsafe.Pointer(&data))
+		for i := 0; i < 36000; i++ {
+			data := d.Pop()
+			//fmt.Println(i, string(data.buf), err)
+			logs.Warn(i, string(data.Content), data)
 		}
 	}()
-	time.Sleep(time.Second * 10)
+	go func() {
+		time.Sleep(time.Second * 10)
+		for i := 0; i < 3000; i++ {
+			go func(i int) {
+				for n := 0; n < 10; n++ {
+					data := new(common.MuxPackager)
+					by := []byte("test " + strconv.Itoa(i) + strconv.Itoa(n))
+					_ = data.NewPac(common.MUX_NEW_MSG_PART, int32(i), by)
+					//fmt.Println(string((*data).buf), data)
+					logs.Warn(string((*data).Content), data)
+					d.Push(data)
+				}
+			}(i)
+			go func(i int) {
+				data := new(common.MuxPackager)
+				_ = data.NewPac(common.MUX_NEW_CONN, int32(i), nil)
+				//fmt.Println(string((*data).buf), data)
+				logs.Warn(data)
+				d.Push(data)
+			}(i)
+			go func(i int) {
+				data := new(common.MuxPackager)
+				_ = data.NewPac(common.MUX_NEW_CONN_OK, int32(i), nil)
+				//fmt.Println(string((*data).buf), data)
+				logs.Warn(data)
+				d.Push(data)
+			}(i)
+		}
+	}()
+	time.Sleep(time.Second * 100000)
 }
+
+//func TestReceive(t *testing.T) {
+//	go func() {
+//		log.Println(http.ListenAndServe("0.0.0.0:8889", nil))
+//	}()
+//	logs.EnableFuncCallDepth(true)
+//	logs.SetLogFuncCallDepth(3)
+//	time.Sleep(time.Second * 5)
+//	mux := new(Mux)
+//	mux.bw.readBandwidth = float64(1*1024*1024)
+//	mux.latency = float64(1/1000)
+//	wind := new(ReceiveWindow)
+//	wind.New(mux)
+//	wind.
+//	go func() {
+//		time.Sleep(time.Second)
+//		for i := 0; i < 36000; i++ {
+//			data := d.Pop()
+//			//fmt.Println(i, string(data.buf), err)
+//			logs.Warn(i, string(data.Content), data)
+//		}
+//	}()
+//	go func() {
+//		time.Sleep(time.Second*10)
+//		for i := 0; i < 3000; i++ {
+//			go func(i int) {
+//				for n := 0; n < 10; n++{
+//					data := new(common.MuxPackager)
+//					by := []byte("test " + strconv.Itoa(i) + strconv.Itoa(n))
+//					_ = data.NewPac(common.MUX_NEW_MSG_PART, int32(i), by)
+//					//fmt.Println(string((*data).buf), data)
+//					logs.Warn(string((*data).Content), data)
+//					d.Push(data)
+//				}
+//			}(i)
+//			go func(i int) {
+//				data := new(common.MuxPackager)
+//				_ = data.NewPac(common.MUX_NEW_CONN, int32(i), nil)
+//				//fmt.Println(string((*data).buf), data)
+//				logs.Warn(data)
+//				d.Push(data)
+//			}(i)
+//			go func(i int) {
+//				data := new(common.MuxPackager)
+//				_ = data.NewPac(common.MUX_NEW_CONN_OK, int32(i), nil)
+//				//fmt.Println(string((*data).buf), data)
+//				logs.Warn(data)
+//				d.Push(data)
+//			}(i)
+//		}
+//	}()
+//	time.Sleep(time.Second * 100000)
+//}

+ 20 - 12
lib/mux/queue.go

@@ -73,6 +73,8 @@ func (Self *PriorityQueue) Push(packager *common.MuxPackager) {
 	return
 }
 
+const maxHunger uint8 = 10
+
 func (Self *PriorityQueue) Pop() (packager *common.MuxPackager) {
 startPop:
 	ptr, ok := Self.highestChain.popTail()
@@ -80,7 +82,7 @@ startPop:
 		packager = (*common.MuxPackager)(ptr)
 		return
 	}
-	if Self.hunger < 100 {
+	if Self.hunger < maxHunger {
 		ptr, ok = Self.middleChain.popTail()
 		if ok {
 			packager = (*common.MuxPackager)(ptr)
@@ -96,6 +98,13 @@ startPop:
 		}
 		return
 	}
+	if Self.hunger > 0 {
+		ptr, ok = Self.middleChain.popTail()
+		if ok {
+			packager = (*common.MuxPackager)(ptr)
+			return
+		}
+	}
 	// PriorityQueue is empty, notice Push method
 	if atomic.CompareAndSwapInt32(&Self.popWait, 0, 1) {
 		select {
@@ -141,7 +150,7 @@ func (Self *FIFOQueue) New() {
 
 func (Self *FIFOQueue) Push(element *ListElement) {
 	Self.chain.pushHead(unsafe.Pointer(element))
-	Self.length += uint32(element.l)
+	atomic.AddUint32(&Self.length, uint32(element.l))
 	Self.allowPop()
 	return
 }
@@ -151,7 +160,7 @@ startPop:
 	ptr, ok := Self.chain.popTail()
 	if ok {
 		element = (*ListElement)(ptr)
-		Self.length -= uint32(element.l)
+		atomic.AddUint32(&Self.length, ^uint32(element.l-1))
 		return
 	}
 	if atomic.CompareAndSwapInt32(&Self.popWait, 0, 1) {
@@ -178,7 +187,7 @@ startPop:
 }
 
 func (Self *FIFOQueue) Len() (n uint32) {
-	return Self.length
+	return atomic.LoadUint32(&Self.length)
 }
 
 func (Self *FIFOQueue) Stop() {
@@ -273,17 +282,16 @@ func (d *bufDequeue) popTail() (unsafe.Pointer, bool) {
 		return nil, false
 	}
 	slot := &d.vals[tail&uint32(len(d.vals)-1)]
+	var val unsafe.Pointer
 	for {
-		typ := atomic.LoadPointer(slot)
-		if typ != nil {
+		val = atomic.LoadPointer(slot)
+		if val != nil {
+			// We now own slot.
 			break
 		}
 		// Another goroutine is still pushing data on the tail.
 	}
 
-	// We now own slot.
-	val := *slot
-
 	// Tell pushHead that we're done with this slot. Zeroing the
 	// slot is also important so we don't leave behind references
 	// that could keep this object live longer than necessary.
@@ -369,10 +377,10 @@ func (c *bufChain) pushHead(val unsafe.Pointer) {
 
 			d2 := &bufChainElt{prev: d}
 			d2.vals = make([]unsafe.Pointer, newSize)
-			storePoolChainElt(&c.head, d2)
-			storePoolChainElt(&d.next, d2)
 			d2.pushHead(val)
-			atomic.SwapInt32(&c.chainStatus, 0)
+			storePoolChainElt(&d.next, d2)
+			storePoolChainElt(&c.head, d2)
+			atomic.StoreInt32(&c.chainStatus, 0)
 		}
 	}
 }