mux_test.go 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429
  1. package mux
  2. import (
  3. "bufio"
  4. "fmt"
  5. "github.com/cnlh/nps/lib/common"
  6. "log"
  7. "net"
  8. "net/http"
  9. "net/http/httputil"
  10. _ "net/http/pprof"
  11. "strconv"
  12. "testing"
  13. "time"
  14. "unsafe"
  15. "github.com/astaxie/beego/logs"
  16. )
  17. var conn1 net.Conn
  18. var conn2 net.Conn
  19. func TestNewMux(t *testing.T) {
  20. go func() {
  21. http.ListenAndServe("0.0.0.0:8889", nil)
  22. }()
  23. logs.EnableFuncCallDepth(true)
  24. logs.SetLogFuncCallDepth(3)
  25. server()
  26. client()
  27. time.Sleep(time.Second * 3)
  28. go func() {
  29. m2 := NewMux(conn2, "tcp")
  30. for {
  31. //logs.Warn("npc starting accept")
  32. c, err := m2.Accept()
  33. if err != nil {
  34. logs.Warn(err)
  35. continue
  36. }
  37. //logs.Warn("npc accept success ")
  38. c2, err := net.Dial("tcp", "127.0.0.1:80")
  39. if err != nil {
  40. logs.Warn(err)
  41. c.Close()
  42. continue
  43. }
  44. //c2.(*net.TCPConn).SetReadBuffer(0)
  45. //c2.(*net.TCPConn).SetReadBuffer(0)
  46. go func(c2 net.Conn, c *conn) {
  47. //wg := sync.WaitGroup{}
  48. //wg.Add(1)
  49. //go func() {
  50. // _, err = common.CopyBuffer(c2, c)
  51. // if err != nil {
  52. // c2.Close()
  53. // c.Close()
  54. // logs.Warn("close npc by copy from nps", err, c.connId)
  55. // }
  56. // wg.Done()
  57. //}()
  58. //wg.Add(1)
  59. //go func() {
  60. // _, err = common.CopyBuffer(c, c2)
  61. // if err != nil {
  62. // c2.Close()
  63. // c.Close()
  64. // logs.Warn("close npc by copy from server", err, c.connId)
  65. // }
  66. // wg.Done()
  67. //}()
  68. ////logs.Warn("npc wait")
  69. //wg.Wait()
  70. }(c2, c.(*conn))
  71. }
  72. }()
  73. go func() {
  74. //m1 := NewMux(conn1, "tcp")
  75. l, err := net.Listen("tcp", "127.0.0.1:7777")
  76. if err != nil {
  77. logs.Warn(err)
  78. }
  79. for {
  80. //logs.Warn("nps starting accept")
  81. _, err := l.Accept()
  82. if err != nil {
  83. logs.Warn(err)
  84. continue
  85. }
  86. //conns.(*net.TCPConn).SetReadBuffer(0)
  87. //conns.(*net.TCPConn).SetReadBuffer(0)
  88. //logs.Warn("nps accept success starting new conn")
  89. //tmpCpnn, err := m1.NewConn()
  90. //if err != nil {
  91. // logs.Warn("nps new conn err ", err)
  92. // continue
  93. //}
  94. ////logs.Warn("nps new conn success ", tmpCpnn.connId)
  95. //go func(tmpCpnn *conn, conns net.Conn) {
  96. // //go func() {
  97. // // _, err := common.CopyBuffer(tmpCpnn, conns)
  98. // // if err != nil {
  99. // // conns.Close()
  100. // // tmpCpnn.Close()
  101. // // logs.Warn("close nps by copy from user", tmpCpnn.connId, err)
  102. // // }
  103. // //}()
  104. // ////time.Sleep(time.Second)
  105. // //_, err = common.CopyBuffer(conns, tmpCpnn)
  106. // //if err != nil {
  107. // // conns.Close()
  108. // // tmpCpnn.Close()
  109. // // logs.Warn("close nps by copy from npc ", tmpCpnn.connId, err)
  110. // //}
  111. //}(tmpCpnn, conns)
  112. }
  113. }()
  114. go NewLogServer()
  115. time.Sleep(time.Second * 5)
  116. for i := 0; i < 1000; i++ {
  117. go test_raw(i)
  118. }
  119. for {
  120. time.Sleep(time.Second * 5)
  121. }
  122. }
  123. func server() {
  124. var err error
  125. l, err := net.Listen("tcp", "127.0.0.1:9999")
  126. if err != nil {
  127. logs.Warn(err)
  128. }
  129. go func() {
  130. conn1, err = l.Accept()
  131. if err != nil {
  132. logs.Warn(err)
  133. }
  134. }()
  135. return
  136. }
  137. func client() {
  138. var err error
  139. conn2, err = net.Dial("tcp", "127.0.0.1:9999")
  140. if err != nil {
  141. logs.Warn(err)
  142. }
  143. }
  144. func test_request() {
  145. conn, _ := net.Dial("tcp", "127.0.0.1:7777")
  146. for {
  147. conn.Write([]byte(`GET /videojs5/video.js HTTP/1.1
  148. Host: 127.0.0.1:7777
  149. Connection: keep-alive
  150. `))
  151. r, err := http.ReadResponse(bufio.NewReader(conn), nil)
  152. if err != nil {
  153. logs.Warn("close by read response err", err)
  154. break
  155. }
  156. logs.Warn("read response success", r)
  157. b, err := httputil.DumpResponse(r, true)
  158. if err != nil {
  159. logs.Warn("close by dump response err", err)
  160. break
  161. }
  162. fmt.Println(string(b[:20]), err)
  163. time.Sleep(time.Second)
  164. }
  165. }
  166. func test_raw(k int) {
  167. for i := 0; i < 1; i++ {
  168. ti := time.Now()
  169. _, _ = net.Dial("tcp", "127.0.0.1:7777")
  170. tid := time.Now()
  171. // conn.Write([]byte(`GET / HTTP/1.1
  172. //Host: 127.0.0.1:7777
  173. //
  174. //
  175. //`))
  176. // tiw := time.Now()
  177. //buf := make([]byte, 3572)
  178. //n, err := io.ReadFull(conn, buf)
  179. ////n, err := conn.Read(buf)
  180. //if err != nil {
  181. // logs.Warn("close by read response err", err)
  182. // break
  183. //}
  184. ////logs.Warn(n, string(buf[:50]), "\n--------------\n", string(buf[n-50:n]))
  185. ////time.Sleep(time.Second)
  186. //err = conn.Close()
  187. //if err != nil {
  188. // logs.Warn("close conn err ", err)
  189. //}
  190. now := time.Now()
  191. du := now.Sub(ti).Seconds()
  192. dud := now.Sub(tid).Seconds()
  193. //duw := now.Sub(tiw).Seconds()
  194. //if du > 1 {
  195. logs.Warn("duration long", du, dud, k, i)
  196. //}
  197. //if n != 3572 {
  198. // logs.Warn("n loss", n, string(buf))
  199. //}
  200. }
  201. }
  202. func TestNewConn(t *testing.T) {
  203. buf := common.GetBufPoolCopy()
  204. logs.Warn(len(buf), cap(buf))
  205. //b := pool.GetBufPoolCopy()
  206. //b[0] = 1
  207. //b[1] = 2
  208. //b[2] = 3
  209. b := []byte{1, 2, 3}
  210. logs.Warn(copy(buf[:3], b), len(buf), cap(buf))
  211. logs.Warn(len(buf), buf[0])
  212. }
  213. func TestDQueue(t *testing.T) {
  214. logs.EnableFuncCallDepth(true)
  215. logs.SetLogFuncCallDepth(3)
  216. d := new(bufDequeue)
  217. d.vals = make([]unsafe.Pointer, 8)
  218. go func() {
  219. time.Sleep(time.Second)
  220. for i := 0; i < 10; i++ {
  221. logs.Warn(i)
  222. logs.Warn(d.popTail())
  223. }
  224. }()
  225. go func() {
  226. time.Sleep(time.Second)
  227. for i := 0; i < 10; i++ {
  228. data := "test"
  229. go logs.Warn(i, unsafe.Pointer(&data), d.pushHead(unsafe.Pointer(&data)))
  230. }
  231. }()
  232. time.Sleep(time.Second * 3)
  233. }
  234. func TestChain(t *testing.T) {
  235. go func() {
  236. log.Println(http.ListenAndServe("0.0.0.0:8889", nil))
  237. }()
  238. logs.EnableFuncCallDepth(true)
  239. logs.SetLogFuncCallDepth(3)
  240. time.Sleep(time.Second * 5)
  241. d := new(bufChain)
  242. d.new(256)
  243. go func() {
  244. time.Sleep(time.Second)
  245. for i := 0; i < 30000; i++ {
  246. unsa, ok := d.popTail()
  247. str := (*string)(unsa)
  248. if ok {
  249. fmt.Println(i, str, *str, ok)
  250. //logs.Warn(i, str, *str, ok)
  251. } else {
  252. fmt.Println("nil", i, ok)
  253. //logs.Warn("nil", i, ok)
  254. }
  255. }
  256. }()
  257. go func() {
  258. time.Sleep(time.Second)
  259. for i := 0; i < 3000; i++ {
  260. go func(i int) {
  261. for n := 0; n < 10; n++ {
  262. data := "test " + strconv.Itoa(i) + strconv.Itoa(n)
  263. fmt.Println(data, unsafe.Pointer(&data))
  264. //logs.Warn(data, unsafe.Pointer(&data))
  265. d.pushHead(unsafe.Pointer(&data))
  266. }
  267. }(i)
  268. }
  269. }()
  270. time.Sleep(time.Second * 100000)
  271. }
  272. func TestFIFO(t *testing.T) {
  273. go func() {
  274. log.Println(http.ListenAndServe("0.0.0.0:8889", nil))
  275. }()
  276. logs.EnableFuncCallDepth(true)
  277. logs.SetLogFuncCallDepth(3)
  278. time.Sleep(time.Second * 5)
  279. d := new(FIFOQueue)
  280. d.New()
  281. go func() {
  282. time.Sleep(time.Second)
  283. for i := 0; i < 30000; i++ {
  284. data, err := d.Pop()
  285. if err == nil {
  286. //fmt.Println(i, string(data.buf), err)
  287. logs.Warn(i, string(data.buf), err)
  288. } else {
  289. //fmt.Println("err", err)
  290. logs.Warn("err", err)
  291. }
  292. }
  293. }()
  294. go func() {
  295. time.Sleep(time.Second * 10)
  296. for i := 0; i < 3000; i++ {
  297. go func(i int) {
  298. for n := 0; n < 10; n++ {
  299. data := new(ListElement)
  300. by := []byte("test " + strconv.Itoa(i) + strconv.Itoa(n))
  301. _ = data.New(by, uint16(len(by)), true)
  302. //fmt.Println(string((*data).buf), data)
  303. logs.Warn(string((*data).buf), data)
  304. d.Push(data)
  305. }
  306. }(i)
  307. }
  308. }()
  309. time.Sleep(time.Second * 100000)
  310. }
  311. func TestPriority(t *testing.T) {
  312. go func() {
  313. log.Println(http.ListenAndServe("0.0.0.0:8889", nil))
  314. }()
  315. logs.EnableFuncCallDepth(true)
  316. logs.SetLogFuncCallDepth(3)
  317. time.Sleep(time.Second * 5)
  318. d := new(PriorityQueue)
  319. d.New()
  320. go func() {
  321. time.Sleep(time.Second)
  322. for i := 0; i < 36000; i++ {
  323. data := d.Pop()
  324. //fmt.Println(i, string(data.buf), err)
  325. logs.Warn(i, string(data.Content), data)
  326. }
  327. }()
  328. go func() {
  329. time.Sleep(time.Second * 10)
  330. for i := 0; i < 3000; i++ {
  331. go func(i int) {
  332. for n := 0; n < 10; n++ {
  333. data := new(common.MuxPackager)
  334. by := []byte("test " + strconv.Itoa(i) + strconv.Itoa(n))
  335. _ = data.NewPac(common.MUX_NEW_MSG_PART, int32(i), by)
  336. //fmt.Println(string((*data).buf), data)
  337. logs.Warn(string((*data).Content), data)
  338. d.Push(data)
  339. }
  340. }(i)
  341. go func(i int) {
  342. data := new(common.MuxPackager)
  343. _ = data.NewPac(common.MUX_NEW_CONN, int32(i), nil)
  344. //fmt.Println(string((*data).buf), data)
  345. logs.Warn(data)
  346. d.Push(data)
  347. }(i)
  348. go func(i int) {
  349. data := new(common.MuxPackager)
  350. _ = data.NewPac(common.MUX_NEW_CONN_OK, int32(i), nil)
  351. //fmt.Println(string((*data).buf), data)
  352. logs.Warn(data)
  353. d.Push(data)
  354. }(i)
  355. }
  356. }()
  357. time.Sleep(time.Second * 100000)
  358. }
  359. //func TestReceive(t *testing.T) {
  360. // go func() {
  361. // log.Println(http.ListenAndServe("0.0.0.0:8889", nil))
  362. // }()
  363. // logs.EnableFuncCallDepth(true)
  364. // logs.SetLogFuncCallDepth(3)
  365. // time.Sleep(time.Second * 5)
  366. // mux := new(Mux)
  367. // mux.bw.readBandwidth = float64(1*1024*1024)
  368. // mux.latency = float64(1/1000)
  369. // wind := new(ReceiveWindow)
  370. // wind.New(mux)
  371. // wind.
  372. // go func() {
  373. // time.Sleep(time.Second)
  374. // for i := 0; i < 36000; i++ {
  375. // data := d.Pop()
  376. // //fmt.Println(i, string(data.buf), err)
  377. // logs.Warn(i, string(data.Content), data)
  378. // }
  379. // }()
  380. // go func() {
  381. // time.Sleep(time.Second*10)
  382. // for i := 0; i < 3000; i++ {
  383. // go func(i int) {
  384. // for n := 0; n < 10; n++{
  385. // data := new(common.MuxPackager)
  386. // by := []byte("test " + strconv.Itoa(i) + strconv.Itoa(n))
  387. // _ = data.NewPac(common.MUX_NEW_MSG_PART, int32(i), by)
  388. // //fmt.Println(string((*data).buf), data)
  389. // logs.Warn(string((*data).Content), data)
  390. // d.Push(data)
  391. // }
  392. // }(i)
  393. // go func(i int) {
  394. // data := new(common.MuxPackager)
  395. // _ = data.NewPac(common.MUX_NEW_CONN, int32(i), nil)
  396. // //fmt.Println(string((*data).buf), data)
  397. // logs.Warn(data)
  398. // d.Push(data)
  399. // }(i)
  400. // go func(i int) {
  401. // data := new(common.MuxPackager)
  402. // _ = data.NewPac(common.MUX_NEW_CONN_OK, int32(i), nil)
  403. // //fmt.Println(string((*data).buf), data)
  404. // logs.Warn(data)
  405. // d.Push(data)
  406. // }(i)
  407. // }
  408. // }()
  409. // time.Sleep(time.Second * 100000)
  410. //}