pmux.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166
  1. // This module implements port reuse: several services share one TCP port.
  2. // It distinguishes client, web manager, HTTP and HTTPS connections by the differences between their protocols.
  3. package mux
  4. import (
  5. "bufio"
  6. "bytes"
  7. "io"
  8. "net"
  9. "os"
  10. "strconv"
  11. "strings"
  12. "time"
  13. "ehang.io/nps/lib/common"
  14. "github.com/astaxie/beego/logs"
  15. "github.com/pkg/errors"
  16. )
  // Protocol signatures that process compares with common.BytesToNum applied to
  // the first three bytes of a connection. Each value is the decimal codes of
  // three ASCII bytes written side by side, e.g. "GET" = 71, 69, 84 -> 716984.
  //
  // NOTE(review): this assumes common.BytesToNum concatenates the decimal value
  // of each byte, which is the pattern the constants follow; confirm against
  // that helper.
  const (
  	HTTP_GET     = 716984 // "GET"
  	HTTP_POST    = 807983 // "POS"
  	HTTP_HEAD    = 726965 // "HEA"
  	HTTP_PUT     = 808584 // "PUT"; was 808585 ("PUU"), so PUT requests were not recognised as HTTP
  	HTTP_DELETE  = 686976 // "DEL"
  	HTTP_CONNECT = 677978 // "CON"
  	HTTP_OPTIONS = 798084 // "OPT"
  	HTTP_TRACE   = 848265 // "TRA"

  	// CLIENT spells "TST"; presumably the opening bytes sent by the tunnel
  	// client - confirm against the client side.
  	CLIENT = 848384

  	// ACCEPT_TIME_OUT bounds how long process waits for a listener to take a
  	// classified connection. It is untyped, so its unit is decided where it
  	// is converted to a time.Duration.
  	ACCEPT_TIME_OUT = 10
  )
  29. type PortMux struct {
  30. net.Listener
  31. port int
  32. isClose bool
  33. managerHost string
  34. clientConn chan *PortConn
  35. httpConn chan *PortConn
  36. httpsConn chan *PortConn
  37. managerConn chan *PortConn
  38. }
  39. func NewPortMux(port int, managerHost string) *PortMux {
  40. pMux := &PortMux{
  41. managerHost: managerHost,
  42. port: port,
  43. clientConn: make(chan *PortConn),
  44. httpConn: make(chan *PortConn),
  45. httpsConn: make(chan *PortConn),
  46. managerConn: make(chan *PortConn),
  47. }
  48. pMux.Start()
  49. return pMux
  50. }
  51. func (pMux *PortMux) Start() error {
  52. // Port multiplexing is based on TCP only
  53. tcpAddr, err := net.ResolveTCPAddr("tcp", "0.0.0.0:"+strconv.Itoa(pMux.port))
  54. if err != nil {
  55. return err
  56. }
  57. pMux.Listener, err = net.ListenTCP("tcp", tcpAddr)
  58. if err != nil {
  59. logs.Error(err)
  60. os.Exit(0)
  61. }
  62. go func() {
  63. for {
  64. conn, err := pMux.Listener.Accept()
  65. if err != nil {
  66. logs.Warn(err)
  67. //close
  68. pMux.Close()
  69. }
  70. go pMux.process(conn)
  71. }
  72. }()
  73. return nil
  74. }
  75. func (pMux *PortMux) process(conn net.Conn) {
  76. // Recognition according to different signs
  77. // read 3 byte
  78. buf := make([]byte, 3)
  79. if n, err := io.ReadFull(conn, buf); err != nil || n != 3 {
  80. return
  81. }
  82. var ch chan *PortConn
  83. var rs []byte
  84. var buffer bytes.Buffer
  85. var readMore = false
  86. switch common.BytesToNum(buf) {
  87. case HTTP_CONNECT, HTTP_DELETE, HTTP_GET, HTTP_HEAD, HTTP_OPTIONS, HTTP_POST, HTTP_PUT, HTTP_TRACE: //http and manager
  88. buffer.Reset()
  89. r := bufio.NewReader(conn)
  90. buffer.Write(buf)
  91. for {
  92. b, _, err := r.ReadLine()
  93. if err != nil {
  94. logs.Warn("read line error", err.Error())
  95. conn.Close()
  96. break
  97. }
  98. buffer.Write(b)
  99. buffer.Write([]byte("\r\n"))
  100. if strings.Index(string(b), "Host:") == 0 || strings.Index(string(b), "host:") == 0 {
  101. // Remove host and space effects
  102. str := strings.Replace(string(b), "Host:", "", -1)
  103. str = strings.Replace(str, "host:", "", -1)
  104. str = strings.TrimSpace(str)
  105. // Determine whether it is the same as the manager domain name
  106. if common.GetIpByAddr(str) == pMux.managerHost {
  107. ch = pMux.managerConn
  108. } else {
  109. ch = pMux.httpConn
  110. }
  111. b, _ := r.Peek(r.Buffered())
  112. buffer.Write(b)
  113. rs = buffer.Bytes()
  114. break
  115. }
  116. }
  117. case CLIENT: // client connection
  118. ch = pMux.clientConn
  119. default: // https
  120. readMore = true
  121. ch = pMux.httpsConn
  122. }
  123. if len(rs) == 0 {
  124. rs = buf
  125. }
  126. timer := time.NewTimer(ACCEPT_TIME_OUT)
  127. select {
  128. case <-timer.C:
  129. case ch <- newPortConn(conn, rs, readMore):
  130. }
  131. }
  132. func (pMux *PortMux) Close() error {
  133. if pMux.isClose {
  134. return errors.New("the port mux has closed")
  135. }
  136. pMux.isClose = true
  137. close(pMux.clientConn)
  138. close(pMux.httpsConn)
  139. close(pMux.httpConn)
  140. close(pMux.managerConn)
  141. return pMux.Listener.Close()
  142. }
  143. func (pMux *PortMux) GetClientListener() net.Listener {
  144. return NewPortListener(pMux.clientConn, pMux.Listener.Addr())
  145. }
  146. func (pMux *PortMux) GetHttpListener() net.Listener {
  147. return NewPortListener(pMux.httpConn, pMux.Listener.Addr())
  148. }
  149. func (pMux *PortMux) GetHttpsListener() net.Listener {
  150. return NewPortListener(pMux.httpsConn, pMux.Listener.Addr())
  151. }
  152. func (pMux *PortMux) GetManagerListener() net.Listener {
  153. return NewPortListener(pMux.managerConn, pMux.Listener.Addr())
  154. }