Browse Source

Connection disconnection |ip limit bug

刘河 6 years ago
parent
commit
e24b2921ac
5 changed files with 51 additions and 43 deletions
  1. 1 1
      bridge/bridge.go
  2. 4 4
      lib/mux/conn.go
  3. 29 36
      lib/mux/mux.go
  4. 6 2
      server/proxy/http.go
  5. 11 0
      server/server.go

+ 1 - 1
bridge/bridge.go

@@ -324,7 +324,7 @@ func (s *Bridge) register(c *conn.Conn) {
 	var hour int32
 	if err := binary.Read(c, binary.LittleEndian, &hour); err == nil {
 		s.registerLock.Lock()
-		s.Register[common.GetIpByAddr(c.Conn.RemoteAddr().String())] = time.Now().Add(time.Minute * time.Duration(hour))
+		s.Register[common.GetIpByAddr(c.Conn.RemoteAddr().String())] = time.Now().Add(time.Hour * time.Duration(hour))
 		s.registerLock.Unlock()
 	}
 }

+ 4 - 4
lib/mux/conn.go

@@ -26,6 +26,7 @@ type conn struct {
 	connId           int32
 	isClose          bool
 	readWait         bool
+	hasWrite         int
 	mux              *Mux
 }
 
@@ -83,9 +84,7 @@ func (s *conn) Read(buf []byte) (n int, err error) {
 	} else {
 		n = copy(buf, s.readBuffer[s.startRead:s.endRead])
 		s.startRead += n
-		if s.waitQueue.Size() < s.mux.waitQueueSize/2 {
-			s.mux.sendInfo(MUX_MSG_SEND_OK, s.connId, nil)
-		}
+		s.mux.sendInfo(MUX_MSG_SEND_OK, s.connId, nil)
 	}
 	return
 }
