Skip to content

Commit 4009e3e

Browse files
committed
xdsclient: update watcher API as per gRFC A88
1 parent 724f450 commit 4009e3e

21 files changed

+377
-348
lines changed

xds/csds/csds_e2e_test.go

+10-25
Original file line numberDiff line numberDiff line change
@@ -70,49 +70,37 @@ func Test(t *testing.T) {
7070

7171
type nopListenerWatcher struct{}
7272

73-
func (nopListenerWatcher) OnUpdate(_ *xdsresource.ListenerResourceData, onDone xdsresource.OnDoneFunc) {
73+
func (nopListenerWatcher) OnResourceChanged(_ *xdsresource.ListenerResourceData, _ error, onDone xdsresource.OnDoneFunc) {
7474
onDone()
7575
}
76-
func (nopListenerWatcher) OnError(_ error, onDone xdsresource.OnDoneFunc) {
77-
onDone()
78-
}
79-
func (nopListenerWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
76+
func (nopListenerWatcher) OnAmbientError(_ error, onDone xdsresource.OnDoneFunc) {
8077
onDone()
8178
}
8279

8380
type nopRouteConfigWatcher struct{}
8481

85-
func (nopRouteConfigWatcher) OnUpdate(_ *xdsresource.RouteConfigResourceData, onDone xdsresource.OnDoneFunc) {
82+
func (nopRouteConfigWatcher) OnResourceChanged(_ *xdsresource.RouteConfigResourceData, _ error, onDone xdsresource.OnDoneFunc) {
8683
onDone()
8784
}
88-
func (nopRouteConfigWatcher) OnError(_ error, onDone xdsresource.OnDoneFunc) {
89-
onDone()
90-
}
91-
func (nopRouteConfigWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
85+
func (nopRouteConfigWatcher) OnAmbientError(_ error, onDone xdsresource.OnDoneFunc) {
9286
onDone()
9387
}
9488

9589
type nopClusterWatcher struct{}
9690

97-
func (nopClusterWatcher) OnUpdate(_ *xdsresource.ClusterResourceData, onDone xdsresource.OnDoneFunc) {
98-
onDone()
99-
}
100-
func (nopClusterWatcher) OnError(_ error, onDone xdsresource.OnDoneFunc) {
91+
func (nopClusterWatcher) OnResourceChanged(_ *xdsresource.ClusterResourceData, _ error, onDone xdsresource.OnDoneFunc) {
10192
onDone()
10293
}
103-
func (nopClusterWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
94+
func (nopClusterWatcher) OnAmbientError(_ error, onDone xdsresource.OnDoneFunc) {
10495
onDone()
10596
}
10697

10798
type nopEndpointsWatcher struct{}
10899

109-
func (nopEndpointsWatcher) OnUpdate(_ *xdsresource.EndpointsResourceData, onDone xdsresource.OnDoneFunc) {
110-
onDone()
111-
}
112-
func (nopEndpointsWatcher) OnError(_ error, onDone xdsresource.OnDoneFunc) {
100+
func (nopEndpointsWatcher) OnResourceChanged(_ *xdsresource.EndpointsResourceData, _ error, onDone xdsresource.OnDoneFunc) {
113101
onDone()
114102
}
115-
func (nopEndpointsWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
103+
func (nopEndpointsWatcher) OnAmbientError(_ error, onDone xdsresource.OnDoneFunc) {
116104
onDone()
117105
}
118106

@@ -137,13 +125,10 @@ func newBlockingListenerWatcher(testCtxDone <-chan struct{}) *blockingListenerWa
137125
}
138126
}
139127

140-
func (w *blockingListenerWatcher) OnUpdate(_ *xdsresource.ListenerResourceData, onDone xdsresource.OnDoneFunc) {
141-
writeOnDone(w.testCtxDone, w.onDoneCh, onDone)
142-
}
143-
func (w *blockingListenerWatcher) OnError(_ error, onDone xdsresource.OnDoneFunc) {
128+
func (w *blockingListenerWatcher) OnResourceChanged(_ *xdsresource.ListenerResourceData, _ error, onDone xdsresource.OnDoneFunc) {
144129
writeOnDone(w.testCtxDone, w.onDoneCh, onDone)
145130
}
146-
func (w *blockingListenerWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
131+
func (w *blockingListenerWatcher) OnAmbientError(_ error, onDone xdsresource.OnDoneFunc) {
147132
writeOnDone(w.testCtxDone, w.onDoneCh, onDone)
148133
}
149134

xds/internal/balancer/cdsbalancer/cluster_watcher.go

+12-7
Original file line numberDiff line numberDiff line change
@@ -32,21 +32,26 @@ type clusterWatcher struct {
3232
parent *cdsBalancer
3333
}
3434

35-
func (cw *clusterWatcher) OnUpdate(u *xdsresource.ClusterResourceData, onDone xdsresource.OnDoneFunc) {
35+
func (cw *clusterWatcher) OnResourceChanged(u *xdsresource.ClusterResourceData, err error, onDone xdsresource.OnDoneFunc) {
36+
if err != nil {
37+
if xdsresource.ErrType(err) == xdsresource.ErrorTypeResourceNotFound {
38+
handleNotFound := func(context.Context) { cw.parent.onClusterResourceNotFound(cw.name); onDone() }
39+
cw.parent.serializer.ScheduleOr(handleNotFound, onDone)
40+
} else {
41+
handleError := func(context.Context) { cw.parent.onClusterError(cw.name, err); onDone() }
42+
cw.parent.serializer.ScheduleOr(handleError, onDone)
43+
}
44+
return
45+
}
3646
handleUpdate := func(context.Context) { cw.parent.onClusterUpdate(cw.name, u.Resource); onDone() }
3747
cw.parent.serializer.ScheduleOr(handleUpdate, onDone)
3848
}
3949

40-
func (cw *clusterWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) {
50+
func (cw *clusterWatcher) OnAmbientError(err error, onDone xdsresource.OnDoneFunc) {
4151
handleError := func(context.Context) { cw.parent.onClusterError(cw.name, err); onDone() }
4252
cw.parent.serializer.ScheduleOr(handleError, onDone)
4353
}
4454

45-
func (cw *clusterWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
46-
handleNotFound := func(context.Context) { cw.parent.onClusterResourceNotFound(cw.name); onDone() }
47-
cw.parent.serializer.ScheduleOr(handleNotFound, onDone)
48-
}
49-
5055
// watcherState groups the state associated with a clusterWatcher.
5156
type watcherState struct {
5257
watcher *clusterWatcher // The underlying watcher.

xds/internal/balancer/clusterresolver/resource_resolver_eds.go

+24-25
Original file line numberDiff line numberDiff line change
@@ -76,20 +76,42 @@ func newEDSResolver(nameToWatch string, producer xdsresource.Producer, topLevelR
7676
}
7777

7878
// OnUpdate is invoked to report an update for the resource being watched.
79-
func (er *edsDiscoveryMechanism) OnUpdate(update *xdsresource.EndpointsResourceData, onDone xdsresource.OnDoneFunc) {
79+
func (er *edsDiscoveryMechanism) OnResourceChanged(update *xdsresource.EndpointsResourceData, err error, onDone xdsresource.OnDoneFunc) {
8080
if er.stopped.HasFired() {
8181
onDone()
8282
return
8383
}
8484

85+
if err != nil {
86+
if er.logger.V(2) {
87+
if xdsresource.ErrType(err) == xdsresource.ErrorTypeResourceNotFound {
88+
er.logger.Infof("EDS discovery mechanism for resource %q reported resource-does-not-exist error", er.nameToWatch)
89+
} else {
90+
er.logger.Infof("EDS discovery mechanism for resource %q reported error: %v", er.nameToWatch, err)
91+
}
92+
}
93+
// Report an empty update that would result in no priority child being
94+
// created for this discovery mechanism. This would result in the priority
95+
// LB policy reporting TRANSIENT_FAILURE (as there would be no priorities or
96+
// localities) if this was the only discovery mechanism, or would result in
97+
// the priority LB policy using a lower priority discovery mechanism when
98+
// that becomes available.
99+
er.mu.Lock()
100+
er.update = &xdsresource.EndpointsUpdate{}
101+
er.mu.Unlock()
102+
103+
er.topLevelResolver.onUpdate(onDone)
104+
return
105+
}
106+
85107
er.mu.Lock()
86108
er.update = &update.Resource
87109
er.mu.Unlock()
88110

89111
er.topLevelResolver.onUpdate(onDone)
90112
}
91113

92-
func (er *edsDiscoveryMechanism) OnError(err error, onDone xdsresource.OnDoneFunc) {
114+
func (er *edsDiscoveryMechanism) OnAmbientError(err error, onDone xdsresource.OnDoneFunc) {
93115
if er.stopped.HasFired() {
94116
onDone()
95117
return
@@ -119,26 +141,3 @@ func (er *edsDiscoveryMechanism) OnError(err error, onDone xdsresource.OnDoneFun
119141

120142
er.topLevelResolver.onUpdate(onDone)
121143
}
122-
123-
func (er *edsDiscoveryMechanism) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
124-
if er.stopped.HasFired() {
125-
onDone()
126-
return
127-
}
128-
129-
if er.logger.V(2) {
130-
er.logger.Infof("EDS discovery mechanism for resource %q reported resource-does-not-exist error", er.nameToWatch)
131-
}
132-
133-
// Report an empty update that would result in no priority child being
134-
// created for this discovery mechanism. This would result in the priority
135-
// LB policy reporting TRANSIENT_FAILURE (as there would be no priorities or
136-
// localities) if this was the only discovery mechanism, or would result in
137-
// the priority LB policy using a lower priority discovery mechanism when
138-
// that becomes available.
139-
er.mu.Lock()
140-
er.update = &xdsresource.EndpointsUpdate{}
141-
er.mu.Unlock()
142-
143-
er.topLevelResolver.onUpdate(onDone)
144-
}

xds/internal/resolver/watch_service.go

+24-14
Original file line numberDiff line numberDiff line change
@@ -36,21 +36,26 @@ func newListenerWatcher(resourceName string, parent *xdsResolver) *listenerWatch
3636
return lw
3737
}
3838

39-
func (l *listenerWatcher) OnUpdate(update *xdsresource.ListenerResourceData, onDone xdsresource.OnDoneFunc) {
39+
func (l *listenerWatcher) OnResourceChanged(update *xdsresource.ListenerResourceData, err error, onDone xdsresource.OnDoneFunc) {
40+
if err != nil {
41+
var handleError func(context.Context)
42+
if xdsresource.ErrType(err) == xdsresource.ErrorTypeResourceNotFound {
43+
handleError = func(context.Context) { l.parent.onListenerResourceNotFound(); onDone() }
44+
} else {
45+
handleError = func(context.Context) { l.parent.onListenerResourceError(err); onDone() }
46+
}
47+
l.parent.serializer.ScheduleOr(handleError, onDone)
48+
return
49+
}
4050
handleUpdate := func(context.Context) { l.parent.onListenerResourceUpdate(update.Resource); onDone() }
4151
l.parent.serializer.ScheduleOr(handleUpdate, onDone)
4252
}
4353

44-
func (l *listenerWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) {
54+
func (l *listenerWatcher) OnAmbientError(err error, onDone xdsresource.OnDoneFunc) {
4555
handleError := func(context.Context) { l.parent.onListenerResourceError(err); onDone() }
4656
l.parent.serializer.ScheduleOr(handleError, onDone)
4757
}
4858

49-
func (l *listenerWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
50-
handleNotFound := func(context.Context) { l.parent.onListenerResourceNotFound(); onDone() }
51-
l.parent.serializer.ScheduleOr(handleNotFound, onDone)
52-
}
53-
5459
func (l *listenerWatcher) stop() {
5560
l.cancel()
5661
l.parent.logger.Infof("Canceling watch on Listener resource %q", l.resourceName)
@@ -68,24 +73,29 @@ func newRouteConfigWatcher(resourceName string, parent *xdsResolver) *routeConfi
6873
return rw
6974
}
7075

71-
func (r *routeConfigWatcher) OnUpdate(u *xdsresource.RouteConfigResourceData, onDone xdsresource.OnDoneFunc) {
76+
func (r *routeConfigWatcher) OnResourceChanged(u *xdsresource.RouteConfigResourceData, err error, onDone xdsresource.OnDoneFunc) {
77+
if err != nil {
78+
var handleError func(context.Context)
79+
if xdsresource.ErrType(err) == xdsresource.ErrorTypeResourceNotFound {
80+
handleError = func(context.Context) { r.parent.onRouteConfigResourceNotFound(r.resourceName); onDone() }
81+
} else {
82+
handleError = func(context.Context) { r.parent.onRouteConfigResourceError(r.resourceName, err); onDone() }
83+
}
84+
r.parent.serializer.ScheduleOr(handleError, onDone)
85+
return
86+
}
7287
handleUpdate := func(context.Context) {
7388
r.parent.onRouteConfigResourceUpdate(r.resourceName, u.Resource)
7489
onDone()
7590
}
7691
r.parent.serializer.ScheduleOr(handleUpdate, onDone)
7792
}
7893

79-
func (r *routeConfigWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) {
94+
func (r *routeConfigWatcher) OnAmbientError(err error, onDone xdsresource.OnDoneFunc) {
8095
handleError := func(context.Context) { r.parent.onRouteConfigResourceError(r.resourceName, err); onDone() }
8196
r.parent.serializer.ScheduleOr(handleError, onDone)
8297
}
8398

84-
func (r *routeConfigWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
85-
handleNotFound := func(context.Context) { r.parent.onRouteConfigResourceNotFound(r.resourceName); onDone() }
86-
r.parent.serializer.ScheduleOr(handleNotFound, onDone)
87-
}
88-
8999
func (r *routeConfigWatcher) stop() {
90100
r.cancel()
91101
r.parent.logger.Infof("Canceling watch on RouteConfiguration resource %q", r.resourceName)

xds/internal/server/listener_wrapper.go

+18-18
Original file line numberDiff line numberDiff line change
@@ -414,19 +414,33 @@ type ldsWatcher struct {
414414
name string
415415
}
416416

417-
func (lw *ldsWatcher) OnUpdate(update *xdsresource.ListenerResourceData, onDone xdsresource.OnDoneFunc) {
417+
func (lw *ldsWatcher) OnResourceChanged(update *xdsresource.ListenerResourceData, err error, onDone xdsresource.OnDoneFunc) {
418418
defer onDone()
419419
if lw.parent.closed.HasFired() {
420-
lw.logger.Warningf("Resource %q received update: %#v after listener was closed", lw.name, update)
420+
if err != nil {
421+
lw.logger.Warningf("Resource %q received err: %#v after listener was closed", lw.name, err)
422+
} else {
423+
lw.logger.Warningf("Resource %q received update: %#v after listener was closed", lw.name, update)
424+
}
421425
return
422426
}
423427
if lw.logger.V(2) {
424-
lw.logger.Infof("LDS watch for resource %q received update: %#v", lw.name, update.Resource)
428+
if err != nil {
429+
lw.logger.Infof("LDS watch for resource %q received error: %#v", lw.name, err)
430+
} else {
431+
lw.logger.Infof("LDS watch for resource %q received update: %#v", lw.name, update.Resource)
432+
}
433+
}
434+
if err != nil {
435+
if xdsresource.ErrType(err) == xdsresource.ErrorTypeResourceNotFound {
436+
lw.parent.onLDSResourceDoesNotExist(err)
437+
}
438+
return
425439
}
426440
lw.parent.handleLDSUpdate(update.Resource)
427441
}
428442

429-
func (lw *ldsWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) {
443+
func (lw *ldsWatcher) OnAmbientError(err error, onDone xdsresource.OnDoneFunc) {
430444
defer onDone()
431445
if lw.parent.closed.HasFired() {
432446
lw.logger.Warningf("Resource %q received error: %v after listener was closed", lw.name, err)
@@ -438,17 +452,3 @@ func (lw *ldsWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) {
438452
// For errors which are anything other than "resource-not-found", we
439453
// continue to use the old configuration.
440454
}
441-
442-
func (lw *ldsWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
443-
defer onDone()
444-
if lw.parent.closed.HasFired() {
445-
lw.logger.Warningf("Resource %q received resource-does-not-exist error after listener was closed", lw.name)
446-
return
447-
}
448-
if lw.logger.V(2) {
449-
lw.logger.Infof("LDS watch for resource %q reported resource-does-not-exist error", lw.name)
450-
}
451-
452-
err := xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "resource name %q of type Listener not found in received response", lw.name)
453-
lw.parent.onLDSResourceDoesNotExist(err)
454-
}

xds/internal/server/rds_handler.go

+11-18
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ type rdsWatcher struct {
147147
canceled bool // eats callbacks if true
148148
}
149149

150-
func (rw *rdsWatcher) OnUpdate(update *xdsresource.RouteConfigResourceData, onDone xdsresource.OnDoneFunc) {
150+
func (rw *rdsWatcher) OnResourceChanged(update *xdsresource.RouteConfigResourceData, err error, onDone xdsresource.OnDoneFunc) {
151151
defer onDone()
152152
rw.mu.Lock()
153153
if rw.canceled {
@@ -156,26 +156,20 @@ func (rw *rdsWatcher) OnUpdate(update *xdsresource.RouteConfigResourceData, onDo
156156
}
157157
rw.mu.Unlock()
158158
if rw.logger.V(2) {
159-
rw.logger.Infof("RDS watch for resource %q received update: %#v", rw.routeName, update.Resource)
159+
if err != nil {
160+
rw.logger.Infof("RDS watch for resource %q received error: %#v", rw.routeName, err)
161+
} else {
162+
rw.logger.Infof("RDS watch for resource %q received update: %#v", rw.routeName, update.Resource)
163+
}
160164
}
161-
rw.parent.handleRouteUpdate(rw.routeName, rdsWatcherUpdate{data: &update.Resource})
162-
}
163-
164-
func (rw *rdsWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) {
165-
defer onDone()
166-
rw.mu.Lock()
167-
if rw.canceled {
168-
rw.mu.Unlock()
165+
if err != nil {
166+
rw.parent.handleRouteUpdate(rw.routeName, rdsWatcherUpdate{err: err})
169167
return
170168
}
171-
rw.mu.Unlock()
172-
if rw.logger.V(2) {
173-
rw.logger.Infof("RDS watch for resource %q reported error: %v", rw.routeName, err)
174-
}
175-
rw.parent.handleRouteUpdate(rw.routeName, rdsWatcherUpdate{err: err})
169+
rw.parent.handleRouteUpdate(rw.routeName, rdsWatcherUpdate{data: &update.Resource})
176170
}
177171

178-
func (rw *rdsWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
172+
func (rw *rdsWatcher) OnAmbientError(err error, onDone xdsresource.OnDoneFunc) {
179173
defer onDone()
180174
rw.mu.Lock()
181175
if rw.canceled {
@@ -184,8 +178,7 @@ func (rw *rdsWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
184178
}
185179
rw.mu.Unlock()
186180
if rw.logger.V(2) {
187-
rw.logger.Infof("RDS watch for resource %q reported resource-does-not-exist error: %v", rw.routeName)
181+
rw.logger.Infof("RDS watch for resource %q reported error: %v", rw.routeName, err)
188182
}
189-
err := xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "resource name %q of type RouteConfiguration not found in received response", rw.routeName)
190183
rw.parent.handleRouteUpdate(rw.routeName, rdsWatcherUpdate{err: err})
191184
}

0 commit comments

Comments
 (0)