Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Change Codec interface
Browse files Browse the repository at this point in the history
  • Loading branch information
bzEq committed Apr 7, 2024
1 parent 7bfd2f3 commit 000eb18
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 27 deletions.
13 changes: 6 additions & 7 deletions core/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,9 @@ import (

type RouteID uint64

// TODO: Use iovec.IoVec.
type Codec interface {
Encode(RouteID, []byte) ([]byte, error)
Decode([]byte) (RouteID, []byte, error)
Encode(RouteID, *iovec.IoVec) error
Decode(*iovec.IoVec) (RouteID, error)
}

type RouteInfo struct {
Expand All @@ -36,12 +35,12 @@ func (self *SimpleRouter) route(id RouteID, ri *RouteInfo) {
ri.Err <- err
return
}
buf, err := self.C.Encode(id, b.Consume())
err = self.C.Encode(id, &b)
if err != nil {
ri.Err <- err
return
}
if err = self.P.Pack(iovec.FromSlice(buf)); err != nil {
if err = self.P.Pack(&b); err != nil {
ri.Err <- err
return
}
Expand All @@ -68,7 +67,7 @@ func (self *SimpleRouter) Run() {
log.Println(err)
return
}
id, buf, err := self.C.Decode(b.Consume())
id, err := self.C.Decode(&b)
if err != nil {
log.Println(err)
continue
Expand All @@ -79,7 +78,7 @@ func (self *SimpleRouter) Run() {
continue
}
go func() {
if err := ri.P.Pack(iovec.FromSlice(buf)); err != nil {
if err := ri.P.Pack(&b); err != nil {
ri.Err <- err
return
}
Expand Down
41 changes: 21 additions & 20 deletions intrinsic/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,9 @@ func (self *ClientContext) dialUDP(network, addr string) (net.Conn, error) {
log.Println(err)
return
}
mux := router.C.(*UDPDispatcher)
id := mux.NewId(addr)
defer mux.FreeId(id)
disp := router.C.(*UDPDispatcher)
id := disp.NewEntry(addr)
defer disp.DeleteEntry(id)
cp := core.NewSyncPortWithTimeout(c, nil, core.DEFAULT_UDP_TIMEOUT)
r, err := router.NewRoute(core.RouteID(id), cp)
if err != nil {
Expand Down Expand Up @@ -165,40 +165,41 @@ func (self *ClientContext) getOrCreateRouter() (*core.SimpleRouter, error) {
}

type UDPDispatcher struct {
r sync.Map
t core.Map[core.RouteID, string]
c uint64
}

func (self *UDPDispatcher) NewId(addr string) uint64 {
id := atomic.AddUint64(&self.c, 1)
self.r.Store(id, addr)
func (self *UDPDispatcher) NewEntry(addr string) core.RouteID {
id := core.RouteID(atomic.AddUint64(&self.c, 1))
self.t.Store(id, addr)
return id
}

func (self *UDPDispatcher) FreeId(id uint64) {
self.r.Delete(id)
func (self *UDPDispatcher) DeleteEntry(id core.RouteID) {
self.t.Delete(id)
}

func (self *UDPDispatcher) Encode(id core.RouteID, data []byte) ([]byte, error) {
v, in := self.r.Load(id)
func (self *UDPDispatcher) Encode(id core.RouteID, data *iovec.IoVec) error {
raddr, in := self.t.Load(id)
if !in {
return data, fmt.Errorf("Remote address of #%d doesn't exist", id)
return fmt.Errorf("Remote address of RouteID #%d doesn't exist", id)
}
raddr := v.(string)
msg := UDPMessage{Id: uint64(id), Addr: raddr, Data: data}
msg := UDPMessage{Id: uint64(id), Addr: raddr, Data: data.Consume()}
var buf bytes.Buffer
enc := gob.NewEncoder(&buf)
if err := enc.Encode(&msg); err != nil {
return data, err
return err
}
return buf.Bytes(), nil
data.Take(buf.Bytes())
return nil
}

func (self *UDPDispatcher) Decode(data []byte) (core.RouteID, []byte, error) {
dec := gob.NewDecoder(bytes.NewBuffer(data))
func (self *UDPDispatcher) Decode(data *iovec.IoVec) (core.RouteID, error) {
dec := gob.NewDecoder(bytes.NewBuffer(data.Consume()))
var msg UDPMessage
if err := dec.Decode(&msg); err != nil {
return 0, data, err
return core.RouteID(^uint64(0)), err
}
return core.RouteID(msg.Id), msg.Data, nil
data.Take(msg.Data)
return core.RouteID(msg.Id), nil
}

0 comments on commit 000eb18

Please sign in to comment.