
Merge pull request #276 from cnlh/dev

Dev 0.24.0 merge
ffdfgdfg, 5 years ago
parent
commit
fadfc24e52
29 changed files, with 2792 additions and 452 deletions
  1. Dockerfile.npc (+10, -0)
  2. Dockerfile.nps (+11, -0)
  3. README.md (+57, -2)
  4. client/client.go (+5, -0)
  5. client/control.go (+7, -2)
  6. cmd/nps/nps.go (+1, -1)
  7. go.mod (+4, -4)
  8. go.sum (+10, -0)
  9. lib/common/const.go (+16, -0)
  10. lib/common/netpackager.go (+235, -0)
  11. lib/common/pool.go (+199, -0)
  12. lib/common/util.go (+10, -7)
  13. lib/conn/conn.go (+25, -21)
  14. lib/conn/listener.go (+7, -0)
  15. lib/conn/snappy.go (+3, -3)
  16. lib/goroutine/pool.go (+73, -0)
  17. lib/install/install.go (+35, -3)
  18. lib/mux/bytes.go (+1, -1)
  19. lib/mux/conn.go (+569, -118)
  20. lib/mux/map.go (+35, -30)
  21. lib/mux/mux.go (+402, -114)
  22. lib/mux/mux_test.go (+366, -29)
  23. lib/mux/queue.go (+544, -49)
  24. lib/mux/web.go (+154, -0)
  25. lib/mux/web_test.go (+7, -0)
  26. lib/pool/pool.go (+0, -60)
  27. lib/version/version.go (+2, -2)
  28. server/proxy/p2p.go (+1, -2)
  29. server/proxy/udp.go (+3, -4)

+ 10 - 0
Dockerfile.npc

@@ -0,0 +1,10 @@
+FROM golang as builder
+WORKDIR /go/src/github.com/cnlh/nps
+COPY . .
+RUN go get -d -v ./... 
+RUN CGO_ENABLED=0 go build -ldflags="-w -s -extldflags -static" ./cmd/npc/npc.go
+
+FROM scratch
+COPY --from=builder /go/src/github.com/cnlh/nps/npc /
+VOLUME /conf
+ENTRYPOINT ["/npc"]

+ 11 - 0
Dockerfile.nps

@@ -0,0 +1,11 @@
+FROM golang as builder
+WORKDIR /go/src/github.com/cnlh/nps
+COPY . .
+RUN go get -d -v ./... 
+RUN CGO_ENABLED=0 go build -ldflags="-w -s -extldflags -static" ./cmd/nps/nps.go
+
+FROM scratch
+COPY --from=builder /go/src/github.com/cnlh/nps/nps /
+COPY --from=builder /go/src/github.com/cnlh/nps/web /web
+VOLUME /conf
+CMD ["/nps"]

+ 57 - 2
README.md

@@ -26,6 +26,7 @@ nps是一款轻量级、高性能、功能强大的**内网穿透**代理服务
 * [安装](#安装)
     * [编译安装](#源码安装)
     * [release安装](#release安装)
+    * [docker安装](#docker安装)
 * [使用示例(以web主控模式为主)](#使用示例)
     * [统一准备工作](#统一准备工作(必做))
     * [http|https域名解析](#域名解析)
@@ -111,6 +112,7 @@ nps是一款轻量级、高性能、功能强大的**内网穿透**代理服务
    * [获取用户真实ip](#获取用户真实ip)
    * [客户端地址显示](#客户端地址显示)
    * [客户端与服务端版本对比](#客户端与服务端版本对比)
+   * [Linux系统限制](#Linux系统限制)
 * [webAPI](#webAPI)
 * [贡献](#贡献)
 * [支持nps发展](#捐赠)
@@ -120,7 +122,7 @@ nps是一款轻量级、高性能、功能强大的**内网穿透**代理服务
 
 ## 安装
 
-### releases安装
+### release安装
 > [releases](https://github.com/cnlh/nps/releases)
 
 下载对应的系统版本即可,服务端和客户端是单独的
@@ -133,6 +135,10 @@ nps是一款轻量级、高性能、功能强大的**内网穿透**代理服务
 
 > go build cmd/npc/npc.go
 
+### docker安装
+> [server](https://hub.docker.com/r/ffdfgdfg/nps)
+> [client](https://hub.docker.com/r/ffdfgdfg/npc)
+
 ## 使用示例
 
 ### 统一准备工作(必做)
@@ -144,6 +150,7 @@ nps是一款轻量级、高性能、功能强大的**内网穿透**代理服务
 ```shell
 ./npc -server=1.1.1.1:8284 -vkey=客户端的密钥
 ```
+**注意:运行服务端后,请确保能从客户端设备上正常访问配置文件中所配置的`bridge_port`端口,telnet,netcat这类的来检查**
 
 ### 域名解析
 
@@ -197,6 +204,9 @@ nps是一款轻量级、高性能、功能强大的**内网穿透**代理服务
 - 在刚才创建的客户端隧道管理中添加一条socks5代理,填写监听的端口(8003),保存。
 - 在外网环境的本机配置socks5代理(例如使用proxifier进行全局代理),ip为公网服务器ip(1.1.1.1),端口为填写的监听端口(8003),即可畅享内网了
 
+**注意**
+经过socks5代理,当收到socks5数据包时socket已经是accept状态。表现是扫描端口全open,建立连接后短时间关闭。若想同内网表现一致,建议远程连接一台设备。
+
 ### http正向代理
 
 **适用范围:**  在外网环境下使用http正向代理访问内网站点
@@ -375,7 +385,13 @@ server {
 ```
 (./nps|nps.exe) install
 ```
-安装成功后,对于linux,darwin,将会把配置文件和静态文件放置于/etc/nps/,并将可执行文件nps复制到/usr/bin/nps或者/usr/local/bin/nps,安装成功后可在任何位置执行
+安装成功后,对于linux,darwin,将会把配置文件和静态文件放置于/etc/nps/,并将可执行文件nps复制到/usr/bin/nps或者/usr/local/bin/nps,安装成功后可在任何位置执行,同时也会添加systemd配置。
+
+```
+sudo systemctl enable|disable|start|stop|restart|status nps
+```
+systemd,带有开机自启,自动重启配置,当进程结束后15秒会启动,日志输出至/var/log/nps/nps.log。
+建议采用此方式启动,能够捕获panic信息,便于排查问题。
 
 ```
 nps test|start|stop|restart|status
@@ -432,6 +448,27 @@ server_ip=xxx
 ```
  ./npc -config=npc配置文件路径
 ```
+可自行添加systemd service,例如:`npc.service`
+```
+[Unit]
+Description=npc - convenient proxy server client
+Documentation=https://github.com/cnlh/nps/
+After=network-online.target remote-fs.target nss-lookup.target
+Wants=network-online.target
+
+[Service]
+Type=simple
+KillMode=process
+Restart=always
+RestartSec=15s
+StandardOutput=append:/var/log/nps/npc.log
+ExecStartPre=/bin/echo 'Starting npc'
+ExecStopPost=/bin/echo 'Stopping npc'
+ExecStart=/absolutely path to/npc -server=ip:port -vkey=web界面中显示的密钥
+
+[Install]
+WantedBy=multi-user.target
+```
 #### 配置文件说明
 [示例配置文件](https://github.com/cnlh/nps/tree/master/conf/npc.conf)
 ##### 全局配置
@@ -794,6 +831,19 @@ nps支持对客户端的隧道数量进行限制,该功能默认是关闭的
 
 nps主要通信默认基于多路复用,无需开启。
 
+多路复用基于TCP滑动窗口原理设计,动态计算延迟以及带宽来算出应该往网络管道中打入的流量。
+由于主要通信大多采用TCP协议,并无法探测其实时丢包情况,对于产生丢包重传的情况,采用较大的宽容度,
+5分钟的等待时间,超时将会关闭当前隧道连接并重新建立,这将会抛弃当前所有的连接。
+在Linux上,可以通过调节内核参数来适应不同应用场景。
+
+对于需求大带宽又有一定的丢包的场景,可以保持默认参数不变,尽可能少抛弃连接
+高并发下可根据[Linux系统限制](#Linux系统限制) 调整
+
+对于延迟敏感而又有一定丢包的场景,可以适当调整TCP重传次数
+`tcp_syn_retries`, `tcp_retries1`, `tcp_retries2`
+高并发同上
+nps会在系统主动关闭连接的时候拿到报错,进而重新建立隧道连接
+
 ### 环境变量渲染
 npc支持环境变量渲染以适应在某些特殊场景下的要求。
 
@@ -902,6 +952,11 @@ LevelInformational->6 LevelDebug->7
 ### 客户端与服务端版本对比
 为了程序正常运行,客户端与服务端的核心版本必须一致,否则将导致客户端无法成功连接致服务端。
 
+### Linux系统限制
+默认情况下linux对连接数量有限制,对于性能好的机器完全可以调整内核参数以处理更多的连接。
+`tcp_max_syn_backlog` `somaxconn`
+酌情调整参数,增强网络性能
+
 ## webAPI
 
 ### webAPI验证说明

+ 5 - 0
client/client.go

@@ -49,6 +49,11 @@ retry:
 		time.Sleep(time.Second * 5)
 		goto retry
 	}
+	if c == nil {
+		logs.Error("Error data from server, and will be reconnected in five seconds")
+		time.Sleep(time.Second * 5)
+		goto retry
+	}
 	logs.Info("Successful connection with server %s", s.svrAddr)
 	//monitor the connection
 	go s.ping()

+ 7 - 2
client/control.go

@@ -223,8 +223,13 @@ func NewConn(tp string, vkey string, server string, connType string, proxyUrl st
 	if _, err := c.Write([]byte(crypt.Md5(version.GetVersion()))); err != nil {
 		return nil, err
 	}
-	if b, err := c.GetShortContent(32); err != nil || crypt.Md5(version.GetVersion()) != string(b) {
-		logs.Error("The client does not match the server version. The current version of the client is", version.GetVersion())
+	b, err := c.GetShortContent(32)
+	if err != nil {
+		logs.Error(err)
+		return nil, err
+	}
+	if crypt.Md5(version.GetVersion()) != string(b) {
+		logs.Error("The client does not match the server version. The current core version of the client is", version.GetVersion())
 		return nil, err
 	}
 	if _, err := c.Write([]byte(common.Getverifyval(vkey))); err != nil {

+ 1 - 1
cmd/nps/nps.go

@@ -61,7 +61,7 @@ func main() {
 		logs.Error("Getting bridge_port error", err)
 		os.Exit(0)
 	}
-	logs.Info("the version of server is %s ,allow client version to be %s", version.VERSION, version.GetVersion())
+	logs.Info("the version of server is %s ,allow client core version to be %s", version.VERSION, version.GetVersion())
 	connection.InitConnectionService()
 	crypt.InitTls(filepath.Join(common.GetRunPath(), "conf", "server.pem"), filepath.Join(common.GetRunPath(), "conf", "server.key"))
 	tool.InitAllowPort()

+ 4 - 4
go.mod

@@ -10,18 +10,18 @@ require (
 	github.com/go-ole/go-ole v1.2.4 // indirect
 	github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db
 	github.com/klauspost/cpuid v1.2.1 // indirect
-	github.com/klauspost/reedsolomon v1.9.2
+	github.com/klauspost/reedsolomon v1.9.2 // indirect
 	github.com/onsi/gomega v1.5.0 // indirect
+	github.com/panjf2000/ants/v2 v2.2.2
 	github.com/pkg/errors v0.8.0
 	github.com/shiena/ansicolor v0.0.0-20151119151921-a422bbe96644 // indirect
 	github.com/shirou/gopsutil v2.18.12+incompatible
 	github.com/stretchr/testify v1.3.0 // indirect
 	github.com/templexxx/cpufeat v0.0.0-20180724012125-cef66df7f161 // indirect
-	github.com/templexxx/xor v0.0.0-20181023030647-4e92f724b73b
-	github.com/tjfoc/gmsm v1.0.1
+	github.com/templexxx/xor v0.0.0-20181023030647-4e92f724b73b // indirect
+	github.com/tjfoc/gmsm v1.0.1 // indirect
 	github.com/xtaci/kcp-go v5.4.4+incompatible
 	github.com/xtaci/lossyconn v0.0.0-20190602105132-8df528c0c9ae // indirect
-	golang.org/x/crypto v0.0.0-20181127143415-eb0de9b17e85
 	golang.org/x/net v0.0.0-20181114220301-adae6a3d119a
 	golang.org/x/sys v0.0.0-20190804053845-51ab0e2deafa // indirect
 )

+ 10 - 0
go.sum

@@ -1,5 +1,6 @@
 github.com/Knetic/govaluate v3.0.0+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0=
 github.com/OwnLocal/goes v1.0.0/go.mod h1:8rIFjBGTue3lCU0wplczcUgt9Gxgrkkrw7etMIcn8TM=
+github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d h1:G0m3OIz70MZUWq3EgK3CesDbo8upS2Vm9/P3FtgI+Jk=
 github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg=
 github.com/astaxie/beego v1.12.0 h1:MRhVoeeye5N+Flul5PoVfD9CslfdoH+xqC/xvSQ5u2Y=
 github.com/astaxie/beego v1.12.0/go.mod h1:fysx+LZNZKnvh4GED/xND7jWtjCR6HzydR2Hh2Im57o=
@@ -15,14 +16,17 @@ github.com/couchbase/go-couchbase v0.0.0-20181122212707-3e9b6e1258bb/go.mod h1:T
 github.com/couchbase/gomemcached v0.0.0-20181122193126-5125a94a666c/go.mod h1:srVSlQLB8iXBVXHgnqemxUXqN6FCvClgCMPCsjBDR7c=
 github.com/couchbase/goutils v0.0.0-20180530154633-e865a1461c8a/go.mod h1:BQwMFlJzDjFDG3DJUdU0KORxn88UlsOULuxLExMh3Hs=
 github.com/cupcake/rdb v0.0.0-20161107195141-43ba34106c76/go.mod h1:vYwsqCOLxGiisLwp9rITslkFNpZD5rz43tf41QFkTWY=
+github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/edsrzf/mmap-go v0.0.0-20170320065105-0bce6a688712/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M=
+github.com/elazarl/go-bindata-assetfs v1.0.0 h1:G/bYguwHIzWq9ZoyUQqrjTmJbbYn3j3CKKpKinvZLFk=
 github.com/elazarl/go-bindata-assetfs v1.0.0/go.mod h1:v+YaWX3bdea5J/mo8dSETolEo7R71Vk1u8bnjau5yw4=
 github.com/exfly/beego v1.12.0-export-init h1:VQNYKdXhAwZGUaFmQv8Aj921O3rQJZRIF8xeGrhsjrI=
 github.com/exfly/beego v1.12.0-export-init/go.mod h1:fysx+LZNZKnvh4GED/xND7jWtjCR6HzydR2Hh2Im57o=
 github.com/exfly/beego v1.12.0 h1:OXwIwngaAx35Mga+jLiZmArusBxj8/H0jYXzGDAdwOg=
 github.com/exfly/beego v1.12.0/go.mod h1:fysx+LZNZKnvh4GED/xND7jWtjCR6HzydR2Hh2Im57o=
 github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
+github.com/go-ole/go-ole v1.2.4 h1:nNBDSCOigTSiarFpYE9J/KtEA1IOW4CNeqT9TQDqCxI=
 github.com/go-ole/go-ole v1.2.4/go.mod h1:XCwSNxSkXRo4vlyPy93sltvi/qJq0jqQhjqQNIwKuxM=
 github.com/go-redis/redis v6.14.2+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA=
 github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
@@ -40,9 +44,12 @@ github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
 github.com/mattn/go-sqlite3 v1.10.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc=
 github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
 github.com/onsi/gomega v1.5.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
+github.com/panjf2000/ants/v2 v2.2.2 h1:TWzusBjq/IflXhy+/S6u5wmMLCBdJnB9tPIx9Zmhvok=
+github.com/panjf2000/ants/v2 v2.2.2/go.mod h1:1GFm8bV8nyCQvU5K4WvBCTG1/YBFOD2VzjffD8fV55A=
 github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
 github.com/pkg/errors v0.8.0 h1:WdK/asTD0HN+q6hsWO3/vpuAkAr+tw6aNJNDFFf0+qw=
 github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
+github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
 github.com/shiena/ansicolor v0.0.0-20151119151921-a422bbe96644 h1:X+yvsM2yrEktyI+b2qND5gpH8YhURn0k8OCaeRnkINo=
 github.com/shiena/ansicolor v0.0.0-20151119151921-a422bbe96644/go.mod h1:nkxAfR/5quYxwPZhyDxgasBMnRtBZd0FCEpawpjMUFg=
@@ -53,6 +60,7 @@ github.com/siddontang/ledisdb v0.0.0-20181029004158-becf5f38d373/go.mod h1:mF1Dp
 github.com/siddontang/rdb v0.0.0-20150307021120-fc89ed2e418d/go.mod h1:AMEsy7v5z92TR1JKMkLLoaOQk++LVnOKL3ScbJ8GNGA=
 github.com/ssdb/gossdb v0.0.0-20180723034631-88f6b59b84ec/go.mod h1:QBvMkMya+gXctz3kmljlUCu/yB3GZ6oee+dUozsezQE=
 github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
 github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
 github.com/syndtr/goleveldb v0.0.0-20181127023241-353a9fca669c/go.mod h1:Z4AUp2Km+PwemOoO/VB5AOx9XSsIItzFjoJlOSiYmn0=
 github.com/templexxx/cpufeat v0.0.0-20180724012125-cef66df7f161 h1:89CEmDvlq/F7SJEOqkIdNDGJXrQIhuIx9D2DBXjavSU=
@@ -64,6 +72,7 @@ github.com/tjfoc/gmsm v1.0.1/go.mod h1:XxO4hdhhrzAd+G4CjDqaOkd0hUzmtPR/d3EiBBMn/
 github.com/wendal/errors v0.0.0-20130201093226-f66c77a7882b/go.mod h1:Q12BUT7DqIlHRmgv3RskH+UCM/4eqVMgI0EMmlSpAXc=
 github.com/xtaci/kcp-go v5.4.4+incompatible h1:QIJ0a0Q0N1G20yLHL2+fpdzyy2v/Cb3PI+xiwx/KK9c=
 github.com/xtaci/kcp-go v5.4.4+incompatible/go.mod h1:bN6vIwHQbfHaHtFpEssmWsN45a+AZwO7eyRCmEIbtvE=
+github.com/xtaci/lossyconn v0.0.0-20190602105132-8df528c0c9ae h1:J0GxkO96kL4WF+AIT3M4mfUVinOCPgf2uUWYFUzN0sM=
 github.com/xtaci/lossyconn v0.0.0-20190602105132-8df528c0c9ae/go.mod h1:gXtu8J62kEgmN++bm9BVICuT/e8yiLI2KFobd/TRFsE=
 golang.org/x/crypto v0.0.0-20181127143415-eb0de9b17e85 h1:et7+NAX3lLIk5qUCTA9QelBjGE/NkhzYw/mhnr0s7nI=
 golang.org/x/crypto v0.0.0-20181127143415-eb0de9b17e85/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
@@ -75,6 +84,7 @@ golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5h
 golang.org/x/sys v0.0.0-20190804053845-51ab0e2deafa h1:KIDDMLT1O0Nr7TSxp8xM5tJcdn8tgyAONntO829og1M=
 golang.org/x/sys v0.0.0-20190804053845-51ab0e2deafa/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
 gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
 gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=

+ 16 - 0
lib/common/const.go

@@ -36,3 +36,19 @@ WWW-Authenticate: Basic realm="easyProxy"
 
 `
 )
+
+const (
+	MUX_PING_FLAG uint8 = iota
+	MUX_NEW_CONN_OK
+	MUX_NEW_CONN_Fail
+	MUX_NEW_MSG
+	MUX_NEW_MSG_PART
+	MUX_MSG_SEND_OK
+	MUX_NEW_CONN
+	MUX_CONN_CLOSE
+	MUX_PING_RETURN
+	MUX_PING             int32 = -1
+	MAXIMUM_SEGMENT_SIZE       = PoolSizeWindow
+	MAXIMUM_WINDOW_SIZE        = 1 << 25 // 1<<31-1 TCP slide window size is very large,
+	// we use 32M, reduce memory usage
+)
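
Note (not part of the diff): the derived sizes above resolve to concrete numbers once `PoolSizeWindow` from the new lib/common/pool.go (further down in this PR) is substituted in. A minimal sketch of the arithmetic, for reference only:

```go
package main

import "fmt"

// Values copied from the new lib/common constants in this PR.
const (
	PoolSizeBuffer       = 4096
	PoolSizeWindow       = PoolSizeBuffer - 2 - 4 - 4 - 1 // 4085; the 2+4+4+1 appears to reserve room for the mux packet header fields
	MAXIMUM_SEGMENT_SIZE = PoolSizeWindow
	MAXIMUM_WINDOW_SIZE  = 1 << 25 // 32 MiB cap per mux, instead of the full 1<<31-1
)

func main() {
	fmt.Println(MAXIMUM_SEGMENT_SIZE) // 4085 bytes per mux segment
	fmt.Println(MAXIMUM_WINDOW_SIZE)  // 33554432 bytes (32 MiB)
}
```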

+ 235 - 0
lib/common/netpackager.go

@@ -0,0 +1,235 @@
+package common
+
+import (
+	"bytes"
+	"encoding/binary"
+	"encoding/json"
+	"errors"
+	"io"
+	"strings"
+)
+
+type NetPackager interface {
+	Pack(writer io.Writer) (err error)
+	UnPack(reader io.Reader) (err error)
+}
+
+type BasePackager struct {
+	Length  uint16
+	Content []byte
+}
+
+func (Self *BasePackager) NewPac(contents ...interface{}) (err error) {
+	Self.clean()
+	for _, content := range contents {
+		switch content.(type) {
+		case nil:
+			Self.Content = Self.Content[:0]
+		case []byte:
+			err = Self.appendByte(content.([]byte))
+		case string:
+			err = Self.appendByte([]byte(content.(string)))
+			if err != nil {
+				return
+			}
+			err = Self.appendByte([]byte(CONN_DATA_SEQ))
+		default:
+			err = Self.marshal(content)
+		}
+	}
+	Self.setLength()
+	return
+}
+
+func (Self *BasePackager) appendByte(data []byte) (err error) {
+	m := len(Self.Content)
+	n := m + len(data)
+	if n <= cap(Self.Content) {
+		Self.Content = Self.Content[0:n] // grow the length for copy
+		copy(Self.Content[m:n], data)
+		return nil
+	} else {
+		return errors.New("pack content too large")
+	}
+}
+
+//似乎这里涉及到父类作用域问题,当子类调用父类的方法时,其struct仅仅为父类的
+func (Self *BasePackager) Pack(writer io.Writer) (err error) {
+	err = binary.Write(writer, binary.LittleEndian, Self.Length)
+	if err != nil {
+		return
+	}
+	err = binary.Write(writer, binary.LittleEndian, Self.Content)
+	return
+}
+
+//Unpack 会导致传入的数字类型转化成float64!!
+//主要原因是json unmarshal并未传入正确的数据类型
+func (Self *BasePackager) UnPack(reader io.Reader) (n uint16, err error) {
+	Self.clean()
+	n += 2 // uint16
+	err = binary.Read(reader, binary.LittleEndian, &Self.Length)
+	if err != nil {
+		return
+	}
+	if int(Self.Length) > cap(Self.Content) {
+		err = errors.New("unpack err, content length too large")
+	}
+	Self.Content = Self.Content[:int(Self.Length)]
+	//n, err := io.ReadFull(reader, Self.Content)
+	//if n != int(Self.Length) {
+	//	err = io.ErrUnexpectedEOF
+	//}
+	err = binary.Read(reader, binary.LittleEndian, Self.Content)
+	n += Self.Length
+	return
+}
+
+func (Self *BasePackager) marshal(content interface{}) (err error) {
+	tmp, err := json.Marshal(content)
+	if err != nil {
+		return err
+	}
+	err = Self.appendByte(tmp)
+	return
+}
+
+func (Self *BasePackager) Unmarshal(content interface{}) (err error) {
+	err = json.Unmarshal(Self.Content, content)
+	if err != nil {
+		return err
+	}
+	return
+}
+
+func (Self *BasePackager) setLength() {
+	Self.Length = uint16(len(Self.Content))
+	return
+}
+
+func (Self *BasePackager) clean() {
+	Self.Length = 0
+	Self.Content = Self.Content[:0] // reset length
+}
+
+func (Self *BasePackager) Split() (strList []string) {
+	n := bytes.IndexByte(Self.Content, 0)
+	strList = strings.Split(string(Self.Content[:n]), CONN_DATA_SEQ)
+	strList = strList[0 : len(strList)-1]
+	return
+}
+
+type ConnPackager struct { // Todo
+	ConnType uint8
+	BasePackager
+}
+
+func (Self *ConnPackager) NewPac(connType uint8, content ...interface{}) (err error) {
+	Self.ConnType = connType
+	err = Self.BasePackager.NewPac(content...)
+	return
+}
+
+func (Self *ConnPackager) Pack(writer io.Writer) (err error) {
+	err = binary.Write(writer, binary.LittleEndian, Self.ConnType)
+	if err != nil {
+		return
+	}
+	err = Self.BasePackager.Pack(writer)
+	return
+}
+
+func (Self *ConnPackager) UnPack(reader io.Reader) (n uint16, err error) {
+	err = binary.Read(reader, binary.LittleEndian, &Self.ConnType)
+	if err != nil && err != io.EOF {
+		return
+	}
+	n, err = Self.BasePackager.UnPack(reader)
+	n += 2
+	return
+}
+
+type MuxPackager struct {
+	Flag       uint8
+	Id         int32
+	Window     uint32
+	ReadLength uint32
+	BasePackager
+}
+
+func (Self *MuxPackager) NewPac(flag uint8, id int32, content ...interface{}) (err error) {
+	Self.Flag = flag
+	Self.Id = id
+	switch flag {
+	case MUX_PING_FLAG, MUX_PING_RETURN, MUX_NEW_MSG, MUX_NEW_MSG_PART:
+		Self.Content = WindowBuff.Get()
+		err = Self.BasePackager.NewPac(content...)
+		//logs.Warn(Self.Length, string(Self.Content))
+	case MUX_MSG_SEND_OK:
+		// MUX_MSG_SEND_OK contains two data
+		switch content[0].(type) {
+		case int:
+			Self.Window = uint32(content[0].(int))
+		case uint32:
+			Self.Window = content[0].(uint32)
+		}
+		switch content[1].(type) {
+		case int:
+			Self.ReadLength = uint32(content[1].(int))
+		case uint32:
+			Self.ReadLength = content[1].(uint32)
+		}
+	}
+	return
+}
+
+func (Self *MuxPackager) Pack(writer io.Writer) (err error) {
+	err = binary.Write(writer, binary.LittleEndian, Self.Flag)
+	if err != nil {
+		return
+	}
+	err = binary.Write(writer, binary.LittleEndian, Self.Id)
+	if err != nil {
+		return
+	}
+	switch Self.Flag {
+	case MUX_NEW_MSG, MUX_NEW_MSG_PART, MUX_PING_FLAG, MUX_PING_RETURN:
+		err = Self.BasePackager.Pack(writer)
+		WindowBuff.Put(Self.Content)
+	case MUX_MSG_SEND_OK:
+		err = binary.Write(writer, binary.LittleEndian, Self.Window)
+		if err != nil {
+			return
+		}
+		err = binary.Write(writer, binary.LittleEndian, Self.ReadLength)
+	}
+	return
+}
+
+func (Self *MuxPackager) UnPack(reader io.Reader) (n uint16, err error) {
+	err = binary.Read(reader, binary.LittleEndian, &Self.Flag)
+	if err != nil {
+		return
+	}
+	err = binary.Read(reader, binary.LittleEndian, &Self.Id)
+	if err != nil {
+		return
+	}
+	switch Self.Flag {
+	case MUX_NEW_MSG, MUX_NEW_MSG_PART, MUX_PING_FLAG, MUX_PING_RETURN:
+		Self.Content = WindowBuff.Get() // need get a window buf from pool
+		Self.BasePackager.clean()       // also clean the content
+		n, err = Self.BasePackager.UnPack(reader)
+		//logs.Warn("unpack", Self.Length, string(Self.Content))
+	case MUX_MSG_SEND_OK:
+		err = binary.Read(reader, binary.LittleEndian, &Self.Window)
+		if err != nil {
+			return
+		}
+		n += 4 // uint32
+		err = binary.Read(reader, binary.LittleEndian, &Self.ReadLength)
+		n += 4 // uint32
+	}
+	n += 5 //uint8 int32
+	return
+}
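
Note (not part of the diff): a minimal, hypothetical round trip through the new `MuxPackager` over an in-memory buffer may help make the wire format concrete. It only uses the types and pools added in this file and in lib/common/pool.go below:

```go
package main

import (
	"bytes"
	"fmt"

	"github.com/cnlh/nps/lib/common"
)

func main() {
	// Sender side: take a packager from the pool, fill it, serialize it.
	out := common.MuxPack.Get()
	if err := out.NewPac(common.MUX_NEW_MSG, 1, []byte("hello mux")); err != nil {
		panic(err)
	}
	var wire bytes.Buffer
	if err := out.Pack(&wire); err != nil { // writes flag, id, length, content
		panic(err)
	}
	common.MuxPack.Put(out) // Pack already returned the window buffer to its pool

	// Receiver side: decode the same bytes back into a packager.
	in := common.MuxPack.Get()
	if _, err := in.UnPack(&wire); err != nil {
		panic(err)
	}
	fmt.Println(in.Flag, in.Id, string(in.Content)) // 3 1 hello mux
	common.WindowBuff.Put(in.Content)               // hand the content buffer back
	common.MuxPack.Put(in)
}
```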

+ 199 - 0
lib/common/pool.go

@@ -0,0 +1,199 @@
+package common
+
+import (
+	"bytes"
+	"sync"
+)
+
+const PoolSize = 64 * 1024
+const PoolSizeSmall = 100
+const PoolSizeUdp = 1472
+const PoolSizeCopy = 32 << 10
+const PoolSizeBuffer = 4096
+const PoolSizeWindow = PoolSizeBuffer - 2 - 4 - 4 - 1
+
+var BufPool = sync.Pool{
+	New: func() interface{} {
+		return make([]byte, PoolSize)
+	},
+}
+
+var BufPoolUdp = sync.Pool{
+	New: func() interface{} {
+		return make([]byte, PoolSizeUdp)
+	},
+}
+var BufPoolMax = sync.Pool{
+	New: func() interface{} {
+		return make([]byte, PoolSize)
+	},
+}
+var BufPoolSmall = sync.Pool{
+	New: func() interface{} {
+		return make([]byte, PoolSizeSmall)
+	},
+}
+var BufPoolCopy = sync.Pool{
+	New: func() interface{} {
+		return make([]byte, PoolSizeCopy)
+	},
+}
+
+func PutBufPoolUdp(buf []byte) {
+	if cap(buf) == PoolSizeUdp {
+		BufPoolUdp.Put(buf[:PoolSizeUdp])
+	}
+}
+
+func PutBufPoolCopy(buf []byte) {
+	if cap(buf) == PoolSizeCopy {
+		BufPoolCopy.Put(buf[:PoolSizeCopy])
+	}
+}
+
+func GetBufPoolCopy() []byte {
+	return (BufPoolCopy.Get().([]byte))[:PoolSizeCopy]
+}
+
+func PutBufPoolMax(buf []byte) {
+	if cap(buf) == PoolSize {
+		BufPoolMax.Put(buf[:PoolSize])
+	}
+}
+
+type copyBufferPool struct {
+	pool sync.Pool
+}
+
+func (Self *copyBufferPool) New() {
+	Self.pool = sync.Pool{
+		New: func() interface{} {
+			return make([]byte, PoolSizeCopy, PoolSizeCopy)
+		},
+	}
+}
+
+func (Self *copyBufferPool) Get() []byte {
+	buf := Self.pool.Get().([]byte)
+	return buf[:PoolSizeCopy] // just like make a new slice, but data may not be 0
+}
+
+func (Self *copyBufferPool) Put(x []byte) {
+	if len(x) == PoolSizeCopy {
+		Self.pool.Put(x)
+	} else {
+		x = nil // buf is not full, not allowed, New method returns a full buf
+	}
+}
+
+type windowBufferPool struct {
+	pool sync.Pool
+}
+
+func (Self *windowBufferPool) New() {
+	Self.pool = sync.Pool{
+		New: func() interface{} {
+			return make([]byte, PoolSizeWindow, PoolSizeWindow)
+		},
+	}
+}
+
+func (Self *windowBufferPool) Get() (buf []byte) {
+	buf = Self.pool.Get().([]byte)
+	return buf[:PoolSizeWindow]
+}
+
+func (Self *windowBufferPool) Put(x []byte) {
+	Self.pool.Put(x[:PoolSizeWindow]) // make buf to full
+}
+
+type bufferPool struct {
+	pool sync.Pool
+}
+
+func (Self *bufferPool) New() {
+	Self.pool = sync.Pool{
+		New: func() interface{} {
+			return bytes.NewBuffer(make([]byte, 0, PoolSizeBuffer))
+		},
+	}
+}
+
+func (Self *bufferPool) Get() *bytes.Buffer {
+	return Self.pool.Get().(*bytes.Buffer)
+}
+
+func (Self *bufferPool) Put(x *bytes.Buffer) {
+	x.Reset()
+	Self.pool.Put(x)
+}
+
+type muxPackagerPool struct {
+	pool sync.Pool
+}
+
+func (Self *muxPackagerPool) New() {
+	Self.pool = sync.Pool{
+		New: func() interface{} {
+			pack := MuxPackager{}
+			return &pack
+		},
+	}
+}
+
+func (Self *muxPackagerPool) Get() *MuxPackager {
+	return Self.pool.Get().(*MuxPackager)
+}
+
+func (Self *muxPackagerPool) Put(pack *MuxPackager) {
+	Self.pool.Put(pack)
+}
+
+type ListElement struct {
+	Buf  []byte
+	L    uint16
+	Part bool
+}
+
+type listElementPool struct {
+	pool sync.Pool
+}
+
+func (Self *listElementPool) New() {
+	Self.pool = sync.Pool{
+		New: func() interface{} {
+			element := ListElement{}
+			return &element
+		},
+	}
+}
+
+func (Self *listElementPool) Get() *ListElement {
+	return Self.pool.Get().(*ListElement)
+}
+
+func (Self *listElementPool) Put(element *ListElement) {
+	element.L = 0
+	element.Buf = nil
+	element.Part = false
+	Self.pool.Put(element)
+}
+
+var once = sync.Once{}
+var BuffPool = bufferPool{}
+var CopyBuff = copyBufferPool{}
+var MuxPack = muxPackagerPool{}
+var WindowBuff = windowBufferPool{}
+var ListElementPool = listElementPool{}
+
+func newPool() {
+	BuffPool.New()
+	CopyBuff.New()
+	MuxPack.New()
+	WindowBuff.New()
+	ListElementPool.New()
+}
+
+func init() {
+	once.Do(newPool)
+}
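
Note (not part of the diff): a short sketch of the Get/Put discipline these typed pools expect — window buffers always come back at full length, and pooled bytes.Buffers are reset on Put:

```go
package main

import (
	"fmt"

	"github.com/cnlh/nps/lib/common"
)

func main() {
	// Window buffers are handed out at PoolSizeWindow length every time.
	buf := common.WindowBuff.Get()
	fmt.Println(len(buf)) // 4085
	copy(buf, "payload")
	common.WindowBuff.Put(buf) // Put reslices to full length before pooling

	// bytes.Buffer pool: Put resets the buffer, so the next Get is empty.
	b := common.BuffPool.Get()
	b.WriteString("header bytes")
	common.BuffPool.Put(b)
}
```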

+ 10 - 7
lib/common/util.go

@@ -16,7 +16,6 @@ import (
 	"sync"
 
 	"github.com/cnlh/nps/lib/crypt"
-	"github.com/cnlh/nps/lib/pool"
 )
 
 //Get the corresponding IP address through domain name
@@ -109,6 +108,9 @@ func ChangeHostAndHeader(r *http.Request, host string, header string, addr strin
 		}
 	}
 	addr = strings.Split(addr, ":")[0]
+	if prior, ok := r.Header["X-Forwarded-For"]; ok {
+    		addr = strings.Join(prior, ", ") + ", " + addr
+    	}
 	r.Header.Set("X-Forwarded-For", addr)
 	r.Header.Set("X-Real-IP", addr)
 }
@@ -264,11 +266,14 @@ func GetPortByAddr(addr string) int {
 	return p
 }
 
-func CopyBuffer(dst io.Writer, src io.Reader) (written int64, err error) {
-	buf := pool.GetBufPoolCopy()
-	defer pool.PutBufPoolCopy(buf)
+func CopyBuffer(dst io.Writer, src io.Reader, label ...string) (written int64, err error) {
+	buf := CopyBuff.Get()
+	defer CopyBuff.Put(buf)
 	for {
 		nr, er := src.Read(buf)
+		//if len(pr)>0 && pr[0] && nr > 50 {
+		//	logs.Warn(string(buf[:50]))
+		//}
 		if nr > 0 {
 			nw, ew := dst.Write(buf[0:nr])
 			if nw > 0 {
@@ -284,9 +289,7 @@ func CopyBuffer(dst io.Writer, src io.Reader) (written int64, err error) {
 			}
 		}
 		if er != nil {
-			if er != io.EOF {
-				err = er
-			}
+			err = er
 			break
 		}
 	}
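
Note (not part of the diff): the `ChangeHostAndHeader` hunk above now appends to any existing `X-Forwarded-For` value instead of overwriting it. A small standalone illustration of the resulting header, using only the standard library:

```go
package main

import (
	"fmt"
	"net/http"
	"strings"
)

func main() {
	r, _ := http.NewRequest("GET", "http://example.com", nil)
	r.Header.Set("X-Forwarded-For", "10.0.0.1") // value set by an upstream proxy
	addr := "203.0.113.7"                       // client address seen by nps

	// Same logic as the new branch in ChangeHostAndHeader.
	if prior, ok := r.Header["X-Forwarded-For"]; ok {
		addr = strings.Join(prior, ", ") + ", " + addr
	}
	r.Header.Set("X-Forwarded-For", addr)
	fmt.Println(r.Header.Get("X-Forwarded-For")) // 10.0.0.1, 203.0.113.7
}
```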

+ 25 - 21
lib/conn/conn.go

@@ -6,20 +6,20 @@ import (
 	"encoding/binary"
 	"encoding/json"
 	"errors"
+	"github.com/astaxie/beego/logs"
+	"github.com/cnlh/nps/lib/goroutine"
 	"io"
 	"net"
 	"net/http"
 	"net/url"
 	"strconv"
 	"strings"
-	"sync"
 	"time"
 
 	"github.com/cnlh/nps/lib/common"
 	"github.com/cnlh/nps/lib/crypt"
 	"github.com/cnlh/nps/lib/file"
 	"github.com/cnlh/nps/lib/mux"
-	"github.com/cnlh/nps/lib/pool"
 	"github.com/cnlh/nps/lib/rate"
 	"github.com/xtaci/kcp-go"
 )
@@ -159,8 +159,8 @@ func (s *Conn) SendHealthInfo(info, status string) (int, error) {
 //get health info from conn
 func (s *Conn) GetHealthInfo() (info string, status bool, err error) {
 	var l int
-	buf := pool.BufPoolMax.Get().([]byte)
-	defer pool.PutBufPoolMax(buf)
+	buf := common.BufPoolMax.Get().([]byte)
+	defer common.PutBufPoolMax(buf)
 	if l, err = s.GetLen(); err != nil {
 		return
 	} else if _, err = s.ReadLen(l, buf); err != nil {
@@ -233,8 +233,8 @@ func (s *Conn) SendInfo(t interface{}, flag string) (int, error) {
 //get task info
 func (s *Conn) getInfo(t interface{}) (err error) {
 	var l int
-	buf := pool.BufPoolMax.Get().([]byte)
-	defer pool.PutBufPoolMax(buf)
+	buf := common.BufPoolMax.Get().([]byte)
+	defer common.PutBufPoolMax(buf)
 	if l, err = s.GetLen(); err != nil {
 		return
 	} else if _, err = s.ReadLen(l, buf); err != nil {
@@ -351,25 +351,29 @@ func SetUdpSession(sess *kcp.UDPSession) {
 
 //conn1 mux conn
 func CopyWaitGroup(conn1, conn2 net.Conn, crypt bool, snappy bool, rate *rate.Rate, flow *file.Flow, isServer bool, rb []byte) {
-	var in, out int64
-	var wg sync.WaitGroup
+	//var in, out int64
+	//var wg sync.WaitGroup
 	connHandle := GetConn(conn1, crypt, snappy, rate, isServer)
 	if rb != nil {
 		connHandle.Write(rb)
 	}
-	go func(in *int64) {
-		wg.Add(1)
-		*in, _ = common.CopyBuffer(connHandle, conn2)
-		connHandle.Close()
-		conn2.Close()
-		wg.Done()
-	}(&in)
-	out, _ = common.CopyBuffer(conn2, connHandle)
-	connHandle.Close()
-	conn2.Close()
-	wg.Wait()
-	if flow != nil {
-		flow.Add(in, out)
+	//go func(in *int64) {
+	//	wg.Add(1)
+	//	*in, _ = common.CopyBuffer(connHandle, conn2)
+	//	connHandle.Close()
+	//	conn2.Close()
+	//	wg.Done()
+	//}(&in)
+	//out, _ = common.CopyBuffer(conn2, connHandle)
+	//connHandle.Close()
+	//conn2.Close()
+	//wg.Wait()
+	//if flow != nil {
+	//	flow.Add(in, out)
+	//}
+	err := goroutine.CopyConnsPool.Invoke(goroutine.NewConns(connHandle, conn2, flow))
+	if err != nil {
+		logs.Error(err)
 	}
 }
 

+ 7 - 0
lib/conn/listener.go

@@ -43,9 +43,16 @@ func Accept(l net.Listener, f func(c net.Conn)) {
 			if strings.Contains(err.Error(), "use of closed network connection") {
 				break
 			}
+			if strings.Contains(err.Error(), "the mux has closed") {
+				break
+			}
 			logs.Warn(err)
 			continue
 		}
+		if c == nil {
+			logs.Warn("nil connection")
+			break
+		}
 		go f(c)
 	}
 }

+ 3 - 3
lib/conn/snappy.go

@@ -3,7 +3,7 @@ package conn
 import (
 	"io"
 
-	"github.com/cnlh/nps/lib/pool"
+	"github.com/cnlh/nps/lib/common"
 	"github.com/golang/snappy"
 )
 
@@ -32,8 +32,8 @@ func (s *SnappyConn) Write(b []byte) (n int, err error) {
 
 //snappy压缩读
 func (s *SnappyConn) Read(b []byte) (n int, err error) {
-	buf := pool.BufPool.Get().([]byte)
-	defer pool.BufPool.Put(buf)
+	buf := common.BufPool.Get().([]byte)
+	defer common.BufPool.Put(buf)
 	if n, err = s.r.Read(buf); err != nil {
 		return
 	}

+ 73 - 0
lib/goroutine/pool.go

@@ -0,0 +1,73 @@
+package goroutine
+
+import (
+	"github.com/cnlh/nps/lib/common"
+	"github.com/cnlh/nps/lib/file"
+	"github.com/panjf2000/ants/v2"
+	"io"
+	"net"
+	"sync"
+)
+
+type connGroup struct {
+	src io.ReadWriteCloser
+	dst io.ReadWriteCloser
+	wg  *sync.WaitGroup
+	n   *int64
+}
+
+func newConnGroup(dst, src io.ReadWriteCloser, wg *sync.WaitGroup, n *int64) connGroup {
+	return connGroup{
+		src: src,
+		dst: dst,
+		wg:  wg,
+		n:   n,
+	}
+}
+
+func copyConnGroup(group interface{}) {
+	cg, ok := group.(connGroup)
+	if !ok {
+		return
+	}
+	var err error
+	*cg.n, err = common.CopyBuffer(cg.dst, cg.src)
+	if err != nil {
+		cg.src.Close()
+		cg.dst.Close()
+		//logs.Warn("close npc by copy from nps", err, c.connId)
+	}
+	cg.wg.Done()
+}
+
+type Conns struct {
+	conn1 io.ReadWriteCloser // mux connection
+	conn2 net.Conn           // outside connection
+	flow  *file.Flow
+}
+
+func NewConns(c1 io.ReadWriteCloser, c2 net.Conn, flow *file.Flow) Conns {
+	return Conns{
+		conn1: c1,
+		conn2: c2,
+		flow:  flow,
+	}
+}
+
+func copyConns(group interface{}) {
+	conns := group.(Conns)
+	wg := new(sync.WaitGroup)
+	wg.Add(2)
+	var in, out int64
+	_ = connCopyPool.Invoke(newConnGroup(conns.conn1, conns.conn2, wg, &in))
+	// outside to mux : incoming
+	_ = connCopyPool.Invoke(newConnGroup(conns.conn2, conns.conn1, wg, &out))
+	// mux to outside : outgoing
+	wg.Wait()
+	if conns.flow != nil {
+		conns.flow.Add(in, out)
+	}
+}
+
+var connCopyPool, _ = ants.NewPoolWithFunc(200000, copyConnGroup, ants.WithNonblocking(false))
+var CopyConnsPool, _ = ants.NewPoolWithFunc(100000, copyConns, ants.WithNonblocking(false))
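
Note (not part of the diff): `CopyWaitGroup` in lib/conn/conn.go above drives this pool with the same `Invoke` call. A self-contained sketch of that flow, where two `net.Pipe` pairs stand in for the mux side and the outside connection:

```go
package main

import (
	"fmt"
	"net"

	"github.com/cnlh/nps/lib/goroutine"
)

func main() {
	muxSide, muxPeer := net.Pipe()     // stands in for the mux connection
	outside, outsidePeer := net.Pipe() // stands in for the outside connection

	// Both copy directions for this pair run inside the shared ants pool.
	if err := goroutine.CopyConnsPool.Invoke(goroutine.NewConns(muxPeer, outside, nil)); err != nil {
		panic(err)
	}

	go func() {
		muxSide.Write([]byte("ping"))
		muxSide.Close()
	}()
	buf := make([]byte, 4)
	n, _ := outsidePeer.Read(buf)
	fmt.Println(string(buf[:n])) // ping
	outsidePeer.Close()
}
```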

+ 35 - 3
lib/install/install.go

@@ -4,6 +4,7 @@ import (
 	"errors"
 	"fmt"
 	"io"
+	"io/ioutil"
 	"log"
 	"os"
 	"path/filepath"
@@ -13,6 +14,23 @@ import (
 )
 
 func InstallNps() {
+	unit := `[Unit]
+Description=nps - convenient proxy server
+Documentation=https://github.com/cnlh/nps/
+After=network-online.target remote-fs.target nss-lookup.target
+Wants=network-online.target`
+	service := `[Service]
+Type=simple
+KillMode=process
+Restart=always
+RestartSec=15s
+StandardOutput=append:/var/log/nps/nps.log
+ExecStartPre=/bin/echo 'Starting nps'
+ExecStopPost=/bin/echo 'Stopping nps'
+ExecStart=`
+	install := `[Install]
+WantedBy=multi-user.target`
+
 	path := common.GetInstallPath()
 	if common.FileExists(path) {
 		log.Fatalf("the path %s has exist, does not support install", path)
@@ -35,21 +53,35 @@ func InstallNps() {
 				log.Fatalln(err)
 			} else {
 				os.Chmod("/usr/local/bin/nps", 0755)
+				service += "/usr/local/bin/nps"
 				log.Println("Executable files have been copied to", "/usr/local/bin/nps")
 			}
 		} else {
 			os.Chmod("/usr/bin/nps", 0755)
+			service += "/usr/bin/nps"
 			log.Println("Executable files have been copied to", "/usr/bin/nps")
 		}
-
+		systemd := unit + "\n\n" + service + "\n\n" + install
+		_ = os.Remove("/usr/lib/systemd/system/nps.service")
+		err := ioutil.WriteFile("/usr/lib/systemd/system/nps.service", []byte(systemd), 0644)
+		if err != nil {
+			log.Println("Write systemd service err ", err)
+		}
+		_ = os.Mkdir("/var/log/nps", 644)
 	}
 	log.Println("install ok!")
 	log.Println("Static files and configuration files in the current directory will be useless")
 	log.Println("The new configuration file is located in", path, "you can edit them")
 	if !common.IsWindows() {
-		log.Println("You can start with nps test|start|stop|restart|status anywhere")
+		log.Println(`You can start with:
+sudo systemctl enable|disable|start|stop|restart|status nps
+or:
+nps test|start|stop|restart|status 
+anywhere!`)
 	} else {
-		log.Println("You can copy executable files to any directory and start working with nps.exe test|start|stop|restart|status")
+		log.Println(`You can copy executable files to any directory and start working with:
+nps.exe test|start|stop|restart|status
+now!`)
 	}
 }
 func MkidrDirAll(path string, v ...string) {

+ 1 - 1
lib/mux/bytes.go

@@ -20,7 +20,7 @@ func WriteLenBytes(buf []byte, w io.Writer) (int, error) {
 
 //read bytes by length
 func ReadLenBytes(buf []byte, r io.Reader) (int, error) {
-	var l int32
+	var l uint32
 	var err error
 	if binary.Read(r, binary.LittleEndian, &l) != nil {
 		return 0, err

+ 569 - 118
lib/mux/conn.go

@@ -3,11 +3,14 @@ package mux
 import (
 	"errors"
 	"io"
+	"math"
 	"net"
+	"runtime"
 	"sync"
+	"sync/atomic"
 	"time"
 
-	"github.com/cnlh/nps/lib/pool"
+	"github.com/cnlh/nps/lib/common"
 )
 
 type conn struct {
@@ -15,34 +18,36 @@ type conn struct {
 	getStatusCh      chan struct{}
 	connStatusOkCh   chan struct{}
 	connStatusFailCh chan struct{}
-	readTimeOut      time.Time
-	writeTimeOut     time.Time
-	readBuffer       []byte
-	startRead        int //now read position
-	endRead          int //now end read
-	readFlag         bool
-	readCh           chan struct{}
-	waitQueue        *sliceEntry
-	stopWrite        bool
 	connId           int32
 	isClose          bool
-	readWait         bool
-	hasWrite         int
-	mux              *Mux
+	closeFlag        bool // close conn flag
+	receiveWindow    *ReceiveWindow
+	sendWindow       *SendWindow
+	once             sync.Once
+	//label            string
 }
 
-var connPool = sync.Pool{}
-
-func NewConn(connId int32, mux *Mux) *conn {
+func NewConn(connId int32, mux *Mux, label ...string) *conn {
 	c := &conn{
-		readCh:           make(chan struct{}),
 		getStatusCh:      make(chan struct{}),
 		connStatusOkCh:   make(chan struct{}),
 		connStatusFailCh: make(chan struct{}),
-		waitQueue:        NewQueue(),
 		connId:           connId,
-		mux:              mux,
+		receiveWindow:    new(ReceiveWindow),
+		sendWindow:       new(SendWindow),
+		once:             sync.Once{},
 	}
+	//if len(label) > 0 {
+	//	c.label = label[0]
+	//}
+	c.receiveWindow.New(mux)
+	c.sendWindow.New(mux)
+	//logm := &connLog{
+	//	startTime: time.Now(),
+	//	isClose:   false,
+	//	logs:      []string{c.label + "new conn success"},
+	//}
+	//setM(label[0], int(connId), logm)
 	return c
 }
 
@@ -50,135 +55,581 @@ func (s *conn) Read(buf []byte) (n int, err error) {
 	if s.isClose || buf == nil {
 		return 0, errors.New("the conn has closed")
 	}
-	if s.endRead-s.startRead == 0 { //read finish or start
-		if s.waitQueue.Size() == 0 {
-			s.readWait = true
-			if t := s.readTimeOut.Sub(time.Now()); t > 0 {
-				timer := time.NewTimer(t)
-				defer timer.Stop()
-				select {
-				case <-timer.C:
-					s.readWait = false
-					return 0, errors.New("read timeout")
-				case <-s.readCh:
-				}
-			} else {
-				<-s.readCh
-			}
-		}
-		if s.isClose { //If the connection is closed instead of  continuing command
-			return 0, errors.New("the conn has closed")
-		}
-		if node, err := s.waitQueue.Pop(); err != nil {
-			s.Close()
-			return 0, io.EOF
-		} else {
-			pool.PutBufPoolCopy(s.readBuffer)
-			s.readBuffer = node.val
-			s.endRead = node.l
-			s.startRead = 0
-		}
-	}
-	if len(buf) < s.endRead-s.startRead {
-		n = copy(buf, s.readBuffer[s.startRead:s.startRead+len(buf)])
-		s.startRead += n
-	} else {
-		n = copy(buf, s.readBuffer[s.startRead:s.endRead])
-		s.startRead += n
-		s.mux.sendInfo(MUX_MSG_SEND_OK, s.connId, nil)
+	if len(buf) == 0 {
+		return 0, nil
 	}
+	// waiting for takeout from receive window finish or timeout
+	//now := time.Now()
+	n, err = s.receiveWindow.Read(buf, s.connId)
+	//t := time.Now().Sub(now)
+	//if t.Seconds() > 0.5 {
+	//logs.Warn("conn read long", n, t.Seconds())
+	//}
+	//var errstr string
+	//if err == nil {
+	//	errstr = "err:nil"
+	//} else {
+	//	errstr = err.Error()
+	//}
+	//d := getM(s.label, int(s.connId))
+	//d.logs = append(d.logs, s.label+"read "+strconv.Itoa(n)+" "+errstr+" "+string(buf[:100]))
+	//setM(s.label, int(s.connId), d)
 	return
 }
 
-func (s *conn) Write(buf []byte) (int, error) {
+func (s *conn) Write(buf []byte) (n int, err error) {
 	if s.isClose {
 		return 0, errors.New("the conn has closed")
 	}
-	ch := make(chan struct{})
-	go s.write(buf, ch)
-	if t := s.writeTimeOut.Sub(time.Now()); t > 0 {
-		timer := time.NewTimer(t)
-		defer timer.Stop()
-		select {
-		case <-timer.C:
-			return 0, errors.New("write timeout")
-		case <-ch:
-		}
-	} else {
-		<-ch
+	if s.closeFlag {
+		//s.Close()
+		return 0, errors.New("io: write on closed conn")
 	}
-	if s.isClose {
-		return 0, io.EOF
+	if len(buf) == 0 {
+		return 0, nil
 	}
-	return len(buf), nil
+	//logs.Warn("write buf", len(buf))
+	//now := time.Now()
+	n, err = s.sendWindow.WriteFull(buf, s.connId)
+	//t := time.Now().Sub(now)
+	//if t.Seconds() > 0.5 {
+	//	logs.Warn("conn write long", n, t.Seconds())
+	//}
+	return
 }
-func (s *conn) write(buf []byte, ch chan struct{}) {
-	start := 0
-	l := len(buf)
-	for {
-		if s.hasWrite > 50 {
-			<-s.getStatusCh
-		}
-		s.hasWrite++
-		if l-start > pool.PoolSizeCopy {
-			s.mux.sendInfo(MUX_NEW_MSG, s.connId, buf[start:start+pool.PoolSizeCopy])
-			start += pool.PoolSizeCopy
-		} else {
-			s.mux.sendInfo(MUX_NEW_MSG, s.connId, buf[start:l])
-			break
-		}
-	}
-	ch <- struct{}{}
+
+func (s *conn) Close() (err error) {
+	s.once.Do(s.closeProcess)
+	return
 }
 
-func (s *conn) Close() error {
-	if s.isClose {
-		return errors.New("the conn has closed")
-	}
-	times := 0
-retry:
-	if s.waitQueue.Size() > 0 && times < 600 {
-		time.Sleep(time.Millisecond * 100)
-		times++
-		goto retry
-	}
-	if s.isClose {
-		return errors.New("the conn has closed")
-	}
+func (s *conn) closeProcess() {
 	s.isClose = true
-	pool.PutBufPoolCopy(s.readBuffer)
-	if s.readWait {
-		s.readCh <- struct{}{}
+	s.receiveWindow.mux.connMap.Delete(s.connId)
+	if !s.receiveWindow.mux.IsClose {
+		// if server or user close the conn while reading, will get a io.EOF
+		// and this Close method will be invoke, send this signal to close other side
+		s.receiveWindow.mux.sendInfo(common.MUX_CONN_CLOSE, s.connId, nil)
 	}
-	s.waitQueue.Clear()
-	s.mux.connMap.Delete(s.connId)
-	if !s.mux.IsClose {
-		s.mux.sendInfo(MUX_CONN_CLOSE, s.connId, nil)
-	}
-	connPool.Put(s)
-	return nil
+	s.sendWindow.CloseWindow()
+	s.receiveWindow.CloseWindow()
+	//d := getM(s.label, int(s.connId))
+	//d.isClose = true
+	//d.logs = append(d.logs, s.label+"close "+time.Now().String())
+	//setM(s.label, int(s.connId), d)
+	return
 }
 
 func (s *conn) LocalAddr() net.Addr {
-	return s.mux.conn.LocalAddr()
+	return s.receiveWindow.mux.conn.LocalAddr()
 }
 
 func (s *conn) RemoteAddr() net.Addr {
-	return s.mux.conn.RemoteAddr()
+	return s.receiveWindow.mux.conn.RemoteAddr()
 }
 
 func (s *conn) SetDeadline(t time.Time) error {
-	s.readTimeOut = t
-	s.writeTimeOut = t
+	_ = s.SetReadDeadline(t)
+	_ = s.SetWriteDeadline(t)
 	return nil
 }
 
 func (s *conn) SetReadDeadline(t time.Time) error {
-	s.readTimeOut = t
+	s.receiveWindow.SetTimeOut(t)
 	return nil
 }
 
 func (s *conn) SetWriteDeadline(t time.Time) error {
-	s.writeTimeOut = t
+	s.sendWindow.SetTimeOut(t)
+	return nil
+}
+
+type window struct {
+	remainingWait uint64 // 64bit alignment
+	off           uint32
+	maxSize       uint32
+	closeOp       bool
+	closeOpCh     chan struct{}
+	mux           *Mux
+}
+
+func (Self *window) unpack(ptrs uint64) (remaining, wait uint32) {
+	const mask = 1<<dequeueBits - 1
+	remaining = uint32((ptrs >> dequeueBits) & mask)
+	wait = uint32(ptrs & mask)
+	return
+}
+
+func (Self *window) pack(remaining, wait uint32) uint64 {
+	const mask = 1<<dequeueBits - 1
+	return (uint64(remaining) << dequeueBits) |
+		uint64(wait&mask)
+}
+
+func (Self *window) New() {
+	Self.closeOpCh = make(chan struct{}, 2)
+}
+
+func (Self *window) CloseWindow() {
+	if !Self.closeOp {
+		Self.closeOp = true
+		Self.closeOpCh <- struct{}{}
+		Self.closeOpCh <- struct{}{}
+	}
+}
+
+type ReceiveWindow struct {
+	window
+	bufQueue ReceiveWindowQueue
+	element  *common.ListElement
+	count    int8
+	once     sync.Once
+}
+
+func (Self *ReceiveWindow) New(mux *Mux) {
+	// initial a window for receive
+	Self.bufQueue.New()
+	Self.element = common.ListElementPool.Get()
+	Self.maxSize = common.MAXIMUM_SEGMENT_SIZE * 10
+	Self.mux = mux
+	Self.window.New()
+}
+
+func (Self *ReceiveWindow) remainingSize(delta uint16) (n uint32) {
+	// receive window remaining
+	l := int64(atomic.LoadUint32(&Self.maxSize)) - int64(Self.bufQueue.Len())
+	l -= int64(delta)
+	if l > 0 {
+		n = uint32(l)
+	}
+	return
+}
+
+func (Self *ReceiveWindow) calcSize() {
+	// calculating maximum receive window size
+	if Self.count == 0 {
+		//logs.Warn("ping, bw", Self.mux.latency, Self.bw.Get())
+		conns := Self.mux.connMap.Size()
+		n := uint32(math.Float64frombits(atomic.LoadUint64(&Self.mux.latency)) *
+			Self.mux.bw.Get() / float64(conns))
+		if n < common.MAXIMUM_SEGMENT_SIZE*10 {
+			n = common.MAXIMUM_SEGMENT_SIZE * 10
+		}
+		bufLen := Self.bufQueue.Len()
+		if n < bufLen {
+			n = bufLen
+		}
+		if n < Self.maxSize/2 {
+			n = Self.maxSize / 2
+		}
+		// set the minimal size
+		if n > 2*Self.maxSize {
+			n = 2 * Self.maxSize
+		}
+		if n > (common.MAXIMUM_WINDOW_SIZE / uint32(conns)) {
+			n = common.MAXIMUM_WINDOW_SIZE / uint32(conns)
+		}
+		// set the maximum size
+		//logs.Warn("n", n)
+		atomic.StoreUint32(&Self.maxSize, n)
+		Self.count = -10
+	}
+	Self.count += 1
+	return
+}
+
+func (Self *ReceiveWindow) Write(buf []byte, l uint16, part bool, id int32) (err error) {
+	if Self.closeOp {
+		return errors.New("conn.receiveWindow: write on closed window")
+	}
+	element, err := NewListElement(buf, l, part)
+	//logs.Warn("push the buf", len(buf), l, (&element).l)
+	if err != nil {
+		return
+	}
+	Self.calcSize() // calculate the max window size
+	var wait uint32
+start:
+	ptrs := atomic.LoadUint64(&Self.remainingWait)
+	_, wait = Self.unpack(ptrs)
+	newRemaining := Self.remainingSize(l)
+	// calculate the remaining window size now, plus the element we will push
+	if newRemaining == 0 {
+		//logs.Warn("window full true", remaining)
+		wait = 1
+	}
+	if !atomic.CompareAndSwapUint64(&Self.remainingWait, ptrs, Self.pack(0, wait)) {
+		goto start
+		// another goroutine change the status, make sure shall we need wait
+	}
+	Self.bufQueue.Push(element)
+	// status check finish, now we can push the element into the queue
+	if wait == 0 {
+		Self.mux.sendInfo(common.MUX_MSG_SEND_OK, id, Self.maxSize, newRemaining)
+		// send the remaining window size, not including zero size
+	}
 	return nil
 }
+
+func (Self *ReceiveWindow) Read(p []byte, id int32) (n int, err error) {
+	if Self.closeOp {
+		return 0, io.EOF // receive close signal, returns eof
+	}
+	pOff := 0
+	l := 0
+	//logs.Warn("receive window read off, element.l", Self.off, Self.element.l)
+copyData:
+	if Self.off == uint32(Self.element.L) {
+		// on the first Read method invoked, Self.off and Self.element.l
+		// both zero value
+		common.ListElementPool.Put(Self.element)
+		if Self.closeOp {
+			return 0, io.EOF
+		}
+		Self.element, err = Self.bufQueue.Pop()
+		// if the queue is empty, Pop method will wait until one element push
+		// into the queue successful, or timeout.
+		// timer start on timeout parameter is set up ,
+		// reset to 60s if timeout and data still available
+		Self.off = 0
+		if err != nil {
+			return // queue receive stop or time out, break the loop and return
+		}
+		//logs.Warn("pop element", Self.element.l, Self.element.part)
+	}
+	l = copy(p[pOff:], Self.element.Buf[Self.off:Self.element.L])
+	pOff += l
+	Self.off += uint32(l)
+	//logs.Warn("window read length buf len", Self.readLength, Self.bufQueue.Len())
+	n += l
+	l = 0
+	if Self.off == uint32(Self.element.L) {
+		//logs.Warn("put the element end ", string(Self.element.buf[:15]))
+		common.WindowBuff.Put(Self.element.Buf)
+		Self.sendStatus(id, Self.element.L)
+		// check the window full status
+	}
+	if pOff < len(p) && Self.element.Part {
+		// element is a part of the segments, trying to fill up buf p
+		goto copyData
+	}
+	return // buf p is full or all of segments in buf, return
+}
+
+func (Self *ReceiveWindow) sendStatus(id int32, l uint16) {
+	var remaining, wait uint32
+	for {
+		ptrs := atomic.LoadUint64(&Self.remainingWait)
+		remaining, wait = Self.unpack(ptrs)
+		remaining += uint32(l)
+		if atomic.CompareAndSwapUint64(&Self.remainingWait, ptrs, Self.pack(remaining, 0)) {
+			break
+		}
+		runtime.Gosched()
+		// another goroutine change remaining or wait status, make sure
+		// we need acknowledge other side
+	}
+	// now we get the current window status success
+	if wait == 1 {
+		//logs.Warn("send the wait status", remaining)
+		Self.mux.sendInfo(common.MUX_MSG_SEND_OK, id, atomic.LoadUint32(&Self.maxSize), remaining)
+	}
+	return
+}
+
+func (Self *ReceiveWindow) SetTimeOut(t time.Time) {
+	// waiting for FIFO queue Pop method
+	Self.bufQueue.SetTimeOut(t)
+}
+
+func (Self *ReceiveWindow) Stop() {
+	// queue has no more data to push, so unblock pop method
+	Self.once.Do(Self.bufQueue.Stop)
+}
+
+func (Self *ReceiveWindow) CloseWindow() {
+	Self.window.CloseWindow()
+	Self.Stop()
+	Self.release()
+}
+
+func (Self *ReceiveWindow) release() {
+	//if Self.element != nil {
+	//	if Self.element.Buf != nil {
+	//		common.WindowBuff.Put(Self.element.Buf)
+	//	}
+	//	common.ListElementPool.Put(Self.element)
+	//}
+	for {
+		Self.element = Self.bufQueue.TryPop()
+		if Self.element == nil {
+			return
+		}
+		if Self.element.Buf != nil {
+			common.WindowBuff.Put(Self.element.Buf)
+		}
+		common.ListElementPool.Put(Self.element)
+	} // release resource
+}
+
+type SendWindow struct {
+	window
+	buf       []byte
+	setSizeCh chan struct{}
+	timeout   time.Time
+}
+
+func (Self *SendWindow) New(mux *Mux) {
+	Self.setSizeCh = make(chan struct{})
+	Self.maxSize = common.MAXIMUM_SEGMENT_SIZE * 10
+	atomic.AddUint64(&Self.remainingWait, uint64(common.MAXIMUM_SEGMENT_SIZE*10)<<dequeueBits)
+	Self.mux = mux
+	Self.window.New()
+}
+
+func (Self *SendWindow) SetSendBuf(buf []byte) {
+	// send window buff from conn write method, set it to send window
+	Self.buf = buf
+	Self.off = 0
+}
+
+func (Self *SendWindow) SetSize(windowSize, newRemaining uint32) (closed bool) {
+	// set the window size from receive window
+	defer func() {
+		if recover() != nil {
+			closed = true
+		}
+	}()
+	if Self.closeOp {
+		close(Self.setSizeCh)
+		return true
+	}
+	//logs.Warn("set send window size to ", windowSize, newRemaining)
+	var remaining, wait, newWait uint32
+	for {
+		ptrs := atomic.LoadUint64(&Self.remainingWait)
+		remaining, wait = Self.unpack(ptrs)
+		if remaining == newRemaining {
+			//logs.Warn("waiting for another window size")
+			return false // waiting for receive another usable window size
+		}
+		if newRemaining == 0 && wait == 1 {
+			newWait = 1 // keep the wait status,
+			// also if newRemaining is not zero, change wait to 0
+		}
+		if atomic.CompareAndSwapUint64(&Self.remainingWait, ptrs, Self.pack(newRemaining, newWait)) {
+			break
+		}
+		// anther goroutine change wait status or window size
+	}
+	if wait == 1 {
+		// send window into the wait status, need notice the channel
+		//logs.Warn("send window remaining size is 0")
+		Self.allow()
+	}
+	// send window not into the wait status, so just do slide
+	return false
+}
+
+func (Self *SendWindow) allow() {
+	select {
+	case Self.setSizeCh <- struct{}{}:
+		//logs.Warn("send window remaining size is 0 finish")
+		return
+	case <-Self.closeOpCh:
+		close(Self.setSizeCh)
+		return
+	}
+}
+
+func (Self *SendWindow) sent(sentSize uint32) {
+	atomic.AddUint64(&Self.remainingWait, ^(uint64(sentSize)<<dequeueBits - 1))
+}
+
+func (Self *SendWindow) WriteTo() (p []byte, sendSize uint32, part bool, err error) {
+	// returns buf segments, return only one segments, need a loop outside
+	// until err = io.EOF
+	if Self.closeOp {
+		return nil, 0, false, errors.New("conn.writeWindow: window closed")
+	}
+	if Self.off == uint32(len(Self.buf)) {
+		return nil, 0, false, io.EOF
+		// send window buff is drain, return eof and get another one
+	}
+	var remaining uint32
+start:
+	ptrs := atomic.LoadUint64(&Self.remainingWait)
+	remaining, _ = Self.unpack(ptrs)
+	if remaining == 0 {
+		if !atomic.CompareAndSwapUint64(&Self.remainingWait, ptrs, Self.pack(0, 1)) {
+			goto start // another goroutine change the window, try again
+		}
+		// into the wait status
+		//logs.Warn("send window into wait status")
+		err = Self.waitReceiveWindow()
+		if err != nil {
+			return nil, 0, false, err
+		}
+		//logs.Warn("rem into wait finish")
+		goto start
+	}
+	// there are still remaining window
+	//logs.Warn("rem", remaining)
+	if len(Self.buf[Self.off:]) > common.MAXIMUM_SEGMENT_SIZE {
+		sendSize = common.MAXIMUM_SEGMENT_SIZE
+		//logs.Warn("cut buf by mss")
+	} else {
+		sendSize = uint32(len(Self.buf[Self.off:]))
+	}
+	if remaining < sendSize {
+		// usable window size is small than
+		// window MAXIMUM_SEGMENT_SIZE or send buf left
+		sendSize = remaining
+		//logs.Warn("cut buf by remainingsize", sendSize, len(Self.buf[Self.off:]))
+	}
+	//logs.Warn("send size", sendSize)
+	if sendSize < uint32(len(Self.buf[Self.off:])) {
+		part = true
+	}
+	p = Self.buf[Self.off : sendSize+Self.off]
+	Self.off += sendSize
+	Self.sent(sendSize)
+	return
+}
+
+func (Self *SendWindow) waitReceiveWindow() (err error) {
+	t := Self.timeout.Sub(time.Now())
+	if t < 0 {
+		t = time.Minute * 5
+	}
+	timer := time.NewTimer(t)
+	defer timer.Stop()
+	// waiting for receive usable window size, or timeout
+	select {
+	case _, ok := <-Self.setSizeCh:
+		if !ok {
+			return errors.New("conn.writeWindow: window closed")
+		}
+		return nil
+	case <-timer.C:
+		return errors.New("conn.writeWindow: write to time out")
+	case <-Self.closeOpCh:
+		return errors.New("conn.writeWindow: window closed")
+	}
+}
+
+func (Self *SendWindow) WriteFull(buf []byte, id int32) (n int, err error) {
+	Self.SetSendBuf(buf) // set the buf to send window
+	//logs.Warn("set the buf to send window")
+	var bufSeg []byte
+	var part bool
+	var l uint32
+	for {
+		bufSeg, l, part, err = Self.WriteTo()
+		//logs.Warn("buf seg", len(bufSeg), part, err)
+		// get the buf segments from send window
+		if bufSeg == nil && part == false && err == io.EOF {
+			// send window is drain, break the loop
+			err = nil
+			break
+		}
+		if err != nil {
+			break
+		}
+		n += int(l)
+		l = 0
+		if part {
+			Self.mux.sendInfo(common.MUX_NEW_MSG_PART, id, bufSeg)
+		} else {
+			Self.mux.sendInfo(common.MUX_NEW_MSG, id, bufSeg)
+			//logs.Warn("buf seg sent", len(bufSeg), part, err)
+		}
+		// send to other side, not send nil data to other side
+	}
+	//logs.Warn("buf seg write success")
+	return
+}
+
+func (Self *SendWindow) SetTimeOut(t time.Time) {
+	// waiting for receive a receive window size
+	Self.timeout = t
+}
+
+//type bandwidth struct {
+//	readStart     time.Time
+//	lastReadStart time.Time
+//	readEnd       time.Time
+//	lastReadEnd time.Time
+//	bufLength     int
+//	lastBufLength int
+//	count         int8
+//	readBW        float64
+//	writeBW       float64
+//	readBandwidth float64
+//}
+//
+//func (Self *bandwidth) StartRead() {
+//	Self.lastReadStart, Self.readStart = Self.readStart, time.Now()
+//	if !Self.lastReadStart.IsZero() {
+//		if Self.count == -5 {
+//			Self.calcBandWidth()
+//		}
+//	}
+//}
+//
+//func (Self *bandwidth) EndRead() {
+//	Self.lastReadEnd, Self.readEnd = Self.readEnd, time.Now()
+//	if Self.count == -5 {
+//		Self.calcWriteBandwidth()
+//	}
+//	if Self.count == 0 {
+//		Self.calcReadBandwidth()
+//		Self.count = -6
+//	}
+//	Self.count += 1
+//}
+//
+//func (Self *bandwidth) SetCopySize(n int) {
+//	// must be invoke between StartRead and EndRead
+//	Self.lastBufLength, Self.bufLength = Self.bufLength, n
+//}
+//// calculating
+//// start end start end
+////     read     read
+////        write
+//
+//func (Self *bandwidth) calcBandWidth()  {
+//	t := Self.readStart.Sub(Self.lastReadStart)
+//	if Self.lastBufLength >= 32768 {
+//		Self.readBandwidth = float64(Self.lastBufLength) / t.Seconds()
+//	}
+//}
+//
+//func (Self *bandwidth) calcReadBandwidth() {
+//	// Bandwidth between nps and npc
+//	readTime := Self.readEnd.Sub(Self.readStart)
+//	Self.readBW = float64(Self.bufLength) / readTime.Seconds()
+//	//logs.Warn("calc read bw", Self.readBW, Self.bufLength, readTime.Seconds())
+//}
+//
+//func (Self *bandwidth) calcWriteBandwidth() {
+//	// Bandwidth between nps and user, npc and application
+//	writeTime := Self.readStart.Sub(Self.lastReadEnd)
+//	Self.writeBW = float64(Self.lastBufLength) / writeTime.Seconds()
+//	//logs.Warn("calc write bw", Self.writeBW, Self.bufLength, writeTime.Seconds())
+//}
+//
+//func (Self *bandwidth) Get() (bw float64) {
+//	// The zero value, 0 for numeric types
+//	if Self.writeBW == 0 && Self.readBW == 0 {
+//		//logs.Warn("bw both 0")
+//		return 100
+//	}
+//	if Self.writeBW == 0 && Self.readBW != 0 {
+//		return Self.readBW
+//	}
+//	if Self.readBW == 0 && Self.writeBW != 0 {
+//		return Self.writeBW
+//	}
+//	return Self.readBandwidth
+//}
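
Note (not part of the diff): the send/receive windows above avoid a mutex by packing two 32-bit counters (remaining window size and a wait flag) into one uint64 and updating it with compare-and-swap. A standalone sketch of that pattern, assuming `dequeueBits = 32` (the constant is defined in the queue code of this PR, not shown in this excerpt):

```go
package main

import (
	"fmt"
	"sync/atomic"
)

const dequeueBits = 32 // assumption: matches the constant used by lib/mux's queues

func pack(remaining, wait uint32) uint64 {
	const mask = 1<<dequeueBits - 1
	return (uint64(remaining) << dequeueBits) | uint64(wait&mask)
}

func unpack(ptrs uint64) (remaining, wait uint32) {
	const mask = 1<<dequeueBits - 1
	remaining = uint32((ptrs >> dequeueBits) & mask)
	wait = uint32(ptrs & mask)
	return
}

func main() {
	var state uint64
	atomic.StoreUint64(&state, pack(4085*10, 0)) // initial window, not waiting
	for {
		old := atomic.LoadUint64(&state)
		remaining, _ := unpack(old)
		// consume one segment; raise the wait flag when the window is empty
		var wait uint32
		if remaining <= 4085 {
			remaining, wait = 0, 1
		} else {
			remaining -= 4085
		}
		if atomic.CompareAndSwapUint64(&state, old, pack(remaining, wait)) {
			break
		}
	}
	fmt.Println(unpack(atomic.LoadUint64(&state))) // 36765 0
}
```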

+ 35 - 30
lib/mux/map.go

@@ -2,28 +2,35 @@ package mux
 
 import (
 	"sync"
-	"time"
 )
 
 type connMap struct {
 	connMap map[int32]*conn
-	closeCh chan struct{}
+	//closeCh chan struct{}
 	sync.RWMutex
 }
 
 func NewConnMap() *connMap {
 	connMap := &connMap{
 		connMap: make(map[int32]*conn),
-		closeCh: make(chan struct{}),
+		//closeCh: make(chan struct{}),
 	}
-	go connMap.clean()
+	//go connMap.clean()
 	return connMap
 }
 
+func (s *connMap) Size() (n int) {
+	s.Lock()
+	n = len(s.connMap)
+	s.Unlock()
+	return
+}
+
 func (s *connMap) Get(id int32) (*conn, bool) {
 	s.Lock()
-	defer s.Unlock()
-	if v, ok := s.connMap[id]; ok && v != nil {
+	v, ok := s.connMap[id]
+	s.Unlock()
+	if ok && v != nil {
 		return v, true
 	}
 	return nil, false
@@ -31,40 +38,38 @@ func (s *connMap) Get(id int32) (*conn, bool) {
 
 func (s *connMap) Set(id int32, v *conn) {
 	s.Lock()
-	defer s.Unlock()
 	s.connMap[id] = v
+	s.Unlock()
 }
 
 func (s *connMap) Close() {
-	s.Lock()
-	defer s.Unlock()
+	//s.closeCh <- struct{}{} // stop the clean goroutine first
 	for _, v := range s.connMap {
-		v.isClose = true
+		v.Close() // close all the connections in the mux
 	}
-	s.closeCh <- struct{}{}
 }
 
 func (s *connMap) Delete(id int32) {
 	s.Lock()
-	defer s.Unlock()
 	delete(s.connMap, id)
+	s.Unlock()
 }
 
-func (s *connMap) clean() {
-	ticker := time.NewTimer(time.Minute * 1)
-	for {
-		select {
-		case <-ticker.C:
-			s.Lock()
-			for _, v := range s.connMap {
-				if v.isClose {
-					delete(s.connMap, v.connId)
-				}
-			}
-			s.Unlock()
-		case <-s.closeCh:
-			ticker.Stop()
-			return
-		}
-	}
-}
+//func (s *connMap) clean() {
+//	ticker := time.NewTimer(time.Minute * 1)
+//	for {
+//		select {
+//		case <-ticker.C:
+//			s.Lock()
+//			for _, v := range s.connMap {
+//				if v.isClose {
+//					delete(s.connMap, v.connId)
+//				}
+//			}
+//			s.Unlock()
+//		case <-s.closeCh:
+//			ticker.Stop()
+//			return
+//		}
+//	}
+//}

+ 402 - 114
lib/mux/mux.go

@@ -1,57 +1,59 @@
 package mux
 
 import (
-	"bytes"
-	"encoding/binary"
 	"errors"
+	"io"
 	"math"
 	"net"
-	"sync"
 	"sync/atomic"
 	"time"
 
-	"github.com/cnlh/nps/lib/pool"
-)
-
-const (
-	MUX_PING_FLAG int32 = iota
-	MUX_NEW_CONN_OK
-	MUX_NEW_CONN_Fail
-	MUX_NEW_MSG
-	MUX_MSG_SEND_OK
-	MUX_NEW_CONN
-	MUX_PING
-	MUX_CONN_CLOSE
-	MUX_PING_RETURN
+	"github.com/astaxie/beego/logs"
+	"github.com/cnlh/nps/lib/common"
 )
 
 type Mux struct {
+	latency uint64 // we store latency in bits, but it's float64
 	net.Listener
-	conn      net.Conn
-	connMap   *connMap
-	newConnCh chan *conn
-	id        int32
-	closeChan chan struct{}
-	IsClose   bool
-	pingOk    int
-	connType  string
-	sync.Mutex
+	conn          net.Conn
+	connMap       *connMap
+	newConnCh     chan *conn
+	id            int32
+	closeChan     chan struct{}
+	IsClose       bool
+	pingOk        uint32
+	counter       *latencyCounter
+	bw            *bandwidth
+	pingCh        chan []byte
+	pingCheckTime uint32
+	connType      string
+	writeQueue    PriorityQueue
+	newConnQueue  ConnQueue
 }
 
 func NewMux(c net.Conn, connType string) *Mux {
+	//c.(*net.TCPConn).SetReadBuffer(0)
+	//c.(*net.TCPConn).SetWriteBuffer(0)
 	m := &Mux{
 		conn:      c,
 		connMap:   NewConnMap(),
 		id:        0,
-		closeChan: make(chan struct{}),
+		closeChan: make(chan struct{}, 1),
 		newConnCh: make(chan *conn),
+		bw:        new(bandwidth),
 		IsClose:   false,
 		connType:  connType,
+		pingCh:    make(chan []byte),
+		counter:   newLatencyCounter(),
 	}
+	m.writeQueue.New()
+	m.newConnQueue.New()
 	//read session by flag
-	go m.readSession()
+	m.readSession()
 	//ping
-	go m.ping()
+	m.ping()
+	m.pingReturn()
+	m.writeSession()
 	return m
 }
 
@@ -59,12 +61,10 @@ func (s *Mux) NewConn() (*conn, error) {
 	if s.IsClose {
 		return nil, errors.New("the mux has closed")
 	}
-	conn := NewConn(s.getId(), s)
+	conn := NewConn(s.getId(), s, "nps ")
 	//it must be set before send
 	s.connMap.Set(conn.connId, conn)
-	if err := s.sendInfo(MUX_NEW_CONN, conn.connId, nil); err != nil {
-		return nil, err
-	}
+	s.sendInfo(common.MUX_NEW_CONN, conn.connId, nil)
 	//set a timer timeout 30 second
 	timer := time.NewTimer(time.Minute * 2)
 	defer timer.Stop()
@@ -79,7 +79,7 @@ func (s *Mux) NewConn() (*conn, error) {
 
 func (s *Mux) Accept() (net.Conn, error) {
 	if s.IsClose {
-		return nil, errors.New("accpet error,the conn has closed")
+		return nil, errors.New("accept error, the mux has closed")
 	}
 	conn := <-s.newConnCh
 	if conn == nil {
@@ -92,129 +92,417 @@ func (s *Mux) Addr() net.Addr {
 	return s.conn.LocalAddr()
 }
 
-func (s *Mux) sendInfo(flag int32, id int32, content []byte) error {
-	raw := bytes.NewBuffer([]byte{})
-	binary.Write(raw, binary.LittleEndian, flag)
-	binary.Write(raw, binary.LittleEndian, id)
-	if content != nil && len(content) > 0 {
-		binary.Write(raw, binary.LittleEndian, int32(len(content)))
-		binary.Write(raw, binary.LittleEndian, content)
+func (s *Mux) sendInfo(flag uint8, id int32, data ...interface{}) {
+	if s.IsClose {
+		return
 	}
-	if _, err := s.conn.Write(raw.Bytes()); err != nil {
+	var err error
+	pack := common.MuxPack.Get()
+	err = pack.NewPac(flag, id, data...)
+	if err != nil {
+		common.MuxPack.Put(pack)
+		logs.Error("mux: new pack err", err)
 		s.Close()
-		return err
+		return
+	}
+	s.writeQueue.Push(pack)
+	return
+}
+
+func (s *Mux) writeSession() {
+	go s.packBuf()
+	//go s.writeBuf()
+}
+
+func (s *Mux) packBuf() {
+	//buffer := common.BuffPool.Get()
+	for {
+		if s.IsClose {
+			break
+		}
+		//buffer.Reset()
+		pack := s.writeQueue.Pop()
+		if s.IsClose {
+			break
+		}
+		//buffer := common.BuffPool.Get()
+		err := pack.Pack(s.conn)
+		common.MuxPack.Put(pack)
+		if err != nil {
+			logs.Error("mux: pack err", err)
+			//common.BuffPool.Put(buffer)
+			s.Close()
+			break
+		}
+		//logs.Warn(buffer.String())
+		//s.bufQueue.Push(buffer)
+		//l := buffer.Len()
+		//n, err := buffer.WriteTo(s.conn)
+		//common.BuffPool.Put(buffer)
+		//if err != nil || int(n) != l {
+		//	logs.Error("mux: close from write session fail ", err, n, l)
+		//	s.Close()
+		//	break
+		//}
 	}
-	return nil
 }
 
+//func (s *Mux) writeBuf() {
+//	for {
+//		if s.IsClose {
+//			break
+//		}
+//		buffer, err := s.bufQueue.Pop()
+//		if err != nil {
+//			break
+//		}
+//		l := buffer.Len()
+//		n, err := buffer.WriteTo(s.conn)
+//		common.BuffPool.Put(buffer)
+//		if err != nil || int(n) != l {
+//			logs.Warn("close from write session fail ", err, n, l)
+//			s.Close()
+//			break
+//		}
+//	}
+//}
+
 func (s *Mux) ping() {
 	go func() {
-		ticker := time.NewTicker(time.Second * 1)
+		now, _ := time.Now().UTC().MarshalText()
+		s.sendInfo(common.MUX_PING_FLAG, common.MUX_PING, now)
+		// send the ping flag and get the latency first
+		ticker := time.NewTicker(time.Second * 5)
 		for {
+			if s.IsClose {
+				ticker.Stop()
+				break
+			}
 			select {
 			case <-ticker.C:
 			}
-			//Avoid going beyond the scope
-			if (math.MaxInt32 - s.id) < 10000 {
-				s.id = 0
+			if atomic.LoadUint32(&s.pingCheckTime) >= 60 {
+				logs.Error("mux: ping time out")
+				s.Close()
+				// no ping return packet has been received for more than 5 minutes
+				// (60 checks at a 5 second interval), so the mux connection is
+				// damaged, possibly by packet loss; close it
+				break
 			}
-			if err := s.sendInfo(MUX_PING_FLAG, MUX_PING, nil); err != nil || (s.pingOk > 10 && s.connType == "kcp") {
+			now, _ := time.Now().UTC().MarshalText()
+			s.sendInfo(common.MUX_PING_FLAG, common.MUX_PING, now)
+			atomic.AddUint32(&s.pingCheckTime, 1)
+			if atomic.LoadUint32(&s.pingOk) > 10 && s.connType == "kcp" {
+				logs.Error("mux: kcp ping err")
 				s.Close()
 				break
 			}
-			s.pingOk++
+			atomic.AddUint32(&s.pingOk, 1)
+		}
+	}()
+}
+
+func (s *Mux) pingReturn() {
+	go func() {
+		var now time.Time
+		var data []byte
+		for {
+			if s.IsClose {
+				break
+			}
+			select {
+			case data = <-s.pingCh:
+				atomic.StoreUint32(&s.pingCheckTime, 0)
+			case <-s.closeChan:
+				return // break would only leave the select; exit the goroutine on close
+			}
+			_ = now.UnmarshalText(data)
+			latency := time.Now().UTC().Sub(now).Seconds() / 2
+			if latency > 0 {
+				atomic.StoreUint64(&s.latency, math.Float64bits(s.counter.Latency(latency)))
+				// convert float64 to bits, store it atomic
+			}
+			//logs.Warn("latency", math.Float64frombits(atomic.LoadUint64(&s.latency)))
+			if cap(data) > 0 {
+				common.WindowBuff.Put(data)
+			}
 		}
 	}()
-	select {
-	case <-s.closeChan:
-	}
 }
 
 func (s *Mux) readSession() {
-	var buf []byte
 	go func() {
+		var connection *conn
 		for {
-			var flag, i int32
-			var n int
-			var err error
-			if binary.Read(s.conn, binary.LittleEndian, &flag) == nil {
-				if binary.Read(s.conn, binary.LittleEndian, &i) != nil {
-					break
-				}
-				s.pingOk = 0
-				switch flag {
-				case MUX_NEW_CONN: //new conn
-					conn := NewConn(i, s)
-					s.connMap.Set(i, conn) //it has been set before send ok
-					s.newConnCh <- conn
-					s.sendInfo(MUX_NEW_CONN_OK, i, nil)
+			if s.IsClose {
+				break
+			}
+			connection = s.newConnQueue.Pop()
+			if s.IsClose {
+				break // make sure that is closed
+			}
+			s.connMap.Set(connection.connId, connection) //it has been set before send ok
+			s.newConnCh <- connection
+			s.sendInfo(common.MUX_NEW_CONN_OK, connection.connId, nil)
+		}
+	}()
+	go func() {
+		pack := common.MuxPack.Get()
+		var l uint16
+		var err error
+		for {
+			if s.IsClose {
+				break
+			}
+			pack = common.MuxPack.Get()
+			s.bw.StartRead()
+			if l, err = pack.UnPack(s.conn); err != nil {
+				logs.Error("mux: read session unpack from connection err", err)
+				s.Close()
+				break
+			}
+			s.bw.SetCopySize(l)
+			atomic.StoreUint32(&s.pingOk, 0)
+			switch pack.Flag {
+			case common.MUX_NEW_CONN: //new connection
+				connection := NewConn(pack.Id, s)
+				s.newConnQueue.Push(connection)
+				continue
+			case common.MUX_PING_FLAG: //ping
+				s.sendInfo(common.MUX_PING_RETURN, common.MUX_PING, pack.Content)
+				common.WindowBuff.Put(pack.Content)
+				continue
+			case common.MUX_PING_RETURN:
+				//go func(content []byte) {
+				s.pingCh <- pack.Content
+				//}(pack.Content)
+				continue
+			}
+			if connection, ok := s.connMap.Get(pack.Id); ok && !connection.isClose {
+				switch pack.Flag {
+				case common.MUX_NEW_MSG, common.MUX_NEW_MSG_PART: //new msg from remote connection
+					err = s.newMsg(connection, pack)
+					if err != nil {
+						logs.Error("mux: read session connection new msg err", err)
+						connection.Close()
+					}
 					continue
-				case MUX_PING_FLAG: //ping
-					s.sendInfo(MUX_PING_RETURN, MUX_PING, nil)
+				case common.MUX_NEW_CONN_OK: //connection ok
+					connection.connStatusOkCh <- struct{}{}
 					continue
-				case MUX_PING_RETURN:
+				case common.MUX_NEW_CONN_Fail:
+					connection.connStatusFailCh <- struct{}{}
 					continue
-				case MUX_NEW_MSG:
-					buf = pool.GetBufPoolCopy()
-					if n, err = ReadLenBytes(buf, s.conn); err != nil {
-						break
+				case common.MUX_MSG_SEND_OK:
+					if connection.isClose {
+						continue
 					}
+					connection.sendWindow.SetSize(pack.Window, pack.ReadLength)
+					continue
+				case common.MUX_CONN_CLOSE: //close the connection
+					connection.closeFlag = true
+					//s.connMap.Delete(pack.Id)
+					//go func(connection *conn) {
+					connection.receiveWindow.Stop() // close signal to receive window
+					//}(connection)
+					continue
 				}
-				if conn, ok := s.connMap.Get(i); ok && !conn.isClose {
-					switch flag {
-					case MUX_NEW_MSG: //new msg from remote conn
-						//insert wait queue
-						conn.waitQueue.Push(NewBufNode(buf, n))
-						//judge len if >xxx ,send stop
-						if conn.readWait {
-							conn.readWait = false
-							conn.readCh <- struct{}{}
-						}
-					case MUX_MSG_SEND_OK: //the remote has read
-						select {
-						case conn.getStatusCh <- struct{}{}:
-						default:
-						}
-						conn.hasWrite--
-					case MUX_NEW_CONN_OK: //conn ok
-						conn.connStatusOkCh <- struct{}{}
-					case MUX_NEW_CONN_Fail:
-						conn.connStatusFailCh <- struct{}{}
-					case MUX_CONN_CLOSE: //close the connection
-						go conn.Close()
-						s.connMap.Delete(i)
-					}
-				} else if flag == MUX_NEW_MSG {
-					pool.PutBufPoolCopy(buf)
-				}
-			} else {
-				break
+			} else if pack.Flag == common.MUX_CONN_CLOSE {
+				continue
 			}
+			common.MuxPack.Put(pack)
 		}
+		common.MuxPack.Put(pack)
 		s.Close()
 	}()
-	select {
-	case <-s.closeChan:
+}
+
+func (s *Mux) newMsg(connection *conn, pack *common.MuxPackager) (err error) {
+	if connection.isClose {
+		err = io.ErrClosedPipe
+		return
+	}
+	//logs.Warn("read session receive new msg", pack.Length)
+	//go func(connection *conn, pack *common.MuxPackager) { // do not block read session
+	//insert into queue
+	if pack.Flag == common.MUX_NEW_MSG_PART {
+		err = connection.receiveWindow.Write(pack.Content, pack.Length, true, pack.Id)
 	}
+	if pack.Flag == common.MUX_NEW_MSG {
+		err = connection.receiveWindow.Write(pack.Content, pack.Length, false, pack.Id)
+	}
+	//logs.Warn("read session write success", pack.Length)
+	return
 }
 
-func (s *Mux) Close() error {
+func (s *Mux) Close() (err error) {
+	logs.Warn("close mux")
 	if s.IsClose {
 		return errors.New("the mux has closed")
 	}
 	s.IsClose = true
 	s.connMap.Close()
-	select {
-	case s.closeChan <- struct{}{}:
+	s.connMap = nil
+	//s.bufQueue.Stop()
+	s.closeChan <- struct{}{}
+	close(s.newConnCh)
+	err = s.conn.Close()
+	s.release()
+	return
+}
+
+func (s *Mux) release() {
+	for {
+		pack := s.writeQueue.TryPop()
+		if pack == nil {
+			break
+		}
+		if pack.BasePackager.Content != nil {
+			common.WindowBuff.Put(pack.BasePackager.Content)
+		}
+		common.MuxPack.Put(pack)
 	}
-	select {
-	case s.closeChan <- struct{}{}:
+	for {
+		connection := s.newConnQueue.TryPop()
+		if connection == nil {
+			break
+		}
+		connection = nil
 	}
-	close(s.newConnCh)
-	return s.conn.Close()
+	s.writeQueue.Stop()
+	s.newConnQueue.Stop()
 }
 
 //get new connId as unique flag
-func (s *Mux) getId() int32 {
-	return atomic.AddInt32(&s.id, 1)
+func (s *Mux) getId() (id int32) {
+	//Avoid going beyond the scope
+	if (math.MaxInt32 - s.id) < 10000 {
+		atomic.StoreInt32(&s.id, 0)
+	}
+	id = atomic.AddInt32(&s.id, 1)
+	if _, ok := s.connMap.Get(id); ok {
+		return s.getId()
+	}
+	return
+}
+
+type bandwidth struct {
+	readBandwidth uint64 // store in bits, but it's float64
+	readStart     time.Time
+	lastReadStart time.Time
+	bufLength     uint32
+}
+
+func (Self *bandwidth) StartRead() {
+	if Self.readStart.IsZero() {
+		Self.readStart = time.Now()
+	}
+	if Self.bufLength >= common.MAXIMUM_SEGMENT_SIZE*300 {
+		Self.lastReadStart, Self.readStart = Self.readStart, time.Now()
+		Self.calcBandWidth()
+	}
+}
+
+func (Self *bandwidth) SetCopySize(n uint16) {
+	Self.bufLength += uint32(n)
+}
+
+func (Self *bandwidth) calcBandWidth() {
+	t := Self.readStart.Sub(Self.lastReadStart)
+	atomic.StoreUint64(&Self.readBandwidth, math.Float64bits(float64(Self.bufLength)/t.Seconds()))
+	Self.bufLength = 0
+}
+
+func (Self *bandwidth) Get() (bw float64) {
+	// The zero value, 0 for numeric types
+	bw = math.Float64frombits(atomic.LoadUint64(&Self.readBandwidth))
+	if bw <= 0 {
+		bw = 100
+	}
+	//logs.Warn(bw)
+	return
+}
+
+const counterBits = 4
+const counterMask = 1<<counterBits - 1
+
+func newLatencyCounter() *latencyCounter {
+	return &latencyCounter{
+		buf:     make([]float64, 1<<counterBits, 1<<counterBits),
+		headMin: 0,
+	}
+}
+
+type latencyCounter struct {
+	buf []float64 // buf is a fixed-length ring buffer;
+	// when the buffer is full, a new value replaces the oldest one.
+	headMin uint8 // head indicates the next slot in the ring buffer to be overwritten;
+	// min indicates the slot currently holding the minimal value in the buffer.
+}
+
+func (Self *latencyCounter) unpack(idxs uint8) (head, min uint8) {
+	head = uint8((idxs >> counterBits) & counterMask)
+	// head occupies the upper counterBits (4) bits of headMin
+	min = uint8(idxs & counterMask)
+	return
+}
+
+func (Self *latencyCounter) pack(head, min uint8) uint8 {
+	return uint8(head<<counterBits) |
+		uint8(min&counterMask)
+}
+
+func (Self *latencyCounter) add(value float64) {
+	head, min := Self.unpack(Self.headMin)
+	Self.buf[head] = value
+	if head == min {
+		min = Self.minimal()
+		// if head equals min, the minimal slot has just been overwritten,
+		// so scan the buffer for the new minimal value
+		// and update the min indicator
+	}
+	if Self.buf[min] > value {
+		min = head
+	}
+	head++
+	Self.headMin = Self.pack(head, min)
+}
+
+func (Self *latencyCounter) minimal() (min uint8) {
+	var val float64
+	var i uint8
+	for i = 0; i < counterMask; i++ {
+		if Self.buf[i] > 0 {
+			if val == 0 || val > Self.buf[i] { // val == 0 means no sample chosen yet
+				val = Self.buf[i]
+				min = i
+			}
+		}
+	}
+	return
+}
+
+func (Self *latencyCounter) Latency(value float64) (latency float64) {
+	Self.add(value)
+	_, min := Self.unpack(Self.headMin)
+	latency = Self.buf[min] * Self.countSuccess()
+	return
+}
+
+const lossRatio = 1.6
+
+func (Self *latencyCounter) countSuccess() (successRate float64) {
+	var success, loss, i uint8
+	_, min := Self.unpack(Self.headMin)
+	for i = 0; i < counterMask; i++ {
+		if Self.buf[i] > lossRatio*Self.buf[min] && Self.buf[i] > 0 {
+			loss++
+		}
+		if Self.buf[i] <= lossRatio*Self.buf[min] && Self.buf[i] > 0 {
+			success++
+		}
+	}
+	// counting all the data in the ring buf, except zero
+	successRate = float64(success) / float64(loss+success)
+	return
 }

+ 366 - 29
lib/mux/mux_test.go

@@ -1,16 +1,22 @@
 package mux
 
 import (
+	"bufio"
+	"fmt"
+	"github.com/cnlh/nps/lib/common"
+	"github.com/cnlh/nps/lib/goroutine"
+	"io"
 	"log"
 	"net"
 	"net/http"
+	"net/http/httputil"
 	_ "net/http/pprof"
+	"strconv"
 	"testing"
 	"time"
+	"unsafe"
 
 	"github.com/astaxie/beego/logs"
-	"github.com/cnlh/nps/lib/common"
-	"github.com/cnlh/nps/lib/pool"
 )
 
 var conn1 net.Conn
@@ -24,24 +30,54 @@ func TestNewMux(t *testing.T) {
 	logs.SetLogFuncCallDepth(3)
 	server()
 	client()
+	//poolConnCopy, _ := ants.NewPoolWithFunc(200000, common.copyConn, ants.WithNonblocking(false))
 	time.Sleep(time.Second * 3)
 	go func() {
 		m2 := NewMux(conn2, "tcp")
 		for {
+			//logs.Warn("npc starting accept")
 			c, err := m2.Accept()
 			if err != nil {
-				log.Fatalln(err)
+				logs.Warn(err)
+				continue
 			}
-			go func(c net.Conn) {
-				c2, err := net.Dial("tcp", "127.0.0.1:8082")
-				if err != nil {
-					log.Fatalln(err)
-				}
-				go common.CopyBuffer(c2, c)
-				common.CopyBuffer(c, c2)
+			//logs.Warn("npc accept success ")
+			c2, err := net.Dial("tcp", "127.0.0.1:80")
+			if err != nil {
+				logs.Warn(err)
 				c.Close()
-				c2.Close()
-			}(c)
+				continue
+			}
+			//c2.(*net.TCPConn).SetReadBuffer(0)
+			//c2.(*net.TCPConn).SetReadBuffer(0)
+			_ = goroutine.CopyConnsPool.Invoke(goroutine.NewConns(c, c2, nil))
+			//go func(c2 net.Conn, c *conn) {
+			//	wg := new(sync.WaitGroup)
+			//	wg.Add(2)
+			//	_ = poolConnCopy.Invoke(common.newConnGroup(c2, c, wg))
+			//	//go func() {
+			//	//	_, err = common.CopyBuffer(c2, c)
+			//	//	if err != nil {
+			//	//		c2.Close()
+			//	//		c.Close()
+			//	//		//logs.Warn("close npc by copy from nps", err, c.connId)
+			//	//	}
+			//	//	wg.Done()
+			//	//}()
+			//	//wg.Add(1)
+			//	_ = poolConnCopy.Invoke(common.newConnGroup(c, c2, wg))
+			//	//go func() {
+			//	//	_, err = common.CopyBuffer(c, c2)
+			//	//	if err != nil {
+			//	//		c2.Close()
+			//	//		c.Close()
+			//	//		//logs.Warn("close npc by copy from server", err, c.connId)
+			//	//	}
+			//	//	wg.Done()
+			//	//}()
+			//	//logs.Warn("npc wait")
+			//	wg.Wait()
+			//}(c2, c.(*conn))
 		}
 	}()
 
@@ -49,26 +85,58 @@ func TestNewMux(t *testing.T) {
 		m1 := NewMux(conn1, "tcp")
 		l, err := net.Listen("tcp", "127.0.0.1:7777")
 		if err != nil {
-			log.Fatalln(err)
+			logs.Warn(err)
 		}
 		for {
-			conn, err := l.Accept()
+			//logs.Warn("nps starting accept")
+			conns, err := l.Accept()
 			if err != nil {
-				log.Fatalln(err)
+				logs.Warn(err)
+				continue
 			}
-			go func(conn net.Conn) {
-				tmpCpnn, err := m1.NewConn()
-				if err != nil {
-					log.Fatalln(err)
-				}
-				go common.CopyBuffer(tmpCpnn, conn)
-				common.CopyBuffer(conn, tmpCpnn)
-				conn.Close()
-				tmpCpnn.Close()
-			}(conn)
+			//conns.(*net.TCPConn).SetReadBuffer(0)
+			//conns.(*net.TCPConn).SetReadBuffer(0)
+			//logs.Warn("nps accept success starting new conn")
+			tmpCpnn, err := m1.NewConn()
+			if err != nil {
+				logs.Warn("nps new conn err ", err)
+				continue
+			}
+			//logs.Warn("nps new conn success ", tmpCpnn.connId)
+			_ = goroutine.CopyConnsPool.Invoke(goroutine.NewConns(tmpCpnn, conns, nil))
+			//go func(tmpCpnn *conn, conns net.Conn) {
+			//	wg := new(sync.WaitGroup)
+			//	wg.Add(2)
+			//	_ = poolConnCopy.Invoke(common.newConnGroup(tmpCpnn, conns, wg))
+			//	//go func() {
+			//	//	_, err := common.CopyBuffer(tmpCpnn, conns)
+			//	//	if err != nil {
+			//	//		conns.Close()
+			//	//		tmpCpnn.Close()
+			//	//		//logs.Warn("close nps by copy from user", tmpCpnn.connId, err)
+			//	//	}
+			//	//}()
+			//	//wg.Add(1)
+			//	_ = poolConnCopy.Invoke(common.newConnGroup(conns, tmpCpnn, wg))
+			//	//time.Sleep(time.Second)
+			//	//_, err = common.CopyBuffer(conns, tmpCpnn)
+			//	//if err != nil {
+			//	//	conns.Close()
+			//	//	tmpCpnn.Close()
+			//	//	//logs.Warn("close nps by copy from npc ", tmpCpnn.connId, err)
+			//	//}
+			//	wg.Wait()
+			//}(tmpCpnn, conns)
 		}
 	}()
 
+	//go NewLogServer()
+	time.Sleep(time.Second * 5)
+	//for i := 0; i < 1; i++ {
+	//	go test_raw(i)
+	//}
+	//test_request()
+
 	for {
 		time.Sleep(time.Second * 5)
 	}
@@ -78,12 +146,12 @@ func server() {
 	var err error
 	l, err := net.Listen("tcp", "127.0.0.1:9999")
 	if err != nil {
-		log.Fatalln(err)
+		logs.Warn(err)
 	}
 	go func() {
 		conn1, err = l.Accept()
 		if err != nil {
-			log.Fatalln(err)
+			logs.Warn(err)
 		}
 	}()
 	return
@@ -93,12 +161,79 @@ func client() {
 	var err error
 	conn2, err = net.Dial("tcp", "127.0.0.1:9999")
 	if err != nil {
-		log.Fatalln(err)
+		logs.Warn(err)
+	}
+}
+
+func test_request() {
+	conn, _ := net.Dial("tcp", "127.0.0.1:7777")
+	for i := 0; i < 1000; i++ {
+		conn.Write([]byte(`GET / HTTP/1.1
+Host: 127.0.0.1:7777
+Connection: keep-alive
+
+
+`))
+		r, err := http.ReadResponse(bufio.NewReader(conn), nil)
+		if err != nil {
+			logs.Warn("close by read response err", err)
+			break
+		}
+		logs.Warn("read response success", r)
+		b, err := httputil.DumpResponse(r, true)
+		if err != nil {
+			logs.Warn("close by dump response err", err)
+			break
+		}
+		fmt.Println(string(b[:20]), err)
+		//time.Sleep(time.Second)
+	}
+	logs.Warn("finish")
+}
+
+func test_raw(k int) {
+	for i := 0; i < 1000; i++ {
+		ti := time.Now()
+		conn, err := net.Dial("tcp", "127.0.0.1:7777")
+		if err != nil {
+			logs.Warn("conn dial err", err)
+		}
+		tid := time.Now()
+		conn.Write([]byte(`GET /videojs5/video.js HTTP/1.1
+Host: 127.0.0.1:7777
+
+
+`))
+		tiw := time.Now()
+		buf := make([]byte, 3572)
+		n, err := io.ReadFull(conn, buf)
+		//n, err := conn.Read(buf)
+		if err != nil {
+			logs.Warn("close by read response err", err)
+			break
+		}
+		logs.Warn(n, string(buf[:50]), "\n--------------\n", string(buf[n-50:n]))
+		//time.Sleep(time.Second)
+		err = conn.Close()
+		if err != nil {
+			logs.Warn("close conn err ", err)
+		}
+		now := time.Now()
+		du := now.Sub(ti).Seconds()
+		dud := now.Sub(tid).Seconds()
+		duw := now.Sub(tiw).Seconds()
+		if du > 1 {
+			logs.Warn("duration long", du, dud, duw, k, i)
+		}
+		if n != 3572 {
+			logs.Warn("n loss", n, string(buf))
+		}
 	}
+	logs.Warn("finish")
 }
 
 func TestNewConn(t *testing.T) {
-	buf := pool.GetBufPoolCopy()
+	buf := common.GetBufPoolCopy()
 	logs.Warn(len(buf), cap(buf))
 	//b := pool.GetBufPoolCopy()
 	//b[0] = 1
@@ -108,3 +243,205 @@ func TestNewConn(t *testing.T) {
 	logs.Warn(copy(buf[:3], b), len(buf), cap(buf))
 	logs.Warn(len(buf), buf[0])
 }
+
+func TestDQueue(t *testing.T) {
+	logs.EnableFuncCallDepth(true)
+	logs.SetLogFuncCallDepth(3)
+	d := new(bufDequeue)
+	d.vals = make([]unsafe.Pointer, 8)
+	go func() {
+		time.Sleep(time.Second)
+		for i := 0; i < 10; i++ {
+			logs.Warn(i)
+			logs.Warn(d.popTail())
+		}
+	}()
+	go func() {
+		time.Sleep(time.Second)
+		for i := 0; i < 10; i++ {
+			data := "test"
+			go logs.Warn(i, unsafe.Pointer(&data), d.pushHead(unsafe.Pointer(&data)))
+		}
+	}()
+	time.Sleep(time.Second * 3)
+}
+
+func TestChain(t *testing.T) {
+	go func() {
+		log.Println(http.ListenAndServe("0.0.0.0:8889", nil))
+	}()
+	logs.EnableFuncCallDepth(true)
+	logs.SetLogFuncCallDepth(3)
+	time.Sleep(time.Second * 5)
+	d := new(bufChain)
+	d.new(256)
+	go func() {
+		time.Sleep(time.Second)
+		for i := 0; i < 30000; i++ {
+			unsa, ok := d.popTail()
+			str := (*string)(unsa)
+			if ok {
+				fmt.Println(i, str, *str, ok)
+				//logs.Warn(i, str, *str, ok)
+			} else {
+				fmt.Println("nil", i, ok)
+				//logs.Warn("nil", i, ok)
+			}
+		}
+	}()
+	go func() {
+		time.Sleep(time.Second)
+		for i := 0; i < 3000; i++ {
+			go func(i int) {
+				for n := 0; n < 10; n++ {
+					data := "test " + strconv.Itoa(i) + strconv.Itoa(n)
+					fmt.Println(data, unsafe.Pointer(&data))
+					//logs.Warn(data, unsafe.Pointer(&data))
+					d.pushHead(unsafe.Pointer(&data))
+				}
+			}(i)
+		}
+	}()
+	time.Sleep(time.Second * 100000)
+}
+
+func TestFIFO(t *testing.T) {
+	go func() {
+		log.Println(http.ListenAndServe("0.0.0.0:8889", nil))
+	}()
+	logs.EnableFuncCallDepth(true)
+	logs.SetLogFuncCallDepth(3)
+	time.Sleep(time.Second * 5)
+	d := new(ReceiveWindowQueue)
+	d.New()
+	go func() {
+		time.Sleep(time.Second)
+		for i := 0; i < 1001; i++ {
+			data, err := d.Pop()
+			if err == nil {
+				//fmt.Println(i, string(data.buf), err)
+				logs.Warn(i, string(data.Buf), err)
+				common.ListElementPool.Put(data)
+			} else {
+				//fmt.Println("err", err)
+				logs.Warn("err", err)
+			}
+			//logs.Warn(d.Len())
+		}
+		logs.Warn("pop finish")
+	}()
+	go func() {
+		time.Sleep(time.Second * 10)
+		for i := 0; i < 1000; i++ {
+			by := []byte("test " + strconv.Itoa(i) + " ") //
+			data, _ := NewListElement(by, uint16(len(by)), true)
+			//fmt.Println(string((*data).buf), data)
+			//logs.Warn(string((*data).buf), data)
+			d.Push(data)
+		}
+	}()
+	time.Sleep(time.Second * 100000)
+}
+
+func TestPriority(t *testing.T) {
+	go func() {
+		log.Println(http.ListenAndServe("0.0.0.0:8889", nil))
+	}()
+	logs.EnableFuncCallDepth(true)
+	logs.SetLogFuncCallDepth(3)
+	time.Sleep(time.Second * 5)
+	d := new(PriorityQueue)
+	d.New()
+	go func() {
+		time.Sleep(time.Second)
+		for i := 0; i < 360050; i++ {
+			data := d.Pop()
+			//fmt.Println(i, string(data.buf), err)
+			logs.Warn(i, string(data.Content), data)
+		}
+		logs.Warn("pop finish")
+	}()
+	go func() {
+		time.Sleep(time.Second * 10)
+		for i := 0; i < 30000; i++ {
+			go func(i int) {
+				for n := 0; n < 10; n++ {
+					data := new(common.MuxPackager)
+					by := []byte("test " + strconv.Itoa(i) + strconv.Itoa(n))
+					_ = data.NewPac(common.MUX_NEW_MSG_PART, int32(i), by)
+					//fmt.Println(string((*data).buf), data)
+					logs.Warn(string((*data).Content), data)
+					d.Push(data)
+				}
+			}(i)
+			go func(i int) {
+				data := new(common.MuxPackager)
+				_ = data.NewPac(common.MUX_NEW_CONN, int32(i), nil)
+				//fmt.Println(string((*data).buf), data)
+				logs.Warn(data)
+				d.Push(data)
+			}(i)
+			go func(i int) {
+				data := new(common.MuxPackager)
+				_ = data.NewPac(common.MUX_NEW_CONN_OK, int32(i), nil)
+				//fmt.Println(string((*data).buf), data)
+				logs.Warn(data)
+				d.Push(data)
+			}(i)
+		}
+	}()
+	time.Sleep(time.Second * 100000)
+}
+
+//func TestReceive(t *testing.T) {
+//	go func() {
+//		log.Println(http.ListenAndServe("0.0.0.0:8889", nil))
+//	}()
+//	logs.EnableFuncCallDepth(true)
+//	logs.SetLogFuncCallDepth(3)
+//	time.Sleep(time.Second * 5)
+//	mux := new(Mux)
+//	mux.bw.readBandwidth = float64(1*1024*1024)
+//	mux.latency = float64(1/1000)
+//	wind := new(ReceiveWindow)
+//	wind.New(mux)
+//	wind.
+//	go func() {
+//		time.Sleep(time.Second)
+//		for i := 0; i < 36000; i++ {
+//			data := d.Pop()
+//			//fmt.Println(i, string(data.buf), err)
+//			logs.Warn(i, string(data.Content), data)
+//		}
+//	}()
+//	go func() {
+//		time.Sleep(time.Second*10)
+//		for i := 0; i < 3000; i++ {
+//			go func(i int) {
+//				for n := 0; n < 10; n++{
+//					data := new(common.MuxPackager)
+//					by := []byte("test " + strconv.Itoa(i) + strconv.Itoa(n))
+//					_ = data.NewPac(common.MUX_NEW_MSG_PART, int32(i), by)
+//					//fmt.Println(string((*data).buf), data)
+//					logs.Warn(string((*data).Content), data)
+//					d.Push(data)
+//				}
+//			}(i)
+//			go func(i int) {
+//				data := new(common.MuxPackager)
+//				_ = data.NewPac(common.MUX_NEW_CONN, int32(i), nil)
+//				//fmt.Println(string((*data).buf), data)
+//				logs.Warn(data)
+//				d.Push(data)
+//			}(i)
+//			go func(i int) {
+//				data := new(common.MuxPackager)
+//				_ = data.NewPac(common.MUX_NEW_CONN_OK, int32(i), nil)
+//				//fmt.Println(string((*data).buf), data)
+//				logs.Warn(data)
+//				d.Push(data)
+//			}(i)
+//		}
+//	}()
+//	time.Sleep(time.Second * 100000)
+//}
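
The test_request and test_raw helpers above drive the 127.0.0.1:7777 forwarder with hand-written request bytes and fixed-size reads. The sketch below drives the same endpoint through net/http instead; the helper name test_http_client and the extra io/ioutil import are illustrative, and it assumes the listener from TestNewMux is running.

```go
func test_http_client() {
	client := &http.Client{Timeout: 5 * time.Second}
	for i := 0; i < 1000; i++ {
		resp, err := client.Get("http://127.0.0.1:7777/videojs5/video.js")
		if err != nil {
			logs.Warn("request err", err)
			break
		}
		body, err := ioutil.ReadAll(resp.Body)
		resp.Body.Close()
		if err != nil {
			logs.Warn("read body err", err)
			break
		}
		logs.Warn(i, resp.Status, len(body))
	}
	logs.Warn("finish")
}
```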

+ 544 - 49
lib/mux/queue.go

@@ -2,82 +2,577 @@ package mux
 
 import (
 	"errors"
+	"github.com/cnlh/nps/lib/common"
+	"io"
+	"math"
+	"runtime"
 	"sync"
-
-	"github.com/cnlh/nps/lib/pool"
+	"sync/atomic"
+	"time"
+	"unsafe"
 )
 
-type Element *bufNode
+type PriorityQueue struct {
+	highestChain *bufChain
+	middleChain  *bufChain
+	lowestChain  *bufChain
+	starving     uint8
+	stop         bool
+	cond         *sync.Cond
+}
+
+func (Self *PriorityQueue) New() {
+	Self.highestChain = new(bufChain)
+	Self.highestChain.new(4)
+	Self.middleChain = new(bufChain)
+	Self.middleChain.new(32)
+	Self.lowestChain = new(bufChain)
+	Self.lowestChain.new(256)
+	locker := new(sync.Mutex)
+	Self.cond = sync.NewCond(locker)
+}
+
+func (Self *PriorityQueue) Push(packager *common.MuxPackager) {
+	//logs.Warn("push start")
+	Self.push(packager)
+	Self.cond.Broadcast()
+	//logs.Warn("push finish")
+	return
+}
+
+func (Self *PriorityQueue) push(packager *common.MuxPackager) {
+	switch packager.Flag {
+	case common.MUX_PING_FLAG, common.MUX_PING_RETURN:
+		Self.highestChain.pushHead(unsafe.Pointer(packager))
+	// ping packages need the highest priority
+	// to keep the latency calculation accurate
+	case common.MUX_NEW_CONN, common.MUX_NEW_CONN_OK, common.MUX_NEW_CONN_Fail:
+		// new connection packages need some priority too
+		Self.middleChain.pushHead(unsafe.Pointer(packager))
+	default:
+		Self.lowestChain.pushHead(unsafe.Pointer(packager))
+	}
+}
+
+const maxStarving uint8 = 8
 
-type bufNode struct {
-	val []byte //buf value
-	l   int    //length
+func (Self *PriorityQueue) Pop() (packager *common.MuxPackager) {
+	var iter bool
+	for {
+		packager = Self.TryPop()
+		if packager != nil {
+			return
+		}
+		if Self.stop {
+			return
+		}
+		if iter {
+			break
+			// trying to pop twice
+		}
+		iter = true
+		runtime.Gosched()
+	}
+	Self.cond.L.Lock()
+	defer Self.cond.L.Unlock()
+	for packager = Self.TryPop(); packager == nil; {
+		if Self.stop {
+			return
+		}
+		//logs.Warn("queue into wait")
+		Self.cond.Wait()
+		// wait for it with no more iter
+		packager = Self.TryPop()
+		//logs.Warn("queue wait finish", packager)
+	}
+	return
 }
 
-func NewBufNode(buf []byte, l int) *bufNode {
-	return &bufNode{
-		val: buf,
-		l:   l,
+func (Self *PriorityQueue) TryPop() (packager *common.MuxPackager) {
+	ptr, ok := Self.highestChain.popTail()
+	if ok {
+		packager = (*common.MuxPackager)(ptr)
+		return
 	}
+	if Self.starving < maxStarving {
+		// do not pop from middleChain too often, or lowestChain will wait too long
+		ptr, ok = Self.middleChain.popTail()
+		if ok {
+			packager = (*common.MuxPackager)(ptr)
+			Self.starving++
+			return
+		}
+	}
+	ptr, ok = Self.lowestChain.popTail()
+	if ok {
+		packager = (*common.MuxPackager)(ptr)
+		if Self.starving > 0 {
+			Self.starving = uint8(Self.starving / 2)
+		}
+		return
+	}
+	if Self.starving > 0 {
+		ptr, ok = Self.middleChain.popTail()
+		if ok {
+			packager = (*common.MuxPackager)(ptr)
+			Self.starving++
+			return
+		}
+	}
+	return
+}
+
+func (Self *PriorityQueue) Stop() {
+	Self.stop = true
+	Self.cond.Broadcast()
+}
+
+type ConnQueue struct {
+	chain    *bufChain
+	starving uint8
+	stop     bool
+	cond     *sync.Cond
+}
+
+func (Self *ConnQueue) New() {
+	Self.chain = new(bufChain)
+	Self.chain.new(32)
+	locker := new(sync.Mutex)
+	Self.cond = sync.NewCond(locker)
 }
 
-type Queue interface {
-	Push(e Element) //向队列中添加元素
-	Pop() Element   //移除队列中最前面的元素
-	Clear() bool    //清空队列
-	Size() int      //获取队列的元素个数
-	IsEmpty() bool  //判断队列是否是空
+func (Self *ConnQueue) Push(connection *conn) {
+	Self.chain.pushHead(unsafe.Pointer(connection))
+	Self.cond.Broadcast()
+	return
 }
 
-type sliceEntry struct {
-	element []Element
-	sync.Mutex
+func (Self *ConnQueue) Pop() (connection *conn) {
+	var iter bool
+	for {
+		connection = Self.TryPop()
+		if connection != nil {
+			return
+		}
+		if Self.stop {
+			return
+		}
+		if iter {
+			break
+			// trying to pop twice
+		}
+		iter = true
+		runtime.Gosched()
+	}
+	Self.cond.L.Lock()
+	defer Self.cond.L.Unlock()
+	for connection = Self.TryPop(); connection == nil; {
+		if Self.stop {
+			return
+		}
+		//logs.Warn("queue into wait")
+		Self.cond.Wait()
+		// wait for it with no more iter
+		connection = Self.TryPop()
+		//logs.Warn("queue wait finish", packager)
+	}
+	return
 }
 
-func NewQueue() *sliceEntry {
-	return &sliceEntry{}
+func (Self *ConnQueue) TryPop() (connection *conn) {
+	ptr, ok := Self.chain.popTail()
+	if ok {
+		connection = (*conn)(ptr)
+		return
+	}
+	return
 }
 
-//向队列中添加元素
-func (entry *sliceEntry) Push(e Element) {
-	entry.Lock()
-	defer entry.Unlock()
-	entry.element = append(entry.element, e)
+func (Self *ConnQueue) Stop() {
+	Self.stop = true
+	Self.cond.Broadcast()
 }
 
-//移除队列中最前面的额元素
-func (entry *sliceEntry) Pop() (Element, error) {
-	if entry.IsEmpty() {
-		return nil, errors.New("queue is empty!")
+func NewListElement(buf []byte, l uint16, part bool) (element *common.ListElement, err error) {
+	if uint16(len(buf)) != l {
+		err = errors.New("ListElement: buf length does not match")
+		return
 	}
-	entry.Lock()
-	defer entry.Unlock()
-	firstElement := entry.element[0]
-	entry.element = entry.element[1:]
-	return firstElement, nil
+	//if l == 0 {
+	//	logs.Warn("push zero")
+	//}
+	element = common.ListElementPool.Get()
+	element.Buf = buf
+	element.L = l
+	element.Part = part
+	return
 }
 
-func (entry *sliceEntry) Clear() bool {
-	entry.Lock()
-	defer entry.Unlock()
-	if entry.IsEmpty() {
+type ReceiveWindowQueue struct {
+	lengthWait uint64
+	chain      *bufChain
+	stopOp     chan struct{}
+	readOp     chan struct{}
+	timeout    time.Time
+}
+
+func (Self *ReceiveWindowQueue) New() {
+	Self.readOp = make(chan struct{})
+	Self.chain = new(bufChain)
+	Self.chain.new(64)
+	Self.stopOp = make(chan struct{}, 2)
+}
+
+func (Self *ReceiveWindowQueue) Push(element *common.ListElement) {
+	var length, wait uint32
+	for {
+		ptrs := atomic.LoadUint64(&Self.lengthWait)
+		length, wait = Self.chain.head.unpack(ptrs)
+		length += uint32(element.L)
+		if atomic.CompareAndSwapUint64(&Self.lengthWait, ptrs, Self.chain.head.pack(length, 0)) {
+			break
+		}
+		// another goroutine changed the length or entered the wait state, so retry
+	}
+	//logs.Warn("window push before", Self.Len(), uint32(element.l), len(element.buf))
+	Self.chain.pushHead(unsafe.Pointer(element))
+	//logs.Warn("window push", Self.Len())
+	if wait == 1 {
+		Self.allowPop()
+	}
+	return
+}
+
+func (Self *ReceiveWindowQueue) Pop() (element *common.ListElement, err error) {
+	var length uint32
+startPop:
+	ptrs := atomic.LoadUint64(&Self.lengthWait)
+	length, _ = Self.chain.head.unpack(ptrs)
+	if length == 0 {
+		if !atomic.CompareAndSwapUint64(&Self.lengthWait, ptrs, Self.chain.head.pack(0, 1)) {
+			goto startPop // another goroutine is pushing
+		}
+		err = Self.waitPush()
+		// there is no more data in queue, wait for it
+		if err != nil {
+			return
+		}
+		goto startPop // wait finish, trying to get the new status
+	}
+	// length is not zero, so try to pop
+	for {
+		element = Self.TryPop()
+		if element != nil {
+			return
+		}
+		runtime.Gosched() // another goroutine is still pushing
+	}
+}
+
+func (Self *ReceiveWindowQueue) TryPop() (element *common.ListElement) {
+	ptr, ok := Self.chain.popTail()
+	if ok {
+		//logs.Warn("window pop before", Self.Len())
+		element = (*common.ListElement)(ptr)
+		atomic.AddUint64(&Self.lengthWait, ^(uint64(element.L)<<dequeueBits - 1))
+		//logs.Warn("window pop", Self.Len(), uint32(element.l))
+		return
+	}
+	return nil
+}
+
+func (Self *ReceiveWindowQueue) allowPop() (closed bool) {
+	//logs.Warn("allow pop", Self.Len())
+	select {
+	case Self.readOp <- struct{}{}:
 		return false
+	case <-Self.stopOp:
+		return true
+	}
+}
+
+func (Self *ReceiveWindowQueue) waitPush() (err error) {
+	//logs.Warn("wait push")
+	//defer logs.Warn("wait push finish")
+	t := Self.timeout.Sub(time.Now())
+	if t <= 0 {
+		t = time.Minute * 5
+	}
+	timer := time.NewTimer(t)
+	defer timer.Stop()
+	//logs.Warn("queue into wait")
+	select {
+	case <-Self.readOp:
+		//logs.Warn("queue wait finish")
+		return nil
+	case <-Self.stopOp:
+		err = io.EOF
+		return
+	case <-timer.C:
+		err = errors.New("mux.queue: read time out")
+		return
+	}
+}
+
+func (Self *ReceiveWindowQueue) Len() (n uint32) {
+	ptrs := atomic.LoadUint64(&Self.lengthWait)
+	n, _ = Self.chain.head.unpack(ptrs)
+	return
+}
+
+func (Self *ReceiveWindowQueue) Stop() {
+	Self.stopOp <- struct{}{}
+	Self.stopOp <- struct{}{}
+}
+
+func (Self *ReceiveWindowQueue) SetTimeOut(t time.Time) {
+	Self.timeout = t
+}
+
+// https://golang.org/src/sync/poolqueue.go
+
+type bufDequeue struct {
+	// headTail packs together a 32-bit head index and a 32-bit
+	// tail index. Both are indexes into vals modulo len(vals)-1.
+	//
+	// tail = index of oldest data in queue
+	// head = index of next slot to fill
+	//
+	// Slots in the range [tail, head) are owned by consumers.
+	// A consumer continues to own a slot outside this range until
+	// it nils the slot, at which point ownership passes to the
+	// producer.
+	//
+	// The head index is stored in the most-significant bits so
+	// that we can atomically add to it and the overflow is
+	// harmless.
+	headTail uint64
+
+	// vals is a ring buffer of interface{} values stored in this
+	// dequeue. The size of this must be a power of 2.
+	//
+	// A slot is still in use until *both* the tail
+	// index has moved beyond it and typ has been set to nil. This
+	// is set to nil atomically by the consumer and read
+	// atomically by the producer.
+	vals     []unsafe.Pointer
+	starving uint32
+}
+
+const dequeueBits = 32
+
+// dequeueLimit is the maximum size of a bufDequeue.
+//
+// This must be at most (1<<dequeueBits)/2 because detecting fullness
+// depends on wrapping around the ring buffer without wrapping around
+// the index. We divide by 4 so this fits in an int on 32-bit.
+const dequeueLimit = (1 << dequeueBits) / 4
+
+func (d *bufDequeue) unpack(ptrs uint64) (head, tail uint32) {
+	const mask = 1<<dequeueBits - 1
+	head = uint32((ptrs >> dequeueBits) & mask)
+	tail = uint32(ptrs & mask)
+	return
+}
+
+func (d *bufDequeue) pack(head, tail uint32) uint64 {
+	const mask = 1<<dequeueBits - 1
+	return (uint64(head) << dequeueBits) |
+		uint64(tail&mask)
+}
+
+// pushHead adds val at the head of the queue. It returns false if the
+// queue is full.
+func (d *bufDequeue) pushHead(val unsafe.Pointer) bool {
+	var slot *unsafe.Pointer
+	var starve uint8
+	if atomic.LoadUint32(&d.starving) > 0 {
+		runtime.Gosched()
 	}
-	for i := 0; i < entry.Size(); i++ {
-		pool.PutBufPoolCopy(entry.element[i].val)
-		entry.element[i] = nil
+	for {
+		ptrs := atomic.LoadUint64(&d.headTail)
+		head, tail := d.unpack(ptrs)
+		if (tail+uint32(len(d.vals)))&(1<<dequeueBits-1) == head {
+			// Queue is full.
+			return false
+		}
+		ptrs2 := d.pack(head+1, tail)
+		if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
+			slot = &d.vals[head&uint32(len(d.vals)-1)]
+			if starve >= 3 && atomic.LoadUint32(&d.starving) > 0 {
+				atomic.StoreUint32(&d.starving, 0)
+			}
+			break
+		}
+		starve++
+		if starve >= 3 {
+			atomic.StoreUint32(&d.starving, 1)
+		}
 	}
-	entry.element = nil
+	// The head slot is free, so we own it.
+	*slot = val
 	return true
 }
 
-func (entry *sliceEntry) Size() int {
-	return len(entry.element)
+// popTail removes and returns the element at the tail of the queue.
+// It returns false if the queue is empty. It may be called by any
+// number of consumers.
+func (d *bufDequeue) popTail() (unsafe.Pointer, bool) {
+	ptrs := atomic.LoadUint64(&d.headTail)
+	head, tail := d.unpack(ptrs)
+	if tail == head {
+		// Queue is empty.
+		return nil, false
+	}
+	slot := &d.vals[tail&uint32(len(d.vals)-1)]
+	var val unsafe.Pointer
+	for {
+		val = atomic.LoadPointer(slot)
+		if val != nil {
+			// We now own slot.
+			break
+		}
+		// Another goroutine is still pushing data on the tail.
+	}
+
+	// Tell pushHead that we're done with this slot. Zeroing the
+	// slot is also important so we don't leave behind references
+	// that could keep this object live longer than necessary.
+	//
+	// We read val first and then publish that we're done with the slot
+	// by atomically storing nil into it.
+	atomic.StorePointer(slot, nil)
+	// At this point pushHead owns the slot.
+	if tail < math.MaxUint32 {
+		atomic.AddUint64(&d.headTail, 1)
+	} else {
+		atomic.AddUint64(&d.headTail, ^uint64(math.MaxUint32-1))
+	}
+	return val, true
+}
+
+// bufChain is a dynamically-sized version of bufDequeue.
+//
+// This is implemented as a doubly-linked list queue of poolDequeues
+// where each dequeue is double the size of the previous one. Once a
+// dequeue fills up, this allocates a new one and only ever pushes to
+// the latest dequeue. Pops happen from the other end of the list and
+// once a dequeue is exhausted, it gets removed from the list.
+type bufChain struct {
+	// head is the bufDequeue to push to. This is only accessed
+	// by the producer, so doesn't need to be synchronized.
+	head *bufChainElt
+
+	// tail is the bufDequeue to popTail from. This is accessed
+	// by consumers, so reads and writes must be atomic.
+	tail     *bufChainElt
+	newChain uint32
 }
 
-func (entry *sliceEntry) IsEmpty() bool {
-	if len(entry.element) == 0 {
-		return true
+type bufChainElt struct {
+	bufDequeue
+
+	// next and prev link to the adjacent poolChainElts in this
+	// bufChain.
+	//
+	// next is written atomically by the producer and read
+	// atomically by the consumer. It only transitions from nil to
+	// non-nil.
+	//
+	// prev is written atomically by the consumer and read
+	// atomically by the producer. It only transitions from
+	// non-nil to nil.
+	next, prev *bufChainElt
+}
+
+func storePoolChainElt(pp **bufChainElt, v *bufChainElt) {
+	atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(pp)), unsafe.Pointer(v))
+}
+
+func loadPoolChainElt(pp **bufChainElt) *bufChainElt {
+	return (*bufChainElt)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(pp))))
+}
+
+func (c *bufChain) new(initSize int) {
+	// Initialize the chain.
+	// initSize must be a power of 2
+	d := new(bufChainElt)
+	d.vals = make([]unsafe.Pointer, initSize)
+	storePoolChainElt(&c.head, d)
+	storePoolChainElt(&c.tail, d)
+}
+
+func (c *bufChain) pushHead(val unsafe.Pointer) {
+startPush:
+	for {
+		if atomic.LoadUint32(&c.newChain) > 0 {
+			runtime.Gosched()
+		} else {
+			break
+		}
+	}
+
+	d := loadPoolChainElt(&c.head)
+
+	if d.pushHead(val) {
+		return
+	}
+
+	// The current dequeue is full. Allocate a new one of twice
+	// the size.
+	if atomic.CompareAndSwapUint32(&c.newChain, 0, 1) {
+		newSize := len(d.vals) * 2
+		if newSize >= dequeueLimit {
+			// Can't make it any bigger.
+			newSize = dequeueLimit
+		}
+
+		d2 := &bufChainElt{prev: d}
+		d2.vals = make([]unsafe.Pointer, newSize)
+		d2.pushHead(val)
+		storePoolChainElt(&c.head, d2)
+		storePoolChainElt(&d.next, d2)
+		atomic.StoreUint32(&c.newChain, 0)
+		return
+	}
+	goto startPush
+}
+
+func (c *bufChain) popTail() (unsafe.Pointer, bool) {
+	d := loadPoolChainElt(&c.tail)
+	if d == nil {
+		return nil, false
+	}
+
+	for {
+		// It's important that we load the next pointer
+		// *before* popping the tail. In general, d may be
+		// transiently empty, but if next is non-nil before
+		// the TryPop and the TryPop fails, then d is permanently
+		// empty, which is the only condition under which it's
+		// safe to drop d from the chain.
+		d2 := loadPoolChainElt(&d.next)
+
+		if val, ok := d.popTail(); ok {
+			return val, ok
+		}
+
+		if d2 == nil {
+			// This is the only dequeue. It's empty right
+			// now, but could be pushed to in the future.
+			return nil, false
+		}
+
+		// The tail of the chain has been drained, so move on
+		// to the next dequeue. Try to drop it from the chain
+		// so the next TryPop doesn't have to look at the empty
+		// dequeue again.
+		if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) {
+			// We won the race. Clear the prev pointer so
+			// the garbage collector can collect the empty
+			// dequeue and so popHead doesn't back up
+			// further than necessary.
+			storePoolChainElt(&d2.prev, nil)
+		}
+		d = d2
 	}
-	return false
 }

+ 154 - 0
lib/mux/web.go

@@ -0,0 +1,154 @@
+package mux
+
+import (
+	"fmt"
+	"github.com/astaxie/beego/logs"
+	"net/http"
+	"sort"
+	"strconv"
+	"strings"
+	"sync"
+	"time"
+)
+
+type connLog struct {
+	startTime time.Time
+	isClose   bool
+	logs      []string
+}
+
+var logms map[int]*connLog
+var logmc map[int]*connLog
+
+var copyMaps map[int]*connLog
+var copyMapc map[int]*connLog
+var stashTimeNow time.Time
+var mutex sync.Mutex
+
+func deepCopyMaps() {
+	copyMaps = make(map[int]*connLog)
+	for k, v := range logms {
+		copyMaps[k] = &connLog{
+			startTime: v.startTime,
+			isClose:   v.isClose,
+			logs:      v.logs,
+		}
+	}
+}
+
+func deepCopyMapc() {
+	copyMapc = make(map[int]*connLog)
+	for k, v := range logmc {
+		copyMapc[k] = &connLog{
+			startTime: v.startTime,
+			isClose:   v.isClose,
+			logs:      v.logs,
+		}
+	}
+}
+
+func init() {
+	logms = make(map[int]*connLog)
+	logmc = make(map[int]*connLog)
+}
+
+type IntSlice []int
+
+func (s IntSlice) Len() int { return len(s) }
+
+func (s IntSlice) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
+
+func (s IntSlice) Less(i, j int) bool { return s[i] < s[j] }
+
+func NewLogServer() {
+	http.HandleFunc("/", index)
+	http.HandleFunc("/detail", detail)
+	http.HandleFunc("/stash", stash)
+	fmt.Println(http.ListenAndServe(":8899", nil))
+}
+
+func stash(w http.ResponseWriter, r *http.Request) {
+	stashTimeNow = time.Now()
+	deepCopyMaps()
+	deepCopyMapc()
+	w.Write([]byte("ok"))
+}
+
+func getM(label string, id int) (cL *connLog) {
+	label = strings.TrimSpace(label)
+	mutex.Lock()
+	defer mutex.Unlock()
+	if label == "nps" {
+		cL = logms[id]
+	}
+	if label == "npc" {
+		cL = logmc[id]
+	}
+	return
+}
+
+func setM(label string, id int, cL *connLog) {
+	label = strings.TrimSpace(label)
+	mutex.Lock()
+	defer mutex.Unlock()
+	if label == "nps" {
+		logms[id] = cL
+	}
+	if label == "npc" {
+		logmc[id] = cL
+	}
+}
+
+func index(w http.ResponseWriter, r *http.Request) {
+	var keys []int
+	for k := range copyMaps {
+		keys = append(keys, k)
+	}
+	sort.Sort(IntSlice(keys))
+	var s string
+	s += "<h1>nps</h1>"
+	for _, v := range keys {
+		connL := copyMaps[v]
+		s += "<a href='/detail?id=" + strconv.Itoa(v) + "&label=nps" + "'>" + strconv.Itoa(v) + "</a>----------"
+		s += strconv.Itoa(int(stashTimeNow.Sub(connL.startTime).Milliseconds())) + "ms----------"
+		s += strconv.FormatBool(connL.isClose)
+		s += "<br>"
+	}
+
+	keys = keys[:0]
+	s += "<h1>npc</h1>"
+	for k := range copyMapc {
+		keys = append(keys, k)
+	}
+	sort.Sort(IntSlice(keys))
+
+	for _, v := range keys {
+		connL := copyMapc[v]
+		s += "<a href='/detail?id=" + strconv.Itoa(v) + "&label=npc" + "'>" + strconv.Itoa(v) + "</a>----------"
+		s += strconv.Itoa(int(stashTimeNow.Sub(connL.startTime).Milliseconds())) + "ms----------"
+		s += strconv.FormatBool(connL.isClose)
+		s += "<br>"
+	}
+	w.Write([]byte(s))
+}
+
+func detail(w http.ResponseWriter, r *http.Request) {
+	id := r.FormValue("id")
+	label := r.FormValue("label")
+	logs.Warn(label)
+	i, _ := strconv.Atoi(id)
+	var v *connLog
+	if label == "nps" {
+		v, _ = copyMaps[i]
+	}
+	if label == "npc" {
+		v, _ = copyMapc[i]
+	}
+	var s string
+	if v != nil {
+		for i, vv := range v.logs {
+			s += "<p>" + strconv.Itoa(i+1) + ":" + vv + "</p>"
+		}
+	}
+	w.Write([]byte(s))
+}
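
NewLogServer is a debugging aid on port 8899: /stash snapshots the current nps and npc connection logs, / renders an index of the snapshot, and /detail?id=N&label=nps|npc shows one connection's history. Below is a minimal sketch of driving it over HTTP; the dumpMuxLogs helper and its net/http, io/ioutil and fmt imports are illustrative.

```go
// A minimal sketch, assuming the server was started elsewhere with go NewLogServer().
func dumpMuxLogs() {
	if resp, err := http.Get("http://127.0.0.1:8899/stash"); err == nil {
		resp.Body.Close() // take a snapshot of the current connection logs
	}
	resp, err := http.Get("http://127.0.0.1:8899/")
	if err != nil {
		fmt.Println("log server not reachable:", err)
		return
	}
	defer resp.Body.Close()
	page, _ := ioutil.ReadAll(resp.Body)
	fmt.Println(string(page)) // HTML index: one line per connection with age and close state
}
```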

+ 7 - 0
lib/mux/web_test.go

@@ -0,0 +1,7 @@
+package mux
+
+import "testing"
+
+func TestWeb(t *testing.T) {
+	NewLogServer()
+}

+ 0 - 60
lib/pool/pool.go

@@ -1,60 +0,0 @@
-package pool
-
-import (
-	"sync"
-)
-
-const PoolSize = 64 * 1024
-const PoolSizeSmall = 100
-const PoolSizeUdp = 1472
-const PoolSizeCopy = 32 << 10
-
-var BufPool = sync.Pool{
-	New: func() interface{} {
-		return make([]byte, PoolSize)
-	},
-}
-
-var BufPoolUdp = sync.Pool{
-	New: func() interface{} {
-		return make([]byte, PoolSizeUdp)
-	},
-}
-var BufPoolMax = sync.Pool{
-	New: func() interface{} {
-		return make([]byte, PoolSize)
-	},
-}
-var BufPoolSmall = sync.Pool{
-	New: func() interface{} {
-		return make([]byte, PoolSizeSmall)
-	},
-}
-var BufPoolCopy = sync.Pool{
-	New: func() interface{} {
-		buf := make([]byte, PoolSizeCopy)
-		return &buf
-	},
-}
-
-func PutBufPoolUdp(buf []byte) {
-	if cap(buf) == PoolSizeUdp {
-		BufPoolUdp.Put(buf[:PoolSizeUdp])
-	}
-}
-
-func PutBufPoolCopy(buf []byte) {
-	if cap(buf) == PoolSizeCopy {
-		BufPoolCopy.Put(&buf)
-	}
-}
-
-func GetBufPoolCopy() []byte {
-	return (*BufPoolCopy.Get().(*[]byte))[:PoolSizeCopy]
-}
-
-func PutBufPoolMax(buf []byte) {
-	if cap(buf) == PoolSize {
-		BufPoolMax.Put(buf[:PoolSize])
-	}
-}
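
lib/pool is removed and its callers now take buffers from the pools in lib/common (added in this PR as lib/common/pool.go, not shown here); the get/put pattern itself is unchanged. A generic sketch of the same sync.Pool idiom with illustrative names:

```go
package main

import (
	"fmt"
	"sync"
)

const poolSizeUDP = 1472 // one UDP payload, matching the old PoolSizeUdp

// bufPoolUDP hands out fixed-size byte slices so hot paths such as
// ReadFromUDP do not allocate on every packet.
var bufPoolUDP = sync.Pool{
	New: func() interface{} {
		return make([]byte, poolSizeUDP)
	},
}

func main() {
	buf := bufPoolUDP.Get().([]byte)
	n := copy(buf, "payload")
	fmt.Println(n, len(buf))
	// only return slices of the expected capacity, like the old PutBufPoolUdp
	if cap(buf) == poolSizeUDP {
		bufPoolUDP.Put(buf[:poolSizeUDP])
	}
}
```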

+ 2 - 2
lib/version/version.go

@@ -1,8 +1,8 @@
 package version
 
-const VERSION = "0.23.2"
+const VERSION = "0.24.0"
 
 // Compulsory minimum version, Minimum downward compatibility to this version
 func GetVersion() string {
-	return "0.21.0"
+	return "0.24.0"
 }

+ 1 - 2
server/proxy/p2p.go

@@ -7,7 +7,6 @@ import (
 
 	"github.com/astaxie/beego/logs"
 	"github.com/cnlh/nps/lib/common"
-	"github.com/cnlh/nps/lib/pool"
 )
 
 type P2PServer struct {
@@ -37,7 +36,7 @@ func (s *P2PServer) Start() error {
 		return err
 	}
 	for {
-		buf := pool.BufPoolUdp.Get().([]byte)
+		buf := common.BufPoolUdp.Get().([]byte)
 		n, addr, err := s.listener.ReadFromUDP(buf)
 		if err != nil {
 			if strings.Contains(err.Error(), "use of closed network connection") {

+ 3 - 4
server/proxy/udp.go

@@ -9,7 +9,6 @@ import (
 	"github.com/cnlh/nps/lib/common"
 	"github.com/cnlh/nps/lib/conn"
 	"github.com/cnlh/nps/lib/file"
-	"github.com/cnlh/nps/lib/pool"
 )
 
 type UdpModeServer struct {
@@ -34,7 +33,7 @@ func (s *UdpModeServer) Start() error {
 	if err != nil {
 		return err
 	}
-	buf := pool.BufPoolUdp.Get().([]byte)
+	buf := common.BufPoolUdp.Get().([]byte)
 	for {
 		n, addr, err := s.listener.ReadFromUDP(buf)
 		if err != nil {
@@ -60,8 +59,8 @@ func (s *UdpModeServer) process(addr *net.UDPAddr, data []byte) {
 		return
 	} else {
 		s.task.Flow.Add(int64(len(data)), 0)
-		buf := pool.BufPoolUdp.Get().([]byte)
-		defer pool.BufPoolUdp.Put(buf)
+		buf := common.BufPoolUdp.Get().([]byte)
+		defer common.BufPoolUdp.Put(buf)
 		target.Write(data)
 		s.task.Flow.Add(int64(len(data)), 0)
 		if n, err := target.Read(buf); err != nil {