Skip to content

xds: client watcher API update #12046

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 93 additions & 0 deletions xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.grpc.LoadBalancerRegistry;
import io.grpc.NameResolver;
import io.grpc.Status;
// import io.grpc.StatusOr;
import io.grpc.SynchronizationContext;
import io.grpc.internal.ObjectPool;
import io.grpc.util.GracefulSwitchLoadBalancer;
Expand Down Expand Up @@ -419,6 +420,98 @@ public void onChanged(final CdsUpdate update) {
handleClusterDiscovered();
}


// After A74 maybe:
/*@Override
public void onResourceChanged(StatusOr<CdsUpdate> update) {
if (shutdown) {
return;
}

discovered = true;
if (!update.hasValue()) {
Status error = Status.UNAVAILABLE
.withDescription(String.format("Unable to load CDS %s. xDS server returned: %s: %s",
name, update.getStatus().getCode(), update.getStatus().getDescription()))
.withCause(update.getStatus().getCause());
if (ClusterState.this == root) {
handleClusterDiscoveryError(error);
}
result = null;
if (childClusterStates != null) {
for (ClusterState state : childClusterStates.values()) {
state.shutdown();
}
childClusterStates = null;
}
handleClusterDiscovered();
return;
}

CdsUpdate cdsUpdate = update.getValue();
logger.log(XdsLogLevel.DEBUG, "Received cluster update {0}", cdsUpdate);
result = cdsUpdate;

if (cdsUpdate.clusterType() == ClusterType.AGGREGATE) {
isLeaf = false;
logger.log(XdsLogLevel.INFO, "Aggregate cluster {0}, underlying clusters: {1}",
cdsUpdate.clusterName(), cdsUpdate.prioritizedClusterNames());
Map<String, ClusterState> newChildStates = new LinkedHashMap<>();
for (String cluster : cdsUpdate.prioritizedClusterNames()) {
if (newChildStates.containsKey(cluster)) {
logger.log(XdsLogLevel.WARNING,
String.format("duplicate cluster name %s in aggregate %s is being ignored",
cluster, cdsUpdate.clusterName()));
continue;
}
if (childClusterStates == null || !childClusterStates.containsKey(cluster)) {
ClusterState childState;
if (clusterStates.containsKey(cluster)) {
childState = clusterStates.get(cluster);
if (childState.shutdown) {
childState.start();
}
} else {
childState = new ClusterState(cluster);
clusterStates.put(cluster, childState);
childState.start();
}
newChildStates.put(cluster, childState);
} else {
newChildStates.put(cluster, childClusterStates.remove(cluster));
}
}
if (childClusterStates != null) { // stop subscribing to revoked child clusters
for (ClusterState watcher : childClusterStates.values()) {
watcher.shutdown();
}
}
childClusterStates = newChildStates;
} else if (cdsUpdate.clusterType() == ClusterType.EDS) {
isLeaf = true;
logger.log(XdsLogLevel.INFO, "EDS cluster {0}, edsServiceName: {1}",
cdsUpdate.clusterName(), cdsUpdate.edsServiceName());
} else { // logical DNS
isLeaf = true;
logger.log(XdsLogLevel.INFO, "Logical DNS cluster {0}", cdsUpdate.clusterName());
}
handleClusterDiscovered();
}

@Override
public void onAmbientError(Status error) {
if (shutdown) {
return;
}
Status status = Status.UNAVAILABLE
.withDescription(String.format("Unable to load CDS %s. xDS server returned: %s: %s",
name, error.getCode(), error.getDescription()))
.withCause(error.getCause());
// All watchers should receive the same error, so we only propagate it once.
if (ClusterState.this == root) {
handleClusterDiscoveryError(status);
}
}*/
}
}
}
33 changes: 33 additions & 0 deletions xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,19 @@ protected void shutdown() {

@Override
public void onChanged(final EdsUpdate update) {
// Maybe after A74
/*if (!resourceUpdate.getStatus().isOk()) {
Status error = resourceUpdate.getStatus();
String resourceName = edsServiceName != null ? edsServiceName : name;
status = Status.UNAVAILABLE
.withDescription(String.format("Unable to load EDS %s. xDS server returned: %s: %s",
resourceName, error.getCode(), error.getDescription()))
.withCause(error.getCause());
logger.log(XdsLogLevel.WARNING, "Received EDS error: {0}", error);
handleEndpointResolutionError();
return;
}
final EdsUpdate update = resourceUpdate.getValue();*/
class EndpointsUpdated implements Runnable {
@Override
public void run() {
Expand Down Expand Up @@ -571,6 +584,26 @@ public void onError(final Status error) {
logger.log(XdsLogLevel.WARNING, "Received EDS error: {0}", error);
handleEndpointResolutionError();
}

// Maybe after A74
/*@Override
public void onAmbientError(final Status error) {
syncContext.execute(new Runnable() {
@Override
public void run() {
if (shutdown) {
return;
}
logger.log(
XdsLogLevel.WARNING, "Received transient error from EDS resource: {0}", error);
status = Status.UNAVAILABLE
.withDescription(
"Transient error while watching EDS resource: " + error.getDescription())
.withCause(error.getCause());
handleEndpointResolutionError();
}
});
}*/
}

private final class LogicalDnsClusterState extends ClusterState {
Expand Down
Loading
Loading