@@ -116,9 +115,10 @@ func (s *conn) write(buf []byte, ch chan struct{}) {
 	start := 0
 	l := len(buf)
 	for {
-		if s.stopWrite {
+		if s.hasWrite > 10 {
 			<-s.getStatusCh
 		}
+		s.hasWrite++
 		if l-start > pool.PoolSizeCopy {
 			s.mux.sendInfo(MUX_NEW_MSG, s.connId, buf[start:start+pool.PoolSizeCopy])
 			start += pool.PoolSizeCopy

+ 29 - 36
lib/mux/mux.go

@@ -5,6 +5,7 @@ import (
 	"encoding/binary"
 	"errors"
 	"github.com/cnlh/nps/lib/pool"
+	"github.com/cnlh/nps/vender/github.com/astaxie/beego/logs"
 	"math"
 	"net"
 	"sync"
@@ -22,32 +23,27 @@ const (
 	MUX_PING
 	MUX_CONN_CLOSE
 	MUX_PING_RETURN
-	MUX_STOP_WRITE
-	RETRY_TIME = 2 //Heart beat allowed fault tolerance times
 )
 
 type Mux struct {
 	net.Listener
-	conn          net.Conn
-	connMap       *connMap
-	newConnCh     chan *conn
-	id            int32
-	closeChan     chan struct{}
-	IsClose       bool
-	pingOk        int
-	waitQueueSize int
+	conn      net.Conn
+	connMap   *connMap
+	newConnCh chan *conn
+	id        int32
+	closeChan chan struct{}
+	IsClose   bool
 	sync.Mutex
 }
 
 func NewMux(c net.Conn) *Mux {
 	m := &Mux{
-		conn:          c,
-		connMap:       NewConnMap(),
-		id:            0,
-		closeChan:     make(chan struct{}),
-		newConnCh:     make(chan *conn),
-		IsClose:       false,
-		waitQueueSize: 10, //TODO :In order to be more efficient, this value can be dynamically generated according to the delay algorithm.
+		conn:      c,
+		connMap:   NewConnMap(),
+		id:        0,
+		closeChan: make(chan struct{}),
+		newConnCh: make(chan *conn),
+		IsClose:   false,
 	}
 	//read session by flag
 	go m.readSession()
@@ -104,7 +100,7 @@ func (s *Mux) sendInfo(flag int32, id int32, content []byte) error {
 		binary.Write(raw, binary.LittleEndian, int32(len(content)))
 		binary.Write(raw, binary.LittleEndian, content)
 	}
-	if _, err := s.conn.Write(raw.Bytes()); err != nil || s.pingOk > RETRY_TIME {
+	if _, err := s.conn.Write(raw.Bytes()); err != nil {
 		s.Close()
 		return err
 	}
@@ -113,7 +109,7 @@ func (s *Mux) sendInfo(flag int32, id int32, content []byte) error {
 
 func (s *Mux) ping() {
 	go func() {
-		ticker := time.NewTicker(time.Second * 5)
+		ticker := time.NewTicker(time.Second * 1)
 		for {
 			select {
 			case <-ticker.C:
@@ -122,10 +118,11 @@ func (s *Mux) ping() {
 			if (math.MaxInt32 - s.id) < 10000 {
 				s.id = 0
 			}
-			if err := s.sendInfo(MUX_PING_FLAG, MUX_PING, nil); err != nil || s.pingOk > RETRY_TIME {
+			if err := s.sendInfo(MUX_PING_FLAG, MUX_PING, nil); err != nil {
+				logs.Error("ping error,close the connection")
+				s.Close()
 				break
 			}
-			s.pingOk += 1
 		}
 	}()
 	select {
@@ -155,7 +152,6 @@ func (s *Mux) readSession() {
 					s.sendInfo(MUX_PING_RETURN, MUX_PING, nil)
 					continue
 				case MUX_PING_RETURN:
-					s.pingOk -= 1
 					continue
 				case MUX_NEW_MSG:
 					buf = pool.GetBufPoolCopy()
@@ -173,19 +169,12 @@ func (s *Mux) readSession() {
 							conn.readWait = false
 							conn.readCh <- struct{}{}
 						}
-						if conn.waitQueue.Size() > s.waitQueueSize {
-							s.sendInfo(MUX_STOP_WRITE, conn.connId, nil)
-						}
-					case MUX_STOP_WRITE:
-						conn.stopWrite = true
 					case MUX_MSG_SEND_OK: //the remote has read
-						if conn.stopWrite {
-							conn.stopWrite = false
-							select {
-							case conn.getStatusCh <- struct{}{}:
-							default:
-							}
+						select {
+						case conn.getStatusCh <- struct{}{}:
+						default:
 						}
+						conn.hasWrite --
 					case MUX_NEW_CONN_OK: //conn ok
 						conn.connStatusOkCh <- struct{}{}
 					case MUX_NEW_CONN_Fail:
@@ -198,6 +187,7 @@ func (s *Mux) readSession() {
 					pool.PutBufPoolCopy(buf)
 				}
 			} else {
+				logs.Error("read or send error")
 				break
 			}
 		}
@@ -214,9 +204,12 @@ func (s *Mux) Close() error {
 	}
 	s.IsClose = true
 	s.connMap.Close()
-	s.closeChan <- struct{}{}
-	s.closeChan <- struct{}{}
-	s.closeChan <- struct{}{}
+	select {
+	case s.closeChan <- struct{}{}:
+	}
+	select {
+	case s.closeChan <- struct{}{}:
+	}
 	return s.conn.Close()
 }
 

+ 6 - 2
server/proxy/http.go

@@ -211,14 +211,18 @@ func (s *httpServer) process(c *conn.Conn, r *http.Request) {
 		}
 		//根据设定,修改header和host
 		common.ChangeHostAndHeader(r, host.HostChange, host.HeaderChange, c.Conn.RemoteAddr().String())
-		b, err := httputil.DumpRequest(r, true)
+		b, err := httputil.DumpRequest(r, false)
 		if err != nil {
 			break
 		}
-		host.Flow.Add(int64(len(b)), 0)
 		logs.Trace("%s request, method %s, host %s, url %s, remote address %s, target %s", r.URL.Scheme, r.Method, r.Host, r.RequestURI, r.RemoteAddr, lk.Host)
 		//write
 		connClient.Write(b)
+		if bodyLen, err := common.CopyBuffer(connClient, r.Body); err != nil {
+			break
+		} else {
+			host.Flow.Add(int64(len(b))+bodyLen, 0)
+		}
 	}
 end:
 	if isConn {

+ 11 - 0
server/server.go

@@ -91,6 +91,7 @@ func StartNewServer(bridgePort int, cnf *file.Tunnel, bridgeType string) {
 		go proxy.NewP2PServer(p).Start()
 	}
 	go DealBridgeTask()
+	go dealClientFlow()
 	if svr := NewMode(Bridge, cnf); svr != nil {
 		if err := svr.Start(); err != nil {
 			logs.Error(err)
@@ -101,6 +102,16 @@ func StartNewServer(bridgePort int, cnf *file.Tunnel, bridgeType string) {
 	}
 }
 
+func dealClientFlow() {
+	ticker := time.NewTicker(time.Minute)
+	for {
+		select {
+		case <-ticker.C:
+			dealClientData(file.GetCsvDb().Clients)
+		}
+	}
+}
+
 //new a server by mode name
 func NewMode(Bridge *bridge.Bridge, c *file.Tunnel) proxy.Service {
 	var service proxy.Service