diff --git a/grpc/src/main/java/com/linecorp/armeria/client/grpc/endpoint/healthcheck/GrpcHealthCheckedEndpointGroupBuilder.java b/grpc/src/main/java/com/linecorp/armeria/client/grpc/endpoint/healthcheck/GrpcHealthCheckedEndpointGroupBuilder.java new file mode 100644 index 00000000000..70974656493 --- /dev/null +++ b/grpc/src/main/java/com/linecorp/armeria/client/grpc/endpoint/healthcheck/GrpcHealthCheckedEndpointGroupBuilder.java @@ -0,0 +1,79 @@ +/* + * Copyright 2025 LY Corporation + * + * LY Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package com.linecorp.armeria.client.grpc.endpoint.healthcheck; + +import static java.util.Objects.requireNonNull; + +import java.util.function.Function; + +import com.linecorp.armeria.client.endpoint.EndpointGroup; +import com.linecorp.armeria.client.endpoint.healthcheck.AbstractHealthCheckedEndpointGroupBuilder; +import com.linecorp.armeria.client.endpoint.healthcheck.HealthCheckerContext; +import com.linecorp.armeria.common.annotation.Nullable; +import com.linecorp.armeria.common.util.AsyncCloseable; +import com.linecorp.armeria.internal.client.grpc.GrpcHealthChecker; + +/** + * Builds a health checked endpoint group whose health comes from a standard gRPC health check service. + */ +public final class GrpcHealthCheckedEndpointGroupBuilder + extends AbstractHealthCheckedEndpointGroupBuilder { + + private @Nullable String service; + + GrpcHealthCheckedEndpointGroupBuilder(EndpointGroup delegate) { + super(delegate); + } + + /** + * Returns a {@link GrpcHealthCheckedEndpointGroupBuilder} that builds a health checked + * endpoint group with the specified {@link EndpointGroup}. + */ + public static GrpcHealthCheckedEndpointGroupBuilder builder(EndpointGroup delegate) { + return new GrpcHealthCheckedEndpointGroupBuilder(requireNonNull(delegate)); + } + + /** + * Sets the optional service field of the gRPC health check request. + */ + public GrpcHealthCheckedEndpointGroupBuilder service(@Nullable String service) { + this.service = service; + return this; + } + + @Override + protected Function newCheckerFactory() { + return new GrpcHealthCheckerFactory(service); + } + + private static final class GrpcHealthCheckerFactory + implements Function { + + private final @Nullable String service; + + private GrpcHealthCheckerFactory(@Nullable String service) { + this.service = service; + } + + @Override + public AsyncCloseable apply(HealthCheckerContext ctx) { + final GrpcHealthChecker healthChecker = new GrpcHealthChecker(ctx, ctx.endpoint(), + ctx.protocol(), service); + healthChecker.start(); + return healthChecker; + } + } +} diff --git a/grpc/src/main/java/com/linecorp/armeria/client/grpc/endpoint/healthcheck/package-info.java b/grpc/src/main/java/com/linecorp/armeria/client/grpc/endpoint/healthcheck/package-info.java new file mode 100644 index 00000000000..7148050ae69 --- /dev/null +++ b/grpc/src/main/java/com/linecorp/armeria/client/grpc/endpoint/healthcheck/package-info.java @@ -0,0 +1,23 @@ +/* + * Copyright 2025 LY Corporation + * + * LY Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +/** + * gRPC health checked endpoint. + */ +@NonNullByDefault +package com.linecorp.armeria.client.grpc.endpoint.healthcheck; + +import com.linecorp.armeria.common.annotation.NonNullByDefault; diff --git a/grpc/src/main/java/com/linecorp/armeria/internal/client/grpc/GrpcHealthChecker.java b/grpc/src/main/java/com/linecorp/armeria/internal/client/grpc/GrpcHealthChecker.java new file mode 100644 index 00000000000..faef1d1d40f --- /dev/null +++ b/grpc/src/main/java/com/linecorp/armeria/internal/client/grpc/GrpcHealthChecker.java @@ -0,0 +1,125 @@ +/* + * Copyright 2025 LY Corporation + * + * LY Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package com.linecorp.armeria.internal.client.grpc; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.locks.ReentrantLock; + +import com.google.common.annotations.VisibleForTesting; + +import com.linecorp.armeria.client.ClientRequestContext; +import com.linecorp.armeria.client.ClientRequestContextCaptor; +import com.linecorp.armeria.client.Clients; +import com.linecorp.armeria.client.Endpoint; +import com.linecorp.armeria.client.endpoint.healthcheck.HealthCheckerContext; +import com.linecorp.armeria.client.grpc.GrpcClients; +import com.linecorp.armeria.common.ResponseHeaders; +import com.linecorp.armeria.common.SessionProtocol; +import com.linecorp.armeria.common.annotation.Nullable; +import com.linecorp.armeria.common.util.AsyncCloseable; +import com.linecorp.armeria.common.util.AsyncCloseableSupport; +import com.linecorp.armeria.internal.common.util.ReentrantShortLock; + +import io.grpc.health.v1.HealthCheckRequest; +import io.grpc.health.v1.HealthCheckResponse; +import io.grpc.health.v1.HealthGrpc; +import io.grpc.stub.StreamObserver; + +public final class GrpcHealthChecker implements AsyncCloseable { + + static final double HEALTHY = 1d; + static final double UNHEALTHY = 0d; + + private final HealthCheckerContext ctx; + @Nullable private final String service; + private final HealthGrpc.HealthStub stub; + + private final ReentrantLock lock = new ReentrantShortLock(); + private final AsyncCloseableSupport closeable = AsyncCloseableSupport.of(this::closeAsync); + + public GrpcHealthChecker(HealthCheckerContext ctx, Endpoint endpoint, SessionProtocol sessionProtocol, + @Nullable String service) { + this.ctx = ctx; + this.service = service; + + this.stub = GrpcClients.builder(sessionProtocol, endpoint) + .options(ctx.clientOptions()) + .build(HealthGrpc.HealthStub.class); + } + + public void start() { + check(); + } + + @VisibleForTesting + void check() { + lock(); + try { + final HealthCheckRequest.Builder builder = HealthCheckRequest.newBuilder(); + if (this.service != null) { + builder.setService(service); + } + + try (ClientRequestContextCaptor reqCtxCaptor = Clients.newContextCaptor()) { + stub.check(builder.build(), new StreamObserver() { + @Override + public void onNext(HealthCheckResponse healthCheckResponse) { + final ClientRequestContext reqCtx = reqCtxCaptor.get(); + if (healthCheckResponse.getStatus() == HealthCheckResponse.ServingStatus.SERVING) { + ctx.updateHealth(HEALTHY, reqCtx, null, null); + } else { + ctx.updateHealth(UNHEALTHY, reqCtx, null, null); + } + } + + @Override + public void onError(Throwable throwable) { + final ClientRequestContext reqCtx = reqCtxCaptor.get(); + ctx.updateHealth(UNHEALTHY, reqCtx, ResponseHeaders.of(500), throwable); + } + + @Override + public void onCompleted() { + } + }); + } + } finally { + unlock(); + } + } + + @Override + public CompletableFuture closeAsync() { + return closeable.closeAsync(); + } + + private synchronized void closeAsync(CompletableFuture future) { + future.complete(null); + } + + @Override + public void close() { + closeable.close(); + } + + private void lock() { + lock.lock(); + } + + private void unlock() { + lock.unlock(); + } +} diff --git a/grpc/src/test/java/com/linecorp/armeria/client/grpc/endpoint/healthcheck/GrpcHealthCheckedEndpointGroupBuilderTest.java b/grpc/src/test/java/com/linecorp/armeria/client/grpc/endpoint/healthcheck/GrpcHealthCheckedEndpointGroupBuilderTest.java new file mode 100644 index 00000000000..7d1f145aa47 --- /dev/null +++ b/grpc/src/test/java/com/linecorp/armeria/client/grpc/endpoint/healthcheck/GrpcHealthCheckedEndpointGroupBuilderTest.java @@ -0,0 +1,60 @@ +/* + * Copyright 2025 LY Corporation + * + * LY Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package com.linecorp.armeria.client.grpc.endpoint.healthcheck; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import com.linecorp.armeria.client.endpoint.healthcheck.HealthCheckedEndpointGroup; +import com.linecorp.armeria.common.SessionProtocol; +import com.linecorp.armeria.common.grpc.HealthGrpcServerExtension; + +class GrpcHealthCheckedEndpointGroupBuilderTest { + + @RegisterExtension + private static HealthGrpcServerExtension serverExtension = new HealthGrpcServerExtension(); + + @Test + public void hasHealthyEndpoint() { + serverExtension.setAction(HealthGrpcServerExtension.Action.DO_HEALTHY); + + final HealthCheckedEndpointGroup endpointGroup = GrpcHealthCheckedEndpointGroupBuilder + .builder(serverExtension.endpoint(SessionProtocol.H2C)) + .build(); + + assertThat(endpointGroup.whenReady().join()).hasSize(1); + } + + @Test + public void empty() throws Exception { + serverExtension.setAction(HealthGrpcServerExtension.Action.DO_UNHEALTHY); + + final HealthCheckedEndpointGroup endpointGroup = GrpcHealthCheckedEndpointGroupBuilder + .builder(serverExtension.endpoint(SessionProtocol.H2C)) + .build(); + + assertThatThrownBy(() -> { + // whenReady() will timeout because there are no healthy endpoints + endpointGroup.whenReady().get(1, TimeUnit.SECONDS); + }).isInstanceOf(TimeoutException.class); + } +} diff --git a/grpc/src/test/java/com/linecorp/armeria/common/grpc/HealthGrpcServerExtension.java b/grpc/src/test/java/com/linecorp/armeria/common/grpc/HealthGrpcServerExtension.java new file mode 100644 index 00000000000..45d152f2d5e --- /dev/null +++ b/grpc/src/test/java/com/linecorp/armeria/common/grpc/HealthGrpcServerExtension.java @@ -0,0 +1,86 @@ +/* + * Copyright 2025 LY Corporation + * + * LY Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package com.linecorp.armeria.common.grpc; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.protobuf.TextFormat; + +import com.linecorp.armeria.server.ServerBuilder; +import com.linecorp.armeria.server.grpc.GrpcService; +import com.linecorp.armeria.testing.junit5.server.ServerExtension; + +import io.grpc.health.v1.HealthCheckRequest; +import io.grpc.health.v1.HealthCheckResponse; +import io.grpc.health.v1.HealthGrpc; +import io.grpc.stub.StreamObserver; + +public class HealthGrpcServerExtension extends ServerExtension { + + private static final Logger LOGGER = LoggerFactory.getLogger(HealthGrpcServerExtension.class); + + private static final HealthCheckResponse HEALTHY_HEALTH_CHECK_RESPONSE = HealthCheckResponse.newBuilder() + .setStatus(HealthCheckResponse.ServingStatus.SERVING) + .build(); + + private static final HealthCheckResponse UNHEALTHY_HEALTH_CHECK_RESPONSE = HealthCheckResponse.newBuilder() + .setStatus(HealthCheckResponse.ServingStatus.NOT_SERVING) + .build(); + + public enum Action { + DO_HEALTHY, DO_UNHEALTHY, DO_TIMEOUT + } + + private Action action; + + @Override + protected void configure(ServerBuilder sb) throws Exception { + final GrpcService grpcService = GrpcService.builder() + .addService(new HealthGrpc.HealthImplBase() { + @Override + public void check(HealthCheckRequest request, + StreamObserver responseObserver) { + LOGGER.debug("Received health check response {}", TextFormat.shortDebugString(request)); + + if (action == Action.DO_HEALTHY) { + responseObserver.onNext(HEALTHY_HEALTH_CHECK_RESPONSE); + responseObserver.onCompleted(); + } else if (action == Action.DO_UNHEALTHY) { + responseObserver.onNext(UNHEALTHY_HEALTH_CHECK_RESPONSE); + responseObserver.onCompleted(); + } else if (action == Action.DO_TIMEOUT) { + LOGGER.debug("Not sending a response..."); + } + + LOGGER.debug("Completed health check response"); + } + + @Override + public void watch(HealthCheckRequest request, + StreamObserver responseObserver) { + throw new UnsupportedOperationException(); + } + }) + .build(); + + sb.service(grpcService); + } + + public void setAction(Action action) { + this.action = action; + } +} diff --git a/grpc/src/test/java/com/linecorp/armeria/internal/client/grpc/GrpcHealthCheckerTest.java b/grpc/src/test/java/com/linecorp/armeria/internal/client/grpc/GrpcHealthCheckerTest.java new file mode 100644 index 00000000000..d935e789f2d --- /dev/null +++ b/grpc/src/test/java/com/linecorp/armeria/internal/client/grpc/GrpcHealthCheckerTest.java @@ -0,0 +1,101 @@ +/* + * Copyright 2025 LY Corporation + * + * LY Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package com.linecorp.armeria.internal.client.grpc; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.time.Duration; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import com.linecorp.armeria.client.ClientOptions; +import com.linecorp.armeria.client.ClientRequestContext; +import com.linecorp.armeria.client.endpoint.healthcheck.HealthCheckerContext; +import com.linecorp.armeria.common.ResponseHeaders; +import com.linecorp.armeria.common.SessionProtocol; +import com.linecorp.armeria.common.grpc.HealthGrpcServerExtension; + +import io.grpc.StatusRuntimeException; + +@ExtendWith(MockitoExtension.class) +class GrpcHealthCheckerTest { + + @RegisterExtension + private static HealthGrpcServerExtension serverExtension = new HealthGrpcServerExtension(); + + @Mock + private HealthCheckerContext context; + + @Captor + private ArgumentCaptor throwableArgumentCaptor; + + private GrpcHealthChecker healthChecker; + + @BeforeEach + void setUp() { + when(context.clientOptions()) + .thenReturn(ClientOptions.builder().responseTimeout(Duration.ofMillis(500)).build()); + + healthChecker = new GrpcHealthChecker(context, serverExtension.endpoint(SessionProtocol.H2C), + SessionProtocol.H2C, null); + } + + @Test + void healthy() { + serverExtension.setAction(HealthGrpcServerExtension.Action.DO_HEALTHY); + + healthChecker.check(); + + verify(context, timeout(1000).times(1)).updateHealth(eq(GrpcHealthChecker.HEALTHY), + any(ClientRequestContext.class), eq(null), eq(null)); + } + + @Test + void unhealthy() { + serverExtension.setAction(HealthGrpcServerExtension.Action.DO_UNHEALTHY); + + healthChecker.check(); + + verify(context, timeout(1000).times(1)).updateHealth(eq(GrpcHealthChecker.UNHEALTHY), + any(ClientRequestContext.class), eq(null), eq(null)); + } + + @Test + void exception() { + serverExtension.setAction(HealthGrpcServerExtension.Action.DO_TIMEOUT); + + healthChecker.check(); + + verify(context, timeout(1000).times(1)).updateHealth(eq(GrpcHealthChecker.UNHEALTHY), + any(ClientRequestContext.class), any(ResponseHeaders.class), throwableArgumentCaptor.capture()); + + final Throwable exception = throwableArgumentCaptor.getValue(); + assertThat(exception).isInstanceOf(StatusRuntimeException.class) + .hasMessageStartingWith("DEADLINE_EXCEEDED"); + } +}