Skip to content

Commit

Permalink
Move network layer into Chotki
Browse files Browse the repository at this point in the history
  • Loading branch information
mrdimidium committed Apr 22, 2024
1 parent 031c191 commit 3a8772b
Show file tree
Hide file tree
Showing 7 changed files with 88 additions and 95 deletions.
139 changes: 69 additions & 70 deletions chotki.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,11 @@ type Batch [][]byte
type Options struct {
pebble.Options

Orig uint64
Name string
RelaxedOrder bool
MaxLogLen int64
Src uint64
Name string
MaxLogLen int64
RelaxedOrder bool
RestoreNetwork bool
}

func (o *Options) SetDefaults() {
Expand All @@ -51,22 +52,18 @@ type Chotki struct {
last rdx.ID
src uint64

db *pebble.DB
dir string

syncs map[rdx.ID]*pebble.Batch
hooks map[rdx.ID][]Hook
hlock sync.Mutex

// queues to broadcast all new packets
outq map[string]toyqueue.DrainCloser

outlock sync.Mutex
lock sync.Mutex

db *pebble.DB
net *toytlv.TCPDepot
dir string
opts Options

types map[rdx.ID]Fields
outq map[string]toyqueue.DrainCloser // queues to broadcast all new packets
syncs map[rdx.ID]*pebble.Batch
hooks map[rdx.ID][]Hook
types map[rdx.ID]Fields
lock sync.Mutex
hlock sync.Mutex
outlock sync.Mutex
}

var (
Expand Down Expand Up @@ -186,19 +183,43 @@ func Open(dirname string, opts Options) (*Chotki, error) {
return nil, err
}

conn := Chotki{
cho := Chotki{
db: db,
src: opts.Orig,
src: opts.Src,
dir: dirname,
opts: opts,
net: &toytlv.TCPDepot{},
types: make(map[rdx.ID]Fields),
hooks: make(map[rdx.ID][]Hook),
syncs: make(map[rdx.ID]*pebble.Batch),
outq: make(map[string]toyqueue.DrainCloser),
}

cho.net.Open(func(conn net.Conn) toyqueue.FeedDrainCloser {
return &Syncer{
Host: &cho,
Mode: SyncRWLive,
Name: conn.RemoteAddr().String(),
}
})

if opts.RestoreNetwork {
i := cho.db.NewIter(&pebble.IterOptions{})
defer i.Close()

for i.SeekGE([]byte{'l'}); i.Valid() && i.Key()[0] == 'L'; i.Next() {
address := string(i.Key()[1:])
_ = cho.net.Listen(address)
}

for i.SeekGE([]byte{'c'}); i.Valid() && i.Key()[0] == 'C'; i.Next() {
address := string(i.Key()[1:])
_ = cho.net.Connect(address)
}
}

if !exists {
id0 := rdx.IDFromSrcSeqOff(opts.Orig, 0, 0)
id0 := rdx.IDFromSrcSeqOff(opts.Src, 0, 0)

init := append(toyqueue.Records(nil), Log0...)
init = append(init, toytlv.Record('Y',
Expand All @@ -207,61 +228,27 @@ func Open(dirname string, opts Options) (*Chotki, error) {
toytlv.Record('S', rdx.Stlv(opts.Name)),
))

if err = conn.Drain(init); err != nil {
if err = cho.Drain(init); err != nil {
return nil, err
}
}

vv, err := conn.VersionVector()
vv, err := cho.VersionVector()
if err != nil {
return nil, err
}

conn.last = vv.GetID(conn.src)
cho.last = vv.GetID(cho.src)

return &conn, nil
return &cho, nil
}

func (cho *Chotki) OpenTCP(tcp *toytlv.TCPDepot) error {
if cho.db == nil {
return ErrDbClosed
}

tcp.Open(func(conn net.Conn) toyqueue.FeedDrainCloser {
return &Syncer{Host: cho, Name: conn.RemoteAddr().String(), Mode: SyncRWLive}
})

return nil
func (cho *Chotki) Listen(addr string) error {
return cho.net.Listen(addr)
}

func (cho *Chotki) ReOpenTCP(tcp *toytlv.TCPDepot) error {
if cho.db == nil {
return ErrDbClosed
}

if err := cho.OpenTCP(tcp); err != nil {
return err
}
// ...
io := pebble.IterOptions{}
i := cho.db.NewIter(&io)
defer i.Close()

for i.SeekGE([]byte{'l'}); i.Valid() && i.Key()[0] == 'L'; i.Next() {
address := string(i.Key()[1:])
err := tcp.Listen(address)
if err != nil {
_, _ = fmt.Fprintln(os.Stderr, err.Error())
}
}
for i.SeekGE([]byte{'c'}); i.Valid() && i.Key()[0] == 'C'; i.Next() {
address := string(i.Key()[1:])
err := tcp.Connect(address)
if err != nil {
_, _ = fmt.Fprintln(os.Stderr, err.Error())
}
}
return nil
func (cho *Chotki) Connect(addr string) error {
return cho.net.Connect(addr)
}

func (cho *Chotki) AddPacketHose(name string) (feed toyqueue.FeedCloser) {
Expand Down Expand Up @@ -423,17 +410,29 @@ func (cho *Chotki) Snapshot() pebble.Reader {

func (cho *Chotki) Close() error {
cho.lock.Lock()
if cho.db == nil {
cho.lock.Unlock()
return ErrClosed
defer cho.lock.Unlock()

if cho.net != nil {
if err := cho.net.Close(); err != nil {
return err
}
}
if err := cho.db.Close(); err != nil {
return err

if cho.db != nil {
if err := cho.db.Close(); err != nil {
return err
}
}

clear(cho.outq)
clear(cho.syncs)
clear(cho.hooks)
clear(cho.types)

cho.db = nil
// todo
cho.src = 0
cho.last = rdx.ID0
cho.lock.Unlock()

return nil
}

Expand Down
6 changes: 3 additions & 3 deletions chotki_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func TestChotki_Create(t *testing.T) {
defer cancel()

a, err := Open(dirs[0], Options{
Orig: 0x1a,
Src: 0x1a,
Name: "test replica",
Options: pebble.Options{ErrorIfExists: true},
})
Expand All @@ -62,9 +62,9 @@ func TestChotki_Sync(t *testing.T) {
dirs, clear := testdirs(0xa, 0xb)
defer clear()

a, err := Open(dirs[0], Options{Orig: 0xa, Name: "test replica A"})
a, err := Open(dirs[0], Options{Src: 0xa, Name: "test replica A"})
assert.Nil(t, err)
b, err := Open(dirs[1], Options{Orig: 0xb, Name: "test replica B"})
b, err := Open(dirs[1], Options{Src: 0xb, Name: "test replica B"})
assert.Nil(t, err)

synca := Syncer{Host: a, Mode: SyncRW, Name: "a"}
Expand Down
6 changes: 3 additions & 3 deletions examples/object_example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func TestORMExample(t *testing.T) {
defer os.RemoveAll("cho1e")
defer os.RemoveAll("cho1f")

a, err := chotki.Open("cho1e", chotki.Options{Orig: 0x1e, Name: "test replica"})
a, err := chotki.Open("cho1e", chotki.Options{Src: 0x1e, Name: "test replica"})
assert.Nil(t, err)
tid, err := a.NewClass(rdx.ID0,
chotki.Field{Name: "Name", RdxType: rdx.String},
Expand All @@ -31,7 +31,7 @@ func TestORMExample(t *testing.T) {
err = a.Close()
assert.Nil(t, err)

a, err = chotki.Open("cho1e", chotki.Options{Orig: 0x1e, Name: "test replica"})
a, err = chotki.Open("cho1e", chotki.Options{Src: 0x1e, Name: "test replica"})
assert.Nil(t, err)

var exa Example
Expand All @@ -45,7 +45,7 @@ func TestORMExample(t *testing.T) {
exa.Score = 103
// todo save the object

b, err := chotki.Open("cho1f", chotki.Options{Orig: 0x1f, Name: "another test replica"})
b, err := chotki.Open("cho1f", chotki.Options{Src: 0x1f, Name: "another test replica"})
assert.Nil(t, err)

syncera := chotki.Syncer{Host: a, Mode: chotki.SyncRW}
Expand Down
2 changes: 1 addition & 1 deletion examples/objects_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
func TestTypes(t *testing.T) {
defer os.RemoveAll("cho1a")

a, err := chotki.Open("cho1a", chotki.Options{Orig: 0x1a, Name: "test replica A"})
a, err := chotki.Open("cho1a", chotki.Options{Src: 0x1a, Name: "test replica A"})
assert.Nil(t, err)

var tid, oid rdx.ID
Expand Down
20 changes: 4 additions & 16 deletions repl/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (repl *REPL) CommandCreate(arg *rdx.RDX) (id rdx.ID, err error) {

dirname := chotki.ReplicaDirName(src.Src())
repl.Host, err = chotki.Open(dirname, chotki.Options{
Orig: src.Src(),
Src: src.Src(),
Name: name,
Options: pebble.Options{ErrorIfExists: true},
})
Expand All @@ -78,7 +78,7 @@ func (repl *REPL) CommandOpen(arg *rdx.RDX) (rdx.ID, error) {

var err error
repl.Host, err = chotki.Open(dirname, chotki.Options{
Orig: src0.Src(),
Src: src0.Src(),
Options: pebble.Options{ErrorIfNotExists: true},
})
if err != nil {
Expand Down Expand Up @@ -110,10 +110,6 @@ func (repl *REPL) CommandDump(arg *rdx.RDX) (id rdx.ID, err error) {
}

func (repl *REPL) CommandClose(arg *rdx.RDX) (id rdx.ID, err error) {
if repl.tcp != nil {
repl.tcp.Close()
repl.tcp = nil
}
err = repl.Host.Close()
if err == nil {
id = repl.Host.Last()
Expand Down Expand Up @@ -320,11 +316,7 @@ func (repl *REPL) CommandListen(arg *rdx.RDX) (id rdx.ID, err error) {
}
addr := rdx.Snative(rdx.Sparse(string(arg.Text)))
if err == nil {
if repl.tcp == nil {
repl.tcp = &toytlv.TCPDepot{}
repl.Host.OpenTCP(repl.tcp)
}
err = repl.tcp.Listen(addr)
err = repl.Host.Listen(addr)
}
return
}
Expand All @@ -337,11 +329,7 @@ func (repl *REPL) CommandConnect(arg *rdx.RDX) (id rdx.ID, err error) {
}
addr := rdx.Snative(rdx.Sparse(string(arg.Text)))
if err == nil {
if repl.tcp == nil {
repl.tcp = &toytlv.TCPDepot{}
repl.Host.OpenTCP(repl.tcp)
}
err = repl.tcp.Connect(addr)
err = repl.Host.Connect(addr)
}
return
}
Expand Down
2 changes: 0 additions & 2 deletions repl/repl.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"github.com/drpcorg/chotki"
"github.com/drpcorg/chotki/rdx"
"github.com/ergochat/readline"
"github.com/drpcorg/chotki/toytlv"
"io"
"os"
"strings"
Expand All @@ -16,7 +15,6 @@ import (
// REPL per se.
type REPL struct {
Host *chotki.Chotki
tcp *toytlv.TCPDepot
rl *readline.Instance
snap pebble.Reader
}
Expand Down
8 changes: 8 additions & 0 deletions toytlv/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,14 @@ const (
MIN_RETRY_PERIOD = time.Second / 2
)

type ConnType = uint

const (
TCP ConnType = iota + 1
TLS ConnType = iota + 1
QUIC ConnType = iota + 1
)

type TCPConn struct {
addr string
conn atomic.Pointer[net.Conn]
Expand Down

0 comments on commit 3a8772b

Please sign in to comment.