http.go 7.0 KB


  1. package proxy
  2. import (
  3. "context"
  4. "crypto/tls"
  5. "io"
  6. "net"
  7. "net/http"
  8. "net/http/httputil"
  9. "os"
  10. "path/filepath"
  11. "strconv"
  12. "sync"
  13. "time"
  14. "ehang.io/nps/bridge"
  15. "ehang.io/nps/lib/cache"
  16. "ehang.io/nps/lib/common"
  17. "ehang.io/nps/lib/conn"
  18. "ehang.io/nps/lib/file"
  19. "ehang.io/nps/server/connection"
  20. "github.com/astaxie/beego/logs"
  21. )
  22. type httpServer struct {
  23. BaseServer
  24. httpPort int
  25. httpsPort int
  26. httpServer *http.Server
  27. httpsServer *http.Server
  28. httpsListener net.Listener
  29. useCache bool
  30. addOrigin bool
  31. cache *cache.Cache
  32. cacheLen int
  33. }
  34. func NewHttp(bridge *bridge.Bridge, c *file.Tunnel, httpPort, httpsPort int, useCache bool, cacheLen int, addOrigin bool) *httpServer {
  35. httpServer := &httpServer{
  36. BaseServer: BaseServer{
  37. task: c,
  38. bridge: bridge,
  39. Mutex: sync.Mutex{},
  40. },
  41. httpPort: httpPort,
  42. httpsPort: httpsPort,
  43. useCache: useCache,
  44. cacheLen: cacheLen,
  45. addOrigin: addOrigin,
  46. }
  47. if useCache {
  48. httpServer.cache = cache.New(cacheLen)
  49. }
  50. return httpServer
  51. }
  52. func (s *httpServer) Start() error {
  53. var err error
  54. if s.errorContent, err = common.ReadAllFromFile(filepath.Join(common.GetRunPath(), "web", "static", "page", "error.html")); err != nil {
  55. s.errorContent = []byte("nps 404")
  56. }
  57. if s.httpPort > 0 {
  58. s.httpServer = s.NewServer(s.httpPort, "http")
  59. go func() {
  60. l, err := connection.GetHttpListener()
  61. if err != nil {
  62. logs.Error(err)
  63. os.Exit(0)
  64. }
  65. err = s.httpServer.Serve(l)
  66. if err != nil {
  67. logs.Error(err)
  68. os.Exit(0)
  69. }
  70. }()
  71. }
  72. if s.httpsPort > 0 {
  73. s.httpsServer = s.NewServer(s.httpsPort, "https")
  74. go func() {
  75. s.httpsListener, err = connection.GetHttpsListener()
  76. if err != nil {
  77. logs.Error(err)
  78. os.Exit(0)
  79. }
  80. logs.Error(NewHttpsServer(s.httpsListener, s.bridge, s.useCache, s.cacheLen).Start())
  81. }()
  82. }
  83. return nil
  84. }
  85. func (s *httpServer) Close() error {
  86. if s.httpsListener != nil {
  87. s.httpsListener.Close()
  88. }
  89. if s.httpsServer != nil {
  90. s.httpsServer.Close()
  91. }
  92. if s.httpServer != nil {
  93. s.httpServer.Close()
  94. }
  95. return nil
  96. }
  97. func (s *httpServer) NewServer(port int, scheme string) *http.Server {
  98. rProxy := NewHttpReverseProxy(s)
  99. return &http.Server{
  100. Addr: ":" + strconv.Itoa(port),
  101. Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  102. r.URL.Scheme = scheme
  103. rProxy.ServeHTTP(w, r)
  104. }),
  105. // Disable HTTP/2.
  106. TLSNextProto: make(map[string]func(*http.Server, *tls.Conn, http.Handler)),
  107. }
  108. }
  109. type HttpReverseProxy struct {
  110. proxy *ReverseProxy
  111. responseHeaderTimeout time.Duration
  112. }
  113. func (rp *HttpReverseProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
  114. var (
  115. host *file.Host
  116. targetAddr string
  117. err error
  118. )
  119. if host, err = file.GetDb().GetInfoByHost(req.Host, req); err != nil {
  120. rw.WriteHeader(http.StatusNotFound)
  121. rw.Write([]byte(req.Host + " not found"))
  122. return
  123. }
  124. if host.Client.Cnf.U != "" && host.Client.Cnf.P != "" && !common.CheckAuth(req, host.Client.Cnf.U, host.Client.Cnf.P) {
  125. rw.WriteHeader(http.StatusUnauthorized)
  126. rw.Write([]byte("Unauthorized"))
  127. return
  128. }
  129. if targetAddr, err = host.Target.GetRandomTarget(); err != nil {
  130. rw.WriteHeader(http.StatusBadGateway)
  131. rw.Write([]byte("502 Bad Gateway"))
  132. return
  133. }
  134. req = req.WithContext(context.WithValue(req.Context(), "host", host))
  135. req = req.WithContext(context.WithValue(req.Context(), "target", targetAddr))
  136. req = req.WithContext(context.WithValue(req.Context(), "req", req))
  137. rp.proxy.ServeHTTP(rw, req)
  138. }
  139. func NewHttpReverseProxy(s *httpServer) *HttpReverseProxy {
  140. rp := &HttpReverseProxy{
  141. responseHeaderTimeout: 30 * time.Second,
  142. }
  143. local, _ := net.ResolveTCPAddr("tcp", "127.0.0.1")
  144. proxy := NewReverseProxy(&httputil.ReverseProxy{
  145. Director: func(r *http.Request) {
  146. r.URL.Host = r.Host
  147. if host, err := file.GetDb().GetInfoByHost(r.Host, r); err != nil {
  148. logs.Notice("the url %s %s %s can't be parsed!", r.URL.Scheme, r.Host, r.RequestURI)
  149. return
  150. } else {
  151. common.ChangeHostAndHeader(r, host.HostChange, host.HeaderChange, "", false)
  152. }
  153. },
  154. Transport: &http.Transport{
  155. ResponseHeaderTimeout: rp.responseHeaderTimeout,
  156. DisableKeepAlives: true,
  157. DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
  158. var (
  159. host *file.Host
  160. target net.Conn
  161. err error
  162. connClient io.ReadWriteCloser
  163. targetAddr string
  164. lk *conn.Link
  165. )
  166. r := ctx.Value("req").(*http.Request)
  167. host = ctx.Value("host").(*file.Host)
  168. targetAddr = ctx.Value("target").(string)
  169. lk = conn.NewLink("http", targetAddr, host.Client.Cnf.Crypt, host.Client.Cnf.Compress, r.RemoteAddr, host.Target.LocalProxy)
  170. if target, err = s.bridge.SendLinkInfo(host.Client.Id, lk, nil); err != nil {
  171. logs.Notice("connect to target %s error %s", lk.Host, err)
  172. return nil, NewHTTPError(http.StatusBadGateway, "Cannot connect to the server")
  173. }
  174. connClient = conn.GetConn(target, lk.Crypt, lk.Compress, host.Client.Rate, true)
  175. return &flowConn{
  176. ReadWriteCloser: connClient,
  177. fakeAddr: local,
  178. host: host,
  179. }, nil
  180. },
  181. },
  182. ErrorHandler: func(rw http.ResponseWriter, req *http.Request, err error) {
  183. logs.Warn("do http proxy request error: %v", err)
  184. rw.WriteHeader(http.StatusNotFound)
  185. },
  186. })
  187. proxy.WebSocketDialContext = func(ctx context.Context, network, addr string) (net.Conn, error) {
  188. var (
  189. host *file.Host
  190. target net.Conn
  191. err error
  192. connClient io.ReadWriteCloser
  193. targetAddr string
  194. lk *conn.Link
  195. )
  196. r := ctx.Value("req").(*http.Request)
  197. host = ctx.Value("host").(*file.Host)
  198. targetAddr = ctx.Value("target").(string)
  199. lk = conn.NewLink("tcp", targetAddr, host.Client.Cnf.Crypt, host.Client.Cnf.Compress, r.RemoteAddr, host.Target.LocalProxy)
  200. if target, err = s.bridge.SendLinkInfo(host.Client.Id, lk, nil); err != nil {
  201. logs.Notice("connect to target %s error %s", lk.Host, err)
  202. return nil, NewHTTPError(http.StatusBadGateway, "Cannot connect to the target")
  203. }
  204. connClient = conn.GetConn(target, lk.Crypt, lk.Compress, host.Client.Rate, true)
  205. return &flowConn{
  206. ReadWriteCloser: connClient,
  207. fakeAddr: local,
  208. host: host,
  209. }, nil
  210. }
  211. rp.proxy = proxy
  212. return rp
  213. }
  214. type flowConn struct {
  215. io.ReadWriteCloser
  216. fakeAddr net.Addr
  217. host *file.Host
  218. flowIn int64
  219. flowOut int64
  220. once sync.Once
  221. }
  222. func (c *flowConn) Read(p []byte) (n int, err error) {
  223. n, err = c.ReadWriteCloser.Read(p)
  224. c.flowIn += int64(n)
  225. return n, err
  226. }
  227. func (c *flowConn) Write(p []byte) (n int, err error) {
  228. n, err = c.ReadWriteCloser.Write(p)
  229. c.flowOut += int64(n)
  230. return n, err
  231. }
  232. func (c *flowConn) Close() error {
  233. c.once.Do(func() { c.host.Flow.Add(c.flowIn, c.flowOut) })
  234. return c.ReadWriteCloser.Close()
  235. }
  236. func (c *flowConn) LocalAddr() net.Addr { return c.fakeAddr }
  237. func (c *flowConn) RemoteAddr() net.Addr { return c.fakeAddr }
  238. func (*flowConn) SetDeadline(t time.Time) error { return nil }
  239. func (*flowConn) SetReadDeadline(t time.Time) error { return nil }
  240. func (*flowConn) SetWriteDeadline(t time.Time) error { return nil }