Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(query): Batch multiple get calls to badger while querying in handleValuePostings #8999

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ require (
github.com/IBM/sarama v1.45.0
github.com/Masterminds/semver/v3 v3.3.1
github.com/blevesearch/bleve/v2 v2.4.4
github.com/dgraph-io/badger/v4 v4.5.1
github.com/dgraph-io/badger/v4 v4.5.2-0.20250218121059-0faedd88140e
github.com/dgraph-io/dgo/v240 v240.1.0
github.com/dgraph-io/gqlgen v0.13.2
github.com/dgraph-io/gqlparser/v2 v2.2.2
Expand Down Expand Up @@ -98,7 +98,7 @@ require (
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/groupcache v0.0.0-20241129210726-2c02b8208cf8 // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/google/flatbuffers v25.1.24+incompatible // indirect
github.com/google/flatbuffers v25.2.10+incompatible // indirect
github.com/google/pprof v0.0.0-20250128161936-077ca0a936bf // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
Expand Down Expand Up @@ -156,7 +156,6 @@ require (
go.opentelemetry.io/otel v1.34.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.34.0 // indirect
go.opentelemetry.io/otel/metric v1.34.0 // indirect
go.opentelemetry.io/otel/sdk v1.34.0 // indirect
go.opentelemetry.io/otel/trace v1.34.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/time v0.9.0 // indirect
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,8 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgraph-io/badger/v4 v4.5.1 h1:7DCIXrQjo1LKmM96YD+hLVJ2EEsyyoWxJfpdd56HLps=
github.com/dgraph-io/badger/v4 v4.5.1/go.mod h1:qn3Be0j3TfV4kPbVoK0arXCD1/nr1ftth6sbL5jxdoA=
github.com/dgraph-io/badger/v4 v4.5.2-0.20250218121059-0faedd88140e h1:sZmnvDqloFjehWjr6f/G5O8ANbhenwSYdkGxkTR2Bww=
github.com/dgraph-io/badger/v4 v4.5.2-0.20250218121059-0faedd88140e/go.mod h1:aSwx/bXKT3/WRl9rn2BrTU+tfRQlFPKlOsqRTdcpHB8=
github.com/dgraph-io/dgo/v240 v240.1.0 h1:xd8z9kEXDWOAblaLJ2HLg2tXD6ngMQwq3ehLUS7GKNg=
github.com/dgraph-io/dgo/v240 v240.1.0/go.mod h1:r8WASETKfodzKqThSAhhTNIzcEMychArKKlZXQufWuA=
github.com/dgraph-io/gqlgen v0.13.2 h1:TNhndk+eHKj5qE7BenKKSYdSIdOGhLqxR1rCiMso9KM=
Expand Down Expand Up @@ -267,8 +267,8 @@ github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Z
github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/codesearch v1.2.0 h1:VlyAH+AntnIbGGArOUs6sEBdPVwYvf1e8Uw3/TC77cA=
github.com/google/codesearch v1.2.0/go.mod h1:9wQjQDVAP7Mvt96tw1KqVeXncdBLOWUYdxRiHlsG6Xc=
github.com/google/flatbuffers v25.1.24+incompatible h1:4wPqL3K7GzBd1CwyhSd3usxLKOaJN/AC6puCca6Jm7o=
github.com/google/flatbuffers v25.1.24+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8=
github.com/google/flatbuffers v25.2.10+incompatible h1:F3vclr7C3HpB1k9mxCGRMXq6FdUalZ6H/pNX4FP1v0Q=
github.com/google/flatbuffers v25.2.10+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
Expand Down
91 changes: 72 additions & 19 deletions posting/lists.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,32 +323,32 @@ func (lc *LocalCache) readPostingListAt(key []byte) (*pb.PostingList, error) {
return pl, err
}

// GetSinglePosting retrieves the cached version of the first item in the list associated with the
// given key. This is used for retrieving the value of a scalar predicats.
func (lc *LocalCache) GetSinglePosting(key []byte) (*pb.PostingList, error) {
// This would return an error if there is some data in the local cache, but we couldn't read it.
getListFromLocalCache := func() (*pb.PostingList, error) {
lc.RLock()

pl := &pb.PostingList{}
if delta, ok := lc.deltas[string(key)]; ok && len(delta) > 0 {
err := proto.Unmarshal(delta, pl)
lc.RUnlock()
return pl, err
}
func (lc *LocalCache) GetSinglePostingFromLocalCache(key []byte) (*pb.PostingList, error) {
lc.RLock()

l := lc.plists[string(key)]
pl := &pb.PostingList{}
if delta, ok := lc.deltas[string(key)]; ok && len(delta) > 0 {
err := proto.Unmarshal(delta, pl)
lc.RUnlock()
return pl, err
}

if l != nil {
return l.StaticValue(lc.startTs)
}
l := lc.plists[string(key)]
lc.RUnlock()

return nil, nil
if l != nil {
return l.StaticValue(lc.startTs)
}

return nil, nil
}

// GetSinglePosting retrieves the cached version of the first item in the list associated with the
// given key. This is used for retrieving the value of a scalar predicats.
func (lc *LocalCache) GetSinglePosting(key []byte) (*pb.PostingList, error) {
// This would return an error if there is some data in the local cache, but we couldn't read it.
getPostings := func() (*pb.PostingList, error) {
pl, err := getListFromLocalCache()
pl, err := lc.GetSinglePostingFromLocalCache(key)
// If both pl and err are empty, that means that there was no data in local cache, hence we should
// read the data from badger.
if pl != nil || err != nil {
Expand Down Expand Up @@ -381,6 +381,59 @@ func (lc *LocalCache) GetSinglePosting(key []byte) (*pb.PostingList, error) {
return pl, nil
}

func (lc *LocalCache) GetBatchSinglePosting(keys [][]byte) ([]*pb.PostingList, error) {
results := make([]*pb.PostingList, len(keys))
remaining_keys := make([][]byte, 0)
for i, key := range keys {
if pl, err := lc.GetSinglePostingFromLocalCache(key); pl != nil && err != nil {
results[i] = pl
} else {
remaining_keys = append(remaining_keys, key)
}
}

txn := pstore.NewTransactionAt(lc.startTs, false)
items, err := txn.GetBatch(remaining_keys)
if err != nil {
fmt.Println(err, keys)
return nil, err
}
idx := 0

for i := 0; i < len(results); i++ {
if results[i] != nil {
continue
}
pl := &pb.PostingList{}
err = items[idx].Value(func(val []byte) error {
if err := proto.Unmarshal(val, pl); err != nil {
return err
}
return nil
})
idx += 1
results[i] = pl
}

for i := 0; i < len(results); i++ {
pl := results[i]
idx := 0
for _, postings := range pl.Postings {
if hasDeleteAll(postings) {
return nil, nil
}
if postings.Op != Del {
pl.Postings[idx] = postings
idx++
}
}
pl.Postings = pl.Postings[:idx]
results[i] = pl
}

return results, err
}

// Get retrieves the cached version of the list associated with the given key.
func (lc *LocalCache) Get(key []byte) (*List, error) {
return lc.getInternal(key, true)
Expand Down
18 changes: 15 additions & 3 deletions worker/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,7 @@ func (qs *queryState) handleValuePostings(ctx context.Context, args funcArgs) er
out := &pb.Result{}
outputs[start/width] = out

cache := make([]*pb.PostingList, 0)
for i := start; i < end; i++ {
select {
case <-ctx.Done():
Expand All @@ -437,9 +438,20 @@ func (qs *queryState) handleValuePostings(ctx context.Context, args funcArgs) er
fcs := &pb.FacetsList{FacetsList: make([]*pb.Facets, 0)} // TODO Figure out how it is stored

if !getMultiplePosting {
pl, err := qs.cache.GetSinglePosting(key)
if err != nil {
return err
if len(cache) == 0 {
keys := make([][]byte, 10)
keys[0] = key
for j := i + 1; j < i+10 && j < end; j++ {
keys[j-i] = x.DataKey(q.Attr, q.UidList.Uids[j])
}
cache, err = qs.cache.GetBatchSinglePosting(keys)
if err != nil {
return err
}
}
pl := cache[0]
if len(cache) > 1 {
cache = cache[1:]
}
if pl == nil || len(pl.Postings) == 0 {
out.UidMatrix = append(out.UidMatrix, &pb.List{})
Expand Down
Loading