123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100 |
- package proxy
- import (
- "errors"
- "net"
- "net/http"
- "sync"
- "ehang.io/nps/bridge"
- "ehang.io/nps/lib/common"
- "ehang.io/nps/lib/conn"
- "ehang.io/nps/lib/file"
- "github.com/astaxie/beego/logs"
- )
- type Service interface {
- Start() error
- Close() error
- }
- type NetBridge interface {
- SendLinkInfo(clientId int, link *conn.Link, t *file.Tunnel) (target net.Conn, err error)
- }
- //BaseServer struct
- type BaseServer struct {
- id int
- bridge NetBridge
- task *file.Tunnel
- errorContent []byte
- sync.Mutex
- }
- func NewBaseServer(bridge *bridge.Bridge, task *file.Tunnel) *BaseServer {
- return &BaseServer{
- bridge: bridge,
- task: task,
- errorContent: nil,
- Mutex: sync.Mutex{},
- }
- }
- //add the flow
- func (s *BaseServer) FlowAdd(in, out int64) {
- s.Lock()
- defer s.Unlock()
- s.task.Flow.ExportFlow += out
- s.task.Flow.InletFlow += in
- }
- //change the flow
- func (s *BaseServer) FlowAddHost(host *file.Host, in, out int64) {
- s.Lock()
- defer s.Unlock()
- host.Flow.ExportFlow += out
- host.Flow.InletFlow += in
- }
- //write fail bytes to the connection
- func (s *BaseServer) writeConnFail(c net.Conn) {
- c.Write([]byte(common.ConnectionFailBytes))
- c.Write(s.errorContent)
- }
- //auth check
- func (s *BaseServer) auth(r *http.Request, c *conn.Conn, u, p string) error {
- if u != "" && p != "" && !common.CheckAuth(r, u, p) {
- c.Write([]byte(common.UnauthorizedBytes))
- c.Close()
- return errors.New("401 Unauthorized")
- }
- return nil
- }
- //check flow limit of the client ,and decrease the allow num of client
- func (s *BaseServer) CheckFlowAndConnNum(client *file.Client) error {
- if client.Flow.FlowLimit > 0 && (client.Flow.FlowLimit<<20) < (client.Flow.ExportFlow+client.Flow.InletFlow) {
- return errors.New("Traffic exceeded")
- }
- if !client.GetConn() {
- return errors.New("Connections exceed the current client limit")
- }
- return nil
- }
- //create a new connection and start bytes copying
- func (s *BaseServer) DealClient(c *conn.Conn, client *file.Client, addr string, rb []byte, tp string, f func(), flow *file.Flow, localProxy bool) error {
- link := conn.NewLink(tp, addr, client.Cnf.Crypt, client.Cnf.Compress, c.Conn.RemoteAddr().String(), localProxy)
- if target, err := s.bridge.SendLinkInfo(client.Id, link, s.task); err != nil {
- logs.Warn("get connection from client id %d error %s", client.Id, err.Error())
- c.Close()
- return err
- } else {
- if f != nil {
- f()
- }
- conn.CopyWaitGroup(target, c.Conn, link.Crypt, link.Compress, client.Rate, flow, true, rb)
- }
- return nil
- }
|