From d17a2183e66890873091098c05b6b852a43fcceb Mon Sep 17 00:00:00 2001 From: jeremyhi Date: Fri, 2 Feb 2024 16:40:17 +0800 Subject: [PATCH] feat: friendly api for table schema --- .../io/greptime/StreamWriteQuickStart.java | 15 ++-- .../java/io/greptime/WriteQuickStart.java | 15 ++-- .../java/io/greptime/models/DataType.java | 5 ++ .../java/io/greptime/models/RowHelper.java | 3 + .../java/io/greptime/models/TableSchema.java | 42 ++++++++++- .../test/java/io/greptime/PojoMapperTest.java | 2 +- .../src/test/java/io/greptime/TestUtil.java | 7 +- .../java/io/greptime/WriteClientTest.java | 73 ++++++++++++------- .../test/java/io/greptime/WriteLimitTest.java | 2 +- .../java/io/greptime/models/TableTest.java | 12 +-- .../greptime/options/GreptimeOptionsTest.java | 2 +- 11 files changed, 122 insertions(+), 56 deletions(-) diff --git a/ingester-example/src/main/java/io/greptime/StreamWriteQuickStart.java b/ingester-example/src/main/java/io/greptime/StreamWriteQuickStart.java index 9808b2e..c5ac60d 100644 --- a/ingester-example/src/main/java/io/greptime/StreamWriteQuickStart.java +++ b/ingester-example/src/main/java/io/greptime/StreamWriteQuickStart.java @@ -16,7 +16,6 @@ package io.greptime; import io.greptime.models.DataType; -import io.greptime.models.SemanticType; import io.greptime.models.Table; import io.greptime.models.TableSchema; import io.greptime.models.WriteOk; @@ -36,16 +35,16 @@ public static void main(String[] args) throws ExecutionException, InterruptedExc GreptimeDB greptimeDB = TestConnector.connectToDefaultDB(); TableSchema cpuMetricSchema = TableSchema.newBuilder("cpu_metric") // - .addColumn("host", SemanticType.Tag, DataType.String) // - .addColumn("ts", SemanticType.Timestamp, DataType.TimestampMillisecond) // - .addColumn("cpu_user", SemanticType.Field, DataType.Float64) // - .addColumn("cpu_sys", SemanticType.Field, DataType.Float64) // + .addTag("host", DataType.String) // + .addTimestamp("ts", DataType.TimestampMillisecond) // + .addField("cpu_user", DataType.Float64) // + .addField("cpu_sys", DataType.Float64) // .build(); TableSchema memMetricSchema = TableSchema.newBuilder("mem_metric") // - .addColumn("host", SemanticType.Tag, DataType.String) // - .addColumn("ts", SemanticType.Timestamp, DataType.TimestampMillisecond) // - .addColumn("mem_usage", SemanticType.Field, DataType.Float64) // + .addTag("host", DataType.String) // + .addTimestamp("ts", DataType.TimestampMillisecond) // + .addField("mem_usage", DataType.Float64) // .build(); Table cpuMetric = Table.from(cpuMetricSchema); diff --git a/ingester-example/src/main/java/io/greptime/WriteQuickStart.java b/ingester-example/src/main/java/io/greptime/WriteQuickStart.java index ffab47f..0db2397 100644 --- a/ingester-example/src/main/java/io/greptime/WriteQuickStart.java +++ b/ingester-example/src/main/java/io/greptime/WriteQuickStart.java @@ -18,7 +18,6 @@ import io.greptime.models.DataType; import io.greptime.models.Err; import io.greptime.models.Result; -import io.greptime.models.SemanticType; import io.greptime.models.Table; import io.greptime.models.TableSchema; import io.greptime.models.WriteOk; @@ -40,16 +39,16 @@ public static void main(String[] args) throws ExecutionException, InterruptedExc GreptimeDB greptimeDB = TestConnector.connectToDefaultDB(); TableSchema cpuMetricSchema = TableSchema.newBuilder("cpu_metric") // - .addColumn("host", SemanticType.Tag, DataType.String) // - .addColumn("ts", SemanticType.Timestamp, DataType.TimestampMillisecond) // - .addColumn("cpu_user", SemanticType.Field, DataType.Float64) // - .addColumn("cpu_sys", SemanticType.Field, DataType.Float64) // + .addTag("host", DataType.String) // + .addTimestamp("ts", DataType.TimestampMillisecond) // + .addField("cpu_user", DataType.Float64) // + .addField("cpu_sys", DataType.Float64) // .build(); TableSchema memMetricSchema = TableSchema.newBuilder("mem_metric") // - .addColumn("host", SemanticType.Tag, DataType.String) // - .addColumn("ts", SemanticType.Timestamp, DataType.TimestampMillisecond) // - .addColumn("mem_usage", SemanticType.Field, DataType.Float64) // + .addTag("host", DataType.String) // + .addTimestamp("ts", DataType.TimestampMillisecond) // + .addField("mem_usage", DataType.Float64) // .build(); Table cpuMetric = Table.from(cpuMetricSchema); diff --git a/ingester-protocol/src/main/java/io/greptime/models/DataType.java b/ingester-protocol/src/main/java/io/greptime/models/DataType.java index 41e43e9..8fabf8f 100644 --- a/ingester-protocol/src/main/java/io/greptime/models/DataType.java +++ b/ingester-protocol/src/main/java/io/greptime/models/DataType.java @@ -53,6 +53,11 @@ public enum DataType { Decimal128, // ; + public boolean isTimestamp() { + return this == TimestampSecond || this == TimestampMillisecond || this == TimestampMicrosecond + || this == TimestampNanosecond; + } + public Common.ColumnDataType toProtoValue() { switch (this) { case Bool: diff --git a/ingester-protocol/src/main/java/io/greptime/models/RowHelper.java b/ingester-protocol/src/main/java/io/greptime/models/RowHelper.java index 9f7ef9f..9aaf9ff 100644 --- a/ingester-protocol/src/main/java/io/greptime/models/RowHelper.java +++ b/ingester-protocol/src/main/java/io/greptime/models/RowHelper.java @@ -88,6 +88,9 @@ public static void addValue(RowData.Row.Builder builder, // case TIMESTAMP_MILLISECOND: valueBuilder.setTimestampMillisecondValue(Util.getLongValue(value)); break; + case TIMESTAMP_MICROSECOND: + valueBuilder.setTimestampMicrosecondValue(Util.getLongValue(value)); + break; case TIMESTAMP_NANOSECOND: valueBuilder.setTimestampNanosecondValue(Util.getLongValue(value)); break; diff --git a/ingester-protocol/src/main/java/io/greptime/models/TableSchema.java b/ingester-protocol/src/main/java/io/greptime/models/TableSchema.java index 301e51b..659358b 100644 --- a/ingester-protocol/src/main/java/io/greptime/models/TableSchema.java +++ b/ingester-protocol/src/main/java/io/greptime/models/TableSchema.java @@ -75,6 +75,40 @@ public Builder(String tableName) { this.tableName = tableName; } + /** + * Add tag schema. + * + * @param name the name of this tag + * @param dataType the data type of this tag + * @return this builder + */ + public Builder addTag(String name, DataType dataType) { + return addColumn(name, SemanticType.Tag, dataType); + } + + /** + * Add timestamp schema. + * + * @param name the name of this timestamp + * @param dataType the data type of this timestamp + * @return this builder + */ + public Builder addTimestamp(String name, DataType dataType) { + Ensures.ensure(dataType.isTimestamp(), "Invalid timestamp data type"); + return addColumn(name, SemanticType.Timestamp, dataType); + } + + /** + * Add field schema. + * + * @param name the name of this field + * @param dataType the data type of this field + * @return this builder + */ + public Builder addField(String name, DataType dataType) { + return addColumn(name, SemanticType.Field, dataType); + } + /** * Add column schema. * @@ -96,12 +130,18 @@ public Builder addColumn(String name, SemanticType semanticType, DataType dataTy * @param decimalTypeExtension the decimal type extension of this column(only for `DataType.Decimal128`) * @return this builder */ - public Builder addColumn(String name, SemanticType semanticType, DataType dataType, + public Builder addColumn(String name, // + SemanticType semanticType, // + DataType dataType, // DataType.DecimalTypeExtension decimalTypeExtension) { Ensures.ensureNonNull(name, "Null column name"); Ensures.ensureNonNull(semanticType, "Null semantic type"); Ensures.ensureNonNull(dataType, "Null data type"); + if (semanticType == SemanticType.Timestamp) { + Ensures.ensure(dataType.isTimestamp(), "Invalid timestamp data type"); + } + this.columnNames.add(name); this.semanticTypes.add(semanticType.toProtoValue()); this.dataTypes.add(dataType.toProtoValue()); diff --git a/ingester-protocol/src/test/java/io/greptime/PojoMapperTest.java b/ingester-protocol/src/test/java/io/greptime/PojoMapperTest.java index 280dfc1..60578bf 100644 --- a/ingester-protocol/src/test/java/io/greptime/PojoMapperTest.java +++ b/ingester-protocol/src/test/java/io/greptime/PojoMapperTest.java @@ -83,7 +83,7 @@ class Pojo1Test { long c; @Column(name = "d", dataType = DataType.Float64) double d; - @Column(name = "ts", dataType = DataType.TimeSecond, timestamp = true) + @Column(name = "ts", dataType = DataType.TimestampMillisecond, timestamp = true) long ts; } diff --git a/ingester-protocol/src/test/java/io/greptime/TestUtil.java b/ingester-protocol/src/test/java/io/greptime/TestUtil.java index 399d633..93a2431 100644 --- a/ingester-protocol/src/test/java/io/greptime/TestUtil.java +++ b/ingester-protocol/src/test/java/io/greptime/TestUtil.java @@ -16,7 +16,6 @@ package io.greptime; import io.greptime.models.DataType; -import io.greptime.models.SemanticType; import io.greptime.models.Table; import io.greptime.models.TableSchema; import io.greptime.v1.Common; @@ -32,9 +31,9 @@ public class TestUtil { public static Collection testTable(String tableName, int rowCount) { TableSchema tableSchema = TableSchema.newBuilder(tableName) // - .addColumn("host", SemanticType.Tag, DataType.String) // - .addColumn("ts", SemanticType.Timestamp, DataType.TimestampMillisecond) // - .addColumn("cpu", SemanticType.Field, DataType.Float64) // + .addTag("host", DataType.String) // + .addTimestamp("ts", DataType.TimestampMillisecond) // + .addField("cpu", DataType.Float64) // .build(); Table table = Table.from(tableSchema); diff --git a/ingester-protocol/src/test/java/io/greptime/WriteClientTest.java b/ingester-protocol/src/test/java/io/greptime/WriteClientTest.java index d7b833b..b990def 100644 --- a/ingester-protocol/src/test/java/io/greptime/WriteClientTest.java +++ b/ingester-protocol/src/test/java/io/greptime/WriteClientTest.java @@ -18,6 +18,7 @@ import io.greptime.common.Endpoint; import io.greptime.models.DataType; import io.greptime.models.Err; +import io.greptime.models.IntervalMonthDayNano; import io.greptime.models.Result; import io.greptime.models.Table; import io.greptime.models.TableSchema; @@ -33,11 +34,9 @@ import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.runners.MockitoJUnitRunner; +import java.math.BigDecimal; import java.util.concurrent.ExecutionException; import java.util.concurrent.ForkJoinPool; -import static io.greptime.models.SemanticType.Field; -import static io.greptime.models.SemanticType.Tag; -import static io.greptime.models.SemanticType.Timestamp; /** * @author jiachun.fjc @@ -67,36 +66,58 @@ public void after() { @Test public void testWriteSuccess() throws ExecutionException, InterruptedException { TableSchema schema = TableSchema.newBuilder("test_table") // - .addColumn("test_tag", Tag, DataType.String) // - .addColumn("test_ts", Timestamp, DataType.Int64) // - .addColumn("field1", Field, DataType.Int8) // - .addColumn("field2", Field, DataType.Int16) // - .addColumn("field3", Field, DataType.Int32) // - .addColumn("field4", Field, DataType.Int64) // - .addColumn("field5", Field, DataType.UInt8) // - .addColumn("field6", Field, DataType.UInt16) // - .addColumn("field7", Field, DataType.UInt32) // - .addColumn("field8", Field, DataType.UInt64) // - .addColumn("field9", Field, DataType.Float32) // - .addColumn("field10", Field, DataType.Float64) // - .addColumn("field11", Field, DataType.Bool) // - .addColumn("field12", Field, DataType.Binary) // - .addColumn("field13", Field, DataType.Date) // - .addColumn("field14", Field, DataType.DateTime) // - .addColumn("field15", Field, DataType.TimestampSecond) // - .addColumn("field16", Field, DataType.TimestampMillisecond) // - .addColumn("field17", Field, DataType.TimestampNanosecond) // + .addTag("test_tag", DataType.String) // + .addTimestamp("test_ts", DataType.TimestampMillisecond) // + .addField("field1", DataType.Int8) // + .addField("field2", DataType.Int16) // + .addField("field3", DataType.Int32) // + .addField("field4", DataType.Int64) // + .addField("field5", DataType.UInt8) // + .addField("field6", DataType.UInt16) // + .addField("field7", DataType.UInt32) // + .addField("field8", DataType.UInt64) // + .addField("field9", DataType.Float32) // + .addField("field10", DataType.Float64) // + .addField("field11", DataType.Bool) // + .addField("field12", DataType.Binary) // + .addField("field13", DataType.Date) // + .addField("field14", DataType.DateTime) // + .addField("field15", DataType.TimestampSecond) // + .addField("field16", DataType.TimestampMillisecond) // + .addField("field17", DataType.TimestampMicrosecond) // + .addField("field18", DataType.TimestampNanosecond) // + .addField("field19", DataType.TimeSecond) // + .addField("field20", DataType.TimeMilliSecond) // + .addField("field21", DataType.TimeMicroSecond) // + .addField("field22", DataType.TimeNanoSecond) // + .addField("field23", DataType.IntervalYearMonth) // + .addField("field24", DataType.IntervalDayTime) // + .addField("field25", DataType.IntervalMonthDayNano) // + .addField("field26", DataType.Decimal128) // .build(); Table table = Table.from(schema); long ts = System.currentTimeMillis(); - table.addRow("tag1", ts, 1, 2, 3, 4L, 5, 6, 7, 8L, 0.9F, 0.10D, true, new byte[0], 11, 12L, 13L, 14L, 15L); - table.addRow("tag1", ts, 1, 2, 3, 4, 5, 6, 7, 8, 0.9, 0.10, false, new byte[0], 11, 12, 13, 14, 15); - table.addRow("tag1", ts, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, false, new byte[] {0, 1}, 11, 12, 13, 14, 15); + Object[] row1 = + new Object[] {"tag1", ts, 1, 2, 3, 4L, 5, 6, 7, 8L, 0.9F, 0.10D, true, new byte[0], 13L, 14L, 15L, 16L, + 17L, 18L, 19L, 20L, 21L, 22L, 23, 24L, new IntervalMonthDayNano(1, 2, 3), + BigDecimal.valueOf(123.456)}; + Object[] row2 = + new Object[] {"tag2", ts, 1, 2, 3, 4L, 5, 6, 7, 8L, 0.9F, 0.10D, true, new byte[0], 13L, 14L, 15L, 16L, + 17L, 18L, 19L, 20L, 21L, 22L, 23, 24L, new IntervalMonthDayNano(4, 5, 6), + BigDecimal.valueOf(123.456)}; + Object[] row3 = + new Object[] {"tag3", ts, 1, 2, 3, 4L, 5, 6, 7, 8L, 0.9F, 0.10D, true, new byte[0], 13L, 14L, 15L, 16L, + 17L, 18L, 19L, 20L, 21L, 22L, 23, 24L, new IntervalMonthDayNano(7, 8, 9), + BigDecimal.valueOf(123.456)}; + table.addRow(row1); + table.addRow(row2); + table.addRow(row3); Endpoint addr = Endpoint.parse("127.0.0.1:8081"); Database.GreptimeResponse response = Database.GreptimeResponse.newBuilder() // - .setAffectedRows(Common.AffectedRows.newBuilder().setValue(3)) // + .setAffectedRows(Common.AffectedRows.newBuilder() // + .setValue(3)) // .build(); Mockito.when(this.routerClient.route()) // diff --git a/ingester-protocol/src/test/java/io/greptime/WriteLimitTest.java b/ingester-protocol/src/test/java/io/greptime/WriteLimitTest.java index 2e61022..791fe5f 100644 --- a/ingester-protocol/src/test/java/io/greptime/WriteLimitTest.java +++ b/ingester-protocol/src/test/java/io/greptime/WriteLimitTest.java @@ -73,7 +73,7 @@ public void blockingWriteLimitTest() throws InterruptedException { try { limiter.acquireAndDo(rows, this::emptyOk); alwaysFalse.set(true); - } catch (final Throwable err) { + } catch (Throwable err) { // noinspection ConstantConditions Assert.assertTrue(err instanceof InterruptedException); } diff --git a/ingester-protocol/src/test/java/io/greptime/models/TableTest.java b/ingester-protocol/src/test/java/io/greptime/models/TableTest.java index 6f82dea..cdcd7dc 100644 --- a/ingester-protocol/src/test/java/io/greptime/models/TableTest.java +++ b/ingester-protocol/src/test/java/io/greptime/models/TableTest.java @@ -29,9 +29,9 @@ public class TableTest { @Test public void testTableNonNull() { TableSchema schema = TableSchema.newBuilder("test_table") // - .addColumn("col1", SemanticType.Tag, DataType.String) // - .addColumn("col2", SemanticType.Tag, DataType.String) // - .addColumn("col3", SemanticType.Field, DataType.Int32) // + .addTag("col1", DataType.String) // + .addTag("col2", DataType.String) // + .addField("col3", DataType.Int32) // .addColumn("col4", SemanticType.Field, DataType.Decimal128, new DataType.DecimalTypeExtension(39, 9)) // .build(); @@ -56,9 +56,9 @@ public void testTableNonNull() { @Test public void testTableSomeNull() { TableSchema schema = TableSchema.newBuilder("test_table") // - .addColumn("col1", SemanticType.Tag, DataType.String) // - .addColumn("col2", SemanticType.Tag, DataType.String) // - .addColumn("col3", SemanticType.Field, DataType.Int32) // + .addTag("col1", DataType.String) // + .addTag("col2", DataType.String) // + .addField("col3", DataType.Int32) // .build(); Table.RowBasedTable table = (Table.RowBasedTable) Table.from(schema); diff --git a/ingester-protocol/src/test/java/io/greptime/options/GreptimeOptionsTest.java b/ingester-protocol/src/test/java/io/greptime/options/GreptimeOptionsTest.java index 3252acb..1b182b8 100644 --- a/ingester-protocol/src/test/java/io/greptime/options/GreptimeOptionsTest.java +++ b/ingester-protocol/src/test/java/io/greptime/options/GreptimeOptionsTest.java @@ -16,12 +16,12 @@ package io.greptime.options; import io.greptime.Router; +import io.greptime.common.Endpoint; import io.greptime.limit.LimitedPolicy; import io.greptime.models.AuthInfo; import io.greptime.rpc.RpcOptions; import org.junit.Assert; import org.junit.Test; -import io.greptime.common.Endpoint; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor;