diff --git a/app/src/main/java/org/astraea/app/performance/DataGenerator.java b/app/src/main/java/org/astraea/app/performance/DataGenerator.java index 1e28b650e2..e5afa45d22 100644 --- a/app/src/main/java/org/astraea/app/performance/DataGenerator.java +++ b/app/src/main/java/org/astraea/app/performance/DataGenerator.java @@ -41,9 +41,9 @@ static DataGenerator of( Performance.Argument argument) { if (queues.size() == 0) return terminatedGenerator(); - var keyDistConfig = Configuration.of(argument.keyDistributionConfig); - var keySizeDistConfig = Configuration.of(argument.keySizeDistributionConfig); - var valueDistConfig = Configuration.of(argument.valueDistributionConfig); + var keyDistConfig = new Configuration(argument.keyDistributionConfig); + var keySizeDistConfig = new Configuration(argument.keySizeDistributionConfig); + var valueDistConfig = new Configuration(argument.valueDistributionConfig); var dataSupplier = RecordGenerator.builder() .batchSize(argument.transactionSize) diff --git a/app/src/main/java/org/astraea/app/web/BalancerHandler.java b/app/src/main/java/org/astraea/app/web/BalancerHandler.java index ae12056b8e..5e0c6038da 100644 --- a/app/src/main/java/org/astraea/app/web/BalancerHandler.java +++ b/app/src/main/java/org/astraea/app/web/BalancerHandler.java @@ -113,7 +113,7 @@ public CompletionStage put(Channel channel) { final var request = channel.request(TypeRef.of(BalancerPutRequest.class)); final var taskId = request.id; final var taskPhase = balancerConsole.taskPhase(taskId); - final var executorConfig = Configuration.of(request.executorConfig); + final var executorConfig = new Configuration(request.executorConfig); final var executor = Utils.construct(request.executor, RebalancePlanExecutor.class, executorConfig); @@ -216,7 +216,7 @@ static PostRequestWrapper parsePostRequestWrapper( return new PostRequestWrapper( balancerPostRequest.balancer, - Configuration.of(balancerPostRequest.balancerConfig), + new Configuration(balancerPostRequest.balancerConfig), balancerPostRequest.parse(), currentClusterInfo); } diff --git a/common/src/main/java/org/astraea/common/Configuration.java b/common/src/main/java/org/astraea/common/Configuration.java index 1dc62648ab..d1e27ff877 100644 --- a/common/src/main/java/org/astraea/common/Configuration.java +++ b/common/src/main/java/org/astraea/common/Configuration.java @@ -18,50 +18,31 @@ import java.time.Duration; import java.util.Arrays; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; +import java.util.Objects; import java.util.Optional; import java.util.regex.Pattern; import java.util.stream.Collectors; -public interface Configuration { +public record Configuration(Map raw) { - Configuration EMPTY = Configuration.of(Map.of()); - - static Configuration of(Map configs) { - return new Configuration() { - @Override - public Map raw() { - return Collections.unmodifiableMap(configs); - } - - @Override - public Optional string(String key) { - return Optional.ofNullable(configs.get(key)).map(Object::toString); - } - - @Override - public List list(String key, String separator) { - return string(key).map(s -> Arrays.asList(s.split(separator))).orElseGet(List::of); - } - }; - } - - Map raw(); + public static final Configuration EMPTY = new Configuration(Map.of()); /** * @param key the key whose associated value is to be returned * @return string value. never null */ - Optional string(String key); + public Optional string(String key) { + return Optional.ofNullable(raw.get(Objects.requireNonNull(key))); + } /** * @param key the key whose associated value is to be returned * @return optional {@link Pattern} compiled from the string associated with the key. never null */ - default Optional regexString(String key) { + public Optional regexString(String key) { return string(key).map(Pattern::compile); } @@ -69,11 +50,11 @@ default Optional regexString(String key) { * @param key the key whose associated value is to be returned * @return integer value. never null */ - default Optional integer(String key) { + public Optional integer(String key) { return string(key).map(Integer::parseInt); } - default Optional longInteger(String key) { + public Optional longInteger(String key) { return string(key).map(Long::parseLong); } @@ -81,7 +62,7 @@ default Optional longInteger(String key) { * @param key the key whose associated value is to be returned * @return duration value. If there is no key, return Optional.Empty */ - default Optional duration(String key) { + public Optional duration(String key) { return string(key).map(Utils::toDuration); } @@ -89,11 +70,11 @@ default Optional duration(String key) { * @param key the key whose associated value is to be returned * @return DataSize value. If there is no key, return Optional.Empty */ - default Optional dataSize(String key) { + public Optional dataSize(String key) { return string(key).map(DataSize::of); } - default int requireInteger(String key) { + public int requireInteger(String key) { return integer(key).orElseThrow(() -> new NoSuchElementException(key + " is nonexistent")); } @@ -101,7 +82,7 @@ default int requireInteger(String key) { * @param key the key whose associated value is to be returned * @return string value. never null */ - default String requireString(String key) { + public String requireString(String key) { return string(key).orElseThrow(() -> new NoSuchElementException(key + " is nonexistent")); } @@ -110,8 +91,8 @@ default String requireString(String key) { * @return new Configuration only contains which the key value starts with the prefix, and the * prefix string and the following dot will be removed from the key */ - default Configuration filteredPrefixConfigs(String prefix) { - return of( + public Configuration filteredPrefixConfigs(String prefix) { + return new Configuration( raw().entrySet().stream() .filter(k -> k.getKey().startsWith(prefix)) .collect( @@ -124,5 +105,7 @@ default Configuration filteredPrefixConfigs(String prefix) { * @param separator to split string to multiple strings * @return string list. never null */ - List list(String key, String separator); + public List list(String key, String separator) { + return string(key).map(s -> Arrays.asList(s.split(separator))).orElseGet(List::of); + } } diff --git a/common/src/main/java/org/astraea/common/assignor/Assignor.java b/common/src/main/java/org/astraea/common/assignor/Assignor.java index 3cfe207c46..4d75189b22 100644 --- a/common/src/main/java/org/astraea/common/assignor/Assignor.java +++ b/common/src/main/java/org/astraea/common/assignor/Assignor.java @@ -161,7 +161,7 @@ public final GroupAssignment assign(Cluster metadata, GroupSubscription groupSub @Override public final void configure(Map configs) { this.config = - Configuration.of( + new Configuration( configs.entrySet().stream() .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()))); var costFunctions = diff --git a/common/src/main/java/org/astraea/common/balancer/AlgorithmConfig.java b/common/src/main/java/org/astraea/common/balancer/AlgorithmConfig.java index c1ca72f121..00f73572ef 100644 --- a/common/src/main/java/org/astraea/common/balancer/AlgorithmConfig.java +++ b/common/src/main/java/org/astraea/common/balancer/AlgorithmConfig.java @@ -184,7 +184,7 @@ public Builder timeout(Duration timeout) { } public AlgorithmConfig build() { - var config = Configuration.of(balancerConfig); + var config = new Configuration(balancerConfig); return new AlgorithmConfig() { @Override diff --git a/common/src/main/java/org/astraea/common/balancer/BalancerProblemFormat.java b/common/src/main/java/org/astraea/common/balancer/BalancerProblemFormat.java index 7cb4771841..27abca9c9f 100644 --- a/common/src/main/java/org/astraea/common/balancer/BalancerProblemFormat.java +++ b/common/src/main/java/org/astraea/common/balancer/BalancerProblemFormat.java @@ -55,7 +55,7 @@ public AlgorithmConfig parse() { private HasClusterCost clusterCost() { if (clusterCosts.isEmpty()) throw new IllegalArgumentException("clusterCosts is not specified"); - var config = Configuration.of(costConfig); + var config = new Configuration(costConfig); return HasClusterCost.of( Utils.costFunctions( clusterCosts.stream() @@ -65,7 +65,7 @@ private HasClusterCost clusterCost() { } private HasMoveCost moveCost() { - var config = Configuration.of(costConfig); + var config = new Configuration(costConfig); var cf = Utils.costFunctions(moveCosts, HasMoveCost.class, config); return HasMoveCost.of(cf); } diff --git a/common/src/main/java/org/astraea/common/cost/RecordSizeCost.java b/common/src/main/java/org/astraea/common/cost/RecordSizeCost.java index 9cbf965952..6b1b4c6cab 100644 --- a/common/src/main/java/org/astraea/common/cost/RecordSizeCost.java +++ b/common/src/main/java/org/astraea/common/cost/RecordSizeCost.java @@ -33,7 +33,7 @@ public class RecordSizeCost public static final String MAX_MIGRATE_SIZE_KEY = "max.migrated.size"; public RecordSizeCost() { - this.config = Configuration.of(Map.of()); + this.config = new Configuration(Map.of()); } public RecordSizeCost(Configuration config) { diff --git a/common/src/main/java/org/astraea/common/cost/ReplicaLeaderCost.java b/common/src/main/java/org/astraea/common/cost/ReplicaLeaderCost.java index fee24f5c6e..7d4f4346e6 100644 --- a/common/src/main/java/org/astraea/common/cost/ReplicaLeaderCost.java +++ b/common/src/main/java/org/astraea/common/cost/ReplicaLeaderCost.java @@ -34,7 +34,7 @@ public class ReplicaLeaderCost implements HasBrokerCost, HasClusterCost, HasMove public static final String MAX_MIGRATE_LEADER_KEY = "max.migrated.leader.number"; public ReplicaLeaderCost() { - this.config = Configuration.of(Map.of()); + this.config = new Configuration(Map.of()); } public ReplicaLeaderCost(Configuration config) { diff --git a/common/src/main/java/org/astraea/common/cost/ReplicaLeaderSizeCost.java b/common/src/main/java/org/astraea/common/cost/ReplicaLeaderSizeCost.java index 00acb46537..6d9c72451f 100644 --- a/common/src/main/java/org/astraea/common/cost/ReplicaLeaderSizeCost.java +++ b/common/src/main/java/org/astraea/common/cost/ReplicaLeaderSizeCost.java @@ -39,7 +39,7 @@ public class ReplicaLeaderSizeCost public static final String MOVED_LEADER_SIZE = "moved leader size (bytes)"; public ReplicaLeaderSizeCost() { - this.config = Configuration.of(Map.of()); + this.config = new Configuration(Map.of()); } public ReplicaLeaderSizeCost(Configuration config) { diff --git a/common/src/main/java/org/astraea/common/cost/ReplicaNumberCost.java b/common/src/main/java/org/astraea/common/cost/ReplicaNumberCost.java index 2460f17766..7307c7e998 100644 --- a/common/src/main/java/org/astraea/common/cost/ReplicaNumberCost.java +++ b/common/src/main/java/org/astraea/common/cost/ReplicaNumberCost.java @@ -32,7 +32,7 @@ public class ReplicaNumberCost implements HasClusterCost, HasMoveCost { private final Configuration config; public ReplicaNumberCost() { - this.config = Configuration.of(Map.of()); + this.config = new Configuration(Map.of()); } public ReplicaNumberCost(Configuration config) { diff --git a/common/src/main/java/org/astraea/common/partitioner/Partitioner.java b/common/src/main/java/org/astraea/common/partitioner/Partitioner.java index c43d6b5a0d..ab5be8282a 100644 --- a/common/src/main/java/org/astraea/common/partitioner/Partitioner.java +++ b/common/src/main/java/org/astraea/common/partitioner/Partitioner.java @@ -141,7 +141,7 @@ private static class Interdependent { @Override public final void configure(Map configs) { var config = - Configuration.of( + new Configuration( configs.entrySet().stream() .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()))); config.string(ProducerConfigs.BOOTSTRAP_SERVERS_CONFIG).ifPresent(s -> admin = Admin.of(s)); diff --git a/common/src/test/java/org/astraea/common/ConfigurationTest.java b/common/src/test/java/org/astraea/common/ConfigurationTest.java index f4bc920bbe..668668a225 100644 --- a/common/src/test/java/org/astraea/common/ConfigurationTest.java +++ b/common/src/test/java/org/astraea/common/ConfigurationTest.java @@ -26,27 +26,27 @@ public class ConfigurationTest { @Test void testString() { - var config = Configuration.of(Map.of("key", "value")); + var config = new Configuration(Map.of("key", "value")); Assertions.assertEquals(Optional.of("value"), config.string("key")); Assertions.assertEquals("value", config.requireString("key")); } @Test void testList() { - var config = Configuration.of(Map.of("key", "v0,v1")); + var config = new Configuration(Map.of("key", "v0,v1")); Assertions.assertEquals(List.of("v0", "v1"), config.list("key", ",")); Assertions.assertEquals(List.of(), config.list("nonExistKey", ",")); } @Test void testFilteredConfigs() { - var config = Configuration.of(Map.of("key", "v1", "filtered.key", "v2", "key.filtered", "v3")); + var config = new Configuration(Map.of("key", "v1", "filtered.key", "v2", "key.filtered", "v3")); Assertions.assertEquals(Map.of("key", "v2"), config.filteredPrefixConfigs("filtered").raw()); } @Test void testDuration() { - var config = Configuration.of(Map.of("wait.time", "15ms", "response", "3s")); + var config = new Configuration(Map.of("wait.time", "15ms", "response", "3s")); var waitTime = config.duration("wait.time"); var response = config.duration("response"); var empty = config.duration("walala"); @@ -57,13 +57,13 @@ void testDuration() { @Test void testLong() { - var config = Configuration.of(Map.of("long.value", "2147483648")); + var config = new Configuration(Map.of("long.value", "2147483648")); Assertions.assertEquals(2147483648L, config.longInteger("long.value").orElse(0L)); } @Test void testDataSize() { - var config = Configuration.of(Map.of("upper.bound", "30MiB", "traffic.interval", "5MB")); + var config = new Configuration(Map.of("upper.bound", "30MiB", "traffic.interval", "5MB")); var upper = config.dataSize("upper.bound"); var interval = config.dataSize("traffic.interval"); var empty = config.dataSize("kekw"); diff --git a/common/src/test/java/org/astraea/common/DistributionTypeTest.java b/common/src/test/java/org/astraea/common/DistributionTypeTest.java index fd55fda25c..a7c18b5472 100644 --- a/common/src/test/java/org/astraea/common/DistributionTypeTest.java +++ b/common/src/test/java/org/astraea/common/DistributionTypeTest.java @@ -88,7 +88,7 @@ void testZipfianConfig() { // fixed seed var zip100 = DistributionType.ZIPFIAN.create( - 10000, Configuration.of(Map.of(DistributionType.ZIPFIAN_SEED, "100"))); + 10000, new Configuration(Map.of(DistributionType.ZIPFIAN_SEED, "100"))); Assertions.assertEquals( List.of(11, 18, 0, 1126, 12), IntStream.range(0, 5) @@ -101,10 +101,12 @@ void testZipfianConfig() { var seed = ThreadLocalRandom.current().nextInt(); var zipA = DistributionType.ZIPFIAN.create( - 10000, Configuration.of(Map.of(DistributionType.ZIPFIAN_SEED, Integer.toString(seed)))); + 10000, + new Configuration(Map.of(DistributionType.ZIPFIAN_SEED, Integer.toString(seed)))); var zipB = DistributionType.ZIPFIAN.create( - 10000, Configuration.of(Map.of(DistributionType.ZIPFIAN_SEED, Integer.toString(seed)))); + 10000, + new Configuration(Map.of(DistributionType.ZIPFIAN_SEED, Integer.toString(seed)))); var sequenceA = IntStream.range(0, 1000).map(i -> zipA.get().intValue()).toArray(); var sequenceB = IntStream.range(0, 1000).map(i -> zipB.get().intValue()).toArray(); Assertions.assertArrayEquals(sequenceA, sequenceB); @@ -112,10 +114,12 @@ void testZipfianConfig() { // high exponent come with high skewness var zip1 = DistributionType.ZIPFIAN.create( - 100, Configuration.of(Map.of(DistributionType.ZIPFIAN_EXPONENT, Double.toString(1.0)))); + 100, + new Configuration(Map.of(DistributionType.ZIPFIAN_EXPONENT, Double.toString(1.0)))); var zip2 = DistributionType.ZIPFIAN.create( - 100, Configuration.of(Map.of(DistributionType.ZIPFIAN_EXPONENT, Double.toString(2.0)))); + 100, + new Configuration(Map.of(DistributionType.ZIPFIAN_EXPONENT, Double.toString(2.0)))); var counting1 = IntStream.range(0, 10000) .map(x -> zip1.get().intValue()) diff --git a/common/src/test/java/org/astraea/common/UtilsTest.java b/common/src/test/java/org/astraea/common/UtilsTest.java index 437856ece3..dc3ba07d45 100644 --- a/common/src/test/java/org/astraea/common/UtilsTest.java +++ b/common/src/test/java/org/astraea/common/UtilsTest.java @@ -206,7 +206,7 @@ public int value() { @ParameterizedTest @ValueSource(classes = {TestCostFunction.class, TestConfigCostFunction.class}) void testConstruct(Class aClass) { - var config = Configuration.of(Map.of()); + var config = new Configuration(Map.of()); var costFunction = Utils.construct(aClass, config); Assertions.assertInstanceOf(CostFunction.class, costFunction); @@ -226,7 +226,7 @@ void testConstruct(Class aClass) { void testConstructException() { // arrange var aClass = TestBadCostFunction.class; - var config = Configuration.of(Map.of()); + var config = new Configuration(Map.of()); // act, assert Assertions.assertThrows(RuntimeException.class, () -> Utils.construct(aClass, config)); @@ -283,7 +283,7 @@ public TestBadCostFunction(int value) {} @Test void testCostFunctions() { var config = - Configuration.of( + new Configuration( Map.of( "org.astraea.common.cost.BrokerInputCost", "20", @@ -307,7 +307,7 @@ void testCostFunctions() { // test negative weight var config2 = - Configuration.of( + new Configuration( Map.of( "org.astraea.common.cost.BrokerInputCost", "-20", @@ -321,7 +321,7 @@ void testCostFunctions() { var cf = Set.of( "org.astraea.common.cost.RecordSizeCost", "org.astraea.common.cost.ReplicaLeaderCost"); - var mConfig = Configuration.of(Map.of("maxMigratedSize", "50MB", "maxMigratedLeader", "5")); + var mConfig = new Configuration(Map.of("maxMigratedSize", "50MB", "maxMigratedLeader", "5")); var mAns = Utils.costFunctions(cf, HasMoveCost.class, mConfig); Assertions.assertEquals(2, mAns.size()); diff --git a/common/src/test/java/org/astraea/common/balancer/BalancerTest.java b/common/src/test/java/org/astraea/common/balancer/BalancerTest.java index 2956b0f111..5fcb74dc10 100644 --- a/common/src/test/java/org/astraea/common/balancer/BalancerTest.java +++ b/common/src/test/java/org/astraea/common/balancer/BalancerTest.java @@ -168,7 +168,7 @@ public ClusterCost clusterCost(ClusterInfo clusterInfo, ClusterBean clusterBean) .toCompletableFuture() .join(); var newAllocation = - Utils.construct(theClass, Configuration.of(Map.of("iteration", "500"))) + Utils.construct(theClass, new Configuration(Map.of("iteration", "500"))) .offer( AlgorithmConfig.builder() .clusterInfo(clusterInfo) @@ -285,7 +285,7 @@ public ClusterCost clusterCost(ClusterInfo clusterInfo, ClusterBean clusterBean) return () -> 0; } }; - Utils.construct(theClass, Configuration.of(Map.of("iteration", "500"))) + Utils.construct(theClass, new Configuration(Map.of("iteration", "500"))) .offer( AlgorithmConfig.builder() .clusterInfo(ClusterInfo.empty()) diff --git a/common/src/test/java/org/astraea/common/balancer/algorithms/GreedyBalancerTest.java b/common/src/test/java/org/astraea/common/balancer/algorithms/GreedyBalancerTest.java index 3f9b617391..3cc236f67c 100644 --- a/common/src/test/java/org/astraea/common/balancer/algorithms/GreedyBalancerTest.java +++ b/common/src/test/java/org/astraea/common/balancer/algorithms/GreedyBalancerTest.java @@ -58,12 +58,13 @@ void testConfig() { @Test void testJmx() { - var cost = new DecreasingCost(Configuration.of(Map.of())); + var cost = new DecreasingCost(new Configuration(Map.of())); var id = "TestJmx-" + UUID.randomUUID(); var clusterInfo = FakeClusterInfo.of(5, 5, 5, 2); var balancer = Utils.construct( - GreedyBalancer.class, Configuration.of(Map.of(GreedyBalancer.ITERATION_CONFIG, "100"))); + GreedyBalancer.class, + new Configuration(Map.of(GreedyBalancer.ITERATION_CONFIG, "100"))); try (JndiClient client = JndiClient.local()) { IntStream.range(0, 10) diff --git a/common/src/test/java/org/astraea/common/balancer/algorithms/SingleStepBalancerTest.java b/common/src/test/java/org/astraea/common/balancer/algorithms/SingleStepBalancerTest.java index 74e70acd87..29dd3dd1c5 100644 --- a/common/src/test/java/org/astraea/common/balancer/algorithms/SingleStepBalancerTest.java +++ b/common/src/test/java/org/astraea/common/balancer/algorithms/SingleStepBalancerTest.java @@ -28,7 +28,7 @@ class SingleStepBalancerTest extends BalancerConfigTestSuite { public SingleStepBalancerTest() { super( SingleStepBalancer.class, - Configuration.of( + new Configuration( Map.of( "shuffle.tweaker.min.step", "1000", "shuffle.tweaker.max.step", "2000"))); diff --git a/common/src/test/java/org/astraea/common/balancer/executor/StraightPlanExecutorTest.java b/common/src/test/java/org/astraea/common/balancer/executor/StraightPlanExecutorTest.java index 2db29c6073..511eac0e22 100644 --- a/common/src/test/java/org/astraea/common/balancer/executor/StraightPlanExecutorTest.java +++ b/common/src/test/java/org/astraea/common/balancer/executor/StraightPlanExecutorTest.java @@ -114,7 +114,7 @@ void testAsyncRun() { var execute = new StraightPlanExecutor( - Configuration.of( + new Configuration( Map.of(StraightPlanExecutor.CONFIG_ENABLE_DATA_DIRECTORY_MIGRATION, "true"))) .run(admin, expectedAllocation, Duration.ofSeconds(10)); @@ -173,7 +173,7 @@ void testDisableDataDirMigration() { var spiedAdmin = Mockito.spy(admin); var executor = new StraightPlanExecutor( - Configuration.of( + new Configuration( Map.of(StraightPlanExecutor.CONFIG_ENABLE_DATA_DIRECTORY_MIGRATION, "false"))); executor.run(spiedAdmin, target, Duration.ofSeconds(30)).toCompletableFuture().join(); diff --git a/common/src/test/java/org/astraea/common/cost/BrokerDiskSpaceCostTest.java b/common/src/test/java/org/astraea/common/cost/BrokerDiskSpaceCostTest.java index 0e4c998737..f7f7dc9b82 100644 --- a/common/src/test/java/org/astraea/common/cost/BrokerDiskSpaceCostTest.java +++ b/common/src/test/java/org/astraea/common/cost/BrokerDiskSpaceCostTest.java @@ -140,18 +140,18 @@ void testMoveCosts() { var beforeClusterInfo = of(before); var afterClusterInfo = of(after); var brokerConfig = - Configuration.of( + new Configuration( Map.of(BrokerDiskSpaceCost.BROKER_COST_LIMIT_KEY, "0:1500MB,1:1000MB,2:1500MB")); var brokerOverflowConfig = - Configuration.of( + new Configuration( Map.of(BrokerDiskSpaceCost.BROKER_COST_LIMIT_KEY, "0:1300MB,1:1000MB,2:1500MB")); var pathConfig = - Configuration.of( + new Configuration( Map.of( BrokerDiskSpaceCost.BROKER_PATH_COST_LIMIT_KEY, "0-/path0:1500MB,1-/path0:1000MB,2-/path0:1500MB,2-/path1:1000MB")); var pathOverflowConfig = - Configuration.of( + new Configuration( Map.of( BrokerDiskSpaceCost.BROKER_PATH_COST_LIMIT_KEY, "0-/path0:1500MB,1-/path0:1000MB,2-/path0:1500MB,2-/path1:900MB")); diff --git a/common/src/test/java/org/astraea/common/cost/NetworkCostTest.java b/common/src/test/java/org/astraea/common/cost/NetworkCostTest.java index 7e7daaab04..e85a4b0d76 100644 --- a/common/src/test/java/org/astraea/common/cost/NetworkCostTest.java +++ b/common/src/test/java/org/astraea/common/cost/NetworkCostTest.java @@ -536,19 +536,19 @@ void testEstimationMethod(ServerMetrics.Topic metric, Class PartitionerUtils.parseIdJMXPort(config3)); } diff --git a/common/src/test/java/org/astraea/common/partitioner/StrictCostPartitionerPerfTest.java b/common/src/test/java/org/astraea/common/partitioner/StrictCostPartitionerPerfTest.java index b8c10b8d71..1fbaad6424 100644 --- a/common/src/test/java/org/astraea/common/partitioner/StrictCostPartitionerPerfTest.java +++ b/common/src/test/java/org/astraea/common/partitioner/StrictCostPartitionerPerfTest.java @@ -94,7 +94,7 @@ void test() { var value = "value".getBytes(StandardCharsets.UTF_8); try (var partitioner = new StrictCostPartitioner()) { partitioner.admin = admin; - partitioner.configure(Configuration.of(Map.of("round.robin.lease", "2s"))); + partitioner.configure(new Configuration(Map.of("round.robin.lease", "2s"))); Supplier>> resultSupplier = () -> { diff --git a/common/src/test/java/org/astraea/common/partitioner/StrictCostPartitionerTest.java b/common/src/test/java/org/astraea/common/partitioner/StrictCostPartitionerTest.java index 24a7718be2..4912bb8517 100644 --- a/common/src/test/java/org/astraea/common/partitioner/StrictCostPartitionerTest.java +++ b/common/src/test/java/org/astraea/common/partitioner/StrictCostPartitionerTest.java @@ -45,10 +45,10 @@ public class StrictCostPartitionerTest { @Test void testJmxPort() { try (var partitioner = new StrictCostPartitioner()) { - partitioner.configure(Configuration.of(Map.of())); + partitioner.configure(new Configuration(Map.of())); Assertions.assertThrows( NoSuchElementException.class, () -> partitioner.jmxPortGetter.apply(0)); - partitioner.configure(Configuration.of(Map.of(StrictCostPartitioner.JMX_PORT, "12345"))); + partitioner.configure(new Configuration(Map.of(StrictCostPartitioner.JMX_PORT, "12345"))); Assertions.assertEquals(12345, partitioner.jmxPortGetter.apply(0)); } } @@ -60,14 +60,14 @@ void testNegativeWeight() { IllegalArgumentException.class, () -> partitioner.configure( - Configuration.of( + new Configuration( Map.of( Partitioner.COST_PREFIX + "." + ReplicaLeaderCost.class.getName(), "-1")))); // Test for cost functions configuring partitioner.configure( - Configuration.of( + new Configuration( Map.of( Partitioner.COST_PREFIX + "." + ReplicaLeaderCost.class.getName(), "0.1", @@ -83,7 +83,7 @@ void testNegativeWeight() { void testConfigureCostFunctions() { try (var partitioner = new StrictCostPartitioner()) { partitioner.configure( - Configuration.of( + new Configuration( Map.of( Partitioner.COST_PREFIX + "." + ReplicaLeaderCost.class.getName(), "0.1", @@ -149,7 +149,7 @@ void testCostFunctionWithoutSensor() { .buildLeader(); try (var partitioner = new StrictCostPartitioner()) { partitioner.configure( - Configuration.of( + new Configuration( (Map.of(Partitioner.COST_PREFIX + "." + DumbHasBrokerCost.class.getName(), "1")))); partitioner.partition( "topic", @@ -166,7 +166,7 @@ void testEmptyJmxPort() { // pass due to local mbean partitioner.configure( - Configuration.of( + new Configuration( Map.of(Partitioner.COST_PREFIX + "." + NodeThroughputCost.class.getName(), "1"))); } } @@ -184,7 +184,7 @@ void testReturnedPartition() { var partitionId = 123; try (var partitioner = new StrictCostPartitioner()) { partitioner.configure( - Configuration.of( + new Configuration( Map.of(Partitioner.COST_PREFIX + "." + MyFunction.class.getName(), "1"))); var replicaInfo0 = @@ -214,7 +214,7 @@ void testReturnedPartition() { @Test void testDefaultFunction() { try (var partitioner = new StrictCostPartitioner()) { - partitioner.configure(Configuration.of(Map.of())); + partitioner.configure(new Configuration(Map.of())); Assertions.assertNotEquals(HasBrokerCost.EMPTY, partitioner.costFunction); Utils.waitFor(() -> partitioner.metricStore.sensors().size() == 1); } @@ -244,7 +244,7 @@ void testInvalidCostToScore() { void testRoundRobinLease() { try (var partitioner = new StrictCostPartitioner()) { partitioner.configure( - Configuration.of(Map.of(StrictCostPartitioner.ROUND_ROBIN_LEASE_KEY, "2s"))); + new Configuration(Map.of(StrictCostPartitioner.ROUND_ROBIN_LEASE_KEY, "2s"))); Assertions.assertEquals(Duration.ofSeconds(2), partitioner.roundRobinKeeper.roundRobinLease); partitioner.roundRobinKeeper.tryToUpdate(ClusterInfo.empty(), Map::of); diff --git a/connector/src/main/java/org/astraea/connector/SinkConnector.java b/connector/src/main/java/org/astraea/connector/SinkConnector.java index 15b76dbe9d..662902de45 100644 --- a/connector/src/main/java/org/astraea/connector/SinkConnector.java +++ b/connector/src/main/java/org/astraea/connector/SinkConnector.java @@ -42,7 +42,7 @@ protected void close() { // -------------------------[final]-------------------------// @Override public final void start(Map props) { - init(Configuration.of(props)); + init(new Configuration(props)); } @Override diff --git a/connector/src/main/java/org/astraea/connector/SinkTask.java b/connector/src/main/java/org/astraea/connector/SinkTask.java index 711d7d9a1c..6f9533aed0 100644 --- a/connector/src/main/java/org/astraea/connector/SinkTask.java +++ b/connector/src/main/java/org/astraea/connector/SinkTask.java @@ -50,7 +50,7 @@ public final String version() { @Override public final void start(Map props) { - init(Configuration.of(props)); + init(new Configuration(props)); } @Override diff --git a/connector/src/main/java/org/astraea/connector/SourceConnector.java b/connector/src/main/java/org/astraea/connector/SourceConnector.java index e2cd3cc9df..89ac37531d 100644 --- a/connector/src/main/java/org/astraea/connector/SourceConnector.java +++ b/connector/src/main/java/org/astraea/connector/SourceConnector.java @@ -40,7 +40,7 @@ protected void close() { // -------------------------[final]-------------------------// @Override public final void start(Map props) { - init(Configuration.of(props), MetadataStorage.of(context().offsetStorageReader())); + init(new Configuration(props), MetadataStorage.of(context().offsetStorageReader())); } @Override diff --git a/connector/src/main/java/org/astraea/connector/SourceTask.java b/connector/src/main/java/org/astraea/connector/SourceTask.java index f82316cb68..99ef9a8cb5 100644 --- a/connector/src/main/java/org/astraea/connector/SourceTask.java +++ b/connector/src/main/java/org/astraea/connector/SourceTask.java @@ -52,7 +52,7 @@ public final String version() { @Override public final void start(Map props) { - init(Configuration.of(props), MetadataStorage.of(context.offsetStorageReader())); + init(new Configuration(props), MetadataStorage.of(context.offsetStorageReader())); } @Override diff --git a/connector/src/main/java/org/astraea/connector/backup/Importer.java b/connector/src/main/java/org/astraea/connector/backup/Importer.java index 7f3741655a..880f50e71f 100644 --- a/connector/src/main/java/org/astraea/connector/backup/Importer.java +++ b/connector/src/main/java/org/astraea/connector/backup/Importer.java @@ -115,7 +115,7 @@ protected List takeConfiguration(int maxTasks) { var taskMap = new HashMap<>(config.raw()); taskMap.put(FILE_SET_KEY, String.valueOf(i)); taskMap.put(TASKS_COUNT_KEY, String.valueOf(maxTasks)); - return Configuration.of(taskMap); + return new Configuration(taskMap); }) .toList(); } diff --git a/connector/src/main/java/org/astraea/connector/perf/PerfSource.java b/connector/src/main/java/org/astraea/connector/perf/PerfSource.java index 0a5ff86f00..c3063b239f 100644 --- a/connector/src/main/java/org/astraea/connector/perf/PerfSource.java +++ b/connector/src/main/java/org/astraea/connector/perf/PerfSource.java @@ -164,7 +164,7 @@ protected List takeConfiguration(int maxTasks) { t -> { var copy = new HashMap<>(config.raw()); copy.put(SourceConnector.TOPICS_KEY, t); - return Configuration.of(copy); + return new Configuration(copy); }) .collect(Collectors.toUnmodifiableList()); return Utils.chunk(topics, maxTasks).stream() @@ -172,7 +172,7 @@ protected List takeConfiguration(int maxTasks) { tps -> { var copy = new HashMap<>(config.raw()); copy.put(SourceConnector.TOPICS_KEY, String.join(",", tps)); - return Configuration.of(copy); + return new Configuration(copy); }) .collect(Collectors.toUnmodifiableList()); } diff --git a/connector/src/test/java/org/astraea/connector/ConnectorTest.java b/connector/src/test/java/org/astraea/connector/ConnectorTest.java index 9845c58088..7e134bd8fb 100644 --- a/connector/src/test/java/org/astraea/connector/ConnectorTest.java +++ b/connector/src/test/java/org/astraea/connector/ConnectorTest.java @@ -126,7 +126,7 @@ protected Class task() { @Override protected List takeConfiguration(int maxTasks) { return IntStream.range(0, maxTasks) - .mapToObj(i -> Configuration.of(Map.of())) + .mapToObj(i -> new Configuration(Map.of())) .collect(Collectors.toList()); } @@ -166,7 +166,7 @@ protected Class task() { @Override protected List takeConfiguration(int maxTasks) { return IntStream.range(0, maxTasks) - .mapToObj(i -> Configuration.of(Map.of())) + .mapToObj(i -> new Configuration(Map.of())) .collect(Collectors.toList()); } diff --git a/connector/src/test/java/org/astraea/connector/backup/ExporterTest.java b/connector/src/test/java/org/astraea/connector/backup/ExporterTest.java index 261a39ed66..dcfe2bb61b 100644 --- a/connector/src/test/java/org/astraea/connector/backup/ExporterTest.java +++ b/connector/src/test/java/org/astraea/connector/backup/ExporterTest.java @@ -189,7 +189,7 @@ void testFtpSinkTask() { "roll.duration", "100m"); - var fs = FileSystem.of("ftp", Configuration.of(configs)); + var fs = FileSystem.of("ftp", new Configuration(configs)); task.start(configs); @@ -276,7 +276,7 @@ void testFtpSinkTaskIntervalWith1File() { "roll.duration", "300ms"); - var fs = FileSystem.of("ftp", Configuration.of(configs)); + var fs = FileSystem.of("ftp", new Configuration(configs)); task.start(configs); @@ -351,7 +351,7 @@ void testFtpSinkTaskIntervalWith2Writers() { "roll.duration", "100ms"); - var fs = FileSystem.of("ftp", Configuration.of(configs)); + var fs = FileSystem.of("ftp", new Configuration(configs)); task.start(configs); @@ -485,7 +485,7 @@ void testHdfsSinkTask() { Assertions.assertTrue(task.isWriterDone()); - var fs = FileSystem.of("hdfs", Configuration.of(configs)); + var fs = FileSystem.of("hdfs", new Configuration(configs)); Assertions.assertEquals( 2, fs.listFolders("/" + String.join("/", fileSize, topicName)).size()); @@ -559,7 +559,7 @@ void testHdfsSinkTaskIntervalWith1File() { Utils.sleep(Duration.ofMillis(1000)); - var fs = FileSystem.of("hdfs", Configuration.of(configs)); + var fs = FileSystem.of("hdfs", new Configuration(configs)); Assertions.assertEquals( 1, fs.listFiles("/" + String.join("/", fileSize, topicName, "0")).size()); @@ -657,7 +657,7 @@ void testHdfsSinkTaskIntervalWith2Writers() { task.put(List.of(record3)); Utils.sleep(Duration.ofMillis(1000)); - var fs = FileSystem.of("hdfs", Configuration.of(configs)); + var fs = FileSystem.of("hdfs", new Configuration(configs)); Assertions.assertEquals( 2, fs.listFolders("/" + String.join("/", fileSize, topicName)).size()); @@ -761,7 +761,7 @@ void testCreateRecordWriter() { var writers = new HashMap(); var task = new Exporter.Task(); - task.fs = FileSystem.of("hdfs", Configuration.of(configs)); + task.fs = FileSystem.of("hdfs", new Configuration(configs)); task.interval = 1000; RecordWriter recordWriter = task.createRecordWriter(tp, offset); @@ -822,7 +822,7 @@ void testWriteRecords() { var writers = new HashMap(); var task = new Exporter.Task(); - task.fs = FileSystem.of("hdfs", Configuration.of(configs)); + task.fs = FileSystem.of("hdfs", new Configuration(configs)); task.size = DataSize.of("100MB"); task.bufferSize.reset(); task.recordsQueue.add( diff --git a/connector/src/test/java/org/astraea/connector/backup/ImporterTest.java b/connector/src/test/java/org/astraea/connector/backup/ImporterTest.java index 985f0cd618..fbea1aed67 100644 --- a/connector/src/test/java/org/astraea/connector/backup/ImporterTest.java +++ b/connector/src/test/java/org/astraea/connector/backup/ImporterTest.java @@ -126,7 +126,7 @@ void testFtpSourceTask() { "file.set", "0"); - var fs = FileSystem.of("ftp", Configuration.of(configs)); + var fs = FileSystem.of("ftp", new Configuration(configs)); var records = List.of( @@ -150,7 +150,7 @@ void testFtpSourceTask() { records.forEach(writer::append); writer.close(); - task.init(Configuration.of(configs), MetadataStorage.EMPTY); + task.init(new Configuration(configs), MetadataStorage.EMPTY); var returnRecords = new ArrayList<>(task.take()); for (int i = 0; i < records.size(); i++) { diff --git a/connector/src/test/java/org/astraea/connector/perf/PerfSinkTest.java b/connector/src/test/java/org/astraea/connector/perf/PerfSinkTest.java index 9cca7d93ec..9dad45cf99 100644 --- a/connector/src/test/java/org/astraea/connector/perf/PerfSinkTest.java +++ b/connector/src/test/java/org/astraea/connector/perf/PerfSinkTest.java @@ -103,7 +103,7 @@ void testFrequency() { @Test void testTask() { var task = new PerfSink.Task(); - task.init(Configuration.of(Map.of(PerfSink.FREQUENCY_DEF.name(), "1s"))); + task.init(new Configuration(Map.of(PerfSink.FREQUENCY_DEF.name(), "1s"))); var now = System.currentTimeMillis(); task.put(List.>of()); diff --git a/connector/src/test/java/org/astraea/connector/perf/PerfSourceTest.java b/connector/src/test/java/org/astraea/connector/perf/PerfSourceTest.java index 27c0f57b67..ef4f6dcfcb 100644 --- a/connector/src/test/java/org/astraea/connector/perf/PerfSourceTest.java +++ b/connector/src/test/java/org/astraea/connector/perf/PerfSourceTest.java @@ -50,7 +50,7 @@ static void closeService() { @Test void testDistributeConfigs() { var s = new PerfSource(); - var config = Configuration.of(Map.of(SourceConnector.TOPICS_KEY, "a,b,c,d")); + var config = new Configuration(Map.of(SourceConnector.TOPICS_KEY, "a,b,c,d")); s.init(config, MetadataStorage.EMPTY); var configs = s.takeConfiguration(10); Assertions.assertEquals(4, configs.size()); @@ -331,7 +331,7 @@ void testMetrics() { @Test void testInit() { var task = new PerfSource.Task(); - task.init(Configuration.of(Map.of(ConnectorConfigs.TOPICS_KEY, "a")), MetadataStorage.EMPTY); + task.init(new Configuration(Map.of(ConnectorConfigs.TOPICS_KEY, "a")), MetadataStorage.EMPTY); Assertions.assertNotNull(task.recordGenerator); Assertions.assertEquals(1, task.specifyPartitions.size()); } @@ -340,7 +340,7 @@ void testInit() { void testKeyAndValue() { var task = new PerfSource.Task(); task.init( - Configuration.of( + new Configuration( Map.of( ConnectorConfigs.TOPICS_KEY, "a", @@ -362,7 +362,7 @@ void testKeyAndValue() { void testZeroKeySize() { var task = new PerfSource.Task(); task.init( - Configuration.of( + new Configuration( Map.of(ConnectorConfigs.TOPICS_KEY, "a", PerfSource.KEY_SIZE_DEF.name(), "0Byte")), MetadataStorage.EMPTY); var records = task.take(); @@ -374,7 +374,7 @@ void testZeroKeySize() { void testZeroValueSize() { var task = new PerfSource.Task(); task.init( - Configuration.of( + new Configuration( Map.of(ConnectorConfigs.TOPICS_KEY, "a", PerfSource.VALUE_SIZE_DEF.name(), "0Byte")), MetadataStorage.EMPTY); var records = task.take(); diff --git a/fs/src/test/java/org/astraea/fs/FileSystemTest.java b/fs/src/test/java/org/astraea/fs/FileSystemTest.java index c9c1bce833..6ab079abfd 100644 --- a/fs/src/test/java/org/astraea/fs/FileSystemTest.java +++ b/fs/src/test/java/org/astraea/fs/FileSystemTest.java @@ -40,7 +40,7 @@ void testOf() { Assertions.assertThrows( IllegalArgumentException.class, () -> FileSystem.of("unknown", Configuration.EMPTY)); - var fs = FileSystem.of("local", Configuration.of(Map.of("local.impl", Tmp.class.getName()))); + var fs = FileSystem.of("local", new Configuration(Map.of("local.impl", Tmp.class.getName()))); Assertions.assertInstanceOf(Tmp.class, fs); } diff --git a/fs/src/test/java/org/astraea/fs/ftp/FtpFileSystemTest.java b/fs/src/test/java/org/astraea/fs/ftp/FtpFileSystemTest.java index 9a8034c8cf..1d4468f4dd 100644 --- a/fs/src/test/java/org/astraea/fs/ftp/FtpFileSystemTest.java +++ b/fs/src/test/java/org/astraea/fs/ftp/FtpFileSystemTest.java @@ -31,7 +31,7 @@ public class FtpFileSystemTest extends AbstractFileSystemTest { protected FileSystem fileSystem() { return FileSystem.of( "ftp", - Configuration.of( + new Configuration( Map.of( FtpFileSystem.HOSTNAME_KEY, server.hostname(), diff --git a/fs/src/test/java/org/astraea/fs/hdfs/HdfsFileSystemTest.java b/fs/src/test/java/org/astraea/fs/hdfs/HdfsFileSystemTest.java index 27f43b221e..1258817314 100644 --- a/fs/src/test/java/org/astraea/fs/hdfs/HdfsFileSystemTest.java +++ b/fs/src/test/java/org/astraea/fs/hdfs/HdfsFileSystemTest.java @@ -33,7 +33,7 @@ public class HdfsFileSystemTest extends AbstractFileSystemTest { void testCreate() { var fs = HdfsFileSystem.create( - Configuration.of( + new Configuration( Map.of( HdfsFileSystem.HOSTNAME_KEY, server.hostname(), @@ -50,7 +50,7 @@ void testCreate() { protected FileSystem fileSystem() { return FileSystem.of( "hdfs", - Configuration.of( + new Configuration( Map.of( HdfsFileSystem.HOSTNAME_KEY, server.hostname(), diff --git a/fs/src/test/java/org/astraea/fs/local/LocalFileSystemTest.java b/fs/src/test/java/org/astraea/fs/local/LocalFileSystemTest.java index 6ddd60f3ac..2d65b85b28 100644 --- a/fs/src/test/java/org/astraea/fs/local/LocalFileSystemTest.java +++ b/fs/src/test/java/org/astraea/fs/local/LocalFileSystemTest.java @@ -31,7 +31,7 @@ protected FileSystem fileSystem() { () -> { var tmp = Files.createTempDirectory("test_local_fs"); return FileSystem.of( - "local", Configuration.of(Map.of(LocalFileSystem.ROOT_KEY, tmp.toString()))); + "local", new Configuration(Map.of(LocalFileSystem.ROOT_KEY, tmp.toString()))); }); } } diff --git a/gui/src/main/java/org/astraea/gui/tab/health/BalancerNode.java b/gui/src/main/java/org/astraea/gui/tab/health/BalancerNode.java index 0b02005579..2561971e49 100644 --- a/gui/src/main/java/org/astraea/gui/tab/health/BalancerNode.java +++ b/gui/src/main/java/org/astraea/gui/tab/health/BalancerNode.java @@ -219,7 +219,7 @@ static Map clusterCosts(List keys) { logger.log("searching better assignments ... "); return Utils.construct( GreedyBalancer.class, - Configuration.of(Map.of(GreedyBalancer.ITERATION_CONFIG, "10000"))) + new Configuration(Map.of(GreedyBalancer.ITERATION_CONFIG, "10000"))) .offer( AlgorithmConfig.builder() .clusterInfo(clusterInfo)