bridge.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508
  1. package bridge
  2. import (
  3. "encoding/binary"
  4. "errors"
  5. "fmt"
  6. "github.com/cnlh/nps/lib/common"
  7. "github.com/cnlh/nps/lib/conn"
  8. "github.com/cnlh/nps/lib/crypt"
  9. "github.com/cnlh/nps/lib/file"
  10. "github.com/cnlh/nps/lib/pool"
  11. "github.com/cnlh/nps/lib/version"
  12. "github.com/cnlh/nps/server/tool"
  13. "github.com/cnlh/nps/vender/github.com/astaxie/beego/logs"
  14. "github.com/cnlh/nps/vender/github.com/xtaci/kcp"
  15. "net"
  16. "strconv"
  17. "sync"
  18. "time"
  19. )
  20. type Client struct {
  21. tunnel *conn.Conn
  22. signal *conn.Conn
  23. msg *conn.Conn
  24. linkMap map[int]*conn.Link
  25. linkStatusMap map[int]bool
  26. stop chan bool
  27. sync.RWMutex
  28. }
  29. func NewClient(t *conn.Conn, s *conn.Conn, m *conn.Conn) *Client {
  30. return &Client{
  31. linkMap: make(map[int]*conn.Link),
  32. stop: make(chan bool),
  33. linkStatusMap: make(map[int]bool),
  34. signal: s,
  35. tunnel: t,
  36. msg: m,
  37. }
  38. }
  39. type Bridge struct {
  40. TunnelPort int //通信隧道端口
  41. tcpListener *net.TCPListener //server端监听
  42. kcpListener *kcp.Listener //server端监听
  43. Client map[int]*Client
  44. tunnelType string //bridge type kcp or tcp
  45. OpenTask chan *file.Tunnel
  46. CloseClient chan int
  47. SecretChan chan *conn.Secret
  48. clientLock sync.RWMutex
  49. Register map[string]time.Time
  50. registerLock sync.RWMutex
  51. ipVerify bool
  52. runList map[int]interface{}
  53. }
  54. func NewTunnel(tunnelPort int, tunnelType string, ipVerify bool, runList map[int]interface{}) *Bridge {
  55. t := new(Bridge)
  56. t.TunnelPort = tunnelPort
  57. t.Client = make(map[int]*Client)
  58. t.tunnelType = tunnelType
  59. t.OpenTask = make(chan *file.Tunnel)
  60. t.CloseClient = make(chan int)
  61. t.Register = make(map[string]time.Time)
  62. t.ipVerify = ipVerify
  63. t.runList = runList
  64. t.SecretChan = make(chan *conn.Secret)
  65. return t
  66. }
  67. func (s *Bridge) StartTunnel() error {
  68. var err error
  69. if s.tunnelType == "kcp" {
  70. s.kcpListener, err = kcp.ListenWithOptions(":"+strconv.Itoa(s.TunnelPort), nil, 150, 3)
  71. if err != nil {
  72. return err
  73. }
  74. go func() {
  75. for {
  76. c, err := s.kcpListener.AcceptKCP()
  77. conn.SetUdpSession(c)
  78. if err != nil {
  79. logs.Warn(err)
  80. continue
  81. }
  82. go s.cliProcess(conn.NewConn(c))
  83. }
  84. }()
  85. } else {
  86. s.tcpListener, err = net.ListenTCP("tcp", &net.TCPAddr{net.ParseIP("0.0.0.0"), s.TunnelPort, ""})
  87. if err != nil {
  88. return err
  89. }
  90. go func() {
  91. for {
  92. c, err := s.tcpListener.Accept()
  93. if err != nil {
  94. logs.Warn(err)
  95. continue
  96. }
  97. go s.cliProcess(conn.NewConn(c))
  98. }
  99. }()
  100. }
  101. return nil
  102. }
  103. //验证失败,返回错误验证flag,并且关闭连接
  104. func (s *Bridge) verifyError(c *conn.Conn) {
  105. c.Write([]byte(common.VERIFY_EER))
  106. c.Conn.Close()
  107. }
  108. func (s *Bridge) verifySuccess(c *conn.Conn) {
  109. c.Write([]byte(common.VERIFY_SUCCESS))
  110. }
  111. func (s *Bridge) cliProcess(c *conn.Conn) {
  112. c.Write([]byte(crypt.Md5(version.GetVersion())))
  113. if b, err := c.ReadFlag(); err != nil || string(b) != version.VERSION_OK {
  114. logs.Info("The client %s version does not match", c.Conn.RemoteAddr())
  115. c.Close()
  116. return
  117. }
  118. c.SetReadDeadline(5, s.tunnelType)
  119. var buf []byte
  120. var err error
  121. if buf, err = c.ReadLen(32); err != nil {
  122. c.Close()
  123. return
  124. }
  125. //验证
  126. id, err := file.GetCsvDb().GetIdByVerifyKey(string(buf), c.Conn.RemoteAddr().String())
  127. if err != nil {
  128. logs.Info("Current client connection validation error, close this client:", c.Conn.RemoteAddr())
  129. s.verifyError(c)
  130. return
  131. } else {
  132. s.verifySuccess(c)
  133. }
  134. //做一个判断 添加到对应的channel里面以供使用
  135. if flag, err := c.ReadFlag(); err == nil {
  136. s.typeDeal(flag, c, id)
  137. } else {
  138. logs.Warn(err, flag)
  139. }
  140. return
  141. }
  142. func (s *Bridge) closeClient(id int) {
  143. s.clientLock.Lock()
  144. defer s.clientLock.Unlock()
  145. if v, ok := s.Client[id]; ok {
  146. if c, err := file.GetCsvDb().GetClient(id); err == nil && c.NoStore {
  147. s.CloseClient <- c.Id
  148. }
  149. v.signal.WriteClose()
  150. delete(s.Client, id)
  151. }
  152. }
  153. func (s *Bridge) delClient(id int) {
  154. s.clientLock.Lock()
  155. defer s.clientLock.Unlock()
  156. if v, ok := s.Client[id]; ok {
  157. if c, err := file.GetCsvDb().GetClient(id); err == nil && c.NoStore {
  158. s.CloseClient <- c.Id
  159. }
  160. v.signal.Close()
  161. delete(s.Client, id)
  162. }
  163. }
  164. //tcp连接类型区分
  165. func (s *Bridge) typeDeal(typeVal string, c *conn.Conn, id int) {
  166. switch typeVal {
  167. case common.WORK_MAIN:
  168. //客户端已经存在,下线
  169. s.clientLock.Lock()
  170. if v, ok := s.Client[id]; ok {
  171. s.clientLock.Unlock()
  172. if v.signal != nil {
  173. v.signal.WriteClose()
  174. }
  175. v.Lock()
  176. v.signal = c
  177. v.Unlock()
  178. } else {
  179. s.Client[id] = NewClient(nil, c, nil)
  180. s.clientLock.Unlock()
  181. }
  182. logs.Info("clientId %d connection succeeded, address:%s ", id, c.Conn.RemoteAddr())
  183. go s.GetStatus(id)
  184. case common.WORK_CHAN:
  185. s.clientLock.Lock()
  186. if v, ok := s.Client[id]; ok {
  187. s.clientLock.Unlock()
  188. v.Lock()
  189. v.tunnel = c
  190. v.Unlock()
  191. } else {
  192. s.Client[id] = NewClient(c, nil, nil)
  193. s.clientLock.Unlock()
  194. }
  195. go s.clientCopy(id)
  196. case common.WORK_CONFIG:
  197. go s.GetConfig(c)
  198. case common.WORK_REGISTER:
  199. go s.register(c)
  200. case common.WORD_SECRET:
  201. if b, err := c.ReadLen(32); err == nil {
  202. s.SecretChan <- conn.NewSecret(string(b), c)
  203. }
  204. case common.WORK_SEND_STATUS:
  205. s.clientLock.Lock()
  206. if v, ok := s.Client[id]; ok {
  207. s.clientLock.Unlock()
  208. v.Lock()
  209. v.msg = c
  210. v.Unlock()
  211. } else {
  212. s.Client[id] = NewClient(nil, nil, c)
  213. s.clientLock.Unlock()
  214. }
  215. go s.getMsgStatus(id)
  216. }
  217. c.SetAlive(s.tunnelType)
  218. return
  219. }
  220. func (s *Bridge) getMsgStatus(clientId int) {
  221. s.clientLock.Lock()
  222. client := s.Client[clientId]
  223. s.clientLock.Unlock()
  224. if client == nil {
  225. return
  226. }
  227. for {
  228. if id, err := client.msg.GetLen(); err != nil {
  229. s.closeClient(clientId)
  230. return
  231. } else {
  232. client.Lock()
  233. if v, ok := client.linkMap[id]; ok {
  234. v.StatusCh <- true
  235. }
  236. client.Unlock()
  237. }
  238. }
  239. }
  240. func (s *Bridge) register(c *conn.Conn) {
  241. var hour int32
  242. if err := binary.Read(c, binary.LittleEndian, &hour); err == nil {
  243. s.registerLock.Lock()
  244. s.Register[common.GetIpByAddr(c.Conn.RemoteAddr().String())] = time.Now().Add(time.Hour * time.Duration(hour))
  245. s.registerLock.Unlock()
  246. }
  247. }
  248. //等待
  249. func (s *Bridge) waitStatus(clientId, id int) bool {
  250. ticker := time.NewTicker(time.Millisecond * 100)
  251. stop := time.After(time.Second * 10)
  252. for {
  253. select {
  254. case <-ticker.C:
  255. s.clientLock.Lock()
  256. if v, ok := s.Client[clientId]; ok {
  257. s.clientLock.Unlock()
  258. v.Lock()
  259. if vv, ok := v.linkStatusMap[id]; ok {
  260. ticker.Stop()
  261. v.Unlock()
  262. return vv
  263. }
  264. v.Unlock()
  265. } else {
  266. s.clientLock.Unlock()
  267. }
  268. case <-stop:
  269. return false
  270. }
  271. }
  272. }
  273. func (s *Bridge) SendLinkInfo(clientId int, link *conn.Link, linkAddr string) (tunnel *conn.Conn, err error) {
  274. s.clientLock.Lock()
  275. if v, ok := s.Client[clientId]; ok {
  276. s.clientLock.Unlock()
  277. if s.ipVerify {
  278. s.registerLock.Lock()
  279. ip := common.GetIpByAddr(linkAddr)
  280. if v, ok := s.Register[ip]; !ok {
  281. s.registerLock.Unlock()
  282. return nil, errors.New(fmt.Sprintf("The ip %s is not in the validation list", ip))
  283. } else {
  284. if !v.After(time.Now()) {
  285. return nil, errors.New(fmt.Sprintf("The validity of the ip %s has expired", ip))
  286. }
  287. }
  288. s.registerLock.Unlock()
  289. }
  290. v.signal.SendLinkInfo(link)
  291. if err != nil {
  292. logs.Warn("send link information error:", err, link.Id)
  293. s.DelClient(clientId)
  294. return
  295. }
  296. if v.tunnel == nil {
  297. err = errors.New("get tunnel connection error")
  298. return
  299. } else {
  300. tunnel = v.tunnel
  301. }
  302. link.MsgConn = v.msg
  303. v.Lock()
  304. v.linkMap[link.Id] = link
  305. v.Unlock()
  306. if !s.waitStatus(clientId, link.Id) {
  307. err = errors.New(fmt.Sprintf("connect target %s fail", link.Host))
  308. return
  309. }
  310. } else {
  311. s.clientLock.Unlock()
  312. err = errors.New(fmt.Sprintf("the client %d is not connect", clientId))
  313. }
  314. return
  315. }
  316. //删除通信通道
  317. func (s *Bridge) DelClient(id int) {
  318. s.closeClient(id)
  319. }
  320. //get config
  321. func (s *Bridge) GetConfig(c *conn.Conn) {
  322. var client *file.Client
  323. var fail bool
  324. for {
  325. flag, err := c.ReadFlag()
  326. if err != nil {
  327. break
  328. }
  329. switch flag {
  330. case common.WORK_STATUS:
  331. if b, err := c.ReadLen(16); err != nil {
  332. break
  333. } else {
  334. var str string
  335. id, err := file.GetCsvDb().GetClientIdByVkey(string(b))
  336. if err != nil {
  337. break
  338. }
  339. for _, v := range file.GetCsvDb().Hosts {
  340. if v.Client.Id == id {
  341. str += v.Remark + common.CONN_DATA_SEQ
  342. }
  343. }
  344. for _, v := range file.GetCsvDb().Tasks {
  345. if _, ok := s.runList[v.Id]; ok && v.Client.Id == id {
  346. str += v.Remark + common.CONN_DATA_SEQ
  347. }
  348. }
  349. binary.Write(c, binary.LittleEndian, int32(len([]byte(str))))
  350. binary.Write(c, binary.LittleEndian, []byte(str))
  351. }
  352. case common.NEW_CONF:
  353. var err error
  354. if client, err = c.GetConfigInfo(); err != nil {
  355. c.Write([]byte(client.VerifyKey))
  356. fail = true
  357. c.WriteAddFail()
  358. break
  359. } else {
  360. c.Write([]byte(client.VerifyKey))
  361. file.GetCsvDb().NewClient(client)
  362. c.WriteAddOk()
  363. }
  364. case common.NEW_HOST:
  365. if h, err := c.GetHostInfo(); err != nil {
  366. fail = true
  367. c.WriteAddFail()
  368. break
  369. } else if file.GetCsvDb().IsHostExist(h) {
  370. fail = true
  371. c.WriteAddFail()
  372. break
  373. } else {
  374. h.Client = client
  375. file.GetCsvDb().NewHost(h)
  376. c.WriteAddOk()
  377. }
  378. case common.NEW_TASK:
  379. if t, err := c.GetTaskInfo(); err != nil {
  380. fail = true
  381. c.WriteAddFail()
  382. break
  383. } else {
  384. ports := common.GetPorts(t.Ports)
  385. targets := common.GetPorts(t.Target)
  386. if len(ports) > 1 && (t.Mode == "tcpServer" || t.Mode == "udpServer") && (len(ports) != len(targets)) {
  387. fail = true
  388. c.WriteAddFail()
  389. break
  390. } else if t.Mode == "secretServer" {
  391. ports = append(ports, 0)
  392. }
  393. if len(ports) == 0 {
  394. fail = true
  395. c.WriteAddFail()
  396. break
  397. }
  398. for i := 0; i < len(ports); i++ {
  399. tl := new(file.Tunnel)
  400. tl.Mode = t.Mode
  401. tl.Port = ports[i]
  402. if len(ports) == 1 {
  403. tl.Target = t.Target
  404. tl.Remark = t.Remark
  405. } else {
  406. tl.Remark = t.Remark + "_" + strconv.Itoa(tl.Port)
  407. tl.Target = t.TargetAddr + ":" + strconv.Itoa(targets[i])
  408. }
  409. tl.Id = file.GetCsvDb().GetTaskId()
  410. tl.Status = true
  411. tl.Flow = new(file.Flow)
  412. tl.NoStore = true
  413. tl.Client = client
  414. tl.Password = t.Password
  415. if err := file.GetCsvDb().NewTask(tl); err != nil {
  416. logs.Notice("Add task error ", err.Error())
  417. fail = true
  418. c.WriteAddFail()
  419. break
  420. }
  421. if b := tool.TestServerPort(tl.Port, tl.Mode); !b && t.Mode != "secretServer" {
  422. fail = true
  423. c.WriteAddFail()
  424. break
  425. } else {
  426. s.OpenTask <- tl
  427. }
  428. c.WriteAddOk()
  429. }
  430. }
  431. }
  432. }
  433. if fail && client != nil {
  434. s.CloseClient <- client.Id
  435. }
  436. c.Close()
  437. }
  438. func (s *Bridge) GetStatus(clientId int) {
  439. s.clientLock.Lock()
  440. client := s.Client[clientId]
  441. s.clientLock.Unlock()
  442. if client == nil {
  443. return
  444. }
  445. for {
  446. if id, status, err := client.signal.GetConnStatus(); err != nil {
  447. s.closeClient(clientId)
  448. return
  449. } else {
  450. client.Lock()
  451. client.linkStatusMap[id] = status
  452. client.Unlock()
  453. }
  454. }
  455. }
  456. func (s *Bridge) clientCopy(clientId int) {
  457. s.clientLock.Lock()
  458. client := s.Client[clientId]
  459. s.clientLock.Unlock()
  460. for {
  461. if id, err := client.tunnel.GetLen(); err != nil {
  462. logs.Info("read msg content length error close client")
  463. s.delClient(clientId)
  464. break
  465. } else {
  466. client.Lock()
  467. if link, ok := client.linkMap[id]; ok {
  468. client.Unlock()
  469. if content, err := client.tunnel.GetMsgContent(link); err != nil {
  470. pool.PutBufPoolCopy(content)
  471. s.delClient(clientId)
  472. logs.Notice("read msg content error", err, "close client")
  473. break
  474. } else {
  475. link.MsgCh <- content
  476. }
  477. } else {
  478. client.Unlock()
  479. continue
  480. }
  481. }
  482. }
  483. }