Skip to content

Commit

Permalink
feat: support delete
Browse files Browse the repository at this point in the history
  • Loading branch information
fengjiachun committed Dec 21, 2023
1 parent a7f5a56 commit be1d061
Show file tree
Hide file tree
Showing 10 changed files with 211 additions and 72 deletions.
12 changes: 6 additions & 6 deletions ingester-protocol/src/main/java/io/greptime/GreptimeDB.java
Original file line number Diff line number Diff line change
Expand Up @@ -130,22 +130,22 @@ public void ensureInitialized() {
}

@Override
public CompletableFuture<Result<WriteOk, Err>> writePOJOs(Collection<List<?>> pojos, Context ctx) {
public CompletableFuture<Result<WriteOk, Err>> writePOJOs(Collection<List<?>> pojos, WriteOp writeOp, Context ctx) {
List<TableRows> rows = new ArrayList<>(pojos.size());
for (List<?> pojo : pojos) {
rows.add(this.pojoMapper.toTableRows(pojo));
}
return write(rows, ctx);
return write(rows, writeOp, ctx);
}

@Override
public StreamWriter<List<?>, WriteOk> streamWriterPOJOs(int maxPointsPerSecond, Context ctx) {
StreamWriter<TableRows, WriteOk> delegate = streamWriter(maxPointsPerSecond, ctx);
return new StreamWriter<List<?>, WriteOk>() {
@Override
public StreamWriter<List<?>, WriteOk> write(List<?> val) {
public StreamWriter<List<?>, WriteOk> write(List<?> val, WriteOp writeOp) {
TableRows rows = pojoMapper.toTableRows(val);
delegate.write(rows);
delegate.write(rows, writeOp);
return this;
}

Expand All @@ -157,9 +157,9 @@ public CompletableFuture<WriteOk> completed() {
}

@Override
public CompletableFuture<Result<WriteOk, Err>> write(Collection<TableRows> rows, Context ctx) {
public CompletableFuture<Result<WriteOk, Err>> write(Collection<TableRows> rows, WriteOp writeOp, Context ctx) {
ensureInitialized();
return this.writeClient.write(rows, attachCtx(ctx));
return this.writeClient.write(rows, writeOp, attachCtx(ctx));
}

@Override
Expand Down
10 changes: 9 additions & 1 deletion ingester-protocol/src/main/java/io/greptime/StreamWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,21 @@
*/
public interface StreamWriter<V, R> {

/**
* @see #write(Object, WriteOp)
*/
default StreamWriter<V, R> write(V val) {
return write(val, WriteOp.Insert);
}

/**
* Write data to this stream.
*
* @param val data value
* @param writeOp write operation(insert or delete)
* @return this
*/
StreamWriter<V, R> write(V val);
StreamWriter<V, R> write(V val, WriteOp writeOp);

/**
* Tell server that the stream-write has completed.
Expand Down
15 changes: 12 additions & 3 deletions ingester-protocol/src/main/java/io/greptime/Write.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,21 +30,30 @@
* @author jiachun.fjc
*/
public interface Write {

/**
* @see #write(Collection, Context)
* @see #write(Collection, WriteOp, Context)
*/
default CompletableFuture<Result<WriteOk, Err>> write(Collection<TableRows> rows) {
return write(rows, Context.newDefault());
return write(rows, WriteOp.Insert, Context.newDefault());
}

/**
* @see #write(Collection, WriteOp, Context)
*/
default CompletableFuture<Result<WriteOk, Err>> write(Collection<TableRows> rows, WriteOp writeOp) {
return write(rows, writeOp, Context.newDefault());
}

/**
* Write multi tables multi rows data to database.
*
* @param rows rows with multi tables
* @param writeOp write operation(insert or delete)
* @param ctx invoke context
* @return write result
*/
CompletableFuture<Result<WriteOk, Err>> write(Collection<TableRows> rows, Context ctx);
CompletableFuture<Result<WriteOk, Err>> write(Collection<TableRows> rows, WriteOp writeOp, Context ctx);

/**
* @see #streamWriter(int, Context)
Expand Down
76 changes: 49 additions & 27 deletions ingester-protocol/src/main/java/io/greptime/WriteClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import io.greptime.models.WriteOk;
import io.greptime.models.TableRows;
import io.greptime.models.TableRowsHelper;
import io.greptime.models.WriteTable;
import io.greptime.options.WriteOptions;
import io.greptime.rpc.Context;
import io.greptime.rpc.Observer;
Expand Down Expand Up @@ -79,26 +80,27 @@ public void shutdownGracefully() {
}

@Override
public CompletableFuture<Result<WriteOk, Err>> write(Collection<TableRows> rows, Context ctx) {
public CompletableFuture<Result<WriteOk, Err>> write(Collection<TableRows> rows, WriteOp writeOp, Context ctx) {
Ensures.ensureNonNull(rows, "null `rows`");
Ensures.ensure(!rows.isEmpty(), "empty `rows`");

long startCall = Clock.defaultClock().getTick();

return this.writeLimiter.acquireAndDo(rows, () -> write0(rows, ctx, 0).whenCompleteAsync((r, e) -> {
return this.writeLimiter.acquireAndDo(rows, () -> write0(rows, writeOp, ctx, 0).whenCompleteAsync((r, e) -> {
InnerMetricHelper.writeQps().mark();
if (r != null) {
if (Util.isRwLogging()) {
LOG.info("Write to {}, duration={} ms, result={}.",
LOG.info("Write to {} with operation {}, duration={} ms, result={}.",
Keys.DB_NAME,
writeOp,
Clock.defaultClock().duration(startCall),
r
);
}
if (r.isOk()) {
WriteOk ok = r.getOk();
InnerMetricHelper.writeRowsSuccessNum().update(ok.getSuccess());
InnerMetricHelper.writeRowsFailureNum().update(ok.getFailure());
InnerMetricHelper.writeRowsSuccessNum(writeOp).update(ok.getSuccess());
InnerMetricHelper.writeRowsFailureNum(writeOp).update(ok.getFailure());
return;
}
}
Expand All @@ -117,11 +119,11 @@ public StreamWriter<TableRows, WriteOk> streamWriter(int maxPointsPerSecond, Con
.thenApply(reqObserver -> new RateLimitingStreamWriter(reqObserver, permitsPerSecond) {

@Override
public StreamWriter<TableRows, WriteOk> write(TableRows rows) {
public StreamWriter<TableRows, WriteOk> write(TableRows rows, WriteOp writeOp) {
if (respFuture.isCompletedExceptionally()) {
respFuture.getNow(null); // throw the exception now
}
return super.write(rows); // may wait
return super.write(rows, writeOp); // may wait
}

@Override
Expand All @@ -132,11 +134,11 @@ public CompletableFuture<WriteOk> completed() {
}).join();
}

private CompletableFuture<Result<WriteOk, Err>> write0(Collection<TableRows> rows, Context ctx, int retries) {
private CompletableFuture<Result<WriteOk, Err>> write0(Collection<TableRows> rows, WriteOp writeOp, Context ctx, int retries) {
InnerMetricHelper.writeByRetries(retries).mark();

return this.routerClient.route()
.thenComposeAsync(endpoint -> writeTo(endpoint, rows, ctx, retries), this.asyncPool)
.thenComposeAsync(endpoint -> writeTo(endpoint, rows, writeOp, ctx, retries), this.asyncPool)
.thenComposeAsync(r -> {
if (r.isOk()) {
LOG.debug("Success to write to {}, ok={}.", Keys.DB_NAME, r.getOk());
Expand All @@ -154,13 +156,15 @@ private CompletableFuture<Result<WriteOk, Err>> write0(Collection<TableRows> row
return Util.completedCf(r);
}

return write0(rows, ctx, retries + 1);
return write0(rows, writeOp, ctx, retries + 1);
}, this.asyncPool);
}

private CompletableFuture<Result<WriteOk, Err>> writeTo(Endpoint endpoint, Collection<TableRows> rows, Context ctx, int retries) {
Collection<TableName> tableNames = rows.stream().map(TableRows::tableName).collect(Collectors.toList());
Database.GreptimeRequest req = TableRowsHelper.toGreptimeRequest(tableNames, rows, this.opts.getAuthInfo());
private CompletableFuture<Result<WriteOk, Err>> writeTo(Endpoint endpoint, Collection<TableRows> rows, WriteOp writeOp, Context ctx, int retries) {
Collection<TableName> tableNames = rows.stream() //
.map(TableRows::tableName) //
.collect(Collectors.toList());
Database.GreptimeRequest req = TableRowsHelper.toGreptimeRequest(tableNames, rows, writeOp, this.opts.getAuthInfo());
ctx.with("retries", retries);

CompletableFuture<Database.GreptimeResponse> future = this.routerClient.invoke(endpoint, req, ctx);
Expand All @@ -178,7 +182,7 @@ private CompletableFuture<Result<WriteOk, Err>> writeTo(Endpoint endpoint, Colle
}, this.asyncPool);
}

private Observer<TableRows> streamWriteTo(Endpoint endpoint, Context ctx, Observer<WriteOk> respObserver) {
private Observer<WriteTable> streamWriteTo(Endpoint endpoint, Context ctx, Observer<WriteOk> respObserver) {
Observer<Database.GreptimeRequest> rpcObserver =
this.routerClient.invokeClientStreaming(endpoint, Database.GreptimeRequest.getDefaultInstance(), ctx,
new Observer<Database.GreptimeResponse>() {
Expand All @@ -205,12 +209,14 @@ public void onCompleted() {
}
});

return new Observer<TableRows>() {
return new Observer<WriteTable>() {

@Override
public void onNext(TableRows rows) {
public void onNext(WriteTable writeTable) {
TableRows rows = writeTable.getRows();
WriteOp writeOp = writeTable.getWriteOp();
Database.GreptimeRequest req =
TableRowsHelper.toGreptimeRequest(rows, WriteClient.this.opts.getAuthInfo());
TableRowsHelper.toGreptimeRequest(rows, writeOp, WriteClient.this.opts.getAuthInfo());
rpcObserver.onNext(req);
}

Expand Down Expand Up @@ -245,19 +251,35 @@ public String toString() {
}

static final class InnerMetricHelper {
static final Histogram WRITE_ROWS_SUCCESS_NUM = MetricsUtil.histogram("write_rows_success_num");
static final Histogram WRITE_ROWS_FAILURE_NUM = MetricsUtil.histogram("write_rows_failure_num");
static final Histogram INSERT_ROWS_SUCCESS_NUM = MetricsUtil.histogram("insert_rows_success_num");
static final Histogram DELETE_ROWS_SUCCESS_NUM = MetricsUtil.histogram("delete_rows_success_num");
static final Histogram INSERT_ROWS_FAILURE_NUM = MetricsUtil.histogram("insert_rows_failure_num");
static final Histogram DELETE_ROWS_FAILURE_NUM = MetricsUtil.histogram("delete_rows_failure_num");
static final Histogram WRITE_STREAM_LIMITER_TIME_SPENT = MetricsUtil
.histogram("write_stream_limiter_time_spent");
static final Meter WRITE_FAILURE_NUM = MetricsUtil.meter("write_failure_num");
static final Meter WRITE_QPS = MetricsUtil.meter("write_qps");

static Histogram writeRowsSuccessNum() {
return WRITE_ROWS_SUCCESS_NUM;
static Histogram writeRowsSuccessNum(WriteOp writeOp) {
switch (writeOp) {
case Insert:
return INSERT_ROWS_SUCCESS_NUM;
case Delete:
return DELETE_ROWS_SUCCESS_NUM;
default:
throw new IllegalArgumentException("Unsupported write operation: " + writeOp);
}
}

static Histogram writeRowsFailureNum() {
return WRITE_ROWS_FAILURE_NUM;
static Histogram writeRowsFailureNum(WriteOp writeOp) {
switch (writeOp) {
case Insert:
return INSERT_ROWS_FAILURE_NUM;
case Delete:
return DELETE_ROWS_FAILURE_NUM;
default:
throw new IllegalArgumentException("Unsupported write operation: " + writeOp);
}
}

static Histogram writeStreamLimiterTimeSpent() {
Expand Down Expand Up @@ -303,10 +325,10 @@ public Result<WriteOk, Err> rejected(Collection<TableRows> in, RejectedState sta
@SuppressWarnings("UnstableApiUsage")
static abstract class RateLimitingStreamWriter implements StreamWriter<TableRows, WriteOk> {

private final Observer<TableRows> observer;
private final Observer<WriteTable> observer;
private final RateLimiter rateLimiter;

RateLimitingStreamWriter(Observer<TableRows> observer, double permitsPerSecond) {
RateLimitingStreamWriter(Observer<WriteTable> observer, double permitsPerSecond) {
this.observer = observer;
if (permitsPerSecond > 0) {
this.rateLimiter = RateLimiter.create(permitsPerSecond);
Expand All @@ -316,14 +338,14 @@ static abstract class RateLimitingStreamWriter implements StreamWriter<TableRows
}

@Override
public StreamWriter<TableRows, WriteOk> write(TableRows rows) {
public StreamWriter<TableRows, WriteOk> write(TableRows rows, WriteOp writeOp) {
Ensures.ensureNonNull(rows, "null `rows`");

if (this.rateLimiter != null) {
double timeSpent = this.rateLimiter.acquire(rows.pointCount());
InnerMetricHelper.writeStreamLimiterTimeSpent().update((long) timeSpent);
}
this.observer.onNext(rows);
this.observer.onNext(new WriteTable(rows, writeOp));
return this;
}
}
Expand Down
23 changes: 23 additions & 0 deletions ingester-protocol/src/main/java/io/greptime/WriteOp.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright 2023 Greptime Team
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.greptime;

/**
* @author jiachun.fjc
*/
public enum WriteOp {
Insert, Delete,
}
14 changes: 11 additions & 3 deletions ingester-protocol/src/main/java/io/greptime/WritePOJO.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,20 +31,28 @@
*/
public interface WritePOJO {
/**
* @see #writePOJOs(Collection, Context)
* @see #writePOJOs(Collection, WriteOp, Context)
*/
default CompletableFuture<Result<WriteOk, Err>> writePOJOs(Collection<List<?>> pojos) {
return writePOJOs(pojos, Context.newDefault());
return writePOJOs(pojos, WriteOp.Insert, Context.newDefault());
}

/**
* @see #writePOJOs(Collection, WriteOp, Context)
*/
default CompletableFuture<Result<WriteOk, Err>> writePOJOs(Collection<List<?>> pojos, WriteOp writeOp) {
return writePOJOs(pojos, writeOp, Context.newDefault());
}

/**
* Write multi tables multi rows data to database.
*
* @param pojos rows with multi tables
* @param writeOp write operation(insert or delete)
* @param ctx invoke context
* @return write result
*/
CompletableFuture<Result<WriteOk, Err>> writePOJOs(Collection<List<?>> pojos, Context ctx);
CompletableFuture<Result<WriteOk, Err>> writePOJOs(Collection<List<?>> pojos, WriteOp writeOp, Context ctx);

/**
* @see #streamWriterPOJOs(int, Context)
Expand Down
Loading

0 comments on commit be1d061

Please sign in to comment.