bridge.go 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418
  1. package bridge
  2. import (
  3. "encoding/binary"
  4. "errors"
  5. "github.com/cnlh/nps/lib/common"
  6. "github.com/cnlh/nps/lib/conn"
  7. "github.com/cnlh/nps/lib/crypt"
  8. "github.com/cnlh/nps/lib/file"
  9. "github.com/cnlh/nps/lib/kcp"
  10. "github.com/cnlh/nps/lib/lg"
  11. "github.com/cnlh/nps/lib/pool"
  12. "github.com/cnlh/nps/server/tool"
  13. "log"
  14. "net"
  15. "strconv"
  16. "sync"
  17. "time"
  18. )
  19. type Client struct {
  20. tunnel *conn.Conn
  21. signal *conn.Conn
  22. linkMap map[int]*conn.Link
  23. linkStatusMap map[int]bool
  24. stop chan bool
  25. sync.RWMutex
  26. }
  27. func NewClient(t *conn.Conn, s *conn.Conn) *Client {
  28. return &Client{
  29. linkMap: make(map[int]*conn.Link),
  30. stop: make(chan bool),
  31. linkStatusMap: make(map[int]bool),
  32. signal: s,
  33. tunnel: t,
  34. }
  35. }
  36. type Bridge struct {
  37. TunnelPort int //通信隧道端口
  38. tcpListener *net.TCPListener //server端监听
  39. kcpListener *kcp.Listener //server端监听
  40. Client map[int]*Client
  41. tunnelType string //bridge type kcp or tcp
  42. OpenTask chan *file.Tunnel
  43. CloseClient chan int
  44. lock sync.Mutex
  45. tunnelLock sync.Mutex
  46. clientLock sync.RWMutex
  47. }
  48. func NewTunnel(tunnelPort int, tunnelType string) *Bridge {
  49. t := new(Bridge)
  50. t.TunnelPort = tunnelPort
  51. t.Client = make(map[int]*Client)
  52. t.tunnelType = tunnelType
  53. t.OpenTask = make(chan *file.Tunnel)
  54. t.CloseClient = make(chan int)
  55. return t
  56. }
  57. func (s *Bridge) StartTunnel() error {
  58. var err error
  59. if s.tunnelType == "kcp" {
  60. s.kcpListener, err = kcp.ListenWithOptions(":"+strconv.Itoa(s.TunnelPort), nil, 150, 3)
  61. if err != nil {
  62. return err
  63. }
  64. go func() {
  65. for {
  66. c, err := s.kcpListener.AcceptKCP()
  67. conn.SetUdpSession(c)
  68. if err != nil {
  69. lg.Println(err)
  70. continue
  71. }
  72. go s.cliProcess(conn.NewConn(c))
  73. }
  74. }()
  75. } else {
  76. s.tcpListener, err = net.ListenTCP("tcp", &net.TCPAddr{net.ParseIP("0.0.0.0"), s.TunnelPort, ""})
  77. if err != nil {
  78. return err
  79. }
  80. go func() {
  81. for {
  82. c, err := s.tcpListener.Accept()
  83. if err != nil {
  84. lg.Println(err)
  85. continue
  86. }
  87. go s.cliProcess(conn.NewConn(c))
  88. }
  89. }()
  90. }
  91. return nil
  92. }
  93. //验证失败,返回错误验证flag,并且关闭连接
  94. func (s *Bridge) verifyError(c *conn.Conn) {
  95. c.Write([]byte(common.VERIFY_EER))
  96. c.Conn.Close()
  97. }
  98. func (s *Bridge) verifySuccess(c *conn.Conn) {
  99. c.Write([]byte(common.VERIFY_SUCCESS))
  100. }
  101. func (s *Bridge) cliProcess(c *conn.Conn) {
  102. c.SetReadDeadline(5, s.tunnelType)
  103. var buf []byte
  104. var err error
  105. if buf, err = c.ReadLen(32); err != nil {
  106. c.Close()
  107. return
  108. }
  109. //验证
  110. id, err := file.GetCsvDb().GetIdByVerifyKey(string(buf), c.Conn.RemoteAddr().String())
  111. if err != nil {
  112. lg.Println("当前客户端连接校验错误,关闭此客户端:", c.Conn.RemoteAddr())
  113. s.verifyError(c)
  114. return
  115. } else {
  116. s.verifySuccess(c)
  117. }
  118. //做一个判断 添加到对应的channel里面以供使用
  119. if flag, err := c.ReadFlag(); err == nil {
  120. s.typeDeal(flag, c, id)
  121. } else {
  122. log.Println(222)
  123. log.Println(err, flag)
  124. }
  125. return
  126. }
  127. func (s *Bridge) closeClient(id int) {
  128. s.clientLock.Lock()
  129. defer s.clientLock.Unlock()
  130. if v, ok := s.Client[id]; ok {
  131. if c, err := file.GetCsvDb().GetClient(id); err == nil && c.NoStore {
  132. s.CloseClient <- c.Id
  133. }
  134. v.signal.WriteClose()
  135. delete(s.Client, id)
  136. }
  137. }
  138. //tcp连接类型区分
  139. func (s *Bridge) typeDeal(typeVal string, c *conn.Conn, id int) {
  140. switch typeVal {
  141. case common.WORK_MAIN:
  142. //客户端已经存在,下线
  143. s.clientLock.Lock()
  144. if v, ok := s.Client[id]; ok {
  145. s.clientLock.Unlock()
  146. if v.signal != nil {
  147. v.signal.WriteClose()
  148. }
  149. v.Lock()
  150. v.signal = c
  151. v.Unlock()
  152. } else {
  153. s.Client[id] = NewClient(nil, c)
  154. s.clientLock.Unlock()
  155. }
  156. lg.Printf("clientId %d connection succeeded, address:%s ", id, c.Conn.RemoteAddr())
  157. go s.GetStatus(id)
  158. case common.WORK_CHAN:
  159. s.clientLock.Lock()
  160. if v, ok := s.Client[id]; ok {
  161. s.clientLock.Unlock()
  162. v.Lock()
  163. v.tunnel = c
  164. v.Unlock()
  165. } else {
  166. s.Client[id] = NewClient(c, nil)
  167. s.clientLock.Unlock()
  168. }
  169. go s.clientCopy(id)
  170. case common.WORK_CONFIG:
  171. go s.GetConfig(c)
  172. }
  173. c.SetAlive(s.tunnelType)
  174. return
  175. }
  176. //等待
  177. func (s *Bridge) waitStatus(clientId, id int) (bool) {
  178. ticker := time.NewTicker(time.Millisecond * 100)
  179. stop := time.After(time.Second * 10)
  180. for {
  181. select {
  182. case <-ticker.C:
  183. s.clientLock.Lock()
  184. if v, ok := s.Client[clientId]; ok {
  185. s.clientLock.Unlock()
  186. v.Lock()
  187. if vv, ok := v.linkStatusMap[id]; ok {
  188. ticker.Stop()
  189. v.Unlock()
  190. return vv
  191. }
  192. v.Unlock()
  193. } else {
  194. s.clientLock.Unlock()
  195. }
  196. case <-stop:
  197. return false
  198. }
  199. }
  200. return false
  201. }
  202. func (s *Bridge) SendLinkInfo(clientId int, link *conn.Link) (tunnel *conn.Conn, err error) {
  203. s.clientLock.Lock()
  204. if v, ok := s.Client[clientId]; ok {
  205. s.clientLock.Unlock()
  206. v.signal.SendLinkInfo(link)
  207. if err != nil {
  208. lg.Println("send link information error:", err, link.Id)
  209. s.DelClient(clientId)
  210. return
  211. }
  212. if v.tunnel == nil {
  213. err = errors.New("get tunnel connection error")
  214. return
  215. } else {
  216. tunnel = v.tunnel
  217. }
  218. v.Lock()
  219. v.linkMap[link.Id] = link
  220. v.Unlock()
  221. if !s.waitStatus(clientId, link.Id) {
  222. err = errors.New("connect fail")
  223. return
  224. }
  225. } else {
  226. s.clientLock.Unlock()
  227. err = errors.New("the connection is not connect")
  228. }
  229. return
  230. }
