diff --git a/ingester-example/src/main/java/io/greptime/StreamWriteQuickStart.java b/ingester-example/src/main/java/io/greptime/StreamWriteQuickStart.java
index 9808b2e..c5ac60d 100644
--- a/ingester-example/src/main/java/io/greptime/StreamWriteQuickStart.java
+++ b/ingester-example/src/main/java/io/greptime/StreamWriteQuickStart.java
@@ -16,7 +16,6 @@
package io.greptime;
import io.greptime.models.DataType;
-import io.greptime.models.SemanticType;
import io.greptime.models.Table;
import io.greptime.models.TableSchema;
import io.greptime.models.WriteOk;
@@ -36,16 +35,16 @@ public static void main(String[] args) throws ExecutionException, InterruptedExc
GreptimeDB greptimeDB = TestConnector.connectToDefaultDB();
TableSchema cpuMetricSchema = TableSchema.newBuilder("cpu_metric") //
- .addColumn("host", SemanticType.Tag, DataType.String) //
- .addColumn("ts", SemanticType.Timestamp, DataType.TimestampMillisecond) //
- .addColumn("cpu_user", SemanticType.Field, DataType.Float64) //
- .addColumn("cpu_sys", SemanticType.Field, DataType.Float64) //
+ .addTag("host", DataType.String) //
+ .addTimestamp("ts", DataType.TimestampMillisecond) //
+ .addField("cpu_user", DataType.Float64) //
+ .addField("cpu_sys", DataType.Float64) //
.build();
TableSchema memMetricSchema = TableSchema.newBuilder("mem_metric") //
- .addColumn("host", SemanticType.Tag, DataType.String) //
- .addColumn("ts", SemanticType.Timestamp, DataType.TimestampMillisecond) //
- .addColumn("mem_usage", SemanticType.Field, DataType.Float64) //
+ .addTag("host", DataType.String) //
+ .addTimestamp("ts", DataType.TimestampMillisecond) //
+ .addField("mem_usage", DataType.Float64) //
.build();
Table cpuMetric = Table.from(cpuMetricSchema);
diff --git a/ingester-example/src/main/java/io/greptime/WriteQuickStart.java b/ingester-example/src/main/java/io/greptime/WriteQuickStart.java
index ffab47f..0db2397 100644
--- a/ingester-example/src/main/java/io/greptime/WriteQuickStart.java
+++ b/ingester-example/src/main/java/io/greptime/WriteQuickStart.java
@@ -18,7 +18,6 @@
import io.greptime.models.DataType;
import io.greptime.models.Err;
import io.greptime.models.Result;
-import io.greptime.models.SemanticType;
import io.greptime.models.Table;
import io.greptime.models.TableSchema;
import io.greptime.models.WriteOk;
@@ -40,16 +39,16 @@ public static void main(String[] args) throws ExecutionException, InterruptedExc
GreptimeDB greptimeDB = TestConnector.connectToDefaultDB();
TableSchema cpuMetricSchema = TableSchema.newBuilder("cpu_metric") //
- .addColumn("host", SemanticType.Tag, DataType.String) //
- .addColumn("ts", SemanticType.Timestamp, DataType.TimestampMillisecond) //
- .addColumn("cpu_user", SemanticType.Field, DataType.Float64) //
- .addColumn("cpu_sys", SemanticType.Field, DataType.Float64) //
+ .addTag("host", DataType.String) //
+ .addTimestamp("ts", DataType.TimestampMillisecond) //
+ .addField("cpu_user", DataType.Float64) //
+ .addField("cpu_sys", DataType.Float64) //
.build();
TableSchema memMetricSchema = TableSchema.newBuilder("mem_metric") //
- .addColumn("host", SemanticType.Tag, DataType.String) //
- .addColumn("ts", SemanticType.Timestamp, DataType.TimestampMillisecond) //
- .addColumn("mem_usage", SemanticType.Field, DataType.Float64) //
+ .addTag("host", DataType.String) //
+ .addTimestamp("ts", DataType.TimestampMillisecond) //
+ .addField("mem_usage", DataType.Float64) //
.build();
Table cpuMetric = Table.from(cpuMetricSchema);
diff --git a/ingester-protocol/src/main/java/io/greptime/models/DataType.java b/ingester-protocol/src/main/java/io/greptime/models/DataType.java
index 41e43e9..8fabf8f 100644
--- a/ingester-protocol/src/main/java/io/greptime/models/DataType.java
+++ b/ingester-protocol/src/main/java/io/greptime/models/DataType.java
@@ -53,6 +53,11 @@ public enum DataType {
Decimal128, //
;
+ public boolean isTimestamp() {
+ return this == TimestampSecond || this == TimestampMillisecond || this == TimestampMicrosecond
+ || this == TimestampNanosecond;
+ }
+
public Common.ColumnDataType toProtoValue() {
switch (this) {
case Bool:
diff --git a/ingester-protocol/src/main/java/io/greptime/models/RowHelper.java b/ingester-protocol/src/main/java/io/greptime/models/RowHelper.java
index 9f7ef9f..9aaf9ff 100644
--- a/ingester-protocol/src/main/java/io/greptime/models/RowHelper.java
+++ b/ingester-protocol/src/main/java/io/greptime/models/RowHelper.java
@@ -88,6 +88,9 @@ public static void addValue(RowData.Row.Builder builder, //
case TIMESTAMP_MILLISECOND:
valueBuilder.setTimestampMillisecondValue(Util.getLongValue(value));
break;
+ case TIMESTAMP_MICROSECOND:
+ valueBuilder.setTimestampMicrosecondValue(Util.getLongValue(value));
+ break;
case TIMESTAMP_NANOSECOND:
valueBuilder.setTimestampNanosecondValue(Util.getLongValue(value));
break;
diff --git a/ingester-protocol/src/main/java/io/greptime/models/TableSchema.java b/ingester-protocol/src/main/java/io/greptime/models/TableSchema.java
index 301e51b..659358b 100644
--- a/ingester-protocol/src/main/java/io/greptime/models/TableSchema.java
+++ b/ingester-protocol/src/main/java/io/greptime/models/TableSchema.java
@@ -75,6 +75,40 @@ public Builder(String tableName) {
this.tableName = tableName;
}
+ /**
+ * Add tag schema.
+ *
+ * @param name the name of this tag
+ * @param dataType the data type of this tag
+ * @return this builder
+ */
+ public Builder addTag(String name, DataType dataType) {
+ return addColumn(name, SemanticType.Tag, dataType);
+ }
+
+ /**
+ * Add timestamp schema.
+ *
+ * @param name the name of this timestamp
+ * @param dataType the data type of this timestamp
+ * @return this builder
+ */
+ public Builder addTimestamp(String name, DataType dataType) {
+ Ensures.ensure(dataType.isTimestamp(), "Invalid timestamp data type");
+ return addColumn(name, SemanticType.Timestamp, dataType);
+ }
+
+ /**
+ * Add field schema.
+ *
+ * @param name the name of this field
+ * @param dataType the data type of this field
+ * @return this builder
+ */
+ public Builder addField(String name, DataType dataType) {
+ return addColumn(name, SemanticType.Field, dataType);
+ }
+
/**
* Add column schema.
*
@@ -96,12 +130,18 @@ public Builder addColumn(String name, SemanticType semanticType, DataType dataTy
* @param decimalTypeExtension the decimal type extension of this column(only for `DataType.Decimal128`)
* @return this builder
*/
- public Builder addColumn(String name, SemanticType semanticType, DataType dataType,
+ public Builder addColumn(String name, //
+ SemanticType semanticType, //
+ DataType dataType, //
DataType.DecimalTypeExtension decimalTypeExtension) {
Ensures.ensureNonNull(name, "Null column name");
Ensures.ensureNonNull(semanticType, "Null semantic type");
Ensures.ensureNonNull(dataType, "Null data type");
+ if (semanticType == SemanticType.Timestamp) {
+ Ensures.ensure(dataType.isTimestamp(), "Invalid timestamp data type");
+ }
+
this.columnNames.add(name);
this.semanticTypes.add(semanticType.toProtoValue());
this.dataTypes.add(dataType.toProtoValue());
diff --git a/ingester-protocol/src/test/java/io/greptime/PojoMapperTest.java b/ingester-protocol/src/test/java/io/greptime/PojoMapperTest.java
index 280dfc1..60578bf 100644
--- a/ingester-protocol/src/test/java/io/greptime/PojoMapperTest.java
+++ b/ingester-protocol/src/test/java/io/greptime/PojoMapperTest.java
@@ -83,7 +83,7 @@ class Pojo1Test {
long c;
@Column(name = "d", dataType = DataType.Float64)
double d;
- @Column(name = "ts", dataType = DataType.TimeSecond, timestamp = true)
+ @Column(name = "ts", dataType = DataType.TimestampMillisecond, timestamp = true)
long ts;
}
diff --git a/ingester-protocol/src/test/java/io/greptime/TestUtil.java b/ingester-protocol/src/test/java/io/greptime/TestUtil.java
index 399d633..93a2431 100644
--- a/ingester-protocol/src/test/java/io/greptime/TestUtil.java
+++ b/ingester-protocol/src/test/java/io/greptime/TestUtil.java
@@ -16,7 +16,6 @@
package io.greptime;
import io.greptime.models.DataType;
-import io.greptime.models.SemanticType;
import io.greptime.models.Table;
import io.greptime.models.TableSchema;
import io.greptime.v1.Common;
@@ -32,9 +31,9 @@ public class TestUtil {
public static Collection
testTable(String tableName, int rowCount) {
TableSchema tableSchema = TableSchema.newBuilder(tableName) //
- .addColumn("host", SemanticType.Tag, DataType.String) //
- .addColumn("ts", SemanticType.Timestamp, DataType.TimestampMillisecond) //
- .addColumn("cpu", SemanticType.Field, DataType.Float64) //
+ .addTag("host", DataType.String) //
+ .addTimestamp("ts", DataType.TimestampMillisecond) //
+ .addField("cpu", DataType.Float64) //
.build();
Table table = Table.from(tableSchema);
diff --git a/ingester-protocol/src/test/java/io/greptime/WriteClientTest.java b/ingester-protocol/src/test/java/io/greptime/WriteClientTest.java
index d7b833b..b990def 100644
--- a/ingester-protocol/src/test/java/io/greptime/WriteClientTest.java
+++ b/ingester-protocol/src/test/java/io/greptime/WriteClientTest.java
@@ -18,6 +18,7 @@
import io.greptime.common.Endpoint;
import io.greptime.models.DataType;
import io.greptime.models.Err;
+import io.greptime.models.IntervalMonthDayNano;
import io.greptime.models.Result;
import io.greptime.models.Table;
import io.greptime.models.TableSchema;
@@ -33,11 +34,9 @@
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.runners.MockitoJUnitRunner;
+import java.math.BigDecimal;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
-import static io.greptime.models.SemanticType.Field;
-import static io.greptime.models.SemanticType.Tag;
-import static io.greptime.models.SemanticType.Timestamp;
/**
* @author jiachun.fjc
@@ -67,36 +66,58 @@ public void after() {
@Test
public void testWriteSuccess() throws ExecutionException, InterruptedException {
TableSchema schema = TableSchema.newBuilder("test_table") //
- .addColumn("test_tag", Tag, DataType.String) //
- .addColumn("test_ts", Timestamp, DataType.Int64) //
- .addColumn("field1", Field, DataType.Int8) //
- .addColumn("field2", Field, DataType.Int16) //
- .addColumn("field3", Field, DataType.Int32) //
- .addColumn("field4", Field, DataType.Int64) //
- .addColumn("field5", Field, DataType.UInt8) //
- .addColumn("field6", Field, DataType.UInt16) //
- .addColumn("field7", Field, DataType.UInt32) //
- .addColumn("field8", Field, DataType.UInt64) //
- .addColumn("field9", Field, DataType.Float32) //
- .addColumn("field10", Field, DataType.Float64) //
- .addColumn("field11", Field, DataType.Bool) //
- .addColumn("field12", Field, DataType.Binary) //
- .addColumn("field13", Field, DataType.Date) //
- .addColumn("field14", Field, DataType.DateTime) //
- .addColumn("field15", Field, DataType.TimestampSecond) //
- .addColumn("field16", Field, DataType.TimestampMillisecond) //
- .addColumn("field17", Field, DataType.TimestampNanosecond) //
+ .addTag("test_tag", DataType.String) //
+ .addTimestamp("test_ts", DataType.TimestampMillisecond) //
+ .addField("field1", DataType.Int8) //
+ .addField("field2", DataType.Int16) //
+ .addField("field3", DataType.Int32) //
+ .addField("field4", DataType.Int64) //
+ .addField("field5", DataType.UInt8) //
+ .addField("field6", DataType.UInt16) //
+ .addField("field7", DataType.UInt32) //
+ .addField("field8", DataType.UInt64) //
+ .addField("field9", DataType.Float32) //
+ .addField("field10", DataType.Float64) //
+ .addField("field11", DataType.Bool) //
+ .addField("field12", DataType.Binary) //
+ .addField("field13", DataType.Date) //
+ .addField("field14", DataType.DateTime) //
+ .addField("field15", DataType.TimestampSecond) //
+ .addField("field16", DataType.TimestampMillisecond) //
+ .addField("field17", DataType.TimestampMicrosecond) //
+ .addField("field18", DataType.TimestampNanosecond) //
+ .addField("field19", DataType.TimeSecond) //
+ .addField("field20", DataType.TimeMilliSecond) //
+ .addField("field21", DataType.TimeMicroSecond) //
+ .addField("field22", DataType.TimeNanoSecond) //
+ .addField("field23", DataType.IntervalYearMonth) //
+ .addField("field24", DataType.IntervalDayTime) //
+ .addField("field25", DataType.IntervalMonthDayNano) //
+ .addField("field26", DataType.Decimal128) //
.build();
Table table = Table.from(schema);
long ts = System.currentTimeMillis();
- table.addRow("tag1", ts, 1, 2, 3, 4L, 5, 6, 7, 8L, 0.9F, 0.10D, true, new byte[0], 11, 12L, 13L, 14L, 15L);
- table.addRow("tag1", ts, 1, 2, 3, 4, 5, 6, 7, 8, 0.9, 0.10, false, new byte[0], 11, 12, 13, 14, 15);
- table.addRow("tag1", ts, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, false, new byte[] {0, 1}, 11, 12, 13, 14, 15);
+ Object[] row1 =
+ new Object[] {"tag1", ts, 1, 2, 3, 4L, 5, 6, 7, 8L, 0.9F, 0.10D, true, new byte[0], 13L, 14L, 15L, 16L,
+ 17L, 18L, 19L, 20L, 21L, 22L, 23, 24L, new IntervalMonthDayNano(1, 2, 3),
+ BigDecimal.valueOf(123.456)};
+ Object[] row2 =
+ new Object[] {"tag2", ts, 1, 2, 3, 4L, 5, 6, 7, 8L, 0.9F, 0.10D, true, new byte[0], 13L, 14L, 15L, 16L,
+ 17L, 18L, 19L, 20L, 21L, 22L, 23, 24L, new IntervalMonthDayNano(4, 5, 6),
+ BigDecimal.valueOf(123.456)};
+ Object[] row3 =
+ new Object[] {"tag3", ts, 1, 2, 3, 4L, 5, 6, 7, 8L, 0.9F, 0.10D, true, new byte[0], 13L, 14L, 15L, 16L,
+ 17L, 18L, 19L, 20L, 21L, 22L, 23, 24L, new IntervalMonthDayNano(7, 8, 9),
+ BigDecimal.valueOf(123.456)};
+ table.addRow(row1);
+ table.addRow(row2);
+ table.addRow(row3);
Endpoint addr = Endpoint.parse("127.0.0.1:8081");
Database.GreptimeResponse response = Database.GreptimeResponse.newBuilder() //
- .setAffectedRows(Common.AffectedRows.newBuilder().setValue(3)) //
+ .setAffectedRows(Common.AffectedRows.newBuilder() //
+ .setValue(3)) //
.build();
Mockito.when(this.routerClient.route()) //
diff --git a/ingester-protocol/src/test/java/io/greptime/WriteLimitTest.java b/ingester-protocol/src/test/java/io/greptime/WriteLimitTest.java
index 2e61022..791fe5f 100644
--- a/ingester-protocol/src/test/java/io/greptime/WriteLimitTest.java
+++ b/ingester-protocol/src/test/java/io/greptime/WriteLimitTest.java
@@ -73,7 +73,7 @@ public void blockingWriteLimitTest() throws InterruptedException {
try {
limiter.acquireAndDo(rows, this::emptyOk);
alwaysFalse.set(true);
- } catch (final Throwable err) {
+ } catch (Throwable err) {
// noinspection ConstantConditions
Assert.assertTrue(err instanceof InterruptedException);
}
diff --git a/ingester-protocol/src/test/java/io/greptime/models/TableTest.java b/ingester-protocol/src/test/java/io/greptime/models/TableTest.java
index 6f82dea..cdcd7dc 100644
--- a/ingester-protocol/src/test/java/io/greptime/models/TableTest.java
+++ b/ingester-protocol/src/test/java/io/greptime/models/TableTest.java
@@ -29,9 +29,9 @@ public class TableTest {
@Test
public void testTableNonNull() {
TableSchema schema = TableSchema.newBuilder("test_table") //
- .addColumn("col1", SemanticType.Tag, DataType.String) //
- .addColumn("col2", SemanticType.Tag, DataType.String) //
- .addColumn("col3", SemanticType.Field, DataType.Int32) //
+ .addTag("col1", DataType.String) //
+ .addTag("col2", DataType.String) //
+ .addField("col3", DataType.Int32) //
.addColumn("col4", SemanticType.Field, DataType.Decimal128, new DataType.DecimalTypeExtension(39, 9)) //
.build();
@@ -56,9 +56,9 @@ public void testTableNonNull() {
@Test
public void testTableSomeNull() {
TableSchema schema = TableSchema.newBuilder("test_table") //
- .addColumn("col1", SemanticType.Tag, DataType.String) //
- .addColumn("col2", SemanticType.Tag, DataType.String) //
- .addColumn("col3", SemanticType.Field, DataType.Int32) //
+ .addTag("col1", DataType.String) //
+ .addTag("col2", DataType.String) //
+ .addField("col3", DataType.Int32) //
.build();
Table.RowBasedTable table = (Table.RowBasedTable) Table.from(schema);
diff --git a/ingester-protocol/src/test/java/io/greptime/options/GreptimeOptionsTest.java b/ingester-protocol/src/test/java/io/greptime/options/GreptimeOptionsTest.java
index 3252acb..1b182b8 100644
--- a/ingester-protocol/src/test/java/io/greptime/options/GreptimeOptionsTest.java
+++ b/ingester-protocol/src/test/java/io/greptime/options/GreptimeOptionsTest.java
@@ -16,12 +16,12 @@
package io.greptime.options;
import io.greptime.Router;
+import io.greptime.common.Endpoint;
import io.greptime.limit.LimitedPolicy;
import io.greptime.models.AuthInfo;
import io.greptime.rpc.RpcOptions;
import org.junit.Assert;
import org.junit.Test;
-import io.greptime.common.Endpoint;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;