Skip to content

Commit

Permalink
Add per partition rate limit table options
Browse files Browse the repository at this point in the history
Added `maxReadsPerSecond` and `maxWritesPerSecond` options to `TableOptions`
which will allow convenient creation of CREATE and ALTER statements with
rate limit options for read and write operations.
Both maxReadsPerSecond and maxWritesPerSecond are optional - omitting one of
them means "no limit" for that type of operation.

Fixes #166
  • Loading branch information
Gor027 authored and avelanarius committed Jan 5, 2024
1 parent 143ae5f commit 4cabc5f
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ public abstract class TableOptions<T extends TableOptions> extends SchemaStateme

private Optional<Boolean> replicateOnWrite = Optional.absent();

private Optional<Integer> maxReadsPerSecond = Optional.absent();

private Optional<Integer> maxWritesPerSecond = Optional.absent();

private Optional<SpeculativeRetryValue> speculativeRetry = Optional.absent();

private Optional<Boolean> cdc = Optional.absent();
Expand Down Expand Up @@ -348,6 +352,36 @@ public T replicateOnWrite(Boolean replicateOnWrite) {
return self;
}

/**
* Sets rate limit for read operations in table option "per_partition_rate_limit". NOTE: Due to
* ScyllaDB’s distributed nature, tracking per-partition request rates is not perfect and the
* actual rate of accepted requests may be higher up to a factor of keyspace’s RF. This feature
* should not be used to enforce precise limits but rather serve as an overload protection
* feature.
*
* @param maxReadsPerSecond rate limit for read operations
* @return this {@code TableOptions} object.
*/
public T maxReadsPerSecond(int maxReadsPerSecond) {
this.maxReadsPerSecond = Optional.of(maxReadsPerSecond);
return self;
}

/**
* Sets rate limit for write operations in table option "per_partition_rate_limit". NOTE: Due to
* ScyllaDB’s distributed nature, tracking per-partition request rates is not perfect and the
* actual rate of accepted requests may be higher up to a factor of keyspace’s RF. This feature
* should not be used to enforce precise limits but rather serve as an overload protection
* feature.
*
* @param maxWritesPerSecond rate limit for write operations
* @return this {@code TableOptions} object.
*/
public T maxWritesPerSecond(int maxWritesPerSecond) {
this.maxWritesPerSecond = Optional.of(maxWritesPerSecond);
return self;
}

/**
* To override normal read timeout when read_repair_chance is not 1.0, sending another request to
* read, choose one of these values and use the property to create or alter the table:
Expand Down Expand Up @@ -518,6 +552,25 @@ private List<String> buildCommonOptions() {
options.add("replicate_on_write = " + replicateOnWrite.get());
}

if (maxReadsPerSecond.isPresent() || maxWritesPerSecond.isPresent()) {
StringBuilder sBuilder = new StringBuilder("per_partition_rate_limit = {");

if (maxReadsPerSecond.isPresent()) {
sBuilder.append("'max_reads_per_second': ").append(maxReadsPerSecond.get());

if (maxWritesPerSecond.isPresent()) {
sBuilder.append(", ");
}
}

if (maxWritesPerSecond.isPresent()) {
sBuilder.append("'max_writes_per_second': ").append(maxWritesPerSecond.get());
}

sBuilder.append("}");
options.add(sBuilder.toString());
}

if (speculativeRetry.isPresent()) {
options.add("speculative_retry = " + speculativeRetry.get().value());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ public void should_alter_table_options() throws Exception {
.memtableFlushPeriodInMillis(12)
.populateIOCacheOnFlush(true)
.replicateOnWrite(true)
.maxReadsPerSecond(123)
.maxWritesPerSecond(456)
.readRepairChance(0.42)
.speculativeRetry(always())
.cdc(true);
Expand All @@ -147,6 +149,7 @@ public void should_alter_table_options() throws Exception {
+ "AND populate_io_cache_on_flush = true "
+ "AND read_repair_chance = 0.42 "
+ "AND replicate_on_write = true "
+ "AND per_partition_rate_limit = {'max_reads_per_second': 123, 'max_writes_per_second': 456} "
+ "AND speculative_retry = 'ALWAYS' "
+ "AND cdc = true");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,30 @@ public void should_create_table_with_compact_storage() throws Exception {
+ "WITH COMPACT STORAGE");
}

@Test(groups = "unit")
public void should_omit_write_rate_limit_option() throws Exception {
// When
SchemaStatement statement =
createTable("test")
.addPartitionKey("id", DataType.bigint())
.addClusteringColumn("col1", DataType.uuid())
.addClusteringColumn("col2", DataType.uuid())
.addColumn("name", DataType.text())
.withOptions()
.maxReadsPerSecond(123);

// Then
assertThat(statement.getQueryString())
.isEqualTo(
"\n\tCREATE TABLE test(\n\t\t"
+ "id bigint,\n\t\t"
+ "col1 uuid,\n\t\t"
+ "col2 uuid,\n\t\t"
+ "name text,\n\t\t"
+ "PRIMARY KEY(id, col1, col2))\n\t"
+ "WITH per_partition_rate_limit = {'max_reads_per_second': 123}");
}

@Test(groups = "unit")
public void should_create_table_with_all_options() throws Exception {
// When
Expand Down Expand Up @@ -378,6 +402,8 @@ public void should_create_table_with_all_options() throws Exception {
.populateIOCacheOnFlush(true)
.readRepairChance(0.05)
.replicateOnWrite(true)
.maxReadsPerSecond(123)
.maxWritesPerSecond(456)
.speculativeRetry(always())
.cdc(true);

Expand All @@ -403,6 +429,7 @@ public void should_create_table_with_all_options() throws Exception {
+ "AND populate_io_cache_on_flush = true "
+ "AND read_repair_chance = 0.05 "
+ "AND replicate_on_write = true "
+ "AND per_partition_rate_limit = {'max_reads_per_second': 123, 'max_writes_per_second': 456} "
+ "AND speculative_retry = 'ALWAYS' "
+ "AND cdc = true AND CLUSTERING ORDER BY(col1 ASC, col2 DESC) AND COMPACT STORAGE");
}
Expand Down

0 comments on commit 4cabc5f

Please sign in to comment.