Skip to content

Commit

Permalink
changes in cache-not storing the new canary config in snapshot
Browse files Browse the repository at this point in the history
  • Loading branch information
Abhipsa20 committed Nov 8, 2024
1 parent 10c5bec commit 105b6f9
Showing 1 changed file with 98 additions and 26 deletions.
124 changes: 98 additions & 26 deletions pkg/cache/v3/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,19 +276,33 @@ func (cache *snapshotCache) BatchUpsertResources(ctx context.Context, typ string
currentResources.Version = fmt.Sprintf("%d", currentVersion)

// Update
snapshot.(*Snapshot).Resources[index] = currentResources

cache.snapshots[node] = snapshot
if !isCanary {
snapshot.(*Snapshot).Resources[index] = currentResources
cache.snapshots[node] = snapshot
}

// Respond deltas
if info, ok := cache.status[node]; ok {
info.mu.Lock()

// Respond to delta watches for the node.
err := cache.respondDeltaWatches(ctx, info, snapshot)
if err != nil {
info.mu.Unlock()
continue
if !isCanary {
err := cache.respondDeltaWatches(ctx, info, snapshot, isCanary)
if err != nil {
info.mu.Unlock()
continue
}
} else {
snap := &Snapshot{
Resources: [10]Resources{},
VersionMap: cache.snapshots[node].(*Snapshot).VersionMap,
}
snap.Resources[index] = currentResources
err := cache.respondDeltaWatches(ctx, info, snap, isCanary)
if err != nil {
info.mu.Unlock()
continue
}
}
info.mu.Unlock()
}
Expand Down Expand Up @@ -354,16 +368,27 @@ func (cache *snapshotCache) UpsertResources(ctx context.Context, node string, ty
currentResources.Version = fmt.Sprintf("%d", currentVersion)

// Update
snapshot.(*Snapshot).Resources[index] = currentResources
cache.snapshots[node] = snapshot
if !isCanary {
snapshot.(*Snapshot).Resources[index] = currentResources
cache.snapshots[node] = snapshot
}

// Respond deltas
if info, ok := cache.status[node]; ok {
info.mu.Lock()
defer info.mu.Unlock()

// Respond to delta watches for the node.
return cache.respondDeltaWatches(ctx, info, snapshot)
if !isCanary {
return cache.respondDeltaWatches(ctx, info, snapshot, isCanary)
} else {
snap := &Snapshot{
Resources: [10]Resources{},
VersionMap: cache.snapshots[node].(*Snapshot).VersionMap,
}
snap.Resources[index] = currentResources
return cache.respondDeltaWatches(ctx, info, snap, isCanary)
}
}
} else {
cache.mu.Unlock()
Expand Down Expand Up @@ -432,18 +457,32 @@ func (cache *snapshotCache) UpdateVirtualHosts(ctx context.Context, _ string, ty
prevResources.Version = fmt.Sprintf("%d", currentVersion)

// Update
snapshot.(*Snapshot).Resources[index] = prevResources
cache.snapshots[node] = snapshot
if !isCanary {
snapshot.(*Snapshot).Resources[index] = prevResources
cache.snapshots[node] = snapshot
}

// Respond deltas
if info, ok := cache.status[node]; ok {
info.mu.Lock()
defer info.mu.Unlock()

// Respond to delta watches for the node.
err := cache.respondDeltaWatches(ctx, info, snapshot)
if err != nil {
return
if !isCanary {
err := cache.respondDeltaWatches(ctx, info, snapshot, isCanary)
if err != nil {
return
}
} else {
snap := &Snapshot{
Resources: [10]Resources{},
VersionMap: cache.snapshots[node].(*Snapshot).VersionMap,
}
snap.Resources[index] = prevResources
err := cache.respondDeltaWatches(ctx, info, snap, isCanary)
if err != nil {
return
}
}
}
}(node, r)
Expand All @@ -470,16 +509,27 @@ func (cache *snapshotCache) DeleteResources(ctx context.Context, node string, ty
currentVersion++
prevResources.Version = fmt.Sprintf("%d", currentVersion)
// Update
snapshot.(*Snapshot).Resources[index] = prevResources
cache.snapshots[node] = snapshot
if !isCanary {
snapshot.(*Snapshot).Resources[index] = prevResources
cache.snapshots[node] = snapshot
}

// Respond deltas
if info, ok := cache.status[node]; ok {
info.mu.Lock()
defer info.mu.Unlock()

// Respond to delta watches for the node.
return cache.respondDeltaWatches(ctx, info, snapshot)
if !isCanary {
return cache.respondDeltaWatches(ctx, info, snapshot, isCanary)
} else {
snap := &Snapshot{
Resources: [10]Resources{},
VersionMap: cache.snapshots[node].(*Snapshot).VersionMap,
}
snap.Resources[index] = prevResources
return cache.respondDeltaWatches(ctx, info, snap, isCanary)
}
}

} else if typ == resource.EndpointType {
Expand Down Expand Up @@ -518,13 +568,24 @@ func (cache *snapshotCache) DeleteResources(ctx context.Context, node string, ty
currentVersion++
currentResources.Version = fmt.Sprintf("%d", currentVersion)

snapshot.(*Snapshot).Resources[types.Endpoint] = currentResources
cache.snapshots[node_] = snapshot
if !isCanary {
snapshot.(*Snapshot).Resources[types.Endpoint] = currentResources
cache.snapshots[node_] = snapshot
}

// Respond deltas
if info, ok := cache.status[node_]; ok {
info.mu.Lock()
_ = cache.respondDeltaWatches(ctx, info, snapshot)
if !isCanary {
_ = cache.respondDeltaWatches(ctx, info, snapshot, isCanary)
} else {
snap := &Snapshot{
Resources: [10]Resources{},
VersionMap: cache.snapshots[node].(*Snapshot).VersionMap,
}
snap.Resources[types.Endpoint] = currentResources
_ = cache.respondDeltaWatches(ctx, info, snap, isCanary)
}
info.mu.Unlock()
}
}
Expand Down Expand Up @@ -583,13 +644,24 @@ func (cache *snapshotCache) DrainResources(ctx context.Context, _ string, typ st
currentVersion++
currentResources.Version = fmt.Sprintf("%d", currentVersion)

snapshot.(*Snapshot).Resources[types.Endpoint] = currentResources
cache.snapshots[node_] = snapshot
if !isCanary {
snapshot.(*Snapshot).Resources[types.Endpoint] = currentResources
cache.snapshots[node_] = snapshot
}

// Respond deltas
if info, ok := cache.status[node_]; ok {
info.mu.Lock()
_ = cache.respondDeltaWatches(ctx, info, snapshot)
if !isCanary {
_ = cache.respondDeltaWatches(ctx, info, snapshot, isCanary)
} else {
snap := &Snapshot{
Resources: [10]Resources{},
VersionMap: cache.snapshots[node_].(*Snapshot).VersionMap,
}
snap.Resources[types.Endpoint] = currentResources
_ = cache.respondDeltaWatches(ctx, info, snap, isCanary)
}
info.mu.Unlock()
}
}
Expand Down Expand Up @@ -617,7 +689,7 @@ func (cache *snapshotCache) SetSnapshot(ctx context.Context, node string, snapsh
}

// Respond to delta watches for the node.
return cache.respondDeltaWatches(ctx, info, snapshot)
return cache.respondDeltaWatches(ctx, info, snapshot, false)
}

return nil
Expand Down Expand Up @@ -663,7 +735,7 @@ func (cache *snapshotCache) respondSOTWWatches(ctx context.Context, info *status
return nil
}

func (cache *snapshotCache) respondDeltaWatches(ctx context.Context, info *statusInfo, snapshot ResourceSnapshot) error {
func (cache *snapshotCache) respondDeltaWatches(ctx context.Context, info *statusInfo, snapshot ResourceSnapshot, isCanary bool) error {
// We only calculate version hashes when using delta. We don't
// want to do this when using SOTW so we can avoid unnecessary
// computational cost if not using delta.
Expand Down

0 comments on commit 105b6f9

Please sign in to comment.