Skip to content

Commit

Permalink
Add GrpcHealthCheckedEndpointGroupBuilder
Browse files Browse the repository at this point in the history
Motivation:

Add `GrpcHealthCheckedEndpointGroupBuilder` which builds a health checked endpoint group whose health comes from a [standard gRPC health check service result](https://grpc.io/docs/guides/health-checking/).

Modifications:

* Adds `GrpcHealthCheckedEndpointGroupBuilder` which extends `AbstractHealthCheckedEndpointGroupBuilder` and creates a new health check function
* Adds `GrpcHealthChecker` which is the health check function that creates and uses a gRPC `HealthGrpc` stub to check the gRPC health service on the endpoint. If the health check response is `SERVING`, it is healthy. It is unhealthy if the response is not `SERVING` or if there was a request failure.
* Adds tests.

Result:

* A user can create a health checked endpoint group that is backed by a gRPC health check service.
* Closes line#5930

Closes
  • Loading branch information
wvuong committed Jan 22, 2025
1 parent a2fd1d3 commit 4d95df5
Show file tree
Hide file tree
Showing 6 changed files with 474 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright 2025 LY Corporation
*
* LY Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package com.linecorp.armeria.client.grpc.endpoint.healthcheck;

import static java.util.Objects.requireNonNull;

import java.util.function.Function;

import com.linecorp.armeria.client.endpoint.EndpointGroup;
import com.linecorp.armeria.client.endpoint.healthcheck.AbstractHealthCheckedEndpointGroupBuilder;
import com.linecorp.armeria.client.endpoint.healthcheck.HealthCheckerContext;
import com.linecorp.armeria.common.annotation.Nullable;
import com.linecorp.armeria.common.util.AsyncCloseable;
import com.linecorp.armeria.internal.client.grpc.GrpcHealthChecker;

/**
* Builds a health checked endpoint group whose health comes from a standard gRPC health check service.
*/
public final class GrpcHealthCheckedEndpointGroupBuilder
extends AbstractHealthCheckedEndpointGroupBuilder<GrpcHealthCheckedEndpointGroupBuilder> {

private @Nullable String service;

GrpcHealthCheckedEndpointGroupBuilder(EndpointGroup delegate) {
super(delegate);
}

/**
* Returns a {@link GrpcHealthCheckedEndpointGroupBuilder} that builds a health checked
* endpoint group with the specified {@link EndpointGroup}.
*/
public static GrpcHealthCheckedEndpointGroupBuilder builder(EndpointGroup delegate) {
return new GrpcHealthCheckedEndpointGroupBuilder(requireNonNull(delegate));
}

/**
* Sets the optional service field of the gRPC health check request.
*/
public GrpcHealthCheckedEndpointGroupBuilder service(@Nullable String service) {
this.service = service;
return this;
}

@Override
protected Function<? super HealthCheckerContext, ? extends AsyncCloseable> newCheckerFactory() {
return new GrpcHealthCheckerFactory(service);
}

private static final class GrpcHealthCheckerFactory
implements Function<HealthCheckerContext, AsyncCloseable> {

private final @Nullable String service;

private GrpcHealthCheckerFactory(@Nullable String service) {
this.service = service;
}

@Override
public AsyncCloseable apply(HealthCheckerContext ctx) {
final GrpcHealthChecker healthChecker = new GrpcHealthChecker(ctx, ctx.endpoint(),
ctx.protocol(), service);
healthChecker.start();
return healthChecker;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright 2025 LY Corporation
*
* LY Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

/**
* gRPC health checked endpoint.
*/
@NonNullByDefault
package com.linecorp.armeria.client.grpc.endpoint.healthcheck;

import com.linecorp.armeria.common.annotation.NonNullByDefault;
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
* Copyright 2025 LY Corporation
*
* LY Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package com.linecorp.armeria.internal.client.grpc;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.ReentrantLock;

import com.google.common.annotations.VisibleForTesting;

import com.linecorp.armeria.client.ClientRequestContext;
import com.linecorp.armeria.client.ClientRequestContextCaptor;
import com.linecorp.armeria.client.Clients;
import com.linecorp.armeria.client.Endpoint;
import com.linecorp.armeria.client.endpoint.healthcheck.HealthCheckerContext;
import com.linecorp.armeria.client.grpc.GrpcClients;
import com.linecorp.armeria.common.ResponseHeaders;
import com.linecorp.armeria.common.SessionProtocol;
import com.linecorp.armeria.common.annotation.Nullable;
import com.linecorp.armeria.common.util.AsyncCloseable;
import com.linecorp.armeria.common.util.AsyncCloseableSupport;
import com.linecorp.armeria.internal.common.util.ReentrantShortLock;

import io.grpc.health.v1.HealthCheckRequest;
import io.grpc.health.v1.HealthCheckResponse;
import io.grpc.health.v1.HealthGrpc;
import io.grpc.stub.StreamObserver;

public final class GrpcHealthChecker implements AsyncCloseable {

static final double HEALTHY = 1d;
static final double UNHEALTHY = 0d;

private final HealthCheckerContext ctx;
@Nullable private final String service;
private final HealthGrpc.HealthStub stub;

private final ReentrantLock lock = new ReentrantShortLock();
private final AsyncCloseableSupport closeable = AsyncCloseableSupport.of(this::closeAsync);

public GrpcHealthChecker(HealthCheckerContext ctx, Endpoint endpoint, SessionProtocol sessionProtocol,
@Nullable String service) {
this.ctx = ctx;
this.service = service;

this.stub = GrpcClients.builder(sessionProtocol, endpoint)
.options(ctx.clientOptions())
.build(HealthGrpc.HealthStub.class);
}

public void start() {
check();
}

@VisibleForTesting
void check() {
lock();
try {
final HealthCheckRequest.Builder builder = HealthCheckRequest.newBuilder();
if (this.service != null) {
builder.setService(service);
}

try (ClientRequestContextCaptor reqCtxCaptor = Clients.newContextCaptor()) {
stub.check(builder.build(), new StreamObserver<HealthCheckResponse>() {
@Override
public void onNext(HealthCheckResponse healthCheckResponse) {
final ClientRequestContext reqCtx = reqCtxCaptor.get();
if (healthCheckResponse.getStatus() == HealthCheckResponse.ServingStatus.SERVING) {
ctx.updateHealth(HEALTHY, reqCtx, null, null);
} else {
ctx.updateHealth(UNHEALTHY, reqCtx, null, null);
}
}

@Override
public void onError(Throwable throwable) {
final ClientRequestContext reqCtx = reqCtxCaptor.get();
ctx.updateHealth(UNHEALTHY, reqCtx, ResponseHeaders.of(500), throwable);
}

@Override
public void onCompleted() {
}
});
}
} finally {
unlock();
}
}

@Override
public CompletableFuture<?> closeAsync() {
return closeable.closeAsync();
}

private synchronized void closeAsync(CompletableFuture<?> future) {
future.complete(null);
}

@Override
public void close() {
closeable.close();
}

private void lock() {
lock.lock();
}

private void unlock() {
lock.unlock();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright 2025 LY Corporation
*
* LY Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package com.linecorp.armeria.client.grpc.endpoint.healthcheck;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import com.linecorp.armeria.client.endpoint.healthcheck.HealthCheckedEndpointGroup;
import com.linecorp.armeria.common.SessionProtocol;
import com.linecorp.armeria.common.grpc.HealthGrpcServerExtension;

class GrpcHealthCheckedEndpointGroupBuilderTest {

@RegisterExtension
private static HealthGrpcServerExtension serverExtension = new HealthGrpcServerExtension();

@Test
public void hasHealthyEndpoint() {
serverExtension.setAction(HealthGrpcServerExtension.Action.DO_HEALTHY);

final HealthCheckedEndpointGroup endpointGroup = GrpcHealthCheckedEndpointGroupBuilder
.builder(serverExtension.endpoint(SessionProtocol.H2C))
.build();

assertThat(endpointGroup.whenReady().join()).hasSize(1);
}

@Test
public void empty() throws Exception {
serverExtension.setAction(HealthGrpcServerExtension.Action.DO_UNHEALTHY);

final HealthCheckedEndpointGroup endpointGroup = GrpcHealthCheckedEndpointGroupBuilder
.builder(serverExtension.endpoint(SessionProtocol.H2C))
.build();

assertThatThrownBy(() -> {
// whenReady() will timeout because there are no healthy endpoints
endpointGroup.whenReady().get(1, TimeUnit.SECONDS);
}).isInstanceOf(TimeoutException.class);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Copyright 2025 LY Corporation
*
* LY Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package com.linecorp.armeria.common.grpc;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.protobuf.TextFormat;

import com.linecorp.armeria.server.ServerBuilder;
import com.linecorp.armeria.server.grpc.GrpcService;
import com.linecorp.armeria.testing.junit5.server.ServerExtension;

import io.grpc.health.v1.HealthCheckRequest;
import io.grpc.health.v1.HealthCheckResponse;
import io.grpc.health.v1.HealthGrpc;
import io.grpc.stub.StreamObserver;

public class HealthGrpcServerExtension extends ServerExtension {

private static final Logger LOGGER = LoggerFactory.getLogger(HealthGrpcServerExtension.class);

private static final HealthCheckResponse HEALTHY_HEALTH_CHECK_RESPONSE = HealthCheckResponse.newBuilder()
.setStatus(HealthCheckResponse.ServingStatus.SERVING)
.build();

private static final HealthCheckResponse UNHEALTHY_HEALTH_CHECK_RESPONSE = HealthCheckResponse.newBuilder()
.setStatus(HealthCheckResponse.ServingStatus.NOT_SERVING)
.build();

public enum Action {
DO_HEALTHY, DO_UNHEALTHY, DO_TIMEOUT
}

private Action action;

@Override
protected void configure(ServerBuilder sb) throws Exception {
final GrpcService grpcService = GrpcService.builder()
.addService(new HealthGrpc.HealthImplBase() {
@Override
public void check(HealthCheckRequest request,
StreamObserver<HealthCheckResponse> responseObserver) {
LOGGER.debug("Received health check response {}", TextFormat.shortDebugString(request));

if (action == Action.DO_HEALTHY) {
responseObserver.onNext(HEALTHY_HEALTH_CHECK_RESPONSE);
responseObserver.onCompleted();
} else if (action == Action.DO_UNHEALTHY) {
responseObserver.onNext(UNHEALTHY_HEALTH_CHECK_RESPONSE);
responseObserver.onCompleted();
} else if (action == Action.DO_TIMEOUT) {
LOGGER.debug("Not sending a response...");
}

LOGGER.debug("Completed health check response");
}

@Override
public void watch(HealthCheckRequest request,
StreamObserver<HealthCheckResponse> responseObserver) {
throw new UnsupportedOperationException();
}
})
.build();

sb.service(grpcService);
}

public void setAction(Action action) {
this.action = action;
}
}
Loading

0 comments on commit 4d95df5

Please sign in to comment.