Skip to content

Commit

Permalink
feat: friendly api for table schema (#26)
Browse files Browse the repository at this point in the history
* feat: friendly api for table schema

* chore: add some unit test for TableSchela

* chore: minor change
  • Loading branch information
fengjiachun authored Feb 2, 2024
1 parent 158d378 commit 7b6bcc1
Show file tree
Hide file tree
Showing 13 changed files with 202 additions and 97 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package io.greptime;

import io.greptime.models.DataType;
import io.greptime.models.SemanticType;
import io.greptime.models.Table;
import io.greptime.models.TableSchema;
import io.greptime.models.WriteOk;
Expand All @@ -36,16 +35,16 @@ public static void main(String[] args) throws ExecutionException, InterruptedExc
GreptimeDB greptimeDB = TestConnector.connectToDefaultDB();

TableSchema cpuMetricSchema = TableSchema.newBuilder("cpu_metric") //
.addColumn("host", SemanticType.Tag, DataType.String) //
.addColumn("ts", SemanticType.Timestamp, DataType.TimestampMillisecond) //
.addColumn("cpu_user", SemanticType.Field, DataType.Float64) //
.addColumn("cpu_sys", SemanticType.Field, DataType.Float64) //
.addTag("host", DataType.String) //
.addTimestamp("ts", DataType.TimestampMillisecond) //
.addField("cpu_user", DataType.Float64) //
.addField("cpu_sys", DataType.Float64) //
.build();

TableSchema memMetricSchema = TableSchema.newBuilder("mem_metric") //
.addColumn("host", SemanticType.Tag, DataType.String) //
.addColumn("ts", SemanticType.Timestamp, DataType.TimestampMillisecond) //
.addColumn("mem_usage", SemanticType.Field, DataType.Float64) //
.addTag("host", DataType.String) //
.addTimestamp("ts", DataType.TimestampMillisecond) //
.addField("mem_usage", DataType.Float64) //
.build();

Table cpuMetric = Table.from(cpuMetricSchema);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,13 @@ public static void main(String[] args) throws ExecutionException, InterruptedExc

Result<WriteOk, Err> result = puts.get();

if (result.isOk()) {
LOG.info("Write result: {}", result.getOk());
Result<Integer, String> simpleResult = result //
.map(WriteOk::getSuccess) //
.mapErr(err -> err.getError().getMessage());
if (simpleResult.isOk()) {
LOG.info("Write success: {}", simpleResult.getOk());
} else {
LOG.error("Failed to write: {}", result.getErr());
LOG.error("Failed to write: {}", simpleResult.getErr());
}

List<List<?>> delete_pojos = Arrays.asList(cpus.subList(0, 5), memories.subList(0, 5));
Expand Down
24 changes: 13 additions & 11 deletions ingester-example/src/main/java/io/greptime/WriteQuickStart.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import io.greptime.models.DataType;
import io.greptime.models.Err;
import io.greptime.models.Result;
import io.greptime.models.SemanticType;
import io.greptime.models.Table;
import io.greptime.models.TableSchema;
import io.greptime.models.WriteOk;
Expand All @@ -40,16 +39,16 @@ public static void main(String[] args) throws ExecutionException, InterruptedExc
GreptimeDB greptimeDB = TestConnector.connectToDefaultDB();

TableSchema cpuMetricSchema = TableSchema.newBuilder("cpu_metric") //
.addColumn("host", SemanticType.Tag, DataType.String) //
.addColumn("ts", SemanticType.Timestamp, DataType.TimestampMillisecond) //
.addColumn("cpu_user", SemanticType.Field, DataType.Float64) //
.addColumn("cpu_sys", SemanticType.Field, DataType.Float64) //
.addTag("host", DataType.String) //
.addTimestamp("ts", DataType.TimestampMillisecond) //
.addField("cpu_user", DataType.Float64) //
.addField("cpu_sys", DataType.Float64) //
.build();

TableSchema memMetricSchema = TableSchema.newBuilder("mem_metric") //
.addColumn("host", SemanticType.Tag, DataType.String) //
.addColumn("ts", SemanticType.Timestamp, DataType.TimestampMillisecond) //
.addColumn("mem_usage", SemanticType.Field, DataType.Float64) //
.addTag("host", DataType.String) //
.addTimestamp("ts", DataType.TimestampMillisecond) //
.addField("mem_usage", DataType.Float64) //
.build();

Table cpuMetric = Table.from(cpuMetricSchema);
Expand Down Expand Up @@ -77,10 +76,13 @@ public static void main(String[] args) throws ExecutionException, InterruptedExc

Result<WriteOk, Err> result = future.get();

if (result.isOk()) {
LOG.info("Write result: {}", result.getOk());
Result<Integer, String> simpleResult = result //
.map(WriteOk::getSuccess) //
.mapErr(err -> err.getError().getMessage());
if (simpleResult.isOk()) {
LOG.info("Write success: {}", simpleResult.getOk());
} else {
LOG.error("Failed to write: {}", result.getErr());
LOG.error("Failed to write: {}", simpleResult.getErr());
}

List<Table> delete_objs = Arrays.asList(cpuMetric.subRange(0, 5), memMetric.subRange(0, 5));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ public enum DataType {
Decimal128, //
;

public boolean isTimestamp() {
return this == TimestampSecond || this == TimestampMillisecond || this == TimestampMicrosecond
|| this == TimestampNanosecond;
}

public Common.ColumnDataType toProtoValue() {
switch (this) {
case Bool:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ public static void addValue(RowData.Row.Builder builder, //
case TIMESTAMP_MILLISECOND:
valueBuilder.setTimestampMillisecondValue(Util.getLongValue(value));
break;
case TIMESTAMP_MICROSECOND:
valueBuilder.setTimestampMicrosecondValue(Util.getLongValue(value));
break;
case TIMESTAMP_NANOSECOND:
valueBuilder.setTimestampNanosecondValue(Util.getLongValue(value));
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,41 @@ public Builder(String tableName) {
this.tableName = tableName;
}

/**
* Add tag schema.
*
* @param name the name of this tag
* @param dataType the data type of this tag
* @return this builder
*/
public Builder addTag(String name, DataType dataType) {
return addColumn(name, SemanticType.Tag, dataType);
}

/**
* Add timestamp schema.
*
* @param name the name of this timestamp
* @param dataType the data type of this timestamp
* @return this builder
*/
public Builder addTimestamp(String name, DataType dataType) {
Ensures.ensure(dataType.isTimestamp(),
"Invalid timestamp data type: %s, only support `DataType.TimestampXXX`", dataType);
return addColumn(name, SemanticType.Timestamp, dataType);
}

/**
* Add field schema.
*
* @param name the name of this field
* @param dataType the data type of this field
* @return this builder
*/
public Builder addField(String name, DataType dataType) {
return addColumn(name, SemanticType.Field, dataType);
}

/**
* Add column schema.
*
Expand All @@ -96,12 +131,19 @@ public Builder addColumn(String name, SemanticType semanticType, DataType dataTy
* @param decimalTypeExtension the decimal type extension of this column(only for `DataType.Decimal128`)
* @return this builder
*/
public Builder addColumn(String name, SemanticType semanticType, DataType dataType,
public Builder addColumn(String name, //
SemanticType semanticType, //
DataType dataType, //
DataType.DecimalTypeExtension decimalTypeExtension) {
Ensures.ensureNonNull(name, "Null column name");
Ensures.ensureNonNull(semanticType, "Null semantic type");
Ensures.ensureNonNull(dataType, "Null data type");

if (semanticType == SemanticType.Timestamp) {
Ensures.ensure(dataType.isTimestamp(),
"Invalid timestamp data type: %s, only support `DataType.TimestampXXX`", dataType);
}

this.columnNames.add(name);
this.semanticTypes.add(semanticType.toProtoValue());
this.dataTypes.add(dataType.toProtoValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ class Pojo1Test {
long c;
@Column(name = "d", dataType = DataType.Float64)
double d;
@Column(name = "ts", dataType = DataType.TimeSecond, timestamp = true)
@Column(name = "ts", dataType = DataType.TimestampMillisecond, timestamp = true)
long ts;
}

Expand Down
7 changes: 3 additions & 4 deletions ingester-protocol/src/test/java/io/greptime/TestUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package io.greptime;

import io.greptime.models.DataType;
import io.greptime.models.SemanticType;
import io.greptime.models.Table;
import io.greptime.models.TableSchema;
import io.greptime.v1.Common;
Expand All @@ -32,9 +31,9 @@ public class TestUtil {

public static Collection<Table> testTable(String tableName, int rowCount) {
TableSchema tableSchema = TableSchema.newBuilder(tableName) //
.addColumn("host", SemanticType.Tag, DataType.String) //
.addColumn("ts", SemanticType.Timestamp, DataType.TimestampMillisecond) //
.addColumn("cpu", SemanticType.Field, DataType.Float64) //
.addTag("host", DataType.String) //
.addTimestamp("ts", DataType.TimestampMillisecond) //
.addField("cpu", DataType.Float64) //
.build();

Table table = Table.from(tableSchema);
Expand Down
73 changes: 47 additions & 26 deletions ingester-protocol/src/test/java/io/greptime/WriteClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.greptime.common.Endpoint;
import io.greptime.models.DataType;
import io.greptime.models.Err;
import io.greptime.models.IntervalMonthDayNano;
import io.greptime.models.Result;
import io.greptime.models.Table;
import io.greptime.models.TableSchema;
Expand All @@ -33,11 +34,9 @@
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.runners.MockitoJUnitRunner;
import java.math.BigDecimal;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import static io.greptime.models.SemanticType.Field;
import static io.greptime.models.SemanticType.Tag;
import static io.greptime.models.SemanticType.Timestamp;

/**
* @author jiachun.fjc
Expand Down Expand Up @@ -67,36 +66,58 @@ public void after() {
@Test
public void testWriteSuccess() throws ExecutionException, InterruptedException {
TableSchema schema = TableSchema.newBuilder("test_table") //
.addColumn("test_tag", Tag, DataType.String) //
.addColumn("test_ts", Timestamp, DataType.Int64) //
.addColumn("field1", Field, DataType.Int8) //
.addColumn("field2", Field, DataType.Int16) //
.addColumn("field3", Field, DataType.Int32) //
.addColumn("field4", Field, DataType.Int64) //
.addColumn("field5", Field, DataType.UInt8) //
.addColumn("field6", Field, DataType.UInt16) //
.addColumn("field7", Field, DataType.UInt32) //
.addColumn("field8", Field, DataType.UInt64) //
.addColumn("field9", Field, DataType.Float32) //
.addColumn("field10", Field, DataType.Float64) //
.addColumn("field11", Field, DataType.Bool) //
.addColumn("field12", Field, DataType.Binary) //
.addColumn("field13", Field, DataType.Date) //
.addColumn("field14", Field, DataType.DateTime) //
.addColumn("field15", Field, DataType.TimestampSecond) //
.addColumn("field16", Field, DataType.TimestampMillisecond) //
.addColumn("field17", Field, DataType.TimestampNanosecond) //
.addTag("test_tag", DataType.String) //
.addTimestamp("test_ts", DataType.TimestampMillisecond) //
.addField("field1", DataType.Int8) //
.addField("field2", DataType.Int16) //
.addField("field3", DataType.Int32) //
.addField("field4", DataType.Int64) //
.addField("field5", DataType.UInt8) //
.addField("field6", DataType.UInt16) //
.addField("field7", DataType.UInt32) //
.addField("field8", DataType.UInt64) //
.addField("field9", DataType.Float32) //
.addField("field10", DataType.Float64) //
.addField("field11", DataType.Bool) //
.addField("field12", DataType.Binary) //
.addField("field13", DataType.Date) //
.addField("field14", DataType.DateTime) //
.addField("field15", DataType.TimestampSecond) //
.addField("field16", DataType.TimestampMillisecond) //
.addField("field17", DataType.TimestampMicrosecond) //
.addField("field18", DataType.TimestampNanosecond) //
.addField("field19", DataType.TimeSecond) //
.addField("field20", DataType.TimeMilliSecond) //
.addField("field21", DataType.TimeMicroSecond) //
.addField("field22", DataType.TimeNanoSecond) //
.addField("field23", DataType.IntervalYearMonth) //
.addField("field24", DataType.IntervalDayTime) //
.addField("field25", DataType.IntervalMonthDayNano) //
.addField("field26", DataType.Decimal128) //
.build();
Table table = Table.from(schema);
long ts = System.currentTimeMillis();

table.addRow("tag1", ts, 1, 2, 3, 4L, 5, 6, 7, 8L, 0.9F, 0.10D, true, new byte[0], 11, 12L, 13L, 14L, 15L);
table.addRow("tag1", ts, 1, 2, 3, 4, 5, 6, 7, 8, 0.9, 0.10, false, new byte[0], 11, 12, 13, 14, 15);
table.addRow("tag1", ts, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, false, new byte[] {0, 1}, 11, 12, 13, 14, 15);
Object[] row1 =
new Object[] {"tag1", ts, 1, 2, 3, 4L, 5, 6, 7, 8L, 0.9F, 0.10D, true, new byte[0], 13L, 14L, 15L, 16L,
17L, 18L, 19L, 20L, 21L, 22L, 23, 24L, new IntervalMonthDayNano(1, 2, 3),
BigDecimal.valueOf(123.456)};
Object[] row2 =
new Object[] {"tag2", ts, 1, 2, 3, 4L, 5, 6, 7, 8L, 0.9F, 0.10D, true, new byte[0], 13L, 14L, 15L, 16L,
17L, 18L, 19L, 20L, 21L, 22L, 23, 24L, new IntervalMonthDayNano(4, 5, 6),
BigDecimal.valueOf(123.456)};
Object[] row3 =
new Object[] {"tag3", ts, 1, 2, 3, 4L, 5, 6, 7, 8L, 0.9F, 0.10D, true, new byte[0], 13L, 14L, 15L, 16L,
17L, 18L, 19L, 20L, 21L, 22L, 23, 24L, new IntervalMonthDayNano(7, 8, 9),
BigDecimal.valueOf(123.456)};
table.addRow(row1);
table.addRow(row2);
table.addRow(row3);

Endpoint addr = Endpoint.parse("127.0.0.1:8081");
Database.GreptimeResponse response = Database.GreptimeResponse.newBuilder() //
.setAffectedRows(Common.AffectedRows.newBuilder().setValue(3)) //
.setAffectedRows(Common.AffectedRows.newBuilder() //
.setValue(3)) //
.build();

Mockito.when(this.routerClient.route()) //
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public void blockingWriteLimitTest() throws InterruptedException {
try {
limiter.acquireAndDo(rows, this::emptyOk);
alwaysFalse.set(true);
} catch (final Throwable err) {
} catch (Throwable err) {
// noinspection ConstantConditions
Assert.assertTrue(err instanceof InterruptedException);
}
Expand Down
Loading

0 comments on commit 7b6bcc1

Please sign in to comment.