123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166 |
- // Package mux implements port reuse: several services share a single TCP port.
- // Client, web manager, HTTP and HTTPS connections are told apart by inspecting the protocol of each incoming connection.
- package mux
- import (
- "bufio"
- "bytes"
- "io"
- "net"
- "os"
- "strconv"
- "strings"
- "time"
- "ehang.io/nps/lib/common"
- "github.com/astaxie/beego/logs"
- "github.com/pkg/errors"
- )
// Protocol signatures compared against the number derived from the first
// three bytes of a connection. Each value matches the decimal ASCII codes of
// three characters written side by side, e.g. "GET" -> 71, 69, 84 -> 716984.
// NOTE(review): this presumes common.BytesToNum produces that encoding —
// confirm against its implementation.
const (
	HTTP_GET     = 716984 // "GET"
	HTTP_POST    = 807983 // "POS"
	HTTP_HEAD    = 726965 // "HEA"
	HTTP_PUT     = 808584 // "PUT" (80, 85, 84); the previous 808585 spelled "PUU" and could never match a PUT request
	HTTP_DELETE  = 686976 // "DEL"
	HTTP_CONNECT = 677978 // "CON"
	HTTP_OPTIONS = 798084 // "OPT"
	HTTP_TRACE   = 848265 // "TRA"

	// CLIENT is the signature routed to the client listener; in the same
	// encoding it corresponds to the characters "TST".
	CLIENT = 848384

	// ACCEPT_TIME_OUT is the hand-off timeout used when delivering a
	// classified connection. It is a bare number with no unit:
	// NOTE(review): handed directly to time.NewTimer it would mean 10
	// nanoseconds, so the unit (presumably seconds) has to be applied where
	// it is used.
	ACCEPT_TIME_OUT = 10
)
- type PortMux struct {
- net.Listener
- port int
- isClose bool
- managerHost string
- clientConn chan *PortConn
- httpConn chan *PortConn
- httpsConn chan *PortConn
- managerConn chan *PortConn
- }
- func NewPortMux(port int, managerHost string) *PortMux {
- pMux := &PortMux{
- managerHost: managerHost,
- port: port,
- clientConn: make(chan *PortConn),
- httpConn: make(chan *PortConn),
- httpsConn: make(chan *PortConn),
- managerConn: make(chan *PortConn),
- }
- pMux.Start()
- return pMux
- }
- func (pMux *PortMux) Start() error {
- // Port multiplexing is based on TCP only
- tcpAddr, err := net.ResolveTCPAddr("tcp", "0.0.0.0:"+strconv.Itoa(pMux.port))
- if err != nil {
- return err
- }
- pMux.Listener, err = net.ListenTCP("tcp", tcpAddr)
- if err != nil {
- logs.Error(err)
- os.Exit(0)
- }
- go func() {
- for {
- conn, err := pMux.Listener.Accept()
- if err != nil {
- logs.Warn(err)
- //close
- pMux.Close()
- }
- go pMux.process(conn)
- }
- }()
- return nil
- }
- func (pMux *PortMux) process(conn net.Conn) {
- // Recognition according to different signs
- // read 3 byte
- buf := make([]byte, 3)
- if n, err := io.ReadFull(conn, buf); err != nil || n != 3 {
- return
- }
- var ch chan *PortConn
- var rs []byte
- var buffer bytes.Buffer
- var readMore = false
- switch common.BytesToNum(buf) {
- case HTTP_CONNECT, HTTP_DELETE, HTTP_GET, HTTP_HEAD, HTTP_OPTIONS, HTTP_POST, HTTP_PUT, HTTP_TRACE: //http and manager
- buffer.Reset()
- r := bufio.NewReader(conn)
- buffer.Write(buf)
- for {
- b, _, err := r.ReadLine()
- if err != nil {
- logs.Warn("read line error", err.Error())
- conn.Close()
- break
- }
- buffer.Write(b)
- buffer.Write([]byte("\r\n"))
- if strings.Index(string(b), "Host:") == 0 || strings.Index(string(b), "host:") == 0 {
- // Remove host and space effects
- str := strings.Replace(string(b), "Host:", "", -1)
- str = strings.Replace(str, "host:", "", -1)
- str = strings.TrimSpace(str)
- // Determine whether it is the same as the manager domain name
- if common.GetIpByAddr(str) == pMux.managerHost {
- ch = pMux.managerConn
- } else {
- ch = pMux.httpConn
- }
- b, _ := r.Peek(r.Buffered())
- buffer.Write(b)
- rs = buffer.Bytes()
- break
- }
- }
- case CLIENT: // client connection
- ch = pMux.clientConn
- default: // https
- readMore = true
- ch = pMux.httpsConn
- }
- if len(rs) == 0 {
- rs = buf
- }
- timer := time.NewTimer(ACCEPT_TIME_OUT)
- select {
- case <-timer.C:
- case ch <- newPortConn(conn, rs, readMore):
- }
- }
- func (pMux *PortMux) Close() error {
- if pMux.isClose {
- return errors.New("the port mux has closed")
- }
- pMux.isClose = true
- close(pMux.clientConn)
- close(pMux.httpsConn)
- close(pMux.httpConn)
- close(pMux.managerConn)
- return pMux.Listener.Close()
- }
- func (pMux *PortMux) GetClientListener() net.Listener {
- return NewPortListener(pMux.clientConn, pMux.Listener.Addr())
- }
- func (pMux *PortMux) GetHttpListener() net.Listener {
- return NewPortListener(pMux.httpConn, pMux.Listener.Addr())
- }
- func (pMux *PortMux) GetHttpsListener() net.Listener {
- return NewPortListener(pMux.httpsConn, pMux.Listener.Addr())
- }
- func (pMux *PortMux) GetManagerListener() net.Listener {
- return NewPortListener(pMux.managerConn, pMux.Listener.Addr())
- }
|