Skip to content

Commit

Permalink
Merge pull request #11 from ShareChat/xds-cache
Browse files Browse the repository at this point in the history
Xds cache
  • Loading branch information
krhitesh7 authored Sep 27, 2024
2 parents ebfafc9 + 4b7e052 commit ee3a839
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 12 deletions.
2 changes: 1 addition & 1 deletion pkg/cache/v3/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ type ConfigWatcher interface {
//
// Cancel is an optional function to release resources in the producer. If
// provided, the consumer may call this function multiple times.
CreateDeltaWatch(*DeltaRequest, stream.StreamState, chan DeltaResponse) (cancel func())
CreateDeltaWatch(*DeltaRequest, stream.StreamState, chan DeltaResponse) (delayedResponse bool, cancel func())
}

// ConfigFetcher fetches configuration resources from cache
Expand Down
6 changes: 3 additions & 3 deletions pkg/cache/v3/linear.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ func (cache *LinearCache) CreateWatch(request *Request, _ stream.StreamState, va
}
}

func (cache *LinearCache) CreateDeltaWatch(request *DeltaRequest, state stream.StreamState, value chan DeltaResponse) func() {
func (cache *LinearCache) CreateDeltaWatch(request *DeltaRequest, state stream.StreamState, value chan DeltaResponse) (bool, func()) {
cache.mu.Lock()
defer cache.mu.Unlock()

Expand Down Expand Up @@ -412,10 +412,10 @@ func (cache *LinearCache) CreateDeltaWatch(request *DeltaRequest, state stream.S

cache.deltaWatches[watchID] = DeltaResponseWatch{Request: request, Response: value, StreamState: state}

return cache.cancelDeltaWatch(watchID)
return false, cache.cancelDeltaWatch(watchID)
}

return nil
return false, nil
}

func (cache *LinearCache) updateVersionMap(modified map[string]struct{}) error {
Expand Down
4 changes: 2 additions & 2 deletions pkg/cache/v3/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,12 @@ func (mux *MuxCache) CreateWatch(request *Request, state stream.StreamState, val
return cache.CreateWatch(request, state, value)
}

func (mux *MuxCache) CreateDeltaWatch(request *DeltaRequest, state stream.StreamState, value chan DeltaResponse) func() {
func (mux *MuxCache) CreateDeltaWatch(request *DeltaRequest, state stream.StreamState, value chan DeltaResponse) (bool, func()) {
key := mux.ClassifyDelta(request)
cache, exists := mux.Caches[key]
if !exists {
value <- nil
return nil
return false, nil
}
return cache.CreateDeltaWatch(request, state, value)
}
Expand Down
12 changes: 6 additions & 6 deletions pkg/cache/v3/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ func (cache *snapshotCache) BatchUpsertResources(ctx context.Context, typ string
info.mu.Lock()

// Respond to delta watches for the node.
err := cache.respondDeltaWatches(ctx, info, snapshot)
err := cache.respondDeltaWatches(ctx, info, s)
if err != nil {
info.mu.Unlock()
continue
Expand Down Expand Up @@ -904,7 +904,7 @@ func createResponse(ctx context.Context, request *Request, resources map[string]
}

// CreateDeltaWatch returns a watch for a delta xDS request which implements the Simple SnapshotCache.
func (cache *snapshotCache) CreateDeltaWatch(request *DeltaRequest, state stream.StreamState, value chan DeltaResponse) func() {
func (cache *snapshotCache) CreateDeltaWatch(request *DeltaRequest, state stream.StreamState, value chan DeltaResponse) (bool, func()) {
nodeID := cache.hash.ID(request.GetNode())
t := request.GetTypeUrl()

Expand Down Expand Up @@ -934,12 +934,12 @@ func (cache *snapshotCache) CreateDeltaWatch(request *DeltaRequest, state stream
cache.log.Errorf("failed to compute version for snapshot resources inline: %s", err)
}
// We don't need to respond. We're handling this in a better way in ads.
// response, err := cache.respondDelta(context.Background(), snapshot, request, value, state)
response, err := cache.respondDelta(context.Background(), snapshot, request, value, state)
if err != nil {
cache.log.Errorf("failed to respond with delta response: %s", err)
}

delayedResponse = true // response == nil
delayedResponse = response == nil
}

if delayedResponse {
Expand All @@ -952,10 +952,10 @@ func (cache *snapshotCache) CreateDeltaWatch(request *DeltaRequest, state stream
}

info.setDeltaResponseWatch(watchID, DeltaResponseWatch{Request: request, Response: value, StreamState: state})
return cache.cancelDeltaWatch(nodeID, watchID)
return delayedResponse, cache.cancelDeltaWatch(nodeID, watchID)
}

return nil
return false, nil
}

func GetEnvoyNodeStr(node *core.Node) string {
Expand Down

0 comments on commit ee3a839

Please sign in to comment.