Skip to content

Commit

Permalink
Sip
Browse files Browse the repository at this point in the history
  • Loading branch information
mohanson committed Aug 12, 2024
1 parent 997bfa7 commit c1c0ae2
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 22 deletions.
51 changes: 29 additions & 22 deletions protocol/czar/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,20 @@ import (
"io"
"net"
"sync"

"github.com/mohanson/daze/lib/doa"
)

// A Stream managed by the multiplexer.
type Stream struct {
idp chan uint8
idx uint8
mux *Mux
rbf []byte
rch chan []byte
rer Err
rdn chan struct{}
ron sync.Once
sip *Sip
son sync.Once
wer Err
wdn chan struct{}
Expand All @@ -31,7 +33,7 @@ func (s *Stream) Close() error {
s.won.Do(func() { close(s.wdn) })
s.son.Do(func() {
s.mux.Write(0, []byte{s.idx, 0x02, 0x00, 0x00})
s.idp <- s.idx
s.sip.Put(s.idx)
})
return nil
}
Expand Down Expand Up @@ -94,14 +96,14 @@ func (s *Stream) Write(p []byte) (int, error) {
// NewStream returns a new Stream.
func NewStream(idx uint8, mux *Mux) *Stream {
return &Stream{
idp: nil,
idx: idx,
mux: mux,
rbf: make([]byte, 0),
rch: make(chan []byte, 32),
rer: Err{},
rdn: make(chan struct{}),
ron: sync.Once{},
sip: nil,
son: sync.Once{},
wer: Err{},
wdn: make(chan struct{}),
Expand All @@ -113,9 +115,9 @@ func NewStream(idx uint8, mux *Mux) *Stream {
type Mux struct {
ach chan *Stream
con net.Conn
idp chan uint8
rdn chan struct{}
rer error
sip *Sip
usb []*Stream
wm0 sync.Mutex
wm1 sync.Mutex
Expand All @@ -134,14 +136,18 @@ func (m *Mux) Close() error {

// Open is used to create a new stream as a net.Conn.
func (m *Mux) Open() (*Stream, error) {
idx := <-m.idp
_, err := m.Write(0, []byte{idx, 0x00, 0x00, 0x00})
idx, err := m.sip.Get()
if err != nil {
m.idp <- idx
return nil, err
}
cnt, err := m.Write(0, []byte{idx, 0x00, 0x00, 0x00})
if err != nil {
m.sip.Put(idx)
return nil, err
}
doa.Doa(cnt == 4)
stm := NewStream(idx, m)
stm.idp = m.idp
stm.sip = m.sip
m.usb[idx] = stm
return stm, nil
}
Expand All @@ -165,10 +171,11 @@ func (m *Mux) Spawn() {
old.wer.Put(io.ErrClosedPipe)
old.ron.Do(func() { close(old.rdn) })
old.won.Do(func() { close(old.wdn) })
old.son.Do(func() { old.idp <- old.idx })
old.son.Do(func() { old.sip.Put(old.idx) })
stm := NewStream(idx, m)
// The mux server does not need to using an id pool.
stm.idp = make(chan uint8, 1)
stm.sip = NewSip()
stm.sip.Set(idx)
m.usb[idx] = stm
m.ach <- stm
case cmd == 0x01:
Expand All @@ -194,7 +201,11 @@ func (m *Mux) Spawn() {
stm.wer.Put(io.ErrClosedPipe)
stm.ron.Do(func() { close(stm.rdn) })
stm.won.Do(func() { close(stm.wdn) })
stm.son.Do(func() { stm.idp <- stm.idx })
stm.son.Do(func() { stm.sip.Put(stm.idx) })
air := NewStream(idx, m)
air.son.Do(func() {})
air.Close()
m.usb[idx] = air
case cmd >= 0x03:
// Packet format error, connection closed.
m.con.Close()
Expand Down Expand Up @@ -223,9 +234,9 @@ func NewMux(conn net.Conn) *Mux {
mux := &Mux{
ach: make(chan *Stream),
con: conn,
idp: nil,
rdn: make(chan struct{}),
rer: nil,
sip: nil,
usb: make([]*Stream, 256),
wm0: sync.Mutex{},
wm1: sync.Mutex{},
Expand All @@ -236,24 +247,20 @@ func NewMux(conn net.Conn) *Mux {
// NewMuxServer returns a new MuxServer.
func NewMuxServer(conn net.Conn) *Mux {
mux := NewMux(conn)
for i := range 256 {
stm := NewStream(uint8(i), mux)
stm.son.Do(func() {})
stm.Close()
mux.usb[i] = stm
for i := range len(mux.usb) {
air := NewStream(uint8(i), mux)
air.son.Do(func() {})
air.Close()
mux.usb[i] = air
}
go mux.Spawn()
return mux
}

// NewMuxClient returns a new MuxClient.
func NewMuxClient(conn net.Conn) *Mux {
idp := make(chan uint8, 256)
for i := range 256 {
idp <- uint8(i)
}
mux := NewMux(conn)
mux.idp = idp
mux.sip = NewSip()
go mux.Spawn()
return mux
}
49 changes: 49 additions & 0 deletions protocol/czar/sip.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package czar

import (
"errors"
"math/big"
"sync"
)

// A stream id generator. Stream id can be reused, and the smallest available stream id is guaranteed to be generated
// each time.
type Sip struct {
i *big.Int
m *sync.Mutex
}

// Get selects an stream id from the pool, removes it from the pool, and returns it to the caller.
func (s *Sip) Get() (uint8, error) {
s.m.Lock()
defer s.m.Unlock()
n := big.NewInt(0).Not(s.i)
m := n.TrailingZeroBits()
if m == 256 {
return 0, errors.New("daze: out of stream")
}
s.i.SetBit(s.i, int(m), 1)
return uint8(m), nil
}

// Put adds x to the pool.
func (s *Sip) Put(x uint8) {
s.m.Lock()
defer s.m.Unlock()
s.i = s.i.SetBit(s.i, int(x), 0)
}

// Set removes x from the pool.
func (s *Sip) Set(x uint8) {
s.m.Lock()
defer s.m.Unlock()
s.i = s.i.SetBit(s.i, int(x), 1)
}

// NewSip returns a new sid.
func NewSip() *Sip {
return &Sip{
i: big.NewInt(0),
m: &sync.Mutex{},
}
}
19 changes: 19 additions & 0 deletions protocol/czar/sip_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package czar

import (
"testing"

"github.com/mohanson/daze/lib/doa"
)

func TestSip(t *testing.T) {
sid := NewSip()
for i := range 256 {
doa.Doa(doa.Try(sid.Get()) == uint8(i))
}
doa.Doa(doa.Err(sid.Get()) != nil)
sid.Put(65)
sid.Put(15)
doa.Doa(doa.Try(sid.Get()) == 15)
doa.Doa(doa.Try(sid.Get()) == 65)
}

0 comments on commit c1c0ae2

Please sign in to comment.