p2p.go 1.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283
  1. package proxy
import (
	"net"
	"strconv"
	"sync"
	"time"

	"github.com/cnlh/nps/lib/common"
	"github.com/cnlh/nps/lib/conn"
)
  9. type P2PServer struct {
  10. BaseServer
  11. p2pPort int
  12. p2p map[string]*p2p
  13. }
  14. type p2p struct {
  15. provider *conn.Conn
  16. visitor *conn.Conn
  17. visitorAddr string
  18. providerAddr string
  19. }
  20. func NewP2PServer(p2pPort int) *P2PServer {
  21. return &P2PServer{
  22. p2pPort: p2pPort,
  23. p2p: make(map[string]*p2p),
  24. }
  25. }
  26. func (s *P2PServer) Start() error {
  27. return conn.NewKcpListenerAndProcess(":"+strconv.Itoa(s.p2pPort), func(c net.Conn) {
  28. s.p2pProcess(conn.NewConn(c))
  29. })
  30. }
  31. func (s *P2PServer) p2pProcess(c *conn.Conn) {
  32. //获取密钥
  33. var (
  34. f string
  35. b []byte
  36. err error
  37. v *p2p
  38. ok bool
  39. )
  40. if b, err = c.GetShortContent(32); err != nil {
  41. return
  42. }
  43. //获取角色
  44. if f, err = c.ReadFlag(); err != nil {
  45. return
  46. }
  47. if v, ok = s.p2p[string(b)]; !ok {
  48. v = new(p2p)
  49. s.p2p[string(b)] = v
  50. }
  51. //存储
  52. if f == common.WORK_P2P_VISITOR {
  53. v.visitorAddr = c.Conn.RemoteAddr().String()
  54. v.visitor = c
  55. for {
  56. time.Sleep(time.Second)
  57. if v.provider != nil {
  58. break
  59. }
  60. }
  61. if _, err := v.provider.ReadFlag(); err == nil {
  62. v.visitor.WriteLenContent([]byte(v.providerAddr))
  63. delete(s.p2p, string(b))
  64. } else {
  65. }
  66. } else {
  67. v.providerAddr = c.Conn.RemoteAddr().String()
  68. v.provider = c
  69. for {
  70. time.Sleep(time.Second)
  71. if v.visitor != nil {
  72. v.provider.WriteLenContent([]byte(v.visitorAddr))
  73. break
  74. }
  75. }
  76. }
  77. }