Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: friendly api for table schema #26

Merged
merged 3 commits into from
Feb 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package io.greptime;

import io.greptime.models.DataType;
import io.greptime.models.SemanticType;
import io.greptime.models.Table;
import io.greptime.models.TableSchema;
import io.greptime.models.WriteOk;
Expand All @@ -36,16 +35,16 @@ public static void main(String[] args) throws ExecutionException, InterruptedExc
GreptimeDB greptimeDB = TestConnector.connectToDefaultDB();

TableSchema cpuMetricSchema = TableSchema.newBuilder("cpu_metric") //
.addColumn("host", SemanticType.Tag, DataType.String) //
.addColumn("ts", SemanticType.Timestamp, DataType.TimestampMillisecond) //
.addColumn("cpu_user", SemanticType.Field, DataType.Float64) //
.addColumn("cpu_sys", SemanticType.Field, DataType.Float64) //
.addTag("host", DataType.String) //
.addTimestamp("ts", DataType.TimestampMillisecond) //
.addField("cpu_user", DataType.Float64) //
.addField("cpu_sys", DataType.Float64) //
.build();

TableSchema memMetricSchema = TableSchema.newBuilder("mem_metric") //
.addColumn("host", SemanticType.Tag, DataType.String) //
.addColumn("ts", SemanticType.Timestamp, DataType.TimestampMillisecond) //
.addColumn("mem_usage", SemanticType.Field, DataType.Float64) //
.addTag("host", DataType.String) //
.addTimestamp("ts", DataType.TimestampMillisecond) //
.addField("mem_usage", DataType.Float64) //
.build();

Table cpuMetric = Table.from(cpuMetricSchema);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,13 @@ public static void main(String[] args) throws ExecutionException, InterruptedExc

Result<WriteOk, Err> result = puts.get();

