Skip to content

Commit

Permalink
feat: implement leader election for EtcdMetadataStore
Browse files Browse the repository at this point in the history
Signed-off-by: Li Zhanhui <[email protected]>
  • Loading branch information
lizhanhui committed Sep 15, 2023
1 parent cefae0a commit c788505
Show file tree
Hide file tree
Showing 17 changed files with 548 additions and 57 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.automq.rocketmq.common;

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class PrefixThreadFactory implements ThreadFactory {
private final String prefix;

private final AtomicInteger index;

public PrefixThreadFactory(String prefix) {
this.prefix = prefix;
this.index = new AtomicInteger(0);
}

@Override
public Thread newThread(Runnable r) {
final String threadName = String.format("%s_%d", this.prefix, this.index.getAndIncrement());
Thread t = new Thread(r);
t.setName(threadName);
t.setDaemon(true);
return t;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.automq.rocketmq.common.exception;

public class RocketMQException extends Exception {

private int errorCode;

public RocketMQException(int errorCode) {
this.errorCode = errorCode;
}

public RocketMQException(int errorCode, String message) {
super(message);
this.errorCode = errorCode;
}

public RocketMQException(int errorCode, String message, Throwable cause) {
super(message, cause);
this.errorCode = errorCode;
}

public RocketMQException(int errorCode, Throwable cause) {
super(cause);
this.errorCode = errorCode;
}

public RocketMQException(int errorCode, String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
this.errorCode = errorCode;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.automq.rocketmq.common.exception;

public class RocketMQRuntimeException extends RuntimeException {
private int errorCode;

public RocketMQRuntimeException(int errorCode) {
this.errorCode = errorCode;
}

public RocketMQRuntimeException(int errorCode, String message) {
super(message);
this.errorCode = errorCode;
}

public RocketMQRuntimeException(int errorCode, String message, Throwable cause) {
super(message, cause);
this.errorCode = errorCode;
}

public RocketMQRuntimeException(int errorCode, Throwable cause) {
super(cause);
this.errorCode = errorCode;
}

public RocketMQRuntimeException(int errorCode, String message, Throwable cause, boolean enableSuppression,
boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
this.errorCode = errorCode;
}
}
6 changes: 6 additions & 0 deletions controller/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,12 @@
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>rocketmq-common</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package com.automq.rocketmq;
package com.automq.rocketmq.controller;

import apache.rocketmq.controller.v1.BrokerHeartbeatReply;
import apache.rocketmq.controller.v1.BrokerHeartbeatRequest;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.automq.rocketmq.controller.exception;

import com.automq.rocketmq.common.exception.RocketMQException;

public class ControllerException extends RocketMQException {
public ControllerException(int errorCode) {
super(errorCode);
}

public ControllerException(int errorCode, String message) {
super(errorCode, message);
}

public ControllerException(int errorCode, String message, Throwable cause) {
super(errorCode, message, cause);
}

public ControllerException(int errorCode, Throwable cause) {
super(errorCode, cause);
}

public ControllerException(int errorCode, String message, Throwable cause, boolean enableSuppression,
boolean writableStackTrace) {
super(errorCode, message, cause, enableSuppression, writableStackTrace);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.automq.rocketmq.controller.exception;

import com.automq.rocketmq.common.exception.RocketMQRuntimeException;

public class ControllerRuntimeException extends RocketMQRuntimeException {

public ControllerRuntimeException(int errorCode) {
super(errorCode);
}

public ControllerRuntimeException(int errorCode, String message) {
super(errorCode, message);
}

public ControllerRuntimeException(int errorCode, String message, Throwable cause) {
super(errorCode, message, cause);
}

public ControllerRuntimeException(int errorCode, Throwable cause) {
super(errorCode, cause);
}

public ControllerRuntimeException(int errorCode, String message, Throwable cause, boolean enableSuppression,
boolean writableStackTrace) {
super(errorCode, message, cause, enableSuppression, writableStackTrace);
}
}
Loading

0 comments on commit c788505

Please sign in to comment.