diff --git a/chotki.go b/chotki.go index 174f20f..e40e4aa 100644 --- a/chotki.go +++ b/chotki.go @@ -4,13 +4,15 @@ import ( "encoding/binary" "errors" "fmt" + "net" + "os" + "sync" + "github.com/cockroachdb/pebble" + "github.com/cockroachdb/pebble/vfs" "github.com/drpcorg/chotki/rdx" "github.com/learn-decentralized-systems/toyqueue" "github.com/learn-decentralized-systems/toytlv" - "net" - "os" - "sync" ) type Packet []byte @@ -53,9 +55,28 @@ type Chotki struct { types map[rdx.ID]Fields } -var ErrCausalityBroken = errors.New("order fail: refs an unknown op") -var ErrOutOfOrder = errors.New("order fail: sequence gap") -var ErrNotImplemented = errors.New("not implemented yet") +var ( + ErrDbClosed = errors.New("chotki: db is closed") + ErrDirnameIsFile = errors.New("chotki: the dirname is file") + ErrNotImplemented = errors.New("chotki: not implemented yet") + + ErrHookNotFound = errors.New("chotki: hook not found") + ErrBadIRecord = errors.New("chotki: bad id-ref record") + ErrBadHPacket = errors.New("chotki: bad handshake packet") + ErrBadEPacket = errors.New("chotki: bad E packet") + ErrBadVPacket = errors.New("chotki: bad V packet") + ErrBadYPacket = errors.New("chotki: bad Y packet") + ErrBadLPacket = errors.New("chotki: bad L packet") + ErrBadTPacket = errors.New("chotki: bad T packet") + ErrBadOPacket = errors.New("chotki: bad O packet") + ErrSrcUnknown = errors.New("chotki: source unknown") + ErrSyncUnknown = errors.New("chotki: sync session unknown") + ErrBadRRecord = errors.New("chotki: bad ref record") + ErrClosed = errors.New("chotki: no replica open") + + ErrOutOfOrder = errors.New("chotki: order fail: sequence gap") + ErrCausalityBroken = errors.New("chotki: order fail: refs an unknown op") +) func OKey(id rdx.ID, rdt byte) (key []byte) { var ret = [16]byte{'O'} @@ -99,8 +120,6 @@ func ReplicaDirName(rno uint64) string { return fmt.Sprintf("cho%x", rno) } -var ErrAlreadyOpen = errors.New("the db is already open") - func (o *Options) SetDefaults() { if o.MaxLogLen == 0 { o.MaxLogLen = 1 << 23 @@ -120,81 +139,99 @@ func merger(key, value []byte) (pebble.ValueMerger, error) { return &pma, nil } -// Create a replica. orig=0 for read-only replicas. -func (cho *Chotki) Create(orig uint64, name string) (err error) { - opts := pebble.Options{ - ErrorIfExists: true, - ErrorIfNotExists: false, - Merger: &pebble.Merger{ - Name: "CRDT", - Merge: merger, - }} - cho.opts.SetDefaults() // todo param - path := ReplicaDirName(orig) - cho.db, err = pebble.Open(path, &opts) +func Exists(dirname string) (bool, error) { + stats, err := os.Stat(dirname) if err != nil { - return + if os.IsNotExist(err) { + return false, nil + } + + return false, err } - var _0 rdx.ID - id0 := rdx.IDFromSrcSeqOff(orig, 0, 0) - rec0 := toytlv.Concat( - toytlv.Record('Y', - toytlv.Record('I', id0.ZipBytes()), - toytlv.Record('R', _0.ZipBytes()), - toytlv.Record('S', rdx.Stlv(name)), - ), - ) - init := append(Log0, rec0) - err = cho.Drain(init) + + if !stats.IsDir() { + return false, ErrDirnameIsFile + } + + desc, err := pebble.Peek(dirname, vfs.Default) if err != nil { - return + return false, err } - _ = cho.Close() - return cho.Open(orig) -} - -// Open a replica. orig=0 for read-only replicas. -func (cho *Chotki) Open(orig uint64) (err error) { - cho.src = orig - opts := pebble.Options{ - ErrorIfNotExists: true, - Merger: &pebble.Merger{ - Name: "CRDT", - Merge: merger, - }} - cho.opts.SetDefaults() // todo param - path := ReplicaDirName(orig) - cho.db, err = pebble.Open(path, &opts) + + return desc.Exists, nil +} + +func Open(orig uint64, name, dirname string) (*Chotki, bool, error) { + exists, err := Exists(dirname) if err != nil { - return + return nil, false, err } - cho.dir = path + + db, err := pebble.Open(dirname, &pebble.Options{ + ErrorIfExists: false, + ErrorIfNotExists: false, + Merger: &pebble.Merger{Name: "CRDT", Merge: merger}, + }) if err != nil { - _ = cho.db.Close() - return err + return nil, exists, err } - cho.types = make(map[rdx.ID]Fields) - cho.syncs = make(map[rdx.ID]*pebble.Batch) - cho.hooks = make(map[rdx.ID][]Hook) - var vv rdx.VV - vv, err = cho.VersionVector() + + conn := Chotki{ + db: db, + src: orig, + dir: dirname, + types: make(map[rdx.ID]Fields), + hooks: make(map[rdx.ID][]Hook), + syncs: make(map[rdx.ID]*pebble.Batch), + outq: make(map[string]toyqueue.DrainCloser), + } + conn.opts.SetDefaults() // todo param + + if !exists { + id0 := rdx.IDFromSrcSeqOff(orig, 0, 0) + + init := append(toyqueue.Records(nil), Log0...) + init = append(init, toytlv.Record('Y', + toytlv.Record('I', id0.ZipBytes()), + toytlv.Record('R', rdx.ID0.ZipBytes()), + toytlv.Record('S', rdx.Stlv(name)), + )) + + if err = conn.Drain(init); err != nil { + return nil, exists, err + } + } + + vv, err := conn.VersionVector() if err != nil { - return + return nil, exists, err } - cho.last = vv.GetID(cho.src) - cho.outq = make(map[string]toyqueue.DrainCloser) - // repl.last = repl.heads.GetID(orig) todo root VV - return + + conn.last = vv.GetID(conn.src) + + return &conn, exists, nil } -func (cho *Chotki) OpenTCP(tcp *toytlv.TCPDepot) { +func (cho *Chotki) OpenTCP(tcp *toytlv.TCPDepot) error { + if cho.db == nil { + return ErrDbClosed + } + tcp.Open(func(conn net.Conn) toyqueue.FeedDrainCloser { return &Syncer{Host: cho, Name: conn.RemoteAddr().String(), Mode: SyncRWLive} }) + + return nil } -func (cho *Chotki) ReOpenTCP(tcp *toytlv.TCPDepot) { - cho.OpenTCP(tcp) +func (cho *Chotki) ReOpenTCP(tcp *toytlv.TCPDepot) error { + if cho.db == nil { + return ErrDbClosed + } + + if err := cho.OpenTCP(tcp); err != nil { + return err + } // ... io := pebble.IterOptions{} i := cho.db.NewIter(&io) @@ -214,6 +251,7 @@ func (cho *Chotki) ReOpenTCP(tcp *toytlv.TCPDepot) { _, _ = fmt.Fprintln(os.Stderr, err.Error()) } } + return nil } func (cho *Chotki) AddPacketHose(name string) (feed toyqueue.FeedCloser) { @@ -327,10 +365,18 @@ func (cho *Chotki) Drain(recs toyqueue.Records) (err error) { default: return fmt.Errorf("unsupported packet type %c", lit) } + if !yvh && err == nil { - _ = cho.db.Apply(&pb, &WriteOptions) + if err := cho.db.Apply(&pb, &WriteOptions); err != nil { + return err + } } } + + if err := cho.db.Flush(); err != nil { + return err + } + if err != nil { // fixme separate packets return } @@ -359,20 +405,6 @@ func (cho *Chotki) VersionVector() (vv rdx.VV, err error) { var WriteOptions = pebble.WriteOptions{Sync: false} -var ErrBadIRecord = errors.New("bad id-ref record") - -var ErrBadHPacket = errors.New("bad handshake packet") -var ErrBadEPacket = errors.New("bad E packet") -var ErrBadVPacket = errors.New("bad V packet") -var ErrBadYPacket = errors.New("bad Y packet") -var ErrBadLPacket = errors.New("bad L packet") -var ErrBadTPacket = errors.New("bad T packet") -var ErrBadOPacket = errors.New("bad O packet") -var ErrSrcUnknown = errors.New("source unknown") -var ErrSyncUnknown = errors.New("sync session unknown") -var ErrBadRRecord = errors.New("bad ref record") -var ErrClosed = errors.New("no replica open") - var KeyLogLen = []byte("Mloglen") func (cho *Chotki) Last() rdx.ID { @@ -483,8 +515,6 @@ func (cho *Chotki) RemoveAllHooks(fid rdx.ID) { cho.hlock.Unlock() } -var ErrHookNotFound = errors.New("hook not found") - func (cho *Chotki) RemoveHook(fid rdx.ID, hook Hook) (err error) { cho.hlock.Lock() list := cho.hooks[fid] diff --git a/chotki_test.go b/chotki_test.go index 49bcb14..61c60c5 100644 --- a/chotki_test.go +++ b/chotki_test.go @@ -23,13 +23,14 @@ func TestChotki_Debug(t *testing.T) { } func TestChotki_Create(t *testing.T) { - _ = os.RemoveAll("cho1a") - var a Chotki - err := a.Create(0x1a, "test replica") + dirname := ReplicaDirName(0x1a) + _ = os.RemoveAll(dirname) + a, exists, err := Open(0x1a, "test replica", dirname) assert.Nil(t, err) + assert.Equal(t, exists, false) //a.DumpAll() _ = a.Close() - _ = os.RemoveAll("cho1a") + _ = os.RemoveAll(dirname) } type KVMerger interface { @@ -37,17 +38,19 @@ type KVMerger interface { } func TestChotki_Sync(t *testing.T) { - _ = os.RemoveAll("choa") - _ = os.RemoveAll("chob") - var a, b Chotki - err := a.Create(0xa, "test replica A") + adir, bdir := ReplicaDirName(0xa), ReplicaDirName(0xb) + _ = os.RemoveAll(adir) + _ = os.RemoveAll(bdir) + + a, _, err := Open(0xa, "test replica A", adir) assert.Nil(t, err) //a.DumpAll() - err = b.Create(0xb, "test replica B") + + b, _, err := Open(0xb, "test replica B", bdir) assert.Nil(t, err) - synca := Syncer{Host: &a, Mode: SyncRW, Name: "a"} - syncb := Syncer{Host: &b, Mode: SyncRW, Name: "b"} + synca := Syncer{Host: a, Mode: SyncRW, Name: "a"} + syncb := Syncer{Host: b, Mode: SyncRW, Name: "b"} err = toyqueue.Relay(&syncb, &synca) assert.Nil(t, err) err = toyqueue.Pump(&synca, &syncb) diff --git a/object_example_test.go b/object_example_test.go index 75ae0a1..c2b4eb0 100644 --- a/object_example_test.go +++ b/object_example_test.go @@ -11,6 +11,7 @@ import ( ) func TestORMExample(t *testing.T) { + edir, fdir := ReplicaDirName(0x1e), ReplicaDirName(0x1f) _ = os.RemoveAll("cho1e") _ = os.RemoveAll("cho1f") defer func() { @@ -18,8 +19,7 @@ func TestORMExample(t *testing.T) { _ = os.RemoveAll("cho1f") }() - var a, b Chotki - err := a.Create(0x1e, "test replica") + a, _, err := Open(0x1e, "test replica", edir) assert.Nil(t, err) var tid, oid rdx.ID tid, err = a.NewClass(rdx.ID0, @@ -35,9 +35,9 @@ func TestORMExample(t *testing.T) { err = a.Close() assert.Nil(t, err) - err = a.Open(0x1e) + + a, _, err = Open(0x1e, "test replica", edir) assert.Nil(t, err) - //a.DumpAll() var exa Example ita := a.ObjectIterator(rdx.IDFromString("1e-2")) @@ -50,11 +50,11 @@ func TestORMExample(t *testing.T) { exa.Score = 103 // todo save the object - err = b.Create(0x1f, "another test replica") + b, _, err := Open(0x1f, "another test replica", fdir) assert.Nil(t, err) - syncera := Syncer{Host: &a, Mode: SyncRW} - syncerb := Syncer{Host: &b, Mode: SyncRW} + syncera := Syncer{Host: a, Mode: SyncRW} + syncerb := Syncer{Host: b, Mode: SyncRW} err = toyqueue.Relay(&syncerb, &syncera) assert.Nil(t, err) err = toyqueue.Pump(&syncera, &syncerb) diff --git a/objects_test.go b/objects_test.go index eb2b673..734df0d 100644 --- a/objects_test.go +++ b/objects_test.go @@ -9,9 +9,10 @@ import ( ) func TestTypes(t *testing.T) { - _ = os.RemoveAll("cho1a") - var a Chotki - err := a.Create(0x1a, "test replica A") + adir := ReplicaDirName(0x1a) + + _ = os.RemoveAll(adir) + a, _, err := Open(0x1a, "test replica A", adir) assert.Nil(t, err) var tid, oid rdx.ID diff --git a/repl/commands.go b/repl/commands.go index c0426b7..0498bca 100644 --- a/repl/commands.go +++ b/repl/commands.go @@ -51,7 +51,17 @@ func (repl *REPL) CommandCreate(arg *rdx.RDX) (id rdx.ID, err error) { if src == rdx.ID0 { return } - err = repl.Host.Create(src.Src(), name) + + dirname := chotki.ReplicaDirName(src.Src()) + exists, err := chotki.Exists(dirname) + if err != nil { + return rdx.BadId, err + } + if exists { + return rdx.BadId, errors.New("Replica already exists") + } + + repl.Host, _, err = chotki.Open(src.Src(), name, dirname) if err == nil { id = repl.Host.Last() } @@ -60,16 +70,27 @@ func (repl *REPL) CommandCreate(arg *rdx.RDX) (id rdx.ID, err error) { var HelpOpen = errors.New("open zone/1") -func (repl *REPL) CommandOpen(arg *rdx.RDX) (id rdx.ID, err error) { +func (repl *REPL) CommandOpen(arg *rdx.RDX) (rdx.ID, error) { if arg == nil || arg.RdxType != rdx.Reference { return rdx.BadId, HelpOpen } + src0 := rdx.IDFromText(arg.Text) - err = repl.Host.Open(src0.Src()) - if err == nil { - id = repl.Host.Last() + dirname := chotki.ReplicaDirName(src0.Src()) + + exists, err := chotki.Exists(dirname) + if err != nil { + return rdx.BadId, err + } else if !exists { + return rdx.BadId, errors.New("Replica not found") } - return + + repl.Host, _, err = chotki.Open(src0.Src(), "", dirname) + if err != nil { + return rdx.BadId, err + } + + return repl.Host.Last(), nil } var HelpDump = errors.New("dump (obj|objects|vv|all)?") @@ -393,7 +414,7 @@ func (repl *REPL) CommandPinc(arg *rdx.RDX) (id rdx.ID, err error) { if fid.Off() == 0 { return } - err = KeepOdd(&repl.Host, fid) + err = KeepOdd(repl.Host, fid) if err != nil { return } @@ -412,7 +433,7 @@ func (repl *REPL) CommandPonc(arg *rdx.RDX) (id rdx.ID, err error) { if fid.Off() == 0 { return } - err = KeepEven(&repl.Host, fid) + err = KeepEven(repl.Host, fid) if err != nil { return } diff --git a/repl/repl.go b/repl/repl.go index b3c46c9..147f9c4 100644 --- a/repl/repl.go +++ b/repl/repl.go @@ -15,7 +15,7 @@ import ( // REPL per se. type REPL struct { - Host chotki.Chotki + Host *chotki.Chotki tcp *toytlv.TCPDepot rl *readline.Instance snap pebble.Reader @@ -127,7 +127,7 @@ func (repl *REPL) REPL() (id rdx.ID, err error) { _ = repl.snap.Close() repl.snap = nil } - if repl.Host.Last() != rdx.ID0 { + if repl.Host != nil && repl.Host.Last() != rdx.ID0 { repl.snap = repl.Host.Snapshot() } switch cmd { @@ -139,7 +139,7 @@ func (repl *REPL) REPL() (id rdx.ID, err error) { case "close": id, err = repl.CommandClose(arg) case "exit", "quit": - if repl.Host.Last() != rdx.ID0 { + if repl.Host != nil && repl.Host.Last() != rdx.ID0 { id, err = repl.CommandClose(arg) } if err == nil {