Skip to content

Commit

Permalink
feat: max inflight write points limiter
Browse files Browse the repository at this point in the history
  • Loading branch information
fengjiachun committed Feb 4, 2024
1 parent b12eed3 commit 2b530a6
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 21 deletions.
6 changes: 3 additions & 3 deletions ingester-example/src/main/java/io/greptime/TestConnector.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@ public static GreptimeDB connectToDefaultDB() {
.writeMaxRetries(1)
// Optional, the default value is fine.
//
// Write flow limit: maximum number of data rows in-flight. It does not take effect on `StreamWriter`
// The default is 65536
.maxInFlightWriteRows(65536)
// Write flow limit: maximum number of data points in-flight. It does not take effect on `StreamWriter`
// The default is 10 * 65536
.maxInFlightWritePoints(10 * 65536)
// Optional, the default value is fine.
//
// Write flow limit: the policy to use when the write flow limit is exceeded.
Expand Down
5 changes: 3 additions & 2 deletions ingester-protocol/src/main/java/io/greptime/WriteClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ public boolean init(WriteOptions opts) {
Executor pool = this.opts.getAsyncPool();
this.asyncPool = pool != null ? pool : new SerializingExecutor("write_client");
this.asyncPool = new MetricExecutor(this.asyncPool, "async_write_pool.time");
this.writeLimiter = new DefaultWriteLimiter(this.opts.getMaxInFlightWriteRows(), this.opts.getLimitedPolicy());
this.writeLimiter =
new DefaultWriteLimiter(this.opts.getMaxInFlightWritePoints(), this.opts.getLimitedPolicy());
return true;
}

Expand Down Expand Up @@ -313,7 +314,7 @@ public DefaultWriteLimiter(int maxInFlight, LimitedPolicy policy) {

@Override
public int calculatePermits(Collection<Table> in) {
return in.stream().map(Table::rowCount).reduce(0, Integer::sum);
return in.stream().map(Table::pointCount).reduce(0, Integer::sum);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,8 @@ public static final class Builder {
// Rpc options, in general the default configuration is fine.
private RpcOptions rpcOptions = RpcOptions.newDefault();
private int writeMaxRetries = 1;
// Write flow limit: maximum number of data rows in-flight.
private int maxInFlightWriteRows = 65536;
// Write flow limit: maximum number of data points in-flight.
private int maxInFlightWritePoints = 10 * 65536;
private LimitedPolicy writeLimitedPolicy = LimitedPolicy.defaultWriteLimitedPolicy();
private int defaultStreamMaxWritePointsPerSecond = 10 * 65536;
// Refresh frequency of route tables. The background refreshes all route tables periodically.
Expand Down Expand Up @@ -202,13 +202,13 @@ public Builder writeMaxRetries(int maxRetries) {
}

/**
* Write flow limit: maximum number of data rows in-flight.
* Write flow limit: maximum number of data points in-flight.
*
* @param maxInFlightWriteRows max in-flight rows
* @param maxInFlightWritePoints max in-flight points
* @return this builder
*/
public Builder maxInFlightWriteRows(int maxInFlightWriteRows) {
this.maxInFlightWriteRows = maxInFlightWriteRows;
public Builder maxInFlightWritePoints(int maxInFlightWritePoints) {
this.maxInFlightWritePoints = maxInFlightWritePoints;
return this;
}

Expand Down Expand Up @@ -308,7 +308,7 @@ private WriteOptions writeOptions() {
writeOpts.setAuthInfo(this.authInfo);
writeOpts.setAsyncPool(this.asyncPool);
writeOpts.setMaxRetries(this.writeMaxRetries);
writeOpts.setMaxInFlightWriteRows(this.maxInFlightWriteRows);
writeOpts.setMaxInFlightWritePoints(this.maxInFlightWritePoints);
writeOpts.setLimitedPolicy(this.writeLimitedPolicy);
writeOpts.setDefaultStreamMaxWritePointsPerSecond(this.defaultStreamMaxWritePointsPerSecond);
return writeOpts;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public class WriteOptions implements Copiable<WriteOptions> {
private Executor asyncPool;
private int maxRetries = 1;
// Write flow limit: maximum number of data rows in-flight.
private int maxInFlightWriteRows = 65536;
private int maxInFlightWritePoints = 10 * 65536;
private LimitedPolicy limitedPolicy = LimitedPolicy.defaultWriteLimitedPolicy();
// Default rate limit for stream writer
private int defaultStreamMaxWritePointsPerSecond = 10 * 65536;
Expand Down Expand Up @@ -78,12 +78,12 @@ public void setMaxRetries(int maxRetries) {
this.maxRetries = maxRetries;
}

public int getMaxInFlightWriteRows() {
return maxInFlightWriteRows;
public int getMaxInFlightWritePoints() {
return maxInFlightWritePoints;
}

public void setMaxInFlightWriteRows(int maxInFlightWriteRows) {
this.maxInFlightWriteRows = maxInFlightWriteRows;
public void setMaxInFlightWritePoints(int maxInFlightWritePoints) {
this.maxInFlightWritePoints = maxInFlightWritePoints;
}

public LimitedPolicy getLimitedPolicy() {
Expand All @@ -110,7 +110,7 @@ public WriteOptions copy() {
opts.routerClient = this.routerClient;
opts.asyncPool = this.asyncPool;
opts.maxRetries = this.maxRetries;
opts.maxInFlightWriteRows = this.maxInFlightWriteRows;
opts.maxInFlightWritePoints = this.maxInFlightWritePoints;
opts.limitedPolicy = this.limitedPolicy;
opts.defaultStreamMaxWritePointsPerSecond = this.defaultStreamMaxWritePointsPerSecond;
return opts;
Expand All @@ -124,7 +124,7 @@ public String toString() {
", routerClient=" + routerClient + //
", asyncPool=" + asyncPool + //
", maxRetries=" + maxRetries + //
", maxInFlightWriteRows=" + maxInFlightWriteRows + //
", maxInFlightWritePoints=" + maxInFlightWritePoints + //
", limitedPolicy=" + limitedPolicy + //
", defaultStreamMaxWritePointsPerSecond=" + defaultStreamMaxWritePointsPerSecond + //
'}';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public void testAllOptions() {
.asyncPool(asyncPool) //
.rpcOptions(rpcOptions) //
.writeMaxRetries(writeMaxRetries) //
.maxInFlightWriteRows(maxInFlightWriteRows) //
.maxInFlightWritePoints(maxInFlightWriteRows) //
.writeLimitedPolicy(limitedPolicy) //
.defaultStreamMaxWritePointsPerSecond(defaultStreamMaxWritePointsPerSecond) //
.routeTableRefreshPeriodSeconds(routeTableRefreshPeriodSeconds) //
Expand All @@ -71,7 +71,7 @@ public void testAllOptions() {
Assert.assertNotNull(writeOptions);
Assert.assertEquals(asyncPool, writeOptions.getAsyncPool());
Assert.assertEquals(writeMaxRetries, writeOptions.getMaxRetries());
Assert.assertEquals(maxInFlightWriteRows, writeOptions.getMaxInFlightWriteRows());
Assert.assertEquals(maxInFlightWriteRows, writeOptions.getMaxInFlightWritePoints());
Assert.assertEquals(limitedPolicy, writeOptions.getLimitedPolicy());
Assert.assertEquals(defaultStreamMaxWritePointsPerSecond, writeOptions.getDefaultStreamMaxWritePointsPerSecond());
Assert.assertEquals(authInfo, writeOptions.getAuthInfo());
Expand Down

0 comments on commit 2b530a6

Please sign in to comment.