From 4d95df53b97d51e3e5c141b172ae77ed81773214 Mon Sep 17 00:00:00 2001
From: Will Vuong <wvuong@gmail.com>
Date: Wed, 22 Jan 2025 09:39:31 -0500
Subject: [PATCH] Add GrpcHealthCheckedEndpointGroupBuilder

Motivation:

Add `GrpcHealthCheckedEndpointGroupBuilder` which builds a health checked endpoint group whose health comes from a [standard gRPC health check service result](https://grpc.io/docs/guides/health-checking/).

Modifications:

* Adds `GrpcHealthCheckedEndpointGroupBuilder` which extends `AbstractHealthCheckedEndpointGroupBuilder` and creates a new health check function
* Adds `GrpcHealthChecker` which is the health check function that creates and uses a gRPC `HealthGrpc` stub to check the gRPC health service on the endpoint. If the health check response is `SERVING`, it is healthy. It is unhealthy if the response is not `SERVING` or if there was a request failure.
* Adds tests.

Result:

* A user can create a health checked endpoint group that is backed by a gRPC health check service.
* Closes #5930

Closes
---
 ...GrpcHealthCheckedEndpointGroupBuilder.java |  79 +++++++++++
 .../endpoint/healthcheck/package-info.java    |  23 ++++
 .../client/grpc/GrpcHealthChecker.java        | 125 ++++++++++++++++++
 ...HealthCheckedEndpointGroupBuilderTest.java |  60 +++++++++
 .../grpc/HealthGrpcServerExtension.java       |  86 ++++++++++++
 .../client/grpc/GrpcHealthCheckerTest.java    | 101 ++++++++++++++
 6 files changed, 474 insertions(+)
 create mode 100644 grpc/src/main/java/com/linecorp/armeria/client/grpc/endpoint/healthcheck/GrpcHealthCheckedEndpointGroupBuilder.java
 create mode 100644 grpc/src/main/java/com/linecorp/armeria/client/grpc/endpoint/healthcheck/package-info.java
 create mode 100644 grpc/src/main/java/com/linecorp/armeria/internal/client/grpc/GrpcHealthChecker.java
 create mode 100644 grpc/src/test/java/com/linecorp/armeria/client/grpc/endpoint/healthcheck/GrpcHealthCheckedEndpointGroupBuilderTest.java
 create mode 100644 grpc/src/test/java/com/linecorp/armeria/common/grpc/HealthGrpcServerExtension.java
 create mode 100644 grpc/src/test/java/com/linecorp/armeria/internal/client/grpc/GrpcHealthCheckerTest.java

diff --git a/grpc/src/main/java/com/linecorp/armeria/client/grpc/endpoint/healthcheck/GrpcHealthCheckedEndpointGroupBuilder.java b/grpc/src/main/java/com/linecorp/armeria/client/grpc/endpoint/healthcheck/GrpcHealthCheckedEndpointGroupBuilder.java
new file mode 100644
index 00000000000..70974656493
--- /dev/null
+++ b/grpc/src/main/java/com/linecorp/armeria/client/grpc/endpoint/healthcheck/GrpcHealthCheckedEndpointGroupBuilder.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright 2025 LY Corporation
+ *
+ * LY Corporation licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package com.linecorp.armeria.client.grpc.endpoint.healthcheck;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.function.Function;
+
+import com.linecorp.armeria.client.endpoint.EndpointGroup;
+import com.linecorp.armeria.client.endpoint.healthcheck.AbstractHealthCheckedEndpointGroupBuilder;
+import com.linecorp.armeria.client.endpoint.healthcheck.HealthCheckerContext;
+import com.linecorp.armeria.common.annotation.Nullable;
+import com.linecorp.armeria.common.util.AsyncCloseable;
+import com.linecorp.armeria.internal.client.grpc.GrpcHealthChecker;
+
+/**
+ * Builds a health checked endpoint group whose health comes from a standard gRPC health check service.
+ */
+public final class GrpcHealthCheckedEndpointGroupBuilder
+        extends AbstractHealthCheckedEndpointGroupBuilder<GrpcHealthCheckedEndpointGroupBuilder> {
+
+    private @Nullable String service;
+
+    GrpcHealthCheckedEndpointGroupBuilder(EndpointGroup delegate) {
+        super(delegate);
+    }
+
+    /**
+     * Returns a {@link GrpcHealthCheckedEndpointGroupBuilder} that builds a health checked
+     * endpoint group with the specified {@link EndpointGroup}.
+     */
+    public static GrpcHealthCheckedEndpointGroupBuilder builder(EndpointGroup delegate) {
+        return new GrpcHealthCheckedEndpointGroupBuilder(requireNonNull(delegate));
+    }
+
+    /**
+     * Sets the optional service field of the gRPC health check request.
+     */
+    public GrpcHealthCheckedEndpointGroupBuilder service(@Nullable String service) {
+        this.service = service;
+        return this;
+    }
+
+    @Override
+    protected Function<? super HealthCheckerContext, ? extends AsyncCloseable> newCheckerFactory() {
+        return new GrpcHealthCheckerFactory(service);
+    }
+
+    private static final class GrpcHealthCheckerFactory
+            implements Function<HealthCheckerContext, AsyncCloseable> {
+
+        private final @Nullable String service;
+
+        private GrpcHealthCheckerFactory(@Nullable String service) {
+            this.service = service;
+        }
+
+        @Override
+        public AsyncCloseable apply(HealthCheckerContext ctx) {
+            final GrpcHealthChecker healthChecker = new GrpcHealthChecker(ctx, ctx.endpoint(),
+                    ctx.protocol(), service);
+            healthChecker.start();
+            return healthChecker;
+        }
+    }
+}
diff --git a/grpc/src/main/java/com/linecorp/armeria/client/grpc/endpoint/healthcheck/package-info.java b/grpc/src/main/java/com/linecorp/armeria/client/grpc/endpoint/healthcheck/package-info.java
new file mode 100644
index 00000000000..7148050ae69
--- /dev/null
+++ b/grpc/src/main/java/com/linecorp/armeria/client/grpc/endpoint/healthcheck/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2025 LY Corporation
+ *
+ * LY Corporation licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * gRPC health checked endpoint.
+ */
+@NonNullByDefault
+package com.linecorp.armeria.client.grpc.endpoint.healthcheck;
+
+import com.linecorp.armeria.common.annotation.NonNullByDefault;
diff --git a/grpc/src/main/java/com/linecorp/armeria/internal/client/grpc/GrpcHealthChecker.java b/grpc/src/main/java/com/linecorp/armeria/internal/client/grpc/GrpcHealthChecker.java
new file mode 100644
index 00000000000..faef1d1d40f
--- /dev/null
+++ b/grpc/src/main/java/com/linecorp/armeria/internal/client/grpc/GrpcHealthChecker.java
@@ -0,0 +1,125 @@
+/*
+ * Copyright 2025 LY Corporation
+ *
+ * LY Corporation licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package com.linecorp.armeria.internal.client.grpc;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.locks.ReentrantLock;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import com.linecorp.armeria.client.ClientRequestContext;
+import com.linecorp.armeria.client.ClientRequestContextCaptor;
+import com.linecorp.armeria.client.Clients;
+import com.linecorp.armeria.client.Endpoint;
+import com.linecorp.armeria.client.endpoint.healthcheck.HealthCheckerContext;
+import com.linecorp.armeria.client.grpc.GrpcClients;
+import com.linecorp.armeria.common.ResponseHeaders;
+import com.linecorp.armeria.common.SessionProtocol;
+import com.linecorp.armeria.common.annotation.Nullable;
+import com.linecorp.armeria.common.util.AsyncCloseable;
+import com.linecorp.armeria.common.util.AsyncCloseableSupport;
+import com.linecorp.armeria.internal.common.util.ReentrantShortLock;
+
+import io.grpc.health.v1.HealthCheckRequest;
+import io.grpc.health.v1.HealthCheckResponse;
+import io.grpc.health.v1.HealthGrpc;
+import io.grpc.stub.StreamObserver;
+
+public final class GrpcHealthChecker implements AsyncCloseable {
+
+    static final double HEALTHY = 1d;
+    static final double UNHEALTHY = 0d;
+
+    private final HealthCheckerContext ctx;
+    @Nullable private final String service;
+    private final HealthGrpc.HealthStub stub;
+
+    private final ReentrantLock lock = new ReentrantShortLock();
+    private final AsyncCloseableSupport closeable = AsyncCloseableSupport.of(this::closeAsync);
+
+    public GrpcHealthChecker(HealthCheckerContext ctx, Endpoint endpoint, SessionProtocol sessionProtocol,
+                             @Nullable String service) {
+        this.ctx = ctx;
+        this.service = service;
+
+        this.stub = GrpcClients.builder(sessionProtocol, endpoint)
+                .options(ctx.clientOptions())
+                .build(HealthGrpc.HealthStub.class);
+    }
+
+    public void start() {
+        check();
+    }
+
+    @VisibleForTesting
+    void check() {
+        lock();
+        try {
+            final HealthCheckRequest.Builder builder = HealthCheckRequest.newBuilder();
+            if (this.service != null) {
+                builder.setService(service);
+            }
+
+            try (ClientRequestContextCaptor reqCtxCaptor = Clients.newContextCaptor()) {
+                stub.check(builder.build(), new StreamObserver<HealthCheckResponse>() {
+                    @Override
+                    public void onNext(HealthCheckResponse healthCheckResponse) {
+                        final ClientRequestContext reqCtx = reqCtxCaptor.get();
+                        if (healthCheckResponse.getStatus() == HealthCheckResponse.ServingStatus.SERVING) {
+                            ctx.updateHealth(HEALTHY, reqCtx, null, null);
+                        } else {
+                            ctx.updateHealth(UNHEALTHY, reqCtx, null, null);
+                        }
+                    }
+
+                    @Override
+                    public void onError(Throwable throwable) {
+                        final ClientRequestContext reqCtx = reqCtxCaptor.get();
+                        ctx.updateHealth(UNHEALTHY, reqCtx, ResponseHeaders.of(500), throwable);
+                    }
+
+                    @Override
+                    public void onCompleted() {
+                    }
+                });
+            }
+        } finally {
+            unlock();
+        }
+    }
+
+    @Override
+    public CompletableFuture<?> closeAsync() {
+        return closeable.closeAsync();
+    }
+
+    private synchronized void closeAsync(CompletableFuture<?> future) {
+        future.complete(null);
+    }
+
+    @Override
+    public void close() {
+        closeable.close();
+    }
+
+    private void lock() {
+        lock.lock();
+    }
+
+    private void unlock() {
+        lock.unlock();
+    }
+}
diff --git a/grpc/src/test/java/com/linecorp/armeria/client/grpc/endpoint/healthcheck/GrpcHealthCheckedEndpointGroupBuilderTest.java b/grpc/src/test/java/com/linecorp/armeria/client/grpc/endpoint/healthcheck/GrpcHealthCheckedEndpointGroupBuilderTest.java
new file mode 100644
index 00000000000..7d1f145aa47
--- /dev/null
+++ b/grpc/src/test/java/com/linecorp/armeria/client/grpc/endpoint/healthcheck/GrpcHealthCheckedEndpointGroupBuilderTest.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2025 LY Corporation
+ *
+ * LY Corporation licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package com.linecorp.armeria.client.grpc.endpoint.healthcheck;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import com.linecorp.armeria.client.endpoint.healthcheck.HealthCheckedEndpointGroup;
+import com.linecorp.armeria.common.SessionProtocol;
+import com.linecorp.armeria.common.grpc.HealthGrpcServerExtension;
+
+class GrpcHealthCheckedEndpointGroupBuilderTest {
+
+    @RegisterExtension
+    private static HealthGrpcServerExtension serverExtension = new HealthGrpcServerExtension();
+
+    @Test
+    public void hasHealthyEndpoint() {
+        serverExtension.setAction(HealthGrpcServerExtension.Action.DO_HEALTHY);
+
+        final HealthCheckedEndpointGroup endpointGroup = GrpcHealthCheckedEndpointGroupBuilder
+                .builder(serverExtension.endpoint(SessionProtocol.H2C))
+                .build();
+
+        assertThat(endpointGroup.whenReady().join()).hasSize(1);
+    }
+
+    @Test
+    public void empty() throws Exception {
+        serverExtension.setAction(HealthGrpcServerExtension.Action.DO_UNHEALTHY);
+
+        final HealthCheckedEndpointGroup endpointGroup = GrpcHealthCheckedEndpointGroupBuilder
+                .builder(serverExtension.endpoint(SessionProtocol.H2C))
+                .build();
+
+        assertThatThrownBy(() -> {
+            // whenReady() will timeout because there are no healthy endpoints
+            endpointGroup.whenReady().get(1, TimeUnit.SECONDS);
+        }).isInstanceOf(TimeoutException.class);
+    }
+}
diff --git a/grpc/src/test/java/com/linecorp/armeria/common/grpc/HealthGrpcServerExtension.java b/grpc/src/test/java/com/linecorp/armeria/common/grpc/HealthGrpcServerExtension.java
new file mode 100644
index 00000000000..45d152f2d5e
--- /dev/null
+++ b/grpc/src/test/java/com/linecorp/armeria/common/grpc/HealthGrpcServerExtension.java
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2025 LY Corporation
+ *
+ * LY Corporation licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package com.linecorp.armeria.common.grpc;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.protobuf.TextFormat;
+
+import com.linecorp.armeria.server.ServerBuilder;
+import com.linecorp.armeria.server.grpc.GrpcService;
+import com.linecorp.armeria.testing.junit5.server.ServerExtension;
+
+import io.grpc.health.v1.HealthCheckRequest;
+import io.grpc.health.v1.HealthCheckResponse;
+import io.grpc.health.v1.HealthGrpc;
+import io.grpc.stub.StreamObserver;
+
+public class HealthGrpcServerExtension extends ServerExtension {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(HealthGrpcServerExtension.class);
+
+    private static final HealthCheckResponse HEALTHY_HEALTH_CHECK_RESPONSE = HealthCheckResponse.newBuilder()
+            .setStatus(HealthCheckResponse.ServingStatus.SERVING)
+            .build();
+
+    private static final HealthCheckResponse UNHEALTHY_HEALTH_CHECK_RESPONSE = HealthCheckResponse.newBuilder()
+            .setStatus(HealthCheckResponse.ServingStatus.NOT_SERVING)
+            .build();
+
+    public enum Action {
+        DO_HEALTHY, DO_UNHEALTHY, DO_TIMEOUT
+    }
+
+    private Action action;
+
+    @Override
+    protected void configure(ServerBuilder sb) throws Exception {
+        final GrpcService grpcService = GrpcService.builder()
+                .addService(new HealthGrpc.HealthImplBase() {
+                    @Override
+                    public void check(HealthCheckRequest request,
+                                      StreamObserver<HealthCheckResponse> responseObserver) {
+                        LOGGER.debug("Received health check response {}", TextFormat.shortDebugString(request));
+
+                        if (action == Action.DO_HEALTHY) {
+                            responseObserver.onNext(HEALTHY_HEALTH_CHECK_RESPONSE);
+                            responseObserver.onCompleted();
+                        } else if (action == Action.DO_UNHEALTHY) {
+                            responseObserver.onNext(UNHEALTHY_HEALTH_CHECK_RESPONSE);
+                            responseObserver.onCompleted();
+                        } else if (action == Action.DO_TIMEOUT) {
+                            LOGGER.debug("Not sending a response...");
+                        }
+
+                        LOGGER.debug("Completed health check response");
+                    }
+
+                    @Override
+                    public void watch(HealthCheckRequest request,
+                                      StreamObserver<HealthCheckResponse> responseObserver) {
+                        throw new UnsupportedOperationException();
+                    }
+                })
+                .build();
+
+        sb.service(grpcService);
+    }
+
+    public void setAction(Action action) {
+        this.action = action;
+    }
+}
diff --git a/grpc/src/test/java/com/linecorp/armeria/internal/client/grpc/GrpcHealthCheckerTest.java b/grpc/src/test/java/com/linecorp/armeria/internal/client/grpc/GrpcHealthCheckerTest.java
new file mode 100644
index 00000000000..d935e789f2d
--- /dev/null
+++ b/grpc/src/test/java/com/linecorp/armeria/internal/client/grpc/GrpcHealthCheckerTest.java
@@ -0,0 +1,101 @@
+/*
+ * Copyright 2025 LY Corporation
+ *
+ * LY Corporation licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package com.linecorp.armeria.internal.client.grpc;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.time.Duration;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import com.linecorp.armeria.client.ClientOptions;
+import com.linecorp.armeria.client.ClientRequestContext;
+import com.linecorp.armeria.client.endpoint.healthcheck.HealthCheckerContext;
+import com.linecorp.armeria.common.ResponseHeaders;
+import com.linecorp.armeria.common.SessionProtocol;
+import com.linecorp.armeria.common.grpc.HealthGrpcServerExtension;
+
+import io.grpc.StatusRuntimeException;
+
+@ExtendWith(MockitoExtension.class)
+class GrpcHealthCheckerTest {
+
+    @RegisterExtension
+    private static HealthGrpcServerExtension serverExtension = new HealthGrpcServerExtension();
+
+    @Mock
+    private HealthCheckerContext context;
+
+    @Captor
+    private ArgumentCaptor<Throwable> throwableArgumentCaptor;
+
+    private GrpcHealthChecker healthChecker;
+
+    @BeforeEach
+    void setUp() {
+        when(context.clientOptions())
+                .thenReturn(ClientOptions.builder().responseTimeout(Duration.ofMillis(500)).build());
+
+        healthChecker = new GrpcHealthChecker(context, serverExtension.endpoint(SessionProtocol.H2C),
+                SessionProtocol.H2C, null);
+    }
+
+    @Test
+    void healthy() {
+        serverExtension.setAction(HealthGrpcServerExtension.Action.DO_HEALTHY);
+
+        healthChecker.check();
+
+        verify(context, timeout(1000).times(1)).updateHealth(eq(GrpcHealthChecker.HEALTHY),
+                any(ClientRequestContext.class), eq(null), eq(null));
+    }
+
+    @Test
+    void unhealthy() {
+        serverExtension.setAction(HealthGrpcServerExtension.Action.DO_UNHEALTHY);
+
+        healthChecker.check();
+
+        verify(context, timeout(1000).times(1)).updateHealth(eq(GrpcHealthChecker.UNHEALTHY),
+                any(ClientRequestContext.class), eq(null), eq(null));
+    }
+
+    @Test
+    void exception() {
+        serverExtension.setAction(HealthGrpcServerExtension.Action.DO_TIMEOUT);
+
+        healthChecker.check();
+
+        verify(context, timeout(1000).times(1)).updateHealth(eq(GrpcHealthChecker.UNHEALTHY),
+                any(ClientRequestContext.class), any(ResponseHeaders.class), throwableArgumentCaptor.capture());
+
+        final Throwable exception = throwableArgumentCaptor.getValue();
+        assertThat(exception).isInstanceOf(StatusRuntimeException.class)
+                .hasMessageStartingWith("DEADLINE_EXCEEDED");
+    }
+}