diff --git a/ingester-all/pom.xml b/ingester-all/pom.xml
index 8d36bed..61cf624 100644
--- a/ingester-all/pom.xml
+++ b/ingester-all/pom.xml
@@ -4,7 +4,7 @@
<artifactId>greptimedb-ingester</artifactId>
<groupId>io.greptime</groupId>
- <version>0.4.0</version>
+ <version>0.5.0</version>
<artifactId>ingester-all</artifactId>
diff --git a/ingester-common/pom.xml b/ingester-common/pom.xml
index 430654d..de32dfe 100644
--- a/ingester-common/pom.xml
+++ b/ingester-common/pom.xml
@@ -4,7 +4,7 @@
<artifactId>greptimedb-ingester</artifactId>
<groupId>io.greptime</groupId>
- <version>0.4.0</version>
+ <version>0.5.0</version>
<artifactId>ingester-common</artifactId>
diff --git a/ingester-example/pom.xml b/ingester-example/pom.xml
index 0454d9d..4933606 100644
--- a/ingester-example/pom.xml
+++ b/ingester-example/pom.xml
@@ -4,7 +4,7 @@
<artifactId>greptimedb-ingester</artifactId>
<groupId>io.greptime</groupId>
- <version>0.4.0</version>
+ <version>0.5.0</version>
<artifactId>ingester-example</artifactId>
@@ -38,5 +38,12 @@
<artifactId>log4j-slf4j-impl</artifactId>
<scope>compile</scope>
+
+ <dependency>
+ <groupId>mysql</groupId>
+ <artifactId>mysql-connector-java</artifactId>
+ <version>8.0.33</version>
+ </dependency>
diff --git a/ingester-example/src/main/java/io/greptime/MyMetric2.java b/ingester-example/src/main/java/io/greptime/Cpu.java
similarity index 51%
rename from ingester-example/src/main/java/io/greptime/MyMetric2.java
rename to ingester-example/src/main/java/io/greptime/Cpu.java
index 5eef353..38ca0dd 100644
--- a/ingester-example/src/main/java/io/greptime/MyMetric2.java
+++ b/ingester-example/src/main/java/io/greptime/Cpu.java
@@ -18,40 +18,29 @@
import io.greptime.models.Column;
import io.greptime.models.DataType;
import io.greptime.models.Metric;
-import java.util.Date;
/**
* @author jiachun.fjc
*/
-@Metric(name = "my_metric2")
-public class MyMetric2 {
- @Column(name = "tag1", tag = true, dataType = DataType.String)
- private String tag1;
- @Column(name = "tag2", tag = true, dataType = DataType.String)
- private String tag2;
+@Metric(name = "cpu_metric")
+public class Cpu {
+ @Column(name = "host", tag = true, dataType = DataType.String)
+ private String host;
- @Column(name = "ts", timestamp = true, dataType = DataType.TimestampSecond)
+ @Column(name = "ts", timestamp = true, dataType = DataType.TimestampMillisecond)
private long ts;
- @Column(name = "field1", dataType = DataType.Date)
- private Date field1;
- @Column(name = "field2", dataType = DataType.Float64)
- private double field2;
+ @Column(name = "cpu_user", dataType = DataType.Float64)
+ private double cpuUser;
+ @Column(name = "cpu_sys", dataType = DataType.Float64)
+ private double cpuSys;
- public String getTag1() {
- return tag1;
+ public String getHost() {
+ return host;
}
- public void setTag1(String tag1) {
- this.tag1 = tag1;
- }
-
- public String getTag2() {
- return tag2;
- }
-
- public void setTag2(String tag2) {
- this.tag2 = tag2;
+ public void setHost(String host) {
+ this.host = host;
}
public long getTs() {
@@ -62,19 +51,19 @@ public void setTs(long ts) {
this.ts = ts;
}
- public Date getField1() {
- return field1;
+ public double getCpuUser() {
+ return cpuUser;
}
- public void setField1(Date field1) {
- this.field1 = field1;
+ public void setCpuUser(double cpuUser) {
+ this.cpuUser = cpuUser;
}
- public double getField2() {
- return field2;
+ public double getCpuSys() {
+ return cpuSys;
}
- public void setField2(double field2) {
- this.field2 = field2;
+ public void setCpuSys(double cpuSys) {
+ this.cpuSys = cpuSys;
}
}
diff --git a/ingester-example/src/main/java/io/greptime/Memory.java b/ingester-example/src/main/java/io/greptime/Memory.java
new file mode 100644
index 0000000..1e22d37
--- /dev/null
+++ b/ingester-example/src/main/java/io/greptime/Memory.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2023 Greptime Team
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.greptime;
+
+import io.greptime.models.Column;
+import io.greptime.models.DataType;
+import io.greptime.models.Metric;
+
+/**
+ * @author jiachun.fjc
+ */
+@Metric(name = "mem_metric")
+public class Memory {
+ @Column(name = "host", tag = true, dataType = DataType.String)
+ private String host;
+
+ @Column(name = "ts", timestamp = true, dataType = DataType.TimestampMillisecond)
+ private long ts;
+
+ @Column(name = "mem_usage", dataType = DataType.Float64)
+ private double memUsage;
+
+ public String getHost() {
+ return host;
+ }
+
+ public void setHost(String host) {
+ this.host = host;
+ }
+
+ public long getTs() {
+ return ts;
+ }
+
+ public void setTs(long ts) {
+ this.ts = ts;
+ }
+
+ public double getMemUsage() {
+ return memUsage;
+ }
+
+ public void setMemUsage(double memUsage) {
+ this.memUsage = memUsage;
+ }
+}
diff --git a/ingester-example/src/main/java/io/greptime/MyMetric1.java b/ingester-example/src/main/java/io/greptime/MyMetric1.java
deleted file mode 100644
index b29aaaf..0000000
--- a/ingester-example/src/main/java/io/greptime/MyMetric1.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Copyright 2023 Greptime Team
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.greptime;
-
-import io.greptime.models.Column;
-import io.greptime.models.DataType;
-import io.greptime.models.Metric;
-import java.math.BigDecimal;
-
-/**
- * @author jiachun.fjc
- */
-@Metric(name = "my_metric1")
-public class MyMetric1 {
- @Column(name = "tag1", tag = true, dataType = DataType.String)
- private String tag1;
- @Column(name = "tag2", tag = true, dataType = DataType.String)
- private String tag2;
- @Column(name = "tag3", tag = true, dataType = DataType.String)
- private String tag3;
-
- @Column(name = "ts", timestamp = true, dataType = DataType.TimestampMillisecond)
- private long ts;
-
- @Column(name = "field1", dataType = DataType.String)
- private String field1;
- @Column(name = "field2", dataType = DataType.Float64)
- private double field2;
- @Column(name = "field3", dataType = DataType.Decimal128)
- private BigDecimal field3;
- @Column(name = "field4", dataType = DataType.Int32)
- private int field4;
-
- public String getTag1() {
- return tag1;
- }
-
- public void setTag1(String tag1) {
- this.tag1 = tag1;
- }
-
- public String getTag2() {
- return tag2;
- }
-
- public void setTag2(String tag2) {
- this.tag2 = tag2;
- }
-
- public String getTag3() {
- return tag3;
- }
-
- public void setTag3(String tag3) {
- this.tag3 = tag3;
- }
-
- public long getTs() {
- return ts;
- }
-
- public void setTs(long ts) {
- this.ts = ts;
- }
-
- public String getField1() {
- return field1;
- }
-
- public void setField1(String field1) {
- this.field1 = field1;
- }
-
- public double getField2() {
- return field2;
- }
-
- public void setField2(double field2) {
- this.field2 = field2;
- }
-
- public BigDecimal getField3() {
- return field3;
- }
-
- public void setField3(BigDecimal field3) {
- this.field3 = field3;
- }
-
- public int getField4() {
- return field4;
- }
-
- public void setField4(int field4) {
- this.field4 = field4;
- }
-}
diff --git a/ingester-example/src/main/java/io/greptime/QueryJDBC.java b/ingester-example/src/main/java/io/greptime/QueryJDBC.java
new file mode 100644
index 0000000..5518178
--- /dev/null
+++ b/ingester-example/src/main/java/io/greptime/QueryJDBC.java
@@ -0,0 +1,108 @@
+/*
+ * Copyright 2023 Greptime Team
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.greptime;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * @author jiachun.fjc
+ */
+public class QueryJDBC {
+
+ private static final Logger LOG = LoggerFactory.getLogger(QueryJDBC.class);
+
+ public static void main(String[] args) throws Exception {
+ GreptimeDB greptimeDB = TestConnector.connectToDefaultDB();
+
+ // Inserts data for query
+ insertData(greptimeDB);
+
+ try (Connection conn = getConnection()) {
+ Statement statement = conn.createStatement();
+
+ // DESC table;
+ ResultSet rs = statement.executeQuery("DESC cpu_metric");
+ LOG.info("Column | Type | Key | Null | Default | Semantic Type ");
+ while (rs.next()) {
+ LOG.info("{} | {} | {} | {} | {} | {}", //
+ rs.getString(1), //
+ rs.getString(2), //
+ rs.getString(3), //
+ rs.getString(4), //
+ rs.getString(5), //
+ rs.getString(6));
+ }
+
+ // SELECT COUNT(*) FROM cpu_metric;
+ rs = statement.executeQuery("SELECT COUNT(*) FROM cpu_metric");
+ while (rs.next()) {
+ LOG.info("Count: {}", rs.getInt(1));
+ }
+
+ // SELECT * FROM cpu_metric ORDER BY ts DESC LIMIT 5;
+ rs = statement.executeQuery("SELECT * FROM cpu_metric ORDER BY ts DESC LIMIT 5");
+ LOG.info("host | ts | cpu_user | cpu_sys");
+ while (rs.next()) {
+ LOG.info("{} | {} | {} | {}", //
+ rs.getString("host"), //
+ rs.getTimestamp("ts"), //
+ rs.getDouble("cpu_user"), //
+ rs.getDouble("cpu_sys"));
+ }
+ }
+ }
+
+ public static Connection getConnection() throws IOException, ClassNotFoundException, SQLException {
+ Properties prop = new Properties();
+ prop.load(QueryJDBC.class.getResourceAsStream("/db-connection.properties"));
+
+ String driverClass = (String) prop.get("db.database-driver");
+
+ String dbConnUrl = (String) prop.get("db.url");
+ String dbUserName = (String) prop.get("db.username");
+ String dbPassword = (String) prop.get("db.password");
+
+ Class.forName(driverClass);
+ Connection dbConn = DriverManager.getConnection(dbConnUrl, dbUserName, dbPassword);
+
+ return Objects.requireNonNull(dbConn, "Failed to make connection!");
+ }
+
+ public static void insertData(GreptimeDB greptimeDB) throws ExecutionException, InterruptedException {
+ List<Cpu> cpus = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ Cpu c = new Cpu();
+ c.setHost("127.0.0." + i);
+ c.setTs(System.currentTimeMillis());
+ c.setCpuUser(i + 0.1);
+ c.setCpuSys(i + 0.12);
+ cpus.add(c);
+ }
+ greptimeDB.writePOJOs(cpus).get();
+ }
+}
diff --git a/ingester-example/src/main/java/io/greptime/StreamWritePOJOsQuickStart.java b/ingester-example/src/main/java/io/greptime/StreamWritePOJOsQuickStart.java
index ee743e1..60ab0f6 100644
--- a/ingester-example/src/main/java/io/greptime/StreamWritePOJOsQuickStart.java
+++ b/ingester-example/src/main/java/io/greptime/StreamWritePOJOsQuickStart.java
@@ -16,12 +16,9 @@
package io.greptime;
import io.greptime.models.WriteOk;
-import io.greptime.options.GreptimeOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.math.BigDecimal;
import java.util.ArrayList;
-import java.util.Calendar;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@@ -34,52 +31,35 @@ public class StreamWritePOJOsQuickStart {
private static final Logger LOG = LoggerFactory.getLogger(StreamWritePOJOsQuickStart.class);
public static void main(String[] args) throws ExecutionException, InterruptedException {
- // GreptimeDB has a default database named "public", we can use it as the test database
- String database = "public";
- // By default, GreptimeDB listens on port 4001 using the gRPC protocol.
- // We can provide multiple endpoints that point to the same GreptimeDB cluster.
- // The client will make calls to these endpoints based on a load balancing strategy.
- String[] endpoints = {"127.0.0.1:4001"};
- GreptimeOptions opts = GreptimeOptions.newBuilder(endpoints, database) //
- .build();
+ GreptimeDB greptimeDB = TestConnector.connectToDefaultDB();
- GreptimeDB greptimeDB = GreptimeDB.create(opts);
-
- List<MyMetric1> myMetric1s = new ArrayList<>();
+ List<Cpu> cpus = new ArrayList<>();
for (int i = 0; i < 10; i++) {
- MyMetric1 m = new MyMetric1();
- m.setTag1("tag_value_1_" + i);
- m.setTag2("tag_value_2_" + i);
- m.setTag3("tag_value_3_" + i);
- m.setTs(System.currentTimeMillis());
- m.setField1("field_value_1_" + i);
- m.setField2(i);
- m.setField3(new BigDecimal(i));
- m.setField4(i);
-
- myMetric1s.add(m);
+ Cpu c = new Cpu();
+ c.setHost("127.0.0." + i);
+ c.setTs(System.currentTimeMillis());
+ c.setCpuUser(i + 0.1);
+ c.setCpuSys(i + 0.12);
+ cpus.add(c);
}
- List<MyMetric2> myMetric2s = new ArrayList<>();
+ List<Memory> memories = new ArrayList<>();
for (int i = 0; i < 10; i++) {
- MyMetric2 m = new MyMetric2();
- m.setTag1("tag_value_1_" + i);
- m.setTag2("tag_value_2_" + i);
+ Memory m = new Memory();
+ m.setHost("127.0.0." + i);
- m.setTs(System.currentTimeMillis() / 1000);
+ m.setTs(System.currentTimeMillis());
- m.setField1(Calendar.getInstance().getTime());
- m.setField2(i);
-
- myMetric2s.add(m);
+ m.setMemUsage(i + 0.2);
+ memories.add(m);
}
StreamWriter<List<?>, WriteOk> writer = greptimeDB.streamWriterPOJOs();
// write data into stream
- writer.write(myMetric1s);
- writer.write(myMetric2s);
+ writer.write(cpus);
+ writer.write(memories);
// delete the first 5 rows
- writer.write(myMetric1s.subList(0, 5), WriteOp.Delete);
+ writer.write(cpus.subList(0, 5), WriteOp.Delete);
// complete the stream
CompletableFuture<WriteOk> future = writer.completed();
@@ -87,5 +67,8 @@ public static void main(String[] args) throws ExecutionException, InterruptedExc
WriteOk result = future.get();
LOG.info("Write result: {}", result);
+
+ // Shutdown the client when application exits.
+ greptimeDB.shutdownGracefully();
}
}
diff --git a/ingester-example/src/main/java/io/greptime/StreamWriteQuickStart.java b/ingester-example/src/main/java/io/greptime/StreamWriteQuickStart.java
index 38c5b05..9808b2e 100644
--- a/ingester-example/src/main/java/io/greptime/StreamWriteQuickStart.java
+++ b/ingester-example/src/main/java/io/greptime/StreamWriteQuickStart.java
@@ -20,12 +20,8 @@
import io.greptime.models.Table;
import io.greptime.models.TableSchema;
import io.greptime.models.WriteOk;
-import io.greptime.options.GreptimeOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.math.BigDecimal;
-import java.util.Calendar;
-import java.util.Date;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@@ -37,70 +33,47 @@ public class StreamWriteQuickStart {
private static final Logger LOG = LoggerFactory.getLogger(StreamWriteQuickStart.class);
public static void main(String[] args) throws ExecutionException, InterruptedException {
- // GreptimeDB has a default database named "public", we can use it as the test database
- String database = "public";
- // By default, GreptimeDB listens on port 4001 using the gRPC protocol.
- // We can provide multiple endpoints that point to the same GreptimeDB cluster.
- // The client will make calls to these endpoints based on a load balancing strategy.
- String[] endpoints = {"127.0.0.1:4001"};
- GreptimeOptions opts = GreptimeOptions.newBuilder(endpoints, database) //
- .build();
-
- GreptimeDB greptimeDB = GreptimeDB.create(opts);
+ GreptimeDB greptimeDB = TestConnector.connectToDefaultDB();
- TableSchema myMetric3Schema = TableSchema.newBuilder("my_metric3") //
- .addColumn("tag1", SemanticType.Tag, DataType.String) //
- .addColumn("tag2", SemanticType.Tag, DataType.String) //
- .addColumn("tag3", SemanticType.Tag, DataType.String) //
+ TableSchema cpuMetricSchema = TableSchema.newBuilder("cpu_metric") //
+ .addColumn("host", SemanticType.Tag, DataType.String) //
.addColumn("ts", SemanticType.Timestamp, DataType.TimestampMillisecond) //
- .addColumn("field1", SemanticType.Field, DataType.String) //
- .addColumn("field2", SemanticType.Field, DataType.Float64) //
- .addColumn("field3", SemanticType.Field, DataType.Decimal128) //
- .addColumn("field4", SemanticType.Field, DataType.Int32) //
+ .addColumn("cpu_user", SemanticType.Field, DataType.Float64) //
+ .addColumn("cpu_sys", SemanticType.Field, DataType.Float64) //
.build();
- TableSchema myMetric4Schema = TableSchema.newBuilder("my_metric4") //
- .addColumn("tag1", SemanticType.Tag, DataType.String) //
- .addColumn("tag2", SemanticType.Tag, DataType.String) //
- .addColumn("ts", SemanticType.Timestamp, DataType.TimestampSecond) //
- .addColumn("field1", SemanticType.Field, DataType.Date) //
- .addColumn("field2", SemanticType.Field, DataType.Float64) //
+ TableSchema memMetricSchema = TableSchema.newBuilder("mem_metric") //
+ .addColumn("host", SemanticType.Tag, DataType.String) //
+ .addColumn("ts", SemanticType.Timestamp, DataType.TimestampMillisecond) //
+ .addColumn("mem_usage", SemanticType.Field, DataType.Float64) //
.build();
- Table myMetric3 = Table.from(myMetric3Schema);
- Table myMetric4 = Table.from(myMetric4Schema);
+ Table cpuMetric = Table.from(cpuMetricSchema);
+ Table memMetric = Table.from(memMetricSchema);
for (int i = 0; i < 10; i++) {
- String tag1v = "tag_value_1_" + i;
- String tag2v = "tag_value_2_" + i;
- String tag3v = "tag_value_3_" + i;
+ String host = "127.0.0." + i;
long ts = System.currentTimeMillis();
- String field1 = "field_value_1" + i;
- double field2 = i + 0.1;
- BigDecimal field3 = new BigDecimal(i);
- int field4 = i + 1;
-
- myMetric3.addRow(tag1v, tag2v, tag3v, ts, field1, field2, field3, field4);
+ double cpuUser = i + 0.1;
+ double cpuSys = i + 0.12;
+ cpuMetric.addRow(host, ts, cpuUser, cpuSys);
}
for (int i = 0; i < 10; i++) {
- String tag1v = "tag_value_1_" + i;
- String tag2v = "tag_value_2_" + i;
- long ts = System.currentTimeMillis() / 1000;
- Date field1 = Calendar.getInstance().getTime();
- double field2 = i + 0.1;
-
- myMetric4.addRow(tag1v, tag2v, ts, field1, field2);
+ String host = "127.0.0." + i;
+ long ts = System.currentTimeMillis();
+ double memUsage = i + 0.2;
+ memMetric.addRow(host, ts, memUsage);
}
StreamWriter<Table, WriteOk> writer = greptimeDB.streamWriter();
// write data into stream
- writer.write(myMetric3);
- writer.write(myMetric4);
+ writer.write(cpuMetric);
+ writer.write(memMetric);
// delete the first 5 rows
- writer.write(myMetric3.subRange(0, 5), WriteOp.Delete);
+ writer.write(cpuMetric.subRange(0, 5), WriteOp.Delete);
// complete the stream
CompletableFuture<WriteOk> future = writer.completed();
@@ -108,5 +81,8 @@ public static void main(String[] args) throws ExecutionException, InterruptedExc
WriteOk result = future.get();
LOG.info("Write result: {}", result);
+
+ // Shutdown the client when application exits.
+ greptimeDB.shutdownGracefully();
}
}
diff --git a/ingester-example/src/main/java/io/greptime/TestConnector.java b/ingester-example/src/main/java/io/greptime/TestConnector.java
new file mode 100644
index 0000000..c5ac637
--- /dev/null
+++ b/ingester-example/src/main/java/io/greptime/TestConnector.java
@@ -0,0 +1,93 @@
+/*
+ * Copyright 2023 Greptime Team
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.greptime;
+
+import io.greptime.common.util.SerializingExecutor;
+import io.greptime.limit.LimitedPolicy;
+import io.greptime.models.AuthInfo;
+import io.greptime.options.GreptimeOptions;
+import io.greptime.rpc.RpcOptions;
+
+/**
+ * @author jiachun.fjc
+ */
+public class TestConnector {
+
+ public static GreptimeDB connectToDefaultDB() {
+ // GreptimeDB has a default database named "public" in the default catalog "greptime",
+ // we can use it as the test database
+ String database = "public";
+ // By default, GreptimeDB listens on port 4001 using the gRPC protocol.
+ // We can provide multiple endpoints that point to the same GreptimeDB cluster.
+ // The client will make calls to these endpoints based on a load balancing strategy.
+ String[] endpoints = {"127.0.0.1:4001"};
+ GreptimeOptions opts = GreptimeOptions.newBuilder(endpoints, database) //
+ // Optional, the default value is fine.
+ //
+ // Asynchronous thread pool, used to handle various asynchronous tasks in the
+ // SDK (the SDK is purely asynchronous). If you do not set it, a default
+ // implementation (`SerializingExecutor`) is used; you can replace it if the
+ // default does not meet your needs.
+ .asyncPool(new SerializingExecutor("asyncPool"))
+ // Optional, the default value is fine.
+ //
+ // Sets the RPC options, in general, the default configuration is fine.
+ .rpcOptions(RpcOptions.newDefault())
+ // Optional, the default value is fine.
+ //
+ // In some failure cases, a write retry can be attempted.
+ // The default is 1.
+ .writeMaxRetries(1)
+ // Optional, the default value is fine.
+ //
+ // Write flow limit: the maximum number of data rows in-flight. It does not take effect on `StreamWriter`.
+ // The default is 65536.
+ .maxInFlightWriteRows(65536)
+ // Optional, the default value is fine.
+ //
+ // Write flow limit: the policy to use when the write flow limit is exceeded.
+ // All options:
+ // - `LimitedPolicy.DiscardPolicy`: discard the data if the limiter is full.
+ // - `LimitedPolicy.AbortPolicy`: abort if the limiter is full.
+ // - `LimitedPolicy.BlockingPolicy`: blocks if the limiter is full.
+ // - `LimitedPolicy.AbortOnBlockingTimeoutPolicy`: blocks for the specified time if the limiter is full.
+ // The default is `LimitedPolicy.AbortOnBlockingTimeoutPolicy`
+ .writeLimitedPolicy(LimitedPolicy.defaultWriteLimitedPolicy())
+ // Optional, the default value is fine.
+ //
+ // The default rate limit for the stream writer. It only takes effect when `maxPointsPerSecond`
+ // is not specified when creating a `StreamWriter`.
+ // The default is 10 * 65536.
+ .defaultStreamMaxWritePointsPerSecond(10 * 65536)
+ // Optional, the default value is fine.
+ //
+ // Refresh frequency of route tables. The background refreshes all route tables
+ // periodically. By default, the route tables will not be refreshed.
+ .routeTableRefreshPeriodSeconds(-1)
+ // Optional, the default value is fine.
+ //
+ // Sets the request router. The internal default implementation works well;
+ // you don't need to set it unless you have special requirements.
+ .router(null)
+ // Sets authentication information. If the DB does not require authentication, this can be skipped.
+ .authInfo(AuthInfo.noAuthorization())
+ // A good start ^_^
+ .build();
+
+ return GreptimeDB.create(opts);
+ }
+}
diff --git a/ingester-example/src/main/java/io/greptime/WritePOJOsQuickStart.java b/ingester-example/src/main/java/io/greptime/WritePOJOsQuickStart.java
index 7c20d16..a0288dd 100644
--- a/ingester-example/src/main/java/io/greptime/WritePOJOsQuickStart.java
+++ b/ingester-example/src/main/java/io/greptime/WritePOJOsQuickStart.java
@@ -18,13 +18,10 @@
import io.greptime.models.Err;
import io.greptime.models.Result;
import io.greptime.models.WriteOk;
-import io.greptime.options.GreptimeOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Calendar;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@@ -37,50 +34,31 @@ public class WritePOJOsQuickStart {
private static final Logger LOG = LoggerFactory.getLogger(WritePOJOsQuickStart.class);
public static void main(String[] args) throws ExecutionException, InterruptedException {
- // GreptimeDB has a default database named "public", we can use it as the test database
- String database = "public";
- // By default, GreptimeDB listens on port 4001 using the gRPC protocol.
- // We can provide multiple endpoints that point to the same GreptimeDB cluster.
- // The client will make calls to these endpoints based on a load balancing strategy.
- String[] endpoints = {"127.0.0.1:4001"};
- GreptimeOptions opts = GreptimeOptions.newBuilder(endpoints, database) //
- .build();
+ GreptimeDB greptimeDB = TestConnector.connectToDefaultDB();
- GreptimeDB greptimeDB = GreptimeDB.create(opts);
-
- List<MyMetric1> myMetric1s = new ArrayList<>();
+ List<Cpu> cpus = new ArrayList<>();
for (int i = 0; i < 10; i++) {
- MyMetric1 m = new MyMetric1();
- m.setTag1("tag_value_1_" + i);
- m.setTag2("tag_value_2_" + i);
- m.setTag3("tag_value_3_" + i);
- m.setTs(System.currentTimeMillis());
- m.setField1("field_value_1_" + i);
- m.setField2(i);
- m.setField3(new BigDecimal(i));
- m.setField4(i);
-
- myMetric1s.add(m);
+ Cpu c = new Cpu();
+ c.setHost("127.0.0." + i);
+ c.setTs(System.currentTimeMillis());
+ c.setCpuUser(i + 0.1);
+ c.setCpuSys(i + 0.12);
+ cpus.add(c);
}
- List<MyMetric2> myMetric2s = new ArrayList<>();
+ List<Memory> memories = new ArrayList<>();
for (int i = 0; i < 10; i++) {
- MyMetric2 m = new MyMetric2();
- m.setTag1("tag_value_1_" + i);
- m.setTag2("tag_value_2_" + i);
- m.setTs(System.currentTimeMillis() / 1000);
- m.setField1(Calendar.getInstance().getTime());
- m.setField2(i);
-
- myMetric2s.add(m);
+ Memory m = new Memory();
+ m.setHost("127.0.0." + i);
+ m.setTs(System.currentTimeMillis());
+ m.setMemUsage(i + 0.2);
+ memories.add(m);
}
- List<List<?>> pojos = Arrays.asList(myMetric1s, myMetric2s);
-
// For performance reasons, the SDK is designed to be purely asynchronous.
// The return value is a future object. If you want to immediately obtain
// the result, you can call `future.get()`.
- CompletableFuture<Result<WriteOk, Err>> puts = greptimeDB.writePOJOs(pojos);
+ CompletableFuture<Result<WriteOk, Err>> puts = greptimeDB.writePOJOs(cpus, memories);
Result<WriteOk, Err> result = puts.get();
@@ -90,7 +68,7 @@ public static void main(String[] args) throws ExecutionException, InterruptedExc
LOG.error("Failed to write: {}", result.getErr());
}
- List<List<?>> delete_pojos = Arrays.asList(myMetric1s.subList(0, 5), myMetric2s.subList(0, 5));
+ List<List<?>> delete_pojos = Arrays.asList(cpus.subList(0, 5), memories.subList(0, 5));
Result<WriteOk, Err> deletes = greptimeDB.writePOJOs(delete_pojos, WriteOp.Delete).get();
if (deletes.isOk()) {
@@ -98,5 +76,8 @@ public static void main(String[] args) throws ExecutionException, InterruptedExc
} else {
LOG.error("Failed to delete: {}", result.getErr());
}
+
+ // Shutdown the client when application exits.
+ greptimeDB.shutdownGracefully();
}
}
diff --git a/ingester-example/src/main/java/io/greptime/WriteQuickStart.java b/ingester-example/src/main/java/io/greptime/WriteQuickStart.java
index 6b6d6d0..ffab47f 100644
--- a/ingester-example/src/main/java/io/greptime/WriteQuickStart.java
+++ b/ingester-example/src/main/java/io/greptime/WriteQuickStart.java
@@ -22,14 +22,9 @@
import io.greptime.models.Table;
import io.greptime.models.TableSchema;
import io.greptime.models.WriteOk;
-import io.greptime.options.GreptimeOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.math.BigDecimal;
import java.util.Arrays;
-import java.util.Calendar;
-import java.util.Collection;
-import java.util.Date;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@@ -42,68 +37,43 @@ public class WriteQuickStart {
private static final Logger LOG = LoggerFactory.getLogger(WriteQuickStart.class);
public static void main(String[] args) throws ExecutionException, InterruptedException {
- // GreptimeDB has a default database named "public", we can use it as the test database
- String database = "public";
- // By default, GreptimeDB listens on port 4001 using the gRPC protocol.
- // We can provide multiple endpoints that point to the same GreptimeDB cluster.
- // The client will make calls to these endpoints based on a load balancing strategy.
- String[] endpoints = {"127.0.0.1:4001"};
- GreptimeOptions opts = GreptimeOptions.newBuilder(endpoints, database) //
- .build();
-
- GreptimeDB greptimeDB = GreptimeDB.create(opts);
+ GreptimeDB greptimeDB = TestConnector.connectToDefaultDB();
- TableSchema myMetric3Schema = TableSchema.newBuilder("my_metric3") //
- .addColumn("tag1", SemanticType.Tag, DataType.String) //
- .addColumn("tag2", SemanticType.Tag, DataType.String) //
- .addColumn("tag3", SemanticType.Tag, DataType.String) //
+ TableSchema cpuMetricSchema = TableSchema.newBuilder("cpu_metric") //
+ .addColumn("host", SemanticType.Tag, DataType.String) //
.addColumn("ts", SemanticType.Timestamp, DataType.TimestampMillisecond) //
- .addColumn("field1", SemanticType.Field, DataType.String) //
- .addColumn("field2", SemanticType.Field, DataType.Float64) //
- .addColumn("field3", SemanticType.Field, DataType.Decimal128) //
- .addColumn("field4", SemanticType.Field, DataType.Int32) //
+ .addColumn("cpu_user", SemanticType.Field, DataType.Float64) //
+ .addColumn("cpu_sys", SemanticType.Field, DataType.Float64) //
.build();
- TableSchema myMetric4Schema = TableSchema.newBuilder("my_metric4") //
- .addColumn("tag1", SemanticType.Tag, DataType.String) //
- .addColumn("tag2", SemanticType.Tag, DataType.String) //
- .addColumn("ts", SemanticType.Timestamp, DataType.TimestampSecond) //
- .addColumn("field1", SemanticType.Field, DataType.Date) //
- .addColumn("field2", SemanticType.Field, DataType.Float64) //
+ TableSchema memMetricSchema = TableSchema.newBuilder("mem_metric") //
+ .addColumn("host", SemanticType.Tag, DataType.String) //
+ .addColumn("ts", SemanticType.Timestamp, DataType.TimestampMillisecond) //
+ .addColumn("mem_usage", SemanticType.Field, DataType.Float64) //
.build();
- Table myMetric3 = Table.from(myMetric3Schema);
- Table myMetric4 = Table.from(myMetric4Schema);
+ Table cpuMetric = Table.from(cpuMetricSchema);
+ Table memMetric = Table.from(memMetricSchema);
for (int i = 0; i < 10; i++) {
- String tag1v = "tag_value_1_" + i;
- String tag2v = "tag_value_2_" + i;
- String tag3v = "tag_value_3_" + i;
+ String host = "127.0.0." + i;
long ts = System.currentTimeMillis();
- String field1 = "field_value_1" + i;
- double field2 = i + 0.1;
- BigDecimal field3 = new BigDecimal(i);
- int field4 = i + 1;
-
- myMetric3.addRow(tag1v, tag2v, tag3v, ts, field1, field2, field3, field4);
+ double cpuUser = i + 0.1;
+ double cpuSys = i + 0.12;
+ cpuMetric.addRow(host, ts, cpuUser, cpuSys);
}
for (int i = 0; i < 10; i++) {
- String tag1v = "tag_value_1_" + i;
- String tag2v = "tag_value_2_" + i;
- long ts = System.currentTimeMillis() / 1000;
- Date field1 = Calendar.getInstance().getTime();
- double field2 = i + 0.1;
-
- myMetric4.addRow(tag1v, tag2v, ts, field1, field2);
+ String host = "127.0.0." + i;
+ long ts = System.currentTimeMillis();
+ double memUsage = i + 0.2;
+ memMetric.addRow(host, ts, memUsage);
}
- Collection<Table> tables = Arrays.asList(myMetric3, myMetric4);
-
// For performance reasons, the SDK is designed to be purely asynchronous.
// The return value is a future object. If you want to immediately obtain
// the result, you can call `future.get()`.
- CompletableFuture<Result<WriteOk, Err>> future = greptimeDB.write(tables);
+ CompletableFuture<Result<WriteOk, Err>> future = greptimeDB.write(cpuMetric, memMetric);
Result<WriteOk, Err> result = future.get();
@@ -113,7 +83,7 @@ public static void main(String[] args) throws ExecutionException, InterruptedExc
LOG.error("Failed to write: {}", result.getErr());
}
- List<Table> delete_objs = Arrays.asList(myMetric3.subRange(0, 5), myMetric4.subRange(0, 5));
+ List<Table> delete_objs = Arrays.asList(cpuMetric.subRange(0, 5), memMetric.subRange(0, 5));
Result<WriteOk, Err> deletes = greptimeDB.write(delete_objs, WriteOp.Delete).get();
if (deletes.isOk()) {
@@ -121,5 +91,8 @@ public static void main(String[] args) throws ExecutionException, InterruptedExc
} else {
LOG.error("Failed to delete: {}", result.getErr());
}
+
+ // Shutdown the client when application exits.
+ greptimeDB.shutdownGracefully();
}
}
diff --git a/ingester-example/src/main/resources/db-connection.properties b/ingester-example/src/main/resources/db-connection.properties
new file mode 100644
index 0000000..9e47c11
--- /dev/null
+++ b/ingester-example/src/main/resources/db-connection.properties
@@ -0,0 +1,5 @@
+# DataSource
+db.database-driver=com.mysql.cj.jdbc.Driver
+db.url=jdbc:mysql://localhost:4002/public
+db.username=
+db.password=
\ No newline at end of file
diff --git a/ingester-grpc/pom.xml b/ingester-grpc/pom.xml
index fc55419..05198f0 100644
--- a/ingester-grpc/pom.xml
+++ b/ingester-grpc/pom.xml
@@ -4,7 +4,7 @@
<artifactId>greptimedb-ingester</artifactId>
<groupId>io.greptime</groupId>
- <version>0.4.0</version>
+ <version>0.5.0</version>
<artifactId>ingester-grpc</artifactId>
diff --git a/ingester-protocol/pom.xml b/ingester-protocol/pom.xml
index f3c0ae6..e46555b 100644
--- a/ingester-protocol/pom.xml
+++ b/ingester-protocol/pom.xml
@@ -4,7 +4,7 @@
<artifactId>greptimedb-ingester</artifactId>
<groupId>io.greptime</groupId>
- <version>0.4.0</version>
+ <version>0.5.0</version>
<artifactId>ingester-protocol</artifactId>
diff --git a/ingester-protocol/src/main/java/io/greptime/GreptimeDB.java b/ingester-protocol/src/main/java/io/greptime/GreptimeDB.java
index 41b2f1a..817fc0f 100644
--- a/ingester-protocol/src/main/java/io/greptime/GreptimeDB.java
+++ b/ingester-protocol/src/main/java/io/greptime/GreptimeDB.java
@@ -21,8 +21,8 @@
import io.greptime.common.Endpoint;
import io.greptime.common.Lifecycle;
import io.greptime.common.signal.SignalHandlersLoader;
-import io.greptime.common.util.MetricExecutor;
import io.greptime.common.util.MetricsUtil;
+import io.greptime.common.util.Strings;
import io.greptime.models.Err;
import io.greptime.models.Result;
import io.greptime.models.Table;
@@ -42,7 +42,6 @@
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -70,7 +69,6 @@ public class GreptimeDB implements Write, WritePOJO, Lifecycle,
private GreptimeOptions opts;
private RouterClient routerClient;
private WriteClient writeClient;
- private Executor asyncPool;
/**
* Returns all instances of {@link GreptimeDB}.
@@ -82,8 +80,9 @@ public static List instances() {
public static GreptimeDB create(GreptimeOptions opts) {
GreptimeDB greptimeDB = new GreptimeDB();
if (!greptimeDB.init(opts)) {
- throw new RuntimeException("Failed to start GreptimeDB client");
+ throw new RuntimeException("Failed to start the GreptimeDB client");
}
+ LOG.info("GreptimeDB client started: {}", greptimeDB);
return greptimeDB;
}
@@ -99,11 +98,12 @@ public boolean init(GreptimeOptions opts) {
this.opts = GreptimeOptions.checkSelf(opts).copy();
- this.routerClient = makeRouteClient(opts);
- if (this.asyncPool != null) {
- this.asyncPool = new MetricExecutor(this.asyncPool, "async_pool.time");
+ if (Strings.isBlank(this.opts.getDatabase())) {
+ LOG.warn("The `database` is not specified, use default (catalog-database): greptime-public");
}
- this.writeClient = makeWriteClient(opts, this.routerClient, this.asyncPool);
+
+ this.routerClient = makeRouteClient(opts);
+ this.writeClient = makeWriteClient(opts, this.routerClient);
INSTANCES.put(this.id, this);
@@ -184,8 +184,10 @@ public void display(Printer out) {
.println(VERSION) //
.print("endpoints=") //
.println(this.opts.getEndpoints()) //
- .print("userAsyncPool=") //
- .println(this.opts.getAsyncPool());
+ .print("database=") //
+ .println(this.opts.getDatabase()) //
+ .print("rpcOptions=") //
+ .println(this.opts.getRpcOptions());
if (this.routerClient != null) {
out.println("");
@@ -208,7 +210,6 @@ public String toString() {
", opts=" + opts + //
", routerClient=" + routerClient + //
", writeClient=" + writeClient + //
- ", asyncPool=" + asyncPool + //
'}';
}
@@ -222,7 +223,7 @@ private static RpcClient makeRpcClient(GreptimeOptions opts) {
RpcOptions rpcOpts = opts.getRpcOptions();
RpcClient rpcClient = RpcFactoryProvider.getRpcFactory().createRpcClient();
if (!rpcClient.init(rpcOpts)) {
- throw new IllegalStateException("Fail to start RPC client");
+ throw new IllegalStateException("Failed to start the RPC client");
}
rpcClient.registerConnectionObserver(new RpcConnectionObserver());
return rpcClient;
@@ -233,21 +234,17 @@ private static RouterClient makeRouteClient(GreptimeOptions opts) {
routerOpts.setRpcClient(makeRpcClient(opts));
RouterClient routerClient = new RouterClient();
if (!routerClient.init(routerOpts)) {
- throw new IllegalStateException("Fail to start router client");
+ throw new IllegalStateException("Failed to start the router client");
}
return routerClient;
}
- private static WriteClient makeWriteClient(GreptimeOptions opts, RouterClient routerClient, Executor asyncPool) {
+ private static WriteClient makeWriteClient(GreptimeOptions opts, RouterClient routerClient) {
WriteOptions writeOpts = opts.getWriteOptions();
writeOpts.setRouterClient(routerClient);
- writeOpts.setAsyncPool(asyncPool);
WriteClient writeClient = new WriteClient();
- if (opts.getAuthInfo() != null) {
- writeOpts.setAuthInfo(opts.getAuthInfo());
- }
if (!writeClient.init(writeOpts)) {
- throw new IllegalStateException("Fail to start write client");
+ throw new IllegalStateException("Failed to start the write client");
}
return writeClient;
}
diff --git a/ingester-protocol/src/main/java/io/greptime/Router.java b/ingester-protocol/src/main/java/io/greptime/Router.java
index 928ef84..8d13315 100644
--- a/ingester-protocol/src/main/java/io/greptime/Router.java
+++ b/ingester-protocol/src/main/java/io/greptime/Router.java
@@ -15,6 +15,7 @@
*/
package io.greptime;
+import java.util.List;
import java.util.concurrent.CompletableFuture;
/**
@@ -22,7 +23,7 @@
*
* @author jiachun.fjc
*/
-public interface Router {
+public interface Router<R, E> {
/**
* For a given request return the routing decision for the call.
@@ -30,5 +31,20 @@ public interface Router {
* @param request route request
* @return a endpoint for the call
*/
- CompletableFuture routeFor(Req request);
+ CompletableFuture<E> routeFor(R request);
+
+ /**
+ * Refresh the routing table from remote server.
+ * @return a future that will be completed when the refresh is done
+ */
+ CompletableFuture<Boolean> refresh();
+
+ /**
+ * Refresh the routing table.
+ * We need to get all the endpoints, and this method will overwrite all
+ * current endpoints.
+ *
+ * @param endpoints all new endpoints
+ */
+ void onRefresh(List<E> endpoints);
}
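A minimal sketch of a custom router against the interface above, assuming the Router<Void, Endpoint> parameterization used by the built-in DefaultRouter in RouterClient; the RoundRobinRouter name is illustrative, and how the client consumes a user-supplied router (e.g. via the new GreptimeOptions.Builder#router option) is not shown here.

import io.greptime.Router;
import io.greptime.common.Endpoint;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

public class RoundRobinRouter implements Router<Void, Endpoint> {

    private final AtomicReference<List<Endpoint>> endpointsRef = new AtomicReference<>();
    private final AtomicInteger index = new AtomicInteger();

    @Override
    public CompletableFuture<Endpoint> routeFor(Void request) {
        // Rotate through the known endpoints instead of picking one at random.
        List<Endpoint> endpoints = this.endpointsRef.get();
        Endpoint picked = endpoints.get(Math.floorMod(this.index.getAndIncrement(), endpoints.size()));
        return CompletableFuture.completedFuture(picked);
    }

    @Override
    public CompletableFuture<Boolean> refresh() {
        // This sketch has no remote source to refresh from, so report success.
        return CompletableFuture.completedFuture(true);
    }

    @Override
    public void onRefresh(List<Endpoint> endpoints) {
        // The caller hands over the full endpoint list; keep the latest copy.
        this.endpointsRef.set(endpoints);
    }
}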
diff --git a/ingester-protocol/src/main/java/io/greptime/RouterClient.java b/ingester-protocol/src/main/java/io/greptime/RouterClient.java
index 5836127..d44bb92 100644
--- a/ingester-protocol/src/main/java/io/greptime/RouterClient.java
+++ b/ingester-protocol/src/main/java/io/greptime/RouterClient.java
@@ -49,7 +49,7 @@ public class RouterClient implements Lifecycle, Display {
private ScheduledExecutorService refresher;
private RouterOptions opts;
private RpcClient rpcClient;
- private InnerRouter inner;
+ private Router router;
@Override
public boolean init(RouterOptions opts) {
@@ -58,14 +58,20 @@ public boolean init(RouterOptions opts) {
List<Endpoint> endpoints = Ensures.ensureNonNull(this.opts.getEndpoints(), "null `endpoints`");
- this.inner = new InnerRouter();
- this.inner.refreshLocal(endpoints);
+ this.router = new DefaultRouter();
+ this.router.onRefresh(endpoints);
long refreshPeriod = this.opts.getRefreshPeriodSeconds();
if (refreshPeriod > 0) {
this.refresher = REFRESHER_POOL.getObject();
this.refresher.scheduleWithFixedDelay(
- () -> this.inner.refreshFromRemote(),
+ () -> this.router.refresh().whenComplete((r, e) -> {
+ if (e != null) {
+ LOG.error("Router cache refresh failed.", e);
+ } else {
+ LOG.debug("Router cache refresh {}.", r ? "success" : "failed");
+ }
+ }),
Util.randomInitialDelay(180), refreshPeriod, TimeUnit.SECONDS);
LOG.info("Router cache refresher started.");
@@ -90,7 +96,7 @@ public void shutdownGracefully() {
* Get the current routing table.
*/
public CompletableFuture<Endpoint> route() {
- return this.inner.routeFor(null);
+ return this.router.routeFor(null);
}
/**
@@ -211,18 +217,10 @@ public String toString() {
* the client send request using a rr or random policy, and frontend server needs to
* be able to return the member list for the purpose of frontend server members change.
*/
- private static class InnerRouter implements Router<Void, Endpoint> {
+ private static class DefaultRouter implements Router<Void, Endpoint> {
private final AtomicReference<List<Endpoint>> endpointsRef = new AtomicReference<>();
- public void refreshFromRemote() {
- // TODO
- }
-
- void refreshLocal(List input) {
- this.endpointsRef.set(input);
- }
-
@Override
public CompletableFuture<Endpoint> routeFor(Void request) {
List<Endpoint> endpoints = this.endpointsRef.get();
@@ -230,5 +228,16 @@ public CompletableFuture routeFor(Void request) {
int i = random.nextInt(0, endpoints.size());
return Util.completedCf(endpoints.get(i));
}
+
+ @Override
+ public CompletableFuture<Boolean> refresh() {
+ // always return true
+ return Util.completedCf(true);
+ }
+
+ @Override
+ public void onRefresh(List<Endpoint> endpoints) {
+ this.endpointsRef.set(endpoints);
+ }
}
}
diff --git a/ingester-protocol/src/main/java/io/greptime/Write.java b/ingester-protocol/src/main/java/io/greptime/Write.java
index 853b3a1..bf058a4 100644
--- a/ingester-protocol/src/main/java/io/greptime/Write.java
+++ b/ingester-protocol/src/main/java/io/greptime/Write.java
@@ -21,6 +21,7 @@
import io.greptime.models.WriteOk;
import io.greptime.rpc.Context;
+import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
@@ -30,6 +31,12 @@
* @author jiachun.fjc
*/
public interface Write {
+ /**
+ * @see #write(Collection, WriteOp, Context)
+ */
+ default CompletableFuture<Result<WriteOk, Err>> write(Table... tables) {
+ return write(Arrays.asList(tables));
+ }
/**
* @see #write(Collection, WriteOp, Context)
diff --git a/ingester-protocol/src/main/java/io/greptime/WriteClient.java b/ingester-protocol/src/main/java/io/greptime/WriteClient.java
index bc4e73d..83fa262 100644
--- a/ingester-protocol/src/main/java/io/greptime/WriteClient.java
+++ b/ingester-protocol/src/main/java/io/greptime/WriteClient.java
@@ -24,6 +24,7 @@
import io.greptime.common.Lifecycle;
import io.greptime.common.util.Clock;
import io.greptime.common.util.Ensures;
+import io.greptime.common.util.MetricExecutor;
import io.greptime.common.util.MetricsUtil;
import io.greptime.common.util.SerializingExecutor;
import io.greptime.errors.LimitedException;
@@ -69,6 +70,7 @@ public boolean init(WriteOptions opts) {
this.routerClient = this.opts.getRouterClient();
Executor pool = this.opts.getAsyncPool();
this.asyncPool = pool != null ? pool : new SerializingExecutor("write_client");
+ this.asyncPool = new MetricExecutor(this.asyncPool, "async_write_pool.time");
this.writeLimiter = new DefaultWriteLimiter(this.opts.getMaxInFlightWriteRows(), this.opts.getLimitedPolicy());
return true;
}
@@ -338,11 +340,14 @@ static abstract class RateLimitingStreamWriter implements StreamWriter write(Table table, WriteOp writeOp) {
Ensures.ensureNonNull(table, "null `table`");
+ WriteTables writeTables = new WriteTables(table, writeOp);
+
if (this.rateLimiter != null) {
double timeSpent = this.rateLimiter.acquire(table.pointCount());
InnerMetricHelper.writeStreamLimiterTimeSpent().update((long) timeSpent);
}
- this.observer.onNext(new WriteTables(table, writeOp));
+
+ this.observer.onNext(writeTables);
return this;
}
}
diff --git a/ingester-protocol/src/main/java/io/greptime/WritePOJO.java b/ingester-protocol/src/main/java/io/greptime/WritePOJO.java
index b683877..ef440cc 100644
--- a/ingester-protocol/src/main/java/io/greptime/WritePOJO.java
+++ b/ingester-protocol/src/main/java/io/greptime/WritePOJO.java
@@ -20,6 +20,7 @@
import io.greptime.models.WriteOk;
import io.greptime.rpc.Context;
+import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
@@ -30,6 +31,12 @@
* @author jiachun.fjc
*/
public interface WritePOJO {
+ /**
+ * @see #writePOJOs(Collection, WriteOp, Context)
+ */
+ default CompletableFuture<Result<WriteOk, Err>> writePOJOs(List<?>... pojos) {
+ return writePOJOs(Arrays.asList(pojos));
+ }
/**
* @see #writePOJOs(Collection, WriteOp, Context)
*/
diff --git a/ingester-protocol/src/main/java/io/greptime/models/AuthInfo.java b/ingester-protocol/src/main/java/io/greptime/models/AuthInfo.java
index bd01595..e5d27c4 100644
--- a/ingester-protocol/src/main/java/io/greptime/models/AuthInfo.java
+++ b/ingester-protocol/src/main/java/io/greptime/models/AuthInfo.java
@@ -25,8 +25,8 @@
*/
public class AuthInfo implements Into<Common.AuthHeader> {
- private String username;
- private String password;
+ private final String username;
+ private final String password;
/**
* Create AuthInfo from username/password.
@@ -36,27 +36,15 @@ public AuthInfo(String username, String password) {
this.password = password;
}
- public String getUsername() {
- return this.username;
- }
-
- public void setUsername(String username) {
- this.username = username;
- }
-
- public String getPassword() {
- return this.password;
- }
-
- public void setPassword(String password) {
- this.password = password;
+ public static AuthInfo noAuthorization() {
+ return null;
}
@Override
public Common.AuthHeader into() {
Common.Basic basic = Common.Basic.newBuilder() //
- .setUsername(getUsername()) //
- .setPassword(getPassword()) //
+ .setUsername(this.username) //
+ .setPassword(this.password) //
.build();
return Common.AuthHeader.newBuilder() //
.setBasic(basic) //
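With the setters removed, credentials are now supplied once through the options builder. The sketch below is a hedged illustration (the AuthQuickStart class name, endpoint, and credentials are placeholders) of passing AuthInfo via GreptimeOptions.Builder#authInfo, with AuthInfo.noAuthorization() (which returns null) for deployments that do not require authentication.

package io.greptime;

import io.greptime.models.AuthInfo;
import io.greptime.options.GreptimeOptions;

public class AuthQuickStart {

    public static void main(String[] args) {
        String database = "public";
        String[] endpoints = {"127.0.0.1:4001"};
        GreptimeOptions opts = GreptimeOptions.newBuilder(endpoints, database) //
                // Use new AuthInfo("username", "password") when the DB requires authentication,
                // or AuthInfo.noAuthorization() (i.e. null) when it does not.
                .authInfo(new AuthInfo("greptime_user", "greptime_pwd")) //
                .build();

        GreptimeDB greptimeDB = GreptimeDB.create(opts);

        // ... write data as in the quick-start examples ...

        greptimeDB.shutdownGracefully();
    }
}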
diff --git a/ingester-protocol/src/main/java/io/greptime/models/TableHelper.java b/ingester-protocol/src/main/java/io/greptime/models/TableHelper.java
index 6ea992e..c029125 100644
--- a/ingester-protocol/src/main/java/io/greptime/models/TableHelper.java
+++ b/ingester-protocol/src/main/java/io/greptime/models/TableHelper.java
@@ -16,6 +16,7 @@
package io.greptime.models;
import io.greptime.WriteOp;
+import io.greptime.common.util.Ensures;
import io.greptime.v1.Common;
import io.greptime.v1.Database;
import java.util.Collection;
@@ -41,6 +42,7 @@ public static Database.GreptimeRequest toGreptimeRequest(WriteTables writeTables
case Insert:
Database.RowInsertRequests.Builder insertBuilder = Database.RowInsertRequests.newBuilder();
for (Table t : tables) {
+ Ensures.ensure(t.pointCount() > 0, "No data to insert in table: %s", t.tableName());
insertBuilder.addInserts(t.intoRowInsertRequest());
}
return Database.GreptimeRequest.newBuilder() //
@@ -50,6 +52,7 @@ public static Database.GreptimeRequest toGreptimeRequest(WriteTables writeTables
case Delete:
Database.RowDeleteRequests.Builder deleteBuilder = Database.RowDeleteRequests.newBuilder();
for (Table t : tables) {
+ Ensures.ensure(t.pointCount() > 0, "No data to delete in table: %s", t.tableName());
deleteBuilder.addDeletes(t.intoRowDeleteRequest());
}
return Database.GreptimeRequest.newBuilder() //
diff --git a/ingester-protocol/src/main/java/io/greptime/options/GreptimeOptions.java b/ingester-protocol/src/main/java/io/greptime/options/GreptimeOptions.java
index 739debf..310f602 100644
--- a/ingester-protocol/src/main/java/io/greptime/options/GreptimeOptions.java
+++ b/ingester-protocol/src/main/java/io/greptime/options/GreptimeOptions.java
@@ -15,6 +15,7 @@
*/
package io.greptime.options;
+import io.greptime.Router;
import io.greptime.common.Copiable;
import io.greptime.common.Endpoint;
import io.greptime.common.util.Ensures;
@@ -35,12 +36,10 @@
*/
public class GreptimeOptions implements Copiable<GreptimeOptions> {
private List<Endpoint> endpoints;
- private Executor asyncPool;
private RpcOptions rpcOptions;
private RouterOptions routerOptions;
private WriteOptions writeOptions;
private String database;
- private AuthInfo authInfo;
public List<Endpoint> getEndpoints() {
return endpoints;
@@ -50,14 +49,6 @@ public void setEndpoints(List endpoints) {
this.endpoints = endpoints;
}
- public Executor getAsyncPool() {
- return asyncPool;
- }
-
- public void setAsyncPool(Executor asyncPool) {
- this.asyncPool = asyncPool;
- }
-
public RpcOptions getRpcOptions() {
return rpcOptions;
}
@@ -90,21 +81,11 @@ public void setDatabase(String database) {
this.database = database;
}
- public AuthInfo getAuthInfo() {
- return authInfo;
- }
-
- public void setAuthInfo(AuthInfo authInfo) {
- this.authInfo = authInfo;
- }
-
@Override
public GreptimeOptions copy() {
GreptimeOptions opts = new GreptimeOptions();
opts.endpoints = new ArrayList<>(this.endpoints);
- opts.asyncPool = this.asyncPool;
opts.database = this.database;
- opts.authInfo = this.authInfo;
if (this.rpcOptions != null) {
opts.rpcOptions = this.rpcOptions.copy();
}
@@ -121,12 +102,10 @@ public GreptimeOptions copy() {
public String toString() {
return "GreptimeOptions{" + //
"endpoints=" + endpoints + //
- ", asyncPool=" + asyncPool + //
", rpcOptions=" + rpcOptions + //
", routerOptions=" + routerOptions + //
", writeOptions=" + writeOptions + //
", database='" + database + '\'' + //
- ", authInfo=" + authInfo + //
'}';
}
@@ -137,6 +116,7 @@ public static GreptimeOptions checkSelf(GreptimeOptions opts) {
Ensures.ensureNonNull(opts.getRpcOptions(), "null `rpcOptions`");
Ensures.ensureNonNull(opts.getRouterOptions(), "null `routerOptions`");
Ensures.ensureNonNull(opts.getWriteOptions(), "null `writeOptions`");
+
return opts;
}
@@ -174,6 +154,8 @@ public static final class Builder {
private long routeTableRefreshPeriodSeconds = -1;
// Authentication information
private AuthInfo authInfo;
+ // The request router
+ private Router router;
public Builder(List<Endpoint> endpoints, String database) {
this.endpoints.addAll(endpoints);
@@ -230,7 +212,13 @@ public Builder maxInFlightWriteRows(int maxInFlightWriteRows) {
}
/**
- * Set write limited policy.
+ * Write flow limit: the policy to use when the write flow limit is exceeded.
+ * The options:
+ * - `LimitedPolicy.DiscardPolicy`: discard the data if the limiter is full.
+ * - `LimitedPolicy.AbortPolicy`: abort if the limiter is full.
+ * - `LimitedPolicy.BlockingPolicy`: blocks if the limiter is full.
+ * - `LimitedPolicy.AbortOnBlockingTimeoutPolicy`: blocks for the specified time if the limiter is full.
+ * The default is `LimitedPolicy.AbortOnBlockingTimeoutPolicy`
*
* @param writeLimitedPolicy write limited policy
* @return this builder
@@ -241,7 +229,10 @@ public Builder writeLimitedPolicy(LimitedPolicy writeLimitedPolicy) {
}
/**
- * The default rate limit for stream writer.
+ * The default rate limit for `StreamWriter`. It only takes effect when `maxPointsPerSecond`
+ * is not specified when creating a `StreamWriter`.
+ * The default is 10 * 65536.
+ *
* @param defaultStreamMaxWritePointsPerSecond default max write points per second
* @return this builder
*/
@@ -252,7 +243,7 @@ public Builder defaultStreamMaxWritePointsPerSecond(int defaultStreamMaxWritePoi
/**
* Refresh frequency of route tables. The background refreshes all route tables
- * periodically. By default, all route tables are refreshed every 30 seconds.
+ * periodically. By default, the route tables will not be refreshed.
*
* @param routeTableRefreshPeriodSeconds refresh period for route tables cache
* @return this builder
@@ -263,7 +254,8 @@ public Builder routeTableRefreshPeriodSeconds(long routeTableRefreshPeriodSecond
}
/**
- * Sets authentication information.
+ * Sets authentication information. If the DB does not require authentication,
+ * this can be ignored.
*
* @param authInfo the authentication information
* @return this builder
@@ -273,6 +265,18 @@ public Builder authInfo(AuthInfo authInfo) {
return this;
}
+ /**
+ * Sets the request router. The internal default implementation works well;
+ * you don't need to set it unless you have special requirements.
+ *
+ * @param router the request router
+ * @return this builder
+ */
+ public Builder router(Router router) {
+ this.router = router;
+ return this;
+ }
+
/**
* A good start, happy coding.
*
@@ -281,23 +285,22 @@ public Builder authInfo(AuthInfo authInfo) {
public GreptimeOptions build() {
GreptimeOptions opts = new GreptimeOptions();
opts.setEndpoints(this.endpoints);
- opts.setAsyncPool(this.asyncPool);
opts.setRpcOptions(this.rpcOptions);
opts.setDatabase(this.database);
- opts.setAuthInfo(this.authInfo);
- opts.setRouterOptions(createRouterOptions());
- opts.setWriteOptions(createWriteOptions());
+ opts.setRouterOptions(routerOptions());
+ opts.setWriteOptions(writeOptions());
return GreptimeOptions.checkSelf(opts);
}
- private RouterOptions createRouterOptions() {
+ private RouterOptions routerOptions() {
RouterOptions routerOpts = new RouterOptions();
routerOpts.setEndpoints(this.endpoints);
+ routerOpts.setRouter(this.router);
routerOpts.setRefreshPeriodSeconds(this.routeTableRefreshPeriodSeconds);
return routerOpts;
}
- private WriteOptions createWriteOptions() {
+ private WriteOptions writeOptions() {
WriteOptions writeOpts = new WriteOptions();
writeOpts.setDatabase(this.database);
writeOpts.setAuthInfo(this.authInfo);
diff --git a/ingester-protocol/src/main/java/io/greptime/options/RouterOptions.java b/ingester-protocol/src/main/java/io/greptime/options/RouterOptions.java
index e978b7e..3caab4f 100644
--- a/ingester-protocol/src/main/java/io/greptime/options/RouterOptions.java
+++ b/ingester-protocol/src/main/java/io/greptime/options/RouterOptions.java
@@ -15,6 +15,7 @@
*/
package io.greptime.options;
+import io.greptime.Router;
import io.greptime.common.Copiable;
import io.greptime.common.Endpoint;
import io.greptime.rpc.RpcClient;
@@ -34,6 +35,7 @@ public class RouterOptions implements Copiable {
// all route tables periodically. If the value is less than or
// equal to 0, the route tables will not be refreshed.
private long refreshPeriodSeconds = -1;
+ private Router router;
public RpcClient getRpcClient() {
return rpcClient;
@@ -59,12 +61,21 @@ public void setRefreshPeriodSeconds(long refreshPeriodSeconds) {
this.refreshPeriodSeconds = refreshPeriodSeconds;
}
+ public Router getRouter() {
+ return router;
+ }
+
+ public void setRouter(Router router) {
+ this.router = router;
+ }
+
@Override
public RouterOptions copy() {
RouterOptions opts = new RouterOptions();
opts.rpcClient = rpcClient;
opts.endpoints = this.endpoints;
opts.refreshPeriodSeconds = this.refreshPeriodSeconds;
+ opts.router = this.router;
return opts;
}
diff --git a/ingester-protocol/src/main/java/io/greptime/options/WriteOptions.java b/ingester-protocol/src/main/java/io/greptime/options/WriteOptions.java
index 5624dbb..cdfd7de 100644
--- a/ingester-protocol/src/main/java/io/greptime/options/WriteOptions.java
+++ b/ingester-protocol/src/main/java/io/greptime/options/WriteOptions.java
@@ -118,9 +118,9 @@ public WriteOptions copy() {
@Override
public String toString() {
+ // Do not print auth info
return "WriteOptions{" + //
"database='" + database + '\'' + //
- ", authInfo=" + authInfo + //
", routerClient=" + routerClient + //
", asyncPool=" + asyncPool + //
", maxRetries=" + maxRetries + //
diff --git a/ingester-protocol/src/main/resources/client_version.properties b/ingester-protocol/src/main/resources/client_version.properties
index 2777b47..269f518 100644
--- a/ingester-protocol/src/main/resources/client_version.properties
+++ b/ingester-protocol/src/main/resources/client_version.properties
@@ -1 +1 @@
-client.version=0.4.0
+client.version=0.5.0
diff --git a/ingester-protocol/src/test/java/io/greptime/UtilTest.java b/ingester-protocol/src/test/java/io/greptime/UtilTest.java
index ce6f6a3..3d58056 100644
--- a/ingester-protocol/src/test/java/io/greptime/UtilTest.java
+++ b/ingester-protocol/src/test/java/io/greptime/UtilTest.java
@@ -26,6 +26,6 @@ public class UtilTest {
@Test
public void testClientVersion() {
String ver = Util.clientVersion();
- Assert.assertEquals("0.4.0", ver);
+ Assert.assertEquals("0.5.0", ver);
}
}
diff --git a/ingester-protocol/src/test/java/io/greptime/WriteClientTest.java b/ingester-protocol/src/test/java/io/greptime/WriteClientTest.java
index 3f4b3da..09eaa3b 100644
--- a/ingester-protocol/src/test/java/io/greptime/WriteClientTest.java
+++ b/ingester-protocol/src/test/java/io/greptime/WriteClientTest.java
@@ -105,7 +105,7 @@ public void testWriteSuccess() throws ExecutionException, InterruptedException {
Mockito.when(this.routerClient.invoke(Mockito.eq(addr), Mockito.any(), Mockito.any())) //
.thenReturn(Util.completedCf(response));
- Result<WriteOk, Err> res = this.writeClient.write(Collections.singleton(table)).get();
+ Result<WriteOk, Err> res = this.writeClient.write(table).get();
Assert.assertTrue(res.isOk());
Assert.assertEquals(3, res.getOk().getSuccess());
diff --git a/ingester-protocol/src/test/java/io/greptime/options/GreptimeOptionsTest.java b/ingester-protocol/src/test/java/io/greptime/options/GreptimeOptionsTest.java
new file mode 100644
index 0000000..3252acb
--- /dev/null
+++ b/ingester-protocol/src/test/java/io/greptime/options/GreptimeOptionsTest.java
@@ -0,0 +1,97 @@
+/*
+ * Copyright 2023 Greptime Team
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.greptime.options;
+
+import io.greptime.Router;
+import io.greptime.limit.LimitedPolicy;
+import io.greptime.models.AuthInfo;
+import io.greptime.rpc.RpcOptions;
+import org.junit.Assert;
+import org.junit.Test;
+import io.greptime.common.Endpoint;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * @author jiachun.fjc
+ */
+public class GreptimeOptionsTest {
+
+ @Test
+ public void testAllOptions() {
+ String database = "greptime.public";
+ String[] endpoints = {"127.0.0.1:4001"};
+ Executor asyncPool = command -> System.out.println("asyncPool");
+ RpcOptions rpcOptions = RpcOptions.newDefault();
+ int writeMaxRetries = 2;
+ int maxInFlightWriteRows = 999;
+ LimitedPolicy limitedPolicy = new LimitedPolicy.DiscardPolicy();
+ int defaultStreamMaxWritePointsPerSecond = 100000;
+ long routeTableRefreshPeriodSeconds = 99;
+ AuthInfo authInfo = new AuthInfo("user", "password");
+ Router<Void, Endpoint> router = createTestRouter();
+
+ GreptimeOptions opts = GreptimeOptions.newBuilder(endpoints, database) //
+ .asyncPool(asyncPool) //
+ .rpcOptions(rpcOptions) //
+ .writeMaxRetries(writeMaxRetries) //
+ .maxInFlightWriteRows(maxInFlightWriteRows) //
+ .writeLimitedPolicy(limitedPolicy) //
+ .defaultStreamMaxWritePointsPerSecond(defaultStreamMaxWritePointsPerSecond) //
+ .routeTableRefreshPeriodSeconds(routeTableRefreshPeriodSeconds) //
+ .authInfo(authInfo) //
+ .router(router) //
+ .build();
+
+ Assert.assertEquals(database, opts.getDatabase());
+ Assert.assertArrayEquals(endpoints, opts.getEndpoints().stream().map(Endpoint::toString).toArray());
+ Assert.assertEquals(rpcOptions, opts.getRpcOptions());
+
+ RouterOptions routerOptions = opts.getRouterOptions();
+ Assert.assertNotNull(routerOptions);
+ Assert.assertArrayEquals(endpoints, routerOptions.getEndpoints().stream().map(Endpoint::toString).toArray());
+ Assert.assertEquals(router, routerOptions.getRouter());
+ Assert.assertEquals(routeTableRefreshPeriodSeconds, routerOptions.getRefreshPeriodSeconds());
+
+ WriteOptions writeOptions = opts.getWriteOptions();
+ Assert.assertNotNull(writeOptions);
+ Assert.assertEquals(asyncPool, writeOptions.getAsyncPool());
+ Assert.assertEquals(writeMaxRetries, writeOptions.getMaxRetries());
+ Assert.assertEquals(maxInFlightWriteRows, writeOptions.getMaxInFlightWriteRows());
+ Assert.assertEquals(limitedPolicy, writeOptions.getLimitedPolicy());
+ Assert.assertEquals(defaultStreamMaxWritePointsPerSecond, writeOptions.getDefaultStreamMaxWritePointsPerSecond());
+ Assert.assertEquals(authInfo, writeOptions.getAuthInfo());
+ }
+
+ private Router<Void, Endpoint> createTestRouter() {
+ return new Router<Void, Endpoint>() {
+
+ @Override
+ public CompletableFuture<Endpoint> routeFor(Void request) {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<Boolean> refresh() {
+ return null;
+ }
+
+ @Override
+ public void onRefresh(List<Endpoint> endpoints) {}
+ };
+ }
+}
diff --git a/ingester-rpc/pom.xml b/ingester-rpc/pom.xml
index cb4f191..57d7411 100644
--- a/ingester-rpc/pom.xml
+++ b/ingester-rpc/pom.xml
@@ -4,7 +4,7 @@
<artifactId>greptimedb-ingester</artifactId>
<groupId>io.greptime</groupId>
- <version>0.4.0</version>
+ <version>0.5.0</version>
<artifactId>ingester-rpc</artifactId>
diff --git a/pom.xml b/pom.xml
index 9571137..6ce821a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -4,7 +4,7 @@
<groupId>io.greptime</groupId>
<artifactId>greptimedb-ingester</artifactId>
- <version>0.4.0</version>
+ <version>0.5.0</version>
<packaging>pom</packaging>
<name>${project.groupId}:${project.artifactId}</name>