diff --git a/common/src/main/java/com/automq/rocketmq/common/PrefixThreadFactory.java b/common/src/main/java/com/automq/rocketmq/common/PrefixThreadFactory.java new file mode 100644 index 000000000..76c7d37f5 --- /dev/null +++ b/common/src/main/java/com/automq/rocketmq/common/PrefixThreadFactory.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.automq.rocketmq.common; + +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; + +public class PrefixThreadFactory implements ThreadFactory { + private final String prefix; + + private final AtomicInteger index; + + public PrefixThreadFactory(String prefix) { + this.prefix = prefix; + this.index = new AtomicInteger(0); + } + + @Override + public Thread newThread(Runnable r) { + final String threadName = String.format("%s_%d", this.prefix, this.index.getAndIncrement()); + Thread t = new Thread(r); + t.setName(threadName); + t.setDaemon(true); + return t; + } +} diff --git a/common/src/main/java/com/automq/rocketmq/common/exception/RocketMQException.java b/common/src/main/java/com/automq/rocketmq/common/exception/RocketMQException.java new file mode 100644 index 000000000..f62618c0d --- /dev/null +++ b/common/src/main/java/com/automq/rocketmq/common/exception/RocketMQException.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.automq.rocketmq.common.exception; + +public class RocketMQException extends Exception { + + private int errorCode; + + public RocketMQException(int errorCode) { + this.errorCode = errorCode; + } + + public RocketMQException(int errorCode, String message) { + super(message); + this.errorCode = errorCode; + } + + public RocketMQException(int errorCode, String message, Throwable cause) { + super(message, cause); + this.errorCode = errorCode; + } + + public RocketMQException(int errorCode, Throwable cause) { + super(cause); + this.errorCode = errorCode; + } + + public RocketMQException(int errorCode, String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + this.errorCode = errorCode; + } +} diff --git a/common/src/main/java/com/automq/rocketmq/common/exception/RocketMQRuntimeException.java b/common/src/main/java/com/automq/rocketmq/common/exception/RocketMQRuntimeException.java new file mode 100644 index 000000000..4d6c0ce76 --- /dev/null +++ b/common/src/main/java/com/automq/rocketmq/common/exception/RocketMQRuntimeException.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.automq.rocketmq.common.exception; + +public class RocketMQRuntimeException extends RuntimeException { + private int errorCode; + + public RocketMQRuntimeException(int errorCode) { + this.errorCode = errorCode; + } + + public RocketMQRuntimeException(int errorCode, String message) { + super(message); + this.errorCode = errorCode; + } + + public RocketMQRuntimeException(int errorCode, String message, Throwable cause) { + super(message, cause); + this.errorCode = errorCode; + } + + public RocketMQRuntimeException(int errorCode, Throwable cause) { + super(cause); + this.errorCode = errorCode; + } + + public RocketMQRuntimeException(int errorCode, String message, Throwable cause, boolean enableSuppression, + boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + this.errorCode = errorCode; + } +} diff --git a/controller/pom.xml b/controller/pom.xml index 715dc350f..91a17bfa8 100644 --- a/controller/pom.xml +++ b/controller/pom.xml @@ -102,6 +102,12 @@ + + + ${project.groupId} + rocketmq-common + ${project.version} + diff --git a/controller/src/main/java/com/automq/rocketmq/ControllerServiceImpl.java b/controller/src/main/java/com/automq/rocketmq/controller/ControllerServiceImpl.java similarity index 99% rename from controller/src/main/java/com/automq/rocketmq/ControllerServiceImpl.java rename to controller/src/main/java/com/automq/rocketmq/controller/ControllerServiceImpl.java index aac213795..027e3654f 100644 --- a/controller/src/main/java/com/automq/rocketmq/ControllerServiceImpl.java +++ b/controller/src/main/java/com/automq/rocketmq/controller/ControllerServiceImpl.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package com.automq.rocketmq; +package com.automq.rocketmq.controller; import apache.rocketmq.controller.v1.BrokerHeartbeatReply; import apache.rocketmq.controller.v1.BrokerHeartbeatRequest; diff --git a/controller/src/main/java/com/automq/rocketmq/controller/exception/ControllerException.java b/controller/src/main/java/com/automq/rocketmq/controller/exception/ControllerException.java new file mode 100644 index 000000000..967aa6d08 --- /dev/null +++ b/controller/src/main/java/com/automq/rocketmq/controller/exception/ControllerException.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.automq.rocketmq.controller.exception; + +import com.automq.rocketmq.common.exception.RocketMQException; + +public class ControllerException extends RocketMQException { + public ControllerException(int errorCode) { + super(errorCode); + } + + public ControllerException(int errorCode, String message) { + super(errorCode, message); + } + + public ControllerException(int errorCode, String message, Throwable cause) { + super(errorCode, message, cause); + } + + public ControllerException(int errorCode, Throwable cause) { + super(errorCode, cause); + } + + public ControllerException(int errorCode, String message, Throwable cause, boolean enableSuppression, + boolean writableStackTrace) { + super(errorCode, message, cause, enableSuppression, writableStackTrace); + } +} diff --git a/controller/src/main/java/com/automq/rocketmq/controller/exception/ControllerRuntimeException.java b/controller/src/main/java/com/automq/rocketmq/controller/exception/ControllerRuntimeException.java new file mode 100644 index 000000000..27cd69da5 --- /dev/null +++ b/controller/src/main/java/com/automq/rocketmq/controller/exception/ControllerRuntimeException.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.automq.rocketmq.controller.exception; + +import com.automq.rocketmq.common.exception.RocketMQRuntimeException; + +public class ControllerRuntimeException extends RocketMQRuntimeException { + + public ControllerRuntimeException(int errorCode) { + super(errorCode); + } + + public ControllerRuntimeException(int errorCode, String message) { + super(errorCode, message); + } + + public ControllerRuntimeException(int errorCode, String message, Throwable cause) { + super(errorCode, message, cause); + } + + public ControllerRuntimeException(int errorCode, Throwable cause) { + super(errorCode, cause); + } + + public ControllerRuntimeException(int errorCode, String message, Throwable cause, boolean enableSuppression, + boolean writableStackTrace) { + super(errorCode, message, cause, enableSuppression, writableStackTrace); + } +} diff --git a/controller/src/main/java/com/automq/rocketmq/controller/metadata/EtcdMetadataStore.java b/controller/src/main/java/com/automq/rocketmq/controller/metadata/EtcdMetadataStore.java new file mode 100644 index 000000000..337f232fd --- /dev/null +++ b/controller/src/main/java/com/automq/rocketmq/controller/metadata/EtcdMetadataStore.java @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.automq.rocketmq.controller.metadata; + +import apache.rocketmq.controller.v1.Code; +import com.automq.rocketmq.common.PrefixThreadFactory; +import com.automq.rocketmq.controller.exception.ControllerException; +import com.google.common.base.Preconditions; +import io.etcd.jetcd.ByteSequence; +import io.etcd.jetcd.Client; +import io.etcd.jetcd.Cluster; +import io.etcd.jetcd.Election; +import io.etcd.jetcd.KV; +import io.etcd.jetcd.KeyValue; +import io.etcd.jetcd.Lease; +import io.etcd.jetcd.Txn; +import io.etcd.jetcd.Watch; +import io.etcd.jetcd.election.CampaignResponse; +import io.etcd.jetcd.election.LeaderKey; +import io.etcd.jetcd.lease.LeaseGrantResponse; +import io.etcd.jetcd.op.Cmp; +import io.etcd.jetcd.op.CmpTarget; +import io.etcd.jetcd.op.Op; +import io.etcd.jetcd.options.PutOption; +import io.etcd.jetcd.options.WatchOption; +import io.etcd.jetcd.watch.WatchEvent; +import io.etcd.jetcd.watch.WatchResponse; +import java.io.IOException; +import java.net.URI; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class EtcdMetadataStore implements MetadataStore, AutoCloseable { + + private static final Logger LOGGER = LoggerFactory.getLogger(EtcdMetadataStore.class); + + private static final int LEASE_TTL_IN_SECS = 3; + private static final int KEEP_ALIVE_LEASE_INTERVAL_IN_SECS = 2; + + private final String clusterName; + + /** + * Nested etcd client. + */ + private final Client etcdClient; + + private final Lease lease; + + private final KV kv; + + private final Watch watch; + + private final Election election; + + private final Cluster cluster; + + private final String leaderElectionName; + + private ScheduledExecutorService executorService; + + private long leaseId; + + private Role role; + + private LeaderKey leaderKey; + + private UUID term; + + public EtcdMetadataStore(Iterable endpoints, String clusterName) throws ControllerException { + this.clusterName = clusterName; + this.leaderElectionName = String.format("/%s/leader", clusterName); + this.etcdClient = Client.builder().endpoints(endpoints).build(); + this.lease = this.etcdClient.getLeaseClient(); + this.kv = this.etcdClient.getKVClient(); + this.watch = this.etcdClient.getWatchClient(); + this.election = this.etcdClient.getElectionClient(); + this.cluster = this.etcdClient.getClusterClient(); + this.role = Role.Follower; + this.executorService = new ScheduledThreadPoolExecutor(2, new PrefixThreadFactory("EtcdMetadataStore"), + new ThreadPoolExecutor.AbortPolicy()); + this.runLease(); + this.runWatch(); + this.runElection(); + } + + private void runLease() throws ControllerException { + try { + LeaseGrantResponse response = this.lease.grant(LEASE_TTL_IN_SECS).get(); + this.leaseId = response.getID(); + // Renew lease periodically + executorService.scheduleAtFixedRate(() -> this.lease.keepAliveOnce(this.leaseId), + KEEP_ALIVE_LEASE_INTERVAL_IN_SECS, + KEEP_ALIVE_LEASE_INTERVAL_IN_SECS, + TimeUnit.SECONDS); + } catch (InterruptedException | ExecutionException e) { + throw new ControllerException(Code.INTERRUPTED_VALUE, e); + } + } + + private void runElection() { + // Ensure runLease has run. + Preconditions.checkState(this.leaseId > 0, "Should have gotten an lease"); + ByteSequence electionName = ByteSequence.from(this.leaderElectionName, StandardCharsets.UTF_8); + ByteSequence proposal = ByteSequence.from("dummy", StandardCharsets.UTF_8); + this.election.campaign(electionName, this.leaseId, proposal) + .thenAccept(this::onLeader); + } + + private void onLeader(CampaignResponse campaignResponse) { + this.leaderKey = campaignResponse.getLeader(); + LOGGER.info("Campaign completes and gets elected as leader of {}", this.leaderKey.getName()); + this.role = Role.Elected; + + // Now we update controller term + this.term = UUID.randomUUID(); + ByteSequence termKey = ByteSequence.from(String.format("/%s/term", this.clusterName), StandardCharsets.UTF_8); + ByteBuffer buffer = ByteBuffer.allocate(16); + buffer.putLong(this.term.getMostSignificantBits()); + buffer.putLong(this.term.getLeastSignificantBits()); + buffer.flip(); + ByteSequence termValue = ByteSequence.from(buffer.array()); + Txn tx = this.kv.txn(); + + tx.If(new Cmp(this.leaderKey.getKey(), + Cmp.Op.EQUAL, + CmpTarget.modRevision(this.leaderKey.getRevision()))) + .Then(Op.put(termKey, termValue, PutOption.builder().withLeaseId(this.leaseId).build())) + .commit().thenAccept(response -> { + if (response.isSucceeded()) { + LOGGER.info("Term of {} updated", this.clusterName); + } + }); + } + + private void runWatch() { + ByteSequence watchPrefix = ByteSequence.from(String.format("/%s", this.clusterName), StandardCharsets.UTF_8); + this.watch.watch(watchPrefix, WatchOption.builder().isPrefix(true).build(), new WatchListener(this)); + } + + void onWatch(WatchResponse response) { + LOGGER.debug("Got a watch response"); + for (WatchEvent event : response.getEvents()) { + processWatchEvent(event); + } + } + + private void processWatchEvent(WatchEvent event) { + ByteSequence termKey = ByteSequence.from(String.format("/%s/term", this.clusterName), StandardCharsets.UTF_8); + KeyValue kv = event.getKeyValue(); + if (kv.getKey().equals(termKey)) { + ByteBuffer buf = ByteBuffer.wrap(kv.getValue().getBytes()); + if (buf.limit() >= 16) { + UUID term = new UUID(buf.getLong(), buf.getLong()); + if (this.term.equals(term) && this.role == Role.Elected) { + this.role = Role.Leader; + LOGGER.info("Now acts as Controller leader"); + } + } + + } + + } + + void onWatchError(Throwable cause) { + LOGGER.error("Watch failed", cause); + } + + void onWatchComplete() { + LOGGER.info("Watch completed"); + } + + @Override + public long registerBroker(int brokerId) throws IOException { + + return 0; + } + + @Override + public void close() throws ControllerException { + this.executorService.shutdown(); + } + + public Role getRole() { + return role; + } +} diff --git a/controller/src/main/java/com/automq/rocketmq/metadata/MetadataStore.java b/controller/src/main/java/com/automq/rocketmq/controller/metadata/MetadataStore.java similarity index 95% rename from controller/src/main/java/com/automq/rocketmq/metadata/MetadataStore.java rename to controller/src/main/java/com/automq/rocketmq/controller/metadata/MetadataStore.java index d32244e2b..dd465bf56 100644 --- a/controller/src/main/java/com/automq/rocketmq/metadata/MetadataStore.java +++ b/controller/src/main/java/com/automq/rocketmq/controller/metadata/MetadataStore.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package com.automq.rocketmq.metadata; +package com.automq.rocketmq.controller.metadata; import java.io.IOException; diff --git a/controller/src/main/java/com/automq/rocketmq/Main.java b/controller/src/main/java/com/automq/rocketmq/controller/metadata/Role.java similarity index 84% rename from controller/src/main/java/com/automq/rocketmq/Main.java rename to controller/src/main/java/com/automq/rocketmq/controller/metadata/Role.java index 39cb0f62a..ed7fd9bd4 100644 --- a/controller/src/main/java/com/automq/rocketmq/Main.java +++ b/controller/src/main/java/com/automq/rocketmq/controller/metadata/Role.java @@ -15,10 +15,11 @@ * limitations under the License. */ -package com.automq.rocketmq; +package com.automq.rocketmq.controller.metadata; -public class Main { - public static void main(String[] args) { - System.out.println("Hello world!"); - } -} \ No newline at end of file +public enum Role { + Elected, + Leader, + Follower, + Learner, +} diff --git a/controller/src/main/java/com/automq/rocketmq/metadata/EtcdMetadataStore.java b/controller/src/main/java/com/automq/rocketmq/controller/metadata/WatchListener.java similarity index 57% rename from controller/src/main/java/com/automq/rocketmq/metadata/EtcdMetadataStore.java rename to controller/src/main/java/com/automq/rocketmq/controller/metadata/WatchListener.java index af2ec705a..2cc18c00c 100644 --- a/controller/src/main/java/com/automq/rocketmq/metadata/EtcdMetadataStore.java +++ b/controller/src/main/java/com/automq/rocketmq/controller/metadata/WatchListener.java @@ -15,34 +15,30 @@ * limitations under the License. */ -package com.automq.rocketmq.metadata; +package com.automq.rocketmq.controller.metadata; -import io.etcd.jetcd.Client; -import java.io.IOException; +import io.etcd.jetcd.Watch; +import io.etcd.jetcd.watch.WatchResponse; -public class EtcdMetadataStore implements MetadataStore { +public class WatchListener implements Watch.Listener { + private final EtcdMetadataStore store; - /** - * Etcd target. - * - * dns:///foo.bar.com:2379 - * ip:///etcd0:2379,etcd1:2379,etcd2:2379 - */ - private final String target; - - /** - * Nested etcd client. - */ - private final Client etcdClient; + public WatchListener(EtcdMetadataStore store) { + this.store = store; + } - public EtcdMetadataStore(String target) { - this.target = target; - this.etcdClient = Client.builder().target(target).build(); + @Override + public void onNext(WatchResponse response) { + this.store.onWatch(response); } @Override - public long registerBroker(int brokerId) throws IOException { + public void onError(Throwable throwable) { + this.store.onWatchError(throwable); + } - return 0; + @Override + public void onCompleted() { + this.store.onWatchComplete(); } } diff --git a/controller/src/main/proto/controller.proto b/controller/src/main/proto/controller.proto index f6aa438f0..ec2647ef4 100644 --- a/controller/src/main/proto/controller.proto +++ b/controller/src/main/proto/controller.proto @@ -18,6 +18,7 @@ message Uuid { // Define all error codes here enum Code { OK = 0; + INTERRUPTED = 1; NOT_FOUND = 404; } diff --git a/controller/src/test/java/com/automq/rocketmq/ControllerServiceImplTest.java b/controller/src/test/java/com/automq/rocketmq/controller/ControllerServiceImplTest.java similarity index 98% rename from controller/src/test/java/com/automq/rocketmq/ControllerServiceImplTest.java rename to controller/src/test/java/com/automq/rocketmq/controller/ControllerServiceImplTest.java index d25fb816a..f8c05750f 100644 --- a/controller/src/test/java/com/automq/rocketmq/ControllerServiceImplTest.java +++ b/controller/src/test/java/com/automq/rocketmq/controller/ControllerServiceImplTest.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package com.automq.rocketmq; +package com.automq.rocketmq.controller; import apache.rocketmq.controller.v1.BrokerHeartbeatReply; import apache.rocketmq.controller.v1.BrokerHeartbeatRequest; diff --git a/controller/src/test/java/com/automq/rocketmq/ControllerTestServer.java b/controller/src/test/java/com/automq/rocketmq/controller/ControllerTestServer.java similarity index 97% rename from controller/src/test/java/com/automq/rocketmq/ControllerTestServer.java rename to controller/src/test/java/com/automq/rocketmq/controller/ControllerTestServer.java index a9e2342fe..0f81bad22 100644 --- a/controller/src/test/java/com/automq/rocketmq/ControllerTestServer.java +++ b/controller/src/test/java/com/automq/rocketmq/controller/ControllerTestServer.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package com.automq.rocketmq; +package com.automq.rocketmq.controller; import io.grpc.BindableService; import io.grpc.Grpc; diff --git a/controller/src/test/java/com/automq/rocketmq/controller/metadata/EtcdMetadataStoreTest.java b/controller/src/test/java/com/automq/rocketmq/controller/metadata/EtcdMetadataStoreTest.java new file mode 100644 index 000000000..7f16e3c7c --- /dev/null +++ b/controller/src/test/java/com/automq/rocketmq/controller/metadata/EtcdMetadataStoreTest.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.automq.rocketmq.controller.metadata; + +import com.automq.rocketmq.controller.exception.ControllerException; +import java.util.concurrent.TimeUnit; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.Test; + +public class EtcdMetadataStoreTest extends EtcdTestBase { + + @Test + public void testCtor() throws ControllerException { + EtcdMetadataStore metadataStore = new EtcdMetadataStore(cluster.clientEndpoints(), "testCtor"); + Awaitility.await().atMost(3, TimeUnit.SECONDS).with().pollInterval(100, TimeUnit.MILLISECONDS) + .until(() -> metadataStore.getRole() == Role.Leader); + metadataStore.close(); + } + +} \ No newline at end of file diff --git a/controller/src/test/java/com/automq/rocketmq/metadata/EtcdTest.java b/controller/src/test/java/com/automq/rocketmq/controller/metadata/EtcdTest.java similarity index 92% rename from controller/src/test/java/com/automq/rocketmq/metadata/EtcdTest.java rename to controller/src/test/java/com/automq/rocketmq/controller/metadata/EtcdTest.java index 965881035..264a37b56 100644 --- a/controller/src/test/java/com/automq/rocketmq/metadata/EtcdTest.java +++ b/controller/src/test/java/com/automq/rocketmq/controller/metadata/EtcdTest.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package com.automq.rocketmq.metadata; +package com.automq.rocketmq.controller.metadata; import io.etcd.jetcd.ByteSequence; import io.etcd.jetcd.Client; @@ -29,14 +29,12 @@ import io.etcd.jetcd.election.LeaderKey; import io.etcd.jetcd.kv.GetResponse; import io.etcd.jetcd.kv.TxnResponse; -import io.etcd.jetcd.launcher.EtcdCluster; import io.etcd.jetcd.lease.LeaseGrantResponse; import io.etcd.jetcd.op.Cmp; import io.etcd.jetcd.op.CmpTarget; import io.etcd.jetcd.op.Op; import io.etcd.jetcd.options.PutOption; import io.etcd.jetcd.options.WatchOption; -import io.etcd.jetcd.test.EtcdClusterExtension; import io.etcd.jetcd.watch.WatchEvent; import io.etcd.jetcd.watch.WatchResponse; import java.nio.charset.Charset; @@ -45,36 +43,14 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicInteger; -import org.junit.AfterClass; -import org.junit.BeforeClass; import org.junit.Test; import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.extension.RegisterExtension; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.testcontainers.shaded.org.awaitility.Awaitility; /** * Test basic usage of Etcd Java binding. */ -public class EtcdTest { - - public static final Logger LOGGER = LoggerFactory.getLogger(EtcdTest.class); - - @RegisterExtension - public static final EtcdClusterExtension CLUSTER_EXTENSION = EtcdClusterExtension.builder().withNodes(1).build(); - - public static EtcdCluster cluster = CLUSTER_EXTENSION.cluster(); - - @BeforeClass - public static void setUp() { - cluster.start(); - } - - @AfterClass - public static void tearDown() { - cluster.close(); - } +public class EtcdTest extends EtcdTestBase { @Test public void testKV() throws Exception { diff --git a/controller/src/test/java/com/automq/rocketmq/controller/metadata/EtcdTestBase.java b/controller/src/test/java/com/automq/rocketmq/controller/metadata/EtcdTestBase.java new file mode 100644 index 000000000..83982f05e --- /dev/null +++ b/controller/src/test/java/com/automq/rocketmq/controller/metadata/EtcdTestBase.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.automq.rocketmq.controller.metadata; + +import io.etcd.jetcd.launcher.EtcdCluster; +import io.etcd.jetcd.test.EtcdClusterExtension; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class EtcdTestBase { + public static final Logger LOGGER = LoggerFactory.getLogger(EtcdTest.class); + + @RegisterExtension + public static final EtcdClusterExtension CLUSTER_EXTENSION = EtcdClusterExtension.builder().withNodes(1).build(); + + public static EtcdCluster cluster = CLUSTER_EXTENSION.cluster(); + + @BeforeClass + public static void setUp() { + cluster.start(); + } + + @AfterClass + public static void tearDown() { + cluster.close(); + } +}