http.go 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275
  1. package proxy
  2. import (
  3. "bufio"
  4. "crypto/tls"
  5. "io"
  6. "net"
  7. "net/http"
  8. "net/http/httputil"
  9. "os"
  10. "path/filepath"
  11. "strconv"
  12. "strings"
  13. "sync"
  14. "ehang.io/nps/bridge"
  15. "ehang.io/nps/lib/cache"
  16. "ehang.io/nps/lib/common"
  17. "ehang.io/nps/lib/conn"
  18. "ehang.io/nps/lib/file"
  19. "ehang.io/nps/server/connection"
  20. "github.com/astaxie/beego/logs"
  21. )
  22. type httpServer struct {
  23. BaseServer
  24. httpPort int
  25. httpsPort int
  26. httpServer *http.Server
  27. httpsServer *http.Server
  28. httpsListener net.Listener
  29. useCache bool
  30. addOrigin bool
  31. cache *cache.Cache
  32. cacheLen int
  33. }
  34. func NewHttp(bridge *bridge.Bridge, c *file.Tunnel, httpPort, httpsPort int, useCache bool, cacheLen int, addOrigin bool) *httpServer {
  35. httpServer := &httpServer{
  36. BaseServer: BaseServer{
  37. task: c,
  38. bridge: bridge,
  39. Mutex: sync.Mutex{},
  40. },
  41. httpPort: httpPort,
  42. httpsPort: httpsPort,
  43. useCache: useCache,
  44. cacheLen: cacheLen,
  45. addOrigin: addOrigin,
  46. }
  47. if useCache {
  48. httpServer.cache = cache.New(cacheLen)
  49. }
  50. return httpServer
  51. }
  52. func (s *httpServer) Start() error {
  53. var err error
  54. if s.errorContent, err = common.ReadAllFromFile(filepath.Join(common.GetRunPath(), "web", "static", "page", "error.html")); err != nil {
  55. s.errorContent = []byte("nps 404")
  56. }
  57. if s.httpPort > 0 {
  58. s.httpServer = s.NewServer(s.httpPort, "http")
  59. go func() {
  60. l, err := connection.GetHttpListener()
  61. if err != nil {
  62. logs.Error(err)
  63. os.Exit(0)
  64. }
  65. err = s.httpServer.Serve(l)
  66. if err != nil {
  67. logs.Error(err)
  68. os.Exit(0)
  69. }
  70. }()
  71. }
  72. if s.httpsPort > 0 {
  73. s.httpsServer = s.NewServer(s.httpsPort, "https")
  74. go func() {
  75. s.httpsListener, err = connection.GetHttpsListener()
  76. if err != nil {
  77. logs.Error(err)
  78. os.Exit(0)
  79. }
  80. logs.Error(NewHttpsServer(s.httpsListener, s.bridge, s.useCache, s.cacheLen).Start())
  81. }()
  82. }
  83. return nil
  84. }
  85. func (s *httpServer) Close() error {
  86. if s.httpsListener != nil {
  87. s.httpsListener.Close()
  88. }
  89. if s.httpsServer != nil {
  90. s.httpsServer.Close()
  91. }
  92. if s.httpServer != nil {
  93. s.httpServer.Close()
  94. }
  95. return nil
  96. }
  97. func (s *httpServer) handleTunneling(w http.ResponseWriter, r *http.Request) {
  98. hijacker, ok := w.(http.Hijacker)
  99. if !ok {
  100. http.Error(w, "Hijacking not supported", http.StatusInternalServerError)
  101. return
  102. }
  103. c, _, err := hijacker.Hijack()
  104. if err != nil {
  105. http.Error(w, err.Error(), http.StatusServiceUnavailable)
  106. }
  107. s.handleHttp(conn.NewConn(c), r)
  108. }
  109. func (s *httpServer) handleHttp(c *conn.Conn, r *http.Request) {
  110. var (
  111. host *file.Host
  112. target net.Conn
  113. err error
  114. connClient io.ReadWriteCloser
  115. scheme = r.URL.Scheme
  116. lk *conn.Link
  117. targetAddr string
  118. lenConn *conn.LenConn
  119. isReset bool
  120. wg sync.WaitGroup
  121. )
  122. defer func() {
  123. if connClient != nil {
  124. connClient.Close()
  125. } else {
  126. s.writeConnFail(c.Conn)
  127. }
  128. c.Close()
  129. }()
  130. reset:
  131. if isReset {
  132. host.Client.AddConn()
  133. }
  134. if host, err = file.GetDb().GetInfoByHost(r.Host, r); err != nil {
  135. logs.Notice("the url %s %s %s can't be parsed!", r.URL.Scheme, r.Host, r.RequestURI)
  136. return
  137. }
  138. if err := s.CheckFlowAndConnNum(host.Client); err != nil {
  139. logs.Warn("client id %d, host id %d, error %s, when https connection", host.Client.Id, host.Id, err.Error())
  140. return
  141. }
  142. if !isReset {
  143. defer host.Client.AddConn()
  144. }
  145. if err = s.auth(r, c, host.Client.Cnf.U, host.Client.Cnf.P); err != nil {
  146. logs.Warn("auth error", err, r.RemoteAddr)
  147. return
  148. }
  149. if targetAddr, err = host.Target.GetRandomTarget(); err != nil {
  150. logs.Warn(err.Error())
  151. return
  152. }
  153. lk = conn.NewLink("http", targetAddr, host.Client.Cnf.Crypt, host.Client.Cnf.Compress, r.RemoteAddr, host.Target.LocalProxy)
  154. if target, err = s.bridge.SendLinkInfo(host.Client.Id, lk, nil); err != nil {
  155. logs.Notice("connect to target %s error %s", lk.Host, err)
  156. return
  157. }
  158. connClient = conn.GetConn(target, lk.Crypt, lk.Compress, host.Client.Rate, true)
  159. //read from inc-client
  160. go func() {
  161. wg.Add(1)
  162. isReset = false
  163. defer connClient.Close()
  164. defer func() {
  165. wg.Done()
  166. if !isReset {
  167. c.Close()
  168. }
  169. }()
  170. for {
  171. if resp, err := http.ReadResponse(bufio.NewReader(connClient), r); err != nil || resp == nil || r == nil {
  172. // if there got broken pipe, http.ReadResponse will get a nil
  173. return
  174. } else {
  175. //if the cache is start and the response is in the extension,store the response to the cache list
  176. if s.useCache && r.URL != nil && strings.Contains(r.URL.Path, ".") {
  177. b, err := httputil.DumpResponse(resp, true)
  178. if err != nil {
  179. return
  180. }
  181. c.Write(b)
  182. host.Flow.Add(0, int64(len(b)))
  183. s.cache.Add(filepath.Join(host.Host, r.URL.Path), b)
  184. } else {
  185. lenConn := conn.NewLenConn(c)
  186. if err := resp.Write(lenConn); err != nil {
  187. logs.Error(err)
  188. return
  189. }
  190. host.Flow.Add(0, int64(lenConn.Len))
  191. }
  192. }
  193. }
  194. }()
  195. for {
  196. //if the cache start and the request is in the cache list, return the cache
  197. if s.useCache {
  198. if v, ok := s.cache.Get(filepath.Join(host.Host, r.URL.Path)); ok {
  199. n, err := c.Write(v.([]byte))
  200. if err != nil {
  201. break
  202. }
  203. logs.Trace("%s request, method %s, host %s, url %s, remote address %s, return cache", r.URL.Scheme, r.Method, r.Host, r.URL.Path, c.RemoteAddr().String())
  204. host.Flow.Add(0, int64(n))
  205. //if return cache and does not create a new conn with client and Connection is not set or close, close the connection.
  206. if strings.ToLower(r.Header.Get("Connection")) == "close" || strings.ToLower(r.Header.Get("Connection")) == "" {
  207. break
  208. }
  209. goto readReq
  210. }
  211. }
  212. //change the host and header and set proxy setting
  213. common.ChangeHostAndHeader(r, host.HostChange, host.HeaderChange, c.Conn.RemoteAddr().String(), s.addOrigin)
  214. logs.Trace("%s request, method %s, host %s, url %s, remote address %s, target %s", r.URL.Scheme, r.Method, r.Host, r.URL.Path, c.RemoteAddr().String(), lk.Host)
  215. //write
  216. lenConn = conn.NewLenConn(connClient)
  217. if err := r.Write(lenConn); err != nil {
  218. logs.Error(err)
  219. break
  220. }
  221. host.Flow.Add(int64(lenConn.Len), 0)
  222. readReq:
  223. //read req from connection
  224. if r, err = http.ReadRequest(bufio.NewReader(c)); err != nil {
  225. break
  226. }
  227. r.URL.Scheme = scheme
  228. //What happened ,Why one character less???
  229. r.Method = resetReqMethod(r.Method)
  230. if hostTmp, err := file.GetDb().GetInfoByHost(r.Host, r); err != nil {
  231. logs.Notice("the url %s %s %s can't be parsed!", r.URL.Scheme, r.Host, r.RequestURI)
  232. break
  233. } else if host != hostTmp {
  234. host = hostTmp
  235. isReset = true
  236. connClient.Close()
  237. goto reset
  238. }
  239. }
  240. wg.Wait()
  241. }
  242. func resetReqMethod(method string) string {
  243. if method == "ET" {
  244. return "GET"
  245. }
  246. if method == "OST" {
  247. return "POST"
  248. }
  249. return method
  250. }
  251. func (s *httpServer) NewServer(port int, scheme string) *http.Server {
  252. return &http.Server{
  253. Addr: ":" + strconv.Itoa(port),
  254. Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  255. r.URL.Scheme = scheme
  256. s.handleTunneling(w, r)
  257. }),
  258. // Disable HTTP/2.
  259. TLSNextProto: make(map[string]func(*http.Server, *tls.Conn, http.Handler)),
  260. }
  261. }