From 43bce825d43b7b2fa20e5285e6498a1e5fc816f7 Mon Sep 17 00:00:00 2001 From: jeremyhi Date: Sun, 4 Feb 2024 15:11:08 +0800 Subject: [PATCH 1/6] refactor: rm unused code --- .../com/google/protobuf/ByteStringHelper.java | 42 ----------- .../com/google/protobuf/BytesStealer.java | 73 ------------------- .../greptime/common/signal/SignalHelper.java | 2 +- 3 files changed, 1 insertion(+), 116 deletions(-) delete mode 100644 ingester-common/src/main/java/com/google/protobuf/BytesStealer.java diff --git a/ingester-common/src/main/java/com/google/protobuf/ByteStringHelper.java b/ingester-common/src/main/java/com/google/protobuf/ByteStringHelper.java index d4c403c..0e46bc8 100644 --- a/ingester-common/src/main/java/com/google/protobuf/ByteStringHelper.java +++ b/ingester-common/src/main/java/com/google/protobuf/ByteStringHelper.java @@ -15,9 +15,6 @@ */ package com.google.protobuf; -import java.io.IOException; -import java.nio.ByteBuffer; - /** * A {@code ByteString} helper, avoid some memory copying to improve performance. * @@ -31,43 +28,4 @@ public class ByteStringHelper { public static ByteString wrap(byte[] bs) { return ByteString.wrap(bs); } - - /** - * Wrap a byte array into a ByteString. - * - * @param bs the byte array - * @param offset read start offset in array - * @param len read data length - * @return the result byte string. - */ - public static ByteString wrap(byte[] bs, int offset, int len) { - return ByteString.wrap(bs, offset, len); - } - - /** - * Wrap a byte buffer into a ByteString. - */ - public static ByteString wrap(ByteBuffer buf) { - return ByteString.wrap(buf); - } - - /** - * Steal the byte[] from {@link ByteString}, if failed, - * then call {@link ByteString#toByteArray()}. - * - * @param byteString the byteString source data - * @return carried bytes - */ - public static byte[] sealByteArray(ByteString byteString) { - BytesStealer stealer = new BytesStealer(); - try { - byteString.writeTo(stealer); - if (stealer.isValid()) { - return stealer.value(); - } - } catch (IOException ignored) { - // ignored - } - return byteString.toByteArray(); - } } diff --git a/ingester-common/src/main/java/com/google/protobuf/BytesStealer.java b/ingester-common/src/main/java/com/google/protobuf/BytesStealer.java deleted file mode 100644 index 0e2f0f4..0000000 --- a/ingester-common/src/main/java/com/google/protobuf/BytesStealer.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Copyright 2023 Greptime Team - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.google.protobuf; - -import java.nio.ByteBuffer; - -/** - * Try to get the bytes form ByteString with no copy. - * - * @author jiachun.fjc - */ -public class BytesStealer extends ByteOutput { - - private byte[] value; - private boolean valid = false; - - public byte[] value() { - return value; - } - - public boolean isValid() { - return valid; - } - - @Override - public void write(byte value) { - this.valid = false; - } - - @Override - public void write(byte[] value, int offset, int length) { - doWrite(value, offset, length); - } - - @Override - public void writeLazy(byte[] value, int offset, int length) { - doWrite(value, offset, length); - } - - @Override - public void write(ByteBuffer value) { - this.valid = false; - } - - @Override - public void writeLazy(ByteBuffer value) { - this.valid = false; - } - - private void doWrite(byte[] value, int offset, int length) { - if (this.value != null) { - this.valid = false; - return; - } - if (offset == 0 && length == value.length) { - this.value = value; - this.valid = true; - } - } -} diff --git a/ingester-common/src/main/java/io/greptime/common/signal/SignalHelper.java b/ingester-common/src/main/java/io/greptime/common/signal/SignalHelper.java index ef84b37..f6b6350 100644 --- a/ingester-common/src/main/java/io/greptime/common/signal/SignalHelper.java +++ b/ingester-common/src/main/java/io/greptime/common/signal/SignalHelper.java @@ -104,7 +104,7 @@ public void handle(sun.misc.Signal signal) { h.handle(signal.getName()); } } catch (Throwable t) { - LOG.error("Fail to handle signal: {}.", signal, t); + LOG.error("Failed to handle signal: {}.", signal, t); } } } From 7269307b85ce6052aa9cb70bee7d66e6e6166588 Mon Sep 17 00:00:00 2001 From: jeremyhi Date: Sun, 4 Feb 2024 15:24:16 +0800 Subject: [PATCH 2/6] refactor: java doc --- .../java/io/greptime/common/util/StringBuilderHelper.java | 8 +------- .../src/main/java/io/greptime/common/util/Strings.java | 8 ++++++++ 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/ingester-common/src/main/java/io/greptime/common/util/StringBuilderHelper.java b/ingester-common/src/main/java/io/greptime/common/util/StringBuilderHelper.java index 89af4ca..74cec9c 100644 --- a/ingester-common/src/main/java/io/greptime/common/util/StringBuilderHelper.java +++ b/ingester-common/src/main/java/io/greptime/common/util/StringBuilderHelper.java @@ -22,14 +22,13 @@ * * @author jiachun.fjc */ -@SuppressWarnings("unused") public class StringBuilderHelper { private static final int MAX_BUF_SIZE; private static final ThreadLocal HOLDER_THREAD_LOCAL; static { - MAX_BUF_SIZE = 1024 << 3; // 8k + MAX_BUF_SIZE = 1024 << 3; // 8k HOLDER_THREAD_LOCAL = ThreadLocal.withInitial(StringBuilderHolder::new); } @@ -38,11 +37,6 @@ public static StringBuilder get() { return holder.getStringBuilder(); } - public static void truncate() { - StringBuilderHolder holder = HOLDER_THREAD_LOCAL.get(); - holder.truncate(); - } - private static class StringBuilderHolder { private StringBuilder buf = new StringBuilder(); diff --git a/ingester-common/src/main/java/io/greptime/common/util/Strings.java b/ingester-common/src/main/java/io/greptime/common/util/Strings.java index aeb6e2c..79bd9af 100644 --- a/ingester-common/src/main/java/io/greptime/common/util/Strings.java +++ b/ingester-common/src/main/java/io/greptime/common/util/Strings.java @@ -57,11 +57,13 @@ public static boolean isNullOrEmpty(String str) { /** * Checks if a string is whitespace, empty ("") or null. *

+ * ``` java * Strings.isBlank(null) = true * Strings.isBlank("") = true * Strings.isBlank(" ") = true * Strings.isBlank("bob") = false * Strings.isBlank(" bob ") = false + * ``` */ public static boolean isBlank(String str) { int strLen; @@ -78,11 +80,13 @@ public static boolean isBlank(String str) { /** * Checks if a string is not empty (""), not null and not whitespace only. *

+ * ``` java * Strings.isNotBlank(null) = false * Strings.isNotBlank("") = false * Strings.isNotBlank(" ") = false * Strings.isNotBlank("bob") = true * Strings.isNotBlank(" bob ") = true + * ``` */ public static boolean isNotBlank(String str) { return !isBlank(str); @@ -93,12 +97,14 @@ public static boolean isNotBlank(String str) { *

* A null input String returns null. *

+ * ``` java * Strings.split(null, *) = null * Strings.split("", *) = [] * Strings.split("a.b.c", '.') = ["a", "b", "c"] * Strings.split("a..b.c", '.') = ["a", "b", "c"] * Strings.split("a:b:c", '.') = ["a:b:c"] * Strings.split("a b c", ' ') = ["a", "b", "c"] + * ``` */ public static String[] split(String str, char separator) { return split(str, separator, false); @@ -111,6 +117,7 @@ public static String[] split(String str, char separator) { *

* A null input String returns null. *

+ * ``` java * Strings.split(null, *, true) = null * Strings.split("", *, true) = [] * Strings.split("a.b.c", '.', true) = ["a", "b", "c"] @@ -122,6 +129,7 @@ public static String[] split(String str, char separator) { * Strings.split(" a b c", ' ', true) = ["", a", "b", "c"] * Strings.split(" a b c", ' ', true) = ["", "", a", "b", "c"] * Strings.split(" a b c ", ' ', true) = ["", a", "b", "c", ""] + * ``` */ public static String[] split(String str, char separator, boolean preserveAllTokens) { if (str == null) { From c700b2c7185185a7c21b07ab020b994984ab9e53 Mon Sep 17 00:00:00 2001 From: jeremyhi Date: Sun, 4 Feb 2024 15:27:58 +0800 Subject: [PATCH 3/6] refactor: remove unused code --- .../io/greptime/common/util/SystemPropertyUtil.java | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/ingester-common/src/main/java/io/greptime/common/util/SystemPropertyUtil.java b/ingester-common/src/main/java/io/greptime/common/util/SystemPropertyUtil.java index df929f4..4f9e9dd 100644 --- a/ingester-common/src/main/java/io/greptime/common/util/SystemPropertyUtil.java +++ b/ingester-common/src/main/java/io/greptime/common/util/SystemPropertyUtil.java @@ -25,19 +25,10 @@ * parse the values of the Java system properties. * */ -@SuppressWarnings("unused") public final class SystemPropertyUtil { private static final Logger LOG = LoggerFactory.getLogger(SystemPropertyUtil.class); - /** - * Returns {@code true} if and only if the system property - * with the specified {@code key} exists. - */ - public static boolean contains(String key) { - return get(key) != null; - } - /** * Returns the value of the Java system property with the * specified {@code key}, while falling back to {@code null} @@ -155,6 +146,7 @@ public static int getInt(String key, int def) { * {@code def} if there's no such property or if access to * the specified property is not allowed. */ + @SuppressWarnings("unused") public static long getLong(String key, long def) { String value = get(key); if (value == null) { @@ -178,6 +170,7 @@ public static long getLong(String key, long def) { * Sets the value of the Java system property with the * specified {@code key} */ + @SuppressWarnings("unused") public static Object setProperty(String key, String value) { return System.getProperties().setProperty(key, value); } From 3df9fe29b66144ba3288c45fda113f8c26be55e8 Mon Sep 17 00:00:00 2001 From: jeremyhi Date: Sun, 4 Feb 2024 15:53:24 +0800 Subject: [PATCH 4/6] refactor: rpc module --- .../main/java/io/greptime/rpc/GrpcClient.java | 94 +++--------- .../main/java/io/greptime/RouterClient.java | 7 +- .../main/java/io/greptime/rpc/RpcClient.java | 141 ++++++------------ .../io/greptime/rpc/RpcFactoryProvider.java | 2 +- .../rpc/errors/ConnectFailException.java | 17 +-- .../rpc/errors/InvokeTimeoutException.java | 45 ------ .../rpc/errors/RemotingException.java | 44 ------ 7 files changed, 70 insertions(+), 280 deletions(-) delete mode 100644 ingester-rpc/src/main/java/io/greptime/rpc/errors/InvokeTimeoutException.java delete mode 100644 ingester-rpc/src/main/java/io/greptime/rpc/errors/RemotingException.java diff --git a/ingester-grpc/src/main/java/io/greptime/rpc/GrpcClient.java b/ingester-grpc/src/main/java/io/greptime/rpc/GrpcClient.java index 04aba7d..26b734a 100644 --- a/ingester-grpc/src/main/java/io/greptime/rpc/GrpcClient.java +++ b/ingester-grpc/src/main/java/io/greptime/rpc/GrpcClient.java @@ -34,9 +34,7 @@ import io.greptime.common.util.SystemPropertyUtil; import io.greptime.common.util.ThreadPoolUtil; import io.greptime.rpc.errors.ConnectFailException; -import io.greptime.rpc.errors.InvokeTimeoutException; import io.greptime.rpc.errors.OnlyErrorMessage; -import io.greptime.rpc.errors.RemotingException; import io.greptime.rpc.interceptors.ClientRequestLimitInterceptor; import io.greptime.rpc.interceptors.ContextToHeadersInterceptor; import io.greptime.rpc.interceptors.MetricInterceptor; @@ -60,7 +58,6 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executor; @@ -68,7 +65,6 @@ import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; @@ -153,17 +149,6 @@ public void shutdownGracefully() { closeAllChannels(); } - @Override - public boolean checkConnection(Endpoint endpoint) { - return checkConnection(endpoint, false); - } - - @Override - public boolean checkConnection(Endpoint endpoint, boolean createIfAbsent) { - Ensures.ensureNonNull(endpoint, "null `endpoint`"); - return checkChannel(endpoint, createIfAbsent); - } - @Override public void closeConnection(Endpoint endpoint) { Ensures.ensureNonNull(endpoint, "null `endpoint`"); @@ -176,41 +161,12 @@ public void registerConnectionObserver(ConnectionObserver observer) { } @Override - public Resp invokeSync(Endpoint endpoint, Req request, Context ctx, long timeoutMs) - throws RemotingException { - long timeout = calcTimeout(timeoutMs); - CompletableFuture future = new CompletableFuture<>(); - - invokeAsync(endpoint, request, ctx, new Observer() { - - @Override - public void onNext(Resp value) { - future.complete(value); - } - - @Override - public void onError(Throwable err) { - future.completeExceptionally(err); - } - }, timeout); - - try { - return future.get(timeout, TimeUnit.MILLISECONDS); - } catch (TimeoutException e) { - future.cancel(true); - throw new InvokeTimeoutException(e); - } catch (Throwable t) { - future.cancel(true); - throw new RemotingException(t); - } - } - - @Override - public void invokeAsync(Endpoint endpoint, - Req request, - Context ctx, - Observer observer, - long timeoutMs) { + public void invokeAsync( + Endpoint endpoint, + Req request, + Context ctx, + Observer observer, + long timeoutMs) { checkArgs(endpoint, request, ctx, observer); ContextToHeadersInterceptor.setCurrentCtx(ctx); @@ -274,10 +230,11 @@ private long onReceived(boolean onError) { } @Override - public void invokeServerStreaming(Endpoint endpoint, - Req request, - Context ctx, - Observer observer) { + public void invokeServerStreaming( + Endpoint endpoint, + Req request, + Context ctx, + Observer observer) { checkArgs(endpoint, request, ctx, observer); ContextToHeadersInterceptor.setCurrentCtx(ctx); @@ -323,10 +280,11 @@ public void onCompleted() { } @Override - public Observer invokeClientStreaming(Endpoint endpoint, - Req defaultReqIns, - Context ctx, - Observer respObserver) { + public Observer invokeClientStreaming( + Endpoint endpoint, + Req defaultReqIns, + Context ctx, + Observer respObserver) { checkArgs(endpoint, defaultReqIns, ctx, respObserver); ContextToHeadersInterceptor.setCurrentCtx(ctx); @@ -508,16 +466,6 @@ private void closeChannel(Endpoint endpoint) { } } - private boolean checkChannel(Endpoint endpoint, boolean createIfAbsent) { - ManagedChannel ch = getChannel(endpoint, createIfAbsent); - - if (ch == null) { - return false; - } - - return checkConnectivity(endpoint, ch); - } - private boolean checkConnectivity(Endpoint endpoint, ManagedChannel ch) { ConnectivityState st = ch.getState(false); @@ -583,7 +531,7 @@ private MethodDescriptor getCallMethod(Object request, MethodD } private Channel getCheckedChannel(Endpoint endpoint, Consumer onFailed) { - ManagedChannel ch = getChannel(endpoint, true); + ManagedChannel ch = this.managedChannelPool.computeIfAbsent(endpoint, this::newChannel); if (checkConnectivity(endpoint, ch)) { return ch; @@ -594,14 +542,6 @@ private Channel getCheckedChannel(Endpoint endpoint, Consumer onFaile return null; } - private ManagedChannel getChannel(Endpoint endpoint, boolean createIfAbsent) { - if (createIfAbsent) { - return this.managedChannelPool.computeIfAbsent(endpoint, this::newChannel); - } else { - return this.managedChannelPool.get(endpoint); - } - } - private IdChannel newChannel(Endpoint endpoint) { ManagedChannel innerChannel = NettyChannelBuilder // .forAddress(endpoint.getAddr(), endpoint.getPort()) // diff --git a/ingester-protocol/src/main/java/io/greptime/RouterClient.java b/ingester-protocol/src/main/java/io/greptime/RouterClient.java index d44bb92..ed00da0 100644 --- a/ingester-protocol/src/main/java/io/greptime/RouterClient.java +++ b/ingester-protocol/src/main/java/io/greptime/RouterClient.java @@ -24,7 +24,6 @@ import io.greptime.rpc.Context; import io.greptime.rpc.Observer; import io.greptime.rpc.RpcClient; -import io.greptime.rpc.errors.RemotingException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.List; @@ -134,7 +133,7 @@ public void onError(Throwable err) { } }, timeoutMs); - } catch (RemotingException e) { + } catch (Exception e) { future.completeExceptionally(e); } @@ -155,7 +154,7 @@ public void onError(Throwable err) { public void invokeServerStreaming(Endpoint endpoint, Req request, Context ctx, Observer observer) { try { this.rpcClient.invokeServerStreaming(endpoint, request, ctx, observer); - } catch (RemotingException e) { + } catch (Exception e) { observer.onError(e); } } @@ -175,7 +174,7 @@ public Observer invokeClientStreaming(Endpoint endpoint, Req de Observer respObserver) { try { return this.rpcClient.invokeClientStreaming(endpoint, defaultReqIns, ctx, respObserver); - } catch (RemotingException e) { + } catch (Exception e) { respObserver.onError(e); return new Observer.RejectedObserver<>(e); } diff --git a/ingester-rpc/src/main/java/io/greptime/rpc/RpcClient.java b/ingester-rpc/src/main/java/io/greptime/rpc/RpcClient.java index d39b52b..68af38f 100644 --- a/ingester-rpc/src/main/java/io/greptime/rpc/RpcClient.java +++ b/ingester-rpc/src/main/java/io/greptime/rpc/RpcClient.java @@ -18,38 +18,20 @@ import io.greptime.common.Display; import io.greptime.common.Endpoint; import io.greptime.common.Lifecycle; -import io.greptime.rpc.errors.RemotingException; /** * A common RPC client interface. * * @author jiachun.fjc */ -@SuppressWarnings("unused") public interface RpcClient extends Lifecycle, Display { /** - * Check connection for given address. - * - * @param endpoint target address - * @return true if there is a connection adn the connection is active adn writable. - */ - boolean checkConnection(Endpoint endpoint); - - /** - * Check connection for given address and async to create a new one if there is no connection. - * - * @param endpoint target address - * @param createIfAbsent create a new one if there is no connection - * @return true if there is a connection and the connection is active and writable. - */ - boolean checkConnection(Endpoint endpoint, boolean createIfAbsent); - - /** - * Close all connections of an address. + * Closes all connections of an address. * * @param endpoint target address */ + @SuppressWarnings("unused") void closeConnection(Endpoint endpoint); /** @@ -87,103 +69,76 @@ interface ConnectionObserver { void onShutdown(Endpoint ep); } - /** - * Executes a synchronous call. - * - * @param endpoint target address - * @param request request object - * @param timeoutMs timeout millisecond - * @param request message type - * @param response message type - * @return response - */ - default Resp invokeSync(Endpoint endpoint, - Req request, - long timeoutMs) throws RemotingException { - return invokeSync(endpoint, request, null, timeoutMs); - } - - /** - * Executes a synchronous call using an invoke context. - * - * @param endpoint target address - * @param request request object - * @param ctx invoke context - * @param timeoutMs timeout millisecond - * @param request message type - * @param response message type - * @return response - */ - Resp invokeSync(Endpoint endpoint, - Req request, - Context ctx, - long timeoutMs) throws RemotingException; - /** * Executes an asynchronous call with a response {@link Observer}. * - * @param endpoint target address - * @param request request object - * @param observer response observer - * @param timeoutMs timeout millisecond - * @param request message type - * @param response message type + * @param endpoint the target address + * @param request the request object + * @param observer the response observer + * @param timeoutMs timeout with millisecond + * @param the request message type + * @param the response message type */ - default void invokeAsync(Endpoint endpoint, - Req request, - Observer observer, - long timeoutMs) throws RemotingException { + @SuppressWarnings("unused") + default void invokeAsync( + Endpoint endpoint, + Req request, + Observer observer, + long timeoutMs) { invokeAsync(endpoint, request, null, observer, timeoutMs); } /** * Executes an asynchronous call with a response {@link Observer}. * - * @param endpoint target address - * @param request request object - * @param ctx invoke context - * @param observer response observer - * @param timeoutMs timeout millisecond - * @param request message type - * @param response message type + * @param endpoint the target address + * @param request the request object + * @param ctx the invoke context + * @param observer the response observer + * @param timeoutMs timeout with millisecond + * @param the request message type + * @param the response message type */ - void invokeAsync(Endpoint endpoint, - Req request, - Context ctx, - Observer observer, - long timeoutMs) throws RemotingException; + void invokeAsync( + Endpoint endpoint, + Req request, + Context ctx, + Observer observer, + long timeoutMs); /** * Executes a server-streaming call with a response {@link Observer}. *

* One request message followed by zero or more response messages. * - * @param endpoint target address - * @param request request object - * @param ctx invoke context - * @param observer response stream observer - * @param request message type - * @param response message type + * @param endpoint the target address + * @param request the request object + * @param ctx the invoke context + * @param observer the response stream observer + * @param the request message type + * @param the response message type */ - void invokeServerStreaming(Endpoint endpoint, - Req request, - Context ctx, - Observer observer) throws RemotingException; + void invokeServerStreaming( + Endpoint endpoint, + Req request, + Context ctx, + Observer observer); /** * Executes a client-streaming call with a request {@link Observer} * and a response {@link Observer}. * - * @param endpoint target address + * @param endpoint the target address * @param defaultReqIns the default request instance - * @param ctx invoke context - * @param respObserver response stream observer - * @param request message type - * @param response message type + * @param ctx the invoke context + * @param respObserver the response stream observer + * @param the request message type + * @param the response message type * @return request {@link Observer}. */ - Observer invokeClientStreaming(Endpoint endpoint, - Req defaultReqIns, - Context ctx, - Observer respObserver) throws RemotingException; + Observer invokeClientStreaming( + Endpoint endpoint, + Req defaultReqIns, + Context ctx, + Observer respObserver); } diff --git a/ingester-rpc/src/main/java/io/greptime/rpc/RpcFactoryProvider.java b/ingester-rpc/src/main/java/io/greptime/rpc/RpcFactoryProvider.java index dd1f97c..929c508 100644 --- a/ingester-rpc/src/main/java/io/greptime/rpc/RpcFactoryProvider.java +++ b/ingester-rpc/src/main/java/io/greptime/rpc/RpcFactoryProvider.java @@ -30,7 +30,7 @@ public class RpcFactoryProvider { private static final RpcFactory RPC_FACTORY = ServiceLoader.load(RpcFactory.class).first(); /** - * Get the {@link RpcFactory} impl, base on SPI. + * Gets the {@link RpcFactory} impl, base on SPI. * * @return a shared rpcFactory instance */ diff --git a/ingester-rpc/src/main/java/io/greptime/rpc/errors/ConnectFailException.java b/ingester-rpc/src/main/java/io/greptime/rpc/errors/ConnectFailException.java index f5e84e3..03e67a2 100644 --- a/ingester-rpc/src/main/java/io/greptime/rpc/errors/ConnectFailException.java +++ b/ingester-rpc/src/main/java/io/greptime/rpc/errors/ConnectFailException.java @@ -20,26 +20,11 @@ * * @author jiachun.fjc */ -@SuppressWarnings("unused") -public class ConnectFailException extends RemotingException { +public class ConnectFailException extends RuntimeException { private static final long serialVersionUID = 3129127065579018606L; - public ConnectFailException() {} - public ConnectFailException(String message) { super(message); } - - public ConnectFailException(String message, Throwable cause) { - super(message, cause); - } - - public ConnectFailException(Throwable cause) { - super(cause); - } - - public ConnectFailException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { - super(message, cause, enableSuppression, writableStackTrace); - } } diff --git a/ingester-rpc/src/main/java/io/greptime/rpc/errors/InvokeTimeoutException.java b/ingester-rpc/src/main/java/io/greptime/rpc/errors/InvokeTimeoutException.java deleted file mode 100644 index d3becb0..0000000 --- a/ingester-rpc/src/main/java/io/greptime/rpc/errors/InvokeTimeoutException.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Copyright 2023 Greptime Team - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.greptime.rpc.errors; - -/** - * Invoke timeout error. - * - * @author jiachun.fjc - */ -@SuppressWarnings("unused") -public class InvokeTimeoutException extends RemotingException { - - private static final long serialVersionUID = -4710810309766380565L; - - public InvokeTimeoutException() {} - - public InvokeTimeoutException(String message) { - super(message); - } - - public InvokeTimeoutException(String message, Throwable cause) { - super(message, cause); - } - - public InvokeTimeoutException(Throwable cause) { - super(cause); - } - - public InvokeTimeoutException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { - super(message, cause, enableSuppression, writableStackTrace); - } -} diff --git a/ingester-rpc/src/main/java/io/greptime/rpc/errors/RemotingException.java b/ingester-rpc/src/main/java/io/greptime/rpc/errors/RemotingException.java deleted file mode 100644 index 4f61a07..0000000 --- a/ingester-rpc/src/main/java/io/greptime/rpc/errors/RemotingException.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Copyright 2023 Greptime Team - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.greptime.rpc.errors; - -/** - * Exception for default remoting problems. - * - * @author jiachun.fjc - */ -public class RemotingException extends Exception { - - private static final long serialVersionUID = -6326244159775972292L; - - public RemotingException() {} - - public RemotingException(String message) { - super(message); - } - - public RemotingException(String message, Throwable cause) { - super(message, cause); - } - - public RemotingException(Throwable cause) { - super(cause); - } - - public RemotingException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { - super(message, cause, enableSuppression, writableStackTrace); - } -} From 86f79c484e4f12a32491d952c70691acfb045922 Mon Sep 17 00:00:00 2001 From: jeremyhi Date: Sun, 4 Feb 2024 15:59:14 +0800 Subject: [PATCH 5/6] refactor: protocol module's unused code --- .../java/io/greptime/errors/LimitedException.java | 11 ----------- .../java/io/greptime/errors/PojoException.java | 11 ----------- .../java/io/greptime/errors/ServerException.java | 15 --------------- .../java/io/greptime/errors/StreamException.java | 15 --------------- 4 files changed, 52 deletions(-) diff --git a/ingester-protocol/src/main/java/io/greptime/errors/LimitedException.java b/ingester-protocol/src/main/java/io/greptime/errors/LimitedException.java index 425c63a..bfe5725 100644 --- a/ingester-protocol/src/main/java/io/greptime/errors/LimitedException.java +++ b/ingester-protocol/src/main/java/io/greptime/errors/LimitedException.java @@ -20,26 +20,15 @@ * * @author jiachun.fjc */ -@SuppressWarnings("unused") public class LimitedException extends RuntimeException { private static final long serialVersionUID = -1L; - public LimitedException() {} - public LimitedException(String message) { super(message); } - public LimitedException(String message, Throwable cause) { - super(message, cause); - } - public LimitedException(Throwable cause) { super(cause); } - - public LimitedException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { - super(message, cause, enableSuppression, writableStackTrace); - } } diff --git a/ingester-protocol/src/main/java/io/greptime/errors/PojoException.java b/ingester-protocol/src/main/java/io/greptime/errors/PojoException.java index f120f96..0577f31 100644 --- a/ingester-protocol/src/main/java/io/greptime/errors/PojoException.java +++ b/ingester-protocol/src/main/java/io/greptime/errors/PojoException.java @@ -18,24 +18,13 @@ /** * @author jiachun.fjc */ -@SuppressWarnings("unused") public class PojoException extends RuntimeException { - public PojoException() {} - public PojoException(String message) { super(message); } - public PojoException(String message, Throwable cause) { - super(message, cause); - } - public PojoException(Throwable cause) { super(cause); } - - public PojoException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { - super(message, cause, enableSuppression, writableStackTrace); - } } diff --git a/ingester-protocol/src/main/java/io/greptime/errors/ServerException.java b/ingester-protocol/src/main/java/io/greptime/errors/ServerException.java index ee04767..8ddb74d 100644 --- a/ingester-protocol/src/main/java/io/greptime/errors/ServerException.java +++ b/ingester-protocol/src/main/java/io/greptime/errors/ServerException.java @@ -18,26 +18,11 @@ /** * @author jiachun.fjc */ -@SuppressWarnings("unused") public class ServerException extends RuntimeException { private static final long serialVersionUID = -1L; - public ServerException() {} - public ServerException(String message) { super(message); } - - public ServerException(String message, Throwable cause) { - super(message, cause); - } - - public ServerException(Throwable cause) { - super(cause); - } - - public ServerException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { - super(message, cause, enableSuppression, writableStackTrace); - } } diff --git a/ingester-protocol/src/main/java/io/greptime/errors/StreamException.java b/ingester-protocol/src/main/java/io/greptime/errors/StreamException.java index d00efe0..a0b861a 100644 --- a/ingester-protocol/src/main/java/io/greptime/errors/StreamException.java +++ b/ingester-protocol/src/main/java/io/greptime/errors/StreamException.java @@ -20,26 +20,11 @@ * * @author jiachun.fjc */ -@SuppressWarnings("unused") public class StreamException extends RuntimeException { private static final long serialVersionUID = -1L; - public StreamException() {} - public StreamException(String message) { super(message); } - - public StreamException(String message, Throwable cause) { - super(message, cause); - } - - public StreamException(Throwable cause) { - super(cause); - } - - public StreamException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { - super(message, cause, enableSuppression, writableStackTrace); - } } From a8ed400adab49417779682bc42335c280e1ca245 Mon Sep 17 00:00:00 2001 From: jeremyhi Date: Sun, 4 Feb 2024 16:38:20 +0800 Subject: [PATCH 6/6] refactor: protocol module --- .../src/main/java/io/greptime/Status.java | 6 ++- .../greptime/models/IntervalMonthDayNano.java | 12 ----- .../java/io/greptime/models/RowHelper.java | 30 ++++++------ .../models/{Util.java => ValueUtil.java} | 2 +- .../greptime/signal/MetricsSignalHandler.java | 2 +- .../greptime/limit/InFlightLimiterTest.java | 46 +++++++++++++++++ .../{UtilTest.java => ValueUtilTest.java} | 49 +++++++++++++------ 7 files changed, 100 insertions(+), 47 deletions(-) rename ingester-protocol/src/main/java/io/greptime/models/{Util.java => ValueUtil.java} (99%) create mode 100644 ingester-protocol/src/test/java/io/greptime/limit/InFlightLimiterTest.java rename ingester-protocol/src/test/java/io/greptime/models/{UtilTest.java => ValueUtilTest.java} (57%) diff --git a/ingester-protocol/src/main/java/io/greptime/Status.java b/ingester-protocol/src/main/java/io/greptime/Status.java index 141902a..702a0a9 100644 --- a/ingester-protocol/src/main/java/io/greptime/Status.java +++ b/ingester-protocol/src/main/java/io/greptime/Status.java @@ -62,7 +62,10 @@ public enum Status { // TableColumnNotFound(4002), // TableColumnExists(4003), // - DatabaseNotFound(4004), RegionNotFound(4005), RegionAlreadyExists(4006), RegionReadonly(4007), + DatabaseNotFound(4004), // + RegionNotFound(4005), // + RegionAlreadyExists(4006), // + RegionReadonly(4007), // ====== End of catalog related status code ======= // ====== Begin of storage related status code ===== @@ -125,7 +128,6 @@ public int getStatusCode() { /** * Returns {@code true} if the status code is {@link #Success}. */ - @SuppressWarnings("unused") public static boolean isSuccess(int statusCode) { return statusCode == Success.getStatusCode(); } diff --git a/ingester-protocol/src/main/java/io/greptime/models/IntervalMonthDayNano.java b/ingester-protocol/src/main/java/io/greptime/models/IntervalMonthDayNano.java index 4c84eac..69e09d9 100644 --- a/ingester-protocol/src/main/java/io/greptime/models/IntervalMonthDayNano.java +++ b/ingester-protocol/src/main/java/io/greptime/models/IntervalMonthDayNano.java @@ -32,18 +32,6 @@ public IntervalMonthDayNano(int months, int days, long nanoseconds) { this.nanoseconds = nanoseconds; } - public int getMonths() { - return months; - } - - public int getDays() { - return days; - } - - public long getNanoseconds() { - return nanoseconds; - } - @Override public Common.IntervalMonthDayNano into() { return Common.IntervalMonthDayNano.newBuilder() // diff --git a/ingester-protocol/src/main/java/io/greptime/models/RowHelper.java b/ingester-protocol/src/main/java/io/greptime/models/RowHelper.java index 9aaf9ff..046a025 100644 --- a/ingester-protocol/src/main/java/io/greptime/models/RowHelper.java +++ b/ingester-protocol/src/main/java/io/greptime/models/RowHelper.java @@ -47,7 +47,7 @@ public static void addValue(RowData.Row.Builder builder, // valueBuilder.setI32Value((int) value); break; case INT64: - valueBuilder.setI64Value(Util.getLongValue(value)); + valueBuilder.setI64Value(ValueUtil.getLongValue(value)); break; case UINT8: valueBuilder.setU8Value((int) value); @@ -59,7 +59,7 @@ public static void addValue(RowData.Row.Builder builder, // valueBuilder.setU32Value((int) value); break; case UINT64: - valueBuilder.setU64Value(Util.getLongValue(value)); + valueBuilder.setU64Value(ValueUtil.getLongValue(value)); break; case FLOAT32: valueBuilder.setF32Value(((Number) value).floatValue()); @@ -77,46 +77,46 @@ public static void addValue(RowData.Row.Builder builder, // valueBuilder.setStringValue((String) value); break; case DATE: - valueBuilder.setDateValue(Util.getDateValue(value)); + valueBuilder.setDateValue(ValueUtil.getDateValue(value)); break; case DATETIME: - valueBuilder.setDatetimeValue(Util.getDateTimeValue(value)); + valueBuilder.setDatetimeValue(ValueUtil.getDateTimeValue(value)); break; case TIMESTAMP_SECOND: - valueBuilder.setTimestampSecondValue(Util.getLongValue(value)); + valueBuilder.setTimestampSecondValue(ValueUtil.getLongValue(value)); break; case TIMESTAMP_MILLISECOND: - valueBuilder.setTimestampMillisecondValue(Util.getLongValue(value)); + valueBuilder.setTimestampMillisecondValue(ValueUtil.getLongValue(value)); break; case TIMESTAMP_MICROSECOND: - valueBuilder.setTimestampMicrosecondValue(Util.getLongValue(value)); + valueBuilder.setTimestampMicrosecondValue(ValueUtil.getLongValue(value)); break; case TIMESTAMP_NANOSECOND: - valueBuilder.setTimestampNanosecondValue(Util.getLongValue(value)); + valueBuilder.setTimestampNanosecondValue(ValueUtil.getLongValue(value)); break; case TIME_SECOND: - valueBuilder.setTimeSecondValue(Util.getLongValue(value)); + valueBuilder.setTimeSecondValue(ValueUtil.getLongValue(value)); break; case TIME_MILLISECOND: - valueBuilder.setTimeMillisecondValue(Util.getLongValue(value)); + valueBuilder.setTimeMillisecondValue(ValueUtil.getLongValue(value)); break; case TIME_MICROSECOND: - valueBuilder.setTimeMicrosecondValue(Util.getLongValue(value)); + valueBuilder.setTimeMicrosecondValue(ValueUtil.getLongValue(value)); break; case TIME_NANOSECOND: - valueBuilder.setTimeNanosecondValue(Util.getLongValue(value)); + valueBuilder.setTimeNanosecondValue(ValueUtil.getLongValue(value)); break; case INTERVAL_YEAR_MONTH: valueBuilder.setIntervalYearMonthValue((int) value); break; case INTERVAL_DAY_TIME: - valueBuilder.setIntervalDayTimeValue(Util.getLongValue(value)); + valueBuilder.setIntervalDayTimeValue(ValueUtil.getLongValue(value)); break; case INTERVAL_MONTH_DAY_NANO: - valueBuilder.setIntervalMonthDayNanoValue(Util.getIntervalMonthDayNanoValue(value)); + valueBuilder.setIntervalMonthDayNanoValue(ValueUtil.getIntervalMonthDayNanoValue(value)); break; case DECIMAL128: - valueBuilder.setDecimal128Value(Util.getDecimal128Value(dataTypeExtension, value)); + valueBuilder.setDecimal128Value(ValueUtil.getDecimal128Value(dataTypeExtension, value)); break; default: throw new IllegalArgumentException(String.format("Unsupported `data_type`: %s", dataType)); diff --git a/ingester-protocol/src/main/java/io/greptime/models/Util.java b/ingester-protocol/src/main/java/io/greptime/models/ValueUtil.java similarity index 99% rename from ingester-protocol/src/main/java/io/greptime/models/Util.java rename to ingester-protocol/src/main/java/io/greptime/models/ValueUtil.java index de801f5..20d7db0 100644 --- a/ingester-protocol/src/main/java/io/greptime/models/Util.java +++ b/ingester-protocol/src/main/java/io/greptime/models/ValueUtil.java @@ -27,7 +27,7 @@ /** * @author jiachun.fjc */ -public class Util { +public class ValueUtil { static int ONE_DAY_IN_SECONDS = 86400; diff --git a/ingester-protocol/src/main/java/io/greptime/signal/MetricsSignalHandler.java b/ingester-protocol/src/main/java/io/greptime/signal/MetricsSignalHandler.java index 63bec4f..0c96685 100644 --- a/ingester-protocol/src/main/java/io/greptime/signal/MetricsSignalHandler.java +++ b/ingester-protocol/src/main/java/io/greptime/signal/MetricsSignalHandler.java @@ -50,7 +50,7 @@ public void handle(String signalName) { try { File file = FileOutputHelper.getOutputFile(BASE_NAME); - LOG.info("Printing GreptimeDB client metrics triggered by signal: {} to file: {}.", signalName, + LOG.info("Printing GreptimeDB clients metrics triggered by signal: {} to file: {}.", signalName, file.getAbsoluteFile()); try (PrintStream out = new PrintStream(new FileOutputStream(file, true))) { diff --git a/ingester-protocol/src/test/java/io/greptime/limit/InFlightLimiterTest.java b/ingester-protocol/src/test/java/io/greptime/limit/InFlightLimiterTest.java new file mode 100644 index 0000000..edace9d --- /dev/null +++ b/ingester-protocol/src/test/java/io/greptime/limit/InFlightLimiterTest.java @@ -0,0 +1,46 @@ +/* + * Copyright 2023 Greptime Team + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.greptime.limit; + +import org.junit.Assert; +import org.junit.Test; + +/** + * @author jiachun.fjc + */ +public class InFlightLimiterTest { + + @Test + public void testAcquire() { + InFlightLimiter limiter = new InFlightLimiter(10, "test"); + Assert.assertEquals(10, limiter.maxPermits()); + limiter.acquire(1); + Assert.assertEquals(9, limiter.availablePermits()); + limiter.release(1); + Assert.assertEquals(10, limiter.availablePermits()); + } + + @Test + public void testTryAcquire() { + InFlightLimiter limiter = new InFlightLimiter(10, "test"); + Assert.assertEquals(10, limiter.maxPermits()); + Assert.assertTrue(limiter.tryAcquire(1)); + Assert.assertEquals(9, limiter.availablePermits()); + limiter.release(1); + Assert.assertEquals(10, limiter.availablePermits()); + Assert.assertFalse(limiter.tryAcquire(11)); + } +} diff --git a/ingester-protocol/src/test/java/io/greptime/models/UtilTest.java b/ingester-protocol/src/test/java/io/greptime/models/ValueUtilTest.java similarity index 57% rename from ingester-protocol/src/test/java/io/greptime/models/UtilTest.java rename to ingester-protocol/src/test/java/io/greptime/models/ValueUtilTest.java index 90873e5..af11761 100644 --- a/ingester-protocol/src/test/java/io/greptime/models/UtilTest.java +++ b/ingester-protocol/src/test/java/io/greptime/models/ValueUtilTest.java @@ -30,16 +30,16 @@ /** * @author jiachun.fjc */ -public class UtilTest { +public class ValueUtilTest { @Test public void testGetLongValue() { - Assert.assertEquals(1L, Util.getLongValue(1)); - Assert.assertEquals(1L, Util.getLongValue(1L)); - Assert.assertEquals(1L, Util.getLongValue(1.0)); - Assert.assertEquals(1L, Util.getLongValue(1.0f)); - Assert.assertEquals(1L, Util.getLongValue(BigInteger.valueOf(1))); - Assert.assertEquals(1L, Util.getLongValue(BigDecimal.valueOf(1))); + Assert.assertEquals(1L, ValueUtil.getLongValue(1)); + Assert.assertEquals(1L, ValueUtil.getLongValue(1L)); + Assert.assertEquals(1L, ValueUtil.getLongValue(1.0)); + Assert.assertEquals(1L, ValueUtil.getLongValue(1.0f)); + Assert.assertEquals(1L, ValueUtil.getLongValue(BigInteger.valueOf(1))); + Assert.assertEquals(1L, ValueUtil.getLongValue(BigDecimal.valueOf(1))); } @Test @@ -48,10 +48,10 @@ public void testGetDateValue() { TimeZone gmtTimeZone = TimeZone.getTimeZone("GMT"); cal.setTimeZone(gmtTimeZone); cal.set(1970, Calendar.JANUARY, 2); - Assert.assertEquals(1, Util.getDateValue(cal.getTime())); - Assert.assertEquals(1, Util.getDateValue(Instant.ofEpochSecond(86400))); - Assert.assertEquals(1, Util.getDateValue(LocalDate.ofEpochDay(1))); - Assert.assertEquals(1, Util.getDateValue(1)); + Assert.assertEquals(1, ValueUtil.getDateValue(cal.getTime())); + Assert.assertEquals(1, ValueUtil.getDateValue(Instant.ofEpochSecond(86400))); + Assert.assertEquals(1, ValueUtil.getDateValue(LocalDate.ofEpochDay(1))); + Assert.assertEquals(1, ValueUtil.getDateValue(1)); } @Test @@ -61,10 +61,27 @@ public void testGetDateTimeValue() { cal.setTimeZone(gmtTimeZone); cal.set(1970, Calendar.JANUARY, 2, 0, 0, 0); cal.set(Calendar.MILLISECOND, 111); - Assert.assertEquals(86400111, Util.getDateTimeValue(cal.getTime())); - Assert.assertEquals(86400111, Util.getDateTimeValue(cal.getTime().toInstant())); - Assert.assertEquals(86400000, Util.getDateTimeValue(Instant.ofEpochSecond(86400))); - Assert.assertEquals(86400, Util.getDateTimeValue(86400)); + Assert.assertEquals(86400111, ValueUtil.getDateTimeValue(cal.getTime())); + Assert.assertEquals(86400111, ValueUtil.getDateTimeValue(cal.getTime().toInstant())); + Assert.assertEquals(86400000, ValueUtil.getDateTimeValue(Instant.ofEpochSecond(86400))); + Assert.assertEquals(86400, ValueUtil.getDateTimeValue(86400)); + } + + @Test + public void testGetIntervalMonthDayNanoValue() { + Common.IntervalMonthDayNano result = ValueUtil.getIntervalMonthDayNanoValue(new IntervalMonthDayNano(1, 2, 3)); + Assert.assertEquals(1, result.getMonths()); + Assert.assertEquals(2, result.getDays()); + Assert.assertEquals(3, result.getNanoseconds()); + + // test invalid type + try { + ValueUtil.getIntervalMonthDayNanoValue(1); + Assert.fail(); + } catch (IllegalArgumentException e) { + Assert.assertEquals("Expected type: `IntervalMonthDayNano`, actual: class java.lang.Integer", + e.getMessage()); + } } @Test @@ -84,7 +101,7 @@ public void testGetDecimal128Value() { BigInteger bigInt = BigInteger.valueOf(new Random().nextLong()).shiftLeft(64); bigInt = bigInt.add(BigInteger.valueOf(new Random().nextLong())); BigDecimal value = new BigDecimal(bigInt, scale); - Common.Decimal128 result = Util.getDecimal128Value(dataTypeExtension, value); + Common.Decimal128 result = ValueUtil.getDecimal128Value(dataTypeExtension, value); BigDecimal value2 = TestUtil.getDecimal(result, scale);