From 707b6531da3ca5ab7d784d90b7dad6311cae54f3 Mon Sep 17 00:00:00 2001 From: Matthew Nibecker Date: Mon, 11 Nov 2024 11:48:29 -0500 Subject: [PATCH 1/2] vam: Array Expressions --- compiler/kernel/vexpr.go | 33 ++++++- runtime/sam/expr/values.go | 4 +- runtime/sam/expr/ztests/array-spread.yaml | 6 -- runtime/sam/expr/ztests/vector-spread.yaml | 4 +- runtime/vam/expr/arrayexpr.go | 104 +++++++++++++++++++++ runtime/ztests/expr/array-spread.yaml | 11 +++ vector/union.go | 3 +- 7 files changed, 152 insertions(+), 13 deletions(-) delete mode 100644 runtime/sam/expr/ztests/array-spread.yaml create mode 100644 runtime/vam/expr/arrayexpr.go create mode 100644 runtime/ztests/expr/array-spread.yaml diff --git a/compiler/kernel/vexpr.go b/compiler/kernel/vexpr.go index 391d077ae7..024c38e41a 100644 --- a/compiler/kernel/vexpr.go +++ b/compiler/kernel/vexpr.go @@ -17,6 +17,8 @@ func (b *Builder) compileVamExpr(e dag.Expr) (vamexpr.Evaluator, error) { return nil, errors.New("null expression not allowed") } switch e := e.(type) { + case *dag.ArrayExpr: + return b.compileVamArrayExpr(e) case *dag.Literal: val, err := zson.ParseValue(b.zctx(), e.Value) if err != nil { @@ -47,8 +49,6 @@ func (b *Builder) compileVamExpr(e dag.Expr) (vamexpr.Evaluator, error) { // return b.compileVamRegexpSearch(e) case *dag.RecordExpr: return b.compileVamRecordExpr(e) - //case *dag.ArrayExpr: - // return b.compileVamArrayExpr(e) //case *dag.SetExpr: // return b.compileVamSetExpr(e) //case *dag.MapCall: @@ -218,3 +218,32 @@ func (b *Builder) compileVamRecordExpr(e *dag.RecordExpr) (vamexpr.Evaluator, er } return vamexpr.NewRecordExpr(b.zctx(), elems), nil } + +func (b *Builder) compileVamArrayExpr(e *dag.ArrayExpr) (vamexpr.Evaluator, error) { + elems, err := b.compileVamVectorElems(e.Elems) + if err != nil { + return nil, err + } + return vamexpr.NewArrayExpr(b.zctx(), elems), nil +} + +func (b *Builder) compileVamVectorElems(elems []dag.VectorElem) ([]vamexpr.VectorElem, error) { + var out []vamexpr.VectorElem + for _, elem := range elems { + switch elem := elem.(type) { + case *dag.Spread: + e, err := b.compileVamExpr(elem.Expr) + if err != nil { + return nil, err + } + out = append(out, vamexpr.VectorElem{Spread: e}) + case *dag.VectorValue: + e, err := b.compileVamExpr(elem.Expr) + if err != nil { + return nil, err + } + out = append(out, vamexpr.VectorElem{Value: e}) + } + } + return out, nil +} diff --git a/runtime/sam/expr/values.go b/runtime/sam/expr/values.go index f63656af77..f027bd2513 100644 --- a/runtime/sam/expr/values.go +++ b/runtime/sam/expr/values.go @@ -198,7 +198,7 @@ func (a *ArrayExpr) Eval(ectx Context, this super.Value) super.Value { val := e.Spread.Eval(ectx, this) inner := super.InnerType(val.Type()) if inner == nil { - // Treat non-list spread values values like missing. + a.collection.append(val) continue } a.collection.appendSpread(inner, val.Bytes()) @@ -238,7 +238,7 @@ func (a *SetExpr) Eval(ectx Context, this super.Value) super.Value { val := e.Spread.Eval(ectx, this) inner := super.InnerType(val.Type()) if inner == nil { - // Treat non-list spread values values like missing. + a.collection.append(val) continue } a.collection.appendSpread(inner, val.Bytes()) diff --git a/runtime/sam/expr/ztests/array-spread.yaml b/runtime/sam/expr/ztests/array-spread.yaml deleted file mode 100644 index 5f14969b6c..0000000000 --- a/runtime/sam/expr/ztests/array-spread.yaml +++ /dev/null @@ -1,6 +0,0 @@ -zed: yield [...this] - -input: &input | - [{a:1},{b:2}] - -output: *input diff --git a/runtime/sam/expr/ztests/vector-spread.yaml b/runtime/sam/expr/ztests/vector-spread.yaml index 0894758339..150ea4a05f 100644 --- a/runtime/sam/expr/ztests/vector-spread.yaml +++ b/runtime/sam/expr/ztests/vector-spread.yaml @@ -7,5 +7,5 @@ input: | output: | [2,3,0,1] |[0,1,2,3]| - [1,2] - |[1,2]| + [1,2,"hi"] + |[1,2,"hi"]| diff --git a/runtime/vam/expr/arrayexpr.go b/runtime/vam/expr/arrayexpr.go new file mode 100644 index 0000000000..d3baddda00 --- /dev/null +++ b/runtime/vam/expr/arrayexpr.go @@ -0,0 +1,104 @@ +package expr + +import ( + "github.com/brimdata/super" + "github.com/brimdata/super/vector" +) + +type VectorElem struct { + Value Evaluator + Spread Evaluator +} + +type ArrayExpr struct { + elems []VectorElem + zctx *super.Context +} + +func NewArrayExpr(zctx *super.Context, elems []VectorElem) *ArrayExpr { + return &ArrayExpr{ + elems: elems, + zctx: zctx, + } +} + +func (a *ArrayExpr) Eval(this vector.Any) vector.Any { + var vecs []vector.Any + for _, e := range a.elems { + if e.Spread != nil { + vecs = append(vecs, e.Spread.Eval(this)) + } else { + vecs = append(vecs, e.Value.Eval(this)) + } + } + return vector.Apply(false, a.eval, vecs...) +} + +func (a *ArrayExpr) eval(vecs ...vector.Any) vector.Any { + n := vecs[0].Len() + if n == 0 { + return vector.NewConst(super.Null, 0, nil) + } + spreadOffs := make([][]uint32, len(a.elems)) + viewIndexes := make([][]uint32, len(a.elems)) + for i, elem := range a.elems { + if elem.Spread != nil { + vecs[i], spreadOffs[i], viewIndexes[i] = a.unwrapSpread(vecs[i]) + } + } + offsets := []uint32{0} + var tags []uint32 + for i := range n { + var size uint32 + for tag, spreadOff := range spreadOffs { + if len(spreadOff) == 0 { + tags = append(tags, uint32(tag)) + size++ + continue + } else { + if index := viewIndexes[tag]; index != nil { + i = index[i] + } + off := spreadOff[i] + for end := spreadOff[i+1]; off < end; off++ { + tags = append(tags, uint32(tag)) + size++ + } + } + } + offsets = append(offsets, offsets[i]+size) + } + var typ super.Type + var innerVec vector.Any + if len(vecs) == 1 { + typ = vecs[0].Type() + innerVec = vecs[0] + } else { + var all []super.Type + for _, vec := range vecs { + all = append(all, vec.Type()) + } + types := super.UniqueTypes(all) + if len(types) == 1 { + typ = types[0] + innerVec = vector.NewDynamic(tags, vecs) + } else { + typ = a.zctx.LookupTypeUnion(types) + innerVec = vector.NewUnion(typ.(*super.TypeUnion), tags, vecs, nil) + } + } + return vector.NewArray(a.zctx.LookupTypeArray(typ), offsets, innerVec, nil) +} + +func (a *ArrayExpr) unwrapSpread(vec vector.Any) (vector.Any, []uint32, []uint32) { + switch vec := vec.(type) { + case *vector.Array: + return vec.Values, vec.Offsets, nil + case *vector.Set: + return vec.Values, vec.Offsets, nil + case *vector.View: + vals, offsets, _ := a.unwrapSpread(vec.Any) + return vals, offsets, vec.Index + } + return vec, nil, nil +} diff --git a/runtime/ztests/expr/array-spread.yaml b/runtime/ztests/expr/array-spread.yaml new file mode 100644 index 0000000000..f35bc87bf7 --- /dev/null +++ b/runtime/ztests/expr/array-spread.yaml @@ -0,0 +1,11 @@ +zed: yield [...a,...b] + +vector: true + +input: | + {a:|[1,2]|,b:[0,1],c:"hi"} + {a:[1,2],b:"hi"} + +output: | + [1,2,0,1] + [1,2,"hi"] diff --git a/vector/union.go b/vector/union.go index 093a18d5c1..04e93ff7db 100644 --- a/vector/union.go +++ b/vector/union.go @@ -25,7 +25,8 @@ func (u *Union) Type() super.Type { func (u *Union) Serialize(b *zcode.Builder, slot uint32) { b.BeginContainer() - b.Append(super.EncodeInt(int64(u.Tags[slot]))) + tag := u.Typ.TagOf(u.TypeOf(slot)) + b.Append(super.EncodeInt(int64(tag))) u.Dynamic.Serialize(b, slot) b.EndContainer() } From a8388b297f7dff24136fbafcdf98f2c886adc7fd Mon Sep 17 00:00:00 2001 From: Matthew Nibecker Date: Wed, 13 Nov 2024 14:05:17 -0500 Subject: [PATCH 2/2] add vector builder --- compiler/kernel/vexpr.go | 12 +- go.mod | 3 + go.sum | 6 + runtime/sam/expr/values.go | 4 +- .../{vector-spread.yaml => set-spread.yaml} | 6 +- runtime/vam/expr/arrayexpr.go | 47 ++- runtime/ztests/expr/array-expr.yaml | 22 ++ runtime/ztests/expr/array-spread.yaml | 12 +- vector/bool.go | 43 +++ vector/builder.go | 359 ++++++++++++++++++ 10 files changed, 488 insertions(+), 26 deletions(-) rename runtime/sam/expr/ztests/{vector-spread.yaml => set-spread.yaml} (51%) create mode 100644 runtime/ztests/expr/array-expr.yaml create mode 100644 vector/builder.go diff --git a/compiler/kernel/vexpr.go b/compiler/kernel/vexpr.go index 024c38e41a..3adf354fdd 100644 --- a/compiler/kernel/vexpr.go +++ b/compiler/kernel/vexpr.go @@ -220,15 +220,15 @@ func (b *Builder) compileVamRecordExpr(e *dag.RecordExpr) (vamexpr.Evaluator, er } func (b *Builder) compileVamArrayExpr(e *dag.ArrayExpr) (vamexpr.Evaluator, error) { - elems, err := b.compileVamVectorElems(e.Elems) + elems, err := b.compileVamListElems(e.Elems) if err != nil { return nil, err } return vamexpr.NewArrayExpr(b.zctx(), elems), nil } -func (b *Builder) compileVamVectorElems(elems []dag.VectorElem) ([]vamexpr.VectorElem, error) { - var out []vamexpr.VectorElem +func (b *Builder) compileVamListElems(elems []dag.VectorElem) ([]vamexpr.ListElem, error) { + var out []vamexpr.ListElem for _, elem := range elems { switch elem := elem.(type) { case *dag.Spread: @@ -236,13 +236,15 @@ func (b *Builder) compileVamVectorElems(elems []dag.VectorElem) ([]vamexpr.Vecto if err != nil { return nil, err } - out = append(out, vamexpr.VectorElem{Spread: e}) + out = append(out, vamexpr.ListElem{Spread: e}) case *dag.VectorValue: e, err := b.compileVamExpr(elem.Expr) if err != nil { return nil, err } - out = append(out, vamexpr.VectorElem{Value: e}) + out = append(out, vamexpr.ListElem{Value: e}) + default: + panic(elem) } } return out, nil diff --git a/go.mod b/go.mod index 699ed1934e..e97c3b2088 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/brimdata/super go 1.23 require ( + github.com/RoaringBitmap/roaring v1.9.4 github.com/agnivade/levenshtein v1.1.1 github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d github.com/apache/arrow-go/v18 v18.0.0 @@ -46,6 +47,7 @@ require ( github.com/andybalholm/brotli v1.1.1 // indirect github.com/apache/thrift v0.21.0 // indirect github.com/beorn7/perks v1.0.1 // indirect + github.com/bits-and-blooms/bitset v1.12.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/dgryski/go-metro v0.0.0-20180109044635-280f6062b5bc // indirect @@ -64,6 +66,7 @@ require ( github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 // indirect github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 // indirect + github.com/mschoch/smat v0.2.0 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/common v0.37.0 // indirect github.com/prometheus/procfs v0.8.0 // indirect diff --git a/go.sum b/go.sum index a718856bf9..224abee32c 100644 --- a/go.sum +++ b/go.sum @@ -37,6 +37,8 @@ github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c h1:RGWPOewvKIROun94nF7v2cua9qP+thov/7M50KEoeSU= github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c/go.mod h1:X0CRv0ky0k6m906ixxpzmDRLvX58TFUKS2eePweuyxk= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= +github.com/RoaringBitmap/roaring v1.9.4 h1:yhEIoH4YezLYT04s1nHehNO64EKFTop/wBhxv2QzDdQ= +github.com/RoaringBitmap/roaring v1.9.4/go.mod h1:6AXUsoIEzDTFFQCe1RbGA6uFONMhvejWj5rqITANK90= github.com/agnivade/levenshtein v1.1.1 h1:QY8M92nrzkmr798gCo3kmMyqXFzdQVpxLlGPRBij0P8= github.com/agnivade/levenshtein v1.1.1/go.mod h1:veldBMzWxcCG2ZvUTKD2kJNRdCk5hVbJomOvKkmgYbo= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= @@ -65,6 +67,8 @@ github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24 github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/bits-and-blooms/bitset v1.12.0 h1:U/q1fAF7xXRhFCrhROzIfffYnu+dlS38vCZtmFVPHmA= +github.com/bits-and-blooms/bitset v1.12.0/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= @@ -244,6 +248,8 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJ github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/mschoch/smat v0.2.0 h1:8imxQsjDm8yFEAVBe7azKmKSgzSkZXDuKkSq9374khM= +github.com/mschoch/smat v0.2.0/go.mod h1:kc9mz7DoBKqDyiRL7VZN8KvXQMWeTaVnttLRXOlotKw= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/nxadm/tail v1.4.4 h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78= diff --git a/runtime/sam/expr/values.go b/runtime/sam/expr/values.go index f027bd2513..f63656af77 100644 --- a/runtime/sam/expr/values.go +++ b/runtime/sam/expr/values.go @@ -198,7 +198,7 @@ func (a *ArrayExpr) Eval(ectx Context, this super.Value) super.Value { val := e.Spread.Eval(ectx, this) inner := super.InnerType(val.Type()) if inner == nil { - a.collection.append(val) + // Treat non-list spread values values like missing. continue } a.collection.appendSpread(inner, val.Bytes()) @@ -238,7 +238,7 @@ func (a *SetExpr) Eval(ectx Context, this super.Value) super.Value { val := e.Spread.Eval(ectx, this) inner := super.InnerType(val.Type()) if inner == nil { - a.collection.append(val) + // Treat non-list spread values values like missing. continue } a.collection.appendSpread(inner, val.Bytes()) diff --git a/runtime/sam/expr/ztests/vector-spread.yaml b/runtime/sam/expr/ztests/set-spread.yaml similarity index 51% rename from runtime/sam/expr/ztests/vector-spread.yaml rename to runtime/sam/expr/ztests/set-spread.yaml index 150ea4a05f..6c5c5d7036 100644 --- a/runtime/sam/expr/ztests/vector-spread.yaml +++ b/runtime/sam/expr/ztests/set-spread.yaml @@ -1,11 +1,9 @@ -zed: yield [...a,...b], |[...a,...b]| +zed: yield |[...a,...b]| input: | {a:|[2,3]|,b:[0,1],c:"hi"} {a:[1,2],b:"hi"} output: | - [2,3,0,1] |[0,1,2,3]| - [1,2,"hi"] - |[1,2,"hi"]| + |[1,2]| diff --git a/runtime/vam/expr/arrayexpr.go b/runtime/vam/expr/arrayexpr.go index d3baddda00..5d6e44d700 100644 --- a/runtime/vam/expr/arrayexpr.go +++ b/runtime/vam/expr/arrayexpr.go @@ -3,19 +3,20 @@ package expr import ( "github.com/brimdata/super" "github.com/brimdata/super/vector" + "github.com/brimdata/super/zcode" ) -type VectorElem struct { +type ListElem struct { Value Evaluator Spread Evaluator } type ArrayExpr struct { - elems []VectorElem + elems []ListElem zctx *super.Context } -func NewArrayExpr(zctx *super.Context, elems []VectorElem) *ArrayExpr { +func NewArrayExpr(zctx *super.Context, elems []ListElem) *ArrayExpr { return &ArrayExpr{ elems: elems, zctx: zctx, @@ -34,17 +35,27 @@ func (a *ArrayExpr) Eval(this vector.Any) vector.Any { return vector.Apply(false, a.eval, vecs...) } -func (a *ArrayExpr) eval(vecs ...vector.Any) vector.Any { - n := vecs[0].Len() +func (a *ArrayExpr) eval(in ...vector.Any) vector.Any { + n := in[0].Len() if n == 0 { return vector.NewConst(super.Null, 0, nil) } - spreadOffs := make([][]uint32, len(a.elems)) - viewIndexes := make([][]uint32, len(a.elems)) + var spreadOffs [][]uint32 + var viewIndexes [][]uint32 + var vecs []vector.Any for i, elem := range a.elems { + vec := in[i] + var offsets, index []uint32 if elem.Spread != nil { - vecs[i], spreadOffs[i], viewIndexes[i] = a.unwrapSpread(vecs[i]) + vec, offsets, index = a.unwrapSpread(in[i]) + if vec == nil { + // drop unspreadable elements. + continue + } } + vecs = append(vecs, vec) + spreadOffs = append(spreadOffs, offsets) + viewIndexes = append(viewIndexes, index) } offsets := []uint32{0} var tags []uint32 @@ -54,7 +65,6 @@ func (a *ArrayExpr) eval(vecs ...vector.Any) vector.Any { if len(spreadOff) == 0 { tags = append(tags, uint32(tag)) size++ - continue } else { if index := viewIndexes[tag]; index != nil { i = index[i] @@ -81,7 +91,7 @@ func (a *ArrayExpr) eval(vecs ...vector.Any) vector.Any { types := super.UniqueTypes(all) if len(types) == 1 { typ = types[0] - innerVec = vector.NewDynamic(tags, vecs) + innerVec = mergeSameTypeVecs(typ, tags, vecs) } else { typ = a.zctx.LookupTypeUnion(types) innerVec = vector.NewUnion(typ.(*super.TypeUnion), tags, vecs, nil) @@ -100,5 +110,20 @@ func (a *ArrayExpr) unwrapSpread(vec vector.Any) (vector.Any, []uint32, []uint32 vals, offsets, _ := a.unwrapSpread(vec.Any) return vals, offsets, vec.Index } - return vec, nil, nil + return nil, nil, nil +} + +func mergeSameTypeVecs(typ super.Type, tags []uint32, vecs []vector.Any) vector.Any { + // XXX This is going to be slow. At some point would nice to write a native + // merge of same type vectors. + counts := make([]uint32, len(vecs)) + vb := vector.NewBuilder(typ) + var b zcode.Builder + for _, tag := range tags { + b.Truncate() + vecs[tag].Serialize(&b, counts[tag]) + vb.Write(b.Bytes().Body()) + counts[tag]++ + } + return vb.Build() } diff --git a/runtime/ztests/expr/array-expr.yaml b/runtime/ztests/expr/array-expr.yaml new file mode 100644 index 0000000000..0ef22d0d25 --- /dev/null +++ b/runtime/ztests/expr/array-expr.yaml @@ -0,0 +1,22 @@ +zed: yield [a,b,c] + +vector: true + +input: | + {a:error("missing"),b:error("quiet"),c:null(error(string))} + {a:[1,2],b:null([int64]),c:[3,4]} + {a:[1,"foo"],b:[2,"bar"],c:[3,"baz"]} + {a:|[1,2]|,b:null(|[int64]|),c:|[3,4]|} + {a:|{"key":"k1"}|,b:null(|{string:string}|),c:|{"key":"k3"}|} + // heterogenous + {a:"foo",b:1,c:127.0.0.1} + {a:"bar",b:2,c:127.0.0.2} + +output: | + [error("missing"),error("quiet"),null(error(string))] + [[1,2],null([int64]),[3,4]] + [[1,"foo"],[2,"bar"],[3,"baz"]] + [|[1,2]|,null(|[int64]|),|[3,4]|] + [|{"key":"k1"}|,null(|{string:string}|),|{"key":"k3"}|] + ["foo",1,127.0.0.1] + ["bar",2,127.0.0.2] diff --git a/runtime/ztests/expr/array-spread.yaml b/runtime/ztests/expr/array-spread.yaml index f35bc87bf7..61cb7973c4 100644 --- a/runtime/ztests/expr/array-spread.yaml +++ b/runtime/ztests/expr/array-spread.yaml @@ -3,9 +3,13 @@ zed: yield [...a,...b] vector: true input: | - {a:|[1,2]|,b:[0,1],c:"hi"} - {a:[1,2],b:"hi"} + {a:|[1,2]|,b:[0,1,null(int64)],c:"hi"} + {a:|[1,2]|,b:"hi"} + {a:[{x:"foo"},null({x:string})],b:[{x:"bar"}]} + {a:"foo",b:"bar"} output: | - [1,2,0,1] - [1,2,"hi"] + [1,2,0,1,null(int64)] + [1,2] + [{x:"foo"},null({x:string}),{x:"bar"}] + [] diff --git a/vector/bool.go b/vector/bool.go index 53b6a9d1e9..4c4440743a 100644 --- a/vector/bool.go +++ b/vector/bool.go @@ -171,3 +171,46 @@ func NullsOf(v Any) *Bool { } panic(v) } + +func setNulls(v Any, nulls *Bool) { + switch v := v.(type) { + case *Array: + v.Nulls = nulls + case *Bytes: + v.Nulls = nulls + case *Bool: + v.Nulls = nulls + case *Const: + v.Nulls = nulls + case *Dict: + v.Nulls = nulls + case *Error: + v.Nulls = nulls + case *Float: + v.Nulls = nulls + case *Int: + v.Nulls = nulls + case *IP: + v.Nulls = nulls + case *Map: + v.Nulls = nulls + case *Named: + setNulls(v.Any, nulls) + case *Net: + v.Nulls = nulls + case *Record: + v.Nulls = nulls + case *Set: + v.Nulls = nulls + case *String: + v.Nulls = nulls + case *TypeValue: + v.Nulls = nulls + case *Uint: + v.Nulls = nulls + case *Union: + v.Nulls = nulls + default: + panic(v) + } +} diff --git a/vector/builder.go b/vector/builder.go new file mode 100644 index 0000000000..a65c2f9d50 --- /dev/null +++ b/vector/builder.go @@ -0,0 +1,359 @@ +package vector + +import ( + "fmt" + "net/netip" + + "github.com/RoaringBitmap/roaring" + "github.com/brimdata/super" + "github.com/brimdata/super/zcode" +) + +type Builder interface { + Write(zcode.Bytes) + Build() Any +} + +func NewBuilder(typ super.Type) Builder { + var b Builder + switch typ := typ.(type) { + case *super.TypeNamed: + return &namedBuilder{typ: typ, Builder: NewBuilder(typ.Type)} + case *super.TypeRecord: + b = newRecordBuilder(typ) + case *super.TypeError: + b = &errorBuilder{typ: typ, Builder: NewBuilder(typ.Type)} + case *super.TypeArray: + b = newArraySetBuilder(typ) + case *super.TypeSet: + b = newArraySetBuilder(typ) + case *super.TypeMap: + b = newMapBuilder(typ) + case *super.TypeUnion: + b = newUnionBuilder(typ) + default: + id := typ.ID() + if super.IsNumber(id) { + switch { + case super.IsUnsigned(id): + b = &uintBuilder{typ: typ} + case super.IsSigned(id): + b = &intBuilder{typ: typ} + case super.IsFloat(id): + b = &intBuilder{typ: typ} + } + } else { + switch id { + case super.IDBool: + b = newBoolBuilder() + case super.IDBytes, super.IDString, super.IDType: + b = newBytesStringTypeBuilder(typ) + case super.IDIP: + b = &ipBuilder{} + case super.IDNet: + b = &netBuilder{} + case super.IDNull: + return &constNullBuilder{} + default: + panic(fmt.Sprintf("unsupported type: %T", typ)) + } + } + } + return newNullsBuilder(b) +} + +type nullsBuilder struct { + n uint32 + values Builder + nulls *roaring.Bitmap +} + +func newNullsBuilder(values Builder) Builder { + return &nullsBuilder{ + values: values, + nulls: roaring.New(), + } +} + +func (n *nullsBuilder) Write(bytes zcode.Bytes) { + if bytes == nil { + n.nulls.Add(n.n) + } + n.values.Write(bytes) + n.n++ +} + +type namedBuilder struct { + Builder + typ *super.TypeNamed +} + +func (n *namedBuilder) Build() Any { + return NewNamed(n.typ, n.Builder.Build()) +} + +func (n *nullsBuilder) Build() Any { + vec := n.values.Build() + if !n.nulls.IsEmpty() { + bits := make([]uint64, (n.n+63)/64) + n.nulls.WriteDenseTo(bits) + setNulls(vec, NewBool(bits, n.n, nil)) + } + return vec +} + +type recordBuilder struct { + typ *super.TypeRecord + values []Builder +} + +func newRecordBuilder(typ *super.TypeRecord) Builder { + var values []Builder + for _, f := range typ.Fields { + values = append(values, NewBuilder(f.Type)) + } + return &recordBuilder{typ: typ, values: values} +} + +func (r *recordBuilder) Write(bytes zcode.Bytes) { + if bytes == nil { + for _, v := range r.values { + v.Write(nil) + } + return + } + it := bytes.Iter() + for _, v := range r.values { + v.Write(it.Next()) + } +} + +func (r *recordBuilder) Build() Any { + var vecs []Any + for _, v := range r.values { + vecs = append(vecs, v.Build()) + } + return NewRecord(r.typ, vecs, vecs[0].Len(), nil) +} + +type errorBuilder struct { + typ *super.TypeError + Builder +} + +func (e *errorBuilder) Build() Any { + return NewError(e.typ, e.Builder.Build(), nil) +} + +type arraySetBuilder struct { + typ super.Type + values Builder + offsets []uint32 +} + +func newArraySetBuilder(typ super.Type) Builder { + return &arraySetBuilder{typ: typ, values: NewBuilder(super.InnerType(typ)), offsets: []uint32{0}} +} + +func (a *arraySetBuilder) Write(bytes zcode.Bytes) { + off := a.offsets[len(a.offsets)-1] + for it := bytes.Iter(); !it.Done(); { + a.values.Write(it.Next()) + off++ + } + a.offsets = append(a.offsets, off) +} + +func (a *arraySetBuilder) Build() Any { + if typ, ok := a.typ.(*super.TypeArray); ok { + return NewArray(typ, a.offsets, a.values.Build(), nil) + } + return NewSet(a.typ.(*super.TypeSet), a.offsets, a.values.Build(), nil) +} + +type mapBuilder struct { + typ *super.TypeMap + keys, values Builder + offsets []uint32 +} + +func newMapBuilder(typ *super.TypeMap) Builder { + return &mapBuilder{ + typ: typ, + keys: NewBuilder(typ.KeyType), + values: NewBuilder(typ.ValType), + offsets: []uint32{0}, + } +} + +func (m *mapBuilder) Write(bytes zcode.Bytes) { + off := m.offsets[len(m.offsets)-1] + it := bytes.Iter() + for !it.Done() { + m.keys.Write(it.Next()) + m.values.Write(it.Next()) + off++ + } + m.offsets = append(m.offsets, off) +} + +func (m *mapBuilder) Build() Any { + return NewMap(m.typ, m.offsets, m.keys.Build(), m.values.Build(), nil) +} + +type unionBuilder struct { + typ *super.TypeUnion + values []Builder + tags []uint32 +} + +func newUnionBuilder(typ *super.TypeUnion) Builder { + var values []Builder + for _, typ := range typ.Types { + values = append(values, NewBuilder(typ)) + } + return &unionBuilder{typ: typ, values: values} +} + +func (u *unionBuilder) Write(bytes zcode.Bytes) { + if bytes == nil { + u.tags = append(u.tags, 0) + return + } + var typ super.Type + typ, bytes = u.typ.Untag(bytes) + tag := u.typ.TagOf(typ) + u.values[tag].Write(bytes) + u.tags = append(u.tags, uint32(tag)) +} + +func (u *unionBuilder) Build() Any { + var vecs []Any + for _, v := range u.values { + vecs = append(vecs, v.Build()) + } + return NewUnion(u.typ, u.tags, vecs, nil) +} + +type intBuilder struct { + typ super.Type + values []int64 +} + +func (i *intBuilder) Write(bytes zcode.Bytes) { + i.values = append(i.values, super.DecodeInt(bytes)) +} + +func (i *intBuilder) Build() Any { + return NewInt(i.typ, i.values, nil) +} + +type uintBuilder struct { + typ super.Type + values []uint64 +} + +func (u *uintBuilder) Write(bytes zcode.Bytes) { + u.values = append(u.values, super.DecodeUint(bytes)) +} + +func (u *uintBuilder) Build() Any { + return NewUint(u.typ, u.values, nil) +} + +type floatBuilder struct { + typ super.Type + values []float64 +} + +func (f *floatBuilder) Write(bytes zcode.Bytes) { + f.values = append(f.values, super.DecodeFloat(bytes)) +} + +func (f *floatBuilder) Build() Any { + return NewFloat(f.typ, f.values, nil) +} + +type boolBuilder struct { + values *roaring.Bitmap + n uint32 +} + +func newBoolBuilder() Builder { + return &boolBuilder{values: roaring.New()} +} + +func (b *boolBuilder) Write(bytes zcode.Bytes) { + if super.DecodeBool(bytes) { + b.values.Add(b.n) + } + b.n++ +} + +func (b *boolBuilder) Build() Any { + bits := make([]uint64, (b.n+63)/64) + b.values.WriteDenseTo(bits) + return NewBool(bits, b.n, nil) +} + +type bytesStringTypeBuilder struct { + typ super.Type + offs []uint32 + bytes []byte +} + +func newBytesStringTypeBuilder(typ super.Type) Builder { + return &bytesStringTypeBuilder{typ: typ, offs: []uint32{0}} +} + +func (b *bytesStringTypeBuilder) Write(bytes zcode.Bytes) { + b.bytes = append(b.bytes, bytes...) + b.offs = append(b.offs, uint32(len(b.bytes))) +} + +func (b *bytesStringTypeBuilder) Build() Any { + switch b.typ.ID() { + case super.IDString: + return NewString(b.offs, b.bytes, nil) + case super.IDBytes: + return NewBytes(b.offs, b.bytes, nil) + default: + return NewTypeValue(b.offs, b.bytes, nil) + } +} + +type ipBuilder struct { + values []netip.Addr +} + +func (i *ipBuilder) Write(bytes zcode.Bytes) { + i.values = append(i.values, super.DecodeIP(bytes)) +} + +func (i *ipBuilder) Build() Any { + return NewIP(i.values, nil) +} + +type netBuilder struct { + values []netip.Prefix +} + +func (n *netBuilder) Write(bytes zcode.Bytes) { + n.values = append(n.values, super.DecodeNet(bytes)) +} + +func (n *netBuilder) Build() Any { + return NewNet(n.values, nil) +} + +type constNullBuilder struct { + n uint32 +} + +func (c *constNullBuilder) Write(bytes zcode.Bytes) { + c.n++ +} + +func (c *constNullBuilder) Build() Any { + return NewConst(super.Null, c.n, nil) +}