1
0

p2p.go 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104
  1. package proxy
import (
	"strconv"
	"sync"
	"time"

	"github.com/cnlh/nps/lib/common"
	"github.com/cnlh/nps/lib/conn"
	"github.com/cnlh/nps/vender/github.com/astaxie/beego/logs"
	"github.com/cnlh/nps/vender/github.com/xtaci/kcp"
)
  10. type P2PServer struct {
  11. BaseServer
  12. p2pPort int
  13. p2p map[string]*p2p
  14. }
  15. type p2p struct {
  16. provider *conn.Conn
  17. visitor *conn.Conn
  18. visitorAddr string
  19. providerAddr string
  20. }
  21. func NewP2PServer(p2pPort int) *P2PServer {
  22. return &P2PServer{
  23. p2pPort: p2pPort,
  24. p2p: make(map[string]*p2p),
  25. }
  26. }
  27. func (s *P2PServer) Start() error {
  28. kcpListener, err := kcp.ListenWithOptions(":"+strconv.Itoa(s.p2pPort), nil, 150, 3)
  29. if err != nil {
  30. logs.Error(err)
  31. return err
  32. }
  33. for {
  34. c, err := kcpListener.AcceptKCP()
  35. conn.SetUdpSession(c)
  36. if err != nil {
  37. logs.Warn(err)
  38. continue
  39. }
  40. go s.p2pProcess(conn.NewConn(c))
  41. }
  42. return nil
  43. }
  44. func (s *P2PServer) p2pProcess(c *conn.Conn) {
  45. logs.Warn("new link", c.Conn.RemoteAddr())
  46. //获取密钥
  47. var (
  48. f string
  49. b []byte
  50. err error
  51. v *p2p
  52. ok bool
  53. )
  54. if b, err = c.ReadLen(32); err != nil {
  55. return
  56. }
  57. //获取角色
  58. if f, err = c.ReadFlag(); err != nil {
  59. return
  60. }
  61. logs.Warn("收到", string(b), f)
  62. if v, ok = s.p2p[string(b)]; !ok {
  63. v = new(p2p)
  64. s.p2p[string(b)] = v
  65. }
  66. logs.Warn(f, c.Conn.RemoteAddr().String())
  67. //存储
  68. if f == common.WORK_P2P_VISITOR {
  69. v.visitorAddr = c.Conn.RemoteAddr().String()
  70. v.visitor = c
  71. for {
  72. time.Sleep(time.Second)
  73. if v.provider != nil {
  74. break
  75. }
  76. }
  77. logs.Warn("等待确认")
  78. if _, err := v.provider.ReadFlag(); err == nil {
  79. v.visitor.WriteLenContent([]byte(v.providerAddr))
  80. logs.Warn("收到确认")
  81. delete(s.p2p, string(b))
  82. } else {
  83. logs.Warn("收到确认失败", err)
  84. }
  85. } else {
  86. v.providerAddr = c.Conn.RemoteAddr().String()
  87. v.provider = c
  88. for {
  89. time.Sleep(time.Second)
  90. if v.visitor != nil {
  91. v.provider.WriteLenContent([]byte(v.visitorAddr))
  92. break
  93. }
  94. }
  95. }
  96. //假设是连接者、等待对应的被连接者连上后,发送被连接者信息
  97. //假设是被连接者,等待对应的连接者脸上后,发送连接者信息
  98. }