bridge.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558
  1. package bridge
  2. import (
  3. "encoding/binary"
  4. "errors"
  5. "fmt"
  6. "github.com/cnlh/nps/lib/common"
  7. "github.com/cnlh/nps/lib/conn"
  8. "github.com/cnlh/nps/lib/crypt"
  9. "github.com/cnlh/nps/lib/file"
  10. "github.com/cnlh/nps/lib/pool"
  11. "github.com/cnlh/nps/lib/version"
  12. "github.com/cnlh/nps/server/tool"
  13. "github.com/cnlh/nps/vender/github.com/astaxie/beego"
  14. "github.com/cnlh/nps/vender/github.com/astaxie/beego/logs"
  15. "github.com/cnlh/nps/vender/github.com/xtaci/kcp"
  16. "net"
  17. "strconv"
  18. "sync"
  19. "time"
  20. )
  21. type Client struct {
  22. tunnel *conn.Conn
  23. signal *conn.Conn
  24. msg *conn.Conn
  25. linkMap map[int]*conn.Link
  26. linkStatusMap map[int]bool
  27. stop chan bool
  28. sync.RWMutex
  29. }
  30. func NewClient(t *conn.Conn, s *conn.Conn, m *conn.Conn) *Client {
  31. return &Client{
  32. linkMap: make(map[int]*conn.Link),
  33. stop: make(chan bool),
  34. linkStatusMap: make(map[int]bool),
  35. signal: s,
  36. tunnel: t,
  37. msg: m,
  38. }
  39. }
  40. type Bridge struct {
  41. TunnelPort int //通信隧道端口
  42. tcpListener *net.TCPListener //server端监听
  43. kcpListener *kcp.Listener //server端监听
  44. Client map[int]*Client
  45. tunnelType string //bridge type kcp or tcp
  46. OpenTask chan *file.Tunnel
  47. CloseClient chan int
  48. SecretChan chan *conn.Secret
  49. clientLock sync.RWMutex
  50. Register map[string]time.Time
  51. registerLock sync.RWMutex
  52. ipVerify bool
  53. runList map[int]interface{}
  54. }
  55. func NewTunnel(tunnelPort int, tunnelType string, ipVerify bool, runList map[int]interface{}) *Bridge {
  56. t := new(Bridge)
  57. t.TunnelPort = tunnelPort
  58. t.Client = make(map[int]*Client)
  59. t.tunnelType = tunnelType
  60. t.OpenTask = make(chan *file.Tunnel)
  61. t.CloseClient = make(chan int)
  62. t.Register = make(map[string]time.Time)
  63. t.ipVerify = ipVerify
  64. t.runList = runList
  65. t.SecretChan = make(chan *conn.Secret)
  66. return t
  67. }
  68. func (s *Bridge) StartTunnel() error {
  69. var err error
  70. if s.tunnelType == "kcp" {
  71. s.kcpListener, err = kcp.ListenWithOptions(":"+strconv.Itoa(s.TunnelPort), nil, 150, 3)
  72. if err != nil {
  73. return err
  74. }
  75. go func() {
  76. for {
  77. c, err := s.kcpListener.AcceptKCP()
  78. conn.SetUdpSession(c)
  79. if err != nil {
  80. logs.Warn(err)
  81. continue
  82. }
  83. go s.cliProcess(conn.NewConn(c))
  84. }
  85. }()
  86. } else {
  87. s.tcpListener, err = net.ListenTCP("tcp", &net.TCPAddr{net.ParseIP("0.0.0.0"), s.TunnelPort, ""})
  88. if err != nil {
  89. return err
  90. }
  91. go func() {
  92. for {
  93. c, err := s.tcpListener.Accept()
  94. if err != nil {
  95. logs.Warn(err)
  96. continue
  97. }
  98. go s.cliProcess(conn.NewConn(c))
  99. }
  100. }()
  101. }
  102. return nil
  103. }
  104. //验证失败,返回错误验证flag,并且关闭连接
  105. func (s *Bridge) verifyError(c *conn.Conn) {
  106. c.Write([]byte(common.VERIFY_EER))
  107. c.Conn.Close()
  108. }
  109. func (s *Bridge) verifySuccess(c *conn.Conn) {
  110. c.Write([]byte(common.VERIFY_SUCCESS))
  111. }
  112. func (s *Bridge) cliProcess(c *conn.Conn) {
  113. if b, err := c.ReadLen(32); err != nil || string(b) != crypt.Md5(version.GetVersion()) {
  114. logs.Info("The client %s version does not match", c.Conn.RemoteAddr())
  115. c.Close()
  116. return
  117. }
  118. c.Write([]byte(crypt.Md5(version.GetVersion())))
  119. c.SetReadDeadline(5, s.tunnelType)
  120. var buf []byte
  121. var err error
  122. if buf, err = c.ReadLen(32); err != nil {
  123. c.Close()
  124. return
  125. }
  126. //验证
  127. id, err := file.GetCsvDb().GetIdByVerifyKey(string(buf), c.Conn.RemoteAddr().String())
  128. if err != nil {
  129. logs.Info("Current client connection validation error, close this client:", c.Conn.RemoteAddr())
  130. s.verifyError(c)
  131. return
  132. } else {
  133. s.verifySuccess(c)
  134. }
  135. //做一个判断 添加到对应的channel里面以供使用
  136. if flag, err := c.ReadFlag(); err == nil {
  137. s.typeDeal(flag, c, id)
  138. } else {
  139. logs.Warn(err, flag)
  140. }
  141. return
  142. }
  143. func (s *Bridge) closeClient(id int) {
  144. s.clientLock.Lock()
  145. defer s.clientLock.Unlock()
  146. if v, ok := s.Client[id]; ok {
  147. if c, err := file.GetCsvDb().GetClient(id); err == nil && c.NoStore {
  148. s.CloseClient <- c.Id
  149. }
  150. v.signal.WriteClose()
  151. delete(s.Client, id)
  152. }
  153. }
  154. func (s *Bridge) delClient(id int) {
  155. s.clientLock.Lock()
  156. defer s.clientLock.Unlock()
  157. if v, ok := s.Client[id]; ok {
  158. if c, err := file.GetCsvDb().GetClient(id); err == nil && c.NoStore {
  159. s.CloseClient <- c.Id
  160. }
  161. v.signal.Close()
  162. delete(s.Client, id)
  163. }
  164. }
  165. //tcp连接类型区分
  166. func (s *Bridge) typeDeal(typeVal string, c *conn.Conn, id int) {
  167. switch typeVal {
  168. case common.WORK_MAIN:
  169. //客户端已经存在,下线
  170. s.clientLock.Lock()
  171. if v, ok := s.Client[id]; ok {
  172. s.clientLock.Unlock()
  173. if v.signal != nil {
  174. v.signal.WriteClose()
  175. }
  176. v.Lock()
  177. v.signal = c
  178. v.Unlock()
  179. } else {
  180. s.Client[id] = NewClient(nil, c, nil)
  181. s.clientLock.Unlock()
  182. }
  183. logs.Info("clientId %d connection succeeded, address:%s ", id, c.Conn.RemoteAddr())
  184. go s.GetStatus(id)
  185. case common.WORK_CHAN:
  186. s.clientLock.Lock()
  187. if v, ok := s.Client[id]; ok {
  188. s.clientLock.Unlock()
  189. v.Lock()
  190. v.tunnel = c
  191. v.Unlock()
  192. } else {
  193. s.Client[id] = NewClient(c, nil, nil)
  194. s.clientLock.Unlock()
  195. }
  196. go s.clientCopy(id)
  197. case common.WORK_CONFIG:
  198. go s.GetConfig(c)
  199. case common.WORK_REGISTER:
  200. go s.register(c)
  201. case common.WORK_SECRET:
  202. if b, err := c.ReadLen(32); err == nil {
  203. s.SecretChan <- conn.NewSecret(string(b), c)
  204. }
  205. case common.WORK_P2P:
  206. //读取md5密钥
  207. if b, err := c.ReadLen(32); err != nil {
  208. return
  209. } else if t := file.GetCsvDb().GetTaskByMd5Password(string(b)); t == nil {
  210. return
  211. } else {
  212. s.clientLock.Lock()
  213. if v, ok := s.Client[t.Client.Id]; !ok {
  214. logs.Error("未获取到对应客户端")
  215. s.clientLock.Unlock()
  216. return
  217. } else {
  218. logs.Warn("获取到对应客户端")
  219. s.clientLock.Unlock()
  220. //向密钥对应的客户端发送与服务端udp建立连接信息,地址,密钥
  221. logs.Warn(v.signal.Write([]byte(common.NEW_UDP_CONN)))
  222. svrAddr := beego.AppConfig.String("serverIp") + ":" + beego.AppConfig.String("p2pPort")
  223. logs.Warn(svrAddr)
  224. logs.Warn(v.signal.WriteLenContent([]byte(svrAddr)))
  225. logs.Warn(string(b), v.signal.WriteLenContent(b))
  226. //向该请求者发送建立连接请求,服务器地址
  227. c.WriteLenContent([]byte(svrAddr))
  228. }
  229. }
  230. case common.WORK_SEND_STATUS:
  231. s.clientLock.Lock()
  232. if v, ok := s.Client[id]; ok {
  233. s.clientLock.Unlock()
  234. v.Lock()
  235. v.msg = c
  236. v.Unlock()
  237. } else {
  238. s.Client[id] = NewClient(nil, nil, c)
  239. s.clientLock.Unlock()
  240. }
  241. go s.getMsgStatus(id)
  242. }
  243. c.SetAlive(s.tunnelType)
  244. return
  245. }
  246. func (s *Bridge) getMsgStatus(clientId int) {
  247. s.clientLock.Lock()
  248. client := s.Client[clientId]
  249. s.clientLock.Unlock()
  250. if client == nil {
  251. return
  252. }
  253. for {
  254. if id, err := client.msg.GetLen(); err != nil {
  255. s.closeClient(clientId)
  256. return
  257. } else {
  258. client.Lock()
  259. if v, ok := client.linkMap[id]; ok {
  260. v.StatusCh <- true
  261. }
  262. client.Unlock()
  263. }
  264. }
  265. }
  266. func (s *Bridge) register(c *conn.Conn) {
  267. var hour int32
  268. if err := binary.Read(c, binary.LittleEndian, &hour); err == nil {
  269. s.registerLock.Lock()
  270. s.Register[common.GetIpByAddr(c.Conn.RemoteAddr().String())] = time.Now().Add(time.Hour * time.Duration(hour))
  271. s.registerLock.Unlock()
  272. }
  273. }
  274. //等待
  275. func (s *Bridge) waitStatus(clientId, id int) bool {
  276. ticker := time.NewTicker(time.Millisecond * 100)
  277. stop := time.After(time.Second * 10)
  278. for {
  279. select {
  280. case <-ticker.C:
  281. s.clientLock.Lock()
  282. if v, ok := s.Client[clientId]; ok {
  283. s.clientLock.Unlock()
  284. v.Lock()
  285. if vv, ok := v.linkStatusMap[id]; ok {
  286. ticker.Stop()
  287. v.Unlock()
  288. return vv
  289. }
  290. v.Unlock()
  291. } else {
  292. s.clientLock.Unlock()
  293. }
  294. case <-stop:
  295. return false
  296. }
  297. }
  298. }
  299. func (s *Bridge) SendLinkInfo(clientId int, link *conn.Link, linkAddr string) (tunnel *conn.Conn, err error) {
  300. s.clientLock.Lock()
  301. if v, ok := s.Client[clientId]; ok {
  302. s.clientLock.Unlock()
  303. if s.ipVerify {
  304. s.registerLock.Lock()
  305. ip := common.GetIpByAddr(linkAddr)
  306. if v, ok := s.Register[ip]; !ok {
  307. s.registerLock.Unlock()
  308. return nil, errors.New(fmt.Sprintf("The ip %s is not in the validation list", ip))
  309. } else {
  310. if !v.After(time.Now()) {
  311. return nil, errors.New(fmt.Sprintf("The validity of the ip %s has expired", ip))
  312. }
  313. }
  314. s.registerLock.Unlock()
  315. }
  316. v.signal.SendLinkInfo(link)
  317. if err != nil {
  318. logs.Warn("send link information error:", err, link.Id)
  319. s.DelClient(clientId)
  320. return
  321. }
  322. if v.tunnel == nil {
  323. err = errors.New("get tunnel connection error")
  324. return
  325. } else {
  326. tunnel = v.tunnel
  327. }
  328. link.MsgConn = v.msg
  329. v.Lock()
  330. v.linkMap[link.Id] = link
  331. v.Unlock()
  332. if !s.waitStatus(clientId, link.Id) {
  333. err = errors.New(fmt.Sprintf("connect target %s fail", link.Host))
  334. return
  335. }
  336. } else {
  337. s.clientLock.Unlock()
  338. err = errors.New(fmt.Sprintf("the client %d is not connect", clientId))
  339. }
  340. return
  341. }
  342. //删除通信通道
  343. func (s *Bridge) DelClient(id int) {
  344. s.closeClient(id)
  345. }
  346. //get config
  347. func (s *Bridge) GetConfig(c *conn.Conn) {
  348. var client *file.Client
  349. var fail bool
  350. for {
  351. flag, err := c.ReadFlag()
  352. if err != nil {
  353. break
  354. }
  355. switch flag {
  356. case common.WORK_STATUS:
  357. if b, err := c.ReadLen(16); err != nil {
  358. break
  359. } else {
  360. var str string
  361. id, err := file.GetCsvDb().GetClientIdByVkey(string(b))
  362. if err != nil {
  363. break
  364. }
  365. for _, v := range file.GetCsvDb().Hosts {
  366. if v.Client.Id == id {
  367. str += v.Remark + common.CONN_DATA_SEQ
  368. }
  369. }
  370. for _, v := range file.GetCsvDb().Tasks {
  371. if _, ok := s.runList[v.Id]; ok && v.Client.Id == id {
  372. str += v.Remark + common.CONN_DATA_SEQ
  373. }
  374. }
  375. binary.Write(c, binary.LittleEndian, int32(len([]byte(str))))
  376. binary.Write(c, binary.LittleEndian, []byte(str))
  377. }
  378. case common.NEW_CONF:
  379. var err error
  380. if client, err = c.GetConfigInfo(); err != nil {
  381. fail = true
  382. c.WriteAddFail()
  383. break
  384. } else {
  385. if err = file.GetCsvDb().NewClient(client); err != nil {
  386. fail = true
  387. c.WriteAddFail()
  388. break
  389. }
  390. c.WriteAddOk()
  391. c.Write([]byte(client.VerifyKey))
  392. }
  393. case common.NEW_HOST:
  394. if h, err := c.GetHostInfo(); err != nil {
  395. fail = true
  396. c.WriteAddFail()
  397. break
  398. } else if file.GetCsvDb().IsHostExist(h) {
  399. fail = true
  400. c.WriteAddFail()
  401. break
  402. } else {
  403. h.Client = client
  404. file.GetCsvDb().NewHost(h)
  405. c.WriteAddOk()
  406. }
  407. case common.NEW_TASK:
  408. if t, err := c.GetTaskInfo(); err != nil {
  409. fail = true
  410. c.WriteAddFail()
  411. break
  412. } else {
  413. ports := common.GetPorts(t.Ports)
  414. targets := common.GetPorts(t.Target)
  415. if len(ports) > 1 && (t.Mode == "tcpServer" || t.Mode == "udpServer") && (len(ports) != len(targets)) {
  416. fail = true
  417. c.WriteAddFail()
  418. break
  419. } else if t.Mode == "secretServer" {
  420. ports = append(ports, 0)
  421. }
  422. if len(ports) == 0 {
  423. fail = true
  424. c.WriteAddFail()
  425. break
  426. }
  427. for i := 0; i < len(ports); i++ {
  428. tl := new(file.Tunnel)
  429. tl.Mode = t.Mode
  430. tl.Port = ports[i]
  431. if len(ports) == 1 {
  432. tl.Target = t.Target
  433. tl.Remark = t.Remark
  434. } else {
  435. tl.Remark = t.Remark + "_" + strconv.Itoa(tl.Port)
  436. tl.Target = t.TargetAddr + ":" + strconv.Itoa(targets[i])
  437. }
  438. tl.Id = file.GetCsvDb().GetTaskId()
  439. tl.Status = true
  440. tl.Flow = new(file.Flow)
  441. tl.NoStore = true
  442. tl.Client = client
  443. tl.Password = t.Password
  444. if err := file.GetCsvDb().NewTask(tl); err != nil {
  445. logs.Notice("Add task error ", err.Error())
  446. fail = true
  447. c.WriteAddFail()
  448. break
  449. }
  450. if b := tool.TestServerPort(tl.Port, tl.Mode); !b && t.Mode != "secretServer" {
  451. fail = true
  452. c.WriteAddFail()
  453. break
  454. } else {
  455. s.OpenTask <- tl
  456. }
  457. c.WriteAddOk()
  458. }
  459. }
  460. }
  461. }
  462. if fail && client != nil {
  463. s.CloseClient <- client.Id
  464. }
  465. c.Close()
  466. }
  467. func (s *Bridge) GetStatus(clientId int) {
  468. s.clientLock.Lock()
  469. client := s.Client[clientId]
  470. s.clientLock.Unlock()
  471. if client == nil {
  472. return
  473. }
  474. for {
  475. if id, status, err := client.signal.GetConnStatus(); err != nil {
  476. s.closeClient(clientId)
  477. return
  478. } else {
  479. client.Lock()
  480. client.linkStatusMap[id] = status
  481. client.Unlock()
  482. }
  483. }
  484. }
  485. func (s *Bridge) clientCopy(clientId int) {
  486. s.clientLock.Lock()
  487. client := s.Client[clientId]
  488. s.clientLock.Unlock()
  489. for {
  490. if id, err := client.tunnel.GetLen(); err != nil {
  491. logs.Info("read msg content length error close client")
  492. s.delClient(clientId)
  493. break
  494. } else {
  495. client.Lock()
  496. if link, ok := client.linkMap[id]; ok {
  497. client.Unlock()
  498. if content, err := client.tunnel.GetMsgContent(link); err != nil {
  499. pool.PutBufPoolCopy(content)
  500. s.delClient(clientId)
  501. logs.Notice("read msg content error", err, "close client")
  502. break
  503. } else {
  504. link.MsgCh <- content
  505. }
  506. } else {
  507. client.Unlock()
  508. continue
  509. }
  510. }
  511. }
  512. }
  513. //TODO 清除有一个未知bug待处理
  514. func (s *Bridge) linkCleanSession() {
  515. ticker := time.NewTicker(time.Minute * 5)
  516. for {
  517. select {
  518. case <-ticker.C:
  519. s.clientLock.Lock()
  520. for _, v := range s.Client {
  521. v.Lock()
  522. for _, vv := range v.linkMap {
  523. if vv.FinishUse {
  524. delete(v.linkMap, vv.Id)
  525. }
  526. }
  527. v.Unlock()
  528. }
  529. s.clientLock.Unlock()
  530. }
  531. }
  532. }