diff --git a/internal/dispatch/cluster/cluster.go b/internal/dispatch/cluster/cluster.go index b00728a851..edc69952ff 100644 --- a/internal/dispatch/cluster/cluster.go +++ b/internal/dispatch/cluster/cluster.go @@ -7,7 +7,6 @@ import ( "github.com/authzed/spicedb/internal/dispatch/caching" "github.com/authzed/spicedb/internal/dispatch/graph" "github.com/authzed/spicedb/internal/dispatch/keys" - "github.com/authzed/spicedb/internal/dispatch/singleflight" "github.com/authzed/spicedb/pkg/cache" ) @@ -68,7 +67,6 @@ func NewClusterDispatcher(dispatch dispatch.Dispatcher, options ...Option) (disp } clusterDispatch := graph.NewDispatcher(dispatch, opts.concurrencyLimits) - clusterDispatch = singleflight.New(clusterDispatch, &keys.CanonicalKeyHandler{}) if opts.prometheusSubsystem == "" { opts.prometheusSubsystem = "dispatch" diff --git a/internal/dispatch/singleflight/singleflight.go b/internal/dispatch/singleflight/singleflight.go index fbcc1224f9..6a01a386bb 100644 --- a/internal/dispatch/singleflight/singleflight.go +++ b/internal/dispatch/singleflight/singleflight.go @@ -12,6 +12,8 @@ import ( "google.golang.org/grpc/status" "resenje.org/singleflight" + log "github.com/authzed/spicedb/internal/logging" + "github.com/authzed/spicedb/internal/dispatch" "github.com/authzed/spicedb/internal/dispatch/keys" v1 "github.com/authzed/spicedb/pkg/proto/dispatch/v1" @@ -73,6 +75,7 @@ func (d *Dispatcher) DispatchCheck(ctx context.Context, req *v1.DispatchCheckReq if err != nil { return &v1.DispatchCheckResponse{Metadata: &v1.ResponseMeta{DispatchCount: 1}}, err } else if possiblyLoop { + log.Debug().Object("DispatchCheckRequest", req).Str("key", keyString).Msg("potential DispatchCheckRequest loop detected") singleFlightCount.WithLabelValues("DispatchCheck", "loop").Inc() return d.delegate.DispatchCheck(ctx, req) } @@ -116,6 +119,7 @@ func (d *Dispatcher) DispatchExpand(ctx context.Context, req *v1.DispatchExpandR if err != nil { return &v1.DispatchExpandResponse{Metadata: &v1.ResponseMeta{DispatchCount: 1}}, err } else if possiblyLoop { + log.Debug().Object("DispatchExpand", req).Str("key", keyString).Msg("potential DispatchExpand loop detected") singleFlightCount.WithLabelValues("DispatchExpand", "loop").Inc() return d.delegate.DispatchExpand(ctx, req) } diff --git a/internal/services/v1/permissions.go b/internal/services/v1/permissions.go index 952fe7660b..1f17f710f2 100644 --- a/internal/services/v1/permissions.go +++ b/internal/services/v1/permissions.go @@ -155,11 +155,16 @@ func (ps *permissionServer) ExpandPermissionTree(ctx context.Context, req *v1.Ex return nil, ps.rewriteError(ctx, err) } + bf, err := dispatch.NewTraversalBloomFilter(uint(ps.config.MaximumAPIDepth)) + if err != nil { + return nil, err + } + resp, err := ps.dispatch.DispatchExpand(ctx, &dispatch.DispatchExpandRequest{ Metadata: &dispatch.ResolverMeta{ AtRevision: atRevision.String(), DepthRemaining: ps.config.MaximumAPIDepth, - TraversalBloom: dispatch.MustNewTraversalBloomFilter(uint(ps.config.MaximumAPIDepth)), + TraversalBloom: bf, }, ResourceAndRelation: &core.ObjectAndRelation{ Namespace: req.Resource.ObjectType, @@ -417,12 +422,17 @@ func (ps *permissionServer) LookupResources(req *v1.LookupResourcesRequest, resp return nil }) + bf, err := dispatch.NewTraversalBloomFilter(uint(ps.config.MaximumAPIDepth)) + if err != nil { + return err + } + err = ps.dispatch.DispatchLookupResources( &dispatch.DispatchLookupResourcesRequest{ Metadata: &dispatch.ResolverMeta{ AtRevision: atRevision.String(), DepthRemaining: ps.config.MaximumAPIDepth, - TraversalBloom: dispatch.MustNewTraversalBloomFilter(uint(ps.config.MaximumAPIDepth)), + TraversalBloom: bf, }, ObjectRelation: &core.RelationReference{ Namespace: req.ResourceObjectType, @@ -537,12 +547,17 @@ func (ps *permissionServer) LookupSubjects(req *v1.LookupSubjectsRequest, resp v return nil }) + bf, err := dispatch.NewTraversalBloomFilter(uint(ps.config.MaximumAPIDepth)) + if err != nil { + return err + } + err = ps.dispatch.DispatchLookupSubjects( &dispatch.DispatchLookupSubjectsRequest{ Metadata: &dispatch.ResolverMeta{ AtRevision: atRevision.String(), DepthRemaining: ps.config.MaximumAPIDepth, - TraversalBloom: dispatch.MustNewTraversalBloomFilter(uint(ps.config.MaximumAPIDepth)), + TraversalBloom: bf, }, ResourceRelation: &core.RelationReference{ Namespace: req.Resource.ObjectType, diff --git a/pkg/proto/dispatch/v1/02_resolvermeta.go b/pkg/proto/dispatch/v1/02_resolvermeta.go index 4e5b248adf..93742e06a7 100644 --- a/pkg/proto/dispatch/v1/02_resolvermeta.go +++ b/pkg/proto/dispatch/v1/02_resolvermeta.go @@ -11,6 +11,10 @@ import ( ) func (x *ResolverMeta) RecordTraversal(key string) (possiblyLoop bool, err error) { + if key == "" { + return false, spiceerrors.MustBugf("missing key to be recorded in traversal") + } + if x == nil || len(x.TraversalBloom) == 0 { return false, status.Error(codes.Internal, fmt.Errorf("required traversal bloom filter is missing").Error()) } @@ -39,12 +43,12 @@ const defaultFalsePositiveRate = 0.001 func NewTraversalBloomFilter(numElements uint) ([]byte, error) { bf := bloom.NewWithEstimates(numElements, defaultFalsePositiveRate) - modifiedBloomFilter, err := bf.MarshalBinary() + emptyBloomFilter, err := bf.MarshalBinary() if err != nil { return nil, spiceerrors.MustBugf("unexpected error while serializing empty bloom filter: %s", err.Error()) } - return modifiedBloomFilter, nil + return emptyBloomFilter, nil } // MustNewTraversalBloomFilter creates a new bloom filter sized to the provided number of elements and