Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xd pipeline changes #16

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 7 additions & 21 deletions pkg/cache/v3/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ type ConfigWatcher interface {
//
// Cancel is an optional function to release resources in the producer. If
// provided, the consumer may call this function multiple times.
CreateDeltaWatch(*DeltaRequest, stream.StreamState, chan DeltaResponse) (cancel func())
CreateDeltaWatch(*DeltaRequest, stream.StreamState, chan DeltaResponse) (cancel func(), delayedResponse bool)
}

// ConfigFetcher fetches configuration resources from cache
Expand Down Expand Up @@ -127,7 +127,7 @@ type RawResponse struct {
Version string

// Resources to be included in the response.
Resources []types.ResourceWithTTL
Resources []VTMarshaledResource

// Whether this is a heartbeat response. For xDS versions that support TTL, this
// will be converted into a response that doesn't contain the actual resource protobuf.
Expand All @@ -151,7 +151,7 @@ type RawDeltaResponse struct {
SystemVersionInfo string

// Resources to be included in the response.
Resources []types.ResourceWithTTL
Resources []VTMarshaledResource

// RemovedResources is a list of resource aliases which should be dropped by the consuming client.
RemovedResources []string
Expand Down Expand Up @@ -212,17 +212,9 @@ func (r *RawResponse) GetDiscoveryResponse() (*discovery.DiscoveryResponse, erro
marshaledResources := make([]*anypb.Any, len(r.Resources))

for i, resource := range r.Resources {
maybeTtldResource, resourceType, err := r.maybeCreateTTLResource(resource)
if err != nil {
return nil, err
}
marshaledResource, err := MarshalResource(maybeTtldResource)
if err != nil {
return nil, err
}
marshaledResources[i] = &anypb.Any{
TypeUrl: resourceType,
Value: marshaledResource,
TypeUrl: r.GetRequest().GetTypeUrl(),
Value: resource.Resource,
}
}

Expand All @@ -243,19 +235,14 @@ func (r *RawResponse) GetDiscoveryResponse() (*discovery.DiscoveryResponse, erro
// This caching behavior is important in high throughput scenarios because grpc marshaling has a cost and it drives the cpu utilization under load.
func (r *RawDeltaResponse) GetDeltaDiscoveryResponse() (*discovery.DeltaDiscoveryResponse, error) {
marshaledResponse := r.marshaledResponse.Load()

if marshaledResponse == nil {
marshaledResources := make([]*discovery.Resource, 0)

for _, resource := range r.Resources {
name := GetResourceName(resource.Resource)
name := resource.Name
if name == "" {
continue
}
marshaledResource, err := MarshalResource(resource.Resource)
if err != nil {
return nil, err
}

if resource.Version == "" {
return nil, errors.New("failed to get a resource hash")
Expand All @@ -264,7 +251,7 @@ func (r *RawDeltaResponse) GetDeltaDiscoveryResponse() (*discovery.DeltaDiscover
Name: name,
Resource: &anypb.Any{
TypeUrl: r.GetDeltaRequest().GetTypeUrl(),
Value: marshaledResource,
Value: resource.Resource,
},
Version: resource.Version,
})
Expand All @@ -278,7 +265,6 @@ func (r *RawDeltaResponse) GetDeltaDiscoveryResponse() (*discovery.DeltaDiscover
}
r.marshaledResponse.Store(marshaledResponse)
}

return marshaledResponse.(*discovery.DeltaDiscoveryResponse), nil
}

Expand Down
19 changes: 9 additions & 10 deletions pkg/cache/v3/delta.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,12 @@ import (
"context"
"strings"

"github.com/envoyproxy/go-control-plane/pkg/cache/types"
"github.com/envoyproxy/go-control-plane/pkg/server/stream/v3"
)

// groups together resource-related arguments for the createDeltaResponse function
type resourceContainer struct {
resourceMap map[string]types.ResourceWithTTL
resourceMap map[string]VTMarshaledResource
versionMap map[string]string
systemVersion string
}
Expand Down Expand Up @@ -66,7 +65,7 @@ func containsPrefixedKey(data map[string]string, keyLike string) ([]string, bool
return resNames, len(resNames) > 0
}

func containsPrefixedKeyResources(data map[string]types.ResourceWithTTL, keyLike string) ([]string, bool) {
func containsPrefixedKeyResources(data map[string]VTMarshaledResource, keyLike string) ([]string, bool) {
resNames := make([]string, 0)
for key := range data {
if strings.Contains(key, keyLike) {
Expand All @@ -79,11 +78,11 @@ func containsPrefixedKeyResources(data map[string]types.ResourceWithTTL, keyLike
func createDeltaResponse(ctx context.Context, req *DeltaRequest, state stream.StreamState, resources resourceContainer) *RawDeltaResponse {
// variables to build our response with
var nextVersionMap map[string]string
var filtered map[string]types.ResourceWithTTL
var filtered map[string]VTMarshaledResource
var toRemove []string
switch {
case state.IsWildcard():
filtered = make(map[string]types.ResourceWithTTL)
filtered = make(map[string]VTMarshaledResource)
nextVersionMap = make(map[string]string, len(resources.resourceMap))
for name, r := range resources.resourceMap {
// Since we've already precomputed the version hashes of the new snapshot,
Expand All @@ -104,7 +103,7 @@ func createDeltaResponse(ctx context.Context, req *DeltaRequest, state stream.St
}
}
default:
filtered = make(map[string]types.ResourceWithTTL)
filtered = make(map[string]VTMarshaledResource)
nextVersionMap = make(map[string]string, len(state.GetSubscribedResourceNames()))
// state.GetResourceVersions() may include resources no longer subscribed
// In the current code this gets silently cleaned when updating the version map
Expand All @@ -121,7 +120,7 @@ func createDeltaResponse(ctx context.Context, req *DeltaRequest, state stream.St
if r, ok := resources.resourceMap[versionName]; ok {
nextVersion := resources.versionMap[versionName]
if prevVersion != nextVersion {
filtered[GetResourceName(r.Resource)] = r
filtered[r.Name] = r
}
nextVersionMap[versionName] = nextVersion
} else if found {
Expand All @@ -133,7 +132,7 @@ func createDeltaResponse(ctx context.Context, req *DeltaRequest, state stream.St
if r, ok := resources.resourceMap[name]; ok {
nextVersion := resources.versionMap[name]
if prevVersion != nextVersion {
filtered[GetResourceName(r.Resource)] = r
filtered[r.Name] = r
}
nextVersionMap[name] = nextVersion
} else if found {
Expand All @@ -143,8 +142,8 @@ func createDeltaResponse(ctx context.Context, req *DeltaRequest, state stream.St
}
}

filteredResources := make([]types.ResourceWithTTL, 0)
filteredResourceNames := make([]string, 0)
filteredResources := make([]VTMarshaledResource, len(filtered))
filteredResourceNames := make([]string, len(filtered))
for name, r := range filtered {
filteredResources = append(filteredResources, r)
filteredResourceNames = append(filteredResourceNames, name)
Expand Down
36 changes: 25 additions & 11 deletions pkg/cache/v3/linear.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,19 +114,27 @@ func NewLinearCache(typeURL string, opts ...LinearCacheOption) *LinearCache {
}

func (cache *LinearCache) respond(value chan Response, staleResources []string) {
var resources []types.ResourceWithTTL
var resources []VTMarshaledResource
// TODO: optimize the resources slice creations across different clients
if len(staleResources) == 0 {
resources = make([]types.ResourceWithTTL, 0, len(cache.resources))
resources = make([]VTMarshaledResource, 0, len(cache.resources))
for _, resource := range cache.resources {
resources = append(resources, types.ResourceWithTTL{Resource: resource})
out, err := resource.MarshalVTStrict()
if err != nil {
continue
}
resources = append(resources, VTMarshaledResource{Resource: out})
}
} else {
resources = make([]types.ResourceWithTTL, 0, len(staleResources))
resources = make([]VTMarshaledResource, 0, len(staleResources))
for _, name := range staleResources {
resource := cache.resources[name]
if resource != nil {
resources = append(resources, types.ResourceWithTTL{Resource: resource})
out, err := resource.MarshalVTStrict()
if err != nil {
continue
}
resources = append(resources, VTMarshaledResource{Resource: out})
}
}
}
Expand Down Expand Up @@ -180,10 +188,16 @@ func (cache *LinearCache) respondDelta(request *DeltaRequest, value chan DeltaRe
// We converted types.Resource to types.ResourceWithTTL in Snapshot cache.
// In order to avoid affects in any other types of caches, we loop and convert
// back types.Resource to types.ResourceWithTTL
resources := make(map[string]types.ResourceWithTTL)
resources := make(map[string]VTMarshaledResource)
for name, resource := range cache.resources {
resources[name] = types.ResourceWithTTL{
Resource: resource,
out, err := resource.MarshalVTStrict()
if err != nil {
continue
}
resources[name] = VTMarshaledResource{
Name: name,
Resource: out,
Version: cache.versionMap[name],
}
}

Expand Down Expand Up @@ -382,7 +396,7 @@ func (cache *LinearCache) CreateWatch(request *Request, _ stream.StreamState, va
}
}

func (cache *LinearCache) CreateDeltaWatch(request *DeltaRequest, state stream.StreamState, value chan DeltaResponse) func() {
func (cache *LinearCache) CreateDeltaWatch(request *DeltaRequest, state stream.StreamState, value chan DeltaResponse) (func(), bool) {
cache.mu.Lock()
defer cache.mu.Unlock()

Expand Down Expand Up @@ -412,10 +426,10 @@ func (cache *LinearCache) CreateDeltaWatch(request *DeltaRequest, state stream.S

cache.deltaWatches[watchID] = DeltaResponseWatch{Request: request, Response: value, StreamState: state}

return cache.cancelDeltaWatch(watchID)
return cache.cancelDeltaWatch(watchID), true
}

return nil
return nil, false
}

func (cache *LinearCache) updateVersionMap(modified map[string]struct{}) error {
Expand Down
4 changes: 2 additions & 2 deletions pkg/cache/v3/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,12 @@ func (mux *MuxCache) CreateWatch(request *Request, state stream.StreamState, val
return cache.CreateWatch(request, state, value)
}

func (mux *MuxCache) CreateDeltaWatch(request *DeltaRequest, state stream.StreamState, value chan DeltaResponse) func() {
func (mux *MuxCache) CreateDeltaWatch(request *DeltaRequest, state stream.StreamState, value chan DeltaResponse) (func(), bool) {
key := mux.ClassifyDelta(request)
cache, exists := mux.Caches[key]
if !exists {
value <- nil
return nil
return nil, true
}
return cache.CreateDeltaWatch(request, state, value)
}
Expand Down
29 changes: 15 additions & 14 deletions pkg/cache/v3/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,10 @@ func GetResourceNames(resources []types.Resource) []string {
}

// GetResourceWithTTLNames returns the resource names for a list of valid xDS response types.
func GetResourceWithTTLNames(resources []types.ResourceWithTTL) []string {
func GetResourceWithTTLNames(resources []VTMarshaledResource) []string {
out := make([]string, len(resources))
for i, r := range resources {
out[i] = GetResourceName(r.Resource)
out[i] = r.Name
}
return out
}
Expand All @@ -139,19 +139,20 @@ func GetResourceReferences(resources map[string]types.ResourceWithTTL) map[resou
func GetAllResourceReferences(resourceGroups [types.UnknownType]Resources) map[resource.Type]map[string]bool {
ret := map[resource.Type]map[string]bool{}

// Commenting the following as we switch to using VTMarshaledResource
// We only check resources that we expect to have references to other resources.
responseTypesWithReferences := map[types.ResponseType]struct{}{
types.Cluster: {},
types.Listener: {},
types.ScopedRoute: {},
}

for responseType, resourceGroup := range resourceGroups {
if _, ok := responseTypesWithReferences[types.ResponseType(responseType)]; ok {
items := resourceGroup.Items
getResourceReferences(items, ret)
}
}
//responseTypesWithReferences := map[types.ResponseType]struct{}{
// types.Cluster: {},
// types.Listener: {},
// types.ScopedRoute: {},
//}
//
//for responseType, resourceGroup := range resourceGroups {
// if _, ok := responseTypesWithReferences[types.ResponseType(responseType)]; ok {
// items := resourceGroup.Items
// getResourceReferences(items, ret)
// }
//}

return ret
}
Expand Down
35 changes: 32 additions & 3 deletions pkg/cache/v3/resources.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,43 @@
package cache

import "github.com/envoyproxy/go-control-plane/pkg/cache/types"
import (
"fmt"
"github.com/envoyproxy/go-control-plane/pkg/cache/types"
"time"
)

// Resources is a versioned group of resources.
type Resources struct {
// Version information.
Version string

// Items in the group indexed by name.
Items map[string]types.ResourceWithTTL
Items map[string]VTMarshaledResource
}

type VTMarshaledResource struct {
Name string
Version string
Resource []byte
TTL *time.Duration
}

// IndexAndMarshalResourcesByName creates a map from the resource name to the marshaled resource.
func IndexAndMarshalResourcesByName(items []types.ResourceWithTTL) map[string]VTMarshaledResource {
indexed := make(map[string]VTMarshaledResource, len(items))
for _, item := range items {
out, err := item.Resource.MarshalVTStrict()
if err != nil {
fmt.Printf("failed to MarshalVTStrict resource %s: %v\n", GetResourceName(item.Resource), err)
continue
}
indexed[GetResourceName(item.Resource)] = VTMarshaledResource{
Name: GetResourceName(item.Resource),
Version: item.Version,
Resource: out,
}
}
return indexed
}

// IndexResourcesByName creates a map from the resource name to the resource.
Expand Down Expand Up @@ -42,6 +71,6 @@ func NewResources(version string, items []types.Resource) Resources {
func NewResourcesWithTTL(version string, items []types.ResourceWithTTL) Resources {
return Resources{
Version: version,
Items: IndexResourcesByName(items),
Items: IndexAndMarshalResourcesByName(items),
}
}
Loading
Loading