// DelClient deletes the communication channel of the client with the given
// id by delegating to closeClient.
func (s *Bridge) DelClient(id int) {
	s.closeClient(id)
}
  235. //get config
  236. func (s *Bridge) GetConfig(c *conn.Conn) {
  237. var client *file.Client
  238. var fail bool
  239. for {
  240. flag, err := c.ReadFlag()
  241. if err != nil {
  242. break
  243. }
  244. switch flag {
  245. case common.WORK_STATUS:
  246. if b, err := c.ReadLen(16); err != nil {
  247. break
  248. } else {
  249. var str string
  250. id, err := file.GetCsvDb().GetClientIdByVkey(string(b))
  251. if err != nil {
  252. break
  253. }
  254. for _, v := range file.GetCsvDb().Hosts {
  255. if v.Client.Id == id {
  256. str += v.Remark + common.CONN_DATA_SEQ
  257. }
  258. }
  259. for _, v := range file.GetCsvDb().Tasks {
  260. if v.Client.Id == id {
  261. str += v.Remark + common.CONN_DATA_SEQ
  262. }
  263. }
  264. binary.Write(c, binary.LittleEndian, int32(len([]byte(str))))
  265. binary.Write(c, binary.LittleEndian, []byte(str))
  266. }
  267. case common.NEW_CONF:
  268. //new client ,Set the client not to store to the file
  269. client = file.NewClient(crypt.GetRandomString(16), true, false)
  270. client.Remark = "public veky"
  271. //Send the key to the client
  272. file.GetCsvDb().NewClient(client)
  273. c.Write([]byte(client.VerifyKey))
  274. if config, err := c.GetConfigInfo(); err != nil {
  275. fail = true
  276. c.WriteAddFail()
  277. break
  278. } else {
  279. client.Cnf = config
  280. c.WriteAddOk()
  281. }
  282. case common.NEW_HOST:
  283. if h, err := c.GetHostInfo(); err != nil {
  284. fail = true
  285. c.WriteAddFail()
  286. break
  287. } else if file.GetCsvDb().IsHostExist(h.Host) {
  288. fail = true
  289. c.WriteAddFail()
  290. } else {
  291. h.Client = client
  292. file.GetCsvDb().NewHost(h)
  293. c.WriteAddOk()
  294. }
  295. case common.NEW_TASK:
  296. if t, err := c.GetTaskInfo(); err != nil {
  297. fail = true
  298. c.WriteAddFail()
  299. break
  300. } else {
  301. ports := common.GetPorts(t.Ports)
  302. targets := common.GetPorts(t.Target)
  303. if len(ports) > 1 && (t.Mode == "tcpServer" || t.Mode == "udpServer") && (len(ports) != len(targets)) {
  304. fail = true
  305. c.WriteAddFail()
  306. break
  307. }
  308. for i := 0; i < len(ports); i++ {
  309. tl := new(file.Tunnel)
  310. tl.Mode = t.Mode
  311. tl.Port = ports[i]
  312. if len(ports) == 1 {
  313. tl.Target = t.Target
  314. } else {
  315. tl.Target = strconv.Itoa(targets[i])
  316. }
  317. tl.Id = file.GetCsvDb().GetTaskId()
  318. tl.Status = true
  319. tl.Flow = new(file.Flow)
  320. tl.Remark = t.Remark
  321. tl.NoStore = true
  322. tl.Client = client
  323. file.GetCsvDb().NewTask(tl)
  324. if b := tool.TestServerPort(tl.Port, tl.Mode); !b {
  325. fail = true
  326. c.WriteAddFail()
  327. } else {
  328. s.OpenTask <- tl
  329. }
  330. c.WriteAddOk()
  331. }
  332. }
  333. }
  334. }
  335. if fail && client != nil {
  336. s.CloseClient <- client.Id
  337. }
  338. c.Close()
  339. }
  340. func (s *Bridge) GetStatus(clientId int) {
  341. s.clientLock.Lock()
  342. client := s.Client[clientId]
  343. s.clientLock.Unlock()
  344. if client == nil {
  345. return
  346. }
  347. for {
  348. if id, status, err := client.signal.GetConnStatus(); err != nil {
  349. s.closeClient(clientId)
  350. return
  351. } else {
  352. client.Lock()
  353. client.linkStatusMap[id] = status
  354. client.Unlock()
  355. }
  356. }
  357. }
  358. func (s *Bridge) clientCopy(clientId int) {
  359. s.clientLock.Lock()
  360. client := s.Client[clientId]
  361. s.clientLock.Unlock()
  362. for {
  363. if id, err := client.tunnel.GetLen(); err != nil {
  364. s.closeClient(clientId)
  365. lg.Println("读取msg id 错误", err, id)
  366. break
  367. } else {
  368. client.Lock()
  369. if link, ok := client.linkMap[id]; ok {
  370. client.Unlock()
  371. if content, err := client.tunnel.GetMsgContent(link); err != nil {
  372. pool.PutBufPoolCopy(content)
  373. s.closeClient(clientId)
  374. lg.Println("read msg content error", err, "close client")
  375. break
  376. } else {
  377. if len(content) == len(common.IO_EOF) && string(content) == common.IO_EOF {
  378. if link.Conn != nil {
  379. link.Conn.Close()
  380. }
  381. } else {
  382. if link.UdpListener != nil && link.UdpRemoteAddr != nil {
  383. link.UdpListener.WriteToUDP(content, link.UdpRemoteAddr)
  384. } else {
  385. link.Conn.Write(content)
  386. }
  387. link.Flow.Add(0, len(content))
  388. }
  389. pool.PutBufPoolCopy(content)
  390. }
  391. } else {
  392. client.Unlock()
  393. continue
  394. }
  395. }
  396. }
  397. }