From 28f5fd793b97fd36fc24e79e84a9e288e36e2925 Mon Sep 17 00:00:00 2001 From: JeremyHi Date: Sun, 4 Feb 2024 11:57:09 +0800 Subject: [PATCH] feat: max inflight write points limiter (#28) --- .../src/main/java/io/greptime/TestConnector.java | 6 +++--- .../src/main/java/io/greptime/WriteClient.java | 5 +++-- .../java/io/greptime/options/GreptimeOptions.java | 14 +++++++------- .../java/io/greptime/options/WriteOptions.java | 14 +++++++------- .../io/greptime/options/GreptimeOptionsTest.java | 6 +++--- 5 files changed, 23 insertions(+), 22 deletions(-) diff --git a/ingester-example/src/main/java/io/greptime/TestConnector.java b/ingester-example/src/main/java/io/greptime/TestConnector.java index 01d8197..471a553 100644 --- a/ingester-example/src/main/java/io/greptime/TestConnector.java +++ b/ingester-example/src/main/java/io/greptime/TestConnector.java @@ -54,9 +54,9 @@ public static GreptimeDB connectToDefaultDB() { .writeMaxRetries(1) // Optional, the default value is fine. // - // Write flow limit: maximum number of data rows in-flight. It does not take effect on `StreamWriter` - // The default is 65536 - .maxInFlightWriteRows(65536) + // Write flow limit: maximum number of data points in-flight. It does not take effect on `StreamWriter` + // The default is 10 * 65536 + .maxInFlightWritePoints(10 * 65536) // Optional, the default value is fine. // // Write flow limit: the policy to use when the write flow limit is exceeded. diff --git a/ingester-protocol/src/main/java/io/greptime/WriteClient.java b/ingester-protocol/src/main/java/io/greptime/WriteClient.java index 347253b..4cfc509 100644 --- a/ingester-protocol/src/main/java/io/greptime/WriteClient.java +++ b/ingester-protocol/src/main/java/io/greptime/WriteClient.java @@ -73,7 +73,8 @@ public boolean init(WriteOptions opts) { Executor pool = this.opts.getAsyncPool(); this.asyncPool = pool != null ? pool : new SerializingExecutor("write_client"); this.asyncPool = new MetricExecutor(this.asyncPool, "async_write_pool.time"); - this.writeLimiter = new DefaultWriteLimiter(this.opts.getMaxInFlightWriteRows(), this.opts.getLimitedPolicy()); + this.writeLimiter = + new DefaultWriteLimiter(this.opts.getMaxInFlightWritePoints(), this.opts.getLimitedPolicy()); return true; } @@ -313,7 +314,7 @@ public DefaultWriteLimiter(int maxInFlight, LimitedPolicy policy) { @Override public int calculatePermits(Collection in) { - return in.stream().map(Table::rowCount).reduce(0, Integer::sum); + return in.stream().map(Table::pointCount).reduce(0, Integer::sum); } @Override diff --git a/ingester-protocol/src/main/java/io/greptime/options/GreptimeOptions.java b/ingester-protocol/src/main/java/io/greptime/options/GreptimeOptions.java index 9a3ff81..43f7597 100644 --- a/ingester-protocol/src/main/java/io/greptime/options/GreptimeOptions.java +++ b/ingester-protocol/src/main/java/io/greptime/options/GreptimeOptions.java @@ -145,8 +145,8 @@ public static final class Builder { // Rpc options, in general the default configuration is fine. private RpcOptions rpcOptions = RpcOptions.newDefault(); private int writeMaxRetries = 1; - // Write flow limit: maximum number of data rows in-flight. - private int maxInFlightWriteRows = 65536; + // Write flow limit: maximum number of data points in-flight. + private int maxInFlightWritePoints = 10 * 65536; private LimitedPolicy writeLimitedPolicy = LimitedPolicy.defaultWriteLimitedPolicy(); private int defaultStreamMaxWritePointsPerSecond = 10 * 65536; // Refresh frequency of route tables. The background refreshes all route tables periodically. @@ -202,13 +202,13 @@ public Builder writeMaxRetries(int maxRetries) { } /** - * Write flow limit: maximum number of data rows in-flight. + * Write flow limit: maximum number of data points in-flight. * - * @param maxInFlightWriteRows max in-flight rows + * @param maxInFlightWritePoints max in-flight points * @return this builder */ - public Builder maxInFlightWriteRows(int maxInFlightWriteRows) { - this.maxInFlightWriteRows = maxInFlightWriteRows; + public Builder maxInFlightWritePoints(int maxInFlightWritePoints) { + this.maxInFlightWritePoints = maxInFlightWritePoints; return this; } @@ -308,7 +308,7 @@ private WriteOptions writeOptions() { writeOpts.setAuthInfo(this.authInfo); writeOpts.setAsyncPool(this.asyncPool); writeOpts.setMaxRetries(this.writeMaxRetries); - writeOpts.setMaxInFlightWriteRows(this.maxInFlightWriteRows); + writeOpts.setMaxInFlightWritePoints(this.maxInFlightWritePoints); writeOpts.setLimitedPolicy(this.writeLimitedPolicy); writeOpts.setDefaultStreamMaxWritePointsPerSecond(this.defaultStreamMaxWritePointsPerSecond); return writeOpts; diff --git a/ingester-protocol/src/main/java/io/greptime/options/WriteOptions.java b/ingester-protocol/src/main/java/io/greptime/options/WriteOptions.java index cdfd7de..6449a94 100644 --- a/ingester-protocol/src/main/java/io/greptime/options/WriteOptions.java +++ b/ingester-protocol/src/main/java/io/greptime/options/WriteOptions.java @@ -33,7 +33,7 @@ public class WriteOptions implements Copiable { private Executor asyncPool; private int maxRetries = 1; // Write flow limit: maximum number of data rows in-flight. - private int maxInFlightWriteRows = 65536; + private int maxInFlightWritePoints = 10 * 65536; private LimitedPolicy limitedPolicy = LimitedPolicy.defaultWriteLimitedPolicy(); // Default rate limit for stream writer private int defaultStreamMaxWritePointsPerSecond = 10 * 65536; @@ -78,12 +78,12 @@ public void setMaxRetries(int maxRetries) { this.maxRetries = maxRetries; } - public int getMaxInFlightWriteRows() { - return maxInFlightWriteRows; + public int getMaxInFlightWritePoints() { + return maxInFlightWritePoints; } - public void setMaxInFlightWriteRows(int maxInFlightWriteRows) { - this.maxInFlightWriteRows = maxInFlightWriteRows; + public void setMaxInFlightWritePoints(int maxInFlightWritePoints) { + this.maxInFlightWritePoints = maxInFlightWritePoints; } public LimitedPolicy getLimitedPolicy() { @@ -110,7 +110,7 @@ public WriteOptions copy() { opts.routerClient = this.routerClient; opts.asyncPool = this.asyncPool; opts.maxRetries = this.maxRetries; - opts.maxInFlightWriteRows = this.maxInFlightWriteRows; + opts.maxInFlightWritePoints = this.maxInFlightWritePoints; opts.limitedPolicy = this.limitedPolicy; opts.defaultStreamMaxWritePointsPerSecond = this.defaultStreamMaxWritePointsPerSecond; return opts; @@ -124,7 +124,7 @@ public String toString() { ", routerClient=" + routerClient + // ", asyncPool=" + asyncPool + // ", maxRetries=" + maxRetries + // - ", maxInFlightWriteRows=" + maxInFlightWriteRows + // + ", maxInFlightWritePoints=" + maxInFlightWritePoints + // ", limitedPolicy=" + limitedPolicy + // ", defaultStreamMaxWritePointsPerSecond=" + defaultStreamMaxWritePointsPerSecond + // '}'; diff --git a/ingester-protocol/src/test/java/io/greptime/options/GreptimeOptionsTest.java b/ingester-protocol/src/test/java/io/greptime/options/GreptimeOptionsTest.java index 1b182b8..6ecf8dc 100644 --- a/ingester-protocol/src/test/java/io/greptime/options/GreptimeOptionsTest.java +++ b/ingester-protocol/src/test/java/io/greptime/options/GreptimeOptionsTest.java @@ -38,7 +38,7 @@ public void testAllOptions() { Executor asyncPool = command -> System.out.println("asyncPool"); RpcOptions rpcOptions = RpcOptions.newDefault(); int writeMaxRetries = 2; - int maxInFlightWriteRows = 999; + int maxInFlightWritePoints = 9990; LimitedPolicy limitedPolicy = new LimitedPolicy.DiscardPolicy(); int defaultStreamMaxWritePointsPerSecond = 100000; long routeTableRefreshPeriodSeconds = 99; @@ -49,7 +49,7 @@ public void testAllOptions() { .asyncPool(asyncPool) // .rpcOptions(rpcOptions) // .writeMaxRetries(writeMaxRetries) // - .maxInFlightWriteRows(maxInFlightWriteRows) // + .maxInFlightWritePoints(maxInFlightWritePoints) // .writeLimitedPolicy(limitedPolicy) // .defaultStreamMaxWritePointsPerSecond(defaultStreamMaxWritePointsPerSecond) // .routeTableRefreshPeriodSeconds(routeTableRefreshPeriodSeconds) // @@ -71,7 +71,7 @@ public void testAllOptions() { Assert.assertNotNull(writeOptions); Assert.assertEquals(asyncPool, writeOptions.getAsyncPool()); Assert.assertEquals(writeMaxRetries, writeOptions.getMaxRetries()); - Assert.assertEquals(maxInFlightWriteRows, writeOptions.getMaxInFlightWriteRows()); + Assert.assertEquals(maxInFlightWritePoints, writeOptions.getMaxInFlightWritePoints()); Assert.assertEquals(limitedPolicy, writeOptions.getLimitedPolicy()); Assert.assertEquals(defaultStreamMaxWritePointsPerSecond, writeOptions.getDefaultStreamMaxWritePointsPerSecond()); Assert.assertEquals(authInfo, writeOptions.getAuthInfo());