Skip to content

Commit

Permalink
Sip
Browse files Browse the repository at this point in the history
  • Loading branch information
mohanson committed Aug 15, 2024
1 parent eabafdf commit 3f76f44
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 18 deletions.
39 changes: 21 additions & 18 deletions protocol/czar/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,18 @@ import (
"io"
"net"
"sync"

"github.com/mohanson/daze/lib/doa"
)

// A Stream managed by the multiplexer.
type Stream struct {
idp chan uint8
idx uint8
mux *Mux
rbf []byte
rch chan []byte
rer *Err
sip *Sip
son sync.Once
wer *Err
}
Expand All @@ -25,7 +27,7 @@ func (s *Stream) Close() error {
s.wer.Put(io.ErrClosedPipe)
s.son.Do(func() {
s.mux.Write(0, []byte{s.idx, 0x02, 0x00, 0x00})
s.idp <- s.idx
s.sip.Put(s.idx)
})
return nil
}
Expand Down Expand Up @@ -88,12 +90,12 @@ func (s *Stream) Write(p []byte) (int, error) {
// NewStream returns a new Stream.
func NewStream(idx uint8, mux *Mux) *Stream {
return &Stream{
idp: nil,
idx: idx,
mux: mux,
rbf: make([]byte, 0),
rch: make(chan []byte, 32),
rer: NewErr(),
sip: nil,
son: sync.Once{},
wer: NewErr(),
}
Expand All @@ -103,8 +105,8 @@ func NewStream(idx uint8, mux *Mux) *Stream {
type Mux struct {
ach chan *Stream
con net.Conn
idp chan uint8
rer *Err
sip *Sip
usb []*Stream
wm0 sync.Mutex
wm1 sync.Mutex
Expand All @@ -123,14 +125,18 @@ func (m *Mux) Close() error {

// Open is used to create a new stream as a net.Conn.
func (m *Mux) Open() (*Stream, error) {
idx := <-m.idp
_, err := m.Write(0, []byte{idx, 0x00, 0x00, 0x00})
idx, err := m.sip.Get()
if err != nil {
m.idp <- idx
return nil, err
}
cnt, err := m.Write(0, []byte{idx, 0x00, 0x00, 0x00})
if err != nil {
m.sip.Put(idx)
return nil, err
}
doa.Doa(cnt == 4)
stm := NewStream(idx, m)
stm.idp = m.idp
stm.sip = m.sip
m.usb[idx] = stm
return stm, nil
}
Expand All @@ -152,10 +158,11 @@ func (m *Mux) Spawn() {
old := m.usb[idx]
old.rer.Put(io.EOF)
old.wer.Put(io.ErrClosedPipe)
old.son.Do(func() { old.idp <- old.idx })
old.son.Do(func() { old.sip.Put(old.idx) })
stm := NewStream(idx, m)
// The mux server does not need to using an id pool.
stm.idp = make(chan uint8, 1)
stm.sip = NewSip()
stm.sip.Set(idx)
m.usb[idx] = stm
m.ach <- stm
case cmd == 0x01:
Expand All @@ -179,7 +186,7 @@ func (m *Mux) Spawn() {
stm := m.usb[idx]
stm.rer.Put(io.EOF)
stm.wer.Put(io.ErrClosedPipe)
stm.son.Do(func() { stm.idp <- stm.idx })
stm.son.Do(func() { stm.sip.Put(stm.idx) })
air := NewStream(idx, m)
air.son.Do(func() {})
air.Close()
Expand Down Expand Up @@ -211,8 +218,8 @@ func NewMux(conn net.Conn) *Mux {
mux := &Mux{
ach: make(chan *Stream),
con: conn,
idp: nil,
rer: NewErr(),
sip: nil,
usb: make([]*Stream, 256),
wm0: sync.Mutex{},
wm1: sync.Mutex{},
Expand All @@ -223,7 +230,7 @@ func NewMux(conn net.Conn) *Mux {
// NewMuxServer returns a new MuxServer.
func NewMuxServer(conn net.Conn) *Mux {
mux := NewMux(conn)
for i := range 256 {
for i := range len(mux.usb) {
air := NewStream(uint8(i), mux)
air.son.Do(func() {})
air.Close()
Expand All @@ -235,12 +242,8 @@ func NewMuxServer(conn net.Conn) *Mux {

// NewMuxClient returns a new MuxClient.
func NewMuxClient(conn net.Conn) *Mux {
idp := make(chan uint8, 256)
for i := range 256 {
idp <- uint8(i)
}
mux := NewMux(conn)
mux.idp = idp
mux.sip = NewSip()
go mux.Spawn()
return mux
}
49 changes: 49 additions & 0 deletions protocol/czar/sip.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package czar

import (
"errors"
"math/big"
"sync"
)

// A stream id generator. Stream id can be reused, and the smallest available stream id is guaranteed to be generated
// each time.
type Sip struct {
i *big.Int
m *sync.Mutex
}

// Get selects an stream id from the pool, removes it from the pool, and returns it to the caller.
func (s *Sip) Get() (uint8, error) {
s.m.Lock()
defer s.m.Unlock()
n := big.NewInt(0).Not(s.i)
m := n.TrailingZeroBits()
if m == 256 {
return 0, errors.New("daze: out of stream")
}
s.i.SetBit(s.i, int(m), 1)
return uint8(m), nil
}

// Put adds x to the pool.
func (s *Sip) Put(x uint8) {
s.m.Lock()
defer s.m.Unlock()
s.i = s.i.SetBit(s.i, int(x), 0)
}

// Set removes x from the pool.
func (s *Sip) Set(x uint8) {
s.m.Lock()
defer s.m.Unlock()
s.i = s.i.SetBit(s.i, int(x), 1)
}

// NewSip returns a new sid.
func NewSip() *Sip {
return &Sip{
i: big.NewInt(0),
m: &sync.Mutex{},
}
}
19 changes: 19 additions & 0 deletions protocol/czar/sip_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package czar

import (
"testing"

"github.com/mohanson/daze/lib/doa"
)

func TestSip(t *testing.T) {
sid := NewSip()
for i := range 256 {
doa.Doa(doa.Try(sid.Get()) == uint8(i))
}
doa.Doa(doa.Err(sid.Get()) != nil)
sid.Put(65)
sid.Put(15)
doa.Doa(doa.Try(sid.Get()) == 15)
doa.Doa(doa.Try(sid.Get()) == 65)
}

0 comments on commit 3f76f44

Please sign in to comment.