123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644 |
- package mux
- import (
- "errors"
- "io"
- "math"
- "net"
- "runtime"
- "sync"
- "sync/atomic"
- "time"
- "github.com/cnlh/nps/lib/common"
- )
// conn is one logical stream carried over a Mux. It embeds net.Conn but
// defines its own Read, Write, Close, address and deadline methods, all of
// which go through the receive and send windows below. NewConn does not set
// the embedded net.Conn.
type conn struct {
	net.Conn
	// The three channels below are created by NewConn; they are not used by
	// any method in this part of the file.
	getStatusCh      chan struct{}
	connStatusOkCh   chan struct{}
	connStatusFailCh chan struct{}
	connId           int32 // stream id passed to the windows and to mux.sendInfo
	isClose          bool  // set by closeProcess; Read and Write fail once it is true
	closeFlag        bool  // close conn flag
	receiveWindow    *ReceiveWindow // source of data for Read
	sendWindow       *SendWindow    // sink of data for Write
	once             sync.Once      // makes Close run closeProcess at most once
	//label string
}
- func NewConn(connId int32, mux *Mux, label ...string) *conn {
- c := &conn{
- getStatusCh: make(chan struct{}),
- connStatusOkCh: make(chan struct{}),
- connStatusFailCh: make(chan struct{}),
- connId: connId,
- receiveWindow: new(ReceiveWindow),
- sendWindow: new(SendWindow),
- once: sync.Once{},
- }
- //if len(label) > 0 {
- // c.label = label[0]
- //}
- c.receiveWindow.New(mux)
- c.sendWindow.New(mux)
- //logm := &connLog{
- // startTime: time.Now(),
- // isClose: false,
- // logs: []string{c.label + "new conn success"},
- //}
- //setM(label[0], int(connId), logm)
- return c
- }
- func (s *conn) Read(buf []byte) (n int, err error) {
- if s.isClose || buf == nil {
- return 0, errors.New("the conn has closed")
- }
- if len(buf) == 0 {
- return 0, nil
- }
- // waiting for takeout from receive window finish or timeout
- //now := time.Now()
- n, err = s.receiveWindow.Read(buf, s.connId)
- //t := time.Now().Sub(now)
- //if t.Seconds() > 0.5 {
- //logs.Warn("conn read long", n, t.Seconds())
- //}
- //var errstr string
- //if err == nil {
- // errstr = "err:nil"
- //} else {
- // errstr = err.Error()
- //}
- //d := getM(s.label, int(s.connId))
- //d.logs = append(d.logs, s.label+"read "+strconv.Itoa(n)+" "+errstr+" "+string(buf[:100]))
- //setM(s.label, int(s.connId), d)
- return
- }
- func (s *conn) Write(buf []byte) (n int, err error) {
- if s.isClose {
- return 0, errors.New("the conn has closed")
- }
- if s.closeFlag {
- //s.Close()
- return 0, errors.New("io: write on closed conn")
- }
- if len(buf) == 0 {
- return 0, nil
- }
- //logs.Warn("write buf", len(buf))
- //now := time.Now()
- n, err = s.sendWindow.WriteFull(buf, s.connId)
- //t := time.Now().Sub(now)
- //if t.Seconds() > 0.5 {
- // logs.Warn("conn write long", n, t.Seconds())
- //}
- return
- }
- func (s *conn) Close() (err error) {
- s.once.Do(s.closeProcess)
- return
- }
- func (s *conn) closeProcess() {
- s.isClose = true
- s.receiveWindow.mux.connMap.Delete(s.connId)
- if !s.receiveWindow.mux.IsClose {
- // if server or user close the conn while reading, will get a io.EOF
- // and this Close method will be invoke, send this signal to close other side
- s.receiveWindow.mux.sendInfo(common.MUX_CONN_CLOSE, s.connId, nil)
- }
- s.sendWindow.CloseWindow()
- s.receiveWindow.CloseWindow()
- //d := getM(s.label, int(s.connId))
- //d.isClose = true
- //d.logs = append(d.logs, s.label+"close "+time.Now().String())
- //setM(s.label, int(s.connId), d)
- return
- }
- func (s *conn) LocalAddr() net.Addr {
- return s.receiveWindow.mux.conn.LocalAddr()
- }
- func (s *conn) RemoteAddr() net.Addr {
- return s.receiveWindow.mux.conn.RemoteAddr()
- }
- func (s *conn) SetDeadline(t time.Time) error {
- _ = s.SetReadDeadline(t)
- _ = s.SetWriteDeadline(t)
- return nil
- }
- func (s *conn) SetReadDeadline(t time.Time) error {
- s.receiveWindow.SetTimeOut(t)
- return nil
- }
- func (s *conn) SetWriteDeadline(t time.Time) error {
- s.sendWindow.SetTimeOut(t)
- return nil
- }
// window is the state shared by ReceiveWindow and SendWindow, both of which
// embed it.
type window struct {
	// remainingWait packs two 32-bit values into one word so they can be
	// updated together with a single atomic operation: the remaining window
	// size in the high bits and the wait flag in the low bits (see pack and
	// unpack). It is kept as the first field.
	remainingWait uint64 // 64bit alignment
	off           uint32 // read/write offset into the current buffer
	maxSize       uint32 // current maximum window size
	closeOp       bool   // set once CloseWindow has run
	closeOpCh     chan struct{} // receives close tokens from CloseWindow
	mux           *Mux          // owning multiplexer
}
- func (Self *window) unpack(ptrs uint64) (remaining, wait uint32) {
- const mask = 1<<dequeueBits - 1
- remaining = uint32((ptrs >> dequeueBits) & mask)
- wait = uint32(ptrs & mask)
- return
- }
- func (Self *window) pack(remaining, wait uint32) uint64 {
- const mask = 1<<dequeueBits - 1
- return (uint64(remaining) << dequeueBits) |
- uint64(wait&mask)
- }
- func (Self *window) New() {
- Self.closeOpCh = make(chan struct{}, 2)
- }
- func (Self *window) CloseWindow() {
- if !Self.closeOp {
- Self.closeOp = true
- Self.closeOpCh <- struct{}{}
- Self.closeOpCh <- struct{}{}
- }
- }
// ReceiveWindow buffers data arriving for one conn and tracks how much more
// the peer may send.
type ReceiveWindow struct {
	window
	bufQueue ReceiveWindowQueue  // queue of received elements waiting to be read
	element  *common.ListElement // element currently being drained by Read
	count    int8                // counter used by calcSize to space out recalculations
	once     sync.Once           // makes Stop stop the queue at most once
}
- func (Self *ReceiveWindow) New(mux *Mux) {
- // initial a window for receive
- Self.bufQueue.New()
- Self.element = common.ListElementPool.Get()
- Self.maxSize = common.MAXIMUM_SEGMENT_SIZE * 10
- Self.mux = mux
- Self.window.New()
- }
- func (Self *ReceiveWindow) remainingSize(delta uint16) (n uint32) {
- // receive window remaining
- l := int64(atomic.LoadUint32(&Self.maxSize)) - int64(Self.bufQueue.Len())
- l -= int64(delta)
- if l > 0 {
- n = uint32(l)
- }
- return
- }
- func (Self *ReceiveWindow) calcSize() {
- // calculating maximum receive window size
- if Self.count == 0 {
- //logs.Warn("ping, bw", Self.mux.latency, Self.bw.Get())
- conns := Self.mux.connMap.Size()
- n := uint32(math.Float64frombits(atomic.LoadUint64(&Self.mux.latency)) *
- Self.mux.bw.Get() / float64(conns))
- if n < common.MAXIMUM_SEGMENT_SIZE*10 {
- n = common.MAXIMUM_SEGMENT_SIZE * 10
- }
- bufLen := Self.bufQueue.Len()
- if n < bufLen {
- n = bufLen
- }
- if n < Self.maxSize/2 {
- n = Self.maxSize / 2
- }
- // set the minimal size
- if n > 2*Self.maxSize {
- n = 2 * Self.maxSize
- }
- if n > (common.MAXIMUM_WINDOW_SIZE / uint32(conns)) {
- n = common.MAXIMUM_WINDOW_SIZE / uint32(conns)
- }
- // set the maximum size
- //logs.Warn("n", n)
- atomic.StoreUint32(&Self.maxSize, n)
- Self.count = -10
- }
- Self.count += 1
- return
- }
- func (Self *ReceiveWindow) Write(buf []byte, l uint16, part bool, id int32) (err error) {
- if Self.closeOp {
- return errors.New("conn.receiveWindow: write on closed window")
- }
- element, err := NewListElement(buf, l, part)
- //logs.Warn("push the buf", len(buf), l, (&element).l)
- if err != nil {
- return
- }
- Self.calcSize() // calculate the max window size
- var wait uint32
- start:
- ptrs := atomic.LoadUint64(&Self.remainingWait)
- _, wait = Self.unpack(ptrs)
- newRemaining := Self.remainingSize(l)
- // calculate the remaining window size now, plus the element we will push
- if newRemaining == 0 {
- //logs.Warn("window full true", remaining)
- wait = 1
- }
- if !atomic.CompareAndSwapUint64(&Self.remainingWait, ptrs, Self.pack(0, wait)) {
- goto start
- // another goroutine change the status, make sure shall we need wait
- }
- Self.bufQueue.Push(element)
- // status check finish, now we can push the element into the queue
- if wait == 0 {
- Self.mux.sendInfo(common.MUX_MSG_SEND_OK, id, newRemaining)
- // send the remaining window size, not including zero size
- }
- return nil
- }
- func (Self *ReceiveWindow) Read(p []byte, id int32) (n int, err error) {
- if Self.closeOp {
- return 0, io.EOF // receive close signal, returns eof
- }
- pOff := 0
- l := 0
- //logs.Warn("receive window read off, element.l", Self.off, Self.element.l)
- copyData:
- if Self.off == uint32(Self.element.L) {
- // on the first Read method invoked, Self.off and Self.element.l
- // both zero value
- common.ListElementPool.Put(Self.element)
- if Self.closeOp {
- return 0, io.EOF
- }
- Self.element, err = Self.bufQueue.Pop()
- // if the queue is empty, Pop method will wait until one element push
- // into the queue successful, or timeout.
- // timer start on timeout parameter is set up ,
- // reset to 60s if timeout and data still available
- Self.off = 0
- if err != nil {
- Self.CloseWindow() // also close the window, to avoid read twice
- return // queue receive stop or time out, break the loop and return
- }
- //logs.Warn("pop element", Self.element.l, Self.element.part)
- }
- l = copy(p[pOff:], Self.element.Buf[Self.off:Self.element.L])
- pOff += l
- Self.off += uint32(l)
- //logs.Warn("window read length buf len", Self.readLength, Self.bufQueue.Len())
- n += l
- l = 0
- if Self.off == uint32(Self.element.L) {
- //logs.Warn("put the element end ", string(Self.element.buf[:15]))
- common.WindowBuff.Put(Self.element.Buf)
- Self.sendStatus(id, Self.element.L)
- // check the window full status
- }
- if pOff < len(p) && Self.element.Part {
- // element is a part of the segments, trying to fill up buf p
- goto copyData
- }
- return // buf p is full or all of segments in buf, return
- }
- func (Self *ReceiveWindow) sendStatus(id int32, l uint16) {
- var remaining, wait uint32
- for {
- ptrs := atomic.LoadUint64(&Self.remainingWait)
- remaining, wait = Self.unpack(ptrs)
- remaining += uint32(l)
- if atomic.CompareAndSwapUint64(&Self.remainingWait, ptrs, Self.pack(remaining, 0)) {
- break
- }
- runtime.Gosched()
- // another goroutine change remaining or wait status, make sure
- // we need acknowledge other side
- }
- // now we get the current window status success
- if wait == 1 {
- //logs.Warn("send the wait status", remaining)
- Self.mux.sendInfo(common.MUX_MSG_SEND_OK, id, remaining)
- }
- return
- }
- func (Self *ReceiveWindow) SetTimeOut(t time.Time) {
- // waiting for FIFO queue Pop method
- Self.bufQueue.SetTimeOut(t)
- }
- func (Self *ReceiveWindow) Stop() {
- // queue has no more data to push, so unblock pop method
- Self.once.Do(Self.bufQueue.Stop)
- }
- func (Self *ReceiveWindow) CloseWindow() {
- Self.window.CloseWindow()
- Self.Stop()
- Self.release()
- }
- func (Self *ReceiveWindow) release() {
- //if Self.element != nil {
- // if Self.element.Buf != nil {
- // common.WindowBuff.Put(Self.element.Buf)
- // }
- // common.ListElementPool.Put(Self.element)
- //}
- for {
- ele := Self.bufQueue.TryPop()
- if ele == nil {
- return
- }
- if ele.Buf != nil {
- common.WindowBuff.Put(ele.Buf)
- }
- common.ListElementPool.Put(ele)
- } // release resource
- }
// SendWindow paces outgoing data for one conn according to the window size
// reported by the peer.
type SendWindow struct {
	window
	buf       []byte        // payload of the Write currently in progress (see SetSendBuf)
	setSizeCh chan struct{} // signalled by allow when a waiting writer may continue
	timeout   time.Time     // deadline used by waitReceiveWindow
}
- func (Self *SendWindow) New(mux *Mux) {
- Self.setSizeCh = make(chan struct{})
- Self.maxSize = common.MAXIMUM_SEGMENT_SIZE * 10
- atomic.AddUint64(&Self.remainingWait, uint64(common.MAXIMUM_SEGMENT_SIZE*10)<<dequeueBits)
- Self.mux = mux
- Self.window.New()
- }
- func (Self *SendWindow) SetSendBuf(buf []byte) {
- // send window buff from conn write method, set it to send window
- Self.buf = buf
- Self.off = 0
- }
// SetSize records newRemaining, the usable window size reported by the
// receive side, and wakes a writer that is waiting for window space.
//
// It returns true when the send window is closed, false otherwise. If the
// stored remaining size already equals newRemaining nothing is changed.
func (Self *SendWindow) SetSize(newRemaining uint32) (closed bool) {
	// set the window size from receive window
	defer func() {
		// Any panic raised below (for example closing setSizeCh a second
		// time) is swallowed and reported as "closed".
		if recover() != nil {
			closed = true
		}
	}()
	if Self.closeOp {
		close(Self.setSizeCh)
		return true
	}
	//logs.Warn("set send window size to ", windowSize, newRemaining)
	var remaining, wait, newWait uint32
	for {
		ptrs := atomic.LoadUint64(&Self.remainingWait)
		remaining, wait = Self.unpack(ptrs)
		if remaining == newRemaining {
			//logs.Warn("waiting for another window size")
			return false // waiting for receive another usable window size
		}
		if newRemaining == 0 && wait == 1 {
			newWait = 1 // keep the wait status,
			// also if newRemaining is not zero, change wait to 0
		}
		if atomic.CompareAndSwapUint64(&Self.remainingWait, ptrs, Self.pack(newRemaining, newWait)) {
			break
		}
		// another goroutine changed the wait status or window size
	}
	if wait == 1 {
		// the send window was in the wait status, so notify the waiting writer
		//logs.Warn("send window remaining size is 0")
		Self.allow()
	}
	// send window not into the wait status, so just do slide
	return false
}
- func (Self *SendWindow) allow() {
- select {
- case Self.setSizeCh <- struct{}{}:
- //logs.Warn("send window remaining size is 0 finish")
- return
- case <-Self.closeOpCh:
- close(Self.setSizeCh)
- return
- }
- }
- func (Self *SendWindow) sent(sentSize uint32) {
- atomic.AddUint64(&Self.remainingWait, ^(uint64(sentSize)<<dequeueBits - 1))
- }
- func (Self *SendWindow) WriteTo() (p []byte, sendSize uint32, part bool, err error) {
- // returns buf segments, return only one segments, need a loop outside
- // until err = io.EOF
- if Self.closeOp {
- return nil, 0, false, errors.New("conn.writeWindow: window closed")
- }
- if Self.off == uint32(len(Self.buf)) {
- return nil, 0, false, io.EOF
- // send window buff is drain, return eof and get another one
- }
- var remaining uint32
- start:
- ptrs := atomic.LoadUint64(&Self.remainingWait)
- remaining, _ = Self.unpack(ptrs)
- if remaining == 0 {
- if !atomic.CompareAndSwapUint64(&Self.remainingWait, ptrs, Self.pack(0, 1)) {
- goto start // another goroutine change the window, try again
- }
- // into the wait status
- //logs.Warn("send window into wait status")
- err = Self.waitReceiveWindow()
- if err != nil {
- return nil, 0, false, err
- }
- //logs.Warn("rem into wait finish")
- goto start
- }
- // there are still remaining window
- //logs.Warn("rem", remaining)
- if len(Self.buf[Self.off:]) > common.MAXIMUM_SEGMENT_SIZE {
- sendSize = common.MAXIMUM_SEGMENT_SIZE
- //logs.Warn("cut buf by mss")
- } else {
- sendSize = uint32(len(Self.buf[Self.off:]))
- }
- if remaining < sendSize {
- // usable window size is small than
- // window MAXIMUM_SEGMENT_SIZE or send buf left
- sendSize = remaining
- //logs.Warn("cut buf by remainingsize", sendSize, len(Self.buf[Self.off:]))
- }
- //logs.Warn("send size", sendSize)
- if sendSize < uint32(len(Self.buf[Self.off:])) {
- part = true
- }
- p = Self.buf[Self.off : sendSize+Self.off]
- Self.off += sendSize
- Self.sent(sendSize)
- return
- }
- func (Self *SendWindow) waitReceiveWindow() (err error) {
- t := Self.timeout.Sub(time.Now())
- if t < 0 { // not set the timeout, wait for it as long as connection close
- select {
- case _, ok := <-Self.setSizeCh:
- if !ok {
- return errors.New("conn.writeWindow: window closed")
- }
- return nil
- case <-Self.closeOpCh:
- return errors.New("conn.writeWindow: window closed")
- }
- }
- timer := time.NewTimer(t)
- defer timer.Stop()
- // waiting for receive usable window size, or timeout
- select {
- case _, ok := <-Self.setSizeCh:
- if !ok {
- return errors.New("conn.writeWindow: window closed")
- }
- return nil
- case <-timer.C:
- return errors.New("conn.writeWindow: write to time out")
- case <-Self.closeOpCh:
- return errors.New("conn.writeWindow: window closed")
- }
- }
- func (Self *SendWindow) WriteFull(buf []byte, id int32) (n int, err error) {
- Self.SetSendBuf(buf) // set the buf to send window
- //logs.Warn("set the buf to send window")
- var bufSeg []byte
- var part bool
- var l uint32
- for {
- bufSeg, l, part, err = Self.WriteTo()
- //logs.Warn("buf seg", len(bufSeg), part, err)
- // get the buf segments from send window
- if bufSeg == nil && part == false && err == io.EOF {
- // send window is drain, break the loop
- err = nil
- break
- }
- if err != nil {
- break
- }
- n += int(l)
- l = 0
- if part {
- Self.mux.sendInfo(common.MUX_NEW_MSG_PART, id, bufSeg)
- } else {
- Self.mux.sendInfo(common.MUX_NEW_MSG, id, bufSeg)
- //logs.Warn("buf seg sent", len(bufSeg), part, err)
- }
- // send to other side, not send nil data to other side
- }
- //logs.Warn("buf seg write success")
- return
- }
- func (Self *SendWindow) SetTimeOut(t time.Time) {
- // waiting for receive a receive window size
- Self.timeout = t
- }
- //type bandwidth struct {
- // readStart time.Time
- // lastReadStart time.Time
- // readEnd time.Time
- // lastReadEnd time.Time
- // bufLength int
- // lastBufLength int
- // count int8
- // readBW float64
- // writeBW float64
- // readBandwidth float64
- //}
- //
- //func (Self *bandwidth) StartRead() {
- // Self.lastReadStart, Self.readStart = Self.readStart, time.Now()
- // if !Self.lastReadStart.IsZero() {
- // if Self.count == -5 {
- // Self.calcBandWidth()
- // }
- // }
- //}
- //
- //func (Self *bandwidth) EndRead() {
- // Self.lastReadEnd, Self.readEnd = Self.readEnd, time.Now()
- // if Self.count == -5 {
- // Self.calcWriteBandwidth()
- // }
- // if Self.count == 0 {
- // Self.calcReadBandwidth()
- // Self.count = -6
- // }
- // Self.count += 1
- //}
- //
- //func (Self *bandwidth) SetCopySize(n int) {
- // // must be invoke between StartRead and EndRead
- // Self.lastBufLength, Self.bufLength = Self.bufLength, n
- //}
- //// calculating
- //// start end start end
- //// read read
- //// write
- //
- //func (Self *bandwidth) calcBandWidth() {
- // t := Self.readStart.Sub(Self.lastReadStart)
- // if Self.lastBufLength >= 32768 {
- // Self.readBandwidth = float64(Self.lastBufLength) / t.Seconds()
- // }
- //}
- //
- //func (Self *bandwidth) calcReadBandwidth() {
- // // Bandwidth between nps and npc
- // readTime := Self.readEnd.Sub(Self.readStart)
- // Self.readBW = float64(Self.bufLength) / readTime.Seconds()
- // //logs.Warn("calc read bw", Self.readBW, Self.bufLength, readTime.Seconds())
- //}
- //
- //func (Self *bandwidth) calcWriteBandwidth() {
- // // Bandwidth between nps and user, npc and application
- // writeTime := Self.readStart.Sub(Self.lastReadEnd)
- // Self.writeBW = float64(Self.lastBufLength) / writeTime.Seconds()
- // //logs.Warn("calc write bw", Self.writeBW, Self.bufLength, writeTime.Seconds())
- //}
- //
- //func (Self *bandwidth) Get() (bw float64) {
- // // The zero value, 0 for numeric types
- // if Self.writeBW == 0 && Self.readBW == 0 {
- // //logs.Warn("bw both 0")
- // return 100
- // }
- // if Self.writeBW == 0 && Self.readBW != 0 {
- // return Self.readBW
- // }
- // if Self.readBW == 0 && Self.writeBW != 0 {
- // return Self.writeBW
- // }
- // return Self.readBandwidth
- //}
|