Skip to content

Commit

Permalink
[COMMON] upgrade all dependencies
Browse files Browse the repository at this point in the history
  • Loading branch information
chia7712 committed Mar 14, 2024
1 parent 5d9522a commit aa2d79e
Show file tree
Hide file tree
Showing 31 changed files with 157 additions and 126 deletions.
4 changes: 2 additions & 2 deletions app/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ application {
}

java {
sourceCompatibility = 17
targetCompatibility = 17
sourceCompatibility = 21
targetCompatibility = 21
}

ext {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,7 @@ Producer<byte[], byte[]> createProducer() {
+ "partition level. This argument can't be use in conjunction with `specify.brokers`, `topics` or `partitioner`.",
converter = TopicPartitionField.class)
List<TopicPartition> specifyPartitions = List.of();

/**
* @return a supplier that randomly return a sending target
*/
Expand Down Expand Up @@ -449,6 +450,7 @@ else if (specifiedByBroker) {
return () -> selection.get(ThreadLocalRandom.current().nextInt(selection.size()));
}
}

// replace DataSize by DataRate (see https://github.com/skiptests/astraea/issues/488)
@Parameter(
names = {"--throughput"},
Expand Down
1 change: 1 addition & 0 deletions app/src/main/java/org/astraea/app/performance/Report.java
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ public String clientId() {
* @return Get the number of records.
*/
long records();

/**
* @return Get the maximum of latency put.
*/
Expand Down
26 changes: 14 additions & 12 deletions app/src/main/java/org/astraea/app/web/BalancerHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -149,18 +149,20 @@ private PlanExecutionProgress progress(String taskId) {
(phase) ->
switch (phase) {
case Searching, Searched, Executing, Executed ->
// No error message during the search & execution
null;
case SearchFailed -> planGenerations
.get(taskId)
.handle((plan, err) -> err != null ? err.toString() : null)
.toCompletableFuture()
.getNow(null);
case ExecutionFailed -> planExecutions
.get(taskId)
.handle((ignore, err) -> err != null ? err.toString() : null)
.toCompletableFuture()
.getNow(null);
// No error message during the search & execution
null;
case SearchFailed ->
planGenerations
.get(taskId)
.handle((plan, err) -> err != null ? err.toString() : null)
.toCompletableFuture()
.getNow(null);
case ExecutionFailed ->
planExecutions
.get(taskId)
.handle((ignore, err) -> err != null ? err.toString() : null)
.toCompletableFuture()
.getNow(null);
default -> throw new IllegalStateException("Unknown state: " + phase);
};
var changes =
Expand Down
24 changes: 13 additions & 11 deletions app/src/main/java/org/astraea/app/web/WebService.java
Original file line number Diff line number Diff line change
Expand Up @@ -84,17 +84,19 @@ public WebService(
yield List.of(
MetricStore.Receiver.local(() -> admin.brokers().thenApply(asBeanClientMap)));
}
case METRIC_STORE_TOPIC -> List.of(
MetricStore.Receiver.topic(config.requireString(BOOTSTRAP_SERVERS_KEY)),
MetricStore.Receiver.local(
() -> CompletableFuture.completedStage(Map.of(-1, JndiClient.local()))));
default -> throw new IllegalArgumentException(
"unknown metric store type: "
+ config.string(METRIC_STORE_KEY)
+ ". use "
+ METRIC_STORE_LOCAL
+ " or "
+ METRIC_STORE_TOPIC);
case METRIC_STORE_TOPIC ->
List.of(
MetricStore.Receiver.topic(config.requireString(BOOTSTRAP_SERVERS_KEY)),
MetricStore.Receiver.local(
() -> CompletableFuture.completedStage(Map.of(-1, JndiClient.local()))));
default ->
throw new IllegalArgumentException(
"unknown metric store type: "
+ config.string(METRIC_STORE_KEY)
+ ". use "
+ METRIC_STORE_LOCAL
+ " or "
+ METRIC_STORE_TOPIC);
};
var metricStore =
MetricStore.builder()
Expand Down
14 changes: 8 additions & 6 deletions app/src/test/java/org/astraea/app/web/BalancerHandlerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -321,12 +321,14 @@ void testMoveCost(String leaderLimit, String sizeLimit) {
report.migrationCosts.forEach(
migrationCost -> {
switch (migrationCost.name) {
case TO_SYNC_BYTES, TO_FETCH_BYTES -> Assertions.assertTrue(
migrationCost.brokerCosts.values().stream().mapToLong(Long::intValue).sum()
<= DataSize.of(sizeLimit).bytes());
case REPLICA_LEADERS_TO_ADDED, REPLICA_LEADERS_TO_REMOVE -> Assertions.assertTrue(
migrationCost.brokerCosts.values().stream().mapToLong(Long::intValue).sum()
<= Long.parseLong(leaderLimit));
case TO_SYNC_BYTES, TO_FETCH_BYTES ->
Assertions.assertTrue(
migrationCost.brokerCosts.values().stream().mapToLong(Long::intValue).sum()
<= DataSize.of(sizeLimit).bytes());
case REPLICA_LEADERS_TO_ADDED, REPLICA_LEADERS_TO_REMOVE ->
Assertions.assertTrue(
migrationCost.brokerCosts.values().stream().mapToLong(Long::intValue).sum()
<= Long.parseLong(leaderLimit));
}
});
}
Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/
plugins {
id "com.diffplug.spotless" version "6.18.0"
id "com.diffplug.spotless" version "6.25.0"
}

repositories {
Expand Down
6 changes: 3 additions & 3 deletions common/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
plugins {
id 'java'
id 'maven-publish'
id "com.google.protobuf" version "0.9.2"
id "com.google.protobuf" version "0.9.4"
}

apply from: "$rootDir/gradle/dependencies.gradle"
Expand Down Expand Up @@ -48,8 +48,8 @@ dependencies {
}

java {
sourceCompatibility = 17
targetCompatibility = 17
sourceCompatibility = 21
targetCompatibility = 21
}

ext {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ public final class BrokerConfigs {
public static final Set<String> DYNAMICAL_CONFIGS =
new TreeSet<>(
Utils.constants(BrokerConfigs.class, name -> name.endsWith("CONFIG"), String.class));

// ---------------------------------[values]---------------------------------//

private BrokerConfigs() {}
Expand Down
1 change: 1 addition & 0 deletions common/src/main/java/org/astraea/common/admin/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
public record Config(Map<String, String> raw) {

public static final Config EMPTY = new Config(Map.of());

/**
* @param key config key
* @return the value associated to input key. otherwise, empty
Expand Down
27 changes: 15 additions & 12 deletions common/src/main/java/org/astraea/common/assignor/Assignor.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public abstract class Assignor implements ConsumerPartitionAssignor, Configurabl
*/
protected abstract Map<String, List<TopicPartition>> assign(
Map<String, SubscriptionInfo> subscriptions, ClusterInfo clusterInfo);

// TODO: replace the topicPartitions by ClusterInfo after Assignor is able to handle Admin
// https://github.com/skiptests/astraea/issues/1409

Expand Down Expand Up @@ -106,11 +107,12 @@ private void establishResource() {
switch (config
.string(ConsumerConfigs.METRIC_STORE_KEY)
.orElse(ConsumerConfigs.METRIC_STORE_LOCAL)) {
case ConsumerConfigs.METRIC_STORE_TOPIC -> List.of(
MetricStore.Receiver.topic(
config.requireString(ProducerConfigs.BOOTSTRAP_SERVERS_CONFIG)),
MetricStore.Receiver.local(
() -> CompletableFuture.completedStage(Map.of(-1, JndiClient.local()))));
case ConsumerConfigs.METRIC_STORE_TOPIC ->
List.of(
MetricStore.Receiver.topic(
config.requireString(ProducerConfigs.BOOTSTRAP_SERVERS_CONFIG)),
MetricStore.Receiver.local(
() -> CompletableFuture.completedStage(Map.of(-1, JndiClient.local()))));
case ConsumerConfigs.METRIC_STORE_LOCAL -> {
Supplier<CompletionStage<Map<Integer, MBeanClient>>> clientSupplier =
() ->
Expand All @@ -130,13 +132,14 @@ private void establishResource() {
});
yield List.of(MetricStore.Receiver.local(clientSupplier));
}
default -> throw new IllegalArgumentException(
"unknown metric store type: "
+ config.string(ConsumerConfigs.METRIC_STORE_KEY)
+ ". Use "
+ ConsumerConfigs.METRIC_STORE_TOPIC
+ " or "
+ ConsumerConfigs.METRIC_STORE_LOCAL);
default ->
throw new IllegalArgumentException(
"unknown metric store type: "
+ config.string(ConsumerConfigs.METRIC_STORE_KEY)
+ ". Use "
+ ConsumerConfigs.METRIC_STORE_TOPIC
+ " or "
+ ConsumerConfigs.METRIC_STORE_LOCAL);
};
metricStore =
MetricStore.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,11 +134,14 @@ public void close() {
this.compressionType = configuration.string("compression.type").orElse("none");

switch (this.compressionType) {
case "gzip" -> Utils.packException(
() -> this.targetOutputStream = new GZIPOutputStream(outputStream));
case "gzip" ->
Utils.packException(
() -> this.targetOutputStream = new GZIPOutputStream(outputStream));
case "none" -> this.targetOutputStream = outputStream;
default -> throw new IllegalArgumentException(
String.format("compression type '%s' is not supported", this.compressionType));
default ->
throw new IllegalArgumentException(
String.format(
"compression type '%s' is not supported", this.compressionType));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,9 @@ public static Map<Integer, BalancingModes> balancingMode(ClusterInfo cluster, St
case "balancing" -> BalancingModes.BALANCING;
case "clear" -> BalancingModes.CLEAR;
case "excluded" -> BalancingModes.EXCLUDED;
default -> throw new IllegalArgumentException(
"Unsupported balancing mode: " + s[1]);
default ->
throw new IllegalArgumentException(
"Unsupported balancing mode: " + s[1]);
}));

Function<Integer, BalancingModes> mode =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ static TopicsBuilder<byte[], byte[]> forTopics(Set<String> topics) {
static TopicsBuilder<byte[], byte[]> forTopics(Pattern patternTopics) {
return new TopicsBuilder<>(Objects.requireNonNull(patternTopics));
}

/**
* Create a consumer builder by setting specific topic partitions
*
Expand Down
9 changes: 5 additions & 4 deletions common/src/main/java/org/astraea/common/cost/NetworkCost.java
Original file line number Diff line number Diff line change
Expand Up @@ -239,10 +239,11 @@ Map<TopicPartition, Long> estimateRate(
switch (estimationMethod) {
case BROKER_TOPIC_ONE_MINUTE_RATE -> hasRate.oneMinuteRate();
case BROKER_TOPIC_FIVE_MINUTE_RATE -> hasRate.fiveMinuteRate();
case BROKER_TOPIC_FIFTEEN_MINUTE_RATE -> hasRate
.fifteenMinuteRate();
default -> throw new IllegalStateException(
"Unknown estimation method: " + estimationMethod);
case BROKER_TOPIC_FIFTEEN_MINUTE_RATE ->
hasRate.fifteenMinuteRate();
default ->
throw new IllegalStateException(
"Unknown estimation method: " + estimationMethod);
})
// no load metric for this partition, treat as zero load
.orElse(0.0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,18 +137,22 @@ static Map<String, Double> weight(
m -> m.metricName,
m ->
switch (m) {
case inputThroughput -> brokerMetrics.values().stream()
.map(metrics -> metrics.inputScore)
.toList();
case outputThroughput -> brokerMetrics.values().stream()
.map(metrics -> metrics.outputScore)
.toList();
case memory -> brokerMetrics.values().stream()
.map(metrics -> metrics.memoryScore)
.toList();
case cpu -> brokerMetrics.values().stream()
.map(metrics -> metrics.cpuScore)
.toList();
case inputThroughput ->
brokerMetrics.values().stream()
.map(metrics -> metrics.inputScore)
.toList();
case outputThroughput ->
brokerMetrics.values().stream()
.map(metrics -> metrics.outputScore)
.toList();
case memory ->
brokerMetrics.values().stream()
.map(metrics -> metrics.memoryScore)
.toList();
case cpu ->
brokerMetrics.values().stream()
.map(metrics -> metrics.cpuScore)
.toList();
}));
return weightProvider.weight(values);
}
Expand Down
1 change: 1 addition & 0 deletions common/src/main/java/org/astraea/common/csv/CsvReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ static CsvReaderBuilder builder(Reader source) {
* @param num The number of lines to skip
*/
void skip(int num);

/**
* Get the next line without checking.
*
Expand Down
1 change: 1 addition & 0 deletions common/src/main/java/org/astraea/common/csv/CsvWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public interface CsvWriter extends AutoCloseable {
static CsvWriterBuilder builder(Writer sink) {
return new CsvWriterBuilder(sink);
}

/**
* Writes the next line to the file.Empty fields cannot be written and the lengths of the strings
* written should be equal.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public static List<AppInfo> appInfo(MBeanClient client) {
.map(b -> (AppInfo) () -> b)
.collect(Collectors.toList());
}

/**
* collect HasNodeMetrics from all consumers.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ static MetricSensor of(
})
.toList();
}

/**
* generate the metrics to stored by metrics collector. The implementation can use MBeanClient to
* fetch metrics from remote/local mbean server. Or the implementation can generate custom metrics
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ public static void beginInterdependent(org.astraea.common.producer.Producer<?, ?
public static void endInterdependent(org.apache.kafka.clients.producer.Producer<?, ?> producer) {
THREAD_LOCAL.remove();
}

/**
* Close interdependence function.Send data using the original partitioner logic.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ private int tryUpdateMinPartition(
.computeIfAbsent(brokerId, (id) -> partition.apply(topic, id));
}
}

/**
* The value of cost returned from cost function is conflict to score, since the higher cost
* represents lower score. This helper reverses the cost by subtracting the cost from "max cost".
Expand Down Expand Up @@ -232,11 +233,12 @@ public void configure(Configuration config) {

List<MetricStore.Receiver> receivers =
switch (config.string(METRIC_STORE_KEY).orElse(METRIC_STORE_LOCAL)) {
case METRIC_STORE_TOPIC -> List.of(
MetricStore.Receiver.topic(
config.requireString(ProducerConfigs.BOOTSTRAP_SERVERS_CONFIG)),
MetricStore.Receiver.local(
() -> CompletableFuture.completedStage(Map.of(-1, JndiClient.local()))));
case METRIC_STORE_TOPIC ->
List.of(
MetricStore.Receiver.topic(
config.requireString(ProducerConfigs.BOOTSTRAP_SERVERS_CONFIG)),
MetricStore.Receiver.local(
() -> CompletableFuture.completedStage(Map.of(-1, JndiClient.local()))));
case METRIC_STORE_LOCAL -> {
Supplier<CompletionStage<Map<Integer, MBeanClient>>> clientSupplier =
() ->
Expand All @@ -256,13 +258,14 @@ public void configure(Configuration config) {
});
yield List.of(MetricStore.Receiver.local(clientSupplier));
}
default -> throw new IllegalArgumentException(
"unknown metric store type: "
+ config.string(METRIC_STORE_KEY)
+ ". Use "
+ METRIC_STORE_TOPIC
+ " or "
+ METRIC_STORE_LOCAL);
default ->
throw new IllegalArgumentException(
"unknown metric store type: "
+ config.string(METRIC_STORE_KEY)
+ ". Use "
+ METRIC_STORE_TOPIC
+ " or "
+ METRIC_STORE_LOCAL);
};
metricStore =
MetricStore.builder()
Expand Down
Loading

0 comments on commit aa2d79e

Please sign in to comment.