1
0
刘河 6 жил өмнө
parent
commit
9f03c2f6eb

+ 55 - 18
bridge/bridge.go

@@ -22,19 +22,21 @@ import (
 type Client struct {
 	tunnel        *conn.Conn
 	signal        *conn.Conn
+	msg           *conn.Conn
 	linkMap       map[int]*conn.Link
 	linkStatusMap map[int]bool
 	stop          chan bool
 	sync.RWMutex
 }
 
-func NewClient(t *conn.Conn, s *conn.Conn) *Client {
+func NewClient(t *conn.Conn, s *conn.Conn, m *conn.Conn) *Client {
 	return &Client{
 		linkMap:       make(map[int]*conn.Link),
 		stop:          make(chan bool),
 		linkStatusMap: make(map[int]bool),
 		signal:        s,
 		tunnel:        t,
+		msg:           m,
 	}
 }
 
@@ -150,6 +152,17 @@ func (s *Bridge) closeClient(id int) {
 		delete(s.Client, id)
 	}
 }
+func (s *Bridge) delClient(id int) {
+	s.clientLock.Lock()
+	defer s.clientLock.Unlock()
+	if v, ok := s.Client[id]; ok {
+		if c, err := file.GetCsvDb().GetClient(id); err == nil && c.NoStore {
+			s.CloseClient <- c.Id
+		}
+		v.signal.Close()
+		delete(s.Client, id)
+	}
+}
 
 //tcp连接类型区分
 func (s *Bridge) typeDeal(typeVal string, c *conn.Conn, id int) {
@@ -166,7 +179,7 @@ func (s *Bridge) typeDeal(typeVal string, c *conn.Conn, id int) {
 			v.signal = c
 			v.Unlock()
 		} else {
-			s.Client[id] = NewClient(nil, c)
+			s.Client[id] = NewClient(nil, c, nil)
 			s.clientLock.Unlock()
 		}
 		lg.Printf("clientId %d connection succeeded, address:%s ", id, c.Conn.RemoteAddr())
@@ -179,7 +192,7 @@ func (s *Bridge) typeDeal(typeVal string, c *conn.Conn, id int) {
 			v.tunnel = c
 			v.Unlock()
 		} else {
-			s.Client[id] = NewClient(c, nil)
+			s.Client[id] = NewClient(c, nil, nil)
 			s.clientLock.Unlock()
 		}
 		go s.clientCopy(id)
@@ -187,11 +200,44 @@ func (s *Bridge) typeDeal(typeVal string, c *conn.Conn, id int) {
 		go s.GetConfig(c)
 	case common.WORK_REGISTER:
 		go s.register(c)
+	case common.WORK_SEND_STATUS:
+		s.clientLock.Lock()
+		if v, ok := s.Client[id]; ok {
+			s.clientLock.Unlock()
+			v.Lock()
+			v.msg = c
+			v.Unlock()
+		} else {
+			s.Client[id] = NewClient(nil, nil, c)
+			s.clientLock.Unlock()
+		}
+		go s.getMsgStatus(id)
 	}
 	c.SetAlive(s.tunnelType)
 	return
 }
 
+func (s *Bridge) getMsgStatus(clientId int) {
+	s.clientLock.Lock()
+	client := s.Client[clientId]
+	s.clientLock.Unlock()
+
+	if client == nil {
+		return
+	}
+	for {
+		if id, err := client.msg.GetLen(); err != nil {
+			s.closeClient(clientId)
+			return
+		} else {
+			client.Lock()
+			if v, ok := client.linkMap[id]; ok {
+				v.StatusCh <- true
+			}
+			client.Unlock()
+		}
+	}
+}
 func (s *Bridge) register(c *conn.Conn) {
 	var hour int32
 	if err := binary.Read(c, binary.LittleEndian, &hour); err == nil {
@@ -251,12 +297,14 @@ func (s *Bridge) SendLinkInfo(clientId int, link *conn.Link, linkAddr string) (t
 			s.DelClient(clientId)
 			return
 		}
+
 		if v.tunnel == nil {
 			err = errors.New("get tunnel connection error")
 			return
 		} else {
 			tunnel = v.tunnel
 		}
+		link.MsgConn = v.msg
 		v.Lock()
 		v.linkMap[link.Id] = link
 		v.Unlock()
@@ -412,7 +460,8 @@ func (s *Bridge) clientCopy(clientId int) {
 
 	for {
 		if id, err := client.tunnel.GetLen(); err != nil {
-			s.closeClient(clientId)
+			lg.Println("read msg content length error close client")
+			s.delClient(clientId)
 			break
 		} else {
 			client.Lock()
@@ -420,23 +469,11 @@ func (s *Bridge) clientCopy(clientId int) {
 				client.Unlock()
 				if content, err := client.tunnel.GetMsgContent(link); err != nil {
 					pool.PutBufPoolCopy(content)
-					s.closeClient(clientId)
+					s.delClient(clientId)
 					lg.Println("read msg content error", err, "close client")
 					break
 				} else {
-					if len(content) == len(common.IO_EOF) && string(content) == common.IO_EOF {
-						if link.Conn != nil {
-							link.Conn.Close()
-						}
-					} else {
-						if link.UdpListener != nil && link.UdpRemoteAddr != nil {
-							link.UdpListener.WriteToUDP(content, link.UdpRemoteAddr)
-						} else {
-							link.Conn.Write(content)
-						}
-						link.Flow.Add(0, len(content))
-					}
-					pool.PutBufPoolCopy(content)
+					link.MsgCh <- content
 				}
 			} else {
 				client.Unlock()

+ 35 - 11
client/client.go

@@ -21,6 +21,7 @@ type TRPClient struct {
 	svrAddr        string
 	linkMap        map[int]*conn.Link
 	tunnel         *conn.Conn
+	msgTunnel      *conn.Conn
 	bridgeConnType string
 	stop           chan bool
 	proxyUrl       string
@@ -67,6 +68,7 @@ func (s *TRPClient) Close() {
 //处理
 func (s *TRPClient) processor(c *conn.Conn) {
 	go s.dealChan()
+	go s.getMsgStatus()
 	for {
 		flags, err := c.ReadFlag()
 		if err != nil {
@@ -83,7 +85,9 @@ func (s *TRPClient) processor(c *conn.Conn) {
 				s.Lock()
 				s.linkMap[link.Id] = link
 				s.Unlock()
+				link.MsgConn = s.msgTunnel
 				go s.linkProcess(link, c)
+				link.Run(false)
 			}
 		case common.RES_CLOSE:
 			lg.Fatalln("The authentication key is connected by another client or the server closes the client.")
@@ -109,9 +113,7 @@ func (s *TRPClient) linkProcess(link *conn.Link, c *conn.Conn) {
 		lg.Println("connect to ", link.Host, "error:", err)
 		return
 	}
-
 	c.WriteSuccess(link.Id)
-
 	link.Conn = conn.NewConn(server)
 	buf := pool.BufPoolCopy.Get().([]byte)
 	for {
@@ -123,8 +125,11 @@ func (s *TRPClient) linkProcess(link *conn.Link, c *conn.Conn) {
 				c.Close()
 				break
 			}
-			lg.Println("send ok", link.Id)
+			if link.ConnType == common.CONN_UDP {
+				break
+			}
 		}
+		<-link.StatusCh
 	}
 	pool.PutBufPoolCopy(buf)
 	s.Lock()
@@ -132,6 +137,31 @@ func (s *TRPClient) linkProcess(link *conn.Link, c *conn.Conn) {
 	s.Unlock()
 }
 
+func (s *TRPClient) getMsgStatus() {
+	var err error
+	s.msgTunnel, err = NewConn(s.bridgeConnType, s.vKey, s.svrAddr, common.WORK_SEND_STATUS, s.proxyUrl)
+	if err != nil {
+		lg.Println("connect to ", s.svrAddr, "error:", err)
+		return
+	}
+	go func() {
+		for {
+			if id, err := s.msgTunnel.GetLen(); err != nil {
+				break
+			} else {
+				s.Lock()
+				if v, ok := s.linkMap[id]; ok {
+					s.Unlock()
+					v.StatusCh <- true
+				} else {
+					s.Unlock()
+				}
+			}
+		}
+	}()
+	<-s.stop
+}
+
 //隧道模式处理
 func (s *TRPClient) dealChan() {
 	var err error
@@ -140,26 +170,20 @@ func (s *TRPClient) dealChan() {
 		lg.Println("connect to ", s.svrAddr, "error:", err)
 		return
 	}
-
 	go func() {
 		for {
 			if id, err := s.tunnel.GetLen(); err != nil {
+				lg.Println("get id error", err, id)
 				break
 			} else {
 				s.Lock()
 				if v, ok := s.linkMap[id]; ok {
 					s.Unlock()
 					if content, err := s.tunnel.GetMsgContent(v); err != nil {
-						lg.Println("get msg content error:", err, id)
 						pool.PutBufPoolCopy(content)
 						break
 					} else {
-						if len(content) == len(common.IO_EOF) && string(content) == common.IO_EOF {
-							v.Conn.Close()
-						} else if v.Conn != nil {
-							v.Conn.Write(content)
-						}
-						pool.PutBufPoolCopy(content)
+						v.MsgCh <- content
 					}
 				} else {
 					s.Unlock()

+ 5 - 0
cmd/nps/nps.go

@@ -12,6 +12,8 @@ import (
 	"github.com/cnlh/nps/vender/github.com/astaxie/beego"
 	_ "github.com/cnlh/nps/web/routers"
 	"log"
+	"net/http"
+	_ "net/http/pprof"
 	"os"
 	"path/filepath"
 )
@@ -38,6 +40,9 @@ func main() {
 			return
 		}
 	}
+	go func() {
+		http.ListenAndServe("0.0.0.0:8899", nil)
+	}()
 	if *logType == "stdout" {
 		lg.InitLogFile("nps", true, common.GetLogPath())
 	} else {

+ 3 - 6
conf/npc.conf

@@ -2,11 +2,8 @@
 server=127.0.0.1:8284
 tp=tcp
 vkey=123
-compress=snappy
-crypt=true
 auto_reconnection=true
-username=111
-password=222
+
 [web1]
 host=a.o.com
 host_change=www.proxy.com
@@ -21,7 +18,7 @@ header_set_proxy=nps
 
 [tcp]
 mode=tcpServer
-target=8001-8005,8006
+target=8001-8005,8080
 port=9001-9005,9006
 
 [socks5]
@@ -34,5 +31,5 @@ port=9008
 
 [udp]
 mode=udpServer
-port=9009
+port=53
 target=114.114.114.114:53

+ 15 - 14
lib/common/const.go

@@ -6,20 +6,21 @@ const (
 	COMPRESS_NONE_DECODE
 	COMPRESS_SNAPY_ENCODE
 	COMPRESS_SNAPY_DECODE
-	VERIFY_EER     = "vkey"
-	VERIFY_SUCCESS = "sucs"
-	WORK_MAIN      = "main"
-	WORK_CHAN      = "chan"
-	WORK_CONFIG    = "conf"
-	WORK_REGISTER  = "rgst"
-	WORK_STATUS    = "stus"
-	RES_SIGN       = "sign"
-	RES_MSG        = "msg0"
-	RES_CLOSE      = "clse"
-	NEW_CONN       = "conn" //新连接标志
-	NEW_TASK       = "task" //新连接标志
-	NEW_CONF       = "conf" //新连接标志
-	NEW_HOST       = "host" //新连接标志
+	VERIFY_EER       = "vkey"
+	VERIFY_SUCCESS   = "sucs"
+	WORK_MAIN        = "main"
+	WORK_CHAN        = "chan"
+	WORK_SEND_STATUS = "sdst"
+	WORK_CONFIG      = "conf"
+	WORK_REGISTER    = "rgst"
+	WORK_STATUS      = "stus"
+	RES_SIGN         = "sign"
+	RES_MSG          = "msg0"
+	RES_CLOSE        = "clse"
+	NEW_CONN         = "conn" //新连接标志
+	NEW_TASK         = "task" //新连接标志
+	NEW_CONF         = "conf" //新连接标志
+	NEW_HOST         = "host" //新连接标志
 
 	CONN_TCP          = "tcp"
 	CONN_UDP          = "udp"

+ 11 - 9
lib/conn/conn.go

@@ -74,12 +74,12 @@ func (s *Conn) ReadLen(cLen int) ([]byte, error) {
 		return nil, errors.New("长度错误" + strconv.Itoa(cLen))
 	}
 	var buf []byte
-	if cLen <= pool.PoolSizeSmall {
+	if cLen < pool.PoolSizeSmall {
 		buf = pool.BufPoolSmall.Get().([]byte)[:cLen]
-		defer pool.BufPoolSmall.Put(buf)
+		defer pool.PutBufPoolSmall(buf)
 	} else {
 		buf = pool.BufPoolMax.Get().([]byte)[:cLen]
-		defer pool.BufPoolMax.Put(buf)
+		defer pool.PutBufPoolMax(buf)
 	}
 	if n, err := io.ReadFull(s, buf); err != nil || n != cLen {
 		return buf, errors.New("Error reading specified length " + err.Error())
@@ -190,14 +190,10 @@ func (s *Conn) SendMsg(content []byte, link *Link) (n int, err error) {
 	*/
 	s.Lock()
 	defer s.Unlock()
-	raw := bytes.NewBuffer([]byte{})
-	binary.Write(raw, binary.LittleEndian, int32(link.Id))
-	if n, err = s.Write(raw.Bytes()); err != nil {
+	if err = binary.Write(s.Conn, binary.LittleEndian, int32(link.Id)); err != nil {
 		return
 	}
-	raw.Reset()
-	binary.Write(raw, binary.LittleEndian, content)
-	n, err = s.WriteTo(raw.Bytes(), link.En, link.Crypt, link.Rate)
+	n, err = s.WriteTo(content, link.En, link.Crypt, link.Rate)
 	return
 }
 
@@ -260,6 +256,8 @@ func (s *Conn) GetLinkInfo() (lk *Link, err error) {
 		lk.En = common.GetIntNoErrByStr(string(buf[11+hostLen]))
 		lk.De = common.GetIntNoErrByStr(string(buf[12+hostLen]))
 		lk.Crypt = common.GetBoolByStr(string(buf[13+hostLen]))
+		lk.MsgCh = make(chan []byte)
+		lk.StatusCh = make(chan bool)
 	}
 	return
 }
@@ -399,6 +397,10 @@ func (s *Conn) GetTaskInfo() (t *file.Tunnel, err error) {
 	return
 }
 
+func (s *Conn) WriteWriteSuccess(id int) error {
+	return binary.Write(s.Conn, binary.LittleEndian, int32(id))
+}
+
 //write connect success
 func (s *Conn) WriteSuccess(id int) (int, error) {
 	raw := bytes.NewBuffer([]byte{})

+ 37 - 0
lib/conn/link.go

@@ -1,7 +1,9 @@
 package conn
 
 import (
+	"github.com/cnlh/nps/lib/common"
 	"github.com/cnlh/nps/lib/file"
+	"github.com/cnlh/nps/lib/pool"
 	"github.com/cnlh/nps/lib/rate"
 	"net"
 )
@@ -18,6 +20,9 @@ type Link struct {
 	UdpListener   *net.UDPConn
 	Rate          *rate.Rate
 	UdpRemoteAddr *net.UDPAddr
+	MsgCh         chan []byte
+	MsgConn       *Conn
+	StatusCh      chan bool
 }
 
 func NewLink(id int, connType string, host string, en, de int, crypt bool, c *Conn, flow *file.Flow, udpListener *net.UDPConn, rate *rate.Rate, UdpRemoteAddr *net.UDPAddr) *Link {
@@ -33,5 +38,37 @@ func NewLink(id int, connType string, host string, en, de int, crypt bool, c *Co
 		UdpListener:   udpListener,
 		Rate:          rate,
 		UdpRemoteAddr: UdpRemoteAddr,
+		MsgCh:         make(chan []byte),
+		StatusCh:      make(chan bool),
 	}
 }
+
+func (s *Link) Run(flow bool) {
+	go func() {
+		for {
+			select {
+			case content := <-s.MsgCh:
+				if len(content) == len(common.IO_EOF) && string(content) == common.IO_EOF {
+					if s.Conn != nil {
+						s.Conn.Close()
+					}
+					return
+				} else {
+					if s.UdpListener != nil && s.UdpRemoteAddr != nil {
+						s.UdpListener.WriteToUDP(content, s.UdpRemoteAddr)
+					} else {
+						s.Conn.Write(content)
+					}
+					if flow {
+						s.Flow.Add(0, len(content))
+					}
+					if s.ConnType == common.CONN_UDP {
+						return
+					}
+					s.MsgConn.WriteWriteSuccess(s.Id)
+					pool.PutBufPoolCopy(content)
+				}
+			}
+		}
+	}()
+}

+ 15 - 3
lib/pool/pool.go

@@ -36,14 +36,26 @@ var BufPoolCopy = sync.Pool{
 	},
 }
 
+func PutBufPoolUdp(buf []byte) {
+	if cap(buf) == PoolSizeUdp {
+		BufPoolUdp.Put(buf[:PoolSizeUdp])
+	}
+}
+
 func PutBufPoolCopy(buf []byte) {
 	if cap(buf) == PoolSizeCopy {
 		BufPoolCopy.Put(buf[:PoolSizeCopy])
 	}
 }
 
-func PutBufPoolUdp(buf []byte) {
-	if cap(buf) == PoolSizeUdp {
-		BufPoolUdp.Put(buf[:PoolSizeUdp])
+func PutBufPoolSmall(buf []byte) {
+	if cap(buf) == PoolSizeSmall {
+		BufPoolSmall.Put(buf[:PoolSizeSmall])
+	}
+}
+
+func PutBufPoolMax(buf []byte) {
+	if cap(buf) == PoolSize {
+		BufPoolMax.Put(buf[:PoolSize])
 	}
 }

+ 1 - 0
server/proxy/base.go

@@ -60,6 +60,7 @@ func (s *server) linkCopy(link *conn.Link, c *conn.Conn, rb []byte, tunnel *conn
 			}
 			flow.Add(n, 0)
 		}
+		<-link.StatusCh
 	}
 	pool.PutBufPoolCopy(buf)
 }

+ 3 - 4
server/proxy/http.go

@@ -9,7 +9,6 @@ import (
 	"github.com/cnlh/nps/lib/file"
 	"github.com/cnlh/nps/lib/lg"
 	"github.com/cnlh/nps/vender/github.com/astaxie/beego"
-	"log"
 	"net/http"
 	"net/http/httputil"
 	"path/filepath"
@@ -137,9 +136,10 @@ func (s *httpServer) process(c *conn.Conn, r *http.Request) {
 			}
 			lk = conn.NewLink(host.Client.GetId(), common.CONN_TCP, host.GetRandomTarget(), host.Client.Cnf.CompressEncode, host.Client.Cnf.CompressDecode, host.Client.Cnf.Crypt, c, host.Flow, nil, host.Client.Rate, nil)
 			if tunnel, err = s.bridge.SendLinkInfo(host.Client.Id, lk, c.Conn.RemoteAddr().String()); err != nil {
-				log.Println(err)
+				lg.Println(err)
 				break
 			}
+			lk.Run(true)
 			isConn = false
 		} else {
 			r, err = http.ReadRequest(bufio.NewReader(c))
@@ -166,6 +166,7 @@ func (s *httpServer) process(c *conn.Conn, r *http.Request) {
 			c.Close()
 			break
 		}
+		<-lk.StatusCh
 	}
 end:
 	if isConn {
@@ -173,9 +174,7 @@ end:
 	} else {
 		tunnel.SendMsg([]byte(common.IO_EOF), lk)
 	}
-
 	c.Close()
-
 }
 
 func (s *httpServer) NewServer(port int) *http.Server {

+ 1 - 0
server/proxy/socks5.go

@@ -148,6 +148,7 @@ func (s *Sock5ModeServer) doConnect(c net.Conn, command uint8) {
 		return
 	} else {
 		s.sendReply(c, succeeded)
+		link.Run(true)
 		s.linkCopy(link, conn.NewConn(c), nil, tunnel, s.task.Flow)
 	}
 	return

+ 1 - 0
server/proxy/tcp.go

@@ -57,6 +57,7 @@ func (s *TunnelModeServer) dealClient(c *conn.Conn, cnf *file.Config, addr strin
 		c.Close()
 		return err
 	} else {
+		link.Run(true)
 		s.linkCopy(link, c, rb, tunnel, s.task.Flow)
 	}
 	return nil

+ 1 - 0
server/proxy/udp.go

@@ -56,6 +56,7 @@ func (s *UdpModeServer) process(addr *net.UDPAddr, data []byte) {
 		s.task.Flow.Add(len(data), 0)
 		tunnel.SendMsg(data, link)
 		pool.PutBufPoolUdp(data)
+		link.Run(true)
 	}
 }