From 58d661b178b365706bd63ab61f7fa49f2914a9b2 Mon Sep 17 00:00:00 2001 From: jeremyhi Date: Mon, 8 Jan 2024 12:02:26 +0800 Subject: [PATCH 01/10] chore: refactor GreptimeOptions --- .../src/main/java/io/greptime/GreptimeDB.java | 26 ++--- .../src/main/java/io/greptime/Router.java | 20 +++- .../main/java/io/greptime/RouterClient.java | 37 ++++--- .../main/java/io/greptime/WriteClient.java | 2 + .../io/greptime/options/GreptimeOptions.java | 46 ++++----- .../io/greptime/options/RouterOptions.java | 11 +++ .../io/greptime/options/WriteOptions.java | 2 +- .../greptime/options/GreptimeOptionsTest.java | 97 +++++++++++++++++++ 8 files changed, 179 insertions(+), 62 deletions(-) create mode 100644 ingester-protocol/src/test/java/io/greptime/options/GreptimeOptionsTest.java diff --git a/ingester-protocol/src/main/java/io/greptime/GreptimeDB.java b/ingester-protocol/src/main/java/io/greptime/GreptimeDB.java index 41b2f1a..c68a579 100644 --- a/ingester-protocol/src/main/java/io/greptime/GreptimeDB.java +++ b/ingester-protocol/src/main/java/io/greptime/GreptimeDB.java @@ -42,7 +42,6 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -70,7 +69,6 @@ public class GreptimeDB implements Write, WritePOJO, Lifecycle, private GreptimeOptions opts; private RouterClient routerClient; private WriteClient writeClient; - private Executor asyncPool; /** * Returns all instances of {@link GreptimeDB}. @@ -100,10 +98,7 @@ public boolean init(GreptimeOptions opts) { this.opts = GreptimeOptions.checkSelf(opts).copy(); this.routerClient = makeRouteClient(opts); - if (this.asyncPool != null) { - this.asyncPool = new MetricExecutor(this.asyncPool, "async_pool.time"); - } - this.writeClient = makeWriteClient(opts, this.routerClient, this.asyncPool); + this.writeClient = makeWriteClient(opts, this.routerClient); INSTANCES.put(this.id, this); @@ -184,8 +179,10 @@ public void display(Printer out) { .println(VERSION) // .print("endpoints=") // .println(this.opts.getEndpoints()) // - .print("userAsyncPool=") // - .println(this.opts.getAsyncPool()); + .print("database=") // + .println(this.opts.getDatabase()) // + .print("rpcOptions=") // + .println(this.opts.getRpcOptions()); if (this.routerClient != null) { out.println(""); @@ -208,7 +205,6 @@ public String toString() { ", opts=" + opts + // ", routerClient=" + routerClient + // ", writeClient=" + writeClient + // - ", asyncPool=" + asyncPool + // '}'; } @@ -222,7 +218,7 @@ private static RpcClient makeRpcClient(GreptimeOptions opts) { RpcOptions rpcOpts = opts.getRpcOptions(); RpcClient rpcClient = RpcFactoryProvider.getRpcFactory().createRpcClient(); if (!rpcClient.init(rpcOpts)) { - throw new IllegalStateException("Fail to start RPC client"); + throw new IllegalStateException("Start RPC client failed"); } rpcClient.registerConnectionObserver(new RpcConnectionObserver()); return rpcClient; @@ -233,21 +229,17 @@ private static RouterClient makeRouteClient(GreptimeOptions opts) { routerOpts.setRpcClient(makeRpcClient(opts)); RouterClient routerClient = new RouterClient(); if (!routerClient.init(routerOpts)) { - throw new IllegalStateException("Fail to start router client"); + throw new IllegalStateException("Start router client failed"); } return routerClient; } - private static WriteClient makeWriteClient(GreptimeOptions opts, RouterClient routerClient, Executor asyncPool) { + private static WriteClient makeWriteClient(GreptimeOptions opts, RouterClient routerClient) { WriteOptions writeOpts = opts.getWriteOptions(); writeOpts.setRouterClient(routerClient); - writeOpts.setAsyncPool(asyncPool); WriteClient writeClient = new WriteClient(); - if (opts.getAuthInfo() != null) { - writeOpts.setAuthInfo(opts.getAuthInfo()); - } if (!writeClient.init(writeOpts)) { - throw new IllegalStateException("Fail to start write client"); + throw new IllegalStateException("Start write client failed"); } return writeClient; } diff --git a/ingester-protocol/src/main/java/io/greptime/Router.java b/ingester-protocol/src/main/java/io/greptime/Router.java index 928ef84..8d13315 100644 --- a/ingester-protocol/src/main/java/io/greptime/Router.java +++ b/ingester-protocol/src/main/java/io/greptime/Router.java @@ -15,6 +15,7 @@ */ package io.greptime; +import java.util.List; import java.util.concurrent.CompletableFuture; /** @@ -22,7 +23,7 @@ * * @author jiachun.fjc */ -public interface Router { +public interface Router { /** * For a given request return the routing decision for the call. @@ -30,5 +31,20 @@ public interface Router { * @param request route request * @return a endpoint for the call */ - CompletableFuture routeFor(Req request); + CompletableFuture routeFor(R request); + + /** + * Refresh the routing table from remote server. + * @return a future that will be completed when the refresh is done + */ + CompletableFuture refresh(); + + /** + * Refresh the routing table. + * We need to get all the endpoints, and this method will overwrite all + * current endpoints. + * + * @param endpoints all new endpoints + */ + void onRefresh(List endpoints); } diff --git a/ingester-protocol/src/main/java/io/greptime/RouterClient.java b/ingester-protocol/src/main/java/io/greptime/RouterClient.java index 5836127..d44bb92 100644 --- a/ingester-protocol/src/main/java/io/greptime/RouterClient.java +++ b/ingester-protocol/src/main/java/io/greptime/RouterClient.java @@ -49,7 +49,7 @@ public class RouterClient implements Lifecycle, Display { private ScheduledExecutorService refresher; private RouterOptions opts; private RpcClient rpcClient; - private InnerRouter inner; + private Router router; @Override public boolean init(RouterOptions opts) { @@ -58,14 +58,20 @@ public boolean init(RouterOptions opts) { List endpoints = Ensures.ensureNonNull(this.opts.getEndpoints(), "null `endpoints`"); - this.inner = new InnerRouter(); - this.inner.refreshLocal(endpoints); + this.router = new DefaultRouter(); + this.router.onRefresh(endpoints); long refreshPeriod = this.opts.getRefreshPeriodSeconds(); if (refreshPeriod > 0) { this.refresher = REFRESHER_POOL.getObject(); this.refresher.scheduleWithFixedDelay( - () -> this.inner.refreshFromRemote(), + () -> this.router.refresh().whenComplete((r, e) -> { + if (e != null) { + LOG.error("Router cache refresh failed.", e); + } else { + LOG.debug("Router cache refresh {}.", r ? "success" : "failed"); + } + }), Util.randomInitialDelay(180), refreshPeriod, TimeUnit.SECONDS); LOG.info("Router cache refresher started."); @@ -90,7 +96,7 @@ public void shutdownGracefully() { * Get the current routing table. */ public CompletableFuture route() { - return this.inner.routeFor(null); + return this.router.routeFor(null); } /** @@ -211,18 +217,10 @@ public String toString() { * the client send request using a rr or random policy, and frontend server needs to * be able to return the member list for the purpose of frontend server members change. */ - private static class InnerRouter implements Router { + private static class DefaultRouter implements Router { private final AtomicReference> endpointsRef = new AtomicReference<>(); - public void refreshFromRemote() { - // TODO - } - - void refreshLocal(List input) { - this.endpointsRef.set(input); - } - @Override public CompletableFuture routeFor(Void request) { List endpoints = this.endpointsRef.get(); @@ -230,5 +228,16 @@ public CompletableFuture routeFor(Void request) { int i = random.nextInt(0, endpoints.size()); return Util.completedCf(endpoints.get(i)); } + + @Override + public CompletableFuture refresh() { + // always return true + return Util.completedCf(true); + } + + @Override + public void onRefresh(List endpoints) { + this.endpointsRef.set(endpoints); + } } } diff --git a/ingester-protocol/src/main/java/io/greptime/WriteClient.java b/ingester-protocol/src/main/java/io/greptime/WriteClient.java index bc4e73d..83732b2 100644 --- a/ingester-protocol/src/main/java/io/greptime/WriteClient.java +++ b/ingester-protocol/src/main/java/io/greptime/WriteClient.java @@ -24,6 +24,7 @@ import io.greptime.common.Lifecycle; import io.greptime.common.util.Clock; import io.greptime.common.util.Ensures; +import io.greptime.common.util.MetricExecutor; import io.greptime.common.util.MetricsUtil; import io.greptime.common.util.SerializingExecutor; import io.greptime.errors.LimitedException; @@ -69,6 +70,7 @@ public boolean init(WriteOptions opts) { this.routerClient = this.opts.getRouterClient(); Executor pool = this.opts.getAsyncPool(); this.asyncPool = pool != null ? pool : new SerializingExecutor("write_client"); + this.asyncPool = new MetricExecutor(this.asyncPool, "async_write_pool.time"); this.writeLimiter = new DefaultWriteLimiter(this.opts.getMaxInFlightWriteRows(), this.opts.getLimitedPolicy()); return true; } diff --git a/ingester-protocol/src/main/java/io/greptime/options/GreptimeOptions.java b/ingester-protocol/src/main/java/io/greptime/options/GreptimeOptions.java index 739debf..4200dd1 100644 --- a/ingester-protocol/src/main/java/io/greptime/options/GreptimeOptions.java +++ b/ingester-protocol/src/main/java/io/greptime/options/GreptimeOptions.java @@ -15,6 +15,7 @@ */ package io.greptime.options; +import io.greptime.Router; import io.greptime.common.Copiable; import io.greptime.common.Endpoint; import io.greptime.common.util.Ensures; @@ -35,12 +36,10 @@ */ public class GreptimeOptions implements Copiable { private List endpoints; - private Executor asyncPool; private RpcOptions rpcOptions; private RouterOptions routerOptions; private WriteOptions writeOptions; private String database; - private AuthInfo authInfo; public List getEndpoints() { return endpoints; @@ -50,14 +49,6 @@ public void setEndpoints(List endpoints) { this.endpoints = endpoints; } - public Executor getAsyncPool() { - return asyncPool; - } - - public void setAsyncPool(Executor asyncPool) { - this.asyncPool = asyncPool; - } - public RpcOptions getRpcOptions() { return rpcOptions; } @@ -90,21 +81,11 @@ public void setDatabase(String database) { this.database = database; } - public AuthInfo getAuthInfo() { - return authInfo; - } - - public void setAuthInfo(AuthInfo authInfo) { - this.authInfo = authInfo; - } - @Override public GreptimeOptions copy() { GreptimeOptions opts = new GreptimeOptions(); opts.endpoints = new ArrayList<>(this.endpoints); - opts.asyncPool = this.asyncPool; opts.database = this.database; - opts.authInfo = this.authInfo; if (this.rpcOptions != null) { opts.rpcOptions = this.rpcOptions.copy(); } @@ -121,12 +102,10 @@ public GreptimeOptions copy() { public String toString() { return "GreptimeOptions{" + // "endpoints=" + endpoints + // - ", asyncPool=" + asyncPool + // ", rpcOptions=" + rpcOptions + // ", routerOptions=" + routerOptions + // ", writeOptions=" + writeOptions + // ", database='" + database + '\'' + // - ", authInfo=" + authInfo + // '}'; } @@ -174,6 +153,8 @@ public static final class Builder { private long routeTableRefreshPeriodSeconds = -1; // Authentication information private AuthInfo authInfo; + // The request router + private Router router; public Builder(List endpoints, String database) { this.endpoints.addAll(endpoints); @@ -273,6 +254,16 @@ public Builder authInfo(AuthInfo authInfo) { return this; } + /** + * Sets the request router. + * @param router the request router + * @return this builder + */ + public Builder router(Router router) { + this.router = router; + return this; + } + /** * A good start, happy coding. * @@ -281,23 +272,22 @@ public Builder authInfo(AuthInfo authInfo) { public GreptimeOptions build() { GreptimeOptions opts = new GreptimeOptions(); opts.setEndpoints(this.endpoints); - opts.setAsyncPool(this.asyncPool); opts.setRpcOptions(this.rpcOptions); opts.setDatabase(this.database); - opts.setAuthInfo(this.authInfo); - opts.setRouterOptions(createRouterOptions()); - opts.setWriteOptions(createWriteOptions()); + opts.setRouterOptions(routerOptions()); + opts.setWriteOptions(writeOptions()); return GreptimeOptions.checkSelf(opts); } - private RouterOptions createRouterOptions() { + private RouterOptions routerOptions() { RouterOptions routerOpts = new RouterOptions(); routerOpts.setEndpoints(this.endpoints); + routerOpts.setRouter(this.router); routerOpts.setRefreshPeriodSeconds(this.routeTableRefreshPeriodSeconds); return routerOpts; } - private WriteOptions createWriteOptions() { + private WriteOptions writeOptions() { WriteOptions writeOpts = new WriteOptions(); writeOpts.setDatabase(this.database); writeOpts.setAuthInfo(this.authInfo); diff --git a/ingester-protocol/src/main/java/io/greptime/options/RouterOptions.java b/ingester-protocol/src/main/java/io/greptime/options/RouterOptions.java index e978b7e..3caab4f 100644 --- a/ingester-protocol/src/main/java/io/greptime/options/RouterOptions.java +++ b/ingester-protocol/src/main/java/io/greptime/options/RouterOptions.java @@ -15,6 +15,7 @@ */ package io.greptime.options; +import io.greptime.Router; import io.greptime.common.Copiable; import io.greptime.common.Endpoint; import io.greptime.rpc.RpcClient; @@ -34,6 +35,7 @@ public class RouterOptions implements Copiable { // all route tables periodically. If the value is less than or // equal to 0, the route tables will not be refreshed. private long refreshPeriodSeconds = -1; + private Router router; public RpcClient getRpcClient() { return rpcClient; @@ -59,12 +61,21 @@ public void setRefreshPeriodSeconds(long refreshPeriodSeconds) { this.refreshPeriodSeconds = refreshPeriodSeconds; } + public Router getRouter() { + return router; + } + + public void setRouter(Router router) { + this.router = router; + } + @Override public RouterOptions copy() { RouterOptions opts = new RouterOptions(); opts.rpcClient = rpcClient; opts.endpoints = this.endpoints; opts.refreshPeriodSeconds = this.refreshPeriodSeconds; + opts.router = this.router; return opts; } diff --git a/ingester-protocol/src/main/java/io/greptime/options/WriteOptions.java b/ingester-protocol/src/main/java/io/greptime/options/WriteOptions.java index 5624dbb..cdfd7de 100644 --- a/ingester-protocol/src/main/java/io/greptime/options/WriteOptions.java +++ b/ingester-protocol/src/main/java/io/greptime/options/WriteOptions.java @@ -118,9 +118,9 @@ public WriteOptions copy() { @Override public String toString() { + // Do not print auto info return "WriteOptions{" + // "database='" + database + '\'' + // - ", authInfo=" + authInfo + // ", routerClient=" + routerClient + // ", asyncPool=" + asyncPool + // ", maxRetries=" + maxRetries + // diff --git a/ingester-protocol/src/test/java/io/greptime/options/GreptimeOptionsTest.java b/ingester-protocol/src/test/java/io/greptime/options/GreptimeOptionsTest.java new file mode 100644 index 0000000..9644cbe --- /dev/null +++ b/ingester-protocol/src/test/java/io/greptime/options/GreptimeOptionsTest.java @@ -0,0 +1,97 @@ +/* + * Copyright 2023 Greptime Team + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.greptime.options; + +import io.greptime.Router; +import io.greptime.limit.LimitedPolicy; +import io.greptime.models.AuthInfo; +import io.greptime.rpc.RpcOptions; +import org.junit.Assert; +import org.junit.Test; +import io.greptime.common.Endpoint; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +/** + * @author jiachun.fjc + */ +public class GreptimeOptionsTest { + + @Test + public void testAllOptions() { + String database = "public"; + String[] endpoints = {"127.0.0.1:4001"}; + Executor asyncPool = command -> System.out.println("asyncPool"); + RpcOptions rpcOptions = RpcOptions.newDefault(); + int writeMaxRetries = 2; + int maxInFlightWriteRows = 999; + LimitedPolicy limitedPolicy = new LimitedPolicy.DiscardPolicy(); + int defaultStreamMaxWritePointsPerSecond = 100000; + long routeTableRefreshPeriodSeconds = 99; + AuthInfo authInfo = new AuthInfo("user", "password"); + Router router = createTestRouter(); + + GreptimeOptions opts = GreptimeOptions.newBuilder(endpoints, database) // + .asyncPool(asyncPool) // + .rpcOptions(rpcOptions) // + .writeMaxRetries(writeMaxRetries) // + .maxInFlightWriteRows(maxInFlightWriteRows) // + .writeLimitedPolicy(limitedPolicy) // + .defaultStreamMaxWritePointsPerSecond(defaultStreamMaxWritePointsPerSecond) // + .routeTableRefreshPeriodSeconds(routeTableRefreshPeriodSeconds) // + .authInfo(authInfo) // + .router(router) // + .build(); + + Assert.assertEquals(database, opts.getDatabase()); + Assert.assertArrayEquals(endpoints, opts.getEndpoints().stream().map(Endpoint::toString).toArray()); + Assert.assertEquals(rpcOptions, opts.getRpcOptions()); + + RouterOptions routerOptions = opts.getRouterOptions(); + Assert.assertNotNull(routerOptions); + Assert.assertArrayEquals(endpoints, routerOptions.getEndpoints().stream().map(Endpoint::toString).toArray()); + Assert.assertEquals(router, routerOptions.getRouter()); + Assert.assertEquals(routeTableRefreshPeriodSeconds, routerOptions.getRefreshPeriodSeconds()); + + WriteOptions writeOptions = opts.getWriteOptions(); + Assert.assertNotNull(writeOptions); + Assert.assertEquals(asyncPool, writeOptions.getAsyncPool()); + Assert.assertEquals(writeMaxRetries, writeOptions.getMaxRetries()); + Assert.assertEquals(maxInFlightWriteRows, writeOptions.getMaxInFlightWriteRows()); + Assert.assertEquals(limitedPolicy, writeOptions.getLimitedPolicy()); + Assert.assertEquals(defaultStreamMaxWritePointsPerSecond, writeOptions.getDefaultStreamMaxWritePointsPerSecond()); + Assert.assertEquals(authInfo, writeOptions.getAuthInfo()); + } + + private Router createTestRouter() { + return new Router() { + + @Override + public CompletableFuture routeFor(Void request) { + return null; + } + + @Override + public CompletableFuture refresh() { + return null; + } + + @Override + public void onRefresh(List endpoints) {} + }; + } +} From 7ba59725f4b8cea8fa8bcc2ac7f0506b854d338e Mon Sep 17 00:00:00 2001 From: jeremyhi Date: Mon, 8 Jan 2024 12:04:52 +0800 Subject: [PATCH 02/10] chore: v0.5.0 --- ingester-all/pom.xml | 2 +- ingester-common/pom.xml | 2 +- ingester-example/pom.xml | 2 +- ingester-grpc/pom.xml | 2 +- ingester-protocol/pom.xml | 2 +- ingester-protocol/src/main/resources/client_version.properties | 2 +- ingester-protocol/src/test/java/io/greptime/UtilTest.java | 2 +- ingester-rpc/pom.xml | 2 +- pom.xml | 2 +- 9 files changed, 9 insertions(+), 9 deletions(-) diff --git a/ingester-all/pom.xml b/ingester-all/pom.xml index 8d36bed..61cf624 100644 --- a/ingester-all/pom.xml +++ b/ingester-all/pom.xml @@ -4,7 +4,7 @@ greptimedb-ingester io.greptime - 0.4.0 + 0.5.0 ingester-all diff --git a/ingester-common/pom.xml b/ingester-common/pom.xml index 430654d..de32dfe 100644 --- a/ingester-common/pom.xml +++ b/ingester-common/pom.xml @@ -4,7 +4,7 @@ greptimedb-ingester io.greptime - 0.4.0 + 0.5.0 ingester-common diff --git a/ingester-example/pom.xml b/ingester-example/pom.xml index 0454d9d..7ec06e0 100644 --- a/ingester-example/pom.xml +++ b/ingester-example/pom.xml @@ -4,7 +4,7 @@ greptimedb-ingester io.greptime - 0.4.0 + 0.5.0 ingester-example diff --git a/ingester-grpc/pom.xml b/ingester-grpc/pom.xml index fc55419..05198f0 100644 --- a/ingester-grpc/pom.xml +++ b/ingester-grpc/pom.xml @@ -4,7 +4,7 @@ greptimedb-ingester io.greptime - 0.4.0 + 0.5.0 ingester-grpc diff --git a/ingester-protocol/pom.xml b/ingester-protocol/pom.xml index f3c0ae6..e46555b 100644 --- a/ingester-protocol/pom.xml +++ b/ingester-protocol/pom.xml @@ -4,7 +4,7 @@ greptimedb-ingester io.greptime - 0.4.0 + 0.5.0 ingester-protocol diff --git a/ingester-protocol/src/main/resources/client_version.properties b/ingester-protocol/src/main/resources/client_version.properties index 2777b47..269f518 100644 --- a/ingester-protocol/src/main/resources/client_version.properties +++ b/ingester-protocol/src/main/resources/client_version.properties @@ -1 +1 @@ -client.version=0.4.0 +client.version=0.5.0 diff --git a/ingester-protocol/src/test/java/io/greptime/UtilTest.java b/ingester-protocol/src/test/java/io/greptime/UtilTest.java index ce6f6a3..3d58056 100644 --- a/ingester-protocol/src/test/java/io/greptime/UtilTest.java +++ b/ingester-protocol/src/test/java/io/greptime/UtilTest.java @@ -26,6 +26,6 @@ public class UtilTest { @Test public void testClientVersion() { String ver = Util.clientVersion(); - Assert.assertEquals("0.4.0", ver); + Assert.assertEquals("0.5.0", ver); } } diff --git a/ingester-rpc/pom.xml b/ingester-rpc/pom.xml index cb4f191..57d7411 100644 --- a/ingester-rpc/pom.xml +++ b/ingester-rpc/pom.xml @@ -4,7 +4,7 @@ greptimedb-ingester io.greptime - 0.4.0 + 0.5.0 ingester-rpc diff --git a/pom.xml b/pom.xml index 9571137..6ce821a 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ io.greptime greptimedb-ingester - 0.4.0 + 0.5.0 pom ${project.groupId}:${project.artifactId} From 736dc11327b1eef009b2394f335d588ebea93e65 Mon Sep 17 00:00:00 2001 From: jeremyhi Date: Mon, 8 Jan 2024 12:11:52 +0800 Subject: [PATCH 03/10] chore: require database name on start --- .../src/main/java/io/greptime/StreamWritePOJOsQuickStart.java | 4 ++-- .../src/main/java/io/greptime/StreamWriteQuickStart.java | 4 ++-- .../src/main/java/io/greptime/WritePOJOsQuickStart.java | 4 ++-- .../src/main/java/io/greptime/WriteQuickStart.java | 4 ++-- .../src/main/java/io/greptime/options/GreptimeOptions.java | 3 +++ .../test/java/io/greptime/options/GreptimeOptionsTest.java | 2 +- 6 files changed, 12 insertions(+), 9 deletions(-) diff --git a/ingester-example/src/main/java/io/greptime/StreamWritePOJOsQuickStart.java b/ingester-example/src/main/java/io/greptime/StreamWritePOJOsQuickStart.java index ee743e1..3eb4c74 100644 --- a/ingester-example/src/main/java/io/greptime/StreamWritePOJOsQuickStart.java +++ b/ingester-example/src/main/java/io/greptime/StreamWritePOJOsQuickStart.java @@ -34,8 +34,8 @@ public class StreamWritePOJOsQuickStart { private static final Logger LOG = LoggerFactory.getLogger(StreamWritePOJOsQuickStart.class); public static void main(String[] args) throws ExecutionException, InterruptedException { - // GreptimeDB has a default database named "public", we can use it as the test database - String database = "public"; + // GreptimeDB has a default database named "greptime.public", we can use it as the test database + String database = "greptime.public"; // By default, GreptimeDB listens on port 4001 using the gRPC protocol. // We can provide multiple endpoints that point to the same GreptimeDB cluster. // The client will make calls to these endpoints based on a load balancing strategy. diff --git a/ingester-example/src/main/java/io/greptime/StreamWriteQuickStart.java b/ingester-example/src/main/java/io/greptime/StreamWriteQuickStart.java index 38c5b05..4db5efe 100644 --- a/ingester-example/src/main/java/io/greptime/StreamWriteQuickStart.java +++ b/ingester-example/src/main/java/io/greptime/StreamWriteQuickStart.java @@ -37,8 +37,8 @@ public class StreamWriteQuickStart { private static final Logger LOG = LoggerFactory.getLogger(StreamWriteQuickStart.class); public static void main(String[] args) throws ExecutionException, InterruptedException { - // GreptimeDB has a default database named "public", we can use it as the test database - String database = "public"; + // GreptimeDB has a default database named "greptime.public", we can use it as the test database + String database = "greptime.public"; // By default, GreptimeDB listens on port 4001 using the gRPC protocol. // We can provide multiple endpoints that point to the same GreptimeDB cluster. // The client will make calls to these endpoints based on a load balancing strategy. diff --git a/ingester-example/src/main/java/io/greptime/WritePOJOsQuickStart.java b/ingester-example/src/main/java/io/greptime/WritePOJOsQuickStart.java index 7c20d16..de98623 100644 --- a/ingester-example/src/main/java/io/greptime/WritePOJOsQuickStart.java +++ b/ingester-example/src/main/java/io/greptime/WritePOJOsQuickStart.java @@ -37,8 +37,8 @@ public class WritePOJOsQuickStart { private static final Logger LOG = LoggerFactory.getLogger(WritePOJOsQuickStart.class); public static void main(String[] args) throws ExecutionException, InterruptedException { - // GreptimeDB has a default database named "public", we can use it as the test database - String database = "public"; + // GreptimeDB has a default database named "greptime.public", we can use it as the test database + String database = "greptime.public"; // By default, GreptimeDB listens on port 4001 using the gRPC protocol. // We can provide multiple endpoints that point to the same GreptimeDB cluster. // The client will make calls to these endpoints based on a load balancing strategy. diff --git a/ingester-example/src/main/java/io/greptime/WriteQuickStart.java b/ingester-example/src/main/java/io/greptime/WriteQuickStart.java index 6b6d6d0..85e94f4 100644 --- a/ingester-example/src/main/java/io/greptime/WriteQuickStart.java +++ b/ingester-example/src/main/java/io/greptime/WriteQuickStart.java @@ -42,8 +42,8 @@ public class WriteQuickStart { private static final Logger LOG = LoggerFactory.getLogger(WriteQuickStart.class); public static void main(String[] args) throws ExecutionException, InterruptedException { - // GreptimeDB has a default database named "public", we can use it as the test database - String database = "public"; + // GreptimeDB has a default database named "greptime.public", we can use it as the test database + String database = "greptime.public"; // By default, GreptimeDB listens on port 4001 using the gRPC protocol. // We can provide multiple endpoints that point to the same GreptimeDB cluster. // The client will make calls to these endpoints based on a load balancing strategy. diff --git a/ingester-protocol/src/main/java/io/greptime/options/GreptimeOptions.java b/ingester-protocol/src/main/java/io/greptime/options/GreptimeOptions.java index 4200dd1..237a76d 100644 --- a/ingester-protocol/src/main/java/io/greptime/options/GreptimeOptions.java +++ b/ingester-protocol/src/main/java/io/greptime/options/GreptimeOptions.java @@ -19,6 +19,7 @@ import io.greptime.common.Copiable; import io.greptime.common.Endpoint; import io.greptime.common.util.Ensures; +import io.greptime.common.util.Strings; import io.greptime.limit.LimitedPolicy; import io.greptime.models.AuthInfo; import io.greptime.rpc.RpcOptions; @@ -113,6 +114,8 @@ public static GreptimeOptions checkSelf(GreptimeOptions opts) { Ensures.ensureNonNull(opts, "null `opts (GreptimeOptions)`)`"); Ensures.ensureNonNull(opts.getEndpoints(), "null `endpoints`"); Ensures.ensure(!opts.getEndpoints().isEmpty(), "empty `endpoints`"); + Ensures.ensure(Strings.isNotBlank(opts.getDatabase()), + "`database` can not be empty, we can use the default database of GreptimeDB: `greptime.public`"); Ensures.ensureNonNull(opts.getRpcOptions(), "null `rpcOptions`"); Ensures.ensureNonNull(opts.getRouterOptions(), "null `routerOptions`"); Ensures.ensureNonNull(opts.getWriteOptions(), "null `writeOptions`"); diff --git a/ingester-protocol/src/test/java/io/greptime/options/GreptimeOptionsTest.java b/ingester-protocol/src/test/java/io/greptime/options/GreptimeOptionsTest.java index 9644cbe..3252acb 100644 --- a/ingester-protocol/src/test/java/io/greptime/options/GreptimeOptionsTest.java +++ b/ingester-protocol/src/test/java/io/greptime/options/GreptimeOptionsTest.java @@ -33,7 +33,7 @@ public class GreptimeOptionsTest { @Test public void testAllOptions() { - String database = "public"; + String database = "greptime.public"; String[] endpoints = {"127.0.0.1:4001"}; Executor asyncPool = command -> System.out.println("asyncPool"); RpcOptions rpcOptions = RpcOptions.newDefault(); From ff40e7a1f0d1d0273a339f05441b922fe4f9df78 Mon Sep 17 00:00:00 2001 From: jeremyhi Date: Mon, 8 Jan 2024 14:09:04 +0800 Subject: [PATCH 04/10] chore: require database name on start --- .../src/main/java/io/greptime/GreptimeDB.java | 13 +++++++++---- .../java/io/greptime/options/GreptimeOptions.java | 3 +-- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/ingester-protocol/src/main/java/io/greptime/GreptimeDB.java b/ingester-protocol/src/main/java/io/greptime/GreptimeDB.java index c68a579..bce4f1e 100644 --- a/ingester-protocol/src/main/java/io/greptime/GreptimeDB.java +++ b/ingester-protocol/src/main/java/io/greptime/GreptimeDB.java @@ -21,8 +21,8 @@ import io.greptime.common.Endpoint; import io.greptime.common.Lifecycle; import io.greptime.common.signal.SignalHandlersLoader; -import io.greptime.common.util.MetricExecutor; import io.greptime.common.util.MetricsUtil; +import io.greptime.common.util.Strings; import io.greptime.models.Err; import io.greptime.models.Result; import io.greptime.models.Table; @@ -82,6 +82,7 @@ public static GreptimeDB create(GreptimeOptions opts) { if (!greptimeDB.init(opts)) { throw new RuntimeException("Failed to start GreptimeDB client"); } + LOG.info("GreptimeDB client started: {}", greptimeDB); return greptimeDB; } @@ -97,6 +98,10 @@ public boolean init(GreptimeOptions opts) { this.opts = GreptimeOptions.checkSelf(opts).copy(); + if (Strings.isBlank(this.opts.getDatabase())) { + LOG.warn("The `database` is not specified, use default database: greptime.public"); + } + this.routerClient = makeRouteClient(opts); this.writeClient = makeWriteClient(opts, this.routerClient); @@ -218,7 +223,7 @@ private static RpcClient makeRpcClient(GreptimeOptions opts) { RpcOptions rpcOpts = opts.getRpcOptions(); RpcClient rpcClient = RpcFactoryProvider.getRpcFactory().createRpcClient(); if (!rpcClient.init(rpcOpts)) { - throw new IllegalStateException("Start RPC client failed"); + throw new IllegalStateException("Failed to start the RPC client"); } rpcClient.registerConnectionObserver(new RpcConnectionObserver()); return rpcClient; @@ -229,7 +234,7 @@ private static RouterClient makeRouteClient(GreptimeOptions opts) { routerOpts.setRpcClient(makeRpcClient(opts)); RouterClient routerClient = new RouterClient(); if (!routerClient.init(routerOpts)) { - throw new IllegalStateException("Start router client failed"); + throw new IllegalStateException("Failed to start the router client"); } return routerClient; } @@ -239,7 +244,7 @@ private static WriteClient makeWriteClient(GreptimeOptions opts, RouterClient ro writeOpts.setRouterClient(routerClient); WriteClient writeClient = new WriteClient(); if (!writeClient.init(writeOpts)) { - throw new IllegalStateException("Start write client failed"); + throw new IllegalStateException("Failed to start the write client failed"); } return writeClient; } diff --git a/ingester-protocol/src/main/java/io/greptime/options/GreptimeOptions.java b/ingester-protocol/src/main/java/io/greptime/options/GreptimeOptions.java index 237a76d..cf8ac30 100644 --- a/ingester-protocol/src/main/java/io/greptime/options/GreptimeOptions.java +++ b/ingester-protocol/src/main/java/io/greptime/options/GreptimeOptions.java @@ -114,11 +114,10 @@ public static GreptimeOptions checkSelf(GreptimeOptions opts) { Ensures.ensureNonNull(opts, "null `opts (GreptimeOptions)`)`"); Ensures.ensureNonNull(opts.getEndpoints(), "null `endpoints`"); Ensures.ensure(!opts.getEndpoints().isEmpty(), "empty `endpoints`"); - Ensures.ensure(Strings.isNotBlank(opts.getDatabase()), - "`database` can not be empty, we can use the default database of GreptimeDB: `greptime.public`"); Ensures.ensureNonNull(opts.getRpcOptions(), "null `rpcOptions`"); Ensures.ensureNonNull(opts.getRouterOptions(), "null `routerOptions`"); Ensures.ensureNonNull(opts.getWriteOptions(), "null `writeOptions`"); + return opts; } From da9bec20875a6f9d9dbf5c2b3171998e9fb0dc43 Mon Sep 17 00:00:00 2001 From: jeremyhi Date: Mon, 8 Jan 2024 14:10:28 +0800 Subject: [PATCH 05/10] chore: warn on empty database name not sets --- ingester-protocol/src/main/java/io/greptime/GreptimeDB.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ingester-protocol/src/main/java/io/greptime/GreptimeDB.java b/ingester-protocol/src/main/java/io/greptime/GreptimeDB.java index bce4f1e..9d8436f 100644 --- a/ingester-protocol/src/main/java/io/greptime/GreptimeDB.java +++ b/ingester-protocol/src/main/java/io/greptime/GreptimeDB.java @@ -80,7 +80,7 @@ public static List instances() { public static GreptimeDB create(GreptimeOptions opts) { GreptimeDB greptimeDB = new GreptimeDB(); if (!greptimeDB.init(opts)) { - throw new RuntimeException("Failed to start GreptimeDB client"); + throw new RuntimeException("Failed to start the GreptimeDB client"); } LOG.info("GreptimeDB client started: {}", greptimeDB); return greptimeDB; From 9de9b601ffe5937ee1bac7a6b335e40dfbc75a95 Mon Sep 17 00:00:00 2001 From: jeremyhi Date: Mon, 8 Jan 2024 14:49:33 +0800 Subject: [PATCH 06/10] chore: add TestConnector --- .../greptime/StreamWritePOJOsQuickStart.java | 12 +-- .../io/greptime/StreamWriteQuickStart.java | 12 +-- .../main/java/io/greptime/TestConnector.java | 93 +++++++++++++++++++ .../io/greptime/WritePOJOsQuickStart.java | 12 +-- .../java/io/greptime/WriteQuickStart.java | 12 +-- .../src/main/java/io/greptime/GreptimeDB.java | 2 +- .../java/io/greptime/models/AuthInfo.java | 24 ++--- .../io/greptime/options/GreptimeOptions.java | 23 +++-- 8 files changed, 121 insertions(+), 69 deletions(-) create mode 100644 ingester-example/src/main/java/io/greptime/TestConnector.java diff --git a/ingester-example/src/main/java/io/greptime/StreamWritePOJOsQuickStart.java b/ingester-example/src/main/java/io/greptime/StreamWritePOJOsQuickStart.java index 3eb4c74..4412ee6 100644 --- a/ingester-example/src/main/java/io/greptime/StreamWritePOJOsQuickStart.java +++ b/ingester-example/src/main/java/io/greptime/StreamWritePOJOsQuickStart.java @@ -16,7 +16,6 @@ package io.greptime; import io.greptime.models.WriteOk; -import io.greptime.options.GreptimeOptions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.math.BigDecimal; @@ -34,16 +33,7 @@ public class StreamWritePOJOsQuickStart { private static final Logger LOG = LoggerFactory.getLogger(StreamWritePOJOsQuickStart.class); public static void main(String[] args) throws ExecutionException, InterruptedException { - // GreptimeDB has a default database named "greptime.public", we can use it as the test database - String database = "greptime.public"; - // By default, GreptimeDB listens on port 4001 using the gRPC protocol. - // We can provide multiple endpoints that point to the same GreptimeDB cluster. - // The client will make calls to these endpoints based on a load balancing strategy. - String[] endpoints = {"127.0.0.1:4001"}; - GreptimeOptions opts = GreptimeOptions.newBuilder(endpoints, database) // - .build(); - - GreptimeDB greptimeDB = GreptimeDB.create(opts); + GreptimeDB greptimeDB = TestConnector.connectToDefaultDB(); List myMetric1s = new ArrayList<>(); for (int i = 0; i < 10; i++) { diff --git a/ingester-example/src/main/java/io/greptime/StreamWriteQuickStart.java b/ingester-example/src/main/java/io/greptime/StreamWriteQuickStart.java index 4db5efe..dd9b2b3 100644 --- a/ingester-example/src/main/java/io/greptime/StreamWriteQuickStart.java +++ b/ingester-example/src/main/java/io/greptime/StreamWriteQuickStart.java @@ -20,7 +20,6 @@ import io.greptime.models.Table; import io.greptime.models.TableSchema; import io.greptime.models.WriteOk; -import io.greptime.options.GreptimeOptions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.math.BigDecimal; @@ -37,16 +36,7 @@ public class StreamWriteQuickStart { private static final Logger LOG = LoggerFactory.getLogger(StreamWriteQuickStart.class); public static void main(String[] args) throws ExecutionException, InterruptedException { - // GreptimeDB has a default database named "greptime.public", we can use it as the test database - String database = "greptime.public"; - // By default, GreptimeDB listens on port 4001 using the gRPC protocol. - // We can provide multiple endpoints that point to the same GreptimeDB cluster. - // The client will make calls to these endpoints based on a load balancing strategy. - String[] endpoints = {"127.0.0.1:4001"}; - GreptimeOptions opts = GreptimeOptions.newBuilder(endpoints, database) // - .build(); - - GreptimeDB greptimeDB = GreptimeDB.create(opts); + GreptimeDB greptimeDB = TestConnector.connectToDefaultDB(); TableSchema myMetric3Schema = TableSchema.newBuilder("my_metric3") // .addColumn("tag1", SemanticType.Tag, DataType.String) // diff --git a/ingester-example/src/main/java/io/greptime/TestConnector.java b/ingester-example/src/main/java/io/greptime/TestConnector.java new file mode 100644 index 0000000..c5ac637 --- /dev/null +++ b/ingester-example/src/main/java/io/greptime/TestConnector.java @@ -0,0 +1,93 @@ +/* + * Copyright 2023 Greptime Team + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.greptime; + +import io.greptime.common.util.SerializingExecutor; +import io.greptime.limit.LimitedPolicy; +import io.greptime.models.AuthInfo; +import io.greptime.options.GreptimeOptions; +import io.greptime.rpc.RpcOptions; + +/** + * @author jiachun.fjc + */ +public class TestConnector { + + public static GreptimeDB connectToDefaultDB() { + // GreptimeDB has a default database named "public" in the default catalog "greptime", + // we can use it as the test database + String database = "public"; + // By default, GreptimeDB listens on port 4001 using the gRPC protocol. + // We can provide multiple endpoints that point to the same GreptimeDB cluster. + // The client will make calls to these endpoints based on a load balancing strategy. + String[] endpoints = {"127.0.0.1:4001"}; + GreptimeOptions opts = GreptimeOptions.newBuilder(endpoints, database) // + // Optional, the default value is fine. + // + // Asynchronous thread pool, which is used to handle various asynchronous + // tasks in the SDK (You are using a purely asynchronous SDK). If you do not + // set it, there will be a default implementation, which you can reconfigure + // if the default implementation is not satisfied. + // The default implementation is: `SerializingExecutor` + .asyncPool(new SerializingExecutor("asyncPool")) + // Optional, the default value is fine. + // + // Sets the RPC options, in general, the default configuration is fine. + .rpcOptions(RpcOptions.newDefault()) + // Optional, the default value is fine. + // + // In some case of failure, a retry of write can be attempted. + // The default is 1 + .writeMaxRetries(1) + // Optional, the default value is fine. + // + // Write flow limit: maximum number of data rows in-flight. It does not take effect on `StreamWriter` + // The default is 65536 + .maxInFlightWriteRows(65536) + // Optional, the default value is fine. + // + // Write flow limit: the policy to use when the write flow limit is exceeded. + // All options: + // - `LimitedPolicy.DiscardPolicy`: discard the data if the limiter is full. + // - `LimitedPolicy.AbortPolicy`: abort if the limiter is full. + // - `LimitedPolicy.BlockingPolicy`: blocks if the limiter is full. + // - `LimitedPolicy.AbortOnBlockingTimeoutPolicy`: blocks the specified time if the limiter is full. + // The default is `LimitedPolicy.AbortOnBlockingTimeoutPolicy` + .writeLimitedPolicy(LimitedPolicy.defaultWriteLimitedPolicy()) + // Optional, the default value is fine. + // + // The default rate limit for stream writer. It only takes effect when we do not specify the + // `maxPointsPerSecond` when creating a `StreamWriter`. + // The default is 10 * 65536 + .defaultStreamMaxWritePointsPerSecond(10 * 65536) + // Optional, the default value is fine. + // + // Refresh frequency of route tables. The background refreshes all route tables + // periodically. By default, the route tables will not be refreshed. + .routeTableRefreshPeriodSeconds(-1) + // Optional, the default value is fine. + // + // Sets the request router, The internal default implementation works well. + // You don't need to set it unless you have special requirements. + .router(null) + // Sets authentication information. If the DB is not required to authenticate, we can ignore this. + .authInfo(AuthInfo.noAuthorization()) + // A good start ^_^ + .build(); + + return GreptimeDB.create(opts); + } +} diff --git a/ingester-example/src/main/java/io/greptime/WritePOJOsQuickStart.java b/ingester-example/src/main/java/io/greptime/WritePOJOsQuickStart.java index de98623..4a1ebee 100644 --- a/ingester-example/src/main/java/io/greptime/WritePOJOsQuickStart.java +++ b/ingester-example/src/main/java/io/greptime/WritePOJOsQuickStart.java @@ -18,7 +18,6 @@ import io.greptime.models.Err; import io.greptime.models.Result; import io.greptime.models.WriteOk; -import io.greptime.options.GreptimeOptions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.math.BigDecimal; @@ -37,16 +36,7 @@ public class WritePOJOsQuickStart { private static final Logger LOG = LoggerFactory.getLogger(WritePOJOsQuickStart.class); public static void main(String[] args) throws ExecutionException, InterruptedException { - // GreptimeDB has a default database named "greptime.public", we can use it as the test database - String database = "greptime.public"; - // By default, GreptimeDB listens on port 4001 using the gRPC protocol. - // We can provide multiple endpoints that point to the same GreptimeDB cluster. - // The client will make calls to these endpoints based on a load balancing strategy. - String[] endpoints = {"127.0.0.1:4001"}; - GreptimeOptions opts = GreptimeOptions.newBuilder(endpoints, database) // - .build(); - - GreptimeDB greptimeDB = GreptimeDB.create(opts); + GreptimeDB greptimeDB = TestConnector.connectToDefaultDB(); List myMetric1s = new ArrayList<>(); for (int i = 0; i < 10; i++) { diff --git a/ingester-example/src/main/java/io/greptime/WriteQuickStart.java b/ingester-example/src/main/java/io/greptime/WriteQuickStart.java index 85e94f4..7d56223 100644 --- a/ingester-example/src/main/java/io/greptime/WriteQuickStart.java +++ b/ingester-example/src/main/java/io/greptime/WriteQuickStart.java @@ -22,7 +22,6 @@ import io.greptime.models.Table; import io.greptime.models.TableSchema; import io.greptime.models.WriteOk; -import io.greptime.options.GreptimeOptions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.math.BigDecimal; @@ -42,16 +41,7 @@ public class WriteQuickStart { private static final Logger LOG = LoggerFactory.getLogger(WriteQuickStart.class); public static void main(String[] args) throws ExecutionException, InterruptedException { - // GreptimeDB has a default database named "greptime.public", we can use it as the test database - String database = "greptime.public"; - // By default, GreptimeDB listens on port 4001 using the gRPC protocol. - // We can provide multiple endpoints that point to the same GreptimeDB cluster. - // The client will make calls to these endpoints based on a load balancing strategy. - String[] endpoints = {"127.0.0.1:4001"}; - GreptimeOptions opts = GreptimeOptions.newBuilder(endpoints, database) // - .build(); - - GreptimeDB greptimeDB = GreptimeDB.create(opts); + GreptimeDB greptimeDB = TestConnector.connectToDefaultDB(); TableSchema myMetric3Schema = TableSchema.newBuilder("my_metric3") // .addColumn("tag1", SemanticType.Tag, DataType.String) // diff --git a/ingester-protocol/src/main/java/io/greptime/GreptimeDB.java b/ingester-protocol/src/main/java/io/greptime/GreptimeDB.java index 9d8436f..817fc0f 100644 --- a/ingester-protocol/src/main/java/io/greptime/GreptimeDB.java +++ b/ingester-protocol/src/main/java/io/greptime/GreptimeDB.java @@ -99,7 +99,7 @@ public boolean init(GreptimeOptions opts) { this.opts = GreptimeOptions.checkSelf(opts).copy(); if (Strings.isBlank(this.opts.getDatabase())) { - LOG.warn("The `database` is not specified, use default database: greptime.public"); + LOG.warn("The `database` is not specified, use default (catalog-database): greptime-public"); } this.routerClient = makeRouteClient(opts); diff --git a/ingester-protocol/src/main/java/io/greptime/models/AuthInfo.java b/ingester-protocol/src/main/java/io/greptime/models/AuthInfo.java index bd01595..e5d27c4 100644 --- a/ingester-protocol/src/main/java/io/greptime/models/AuthInfo.java +++ b/ingester-protocol/src/main/java/io/greptime/models/AuthInfo.java @@ -25,8 +25,8 @@ */ public class AuthInfo implements Into { - private String username; - private String password; + private final String username; + private final String password; /** * Create AuthInfo from username/password. @@ -36,27 +36,15 @@ public AuthInfo(String username, String password) { this.password = password; } - public String getUsername() { - return this.username; - } - - public void setUsername(String username) { - this.username = username; - } - - public String getPassword() { - return this.password; - } - - public void setPassword(String password) { - this.password = password; + public static AuthInfo noAuthorization() { + return null; } @Override public Common.AuthHeader into() { Common.Basic basic = Common.Basic.newBuilder() // - .setUsername(getUsername()) // - .setPassword(getPassword()) // + .setUsername(this.username) // + .setPassword(this.password) // .build(); return Common.AuthHeader.newBuilder() // .setBasic(basic) // diff --git a/ingester-protocol/src/main/java/io/greptime/options/GreptimeOptions.java b/ingester-protocol/src/main/java/io/greptime/options/GreptimeOptions.java index cf8ac30..310f602 100644 --- a/ingester-protocol/src/main/java/io/greptime/options/GreptimeOptions.java +++ b/ingester-protocol/src/main/java/io/greptime/options/GreptimeOptions.java @@ -19,7 +19,6 @@ import io.greptime.common.Copiable; import io.greptime.common.Endpoint; import io.greptime.common.util.Ensures; -import io.greptime.common.util.Strings; import io.greptime.limit.LimitedPolicy; import io.greptime.models.AuthInfo; import io.greptime.rpc.RpcOptions; @@ -213,7 +212,13 @@ public Builder maxInFlightWriteRows(int maxInFlightWriteRows) { } /** - * Set write limited policy. + * Write flow limit: the policy to use when the write flow limit is exceeded. + * The options: + * - `LimitedPolicy.DiscardPolicy`: discard the data if the limiter is full. + * - `LimitedPolicy.AbortPolicy`: abort if the limiter is full. + * - `LimitedPolicy.BlockingPolicy`: blocks if the limiter is full. + * - `LimitedPolicy.AbortOnBlockingTimeoutPolicy`: blocks the specified time if the limiter is full. + * The default is `LimitedPolicy.AbortOnBlockingTimeoutPolicy` * * @param writeLimitedPolicy write limited policy * @return this builder @@ -224,7 +229,10 @@ public Builder writeLimitedPolicy(LimitedPolicy writeLimitedPolicy) { } /** - * The default rate limit for stream writer. + * The default rate limit for `StreamWriter`. It only takes effect when we do not specify the + * `maxPointsPerSecond` when creating a `StreamWriter`. + * The default is 10 * 65536 + * * @param defaultStreamMaxWritePointsPerSecond default max write points per second * @return this builder */ @@ -235,7 +243,7 @@ public Builder defaultStreamMaxWritePointsPerSecond(int defaultStreamMaxWritePoi /** * Refresh frequency of route tables. The background refreshes all route tables - * periodically. By default, all route tables are refreshed every 30 seconds. + * periodically. By default, By default, the route tables will not be refreshed. * * @param routeTableRefreshPeriodSeconds refresh period for route tables cache * @return this builder @@ -246,7 +254,8 @@ public Builder routeTableRefreshPeriodSeconds(long routeTableRefreshPeriodSecond } /** - * Sets authentication information. + * Sets authentication information. If the DB is not required to authenticate, + * we can ignore this. * * @param authInfo the authentication information * @return this builder @@ -257,7 +266,9 @@ public Builder authInfo(AuthInfo authInfo) { } /** - * Sets the request router. + * Sets the request router, The internal default implementation works well. + * You don't need to set it unless you have special requirements. + * * @param router the request router * @return this builder */ From 461588af2685148417f361291b59f86df2fa4831 Mon Sep 17 00:00:00 2001 From: jeremyhi Date: Mon, 8 Jan 2024 17:42:49 +0800 Subject: [PATCH 07/10] chore: more friendly write api --- .../src/main/java/io/greptime/WritePOJOsQuickStart.java | 4 +--- .../src/main/java/io/greptime/WriteQuickStart.java | 4 +--- ingester-protocol/src/main/java/io/greptime/Write.java | 7 +++++++ ingester-protocol/src/main/java/io/greptime/WritePOJO.java | 7 +++++++ 4 files changed, 16 insertions(+), 6 deletions(-) diff --git a/ingester-example/src/main/java/io/greptime/WritePOJOsQuickStart.java b/ingester-example/src/main/java/io/greptime/WritePOJOsQuickStart.java index 4a1ebee..1ae200e 100644 --- a/ingester-example/src/main/java/io/greptime/WritePOJOsQuickStart.java +++ b/ingester-example/src/main/java/io/greptime/WritePOJOsQuickStart.java @@ -65,12 +65,10 @@ public static void main(String[] args) throws ExecutionException, InterruptedExc myMetric2s.add(m); } - List> pojos = Arrays.asList(myMetric1s, myMetric2s); - // For performance reasons, the SDK is designed to be purely asynchronous. // The return value is a future object. If you want to immediately obtain // the result, you can call `future.get()`. - CompletableFuture> puts = greptimeDB.writePOJOs(pojos); + CompletableFuture> puts = greptimeDB.writePOJOs(myMetric1s, myMetric2s); Result result = puts.get(); diff --git a/ingester-example/src/main/java/io/greptime/WriteQuickStart.java b/ingester-example/src/main/java/io/greptime/WriteQuickStart.java index 7d56223..dc4d56f 100644 --- a/ingester-example/src/main/java/io/greptime/WriteQuickStart.java +++ b/ingester-example/src/main/java/io/greptime/WriteQuickStart.java @@ -88,12 +88,10 @@ public static void main(String[] args) throws ExecutionException, InterruptedExc myMetric4.addRow(tag1v, tag2v, ts, field1, field2); } - Collection tables = Arrays.asList(myMetric3, myMetric4); - // For performance reasons, the SDK is designed to be purely asynchronous. // The return value is a future object. If you want to immediately obtain // the result, you can call `future.get()`. - CompletableFuture> future = greptimeDB.write(tables); + CompletableFuture> future = greptimeDB.write(myMetric3, myMetric4); Result result = future.get(); diff --git a/ingester-protocol/src/main/java/io/greptime/Write.java b/ingester-protocol/src/main/java/io/greptime/Write.java index 853b3a1..3a18eec 100644 --- a/ingester-protocol/src/main/java/io/greptime/Write.java +++ b/ingester-protocol/src/main/java/io/greptime/Write.java @@ -21,6 +21,7 @@ import io.greptime.models.WriteOk; import io.greptime.rpc.Context; +import java.util.Arrays; import java.util.Collection; import java.util.concurrent.CompletableFuture; @@ -30,6 +31,12 @@ * @author jiachun.fjc */ public interface Write { + /** + * @see #write(Collection, WriteOp, Context) + */ + default CompletableFuture> write(Table... tables) { + return write(Arrays.asList(tables), WriteOp.Insert, Context.newDefault()); + } /** * @see #write(Collection, WriteOp, Context) diff --git a/ingester-protocol/src/main/java/io/greptime/WritePOJO.java b/ingester-protocol/src/main/java/io/greptime/WritePOJO.java index b683877..cf9e263 100644 --- a/ingester-protocol/src/main/java/io/greptime/WritePOJO.java +++ b/ingester-protocol/src/main/java/io/greptime/WritePOJO.java @@ -20,6 +20,7 @@ import io.greptime.models.WriteOk; import io.greptime.rpc.Context; +import java.util.Arrays; import java.util.Collection; import java.util.List; import java.util.concurrent.CompletableFuture; @@ -30,6 +31,12 @@ * @author jiachun.fjc */ public interface WritePOJO { + /** + * @see #writePOJOs(Collection, WriteOp, Context) + */ + default CompletableFuture> writePOJOs(List... pojos) { + return writePOJOs(Arrays.asList(pojos), WriteOp.Insert, Context.newDefault()); + } /** * @see #writePOJOs(Collection, WriteOp, Context) */ From baede06eb1fd5db82e9a029a6216f4687433aeef Mon Sep 17 00:00:00 2001 From: jeremyhi Date: Mon, 8 Jan 2024 17:49:35 +0800 Subject: [PATCH 08/10] chore: shutdown on test --- .../src/main/java/io/greptime/StreamWritePOJOsQuickStart.java | 3 +++ .../src/main/java/io/greptime/StreamWriteQuickStart.java | 3 +++ .../src/main/java/io/greptime/WritePOJOsQuickStart.java | 3 +++ .../src/main/java/io/greptime/WriteQuickStart.java | 3 +++ ingester-protocol/src/main/java/io/greptime/Write.java | 2 +- ingester-protocol/src/main/java/io/greptime/WritePOJO.java | 2 +- .../src/test/java/io/greptime/WriteClientTest.java | 2 +- 7 files changed, 15 insertions(+), 3 deletions(-) diff --git a/ingester-example/src/main/java/io/greptime/StreamWritePOJOsQuickStart.java b/ingester-example/src/main/java/io/greptime/StreamWritePOJOsQuickStart.java index 4412ee6..b0b9c6e 100644 --- a/ingester-example/src/main/java/io/greptime/StreamWritePOJOsQuickStart.java +++ b/ingester-example/src/main/java/io/greptime/StreamWritePOJOsQuickStart.java @@ -77,5 +77,8 @@ public static void main(String[] args) throws ExecutionException, InterruptedExc WriteOk result = future.get(); LOG.info("Write result: {}", result); + + // Shutdown the client when application exits. + greptimeDB.shutdownGracefully(); } } diff --git a/ingester-example/src/main/java/io/greptime/StreamWriteQuickStart.java b/ingester-example/src/main/java/io/greptime/StreamWriteQuickStart.java index dd9b2b3..b7bfe19 100644 --- a/ingester-example/src/main/java/io/greptime/StreamWriteQuickStart.java +++ b/ingester-example/src/main/java/io/greptime/StreamWriteQuickStart.java @@ -98,5 +98,8 @@ public static void main(String[] args) throws ExecutionException, InterruptedExc WriteOk result = future.get(); LOG.info("Write result: {}", result); + + // Shutdown the client when application exits. + greptimeDB.shutdownGracefully(); } } diff --git a/ingester-example/src/main/java/io/greptime/WritePOJOsQuickStart.java b/ingester-example/src/main/java/io/greptime/WritePOJOsQuickStart.java index 1ae200e..6b63e55 100644 --- a/ingester-example/src/main/java/io/greptime/WritePOJOsQuickStart.java +++ b/ingester-example/src/main/java/io/greptime/WritePOJOsQuickStart.java @@ -86,5 +86,8 @@ public static void main(String[] args) throws ExecutionException, InterruptedExc } else { LOG.error("Failed to delete: {}", result.getErr()); } + + // Shutdown the client when application exits. + greptimeDB.shutdownGracefully(); } } diff --git a/ingester-example/src/main/java/io/greptime/WriteQuickStart.java b/ingester-example/src/main/java/io/greptime/WriteQuickStart.java index dc4d56f..e2be6ce 100644 --- a/ingester-example/src/main/java/io/greptime/WriteQuickStart.java +++ b/ingester-example/src/main/java/io/greptime/WriteQuickStart.java @@ -109,5 +109,8 @@ public static void main(String[] args) throws ExecutionException, InterruptedExc } else { LOG.error("Failed to delete: {}", result.getErr()); } + + // Shutdown the client when application exits. + greptimeDB.shutdownGracefully(); } } diff --git a/ingester-protocol/src/main/java/io/greptime/Write.java b/ingester-protocol/src/main/java/io/greptime/Write.java index 3a18eec..bf058a4 100644 --- a/ingester-protocol/src/main/java/io/greptime/Write.java +++ b/ingester-protocol/src/main/java/io/greptime/Write.java @@ -35,7 +35,7 @@ public interface Write { * @see #write(Collection, WriteOp, Context) */ default CompletableFuture> write(Table... tables) { - return write(Arrays.asList(tables), WriteOp.Insert, Context.newDefault()); + return write(Arrays.asList(tables)); } /** diff --git a/ingester-protocol/src/main/java/io/greptime/WritePOJO.java b/ingester-protocol/src/main/java/io/greptime/WritePOJO.java index cf9e263..ef440cc 100644 --- a/ingester-protocol/src/main/java/io/greptime/WritePOJO.java +++ b/ingester-protocol/src/main/java/io/greptime/WritePOJO.java @@ -35,7 +35,7 @@ public interface WritePOJO { * @see #writePOJOs(Collection, WriteOp, Context) */ default CompletableFuture> writePOJOs(List... pojos) { - return writePOJOs(Arrays.asList(pojos), WriteOp.Insert, Context.newDefault()); + return writePOJOs(Arrays.asList(pojos)); } /** * @see #writePOJOs(Collection, WriteOp, Context) diff --git a/ingester-protocol/src/test/java/io/greptime/WriteClientTest.java b/ingester-protocol/src/test/java/io/greptime/WriteClientTest.java index 3f4b3da..09eaa3b 100644 --- a/ingester-protocol/src/test/java/io/greptime/WriteClientTest.java +++ b/ingester-protocol/src/test/java/io/greptime/WriteClientTest.java @@ -105,7 +105,7 @@ public void testWriteSuccess() throws ExecutionException, InterruptedException { Mockito.when(this.routerClient.invoke(Mockito.eq(addr), Mockito.any(), Mockito.any())) // .thenReturn(Util.completedCf(response)); - Result res = this.writeClient.write(Collections.singleton(table)).get(); + Result res = this.writeClient.write(table).get(); Assert.assertTrue(res.isOk()); Assert.assertEquals(3, res.getOk().getSuccess()); From 2063fe649ac6012e73c98e8c382c8e8740696840 Mon Sep 17 00:00:00 2001 From: jeremyhi Date: Mon, 8 Jan 2024 20:49:44 +0800 Subject: [PATCH 09/10] chore: refactor demos --- ingester-example/pom.xml | 7 ++ .../io/greptime/{MyMetric2.java => Cpu.java} | 53 ++++----- .../src/main/java/io/greptime/Memory.java | 59 ++++++++++ .../src/main/java/io/greptime/MyMetric1.java | 110 ------------------ .../src/main/java/io/greptime/QueryJDBC.java | 56 +++++++++ .../greptime/StreamWritePOJOsQuickStart.java | 40 +++---- .../io/greptime/StreamWriteQuickStart.java | 59 ++++------ .../io/greptime/WritePOJOsQuickStart.java | 40 +++---- .../java/io/greptime/WriteQuickStart.java | 58 ++++----- .../main/resources/db-connection.properties | 5 + .../main/java/io/greptime/WriteClient.java | 5 +- .../java/io/greptime/models/TableHelper.java | 3 + 12 files changed, 226 insertions(+), 269 deletions(-) rename ingester-example/src/main/java/io/greptime/{MyMetric2.java => Cpu.java} (51%) create mode 100644 ingester-example/src/main/java/io/greptime/Memory.java delete mode 100644 ingester-example/src/main/java/io/greptime/MyMetric1.java create mode 100644 ingester-example/src/main/java/io/greptime/QueryJDBC.java create mode 100644 ingester-example/src/main/resources/db-connection.properties diff --git a/ingester-example/pom.xml b/ingester-example/pom.xml index 7ec06e0..4933606 100644 --- a/ingester-example/pom.xml +++ b/ingester-example/pom.xml @@ -38,5 +38,12 @@ log4j-slf4j-impl compile + + + + mysql + mysql-connector-java + 8.0.33 + diff --git a/ingester-example/src/main/java/io/greptime/MyMetric2.java b/ingester-example/src/main/java/io/greptime/Cpu.java similarity index 51% rename from ingester-example/src/main/java/io/greptime/MyMetric2.java rename to ingester-example/src/main/java/io/greptime/Cpu.java index 5eef353..38ca0dd 100644 --- a/ingester-example/src/main/java/io/greptime/MyMetric2.java +++ b/ingester-example/src/main/java/io/greptime/Cpu.java @@ -18,40 +18,29 @@ import io.greptime.models.Column; import io.greptime.models.DataType; import io.greptime.models.Metric; -import java.util.Date; /** * @author jiachun.fjc */ -@Metric(name = "my_metric2") -public class MyMetric2 { - @Column(name = "tag1", tag = true, dataType = DataType.String) - private String tag1; - @Column(name = "tag2", tag = true, dataType = DataType.String) - private String tag2; +@Metric(name = "cpu_metric") +public class Cpu { + @Column(name = "host", tag = true, dataType = DataType.String) + private String host; - @Column(name = "ts", timestamp = true, dataType = DataType.TimestampSecond) + @Column(name = "ts", timestamp = true, dataType = DataType.TimestampMillisecond) private long ts; - @Column(name = "field1", dataType = DataType.Date) - private Date field1; - @Column(name = "field2", dataType = DataType.Float64) - private double field2; + @Column(name = "cpu_user", dataType = DataType.Float64) + private double cpuUser; + @Column(name = "cpu_sys", dataType = DataType.Float64) + private double cpuSys; - public String getTag1() { - return tag1; + public String getHost() { + return host; } - public void setTag1(String tag1) { - this.tag1 = tag1; - } - - public String getTag2() { - return tag2; - } - - public void setTag2(String tag2) { - this.tag2 = tag2; + public void setHost(String host) { + this.host = host; } public long getTs() { @@ -62,19 +51,19 @@ public void setTs(long ts) { this.ts = ts; } - public Date getField1() { - return field1; + public double getCpuUser() { + return cpuUser; } - public void setField1(Date field1) { - this.field1 = field1; + public void setCpuUser(double cpuUser) { + this.cpuUser = cpuUser; } - public double getField2() { - return field2; + public double getCpuSys() { + return cpuSys; } - public void setField2(double field2) { - this.field2 = field2; + public void setCpuSys(double cpuSys) { + this.cpuSys = cpuSys; } } diff --git a/ingester-example/src/main/java/io/greptime/Memory.java b/ingester-example/src/main/java/io/greptime/Memory.java new file mode 100644 index 0000000..1e22d37 --- /dev/null +++ b/ingester-example/src/main/java/io/greptime/Memory.java @@ -0,0 +1,59 @@ +/* + * Copyright 2023 Greptime Team + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.greptime; + +import io.greptime.models.Column; +import io.greptime.models.DataType; +import io.greptime.models.Metric; + +/** + * @author jiachun.fjc + */ +@Metric(name = "mem_metric") +public class Memory { + @Column(name = "host", tag = true, dataType = DataType.String) + private String host; + + @Column(name = "ts", timestamp = true, dataType = DataType.TimestampMillisecond) + private long ts; + + @Column(name = "mem_usage", dataType = DataType.Float64) + private double memUsage; + + public String getHost() { + return host; + } + + public void setHost(String host) { + this.host = host; + } + + public long getTs() { + return ts; + } + + public void setTs(long ts) { + this.ts = ts; + } + + public double getMemUsage() { + return memUsage; + } + + public void setMemUsage(double memUsage) { + this.memUsage = memUsage; + } +} diff --git a/ingester-example/src/main/java/io/greptime/MyMetric1.java b/ingester-example/src/main/java/io/greptime/MyMetric1.java deleted file mode 100644 index b29aaaf..0000000 --- a/ingester-example/src/main/java/io/greptime/MyMetric1.java +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Copyright 2023 Greptime Team - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.greptime; - -import io.greptime.models.Column; -import io.greptime.models.DataType; -import io.greptime.models.Metric; -import java.math.BigDecimal; - -/** - * @author jiachun.fjc - */ -@Metric(name = "my_metric1") -public class MyMetric1 { - @Column(name = "tag1", tag = true, dataType = DataType.String) - private String tag1; - @Column(name = "tag2", tag = true, dataType = DataType.String) - private String tag2; - @Column(name = "tag3", tag = true, dataType = DataType.String) - private String tag3; - - @Column(name = "ts", timestamp = true, dataType = DataType.TimestampMillisecond) - private long ts; - - @Column(name = "field1", dataType = DataType.String) - private String field1; - @Column(name = "field2", dataType = DataType.Float64) - private double field2; - @Column(name = "field3", dataType = DataType.Decimal128) - private BigDecimal field3; - @Column(name = "field4", dataType = DataType.Int32) - private int field4; - - public String getTag1() { - return tag1; - } - - public void setTag1(String tag1) { - this.tag1 = tag1; - } - - public String getTag2() { - return tag2; - } - - public void setTag2(String tag2) { - this.tag2 = tag2; - } - - public String getTag3() { - return tag3; - } - - public void setTag3(String tag3) { - this.tag3 = tag3; - } - - public long getTs() { - return ts; - } - - public void setTs(long ts) { - this.ts = ts; - } - - public String getField1() { - return field1; - } - - public void setField1(String field1) { - this.field1 = field1; - } - - public double getField2() { - return field2; - } - - public void setField2(double field2) { - this.field2 = field2; - } - - public BigDecimal getField3() { - return field3; - } - - public void setField3(BigDecimal field3) { - this.field3 = field3; - } - - public int getField4() { - return field4; - } - - public void setField4(int field4) { - this.field4 = field4; - } -} diff --git a/ingester-example/src/main/java/io/greptime/QueryJDBC.java b/ingester-example/src/main/java/io/greptime/QueryJDBC.java new file mode 100644 index 0000000..1aadda2 --- /dev/null +++ b/ingester-example/src/main/java/io/greptime/QueryJDBC.java @@ -0,0 +1,56 @@ +/* + * Copyright 2023 Greptime Team + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.greptime; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.util.Objects; +import java.util.Properties; + +/** + * @author jiachun.fjc + */ +public class QueryJDBC { + + public static void main(String[] args) throws Exception { + GreptimeDB greptimeDB = TestConnector.connectToDefaultDB(); + + // Inserts data first + + + try (Connection conn = makeConnection()) { + + } + } + + public static Connection makeConnection() throws IOException, ClassNotFoundException, SQLException { + Properties prop = new Properties(); + prop.load(QueryJDBC.class.getResourceAsStream("/db-connection.properties")); + + String dbName = (String) prop.get("db.database-driver"); + + String dbConnUrl = (String) prop.get("db.url"); + String dbUserName = (String) prop.get("db.username"); + String dbPassword = (String) prop.get("db.password"); + + Class.forName(dbName); + Connection dbConn = DriverManager.getConnection(dbConnUrl, dbUserName, dbPassword); + + return Objects.requireNonNull(dbConn, "Failed to make connection!"); + } +} diff --git a/ingester-example/src/main/java/io/greptime/StreamWritePOJOsQuickStart.java b/ingester-example/src/main/java/io/greptime/StreamWritePOJOsQuickStart.java index b0b9c6e..60ab0f6 100644 --- a/ingester-example/src/main/java/io/greptime/StreamWritePOJOsQuickStart.java +++ b/ingester-example/src/main/java/io/greptime/StreamWritePOJOsQuickStart.java @@ -18,9 +18,7 @@ import io.greptime.models.WriteOk; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.math.BigDecimal; import java.util.ArrayList; -import java.util.Calendar; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -35,41 +33,33 @@ public class StreamWritePOJOsQuickStart { public static void main(String[] args) throws ExecutionException, InterruptedException { GreptimeDB greptimeDB = TestConnector.connectToDefaultDB(); - List myMetric1s = new ArrayList<>(); + List cpus = new ArrayList<>(); for (int i = 0; i < 10; i++) { - MyMetric1 m = new MyMetric1(); - m.setTag1("tag_value_1_" + i); - m.setTag2("tag_value_2_" + i); - m.setTag3("tag_value_3_" + i); - m.setTs(System.currentTimeMillis()); - m.setField1("field_value_1_" + i); - m.setField2(i); - m.setField3(new BigDecimal(i)); - m.setField4(i); - - myMetric1s.add(m); + Cpu c = new Cpu(); + c.setHost("127.0.0." + i); + c.setTs(System.currentTimeMillis()); + c.setCpuUser(i + 0.1); + c.setCpuSys(i + 0.12); + cpus.add(c); } - List myMetric2s = new ArrayList<>(); + List memories = new ArrayList<>(); for (int i = 0; i < 10; i++) { - MyMetric2 m = new MyMetric2(); - m.setTag1("tag_value_1_" + i); - m.setTag2("tag_value_2_" + i); + Memory m = new Memory(); + m.setHost("127.0.0." + i); m.setTs(System.currentTimeMillis() / 1000); - m.setField1(Calendar.getInstance().getTime()); - m.setField2(i); - - myMetric2s.add(m); + m.setMemUsage(i + 0.2); + memories.add(m); } StreamWriter, WriteOk> writer = greptimeDB.streamWriterPOJOs(); // write data into stream - writer.write(myMetric1s); - writer.write(myMetric2s); + writer.write(cpus); + writer.write(memories); // delete the first 5 rows - writer.write(myMetric1s.subList(0, 5), WriteOp.Delete); + writer.write(cpus.subList(0, 5), WriteOp.Delete); // complete the stream CompletableFuture future = writer.completed(); diff --git a/ingester-example/src/main/java/io/greptime/StreamWriteQuickStart.java b/ingester-example/src/main/java/io/greptime/StreamWriteQuickStart.java index b7bfe19..9808b2e 100644 --- a/ingester-example/src/main/java/io/greptime/StreamWriteQuickStart.java +++ b/ingester-example/src/main/java/io/greptime/StreamWriteQuickStart.java @@ -22,9 +22,6 @@ import io.greptime.models.WriteOk; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.math.BigDecimal; -import java.util.Calendar; -import java.util.Date; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -38,59 +35,45 @@ public class StreamWriteQuickStart { public static void main(String[] args) throws ExecutionException, InterruptedException { GreptimeDB greptimeDB = TestConnector.connectToDefaultDB(); - TableSchema myMetric3Schema = TableSchema.newBuilder("my_metric3") // - .addColumn("tag1", SemanticType.Tag, DataType.String) // - .addColumn("tag2", SemanticType.Tag, DataType.String) // - .addColumn("tag3", SemanticType.Tag, DataType.String) // + TableSchema cpuMetricSchema = TableSchema.newBuilder("cpu_metric") // + .addColumn("host", SemanticType.Tag, DataType.String) // .addColumn("ts", SemanticType.Timestamp, DataType.TimestampMillisecond) // - .addColumn("field1", SemanticType.Field, DataType.String) // - .addColumn("field2", SemanticType.Field, DataType.Float64) // - .addColumn("field3", SemanticType.Field, DataType.Decimal128) // - .addColumn("field4", SemanticType.Field, DataType.Int32) // + .addColumn("cpu_user", SemanticType.Field, DataType.Float64) // + .addColumn("cpu_sys", SemanticType.Field, DataType.Float64) // .build(); - TableSchema myMetric4Schema = TableSchema.newBuilder("my_metric4") // - .addColumn("tag1", SemanticType.Tag, DataType.String) // - .addColumn("tag2", SemanticType.Tag, DataType.String) // - .addColumn("ts", SemanticType.Timestamp, DataType.TimestampSecond) // - .addColumn("field1", SemanticType.Field, DataType.Date) // - .addColumn("field2", SemanticType.Field, DataType.Float64) // + TableSchema memMetricSchema = TableSchema.newBuilder("mem_metric") // + .addColumn("host", SemanticType.Tag, DataType.String) // + .addColumn("ts", SemanticType.Timestamp, DataType.TimestampMillisecond) // + .addColumn("mem_usage", SemanticType.Field, DataType.Float64) // .build(); - Table myMetric3 = Table.from(myMetric3Schema); - Table myMetric4 = Table.from(myMetric4Schema); + Table cpuMetric = Table.from(cpuMetricSchema); + Table memMetric = Table.from(memMetricSchema); for (int i = 0; i < 10; i++) { - String tag1v = "tag_value_1_" + i; - String tag2v = "tag_value_2_" + i; - String tag3v = "tag_value_3_" + i; + String host = "127.0.0." + i; long ts = System.currentTimeMillis(); - String field1 = "field_value_1" + i; - double field2 = i + 0.1; - BigDecimal field3 = new BigDecimal(i); - int field4 = i + 1; - - myMetric3.addRow(tag1v, tag2v, tag3v, ts, field1, field2, field3, field4); + double cpuUser = i + 0.1; + double cpuSys = i + 0.12; + cpuMetric.addRow(host, ts, cpuUser, cpuSys); } for (int i = 0; i < 10; i++) { - String tag1v = "tag_value_1_" + i; - String tag2v = "tag_value_2_" + i; - long ts = System.currentTimeMillis() / 1000; - Date field1 = Calendar.getInstance().getTime(); - double field2 = i + 0.1; - - myMetric4.addRow(tag1v, tag2v, ts, field1, field2); + String host = "127.0.0." + i; + long ts = System.currentTimeMillis(); + double memUsage = i + 0.2; + memMetric.addRow(host, ts, memUsage); } StreamWriter writer = greptimeDB.streamWriter(); // write data into stream - writer.write(myMetric3); - writer.write(myMetric4); + writer.write(cpuMetric); + writer.write(memMetric); // delete the first 5 rows - writer.write(myMetric3.subRange(0, 5), WriteOp.Delete); + writer.write(cpuMetric.subRange(0, 5), WriteOp.Delete); // complete the stream CompletableFuture future = writer.completed(); diff --git a/ingester-example/src/main/java/io/greptime/WritePOJOsQuickStart.java b/ingester-example/src/main/java/io/greptime/WritePOJOsQuickStart.java index 6b63e55..a0288dd 100644 --- a/ingester-example/src/main/java/io/greptime/WritePOJOsQuickStart.java +++ b/ingester-example/src/main/java/io/greptime/WritePOJOsQuickStart.java @@ -20,10 +20,8 @@ import io.greptime.models.WriteOk; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.math.BigDecimal; import java.util.ArrayList; import java.util.Arrays; -import java.util.Calendar; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -38,37 +36,29 @@ public class WritePOJOsQuickStart { public static void main(String[] args) throws ExecutionException, InterruptedException { GreptimeDB greptimeDB = TestConnector.connectToDefaultDB(); - List myMetric1s = new ArrayList<>(); + List cpus = new ArrayList<>(); for (int i = 0; i < 10; i++) { - MyMetric1 m = new MyMetric1(); - m.setTag1("tag_value_1_" + i); - m.setTag2("tag_value_2_" + i); - m.setTag3("tag_value_3_" + i); - m.setTs(System.currentTimeMillis()); - m.setField1("field_value_1_" + i); - m.setField2(i); - m.setField3(new BigDecimal(i)); - m.setField4(i); - - myMetric1s.add(m); + Cpu c = new Cpu(); + c.setHost("127.0.0." + i); + c.setTs(System.currentTimeMillis()); + c.setCpuUser(i + 0.1); + c.setCpuSys(i + 0.12); + cpus.add(c); } - List myMetric2s = new ArrayList<>(); + List memories = new ArrayList<>(); for (int i = 0; i < 10; i++) { - MyMetric2 m = new MyMetric2(); - m.setTag1("tag_value_1_" + i); - m.setTag2("tag_value_2_" + i); - m.setTs(System.currentTimeMillis() / 1000); - m.setField1(Calendar.getInstance().getTime()); - m.setField2(i); - - myMetric2s.add(m); + Memory m = new Memory(); + m.setHost("127.0.0." + i); + m.setTs(System.currentTimeMillis()); + m.setMemUsage(i + 0.2); + memories.add(m); } // For performance reasons, the SDK is designed to be purely asynchronous. // The return value is a future object. If you want to immediately obtain // the result, you can call `future.get()`. - CompletableFuture> puts = greptimeDB.writePOJOs(myMetric1s, myMetric2s); + CompletableFuture> puts = greptimeDB.writePOJOs(cpus, memories); Result result = puts.get(); @@ -78,7 +68,7 @@ public static void main(String[] args) throws ExecutionException, InterruptedExc LOG.error("Failed to write: {}", result.getErr()); } - List> delete_pojos = Arrays.asList(myMetric1s.subList(0, 5), myMetric2s.subList(0, 5)); + List> delete_pojos = Arrays.asList(cpus.subList(0, 5), memories.subList(0, 5)); Result deletes = greptimeDB.writePOJOs(delete_pojos, WriteOp.Delete).get(); if (deletes.isOk()) { diff --git a/ingester-example/src/main/java/io/greptime/WriteQuickStart.java b/ingester-example/src/main/java/io/greptime/WriteQuickStart.java index e2be6ce..ffab47f 100644 --- a/ingester-example/src/main/java/io/greptime/WriteQuickStart.java +++ b/ingester-example/src/main/java/io/greptime/WriteQuickStart.java @@ -24,11 +24,7 @@ import io.greptime.models.WriteOk; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.math.BigDecimal; import java.util.Arrays; -import java.util.Calendar; -import java.util.Collection; -import java.util.Date; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -43,55 +39,41 @@ public class WriteQuickStart { public static void main(String[] args) throws ExecutionException, InterruptedException { GreptimeDB greptimeDB = TestConnector.connectToDefaultDB(); - TableSchema myMetric3Schema = TableSchema.newBuilder("my_metric3") // - .addColumn("tag1", SemanticType.Tag, DataType.String) // - .addColumn("tag2", SemanticType.Tag, DataType.String) // - .addColumn("tag3", SemanticType.Tag, DataType.String) // + TableSchema cpuMetricSchema = TableSchema.newBuilder("cpu_metric") // + .addColumn("host", SemanticType.Tag, DataType.String) // .addColumn("ts", SemanticType.Timestamp, DataType.TimestampMillisecond) // - .addColumn("field1", SemanticType.Field, DataType.String) // - .addColumn("field2", SemanticType.Field, DataType.Float64) // - .addColumn("field3", SemanticType.Field, DataType.Decimal128) // - .addColumn("field4", SemanticType.Field, DataType.Int32) // + .addColumn("cpu_user", SemanticType.Field, DataType.Float64) // + .addColumn("cpu_sys", SemanticType.Field, DataType.Float64) // .build(); - TableSchema myMetric4Schema = TableSchema.newBuilder("my_metric4") // - .addColumn("tag1", SemanticType.Tag, DataType.String) // - .addColumn("tag2", SemanticType.Tag, DataType.String) // - .addColumn("ts", SemanticType.Timestamp, DataType.TimestampSecond) // - .addColumn("field1", SemanticType.Field, DataType.Date) // - .addColumn("field2", SemanticType.Field, DataType.Float64) // + TableSchema memMetricSchema = TableSchema.newBuilder("mem_metric") // + .addColumn("host", SemanticType.Tag, DataType.String) // + .addColumn("ts", SemanticType.Timestamp, DataType.TimestampMillisecond) // + .addColumn("mem_usage", SemanticType.Field, DataType.Float64) // .build(); - Table myMetric3 = Table.from(myMetric3Schema); - Table myMetric4 = Table.from(myMetric4Schema); + Table cpuMetric = Table.from(cpuMetricSchema); + Table memMetric = Table.from(memMetricSchema); for (int i = 0; i < 10; i++) { - String tag1v = "tag_value_1_" + i; - String tag2v = "tag_value_2_" + i; - String tag3v = "tag_value_3_" + i; + String host = "127.0.0." + i; long ts = System.currentTimeMillis(); - String field1 = "field_value_1" + i; - double field2 = i + 0.1; - BigDecimal field3 = new BigDecimal(i); - int field4 = i + 1; - - myMetric3.addRow(tag1v, tag2v, tag3v, ts, field1, field2, field3, field4); + double cpuUser = i + 0.1; + double cpuSys = i + 0.12; + cpuMetric.addRow(host, ts, cpuUser, cpuSys); } for (int i = 0; i < 10; i++) { - String tag1v = "tag_value_1_" + i; - String tag2v = "tag_value_2_" + i; - long ts = System.currentTimeMillis() / 1000; - Date field1 = Calendar.getInstance().getTime(); - double field2 = i + 0.1; - - myMetric4.addRow(tag1v, tag2v, ts, field1, field2); + String host = "127.0.0." + i; + long ts = System.currentTimeMillis(); + double memUsage = i + 0.2; + memMetric.addRow(host, ts, memUsage); } // For performance reasons, the SDK is designed to be purely asynchronous. // The return value is a future object. If you want to immediately obtain // the result, you can call `future.get()`. - CompletableFuture> future = greptimeDB.write(myMetric3, myMetric4); + CompletableFuture> future = greptimeDB.write(cpuMetric, memMetric); Result result = future.get(); @@ -101,7 +83,7 @@ public static void main(String[] args) throws ExecutionException, InterruptedExc LOG.error("Failed to write: {}", result.getErr()); } - List
delete_objs = Arrays.asList(myMetric3.subRange(0, 5), myMetric4.subRange(0, 5)); + List
delete_objs = Arrays.asList(cpuMetric.subRange(0, 5), memMetric.subRange(0, 5)); Result deletes = greptimeDB.write(delete_objs, WriteOp.Delete).get(); if (deletes.isOk()) { diff --git a/ingester-example/src/main/resources/db-connection.properties b/ingester-example/src/main/resources/db-connection.properties new file mode 100644 index 0000000..9e47c11 --- /dev/null +++ b/ingester-example/src/main/resources/db-connection.properties @@ -0,0 +1,5 @@ +# DataSource +db.database-driver=com.mysql.cj.jdbc.Driver +db.url=jdbc:mysql://localhost:4002/public +db.username= +db.password= \ No newline at end of file diff --git a/ingester-protocol/src/main/java/io/greptime/WriteClient.java b/ingester-protocol/src/main/java/io/greptime/WriteClient.java index 83732b2..83fa262 100644 --- a/ingester-protocol/src/main/java/io/greptime/WriteClient.java +++ b/ingester-protocol/src/main/java/io/greptime/WriteClient.java @@ -340,11 +340,14 @@ static abstract class RateLimitingStreamWriter implements StreamWriter write(Table table, WriteOp writeOp) { Ensures.ensureNonNull(table, "null `table`"); + WriteTables writeTables = new WriteTables(table, writeOp); + if (this.rateLimiter != null) { double timeSpent = this.rateLimiter.acquire(table.pointCount()); InnerMetricHelper.writeStreamLimiterTimeSpent().update((long) timeSpent); } - this.observer.onNext(new WriteTables(table, writeOp)); + + this.observer.onNext(writeTables); return this; } } diff --git a/ingester-protocol/src/main/java/io/greptime/models/TableHelper.java b/ingester-protocol/src/main/java/io/greptime/models/TableHelper.java index 6ea992e..c029125 100644 --- a/ingester-protocol/src/main/java/io/greptime/models/TableHelper.java +++ b/ingester-protocol/src/main/java/io/greptime/models/TableHelper.java @@ -16,6 +16,7 @@ package io.greptime.models; import io.greptime.WriteOp; +import io.greptime.common.util.Ensures; import io.greptime.v1.Common; import io.greptime.v1.Database; import java.util.Collection; @@ -41,6 +42,7 @@ public static Database.GreptimeRequest toGreptimeRequest(WriteTables writeTables case Insert: Database.RowInsertRequests.Builder insertBuilder = Database.RowInsertRequests.newBuilder(); for (Table t : tables) { + Ensures.ensure(t.pointCount() > 0, "No data to insert in table: %s", t.tableName()); insertBuilder.addInserts(t.intoRowInsertRequest()); } return Database.GreptimeRequest.newBuilder() // @@ -50,6 +52,7 @@ public static Database.GreptimeRequest toGreptimeRequest(WriteTables writeTables case Delete: Database.RowDeleteRequests.Builder deleteBuilder = Database.RowDeleteRequests.newBuilder(); for (Table t : tables) { + Ensures.ensure(t.pointCount() > 0, "No data to delete in table: %s", t.tableName()); deleteBuilder.addDeletes(t.intoRowDeleteRequest()); } return Database.GreptimeRequest.newBuilder() // From 67e4134a771cc579b0bb181ade04849be0068c5a Mon Sep 17 00:00:00 2001 From: jeremyhi Date: Tue, 9 Jan 2024 11:49:21 +0800 Subject: [PATCH 10/10] chore: add jdbc demo --- .../src/main/java/io/greptime/QueryJDBC.java | 58 ++++++++++++++++++- 1 file changed, 55 insertions(+), 3 deletions(-) diff --git a/ingester-example/src/main/java/io/greptime/QueryJDBC.java b/ingester-example/src/main/java/io/greptime/QueryJDBC.java index 1aadda2..5518178 100644 --- a/ingester-example/src/main/java/io/greptime/QueryJDBC.java +++ b/ingester-example/src/main/java/io/greptime/QueryJDBC.java @@ -15,30 +15,69 @@ */ package io.greptime; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.sql.Connection; import java.sql.DriverManager; +import java.sql.ResultSet; import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.List; import java.util.Objects; import java.util.Properties; +import java.util.concurrent.ExecutionException; /** * @author jiachun.fjc */ public class QueryJDBC { + private static final Logger LOG = LoggerFactory.getLogger(QueryJDBC.class); + public static void main(String[] args) throws Exception { GreptimeDB greptimeDB = TestConnector.connectToDefaultDB(); - // Inserts data first + // Inserts data for query + insertData(greptimeDB); + + try (Connection conn = getConnection()) { + Statement statement = conn.createStatement(); + // DESC table; + ResultSet rs = statement.executeQuery("DESC cpu_metric"); + LOG.info("Column | Type | Key | Null | Default | Semantic Type "); + while (rs.next()) { + LOG.info("{} | {} | {} | {} | {} | {}", // + rs.getString(1), // + rs.getString(2), // + rs.getString(3), // + rs.getString(4), // + rs.getString(5), // + rs.getString(6)); + } - try (Connection conn = makeConnection()) { + // SELECT COUNT(*) FROM cpu_metric; + rs = statement.executeQuery("SELECT COUNT(*) FROM cpu_metric"); + while (rs.next()) { + LOG.info("Count: {}", rs.getInt(1)); + } + // SELECT * FROM cpu_metric ORDER BY ts DESC LIMIT 5; + rs = statement.executeQuery("SELECT * FROM cpu_metric ORDER BY ts DESC LIMIT 5"); + LOG.info("host | ts | cpu_user | cpu_sys"); + while (rs.next()) { + LOG.info("{} | {} | {} | {}", // + rs.getString("host"), // + rs.getTimestamp("ts"), // + rs.getDouble("cpu_user"), // + rs.getDouble("cpu_sys")); + } } } - public static Connection makeConnection() throws IOException, ClassNotFoundException, SQLException { + public static Connection getConnection() throws IOException, ClassNotFoundException, SQLException { Properties prop = new Properties(); prop.load(QueryJDBC.class.getResourceAsStream("/db-connection.properties")); @@ -53,4 +92,17 @@ public static Connection makeConnection() throws IOException, ClassNotFoundExcep return Objects.requireNonNull(dbConn, "Failed to make connection!"); } + + public static void insertData(GreptimeDB greptimeDB) throws ExecutionException, InterruptedException { + List cpus = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + Cpu c = new Cpu(); + c.setHost("127.0.0." + i); + c.setTs(System.currentTimeMillis()); + c.setCpuUser(i + 0.1); + c.setCpuSys(i + 0.12); + cpus.add(c); + } + greptimeDB.writePOJOs(cpus).get(); + } }