Browse Source

P2p first version

刘河 6 năm trước cách đây
mục cha
commit
534d428c6d

+ 29 - 3
bridge/bridge.go

@@ -11,6 +11,7 @@ import (
 	"github.com/cnlh/nps/lib/pool"
 	"github.com/cnlh/nps/lib/version"
 	"github.com/cnlh/nps/server/tool"
+	"github.com/cnlh/nps/vender/github.com/astaxie/beego"
 	"github.com/cnlh/nps/vender/github.com/astaxie/beego/logs"
 	"github.com/cnlh/nps/vender/github.com/xtaci/kcp"
 	"net"
@@ -71,7 +72,6 @@ func NewTunnel(tunnelPort int, tunnelType string, ipVerify bool, runList map[int
 }
 
 func (s *Bridge) StartTunnel() error {
-	go s.linkCleanSession()
 	var err error
 	if s.tunnelType == "kcp" {
 		s.kcpListener, err = kcp.ListenWithOptions(":"+strconv.Itoa(s.TunnelPort), nil, 150, 3)
@@ -209,10 +209,35 @@ func (s *Bridge) typeDeal(typeVal string, c *conn.Conn, id int) {
 		go s.GetConfig(c)
 	case common.WORK_REGISTER:
 		go s.register(c)
-	case common.WORD_SECRET:
+	case common.WORK_SECRET:
 		if b, err := c.ReadLen(32); err == nil {
 			s.SecretChan <- conn.NewSecret(string(b), c)
 		}
+	case common.WORK_P2P:
+		//读取md5密钥
+		if b, err := c.ReadLen(32); err != nil {
+			return
+		} else if t := file.GetCsvDb().GetTaskByMd5Password(string(b)); t == nil {
+			return
+		} else {
+			s.clientLock.Lock()
+			if v, ok := s.Client[t.Client.Id]; !ok {
+				logs.Error("未获取到对应客户端")
+				s.clientLock.Unlock()
+				return
+			} else {
+				logs.Warn("获取到对应客户端")
+				s.clientLock.Unlock()
+				//向密钥对应的客户端发送与服务端udp建立连接信息,地址,密钥
+				logs.Warn(v.signal.Write([]byte(common.NEW_UDP_CONN)))
+				svrAddr := beego.AppConfig.String("serverIp") + ":" + beego.AppConfig.String("p2pPort")
+				logs.Warn(svrAddr)
+				logs.Warn(v.signal.WriteLenContent([]byte(svrAddr)))
+				logs.Warn(string(b), v.signal.WriteLenContent(b))
+				//向该请求者发送建立连接请求,服务器地址
+				c.WriteLenContent([]byte(svrAddr))
+			}
+		}
 	case common.WORK_SEND_STATUS:
 		s.clientLock.Lock()
 		if v, ok := s.Client[id]; ok {
@@ -511,6 +536,7 @@ func (s *Bridge) clientCopy(clientId int) {
 	}
 }
 
+//TODO 清除有一个未知bug待处理
 func (s *Bridge) linkCleanSession() {
 	ticker := time.NewTicker(time.Minute * 5)
 	for {
@@ -526,7 +552,7 @@ func (s *Bridge) linkCleanSession() {
 				}
 				v.Unlock()
 			}
-			s.clientLock.RUnlock()
+			s.clientLock.Unlock()
 		}
 	}
 }

+ 108 - 25
client/client.go

@@ -5,6 +5,7 @@ import (
 	"github.com/cnlh/nps/lib/conn"
 	"github.com/cnlh/nps/lib/pool"
 	"github.com/cnlh/nps/vender/github.com/astaxie/beego/logs"
+	"github.com/cnlh/nps/vender/github.com/xtaci/kcp"
 	"net"
 	"os"
 	"sync"
@@ -38,7 +39,6 @@ func NewRPClient(svraddr string, vKey string, bridgeConnType string, proxyUrl st
 
 //start
 func (s *TRPClient) Start() {
-	go s.linkCleanSession()
 retry:
 	c, err := NewConn(s.bridgeConnType, s.vKey, s.svrAddr, common.WORK_MAIN, s.proxyUrl)
 	if err != nil {
@@ -82,8 +82,8 @@ func (s *TRPClient) processor(c *conn.Conn) {
 				s.linkMap[link.Id] = link
 				s.Unlock()
 				link.MsgConn = s.msgTunnel
-				go s.linkProcess(link, c)
-				link.Run(false)
+				go linkProcess(link, c, s.tunnel)
+				link.RunWrite()
 			}
 		case common.RES_CLOSE:
 			logs.Error("The authentication key is connected by another client or the server closes the client.")
@@ -91,6 +91,14 @@ func (s *TRPClient) processor(c *conn.Conn) {
 		case common.RES_MSG:
 			logs.Error("Server-side return error")
 			break
+		case common.NEW_UDP_CONN:
+			//读取服务端地址、密钥 继续做处理
+			if lAddr, err := c.GetLenContent(); err != nil {
+				return
+			} else if pwd, err := c.GetLenContent(); err == nil {
+				logs.Warn(string(lAddr), string(pwd))
+				go s.newUdpConn(string(lAddr), string(pwd))
+			}
 		default:
 			logs.Warn("The error could not be resolved")
 			break
@@ -100,37 +108,112 @@ func (s *TRPClient) processor(c *conn.Conn) {
 	s.Close()
 }
 
-func (s *TRPClient) linkProcess(link *conn.Link, c *conn.Conn) {
-	link.Host = common.FormatAddress(link.Host)
-	//与目标建立连接
-	server, err := net.DialTimeout(link.ConnType, link.Host, time.Second*3)
-
+func (s *TRPClient) newUdpConn(rAddr string, md5Password string) {
+	tmpConn, err := net.Dial("udp", "114.114.114.114:53")
 	if err != nil {
-		c.WriteFail(link.Id)
-		logs.Warn("connect to ", link.Host, "error:", err)
+		logs.Warn(err)
 		return
 	}
-	c.WriteSuccess(link.Id)
-	link.Conn = conn.NewConn(server)
-	buf := pool.BufPoolCopy.Get().([]byte)
-	for {
-		if n, err := server.Read(buf); err != nil {
-			s.tunnel.SendMsg([]byte(common.IO_EOF), link)
-			break
+	tmpConn.Close()
+	//与服务端建立udp连接
+	localAddr, _ := net.ResolveUDPAddr("udp", tmpConn.LocalAddr().String())
+	localConn, err := net.ListenUDP("udp", localAddr)
+	if err != nil {
+		logs.Warn(err)
+		return
+	}
+	localKcpConn, err := kcp.NewConn(rAddr, nil, 150, 3, localConn)
+	logs.Warn(localConn.RemoteAddr(), rAddr)
+	conn.SetUdpSession(localKcpConn)
+	if err != nil {
+		logs.Warn(err)
+		return
+	}
+	localToolConn := conn.NewConn(localKcpConn)
+	//写入密钥、provider身份
+	if _, err := localToolConn.Write([]byte(md5Password)); err != nil {
+		logs.Warn(err)
+		return
+	}
+	if _, err := localToolConn.Write([]byte(common.WORK_P2P_PROVIDER)); err != nil {
+		logs.Warn(err)
+		return
+	}
+	//接收服务端传的visitor地址
+	if b, err := localToolConn.GetLenContent(); err != nil {
+		logs.Warn(err)
+		return
+	} else {
+		logs.Warn("收到服务端回传地址", string(b))
+		//向visitor地址发送测试消息
+		visitorAddr, err := net.ResolveUDPAddr("udp", string(b))
+		if err != nil {
+			logs.Warn(err)
+		}
+		logs.Warn(visitorAddr.String())
+		if n, err := localConn.WriteTo([]byte("test"), visitorAddr); err != nil {
+			logs.Warn(err)
 		} else {
-			if _, err := s.tunnel.SendMsg(buf[:n], link); err != nil {
-				c.Close()
-				break
+			logs.Warn("write", n)
+		}
+		//给服务端发反馈
+		if _, err := localToolConn.Write([]byte(common.VERIFY_SUCCESS)); err != nil {
+			logs.Warn(err)
+		}
+		//关闭与服务端的连接
+		localConn.Close()
+		//关闭与服务端udp conn,建立新的监听
+		localConn, err = net.ListenUDP("udp", localAddr)
+
+		if err != nil {
+			logs.Warn(err)
+		}
+		l, err := kcp.ServeConn(nil, 150, 3, localConn)
+		if err != nil {
+			logs.Warn(err)
+			return
+		}
+		for {
+			//接收新的监听,得到conn,
+			udpTunnel, err := l.AcceptKCP()
+			logs.Warn(udpTunnel.RemoteAddr(), udpTunnel.LocalAddr())
+			if err != nil {
+				logs.Warn(err)
+				l.Close()
+				return
 			}
-			if link.ConnType == common.CONN_UDP {
+			conn.SetUdpSession(udpTunnel)
+			if udpTunnel.RemoteAddr().String() == string(b) {
+				//读取link,设置msgCh 设置msgConn消息回传响应机制
+				c, e := net.Dial("tcp", "123.206.77.88:22")
+				if e != nil {
+					logs.Warn(e)
+					return
+				}
+
+				go common.CopyBuffer(c, udpTunnel)
+				common.CopyBuffer(udpTunnel, c)
+				//读取flag ping/new/msg/msgConn//分别对于不同的做法
 				break
 			}
 		}
-		<-link.StatusCh
+
 	}
-	pool.PutBufPoolCopy(buf)
-	s.Lock()
-	s.Unlock()
+}
+
+func linkProcess(link *conn.Link, statusConn, msgConn *conn.Conn) {
+	link.Host = common.FormatAddress(link.Host)
+	//与目标建立连接
+	server, err := net.DialTimeout(link.ConnType, link.Host, time.Second*3)
+
+	if err != nil {
+		statusConn.WriteFail(link.Id)
+		logs.Warn("connect to ", link.Host, "error:", err)
+		return
+	}
+	statusConn.WriteSuccess(link.Id)
+	link.Conn = conn.NewConn(server)
+	link.RunRead(msgConn)
 }
 
 func (s *TRPClient) getMsgStatus() {

+ 74 - 7
client/local.go

@@ -3,8 +3,10 @@ package client
 import (
 	"github.com/cnlh/nps/lib/common"
 	"github.com/cnlh/nps/lib/config"
+	"github.com/cnlh/nps/lib/conn"
 	"github.com/cnlh/nps/lib/crypt"
 	"github.com/cnlh/nps/vender/github.com/astaxie/beego/logs"
+	"github.com/cnlh/nps/vender/github.com/xtaci/kcp"
 	"net"
 	"strings"
 )
@@ -39,16 +41,81 @@ func StartLocalServer(l *config.LocalServer, config *config.CommonConfig) error
 	return nil
 }
 
-func process(conn net.Conn, config *config.CommonConfig, l *config.LocalServer) {
-	c, err := NewConn(config.Tp, config.VKey, config.Server, common.WORD_SECRET, config.ProxyUrl)
+func process(localTcpConn net.Conn, config *config.CommonConfig, l *config.LocalServer) {
+	var workType string
+	if l.Type == "secret" {
+		workType = common.WORK_SECRET
+	} else {
+		workType = common.WORK_P2P
+	}
+	remoteConn, err := NewConn(config.Tp, config.VKey, config.Server, workType, config.ProxyUrl)
 	if err != nil {
 		logs.Error("Local connection server failed ", err.Error())
 	}
-	if _, err := c.Write([]byte(crypt.Md5(l.Password))); err != nil {
+	if _, err := remoteConn.Write([]byte(crypt.Md5(l.Password))); err != nil {
 		logs.Error("Local connection server failed ", err.Error())
 	}
-	go common.CopyBuffer(c, conn)
-	common.CopyBuffer(conn, c)
-	c.Close()
-	conn.Close()
+	if l.Type == "secret" {
+		go common.CopyBuffer(remoteConn, localTcpConn)
+		common.CopyBuffer(localTcpConn, remoteConn)
+		remoteConn.Close()
+		localTcpConn.Close()
+	} else {
+		//读取服务端地址、密钥 继续做处理
+		logs.Warn(111)
+		if rAddr, err := remoteConn.GetLenContent(); err != nil {
+			return
+		} else {
+			logs.Warn(222)
+			//与服务端udp建立连接
+			tmpConn, err := net.Dial("udp", "114.114.114.114:53")
+			if err != nil {
+				logs.Warn(err)
+			}
+			tmpConn.Close()
+			//与服务端建立udp连接
+			localAddr, _ := net.ResolveUDPAddr("udp", tmpConn.LocalAddr().String())
+			localConn, err := net.ListenUDP("udp", localAddr)
+			if err != nil {
+				return
+			}
+			logs.Warn(333)
+			localKcpConn, err := kcp.NewConn(string(rAddr), nil, 150, 3, localConn)
+			conn.SetUdpSession(localKcpConn)
+			if err != nil {
+				logs.Warn(err)
+			}
+			localToolConn := conn.NewConn(localKcpConn)
+			//写入密钥、provider身份
+			if _, err := localToolConn.Write([]byte(crypt.Md5(l.Password))); err != nil {
+				return
+			}
+			if _, err := localToolConn.Write([]byte(common.WORK_P2P_VISITOR)); err != nil {
+				return
+			}
+			logs.Warn(444)
+			//接收服务端传的visitor地址
+			if b, err := localToolConn.GetLenContent(); err != nil {
+				logs.Warn(err)
+				return
+			} else {
+				logs.Warn("收到服务回传地址", string(b))
+				//关闭与服务端连接
+				localConn.Close()
+				//建立新的连接
+				localConn, err = net.ListenUDP("udp", localAddr)
+				udpTunnel, err := kcp.NewConn(string(b), nil, 150, 3, localConn)
+				if err != nil || udpTunnel == nil {
+					logs.Warn(err)
+					return
+				}
+				conn.SetUdpSession(udpTunnel)
+				logs.Warn(udpTunnel.RemoteAddr(), string(b), udpTunnel.LocalAddr())
+
+				go common.CopyBuffer(udpTunnel, localTcpConn)
+				common.CopyBuffer(localTcpConn, udpTunnel)
+			}
+		}
+
+	}
 }

+ 6 - 6
conf/npc.conf

@@ -1,5 +1,5 @@
 [common]
-server=127.0.0.1:8284
+server=123.206.77.88:8284
 tp=tcp
 vkey=123
 auto_reconnection=true
@@ -16,10 +16,6 @@ host_change=www.proxy.com
 target=127.0.0.1:8080
 location=/cdn
 
-[ssh_1118]
-mode=secretServer
-password=1111
-target=123.206.77.88:22
 
 [tcp]
 mode=tcpServer
@@ -37,4 +33,8 @@ port=9004
 [udp]
 mode=udpServer
 port=9003
-target=114.114.114.53
+target=114.114.114.53
+
+[p2p_ssh]
+port=2000
+password=p2pssh

+ 6 - 1
lib/common/const.go

@@ -13,12 +13,16 @@ const (
 	WORK_SEND_STATUS  = "sdst"
 	WORK_CONFIG       = "conf"
 	WORK_REGISTER     = "rgst"
-	WORD_SECRET       = "sert"
+	WORK_SECRET       = "sert"
+	WORK_P2P          = "p2pm"
+	WORK_P2P_VISITOR  = "p2pv"
+	WORK_P2P_PROVIDER = "p2pp"
 	WORK_STATUS       = "stus"
 	RES_SIGN          = "sign"
 	RES_MSG           = "msg0"
 	RES_CLOSE         = "clse"
 	NEW_CONN          = "conn" //新连接标志
+	NEW_UDP_CONN      = "udpc" //p2p udp conn
 	NEW_TASK          = "task" //新连接标志
 	NEW_CONF          = "conf" //新连接标志
 	NEW_HOST          = "host" //新连接标志
@@ -33,4 +37,5 @@ WWW-Authenticate: Basic realm="easyProxy"
 	ConnectionFailBytes = `HTTP/1.1 404 Not Found
 
 `
+
 )

+ 24 - 2
lib/common/util.go

@@ -251,7 +251,29 @@ func GetIpByAddr(addr string) string {
 
 func CopyBuffer(dst io.Writer, src io.Reader) (written int64, err error) {
 	buf := pool.BufPoolCopy.Get().([]byte)
-	io.CopyBuffer(dst, src, buf)
-	pool.PutBufPoolCopy(buf)
+	for {
+		nr, er := src.Read(buf)
+		if nr > 0 {
+			nw, ew := dst.Write(buf[0:nr])
+			if nw > 0 {
+				written += int64(nw)
+			}
+			if ew != nil {
+				err = ew
+				break
+			}
+			if nr != nw {
+				err = io.ErrShortWrite
+				break
+			}
+		}
+		if er != nil {
+			if er != io.EOF {
+				err = er
+			}
+			break
+		}
+	}
+	defer pool.PutBufPoolCopy(buf)
 	return written, err
 }

+ 10 - 1
lib/config/config.go

@@ -18,6 +18,7 @@ type CommonConfig struct {
 	Client           *file.Client
 }
 type LocalServer struct {
+	Type     string
 	Port     int
 	Password string
 }
@@ -53,7 +54,15 @@ func NewConfig(path string) (c *Config, err error) {
 			nowContent = c.content[nowIndex:nextIndex]
 
 			if strings.Index(getTitleContent(c.title[i]), "secret") == 0 {
-				c.LocalServer = append(c.LocalServer, delLocalService(nowContent))
+				local := delLocalService(nowContent)
+				local.Type = "secret"
+				c.LocalServer = append(c.LocalServer, local)
+				continue
+			}
+			if strings.Index(getTitleContent(c.title[i]), "p2p") == 0 {
+				local := delLocalService(nowContent)
+				local.Type = "p2p"
+				c.LocalServer = append(c.LocalServer, local)
 				continue
 			}
 			switch c.title[i] {

+ 22 - 3
lib/conn/conn.go

@@ -69,6 +69,15 @@ func (s *Conn) GetHost() (method, address string, rb []byte, err error, r *http.
 	return
 }
 
+func (s *Conn) GetLenContent() (b []byte, err error) {
+	var l int
+	if l, err = s.GetLen(); err != nil {
+		return
+	}
+	b, err = s.ReadLen(l)
+	return
+}
+
 //读取指定长度内容
 func (s *Conn) ReadLen(cLen int) ([]byte, error) {
 	if cLen > pool.PoolSize {
@@ -77,10 +86,11 @@ func (s *Conn) ReadLen(cLen int) ([]byte, error) {
 	var buf []byte
 	if cLen < pool.PoolSizeSmall {
 		buf = pool.BufPoolSmall.Get().([]byte)[:cLen]
-		defer pool.PutBufPoolSmall(buf)
+		//TODO 回收
+		//defer pool.PutBufPoolSmall(buf)
 	} else {
 		buf = pool.BufPoolMax.Get().([]byte)[:cLen]
-		defer pool.PutBufPoolMax(buf)
+		//defer pool.PutBufPoolMax(buf)
 	}
 	if n, err := io.ReadFull(s, buf); err != nil || n != cLen {
 		return buf, errors.New("Error reading specified length " + err.Error())
@@ -95,6 +105,14 @@ func (s *Conn) GetLen() (int, error) {
 	return int(l), err
 }
 
+func (s *Conn) WriteLenContent(buf []byte) (err error) {
+	var b []byte
+	if b, err = GetLenBytes(buf); err != nil {
+		return
+	}
+	return binary.Write(s.Conn, binary.LittleEndian, b)
+}
+
 //read flag
 func (s *Conn) ReadFlag() (string, error) {
 	val, err := s.ReadLen(4)
@@ -477,7 +495,6 @@ func (s *Conn) WriteChan() (int, error) {
 	defer s.Unlock()
 	return s.Write([]byte(common.WORK_CHAN))
 }
-
 //获取长度+内容
 func GetLenBytes(buf []byte) (b []byte, err error) {
 	raw := bytes.NewBuffer([]byte{})
@@ -491,6 +508,7 @@ func GetLenBytes(buf []byte) (b []byte, err error) {
 	return
 }
 
+
 //解析出长度
 func GetLenByBytes(buf []byte) (int, error) {
 	nlen := binary.LittleEndian.Uint32(buf)
@@ -508,4 +526,5 @@ func SetUdpSession(sess *kcp.UDPSession) {
 	sess.SetNoDelay(1, 10, 2, 1)
 	sess.SetMtu(1600)
 	sess.SetACKNoDelay(true)
+	sess.SetWriteDelay(false)
 }

+ 24 - 2
lib/conn/link.go

@@ -56,7 +56,7 @@ func NewLink(id int, connType string, host string, en, de int, crypt bool, c *Co
 	}
 }
 
-func (s *Link) Run(flow bool) {
+func (s *Link) RunWrite() {
 	go func() {
 		for {
 			select {
@@ -76,7 +76,7 @@ func (s *Link) Run(flow bool) {
 					} else {
 						s.Conn.Write(content)
 					}
-					if flow {
+					if s.Flow != nil {
 						s.Flow.Add(0, len(content))
 					}
 					if s.ConnType == common.CONN_UDP {
@@ -89,3 +89,25 @@ func (s *Link) Run(flow bool) {
 		}
 	}()
 }
+func (s *Link) RunRead(msgConn *Conn) {
+	buf := pool.BufPoolCopy.Get().([]byte)
+	for {
+		if n, err := s.Conn.Read(buf); err != nil {
+			msgConn.SendMsg([]byte(common.IO_EOF), s)
+			break
+		} else {
+			if _, err := msgConn.SendMsg(buf[:n], s); err != nil {
+				msgConn.Close()
+				break
+			}
+			if s.ConnType == common.CONN_UDP {
+				break
+			}
+			if s.Flow != nil {
+				s.Flow.Add(n, 0)
+			}
+		}
+		<-s.StatusCh
+	}
+	pool.PutBufPoolCopy(buf)
+}

+ 1 - 1
lib/file/file.go

@@ -181,7 +181,7 @@ func (s *Csv) DelTask(id int) error {
 }
 
 //md5 password
-func (s *Csv) GetSecretTask(p string) *Tunnel {
+func (s *Csv) GetTaskByMd5Password(p string) *Tunnel {
 	for _, v := range s.Tasks {
 		if crypt.Md5(v.Password) == p {
 			return v

+ 0 - 1
lib/file/obj.go

@@ -53,7 +53,6 @@ func NewClient(vKey string, noStore bool, noDisplay bool) *Client {
 		Flow:      new(Flow),
 		Rate:      nil,
 		NoStore:   noStore,
-		id:        GetCsvDb().GetClientId(),
 		RWMutex:   sync.RWMutex{},
 		NoDisplay: noDisplay,
 	}

+ 32 - 0
lib/mux/bytes.go

@@ -0,0 +1,32 @@
+package mux
+
+import (
+	"bytes"
+	"encoding/binary"
+	"io"
+)
+
+//write bytes with int32 length
+func WriteLenBytes(buf []byte, w io.Writer) (int, error) {
+	raw := bytes.NewBuffer([]byte{})
+	if err := binary.Write(raw, binary.LittleEndian, int32(len(buf))); err != nil {
+		return 0, err
+	}
+	if err := binary.Write(raw, binary.LittleEndian, buf); err != nil {
+		return 0, err
+	}
+	return w.Write(raw.Bytes())
+}
+
+//read bytes by length
+func ReadLenBytes(buf []byte, r io.Reader) (int, error) {
+	var l int32
+	var err error
+	if binary.Read(r, binary.LittleEndian, &l) != nil {
+		return 0, err
+	}
+	if _, err = io.ReadFull(r, buf[:l]); err != nil {
+		return 0, err
+	}
+	return int(l), nil
+}

+ 148 - 0
lib/mux/conn.go

@@ -0,0 +1,148 @@
+package mux
+
+import (
+	"errors"
+	"github.com/cnlh/nps/lib/pool"
+	"io"
+	"net"
+	"time"
+)
+
+type conn struct {
+	net.Conn
+	readMsgCh        chan []byte
+	getStatusCh      chan struct{}
+	connStatusOkCh   chan struct{}
+	connStatusFailCh chan struct{}
+	readTimeOut      time.Time
+	writeTimeOut     time.Time
+	sendMsgCh        chan *msg  //mux
+	sendStatusCh     chan int32 //mux
+	connId           int32
+	isClose          bool
+	mux              *Mux
+}
+
+type msg struct {
+	connId  int32
+	content []byte
+}
+
+func NewMsg(connId int32, content []byte) *msg {
+	return &msg{
+		connId:  connId,
+		content: content,
+	}
+}
+
+func NewConn(connId int32, mux *Mux, sendMsgCh chan *msg, sendStatusCh chan int32) *conn {
+	return &conn{
+		readMsgCh:        make(chan []byte),
+		getStatusCh:      make(chan struct{}),
+		connStatusOkCh:   make(chan struct{}),
+		connStatusFailCh: make(chan struct{}),
+		readTimeOut:      time.Time{},
+		writeTimeOut:     time.Time{},
+		sendMsgCh:        sendMsgCh,
+		sendStatusCh:     sendStatusCh,
+		connId:           connId,
+		isClose:          false,
+		mux:              mux,
+	}
+}
+
+func (s *conn) Read(buf []byte) (int, error) {
+	if s.isClose {
+		return 0, errors.New("the conn has closed")
+	}
+	var b []byte
+	if t := s.readTimeOut.Sub(time.Now()); t > 0 {
+		timer := time.NewTimer(t)
+		select {
+		case <-timer.C:
+			s.Close()
+			return 0, errors.New("read timeout")
+		case b = <-s.readMsgCh:
+		}
+	} else {
+		b = <-s.readMsgCh
+	}
+	defer pool.PutBufPoolCopy(b)
+	if s.isClose {
+		return 0, io.EOF
+	}
+	s.sendStatusCh <- s.connId
+	return copy(buf, b), nil
+}
+
+func (s *conn) Write(buf []byte) (int, error) {
+	if s.isClose {
+		return 0, errors.New("the conn has closed")
+	}
+
+	if t := s.writeTimeOut.Sub(time.Now()); t > 0 {
+		timer := time.NewTimer(t)
+		select {
+		case <-timer.C:
+			s.Close()
+			return 0, errors.New("write timeout")
+		case s.sendMsgCh <- NewMsg(s.connId, buf):
+		}
+	} else {
+		s.sendMsgCh <- NewMsg(s.connId, buf)
+	}
+
+	if t := s.writeTimeOut.Sub(time.Now()); t > 0 {
+		timer := time.NewTimer(t)
+		select {
+		case <-timer.C:
+			s.Close()
+			return 0, errors.New("write timeout")
+		case <-s.getStatusCh:
+		}
+	} else {
+		<-s.getStatusCh
+	}
+
+	if s.isClose {
+		return 0, io.EOF
+	}
+	return len(buf), nil
+}
+
+func (s *conn) Close() error {
+	if s.isClose {
+		return errors.New("the conn has closed")
+	}
+	s.isClose = true
+	close(s.getStatusCh)
+	close(s.readMsgCh)
+	close(s.connStatusOkCh)
+	close(s.connStatusFailCh)
+	s.sendMsgCh <- NewMsg(s.connId, nil)
+	return nil
+}
+
+func (s *conn) LocalAddr() net.Addr {
+	return s.mux.conn.LocalAddr()
+}
+
+func (s *conn) RemoteAddr() net.Addr {
+	return s.mux.conn.RemoteAddr()
+}
+
+func (s *conn) SetDeadline(t time.Time) error {
+	s.readTimeOut = t
+	s.writeTimeOut = t
+	return nil
+}
+
+func (s *conn) SetReadDeadline(t time.Time) error {
+	s.readTimeOut = t
+	return nil
+}
+
+func (s *conn) SetWriteDeadline(t time.Time) error {
+	s.writeTimeOut = t
+	return nil
+}

+ 64 - 0
lib/mux/map.go

@@ -0,0 +1,64 @@
+package mux
+
+import (
+	"sync"
+	"time"
+)
+
+type connMap struct {
+	connMap map[int32]*conn
+	closeCh chan struct{}
+	sync.RWMutex
+}
+
+func NewConnMap() *connMap {
+	connMap := &connMap{
+		connMap: make(map[int32]*conn),
+		closeCh: make(chan struct{}),
+	}
+	go connMap.clean()
+	return connMap
+}
+
+func (s *connMap) Get(id int32) (*conn, bool) {
+	s.Lock()
+	defer s.Unlock()
+	if v, ok := s.connMap[id]; ok {
+		return v, true
+	}
+	return nil, false
+}
+
+func (s *connMap) Set(id int32, v *conn) {
+	s.Lock()
+	defer s.Unlock()
+	s.connMap[id] = v
+}
+
+func (s *connMap) Close() {
+	s.Lock()
+	defer s.Unlock()
+	for _, v := range s.connMap {
+		v.isClose = true
+	}
+	s.closeCh <- struct{}{}
+}
+
+func (s *connMap) clean() {
+	ticker := time.NewTimer(time.Minute * 1)
+	for {
+		select {
+		case <-ticker.C:
+			s.Lock()
+			for _, v := range s.connMap {
+				if v.isClose {
+					delete(s.connMap, v.connId)
+				}
+			}
+			s.Unlock()
+		case <-s.closeCh:
+			ticker.Stop()
+			return
+		}
+	}
+}

+ 226 - 0
lib/mux/mux.go

@@ -0,0 +1,226 @@
+package mux
+
+import (
+	"bytes"
+	"encoding/binary"
+	"errors"
+	"github.com/cnlh/nps/lib/pool"
+	"math"
+	"net"
+	"sync"
+	"sync/atomic"
+	"time"
+)
+
+const (
+	MUX_PING_FLAG int32 = iota
+	MUX_NEW_CONN_OK
+	MUX_NEW_CONN_Fail
+	MUX_NEW_MSG
+	MUX_MSG_SEND_OK
+	MUX_NEW_CONN
+	MUX_PING
+	MUX_CONN_CLOSE
+)
+
+type Mux struct {
+	net.Listener
+	conn         net.Conn
+	connMap      *connMap
+	sendMsgCh    chan *msg  //write msg chan
+	sendStatusCh chan int32 //write read ok chan
+	newConnCh    chan *conn
+	id           int32
+	closeChan    chan struct{}
+	isClose      bool
+	sync.Mutex
+}
+
+func NewMux(c net.Conn) *Mux {
+	m := &Mux{
+		conn:         c,
+		connMap:      NewConnMap(),
+		sendMsgCh:    make(chan *msg),
+		sendStatusCh: make(chan int32),
+		id:           0,
+		closeChan:    make(chan struct{}),
+		newConnCh:    make(chan *conn),
+		isClose:      false,
+	}
+	//read session by flag
+	go m.readSession()
+	//write session
+	go m.writeSession()
+	//ping
+	go m.ping()
+	return m
+}
+
+func (s *Mux) NewConn() (*conn, error) {
+	if s.isClose {
+		return nil, errors.New("the mux has closed")
+	}
+	conn := NewConn(s.getId(), s, s.sendMsgCh, s.sendStatusCh)
+	raw := bytes.NewBuffer([]byte{})
+	if err := binary.Write(raw, binary.LittleEndian, MUX_NEW_CONN); err != nil {
+		return nil, err
+	}
+	if err := binary.Write(raw, binary.LittleEndian, conn.connId); err != nil {
+		return nil, err
+	}
+	//it must be set before send
+	s.connMap.Set(conn.connId, conn)
+	if _, err := s.conn.Write(raw.Bytes()); err != nil {
+		return nil, err
+	}
+	select {
+	case <-conn.connStatusOkCh:
+		return conn, nil
+	case <-conn.connStatusFailCh:
+	}
+	return nil, errors.New("create connection fail,the server refused the connection")
+}
+
+func (s *Mux) Accept() (net.Conn, error) {
+	if s.isClose {
+		return nil, errors.New("accpet error,the conn has closed")
+	}
+	return <-s.newConnCh, nil
+}
+
+func (s *Mux) Addr() net.Addr {
+	return s.conn.LocalAddr()
+}
+
+func (s *Mux) ping() {
+	go func() {
+		ticker := time.NewTicker(time.Second * 5)
+		raw := bytes.NewBuffer([]byte{})
+		for {
+			select {
+			case <-ticker.C:
+			}
+			//Avoid going beyond the scope
+			if (math.MaxInt32 - s.id) < 10000 {
+				s.id = 0
+			}
+			raw.Reset()
+			binary.Write(raw, binary.LittleEndian, MUX_PING_FLAG)
+			binary.Write(raw, binary.LittleEndian, MUX_PING)
+			if _, err := s.conn.Write(raw.Bytes()); err != nil {
+				s.Close()
+				break
+			}
+		}
+	}()
+	select {
+	case <-s.closeChan:
+	}
+}
+
+func (s *Mux) writeSession() {
+	go func() {
+		raw := bytes.NewBuffer([]byte{})
+		for {
+			raw.Reset()
+			select {
+			case msg := <-s.sendMsgCh:
+				if msg.content == nil { //close
+					binary.Write(raw, binary.LittleEndian, MUX_CONN_CLOSE)
+					binary.Write(raw, binary.LittleEndian, msg.connId)
+					break
+				}
+				binary.Write(raw, binary.LittleEndian, MUX_NEW_MSG)
+				binary.Write(raw, binary.LittleEndian, msg.connId)
+				binary.Write(raw, binary.LittleEndian, int32(len(msg.content)))
+				binary.Write(raw, binary.LittleEndian, msg.content)
+			case connId := <-s.sendStatusCh:
+				binary.Write(raw, binary.LittleEndian, MUX_MSG_SEND_OK)
+				binary.Write(raw, binary.LittleEndian, connId)
+			}
+			if _, err := s.conn.Write(raw.Bytes()); err != nil {
+				s.Close()
+				break
+			}
+		}
+	}()
+	select {
+	case <-s.closeChan:
+	}
+}
+
+func (s *Mux) readSession() {
+	go func() {
+		raw := bytes.NewBuffer([]byte{})
+		for {
+			var flag, i int32
+			if binary.Read(s.conn, binary.LittleEndian, &flag) == nil {
+				if binary.Read(s.conn, binary.LittleEndian, &i) != nil {
+					break
+				}
+				switch flag {
+				case MUX_NEW_CONN: //new conn
+					conn := NewConn(i, s, s.sendMsgCh, s.sendStatusCh)
+					s.connMap.Set(i, conn) //it has been set before send ok
+					s.newConnCh <- conn
+					raw.Reset()
+					binary.Write(raw, binary.LittleEndian, MUX_NEW_CONN_OK)
+					binary.Write(raw, binary.LittleEndian, i)
+					s.conn.Write(raw.Bytes())
+					continue
+				case MUX_PING_FLAG: //ping
+					continue
+				}
+				if conn, ok := s.connMap.Get(i); ok {
+					switch flag {
+					case MUX_NEW_MSG: //new msg from remote conn
+						buf := pool.BufPoolCopy.Get().([]byte)
+						if n, err := ReadLenBytes(buf, s.conn); err == nil {
+							if !conn.isClose {
+								conn.readMsgCh <- buf[:n]
+							} else {
+								pool.PutBufPoolCopy(buf)
+							}
+						} else { //read len bytes error,the mux has broken
+							break
+						}
+					case MUX_MSG_SEND_OK: //the remote has read
+						conn.getStatusCh <- struct{}{}
+					case MUX_NEW_CONN_OK: //conn ok
+						conn.connStatusOkCh <- struct{}{}
+					case MUX_NEW_CONN_Fail:
+						conn.connStatusFailCh <- struct{}{}
+					case MUX_CONN_CLOSE: //close the connection
+						conn.Close()
+					}
+				}
+			} else {
+				break
+			}
+		}
+		s.Close()
+	}()
+	select {
+	case <-s.closeChan:
+	}
+}
+
+func (s *Mux) Close() error {
+	if s.isClose {
+		return errors.New("the mux has closed")
+	}
+	s.isClose = true
+	s.connMap.Close()
+	s.closeChan <- struct{}{}
+	s.closeChan <- struct{}{}
+	s.closeChan <- struct{}{}
+	close(s.closeChan)
+	close(s.sendMsgCh)
+	close(s.sendStatusCh)
+	return s.conn.Close()
+}
+
+//get new connId as unique flag
+func (s *Mux) getId() int32 {
+	return atomic.AddInt32(&s.id, 1)
+}

+ 96 - 0
lib/mux/mux_test.go

@@ -0,0 +1,96 @@
+package mux
+
+import (
+	"github.com/cnlh/nps/lib/common"
+	"github.com/cnlh/nps/vender/github.com/astaxie/beego/logs"
+	"log"
+	"net"
+	"net/http"
+	_ "net/http/pprof"
+	"testing"
+	"time"
+)
+
+var conn1 net.Conn
+var conn2 net.Conn
+
+func TestNewMux(t *testing.T) {
+	go func() {
+		http.ListenAndServe("0.0.0.0:8899", nil)
+	}()
+	logs.EnableFuncCallDepth(true)
+	logs.SetLogFuncCallDepth(3)
+	server()
+	client()
+	time.Sleep(time.Second * 3)
+	go func() {
+		m2 := NewMux(conn2)
+		for {
+			c, err := m2.Accept()
+			if err != nil {
+				log.Fatalln(err)
+			}
+			go func(c net.Conn) {
+				c2, err := net.Dial("tcp", "127.0.0.1:8080")
+				if err != nil {
+					log.Fatalln(err)
+				}
+				go common.CopyBuffer(c2, c)
+				common.CopyBuffer(c, c2)
+				c.Close()
+				c2.Close()
+			}(c)
+		}
+	}()
+
+	go func() {
+		m1 := NewMux(conn1)
+		l, err := net.Listen("tcp", "127.0.0.1:7777")
+		if err != nil {
+			log.Fatalln(err)
+		}
+		for {
+			conn, err := l.Accept()
+			if err != nil {
+				log.Fatalln(err)
+			}
+			go func(conn net.Conn) {
+				tmpCpnn, err := m1.NewConn()
+				if err != nil {
+					log.Fatalln(err)
+				}
+				go common.CopyBuffer(tmpCpnn, conn)
+				common.CopyBuffer(conn, tmpCpnn)
+				conn.Close()
+				tmpCpnn.Close()
+			}(conn)
+		}
+	}()
+
+	for {
+		time.Sleep(time.Second * 5)
+	}
+}
+
+func server() {
+	var err error
+	l, err := net.Listen("tcp", "127.0.0.1:9999")
+	if err != nil {
+		log.Fatalln(err)
+	}
+	go func() {
+		conn1, err = l.Accept()
+		if err != nil {
+			log.Fatalln(err)
+		}
+	}()
+	return
+}
+
+func client() {
+	var err error
+	conn2, err = net.Dial("tcp", "127.0.0.1:9999")
+	if err != nil {
+		log.Fatalln(err)
+	}
+}

+ 1 - 0
lib/pool/pool.go

@@ -42,6 +42,7 @@ func PutBufPoolUdp(buf []byte) {
 	}
 }
 
+
 func PutBufPoolCopy(buf []byte) {
 	if cap(buf) == PoolSizeCopy {
 		BufPoolCopy.Put(buf[:PoolSizeCopy])

+ 4 - 21
server/proxy/base.go

@@ -6,7 +6,6 @@ import (
 	"github.com/cnlh/nps/lib/common"
 	"github.com/cnlh/nps/lib/conn"
 	"github.com/cnlh/nps/lib/file"
-	"github.com/cnlh/nps/lib/pool"
 	"net"
 	"net/http"
 	"sync"
@@ -58,27 +57,11 @@ func (s *BaseServer) linkCopy(link *conn.Link, c *conn.Conn, rb []byte, tunnel *
 		flow.Add(len(rb), 0)
 		<-link.StatusCh
 	}
-
-	buf := pool.BufPoolCopy.Get().([]byte)
-	for {
-		if err := s.checkFlow(); err != nil {
-			c.Close()
-			break
-		}
-		if n, err := c.Read(buf); err != nil {
-			tunnel.SendMsg([]byte(common.IO_EOF), link)
-			break
-		} else {
-			if _, err := tunnel.SendMsg(buf[:n], link); err != nil {
-				c.Close()
-				break
-			}
-			flow.Add(n, 0)
-		}
-		<-link.StatusCh
+	if err := s.checkFlow(); err != nil {
+		c.Close()
 	}
+	link.RunRead(tunnel)
 	s.task.Client.AddConn()
-	pool.PutBufPoolCopy(buf)
 }
 
 func (s *BaseServer) writeConnFail(c net.Conn) {
@@ -111,7 +94,7 @@ func (s *BaseServer) DealClient(c *conn.Conn, addr string, rb []byte) error {
 		c.Close()
 		return err
 	} else {
-		link.Run(true)
+		link.RunWrite()
 		s.linkCopy(link, c, rb, tunnel, s.task.Flow)
 	}
 	return nil

+ 1 - 1
server/proxy/http.go

@@ -150,7 +150,7 @@ func (s *httpServer) process(c *conn.Conn, r *http.Request) {
 				logs.Notice(err)
 				break
 			}
-			lk.Run(true)
+			lk.RunWrite()
 			isConn = false
 		} else {
 			r, err = http.ReadRequest(bufio.NewReader(c))

+ 104 - 0
server/proxy/p2p.go

@@ -0,0 +1,104 @@
+package proxy
+
+import (
+	"github.com/cnlh/nps/lib/common"
+	"github.com/cnlh/nps/lib/conn"
+	"github.com/cnlh/nps/vender/github.com/astaxie/beego/logs"
+	"github.com/cnlh/nps/vender/github.com/xtaci/kcp"
+	"strconv"
+	"time"
+)
+
+type P2PServer struct {
+	BaseServer
+	p2pPort int
+	p2p     map[string]*p2p
+}
+
+type p2p struct {
+	provider     *conn.Conn
+	visitor      *conn.Conn
+	visitorAddr  string
+	providerAddr string
+}
+
+func NewP2PServer(p2pPort int) *P2PServer {
+	return &P2PServer{
+		p2pPort: p2pPort,
+		p2p:     make(map[string]*p2p),
+	}
+}
+
+func (s *P2PServer) Start() error {
+	kcpListener, err := kcp.ListenWithOptions(":"+strconv.Itoa(s.p2pPort), nil, 150, 3)
+	if err != nil {
+		logs.Error(err)
+		return err
+	}
+	for {
+		c, err := kcpListener.AcceptKCP()
+		conn.SetUdpSession(c)
+		if err != nil {
+			logs.Warn(err)
+			continue
+		}
+		go s.p2pProcess(conn.NewConn(c))
+	}
+	return nil
+}
+
+func (s *P2PServer) p2pProcess(c *conn.Conn) {
+	logs.Warn("new link", c.Conn.RemoteAddr())
+	//获取密钥
+	var (
+		f   string
+		b   []byte
+		err error
+		v   *p2p
+		ok  bool
+	)
+	if b, err = c.ReadLen(32); err != nil {
+		return
+	}
+	//获取角色
+	if f, err = c.ReadFlag(); err != nil {
+		return
+	}
+	logs.Warn("收到", string(b), f)
+	if v, ok = s.p2p[string(b)]; !ok {
+		v = new(p2p)
+		s.p2p[string(b)] = v
+	}
+	logs.Warn(f, c.Conn.RemoteAddr().String())
+	//存储
+	if f == common.WORK_P2P_VISITOR {
+		v.visitorAddr = c.Conn.RemoteAddr().String()
+		v.visitor = c
+		for {
+			time.Sleep(time.Second)
+			if v.provider != nil {
+				break
+			}
+		}
+		logs.Warn("等待确认")
+		if _, err := v.provider.ReadFlag(); err == nil {
+			v.visitor.WriteLenContent([]byte(v.providerAddr))
+			logs.Warn("收到确认")
+			delete(s.p2p, string(b))
+		} else {
+			logs.Warn("收到确认失败", err)
+		}
+	} else {
+		v.providerAddr = c.Conn.RemoteAddr().String()
+		v.provider = c
+		for {
+			time.Sleep(time.Second)
+			if v.visitor != nil {
+				v.provider.WriteLenContent([]byte(v.visitorAddr))
+				break
+			}
+		}
+	}
+	//假设是连接者、等待对应的被连接者连上后,发送被连接者信息
+	//假设是被连接者,等待对应的连接者脸上后,发送连接者信息
+}

+ 1 - 1
server/proxy/socks5.go

@@ -148,7 +148,7 @@ func (s *Sock5ModeServer) doConnect(c net.Conn, command uint8) {
 		return
 	} else {
 		s.sendReply(c, succeeded)
-		link.Run(true)
+		link.RunWrite()
 		s.linkCopy(link, conn.NewConn(c), nil, tunnel, s.task.Flow)
 	}
 	return

+ 1 - 1
server/proxy/udp.go

@@ -58,7 +58,7 @@ func (s *UdpModeServer) process(addr *net.UDPAddr, data []byte) {
 		s.task.Flow.Add(len(data), 0)
 		tunnel.SendMsg(data, link)
 		pool.PutBufPoolUdp(data)
-		link.Run(true)
+		link.RunWrite()
 	}
 }
 

+ 4 - 1
server/server.go

@@ -48,7 +48,7 @@ func DealBridgeTask() {
 			file.GetCsvDb().DelClient(id)
 		case s := <-Bridge.SecretChan:
 			logs.Trace("New secret connection, addr", s.Conn.Conn.RemoteAddr())
-			if t := file.GetCsvDb().GetSecretTask(s.Password); t != nil {
+			if t := file.GetCsvDb().GetTaskByMd5Password(s.Password); t != nil {
 				if !t.Client.GetConn() {
 					logs.Info("Connections exceed the current client %d limit", t.Client.Id)
 					s.Conn.Close()
@@ -75,6 +75,9 @@ func StartNewServer(bridgePort int, cnf *file.Tunnel, bridgeType string) {
 	} else {
 		logs.Info("Server startup, the bridge type is %s, the bridge port is %d", bridgeType, bridgePort)
 	}
+	if p, err := beego.AppConfig.Int("p2pPort"); err == nil {
+		go proxy.NewP2PServer(p).Start()
+	}
 	go DealBridgeTask()
 	if svr := NewMode(Bridge, cnf); svr != nil {
 		if err := svr.Start(); err != nil {