bridge.go 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242
  1. package bridge
  2. import (
  3. "errors"
  4. "github.com/cnlh/easyProxy/utils"
  5. "log"
  6. "net"
  7. "sync"
  8. "time"
  9. )
  10. type list struct {
  11. connList chan *utils.Conn
  12. }
  13. func (l *list) Add(c *utils.Conn) {
  14. l.connList <- c
  15. }
  16. func (l *list) Pop() *utils.Conn {
  17. return <-l.connList
  18. }
  19. func (l *list) Len() int {
  20. return len(l.connList)
  21. }
  22. func newList() *list {
  23. l := new(list)
  24. l.connList = make(chan *utils.Conn, 1000)
  25. return l
  26. }
  27. type Tunnel struct {
  28. TunnelPort int //通信隧道端口
  29. listener *net.TCPListener //server端监听
  30. SignalList map[string]*list //通信
  31. TunnelList map[string]*list //隧道
  32. RunList map[string]interface{} //运行中的任务
  33. lock sync.Mutex
  34. tunnelLock sync.Mutex
  35. }
  36. func NewTunnel(tunnelPort int, runList map[string]interface{}) *Tunnel {
  37. t := new(Tunnel)
  38. t.TunnelPort = tunnelPort
  39. t.SignalList = make(map[string]*list)
  40. t.TunnelList = make(map[string]*list)
  41. t.RunList = runList
  42. return t
  43. }
  44. func (s *Tunnel) StartTunnel() error {
  45. var err error
  46. s.listener, err = net.ListenTCP("tcp", &net.TCPAddr{net.ParseIP("0.0.0.0"), s.TunnelPort, ""})
  47. if err != nil {
  48. return err
  49. }
  50. go s.tunnelProcess()
  51. return nil
  52. }
  53. //tcp server
  54. func (s *Tunnel) tunnelProcess() error {
  55. var err error
  56. for {
  57. conn, err := s.listener.Accept()
  58. if err != nil {
  59. log.Println(err)
  60. continue
  61. }
  62. go s.cliProcess(utils.NewConn(conn))
  63. }
  64. return err
  65. }
  66. //验证失败,返回错误验证flag,并且关闭连接
  67. func (s *Tunnel) verifyError(c *utils.Conn) {
  68. c.Conn.Write([]byte(utils.VERIFY_EER))
  69. c.Conn.Close()
  70. }
  71. func (s *Tunnel) cliProcess(c *utils.Conn) error {
  72. c.Conn.(*net.TCPConn).SetReadDeadline(time.Now().Add(time.Duration(5) * time.Second))
  73. vval := make([]byte, 32)
  74. if _, err := c.Conn.Read(vval); err != nil {
  75. log.Println("客户端读超时。客户端地址为::", c.Conn.RemoteAddr())
  76. c.Conn.Close()
  77. return err
  78. }
  79. if !s.verify(string(vval)) {
  80. log.Println("当前客户端连接校验错误,关闭此客户端:", c.Conn.RemoteAddr())
  81. s.verifyError(c)
  82. return errors.New("验证错误")
  83. }
  84. c.Conn.(*net.TCPConn).SetReadDeadline(time.Time{})
  85. //做一个判断 添加到对应的channel里面以供使用
  86. if flag, err := c.ReadFlag(); err != nil {
  87. return err
  88. } else {
  89. return s.typeDeal(flag, c, string(vval))
  90. }
  91. }
  92. //tcp连接类型区分
  93. func (s *Tunnel) typeDeal(typeVal string, c *utils.Conn, cFlag string) error {
  94. switch typeVal {
  95. case utils.WORK_MAIN:
  96. log.Println("客户端连接成功", c.Conn.RemoteAddr())
  97. s.addList(s.SignalList, c, cFlag)
  98. case utils.WORK_CHAN:
  99. s.addList(s.TunnelList, c, cFlag)
  100. default:
  101. return errors.New("无法识别")
  102. }
  103. c.SetAlive()
  104. return nil
  105. }
  106. //加到对应的list中
  107. func (s *Tunnel) addList(m map[string]*list, c *utils.Conn, cFlag string) {
  108. s.lock.Lock()
  109. if v, ok := m[cFlag]; ok {
  110. v.Add(c)
  111. } else {
  112. l := newList()
  113. l.Add(c)
  114. m[cFlag] = l
  115. }
  116. s.lock.Unlock()
  117. }
  118. //新建隧道
  119. func (s *Tunnel) newChan(cFlag string) error {
  120. var connPass *utils.Conn
  121. var err error
  122. retry:
  123. if connPass, err = s.waitAndPop(s.SignalList, cFlag); err != nil {
  124. return err
  125. }
  126. if _, err = connPass.Conn.Write([]byte("chan")); err != nil {
  127. goto retry
  128. }
  129. s.SignalList[cFlag].Add(connPass)
  130. return nil
  131. }
  132. //得到一个tcp隧道
  133. //TODO 超时问题 锁机制问题 对单个客户端加锁
  134. func (s *Tunnel) GetTunnel(cFlag string, en, de int, crypt, mux bool) (c *utils.Conn, err error) {
  135. if v, ok := s.TunnelList[cFlag]; !ok || v.Len() < 3 { //新建通道
  136. go s.newChan(cFlag)
  137. }
  138. retry:
  139. if c, err = s.waitAndPop(s.TunnelList, cFlag); err != nil {
  140. return
  141. }
  142. if _, err = c.WriteTest(); err != nil {
  143. c.Close()
  144. goto retry
  145. }
  146. c.WriteConnInfo(en, de, crypt, mux)
  147. return
  148. }
  149. //得到一个通信通道
  150. func (s *Tunnel) GetSignal(cFlag string) (err error, conn *utils.Conn) {
  151. if v, ok := s.SignalList[cFlag]; !ok || v.Len() == 0 {
  152. err = errors.New("客户端未连接")
  153. return
  154. }
  155. conn = s.SignalList[cFlag].Pop()
  156. return
  157. }
  158. //重回slice 复用
  159. func (s *Tunnel) ReturnSignal(conn *utils.Conn, cFlag string) {
  160. if v, ok := s.SignalList[cFlag]; ok {
  161. v.Add(conn)
  162. }
  163. }
  164. //重回slice 复用
  165. func (s *Tunnel) ReturnTunnel(conn *utils.Conn, cFlag string) {
  166. if v, ok := s.TunnelList[cFlag]; ok {
  167. utils.FlushConn(conn.Conn)
  168. v.Add(conn)
  169. }
  170. }
  171. //删除通信通道
  172. func (s *Tunnel) DelClientSignal(cFlag string) {
  173. s.delClient(cFlag, s.SignalList)
  174. }
  175. //删除隧道
  176. func (s *Tunnel) DelClientTunnel(cFlag string) {
  177. s.delClient(cFlag, s.TunnelList)
  178. }
  179. func (s *Tunnel) delClient(cFlag string, l map[string]*list) {
  180. if t := l[utils.Getverifyval(cFlag)]; t != nil {
  181. for {
  182. if t.Len() <= 0 {
  183. break
  184. }
  185. t.Pop().Close()
  186. }
  187. delete(l, utils.Getverifyval(cFlag))
  188. }
  189. }
  190. //等待
  191. func (s *Tunnel) waitAndPop(m map[string]*list, cFlag string) (c *utils.Conn, err error) {
  192. ticker := time.NewTicker(time.Millisecond * 100)
  193. stop := time.After(time.Second * 10)
  194. for {
  195. select {
  196. case <-ticker.C:
  197. s.lock.Lock()
  198. if v, ok := m[cFlag]; ok && v.Len() > 0 {
  199. c = v.Pop()
  200. ticker.Stop()
  201. s.lock.Unlock()
  202. return
  203. }
  204. s.lock.Unlock()
  205. case <-stop:
  206. err = errors.New("client key: " + cFlag + ",err: get client conn timeout")
  207. return
  208. }
  209. }
  210. return
  211. }
  212. func (s *Tunnel) verify(vKeyMd5 string) bool {
  213. for k := range s.RunList {
  214. if utils.Getverifyval(k) == vKeyMd5 {
  215. return true
  216. }
  217. }
  218. return false
  219. }