diff --git a/ingester-all/pom.xml b/ingester-all/pom.xml index 8d36bed..61cf624 100644 --- a/ingester-all/pom.xml +++ b/ingester-all/pom.xml @@ -4,7 +4,7 @@ greptimedb-ingester io.greptime - 0.4.0 + 0.5.0 ingester-all diff --git a/ingester-common/pom.xml b/ingester-common/pom.xml index 430654d..de32dfe 100644 --- a/ingester-common/pom.xml +++ b/ingester-common/pom.xml @@ -4,7 +4,7 @@ greptimedb-ingester io.greptime - 0.4.0 + 0.5.0 ingester-common diff --git a/ingester-example/pom.xml b/ingester-example/pom.xml index 0454d9d..4933606 100644 --- a/ingester-example/pom.xml +++ b/ingester-example/pom.xml @@ -4,7 +4,7 @@ greptimedb-ingester io.greptime - 0.4.0 + 0.5.0 ingester-example @@ -38,5 +38,12 @@ log4j-slf4j-impl compile + + + + mysql + mysql-connector-java + 8.0.33 + diff --git a/ingester-example/src/main/java/io/greptime/MyMetric2.java b/ingester-example/src/main/java/io/greptime/Cpu.java similarity index 51% rename from ingester-example/src/main/java/io/greptime/MyMetric2.java rename to ingester-example/src/main/java/io/greptime/Cpu.java index 5eef353..38ca0dd 100644 --- a/ingester-example/src/main/java/io/greptime/MyMetric2.java +++ b/ingester-example/src/main/java/io/greptime/Cpu.java @@ -18,40 +18,29 @@ import io.greptime.models.Column; import io.greptime.models.DataType; import io.greptime.models.Metric; -import java.util.Date; /** * @author jiachun.fjc */ -@Metric(name = "my_metric2") -public class MyMetric2 { - @Column(name = "tag1", tag = true, dataType = DataType.String) - private String tag1; - @Column(name = "tag2", tag = true, dataType = DataType.String) - private String tag2; +@Metric(name = "cpu_metric") +public class Cpu { + @Column(name = "host", tag = true, dataType = DataType.String) + private String host; - @Column(name = "ts", timestamp = true, dataType = DataType.TimestampSecond) + @Column(name = "ts", timestamp = true, dataType = DataType.TimestampMillisecond) private long ts; - @Column(name = "field1", dataType = DataType.Date) - private Date field1; - @Column(name = "field2", dataType = DataType.Float64) - private double field2; + @Column(name = "cpu_user", dataType = DataType.Float64) + private double cpuUser; + @Column(name = "cpu_sys", dataType = DataType.Float64) + private double cpuSys; - public String getTag1() { - return tag1; + public String getHost() { + return host; } - public void setTag1(String tag1) { - this.tag1 = tag1; - } - - public String getTag2() { - return tag2; - } - - public void setTag2(String tag2) { - this.tag2 = tag2; + public void setHost(String host) { + this.host = host; } public long getTs() { @@ -62,19 +51,19 @@ public void setTs(long ts) { this.ts = ts; } - public Date getField1() { - return field1; + public double getCpuUser() { + return cpuUser; } - public void setField1(Date field1) { - this.field1 = field1; + public void setCpuUser(double cpuUser) { + this.cpuUser = cpuUser; } - public double getField2() { - return field2; + public double getCpuSys() { + return cpuSys; } - public void setField2(double field2) { - this.field2 = field2; + public void setCpuSys(double cpuSys) { + this.cpuSys = cpuSys; } } diff --git a/ingester-example/src/main/java/io/greptime/Memory.java b/ingester-example/src/main/java/io/greptime/Memory.java new file mode 100644 index 0000000..1e22d37 --- /dev/null +++ b/ingester-example/src/main/java/io/greptime/Memory.java @@ -0,0 +1,59 @@ +/* + * Copyright 2023 Greptime Team + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.greptime; + +import io.greptime.models.Column; +import io.greptime.models.DataType; +import io.greptime.models.Metric; + +/** + * @author jiachun.fjc + */ +@Metric(name = "mem_metric") +public class Memory { + @Column(name = "host", tag = true, dataType = DataType.String) + private String host; + + @Column(name = "ts", timestamp = true, dataType = DataType.TimestampMillisecond) + private long ts; + + @Column(name = "mem_usage", dataType = DataType.Float64) + private double memUsage; + + public String getHost() { + return host; + } + + public void setHost(String host) { + this.host = host; + } + + public long getTs() { + return ts; + } + + public void setTs(long ts) { + this.ts = ts; + } + + public double getMemUsage() { + return memUsage; + } + + public void setMemUsage(double memUsage) { + this.memUsage = memUsage; + } +} diff --git a/ingester-example/src/main/java/io/greptime/MyMetric1.java b/ingester-example/src/main/java/io/greptime/MyMetric1.java deleted file mode 100644 index b29aaaf..0000000 --- a/ingester-example/src/main/java/io/greptime/MyMetric1.java +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Copyright 2023 Greptime Team - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.greptime; - -import io.greptime.models.Column; -import io.greptime.models.DataType; -import io.greptime.models.Metric; -import java.math.BigDecimal; - -/** - * @author jiachun.fjc - */ -@Metric(name = "my_metric1") -public class MyMetric1 { - @Column(name = "tag1", tag = true, dataType = DataType.String) - private String tag1; - @Column(name = "tag2", tag = true, dataType = DataType.String) - private String tag2; - @Column(name = "tag3", tag = true, dataType = DataType.String) - private String tag3; - - @Column(name = "ts", timestamp = true, dataType = DataType.TimestampMillisecond) - private long ts; - - @Column(name = "field1", dataType = DataType.String) - private String field1; - @Column(name = "field2", dataType = DataType.Float64) - private double field2; - @Column(name = "field3", dataType = DataType.Decimal128) - private BigDecimal field3; - @Column(name = "field4", dataType = DataType.Int32) - private int field4; - - public String getTag1() { - return tag1; - } - - public void setTag1(String tag1) { - this.tag1 = tag1; - } - - public String getTag2() { - return tag2; - } - - public void setTag2(String tag2) { - this.tag2 = tag2; - } - - public String getTag3() { - return tag3; - } - - public void setTag3(String tag3) { - this.tag3 = tag3; - } - - public long getTs() { - return ts; - } - - public void setTs(long ts) { - this.ts = ts; - } - - public String getField1() { - return field1; - } - - public void setField1(String field1) { - this.field1 = field1; - } - - public double getField2() { - return field2; - } - - public void setField2(double field2) { - this.field2 = field2; - } - - public BigDecimal getField3() { - return field3; - } - - public void setField3(BigDecimal field3) { - this.field3 = field3; - } - - public int getField4() { - return field4; - } - - public void setField4(int field4) { - this.field4 = field4; - } -} diff --git a/ingester-example/src/main/java/io/greptime/QueryJDBC.java b/ingester-example/src/main/java/io/greptime/QueryJDBC.java new file mode 100644 index 0000000..5518178 --- /dev/null +++ b/ingester-example/src/main/java/io/greptime/QueryJDBC.java @@ -0,0 +1,108 @@ +/* + * Copyright 2023 Greptime Team + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.greptime; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.Properties; +import java.util.concurrent.ExecutionException; + +/** + * @author jiachun.fjc + */ +public class QueryJDBC { + + private static final Logger LOG = LoggerFactory.getLogger(QueryJDBC.class); + + public static void main(String[] args) throws Exception { + GreptimeDB greptimeDB = TestConnector.connectToDefaultDB(); + + // Inserts data for query + insertData(greptimeDB); + + try (Connection conn = getConnection()) { + Statement statement = conn.createStatement(); + + // DESC table; + ResultSet rs = statement.executeQuery("DESC cpu_metric"); + LOG.info("Column | Type | Key | Null | Default | Semantic Type "); + while (rs.next()) { + LOG.info("{} | {} | {} | {} | {} | {}", // + rs.getString(1), // + rs.getString(2), // + rs.getString(3), // + rs.getString(4), // + rs.getString(5), // + rs.getString(6)); + } + + // SELECT COUNT(*) FROM cpu_metric; + rs = statement.executeQuery("SELECT COUNT(*) FROM cpu_metric"); + while (rs.next()) { + LOG.info("Count: {}", rs.getInt(1)); + } + + // SELECT * FROM cpu_metric ORDER BY ts DESC LIMIT 5; + rs = statement.executeQuery("SELECT * FROM cpu_metric ORDER BY ts DESC LIMIT 5"); + LOG.info("host | ts | cpu_user | cpu_sys"); + while (rs.next()) { + LOG.info("{} | {} | {} | {}", // + rs.getString("host"), // + rs.getTimestamp("ts"), // + rs.getDouble("cpu_user"), // + rs.getDouble("cpu_sys")); + } + } + } + + public static Connection getConnection() throws IOException, ClassNotFoundException, SQLException { + Properties prop = new Properties(); + prop.load(QueryJDBC.class.getResourceAsStream("/db-connection.properties")); + + String dbName = (String) prop.get("db.database-driver"); + + String dbConnUrl = (String) prop.get("db.url"); + String dbUserName = (String) prop.get("db.username"); + String dbPassword = (String) prop.get("db.password"); + + Class.forName(dbName); + Connection dbConn = DriverManager.getConnection(dbConnUrl, dbUserName, dbPassword); + + return Objects.requireNonNull(dbConn, "Failed to make connection!"); + } + + public static void insertData(GreptimeDB greptimeDB) throws ExecutionException, InterruptedException { + List cpus = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + Cpu c = new Cpu(); + c.setHost("127.0.0." + i); + c.setTs(System.currentTimeMillis()); + c.setCpuUser(i + 0.1); + c.setCpuSys(i + 0.12); + cpus.add(c); + } + greptimeDB.writePOJOs(cpus).get(); + } +} diff --git a/ingester-example/src/main/java/io/greptime/StreamWritePOJOsQuickStart.java b/ingester-example/src/main/java/io/greptime/StreamWritePOJOsQuickStart.java index ee743e1..60ab0f6 100644 --- a/ingester-example/src/main/java/io/greptime/StreamWritePOJOsQuickStart.java +++ b/ingester-example/src/main/java/io/greptime/StreamWritePOJOsQuickStart.java @@ -16,12 +16,9 @@ package io.greptime; import io.greptime.models.WriteOk; -import io.greptime.options.GreptimeOptions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.math.BigDecimal; import java.util.ArrayList; -import java.util.Calendar; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -34,52 +31,35 @@ public class StreamWritePOJOsQuickStart { private static final Logger LOG = LoggerFactory.getLogger(StreamWritePOJOsQuickStart.class); public static void main(String[] args) throws ExecutionException, InterruptedException { - // GreptimeDB has a default database named "public", we can use it as the test database - String database = "public"; - // By default, GreptimeDB listens on port 4001 using the gRPC protocol. - // We can provide multiple endpoints that point to the same GreptimeDB cluster. - // The client will make calls to these endpoints based on a load balancing strategy. - String[] endpoints = {"127.0.0.1:4001"}; - GreptimeOptions opts = GreptimeOptions.newBuilder(endpoints, database) // - .build(); + GreptimeDB greptimeDB = TestConnector.connectToDefaultDB(); - GreptimeDB greptimeDB = GreptimeDB.create(opts); - - List myMetric1s = new ArrayList<>(); + List cpus = new ArrayList<>(); for (int i = 0; i < 10; i++) { - MyMetric1 m = new MyMetric1(); - m.setTag1("tag_value_1_" + i); - m.setTag2("tag_value_2_" + i); - m.setTag3("tag_value_3_" + i); - m.setTs(System.currentTimeMillis()); - m.setField1("field_value_1_" + i); - m.setField2(i); - m.setField3(new BigDecimal(i)); - m.setField4(i); - - myMetric1s.add(m); + Cpu c = new Cpu(); + c.setHost("127.0.0." + i); + c.setTs(System.currentTimeMillis()); + c.setCpuUser(i + 0.1); + c.setCpuSys(i + 0.12); + cpus.add(c); } - List myMetric2s = new ArrayList<>(); + List memories = new ArrayList<>(); for (int i = 0; i < 10; i++) { - MyMetric2 m = new MyMetric2(); - m.setTag1("tag_value_1_" + i); - m.setTag2("tag_value_2_" + i); + Memory m = new Memory(); + m.setHost("127.0.0." + i); m.setTs(System.currentTimeMillis() / 1000); - m.setField1(Calendar.getInstance().getTime()); - m.setField2(i); - - myMetric2s.add(m); + m.setMemUsage(i + 0.2); + memories.add(m); } StreamWriter, WriteOk> writer = greptimeDB.streamWriterPOJOs(); // write data into stream - writer.write(myMetric1s); - writer.write(myMetric2s); + writer.write(cpus); + writer.write(memories); // delete the first 5 rows - writer.write(myMetric1s.subList(0, 5), WriteOp.Delete); + writer.write(cpus.subList(0, 5), WriteOp.Delete); // complete the stream CompletableFuture future = writer.completed(); @@ -87,5 +67,8 @@ public static void main(String[] args) throws ExecutionException, InterruptedExc WriteOk result = future.get(); LOG.info("Write result: {}", result); + + // Shutdown the client when application exits. + greptimeDB.shutdownGracefully(); } } diff --git a/ingester-example/src/main/java/io/greptime/StreamWriteQuickStart.java b/ingester-example/src/main/java/io/greptime/StreamWriteQuickStart.java index 38c5b05..9808b2e 100644 --- a/ingester-example/src/main/java/io/greptime/StreamWriteQuickStart.java +++ b/ingester-example/src/main/java/io/greptime/StreamWriteQuickStart.java @@ -20,12 +20,8 @@ import io.greptime.models.Table; import io.greptime.models.TableSchema; import io.greptime.models.WriteOk; -import io.greptime.options.GreptimeOptions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.math.BigDecimal; -import java.util.Calendar; -import java.util.Date; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -37,70 +33,47 @@ public class StreamWriteQuickStart { private static final Logger LOG = LoggerFactory.getLogger(StreamWriteQuickStart.class); public static void main(String[] args) throws ExecutionException, InterruptedException { - // GreptimeDB has a default database named "public", we can use it as the test database - String database = "public"; - // By default, GreptimeDB listens on port 4001 using the gRPC protocol. - // We can provide multiple endpoints that point to the same GreptimeDB cluster. - // The client will make calls to these endpoints based on a load balancing strategy. - String[] endpoints = {"127.0.0.1:4001"}; - GreptimeOptions opts = GreptimeOptions.newBuilder(endpoints, database) // - .build(); - - GreptimeDB greptimeDB = GreptimeDB.create(opts); + GreptimeDB greptimeDB = TestConnector.connectToDefaultDB(); - TableSchema myMetric3Schema = TableSchema.newBuilder("my_metric3") // - .addColumn("tag1", SemanticType.Tag, DataType.String) // - .addColumn("tag2", SemanticType.Tag, DataType.String) // - .addColumn("tag3", SemanticType.Tag, DataType.String) // + TableSchema cpuMetricSchema = TableSchema.newBuilder("cpu_metric") // + .addColumn("host", SemanticType.Tag, DataType.String) // .addColumn("ts", SemanticType.Timestamp, DataType.TimestampMillisecond) // - .addColumn("field1", SemanticType.Field, DataType.String) // - .addColumn("field2", SemanticType.Field, DataType.Float64) // - .addColumn("field3", SemanticType.Field, DataType.Decimal128) // - .addColumn("field4", SemanticType.Field, DataType.Int32) // + .addColumn("cpu_user", SemanticType.Field, DataType.Float64) // + .addColumn("cpu_sys", SemanticType.Field, DataType.Float64) // .build(); - TableSchema myMetric4Schema = TableSchema.newBuilder("my_metric4") // - .addColumn("tag1", SemanticType.Tag, DataType.String) // - .addColumn("tag2", SemanticType.Tag, DataType.String) // - .addColumn("ts", SemanticType.Timestamp, DataType.TimestampSecond) // - .addColumn("field1", SemanticType.Field, DataType.Date) // - .addColumn("field2", SemanticType.Field, DataType.Float64) // + TableSchema memMetricSchema = TableSchema.newBuilder("mem_metric") // + .addColumn("host", SemanticType.Tag, DataType.String) // + .addColumn("ts", SemanticType.Timestamp, DataType.TimestampMillisecond) // + .addColumn("mem_usage", SemanticType.Field, DataType.Float64) // .build(); - Table myMetric3 = Table.from(myMetric3Schema); - Table myMetric4 = Table.from(myMetric4Schema); + Table cpuMetric = Table.from(cpuMetricSchema); + Table memMetric = Table.from(memMetricSchema); for (int i = 0; i < 10; i++) { - String tag1v = "tag_value_1_" + i; - String tag2v = "tag_value_2_" + i; - String tag3v = "tag_value_3_" + i; + String host = "127.0.0." + i; long ts = System.currentTimeMillis(); - String field1 = "field_value_1" + i; - double field2 = i + 0.1; - BigDecimal field3 = new BigDecimal(i); - int field4 = i + 1; - - myMetric3.addRow(tag1v, tag2v, tag3v, ts, field1, field2, field3, field4); + double cpuUser = i + 0.1; + double cpuSys = i + 0.12; + cpuMetric.addRow(host, ts, cpuUser, cpuSys); } for (int i = 0; i < 10; i++) { - String tag1v = "tag_value_1_" + i; - String tag2v = "tag_value_2_" + i; - long ts = System.currentTimeMillis() / 1000; - Date field1 = Calendar.getInstance().getTime(); - double field2 = i + 0.1; - - myMetric4.addRow(tag1v, tag2v, ts, field1, field2); + String host = "127.0.0." + i; + long ts = System.currentTimeMillis(); + double memUsage = i + 0.2; + memMetric.addRow(host, ts, memUsage); } StreamWriter writer = greptimeDB.streamWriter(); // write data into stream - writer.write(myMetric3); - writer.write(myMetric4); + writer.write(cpuMetric); + writer.write(memMetric); // delete the first 5 rows - writer.write(myMetric3.subRange(0, 5), WriteOp.Delete); + writer.write(cpuMetric.subRange(0, 5), WriteOp.Delete); // complete the stream CompletableFuture future = writer.completed(); @@ -108,5 +81,8 @@ public static void main(String[] args) throws ExecutionException, InterruptedExc WriteOk result = future.get(); LOG.info("Write result: {}", result); + + // Shutdown the client when application exits. + greptimeDB.shutdownGracefully(); } } diff --git a/ingester-example/src/main/java/io/greptime/TestConnector.java b/ingester-example/src/main/java/io/greptime/TestConnector.java new file mode 100644 index 0000000..c5ac637 --- /dev/null +++ b/ingester-example/src/main/java/io/greptime/TestConnector.java @@ -0,0 +1,93 @@ +/* + * Copyright 2023 Greptime Team + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.greptime; + +import io.greptime.common.util.SerializingExecutor; +import io.greptime.limit.LimitedPolicy; +import io.greptime.models.AuthInfo; +import io.greptime.options.GreptimeOptions; +import io.greptime.rpc.RpcOptions; + +/** + * @author jiachun.fjc + */ +public class TestConnector { + + public static GreptimeDB connectToDefaultDB() { + // GreptimeDB has a default database named "public" in the default catalog "greptime", + // we can use it as the test database + String database = "public"; + // By default, GreptimeDB listens on port 4001 using the gRPC protocol. + // We can provide multiple endpoints that point to the same GreptimeDB cluster. + // The client will make calls to these endpoints based on a load balancing strategy. + String[] endpoints = {"127.0.0.1:4001"}; + GreptimeOptions opts = GreptimeOptions.newBuilder(endpoints, database) // + // Optional, the default value is fine. + // + // Asynchronous thread pool, which is used to handle various asynchronous + // tasks in the SDK (You are using a purely asynchronous SDK). If you do not + // set it, there will be a default implementation, which you can reconfigure + // if the default implementation is not satisfied. + // The default implementation is: `SerializingExecutor` + .asyncPool(new SerializingExecutor("asyncPool")) + // Optional, the default value is fine. + // + // Sets the RPC options, in general, the default configuration is fine. + .rpcOptions(RpcOptions.newDefault()) + // Optional, the default value is fine. + // + // In some case of failure, a retry of write can be attempted. + // The default is 1 + .writeMaxRetries(1) + // Optional, the default value is fine. + // + // Write flow limit: maximum number of data rows in-flight. It does not take effect on `StreamWriter` + // The default is 65536 + .maxInFlightWriteRows(65536) + // Optional, the default value is fine. + // + // Write flow limit: the policy to use when the write flow limit is exceeded. + // All options: + // - `LimitedPolicy.DiscardPolicy`: discard the data if the limiter is full. + // - `LimitedPolicy.AbortPolicy`: abort if the limiter is full. + // - `LimitedPolicy.BlockingPolicy`: blocks if the limiter is full. + // - `LimitedPolicy.AbortOnBlockingTimeoutPolicy`: blocks the specified time if the limiter is full. + // The default is `LimitedPolicy.AbortOnBlockingTimeoutPolicy` + .writeLimitedPolicy(LimitedPolicy.defaultWriteLimitedPolicy()) + // Optional, the default value is fine. + // + // The default rate limit for stream writer. It only takes effect when we do not specify the + // `maxPointsPerSecond` when creating a `StreamWriter`. + // The default is 10 * 65536 + .defaultStreamMaxWritePointsPerSecond(10 * 65536) + // Optional, the default value is fine. + // + // Refresh frequency of route tables. The background refreshes all route tables + // periodically. By default, the route tables will not be refreshed. + .routeTableRefreshPeriodSeconds(-1) + // Optional, the default value is fine. + // + // Sets the request router, The internal default implementation works well. + // You don't need to set it unless you have special requirements. + .router(null) + // Sets authentication information. If the DB is not required to authenticate, we can ignore this. + .authInfo(AuthInfo.noAuthorization()) + // A good start ^_^ + .build(); + + return GreptimeDB.create(opts); + } +} diff --git a/ingester-example/src/main/java/io/greptime/WritePOJOsQuickStart.java b/ingester-example/src/main/java/io/greptime/WritePOJOsQuickStart.java index 7c20d16..a0288dd 100644 --- a/ingester-example/src/main/java/io/greptime/WritePOJOsQuickStart.java +++ b/ingester-example/src/main/java/io/greptime/WritePOJOsQuickStart.java @@ -18,13 +18,10 @@ import io.greptime.models.Err; import io.greptime.models.Result; import io.greptime.models.WriteOk; -import io.greptime.options.GreptimeOptions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.math.BigDecimal; import java.util.ArrayList; import java.util.Arrays; -import java.util.Calendar; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -37,50 +34,31 @@ public class WritePOJOsQuickStart { private static final Logger LOG = LoggerFactory.getLogger(WritePOJOsQuickStart.class); public static void main(String[] args) throws ExecutionException, InterruptedException { - // GreptimeDB has a default database named "public", we can use it as the test database - String database = "public"; - // By default, GreptimeDB listens on port 4001 using the gRPC protocol. - // We can provide multiple endpoints that point to the same GreptimeDB cluster. - // The client will make calls to these endpoints based on a load balancing strategy. - String[] endpoints = {"127.0.0.1:4001"}; - GreptimeOptions opts = GreptimeOptions.newBuilder(endpoints, database) // - .build(); + GreptimeDB greptimeDB = TestConnector.connectToDefaultDB(); - GreptimeDB greptimeDB = GreptimeDB.create(opts); - - List myMetric1s = new ArrayList<>(); + List cpus = new ArrayList<>(); for (int i = 0; i < 10; i++) { - MyMetric1 m = new MyMetric1(); - m.setTag1("tag_value_1_" + i); - m.setTag2("tag_value_2_" + i); - m.setTag3("tag_value_3_" + i); - m.setTs(System.currentTimeMillis()); - m.setField1("field_value_1_" + i); - m.setField2(i); - m.setField3(new BigDecimal(i)); - m.setField4(i); - - myMetric1s.add(m); + Cpu c = new Cpu(); + c.setHost("127.0.0." + i); + c.setTs(System.currentTimeMillis()); + c.setCpuUser(i + 0.1); + c.setCpuSys(i + 0.12); + cpus.add(c); } - List myMetric2s = new ArrayList<>(); + List memories = new ArrayList<>(); for (int i = 0; i < 10; i++) { - MyMetric2 m = new MyMetric2(); - m.setTag1("tag_value_1_" + i); - m.setTag2("tag_value_2_" + i); - m.setTs(System.currentTimeMillis() / 1000); - m.setField1(Calendar.getInstance().getTime()); - m.setField2(i); - - myMetric2s.add(m); + Memory m = new Memory(); + m.setHost("127.0.0." + i); + m.setTs(System.currentTimeMillis()); + m.setMemUsage(i + 0.2); + memories.add(m); } - List> pojos = Arrays.asList(myMetric1s, myMetric2s); - // For performance reasons, the SDK is designed to be purely asynchronous. // The return value is a future object. If you want to immediately obtain // the result, you can call `future.get()`. - CompletableFuture> puts = greptimeDB.writePOJOs(pojos); + CompletableFuture> puts = greptimeDB.writePOJOs(cpus, memories); Result result = puts.get(); @@ -90,7 +68,7 @@ public static void main(String[] args) throws ExecutionException, InterruptedExc LOG.error("Failed to write: {}", result.getErr()); } - List> delete_pojos = Arrays.asList(myMetric1s.subList(0, 5), myMetric2s.subList(0, 5)); + List> delete_pojos = Arrays.asList(cpus.subList(0, 5), memories.subList(0, 5)); Result deletes = greptimeDB.writePOJOs(delete_pojos, WriteOp.Delete).get(); if (deletes.isOk()) { @@ -98,5 +76,8 @@ public static void main(String[] args) throws ExecutionException, InterruptedExc } else { LOG.error("Failed to delete: {}", result.getErr()); } + + // Shutdown the client when application exits. + greptimeDB.shutdownGracefully(); } } diff --git a/ingester-example/src/main/java/io/greptime/WriteQuickStart.java b/ingester-example/src/main/java/io/greptime/WriteQuickStart.java index 6b6d6d0..ffab47f 100644 --- a/ingester-example/src/main/java/io/greptime/WriteQuickStart.java +++ b/ingester-example/src/main/java/io/greptime/WriteQuickStart.java @@ -22,14 +22,9 @@ import io.greptime.models.Table; import io.greptime.models.TableSchema; import io.greptime.models.WriteOk; -import io.greptime.options.GreptimeOptions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.math.BigDecimal; import java.util.Arrays; -import java.util.Calendar; -import java.util.Collection; -import java.util.Date; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -42,68 +37,43 @@ public class WriteQuickStart { private static final Logger LOG = LoggerFactory.getLogger(WriteQuickStart.class); public static void main(String[] args) throws ExecutionException, InterruptedException { - // GreptimeDB has a default database named "public", we can use it as the test database - String database = "public"; - // By default, GreptimeDB listens on port 4001 using the gRPC protocol. - // We can provide multiple endpoints that point to the same GreptimeDB cluster. - // The client will make calls to these endpoints based on a load balancing strategy. - String[] endpoints = {"127.0.0.1:4001"}; - GreptimeOptions opts = GreptimeOptions.newBuilder(endpoints, database) // - .build(); - - GreptimeDB greptimeDB = GreptimeDB.create(opts); + GreptimeDB greptimeDB = TestConnector.connectToDefaultDB(); - TableSchema myMetric3Schema = TableSchema.newBuilder("my_metric3") // - .addColumn("tag1", SemanticType.Tag, DataType.String) // - .addColumn("tag2", SemanticType.Tag, DataType.String) // - .addColumn("tag3", SemanticType.Tag, DataType.String) // + TableSchema cpuMetricSchema = TableSchema.newBuilder("cpu_metric") // + .addColumn("host", SemanticType.Tag, DataType.String) // .addColumn("ts", SemanticType.Timestamp, DataType.TimestampMillisecond) // - .addColumn("field1", SemanticType.Field, DataType.String) // - .addColumn("field2", SemanticType.Field, DataType.Float64) // - .addColumn("field3", SemanticType.Field, DataType.Decimal128) // - .addColumn("field4", SemanticType.Field, DataType.Int32) // + .addColumn("cpu_user", SemanticType.Field, DataType.Float64) // + .addColumn("cpu_sys", SemanticType.Field, DataType.Float64) // .build(); - TableSchema myMetric4Schema = TableSchema.newBuilder("my_metric4") // - .addColumn("tag1", SemanticType.Tag, DataType.String) // - .addColumn("tag2", SemanticType.Tag, DataType.String) // - .addColumn("ts", SemanticType.Timestamp, DataType.TimestampSecond) // - .addColumn("field1", SemanticType.Field, DataType.Date) // - .addColumn("field2", SemanticType.Field, DataType.Float64) // + TableSchema memMetricSchema = TableSchema.newBuilder("mem_metric") // + .addColumn("host", SemanticType.Tag, DataType.String) // + .addColumn("ts", SemanticType.Timestamp, DataType.TimestampMillisecond) // + .addColumn("mem_usage", SemanticType.Field, DataType.Float64) // .build(); - Table myMetric3 = Table.from(myMetric3Schema); - Table myMetric4 = Table.from(myMetric4Schema); + Table cpuMetric = Table.from(cpuMetricSchema); + Table memMetric = Table.from(memMetricSchema); for (int i = 0; i < 10; i++) { - String tag1v = "tag_value_1_" + i; - String tag2v = "tag_value_2_" + i; - String tag3v = "tag_value_3_" + i; + String host = "127.0.0." + i; long ts = System.currentTimeMillis(); - String field1 = "field_value_1" + i; - double field2 = i + 0.1; - BigDecimal field3 = new BigDecimal(i); - int field4 = i + 1; - - myMetric3.addRow(tag1v, tag2v, tag3v, ts, field1, field2, field3, field4); + double cpuUser = i + 0.1; + double cpuSys = i + 0.12; + cpuMetric.addRow(host, ts, cpuUser, cpuSys); } for (int i = 0; i < 10; i++) { - String tag1v = "tag_value_1_" + i; - String tag2v = "tag_value_2_" + i; - long ts = System.currentTimeMillis() / 1000; - Date field1 = Calendar.getInstance().getTime(); - double field2 = i + 0.1; - - myMetric4.addRow(tag1v, tag2v, ts, field1, field2); + String host = "127.0.0." + i; + long ts = System.currentTimeMillis(); + double memUsage = i + 0.2; + memMetric.addRow(host, ts, memUsage); } - Collection tables = Arrays.asList(myMetric3, myMetric4); - // For performance reasons, the SDK is designed to be purely asynchronous. // The return value is a future object. If you want to immediately obtain // the result, you can call `future.get()`. - CompletableFuture> future = greptimeDB.write(tables); + CompletableFuture> future = greptimeDB.write(cpuMetric, memMetric); Result result = future.get(); @@ -113,7 +83,7 @@ public static void main(String[] args) throws ExecutionException, InterruptedExc LOG.error("Failed to write: {}", result.getErr()); } - List
delete_objs = Arrays.asList(myMetric3.subRange(0, 5), myMetric4.subRange(0, 5)); + List
delete_objs = Arrays.asList(cpuMetric.subRange(0, 5), memMetric.subRange(0, 5)); Result deletes = greptimeDB.write(delete_objs, WriteOp.Delete).get(); if (deletes.isOk()) { @@ -121,5 +91,8 @@ public static void main(String[] args) throws ExecutionException, InterruptedExc } else { LOG.error("Failed to delete: {}", result.getErr()); } + + // Shutdown the client when application exits. + greptimeDB.shutdownGracefully(); } } diff --git a/ingester-example/src/main/resources/db-connection.properties b/ingester-example/src/main/resources/db-connection.properties new file mode 100644 index 0000000..9e47c11 --- /dev/null +++ b/ingester-example/src/main/resources/db-connection.properties @@ -0,0 +1,5 @@ +# DataSource +db.database-driver=com.mysql.cj.jdbc.Driver +db.url=jdbc:mysql://localhost:4002/public +db.username= +db.password= \ No newline at end of file diff --git a/ingester-grpc/pom.xml b/ingester-grpc/pom.xml index fc55419..05198f0 100644 --- a/ingester-grpc/pom.xml +++ b/ingester-grpc/pom.xml @@ -4,7 +4,7 @@ greptimedb-ingester io.greptime - 0.4.0 + 0.5.0 ingester-grpc diff --git a/ingester-protocol/pom.xml b/ingester-protocol/pom.xml index f3c0ae6..e46555b 100644 --- a/ingester-protocol/pom.xml +++ b/ingester-protocol/pom.xml @@ -4,7 +4,7 @@ greptimedb-ingester io.greptime - 0.4.0 + 0.5.0 ingester-protocol diff --git a/ingester-protocol/src/main/java/io/greptime/GreptimeDB.java b/ingester-protocol/src/main/java/io/greptime/GreptimeDB.java index 41b2f1a..817fc0f 100644 --- a/ingester-protocol/src/main/java/io/greptime/GreptimeDB.java +++ b/ingester-protocol/src/main/java/io/greptime/GreptimeDB.java @@ -21,8 +21,8 @@ import io.greptime.common.Endpoint; import io.greptime.common.Lifecycle; import io.greptime.common.signal.SignalHandlersLoader; -import io.greptime.common.util.MetricExecutor; import io.greptime.common.util.MetricsUtil; +import io.greptime.common.util.Strings; import io.greptime.models.Err; import io.greptime.models.Result; import io.greptime.models.Table; @@ -42,7 +42,6 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -70,7 +69,6 @@ public class GreptimeDB implements Write, WritePOJO, Lifecycle, private GreptimeOptions opts; private RouterClient routerClient; private WriteClient writeClient; - private Executor asyncPool; /** * Returns all instances of {@link GreptimeDB}. @@ -82,8 +80,9 @@ public static List instances() { public static GreptimeDB create(GreptimeOptions opts) { GreptimeDB greptimeDB = new GreptimeDB(); if (!greptimeDB.init(opts)) { - throw new RuntimeException("Failed to start GreptimeDB client"); + throw new RuntimeException("Failed to start the GreptimeDB client"); } + LOG.info("GreptimeDB client started: {}", greptimeDB); return greptimeDB; } @@ -99,11 +98,12 @@ public boolean init(GreptimeOptions opts) { this.opts = GreptimeOptions.checkSelf(opts).copy(); - this.routerClient = makeRouteClient(opts); - if (this.asyncPool != null) { - this.asyncPool = new MetricExecutor(this.asyncPool, "async_pool.time"); + if (Strings.isBlank(this.opts.getDatabase())) { + LOG.warn("The `database` is not specified, use default (catalog-database): greptime-public"); } - this.writeClient = makeWriteClient(opts, this.routerClient, this.asyncPool); + + this.routerClient = makeRouteClient(opts); + this.writeClient = makeWriteClient(opts, this.routerClient); INSTANCES.put(this.id, this); @@ -184,8 +184,10 @@ public void display(Printer out) { .println(VERSION) // .print("endpoints=") // .println(this.opts.getEndpoints()) // - .print("userAsyncPool=") // - .println(this.opts.getAsyncPool()); + .print("database=") // + .println(this.opts.getDatabase()) // + .print("rpcOptions=") // + .println(this.opts.getRpcOptions()); if (this.routerClient != null) { out.println(""); @@ -208,7 +210,6 @@ public String toString() { ", opts=" + opts + // ", routerClient=" + routerClient + // ", writeClient=" + writeClient + // - ", asyncPool=" + asyncPool + // '}'; } @@ -222,7 +223,7 @@ private static RpcClient makeRpcClient(GreptimeOptions opts) { RpcOptions rpcOpts = opts.getRpcOptions(); RpcClient rpcClient = RpcFactoryProvider.getRpcFactory().createRpcClient(); if (!rpcClient.init(rpcOpts)) { - throw new IllegalStateException("Fail to start RPC client"); + throw new IllegalStateException("Failed to start the RPC client"); } rpcClient.registerConnectionObserver(new RpcConnectionObserver()); return rpcClient; @@ -233,21 +234,17 @@ private static RouterClient makeRouteClient(GreptimeOptions opts) { routerOpts.setRpcClient(makeRpcClient(opts)); RouterClient routerClient = new RouterClient(); if (!routerClient.init(routerOpts)) { - throw new IllegalStateException("Fail to start router client"); + throw new IllegalStateException("Failed to start the router client"); } return routerClient; } - private static WriteClient makeWriteClient(GreptimeOptions opts, RouterClient routerClient, Executor asyncPool) { + private static WriteClient makeWriteClient(GreptimeOptions opts, RouterClient routerClient) { WriteOptions writeOpts = opts.getWriteOptions(); writeOpts.setRouterClient(routerClient); - writeOpts.setAsyncPool(asyncPool); WriteClient writeClient = new WriteClient(); - if (opts.getAuthInfo() != null) { - writeOpts.setAuthInfo(opts.getAuthInfo()); - } if (!writeClient.init(writeOpts)) { - throw new IllegalStateException("Fail to start write client"); + throw new IllegalStateException("Failed to start the write client failed"); } return writeClient; } diff --git a/ingester-protocol/src/main/java/io/greptime/Router.java b/ingester-protocol/src/main/java/io/greptime/Router.java index 928ef84..8d13315 100644 --- a/ingester-protocol/src/main/java/io/greptime/Router.java +++ b/ingester-protocol/src/main/java/io/greptime/Router.java @@ -15,6 +15,7 @@ */ package io.greptime; +import java.util.List; import java.util.concurrent.CompletableFuture; /** @@ -22,7 +23,7 @@ * * @author jiachun.fjc */ -public interface Router { +public interface Router { /** * For a given request return the routing decision for the call. @@ -30,5 +31,20 @@ public interface Router { * @param request route request * @return a endpoint for the call */ - CompletableFuture routeFor(Req request); + CompletableFuture routeFor(R request); + + /** + * Refresh the routing table from remote server. + * @return a future that will be completed when the refresh is done + */ + CompletableFuture refresh(); + + /** + * Refresh the routing table. + * We need to get all the endpoints, and this method will overwrite all + * current endpoints. + * + * @param endpoints all new endpoints + */ + void onRefresh(List endpoints); } diff --git a/ingester-protocol/src/main/java/io/greptime/RouterClient.java b/ingester-protocol/src/main/java/io/greptime/RouterClient.java index 5836127..d44bb92 100644 --- a/ingester-protocol/src/main/java/io/greptime/RouterClient.java +++ b/ingester-protocol/src/main/java/io/greptime/RouterClient.java @@ -49,7 +49,7 @@ public class RouterClient implements Lifecycle, Display { private ScheduledExecutorService refresher; private RouterOptions opts; private RpcClient rpcClient; - private InnerRouter inner; + private Router router; @Override public boolean init(RouterOptions opts) { @@ -58,14 +58,20 @@ public boolean init(RouterOptions opts) { List endpoints = Ensures.ensureNonNull(this.opts.getEndpoints(), "null `endpoints`"); - this.inner = new InnerRouter(); - this.inner.refreshLocal(endpoints); + this.router = new DefaultRouter(); + this.router.onRefresh(endpoints); long refreshPeriod = this.opts.getRefreshPeriodSeconds(); if (refreshPeriod > 0) { this.refresher = REFRESHER_POOL.getObject(); this.refresher.scheduleWithFixedDelay( - () -> this.inner.refreshFromRemote(), + () -> this.router.refresh().whenComplete((r, e) -> { + if (e != null) { + LOG.error("Router cache refresh failed.", e); + } else { + LOG.debug("Router cache refresh {}.", r ? "success" : "failed"); + } + }), Util.randomInitialDelay(180), refreshPeriod, TimeUnit.SECONDS); LOG.info("Router cache refresher started."); @@ -90,7 +96,7 @@ public void shutdownGracefully() { * Get the current routing table. */ public CompletableFuture route() { - return this.inner.routeFor(null); + return this.router.routeFor(null); } /** @@ -211,18 +217,10 @@ public String toString() { * the client send request using a rr or random policy, and frontend server needs to * be able to return the member list for the purpose of frontend server members change. */ - private static class InnerRouter implements Router { + private static class DefaultRouter implements Router { private final AtomicReference> endpointsRef = new AtomicReference<>(); - public void refreshFromRemote() { - // TODO - } - - void refreshLocal(List input) { - this.endpointsRef.set(input); - } - @Override public CompletableFuture routeFor(Void request) { List endpoints = this.endpointsRef.get(); @@ -230,5 +228,16 @@ public CompletableFuture routeFor(Void request) { int i = random.nextInt(0, endpoints.size()); return Util.completedCf(endpoints.get(i)); } + + @Override + public CompletableFuture refresh() { + // always return true + return Util.completedCf(true); + } + + @Override + public void onRefresh(List endpoints) { + this.endpointsRef.set(endpoints); + } } } diff --git a/ingester-protocol/src/main/java/io/greptime/Write.java b/ingester-protocol/src/main/java/io/greptime/Write.java index 853b3a1..bf058a4 100644 --- a/ingester-protocol/src/main/java/io/greptime/Write.java +++ b/ingester-protocol/src/main/java/io/greptime/Write.java @@ -21,6 +21,7 @@ import io.greptime.models.WriteOk; import io.greptime.rpc.Context; +import java.util.Arrays; import java.util.Collection; import java.util.concurrent.CompletableFuture; @@ -30,6 +31,12 @@ * @author jiachun.fjc */ public interface Write { + /** + * @see #write(Collection, WriteOp, Context) + */ + default CompletableFuture> write(Table... tables) { + return write(Arrays.asList(tables)); + } /** * @see #write(Collection, WriteOp, Context) diff --git a/ingester-protocol/src/main/java/io/greptime/WriteClient.java b/ingester-protocol/src/main/java/io/greptime/WriteClient.java index bc4e73d..83fa262 100644 --- a/ingester-protocol/src/main/java/io/greptime/WriteClient.java +++ b/ingester-protocol/src/main/java/io/greptime/WriteClient.java @@ -24,6 +24,7 @@ import io.greptime.common.Lifecycle; import io.greptime.common.util.Clock; import io.greptime.common.util.Ensures; +import io.greptime.common.util.MetricExecutor; import io.greptime.common.util.MetricsUtil; import io.greptime.common.util.SerializingExecutor; import io.greptime.errors.LimitedException; @@ -69,6 +70,7 @@ public boolean init(WriteOptions opts) { this.routerClient = this.opts.getRouterClient(); Executor pool = this.opts.getAsyncPool(); this.asyncPool = pool != null ? pool : new SerializingExecutor("write_client"); + this.asyncPool = new MetricExecutor(this.asyncPool, "async_write_pool.time"); this.writeLimiter = new DefaultWriteLimiter(this.opts.getMaxInFlightWriteRows(), this.opts.getLimitedPolicy()); return true; } @@ -338,11 +340,14 @@ static abstract class RateLimitingStreamWriter implements StreamWriter write(Table table, WriteOp writeOp) { Ensures.ensureNonNull(table, "null `table`"); + WriteTables writeTables = new WriteTables(table, writeOp); + if (this.rateLimiter != null) { double timeSpent = this.rateLimiter.acquire(table.pointCount()); InnerMetricHelper.writeStreamLimiterTimeSpent().update((long) timeSpent); } - this.observer.onNext(new WriteTables(table, writeOp)); + + this.observer.onNext(writeTables); return this; } } diff --git a/ingester-protocol/src/main/java/io/greptime/WritePOJO.java b/ingester-protocol/src/main/java/io/greptime/WritePOJO.java index b683877..ef440cc 100644 --- a/ingester-protocol/src/main/java/io/greptime/WritePOJO.java +++ b/ingester-protocol/src/main/java/io/greptime/WritePOJO.java @@ -20,6 +20,7 @@ import io.greptime.models.WriteOk; import io.greptime.rpc.Context; +import java.util.Arrays; import java.util.Collection; import java.util.List; import java.util.concurrent.CompletableFuture; @@ -30,6 +31,12 @@ * @author jiachun.fjc */ public interface WritePOJO { + /** + * @see #writePOJOs(Collection, WriteOp, Context) + */ + default CompletableFuture> writePOJOs(List... pojos) { + return writePOJOs(Arrays.asList(pojos)); + } /** * @see #writePOJOs(Collection, WriteOp, Context) */ diff --git a/ingester-protocol/src/main/java/io/greptime/models/AuthInfo.java b/ingester-protocol/src/main/java/io/greptime/models/AuthInfo.java index bd01595..e5d27c4 100644 --- a/ingester-protocol/src/main/java/io/greptime/models/AuthInfo.java +++ b/ingester-protocol/src/main/java/io/greptime/models/AuthInfo.java @@ -25,8 +25,8 @@ */ public class AuthInfo implements Into { - private String username; - private String password; + private final String username; + private final String password; /** * Create AuthInfo from username/password. @@ -36,27 +36,15 @@ public AuthInfo(String username, String password) { this.password = password; } - public String getUsername() { - return this.username; - } - - public void setUsername(String username) { - this.username = username; - } - - public String getPassword() { - return this.password; - } - - public void setPassword(String password) { - this.password = password; + public static AuthInfo noAuthorization() { + return null; } @Override public Common.AuthHeader into() { Common.Basic basic = Common.Basic.newBuilder() // - .setUsername(getUsername()) // - .setPassword(getPassword()) // + .setUsername(this.username) // + .setPassword(this.password) // .build(); return Common.AuthHeader.newBuilder() // .setBasic(basic) // diff --git a/ingester-protocol/src/main/java/io/greptime/models/TableHelper.java b/ingester-protocol/src/main/java/io/greptime/models/TableHelper.java index 6ea992e..c029125 100644 --- a/ingester-protocol/src/main/java/io/greptime/models/TableHelper.java +++ b/ingester-protocol/src/main/java/io/greptime/models/TableHelper.java @@ -16,6 +16,7 @@ package io.greptime.models; import io.greptime.WriteOp; +import io.greptime.common.util.Ensures; import io.greptime.v1.Common; import io.greptime.v1.Database; import java.util.Collection; @@ -41,6 +42,7 @@ public static Database.GreptimeRequest toGreptimeRequest(WriteTables writeTables case Insert: Database.RowInsertRequests.Builder insertBuilder = Database.RowInsertRequests.newBuilder(); for (Table t : tables) { + Ensures.ensure(t.pointCount() > 0, "No data to insert in table: %s", t.tableName()); insertBuilder.addInserts(t.intoRowInsertRequest()); } return Database.GreptimeRequest.newBuilder() // @@ -50,6 +52,7 @@ public static Database.GreptimeRequest toGreptimeRequest(WriteTables writeTables case Delete: Database.RowDeleteRequests.Builder deleteBuilder = Database.RowDeleteRequests.newBuilder(); for (Table t : tables) { + Ensures.ensure(t.pointCount() > 0, "No data to delete in table: %s", t.tableName()); deleteBuilder.addDeletes(t.intoRowDeleteRequest()); } return Database.GreptimeRequest.newBuilder() // diff --git a/ingester-protocol/src/main/java/io/greptime/options/GreptimeOptions.java b/ingester-protocol/src/main/java/io/greptime/options/GreptimeOptions.java index 739debf..310f602 100644 --- a/ingester-protocol/src/main/java/io/greptime/options/GreptimeOptions.java +++ b/ingester-protocol/src/main/java/io/greptime/options/GreptimeOptions.java @@ -15,6 +15,7 @@ */ package io.greptime.options; +import io.greptime.Router; import io.greptime.common.Copiable; import io.greptime.common.Endpoint; import io.greptime.common.util.Ensures; @@ -35,12 +36,10 @@ */ public class GreptimeOptions implements Copiable { private List endpoints; - private Executor asyncPool; private RpcOptions rpcOptions; private RouterOptions routerOptions; private WriteOptions writeOptions; private String database; - private AuthInfo authInfo; public List getEndpoints() { return endpoints; @@ -50,14 +49,6 @@ public void setEndpoints(List endpoints) { this.endpoints = endpoints; } - public Executor getAsyncPool() { - return asyncPool; - } - - public void setAsyncPool(Executor asyncPool) { - this.asyncPool = asyncPool; - } - public RpcOptions getRpcOptions() { return rpcOptions; } @@ -90,21 +81,11 @@ public void setDatabase(String database) { this.database = database; } - public AuthInfo getAuthInfo() { - return authInfo; - } - - public void setAuthInfo(AuthInfo authInfo) { - this.authInfo = authInfo; - } - @Override public GreptimeOptions copy() { GreptimeOptions opts = new GreptimeOptions(); opts.endpoints = new ArrayList<>(this.endpoints); - opts.asyncPool = this.asyncPool; opts.database = this.database; - opts.authInfo = this.authInfo; if (this.rpcOptions != null) { opts.rpcOptions = this.rpcOptions.copy(); } @@ -121,12 +102,10 @@ public GreptimeOptions copy() { public String toString() { return "GreptimeOptions{" + // "endpoints=" + endpoints + // - ", asyncPool=" + asyncPool + // ", rpcOptions=" + rpcOptions + // ", routerOptions=" + routerOptions + // ", writeOptions=" + writeOptions + // ", database='" + database + '\'' + // - ", authInfo=" + authInfo + // '}'; } @@ -137,6 +116,7 @@ public static GreptimeOptions checkSelf(GreptimeOptions opts) { Ensures.ensureNonNull(opts.getRpcOptions(), "null `rpcOptions`"); Ensures.ensureNonNull(opts.getRouterOptions(), "null `routerOptions`"); Ensures.ensureNonNull(opts.getWriteOptions(), "null `writeOptions`"); + return opts; } @@ -174,6 +154,8 @@ public static final class Builder { private long routeTableRefreshPeriodSeconds = -1; // Authentication information private AuthInfo authInfo; + // The request router + private Router router; public Builder(List endpoints, String database) { this.endpoints.addAll(endpoints); @@ -230,7 +212,13 @@ public Builder maxInFlightWriteRows(int maxInFlightWriteRows) { } /** - * Set write limited policy. + * Write flow limit: the policy to use when the write flow limit is exceeded. + * The options: + * - `LimitedPolicy.DiscardPolicy`: discard the data if the limiter is full. + * - `LimitedPolicy.AbortPolicy`: abort if the limiter is full. + * - `LimitedPolicy.BlockingPolicy`: blocks if the limiter is full. + * - `LimitedPolicy.AbortOnBlockingTimeoutPolicy`: blocks the specified time if the limiter is full. + * The default is `LimitedPolicy.AbortOnBlockingTimeoutPolicy` * * @param writeLimitedPolicy write limited policy * @return this builder @@ -241,7 +229,10 @@ public Builder writeLimitedPolicy(LimitedPolicy writeLimitedPolicy) { } /** - * The default rate limit for stream writer. + * The default rate limit for `StreamWriter`. It only takes effect when we do not specify the + * `maxPointsPerSecond` when creating a `StreamWriter`. + * The default is 10 * 65536 + * * @param defaultStreamMaxWritePointsPerSecond default max write points per second * @return this builder */ @@ -252,7 +243,7 @@ public Builder defaultStreamMaxWritePointsPerSecond(int defaultStreamMaxWritePoi /** * Refresh frequency of route tables. The background refreshes all route tables - * periodically. By default, all route tables are refreshed every 30 seconds. + * periodically. By default, By default, the route tables will not be refreshed. * * @param routeTableRefreshPeriodSeconds refresh period for route tables cache * @return this builder @@ -263,7 +254,8 @@ public Builder routeTableRefreshPeriodSeconds(long routeTableRefreshPeriodSecond } /** - * Sets authentication information. + * Sets authentication information. If the DB is not required to authenticate, + * we can ignore this. * * @param authInfo the authentication information * @return this builder @@ -273,6 +265,18 @@ public Builder authInfo(AuthInfo authInfo) { return this; } + /** + * Sets the request router, The internal default implementation works well. + * You don't need to set it unless you have special requirements. + * + * @param router the request router + * @return this builder + */ + public Builder router(Router router) { + this.router = router; + return this; + } + /** * A good start, happy coding. * @@ -281,23 +285,22 @@ public Builder authInfo(AuthInfo authInfo) { public GreptimeOptions build() { GreptimeOptions opts = new GreptimeOptions(); opts.setEndpoints(this.endpoints); - opts.setAsyncPool(this.asyncPool); opts.setRpcOptions(this.rpcOptions); opts.setDatabase(this.database); - opts.setAuthInfo(this.authInfo); - opts.setRouterOptions(createRouterOptions()); - opts.setWriteOptions(createWriteOptions()); + opts.setRouterOptions(routerOptions()); + opts.setWriteOptions(writeOptions()); return GreptimeOptions.checkSelf(opts); } - private RouterOptions createRouterOptions() { + private RouterOptions routerOptions() { RouterOptions routerOpts = new RouterOptions(); routerOpts.setEndpoints(this.endpoints); + routerOpts.setRouter(this.router); routerOpts.setRefreshPeriodSeconds(this.routeTableRefreshPeriodSeconds); return routerOpts; } - private WriteOptions createWriteOptions() { + private WriteOptions writeOptions() { WriteOptions writeOpts = new WriteOptions(); writeOpts.setDatabase(this.database); writeOpts.setAuthInfo(this.authInfo); diff --git a/ingester-protocol/src/main/java/io/greptime/options/RouterOptions.java b/ingester-protocol/src/main/java/io/greptime/options/RouterOptions.java index e978b7e..3caab4f 100644 --- a/ingester-protocol/src/main/java/io/greptime/options/RouterOptions.java +++ b/ingester-protocol/src/main/java/io/greptime/options/RouterOptions.java @@ -15,6 +15,7 @@ */ package io.greptime.options; +import io.greptime.Router; import io.greptime.common.Copiable; import io.greptime.common.Endpoint; import io.greptime.rpc.RpcClient; @@ -34,6 +35,7 @@ public class RouterOptions implements Copiable { // all route tables periodically. If the value is less than or // equal to 0, the route tables will not be refreshed. private long refreshPeriodSeconds = -1; + private Router router; public RpcClient getRpcClient() { return rpcClient; @@ -59,12 +61,21 @@ public void setRefreshPeriodSeconds(long refreshPeriodSeconds) { this.refreshPeriodSeconds = refreshPeriodSeconds; } + public Router getRouter() { + return router; + } + + public void setRouter(Router router) { + this.router = router; + } + @Override public RouterOptions copy() { RouterOptions opts = new RouterOptions(); opts.rpcClient = rpcClient; opts.endpoints = this.endpoints; opts.refreshPeriodSeconds = this.refreshPeriodSeconds; + opts.router = this.router; return opts; } diff --git a/ingester-protocol/src/main/java/io/greptime/options/WriteOptions.java b/ingester-protocol/src/main/java/io/greptime/options/WriteOptions.java index 5624dbb..cdfd7de 100644 --- a/ingester-protocol/src/main/java/io/greptime/options/WriteOptions.java +++ b/ingester-protocol/src/main/java/io/greptime/options/WriteOptions.java @@ -118,9 +118,9 @@ public WriteOptions copy() { @Override public String toString() { + // Do not print auto info return "WriteOptions{" + // "database='" + database + '\'' + // - ", authInfo=" + authInfo + // ", routerClient=" + routerClient + // ", asyncPool=" + asyncPool + // ", maxRetries=" + maxRetries + // diff --git a/ingester-protocol/src/main/resources/client_version.properties b/ingester-protocol/src/main/resources/client_version.properties index 2777b47..269f518 100644 --- a/ingester-protocol/src/main/resources/client_version.properties +++ b/ingester-protocol/src/main/resources/client_version.properties @@ -1 +1 @@ -client.version=0.4.0 +client.version=0.5.0 diff --git a/ingester-protocol/src/test/java/io/greptime/UtilTest.java b/ingester-protocol/src/test/java/io/greptime/UtilTest.java index ce6f6a3..3d58056 100644 --- a/ingester-protocol/src/test/java/io/greptime/UtilTest.java +++ b/ingester-protocol/src/test/java/io/greptime/UtilTest.java @@ -26,6 +26,6 @@ public class UtilTest { @Test public void testClientVersion() { String ver = Util.clientVersion(); - Assert.assertEquals("0.4.0", ver); + Assert.assertEquals("0.5.0", ver); } } diff --git a/ingester-protocol/src/test/java/io/greptime/WriteClientTest.java b/ingester-protocol/src/test/java/io/greptime/WriteClientTest.java index 3f4b3da..09eaa3b 100644 --- a/ingester-protocol/src/test/java/io/greptime/WriteClientTest.java +++ b/ingester-protocol/src/test/java/io/greptime/WriteClientTest.java @@ -105,7 +105,7 @@ public void testWriteSuccess() throws ExecutionException, InterruptedException { Mockito.when(this.routerClient.invoke(Mockito.eq(addr), Mockito.any(), Mockito.any())) // .thenReturn(Util.completedCf(response)); - Result res = this.writeClient.write(Collections.singleton(table)).get(); + Result res = this.writeClient.write(table).get(); Assert.assertTrue(res.isOk()); Assert.assertEquals(3, res.getOk().getSuccess()); diff --git a/ingester-protocol/src/test/java/io/greptime/options/GreptimeOptionsTest.java b/ingester-protocol/src/test/java/io/greptime/options/GreptimeOptionsTest.java new file mode 100644 index 0000000..3252acb --- /dev/null +++ b/ingester-protocol/src/test/java/io/greptime/options/GreptimeOptionsTest.java @@ -0,0 +1,97 @@ +/* + * Copyright 2023 Greptime Team + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.greptime.options; + +import io.greptime.Router; +import io.greptime.limit.LimitedPolicy; +import io.greptime.models.AuthInfo; +import io.greptime.rpc.RpcOptions; +import org.junit.Assert; +import org.junit.Test; +import io.greptime.common.Endpoint; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +/** + * @author jiachun.fjc + */ +public class GreptimeOptionsTest { + + @Test + public void testAllOptions() { + String database = "greptime.public"; + String[] endpoints = {"127.0.0.1:4001"}; + Executor asyncPool = command -> System.out.println("asyncPool"); + RpcOptions rpcOptions = RpcOptions.newDefault(); + int writeMaxRetries = 2; + int maxInFlightWriteRows = 999; + LimitedPolicy limitedPolicy = new LimitedPolicy.DiscardPolicy(); + int defaultStreamMaxWritePointsPerSecond = 100000; + long routeTableRefreshPeriodSeconds = 99; + AuthInfo authInfo = new AuthInfo("user", "password"); + Router router = createTestRouter(); + + GreptimeOptions opts = GreptimeOptions.newBuilder(endpoints, database) // + .asyncPool(asyncPool) // + .rpcOptions(rpcOptions) // + .writeMaxRetries(writeMaxRetries) // + .maxInFlightWriteRows(maxInFlightWriteRows) // + .writeLimitedPolicy(limitedPolicy) // + .defaultStreamMaxWritePointsPerSecond(defaultStreamMaxWritePointsPerSecond) // + .routeTableRefreshPeriodSeconds(routeTableRefreshPeriodSeconds) // + .authInfo(authInfo) // + .router(router) // + .build(); + + Assert.assertEquals(database, opts.getDatabase()); + Assert.assertArrayEquals(endpoints, opts.getEndpoints().stream().map(Endpoint::toString).toArray()); + Assert.assertEquals(rpcOptions, opts.getRpcOptions()); + + RouterOptions routerOptions = opts.getRouterOptions(); + Assert.assertNotNull(routerOptions); + Assert.assertArrayEquals(endpoints, routerOptions.getEndpoints().stream().map(Endpoint::toString).toArray()); + Assert.assertEquals(router, routerOptions.getRouter()); + Assert.assertEquals(routeTableRefreshPeriodSeconds, routerOptions.getRefreshPeriodSeconds()); + + WriteOptions writeOptions = opts.getWriteOptions(); + Assert.assertNotNull(writeOptions); + Assert.assertEquals(asyncPool, writeOptions.getAsyncPool()); + Assert.assertEquals(writeMaxRetries, writeOptions.getMaxRetries()); + Assert.assertEquals(maxInFlightWriteRows, writeOptions.getMaxInFlightWriteRows()); + Assert.assertEquals(limitedPolicy, writeOptions.getLimitedPolicy()); + Assert.assertEquals(defaultStreamMaxWritePointsPerSecond, writeOptions.getDefaultStreamMaxWritePointsPerSecond()); + Assert.assertEquals(authInfo, writeOptions.getAuthInfo()); + } + + private Router createTestRouter() { + return new Router() { + + @Override + public CompletableFuture routeFor(Void request) { + return null; + } + + @Override + public CompletableFuture refresh() { + return null; + } + + @Override + public void onRefresh(List endpoints) {} + }; + } +} diff --git a/ingester-rpc/pom.xml b/ingester-rpc/pom.xml index cb4f191..57d7411 100644 --- a/ingester-rpc/pom.xml +++ b/ingester-rpc/pom.xml @@ -4,7 +4,7 @@ greptimedb-ingester io.greptime - 0.4.0 + 0.5.0 ingester-rpc diff --git a/pom.xml b/pom.xml index 9571137..6ce821a 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ io.greptime greptimedb-ingester - 0.4.0 + 0.5.0 pom ${project.groupId}:${project.artifactId}