From adb1463f577b16b12f8ae35e71abce17e0269301 Mon Sep 17 00:00:00 2001 From: jeremyhi Date: Mon, 8 Jan 2024 14:49:33 +0800 Subject: [PATCH] chore: add TestConnector --- .../greptime/StreamWritePOJOsQuickStart.java | 12 +-- .../io/greptime/StreamWriteQuickStart.java | 12 +-- .../main/java/io/greptime/TestConnector.java | 93 +++++++++++++++++++ .../io/greptime/WritePOJOsQuickStart.java | 12 +-- .../java/io/greptime/WriteQuickStart.java | 12 +-- .../src/main/java/io/greptime/GreptimeDB.java | 2 +- .../io/greptime/options/GreptimeOptions.java | 23 +++-- 7 files changed, 115 insertions(+), 51 deletions(-) create mode 100644 ingester-example/src/main/java/io/greptime/TestConnector.java diff --git a/ingester-example/src/main/java/io/greptime/StreamWritePOJOsQuickStart.java b/ingester-example/src/main/java/io/greptime/StreamWritePOJOsQuickStart.java index 3eb4c74..4412ee6 100644 --- a/ingester-example/src/main/java/io/greptime/StreamWritePOJOsQuickStart.java +++ b/ingester-example/src/main/java/io/greptime/StreamWritePOJOsQuickStart.java @@ -16,7 +16,6 @@ package io.greptime; import io.greptime.models.WriteOk; -import io.greptime.options.GreptimeOptions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.math.BigDecimal; @@ -34,16 +33,7 @@ public class StreamWritePOJOsQuickStart { private static final Logger LOG = LoggerFactory.getLogger(StreamWritePOJOsQuickStart.class); public static void main(String[] args) throws ExecutionException, InterruptedException { - // GreptimeDB has a default database named "greptime.public", we can use it as the test database - String database = "greptime.public"; - // By default, GreptimeDB listens on port 4001 using the gRPC protocol. - // We can provide multiple endpoints that point to the same GreptimeDB cluster. - // The client will make calls to these endpoints based on a load balancing strategy. - String[] endpoints = {"127.0.0.1:4001"}; - GreptimeOptions opts = GreptimeOptions.newBuilder(endpoints, database) // - .build(); - - GreptimeDB greptimeDB = GreptimeDB.create(opts); + GreptimeDB greptimeDB = TestConnector.connectToDefaultDB(); List myMetric1s = new ArrayList<>(); for (int i = 0; i < 10; i++) { diff --git a/ingester-example/src/main/java/io/greptime/StreamWriteQuickStart.java b/ingester-example/src/main/java/io/greptime/StreamWriteQuickStart.java index 4db5efe..dd9b2b3 100644 --- a/ingester-example/src/main/java/io/greptime/StreamWriteQuickStart.java +++ b/ingester-example/src/main/java/io/greptime/StreamWriteQuickStart.java @@ -20,7 +20,6 @@ import io.greptime.models.Table; import io.greptime.models.TableSchema; import io.greptime.models.WriteOk; -import io.greptime.options.GreptimeOptions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.math.BigDecimal; @@ -37,16 +36,7 @@ public class StreamWriteQuickStart { private static final Logger LOG = LoggerFactory.getLogger(StreamWriteQuickStart.class); public static void main(String[] args) throws ExecutionException, InterruptedException { - // GreptimeDB has a default database named "greptime.public", we can use it as the test database - String database = "greptime.public"; - // By default, GreptimeDB listens on port 4001 using the gRPC protocol. - // We can provide multiple endpoints that point to the same GreptimeDB cluster. - // The client will make calls to these endpoints based on a load balancing strategy. - String[] endpoints = {"127.0.0.1:4001"}; - GreptimeOptions opts = GreptimeOptions.newBuilder(endpoints, database) // - .build(); - - GreptimeDB greptimeDB = GreptimeDB.create(opts); + GreptimeDB greptimeDB = TestConnector.connectToDefaultDB(); TableSchema myMetric3Schema = TableSchema.newBuilder("my_metric3") // .addColumn("tag1", SemanticType.Tag, DataType.String) // diff --git a/ingester-example/src/main/java/io/greptime/TestConnector.java b/ingester-example/src/main/java/io/greptime/TestConnector.java new file mode 100644 index 0000000..e99fbb4 --- /dev/null +++ b/ingester-example/src/main/java/io/greptime/TestConnector.java @@ -0,0 +1,93 @@ +/* + * Copyright 2023 Greptime Team + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.greptime; + +import io.greptime.common.util.SerializingExecutor; +import io.greptime.limit.LimitedPolicy; +import io.greptime.models.AuthInfo; +import io.greptime.options.GreptimeOptions; +import io.greptime.rpc.RpcOptions; + +/** + * @author jiachun.fjc + */ +public class TestConnector { + + public static GreptimeDB connectToDefaultDB() { + // GreptimeDB has a default database named "public" in the default catalog "greptime", + // we can use it as the test database + String database = "public"; + // By default, GreptimeDB listens on port 4001 using the gRPC protocol. + // We can provide multiple endpoints that point to the same GreptimeDB cluster. + // The client will make calls to these endpoints based on a load balancing strategy. + String[] endpoints = {"127.0.0.1:4001"}; + GreptimeOptions opts = GreptimeOptions.newBuilder(endpoints, database) // + // Optional, the default value is fine. + // + // Asynchronous thread pool, which is used to handle various asynchronous + // tasks in the SDK (You are using a purely asynchronous SDK). If you do not + // set it, there will be a default implementation, which you can reconfigure + // if the default implementation is not satisfied. + // The default implementation is: `SerializingExecutor` + .asyncPool(new SerializingExecutor("asyncPool")) + // Optional, the default value is fine. + // + // Sets the RPC options, in general, the default configuration is fine. + .rpcOptions(RpcOptions.newDefault()) + // Optional, the default value is fine. + // + // In some case of failure, a retry of write can be attempted. + // The default is 1 + .writeMaxRetries(1) + // Optional, the default value is fine. + // + // Write flow limit: maximum number of data rows in-flight. + // The default is 65536 + .maxInFlightWriteRows(65536) + // Optional, the default value is fine. + // + // Write flow limit: the policy to use when the write flow limit is exceeded. + // All options: + // - `LimitedPolicy.DiscardPolicy`: discard the data if the limiter is full. + // - `LimitedPolicy.AbortPolicy`: abort if the limiter is full. + // - `LimitedPolicy.BlockingPolicy`: blocks if the limiter is full. + // - `LimitedPolicy.AbortOnBlockingTimeoutPolicy`: blocks the specified time if the limiter is full. + // The default is `LimitedPolicy.AbortOnBlockingTimeoutPolicy` + .writeLimitedPolicy(LimitedPolicy.defaultWriteLimitedPolicy()) + // Optional, the default value is fine. + // + // The default rate limit for stream writer. It only takes effect when we do not specify the + // `maxPointsPerSecond` when creating a `StreamWriter`. + // The default is 10 * 65536 + .defaultStreamMaxWritePointsPerSecond(10 * 65536) + // Optional, the default value is fine. + // + // Refresh frequency of route tables. The background refreshes all route tables + // periodically. By default, the route tables will not be refreshed. + .routeTableRefreshPeriodSeconds(-1) + // Sets authentication information. If the DB is not required to authenticate, we can ignore this. + .authInfo(new AuthInfo("username", "password")) + // Optional, the default value is fine. + // + // Sets the request router, The internal default implementation works well. + // You don't need to set it unless you have special requirements. + .router(null) + // A good start ^_^ + .build(); + + return GreptimeDB.create(opts); + } +} diff --git a/ingester-example/src/main/java/io/greptime/WritePOJOsQuickStart.java b/ingester-example/src/main/java/io/greptime/WritePOJOsQuickStart.java index de98623..4a1ebee 100644 --- a/ingester-example/src/main/java/io/greptime/WritePOJOsQuickStart.java +++ b/ingester-example/src/main/java/io/greptime/WritePOJOsQuickStart.java @@ -18,7 +18,6 @@ import io.greptime.models.Err; import io.greptime.models.Result; import io.greptime.models.WriteOk; -import io.greptime.options.GreptimeOptions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.math.BigDecimal; @@ -37,16 +36,7 @@ public class WritePOJOsQuickStart { private static final Logger LOG = LoggerFactory.getLogger(WritePOJOsQuickStart.class); public static void main(String[] args) throws ExecutionException, InterruptedException { - // GreptimeDB has a default database named "greptime.public", we can use it as the test database - String database = "greptime.public"; - // By default, GreptimeDB listens on port 4001 using the gRPC protocol. - // We can provide multiple endpoints that point to the same GreptimeDB cluster. - // The client will make calls to these endpoints based on a load balancing strategy. - String[] endpoints = {"127.0.0.1:4001"}; - GreptimeOptions opts = GreptimeOptions.newBuilder(endpoints, database) // - .build(); - - GreptimeDB greptimeDB = GreptimeDB.create(opts); + GreptimeDB greptimeDB = TestConnector.connectToDefaultDB(); List myMetric1s = new ArrayList<>(); for (int i = 0; i < 10; i++) { diff --git a/ingester-example/src/main/java/io/greptime/WriteQuickStart.java b/ingester-example/src/main/java/io/greptime/WriteQuickStart.java index 85e94f4..7d56223 100644 --- a/ingester-example/src/main/java/io/greptime/WriteQuickStart.java +++ b/ingester-example/src/main/java/io/greptime/WriteQuickStart.java @@ -22,7 +22,6 @@ import io.greptime.models.Table; import io.greptime.models.TableSchema; import io.greptime.models.WriteOk; -import io.greptime.options.GreptimeOptions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.math.BigDecimal; @@ -42,16 +41,7 @@ public class WriteQuickStart { private static final Logger LOG = LoggerFactory.getLogger(WriteQuickStart.class); public static void main(String[] args) throws ExecutionException, InterruptedException { - // GreptimeDB has a default database named "greptime.public", we can use it as the test database - String database = "greptime.public"; - // By default, GreptimeDB listens on port 4001 using the gRPC protocol. - // We can provide multiple endpoints that point to the same GreptimeDB cluster. - // The client will make calls to these endpoints based on a load balancing strategy. - String[] endpoints = {"127.0.0.1:4001"}; - GreptimeOptions opts = GreptimeOptions.newBuilder(endpoints, database) // - .build(); - - GreptimeDB greptimeDB = GreptimeDB.create(opts); + GreptimeDB greptimeDB = TestConnector.connectToDefaultDB(); TableSchema myMetric3Schema = TableSchema.newBuilder("my_metric3") // .addColumn("tag1", SemanticType.Tag, DataType.String) // diff --git a/ingester-protocol/src/main/java/io/greptime/GreptimeDB.java b/ingester-protocol/src/main/java/io/greptime/GreptimeDB.java index 9d8436f..817fc0f 100644 --- a/ingester-protocol/src/main/java/io/greptime/GreptimeDB.java +++ b/ingester-protocol/src/main/java/io/greptime/GreptimeDB.java @@ -99,7 +99,7 @@ public boolean init(GreptimeOptions opts) { this.opts = GreptimeOptions.checkSelf(opts).copy(); if (Strings.isBlank(this.opts.getDatabase())) { - LOG.warn("The `database` is not specified, use default database: greptime.public"); + LOG.warn("The `database` is not specified, use default (catalog-database): greptime-public"); } this.routerClient = makeRouteClient(opts); diff --git a/ingester-protocol/src/main/java/io/greptime/options/GreptimeOptions.java b/ingester-protocol/src/main/java/io/greptime/options/GreptimeOptions.java index cf8ac30..310f602 100644 --- a/ingester-protocol/src/main/java/io/greptime/options/GreptimeOptions.java +++ b/ingester-protocol/src/main/java/io/greptime/options/GreptimeOptions.java @@ -19,7 +19,6 @@ import io.greptime.common.Copiable; import io.greptime.common.Endpoint; import io.greptime.common.util.Ensures; -import io.greptime.common.util.Strings; import io.greptime.limit.LimitedPolicy; import io.greptime.models.AuthInfo; import io.greptime.rpc.RpcOptions; @@ -213,7 +212,13 @@ public Builder maxInFlightWriteRows(int maxInFlightWriteRows) { } /** - * Set write limited policy. + * Write flow limit: the policy to use when the write flow limit is exceeded. + * The options: + * - `LimitedPolicy.DiscardPolicy`: discard the data if the limiter is full. + * - `LimitedPolicy.AbortPolicy`: abort if the limiter is full. + * - `LimitedPolicy.BlockingPolicy`: blocks if the limiter is full. + * - `LimitedPolicy.AbortOnBlockingTimeoutPolicy`: blocks the specified time if the limiter is full. + * The default is `LimitedPolicy.AbortOnBlockingTimeoutPolicy` * * @param writeLimitedPolicy write limited policy * @return this builder @@ -224,7 +229,10 @@ public Builder writeLimitedPolicy(LimitedPolicy writeLimitedPolicy) { } /** - * The default rate limit for stream writer. + * The default rate limit for `StreamWriter`. It only takes effect when we do not specify the + * `maxPointsPerSecond` when creating a `StreamWriter`. + * The default is 10 * 65536 + * * @param defaultStreamMaxWritePointsPerSecond default max write points per second * @return this builder */ @@ -235,7 +243,7 @@ public Builder defaultStreamMaxWritePointsPerSecond(int defaultStreamMaxWritePoi /** * Refresh frequency of route tables. The background refreshes all route tables - * periodically. By default, all route tables are refreshed every 30 seconds. + * periodically. By default, By default, the route tables will not be refreshed. * * @param routeTableRefreshPeriodSeconds refresh period for route tables cache * @return this builder @@ -246,7 +254,8 @@ public Builder routeTableRefreshPeriodSeconds(long routeTableRefreshPeriodSecond } /** - * Sets authentication information. + * Sets authentication information. If the DB is not required to authenticate, + * we can ignore this. * * @param authInfo the authentication information * @return this builder @@ -257,7 +266,9 @@ public Builder authInfo(AuthInfo authInfo) { } /** - * Sets the request router. + * Sets the request router, The internal default implementation works well. + * You don't need to set it unless you have special requirements. + * * @param router the request router * @return this builder */