Skip to content

Commit

Permalink
feat: refactor high level API
Browse files Browse the repository at this point in the history
  • Loading branch information
fengjiachun committed Feb 27, 2024
1 parent cf5d4f1 commit 94dcb67
Show file tree
Hide file tree
Showing 12 changed files with 59 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@
/**
* @author jiachun.fjc
*/
public class StreamWritePOJOsQuickStart {
public class HighLevelApiStreamWriteQuickStart {

private static final Logger LOG = LoggerFactory.getLogger(StreamWritePOJOsQuickStart.class);
private static final Logger LOG = LoggerFactory.getLogger(HighLevelApiStreamWriteQuickStart.class);

public static void main(String[] args) throws ExecutionException, InterruptedException {
GreptimeDB greptimeDB = TestConnector.connectToDefaultDB();
Expand All @@ -52,7 +52,7 @@ public static void main(String[] args) throws ExecutionException, InterruptedExc
memories.add(m);
}

StreamWriter<List<?>, WriteOk> writer = greptimeDB.streamWriterPOJOs();
StreamWriter<List<?>, WriteOk> writer = greptimeDB.objectsStreamWriter();

// write data into stream
writer.write(cpus);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@
/**
* @author jiachun.fjc
*/
public class WritePOJOsQuickStart {
public class HighLevelApiWriteQuickStart {

private static final Logger LOG = LoggerFactory.getLogger(WritePOJOsQuickStart.class);
private static final Logger LOG = LoggerFactory.getLogger(HighLevelApiWriteQuickStart.class);

public static void main(String[] args) throws ExecutionException, InterruptedException {
GreptimeDB greptimeDB = TestConnector.connectToDefaultDB();
Expand All @@ -58,7 +58,7 @@ public static void main(String[] args) throws ExecutionException, InterruptedExc
// For performance reasons, the SDK is designed to be purely asynchronous.
// The return value is a future object. If you want to immediately obtain
// the result, you can call `future.get()`.
CompletableFuture<Result<WriteOk, Err>> puts = greptimeDB.writePOJOs(cpus, memories);
CompletableFuture<Result<WriteOk, Err>> puts = greptimeDB.writeObjects(cpus, memories);

Result<WriteOk, Err> result = puts.get();

Expand All @@ -71,8 +71,8 @@ public static void main(String[] args) throws ExecutionException, InterruptedExc
LOG.error("Failed to write: {}", simpleResult.getErr());
}

List<List<?>> delete_pojos = Arrays.asList(cpus.subList(0, 5), memories.subList(0, 5));
Result<WriteOk, Err> deletes = greptimeDB.writePOJOs(delete_pojos, WriteOp.Delete).get();
List<List<?>> deletePojoObjects = Arrays.asList(cpus.subList(0, 5), memories.subList(0, 5));
Result<WriteOk, Err> deletes = greptimeDB.writeObjects(deletePojoObjects, WriteOp.Delete).get();

if (deletes.isOk()) {
LOG.info("Delete result: {}", result.getOk());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@
/**
* @author jiachun.fjc
*/
public class StreamWriteQuickStart {
public class LowLevelApiStreamWriteQuickStart {

private static final Logger LOG = LoggerFactory.getLogger(StreamWriteQuickStart.class);
private static final Logger LOG = LoggerFactory.getLogger(LowLevelApiStreamWriteQuickStart.class);

public static void main(String[] args) throws ExecutionException, InterruptedException {
GreptimeDB greptimeDB = TestConnector.connectToDefaultDB();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@
/**
* @author jiachun.fjc
*/
public class WriteQuickStart {
public class LowLevelApiWriteQuickStart {

private static final Logger LOG = LoggerFactory.getLogger(WriteQuickStart.class);
private static final Logger LOG = LoggerFactory.getLogger(LowLevelApiWriteQuickStart.class);

public static void main(String[] args) throws ExecutionException, InterruptedException {
GreptimeDB greptimeDB = TestConnector.connectToDefaultDB();
Expand Down
2 changes: 1 addition & 1 deletion ingester-example/src/main/java/io/greptime/QueryJDBC.java
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,6 @@ public static void insertData(GreptimeDB greptimeDB) throws ExecutionException,
c.setCpuSys(i + 0.12);
cpus.add(c);
}
greptimeDB.writePOJOs(cpus).get();
greptimeDB.writeObjects(cpus).get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,15 @@
* @author jiachun.fjc
*/
@SPI(priority = 9)
public class CachedPojoMapper implements PojoMapper {
public class CachedPojoObjectMapper implements PojoObjectMapper {

private final LoadingCache<Class<?>, Map<String, Field>> classFieldCache;

public CachedPojoMapper() {
public CachedPojoObjectMapper() {
this(1024);
}

public CachedPojoMapper(int maxCachedPOJOs) {
public CachedPojoObjectMapper(int maxCachedPOJOs) {
this.classFieldCache =
CacheBuilder.newBuilder().maximumSize(maxCachedPOJOs)
.build(new CacheLoader<Class<?>, Map<String, Field>>() {
Expand All @@ -60,11 +60,11 @@ public Map<String, Field> load(Class<?> key) {
}

@Override
public <M> Table mapToTable(List<M> pojos) {
Ensures.ensureNonNull(pojos, "pojos");
Ensures.ensure(!pojos.isEmpty(), "pojos can not be empty");
public <M> Table mapToTable(List<M> pojoObjects) {
Ensures.ensureNonNull(pojoObjects, "pojoObjects");
Ensures.ensure(!pojoObjects.isEmpty(), "pojoObjects can not be empty");

M first = pojos.get(0);
M first = pojoObjects.get(0);

Class<?> metricType = first.getClass();

Expand All @@ -88,7 +88,7 @@ public <M> Table mapToTable(List<M> pojos) {
}

Table table = Table.from(schemaBuilder.build());
for (M pojo : pojos) {
for (M pojo : pojoObjects) {
Class<?> type = pojo.getClass();
if (!type.equals(metricType)) {
throw new PojoException("All POJOs must be of the same type");
Expand Down
23 changes: 12 additions & 11 deletions ingester-protocol/src/main/java/io/greptime/GreptimeDB.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
*
* @author jiachun.fjc
*/
public class GreptimeDB implements Write, WritePOJO, Lifecycle<GreptimeOptions>, Display {
public class GreptimeDB implements Write, WriteObject, Lifecycle<GreptimeOptions>, Display {

private static final Logger LOG = LoggerFactory.getLogger(GreptimeDB.class);

Expand All @@ -63,7 +63,7 @@ public class GreptimeDB implements Write, WritePOJO, Lifecycle<GreptimeOptions>,
private static final String VERSION = Util.clientVersion();
private static final String NODE_ID = UUID.randomUUID().toString();

private static final PojoMapper POJO_MAPPER = getDefaultPojoMapper();
private static final PojoObjectMapper POJO_OBJECT_MAPPER = getDefaultPojoObjectMapper();

private final int id;
private final AtomicBoolean started = new AtomicBoolean(false);
Expand Down Expand Up @@ -140,21 +140,22 @@ public void ensureInitialized() {
}

@Override
public CompletableFuture<Result<WriteOk, Err>> writePOJOs(Collection<List<?>> pojos, WriteOp writeOp, Context ctx) {
List<Table> rows = new ArrayList<>(pojos.size());
for (List<?> pojo : pojos) {
rows.add(POJO_MAPPER.mapToTable(pojo));
public CompletableFuture<Result<WriteOk, Err>> writeObjects(Collection<List<?>> objects, WriteOp writeOp,
Context ctx) {
List<Table> rows = new ArrayList<>(objects.size());
for (List<?> pojo : objects) {
rows.add(POJO_OBJECT_MAPPER.mapToTable(pojo));
}
return write(rows, writeOp, ctx);
}

@Override
public StreamWriter<List<?>, WriteOk> streamWriterPOJOs(int maxPointsPerSecond, Context ctx) {
public StreamWriter<List<?>, WriteOk> objectsStreamWriter(int maxPointsPerSecond, Context ctx) {
StreamWriter<Table, WriteOk> delegate = streamWriter(maxPointsPerSecond, ctx);
return new StreamWriter<List<?>, WriteOk>() {
@Override
public StreamWriter<List<?>, WriteOk> write(List<?> val, WriteOp writeOp) {
Table table = POJO_MAPPER.mapToTable(val);
Table table = POJO_OBJECT_MAPPER.mapToTable(val);
delegate.write(table, writeOp);
return this;
}
Expand Down Expand Up @@ -320,12 +321,12 @@ private void truncateBuf() {
}
}

private static PojoMapper getDefaultPojoMapper() {
private static PojoObjectMapper getDefaultPojoObjectMapper() {
try {
return ServiceLoader.load(PojoMapper.class).first();
return ServiceLoader.load(PojoObjectMapper.class).first();
} catch (Throwable t) {
LOG.warn("Failed to load `PojoMapper`, use default: `CachedPojoMapper(1024)`", t);
return new CachedPojoMapper();
return new CachedPojoObjectMapper();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
*
* @author jiachun.fjc
*/
public interface PojoMapper {
public interface PojoObjectMapper {

<M> Table mapToTable(List<M> pojos);
<M> Table mapToTable(List<M> pojoObjects);
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,53 +26,53 @@
import java.util.concurrent.CompletableFuture;

/**
* Write POJO API: writes data in POJO object format to the DB.
* Write POJO object API: writes data in POJO object format to the DB.
*
* @author jiachun.fjc
*/
public interface WritePOJO {
public interface WriteObject {
/**
* @see #writePOJOs(Collection, WriteOp, Context)
* @see #writeObjects(Collection, WriteOp, Context)
*/
default CompletableFuture<Result<WriteOk, Err>> writePOJOs(List<?>... pojos) {
return writePOJOs(Arrays.asList(pojos));
default CompletableFuture<Result<WriteOk, Err>> writeObjects(List<?>... objects) {
return writeObjects(Arrays.asList(objects));
}
/**
* @see #writePOJOs(Collection, WriteOp, Context)
* @see #writeObjects(Collection, WriteOp, Context)
*/
default CompletableFuture<Result<WriteOk, Err>> writePOJOs(Collection<List<?>> pojos) {
return writePOJOs(pojos, WriteOp.Insert, Context.newDefault());
default CompletableFuture<Result<WriteOk, Err>> writeObjects(Collection<List<?>> objects) {
return writeObjects(objects, WriteOp.Insert, Context.newDefault());
}

/**
* @see #writePOJOs(Collection, WriteOp, Context)
* @see #writeObjects(Collection, WriteOp, Context)
*/
default CompletableFuture<Result<WriteOk, Err>> writePOJOs(Collection<List<?>> pojos, WriteOp writeOp) {
return writePOJOs(pojos, writeOp, Context.newDefault());
default CompletableFuture<Result<WriteOk, Err>> writeObjects(Collection<List<?>> objects, WriteOp writeOp) {
return writeObjects(objects, writeOp, Context.newDefault());
}

/**
* Write multiple rows of data (which can belong to multiple tables) to the database at once.
*
* @param pojos a collection of data to be written, classified by table
* @param objects a collection of data to be written, classified by table
* @param writeOp write operation(insert or delete)
* @param ctx invoke context
* @return write result
*/
CompletableFuture<Result<WriteOk, Err>> writePOJOs(Collection<List<?>> pojos, WriteOp writeOp, Context ctx);
CompletableFuture<Result<WriteOk, Err>> writeObjects(Collection<List<?>> objects, WriteOp writeOp, Context ctx);

/**
* @see #streamWriterPOJOs(int, Context)
* @see #objectsStreamWriter(int, Context)
*/
default StreamWriter<List<?>, WriteOk> streamWriterPOJOs() {
return streamWriterPOJOs(-1);
default StreamWriter<List<?>, WriteOk> objectsStreamWriter() {
return objectsStreamWriter(-1);
}

/**
* @see #streamWriterPOJOs(int, Context)
* @see #objectsStreamWriter(int, Context)
*/
default StreamWriter<List<?>, WriteOk> streamWriterPOJOs(int maxPointsPerSecond) {
return streamWriterPOJOs(maxPointsPerSecond, Context.newDefault());
default StreamWriter<List<?>, WriteOk> objectsStreamWriter(int maxPointsPerSecond) {
return objectsStreamWriter(maxPointsPerSecond, Context.newDefault());
}

/**
Expand All @@ -90,5 +90,5 @@ default StreamWriter<List<?>, WriteOk> streamWriterPOJOs(int maxPointsPerSecond)
* @param ctx invoke context
* @return a stream writer instance
*/
StreamWriter<List<?>, WriteOk> streamWriterPOJOs(int maxPointsPerSecond, Context ctx);
StreamWriter<List<?>, WriteOk> objectsStreamWriter(int maxPointsPerSecond, Context ctx);
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
io.greptime.CachedPojoObjectMapper
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
/**
* @author jiachun.fjc
*/
public class PojoMapperTest {
public class PojoObjectMapperTest {

@Test
public void testToTable() {
Expand All @@ -37,7 +37,7 @@ public void testToTable() {
Pojo1Test pojo1 = createNewPojo1Test();
pojos1.add(pojo1);
}
Table tp1 = new CachedPojoMapper().mapToTable(pojos1);
Table tp1 = new CachedPojoObjectMapper().mapToTable(pojos1);
Assert.assertEquals("pojo1", tp1.tableName());
Assert.assertEquals(50, tp1.pointCount());

Expand All @@ -47,7 +47,7 @@ public void testToTable() {
Pojo2Test pojo2 = createNewPojo2Test();
pojos2.add(pojo2);
}
Table tp2 = new CachedPojoMapper().mapToTable(pojos2);
Table tp2 = new CachedPojoObjectMapper().mapToTable(pojos2);
Assert.assertEquals("pojo2", tp2.tableName());
Assert.assertEquals(30, tp2.pointCount());
}
Expand Down

0 comments on commit 94dcb67

Please sign in to comment.