if (result.isOk()) {
LOG.info("Write result: {}", result.getOk());
Result<Integer, String> simpleResult = result //
.map(WriteOk::getSuccess) //
.mapErr(err -> err.getError().getMessage());
if (simpleResult.isOk()) {
LOG.info("Write success: {}", simpleResult.getOk());
} else {
LOG.error("Failed to write: {}", result.getErr());
LOG.error("Failed to write: {}", simpleResult.getErr());
}

List<List<?>> delete_pojos = Arrays.asList(cpus.subList(0, 5), memories.subList(0, 5));
Expand Down
24 changes: 13 additions & 11 deletions ingester-example/src/main/java/io/greptime/WriteQuickStart.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import io.greptime.models.DataType;
import io.greptime.models.Err;
import io.greptime.models.Result;
import io.greptime.models.SemanticType;
import io.greptime.models.Table;
import io.greptime.models.TableSchema;
import io.greptime.models.WriteOk;
Expand All @@ -40,16 +39,16 @@ public static void main(String[] args) throws ExecutionException, InterruptedExc
GreptimeDB greptimeDB = TestConnector.connectToDefaultDB();

TableSchema cpuMetricSchema = TableSchema.newBuilder("cpu_metric") //
.addColumn("host", SemanticType.Tag, DataType.String) //
.addColumn("ts", SemanticType.Timestamp, DataType.TimestampMillisecond) //
.addColumn("cpu_user", SemanticType.Field, DataType.Float64) //
.addColumn("cpu_sys", SemanticType.Field, DataType.Float64) //
.addTag("host", DataType.String) //
.addTimestamp("ts", DataType.TimestampMillisecond) //
.addField("cpu_user", DataType.Float64) //
.addField("cpu_sys", DataType.Float64) //
.build();

TableSchema memMetricSchema = TableSchema.newBuilder("mem_metric") //
.addColumn("host", SemanticType.Tag, DataType.String) //
.addColumn("ts", SemanticType.Timestamp, DataType.TimestampMillisecond) //
.addColumn("mem_usage", SemanticType.Field, DataType.Float64) //
.addTag("host", DataType.String) //
.addTimestamp("ts", DataType.TimestampMillisecond) //
.addField("mem_usage", DataType.Float64) //
.build();

Table cpuMetric = Table.from(cpuMetricSchema);
Expand Down Expand Up @@ -77,10 +76,13 @@ public static void main(String[] args) throws ExecutionException, InterruptedExc

Result<WriteOk, Err> result = future.get();

if (result.isOk()) {
LOG.info("Write result: {}", result.getOk());
Result<Integer, String> simpleResult = result //
.map(WriteOk::getSuccess) //
.mapErr(err -> err.getError().getMessage());
if (simpleResult.isOk()) {
LOG.info("Write success: {}", simpleResult.getOk());
} else {
LOG.error("Failed to write: {}", result.getErr());
LOG.error("Failed to write: {}", simpleResult.getErr());
}

List<Table> delete_objs = Arrays.asList(cpuMetric.subRange(0, 5), memMetric.subRange(0, 5));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ public enum DataType {
Decimal128, //
;

public boolean isTimestamp() {
return this == TimestampSecond || this == TimestampMillisecond || this == TimestampMicrosecond
|| this == TimestampNanosecond;
}

public Common.ColumnDataType toProtoValue() {
switch (this) {
case Bool:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ public static void addValue(RowData.Row.Builder builder, //
case TIMESTAMP_MILLISECOND:
valueBuilder.setTimestampMillisecondValue(Util.getLongValue(value));
break;
case TIMESTAMP_MICROSECOND:
valueBuilder.setTimestampMicrosecondValue(Util.getLongValue(value));
break;
case TIMESTAMP_NANOSECOND:
valueBuilder.setTimestampNanosecondValue(Util.getLongValue(value));
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,41 @@ public Builder(String tableName) {
this.tableName = tableName;
}

/**
* Add tag schema.
*
* @param name the name of this tag
* @param dataType the data type of this tag
* @return this builder
*/
public Builder addTag(String name, DataType dataType) {
return addColumn(name, SemanticType.Tag, dataType);
}

/**
* Add timestamp schema.
*
* @param name the name of this timestamp
* @param dataType the data type of this timestamp
* @return this builder
*/
public Builder addTimestamp(String name, DataType dataType) {
Ensures.ensure(dataType.isTimestamp(),
"Invalid timestamp data type: %s, only support `DataType.TimestampXXX`", dataType);
return addColumn(name, SemanticType.Timestamp, dataType);
}

/**
* Add field schema.
*
* @param name the name of this field
* @param dataType the data type of this field
* @return this builder
*/
public Builder addField(String name, DataType dataType) {
return addColumn(name, SemanticType.Field, dataType);
}

/**
* Add column schema.
*
Expand All @@ -96,12 +131,19 @@ public Builder addColumn(String name, SemanticType semanticType, DataType dataTy
* @param decimalTypeExtension the decimal type extension of this column(only for `DataType.Decimal128`)
* @return this builder
*/
public Builder addColumn(String name, SemanticType semanticType, DataType dataType,
public Builder addColumn(String name, //
SemanticType semanticType, //
DataType dataType, //
DataType.DecimalTypeExtension decimalTypeExtension) {
Ensures.ensureNonNull(name, "Null column name");
Ensures.ensureNonNull(semanticType, "Null semantic type");
Ensures.ensureNonNull(dataType, "Null data type");

if (semanticType == SemanticType.Timestamp) {
Ensures.ensure(dataType.isTimestamp(),
"Invalid timestamp data type: %s, only support `DataType.TimestampXXX`", dataType);
}

this.columnNames.add(name);
this.semanticTypes.add(semanticType.toProtoValue());
this.dataTypes.add(dataType.toProtoValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ class Pojo1Test {
long c;
@Column(name = "d", dataType = DataType.Float64)
double d;
@Column(name = "ts", dataType = DataType.TimeSecond, timestamp = true)
@Column(name = "ts", dataType = DataType.TimestampMillisecond, timestamp = true)
long ts;
}

Expand Down
7 changes: 3 additions & 4 deletions ingester-protocol/src/test/java/io/greptime/TestUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package io.greptime;

import io.greptime.models.DataType;
import io.greptime.models.SemanticType;
import io.greptime.models.Table;
import io.greptime.models.TableSchema;
import io.greptime.v1.Common;
Expand All @@ -32,9 +31,9 @@ public class TestUtil {

public static Collection<Table> testTable(String tableName, int rowCount) {
TableSchema tableSchema = TableSchema.newBuilder(tableName) //
.addColumn("host", SemanticType.Tag, DataType.String) //
.addColumn("ts", SemanticType.Timestamp, DataType.TimestampMillisecond) //
.addColumn("cpu", SemanticType.Field, DataType.Float64) //
.addTag("host", DataType.String) //
.addTimestamp("ts", DataType.TimestampMillisecond) //
.addField("cpu", DataType.Float64) //
.build();

Table table = Table.from(tableSchema);
Expand Down
73 changes: 47 additions & 26 deletions ingester-protocol/src/test/java/io/greptime/WriteClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.greptime.common.Endpoint;
import io.greptime.models.DataType;
import io.greptime.models.Err;
import io.greptime.models.IntervalMonthDayNano;
import io.greptime.models.Result;
import io.greptime.models.Table;
import io.greptime.models.TableSchema;
Expand All @@ -33,11 +34,9 @@
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.runners.MockitoJUnitRunner;
import java.math.BigDecimal;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import static io.greptime.models.SemanticType.Field;
import static io.greptime.models.SemanticType.Tag;
import static io.greptime.models.SemanticType.Timestamp;

/**
* @author jiachun.fjc
Expand Down Expand Up @@ -67,36 +66,58 @@ public void after() {
@Test
public void testWriteSuccess() throws ExecutionException, InterruptedException {
TableSchema schema = TableSchema.newBuilder("test_table") //
.addColumn("test_tag", Tag, DataType.String) //
.addColumn("test_ts", Timestamp, DataType.Int64) //
.addColumn("field1", Field, DataType.Int8) //
.addColumn("field2", Field, DataType.Int16) //
.addColumn("field3", Field, DataType.Int32) //
.addColumn("field4", Field, DataType.Int64) //
.addColumn("field5", Field, DataType.UInt8) //
.addColumn("field6", Field, DataType.UInt16) //
.addColumn("field7", Field, DataType.UInt32) //
.addColumn("field8", Field, DataType.UInt64) //
.addColumn("field9", Field, DataType.Float32) //
.addColumn("field10", Field, DataType.Float64) //
.addColumn("field11", Field, DataType.Bool) //
.addColumn("field12", Field, DataType.Binary) //
.addColumn("field13", Field, DataType.Date) //
.addColumn("field14", Field, DataType.DateTime) //
.addColumn("field15", Field, DataType.TimestampSecond) //
.addColumn("field16", Field, DataType.TimestampMillisecond) //
.addColumn("field17", Field, DataType.TimestampNanosecond) //
.addTag("test_tag", DataType.String) //
.addTimestamp("test_ts", DataType.TimestampMillisecond) //
.addField("field1", DataType.Int8) //
.addField("field2", DataType.Int16) //
.addField("field3", DataType.Int32) //
.addField("field4", DataType.Int64) //
.addField("field5", DataType.UInt8) //
.addField("field6", DataType.UInt16) //
.addField("field7", DataType.UInt32) //
.addField("field8", DataType.UInt64) //
.addField("field9", DataType.Float32) //
.addField("field10", DataType.Float64) //
.addField("field11", DataType.Bool) //
.addField("field12", DataType.Binary) //
.addField("field13", DataType.Date) //
.addField("field14", DataType.DateTime) //
.addField("field15", DataType.TimestampSecond) //
.addField("field16", DataType.TimestampMillisecond) //
.addField("field17", DataType.TimestampMicrosecond) //
.addField("field18", DataType.TimestampNanosecond) //
.addField("field19", DataType.TimeSecond) //
.addField("field20", DataType.TimeMilliSecond) //
.addField("field21", DataType.TimeMicroSecond) //
.addField("field22", DataType.TimeNanoSecond) //
.addField("field23", DataType.IntervalYearMonth) //
.addField("field24", DataType.IntervalDayTime) //
.addField("field25", DataType.IntervalMonthDayNano) //
.addField("field26", DataType.Decimal128) //
.build();
Table table = Table.from(schema);
long ts = System.currentTimeMillis();

table.addRow("tag1", ts, 1, 2, 3, 4L, 5, 6, 7, 8L, 0.9F, 0.10D, true, new byte[0], 11, 12L, 13L, 14L, 15L);
table.addRow("tag1", ts, 1, 2, 3, 4, 5, 6, 7, 8, 0.9, 0.10, false, new byte[0], 11, 12, 13, 14, 15);
table.addRow("tag1", ts, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, false, new byte[] {0, 1}, 11, 12, 13, 14, 15);
Object[] row1 =
new Object[] {"tag1", ts, 1, 2, 3, 4L, 5, 6, 7, 8L, 0.9F, 0.10D, true, new byte[0], 13L, 14L, 15L, 16L,
17L, 18L, 19L, 20L, 21L, 22L, 23, 24L, new IntervalMonthDayNano(1, 2, 3),
BigDecimal.valueOf(123.456)};
Object[] row2 =
new Object[] {"tag2", ts, 1, 2, 3, 4L, 5, 6, 7, 8L, 0.9F, 0.10D, true, new byte[0], 13L, 14L, 15L, 16L,
17L, 18L, 19L, 20L, 21L, 22L, 23, 24L, new IntervalMonthDayNano(4, 5, 6),
BigDecimal.valueOf(123.456)};
Object[] row3 =
new Object[] {"tag3", ts, 1, 2, 3, 4L, 5, 6, 7, 8L, 0.9F, 0.10D, true, new byte[0], 13L, 14L, 15L, 16L,
17L, 18L, 19L, 20L, 21L, 22L, 23, 24L, new IntervalMonthDayNano(7, 8, 9),
BigDecimal.valueOf(123.456)};
table.addRow(row1);
table.addRow(row2);
table.addRow(row3);

Endpoint addr = Endpoint.parse("127.0.0.1:8081");
Database.GreptimeResponse response = Database.GreptimeResponse.newBuilder() //
.setAffectedRows(Common.AffectedRows.newBuilder().setValue(3)) //
.setAffectedRows(Common.AffectedRows.newBuilder() //
.setValue(3)) //
.build();

Mockito.when(this.routerClient.route()) //
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public void blockingWriteLimitTest() throws InterruptedException {
try {
limiter.acquireAndDo(rows, this::emptyOk);
alwaysFalse.set(true);
} catch (final Throwable err) {
} catch (Throwable err) {
// noinspection ConstantConditions
Assert.assertTrue(err instanceof InterruptedException);
}
Expand Down
Loading
Loading