bridge.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447
  1. package bridge
  2. import (
  3. "encoding/binary"
  4. "errors"
  5. "fmt"
  6. "github.com/cnlh/nps/lib/common"
  7. "github.com/cnlh/nps/lib/conn"
  8. "github.com/cnlh/nps/lib/crypt"
  9. "github.com/cnlh/nps/lib/file"
  10. "github.com/cnlh/nps/lib/lg"
  11. "github.com/cnlh/nps/lib/pool"
  12. "github.com/cnlh/nps/server/tool"
  13. "github.com/cnlh/nps/vender/github.com/xtaci/kcp"
  14. "log"
  15. "net"
  16. "strconv"
  17. "sync"
  18. "time"
  19. )
  20. type Client struct {
  21. tunnel *conn.Conn
  22. signal *conn.Conn
  23. linkMap map[int]*conn.Link
  24. linkStatusMap map[int]bool
  25. stop chan bool
  26. sync.RWMutex
  27. }
  28. func NewClient(t *conn.Conn, s *conn.Conn) *Client {
  29. return &Client{
  30. linkMap: make(map[int]*conn.Link),
  31. stop: make(chan bool),
  32. linkStatusMap: make(map[int]bool),
  33. signal: s,
  34. tunnel: t,
  35. }
  36. }
  37. type Bridge struct {
  38. TunnelPort int //通信隧道端口
  39. tcpListener *net.TCPListener //server端监听
  40. kcpListener *kcp.Listener //server端监听
  41. Client map[int]*Client
  42. tunnelType string //bridge type kcp or tcp
  43. OpenTask chan *file.Tunnel
  44. CloseClient chan int
  45. clientLock sync.RWMutex
  46. Register map[string]time.Time
  47. registerLock sync.RWMutex
  48. ipVerify bool
  49. runList map[int]interface{}
  50. }
  51. func NewTunnel(tunnelPort int, tunnelType string, ipVerify bool, runList map[int]interface{}) *Bridge {
  52. t := new(Bridge)
  53. t.TunnelPort = tunnelPort
  54. t.Client = make(map[int]*Client)
  55. t.tunnelType = tunnelType
  56. t.OpenTask = make(chan *file.Tunnel)
  57. t.CloseClient = make(chan int)
  58. t.Register = make(map[string]time.Time)
  59. t.ipVerify = ipVerify
  60. t.runList = runList
  61. return t
  62. }
  63. func (s *Bridge) StartTunnel() error {
  64. var err error
  65. if s.tunnelType == "kcp" {
  66. s.kcpListener, err = kcp.ListenWithOptions(":"+strconv.Itoa(s.TunnelPort), nil, 150, 3)
  67. if err != nil {
  68. return err
  69. }
  70. go func() {
  71. for {
  72. c, err := s.kcpListener.AcceptKCP()
  73. conn.SetUdpSession(c)
  74. if err != nil {
  75. lg.Println(err)
  76. continue
  77. }
  78. go s.cliProcess(conn.NewConn(c))
  79. }
  80. }()
  81. } else {
  82. s.tcpListener, err = net.ListenTCP("tcp", &net.TCPAddr{net.ParseIP("0.0.0.0"), s.TunnelPort, ""})
  83. if err != nil {
  84. return err
  85. }
  86. go func() {
  87. for {
  88. c, err := s.tcpListener.Accept()
  89. if err != nil {
  90. lg.Println(err)
  91. continue
  92. }
  93. go s.cliProcess(conn.NewConn(c))
  94. }
  95. }()
  96. }
  97. return nil
  98. }
  99. //验证失败,返回错误验证flag,并且关闭连接
  100. func (s *Bridge) verifyError(c *conn.Conn) {
  101. c.Write([]byte(common.VERIFY_EER))
  102. c.Conn.Close()
  103. }
  104. func (s *Bridge) verifySuccess(c *conn.Conn) {
  105. c.Write([]byte(common.VERIFY_SUCCESS))
  106. }
  107. func (s *Bridge) cliProcess(c *conn.Conn) {
  108. c.SetReadDeadline(5, s.tunnelType)
  109. var buf []byte
  110. var err error
  111. if buf, err = c.ReadLen(32); err != nil {
  112. c.Close()
  113. return
  114. }
  115. //验证
  116. id, err := file.GetCsvDb().GetIdByVerifyKey(string(buf), c.Conn.RemoteAddr().String())
  117. if err != nil {
  118. lg.Println("当前客户端连接校验错误,关闭此客户端:", c.Conn.RemoteAddr())
  119. s.verifyError(c)
  120. return
  121. } else {
  122. s.verifySuccess(c)
  123. }
  124. //做一个判断 添加到对应的channel里面以供使用
  125. if flag, err := c.ReadFlag(); err == nil {
  126. s.typeDeal(flag, c, id)
  127. } else {
  128. log.Println(err, flag)
  129. }
  130. return
  131. }
  132. func (s *Bridge) closeClient(id int) {
  133. s.clientLock.Lock()
  134. defer s.clientLock.Unlock()
  135. if v, ok := s.Client[id]; ok {
  136. if c, err := file.GetCsvDb().GetClient(id); err == nil && c.NoStore {
  137. s.CloseClient <- c.Id
  138. }
  139. v.signal.WriteClose()
  140. delete(s.Client, id)
  141. }
  142. }
  143. //tcp连接类型区分
  144. func (s *Bridge) typeDeal(typeVal string, c *conn.Conn, id int) {
  145. switch typeVal {
  146. case common.WORK_MAIN:
  147. //客户端已经存在,下线
  148. s.clientLock.Lock()
  149. if v, ok := s.Client[id]; ok {
  150. s.clientLock.Unlock()
  151. if v.signal != nil {
  152. v.signal.WriteClose()
  153. }
  154. v.Lock()
  155. v.signal = c
  156. v.Unlock()
  157. } else {
  158. s.Client[id] = NewClient(nil, c)
  159. s.clientLock.Unlock()
  160. }
  161. lg.Printf("clientId %d connection succeeded, address:%s ", id, c.Conn.RemoteAddr())
  162. go s.GetStatus(id)
  163. case common.WORK_CHAN:
  164. s.clientLock.Lock()
  165. if v, ok := s.Client[id]; ok {
  166. s.clientLock.Unlock()
  167. v.Lock()
  168. v.tunnel = c
  169. v.Unlock()
  170. } else {
  171. s.Client[id] = NewClient(c, nil)
  172. s.clientLock.Unlock()
  173. }
  174. go s.clientCopy(id)
  175. case common.WORK_CONFIG:
  176. go s.GetConfig(c)
  177. case common.WORK_REGISTER:
  178. go s.register(c)
  179. }
  180. c.SetAlive(s.tunnelType)
  181. return
  182. }
  183. func (s *Bridge) register(c *conn.Conn) {
  184. var hour int32
  185. if err := binary.Read(c, binary.LittleEndian, &hour); err == nil {
  186. s.registerLock.Lock()
  187. s.Register[common.GetIpByAddr(c.Conn.RemoteAddr().String())] = time.Now().Add(time.Hour * time.Duration(hour))
  188. s.registerLock.Unlock()
  189. }
  190. }
  191. //等待
  192. func (s *Bridge) waitStatus(clientId, id int) bool {
  193. ticker := time.NewTicker(time.Millisecond * 100)
  194. stop := time.After(time.Second * 10)
  195. for {
  196. select {
  197. case <-ticker.C:
  198. s.clientLock.Lock()
  199. if v, ok := s.Client[clientId]; ok {
  200. s.clientLock.Unlock()
  201. v.Lock()
  202. if vv, ok := v.linkStatusMap[id]; ok {
  203. ticker.Stop()
  204. v.Unlock()
  205. return vv
  206. }
  207. v.Unlock()
  208. } else {
  209. s.clientLock.Unlock()
  210. }
  211. case <-stop:
  212. return false
  213. }
  214. }
  215. }
  216. func (s *Bridge) SendLinkInfo(clientId int, link *conn.Link, linkAddr string) (tunnel *conn.Conn, err error) {
  217. s.clientLock.Lock()
  218. if v, ok := s.Client[clientId]; ok {
  219. s.clientLock.Unlock()
  220. if s.ipVerify {
  221. s.registerLock.Lock()
  222. ip := common.GetIpByAddr(linkAddr)
  223. if v, ok := s.Register[ip]; !ok {
  224. s.registerLock.Unlock()
  225. return nil, errors.New(fmt.Sprintf("The ip %s is not in the validation list", ip))
  226. } else {
  227. if !v.After(time.Now()) {
  228. return nil, errors.New(fmt.Sprintf("The validity of the ip %s has expired", ip))
  229. }
  230. }
  231. s.registerLock.Unlock()
  232. }
  233. v.signal.SendLinkInfo(link)
  234. if err != nil {
  235. lg.Println("send link information error:", err, link.Id)
  236. s.DelClient(clientId)
  237. return
  238. }
  239. if v.tunnel == nil {
  240. err = errors.New("get tunnel connection error")
  241. return
  242. } else {
  243. tunnel = v.tunnel
  244. }
  245. v.Lock()
  246. v.linkMap[link.Id] = link
  247. v.Unlock()
  248. if !s.waitStatus(clientId, link.Id) {
  249. err = errors.New("connect fail")
  250. return
  251. }
  252. } else {
  253. s.clientLock.Unlock()
  254. err = errors.New("the connection is not connect")
  255. }
  256. return
  257. }
  258. //删除通信通道
  259. func (s *Bridge) DelClient(id int) {
  260. s.closeClient(id)
  261. }
  262. //get config
  263. func (s *Bridge) GetConfig(c *conn.Conn) {
  264. var client *file.Client
  265. var fail bool
  266. for {
  267. flag, err := c.ReadFlag()
  268. if err != nil {
  269. break
  270. }
  271. switch flag {
  272. case common.WORK_STATUS:
  273. if b, err := c.ReadLen(16); err != nil {
  274. break
  275. } else {
  276. var str string
  277. id, err := file.GetCsvDb().GetClientIdByVkey(string(b))
  278. if err != nil {
  279. break
  280. }
  281. for _, v := range file.GetCsvDb().Hosts {
  282. if v.Client.Id == id {
  283. str += v.Remark + common.CONN_DATA_SEQ
  284. }
  285. }
  286. for _, v := range file.GetCsvDb().Tasks {
  287. if _, ok := s.runList[v.Id]; ok && v.Client.Id == id {
  288. str += v.Remark + common.CONN_DATA_SEQ
  289. }
  290. }
  291. binary.Write(c, binary.LittleEndian, int32(len([]byte(str))))
  292. binary.Write(c, binary.LittleEndian, []byte(str))
  293. }
  294. case common.NEW_CONF:
  295. //new client ,Set the client not to store to the file
  296. client = file.NewClient(crypt.GetRandomString(16), true, false)
  297. client.Remark = "public veky"
  298. //Send the key to the client
  299. file.GetCsvDb().NewClient(client)
  300. c.Write([]byte(client.VerifyKey))
  301. if config, err := c.GetConfigInfo(); err != nil {
  302. fail = true
  303. c.WriteAddFail()
  304. break
  305. } else {
  306. client.Cnf = config
  307. c.WriteAddOk()
  308. }
  309. case common.NEW_HOST:
  310. if h, err := c.GetHostInfo(); err != nil {
  311. fail = true
  312. c.WriteAddFail()
  313. break
  314. } else if file.GetCsvDb().IsHostExist(h) {
  315. fail = true
  316. c.WriteAddFail()
  317. } else {
  318. h.Client = client
  319. file.GetCsvDb().NewHost(h)
  320. c.WriteAddOk()
  321. }
  322. case common.NEW_TASK:
  323. if t, err := c.GetTaskInfo(); err != nil {
  324. fail = true
  325. c.WriteAddFail()
  326. break
  327. } else {
  328. ports := common.GetPorts(t.Ports)
  329. targets := common.GetPorts(t.Target)
  330. if len(ports) > 1 && (t.Mode == "tcpServer" || t.Mode == "udpServer") && (len(ports) != len(targets)) {
  331. fail = true
  332. c.WriteAddFail()
  333. break
  334. }
  335. for i := 0; i < len(ports); i++ {
  336. tl := new(file.Tunnel)
  337. tl.Mode = t.Mode
  338. tl.Port = ports[i]
  339. if len(ports) == 1 {
  340. tl.Target = t.Target
  341. tl.Remark = t.Remark
  342. } else {
  343. tl.Remark = t.Remark + "_" + strconv.Itoa(tl.Port)
  344. tl.Target = strconv.Itoa(targets[i])
  345. }
  346. tl.Id = file.GetCsvDb().GetTaskId()
  347. tl.Status = true
  348. tl.Flow = new(file.Flow)
  349. tl.NoStore = true
  350. tl.Client = client
  351. file.GetCsvDb().NewTask(tl)
  352. if b := tool.TestServerPort(tl.Port, tl.Mode); !b {
  353. fail = true
  354. c.WriteAddFail()
  355. } else {
  356. s.OpenTask <- tl
  357. }
  358. c.WriteAddOk()
  359. }
  360. }
  361. }
  362. }
  363. if fail && client != nil {
  364. s.CloseClient <- client.Id
  365. }
  366. c.Close()
  367. }
  368. func (s *Bridge) GetStatus(clientId int) {
  369. s.clientLock.Lock()
  370. client := s.Client[clientId]
  371. s.clientLock.Unlock()
  372. if client == nil {
  373. return
  374. }
  375. for {
  376. if id, status, err := client.signal.GetConnStatus(); err != nil {
  377. s.closeClient(clientId)
  378. return
  379. } else {
  380. client.Lock()
  381. client.linkStatusMap[id] = status
  382. client.Unlock()
  383. }
  384. }
  385. }
  386. func (s *Bridge) clientCopy(clientId int) {
  387. s.clientLock.Lock()
  388. client := s.Client[clientId]
  389. s.clientLock.Unlock()
  390. for {
  391. if id, err := client.tunnel.GetLen(); err != nil {
  392. s.closeClient(clientId)
  393. break
  394. } else {
  395. client.Lock()
  396. if link, ok := client.linkMap[id]; ok {
  397. client.Unlock()
  398. if content, err := client.tunnel.GetMsgContent(link); err != nil {
  399. pool.PutBufPoolCopy(content)
  400. s.closeClient(clientId)
  401. lg.Println("read msg content error", err, "close client")
  402. break
  403. } else {
  404. if len(content) == len(common.IO_EOF) && string(content) == common.IO_EOF {
  405. if link.Conn != nil {
  406. link.Conn.Close()
  407. }
  408. } else {
  409. if link.UdpListener != nil && link.UdpRemoteAddr != nil {
  410. link.UdpListener.WriteToUDP(content, link.UdpRemoteAddr)
  411. } else {
  412. link.Conn.Write(content)
  413. }
  414. link.Flow.Add(0, len(content))
  415. }
  416. pool.PutBufPoolCopy(content)
  417. }
  418. } else {
  419. client.Unlock()
  420. continue
  421. }
  422. }
  423. }
  424. }