mux_test.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447
  1. package mux
  2. import (
  3. "bufio"
  4. "fmt"
  5. "github.com/cnlh/nps/lib/common"
  6. "github.com/cnlh/nps/lib/goroutine"
  7. "io"
  8. "log"
  9. "net"
  10. "net/http"
  11. "net/http/httputil"
  12. _ "net/http/pprof"
  13. "strconv"
  14. "testing"
  15. "time"
  16. "unsafe"
  17. "github.com/astaxie/beego/logs"
  18. )
  19. var conn1 net.Conn
  20. var conn2 net.Conn
  21. func TestNewMux(t *testing.T) {
  22. go func() {
  23. http.ListenAndServe("0.0.0.0:8889", nil)
  24. }()
  25. logs.EnableFuncCallDepth(true)
  26. logs.SetLogFuncCallDepth(3)
  27. server()
  28. client()
  29. //poolConnCopy, _ := ants.NewPoolWithFunc(200000, common.copyConn, ants.WithNonblocking(false))
  30. time.Sleep(time.Second * 3)
  31. go func() {
  32. m2 := NewMux(conn2, "tcp")
  33. for {
  34. //logs.Warn("npc starting accept")
  35. c, err := m2.Accept()
  36. if err != nil {
  37. logs.Warn(err)
  38. continue
  39. }
  40. //logs.Warn("npc accept success ")
  41. c2, err := net.Dial("tcp", "127.0.0.1:80")
  42. if err != nil {
  43. logs.Warn(err)
  44. c.Close()
  45. continue
  46. }
  47. //c2.(*net.TCPConn).SetReadBuffer(0)
  48. //c2.(*net.TCPConn).SetReadBuffer(0)
  49. _ = goroutine.CopyConnsPool.Invoke(goroutine.NewConns(c, c2, nil))
  50. //go func(c2 net.Conn, c *conn) {
  51. // wg := new(sync.WaitGroup)
  52. // wg.Add(2)
  53. // _ = poolConnCopy.Invoke(common.newConnGroup(c2, c, wg))
  54. // //go func() {
  55. // // _, err = common.CopyBuffer(c2, c)
  56. // // if err != nil {
  57. // // c2.Close()
  58. // // c.Close()
  59. // // //logs.Warn("close npc by copy from nps", err, c.connId)
  60. // // }
  61. // // wg.Done()
  62. // //}()
  63. // //wg.Add(1)
  64. // _ = poolConnCopy.Invoke(common.newConnGroup(c, c2, wg))
  65. // //go func() {
  66. // // _, err = common.CopyBuffer(c, c2)
  67. // // if err != nil {
  68. // // c2.Close()
  69. // // c.Close()
  70. // // //logs.Warn("close npc by copy from server", err, c.connId)
  71. // // }
  72. // // wg.Done()
  73. // //}()
  74. // //logs.Warn("npc wait")
  75. // wg.Wait()
  76. //}(c2, c.(*conn))
  77. }
  78. }()
  79. go func() {
  80. m1 := NewMux(conn1, "tcp")
  81. l, err := net.Listen("tcp", "127.0.0.1:7777")
  82. if err != nil {
  83. logs.Warn(err)
  84. }
  85. for {
  86. //logs.Warn("nps starting accept")
  87. conns, err := l.Accept()
  88. if err != nil {
  89. logs.Warn(err)
  90. continue
  91. }
  92. //conns.(*net.TCPConn).SetReadBuffer(0)
  93. //conns.(*net.TCPConn).SetReadBuffer(0)
  94. //logs.Warn("nps accept success starting new conn")
  95. tmpCpnn, err := m1.NewConn()
  96. if err != nil {
  97. logs.Warn("nps new conn err ", err)
  98. continue
  99. }
  100. //logs.Warn("nps new conn success ", tmpCpnn.connId)
  101. _ = goroutine.CopyConnsPool.Invoke(goroutine.NewConns(tmpCpnn, conns, nil))
  102. //go func(tmpCpnn *conn, conns net.Conn) {
  103. // wg := new(sync.WaitGroup)
  104. // wg.Add(2)
  105. // _ = poolConnCopy.Invoke(common.newConnGroup(tmpCpnn, conns, wg))
  106. // //go func() {
  107. // // _, err := common.CopyBuffer(tmpCpnn, conns)
  108. // // if err != nil {
  109. // // conns.Close()
  110. // // tmpCpnn.Close()
  111. // // //logs.Warn("close nps by copy from user", tmpCpnn.connId, err)
  112. // // }
  113. // //}()
  114. // //wg.Add(1)
  115. // _ = poolConnCopy.Invoke(common.newConnGroup(conns, tmpCpnn, wg))
  116. // //time.Sleep(time.Second)
  117. // //_, err = common.CopyBuffer(conns, tmpCpnn)
  118. // //if err != nil {
  119. // // conns.Close()
  120. // // tmpCpnn.Close()
  121. // // //logs.Warn("close nps by copy from npc ", tmpCpnn.connId, err)
  122. // //}
  123. // wg.Wait()
  124. //}(tmpCpnn, conns)
  125. }
  126. }()
  127. //go NewLogServer()
  128. time.Sleep(time.Second * 5)
  129. //for i := 0; i < 1; i++ {
  130. // go test_raw(i)
  131. //}
  132. //test_request()
  133. for {
  134. time.Sleep(time.Second * 5)
  135. }
  136. }
  137. func server() {
  138. var err error
  139. l, err := net.Listen("tcp", "127.0.0.1:9999")
  140. if err != nil {
  141. logs.Warn(err)
  142. }
  143. go func() {
  144. conn1, err = l.Accept()
  145. if err != nil {
  146. logs.Warn(err)
  147. }
  148. }()
  149. return
  150. }
  151. func client() {
  152. var err error
  153. conn2, err = net.Dial("tcp", "127.0.0.1:9999")
  154. if err != nil {
  155. logs.Warn(err)
  156. }
  157. }
  158. func test_request() {
  159. conn, _ := net.Dial("tcp", "127.0.0.1:7777")
  160. for i := 0; i < 1000; i++ {
  161. conn.Write([]byte(`GET / HTTP/1.1
  162. Host: 127.0.0.1:7777
  163. Connection: keep-alive
  164. `))
  165. r, err := http.ReadResponse(bufio.NewReader(conn), nil)
  166. if err != nil {
  167. logs.Warn("close by read response err", err)
  168. break
  169. }
  170. logs.Warn("read response success", r)
  171. b, err := httputil.DumpResponse(r, true)
  172. if err != nil {
  173. logs.Warn("close by dump response err", err)
  174. break
  175. }
  176. fmt.Println(string(b[:20]), err)
  177. //time.Sleep(time.Second)
  178. }
  179. logs.Warn("finish")
  180. }
  181. func test_raw(k int) {
  182. for i := 0; i < 1000; i++ {
  183. ti := time.Now()
  184. conn, err := net.Dial("tcp", "127.0.0.1:7777")
  185. if err != nil {
  186. logs.Warn("conn dial err", err)
  187. }
  188. tid := time.Now()
  189. conn.Write([]byte(`GET /videojs5/video.js HTTP/1.1
  190. Host: 127.0.0.1:7777
  191. `))
  192. tiw := time.Now()
  193. buf := make([]byte, 3572)
  194. n, err := io.ReadFull(conn, buf)
  195. //n, err := conn.Read(buf)
  196. if err != nil {
  197. logs.Warn("close by read response err", err)
  198. break
  199. }
  200. logs.Warn(n, string(buf[:50]), "\n--------------\n", string(buf[n-50:n]))
  201. //time.Sleep(time.Second)
  202. err = conn.Close()
  203. if err != nil {
  204. logs.Warn("close conn err ", err)
  205. }
  206. now := time.Now()
  207. du := now.Sub(ti).Seconds()
  208. dud := now.Sub(tid).Seconds()
  209. duw := now.Sub(tiw).Seconds()
  210. if du > 1 {
  211. logs.Warn("duration long", du, dud, duw, k, i)
  212. }
  213. if n != 3572 {
  214. logs.Warn("n loss", n, string(buf))
  215. }
  216. }
  217. logs.Warn("finish")
  218. }
  219. func TestNewConn(t *testing.T) {
  220. buf := common.GetBufPoolCopy()
  221. logs.Warn(len(buf), cap(buf))
  222. //b := pool.GetBufPoolCopy()
  223. //b[0] = 1
  224. //b[1] = 2
  225. //b[2] = 3
  226. b := []byte{1, 2, 3}
  227. logs.Warn(copy(buf[:3], b), len(buf), cap(buf))
  228. logs.Warn(len(buf), buf[0])
  229. }
  230. func TestDQueue(t *testing.T) {
  231. logs.EnableFuncCallDepth(true)
  232. logs.SetLogFuncCallDepth(3)
  233. d := new(bufDequeue)
  234. d.vals = make([]unsafe.Pointer, 8)
  235. go func() {
  236. time.Sleep(time.Second)
  237. for i := 0; i < 10; i++ {
  238. logs.Warn(i)
  239. logs.Warn(d.popTail())
  240. }
  241. }()
  242. go func() {
  243. time.Sleep(time.Second)
  244. for i := 0; i < 10; i++ {
  245. data := "test"
  246. go logs.Warn(i, unsafe.Pointer(&data), d.pushHead(unsafe.Pointer(&data)))
  247. }
  248. }()
  249. time.Sleep(time.Second * 3)
  250. }
  251. func TestChain(t *testing.T) {
  252. go func() {
  253. log.Println(http.ListenAndServe("0.0.0.0:8889", nil))
  254. }()
  255. logs.EnableFuncCallDepth(true)
  256. logs.SetLogFuncCallDepth(3)
  257. time.Sleep(time.Second * 5)
  258. d := new(bufChain)
  259. d.new(256)
  260. go func() {
  261. time.Sleep(time.Second)
  262. for i := 0; i < 30000; i++ {
  263. unsa, ok := d.popTail()
  264. str := (*string)(unsa)
  265. if ok {
  266. fmt.Println(i, str, *str, ok)
  267. //logs.Warn(i, str, *str, ok)
  268. } else {
  269. fmt.Println("nil", i, ok)
  270. //logs.Warn("nil", i, ok)
  271. }
  272. }
  273. }()
  274. go func() {
  275. time.Sleep(time.Second)
  276. for i := 0; i < 3000; i++ {
  277. go func(i int) {
  278. for n := 0; n < 10; n++ {
  279. data := "test " + strconv.Itoa(i) + strconv.Itoa(n)
  280. fmt.Println(data, unsafe.Pointer(&data))
  281. //logs.Warn(data, unsafe.Pointer(&data))
  282. d.pushHead(unsafe.Pointer(&data))
  283. }
  284. }(i)
  285. }
  286. }()
  287. time.Sleep(time.Second * 100000)
  288. }
  289. func TestFIFO(t *testing.T) {
  290. go func() {
  291. log.Println(http.ListenAndServe("0.0.0.0:8889", nil))
  292. }()
  293. logs.EnableFuncCallDepth(true)
  294. logs.SetLogFuncCallDepth(3)
  295. time.Sleep(time.Second * 5)
  296. d := new(ReceiveWindowQueue)
  297. d.New()
  298. go func() {
  299. time.Sleep(time.Second)
  300. for i := 0; i < 1001; i++ {
  301. data, err := d.Pop()
  302. if err == nil {
  303. //fmt.Println(i, string(data.buf), err)
  304. logs.Warn(i, string(data.Buf), err)
  305. common.ListElementPool.Put(data)
  306. } else {
  307. //fmt.Println("err", err)
  308. logs.Warn("err", err)
  309. }
  310. //logs.Warn(d.Len())
  311. }
  312. logs.Warn("pop finish")
  313. }()
  314. go func() {
  315. time.Sleep(time.Second * 10)
  316. for i := 0; i < 1000; i++ {
  317. by := []byte("test " + strconv.Itoa(i) + " ") //
  318. data, _ := NewListElement(by, uint16(len(by)), true)
  319. //fmt.Println(string((*data).buf), data)
  320. //logs.Warn(string((*data).buf), data)
  321. d.Push(data)
  322. }
  323. }()
  324. time.Sleep(time.Second * 100000)
  325. }
  326. func TestPriority(t *testing.T) {
  327. go func() {
  328. log.Println(http.ListenAndServe("0.0.0.0:8889", nil))
  329. }()
  330. logs.EnableFuncCallDepth(true)
  331. logs.SetLogFuncCallDepth(3)
  332. time.Sleep(time.Second * 5)
  333. d := new(PriorityQueue)
  334. d.New()
  335. go func() {
  336. time.Sleep(time.Second)
  337. for i := 0; i < 360050; i++ {
  338. data := d.Pop()
  339. //fmt.Println(i, string(data.buf), err)
  340. logs.Warn(i, string(data.Content), data)
  341. }
  342. logs.Warn("pop finish")
  343. }()
  344. go func() {
  345. time.Sleep(time.Second * 10)
  346. for i := 0; i < 30000; i++ {
  347. go func(i int) {
  348. for n := 0; n < 10; n++ {
  349. data := new(common.MuxPackager)
  350. by := []byte("test " + strconv.Itoa(i) + strconv.Itoa(n))
  351. _ = data.NewPac(common.MUX_NEW_MSG_PART, int32(i), by)
  352. //fmt.Println(string((*data).buf), data)
  353. logs.Warn(string((*data).Content), data)
  354. d.Push(data)
  355. }
  356. }(i)
  357. go func(i int) {
  358. data := new(common.MuxPackager)
  359. _ = data.NewPac(common.MUX_NEW_CONN, int32(i), nil)
  360. //fmt.Println(string((*data).buf), data)
  361. logs.Warn(data)
  362. d.Push(data)
  363. }(i)
  364. go func(i int) {
  365. data := new(common.MuxPackager)
  366. _ = data.NewPac(common.MUX_NEW_CONN_OK, int32(i), nil)
  367. //fmt.Println(string((*data).buf), data)
  368. logs.Warn(data)
  369. d.Push(data)
  370. }(i)
  371. }
  372. }()
  373. time.Sleep(time.Second * 100000)
  374. }
  375. //func TestReceive(t *testing.T) {
  376. // go func() {
  377. // log.Println(http.ListenAndServe("0.0.0.0:8889", nil))
  378. // }()
  379. // logs.EnableFuncCallDepth(true)
  380. // logs.SetLogFuncCallDepth(3)
  381. // time.Sleep(time.Second * 5)
  382. // mux := new(Mux)
  383. // mux.bw.readBandwidth = float64(1*1024*1024)
  384. // mux.latency = float64(1/1000)
  385. // wind := new(ReceiveWindow)
  386. // wind.New(mux)
  387. // wind.
  388. // go func() {
  389. // time.Sleep(time.Second)
  390. // for i := 0; i < 36000; i++ {
  391. // data := d.Pop()
  392. // //fmt.Println(i, string(data.buf), err)
  393. // logs.Warn(i, string(data.Content), data)
  394. // }
  395. // }()
  396. // go func() {
  397. // time.Sleep(time.Second*10)
  398. // for i := 0; i < 3000; i++ {
  399. // go func(i int) {
  400. // for n := 0; n < 10; n++{
  401. // data := new(common.MuxPackager)
  402. // by := []byte("test " + strconv.Itoa(i) + strconv.Itoa(n))
  403. // _ = data.NewPac(common.MUX_NEW_MSG_PART, int32(i), by)
  404. // //fmt.Println(string((*data).buf), data)
  405. // logs.Warn(string((*data).Content), data)
  406. // d.Push(data)
  407. // }
  408. // }(i)
  409. // go func(i int) {
  410. // data := new(common.MuxPackager)
  411. // _ = data.NewPac(common.MUX_NEW_CONN, int32(i), nil)
  412. // //fmt.Println(string((*data).buf), data)
  413. // logs.Warn(data)
  414. // d.Push(data)
  415. // }(i)
  416. // go func(i int) {
  417. // data := new(common.MuxPackager)
  418. // _ = data.NewPac(common.MUX_NEW_CONN_OK, int32(i), nil)
  419. // //fmt.Println(string((*data).buf), data)
  420. // logs.Warn(data)
  421. // d.Push(data)
  422. // }(i)
  423. // }
  424. // }()
  425. // time.Sleep(time.Second * 100000)
  426. //}