Browse Source

Https kcp

刘河 6 năm trước cách đây
mục cha
commit
a66b465046

+ 14 - 12
bridge/bridge.go

@@ -16,6 +16,7 @@ import (
 	"github.com/cnlh/nps/vender/github.com/astaxie/beego/logs"
 	"github.com/cnlh/nps/vender/github.com/xtaci/kcp"
 	"net"
+	"os"
 	"strconv"
 	"strings"
 	"sync"
@@ -70,15 +71,14 @@ func NewTunnel(tunnelPort int, tunnelType string, ipVerify bool, runList map[int
 
 func (s *Bridge) StartTunnel() error {
 	go s.ping()
-	l, err := connection.GetBridgeListener(s.tunnelType)
-	if err != nil {
-		return err
-	}
 	if s.tunnelType == "kcp" {
-		listener, ok := l.(*kcp.Listener)
-		if !ok {
+		listener, err := kcp.ListenWithOptions(beego.AppConfig.String("bridge_ip")+":"+beego.AppConfig.String("bridge_port"), nil, 150, 3)
+		if err != nil {
+			logs.Error(err)
+			os.Exit(0)
 			return err
 		}
+		logs.Info("server start, the bridge type is %s, the bridge port is %d", s.tunnelType, s.TunnelPort)
 		go func() {
 			for {
 				c, err := listener.AcceptKCP()
@@ -91,8 +91,10 @@ func (s *Bridge) StartTunnel() error {
 			}
 		}()
 	} else {
-		listener, ok := l.(net.Listener)
-		if !ok {
+		listener, err := connection.GetBridgeListener(s.tunnelType)
+		if err != nil {
+			logs.Error(err)
+			os.Exit(0)
 			return err
 		}
 		go func() {
@@ -253,10 +255,10 @@ func (s *Bridge) typeDeal(typeVal string, c *conn.Conn, id int) {
 		if v, ok := s.Client[id]; ok {
 			s.clientLock.Unlock()
 			v.Lock()
-			v.tunnel = mux.NewMux(c.Conn)
+			v.tunnel = mux.NewMux(c.Conn, s.tunnelType)
 			v.Unlock()
 		} else {
-			s.Client[id] = NewClient(mux.NewMux(c.Conn), nil, nil)
+			s.Client[id] = NewClient(mux.NewMux(c.Conn, s.tunnelType), nil, nil)
 			s.clientLock.Unlock()
 		}
 	case common.WORK_CONFIG:
@@ -282,10 +284,10 @@ func (s *Bridge) typeDeal(typeVal string, c *conn.Conn, id int) {
 		if v, ok := s.Client[id]; ok {
 			s.clientLock.Unlock()
 			v.Lock()
-			v.file = mux.NewMux(c.Conn)
+			v.file = mux.NewMux(c.Conn, s.tunnelType)
 			v.Unlock()
 		} else {
-			s.Client[id] = NewClient(nil, mux.NewMux(c.Conn), nil)
+			s.Client[id] = NewClient(nil, mux.NewMux(c.Conn, s.tunnelType), nil)
 			s.clientLock.Unlock()
 		}
 	case common.WORK_P2P:

+ 16 - 18
client/client.go

@@ -15,11 +15,11 @@ import (
 type TRPClient struct {
 	svrAddr        string
 	bridgeConnType string
-	stop           chan bool
 	proxyUrl       string
 	vKey           string
 	tunnel         *mux.Mux
 	signal         *conn.Conn
+	ticker         *time.Ticker
 	cnf            *config.Config
 }
 
@@ -29,7 +29,6 @@ func NewRPClient(svraddr string, vKey string, bridgeConnType string, proxyUrl st
 		svrAddr:        svraddr,
 		vKey:           vKey,
 		bridgeConnType: bridgeConnType,
-		stop:           make(chan bool),
 		proxyUrl:       proxyUrl,
 		cnf:            cnf,
 	}
@@ -51,8 +50,9 @@ retry:
 }
 
 func (s *TRPClient) Close() {
-	s.stop <- true
+	s.tunnel.Close()
 	s.signal.Close()
+	s.ticker.Stop()
 }
 
 //处理
@@ -168,7 +168,7 @@ func (s *TRPClient) newUdpConn(rAddr string, md5Password string) {
 		if udpTunnel.RemoteAddr().String() == string(b) {
 			conn.SetUdpSession(udpTunnel)
 			//读取link,设置msgCh 设置msgConn消息回传响应机制
-			l := mux.NewMux(udpTunnel)
+			l := mux.NewMux(udpTunnel, s.bridgeConnType)
 			for {
 				connMux, err := l.Accept()
 				if err != nil {
@@ -187,18 +187,16 @@ func (s *TRPClient) dealChan() {
 		logs.Error("connect to ", s.svrAddr, "error:", err)
 		return
 	}
-	go func() {
-		s.tunnel = mux.NewMux(tunnel.Conn)
-		for {
-			src, err := s.tunnel.Accept()
-			if err != nil {
-				logs.Warn(err)
-				break
-			}
-			go s.srcProcess(src)
+	s.tunnel = mux.NewMux(tunnel.Conn, s.bridgeConnType)
+	for {
+		src, err := s.tunnel.Accept()
+		if err != nil {
+			logs.Warn(err)
+			s.Close()
+			break
 		}
-	}()
-	<-s.stop
+		go s.srcProcess(src)
+	}
 }
 
 func (s *TRPClient) srcProcess(src net.Conn) {
@@ -221,14 +219,14 @@ func (s *TRPClient) srcProcess(src net.Conn) {
 }
 
 func (s *TRPClient) ping() {
-	ticker := time.NewTicker(time.Second * 5)
+	s.ticker = time.NewTicker(time.Second * 5)
 loop:
 	for {
 		select {
-		case <-ticker.C:
+		case <-s.ticker.C:
 			if s.tunnel.IsClose {
 				s.Close()
-				ticker.Stop()
+				s.ticker.Stop()
 				break loop
 			}
 		}

+ 2 - 2
client/local.go

@@ -39,7 +39,7 @@ func startLocalFileServer(config *config.CommonConfig, t *file.Tunnel, vkey stri
 	}
 	logs.Info("start local file system, local path %s, strip prefix %s ,remote port %s ", t.LocalPath, t.StripPre, t.Ports)
 	fileServer = append(fileServer, srv)
-	listener := mux.NewMux(remoteConn.Conn)
+	listener := mux.NewMux(remoteConn.Conn, common.CONN_TCP)
 	logs.Warn(srv.Serve(listener))
 }
 
@@ -88,7 +88,7 @@ func processP2P(localTcpConn net.Conn, config *config.CommonConfig, l *config.Lo
 		if udpConn == nil {
 			return
 		}
-		muxSession = mux.NewMux(udpConn)
+		muxSession = mux.NewMux(udpConn, "kcp")
 	}
 	nowConn, err := muxSession.NewConn()
 	if err != nil {

+ 2 - 1
conf/nps.conf

@@ -6,7 +6,8 @@ runmode = dev
 
 #HTTP(S) proxy port, no startup if empty
 http_proxy_port=80
-#https_proxy_port=8024
+#https_proxy_port=443
+https_just_proxy=true
 #certFile absolute path
 pem_path=conf/server.pem
 #KeyFile absolute path

+ 9 - 0
lib/common/util.go

@@ -335,3 +335,12 @@ func RemoveArrVal(arr []string, val string) []string {
 	}
 	return arr
 }
+
+func BytesToNum(b []byte) int {
+	var str string
+	for i := 0; i < len(b); i++ {
+		str += strconv.Itoa(int(b[i]))
+	}
+	x, _ := strconv.Atoi(str)
+	return int(x)
+}

+ 1 - 1
lib/file/file.go

@@ -561,7 +561,7 @@ func (s *Csv) GetInfoByHost(host string, r *http.Request) (h *Host, err error) {
 			v.Location = "/"
 		}
 		if strings.Index(r.RequestURI, v.Location) == 0 {
-			if h == nil || (len(v.Location) > len(h.Location)) {
+			if h == nil || (len(v.Location) < len(h.Location)) {
 				h = v
 			}
 		}

+ 13 - 6
lib/mux/mux.go

@@ -5,7 +5,6 @@ import (
 	"encoding/binary"
 	"errors"
 	"github.com/cnlh/nps/lib/pool"
-	"github.com/cnlh/nps/vender/github.com/astaxie/beego/logs"
 	"math"
 	"net"
 	"sync"
@@ -33,10 +32,12 @@ type Mux struct {
 	id        int32
 	closeChan chan struct{}
 	IsClose   bool
+	pingOk    int
+	connType  string
 	sync.Mutex
 }
 
-func NewMux(c net.Conn) *Mux {
+func NewMux(c net.Conn, connType string) *Mux {
 	m := &Mux{
 		conn:      c,
 		connMap:   NewConnMap(),
@@ -44,6 +45,7 @@ func NewMux(c net.Conn) *Mux {
 		closeChan: make(chan struct{}),
 		newConnCh: make(chan *conn),
 		IsClose:   false,
+		connType:  connType,
 	}
 	//read session by flag
 	go m.readSession()
@@ -85,7 +87,11 @@ func (s *Mux) Accept() (net.Conn, error) {
 	if s.IsClose {
 		return nil, errors.New("accpet error,the conn has closed")
 	}
-	return <-s.newConnCh, nil
+	conn := <-s.newConnCh
+	if conn == nil {
+		return nil, errors.New("accpet error,the conn has closed")
+	}
+	return conn, nil
 }
 
 func (s *Mux) Addr() net.Addr {
@@ -118,11 +124,11 @@ func (s *Mux) ping() {
 			if (math.MaxInt32 - s.id) < 10000 {
 				s.id = 0
 			}
-			if err := s.sendInfo(MUX_PING_FLAG, MUX_PING, nil); err != nil {
-				logs.Error("ping error,close the connection")
+			if err := s.sendInfo(MUX_PING_FLAG, MUX_PING, nil); err != nil || (s.pingOk > 10 && s.connType == "kcp") {
 				s.Close()
 				break
 			}
+			s.pingOk++
 		}
 	}()
 	select {
@@ -141,6 +147,7 @@ func (s *Mux) readSession() {
 				if binary.Read(s.conn, binary.LittleEndian, &i) != nil {
 					break
 				}
+				s.pingOk = 0
 				switch flag {
 				case MUX_NEW_CONN: //new conn
 					conn := NewConn(i, s)
@@ -187,7 +194,6 @@ func (s *Mux) readSession() {
 					pool.PutBufPoolCopy(buf)
 				}
 			} else {
-				logs.Error("read or send error")
 				break
 			}
 		}
@@ -210,6 +216,7 @@ func (s *Mux) Close() error {
 	select {
 	case s.closeChan <- struct{}{}:
 	}
+	close(s.newConnCh)
 	return s.conn.Close()
 }
 

+ 1 - 10
lib/mux/pmux.go

@@ -88,7 +88,7 @@ func (pMux *PortMux) process(conn net.Conn) {
 	var ch chan *PortConn
 	var rs []byte
 	var buffer bytes.Buffer
-	switch bytesToNum(buf) {
+	switch common.BytesToNum(buf) {
 	case HTTP_CONNECT, HTTP_DELETE, HTTP_GET, HTTP_HEAD, HTTP_OPTIONS, HTTP_POST, HTTP_PUT, HTTP_TRACE: //http and manager
 		buffer.Reset()
 		r := bufio.NewReader(conn)
@@ -161,12 +161,3 @@ func (pMux *PortMux) GetHttpsListener() net.Listener {
 func (pMux *PortMux) GetManagerListener() net.Listener {
 	return NewPortListener(pMux.managerConn, pMux.Listener.Addr())
 }
-
-func bytesToNum(b []byte) int {
-	var str string
-	for i := 0; i < len(b); i++ {
-		str += strconv.Itoa(int(b[i]))
-	}
-	x, _ := strconv.Atoi(str)
-	return int(x)
-}

+ 1 - 9
server/connection/connection.go

@@ -4,7 +4,6 @@ import (
 	"github.com/cnlh/nps/lib/mux"
 	"github.com/cnlh/nps/vender/github.com/astaxie/beego"
 	"github.com/cnlh/nps/vender/github.com/astaxie/beego/logs"
-	"github.com/cnlh/nps/vender/github.com/xtaci/kcp"
 	"net"
 	"os"
 	"strconv"
@@ -32,7 +31,7 @@ func InitConnectionService() {
 	}
 }
 
-func GetBridgeListener(tp string) (interface{}, error) {
+func GetBridgeListener(tp string) (net.Listener, error) {
 	logs.Info("server start, the bridge type is %s, the bridge port is %s", tp, bridgePort)
 	var p int
 	var err error
@@ -41,13 +40,6 @@ func GetBridgeListener(tp string) (interface{}, error) {
 	}
 	if pMux != nil {
 		return pMux.GetClientListener(), nil
-	} else if tp == "udp" {
-		if p, err = beego.AppConfig.Int("bridge_port"); err != nil {
-			logs.Error(err)
-			os.Exit(0)
-		} else {
-			return kcp.ListenWithOptions(":"+strconv.Itoa(p), nil, 150, 3)
-		}
 	}
 	return net.ListenTCP("tcp", &net.TCPAddr{net.ParseIP(beego.AppConfig.String("bridge_ip")), p, ""})
 }

+ 7 - 7
server/proxy/base.go

@@ -72,21 +72,21 @@ func (s *BaseServer) checkFlow() error {
 }
 
 //与客户端建立通道
-func (s *BaseServer) DealClient(c *conn.Conn, addr string, rb []byte, tp string) error {
-	link := conn.NewLink(tp, addr, s.task.Client.Cnf.Crypt, s.task.Client.Cnf.Compress, c.Conn.RemoteAddr().String())
+func (s *BaseServer) DealClient(c *conn.Conn, client *file.Client, addr string, rb []byte, tp string) error {
+	link := conn.NewLink(tp, addr, client.Cnf.Crypt, client.Cnf.Compress, c.Conn.RemoteAddr().String())
 
-	if target, err := s.bridge.SendLinkInfo(s.task.Client.Id, link, c.Conn.RemoteAddr().String(), s.task); err != nil {
-		logs.Warn("task id %d get connection from client id %d  error %s", s.task.Id, s.task.Client.Id, err.Error())
+	if target, err := s.bridge.SendLinkInfo(client.Id, link, c.Conn.RemoteAddr().String(), s.task); err != nil {
+		logs.Warn("task id %d get connection from client id %d  error %s", s.task.Id, client.Id, err.Error())
 		c.Close()
 		return err
 	} else {
 		if rb != nil {
 			//HTTP proxy crypt or compress
-			conn.GetConn(target, link.Crypt, link.Compress, s.task.Client.Rate, true).Write(rb)
+			conn.GetConn(target, link.Crypt, link.Compress, client.Rate, true).Write(rb)
 		}
-		conn.CopyWaitGroup(target, c.Conn, link.Crypt, link.Compress, s.task.Client.Rate, s.task.Flow, true)
+		conn.CopyWaitGroup(target, c.Conn, link.Crypt, link.Compress, client.Rate, s.task.Flow, true)
 	}
 
-	s.task.Client.AddConn()
+	client.AddConn()
 	return nil
 }

+ 60 - 5
server/proxy/http.go

@@ -2,6 +2,7 @@ package proxy
 
 import (
 	"bufio"
+	"bytes"
 	"crypto/tls"
 	"github.com/cnlh/nps/bridge"
 	"github.com/cnlh/nps/lib/common"
@@ -14,6 +15,7 @@ import (
 	"net"
 	"net/http"
 	"net/http/httputil"
+	"net/url"
 	"os"
 	"path/filepath"
 	"strconv"
@@ -49,6 +51,49 @@ func NewHttp(bridge *bridge.Bridge, c *file.Tunnel) *httpServer {
 	}
 }
 
+func (s *httpServer) processHttps(c net.Conn) {
+	buf := make([]byte, 2<<10)
+	n, err := c.Read(buf)
+	if err != nil {
+		return
+	}
+	var host *file.Host
+	file.GetCsvDb().Lock()
+	for _, host = range file.GetCsvDb().Hosts {
+		if bytes.Index(buf[:n], []byte(host.Host)) >= 0 {
+			break
+		}
+	}
+	file.GetCsvDb().Unlock()
+	if host == nil {
+		logs.Error("new https connection can't be parsed!", c.RemoteAddr().String())
+		c.Close()
+		return
+	}
+	var targetAddr string
+	r := new(http.Request)
+	r.RequestURI = "/"
+	r.URL = new(url.URL)
+	r.URL.Scheme = "https"
+	r.Host = host.Host
+	//read the host form connection
+	if !host.Client.GetConn() { //conn num limit
+		logs.Notice("connections exceed the current client %d limit %d ,now connection num %d", host.Client.Id, host.Client.MaxConn, host.Client.NowConn)
+		c.Close()
+		return
+	}
+	//流量限制
+	if host.Client.Flow.FlowLimit > 0 && (host.Client.Flow.FlowLimit<<20) < (host.Client.Flow.ExportFlow+host.Client.Flow.InletFlow) {
+		logs.Warn("Traffic exceeded client id %s", host.Client.Id)
+		return
+	}
+	if targetAddr, err = host.GetRandomTarget(); err != nil {
+		logs.Warn(err.Error())
+	}
+	logs.Trace("new https connection,clientId %d,host %s,remote address %s", host.Client.Id, r.Host, c.RemoteAddr().String())
+	s.DealClient(conn.NewConn(c), host.Client, targetAddr, buf[:n], common.CONN_TCP)
+}
+
 func (s *httpServer) Start() error {
 	var err error
 	var httpSrv, httpsSrv *http.Server
@@ -81,16 +126,26 @@ func (s *httpServer) Start() error {
 		}
 		httpsSrv = s.NewServer(s.httpsPort, "https")
 		go func() {
-			logs.Info("Start https listener, port is", s.httpsPort)
 			l, err := connection.GetHttpsListener()
 			if err != nil {
 				logs.Error(err)
 				os.Exit(0)
 			}
-			err = httpsSrv.ServeTLS(l, s.pemPath, s.keyPath)
-			if err != nil {
-				logs.Error(err)
-				os.Exit(0)
+			if b, err := beego.AppConfig.Bool("https_just_proxy"); err == nil && b {
+				for {
+					c, err := l.Accept()
+					if err != nil {
+						logs.Error(err)
+						break
+					}
+					go s.processHttps(c)
+				}
+			} else {
+				err = httpsSrv.ServeTLS(l, s.pemPath, s.keyPath)
+				if err != nil {
+					logs.Error(err)
+					os.Exit(0)
+				}
 			}
 		}()
 	}

+ 2 - 2
server/proxy/tcp.go

@@ -115,7 +115,7 @@ func ProcessTunnel(c *conn.Conn, s *TunnelModeServer) error {
 		logs.Warn("tcp port %d ,client id %d,task id %d connect error %s", s.task.Port, s.task.Client.Id, s.task.Id, err.Error())
 		return err
 	}
-	return s.DealClient(c, targetAddr, nil, common.CONN_TCP)
+	return s.DealClient(c, s.task.Client, targetAddr, nil, common.CONN_TCP)
 }
 
 //http代理模式
@@ -133,5 +133,5 @@ func ProcessHttp(c *conn.Conn, s *TunnelModeServer) error {
 	if err := s.auth(r, c, s.task.Client.Cnf.U, s.task.Client.Cnf.P); err != nil {
 		return err
 	}
-	return s.DealClient(c, addr, rb, common.CONN_TCP)
+	return s.DealClient(c, s.task.Client, addr, rb, common.CONN_TCP)
 }

+ 1 - 1
server/server.go

@@ -66,7 +66,7 @@ func DealBridgeTask() {
 					logs.Info("Connections exceed the current client %d limit", t.Client.Id)
 					s.Conn.Close()
 				} else if t.Status {
-					go proxy.NewBaseServer(Bridge, t).DealClient(s.Conn, t.Target, nil, common.CONN_TCP)
+					go proxy.NewBaseServer(Bridge, t).DealClient(s.Conn, t.Client, t.Target, nil, common.CONN_TCP)
 				} else {
 					s.Conn.Close()
 					logs.Trace("This key %s cannot be processed,status is close", s.Password)