Skip to content

Commit

Permalink
Merge pull request #1650 from authzed/log-posibly-loop-singleflight-d…
Browse files Browse the repository at this point in the history
…ispatch

singleflight dispatcher: do not double-singleflight on remote cluster dispatch
  • Loading branch information
vroldanbet authored Nov 16, 2023
2 parents b95cc46 + 4a20f13 commit 638511c
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 7 deletions.
2 changes: 0 additions & 2 deletions internal/dispatch/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"github.com/authzed/spicedb/internal/dispatch/caching"
"github.com/authzed/spicedb/internal/dispatch/graph"
"github.com/authzed/spicedb/internal/dispatch/keys"
"github.com/authzed/spicedb/internal/dispatch/singleflight"
"github.com/authzed/spicedb/pkg/cache"
)

Expand Down Expand Up @@ -68,7 +67,6 @@ func NewClusterDispatcher(dispatch dispatch.Dispatcher, options ...Option) (disp
}

clusterDispatch := graph.NewDispatcher(dispatch, opts.concurrencyLimits)
clusterDispatch = singleflight.New(clusterDispatch, &keys.CanonicalKeyHandler{})

if opts.prometheusSubsystem == "" {
opts.prometheusSubsystem = "dispatch"
Expand Down
4 changes: 4 additions & 0 deletions internal/dispatch/singleflight/singleflight.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"google.golang.org/grpc/status"
"resenje.org/singleflight"

log "github.com/authzed/spicedb/internal/logging"

"github.com/authzed/spicedb/internal/dispatch"
"github.com/authzed/spicedb/internal/dispatch/keys"
v1 "github.com/authzed/spicedb/pkg/proto/dispatch/v1"
Expand Down Expand Up @@ -73,6 +75,7 @@ func (d *Dispatcher) DispatchCheck(ctx context.Context, req *v1.DispatchCheckReq
if err != nil {
return &v1.DispatchCheckResponse{Metadata: &v1.ResponseMeta{DispatchCount: 1}}, err
} else if possiblyLoop {
log.Debug().Object("DispatchCheckRequest", req).Str("key", keyString).Msg("potential DispatchCheckRequest loop detected")
singleFlightCount.WithLabelValues("DispatchCheck", "loop").Inc()
return d.delegate.DispatchCheck(ctx, req)
}
Expand Down Expand Up @@ -116,6 +119,7 @@ func (d *Dispatcher) DispatchExpand(ctx context.Context, req *v1.DispatchExpandR
if err != nil {
return &v1.DispatchExpandResponse{Metadata: &v1.ResponseMeta{DispatchCount: 1}}, err
} else if possiblyLoop {
log.Debug().Object("DispatchExpand", req).Str("key", keyString).Msg("potential DispatchExpand loop detected")
singleFlightCount.WithLabelValues("DispatchExpand", "loop").Inc()
return d.delegate.DispatchExpand(ctx, req)
}
Expand Down
21 changes: 18 additions & 3 deletions internal/services/v1/permissions.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,11 +155,16 @@ func (ps *permissionServer) ExpandPermissionTree(ctx context.Context, req *v1.Ex
return nil, ps.rewriteError(ctx, err)
}

bf, err := dispatch.NewTraversalBloomFilter(uint(ps.config.MaximumAPIDepth))
if err != nil {
return nil, err
}

resp, err := ps.dispatch.DispatchExpand(ctx, &dispatch.DispatchExpandRequest{
Metadata: &dispatch.ResolverMeta{
AtRevision: atRevision.String(),
DepthRemaining: ps.config.MaximumAPIDepth,
TraversalBloom: dispatch.MustNewTraversalBloomFilter(uint(ps.config.MaximumAPIDepth)),
TraversalBloom: bf,
},
ResourceAndRelation: &core.ObjectAndRelation{
Namespace: req.Resource.ObjectType,
Expand Down Expand Up @@ -417,12 +422,17 @@ func (ps *permissionServer) LookupResources(req *v1.LookupResourcesRequest, resp
return nil
})

bf, err := dispatch.NewTraversalBloomFilter(uint(ps.config.MaximumAPIDepth))
if err != nil {
return err
}

err = ps.dispatch.DispatchLookupResources(
&dispatch.DispatchLookupResourcesRequest{
Metadata: &dispatch.ResolverMeta{
AtRevision: atRevision.String(),
DepthRemaining: ps.config.MaximumAPIDepth,
TraversalBloom: dispatch.MustNewTraversalBloomFilter(uint(ps.config.MaximumAPIDepth)),
TraversalBloom: bf,
},
ObjectRelation: &core.RelationReference{
Namespace: req.ResourceObjectType,
Expand Down Expand Up @@ -537,12 +547,17 @@ func (ps *permissionServer) LookupSubjects(req *v1.LookupSubjectsRequest, resp v
return nil
})

bf, err := dispatch.NewTraversalBloomFilter(uint(ps.config.MaximumAPIDepth))
if err != nil {
return err
}

err = ps.dispatch.DispatchLookupSubjects(
&dispatch.DispatchLookupSubjectsRequest{
Metadata: &dispatch.ResolverMeta{
AtRevision: atRevision.String(),
DepthRemaining: ps.config.MaximumAPIDepth,
TraversalBloom: dispatch.MustNewTraversalBloomFilter(uint(ps.config.MaximumAPIDepth)),
TraversalBloom: bf,
},
ResourceRelation: &core.RelationReference{
Namespace: req.Resource.ObjectType,
Expand Down
8 changes: 6 additions & 2 deletions pkg/proto/dispatch/v1/02_resolvermeta.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ import (
)

func (x *ResolverMeta) RecordTraversal(key string) (possiblyLoop bool, err error) {
if key == "" {
return false, spiceerrors.MustBugf("missing key to be recorded in traversal")
}

if x == nil || len(x.TraversalBloom) == 0 {
return false, status.Error(codes.Internal, fmt.Errorf("required traversal bloom filter is missing").Error())
}
Expand Down Expand Up @@ -39,12 +43,12 @@ const defaultFalsePositiveRate = 0.001
func NewTraversalBloomFilter(numElements uint) ([]byte, error) {
bf := bloom.NewWithEstimates(numElements, defaultFalsePositiveRate)

modifiedBloomFilter, err := bf.MarshalBinary()
emptyBloomFilter, err := bf.MarshalBinary()
if err != nil {
return nil, spiceerrors.MustBugf("unexpected error while serializing empty bloom filter: %s", err.Error())
}

return modifiedBloomFilter, nil
return emptyBloomFilter, nil
}

// MustNewTraversalBloomFilter creates a new bloom filter sized to the provided number of elements and
Expand Down

0 comments on commit 638511c

Please sign in to comment.