Skip to content

Commit

Permalink
vam: Fix summarize where
Browse files Browse the repository at this point in the history
Fix issue with incorrect counts when using where clauses on a
aggregation function in vector runtime. If a value fails the where
clause make the value as null so it is skipped by the aggregation
function.

Closes #5468
  • Loading branch information
mattnibs committed Nov 26, 2024
1 parent e7d0376 commit f102449
Show file tree
Hide file tree
Showing 7 changed files with 135 additions and 46 deletions.
21 changes: 8 additions & 13 deletions runtime/vam/expr/agg/count.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,25 +10,20 @@ type count struct {
}

func (a *count) Consume(vec vector.Any) {
if c, ok := vec.(*vector.Const); ok {
val := c.Value()
if !val.IsNull() && !val.IsError() {
a.count += uint64(vec.Len())
}
if c, ok := vec.(*vector.Const); ok && c.Value().IsNull() {
return
}
if _, ok := vector.Under(vec).Type().(*super.TypeError); ok {
return
}
nulls := vector.NullsOf(vec)
if nulls == nil {
a.count += uint64(vec.Len())
return
}
for i := range vec.Len() {
if !nulls.Value(i) {
a.count++
if nulls := vector.NullsOf(vec); nulls != nil {
for i := range vec.Len() {
if !nulls.Value(i) {
a.count++
}
}
} else {
a.count += uint64(vec.Len())
}
}

Expand Down
22 changes: 13 additions & 9 deletions runtime/vam/expr/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,19 @@ func (a *Aggregator) Eval(this vector.Any) vector.Any {

func (a *Aggregator) apply(args ...vector.Any) vector.Any {
vec, where := args[0], args[1]
var tags []uint32
// If type is not bool then we want to filter everything.
if where.Type().ID() == super.IDBool {
for slot := uint32(0); slot < where.Len(); slot++ {
// XXX Feels like we should have a optimzed version of this.
if vector.BoolValue(where, slot) {
tags = append(tags, slot)
}
bools, _ := BoolMask(where)
if bools.IsEmpty() {
// everything is filtered.
return vector.NewConst(super.NewValue(vec.Type(), nil), vec.Len(), nil)
}
bools.Flip(0, uint64(vec.Len()))
if !bools.IsEmpty() {
nulls := vector.NewBoolEmpty(vec.Len(), nil)
bools.WriteDenseTo(nulls.Bits)
if origNulls := vector.NullsOf(vec); origNulls != nil {
nulls = vector.Or(nulls, origNulls)
}
vec = vector.CopyAndSetNulls(vec, nulls)
}
return vector.NewView(vec, tags)
return vec
}
3 changes: 1 addition & 2 deletions runtime/vam/expr/arith.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,7 @@ func (a *Arith) eval(vecs ...vector.Any) (out vector.Any) {
}()
}
out = f(lhs, rhs)
vector.SetNulls(out, vector.Or(vector.NullsOf(lhs), vector.NullsOf(rhs)))
return out
return vector.CopyAndSetNulls(out, vector.Or(vector.NullsOf(lhs), vector.NullsOf(rhs)))
}

func (a *Arith) evalDivideByZero(kind vector.Kind, lhs, rhs vector.Any) vector.Any {
Expand Down
3 changes: 1 addition & 2 deletions runtime/vam/expr/compare.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,7 @@ func (c *Compare) eval(vecs ...vector.Any) vector.Any {
return vector.NewConst(super.False, lhs.Len(), nulls)
}
out := f(lhs, rhs)
vector.SetNulls(out, nulls)
return out
return vector.CopyAndSetNulls(out, nulls)
}

func (c *Compare) compareTypeVals(lhs, rhs vector.Any) vector.Any {
Expand Down
17 changes: 17 additions & 0 deletions runtime/ztests/op/summarize/count-where.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
zed: |
summarize
num_requests := count()
where log_time >= 2012-10-01T00:00:00Z
by client_ip
| sort client_ip
vector: true

input: |
{log_time:2012-01-01T00:00:44Z,client_ip:249.92.17.134}
{log_time:2012-10-01T00:24:30Z,client_ip:249.92.17.134}
{log_time:2012-05-12T10:23:22Z,client_ip:251.58.48.137}
output: |
{client_ip:249.92.17.134,num_requests:1(uint64)}
{client_ip:251.58.48.137,num_requests:0(uint64)}
113 changes: 94 additions & 19 deletions vector/bool.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,44 +179,119 @@ func NullsOf(v Any) *Bool {
panic(v)
}

func SetNulls(v Any, nulls *Bool) {
func CopyAndSetNulls(v Any, nulls *Bool) Any {
switch v := v.(type) {
case *Array:
v.Nulls = nulls
return &Array{
Typ: v.Typ,
Offsets: v.Offsets,
Values: v.Values,
Nulls: nulls,
}
case *Bytes:
v.Nulls = nulls
return &Bytes{
Offs: v.Offs,
Bytes: v.Bytes,
Nulls: nulls,
}
case *Bool:
v.Nulls = nulls
return &Bool{
len: v.len,
Bits: v.Bits,
Nulls: nulls,
}
case *Const:
v.Nulls = nulls
return &Const{
val: v.val,
len: v.len,
Nulls: nulls,
}
case *Dict:
v.Nulls = nulls
return &Dict{
Any: v.Any,
Index: v.Index,
Counts: v.Counts,
Nulls: nulls,
}
case *Error:
v.Nulls = nulls
return &Error{
Typ: v.Typ,
Vals: v.Vals,
Nulls: nulls,
}
case *Float:
v.Nulls = nulls
return &Float{
Typ: v.Typ,
Values: v.Values,
Nulls: nulls,
}
case *Int:
v.Nulls = nulls
return &Int{
Typ: v.Typ,
Values: v.Values,
Nulls: nulls,
}
case *IP:
v.Nulls = nulls
return &IP{
Values: v.Values,
Nulls: nulls,
}
case *Map:
v.Nulls = nulls
return &Map{
Typ: v.Typ,
Offsets: v.Offsets,
Keys: v.Keys,
Values: v.Values,
Nulls: nulls,
}
case *Named:
SetNulls(v.Any, nulls)
return &Named{
Typ: v.Typ,
Any: CopyAndSetNulls(v.Any, nulls),
}
case *Net:
v.Nulls = nulls
return &Net{
Values: v.Values,
Nulls: nulls,
}
case *Record:
v.Nulls = nulls
return &Record{
Typ: v.Typ,
Fields: v.Fields,
len: v.len,
Nulls: nulls,
}
case *Set:
v.Nulls = nulls
return &Set{
Typ: v.Typ,
Offsets: v.Offsets,
Values: v.Values,
Nulls: nulls,
}
case *String:
v.Nulls = nulls
return &String{
Offsets: v.Offsets,
Bytes: v.Bytes,
Nulls: nulls,
}
case *TypeValue:
v.Nulls = nulls
return &TypeValue{
Offsets: v.Offsets,
Bytes: v.Bytes,
Nulls: nulls,
}
case *Uint:
v.Nulls = nulls
return &Uint{
Typ: v.Typ,
Values: v.Values,
Nulls: nulls,
}
case *Union:
v.Nulls = nulls
return &Union{
Dynamic: v.Dynamic,
Typ: v.Typ,
Nulls: nulls,
}
default:
panic(v)
}
Expand Down
2 changes: 1 addition & 1 deletion vector/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func (n *nullsBuilder) Build() Any {
if !n.nulls.IsEmpty() {
bits := make([]uint64, (n.n+63)/64)
n.nulls.WriteDenseTo(bits)
SetNulls(vec, NewBool(bits, n.n, nil))
vec = CopyAndSetNulls(vec, NewBool(bits, n.n, nil))
}
return vec
}
Expand Down

0 comments on commit f102449

Please sign in to comment.