diff --git a/chotki.go b/chotki.go index 9b5c1ce..b0f9c95 100644 --- a/chotki.go +++ b/chotki.go @@ -316,19 +316,6 @@ var WriteOptions = pebble.WriteOptions{Sync: false} var ErrBadIRecord = errors.New("bad id-ref record") -func ReadRX(op []byte) (ref ID, exec, rest []byte, err error) { - ref, rest, err = TakeIDWary('R', op) - if err != nil { - return - } - var lit byte - lit, exec, rest = toytlv.TakeAnyRecord(rest) - if lit < 'A' { - err = ErrBadIRecord - } - return -} - var ErrBadPacket = errors.New("bad packet") var ErrBadEPacket = errors.New("bad E packet") var ErrBadVPacket = errors.New("bad V packet") diff --git a/lww.go b/lww.go index 8a66ab1..040a940 100644 --- a/lww.go +++ b/lww.go @@ -9,7 +9,7 @@ import ( // Common LWW functions // for bad format, value==nil (an empty value is an empty slice) -func LWWparse(bulk []byte) (rev int64, src uint64, value []byte) { +func IsfrParse(bulk []byte) (rev int64, src uint64, value []byte) { lit, hlen, blen := toytlv.ProbeHeader(bulk) if lit != 'T' && lit != '0' || hlen+blen > len(bulk) { return @@ -20,7 +20,7 @@ func LWWparse(bulk []byte) (rev int64, src uint64, value []byte) { return } -func LWWtlv(rev int64, src uint64, value []byte) (bulk []byte) { +func IsfrTlv(rev int64, src uint64, value []byte) (bulk []byte) { time := ZipIntUint64Pair(rev, src) bulk = make([]byte, 0, len(time)+len(value)+2) bulk = toytlv.AppendTiny(bulk, 'T', time) @@ -28,7 +28,7 @@ func LWWtlv(rev int64, src uint64, value []byte) (bulk []byte) { return } -func LWWmerge(tlvs [][]byte) (tlv []byte) { +func IsfrMerge(tlvs [][]byte) (tlv []byte) { var winrec []byte var winrev int64 var winsrc uint64 @@ -64,8 +64,8 @@ func LWWmerge(tlvs [][]byte) (tlv []byte) { return winrec } -func LWWdiff(tlv []byte, vvdiff VV) []byte { - _, src, _ := LWWparse(tlv) +func IsfrDiff(tlv []byte, vvdiff VV) []byte { + _, src, _ := IsfrParse(tlv) _, ok := vvdiff[src] if ok { return tlv @@ -74,6 +74,19 @@ func LWWdiff(tlv []byte, vvdiff VV) []byte { } } +func IsfrReSource(bare []byte, src uint64) (res []byte, err error) { + time, s, value := IsfrParse(bare) + if value == nil { + return nil, ErrBadPacket + } + if s != src { // ensure correct attribution + res = IsfrTlv(time, src, value) + } else { + res = bare + } + return +} + // I is a last-write-wins int64 // produce a text form (for REPL mostly) @@ -90,41 +103,41 @@ func Iparse(txt string) (tlv []byte) { // convert native golang value into a TLV form func Itlv(i int64) (tlv []byte) { - return LWWtlv(0, 0, ZipInt64(i)) + return IsfrTlv(0, 0, ZipInt64(i)) } // convert a TLV value to a native golang value func Inative(tlv []byte) int64 { - _, _, val := LWWparse(tlv) + _, _, val := IsfrParse(tlv) return UnzipInt64(val) } // merge TLV values func Imerge(tlvs [][]byte) (tlv []byte) { - return LWWmerge(tlvs) + return IsfrMerge(tlvs) } // produce an op that turns the old value into the new one func Idelta(tlv []byte, new_val int64) (tlv_delta []byte) { - rev, _, val := LWWparse(tlv) + rev, _, val := IsfrParse(tlv) if rev < 0 { rev = -rev } nv := ZipInt64(new_val) if bytes.Compare(val, nv) != 0 { - tlv_delta = LWWtlv(rev+1, 0, nv) + tlv_delta = IsfrTlv(rev+1, 0, nv) } return } // checks a TLV value for validity (format violations) func Ivalid(tlv []byte) bool { - _, src, val := LWWparse(tlv) + _, src, val := IsfrParse(tlv) return val != nil && len(val) <= 8 && src <= MaxSrc } func Idiff(tlv []byte, vvdiff VV) []byte { - return LWWdiff(tlv, vvdiff) + return IsfrDiff(tlv, vvdiff) } // S is a last-write-wins UTF-8 string @@ -134,7 +147,7 @@ const hex = "0123456789abcdef" // produce a text form (for REPL mostly) func Sstring(tlv []byte) (txt string) { dst := make([]byte, 0, len(tlv)*2) - _, _, val := LWWparse(tlv) + _, _, val := IsfrParse(tlv) dst = append(dst, '"') for _, b := range val { switch b { @@ -165,46 +178,46 @@ func Sparse(txt string) (tlv []byte) { } unq := txt[1 : len(txt)-1] unesc, _ := Unescape([]byte(unq), nil) - return LWWtlv(0, 0, unesc) + return IsfrTlv(0, 0, unesc) } // convert native golang value into a TLV form func Stlv(s string) (tlv []byte) { - return LWWtlv(0, 0, []byte(s)) + return IsfrTlv(0, 0, []byte(s)) } // convert a TLV value to a native golang value func Snative(tlv []byte) string { - _, _, val := LWWparse(tlv) + _, _, val := IsfrParse(tlv) return string(val) } // merge TLV values func Smerge(tlvs [][]byte) (tlv []byte) { - return LWWmerge(tlvs) + return IsfrMerge(tlvs) } // produce an op that turns the old value into the new one func Sdelta(tlv []byte, new_val string) (tlv_delta []byte) { - rev, _, val := LWWparse(tlv) + rev, _, val := IsfrParse(tlv) if rev < 0 { rev = -rev } nv := []byte(new_val) if bytes.Compare(val, nv) != 0 { - tlv_delta = LWWtlv(rev+1, 0, nv) + tlv_delta = IsfrTlv(rev+1, 0, nv) } return } // checks a TLV value for validity (format violations) func Svalid(tlv []byte) bool { - _, src, val := LWWparse(tlv) + _, src, val := IsfrParse(tlv) return val != nil && src <= MaxSrc } func Sdiff(tlv []byte, vvdiff VV) []byte { - return LWWdiff(tlv, vvdiff) + return IsfrDiff(tlv, vvdiff) } // R is a last-write-wins ID @@ -222,42 +235,42 @@ func Rparse(txt string) (tlv []byte) { // convert native golang value into a TLV form func Rtlv(i ID) (tlv []byte) { - return LWWtlv(0, 0, i.ZipBytes()) + return IsfrTlv(0, 0, i.ZipBytes()) } // convert a TLV value to a native golang value func Rnative(tlv []byte) ID { - _, _, val := LWWparse(tlv) + _, _, val := IsfrParse(tlv) return IDFromZipBytes(val) } // merge TLV values func Rmerge(tlvs [][]byte) (tlv []byte) { - return LWWmerge(tlvs) + return IsfrMerge(tlvs) } // produce an op that turns the old value into the new one func Rdelta(tlv []byte, new_val ID) (tlv_delta []byte) { - rev, _, val := LWWparse(tlv) + rev, _, val := IsfrParse(tlv) if rev < 0 { rev = -rev } nv := new_val.ZipBytes() if bytes.Compare(val, nv) != 0 { - tlv_delta = LWWtlv(rev+1, 0, nv) + tlv_delta = IsfrTlv(rev+1, 0, nv) } return } // checks a TLV value for validity (format violations) func Rvalid(tlv []byte) bool { - _, src, val := LWWparse(tlv) + _, src, val := IsfrParse(tlv) return val != nil && src <= MaxSrc // todo correct sizes } func Rdiff(tlv []byte, vvdiff VV) []byte { - return LWWdiff(tlv, vvdiff) + return IsfrDiff(tlv, vvdiff) } // F is a last-write-wins float64 @@ -276,39 +289,39 @@ func Fparse(txt string) (tlv []byte) { // convert native golang value into a TLV form func Ftlv(i float64) (tlv []byte) { - return LWWtlv(0, 0, ZipFloat64(i)) + return IsfrTlv(0, 0, ZipFloat64(i)) } // convert a TLV value to a native golang value func Fnative(tlv []byte) float64 { - _, _, val := LWWparse(tlv) + _, _, val := IsfrParse(tlv) return UnzipFloat64(val) } // merge TLV values func Fmerge(tlvs [][]byte) (tlv []byte) { - return LWWmerge(tlvs) + return IsfrMerge(tlvs) } // produce an op that turns the old value into the new one func Fdelta(tlv []byte, new_val float64) (tlv_delta []byte) { - rev, _, val := LWWparse(tlv) + rev, _, val := IsfrParse(tlv) if rev < 0 { rev = -rev } nv := ZipFloat64(new_val) if bytes.Compare(val, nv) != 0 { - tlv_delta = LWWtlv(rev+1, 0, nv) + tlv_delta = IsfrTlv(rev+1, 0, nv) } return } // checks a TLV value for validity (format violations) func Fvalid(tlv []byte) bool { - _, src, val := LWWparse(tlv) + _, src, val := IsfrParse(tlv) return val != nil && src <= MaxSrc && len(val) <= 8 } func Fdiff(tlv []byte, vvdiff VV) []byte { - return LWWdiff(tlv, vvdiff) + return IsfrDiff(tlv, vvdiff) } diff --git a/lww_test.go b/lww_test.go index f91f820..04ccdb2 100644 --- a/lww_test.go +++ b/lww_test.go @@ -8,13 +8,13 @@ import ( func TestTLV(t *testing.T) { body := []byte("test") - tlv := LWWtlv(234, 123, body) - time, src, val := LWWparse(tlv) + tlv := IsfrTlv(234, 123, body) + time, src, val := IsfrParse(tlv) assert.Equal(t, 234, int(time)) assert.Equal(t, 123, int(src)) assert.Equal(t, body, val) - doc := LWWtlv(4, 5, ZipInt64(-11)) + doc := IsfrTlv(4, 5, ZipInt64(-11)) assert.Equal(t, []byte{0x32, 0x08, 0x05, 0x15}, doc) } @@ -83,12 +83,12 @@ func TestIMerge(t *testing.T) { } func TestLWWTie(t *testing.T) { - a := LWWtlv(4, 8, ZipInt64(1)) - b := LWWtlv(4, 7, ZipInt64(2)) - c := LWWtlv(4, 5, ZipInt64(2)) + a := IsfrTlv(4, 8, ZipInt64(1)) + b := IsfrTlv(4, 7, ZipInt64(2)) + c := IsfrTlv(4, 5, ZipInt64(2)) d := Imerge(toyqueue.Records{a, b, c}) assert.Equal(t, int64(2), Inative(d)) - rev, src, _ := LWWparse(d) + rev, src, _ := IsfrParse(d) assert.Equal(t, int64(4), rev) assert.Equal(t, uint64(7), src) } diff --git a/mel.go b/mel.go new file mode 100644 index 0000000..4ed0925 --- /dev/null +++ b/mel.go @@ -0,0 +1,79 @@ +package main + +import "github.com/learn-decentralized-systems/toytlv" +import "errors" + +type Time struct { + rev int64 + src uint64 +} + +func (t Time) ZipBytes() []byte { + return ZipIntUint64Pair(t.rev, t.src) +} + +func TimeFromZipBytes(zip []byte) (t Time) { + // todo bad data + t.rev, t.src = UnzipIntUint64Pair(zip) + return +} + +var ErrBadISFR = errors.New("bad ISFR record") + +// Parses an enveloped ISFR record +func MelParse(data []byte) (lit byte, t Time, value, rest []byte, err error) { + var hlen, blen int + lit, hlen, blen = toytlv.ProbeHeader(data) + if lit == 0 || hlen+blen > len(data) { + err = toytlv.ErrIncomplete + return + } + rec := data[:hlen+blen] + rest = data[hlen+blen:] + tlit, thlen, tblen := toytlv.ProbeHeader(data) + tlen := thlen + tblen + if (tlit != 'T' && tlit != '0') || (tlen > len(rec)) { + err = ErrBadISFR + return + } + tsb := rec[thlen:tlen] + t.rev, t.src = UnzipIntUint64Pair(tsb) + value = rec[tlen:] + return +} + +func MelAppend(to []byte, lit byte, t Time, body []byte) []byte { + tb := toytlv.TinyRecord('T', t.ZipBytes()) + return toytlv.Append(to, lit, tb, body) +} + +func MelReSource(isfr []byte, src uint64) (ret []byte, err error) { + var lit byte + var time Time + var body []byte + rest := isfr + for len(rest) > 0 { + at := len(isfr) - len(rest) + lit, time, body, rest, err = MelParse(rest) + if err != nil { + return + } + if time.src != src { + ret = make([]byte, at, len(isfr)*2) + copy(ret, isfr[:at]) + break + } + } + if ret == nil && err == nil { + return isfr, nil + } + for err == nil { + time.src = src + ret = MelAppend(ret, lit, time, body) + if len(rest) == 0 { + break + } + lit, time, body, rest, err = MelParse(rest) + } + return +} diff --git a/packets.go b/packets.go index c554a5c..cd3e879 100644 --- a/packets.go +++ b/packets.go @@ -59,23 +59,24 @@ func (ch *Chotki) ApplyLOT(id, ref ID, body []byte, batch *pebble.Batch) (err er if lit == 0 || lit == '-' { return ErrBadPacket } - body = rest[hlen : hlen+blen] + var bare, rebar []byte + bare = rest[hlen : hlen+blen] fid = id + fno fkey := OKey(fid, lit) switch lit { - case 'I', 'S', 'R', 'F': - time, src, value := LWWparse(body) - if value == nil { - return ErrBadPacket - } - if src != id.Src() { // ensure correct attribution - body = LWWtlv(time, id.Src(), value) - } + case 'I', 'S', 'F', 'R': + rebar, err = IsfrReSource(bare, id.Src()) + case 'M', 'E', 'L': + rebar, err = MelReSource(bare, id.Src()) default: + rebar = bare + } + if err != nil { + break } err = batch.Merge( fkey, - body, + rebar, &WriteOptions) rest = rest[hlen+blen:] } @@ -93,7 +94,7 @@ func (ch *Chotki) ApplyE(id, r ID, body []byte, batch *pebble.Batch) (err error) } rest := body for len(rest) > 0 && err == nil { - var fint, bare []byte + var fint, bare, rebar []byte var lit byte fint, rest = toytlv.Take('F', rest) field := UnzipUint64(fint) @@ -101,10 +102,21 @@ func (ch *Chotki) ApplyE(id, r ID, body []byte, batch *pebble.Batch) (err error) return ErrBadEPacket } lit, bare, rest = toytlv.TakeAny(rest) + switch lit { + case 'I', 'S', 'F', 'R': + rebar, err = IsfrReSource(bare, id.Src()) + case 'M', 'E', 'L': + rebar, err = MelReSource(bare, id.Src()) + default: + rebar = bare + } + if err != nil { + break + } fkey := OKey(r+ID(field), lit) err = batch.Merge( fkey, - bare, + rebar, &WriteOptions) } if err == nil {