
Add new Composite aggregation (#1005)
#968

---------

Co-authored-by: Jacek Migdal <[email protected]>
trzysiek and jakozaur authored Nov 21, 2024
1 parent 2e78116 commit 82fff6f
Showing 7 changed files with 767 additions and 30 deletions.
2 changes: 1 addition & 1 deletion quesma/model/README.md
@@ -15,7 +15,7 @@ More info: https://www.elastic.co/guide/en/elasticsearch/reference/current/searc
Cardinality | :white_check_mark: | Auto-interval date histogram | :wavy_dash: | Bucket script | :wavy_dash: |
Extended Stats | :white_check_mark:[^1] | Categorize text | :x: | Bucket count K-S test | :x: |
Avg | :white_check_mark: | Children | :x: | Bucket correlation | :x: |
- Boxplot | :x: | Composite | :x: | Bucket selector | :x: |
+ Boxplot | :x: | Composite | :white_check_mark: | Bucket selector | :x: |
Cardinality | :white_check_mark: | Date histogram | :white_check_mark: | Bucket sort | :x: |
Extended stats | :white_check_mark:[^1] | Date range | :white_check_mark: | Change point | :x: |
Geo-bounds | :x: | Diversified sampler | :x: | Cumulative cardinality | :x: |
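For context, a composite aggregation builds buckets from the combination of one or more "sources" and pages through them via after_key. Below is a minimal sketch of the kind of request that now gets translated; the aggregation name "daily_by_category" and the field names "category" and "timestamp" are made up, not taken from this commit.

// Illustrative only: an Elasticsearch composite aggregation request of the
// shape the new parseComposite handles. Names and fields are hypothetical.
package main

import "fmt"

const exampleCompositeRequest = `{
  "size": 0,
  "aggs": {
    "daily_by_category": {
      "composite": {
        "size": 100,
        "sources": [
          { "category": { "terms": { "field": "category" } } },
          { "day": { "date_histogram": { "field": "timestamp", "calendar_interval": "1d" } } }
        ]
      }
    }
  }
}`

func main() {
    fmt.Println(exampleCompositeRequest)
}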
97 changes: 97 additions & 0 deletions quesma/model/bucket_aggregations/composite.go
@@ -0,0 +1,97 @@
// Copyright Quesma, licensed under the Elastic License 2.0.
// SPDX-License-Identifier: Elastic-2.0
package bucket_aggregations

import (
    "context"
    "fmt"
    "quesma/logger"
    "quesma/model"
)

type (
    Composite struct {
        ctx              context.Context
        size             int
        baseAggregations []*BaseAggregation
    }
    BaseAggregation struct {
        name        string
        aggregation model.QueryType
    }
)

func NewComposite(ctx context.Context, size int, baseAggregations []*BaseAggregation) *Composite {
    return &Composite{ctx: ctx, size: size, baseAggregations: baseAggregations}
}

func NewBaseAggregation(name string, aggregation model.QueryType) *BaseAggregation {
    return &BaseAggregation{name: name, aggregation: aggregation}
}

func (query *Composite) AggregationType() model.AggregationType {
    return model.BucketAggregation
}

func (query *Composite) TranslateSqlResponseToJson(rows []model.QueryResultRow) model.JsonMap {
    minimumExpectedColNr := query.expectedBaseAggrColumnsNr() + 1 // +1 for doc_count. Can be more, if this Composite has parent aggregations, but never fewer.
    if len(rows) > 0 && len(rows[0].Cols) < minimumExpectedColNr {
        logger.ErrorWithCtx(query.ctx).Msgf("too few columns in composite aggregation response, len: %d, expected (at least): %d, rows[0]: %v", len(rows[0].Cols), minimumExpectedColNr, rows[0])
    }

    buckets := make([]model.JsonMap, 0, len(rows))
    for _, row := range rows {
        colIdx := 0
        key := make(model.JsonMap, len(query.baseAggregations))
        for _, baseAggr := range query.baseAggregations {
            col := row.Cols[colIdx]
            if dateHistogram, ok := baseAggr.aggregation.(*DateHistogram); ok {
                if originalKey, ok := col.Value.(int64); ok {
                    key[baseAggr.name] = dateHistogram.calculateResponseKey(originalKey)
                } else {
                    logger.ErrorWithCtx(query.ctx).Msgf("unexpected value in date_histogram key column: %v", col.Value)
                }
                colIdx += 1
            } else if geotileGrid, ok := baseAggr.aggregation.(GeoTileGrid); ok {
                key[baseAggr.name] = geotileGrid.calcKey(row.Cols[colIdx:])
                colIdx += 3
            } else {
                key[baseAggr.name] = col.Value
                colIdx += 1
            }
        }

        bucket := model.JsonMap{
            "key":       key,
            "doc_count": query.docCount(&row),
        }
        buckets = append(buckets, bucket)
    }

    response := model.JsonMap{
        "buckets": buckets,
    }
    if len(buckets) > 0 {
        response["after_key"] = buckets[len(buckets)-1]["key"]
    }
    return response
}

func (query *Composite) String() string {
    return fmt.Sprintf("composite(size: %d, base aggregations: %v)", query.size, query.baseAggregations)
}

func (query *Composite) docCount(row *model.QueryResultRow) any {
    return row.Cols[len(row.Cols)-1].Value
}

func (query *Composite) expectedBaseAggrColumnsNr() (columnsNr int) {
    for _, baseAggr := range query.baseAggregations {
        if _, ok := baseAggr.aggregation.(GeoTileGrid); ok {
            columnsNr += 3
        } else {
            columnsNr += 1
        }
    }
    return
}
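As a sanity check on the response shape above, here is a minimal standalone sketch, using plain Go maps rather than Quesma's model types: each result row becomes one bucket whose key has one entry per source, and after_key echoes the last bucket's key so a client can send it back as "after" to get the next page. The source names and counts are invented.

// Standalone sketch of the buckets/after_key layout produced by
// TranslateSqlResponseToJson; it does not use the quesma packages.
package main

import (
    "encoding/json"
    "fmt"
)

type bucket struct {
    Key      map[string]any `json:"key"`
    DocCount int            `json:"doc_count"`
}

// compositeResponse mirrors the response layout: after_key is simply the key
// of the last returned bucket.
func compositeResponse(buckets []bucket) map[string]any {
    resp := map[string]any{"buckets": buckets}
    if len(buckets) > 0 {
        resp["after_key"] = buckets[len(buckets)-1].Key
    }
    return resp
}

func main() {
    rows := []bucket{
        {Key: map[string]any{"category": "shoes", "day": "2024-11-20"}, DocCount: 42},
        {Key: map[string]any{"category": "shoes", "day": "2024-11-21"}, DocCount: 17},
    }
    out, _ := json.MarshalIndent(compositeResponse(rows), "", "  ")
    fmt.Println(string(out))
}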
25 changes: 13 additions & 12 deletions quesma/model/bucket_aggregations/geotile_grid.go
@@ -23,31 +23,32 @@ func (query GeoTileGrid) AggregationType() model.AggregationType {
}

func (query GeoTileGrid) TranslateSqlResponseToJson(rows []model.QueryResultRow) model.JsonMap {
-   if len(rows) > 0 && len(rows[0].Cols) < 3 {
+   if len(rows) > 0 && len(rows[0].Cols) < 4 {
        logger.ErrorWithCtx(query.ctx).Msgf(
            "unexpected number of columns in geotile_grid aggregation response, len(rows[0].Cols): %d",
            len(rows[0].Cols),
        )
    }
-   var response []model.JsonMap
+
+   buckets := make([]model.JsonMap, 0, len(rows))
    for _, row := range rows {
-       zoomAsFloat, _ := util.ExtractFloat64(row.Cols[0].Value)
-       zoom := int64(zoomAsFloat)
-       xAsFloat, _ := util.ExtractFloat64(row.Cols[1].Value)
-       x := int64(xAsFloat)
-       yAsFloat, _ := util.ExtractFloat64(row.Cols[2].Value)
-       y := int64(yAsFloat)
-       key := strconv.FormatInt(zoom, 10) + "/" + strconv.FormatInt(x, 10) + "/" + strconv.FormatInt(y, 10)
-       response = append(response, model.JsonMap{
-           "key": key,
+       buckets = append(buckets, model.JsonMap{
+           "key": query.calcKey(row.Cols),
            "doc_count": row.LastColValue(),
        })
    }
    return model.JsonMap{
-       "buckets": response,
+       "buckets": buckets,
    }
}
+
+func (query GeoTileGrid) calcKey(cols []model.QueryResultCol) string {
+   zoom, _ := util.ExtractFloat64(cols[0].Value)
+   x, _ := util.ExtractFloat64(cols[1].Value)
+   y, _ := util.ExtractFloat64(cols[2].Value)
+   return strconv.FormatInt(int64(zoom), 10) + "/" + strconv.FormatInt(int64(x), 10) + "/" + strconv.FormatInt(int64(y), 10)
+}

func (query GeoTileGrid) String() string {
    return "geotile_grid"
}
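For reference, the key built by calcKey is the usual "{zoom}/{x}/{y}" geotile string; a tiny standalone sketch with made-up values in place of the zoom/x/y result columns:

// Standalone sketch of the "{zoom}/{x}/{y}" key format; plain float64 arguments
// stand in for the three geotile columns of a result row.
package main

import (
    "fmt"
    "strconv"
)

func geotileKey(zoom, x, y float64) string {
    return strconv.FormatInt(int64(zoom), 10) + "/" +
        strconv.FormatInt(int64(x), 10) + "/" +
        strconv.FormatInt(int64(y), 10)
}

func main() {
    fmt.Println(geotileKey(7, 72, 48)) // prints "7/72/48"
}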
4 changes: 4 additions & 0 deletions quesma/model/bucket_aggregations/terms.go
@@ -71,6 +71,10 @@ func (query Terms) String() string {
return "significant_terms"
}

func (query Terms) IsSignificant() bool {
return query.significant
}

func (query Terms) sumDocCounts(rows []model.QueryResultRow) int {
sum := 0
if len(rows) > 0 {
68 changes: 68 additions & 0 deletions quesma/queryparser/pancake_aggregation_parser_buckets.go
@@ -316,6 +316,11 @@ func (cw *ClickhouseQueryTranslator) pancakeTryBucketAggregation(aggregation *pa
        delete(queryMap, "filters")
        return
    }
    if composite, ok := queryMap["composite"]; ok {
        aggregation.queryType, err = cw.parseComposite(aggregation, composite)
        delete(queryMap, "composite")
        return err == nil, err
    }
    success = false
    return
}
@@ -363,6 +368,69 @@ func (cw *ClickhouseQueryTranslator) parseAutoDateHistogram(paramsRaw any) *buck
    return bucket_aggregations.NewAutoDateHistogram(cw.Ctx, field, bucketsNr)
}

// compositeRaw - in a proper request should be of QueryMap type.
// TODO: In geotile_grid, without order specified, Elastic returns sort by key (a/b/c earlier than x/y/z if a<x or (a=x && b<y), etc.)
// Maybe add some ordering, but it doesn't seem to be very important.
func (cw *ClickhouseQueryTranslator) parseComposite(currentAggrNode *pancakeAggregationTreeNode, compositeRaw any) (*bucket_aggregations.Composite, error) {
    const defaultSize = 10
    composite, ok := compositeRaw.(QueryMap)
    if !ok {
        return nil, fmt.Errorf("composite is not a map, but %T, value: %v", compositeRaw, compositeRaw)
    }

    // The sources parameter can be any of the following types:
    // 1) Terms (but NOT Significant Terms) 2) Histogram 3) Date histogram 4) GeoTile grid
    // https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-bucket-composite-aggregation.html
    isValidSourceType := func(queryType model.QueryType) bool {
        switch typed := queryType.(type) {
        case *bucket_aggregations.Histogram, *bucket_aggregations.DateHistogram, bucket_aggregations.GeoTileGrid:
            return true
        case bucket_aggregations.Terms:
            return !typed.IsSignificant()
        default:
            return false
        }
    }

    var baseAggrs []*bucket_aggregations.BaseAggregation
    sourcesRaw, exists := composite["sources"]
    if !exists {
        return nil, fmt.Errorf("composite has no sources")
    }
    sources, ok := sourcesRaw.([]any)
    if !ok {
        return nil, fmt.Errorf("sources is not an array, but %T, value: %v", sourcesRaw, sourcesRaw)
    }
    for _, sourceRaw := range sources {
        source, ok := sourceRaw.(QueryMap)
        if !ok {
            return nil, fmt.Errorf("source is not a map, but %T, value: %v", sourceRaw, sourceRaw)
        }
        if len(source) != 1 {
            return nil, fmt.Errorf("source has unexpected length: %v", source)
        }
        for aggrName, aggrRaw := range source {
            aggr, ok := aggrRaw.(QueryMap)
            if !ok {
                return nil, fmt.Errorf("source value is not a map, but %T, value: %v", aggrRaw, aggrRaw)
            }
            if success, err := cw.pancakeTryBucketAggregation(currentAggrNode, aggr); success {
                if !isValidSourceType(currentAggrNode.queryType) {
                    return nil, fmt.Errorf("unsupported base aggregation type: %v", currentAggrNode.queryType)
                }
                baseAggrs = append(baseAggrs, bucket_aggregations.NewBaseAggregation(aggrName, currentAggrNode.queryType))
            } else {
                return nil, err
            }
        }
    }

    size := cw.parseIntField(composite, "size", defaultSize)
    currentAggrNode.limit = size
    return bucket_aggregations.NewComposite(cw.Ctx, size, baseAggrs), nil
}
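One consequence of isValidSourceType worth noting: significant_terms parses into a Terms aggregation with IsSignificant() == true (see the terms.go change above), so a request like the hypothetical one below should be rejected with the "unsupported base aggregation type" error rather than translated. The source name "sev" and field "severity" are made up.

// Illustrative only: a composite source that parseComposite should reject,
// since significant_terms is not a valid composite source type.
package main

import "fmt"

const rejectedCompositeSources = `{
  "composite": {
    "size": 10,
    "sources": [
      { "sev": { "significant_terms": { "field": "severity" } } }
    ]
  }
}`

func main() {
    fmt.Println(rejectedCompositeSources)
}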

func (cw *ClickhouseQueryTranslator) parseOrder(terms, queryMap QueryMap, fieldExpressions []model.Expr) []model.OrderByExpr {
    defaultDirection := model.DescOrder
    defaultOrderBy := model.NewOrderByExpr(model.NewCountFunc(), defaultDirection)
