bridge.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445
  1. package bridge
  2. import (
  3. "encoding/binary"
  4. "errors"
  5. "fmt"
  6. "github.com/cnlh/nps/lib/common"
  7. "github.com/cnlh/nps/lib/conn"
  8. "github.com/cnlh/nps/lib/crypt"
  9. "github.com/cnlh/nps/lib/file"
  10. "github.com/cnlh/nps/lib/lg"
  11. "github.com/cnlh/nps/lib/pool"
  12. "github.com/cnlh/nps/server/tool"
  13. "github.com/cnlh/nps/vender/github.com/xtaci/kcp"
  14. "log"
  15. "net"
  16. "strconv"
  17. "sync"
  18. "time"
  19. )
  20. type Client struct {
  21. tunnel *conn.Conn
  22. signal *conn.Conn
  23. linkMap map[int]*conn.Link
  24. linkStatusMap map[int]bool
  25. stop chan bool
  26. sync.RWMutex
  27. }
  28. func NewClient(t *conn.Conn, s *conn.Conn) *Client {
  29. return &Client{
  30. linkMap: make(map[int]*conn.Link),
  31. stop: make(chan bool),
  32. linkStatusMap: make(map[int]bool),
  33. signal: s,
  34. tunnel: t,
  35. }
  36. }
  37. type Bridge struct {
  38. TunnelPort int //通信隧道端口
  39. tcpListener *net.TCPListener //server端监听
  40. kcpListener *kcp.Listener //server端监听
  41. Client map[int]*Client
  42. tunnelType string //bridge type kcp or tcp
  43. OpenTask chan *file.Tunnel
  44. CloseClient chan int
  45. clientLock sync.RWMutex
  46. Register map[string]time.Time
  47. registerLock sync.RWMutex
  48. ipVerify bool
  49. }
  50. func NewTunnel(tunnelPort int, tunnelType string, ipVerify bool) *Bridge {
  51. t := new(Bridge)
  52. t.TunnelPort = tunnelPort
  53. t.Client = make(map[int]*Client)
  54. t.tunnelType = tunnelType
  55. t.OpenTask = make(chan *file.Tunnel)
  56. t.CloseClient = make(chan int)
  57. t.Register = make(map[string]time.Time)
  58. t.ipVerify = ipVerify
  59. return t
  60. }
  61. func (s *Bridge) StartTunnel() error {
  62. var err error
  63. if s.tunnelType == "kcp" {
  64. s.kcpListener, err = kcp.ListenWithOptions(":"+strconv.Itoa(s.TunnelPort), nil, 150, 3)
  65. if err != nil {
  66. return err
  67. }
  68. go func() {
  69. for {
  70. c, err := s.kcpListener.AcceptKCP()
  71. conn.SetUdpSession(c)
  72. if err != nil {
  73. lg.Println(err)
  74. continue
  75. }
  76. go s.cliProcess(conn.NewConn(c))
  77. }
  78. }()
  79. } else {
  80. s.tcpListener, err = net.ListenTCP("tcp", &net.TCPAddr{net.ParseIP("0.0.0.0"), s.TunnelPort, ""})
  81. if err != nil {
  82. return err
  83. }
  84. go func() {
  85. for {
  86. c, err := s.tcpListener.Accept()
  87. if err != nil {
  88. lg.Println(err)
  89. continue
  90. }
  91. go s.cliProcess(conn.NewConn(c))
  92. }
  93. }()
  94. }
  95. return nil
  96. }
  97. //验证失败,返回错误验证flag,并且关闭连接
  98. func (s *Bridge) verifyError(c *conn.Conn) {
  99. c.Write([]byte(common.VERIFY_EER))
  100. c.Conn.Close()
  101. }
  102. func (s *Bridge) verifySuccess(c *conn.Conn) {
  103. c.Write([]byte(common.VERIFY_SUCCESS))
  104. }
  105. func (s *Bridge) cliProcess(c *conn.Conn) {
  106. c.SetReadDeadline(5, s.tunnelType)
  107. var buf []byte
  108. var err error
  109. if buf, err = c.ReadLen(32); err != nil {
  110. c.Close()
  111. return
  112. }
  113. //验证
  114. id, err := file.GetCsvDb().GetIdByVerifyKey(string(buf), c.Conn.RemoteAddr().String())
  115. if err != nil {
  116. lg.Println("当前客户端连接校验错误,关闭此客户端:", c.Conn.RemoteAddr())
  117. s.verifyError(c)
  118. return
  119. } else {
  120. s.verifySuccess(c)
  121. }
  122. //做一个判断 添加到对应的channel里面以供使用
  123. if flag, err := c.ReadFlag(); err == nil {
  124. s.typeDeal(flag, c, id)
  125. } else {
  126. log.Println(err, flag)
  127. }
  128. return
  129. }
  130. func (s *Bridge) closeClient(id int) {
  131. s.clientLock.Lock()
  132. defer s.clientLock.Unlock()
  133. if v, ok := s.Client[id]; ok {
  134. if c, err := file.GetCsvDb().GetClient(id); err == nil && c.NoStore {
  135. s.CloseClient <- c.Id
  136. }
  137. v.signal.WriteClose()
  138. delete(s.Client, id)
  139. }
  140. }
  141. //tcp连接类型区分
  142. func (s *Bridge) typeDeal(typeVal string, c *conn.Conn, id int) {
  143. switch typeVal {
  144. case common.WORK_MAIN:
  145. //客户端已经存在,下线
  146. s.clientLock.Lock()
  147. if v, ok := s.Client[id]; ok {
  148. s.clientLock.Unlock()
  149. if v.signal != nil {
  150. v.signal.WriteClose()
  151. }
  152. v.Lock()
  153. v.signal = c
  154. v.Unlock()
  155. } else {
  156. s.Client[id] = NewClient(nil, c)
  157. s.clientLock.Unlock()
  158. }
  159. lg.Printf("clientId %d connection succeeded, address:%s ", id, c.Conn.RemoteAddr())
  160. go s.GetStatus(id)
  161. case common.WORK_CHAN:
  162. s.clientLock.Lock()
  163. if v, ok := s.Client[id]; ok {
  164. s.clientLock.Unlock()
  165. v.Lock()
  166. v.tunnel = c
  167. v.Unlock()
  168. } else {
  169. s.Client[id] = NewClient(c, nil)
  170. s.clientLock.Unlock()
  171. }
  172. go s.clientCopy(id)
  173. case common.WORK_CONFIG:
  174. go s.GetConfig(c)
  175. case common.WORK_REGISTER:
  176. go s.register(c)
  177. }
  178. c.SetAlive(s.tunnelType)
  179. return
  180. }
  181. func (s *Bridge) register(c *conn.Conn) {
  182. var hour int32
  183. if err := binary.Read(c, binary.LittleEndian, &hour); err == nil {
  184. s.registerLock.Lock()
  185. s.Register[common.GetIpByAddr(c.Conn.RemoteAddr().String())] = time.Now().Add(time.Hour * time.Duration(hour))
  186. lg.Println(s.Register[common.GetIpByAddr(c.Conn.RemoteAddr().String())])
  187. s.registerLock.Unlock()
  188. }
  189. }
  190. //等待
  191. func (s *Bridge) waitStatus(clientId, id int) bool {
  192. ticker := time.NewTicker(time.Millisecond * 100)
  193. stop := time.After(time.Second * 10)
  194. for {
  195. select {
  196. case <-ticker.C:
  197. s.clientLock.Lock()
  198. if v, ok := s.Client[clientId]; ok {
  199. s.clientLock.Unlock()
  200. v.Lock()
  201. if vv, ok := v.linkStatusMap[id]; ok {
  202. ticker.Stop()
  203. v.Unlock()
  204. return vv
  205. }
  206. v.Unlock()
  207. } else {
  208. s.clientLock.Unlock()
  209. }
  210. case <-stop:
  211. return false
  212. }
  213. }
  214. }
  215. func (s *Bridge) SendLinkInfo(clientId int, link *conn.Link, linkAddr string) (tunnel *conn.Conn, err error) {
  216. s.clientLock.Lock()
  217. if v, ok := s.Client[clientId]; ok {
  218. s.clientLock.Unlock()
  219. if s.ipVerify {
  220. s.registerLock.Lock()
  221. ip := common.GetIpByAddr(linkAddr)
  222. if v, ok := s.Register[ip]; !ok {
  223. s.registerLock.Unlock()
  224. return nil, errors.New(fmt.Sprintf("The ip %s is not in the validation list", ip))
  225. } else {
  226. if !v.After(time.Now()) {
  227. return nil, errors.New(fmt.Sprintf("The validity of the ip %s has expired", ip))
  228. }
  229. }
  230. s.registerLock.Unlock()
  231. }
  232. v.signal.SendLinkInfo(link)
  233. if err != nil {
  234. lg.Println("send link information error:", err, link.Id)
  235. s.DelClient(clientId)
  236. return
  237. }
  238. if v.tunnel == nil {
  239. err = errors.New("get tunnel connection error")
  240. return
  241. } else {
  242. tunnel = v.tunnel
  243. }
  244. v.Lock()
  245. v.linkMap[link.Id] = link
  246. v.Unlock()
  247. if !s.waitStatus(clientId, link.Id) {
  248. err = errors.New("connect fail")
  249. return
  250. }
  251. } else {
  252. s.clientLock.Unlock()
  253. err = errors.New("the connection is not connect")
  254. }
  255. return
  256. }
  257. //删除通信通道
  258. func (s *Bridge) DelClient(id int) {
  259. s.closeClient(id)
  260. }
  261. //get config
  262. func (s *Bridge) GetConfig(c *conn.Conn) {
  263. var client *file.Client
  264. var fail bool
  265. for {
  266. flag, err := c.ReadFlag()
  267. if err != nil {
  268. break
  269. }
  270. switch flag {
  271. case common.WORK_STATUS:
  272. if b, err := c.ReadLen(16); err != nil {
  273. break
  274. } else {
  275. var str string
  276. id, err := file.GetCsvDb().GetClientIdByVkey(string(b))
  277. if err != nil {
  278. break
  279. }
  280. for _, v := range file.GetCsvDb().Hosts {
  281. if v.Client.Id == id {
  282. str += v.Remark + common.CONN_DATA_SEQ
  283. }
  284. }
  285. for _, v := range file.GetCsvDb().Tasks {
  286. if v.Client.Id == id {
  287. str += v.Remark + common.CONN_DATA_SEQ
  288. }
  289. }
  290. binary.Write(c, binary.LittleEndian, int32(len([]byte(str))))
  291. binary.Write(c, binary.LittleEndian, []byte(str))
  292. }
  293. case common.NEW_CONF:
  294. //new client ,Set the client not to store to the file
  295. client = file.NewClient(crypt.GetRandomString(16), true, false)
  296. client.Remark = "public veky"
  297. //Send the key to the client
  298. file.GetCsvDb().NewClient(client)
  299. c.Write([]byte(client.VerifyKey))
  300. if config, err := c.GetConfigInfo(); err != nil {
  301. fail = true
  302. c.WriteAddFail()
  303. break
  304. } else {
  305. client.Cnf = config
  306. c.WriteAddOk()
  307. }
  308. case common.NEW_HOST:
  309. if h, err := c.GetHostInfo(); err != nil {
  310. fail = true
  311. c.WriteAddFail()
  312. break
  313. } else if file.GetCsvDb().IsHostExist(h) {
  314. fail = true
  315. c.WriteAddFail()
  316. } else {
  317. h.Client = client
  318. file.GetCsvDb().NewHost(h)
  319. c.WriteAddOk()
  320. }
  321. case common.NEW_TASK:
  322. if t, err := c.GetTaskInfo(); err != nil {
  323. fail = true
  324. c.WriteAddFail()
  325. break
  326. } else {
  327. ports := common.GetPorts(t.Ports)
  328. targets := common.GetPorts(t.Target)
  329. if len(ports) > 1 && (t.Mode == "tcpServer" || t.Mode == "udpServer") && (len(ports) != len(targets)) {
  330. fail = true
  331. c.WriteAddFail()
  332. break
  333. }
  334. for i := 0; i < len(ports); i++ {
  335. tl := new(file.Tunnel)
  336. tl.Mode = t.Mode
  337. tl.Port = ports[i]
  338. if len(ports) == 1 {
  339. tl.Target = t.Target
  340. } else {
  341. tl.Target = strconv.Itoa(targets[i])
  342. }
  343. tl.Id = file.GetCsvDb().GetTaskId()
  344. tl.Status = true
  345. tl.Flow = new(file.Flow)
  346. tl.Remark = t.Remark
  347. tl.NoStore = true
  348. tl.Client = client
  349. file.GetCsvDb().NewTask(tl)
  350. if b := tool.TestServerPort(tl.Port, tl.Mode); !b {
  351. fail = true
  352. c.WriteAddFail()
  353. } else {
  354. s.OpenTask <- tl
  355. }
  356. c.WriteAddOk()
  357. }
  358. }
  359. }
  360. }
  361. if fail && client != nil {
  362. s.CloseClient <- client.Id
  363. }
  364. c.Close()
  365. }
  366. func (s *Bridge) GetStatus(clientId int) {
  367. s.clientLock.Lock()
  368. client := s.Client[clientId]
  369. s.clientLock.Unlock()
  370. if client == nil {
  371. return
  372. }
  373. for {
  374. if id, status, err := client.signal.GetConnStatus(); err != nil {
  375. s.closeClient(clientId)
  376. return
  377. } else {
  378. client.Lock()
  379. client.linkStatusMap[id] = status
  380. client.Unlock()
  381. }
  382. }
  383. }
  384. func (s *Bridge) clientCopy(clientId int) {
  385. s.clientLock.Lock()
  386. client := s.Client[clientId]
  387. s.clientLock.Unlock()
  388. for {
  389. if id, err := client.tunnel.GetLen(); err != nil {
  390. s.closeClient(clientId)
  391. break
  392. } else {
  393. client.Lock()
  394. if link, ok := client.linkMap[id]; ok {
  395. client.Unlock()
  396. if content, err := client.tunnel.GetMsgContent(link); err != nil {
  397. pool.PutBufPoolCopy(content)
  398. s.closeClient(clientId)
  399. lg.Println("read msg content error", err, "close client")
  400. break
  401. } else {
  402. if len(content) == len(common.IO_EOF) && string(content) == common.IO_EOF {
  403. if link.Conn != nil {
  404. link.Conn.Close()
  405. }
  406. } else {
  407. if link.UdpListener != nil && link.UdpRemoteAddr != nil {
  408. link.UdpListener.WriteToUDP(content, link.UdpRemoteAddr)
  409. } else {
  410. link.Conn.Write(content)
  411. }
  412. link.Flow.Add(0, len(content))
  413. }
  414. pool.PutBufPoolCopy(content)
  415. }
  416. } else {
  417. client.Unlock()
  418. continue
  419. }
  420. }
  421. }
  422. }