mux_test.go 11 KB


  1. package mux
  2. import (
  3. "bufio"
  4. "ehang.io/nps/lib/common"
  5. "ehang.io/nps/lib/goroutine"
  6. "fmt"
  7. "io"
  8. "log"
  9. "net"
  10. "net/http"
  11. "net/http/httputil"
  12. _ "net/http/pprof"
  13. "strconv"
  14. "testing"
  15. "time"
  16. "unsafe"
  17. "github.com/astaxie/beego/logs"
  18. )
  // Shared endpoints of the loopback link used by TestNewMux.
  var (
  	// conn1 is assigned by server from the connection it accepts.
  	conn1 net.Conn
  	// conn2 is assigned by client from its outgoing dial.
  	conn2 net.Conn
  )
  21. func TestNewMux(t *testing.T) {
  22. go func() {
  23. http.ListenAndServe("0.0.0.0:8889", nil)
  24. }()
  25. logs.EnableFuncCallDepth(true)
  26. logs.SetLogFuncCallDepth(3)
  27. server()
  28. client()
  29. //poolConnCopy, _ := ants.NewPoolWithFunc(200000, common.copyConn, ants.WithNonblocking(false))
  30. time.Sleep(time.Second * 3)
  31. go func() {
  32. m2 := NewMux(conn2, "tcp")
  33. //m2 := NewMux(conn2, "kcp")
  34. for {
  35. //logs.Warn("npc starting accept")
  36. c, err := m2.Accept()
  37. if err != nil {
  38. logs.Warn(err)
  39. continue
  40. }
  41. //logs.Warn("npc accept success ")
  42. c2, err := net.Dial("tcp", "127.0.0.1:80")
  43. if err != nil {
  44. logs.Warn(err)
  45. c.Close()
  46. continue
  47. }
  48. //c2.(*net.TCPConn).SetReadBuffer(0)
  49. //c2.(*net.TCPConn).SetReadBuffer(0)
  50. _ = goroutine.CopyConnsPool.Invoke(goroutine.NewConns(c, c2, nil))
  51. //go func(c2 net.Conn, c *conn) {
  52. // wg := new(sync.WaitGroup)
  53. // wg.Add(2)
  54. // _ = poolConnCopy.Invoke(common.newConnGroup(c2, c, wg))
  55. // //go func() {
  56. // // _, err = common.CopyBuffer(c2, c)
  57. // // if err != nil {
  58. // // c2.Close()
  59. // // c.Close()
  60. // // //logs.Warn("close npc by copy from nps", err, c.connId)
  61. // // }
  62. // // wg.Done()
  63. // //}()
  64. // //wg.Add(1)
  65. // _ = poolConnCopy.Invoke(common.newConnGroup(c, c2, wg))
  66. // //go func() {
  67. // // _, err = common.CopyBuffer(c, c2)
  68. // // if err != nil {
  69. // // c2.Close()
  70. // // c.Close()
  71. // // //logs.Warn("close npc by copy from server", err, c.connId)
  72. // // }
  73. // // wg.Done()
  74. // //}()
  75. // //logs.Warn("npc wait")
  76. // wg.Wait()
  77. //}(c2, c.(*conn))
  78. }
  79. }()
  80. go func() {
  81. m1 := NewMux(conn1, "tcp")
  82. //m1 := NewMux(conn1, "kcp")
  83. l, err := net.Listen("tcp", "127.0.0.1:7777")
  84. if err != nil {
  85. logs.Warn(err)
  86. }
  87. for {
  88. //logs.Warn("nps starting accept")
  89. conns, err := l.Accept()
  90. if err != nil {
  91. logs.Warn(err)
  92. continue
  93. }
  94. //conns.(*net.TCPConn).SetReadBuffer(0)
  95. //conns.(*net.TCPConn).SetReadBuffer(0)
  96. //logs.Warn("nps accept success starting new conn")
  97. tmpCpnn, err := m1.NewConn()
  98. if err != nil {
  99. logs.Warn("nps new conn err ", err)
  100. continue
  101. }
  102. //logs.Warn("nps new conn success ", tmpCpnn.connId)
  103. _ = goroutine.CopyConnsPool.Invoke(goroutine.NewConns(tmpCpnn, conns, nil))
  104. //go func(tmpCpnn *conn, conns net.Conn) {
  105. // wg := new(sync.WaitGroup)
  106. // wg.Add(2)
  107. // _ = poolConnCopy.Invoke(common.newConnGroup(tmpCpnn, conns, wg))
  108. // //go func() {
  109. // // _, err := common.CopyBuffer(tmpCpnn, conns)
  110. // // if err != nil {
  111. // // conns.Close()
  112. // // tmpCpnn.Close()
  113. // // //logs.Warn("close nps by copy from user", tmpCpnn.connId, err)
  114. // // }
  115. // //}()
  116. // //wg.Add(1)
  117. // _ = poolConnCopy.Invoke(common.newConnGroup(conns, tmpCpnn, wg))
  118. // //time.Sleep(time.Second)
  119. // //_, err = common.CopyBuffer(conns, tmpCpnn)
  120. // //if err != nil {
  121. // // conns.Close()
  122. // // tmpCpnn.Close()
  123. // // //logs.Warn("close nps by copy from npc ", tmpCpnn.connId, err)
  124. // //}
  125. // wg.Wait()
  126. //}(tmpCpnn, conns)
  127. }
  128. }()
  129. //go NewLogServer()
  130. time.Sleep(time.Second * 5)
  131. //for i := 0; i < 1; i++ {
  132. // go test_raw(i)
  133. //}
  134. //test_request()
  135. for {
  136. time.Sleep(time.Second * 5)
  137. }
  138. }
  139. func server() {
  140. var err error
  141. l, err := net.Listen("tcp", "127.0.0.1:9999")
  142. //l, err := kcp.Listen("127.0.0.1:9999")
  143. if err != nil {
  144. logs.Warn(err)
  145. }
  146. go func() {
  147. conn1, err = l.Accept()
  148. //logs.Info("accept", conn1)
  149. if err != nil {
  150. logs.Warn(err)
  151. }
  152. }()
  153. return
  154. }
  155. func client() {
  156. var err error
  157. conn2, err = net.Dial("tcp", "127.0.0.1:9999")
  158. //logs.Warn("dial")
  159. //conn2, err = kcp.Dial("127.0.0.1:9999")
  160. if err != nil {
  161. logs.Warn(err)
  162. }
  163. }
  164. func test_request() {
  165. conn, _ := net.Dial("tcp", "127.0.0.1:7777")
  166. for i := 0; i < 1000; i++ {
  167. conn.Write([]byte(`GET / HTTP/1.1
  168. Host: 127.0.0.1:7777
  169. Connection: keep-alive
  170. `))
  171. r, err := http.ReadResponse(bufio.NewReader(conn), nil)
  172. if err != nil {
  173. logs.Warn("close by read response err", err)
  174. break
  175. }
  176. logs.Warn("read response success", r)
  177. b, err := httputil.DumpResponse(r, true)
  178. if err != nil {
  179. logs.Warn("close by dump response err", err)
  180. break
  181. }
  182. fmt.Println(string(b[:20]), err)
  183. //time.Sleep(time.Second)
  184. }
  185. logs.Warn("finish")
  186. }
  187. func test_raw(k int) {
  188. for i := 0; i < 1000; i++ {
  189. ti := time.Now()
  190. conn, err := net.Dial("tcp", "127.0.0.1:7777")
  191. if err != nil {
  192. logs.Warn("conn dial err", err)
  193. }
  194. tid := time.Now()
  195. conn.Write([]byte(`GET /videojs5/video.js HTTP/1.1
  196. Host: 127.0.0.1:7777
  197. `))
  198. tiw := time.Now()
  199. buf := make([]byte, 3572)
  200. n, err := io.ReadFull(conn, buf)
  201. //n, err := conn.Read(buf)
  202. if err != nil {
  203. logs.Warn("close by read response err", err)
  204. break
  205. }
  206. logs.Warn(n, string(buf[:50]), "\n--------------\n", string(buf[n-50:n]))
  207. //time.Sleep(time.Second)
  208. err = conn.Close()
  209. if err != nil {
  210. logs.Warn("close conn err ", err)
  211. }
  212. now := time.Now()
  213. du := now.Sub(ti).Seconds()
  214. dud := now.Sub(tid).Seconds()
  215. duw := now.Sub(tiw).Seconds()
  216. if du > 1 {
  217. logs.Warn("duration long", du, dud, duw, k, i)
  218. }
  219. if n != 3572 {
  220. logs.Warn("n loss", n, string(buf))
  221. }
  222. }
  223. logs.Warn("finish")
  224. }
  225. func TestNewConn(t *testing.T) {
  226. buf := common.GetBufPoolCopy()
  227. logs.Warn(len(buf), cap(buf))
  228. //b := pool.GetBufPoolCopy()
  229. //b[0] = 1
  230. //b[1] = 2
  231. //b[2] = 3
  232. b := []byte{1, 2, 3}
  233. logs.Warn(copy(buf[:3], b), len(buf), cap(buf))
  234. logs.Warn(len(buf), buf[0])
  235. }
  236. func TestDQueue(t *testing.T) {
  237. logs.EnableFuncCallDepth(true)
  238. logs.SetLogFuncCallDepth(3)
  239. d := new(bufDequeue)
  240. d.vals = make([]unsafe.Pointer, 8)
  241. go func() {
  242. time.Sleep(time.Second)
  243. for i := 0; i < 10; i++ {
  244. logs.Warn(i)
  245. logs.Warn(d.popTail())
  246. }
  247. }()
  248. go func() {
  249. time.Sleep(time.Second)
  250. for i := 0; i < 10; i++ {
  251. data := "test"
  252. go logs.Warn(i, unsafe.Pointer(&data), d.pushHead(unsafe.Pointer(&data)))
  253. }
  254. }()
  255. time.Sleep(time.Second * 3)
  256. }
  257. func TestChain(t *testing.T) {
  258. go func() {
  259. log.Println(http.ListenAndServe("0.0.0.0:8889", nil))
  260. }()
  261. logs.EnableFuncCallDepth(true)
  262. logs.SetLogFuncCallDepth(3)
  263. time.Sleep(time.Second * 5)
  264. d := new(bufChain)
  265. d.new(256)
  266. go func() {
  267. time.Sleep(time.Second)
  268. for i := 0; i < 30000; i++ {
  269. unsa, ok := d.popTail()
  270. str := (*string)(unsa)
  271. if ok {
  272. fmt.Println(i, str, *str, ok)
  273. //logs.Warn(i, str, *str, ok)
  274. } else {
  275. fmt.Println("nil", i, ok)
  276. //logs.Warn("nil", i, ok)
  277. }
  278. }
  279. }()
  280. go func() {
  281. time.Sleep(time.Second)
  282. for i := 0; i < 3000; i++ {
  283. go func(i int) {
  284. for n := 0; n < 10; n++ {
  285. data := "test " + strconv.Itoa(i) + strconv.Itoa(n)
  286. fmt.Println(data, unsafe.Pointer(&data))
  287. //logs.Warn(data, unsafe.Pointer(&data))
  288. d.pushHead(unsafe.Pointer(&data))
  289. }
  290. }(i)
  291. }
  292. }()
  293. time.Sleep(time.Second * 100000)
  294. }
  295. func TestFIFO(t *testing.T) {
  296. go func() {
  297. log.Println(http.ListenAndServe("0.0.0.0:8889", nil))
  298. }()
  299. logs.EnableFuncCallDepth(true)
  300. logs.SetLogFuncCallDepth(3)
  301. time.Sleep(time.Second * 5)
  302. d := new(ReceiveWindowQueue)
  303. d.New()
  304. go func() {
  305. time.Sleep(time.Second)
  306. for i := 0; i < 1001; i++ {
  307. data, err := d.Pop()
  308. if err == nil {
  309. //fmt.Println(i, string(data.buf), err)
  310. logs.Warn(i, string(data.Buf), err)
  311. common.ListElementPool.Put(data)
  312. } else {
  313. //fmt.Println("err", err)
  314. logs.Warn("err", err)
  315. }
  316. //logs.Warn(d.Len())
  317. }
  318. logs.Warn("pop finish")
  319. }()
  320. go func() {
  321. time.Sleep(time.Second * 10)
  322. for i := 0; i < 1000; i++ {
  323. by := []byte("test " + strconv.Itoa(i) + " ") //
  324. data, _ := NewListElement(by, uint16(len(by)), true)
  325. //fmt.Println(string((*data).buf), data)
  326. //logs.Warn(string((*data).buf), data)
  327. d.Push(data)
  328. }
  329. }()
  330. time.Sleep(time.Second * 100000)
  331. }
  332. func TestPriority(t *testing.T) {
  333. go func() {
  334. log.Println(http.ListenAndServe("0.0.0.0:8889", nil))
  335. }()
  336. logs.EnableFuncCallDepth(true)
  337. logs.SetLogFuncCallDepth(3)
  338. time.Sleep(time.Second * 5)
  339. d := new(PriorityQueue)
  340. d.New()
  341. go func() {
  342. time.Sleep(time.Second)
  343. for i := 0; i < 360050; i++ {
  344. data := d.Pop()
  345. //fmt.Println(i, string(data.buf), err)
  346. logs.Warn(i, string(data.Content), data)
  347. }
  348. logs.Warn("pop finish")
  349. }()
  350. go func() {
  351. time.Sleep(time.Second * 10)
  352. for i := 0; i < 30000; i++ {
  353. go func(i int) {
  354. for n := 0; n < 10; n++ {
  355. data := new(common.MuxPackager)
  356. by := []byte("test " + strconv.Itoa(i) + strconv.Itoa(n))
  357. _ = data.NewPac(common.MUX_NEW_MSG_PART, int32(i), by)
  358. //fmt.Println(string((*data).buf), data)
  359. logs.Warn(string((*data).Content), data)
  360. d.Push(data)
  361. }
  362. }(i)
  363. go func(i int) {
  364. data := new(common.MuxPackager)
  365. _ = data.NewPac(common.MUX_NEW_CONN, int32(i), nil)
  366. //fmt.Println(string((*data).buf), data)
  367. logs.Warn(data)
  368. d.Push(data)
  369. }(i)
  370. go func(i int) {
  371. data := new(common.MuxPackager)
  372. _ = data.NewPac(common.MUX_NEW_CONN_OK, int32(i), nil)
  373. //fmt.Println(string((*data).buf), data)
  374. logs.Warn(data)
  375. d.Push(data)
  376. }(i)
  377. }
  378. }()
  379. time.Sleep(time.Second * 100000)
  380. }
  381. //func TestReceive(t *testing.T) {
  382. // go func() {
  383. // log.Println(http.ListenAndServe("0.0.0.0:8889", nil))
  384. // }()
  385. // logs.EnableFuncCallDepth(true)
  386. // logs.SetLogFuncCallDepth(3)
  387. // time.Sleep(time.Second * 5)
  388. // mux := new(Mux)
  389. // mux.bw.readBandwidth = float64(1*1024*1024)
  390. // mux.latency = float64(1/1000)
  391. // wind := new(ReceiveWindow)
  392. // wind.New(mux)
  393. // wind.
  394. // go func() {
  395. // time.Sleep(time.Second)
  396. // for i := 0; i < 36000; i++ {
  397. // data := d.Pop()
  398. // //fmt.Println(i, string(data.buf), err)
  399. // logs.Warn(i, string(data.Content), data)
  400. // }
  401. // }()
  402. // go func() {
  403. // time.Sleep(time.Second*10)
  404. // for i := 0; i < 3000; i++ {
  405. // go func(i int) {
  406. // for n := 0; n < 10; n++{
  407. // data := new(common.MuxPackager)
  408. // by := []byte("test " + strconv.Itoa(i) + strconv.Itoa(n))
  409. // _ = data.NewPac(common.MUX_NEW_MSG_PART, int32(i), by)
  410. // //fmt.Println(string((*data).buf), data)
  411. // logs.Warn(string((*data).Content), data)
  412. // d.Push(data)
  413. // }
  414. // }(i)
  415. // go func(i int) {
  416. // data := new(common.MuxPackager)
  417. // _ = data.NewPac(common.MUX_NEW_CONN, int32(i), nil)
  418. // //fmt.Println(string((*data).buf), data)
  419. // logs.Warn(data)
  420. // d.Push(data)
  421. // }(i)
  422. // go func(i int) {
  423. // data := new(common.MuxPackager)
  424. // _ = data.NewPac(common.MUX_NEW_CONN_OK, int32(i), nil)
  425. // //fmt.Println(string((*data).buf), data)
  426. // logs.Warn(data)
  427. // d.Push(data)
  428. // }(i)
  429. // }
  430. // }()
  431. // time.Sleep(time.Second * 100000)
  432. //}