bridge.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532
  1. package bridge
  2. import (
  3. "encoding/binary"
  4. "errors"
  5. "fmt"
  6. "github.com/cnlh/nps/lib/common"
  7. "github.com/cnlh/nps/lib/conn"
  8. "github.com/cnlh/nps/lib/crypt"
  9. "github.com/cnlh/nps/lib/file"
  10. "github.com/cnlh/nps/lib/pool"
  11. "github.com/cnlh/nps/lib/version"
  12. "github.com/cnlh/nps/server/tool"
  13. "github.com/cnlh/nps/vender/github.com/astaxie/beego/logs"
  14. "github.com/cnlh/nps/vender/github.com/xtaci/kcp"
  15. "net"
  16. "strconv"
  17. "sync"
  18. "time"
  19. )
  // Client holds the connections the bridge keeps for a single client id,
  // plus the per-link bookkeeping maps. The embedded RWMutex is taken (via
  // Lock/Unlock) by the bridge methods when they touch these fields.
  type Client struct {
  	tunnel        *conn.Conn         // assigned by typeDeal for WORK_CHAN; read by clientCopy and SendLinkInfo
  	signal        *conn.Conn         // assigned by typeDeal for WORK_MAIN; used for link info, status and close messages
  	msg           *conn.Conn         // assigned by typeDeal for WORK_SEND_STATUS; read by getMsgStatus
  	linkMap       map[int]*conn.Link // links keyed by link id, filled in by SendLinkInfo
  	linkStatusMap map[int]bool       // status per link id as recorded by GetStatus
  	stop          chan bool          // allocated by NewClient
  	sync.RWMutex
  }
  29. func NewClient(t *conn.Conn, s *conn.Conn, m *conn.Conn) *Client {
  30. return &Client{
  31. linkMap: make(map[int]*conn.Link),
  32. stop: make(chan bool),
  33. linkStatusMap: make(map[int]bool),
  34. signal: s,
  35. tunnel: t,
  36. msg: m,
  37. }
  38. }
  // Bridge is the server-side endpoint that clients connect to. It tracks the
  // connected clients and exposes channels through which it hands work to the
  // rest of the server.
  type Bridge struct {
  	TunnelPort   int              // communication tunnel port
  	tcpListener  *net.TCPListener // server-side listener (TCP mode)
  	kcpListener  *kcp.Listener    // server-side listener (KCP mode)
  	Client       map[int]*Client  // connected clients keyed by client id; guarded by clientLock
  	tunnelType   string           // bridge type kcp or tcp
  	OpenTask     chan *file.Tunnel // GetConfig sends each newly added tunnel here
  	CloseClient  chan int          // client ids sent by closeClient, delClient and GetConfig
  	SecretChan   chan *conn.Secret // typeDeal sends a Secret here for WORD_SECRET connections
  	clientLock   sync.RWMutex
  	Register     map[string]time.Time // IP -> expiry time, written by register; guarded by registerLock
  	registerLock sync.RWMutex
  	ipVerify     bool                // when true, SendLinkInfo requires the link IP to be registered and unexpired
  	runList      map[int]interface{} // supplied to NewTunnel; GetConfig looks task ids up in it
  }
  54. func NewTunnel(tunnelPort int, tunnelType string, ipVerify bool, runList map[int]interface{}) *Bridge {
  55. t := new(Bridge)
  56. t.TunnelPort = tunnelPort
  57. t.Client = make(map[int]*Client)
  58. t.tunnelType = tunnelType
  59. t.OpenTask = make(chan *file.Tunnel)
  60. t.CloseClient = make(chan int)
  61. t.Register = make(map[string]time.Time)
  62. t.ipVerify = ipVerify
  63. t.runList = runList
  64. t.SecretChan = make(chan *conn.Secret)
  65. return t
  66. }
  67. func (s *Bridge) StartTunnel() error {
  68. go s.linkCleanSession()
  69. var err error
  70. if s.tunnelType == "kcp" {
  71. s.kcpListener, err = kcp.ListenWithOptions(":"+strconv.Itoa(s.TunnelPort), nil, 150, 3)
  72. if err != nil {
  73. return err
  74. }
  75. go func() {
  76. for {
  77. c, err := s.kcpListener.AcceptKCP()
  78. conn.SetUdpSession(c)
  79. if err != nil {
  80. logs.Warn(err)
  81. continue
  82. }
  83. go s.cliProcess(conn.NewConn(c))
  84. }
  85. }()
  86. } else {
  87. s.tcpListener, err = net.ListenTCP("tcp", &net.TCPAddr{net.ParseIP("0.0.0.0"), s.TunnelPort, ""})
  88. if err != nil {
  89. return err
  90. }
  91. go func() {
  92. for {
  93. c, err := s.tcpListener.Accept()
  94. if err != nil {
  95. logs.Warn(err)
  96. continue
  97. }
  98. go s.cliProcess(conn.NewConn(c))
  99. }
  100. }()
  101. }
  102. return nil
  103. }
  104. //验证失败,返回错误验证flag,并且关闭连接
  105. func (s *Bridge) verifyError(c *conn.Conn) {
  106. c.Write([]byte(common.VERIFY_EER))
  107. c.Conn.Close()
  108. }
  109. func (s *Bridge) verifySuccess(c *conn.Conn) {
  110. c.Write([]byte(common.VERIFY_SUCCESS))
  111. }
  112. func (s *Bridge) cliProcess(c *conn.Conn) {
  113. if b, err := c.ReadLen(32); err != nil || string(b) != crypt.Md5(version.GetVersion()) {
  114. logs.Info("The client %s version does not match", c.Conn.RemoteAddr())
  115. c.Close()
  116. return
  117. }
  118. c.Write([]byte(crypt.Md5(version.GetVersion())))
  119. c.SetReadDeadline(5, s.tunnelType)
  120. var buf []byte
  121. var err error
  122. if buf, err = c.ReadLen(32); err != nil {
  123. c.Close()
  124. return
  125. }
  126. //验证
  127. id, err := file.GetCsvDb().GetIdByVerifyKey(string(buf), c.Conn.RemoteAddr().String())
  128. if err != nil {
  129. logs.Info("Current client connection validation error, close this client:", c.Conn.RemoteAddr())
  130. s.verifyError(c)
  131. return
  132. } else {
  133. s.verifySuccess(c)
  134. }
  135. //做一个判断 添加到对应的channel里面以供使用
  136. if flag, err := c.ReadFlag(); err == nil {
  137. s.typeDeal(flag, c, id)
  138. } else {
  139. logs.Warn(err, flag)
  140. }
  141. return
  142. }
  143. func (s *Bridge) closeClient(id int) {
  144. s.clientLock.Lock()
  145. defer s.clientLock.Unlock()
  146. if v, ok := s.Client[id]; ok {
  147. if c, err := file.GetCsvDb().GetClient(id); err == nil && c.NoStore {
  148. s.CloseClient <- c.Id
  149. }
  150. v.signal.WriteClose()
  151. delete(s.Client, id)
  152. }
  153. }
  154. func (s *Bridge) delClient(id int) {
  155. s.clientLock.Lock()
  156. defer s.clientLock.Unlock()
  157. if v, ok := s.Client[id]; ok {
  158. if c, err := file.GetCsvDb().GetClient(id); err == nil && c.NoStore {
  159. s.CloseClient <- c.Id
  160. }
  161. v.signal.Close()
  162. delete(s.Client, id)
  163. }
  164. }
  165. //tcp连接类型区分
  166. func (s *Bridge) typeDeal(typeVal string, c *conn.Conn, id int) {
  167. switch typeVal {
  168. case common.WORK_MAIN:
  169. //客户端已经存在,下线
  170. s.clientLock.Lock()
  171. if v, ok := s.Client[id]; ok {
  172. s.clientLock.Unlock()
  173. if v.signal != nil {
  174. v.signal.WriteClose()
  175. }
  176. v.Lock()
  177. v.signal = c
  178. v.Unlock()
  179. } else {
  180. s.Client[id] = NewClient(nil, c, nil)
  181. s.clientLock.Unlock()
  182. }
  183. logs.Info("clientId %d connection succeeded, address:%s ", id, c.Conn.RemoteAddr())
  184. go s.GetStatus(id)
  185. case common.WORK_CHAN:
  186. s.clientLock.Lock()
  187. if v, ok := s.Client[id]; ok {
  188. s.clientLock.Unlock()
  189. v.Lock()
  190. v.tunnel = c
  191. v.Unlock()
  192. } else {
  193. s.Client[id] = NewClient(c, nil, nil)
  194. s.clientLock.Unlock()
  195. }
  196. go s.clientCopy(id)
  197. case common.WORK_CONFIG:
  198. go s.GetConfig(c)
  199. case common.WORK_REGISTER:
  200. go s.register(c)
  201. case common.WORD_SECRET:
  202. if b, err := c.ReadLen(32); err == nil {
  203. s.SecretChan <- conn.NewSecret(string(b), c)
  204. }
  205. case common.WORK_SEND_STATUS:
  206. s.clientLock.Lock()
  207. if v, ok := s.Client[id]; ok {
  208. s.clientLock.Unlock()
  209. v.Lock()
  210. v.msg = c
  211. v.Unlock()
  212. } else {
  213. s.Client[id] = NewClient(nil, nil, c)
  214. s.clientLock.Unlock()
  215. }
  216. go s.getMsgStatus(id)
  217. }
  218. c.SetAlive(s.tunnelType)
  219. return
  220. }
  221. func (s *Bridge) getMsgStatus(clientId int) {
  222. s.clientLock.Lock()
  223. client := s.Client[clientId]
  224. s.clientLock.Unlock()
  225. if client == nil {
  226. return
  227. }
  228. for {
  229. if id, err := client.msg.GetLen(); err != nil {
  230. s.closeClient(clientId)
  231. return
  232. } else {
  233. client.Lock()
  234. if v, ok := client.linkMap[id]; ok {
  235. v.StatusCh <- true
  236. }
  237. client.Unlock()
  238. }
  239. }
  240. }
  241. func (s *Bridge) register(c *conn.Conn) {
  242. var hour int32
  243. if err := binary.Read(c, binary.LittleEndian, &hour); err == nil {
  244. s.registerLock.Lock()
  245. s.Register[common.GetIpByAddr(c.Conn.RemoteAddr().String())] = time.Now().Add(time.Hour * time.Duration(hour))
  246. s.registerLock.Unlock()
  247. }
  248. }
  249. //等待
  250. func (s *Bridge) waitStatus(clientId, id int) bool {
  251. ticker := time.NewTicker(time.Millisecond * 100)
  252. stop := time.After(time.Second * 10)
  253. for {
  254. select {
  255. case <-ticker.C:
  256. s.clientLock.Lock()
  257. if v, ok := s.Client[clientId]; ok {
  258. s.clientLock.Unlock()
  259. v.Lock()
  260. if vv, ok := v.linkStatusMap[id]; ok {
  261. ticker.Stop()
  262. v.Unlock()
  263. return vv
  264. }
  265. v.Unlock()
  266. } else {
  267. s.clientLock.Unlock()
  268. }
  269. case <-stop:
  270. return false
  271. }
  272. }
  273. }
  274. func (s *Bridge) SendLinkInfo(clientId int, link *conn.Link, linkAddr string) (tunnel *conn.Conn, err error) {
  275. s.clientLock.Lock()
  276. if v, ok := s.Client[clientId]; ok {
  277. s.clientLock.Unlock()
  278. if s.ipVerify {
  279. s.registerLock.Lock()
  280. ip := common.GetIpByAddr(linkAddr)
  281. if v, ok := s.Register[ip]; !ok {
  282. s.registerLock.Unlock()
  283. return nil, errors.New(fmt.Sprintf("The ip %s is not in the validation list", ip))
  284. } else {
  285. if !v.After(time.Now()) {
  286. return nil, errors.New(fmt.Sprintf("The validity of the ip %s has expired", ip))
  287. }
  288. }
  289. s.registerLock.Unlock()
  290. }
  291. v.signal.SendLinkInfo(link)
  292. if err != nil {
  293. logs.Warn("send link information error:", err, link.Id)
  294. s.DelClient(clientId)
  295. return
  296. }
  297. if v.tunnel == nil {
  298. err = errors.New("get tunnel connection error")
  299. return
  300. } else {
  301. tunnel = v.tunnel
  302. }
  303. link.MsgConn = v.msg
  304. v.Lock()
  305. v.linkMap[link.Id] = link
  306. v.Unlock()
  307. if !s.waitStatus(clientId, link.Id) {
  308. err = errors.New(fmt.Sprintf("connect target %s fail", link.Host))
  309. return
  310. }
  311. } else {
  312. s.clientLock.Unlock()
  313. err = errors.New(fmt.Sprintf("the client %d is not connect", clientId))
  314. }
  315. return
  316. }
  // DelClient removes the communication channel for the given client id.
  // It is the exported entry point for closeClient and simply delegates to it.
  func (s *Bridge) DelClient(id int) {
  	s.closeClient(id)
  }
  // GetConfig serves a configuration connection. It reads flags from c in a
  // loop until ReadFlag fails, handling each one:
  //
  //   - WORK_STATUS: reads a 16-byte vkey, collects the remarks of the hosts and
  //     running tasks that belong to that client, and writes them back as a
  //     little-endian int32 length followed by the bytes.
  //   - NEW_CONF:    reads client config, stores the new client and replies with
  //     the add-ok flag plus the client's verify key.
  //   - NEW_HOST:    reads a host, rejects it if it already exists, otherwise
  //     attaches it to the current client and stores it.
  //   - NEW_TASK:    reads a task, expands its port list into one tunnel per
  //     port, stores each and sends it on OpenTask.
  //
  // Any failure while adding sets fail and replies with the add-fail flag. Once
  // the loop ends, the client id is sent on CloseClient if anything failed and
  // a client had been read, and c is closed.
  func (s *Bridge) GetConfig(c *conn.Conn) {
  	var client *file.Client
  	var fail bool
  	for {
  		flag, err := c.ReadFlag()
  		if err != nil {
  			break
  		}
  		// Every `break` directly inside the switch below leaves only the
  		// switch, not this for loop: after a failure the loop goes on to read
  		// the next flag. Only a ReadFlag error ends the loop.
  		switch flag {
  		case common.WORK_STATUS:
  			if b, err := c.ReadLen(16); err != nil {
  				break
  			} else {
  				var str string
  				id, err := file.GetCsvDb().GetClientIdByVkey(string(b))
  				if err != nil {
  					break
  				}
  				for _, v := range file.GetCsvDb().Hosts {
  					if v.Client.Id == id {
  						str += v.Remark + common.CONN_DATA_SEQ
  					}
  				}
  				// Only tasks present in runList are reported.
  				for _, v := range file.GetCsvDb().Tasks {
  					if _, ok := s.runList[v.Id]; ok && v.Client.Id == id {
  						str += v.Remark + common.CONN_DATA_SEQ
  					}
  				}
  				// Length prefix, then payload. Write errors are not checked.
  				binary.Write(c, binary.LittleEndian, int32(len([]byte(str))))
  				binary.Write(c, binary.LittleEndian, []byte(str))
  			}
  		case common.NEW_CONF:
  			var err error
  			if client, err = c.GetConfigInfo(); err != nil {
  				fail = true
  				c.WriteAddFail()
  				break
  			} else {
  				if err = file.GetCsvDb().NewClient(client); err != nil {
  					fail = true
  					c.WriteAddFail()
  					break
  				}
  				c.WriteAddOk()
  				c.Write([]byte(client.VerifyKey))
  			}
  		case common.NEW_HOST:
  			if h, err := c.GetHostInfo(); err != nil {
  				fail = true
  				c.WriteAddFail()
  				break
  			} else if file.GetCsvDb().IsHostExist(h) {
  				fail = true
  				c.WriteAddFail()
  				break
  			} else {
  				// NOTE(review): client is nil here if no NEW_CONF succeeded
  				// earlier on this connection - confirm the protocol forbids that.
  				h.Client = client
  				file.GetCsvDb().NewHost(h)
  				c.WriteAddOk()
  			}
  		case common.NEW_TASK:
  			if t, err := c.GetTaskInfo(); err != nil {
  				fail = true
  				c.WriteAddFail()
  				break
  			} else {
  				ports := common.GetPorts(t.Ports)
  				targets := common.GetPorts(t.Target)
  				// For multi-port tcp/udp tasks the port and target lists must
  				// have the same length.
  				if len(ports) > 1 && (t.Mode == "tcpServer" || t.Mode == "udpServer") && (len(ports) != len(targets)) {
  					fail = true
  					c.WriteAddFail()
  					break
  				} else if t.Mode == "secretServer" {
  					ports = append(ports, 0)
  				}
  				if len(ports) == 0 {
  					fail = true
  					c.WriteAddFail()
  					break
  				}
  				// One tunnel per port. A `break` inside this loop leaves the
  				// loop, so the remaining ports are skipped after a failure.
  				for i := 0; i < len(ports); i++ {
  					tl := new(file.Tunnel)
  					tl.Mode = t.Mode
  					tl.Port = ports[i]
  					if len(ports) == 1 {
  						tl.Target = t.Target
  						tl.Remark = t.Remark
  					} else {
  						tl.Remark = t.Remark + "_" + strconv.Itoa(tl.Port)
  						// NOTE(review): the length check above only covers
  						// tcpServer/udpServer; for other modes with several
  						// ports targets[i] may be out of range - verify.
  						tl.Target = t.TargetAddr + ":" + strconv.Itoa(targets[i])
  					}
  					tl.Id = file.GetCsvDb().GetTaskId()
  					tl.Status = true
  					tl.Flow = new(file.Flow)
  					tl.NoStore = true
  					tl.Client = client
  					tl.Password = t.Password
  					if err := file.GetCsvDb().NewTask(tl); err != nil {
  						logs.Notice("Add task error ", err.Error())
  						fail = true
  						c.WriteAddFail()
  						break
  					}
  					// The port test result is ignored for secretServer tasks.
  					if b := tool.TestServerPort(tl.Port, tl.Mode); !b && t.Mode != "secretServer" {
  						fail = true
  						c.WriteAddFail()
  						break
  					} else {
  						s.OpenTask <- tl
  					}
  					c.WriteAddOk()
  				}
  			}
  		}
  	}
  	if fail && client != nil {
  		s.CloseClient <- client.Id
  	}
  	c.Close()
  }
  442. func (s *Bridge) GetStatus(clientId int) {
  443. s.clientLock.Lock()
  444. client := s.Client[clientId]
  445. s.clientLock.Unlock()
  446. if client == nil {
  447. return
  448. }
  449. for {
  450. if id, status, err := client.signal.GetConnStatus(); err != nil {
  451. s.closeClient(clientId)
  452. return
  453. } else {
  454. client.Lock()
  455. client.linkStatusMap[id] = status
  456. client.Unlock()
  457. }
  458. }
  459. }
  460. func (s *Bridge) clientCopy(clientId int) {
  461. s.clientLock.Lock()
  462. client := s.Client[clientId]
  463. s.clientLock.Unlock()
  464. for {
  465. if id, err := client.tunnel.GetLen(); err != nil {
  466. logs.Info("read msg content length error close client")
  467. s.delClient(clientId)
  468. break
  469. } else {
  470. client.Lock()
  471. if link, ok := client.linkMap[id]; ok {
  472. client.Unlock()
  473. if content, err := client.tunnel.GetMsgContent(link); err != nil {
  474. pool.PutBufPoolCopy(content)
  475. s.delClient(clientId)
  476. logs.Notice("read msg content error", err, "close client")
  477. break
  478. } else {
  479. link.MsgCh <- content
  480. }
  481. } else {
  482. client.Unlock()
  483. continue
  484. }
  485. }
  486. }
  487. }
  488. func (s *Bridge) linkCleanSession() {
  489. ticker := time.NewTicker(time.Minute * 5)
  490. for {
  491. select {
  492. case <-ticker.C:
  493. s.clientLock.Lock()
  494. for _, v := range s.Client {
  495. v.Lock()
  496. for _, vv := range v.linkMap {
  497. if vv.FinishUse {
  498. delete(v.linkMap, vv.Id)
  499. }
  500. }
  501. v.Unlock()
  502. }
  503. s.clientLock.RUnlock()
  504. }
  505. }
  506. }