From 2063fe649ac6012e73c98e8c382c8e8740696840 Mon Sep 17 00:00:00 2001 From: jeremyhi Date: Mon, 8 Jan 2024 20:49:44 +0800 Subject: [PATCH] chore: refactor demos --- ingester-example/pom.xml | 7 ++ .../io/greptime/{MyMetric2.java => Cpu.java} | 53 ++++----- .../src/main/java/io/greptime/Memory.java | 59 ++++++++++ .../src/main/java/io/greptime/MyMetric1.java | 110 ------------------ .../src/main/java/io/greptime/QueryJDBC.java | 56 +++++++++ .../greptime/StreamWritePOJOsQuickStart.java | 40 +++---- .../io/greptime/StreamWriteQuickStart.java | 59 ++++------ .../io/greptime/WritePOJOsQuickStart.java | 40 +++---- .../java/io/greptime/WriteQuickStart.java | 58 ++++----- .../main/resources/db-connection.properties | 5 + .../main/java/io/greptime/WriteClient.java | 5 +- .../java/io/greptime/models/TableHelper.java | 3 + 12 files changed, 226 insertions(+), 269 deletions(-) rename ingester-example/src/main/java/io/greptime/{MyMetric2.java => Cpu.java} (51%) create mode 100644 ingester-example/src/main/java/io/greptime/Memory.java delete mode 100644 ingester-example/src/main/java/io/greptime/MyMetric1.java create mode 100644 ingester-example/src/main/java/io/greptime/QueryJDBC.java create mode 100644 ingester-example/src/main/resources/db-connection.properties diff --git a/ingester-example/pom.xml b/ingester-example/pom.xml index 7ec06e0..4933606 100644 --- a/ingester-example/pom.xml +++ b/ingester-example/pom.xml @@ -38,5 +38,12 @@ log4j-slf4j-impl compile + + + + mysql + mysql-connector-java + 8.0.33 + diff --git a/ingester-example/src/main/java/io/greptime/MyMetric2.java b/ingester-example/src/main/java/io/greptime/Cpu.java similarity index 51% rename from ingester-example/src/main/java/io/greptime/MyMetric2.java rename to ingester-example/src/main/java/io/greptime/Cpu.java index 5eef353..38ca0dd 100644 --- a/ingester-example/src/main/java/io/greptime/MyMetric2.java +++ b/ingester-example/src/main/java/io/greptime/Cpu.java @@ -18,40 +18,29 @@ import io.greptime.models.Column; import io.greptime.models.DataType; import io.greptime.models.Metric; -import java.util.Date; /** * @author jiachun.fjc */ -@Metric(name = "my_metric2") -public class MyMetric2 { - @Column(name = "tag1", tag = true, dataType = DataType.String) - private String tag1; - @Column(name = "tag2", tag = true, dataType = DataType.String) - private String tag2; +@Metric(name = "cpu_metric") +public class Cpu { + @Column(name = "host", tag = true, dataType = DataType.String) + private String host; - @Column(name = "ts", timestamp = true, dataType = DataType.TimestampSecond) + @Column(name = "ts", timestamp = true, dataType = DataType.TimestampMillisecond) private long ts; - @Column(name = "field1", dataType = DataType.Date) - private Date field1; - @Column(name = "field2", dataType = DataType.Float64) - private double field2; + @Column(name = "cpu_user", dataType = DataType.Float64) + private double cpuUser; + @Column(name = "cpu_sys", dataType = DataType.Float64) + private double cpuSys; - public String getTag1() { - return tag1; + public String getHost() { + return host; } - public void setTag1(String tag1) { - this.tag1 = tag1; - } - - public String getTag2() { - return tag2; - } - - public void setTag2(String tag2) { - this.tag2 = tag2; + public void setHost(String host) { + this.host = host; } public long getTs() { @@ -62,19 +51,19 @@ public void setTs(long ts) { this.ts = ts; } - public Date getField1() { - return field1; + public double getCpuUser() { + return cpuUser; } - public void setField1(Date field1) { - this.field1 = field1; + public void setCpuUser(double cpuUser) { + this.cpuUser = cpuUser; } - public double getField2() { - return field2; + public double getCpuSys() { + return cpuSys; } - public void setField2(double field2) { - this.field2 = field2; + public void setCpuSys(double cpuSys) { + this.cpuSys = cpuSys; } } diff --git a/ingester-example/src/main/java/io/greptime/Memory.java b/ingester-example/src/main/java/io/greptime/Memory.java new file mode 100644 index 0000000..1e22d37 --- /dev/null +++ b/ingester-example/src/main/java/io/greptime/Memory.java @@ -0,0 +1,59 @@ +/* + * Copyright 2023 Greptime Team + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.greptime; + +import io.greptime.models.Column; +import io.greptime.models.DataType; +import io.greptime.models.Metric; + +/** + * @author jiachun.fjc + */ +@Metric(name = "mem_metric") +public class Memory { + @Column(name = "host", tag = true, dataType = DataType.String) + private String host; + + @Column(name = "ts", timestamp = true, dataType = DataType.TimestampMillisecond) + private long ts; + + @Column(name = "mem_usage", dataType = DataType.Float64) + private double memUsage; + + public String getHost() { + return host; + } + + public void setHost(String host) { + this.host = host; + } + + public long getTs() { + return ts; + } + + public void setTs(long ts) { + this.ts = ts; + } + + public double getMemUsage() { + return memUsage; + } + + public void setMemUsage(double memUsage) { + this.memUsage = memUsage; + } +} diff --git a/ingester-example/src/main/java/io/greptime/MyMetric1.java b/ingester-example/src/main/java/io/greptime/MyMetric1.java deleted file mode 100644 index b29aaaf..0000000 --- a/ingester-example/src/main/java/io/greptime/MyMetric1.java +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Copyright 2023 Greptime Team - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.greptime; - -import io.greptime.models.Column; -import io.greptime.models.DataType; -import io.greptime.models.Metric; -import java.math.BigDecimal; - -/** - * @author jiachun.fjc - */ -@Metric(name = "my_metric1") -public class MyMetric1 { - @Column(name = "tag1", tag = true, dataType = DataType.String) - private String tag1; - @Column(name = "tag2", tag = true, dataType = DataType.String) - private String tag2; - @Column(name = "tag3", tag = true, dataType = DataType.String) - private String tag3; - - @Column(name = "ts", timestamp = true, dataType = DataType.TimestampMillisecond) - private long ts; - - @Column(name = "field1", dataType = DataType.String) - private String field1; - @Column(name = "field2", dataType = DataType.Float64) - private double field2; - @Column(name = "field3", dataType = DataType.Decimal128) - private BigDecimal field3; - @Column(name = "field4", dataType = DataType.Int32) - private int field4; - - public String getTag1() { - return tag1; - } - - public void setTag1(String tag1) { - this.tag1 = tag1; - } - - public String getTag2() { - return tag2; - } - - public void setTag2(String tag2) { - this.tag2 = tag2; - } - - public String getTag3() { - return tag3; - } - - public void setTag3(String tag3) { - this.tag3 = tag3; - } - - public long getTs() { - return ts; - } - - public void setTs(long ts) { - this.ts = ts; - } - - public String getField1() { - return field1; - } - - public void setField1(String field1) { - this.field1 = field1; - } - - public double getField2() { - return field2; - } - - public void setField2(double field2) { - this.field2 = field2; - } - - public BigDecimal getField3() { - return field3; - } - - public void setField3(BigDecimal field3) { - this.field3 = field3; - } - - public int getField4() { - return field4; - } - - public void setField4(int field4) { - this.field4 = field4; - } -} diff --git a/ingester-example/src/main/java/io/greptime/QueryJDBC.java b/ingester-example/src/main/java/io/greptime/QueryJDBC.java new file mode 100644 index 0000000..1aadda2 --- /dev/null +++ b/ingester-example/src/main/java/io/greptime/QueryJDBC.java @@ -0,0 +1,56 @@ +/* + * Copyright 2023 Greptime Team + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.greptime; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.util.Objects; +import java.util.Properties; + +/** + * @author jiachun.fjc + */ +public class QueryJDBC { + + public static void main(String[] args) throws Exception { + GreptimeDB greptimeDB = TestConnector.connectToDefaultDB(); + + // Inserts data first + + + try (Connection conn = makeConnection()) { + + } + } + + public static Connection makeConnection() throws IOException, ClassNotFoundException, SQLException { + Properties prop = new Properties(); + prop.load(QueryJDBC.class.getResourceAsStream("/db-connection.properties")); + + String dbName = (String) prop.get("db.database-driver"); + + String dbConnUrl = (String) prop.get("db.url"); + String dbUserName = (String) prop.get("db.username"); + String dbPassword = (String) prop.get("db.password"); + + Class.forName(dbName); + Connection dbConn = DriverManager.getConnection(dbConnUrl, dbUserName, dbPassword); + + return Objects.requireNonNull(dbConn, "Failed to make connection!"); + } +} diff --git a/ingester-example/src/main/java/io/greptime/StreamWritePOJOsQuickStart.java b/ingester-example/src/main/java/io/greptime/StreamWritePOJOsQuickStart.java index b0b9c6e..60ab0f6 100644 --- a/ingester-example/src/main/java/io/greptime/StreamWritePOJOsQuickStart.java +++ b/ingester-example/src/main/java/io/greptime/StreamWritePOJOsQuickStart.java @@ -18,9 +18,7 @@ import io.greptime.models.WriteOk; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.math.BigDecimal; import java.util.ArrayList; -import java.util.Calendar; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -35,41 +33,33 @@ public class StreamWritePOJOsQuickStart { public static void main(String[] args) throws ExecutionException, InterruptedException { GreptimeDB greptimeDB = TestConnector.connectToDefaultDB(); - List myMetric1s = new ArrayList<>(); + List cpus = new ArrayList<>(); for (int i = 0; i < 10; i++) { - MyMetric1 m = new MyMetric1(); - m.setTag1("tag_value_1_" + i); - m.setTag2("tag_value_2_" + i); - m.setTag3("tag_value_3_" + i); - m.setTs(System.currentTimeMillis()); - m.setField1("field_value_1_" + i); - m.setField2(i); - m.setField3(new BigDecimal(i)); - m.setField4(i); - - myMetric1s.add(m); + Cpu c = new Cpu(); + c.setHost("127.0.0." + i); + c.setTs(System.currentTimeMillis()); + c.setCpuUser(i + 0.1); + c.setCpuSys(i + 0.12); + cpus.add(c); } - List myMetric2s = new ArrayList<>(); + List memories = new ArrayList<>(); for (int i = 0; i < 10; i++) { - MyMetric2 m = new MyMetric2(); - m.setTag1("tag_value_1_" + i); - m.setTag2("tag_value_2_" + i); + Memory m = new Memory(); + m.setHost("127.0.0." + i); m.setTs(System.currentTimeMillis() / 1000); - m.setField1(Calendar.getInstance().getTime()); - m.setField2(i); - - myMetric2s.add(m); + m.setMemUsage(i + 0.2); + memories.add(m); } StreamWriter, WriteOk> writer = greptimeDB.streamWriterPOJOs(); // write data into stream - writer.write(myMetric1s); - writer.write(myMetric2s); + writer.write(cpus); + writer.write(memories); // delete the first 5 rows - writer.write(myMetric1s.subList(0, 5), WriteOp.Delete); + writer.write(cpus.subList(0, 5), WriteOp.Delete); // complete the stream CompletableFuture future = writer.completed(); diff --git a/ingester-example/src/main/java/io/greptime/StreamWriteQuickStart.java b/ingester-example/src/main/java/io/greptime/StreamWriteQuickStart.java index b7bfe19..9808b2e 100644 --- a/ingester-example/src/main/java/io/greptime/StreamWriteQuickStart.java +++ b/ingester-example/src/main/java/io/greptime/StreamWriteQuickStart.java @@ -22,9 +22,6 @@ import io.greptime.models.WriteOk; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.math.BigDecimal; -import java.util.Calendar; -import java.util.Date; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -38,59 +35,45 @@ public class StreamWriteQuickStart { public static void main(String[] args) throws ExecutionException, InterruptedException { GreptimeDB greptimeDB = TestConnector.connectToDefaultDB(); - TableSchema myMetric3Schema = TableSchema.newBuilder("my_metric3") // - .addColumn("tag1", SemanticType.Tag, DataType.String) // - .addColumn("tag2", SemanticType.Tag, DataType.String) // - .addColumn("tag3", SemanticType.Tag, DataType.String) // + TableSchema cpuMetricSchema = TableSchema.newBuilder("cpu_metric") // + .addColumn("host", SemanticType.Tag, DataType.String) // .addColumn("ts", SemanticType.Timestamp, DataType.TimestampMillisecond) // - .addColumn("field1", SemanticType.Field, DataType.String) // - .addColumn("field2", SemanticType.Field, DataType.Float64) // - .addColumn("field3", SemanticType.Field, DataType.Decimal128) // - .addColumn("field4", SemanticType.Field, DataType.Int32) // + .addColumn("cpu_user", SemanticType.Field, DataType.Float64) // + .addColumn("cpu_sys", SemanticType.Field, DataType.Float64) // .build(); - TableSchema myMetric4Schema = TableSchema.newBuilder("my_metric4") // - .addColumn("tag1", SemanticType.Tag, DataType.String) // - .addColumn("tag2", SemanticType.Tag, DataType.String) // - .addColumn("ts", SemanticType.Timestamp, DataType.TimestampSecond) // - .addColumn("field1", SemanticType.Field, DataType.Date) // - .addColumn("field2", SemanticType.Field, DataType.Float64) // + TableSchema memMetricSchema = TableSchema.newBuilder("mem_metric") // + .addColumn("host", SemanticType.Tag, DataType.String) // + .addColumn("ts", SemanticType.Timestamp, DataType.TimestampMillisecond) // + .addColumn("mem_usage", SemanticType.Field, DataType.Float64) // .build(); - Table myMetric3 = Table.from(myMetric3Schema); - Table myMetric4 = Table.from(myMetric4Schema); + Table cpuMetric = Table.from(cpuMetricSchema); + Table memMetric = Table.from(memMetricSchema); for (int i = 0; i < 10; i++) { - String tag1v = "tag_value_1_" + i; - String tag2v = "tag_value_2_" + i; - String tag3v = "tag_value_3_" + i; + String host = "127.0.0." + i; long ts = System.currentTimeMillis(); - String field1 = "field_value_1" + i; - double field2 = i + 0.1; - BigDecimal field3 = new BigDecimal(i); - int field4 = i + 1; - - myMetric3.addRow(tag1v, tag2v, tag3v, ts, field1, field2, field3, field4); + double cpuUser = i + 0.1; + double cpuSys = i + 0.12; + cpuMetric.addRow(host, ts, cpuUser, cpuSys); } for (int i = 0; i < 10; i++) { - String tag1v = "tag_value_1_" + i; - String tag2v = "tag_value_2_" + i; - long ts = System.currentTimeMillis() / 1000; - Date field1 = Calendar.getInstance().getTime(); - double field2 = i + 0.1; - - myMetric4.addRow(tag1v, tag2v, ts, field1, field2); + String host = "127.0.0." + i; + long ts = System.currentTimeMillis(); + double memUsage = i + 0.2; + memMetric.addRow(host, ts, memUsage); } StreamWriter writer = greptimeDB.streamWriter(); // write data into stream - writer.write(myMetric3); - writer.write(myMetric4); + writer.write(cpuMetric); + writer.write(memMetric); // delete the first 5 rows - writer.write(myMetric3.subRange(0, 5), WriteOp.Delete); + writer.write(cpuMetric.subRange(0, 5), WriteOp.Delete); // complete the stream CompletableFuture future = writer.completed(); diff --git a/ingester-example/src/main/java/io/greptime/WritePOJOsQuickStart.java b/ingester-example/src/main/java/io/greptime/WritePOJOsQuickStart.java index 6b63e55..a0288dd 100644 --- a/ingester-example/src/main/java/io/greptime/WritePOJOsQuickStart.java +++ b/ingester-example/src/main/java/io/greptime/WritePOJOsQuickStart.java @@ -20,10 +20,8 @@ import io.greptime.models.WriteOk; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.math.BigDecimal; import java.util.ArrayList; import java.util.Arrays; -import java.util.Calendar; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -38,37 +36,29 @@ public class WritePOJOsQuickStart { public static void main(String[] args) throws ExecutionException, InterruptedException { GreptimeDB greptimeDB = TestConnector.connectToDefaultDB(); - List myMetric1s = new ArrayList<>(); + List cpus = new ArrayList<>(); for (int i = 0; i < 10; i++) { - MyMetric1 m = new MyMetric1(); - m.setTag1("tag_value_1_" + i); - m.setTag2("tag_value_2_" + i); - m.setTag3("tag_value_3_" + i); - m.setTs(System.currentTimeMillis()); - m.setField1("field_value_1_" + i); - m.setField2(i); - m.setField3(new BigDecimal(i)); - m.setField4(i); - - myMetric1s.add(m); + Cpu c = new Cpu(); + c.setHost("127.0.0." + i); + c.setTs(System.currentTimeMillis()); + c.setCpuUser(i + 0.1); + c.setCpuSys(i + 0.12); + cpus.add(c); } - List myMetric2s = new ArrayList<>(); + List memories = new ArrayList<>(); for (int i = 0; i < 10; i++) { - MyMetric2 m = new MyMetric2(); - m.setTag1("tag_value_1_" + i); - m.setTag2("tag_value_2_" + i); - m.setTs(System.currentTimeMillis() / 1000); - m.setField1(Calendar.getInstance().getTime()); - m.setField2(i); - - myMetric2s.add(m); + Memory m = new Memory(); + m.setHost("127.0.0." + i); + m.setTs(System.currentTimeMillis()); + m.setMemUsage(i + 0.2); + memories.add(m); } // For performance reasons, the SDK is designed to be purely asynchronous. // The return value is a future object. If you want to immediately obtain // the result, you can call `future.get()`. - CompletableFuture> puts = greptimeDB.writePOJOs(myMetric1s, myMetric2s); + CompletableFuture> puts = greptimeDB.writePOJOs(cpus, memories); Result result = puts.get(); @@ -78,7 +68,7 @@ public static void main(String[] args) throws ExecutionException, InterruptedExc LOG.error("Failed to write: {}", result.getErr()); } - List> delete_pojos = Arrays.asList(myMetric1s.subList(0, 5), myMetric2s.subList(0, 5)); + List> delete_pojos = Arrays.asList(cpus.subList(0, 5), memories.subList(0, 5)); Result deletes = greptimeDB.writePOJOs(delete_pojos, WriteOp.Delete).get(); if (deletes.isOk()) { diff --git a/ingester-example/src/main/java/io/greptime/WriteQuickStart.java b/ingester-example/src/main/java/io/greptime/WriteQuickStart.java index e2be6ce..ffab47f 100644 --- a/ingester-example/src/main/java/io/greptime/WriteQuickStart.java +++ b/ingester-example/src/main/java/io/greptime/WriteQuickStart.java @@ -24,11 +24,7 @@ import io.greptime.models.WriteOk; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.math.BigDecimal; import java.util.Arrays; -import java.util.Calendar; -import java.util.Collection; -import java.util.Date; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -43,55 +39,41 @@ public class WriteQuickStart { public static void main(String[] args) throws ExecutionException, InterruptedException { GreptimeDB greptimeDB = TestConnector.connectToDefaultDB(); - TableSchema myMetric3Schema = TableSchema.newBuilder("my_metric3") // - .addColumn("tag1", SemanticType.Tag, DataType.String) // - .addColumn("tag2", SemanticType.Tag, DataType.String) // - .addColumn("tag3", SemanticType.Tag, DataType.String) // + TableSchema cpuMetricSchema = TableSchema.newBuilder("cpu_metric") // + .addColumn("host", SemanticType.Tag, DataType.String) // .addColumn("ts", SemanticType.Timestamp, DataType.TimestampMillisecond) // - .addColumn("field1", SemanticType.Field, DataType.String) // - .addColumn("field2", SemanticType.Field, DataType.Float64) // - .addColumn("field3", SemanticType.Field, DataType.Decimal128) // - .addColumn("field4", SemanticType.Field, DataType.Int32) // + .addColumn("cpu_user", SemanticType.Field, DataType.Float64) // + .addColumn("cpu_sys", SemanticType.Field, DataType.Float64) // .build(); - TableSchema myMetric4Schema = TableSchema.newBuilder("my_metric4") // - .addColumn("tag1", SemanticType.Tag, DataType.String) // - .addColumn("tag2", SemanticType.Tag, DataType.String) // - .addColumn("ts", SemanticType.Timestamp, DataType.TimestampSecond) // - .addColumn("field1", SemanticType.Field, DataType.Date) // - .addColumn("field2", SemanticType.Field, DataType.Float64) // + TableSchema memMetricSchema = TableSchema.newBuilder("mem_metric") // + .addColumn("host", SemanticType.Tag, DataType.String) // + .addColumn("ts", SemanticType.Timestamp, DataType.TimestampMillisecond) // + .addColumn("mem_usage", SemanticType.Field, DataType.Float64) // .build(); - Table myMetric3 = Table.from(myMetric3Schema); - Table myMetric4 = Table.from(myMetric4Schema); + Table cpuMetric = Table.from(cpuMetricSchema); + Table memMetric = Table.from(memMetricSchema); for (int i = 0; i < 10; i++) { - String tag1v = "tag_value_1_" + i; - String tag2v = "tag_value_2_" + i; - String tag3v = "tag_value_3_" + i; + String host = "127.0.0." + i; long ts = System.currentTimeMillis(); - String field1 = "field_value_1" + i; - double field2 = i + 0.1; - BigDecimal field3 = new BigDecimal(i); - int field4 = i + 1; - - myMetric3.addRow(tag1v, tag2v, tag3v, ts, field1, field2, field3, field4); + double cpuUser = i + 0.1; + double cpuSys = i + 0.12; + cpuMetric.addRow(host, ts, cpuUser, cpuSys); } for (int i = 0; i < 10; i++) { - String tag1v = "tag_value_1_" + i; - String tag2v = "tag_value_2_" + i; - long ts = System.currentTimeMillis() / 1000; - Date field1 = Calendar.getInstance().getTime(); - double field2 = i + 0.1; - - myMetric4.addRow(tag1v, tag2v, ts, field1, field2); + String host = "127.0.0." + i; + long ts = System.currentTimeMillis(); + double memUsage = i + 0.2; + memMetric.addRow(host, ts, memUsage); } // For performance reasons, the SDK is designed to be purely asynchronous. // The return value is a future object. If you want to immediately obtain // the result, you can call `future.get()`. - CompletableFuture> future = greptimeDB.write(myMetric3, myMetric4); + CompletableFuture> future = greptimeDB.write(cpuMetric, memMetric); Result result = future.get(); @@ -101,7 +83,7 @@ public static void main(String[] args) throws ExecutionException, InterruptedExc LOG.error("Failed to write: {}", result.getErr()); } - List delete_objs = Arrays.asList(myMetric3.subRange(0, 5), myMetric4.subRange(0, 5)); + List
delete_objs = Arrays.asList(cpuMetric.subRange(0, 5), memMetric.subRange(0, 5)); Result deletes = greptimeDB.write(delete_objs, WriteOp.Delete).get(); if (deletes.isOk()) { diff --git a/ingester-example/src/main/resources/db-connection.properties b/ingester-example/src/main/resources/db-connection.properties new file mode 100644 index 0000000..9e47c11 --- /dev/null +++ b/ingester-example/src/main/resources/db-connection.properties @@ -0,0 +1,5 @@ +# DataSource +db.database-driver=com.mysql.cj.jdbc.Driver +db.url=jdbc:mysql://localhost:4002/public +db.username= +db.password= \ No newline at end of file diff --git a/ingester-protocol/src/main/java/io/greptime/WriteClient.java b/ingester-protocol/src/main/java/io/greptime/WriteClient.java index 83732b2..83fa262 100644 --- a/ingester-protocol/src/main/java/io/greptime/WriteClient.java +++ b/ingester-protocol/src/main/java/io/greptime/WriteClient.java @@ -340,11 +340,14 @@ static abstract class RateLimitingStreamWriter implements StreamWriter write(Table table, WriteOp writeOp) { Ensures.ensureNonNull(table, "null `table`"); + WriteTables writeTables = new WriteTables(table, writeOp); + if (this.rateLimiter != null) { double timeSpent = this.rateLimiter.acquire(table.pointCount()); InnerMetricHelper.writeStreamLimiterTimeSpent().update((long) timeSpent); } - this.observer.onNext(new WriteTables(table, writeOp)); + + this.observer.onNext(writeTables); return this; } } diff --git a/ingester-protocol/src/main/java/io/greptime/models/TableHelper.java b/ingester-protocol/src/main/java/io/greptime/models/TableHelper.java index 6ea992e..c029125 100644 --- a/ingester-protocol/src/main/java/io/greptime/models/TableHelper.java +++ b/ingester-protocol/src/main/java/io/greptime/models/TableHelper.java @@ -16,6 +16,7 @@ package io.greptime.models; import io.greptime.WriteOp; +import io.greptime.common.util.Ensures; import io.greptime.v1.Common; import io.greptime.v1.Database; import java.util.Collection; @@ -41,6 +42,7 @@ public static Database.GreptimeRequest toGreptimeRequest(WriteTables writeTables case Insert: Database.RowInsertRequests.Builder insertBuilder = Database.RowInsertRequests.newBuilder(); for (Table t : tables) { + Ensures.ensure(t.pointCount() > 0, "No data to insert in table: %s", t.tableName()); insertBuilder.addInserts(t.intoRowInsertRequest()); } return Database.GreptimeRequest.newBuilder() // @@ -50,6 +52,7 @@ public static Database.GreptimeRequest toGreptimeRequest(WriteTables writeTables case Delete: Database.RowDeleteRequests.Builder deleteBuilder = Database.RowDeleteRequests.newBuilder(); for (Table t : tables) { + Ensures.ensure(t.pointCount() > 0, "No data to delete in table: %s", t.tableName()); deleteBuilder.addDeletes(t.intoRowDeleteRequest()); } return Database.GreptimeRequest.newBuilder() //