bridge.go 11 KB


  1. package bridge
  2. import (
  3. "encoding/binary"
  4. "errors"
  5. "fmt"
  6. "github.com/cnlh/nps/lib/common"
  7. "github.com/cnlh/nps/lib/conn"
  8. "github.com/cnlh/nps/lib/crypt"
  9. "github.com/cnlh/nps/lib/file"
  10. "github.com/cnlh/nps/lib/lg"
  11. "github.com/cnlh/nps/lib/pool"
  12. "github.com/cnlh/nps/server/tool"
  13. "github.com/cnlh/nps/vender/github.com/xtaci/kcp"
  14. "log"
  15. "net"
  16. "strconv"
  17. "sync"
  18. "time"
  19. )
  20. type Client struct {
  21. tunnel *conn.Conn
  22. signal *conn.Conn
  23. msg *conn.Conn
  24. linkMap map[int]*conn.Link
  25. linkStatusMap map[int]bool
  26. stop chan bool
  27. sync.RWMutex
  28. }
  29. func NewClient(t *conn.Conn, s *conn.Conn, m *conn.Conn) *Client {
  30. return &Client{
  31. linkMap: make(map[int]*conn.Link),
  32. stop: make(chan bool),
  33. linkStatusMap: make(map[int]bool),
  34. signal: s,
  35. tunnel: t,
  36. msg: m,
  37. }
  38. }
  39. type Bridge struct {
  40. TunnelPort int //通信隧道端口
  41. tcpListener *net.TCPListener //server端监听
  42. kcpListener *kcp.Listener //server端监听
  43. Client map[int]*Client
  44. tunnelType string //bridge type kcp or tcp
  45. OpenTask chan *file.Tunnel
  46. CloseClient chan int
  47. clientLock sync.RWMutex
  48. Register map[string]time.Time
  49. registerLock sync.RWMutex
  50. ipVerify bool
  51. runList map[int]interface{}
  52. }
  53. func NewTunnel(tunnelPort int, tunnelType string, ipVerify bool, runList map[int]interface{}) *Bridge {
  54. t := new(Bridge)
  55. t.TunnelPort = tunnelPort
  56. t.Client = make(map[int]*Client)
  57. t.tunnelType = tunnelType
  58. t.OpenTask = make(chan *file.Tunnel)
  59. t.CloseClient = make(chan int)
  60. t.Register = make(map[string]time.Time)
  61. t.ipVerify = ipVerify
  62. t.runList = runList
  63. return t
  64. }
  65. func (s *Bridge) StartTunnel() error {
  66. var err error
  67. if s.tunnelType == "kcp" {
  68. s.kcpListener, err = kcp.ListenWithOptions(":"+strconv.Itoa(s.TunnelPort), nil, 150, 3)
  69. if err != nil {
  70. return err
  71. }
  72. go func() {
  73. for {
  74. c, err := s.kcpListener.AcceptKCP()
  75. conn.SetUdpSession(c)
  76. if err != nil {
  77. lg.Println(err)
  78. continue
  79. }
  80. go s.cliProcess(conn.NewConn(c))
  81. }
  82. }()
  83. } else {
  84. s.tcpListener, err = net.ListenTCP("tcp", &net.TCPAddr{net.ParseIP("0.0.0.0"), s.TunnelPort, ""})
  85. if err != nil {
  86. return err
  87. }
  88. go func() {
  89. for {
  90. c, err := s.tcpListener.Accept()
  91. if err != nil {
  92. lg.Println(err)
  93. continue
  94. }
  95. go s.cliProcess(conn.NewConn(c))
  96. }
  97. }()
  98. }
  99. return nil
  100. }
  101. //验证失败,返回错误验证flag,并且关闭连接
  102. func (s *Bridge) verifyError(c *conn.Conn) {
  103. c.Write([]byte(common.VERIFY_EER))
  104. c.Conn.Close()
  105. }
  106. func (s *Bridge) verifySuccess(c *conn.Conn) {
  107. c.Write([]byte(common.VERIFY_SUCCESS))
  108. }
  109. func (s *Bridge) cliProcess(c *conn.Conn) {
  110. c.SetReadDeadline(5, s.tunnelType)
  111. var buf []byte
  112. var err error
  113. if buf, err = c.ReadLen(32); err != nil {
  114. c.Close()
  115. return
  116. }
  117. //验证
  118. id, err := file.GetCsvDb().GetIdByVerifyKey(string(buf), c.Conn.RemoteAddr().String())
  119. if err != nil {
  120. lg.Println("当前客户端连接校验错误,关闭此客户端:", c.Conn.RemoteAddr())
  121. s.verifyError(c)
  122. return
  123. } else {
  124. s.verifySuccess(c)
  125. }
  126. //做一个判断 添加到对应的channel里面以供使用
  127. if flag, err := c.ReadFlag(); err == nil {
  128. s.typeDeal(flag, c, id)
  129. } else {
  130. log.Println(err, flag)
  131. }
  132. return
  133. }
  134. func (s *Bridge) closeClient(id int) {
  135. s.clientLock.Lock()
  136. defer s.clientLock.Unlock()
  137. if v, ok := s.Client[id]; ok {
  138. if c, err := file.GetCsvDb().GetClient(id); err == nil && c.NoStore {
  139. s.CloseClient <- c.Id
  140. }
  141. v.signal.WriteClose()
  142. delete(s.Client, id)
  143. }
  144. }
  145. func (s *Bridge) delClient(id int) {
  146. s.clientLock.Lock()
  147. defer s.clientLock.Unlock()
  148. if v, ok := s.Client[id]; ok {
  149. if c, err := file.GetCsvDb().GetClient(id); err == nil && c.NoStore {
  150. s.CloseClient <- c.Id
  151. }
  152. v.signal.Close()
  153. delete(s.Client, id)
  154. }
  155. }
  156. //tcp连接类型区分
  157. func (s *Bridge) typeDeal(typeVal string, c *conn.Conn, id int) {
  158. switch typeVal {
  159. case common.WORK_MAIN:
  160. //客户端已经存在,下线
  161. s.clientLock.Lock()
  162. if v, ok := s.Client[id]; ok {
  163. s.clientLock.Unlock()
  164. if v.signal != nil {
  165. v.signal.WriteClose()
  166. }
  167. v.Lock()
  168. v.signal = c
  169. v.Unlock()
  170. } else {
  171. s.Client[id] = NewClient(nil, c, nil)
  172. s.clientLock.Unlock()
  173. }
  174. lg.Printf("clientId %d connection succeeded, address:%s ", id, c.Conn.RemoteAddr())
  175. go s.GetStatus(id)
  176. case common.WORK_CHAN:
  177. s.clientLock.Lock()
  178. if v, ok := s.Client[id]; ok {
  179. s.clientLock.Unlock()
  180. v.Lock()
  181. v.tunnel = c
  182. v.Unlock()
  183. } else {
  184. s.Client[id] = NewClient(c, nil, nil)
  185. s.clientLock.Unlock()
  186. }
  187. go s.clientCopy(id)
  188. case common.WORK_CONFIG:
  189. go s.GetConfig(c)
  190. case common.WORK_REGISTER:
  191. go s.register(c)
  192. case common.WORK_SEND_STATUS:
  193. s.clientLock.Lock()
  194. if v, ok := s.Client[id]; ok {
  195. s.clientLock.Unlock()
  196. v.Lock()
  197. v.msg = c
  198. v.Unlock()
  199. } else {
  200. s.Client[id] = NewClient(nil, nil, c)
  201. s.clientLock.Unlock()
  202. }
  203. go s.getMsgStatus(id)
  204. }
  205. c.SetAlive(s.tunnelType)
  206. return
  207. }
  208. func (s *Bridge) getMsgStatus(clientId int) {
  209. s.clientLock.Lock()
  210. client := s.Client[clientId]
  211. s.clientLock.Unlock()
  212. if client == nil {
  213. return
  214. }
  215. for {
  216. if id, err := client.msg.GetLen(); err != nil {
  217. s.closeClient(clientId)
  218. return
  219. } else {
  220. client.Lock()
  221. if v, ok := client.linkMap[id]; ok {
  222. v.StatusCh <- true
  223. }
  224. client.Unlock()
  225. }
  226. }
  227. }
  228. func (s *Bridge) register(c *conn.Conn) {
  229. var hour int32
  230. if err := binary.Read(c, binary.LittleEndian, &hour); err == nil {
  231. s.registerLock.Lock()
  232. s.Register[common.GetIpByAddr(c.Conn.RemoteAddr().String())] = time.Now().Add(time.Hour * time.Duration(hour))
  233. s.registerLock.Unlock()
  234. }
  235. }
  236. //等待
  237. func (s *Bridge) waitStatus(clientId, id int) bool {
  238. ticker := time.NewTicker(time.Millisecond * 100)
  239. stop := time.After(time.Second * 10)
  240. for {
  241. select {
  242. case <-ticker.C:
  243. s.clientLock.Lock()
  244. if v, ok := s.Client[clientId]; ok {
  245. s.clientLock.Unlock()
  246. v.Lock()
  247. if vv, ok := v.linkStatusMap[id]; ok {
  248. ticker.Stop()
  249. v.Unlock()
  250. return vv
  251. }
  252. v.Unlock()
  253. } else {
  254. s.clientLock.Unlock()
  255. }
  256. case <-stop:
  257. return false
  258. }
  259. }
  260. }
  261. func (s *Bridge) SendLinkInfo(clientId int, link *conn.Link, linkAddr string) (tunnel *conn.Conn, err error) {
  262. s.clientLock.Lock()
  263. if v, ok := s.Client[clientId]; ok {
  264. s.clientLock.Unlock()
  265. if s.ipVerify {
  266. s.registerLock.Lock()
  267. ip := common.GetIpByAddr(linkAddr)
  268. if v, ok := s.Register[ip]; !ok {
  269. s.registerLock.Unlock()
  270. return nil, errors.New(fmt.Sprintf("The ip %s is not in the validation list", ip))
  271. } else {
  272. if !v.After(time.Now()) {
  273. return nil, errors.New(fmt.Sprintf("The validity of the ip %s has expired", ip))
  274. }
  275. }
  276. s.registerLock.Unlock()
  277. }
  278. v.signal.SendLinkInfo(link)
  279. if err != nil {
  280. lg.Println("send link information error:", err, link.Id)
  281. s.DelClient(clientId)
  282. return
  283. }
  284. if v.tunnel == nil {
  285. err = errors.New("get tunnel connection error")
  286. return
  287. } else {
  288. tunnel = v.tunnel
  289. }
  290. link.MsgConn = v.msg
  291. v.Lock()
  292. v.linkMap[link.Id] = link
  293. v.Unlock()
  294. if !s.waitStatus(clientId, link.Id) {
  295. err = errors.New("connect fail")
  296. return
  297. }
  298. } else {
  299. s.clientLock.Unlock()
  300. err = errors.New("the connection is not connect")
  301. }
  302. return
  303. }
  304. //删除通信通道
  305. func (s *Bridge) DelClient(id int) {
  306. s.closeClient(id)
  307. }
  308. //get config
  309. func (s *Bridge) GetConfig(c *conn.Conn) {
  310. var client *file.Client
  311. var fail bool
  312. for {
  313. flag, err := c.ReadFlag()
  314. if err != nil {
  315. break
  316. }
  317. switch flag {
  318. case common.WORK_STATUS:
  319. if b, err := c.ReadLen(16); err != nil {
  320. break
  321. } else {
  322. var str string
  323. id, err := file.GetCsvDb().GetClientIdByVkey(string(b))
  324. if err != nil {
  325. break
  326. }
  327. for _, v := range file.GetCsvDb().Hosts {
  328. if v.Client.Id == id {
  329. str += v.Remark + common.CONN_DATA_SEQ
  330. }
  331. }
  332. for _, v := range file.GetCsvDb().Tasks {
  333. if _, ok := s.runList[v.Id]; ok && v.Client.Id == id {
  334. str += v.Remark + common.CONN_DATA_SEQ
  335. }
  336. }
  337. binary.Write(c, binary.LittleEndian, int32(len([]byte(str))))
  338. binary.Write(c, binary.LittleEndian, []byte(str))
  339. }
  340. case common.NEW_CONF:
  341. //new client ,Set the client not to store to the file
  342. client = file.NewClient(crypt.GetRandomString(16), true, false)
  343. client.Remark = "public veky"
  344. //Send the key to the client
  345. file.GetCsvDb().NewClient(client)
  346. c.Write([]byte(client.VerifyKey))
  347. if config, err := c.GetConfigInfo(); err != nil {
  348. fail = true
  349. c.WriteAddFail()
  350. break
  351. } else {
  352. client.Cnf = config
  353. c.WriteAddOk()
  354. }
  355. case common.NEW_HOST:
  356. if h, err := c.GetHostInfo(); err != nil {
  357. fail = true
  358. c.WriteAddFail()
  359. break
  360. } else if file.GetCsvDb().IsHostExist(h) {
  361. fail = true
  362. c.WriteAddFail()
  363. } else {
  364. h.Client = client
  365. file.GetCsvDb().NewHost(h)
  366. c.WriteAddOk()
  367. }
  368. case common.NEW_TASK:
  369. if t, err := c.GetTaskInfo(); err != nil {
  370. fail = true
  371. c.WriteAddFail()
  372. break
  373. } else {
  374. ports := common.GetPorts(t.Ports)
  375. targets := common.GetPorts(t.Target)
  376. if len(ports) > 1 && (t.Mode == "tcpServer" || t.Mode == "udpServer") && (len(ports) != len(targets)) {
  377. fail = true
  378. c.WriteAddFail()
  379. break
  380. }
  381. for i := 0; i < len(ports); i++ {
  382. tl := new(file.Tunnel)
  383. tl.Mode = t.Mode
  384. tl.Port = ports[i]
  385. if len(ports) == 1 {
  386. tl.Target = t.Target
  387. tl.Remark = t.Remark
  388. } else {
  389. tl.Remark = t.Remark + "_" + strconv.Itoa(tl.Port)
  390. tl.Target = strconv.Itoa(targets[i])
  391. }
  392. tl.Id = file.GetCsvDb().GetTaskId()
  393. tl.Status = true
  394. tl.Flow = new(file.Flow)
  395. tl.NoStore = true
  396. tl.Client = client
  397. file.GetCsvDb().NewTask(tl)
  398. if b := tool.TestServerPort(tl.Port, tl.Mode); !b {
  399. fail = true
  400. c.WriteAddFail()
  401. } else {
  402. s.OpenTask <- tl
  403. }
  404. c.WriteAddOk()
  405. }
  406. }
  407. }
  408. }
  409. if fail && client != nil {
  410. s.CloseClient <- client.Id
  411. }
  412. c.Close()
  413. }
  414. func (s *Bridge) GetStatus(clientId int) {
  415. s.clientLock.Lock()
  416. client := s.Client[clientId]
  417. s.clientLock.Unlock()
  418. if client == nil {
  419. return
  420. }
  421. for {
  422. if id, status, err := client.signal.GetConnStatus(); err != nil {
  423. s.closeClient(clientId)
  424. return
  425. } else {
  426. client.Lock()
  427. client.linkStatusMap[id] = status
  428. client.Unlock()
  429. }
  430. }
  431. }
  432. func (s *Bridge) clientCopy(clientId int) {
  433. s.clientLock.Lock()
  434. client := s.Client[clientId]
  435. s.clientLock.Unlock()
  436. for {
  437. if id, err := client.tunnel.GetLen(); err != nil {
  438. lg.Println("read msg content length error close client")
  439. s.delClient(clientId)
  440. break
  441. } else {
  442. client.Lock()
  443. if link, ok := client.linkMap[id]; ok {
  444. client.Unlock()
  445. if content, err := client.tunnel.GetMsgContent(link); err != nil {
  446. pool.PutBufPoolCopy(content)
  447. s.delClient(clientId)
  448. lg.Println("read msg content error", err, "close client")
  449. break
  450. } else {
  451. link.MsgCh <- content
  452. }
  453. } else {
  454. client.Unlock()
  455. continue
  456. }
  457. }
  458. }
  459. }