From c1c0ae2f320cb88991c1fff964bec3fe3b4d4563 Mon Sep 17 00:00:00 2001 From: mohanson Date: Sun, 11 Aug 2024 20:54:03 +0800 Subject: [PATCH] Sip --- protocol/czar/mux.go | 51 ++++++++++++++++++++++----------------- protocol/czar/sip.go | 49 +++++++++++++++++++++++++++++++++++++ protocol/czar/sip_test.go | 19 +++++++++++++++ 3 files changed, 97 insertions(+), 22 deletions(-) create mode 100644 protocol/czar/sip.go create mode 100644 protocol/czar/sip_test.go diff --git a/protocol/czar/mux.go b/protocol/czar/mux.go index 9a71b5f..ace9e6b 100644 --- a/protocol/czar/mux.go +++ b/protocol/czar/mux.go @@ -5,11 +5,12 @@ import ( "io" "net" "sync" + + "github.com/mohanson/daze/lib/doa" ) // A Stream managed by the multiplexer. type Stream struct { - idp chan uint8 idx uint8 mux *Mux rbf []byte @@ -17,6 +18,7 @@ type Stream struct { rer Err rdn chan struct{} ron sync.Once + sip *Sip son sync.Once wer Err wdn chan struct{} @@ -31,7 +33,7 @@ func (s *Stream) Close() error { s.won.Do(func() { close(s.wdn) }) s.son.Do(func() { s.mux.Write(0, []byte{s.idx, 0x02, 0x00, 0x00}) - s.idp <- s.idx + s.sip.Put(s.idx) }) return nil } @@ -94,7 +96,6 @@ func (s *Stream) Write(p []byte) (int, error) { // NewStream returns a new Stream. func NewStream(idx uint8, mux *Mux) *Stream { return &Stream{ - idp: nil, idx: idx, mux: mux, rbf: make([]byte, 0), @@ -102,6 +103,7 @@ func NewStream(idx uint8, mux *Mux) *Stream { rer: Err{}, rdn: make(chan struct{}), ron: sync.Once{}, + sip: nil, son: sync.Once{}, wer: Err{}, wdn: make(chan struct{}), @@ -113,9 +115,9 @@ func NewStream(idx uint8, mux *Mux) *Stream { type Mux struct { ach chan *Stream con net.Conn - idp chan uint8 rdn chan struct{} rer error + sip *Sip usb []*Stream wm0 sync.Mutex wm1 sync.Mutex @@ -134,14 +136,18 @@ func (m *Mux) Close() error { // Open is used to create a new stream as a net.Conn. func (m *Mux) Open() (*Stream, error) { - idx := <-m.idp - _, err := m.Write(0, []byte{idx, 0x00, 0x00, 0x00}) + idx, err := m.sip.Get() if err != nil { - m.idp <- idx return nil, err } + cnt, err := m.Write(0, []byte{idx, 0x00, 0x00, 0x00}) + if err != nil { + m.sip.Put(idx) + return nil, err + } + doa.Doa(cnt == 4) stm := NewStream(idx, m) - stm.idp = m.idp + stm.sip = m.sip m.usb[idx] = stm return stm, nil } @@ -165,10 +171,11 @@ func (m *Mux) Spawn() { old.wer.Put(io.ErrClosedPipe) old.ron.Do(func() { close(old.rdn) }) old.won.Do(func() { close(old.wdn) }) - old.son.Do(func() { old.idp <- old.idx }) + old.son.Do(func() { old.sip.Put(old.idx) }) stm := NewStream(idx, m) // The mux server does not need to using an id pool. - stm.idp = make(chan uint8, 1) + stm.sip = NewSip() + stm.sip.Set(idx) m.usb[idx] = stm m.ach <- stm case cmd == 0x01: @@ -194,7 +201,11 @@ func (m *Mux) Spawn() { stm.wer.Put(io.ErrClosedPipe) stm.ron.Do(func() { close(stm.rdn) }) stm.won.Do(func() { close(stm.wdn) }) - stm.son.Do(func() { stm.idp <- stm.idx }) + stm.son.Do(func() { stm.sip.Put(stm.idx) }) + air := NewStream(idx, m) + air.son.Do(func() {}) + air.Close() + m.usb[idx] = air case cmd >= 0x03: // Packet format error, connection closed. m.con.Close() @@ -223,9 +234,9 @@ func NewMux(conn net.Conn) *Mux { mux := &Mux{ ach: make(chan *Stream), con: conn, - idp: nil, rdn: make(chan struct{}), rer: nil, + sip: nil, usb: make([]*Stream, 256), wm0: sync.Mutex{}, wm1: sync.Mutex{}, @@ -236,11 +247,11 @@ func NewMux(conn net.Conn) *Mux { // NewMuxServer returns a new MuxServer. func NewMuxServer(conn net.Conn) *Mux { mux := NewMux(conn) - for i := range 256 { - stm := NewStream(uint8(i), mux) - stm.son.Do(func() {}) - stm.Close() - mux.usb[i] = stm + for i := range len(mux.usb) { + air := NewStream(uint8(i), mux) + air.son.Do(func() {}) + air.Close() + mux.usb[i] = air } go mux.Spawn() return mux @@ -248,12 +259,8 @@ func NewMuxServer(conn net.Conn) *Mux { // NewMuxClient returns a new MuxClient. func NewMuxClient(conn net.Conn) *Mux { - idp := make(chan uint8, 256) - for i := range 256 { - idp <- uint8(i) - } mux := NewMux(conn) - mux.idp = idp + mux.sip = NewSip() go mux.Spawn() return mux } diff --git a/protocol/czar/sip.go b/protocol/czar/sip.go new file mode 100644 index 0000000..cfcb8f4 --- /dev/null +++ b/protocol/czar/sip.go @@ -0,0 +1,49 @@ +package czar + +import ( + "errors" + "math/big" + "sync" +) + +// A stream id generator. Stream id can be reused, and the smallest available stream id is guaranteed to be generated +// each time. +type Sip struct { + i *big.Int + m *sync.Mutex +} + +// Get selects an stream id from the pool, removes it from the pool, and returns it to the caller. +func (s *Sip) Get() (uint8, error) { + s.m.Lock() + defer s.m.Unlock() + n := big.NewInt(0).Not(s.i) + m := n.TrailingZeroBits() + if m == 256 { + return 0, errors.New("daze: out of stream") + } + s.i.SetBit(s.i, int(m), 1) + return uint8(m), nil +} + +// Put adds x to the pool. +func (s *Sip) Put(x uint8) { + s.m.Lock() + defer s.m.Unlock() + s.i = s.i.SetBit(s.i, int(x), 0) +} + +// Set removes x from the pool. +func (s *Sip) Set(x uint8) { + s.m.Lock() + defer s.m.Unlock() + s.i = s.i.SetBit(s.i, int(x), 1) +} + +// NewSip returns a new sid. +func NewSip() *Sip { + return &Sip{ + i: big.NewInt(0), + m: &sync.Mutex{}, + } +} diff --git a/protocol/czar/sip_test.go b/protocol/czar/sip_test.go new file mode 100644 index 0000000..d35a956 --- /dev/null +++ b/protocol/czar/sip_test.go @@ -0,0 +1,19 @@ +package czar + +import ( + "testing" + + "github.com/mohanson/daze/lib/doa" +) + +func TestSip(t *testing.T) { + sid := NewSip() + for i := range 256 { + doa.Doa(doa.Try(sid.Get()) == uint8(i)) + } + doa.Doa(doa.Err(sid.Get()) != nil) + sid.Put(65) + sid.Put(15) + doa.Doa(doa.Try(sid.Get()) == 15) + doa.Doa(doa.Try(sid.Get()) == 65) +}