Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xdsclient: update watcher API as per gRFC A88 #7977

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
Open
2 changes: 1 addition & 1 deletion internal/xds/bootstrap/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ func (sc *ServerConfig) ServerFeatures() []string {
//
// This feature controls the behavior of the xDS client when the server deletes
// a previously sent Listener or Cluster resource. If set, the xDS client will
// not invoke the watchers' OnResourceDoesNotExist() method when a resource is
// not invoke the watchers' ResourceError() method when a resource is
// deleted, nor will it remove the existing resource value from its cache.
func (sc *ServerConfig) ServerFeaturesIgnoreResourceDeletion() bool {
for _, sf := range sc.serverFeatures {
Expand Down
38 changes: 19 additions & 19 deletions xds/csds/csds_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,49 +71,49 @@ func Test(t *testing.T) {

type nopListenerWatcher struct{}

func (nopListenerWatcher) OnUpdate(_ *xdsresource.ListenerResourceData, onDone xdsresource.OnDoneFunc) {
func (nopListenerWatcher) ResourceChanged(_ *xdsresource.ListenerResourceData, onDone func()) {
onDone()
}
func (nopListenerWatcher) OnError(_ error, onDone xdsresource.OnDoneFunc) {
func (nopListenerWatcher) ResourceError(_ error, onDone func()) {
onDone()
}
func (nopListenerWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
func (nopListenerWatcher) AmbientError(_ error, onDone func()) {
onDone()
}

type nopRouteConfigWatcher struct{}

func (nopRouteConfigWatcher) OnUpdate(_ *xdsresource.RouteConfigResourceData, onDone xdsresource.OnDoneFunc) {
func (nopRouteConfigWatcher) ResourceChanged(_ *xdsresource.RouteConfigResourceData, onDone func()) {
onDone()
}
func (nopRouteConfigWatcher) OnError(_ error, onDone xdsresource.OnDoneFunc) {
func (nopRouteConfigWatcher) ResourceError(_ error, onDone func()) {
onDone()
}
func (nopRouteConfigWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
func (nopRouteConfigWatcher) AmbientError(_ error, onDone func()) {
onDone()
}

type nopClusterWatcher struct{}

func (nopClusterWatcher) OnUpdate(_ *xdsresource.ClusterResourceData, onDone xdsresource.OnDoneFunc) {
func (nopClusterWatcher) ResourceChanged(_ *xdsresource.ClusterResourceData, onDone func()) {
onDone()
}
func (nopClusterWatcher) OnError(_ error, onDone xdsresource.OnDoneFunc) {
func (nopClusterWatcher) ResourceError(_ error, onDone func()) {
onDone()
}
func (nopClusterWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
func (nopClusterWatcher) AmbientError(_ error, onDone func()) {
onDone()
}

type nopEndpointsWatcher struct{}

func (nopEndpointsWatcher) OnUpdate(_ *xdsresource.EndpointsResourceData, onDone xdsresource.OnDoneFunc) {
func (nopEndpointsWatcher) ResourceChanged(_ *xdsresource.EndpointsResourceData, onDone func()) {
onDone()
}
func (nopEndpointsWatcher) OnError(_ error, onDone xdsresource.OnDoneFunc) {
func (nopEndpointsWatcher) ResourceError(_ error, onDone func()) {
onDone()
}
func (nopEndpointsWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
func (nopEndpointsWatcher) AmbientError(_ error, onDone func()) {
onDone()
}

Expand All @@ -127,31 +127,31 @@ func (nopEndpointsWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc)
// for ADS stream level flow control), and was causing CSDS to not receive any
// updates from the xDS client.
type blockingListenerWatcher struct {
testCtxDone <-chan struct{} // Closed when the test is done.
onDoneCh chan xdsresource.OnDoneFunc // Channel to write the onDone callback to.
testCtxDone <-chan struct{} // Closed when the test is done.
onDoneCh chan func() // Channel to write the onDone callback to.
}

func newBlockingListenerWatcher(testCtxDone <-chan struct{}) *blockingListenerWatcher {
return &blockingListenerWatcher{
testCtxDone: testCtxDone,
onDoneCh: make(chan xdsresource.OnDoneFunc, 1),
onDoneCh: make(chan func(), 1),
}
}

func (w *blockingListenerWatcher) OnUpdate(_ *xdsresource.ListenerResourceData, onDone xdsresource.OnDoneFunc) {
func (w *blockingListenerWatcher) ResourceChanged(_ *xdsresource.ListenerResourceData, onDone func()) {
writeOnDone(w.testCtxDone, w.onDoneCh, onDone)
}
func (w *blockingListenerWatcher) OnError(_ error, onDone xdsresource.OnDoneFunc) {
func (w *blockingListenerWatcher) ResourceError(_ error, onDone func()) {
writeOnDone(w.testCtxDone, w.onDoneCh, onDone)
}
func (w *blockingListenerWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
func (w *blockingListenerWatcher) AmbientError(_ error, onDone func()) {
writeOnDone(w.testCtxDone, w.onDoneCh, onDone)
}

// writeOnDone attempts to write the onDone callback on the onDone channel. It
// returns when it can successfully write to the channel or when the test is
// done, which is signalled by testCtxDone being closed.
func writeOnDone(testCtxDone <-chan struct{}, onDoneCh chan xdsresource.OnDoneFunc, onDone xdsresource.OnDoneFunc) {
func writeOnDone(testCtxDone <-chan struct{}, onDoneCh chan func(), onDone func()) {
select {
case <-testCtxDone:
case onDoneCh <- onDone:
Expand Down
27 changes: 13 additions & 14 deletions xds/internal/balancer/cdsbalancer/cdsbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ func (b *cdsBalancer) ResolverError(err error) {
if b.lbCfg != nil {
root = b.lbCfg.ClusterName
}
b.onClusterError(root, err)
b.onClusterAmbientError(root, err)
})
}

Expand Down Expand Up @@ -474,20 +474,20 @@ func (b *cdsBalancer) onClusterUpdate(name string, update xdsresource.ClusterUpd
// If the security config is invalid, for example, if the provider
// instance is not found in the bootstrap config, we need to put the
// channel in transient failure.
b.onClusterError(name, b.annotateErrorWithNodeID(fmt.Errorf("received Cluster resource contains invalid security config: %v", err)))
b.onClusterAmbientError(name, b.annotateErrorWithNodeID(fmt.Errorf("received Cluster resource contains invalid security config: %v", err)))
return
}
}

clustersSeen := make(map[string]bool)
dms, ok, err := b.generateDMsForCluster(b.lbCfg.ClusterName, 0, nil, clustersSeen)
if err != nil {
b.onClusterError(b.lbCfg.ClusterName, b.annotateErrorWithNodeID(fmt.Errorf("failed to generate discovery mechanisms: %v", err)))
b.onClusterAmbientError(b.lbCfg.ClusterName, b.annotateErrorWithNodeID(fmt.Errorf("failed to generate discovery mechanisms: %v", err)))
return
}
if ok {
if len(dms) == 0 {
b.onClusterError(b.lbCfg.ClusterName, b.annotateErrorWithNodeID(fmt.Errorf("aggregate cluster graph has no leaf clusters")))
b.onClusterAmbientError(b.lbCfg.ClusterName, b.annotateErrorWithNodeID(fmt.Errorf("aggregate cluster graph has no leaf clusters")))
return
}
// Child policy is built the first time we resolve the cluster graph.
Expand Down Expand Up @@ -542,12 +542,12 @@ func (b *cdsBalancer) onClusterUpdate(name string, update xdsresource.ClusterUpd
}
}

// Handles an error Cluster update from the xDS client. Propagates the error
// down to the child policy if one exists, or puts the channel in
// TRANSIENT_FAILURE.
// Handles an error Cluster update from the xDS client to not stop using the
// previously seen resource. Propagates the error down to the child policy
// if one exists, or puts the channel in TRANSIENT_FAILURE.
//
// Only executed in the context of a serializer callback.
func (b *cdsBalancer) onClusterError(name string, err error) {
func (b *cdsBalancer) onClusterAmbientError(name string, err error) {
b.logger.Warningf("Cluster resource %q received error update: %v", name, err)

if b.childLB != nil {
Expand All @@ -566,14 +566,13 @@ func (b *cdsBalancer) onClusterError(name string, err error) {
}
}

// Handles a resource-not-found error from the xDS client. Propagates the error
// down to the child policy if one exists, or puts the channel in
// TRANSIENT_FAILURE.
// Handles an error Cluster update from the xDS client to stop using the
// previously seen resource. Propagates the error down to the child policy
// if one exists, or puts the channel in TRANSIENT_FAILURE.
//
// Only executed in the context of a serializer callback.
func (b *cdsBalancer) onClusterResourceNotFound(name string) {
b.logger.Warningf("CDS watch for resource %q reported resource-does-not-exist error", name)
err := b.annotateErrorWithNodeID(xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "cluster %q not found", name))
func (b *cdsBalancer) onClusterResourceError(name string, err error) {
b.logger.Warningf("CDS watch for resource %q reported resource error", name)
b.closeChildPolicyAndReportTF(err)
}

Expand Down
4 changes: 2 additions & 2 deletions xds/internal/balancer/cdsbalancer/cdsbalancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -813,7 +813,7 @@ func (s) TestResolverError(t *testing.T) {

// Grab the wrapped connection from the listener wrapper. This will be used
// to verify the connection is closed.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout*100000)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please undo this change.

Just FYI: In a large PR, things like this can be easily missed during the review process. Please be a little more mindful of such changes going forward.

defer cancel()
val, err := lis.NewConnCh.Receive(ctx)
if err != nil {
Expand Down Expand Up @@ -1003,7 +1003,7 @@ func (s) TestClusterUpdate_ResourceNotFound(t *testing.T) {

// Ensure RPC fails with Unavailable status code and the error message is
// meaningful and contains the xDS node ID.
wantErr := fmt.Sprintf("cluster %q not found", clusterName)
wantErr := fmt.Sprintf("resource %q of type %q has been removed", clusterName, "ClusterResource")
_, err := client.EmptyCall(ctx, &testpb.Empty{})
if err := verifyRPCError(err, codes.Unavailable, wantErr, nodeID); err != nil {
t.Fatal(err)
Expand Down
14 changes: 7 additions & 7 deletions xds/internal/balancer/cdsbalancer/cluster_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,19 @@ type clusterWatcher struct {
parent *cdsBalancer
}

func (cw *clusterWatcher) OnUpdate(u *xdsresource.ClusterResourceData, onDone xdsresource.OnDoneFunc) {
func (cw *clusterWatcher) ResourceChanged(u *xdsresource.ClusterResourceData, onDone func()) {
handleUpdate := func(context.Context) { cw.parent.onClusterUpdate(cw.name, u.Resource); onDone() }
cw.parent.serializer.ScheduleOr(handleUpdate, onDone)
}

func (cw *clusterWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) {
handleError := func(context.Context) { cw.parent.onClusterError(cw.name, err); onDone() }
cw.parent.serializer.ScheduleOr(handleError, onDone)
func (cw *clusterWatcher) ResourceError(err error, onDone func()) {
handleResourceError := func(context.Context) { cw.parent.onClusterResourceError(cw.name, err); onDone() }
cw.parent.serializer.ScheduleOr(handleResourceError, onDone)
}

func (cw *clusterWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
handleNotFound := func(context.Context) { cw.parent.onClusterResourceNotFound(cw.name); onDone() }
cw.parent.serializer.ScheduleOr(handleNotFound, onDone)
func (cw *clusterWatcher) AmbientError(err error, onDone func()) {
handleError := func(context.Context) { cw.parent.onClusterAmbientError(cw.name, err); onDone() }
cw.parent.serializer.ScheduleOr(handleError, onDone)
}

// watcherState groups the state associated with a clusterWatcher.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ func (s) TestErrorFromParentLB_ResourceNotFound(t *testing.T) {
}

// Ensure that RPCs start to fail with expected error.
wantErr := fmt.Sprintf("cluster %q not found", clusterName)
wantErr := fmt.Sprintf("resource %q of type %q has been removed", clusterName, "ClusterResource")
for ; ctx.Err() == nil; <-time.After(defaultTestShortTimeout) {
sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
defer sCancel()
Expand Down
8 changes: 4 additions & 4 deletions xds/internal/balancer/clusterresolver/resource_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type resourceUpdate struct {
priorities []priorityConfig
// To be invoked once the update is completely processed, or is dropped in
// favor of a newer update.
onDone xdsresource.OnDoneFunc
onDone func()
}

// topLevelResolver is used by concrete endpointsResolver implementations for
Expand All @@ -50,7 +50,7 @@ type topLevelResolver interface {
// endpointsResolver implementation. The onDone callback is to be invoked
// once the update is completely processed, or is dropped in favor of a
// newer update.
onUpdate(onDone xdsresource.OnDoneFunc)
onUpdate(onDone func())
}

// endpointsResolver wraps the functionality to resolve a given resource name to
Expand Down Expand Up @@ -282,7 +282,7 @@ func (rr *resourceResolver) stop(closing bool) {
// clusterresolver LB policy.
//
// Caller must hold rr.mu.
func (rr *resourceResolver) generateLocked(onDone xdsresource.OnDoneFunc) {
func (rr *resourceResolver) generateLocked(onDone func()) {
var ret []priorityConfig
for _, rDM := range rr.children {
u, ok := rDM.r.lastUpdate()
Expand Down Expand Up @@ -312,7 +312,7 @@ func (rr *resourceResolver) generateLocked(onDone xdsresource.OnDoneFunc) {
rr.updateChannel <- &resourceUpdate{priorities: ret, onDone: onDone}
}

func (rr *resourceResolver) onUpdate(onDone xdsresource.OnDoneFunc) {
func (rr *resourceResolver) onUpdate(onDone func()) {
handleUpdate := func(context.Context) {
rr.mu.Lock()
rr.generateLocked(onDone)
Expand Down
40 changes: 20 additions & 20 deletions xds/internal/balancer/clusterresolver/resource_resolver_eds.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@
return ret
}

// OnUpdate is invoked to report an update for the resource being watched.
func (er *edsDiscoveryMechanism) OnUpdate(update *xdsresource.EndpointsResourceData, onDone xdsresource.OnDoneFunc) {
// ResourceChanged is invoked to report an update for the resource being watched.
func (er *edsDiscoveryMechanism) ResourceChanged(update *xdsresource.EndpointsResourceData, onDone func()) {
if er.stopped.HasFired() {
onDone()
return
Expand All @@ -89,52 +89,52 @@
er.topLevelResolver.onUpdate(onDone)
}

func (er *edsDiscoveryMechanism) OnError(err error, onDone xdsresource.OnDoneFunc) {
func (er *edsDiscoveryMechanism) ResourceError(err error, onDone func()) {
if er.stopped.HasFired() {
onDone()
return
}

if er.logger.V(2) {
er.logger.Infof("EDS discovery mechanism for resource %q reported error: %v", er.nameToWatch, err)
}

er.mu.Lock()
if er.update != nil {
// Continue using a previously received good configuration if one
// exists.
er.mu.Unlock()
onDone()
return
}
er.logger.Warningf("EDS discovery mechanism for resource %q reported resource error: %v", er.nameToWatch, err)

// Else report an empty update that would result in no priority child being
// Report an empty update that would result in no priority child being
// created for this discovery mechanism. This would result in the priority
// LB policy reporting TRANSIENT_FAILURE (as there would be no priorities or
// localities) if this was the only discovery mechanism, or would result in
// the priority LB policy using a lower priority discovery mechanism when
// that becomes available.
er.mu.Lock()
er.update = &xdsresource.EndpointsUpdate{}
er.mu.Unlock()

er.topLevelResolver.onUpdate(onDone)
}

func (er *edsDiscoveryMechanism) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
func (er *edsDiscoveryMechanism) AmbientError(err error, onDone func()) {
if er.stopped.HasFired() {
onDone()
return
}

er.logger.Warningf("EDS discovery mechanism for resource %q reported resource-does-not-exist error", er.nameToWatch)
if er.logger.V(2) {
er.logger.Infof("EDS discovery mechanism for resource %q reported ambient error: %v", er.nameToWatch, err)
}

Check warning on line 121 in xds/internal/balancer/clusterresolver/resource_resolver_eds.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/balancer/clusterresolver/resource_resolver_eds.go#L120-L121

Added lines #L120 - L121 were not covered by tests

// Report an empty update that would result in no priority child being
er.mu.Lock()
if er.update != nil {
// Continue using a previously received good configuration if one
// exists.
er.mu.Unlock()
onDone()
return
}

// Else report an empty update that would result in no priority child being
// created for this discovery mechanism. This would result in the priority
// LB policy reporting TRANSIENT_FAILURE (as there would be no priorities or
// localities) if this was the only discovery mechanism, or would result in
// the priority LB policy using a lower priority discovery mechanism when
// that becomes available.
er.mu.Lock()
er.update = &xdsresource.EndpointsUpdate{}
er.mu.Unlock()

Expand Down
Loading
Loading