bridge.go 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241
  1. package bridge
  import (
  	"errors"
  	"io"
  	"log"
  	"net"
  	"strconv"
  	"sync"
  	"time"

  	"github.com/cnlh/easyProxy/utils"
  )
  11. type list struct {
  12. connList chan *utils.Conn
  13. }
  14. func (l *list) Add(c *utils.Conn) {
  15. l.connList <- c
  16. }
  17. func (l *list) Pop() *utils.Conn {
  18. return <-l.connList
  19. }
  20. func (l *list) Len() int {
  21. return len(l.connList)
  22. }
  23. func newList() *list {
  24. l := new(list)
  25. l.connList = make(chan *utils.Conn, 1000)
  26. return l
  27. }
  28. type Bridge struct {
  29. TunnelPort int //通信隧道端口
  30. listener *net.TCPListener //server端监听
  31. SignalList map[int]*list //通信
  32. TunnelList map[int]*list //隧道
  33. RunList map[int]interface{} //运行中的任务
  34. lock sync.Mutex
  35. tunnelLock sync.Mutex
  36. }
  37. func NewTunnel(tunnelPort int, runList map[int]interface{}) *Bridge {
  38. t := new(Bridge)
  39. t.TunnelPort = tunnelPort
  40. t.SignalList = make(map[int]*list)
  41. t.TunnelList = make(map[int]*list)
  42. t.RunList = runList
  43. return t
  44. }
  45. func (s *Bridge) StartTunnel() error {
  46. var err error
  47. s.listener, err = net.ListenTCP("tcp", &net.TCPAddr{net.ParseIP("0.0.0.0"), s.TunnelPort, ""})
  48. if err != nil {
  49. return err
  50. }
  51. go s.tunnelProcess()
  52. return nil
  53. }
  54. //tcp server
  55. func (s *Bridge) tunnelProcess() error {
  56. var err error
  57. for {
  58. conn, err := s.listener.Accept()
  59. if err != nil {
  60. log.Println(err)
  61. continue
  62. }
  63. go s.cliProcess(utils.NewConn(conn))
  64. }
  65. return err
  66. }
  67. //验证失败,返回错误验证flag,并且关闭连接
  68. func (s *Bridge) verifyError(c *utils.Conn) {
  69. c.Conn.Write([]byte(utils.VERIFY_EER))
  70. c.Conn.Close()
  71. }
  72. func (s *Bridge) cliProcess(c *utils.Conn) error {
  73. c.Conn.(*net.TCPConn).SetReadDeadline(time.Now().Add(time.Duration(5) * time.Second))
  74. vval := make([]byte, 32)
  75. if _, err := c.Conn.Read(vval); err != nil {
  76. log.Println("客户端读超时。客户端地址为::", c.Conn.RemoteAddr())
  77. c.Conn.Close()
  78. return err
  79. }
  80. id, err := utils.GetCsvDb().GetIdByVerifyKey(string(vval),c.Conn.RemoteAddr().String())
  81. if err != nil {
  82. log.Println("当前客户端连接校验错误,关闭此客户端:", c.Conn.RemoteAddr())
  83. s.verifyError(c)
  84. return errors.New("验证错误")
  85. }
  86. c.Conn.(*net.TCPConn).SetReadDeadline(time.Time{})
  87. //做一个判断 添加到对应的channel里面以供使用
  88. if flag, err := c.ReadFlag(); err != nil {
  89. return err
  90. } else {
  91. return s.typeDeal(flag, c, id)
  92. }
  93. }
  94. //tcp连接类型区分
  95. func (s *Bridge) typeDeal(typeVal string, c *utils.Conn, id int) error {
  96. switch typeVal {
  97. case utils.WORK_MAIN:
  98. log.Println("客户端连接成功", c.Conn.RemoteAddr())
  99. s.addList(s.SignalList, c, id)
  100. case utils.WORK_CHAN:
  101. s.addList(s.TunnelList, c, id)
  102. default:
  103. return errors.New("无法识别")
  104. }
  105. c.SetAlive()
  106. return nil
  107. }
  108. //加到对应的list中
  109. func (s *Bridge) addList(m map[int]*list, c *utils.Conn, id int) {
  110. s.lock.Lock()
  111. if v, ok := m[id]; ok {
  112. v.Add(c)
  113. } else {
  114. l := newList()
  115. l.Add(c)
  116. m[id] = l
  117. }
  118. s.lock.Unlock()
  119. }
  120. //新建隧道
  121. func (s *Bridge) newChan(id int) error {
  122. var connPass *utils.Conn
  123. var err error
  124. retry:
  125. if connPass, err = s.waitAndPop(s.SignalList, id); err != nil {
  126. return err
  127. }
  128. if _, err = connPass.Conn.Write([]byte("chan")); err != nil {
  129. goto retry
  130. }
  131. s.SignalList[id].Add(connPass)
  132. return nil
  133. }
  134. //得到一个tcp隧道
  135. //TODO 超时问题 锁机制问题 对单个客户端加锁
  136. func (s *Bridge) GetTunnel(id int, en, de int, crypt, mux bool) (c *utils.Conn, err error) {
  137. retry:
  138. if c, err = s.waitAndPop(s.TunnelList, id); err != nil {
  139. return
  140. }
  141. if _, err = c.WriteTest(); err != nil {
  142. c.Close()
  143. goto retry
  144. }
  145. c.WriteConnInfo(en, de, crypt, mux)
  146. return
  147. }
  148. //得到一个通信通道
  149. func (s *Bridge) GetSignal(id int) (err error, conn *utils.Conn) {
  150. if v, ok := s.SignalList[id]; !ok || v.Len() == 0 {
  151. err = errors.New("客户端未连接")
  152. return
  153. }
  154. conn = s.SignalList[id].Pop()
  155. return
  156. }
  157. //重回slice 复用
  158. func (s *Bridge) ReturnSignal(conn *utils.Conn, id int) {
  159. if v, ok := s.SignalList[id]; ok {
  160. v.Add(conn)
  161. }
  162. }
  163. //重回slice 复用
  164. func (s *Bridge) ReturnTunnel(conn *utils.Conn, id int) {
  165. if v, ok := s.TunnelList[id]; ok {
  166. utils.FlushConn(conn.Conn)
  167. v.Add(conn)
  168. }
  169. }
  170. //删除通信通道
  171. func (s *Bridge) DelClientSignal(id int) {
  172. s.delClient(id, s.SignalList)
  173. }
  174. //删除隧道
  175. func (s *Bridge) DelClientTunnel(id int) {
  176. s.delClient(id, s.TunnelList)
  177. }
  178. func (s *Bridge) delClient(id int, l map[int]*list) {
  179. if t := l[id]; t != nil {
  180. for {
  181. if t.Len() <= 0 {
  182. break
  183. }
  184. t.Pop().Close()
  185. }
  186. delete(l, id)
  187. }
  188. }
  189. //等待
  190. func (s *Bridge) waitAndPop(m map[int]*list, id int) (c *utils.Conn, err error) {
  191. ticker := time.NewTicker(time.Millisecond * 100)
  192. stop := time.After(time.Second * 3)
  193. for {
  194. select {
  195. case <-ticker.C:
  196. s.lock.Lock()
  197. if v, ok := m[id]; ok && v.Len() > 0 {
  198. c = v.Pop()
  199. ticker.Stop()
  200. s.lock.Unlock()
  201. return
  202. }
  203. s.lock.Unlock()
  204. case <-stop:
  205. err = errors.New("client id: " + strconv.Itoa(id) + ",err: get client conn timeout")
  206. return
  207. }
  208. }
  209. return
  210. }
  211. func (s *Bridge) verify(id int) bool {
  212. for k := range s.RunList {
  213. if k == id {
  214. return true
  215. }
  216. }
  217. return false
  218. }