Skip to content

Commit

Permalink
2024-08-15 09:58:09
Browse files Browse the repository at this point in the history
  • Loading branch information
mohanson committed Aug 15, 2024
1 parent 3465a75 commit 63d8d5d
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 39 deletions.
2 changes: 1 addition & 1 deletion protocol/czar/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func (c *Client) Run() {
case 1:
select {
case c.Mux <- mux:
case <-mux.rdn:
case <-mux.rer.Sig():
log.Println("czar: mux done")
sid = 0
case <-c.Cancel:
Expand Down
39 changes: 27 additions & 12 deletions protocol/czar/err.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,38 @@ import (

// Err is an object that will only store an error once.
type Err struct {
sync.Mutex // Guards following
err error
mux *sync.Mutex // Guards following
err error
sig chan struct{}
}

// Get an error from Err.
func (e *Err) Get() error {
e.mux.Lock()
defer e.mux.Unlock()
return e.err
}

// Put an error into Err.
func (a *Err) Put(err error) {
a.Lock()
defer a.Unlock()
if a.err != nil {
func (e *Err) Put(err error) {
e.mux.Lock()
defer e.mux.Unlock()
if e.err != nil {
return
}
a.err = err
e.err = err
close(e.sig)
}

// Get an error from Err.
func (a *Err) Get() error {
a.Lock()
defer a.Unlock()
return a.err
// When any error puts, the sig will be sent.
func (e *Err) Sig() <-chan struct{} {
return e.sig
}

func NewErr() *Err {
return &Err{
mux: &sync.Mutex{},
err: nil,
sig: make(chan struct{}),
}
}
2 changes: 1 addition & 1 deletion protocol/czar/err_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
func TestErr(t *testing.T) {
er0 := errors.New("0")
er1 := errors.New("1")
e := Err{}
e := NewErr()
e.Put(er0)
e.Put(er1)
doa.Doa(e.Get() == er0)
Expand Down
37 changes: 12 additions & 25 deletions protocol/czar/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,18 @@ type Stream struct {
mux *Mux
rbf []byte
rch chan []byte
rer Err
rdn chan struct{}
rer *Err
ron sync.Once
sip *Sip
son sync.Once
wer Err
wdn chan struct{}
wer *Err
won sync.Once
}

// Close implements io.Closer.
func (s *Stream) Close() error {
s.rer.Put(io.ErrClosedPipe)
s.wer.Put(io.ErrClosedPipe)
s.ron.Do(func() { close(s.rdn) })
s.won.Do(func() { close(s.wdn) })
s.son.Do(func() {
s.mux.Write(0, []byte{s.idx, 0x02})
s.sip.Put(s.idx)
Expand All @@ -56,10 +52,10 @@ func (s *Stream) Read(p []byte) (int, error) {
n := copy(p, s.rbf)
s.rbf = s.rbf[n:]
return n, nil
case <-s.rdn:
return 0, s.rer.err
case <-s.mux.rdn:
return 0, s.mux.rer
case <-s.rer.Sig():
return 0, s.rer.Get()
case <-s.mux.rer.Sig():
return 0, s.mux.rer.Get()
}
}

Expand Down Expand Up @@ -100,13 +96,11 @@ func NewStream(idx uint8, mux *Mux) *Stream {
mux: mux,
rbf: make([]byte, 0),
rch: make(chan []byte, 32),
rer: Err{},
rdn: make(chan struct{}),
rer: NewErr(),
ron: sync.Once{},
sip: nil,
son: sync.Once{},
wer: Err{},
wdn: make(chan struct{}),
wer: NewErr(),
won: sync.Once{},
}
}
Expand All @@ -115,8 +109,7 @@ func NewStream(idx uint8, mux *Mux) *Stream {
type Mux struct {
ach chan *Stream
con net.Conn
rdn chan struct{}
rer error
rer *Err
sip *Sip
usb []*Stream
wm0 sync.Mutex
Expand Down Expand Up @@ -158,7 +151,7 @@ func (m *Mux) Spawn() {
buf := make([]byte, 2)
_, err := io.ReadFull(m.con, buf[:2])
if err != nil {
m.rer = err
m.rer.Put(err)
break
}
idx := buf[0]
Expand All @@ -169,8 +162,6 @@ func (m *Mux) Spawn() {
old := m.usb[idx]
old.rer.Put(io.EOF)
old.wer.Put(io.ErrClosedPipe)
old.ron.Do(func() { close(old.rdn) })
old.won.Do(func() { close(old.wdn) })
old.son.Do(func() { old.sip.Put(old.idx) })
stm := NewStream(idx, m)
// The mux server does not need to using an id pool.
Expand All @@ -194,14 +185,12 @@ func (m *Mux) Spawn() {
stm := m.usb[idx]
select {
case stm.rch <- msg:
case <-stm.rdn:
case <-stm.rer.Sig():
}
case cmd == 0x02:
stm := m.usb[idx]
stm.rer.Put(io.EOF)
stm.wer.Put(io.ErrClosedPipe)
stm.ron.Do(func() { close(stm.rdn) })
stm.won.Do(func() { close(stm.wdn) })
stm.son.Do(func() { stm.sip.Put(stm.idx) })
air := NewStream(idx, m)
air.son.Do(func() {})
Expand All @@ -213,7 +202,6 @@ func (m *Mux) Spawn() {
}
}
close(m.ach)
close(m.rdn)
}

// Write writes data to the connection. The code implements a simple priority write using two locks.
Expand All @@ -235,8 +223,7 @@ func NewMux(conn net.Conn) *Mux {
mux := &Mux{
ach: make(chan *Stream),
con: conn,
rdn: make(chan struct{}),
rer: nil,
rer: NewErr(),
sip: nil,
usb: make([]*Stream, 256),
wm0: sync.Mutex{},
Expand Down

0 comments on commit 63d8d5d

Please sign in to comment.