mux_test.go 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438
  1. package mux
  2. import (
  3. "bufio"
  4. "fmt"
  5. "github.com/cnlh/nps/lib/common"
  6. "io"
  7. "log"
  8. "net"
  9. "net/http"
  10. "net/http/httputil"
  11. _ "net/http/pprof"
  12. "strconv"
  13. "sync"
  14. "testing"
  15. "time"
  16. "unsafe"
  17. "github.com/astaxie/beego/logs"
  18. )
  19. var conn1 net.Conn
  20. var conn2 net.Conn
  21. func TestNewMux(t *testing.T) {
  22. go func() {
  23. http.ListenAndServe("0.0.0.0:8889", nil)
  24. }()
  25. logs.EnableFuncCallDepth(true)
  26. logs.SetLogFuncCallDepth(3)
  27. server()
  28. client()
  29. time.Sleep(time.Second * 3)
  30. go func() {
  31. m2 := NewMux(conn2, "tcp")
  32. for {
  33. //logs.Warn("npc starting accept")
  34. c, err := m2.Accept()
  35. if err != nil {
  36. logs.Warn(err)
  37. continue
  38. }
  39. //logs.Warn("npc accept success ")
  40. c2, err := net.Dial("tcp", "127.0.0.1:80")
  41. if err != nil {
  42. logs.Warn(err)
  43. c.Close()
  44. continue
  45. }
  46. //c2.(*net.TCPConn).SetReadBuffer(0)
  47. //c2.(*net.TCPConn).SetReadBuffer(0)
  48. go func(c2 net.Conn, c *conn) {
  49. wg := sync.WaitGroup{}
  50. wg.Add(1)
  51. go func() {
  52. _, err = common.CopyBuffer(c2, c)
  53. if err != nil {
  54. c2.Close()
  55. c.Close()
  56. //logs.Warn("close npc by copy from nps", err, c.connId)
  57. }
  58. wg.Done()
  59. }()
  60. wg.Add(1)
  61. go func() {
  62. _, err = common.CopyBuffer(c, c2)
  63. if err != nil {
  64. c2.Close()
  65. c.Close()
  66. //logs.Warn("close npc by copy from server", err, c.connId)
  67. }
  68. wg.Done()
  69. }()
  70. //logs.Warn("npc wait")
  71. wg.Wait()
  72. }(c2, c.(*conn))
  73. }
  74. }()
  75. go func() {
  76. m1 := NewMux(conn1, "tcp")
  77. l, err := net.Listen("tcp", "127.0.0.1:7777")
  78. if err != nil {
  79. logs.Warn(err)
  80. }
  81. for {
  82. //logs.Warn("nps starting accept")
  83. conns, err := l.Accept()
  84. if err != nil {
  85. logs.Warn(err)
  86. continue
  87. }
  88. //conns.(*net.TCPConn).SetReadBuffer(0)
  89. //conns.(*net.TCPConn).SetReadBuffer(0)
  90. //logs.Warn("nps accept success starting new conn")
  91. tmpCpnn, err := m1.NewConn()
  92. if err != nil {
  93. logs.Warn("nps new conn err ", err)
  94. continue
  95. }
  96. //logs.Warn("nps new conn success ", tmpCpnn.connId)
  97. go func(tmpCpnn *conn, conns net.Conn) {
  98. go func() {
  99. _, err := common.CopyBuffer(tmpCpnn, conns)
  100. if err != nil {
  101. conns.Close()
  102. tmpCpnn.Close()
  103. //logs.Warn("close nps by copy from user", tmpCpnn.connId, err)
  104. }
  105. }()
  106. //time.Sleep(time.Second)
  107. _, err = common.CopyBuffer(conns, tmpCpnn)
  108. if err != nil {
  109. conns.Close()
  110. tmpCpnn.Close()
  111. //logs.Warn("close nps by copy from npc ", tmpCpnn.connId, err)
  112. }
  113. }(tmpCpnn, conns)
  114. }
  115. }()
  116. //go NewLogServer()
  117. time.Sleep(time.Second * 5)
  118. for i := 0; i < 1000; i++ {
  119. go test_raw(i)
  120. }
  121. //test_request()
  122. for {
  123. time.Sleep(time.Second * 5)
  124. }
  125. }
  126. func server() {
  127. var err error
  128. l, err := net.Listen("tcp", "127.0.0.1:9999")
  129. if err != nil {
  130. logs.Warn(err)
  131. }
  132. go func() {
  133. conn1, err = l.Accept()
  134. if err != nil {
  135. logs.Warn(err)
  136. }
  137. }()
  138. return
  139. }
  140. func client() {
  141. var err error
  142. conn2, err = net.Dial("tcp", "127.0.0.1:9999")
  143. if err != nil {
  144. logs.Warn(err)
  145. }
  146. }
  147. func test_request() {
  148. conn, _ := net.Dial("tcp", "127.0.0.1:7777")
  149. for {
  150. conn.Write([]byte(`GET / HTTP/1.1
  151. Host: 127.0.0.1:7777
  152. Connection: keep-alive
  153. `))
  154. r, err := http.ReadResponse(bufio.NewReader(conn), nil)
  155. if err != nil {
  156. logs.Warn("close by read response err", err)
  157. break
  158. }
  159. logs.Warn("read response success", r)
  160. b, err := httputil.DumpResponse(r, true)
  161. if err != nil {
  162. logs.Warn("close by dump response err", err)
  163. break
  164. }
  165. fmt.Println(string(b[:20]), err)
  166. time.Sleep(time.Second)
  167. }
  168. }
  169. func test_raw(k int) {
  170. for i := 0; i < 10; i++ {
  171. ti := time.Now()
  172. conn, err := net.Dial("tcp", "127.0.0.1:7777")
  173. if err != nil {
  174. logs.Warn("conn dial err", err)
  175. }
  176. tid := time.Now()
  177. conn.Write([]byte(`GET / HTTP/1.1
  178. Host: 127.0.0.1:7777
  179. `))
  180. tiw := time.Now()
  181. buf := make([]byte, 3572)
  182. n, err := io.ReadFull(conn, buf)
  183. //n, err := conn.Read(buf)
  184. if err != nil {
  185. logs.Warn("close by read response err", err)
  186. break
  187. }
  188. logs.Warn(n, string(buf[:50]), "\n--------------\n", string(buf[n-50:n]))
  189. //time.Sleep(time.Second)
  190. err = conn.Close()
  191. if err != nil {
  192. logs.Warn("close conn err ", err)
  193. }
  194. now := time.Now()
  195. du := now.Sub(ti).Seconds()
  196. dud := now.Sub(tid).Seconds()
  197. duw := now.Sub(tiw).Seconds()
  198. if du > 1 {
  199. logs.Warn("duration long", du, dud, duw, k, i)
  200. }
  201. if n != 3572 {
  202. logs.Warn("n loss", n, string(buf))
  203. }
  204. }
  205. }
  206. func TestNewConn(t *testing.T) {
  207. buf := common.GetBufPoolCopy()
  208. logs.Warn(len(buf), cap(buf))
  209. //b := pool.GetBufPoolCopy()
  210. //b[0] = 1
  211. //b[1] = 2
  212. //b[2] = 3
  213. b := []byte{1, 2, 3}
  214. logs.Warn(copy(buf[:3], b), len(buf), cap(buf))
  215. logs.Warn(len(buf), buf[0])
  216. }
  217. func TestDQueue(t *testing.T) {
  218. logs.EnableFuncCallDepth(true)
  219. logs.SetLogFuncCallDepth(3)
  220. d := new(bufDequeue)
  221. d.vals = make([]unsafe.Pointer, 8)
  222. go func() {
  223. time.Sleep(time.Second)
  224. for i := 0; i < 10; i++ {
  225. logs.Warn(i)
  226. logs.Warn(d.popTail())
  227. }
  228. }()
  229. go func() {
  230. time.Sleep(time.Second)
  231. for i := 0; i < 10; i++ {
  232. data := "test"
  233. go logs.Warn(i, unsafe.Pointer(&data), d.pushHead(unsafe.Pointer(&data)))
  234. }
  235. }()
  236. time.Sleep(time.Second * 3)
  237. }
  238. func TestChain(t *testing.T) {
  239. go func() {
  240. log.Println(http.ListenAndServe("0.0.0.0:8889", nil))
  241. }()
  242. logs.EnableFuncCallDepth(true)
  243. logs.SetLogFuncCallDepth(3)
  244. time.Sleep(time.Second * 5)
  245. d := new(bufChain)
  246. d.new(256)
  247. go func() {
  248. time.Sleep(time.Second)
  249. for i := 0; i < 30000; i++ {
  250. unsa, ok := d.popTail()
  251. str := (*string)(unsa)
  252. if ok {
  253. fmt.Println(i, str, *str, ok)
  254. //logs.Warn(i, str, *str, ok)
  255. } else {
  256. fmt.Println("nil", i, ok)
  257. //logs.Warn("nil", i, ok)
  258. }
  259. }
  260. }()
  261. go func() {
  262. time.Sleep(time.Second)
  263. for i := 0; i < 3000; i++ {
  264. go func(i int) {
  265. for n := 0; n < 10; n++ {
  266. data := "test " + strconv.Itoa(i) + strconv.Itoa(n)
  267. fmt.Println(data, unsafe.Pointer(&data))
  268. //logs.Warn(data, unsafe.Pointer(&data))
  269. d.pushHead(unsafe.Pointer(&data))
  270. }
  271. }(i)
  272. }
  273. }()
  274. time.Sleep(time.Second * 100000)
  275. }
  276. func TestFIFO(t *testing.T) {
  277. go func() {
  278. log.Println(http.ListenAndServe("0.0.0.0:8889", nil))
  279. }()
  280. logs.EnableFuncCallDepth(true)
  281. logs.SetLogFuncCallDepth(3)
  282. time.Sleep(time.Second * 5)
  283. d := new(ReceiveWindowQueue)
  284. d.New()
  285. go func() {
  286. time.Sleep(time.Second)
  287. for i := 0; i < 30010; i++ {
  288. data, err := d.Pop()
  289. if err == nil {
  290. //fmt.Println(i, string(data.buf), err)
  291. logs.Warn(i, string(data.buf), err)
  292. } else {
  293. //fmt.Println("err", err)
  294. logs.Warn("err", err)
  295. }
  296. //logs.Warn(d.Len())
  297. }
  298. logs.Warn("pop finish")
  299. }()
  300. go func() {
  301. time.Sleep(time.Second * 10)
  302. for i := 0; i < 3000; i++ {
  303. go func(i int) {
  304. for n := 0; n < 10; n++ {
  305. data := new(ListElement)
  306. by := []byte("test " + strconv.Itoa(i) + " " + strconv.Itoa(n)) //
  307. _ = data.New(by, uint16(len(by)), true)
  308. //fmt.Println(string((*data).buf), data)
  309. //logs.Warn(string((*data).buf), data)
  310. d.Push(data)
  311. }
  312. }(i)
  313. }
  314. }()
  315. time.Sleep(time.Second * 100000)
  316. }
  317. func TestPriority(t *testing.T) {
  318. go func() {
  319. log.Println(http.ListenAndServe("0.0.0.0:8889", nil))
  320. }()
  321. logs.EnableFuncCallDepth(true)
  322. logs.SetLogFuncCallDepth(3)
  323. time.Sleep(time.Second * 5)
  324. d := new(PriorityQueue)
  325. d.New()
  326. go func() {
  327. time.Sleep(time.Second)
  328. for i := 0; i < 36005; i++ {
  329. data := d.Pop()
  330. //fmt.Println(i, string(data.buf), err)
  331. logs.Warn(i, string(data.Content), data)
  332. }
  333. logs.Warn("pop finish")
  334. }()
  335. go func() {
  336. time.Sleep(time.Second * 10)
  337. for i := 0; i < 3000; i++ {
  338. go func(i int) {
  339. for n := 0; n < 10; n++ {
  340. data := new(common.MuxPackager)
  341. by := []byte("test " + strconv.Itoa(i) + strconv.Itoa(n))
  342. _ = data.NewPac(common.MUX_NEW_MSG_PART, int32(i), by)
  343. //fmt.Println(string((*data).buf), data)
  344. logs.Warn(string((*data).Content), data)
  345. d.Push(data)
  346. }
  347. }(i)
  348. go func(i int) {
  349. data := new(common.MuxPackager)
  350. _ = data.NewPac(common.MUX_NEW_CONN, int32(i), nil)
  351. //fmt.Println(string((*data).buf), data)
  352. logs.Warn(data)
  353. d.Push(data)
  354. }(i)
  355. go func(i int) {
  356. data := new(common.MuxPackager)
  357. _ = data.NewPac(common.MUX_NEW_CONN_OK, int32(i), nil)
  358. //fmt.Println(string((*data).buf), data)
  359. logs.Warn(data)
  360. d.Push(data)
  361. }(i)
  362. }
  363. }()
  364. time.Sleep(time.Second * 100000)
  365. }
  366. //func TestReceive(t *testing.T) {
  367. // go func() {
  368. // log.Println(http.ListenAndServe("0.0.0.0:8889", nil))
  369. // }()
  370. // logs.EnableFuncCallDepth(true)
  371. // logs.SetLogFuncCallDepth(3)
  372. // time.Sleep(time.Second * 5)
  373. // mux := new(Mux)
  374. // mux.bw.readBandwidth = float64(1*1024*1024)
  375. // mux.latency = float64(1/1000)
  376. // wind := new(ReceiveWindow)
  377. // wind.New(mux)
  378. // wind.
  379. // go func() {
  380. // time.Sleep(time.Second)
  381. // for i := 0; i < 36000; i++ {
  382. // data := d.Pop()
  383. // //fmt.Println(i, string(data.buf), err)
  384. // logs.Warn(i, string(data.Content), data)
  385. // }
  386. // }()
  387. // go func() {
  388. // time.Sleep(time.Second*10)
  389. // for i := 0; i < 3000; i++ {
  390. // go func(i int) {
  391. // for n := 0; n < 10; n++{
  392. // data := new(common.MuxPackager)
  393. // by := []byte("test " + strconv.Itoa(i) + strconv.Itoa(n))
  394. // _ = data.NewPac(common.MUX_NEW_MSG_PART, int32(i), by)
  395. // //fmt.Println(string((*data).buf), data)
  396. // logs.Warn(string((*data).Content), data)
  397. // d.Push(data)
  398. // }
  399. // }(i)
  400. // go func(i int) {
  401. // data := new(common.MuxPackager)
  402. // _ = data.NewPac(common.MUX_NEW_CONN, int32(i), nil)
  403. // //fmt.Println(string((*data).buf), data)
  404. // logs.Warn(data)
  405. // d.Push(data)
  406. // }(i)
  407. // go func(i int) {
  408. // data := new(common.MuxPackager)
  409. // _ = data.NewPac(common.MUX_NEW_CONN_OK, int32(i), nil)
  410. // //fmt.Println(string((*data).buf), data)
  411. // logs.Warn(data)
  412. // d.Push(data)
  413. // }(i)
  414. // }
  415. // }()
  416. // time.Sleep(time.Second * 100000)
  417. //}