bridge.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551
  1. package bridge
  2. import (
  3. "encoding/binary"
  4. "errors"
  5. "fmt"
  6. "github.com/cnlh/nps/lib/common"
  7. "github.com/cnlh/nps/lib/conn"
  8. "github.com/cnlh/nps/lib/crypt"
  9. "github.com/cnlh/nps/lib/file"
  10. "github.com/cnlh/nps/lib/mux"
  11. "github.com/cnlh/nps/lib/version"
  12. "github.com/cnlh/nps/server/connection"
  13. "github.com/cnlh/nps/server/tool"
  14. "github.com/cnlh/nps/vender/github.com/astaxie/beego"
  15. "github.com/cnlh/nps/vender/github.com/astaxie/beego/logs"
  16. "github.com/cnlh/nps/vender/github.com/xtaci/kcp"
  17. "net"
  18. "strconv"
  19. "strings"
  20. "sync"
  21. "time"
  22. )
  23. type Client struct {
  24. tunnel *mux.Mux
  25. signal *conn.Conn
  26. file *mux.Mux
  27. retryTime int // it will be add 1 when ping not ok until to 3 will close the client
  28. sync.RWMutex
  29. }
  30. func NewClient(t, f *mux.Mux, s *conn.Conn) *Client {
  31. return &Client{
  32. signal: s,
  33. tunnel: t,
  34. file: f,
  35. }
  36. }
  37. type Bridge struct {
  38. TunnelPort int //通信隧道端口
  39. Client map[int]*Client
  40. tunnelType string //bridge type kcp or tcp
  41. OpenTask chan *file.Tunnel
  42. CloseTask chan *file.Tunnel
  43. CloseClient chan int
  44. SecretChan chan *conn.Secret
  45. clientLock sync.RWMutex
  46. Register map[string]time.Time
  47. registerLock sync.RWMutex
  48. ipVerify bool
  49. runList map[int]interface{}
  50. }
  51. func NewTunnel(tunnelPort int, tunnelType string, ipVerify bool, runList map[int]interface{}) *Bridge {
  52. t := new(Bridge)
  53. t.TunnelPort = tunnelPort
  54. t.Client = make(map[int]*Client)
  55. t.tunnelType = tunnelType
  56. t.OpenTask = make(chan *file.Tunnel)
  57. t.CloseTask = make(chan *file.Tunnel)
  58. t.CloseClient = make(chan int)
  59. t.Register = make(map[string]time.Time)
  60. t.ipVerify = ipVerify
  61. t.runList = runList
  62. t.SecretChan = make(chan *conn.Secret)
  63. return t
  64. }
  65. func (s *Bridge) StartTunnel() error {
  66. go s.ping()
  67. l, err := connection.GetBridgeListener(s.tunnelType)
  68. if err != nil {
  69. return err
  70. }
  71. if s.tunnelType == "kcp" {
  72. listener, ok := l.(*kcp.Listener)
  73. if !ok {
  74. return err
  75. }
  76. go func() {
  77. for {
  78. c, err := listener.AcceptKCP()
  79. conn.SetUdpSession(c)
  80. if err != nil {
  81. logs.Warn(err)
  82. continue
  83. }
  84. go s.cliProcess(conn.NewConn(c))
  85. }
  86. }()
  87. } else {
  88. listener, ok := l.(net.Listener)
  89. if !ok {
  90. return err
  91. }
  92. go func() {
  93. for {
  94. c, err := listener.Accept()
  95. if err != nil {
  96. logs.Warn(err)
  97. continue
  98. }
  99. go s.cliProcess(conn.NewConn(c))
  100. }
  101. }()
  102. }
  103. return nil
  104. }
  105. //get health information form client
  106. func (s *Bridge) GetHealthFromClient(id int, c *conn.Conn) {
  107. for {
  108. if info, status, err := c.GetHealthInfo(); err != nil {
  109. logs.Error(err)
  110. break
  111. } else if !status { //the status is true , return target to the targetArr
  112. for _, v := range file.GetCsvDb().Tasks {
  113. if v.Client.Id == id && v.Mode == "tcp" && strings.Contains(v.Target, info) {
  114. v.Lock()
  115. if v.TargetArr == nil || (len(v.TargetArr) == 0 && len(v.HealthRemoveArr) == 0) {
  116. v.TargetArr = common.TrimArr(strings.Split(v.Target, "\n"))
  117. }
  118. v.TargetArr = common.RemoveArrVal(v.TargetArr, info)
  119. if v.HealthRemoveArr == nil {
  120. v.HealthRemoveArr = make([]string, 0)
  121. }
  122. v.HealthRemoveArr = append(v.HealthRemoveArr, info)
  123. v.Unlock()
  124. }
  125. }
  126. for _, v := range file.GetCsvDb().Hosts {
  127. if v.Client.Id == id && strings.Contains(v.Target, info) {
  128. v.Lock()
  129. if v.TargetArr == nil || (len(v.TargetArr) == 0 && len(v.HealthRemoveArr) == 0) {
  130. v.TargetArr = common.TrimArr(strings.Split(v.Target, "\n"))
  131. }
  132. v.TargetArr = common.RemoveArrVal(v.TargetArr, info)
  133. if v.HealthRemoveArr == nil {
  134. v.HealthRemoveArr = make([]string, 0)
  135. }
  136. v.HealthRemoveArr = append(v.HealthRemoveArr, info)
  137. v.Unlock()
  138. }
  139. }
  140. } else { //the status is false,remove target from the targetArr
  141. for _, v := range file.GetCsvDb().Tasks {
  142. if v.Client.Id == id && v.Mode == "tcp" && common.IsArrContains(v.HealthRemoveArr, info) && !common.IsArrContains(v.TargetArr, info) {
  143. v.Lock()
  144. v.TargetArr = append(v.TargetArr, info)
  145. v.HealthRemoveArr = common.RemoveArrVal(v.HealthRemoveArr, info)
  146. v.Unlock()
  147. }
  148. }
  149. for _, v := range file.GetCsvDb().Hosts {
  150. if v.Client.Id == id && common.IsArrContains(v.HealthRemoveArr, info) && !common.IsArrContains(v.TargetArr, info) {
  151. v.Lock()
  152. v.TargetArr = append(v.TargetArr, info)
  153. v.HealthRemoveArr = common.RemoveArrVal(v.HealthRemoveArr, info)
  154. v.Unlock()
  155. }
  156. }
  157. }
  158. }
  159. }
  160. //验证失败,返回错误验证flag,并且关闭连接
  161. func (s *Bridge) verifyError(c *conn.Conn) {
  162. c.Write([]byte(common.VERIFY_EER))
  163. c.Conn.Close()
  164. }
  165. func (s *Bridge) verifySuccess(c *conn.Conn) {
  166. c.Write([]byte(common.VERIFY_SUCCESS))
  167. }
  168. func (s *Bridge) cliProcess(c *conn.Conn) {
  169. //read test flag
  170. if _, err := c.GetShortContent(3); err != nil {
  171. logs.Info("The client %s connect error", c.Conn.RemoteAddr())
  172. return
  173. }
  174. //version check
  175. if b, err := c.GetShortContent(32); err != nil || string(b) != crypt.Md5(version.GetVersion()) {
  176. logs.Info("The client %s version does not match", c.Conn.RemoteAddr())
  177. c.Close()
  178. return
  179. }
  180. //write server version to client
  181. c.Write([]byte(crypt.Md5(version.GetVersion())))
  182. c.SetReadDeadline(5, s.tunnelType)
  183. var buf []byte
  184. var err error
  185. //get vKey from client
  186. if buf, err = c.GetShortContent(32); err != nil {
  187. c.Close()
  188. return
  189. }
  190. //verify
  191. id, err := file.GetCsvDb().GetIdByVerifyKey(string(buf), c.Conn.RemoteAddr().String())
  192. if err != nil {
  193. logs.Info("Current client connection validation error, close this client:", c.Conn.RemoteAddr())
  194. s.verifyError(c)
  195. return
  196. } else {
  197. s.verifySuccess(c)
  198. }
  199. if flag, err := c.ReadFlag(); err == nil {
  200. s.typeDeal(flag, c, id)
  201. } else {
  202. logs.Warn(err, flag)
  203. }
  204. return
  205. }
  206. func (s *Bridge) DelClient(id int, isOther bool) {
  207. s.clientLock.Lock()
  208. defer s.clientLock.Unlock()
  209. if v, ok := s.Client[id]; ok {
  210. if c, err := file.GetCsvDb().GetClient(id); err == nil && c.NoStore {
  211. s.CloseClient <- c.Id
  212. }
  213. if v.signal != nil {
  214. v.signal.Close()
  215. }
  216. delete(s.Client, id)
  217. }
  218. }
  219. //use different
  220. func (s *Bridge) typeDeal(typeVal string, c *conn.Conn, id int) {
  221. switch typeVal {
  222. case common.WORK_MAIN:
  223. //the vKey connect by another ,close the client of before
  224. s.clientLock.Lock()
  225. if v, ok := s.Client[id]; ok {
  226. s.clientLock.Unlock()
  227. if v.signal != nil {
  228. v.signal.WriteClose()
  229. }
  230. v.Lock()
  231. v.signal = c
  232. v.Unlock()
  233. } else {
  234. s.Client[id] = NewClient(nil, nil, c)
  235. s.clientLock.Unlock()
  236. }
  237. go s.GetHealthFromClient(id, c)
  238. logs.Info("clientId %d connection succeeded, address:%s ", id, c.Conn.RemoteAddr())
  239. case common.WORK_CHAN:
  240. s.clientLock.Lock()
  241. if v, ok := s.Client[id]; ok {
  242. s.clientLock.Unlock()
  243. v.Lock()
  244. v.tunnel = mux.NewMux(c.Conn)
  245. v.Unlock()
  246. } else {
  247. s.Client[id] = NewClient(mux.NewMux(c.Conn), nil, nil)
  248. s.clientLock.Unlock()
  249. }
  250. case common.WORK_CONFIG:
  251. var isPub bool
  252. client, err := file.GetCsvDb().GetClient(id);
  253. if err == nil {
  254. if client.VerifyKey == beego.AppConfig.String("public_vkey") {
  255. isPub = true
  256. } else {
  257. isPub = false
  258. }
  259. }
  260. binary.Write(c, binary.LittleEndian, isPub)
  261. go s.getConfig(c, isPub, client)
  262. case common.WORK_REGISTER:
  263. go s.register(c)
  264. case common.WORK_SECRET:
  265. if b, err := c.GetShortContent(32); err == nil {
  266. s.SecretChan <- conn.NewSecret(string(b), c)
  267. }
  268. case common.WORK_FILE:
  269. s.clientLock.Lock()
  270. if v, ok := s.Client[id]; ok {
  271. s.clientLock.Unlock()
  272. v.Lock()
  273. v.file = mux.NewMux(c.Conn)
  274. v.Unlock()
  275. } else {
  276. s.Client[id] = NewClient(nil, mux.NewMux(c.Conn), nil)
  277. s.clientLock.Unlock()
  278. }
  279. case common.WORK_P2P:
  280. //read md5 secret
  281. if b, err := c.GetShortContent(32); err != nil {
  282. return
  283. } else if t := file.GetCsvDb().GetTaskByMd5Password(string(b)); t == nil {
  284. return
  285. } else {
  286. s.clientLock.Lock()
  287. if v, ok := s.Client[t.Client.Id]; !ok {
  288. s.clientLock.Unlock()
  289. return
  290. } else {
  291. s.clientLock.Unlock()
  292. //向密钥对应的客户端发送与服务端udp建立连接信息,地址,密钥
  293. v.signal.Write([]byte(common.NEW_UDP_CONN))
  294. svrAddr := beego.AppConfig.String("p2p_ip") + ":" + beego.AppConfig.String("p2p_port")
  295. if err != nil {
  296. logs.Warn("get local udp addr error")
  297. return
  298. }
  299. v.signal.WriteLenContent([]byte(svrAddr))
  300. v.signal.WriteLenContent(b)
  301. //向该请求者发送建立连接请求,服务器地址
  302. c.WriteLenContent([]byte(svrAddr))
  303. }
  304. }
  305. }
  306. c.SetAlive(s.tunnelType)
  307. return
  308. }
  309. //register ip
  310. func (s *Bridge) register(c *conn.Conn) {
  311. var hour int32
  312. if err := binary.Read(c, binary.LittleEndian, &hour); err == nil {
  313. s.registerLock.Lock()
  314. s.Register[common.GetIpByAddr(c.Conn.RemoteAddr().String())] = time.Now().Add(time.Minute * time.Duration(hour))
  315. s.registerLock.Unlock()
  316. }
  317. }
  318. func (s *Bridge) SendLinkInfo(clientId int, link *conn.Link, linkAddr string, t *file.Tunnel) (target net.Conn, err error) {
  319. s.clientLock.Lock()
  320. if v, ok := s.Client[clientId]; ok {
  321. s.clientLock.Unlock()
  322. //If ip is restricted to do ip verification
  323. if s.ipVerify {
  324. s.registerLock.Lock()
  325. ip := common.GetIpByAddr(linkAddr)
  326. if v, ok := s.Register[ip]; !ok {
  327. s.registerLock.Unlock()
  328. return nil, errors.New(fmt.Sprintf("The ip %s is not in the validation list", ip))
  329. } else {
  330. s.registerLock.Unlock()
  331. if !v.After(time.Now()) {
  332. return nil, errors.New(fmt.Sprintf("The validity of the ip %s has expired", ip))
  333. }
  334. }
  335. }
  336. var tunnel *mux.Mux
  337. if t != nil && t.Mode == "file" {
  338. tunnel = v.file
  339. } else {
  340. tunnel = v.tunnel
  341. }
  342. if tunnel == nil {
  343. err = errors.New("the client connect error")
  344. return
  345. }
  346. if target, err = tunnel.NewConn(); err != nil {
  347. return
  348. }
  349. if t != nil && t.Mode == "file" {
  350. return
  351. }
  352. if _, err = conn.NewConn(target).SendLinkInfo(link); err != nil {
  353. logs.Info("new connect error ,the target %s refuse to connect", link.Host)
  354. return
  355. }
  356. } else {
  357. s.clientLock.Unlock()
  358. err = errors.New(fmt.Sprintf("the client %d is not connect", clientId))
  359. }
  360. return
  361. }
  362. func (s *Bridge) ping() {
  363. ticker := time.NewTicker(time.Second * 5)
  364. for {
  365. select {
  366. case <-ticker.C:
  367. s.clientLock.Lock()
  368. arr := make([]int, 0)
  369. for k, v := range s.Client {
  370. if v.tunnel == nil || v.signal == nil {
  371. v.retryTime += 1
  372. if v.retryTime >= 3 {
  373. arr = append(arr, k)
  374. }
  375. continue
  376. }
  377. if v.tunnel.IsClose {
  378. arr = append(arr, k)
  379. }
  380. }
  381. s.clientLock.Unlock()
  382. for _, v := range arr {
  383. logs.Info("the client %d closed", v)
  384. s.DelClient(v, false)
  385. }
  386. }
  387. }
  388. }
  389. //get config and add task from client config
  390. func (s *Bridge) getConfig(c *conn.Conn, isPub bool, client *file.Client) {
  391. var fail bool
  392. loop:
  393. for {
  394. flag, err := c.ReadFlag()
  395. if err != nil {
  396. break
  397. }
  398. switch flag {
  399. case common.WORK_STATUS:
  400. if b, err := c.GetShortContent(32); err != nil {
  401. break loop
  402. } else {
  403. var str string
  404. id, err := file.GetCsvDb().GetClientIdByVkey(string(b))
  405. if err != nil {
  406. break loop
  407. }
  408. file.GetCsvDb().Lock()
  409. for _, v := range file.GetCsvDb().Hosts {
  410. if v.Client.Id == id {
  411. str += v.Remark + common.CONN_DATA_SEQ
  412. }
  413. }
  414. for _, v := range file.GetCsvDb().Tasks {
  415. if _, ok := s.runList[v.Id]; ok && v.Client.Id == id {
  416. str += v.Remark + common.CONN_DATA_SEQ
  417. }
  418. }
  419. file.GetCsvDb().Unlock()
  420. binary.Write(c, binary.LittleEndian, int32(len([]byte(str))))
  421. binary.Write(c, binary.LittleEndian, []byte(str))
  422. }
  423. case common.NEW_CONF:
  424. var err error
  425. if client, err = c.GetConfigInfo(); err != nil {
  426. fail = true
  427. c.WriteAddFail()
  428. break loop
  429. } else {
  430. if err = file.GetCsvDb().NewClient(client); err != nil {
  431. fail = true
  432. c.WriteAddFail()
  433. break loop
  434. }
  435. c.WriteAddOk()
  436. c.Write([]byte(client.VerifyKey))
  437. s.clientLock.Lock()
  438. s.Client[client.Id] = NewClient(nil, nil, nil)
  439. s.clientLock.Unlock()
  440. }
  441. case common.NEW_HOST:
  442. h, err := c.GetHostInfo()
  443. if err != nil {
  444. fail = true
  445. c.WriteAddFail()
  446. break loop
  447. }
  448. h.Client = client
  449. if h.Location == "" {
  450. h.Location = "/"
  451. }
  452. if !client.HasHost(h) {
  453. if file.GetCsvDb().IsHostExist(h) {
  454. fail = true
  455. c.WriteAddFail()
  456. break loop
  457. } else {
  458. file.GetCsvDb().NewHost(h)
  459. c.WriteAddOk()
  460. }
  461. } else {
  462. c.WriteAddOk()
  463. }
  464. case common.NEW_TASK:
  465. if t, err := c.GetTaskInfo(); err != nil {
  466. fail = true
  467. c.WriteAddFail()
  468. break loop
  469. } else {
  470. ports := common.GetPorts(t.Ports)
  471. targets := common.GetPorts(t.Target)
  472. if len(ports) > 1 && (t.Mode == "tcp" || t.Mode == "udp") && (len(ports) != len(targets)) {
  473. fail = true
  474. c.WriteAddFail()
  475. break loop
  476. } else if t.Mode == "secret" {
  477. ports = append(ports, 0)
  478. }
  479. if len(ports) == 0 {
  480. fail = true
  481. c.WriteAddFail()
  482. break loop
  483. }
  484. for i := 0; i < len(ports); i++ {
  485. tl := new(file.Tunnel)
  486. tl.Mode = t.Mode
  487. tl.Port = ports[i]
  488. if len(ports) == 1 {
  489. tl.Target = t.Target
  490. tl.Remark = t.Remark
  491. } else {
  492. tl.Remark = t.Remark + "_" + strconv.Itoa(tl.Port)
  493. if t.TargetAddr != "" {
  494. tl.Target = t.TargetAddr + ":" + strconv.Itoa(targets[i])
  495. } else {
  496. tl.Target = strconv.Itoa(targets[i])
  497. }
  498. }
  499. tl.Id = file.GetCsvDb().GetTaskId()
  500. tl.Status = true
  501. tl.Flow = new(file.Flow)
  502. tl.NoStore = true
  503. tl.Client = client
  504. tl.Password = t.Password
  505. tl.LocalPath = t.LocalPath
  506. tl.StripPre = t.StripPre
  507. if !client.HasTunnel(tl) {
  508. if err := file.GetCsvDb().NewTask(tl); err != nil {
  509. logs.Notice("Add task error ", err.Error())
  510. fail = true
  511. c.WriteAddFail()
  512. break loop
  513. }
  514. if b := tool.TestServerPort(tl.Port, tl.Mode); !b && t.Mode != "secret" && t.Mode != "p2p" {
  515. fail = true
  516. c.WriteAddFail()
  517. break loop
  518. } else {
  519. s.OpenTask <- tl
  520. }
  521. }
  522. c.WriteAddOk()
  523. }
  524. }
  525. }
  526. }
  527. if fail && client != nil {
  528. s.DelClient(client.Id, false)
  529. }
  530. c.Close()
  531. }