From 44324f119c897d5c32cef1bd3c923d9ffd3df048 Mon Sep 17 00:00:00 2001 From: Joseph Schorr Date: Wed, 18 Dec 2024 23:00:54 -0500 Subject: [PATCH] Add tracing to the LR2 implementation --- internal/graph/lookupresources2.go | 32 +++++++++++++++++++++++++- internal/graph/lr2streams.go | 13 +++++++++++ internal/graph/resourcesubjectsmap2.go | 4 ++++ pkg/typesystem/reachabilitygraph.go | 17 ++++++++++---- 4 files changed, 61 insertions(+), 5 deletions(-) diff --git a/internal/graph/lookupresources2.go b/internal/graph/lookupresources2.go index 7411d7c073..9e60a6ebd0 100644 --- a/internal/graph/lookupresources2.go +++ b/internal/graph/lookupresources2.go @@ -5,6 +5,9 @@ import ( "slices" "sort" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" + "github.com/authzed/spicedb/internal/caveats" "github.com/authzed/spicedb/internal/dispatch" "github.com/authzed/spicedb/internal/graph/computed" @@ -40,6 +43,9 @@ func (crr *CursoredLookupResources2) LookupResources2( req ValidatedLookupResources2Request, stream dispatch.LookupResources2Stream, ) error { + ctx, span := tracer.Start(stream.Context(), "lookupResources2") + defer span.End() + if req.TerminalSubject == nil { return spiceerrors.MustBugf("no terminal subject given to lookup resources dispatch") } @@ -57,7 +63,6 @@ func (crr *CursoredLookupResources2) LookupResources2( sort.Strings(req.SubjectIds) } - ctx := stream.Context() limits := newLimitTracker(req.OptionalLimit) ci, err := newCursorInformation(req.OptionalCursor, limits, dispatchVersion) if err != nil { @@ -105,6 +110,9 @@ func (crr *CursoredLookupResources2) afterSameType( req ValidatedLookupResources2Request, parentStream dispatch.LookupResources2Stream, ) error { + ctx, span := tracer.Start(ctx, "lookupViaReachability") + defer span.End() + dispatched := NewSyncONRSet() // Load the type system and reachability graph to find the entrypoints for the reachability. @@ -127,6 +135,15 @@ func (crr *CursoredLookupResources2) afterSameType( // For each entrypoint, load the necessary data and re-dispatch if a subproblem was found. return withParallelizedStreamingIterableInCursor(ctx, ci, entrypoints, parentStream, crr.concurrencyLimit, func(ctx context.Context, ci cursorInformation, entrypoint typesystem.ReachabilityEntrypoint, stream dispatch.LookupResources2Stream) error { + ds, err := entrypoint.DebugString() + spiceerrors.DebugAssert(func() bool { + return err == nil + }, "Error in entrypoint.DebugString()") + ctx, span := tracer.Start(ctx, "entrypoint", trace.WithAttributes( + attribute.String("entrypoint", ds), + )) + defer span.End() + switch entrypoint.EntrypointKind() { case core.ReachabilityEntrypoint_RELATION_ENTRYPOINT: return crr.lookupRelationEntrypoint(ctx, ci, entrypoint, rg, reader, req, stream, dispatched) @@ -265,6 +282,14 @@ func (crr *CursoredLookupResources2) redispatchOrReportOverDatabaseQuery( ctx context.Context, config redispatchOverDatabaseConfig2, ) error { + ctx, span := tracer.Start(ctx, "datastorequery", trace.WithAttributes( + attribute.String("source-resource-type-namespace", config.sourceResourceType.Namespace), + attribute.String("source-resource-type-relation", config.sourceResourceType.Relation), + attribute.String("subjects-filter-subject-type", config.subjectsFilter.SubjectType), + attribute.Int("subjects-filter-subject-ids-count", len(config.subjectsFilter.OptionalSubjectIds)), + )) + defer span.End() + return withDatastoreCursorInCursor(ctx, config.ci, config.parentStream, config.concurrencyLimit, // Find the target resources for the subject. func(queryCursor options.Cursor) ([]itemAndPostCursor[dispatchableResourcesSubjectMap2], error) { @@ -455,6 +480,11 @@ func (crr *CursoredLookupResources2) redispatchOrReport( return nil } + ctx, span := tracer.Start(ctx, "redispatchOrReport", trace.WithAttributes( + attribute.Int("found-resources-count", foundResources.len()), + )) + defer span.End() + // Check for entrypoints for the new found resource type. hasResourceEntrypoints, err := rg.HasOptimizedEntrypointsForSubjectToResource(ctx, foundResourceType, parentRequest.ResourceRelation) if err != nil { diff --git a/internal/graph/lr2streams.go b/internal/graph/lr2streams.go index e1296a96a8..d03bb2d0dd 100644 --- a/internal/graph/lr2streams.go +++ b/internal/graph/lr2streams.go @@ -5,6 +5,9 @@ import ( "strconv" "sync" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" + "github.com/authzed/spicedb/internal/dispatch" "github.com/authzed/spicedb/internal/graph/computed" "github.com/authzed/spicedb/internal/graph/hints" @@ -103,6 +106,11 @@ func (rdc *checkAndDispatchRunner) runChecker(ctx context.Context, startingIndex return nil } + ctx, span := tracer.Start(ctx, "lr2Check", trace.WithAttributes( + attribute.Int("resource-id-count", len(resourceIDsToCheck)), + )) + defer span.End() + checkHints := make([]*v1.CheckHint, 0, len(resourceIDsToCheck)) for _, resourceID := range resourceIDsToCheck { checkHint, err := hints.HintForEntrypoint( @@ -193,6 +201,11 @@ func (rdc *checkAndDispatchRunner) runDispatch( } rdc.lock.Unlock() + ctx, span := tracer.Start(ctx, "lr2Dispatch", trace.WithAttributes( + attribute.Int("resource-id-count", len(resourceIDsToDispatch)), + )) + defer span.End() + // NOTE: Since we extracted a custom section from the cursor at the beginning of this run, we have to add // the starting index to the cursor to ensure that the next run starts from the correct place, and we have // to use the *updated* cursor below on the dispatch. diff --git a/internal/graph/resourcesubjectsmap2.go b/internal/graph/resourcesubjectsmap2.go index 582a8791e4..5c134be00b 100644 --- a/internal/graph/resourcesubjectsmap2.go +++ b/internal/graph/resourcesubjectsmap2.go @@ -103,6 +103,10 @@ type dispatchableResourcesSubjectMap2 struct { resourcesSubjectMap2 } +func (rsm dispatchableResourcesSubjectMap2) len() int { + return rsm.resourcesAndSubjects.Len() +} + func (rsm dispatchableResourcesSubjectMap2) isEmpty() bool { return rsm.resourcesAndSubjects.IsEmpty() } diff --git a/pkg/typesystem/reachabilitygraph.go b/pkg/typesystem/reachabilitygraph.go index dd370cd258..3f4b826fd0 100644 --- a/pkg/typesystem/reachabilitygraph.go +++ b/pkg/typesystem/reachabilitygraph.go @@ -116,18 +116,27 @@ func (re ReachabilityEntrypoint) String() string { } func (re ReachabilityEntrypoint) MustDebugString() string { + ds, err := re.DebugString() + if err != nil { + panic(err) + } + + return ds +} + +func (re ReachabilityEntrypoint) DebugString() (string, error) { switch re.EntrypointKind() { case core.ReachabilityEntrypoint_RELATION_ENTRYPOINT: - return fmt.Sprintf("relation-entrypoint: %s#%s", re.re.TargetRelation.Namespace, re.re.TargetRelation.Relation) + return "relation-entrypoint: " + re.re.TargetRelation.Namespace + "#" + re.re.TargetRelation.Relation, nil case core.ReachabilityEntrypoint_TUPLESET_TO_USERSET_ENTRYPOINT: - return fmt.Sprintf("ttu-entrypoint: %s#%s | %s | %s#%s", re.parentRelation.Namespace, re.parentRelation.Relation, re.re.TuplesetRelation, re.re.TargetRelation.Namespace, re.re.TargetRelation.Relation) + return "ttu-entrypoint: " + re.re.TuplesetRelation + " -> " + re.re.TargetRelation.Namespace + "#" + re.re.TargetRelation.Relation, nil case core.ReachabilityEntrypoint_COMPUTED_USERSET_ENTRYPOINT: - return fmt.Sprintf("computed-entrypoint: %s#%s", re.re.TargetRelation.Namespace, re.re.TargetRelation.Relation) + return "computed-userset-entrypoint: " + re.re.TargetRelation.Namespace + "#" + re.re.TargetRelation.Relation, nil default: - panic("unknown relation entrypoint kind") + return "", fmt.Errorf("unknown entrypoint kind %v", re.EntrypointKind()) } }