add single key call
Currently, the uniqueness check validates against the namespace taken from the
edge predicate. This shouldn't be the case; we should use the namespace
provided in the context.
Harshil goel authored and harshil-goel committed Aug 3, 2024
1 parent ee57c5e commit e791681
Showing 5 changed files with 246 additions and 43 deletions.
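
In short, the intended behavior looks like the following minimal sketch. The helper name is hypothetical; the actual logic lives in addQueryIfUnique in edgraph/server.go below, and x.ExtractNamespace / x.IsGalaxyOperation are the existing helpers it calls.

// resolveUniqueCheckNamespace returns the namespace the uniqueness check should run
// against: the one carried in the request context, falling back to the default (0)
// when none is set, and trusting the edge's namespace only for galaxy operations.
func resolveUniqueCheckNamespace(ctx context.Context, edgeNs uint64) uint64 {
	ns, err := x.ExtractNamespace(ctx)
	if err != nil {
		// No namespace in the context: either there is no authorization or it is
		// being bypassed, so assume the default namespace.
		ns = 0
	}
	if x.IsGalaxyOperation(ctx) {
		// Galaxy operations may write into any namespace; the caller is expected
		// to put the target namespace on the edge itself.
		ns = edgeNs
	}
	return ns
}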
35 changes: 27 additions & 8 deletions edgraph/server.go
@@ -1344,7 +1344,7 @@ func (s *Server) doQuery(ctx context.Context, req *Request) (resp *api.Response,
graphql: isGraphQL,
gqlField: req.gqlField,
}
if rerr = parseRequest(qc); rerr != nil {
if rerr = parseRequest(ctx, qc); rerr != nil {
return
}

@@ -1565,7 +1565,7 @@ func processQuery(ctx context.Context, qc *queryContext) (*api.Response, error)
}

// parseRequest parses the incoming request
func parseRequest(qc *queryContext) error {
func parseRequest(ctx context.Context, qc *queryContext) error {
start := time.Now()
defer func() {
qc.latency.Parsing = time.Since(start)
@@ -1585,7 +1585,7 @@ func parseRequest(qc *queryContext) error {
qc.gmuList = append(qc.gmuList, gmu)
}

if err := addQueryIfUnique(qc); err != nil {
if err := addQueryIfUnique(ctx, qc); err != nil {
return err
}

@@ -1698,19 +1698,38 @@ func verifyUnique(qc *queryContext, qr query.Request) error {
}

// addQueryIfUnique adds dummy queries to the request for checking whether the predicate is unique in the db
func addQueryIfUnique(qc *queryContext) error {
func addQueryIfUnique(qctx context.Context, qc *queryContext) error {
if len(qc.gmuList) == 0 {
return nil
}

ctx := context.WithValue(context.Background(), schema.IsWrite, false)
ctx := context.WithValue(qctx, schema.IsWrite, false)
namespace, err := x.ExtractNamespace(ctx)
if err != nil {
// It's okay to ignore this error here. If the namespace is not set, it means either there is no
// authorization or authorization is being bypassed. So the namespace is either 0 or the mutation would fail.
glog.Errorf("Error while extracting namespace, assuming default %s", err)
namespace = 0
}
isGalaxyQuery := x.IsGalaxyOperation(ctx)

qc.uniqueVars = map[uint64]uniquePredMeta{}
var buildQuery strings.Builder
for gmuIndex, gmu := range qc.gmuList {
for rdfIndex, pred := range gmu.Set {
predSchema, _ := schema.State().Get(ctx, x.NamespaceAttr(pred.Namespace, pred.Predicate))
if !predSchema.Unique {
continue
if isGalaxyQuery {
// The caller should make sure that the directed edges contain the namespace we want
// to insert into.
namespace = pred.Namespace
}
if pred.Predicate != "dgraph.xid" {
// TODO: Don't special-case dgraph.xid. This node might not be aware of the
// schema for the given predicate, which is currently an issue for dgraph.xid,
// hence we bypass the check manually until that bug is fixed.
predSchema, ok := schema.State().Get(ctx, x.NamespaceAttr(namespace, pred.Predicate))
if !ok || !predSchema.Unique {
continue
}
}
var predicateName string
if pred.Lang != "" {
31 changes: 31 additions & 0 deletions ee/acl/acl_integration_test.go
@@ -439,6 +439,37 @@ func (asuite *AclTestSuite) TestWrongPermission() {
require.Contains(t, err.Error(), "Value for this predicate should be between 0 and 7")
}

func (asuite *AclTestSuite) TestACLNamespaceEdge() {
t := asuite.T()
gc, cleanup, err := asuite.dc.Client()
require.NoError(t, err)
defer cleanup()
require.NoError(t, gc.LoginIntoNamespace(context.Background(),
dgraphapi.DefaultUser, dgraphapi.DefaultPassword, x.GalaxyNamespace))

json := `
{
"set": [
{
"dgraph.xid": "groot",
"dgraph.password": "password",
"dgraph.type": "dgraph.type.User",
"dgraph.user.group": {
"dgraph.xid": "guardians",
"dgraph.type": "dgraph.type.Group",
"namespace": 1
},
"namespace": 1
}
]
}`

mu := &api.Mutation{SetJson: []byte(json), CommitNow: true}
_, err = gc.Mutate(mu)
require.Error(t, err)
require.ErrorContains(t, err, "could not insert duplicate value") // Could be guardian or groot
}

func (asuite *AclTestSuite) TestACLDuplicateGrootUser() {
t := asuite.T()
gc, cleanup, err := asuite.dc.Client()
48 changes: 48 additions & 0 deletions posting/list_test.go
@@ -450,6 +450,54 @@ func TestAddMutation_mrjn1(t *testing.T) {
require.Equal(t, 0, ol.Length(txn.StartTs, 0))
}

func TestReadSingleValue(t *testing.T) {
defer setMaxListSize(maxListSize)
maxListSize = math.MaxInt32

// We call pl.Iterate and then stop iterating in the first loop when we are reading
// single values. This test confirms that the two functions, getFirst from this file
// and GetSingleValueForKey, work without an issue.

key := x.DataKey(x.GalaxyAttr("value"), 1240)
ol, err := getNew(key, ps, math.MaxUint64)
require.NoError(t, err)
N := int(10000)
for i := 2; i <= N; i += 2 {
edge := &pb.DirectedEdge{
Value: []byte("ho hey there" + strconv.Itoa(i)),
}
txn := Txn{StartTs: uint64(i)}
addMutationHelper(t, ol, edge, Set, &txn)
require.NoError(t, ol.commitMutation(uint64(i), uint64(i)+1))
kData := ol.getMutation(uint64(i))
writer := NewTxnWriter(pstore)
if err := writer.SetAt(key, kData, BitDeltaPosting, uint64(i)); err != nil {
require.NoError(t, err)
}
writer.Flush()

if i%10 == 0 {
// Do frequent rollups, and store data in old timestamp
kvs, err := ol.Rollup(nil, txn.StartTs-3)
require.NoError(t, err)
require.NoError(t, writePostingListToDisk(kvs))
ol, err = getNew(key, ps, math.MaxUint64)
require.NoError(t, err)
}

j := 2
if j < int(ol.minTs) {
j = int(ol.minTs)
}
for ; j < i+6; j++ {
tx := NewTxn(uint64(j))
k, err := tx.cache.GetSinglePosting(key)
require.NoError(t, err)
checkValue(t, ol, string(k.Postings[0].Value), uint64(j))
}
}
}

func TestRollupMaxTsIsSet(t *testing.T) {
defer setMaxListSize(maxListSize)
maxListSize = math.MaxInt32
71 changes: 71 additions & 0 deletions posting/lists.go
@@ -322,6 +322,77 @@ func (lc *LocalCache) getInternal(key []byte, readFromDisk bool) (*List, error)
return lc.SetIfAbsent(skey, pl), nil
}

// GetSinglePosting retrieves the cached version of the first item in the list associated with the
// given key. This is used for retrieving the value of a scalar predicate.
func (lc *LocalCache) GetSinglePosting(key []byte) (*pb.PostingList, error) {
getList := func() *pb.PostingList {
lc.RLock()
defer lc.RUnlock()

pl := &pb.PostingList{}
if delta, ok := lc.deltas[string(key)]; ok && len(delta) > 0 {
err := pl.Unmarshal(delta)
if err == nil {
return pl
}
}

l := lc.plists[string(key)]
if l != nil {
// If the current transaction is updating it, read it from here.
// Otherwise read it from disk. TODO see if this can be fixed.
return l.mutationMap[lc.startTs]
}

return nil
}

getPostings := func() (*pb.PostingList, error) {
pl := getList()
if pl != nil {
return pl, nil
}

pl = &pb.PostingList{}
txn := pstore.NewTransactionAt(lc.startTs, false)
item, err := txn.Get(key)
if err != nil {
return nil, err
}

err = item.Value(func(val []byte) error {
if err := pl.Unmarshal(val); err != nil {
return err
}
return nil
})

return pl, err
}

pl, err := getPostings()
if err == badger.ErrKeyNotFound {
return nil, nil
}
if err != nil {
return nil, err
}

// Filter and remove STAR_ALL and OP_DELETE Postings
idx := 0
for _, postings := range pl.Postings {
if hasDeleteAll(postings) {
return nil, nil
}
if postings.Op != Del {
pl.Postings[idx] = postings
idx++
}
}
pl.Postings = pl.Postings[:idx]
return pl, nil
}

// Get retrieves the cached version of the list associated with the given key.
func (lc *LocalCache) Get(key []byte) (*List, error) {
return lc.getInternal(key, true)
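
A usage sketch for the new GetSinglePosting, mirroring TestReadSingleValue above. The helper name is hypothetical and assumes in-package (posting) access to the transaction cache:

// readScalarValue reads the latest value of a scalar predicate through the
// transaction cache without materializing the full posting list. A nil posting
// list means the key has no live postings (missing, or wiped by a delete-all).
func readScalarValue(key []byte, readTs uint64) ([]byte, error) {
	tx := NewTxn(readTs)
	pl, err := tx.cache.GetSinglePosting(key)
	if err != nil {
		return nil, err
	}
	if pl == nil || len(pl.Postings) == 0 {
		return nil, nil
	}
	return pl.Postings[0].Value, nil
}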
104 changes: 69 additions & 35 deletions worker/task.go
@@ -420,6 +420,14 @@ func (qs *queryState) handleValuePostings(ctx context.Context, args funcArgs) er
outputs := make([]*pb.Result, numGo)
listType := schema.State().IsList(q.Attr)

// These are certain special cases where we can get away with reading only the latest value.
// Lang doesn't work because we would be storing various different languages at various
// times, so when we go to read the latest value, we might get a different language.
// Similarly with DoCount, ExpandAll and Facets. List types are also not supported,
// because a list is stored across timestamps and we combine all the list items from those timestamps.
hasLang := schema.State().HasLang(q.Attr)
getMultiplePosting := q.DoCount || q.ExpandAll || listType || hasLang || q.FacetParam != nil

calculate := func(start, end int) error {
x.AssertTrue(start%width == 0)
out := &pb.Result{}
@@ -434,49 +442,75 @@ func (qs *queryState) handleValuePostings(ctx context.Context, args funcArgs) er
key := x.DataKey(q.Attr, q.UidList.Uids[i])

// Get or create the posting list for an entity, attribute combination.
pl, err := qs.cache.Get(key)
if err != nil {
return err
}

// If count is being requested, there is no need to populate value and facets matrix.
if q.DoCount {
count, err := countForValuePostings(args, pl, facetsTree, listType)
if err != nil && err != posting.ErrNoValue {
var vals []types.Val
fcs := &pb.FacetsList{FacetsList: make([]*pb.Facets, 0)} // TODO Figure out how it is stored

if !getMultiplePosting {
pl, err := qs.cache.GetSinglePosting(key)
if err != nil {
return err
}
if pl == nil || len(pl.Postings) == 0 {
out.UidMatrix = append(out.UidMatrix, &pb.List{})
out.FacetMatrix = append(out.FacetMatrix, &pb.FacetsList{})
out.ValueMatrix = append(out.ValueMatrix,
&pb.ValueList{Values: []*pb.TaskValue{}})
continue
}
vals = make([]types.Val, len(pl.Postings))
for i, p := range pl.Postings {
vals[i] = types.Val{
Tid: types.TypeID(p.ValType),
Value: p.Value,
}
}
} else {
pl, err := qs.cache.Get(key)
if err != nil {
return err
}
out.Counts = append(out.Counts, uint32(count))
// Add an empty UID list to make later processing consistent.
out.UidMatrix = append(out.UidMatrix, &pb.List{})
continue
}

vals, fcs, err := retrieveValuesAndFacets(args, pl, facetsTree, listType)
switch {
case err == posting.ErrNoValue || (err == nil && len(vals) == 0):
// This branch is taken when the value does not exist in the pl or
// the number of values retrieved is zero (there could still be facets).
// We add empty lists to the UidMatrix, FacetMatrix, ValueMatrix and
// LangMatrix so that all these data structures have predictable layouts.
out.UidMatrix = append(out.UidMatrix, &pb.List{})
out.FacetMatrix = append(out.FacetMatrix, &pb.FacetsList{})
out.ValueMatrix = append(out.ValueMatrix,
&pb.ValueList{Values: []*pb.TaskValue{}})
if q.ExpandAll {
// To keep the cardinality same as that of ValueMatrix.
out.LangMatrix = append(out.LangMatrix, &pb.LangList{})
// If count is being requested, there is no need to populate value and facets matrix.
if q.DoCount {
count, err := countForValuePostings(args, pl, facetsTree, listType)
if err != nil && err != posting.ErrNoValue {
return err
}
out.Counts = append(out.Counts, uint32(count))
// Add an empty UID list to make later processing consistent.
out.UidMatrix = append(out.UidMatrix, &pb.List{})
continue
}
continue
case err != nil:
return err
}

if q.ExpandAll {
langTags, err := pl.GetLangTags(args.q.ReadTs)
if err != nil {
vals, fcs, err = retrieveValuesAndFacets(args, pl, facetsTree, listType)

switch {
case err == posting.ErrNoValue || (err == nil && len(vals) == 0):
// This branch is taken when the value does not exist in the pl or
// the number of values retrieved is zero (there could still be facets).
// We add empty lists to the UidMatrix, FacetMatrix, ValueMatrix and
// LangMatrix so that all these data structures have predictable layouts.
out.UidMatrix = append(out.UidMatrix, &pb.List{})
out.FacetMatrix = append(out.FacetMatrix, &pb.FacetsList{})
out.ValueMatrix = append(out.ValueMatrix,
&pb.ValueList{Values: []*pb.TaskValue{}})
if q.ExpandAll {
// To keep the cardinality same as that of ValueMatrix.
out.LangMatrix = append(out.LangMatrix, &pb.LangList{})
}
continue
case err != nil:
return err
}
out.LangMatrix = append(out.LangMatrix, &pb.LangList{Lang: langTags})

if q.ExpandAll {
langTags, err := pl.GetLangTags(args.q.ReadTs)
if err != nil {
return err
}
out.LangMatrix = append(out.LangMatrix, &pb.LangList{Lang: langTags})
}
}

uidList := new(pb.List)
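
For clarity, the fast-path condition described in the handleValuePostings comment above can be restated as a small helper. This is a hypothetical sketch, not part of the diff:

// canReadSinglePosting reports whether a value read can rely on the single latest
// posting. Counts, expand-all, facets, language tags, and list types all need the
// full posting list, because their data is spread across multiple postings and
// timestamps.
func canReadSinglePosting(q *pb.Query, listType, hasLang bool) bool {
	return !q.DoCount && !q.ExpandAll && !listType && !hasLang && q.FacetParam == nil
}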
