Skip to content

Commit

Permalink
Merge pull request #2174 from josephschorr/lr2-spans
Browse files Browse the repository at this point in the history
Add tracing to the LR2 implementation
  • Loading branch information
josephschorr authored Dec 19, 2024
2 parents f23035f + 44324f1 commit bc2c9e5
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 5 deletions.
32 changes: 31 additions & 1 deletion internal/graph/lookupresources2.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ import (
"slices"
"sort"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"

"github.com/authzed/spicedb/internal/caveats"
"github.com/authzed/spicedb/internal/dispatch"
"github.com/authzed/spicedb/internal/graph/computed"
Expand Down Expand Up @@ -40,6 +43,9 @@ func (crr *CursoredLookupResources2) LookupResources2(
req ValidatedLookupResources2Request,
stream dispatch.LookupResources2Stream,
) error {
ctx, span := tracer.Start(stream.Context(), "lookupResources2")
defer span.End()

if req.TerminalSubject == nil {
return spiceerrors.MustBugf("no terminal subject given to lookup resources dispatch")
}
Expand All @@ -57,7 +63,6 @@ func (crr *CursoredLookupResources2) LookupResources2(
sort.Strings(req.SubjectIds)
}

ctx := stream.Context()
limits := newLimitTracker(req.OptionalLimit)
ci, err := newCursorInformation(req.OptionalCursor, limits, dispatchVersion)
if err != nil {
Expand Down Expand Up @@ -105,6 +110,9 @@ func (crr *CursoredLookupResources2) afterSameType(
req ValidatedLookupResources2Request,
parentStream dispatch.LookupResources2Stream,
) error {
ctx, span := tracer.Start(ctx, "lookupViaReachability")
defer span.End()

dispatched := NewSyncONRSet()

// Load the type system and reachability graph to find the entrypoints for the reachability.
Expand All @@ -127,6 +135,15 @@ func (crr *CursoredLookupResources2) afterSameType(
// For each entrypoint, load the necessary data and re-dispatch if a subproblem was found.
return withParallelizedStreamingIterableInCursor(ctx, ci, entrypoints, parentStream, crr.concurrencyLimit,
func(ctx context.Context, ci cursorInformation, entrypoint typesystem.ReachabilityEntrypoint, stream dispatch.LookupResources2Stream) error {
ds, err := entrypoint.DebugString()
spiceerrors.DebugAssert(func() bool {
return err == nil
}, "Error in entrypoint.DebugString()")
ctx, span := tracer.Start(ctx, "entrypoint", trace.WithAttributes(
attribute.String("entrypoint", ds),
))
defer span.End()

switch entrypoint.EntrypointKind() {
case core.ReachabilityEntrypoint_RELATION_ENTRYPOINT:
return crr.lookupRelationEntrypoint(ctx, ci, entrypoint, rg, reader, req, stream, dispatched)
Expand Down Expand Up @@ -265,6 +282,14 @@ func (crr *CursoredLookupResources2) redispatchOrReportOverDatabaseQuery(
ctx context.Context,
config redispatchOverDatabaseConfig2,
) error {
ctx, span := tracer.Start(ctx, "datastorequery", trace.WithAttributes(
attribute.String("source-resource-type-namespace", config.sourceResourceType.Namespace),
attribute.String("source-resource-type-relation", config.sourceResourceType.Relation),
attribute.String("subjects-filter-subject-type", config.subjectsFilter.SubjectType),
attribute.Int("subjects-filter-subject-ids-count", len(config.subjectsFilter.OptionalSubjectIds)),
))
defer span.End()

return withDatastoreCursorInCursor(ctx, config.ci, config.parentStream, config.concurrencyLimit,
// Find the target resources for the subject.
func(queryCursor options.Cursor) ([]itemAndPostCursor[dispatchableResourcesSubjectMap2], error) {
Expand Down Expand Up @@ -455,6 +480,11 @@ func (crr *CursoredLookupResources2) redispatchOrReport(
return nil
}

ctx, span := tracer.Start(ctx, "redispatchOrReport", trace.WithAttributes(
attribute.Int("found-resources-count", foundResources.len()),
))
defer span.End()

// Check for entrypoints for the new found resource type.
hasResourceEntrypoints, err := rg.HasOptimizedEntrypointsForSubjectToResource(ctx, foundResourceType, parentRequest.ResourceRelation)
if err != nil {
Expand Down
13 changes: 13 additions & 0 deletions internal/graph/lr2streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ import (
"strconv"
"sync"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"

"github.com/authzed/spicedb/internal/dispatch"
"github.com/authzed/spicedb/internal/graph/computed"
"github.com/authzed/spicedb/internal/graph/hints"
Expand Down Expand Up @@ -103,6 +106,11 @@ func (rdc *checkAndDispatchRunner) runChecker(ctx context.Context, startingIndex
return nil
}

ctx, span := tracer.Start(ctx, "lr2Check", trace.WithAttributes(
attribute.Int("resource-id-count", len(resourceIDsToCheck)),
))
defer span.End()

checkHints := make([]*v1.CheckHint, 0, len(resourceIDsToCheck))
for _, resourceID := range resourceIDsToCheck {
checkHint, err := hints.HintForEntrypoint(
Expand Down Expand Up @@ -193,6 +201,11 @@ func (rdc *checkAndDispatchRunner) runDispatch(
}
rdc.lock.Unlock()

ctx, span := tracer.Start(ctx, "lr2Dispatch", trace.WithAttributes(
attribute.Int("resource-id-count", len(resourceIDsToDispatch)),
))
defer span.End()

// NOTE: Since we extracted a custom section from the cursor at the beginning of this run, we have to add
// the starting index to the cursor to ensure that the next run starts from the correct place, and we have
// to use the *updated* cursor below on the dispatch.
Expand Down
4 changes: 4 additions & 0 deletions internal/graph/resourcesubjectsmap2.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ type dispatchableResourcesSubjectMap2 struct {
resourcesSubjectMap2
}

func (rsm dispatchableResourcesSubjectMap2) len() int {
return rsm.resourcesAndSubjects.Len()
}

func (rsm dispatchableResourcesSubjectMap2) isEmpty() bool {
return rsm.resourcesAndSubjects.IsEmpty()
}
Expand Down
17 changes: 13 additions & 4 deletions pkg/typesystem/reachabilitygraph.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,18 +116,27 @@ func (re ReachabilityEntrypoint) String() string {
}

func (re ReachabilityEntrypoint) MustDebugString() string {
ds, err := re.DebugString()
if err != nil {
panic(err)
}

return ds
}

func (re ReachabilityEntrypoint) DebugString() (string, error) {
switch re.EntrypointKind() {
case core.ReachabilityEntrypoint_RELATION_ENTRYPOINT:
return fmt.Sprintf("relation-entrypoint: %s#%s", re.re.TargetRelation.Namespace, re.re.TargetRelation.Relation)
return "relation-entrypoint: " + re.re.TargetRelation.Namespace + "#" + re.re.TargetRelation.Relation, nil

case core.ReachabilityEntrypoint_TUPLESET_TO_USERSET_ENTRYPOINT:
return fmt.Sprintf("ttu-entrypoint: %s#%s | %s | %s#%s", re.parentRelation.Namespace, re.parentRelation.Relation, re.re.TuplesetRelation, re.re.TargetRelation.Namespace, re.re.TargetRelation.Relation)
return "ttu-entrypoint: " + re.re.TuplesetRelation + " -> " + re.re.TargetRelation.Namespace + "#" + re.re.TargetRelation.Relation, nil

case core.ReachabilityEntrypoint_COMPUTED_USERSET_ENTRYPOINT:
return fmt.Sprintf("computed-entrypoint: %s#%s", re.re.TargetRelation.Namespace, re.re.TargetRelation.Relation)
return "computed-userset-entrypoint: " + re.re.TargetRelation.Namespace + "#" + re.re.TargetRelation.Relation, nil

default:
panic("unknown relation entrypoint kind")
return "", fmt.Errorf("unknown entrypoint kind %v", re.EntrypointKind())
}
}

Expand Down

0 comments on commit bc2c9e5

Please sign in to comment.