Skip to content

Commit

Permalink
feature: support raft mode registry to namingserver (#7114)
Browse files Browse the repository at this point in the history
  • Loading branch information
funky-eyes authored Feb 24, 2025
1 parent 679a2e0 commit 3c050a2
Show file tree
Hide file tree
Showing 37 changed files with 875 additions and 116 deletions.
1 change: 1 addition & 0 deletions changes/en-us/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ Add changes here for all PR submitted to the 2.x branch.
- [[#7037](https://github.com/apache/incubator-seata/pull/7037)] support fury undolog parser
- [[#7069](https://github.com/apache/incubator-seata/pull/7069)] Raft cluster mode supports address translation
- [[#7038](https://github.com/apache/incubator-seata/pull/7038)] support fury serializer
- [[#7114](https://github.com/apache/incubator-seata/pull/7114)] support raft mode registry to namingserver
- [[#7133](https://github.com/apache/incubator-seata/pull/7133)] Implement scheduled handling for end status transaction
- [[#7171](https://github.com/apache/incubator-seata/pull/7171)] support EpollEventLoopGroup in client

Expand Down
2 changes: 2 additions & 0 deletions changes/zh-cn/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@
- [[#7037](https://github.com/apache/incubator-seata/pull/7037)] 支持UndoLog的fury序列化方式
- [[#7069](https://github.com/apache/incubator-seata/pull/7069)] Raft集群模式支持地址转换
- [[#7038](https://github.com/apache/incubator-seata/pull/7038)] 支持Fury序列化器
- [[#7114](https://github.com/apache/incubator-seata/pull/7114)] 支持raft集群注册至namingserver
- [[#7133](https://github.com/apache/incubator-seata/pull/7133)] 实现对残留的end状态事务定时处理
- [[#7171](https://github.com/apache/incubator-seata/pull/7171)] 客户端支持 EpollEventLoopGroup


### bugfix:

- [[#7104](https://github.com/apache/incubator-seata/pull/7104)] 修复SeataApplicationListener在低版本springboot未实现supportsSourceType方法的问题
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

Expand All @@ -44,6 +46,10 @@ public static Instance getInstance() {
return SingletonHolder.SERVER_INSTANCE;
}

public static List<Instance> getInstances() {
return SingletonHolder.SERVER_INSTANCES;
}


public String getNamespace() {
return namespace;
Expand Down Expand Up @@ -164,8 +170,24 @@ public String toJsonString(ObjectMapper objectMapper) {
}
}

public Instance clone() {
Instance instance = new Instance();
instance.setNamespace(namespace);
instance.setClusterName(clusterName);
instance.setUnit(unit);
instance.setControl(control);
instance.setTransaction(transaction);
instance.setWeight(weight);
instance.setHealthy(healthy);
instance.setTerm(term);
instance.setTimestamp(timestamp);
instance.setMetadata(metadata);
return instance;
}

private static class SingletonHolder {
private static final Instance SERVER_INSTANCE = new Instance();
private static final List<Instance> SERVER_INSTANCES = new ArrayList<>();
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ public class Node {

private Endpoint internal;

private double weight = 1.0;
private boolean healthy = true;
private long timeStamp;
protected double weight = 1.0;
protected boolean healthy = true;
protected long timeStamp;

private String group;
private ClusterRole role = ClusterRole.MEMBER;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@


public class NamingServerNode extends Node {
private double weight = 1.0;
private boolean healthy = true;
private long term;
private String unit;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,9 @@ public void register(Instance instance) throws Exception {
doRegister(instance, getNamingAddrs());
}

public void doRegister(List<Instance> instance, List<String> urlList) {

}
public void doRegister(Instance instance, List<String> urlList) {
for (String urlSuffix : urlList) {
// continue if name server node is unhealthy
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,21 @@ public Result<String> registerInstance(@RequestParam String namespace,
return result;
}

@PostMapping("/batchRegister")
public Result<String> batchRegisterInstance(@RequestParam String namespace,
@RequestParam String clusterName,
@RequestBody List<NamingServerNode> nodes) {
Result<String> result = new Result<>();
boolean isSuccess = namingManager.registerInstances(nodes, namespace, clusterName);
if (isSuccess) {
result.setMessage("node has registered successfully!");
} else {
result.setCode("500");
result.setMessage("node registered unsuccessfully!");
}
return result;
}

@PostMapping("/unregister")
public Result<String> unregisterInstance(@RequestParam String namespace, @RequestParam String clusterName,
@RequestParam String unit, @RequestBody NamingServerNode registerBody) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.seata.common.metadata.Cluster;
import org.apache.seata.common.util.StringUtils;
import org.apache.seata.namingserver.entity.pojo.ClusterData;
Expand All @@ -37,7 +36,7 @@ public Map<String, ClusterBO> getClusterMap() {
return clusterMap;
}

public List<Cluster> getCluster(ConcurrentMap<String/* clusterName */, ClusterData> clusterDataMap) {
public List<Cluster> getCluster(Map<String/* clusterName */, ClusterData> clusterDataMap) {
List<Cluster> list = new ArrayList<>();
clusterMap.forEach((clusterName, unitNameSet) -> {
ClusterData clusterData = clusterDataMap.get(clusterName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;

import com.github.benmanes.caffeine.cache.Caffeine;
Expand All @@ -42,6 +43,7 @@
import org.apache.http.entity.ContentType;
import org.apache.http.protocol.HTTP;
import org.apache.seata.common.metadata.Cluster;
import org.apache.seata.common.metadata.ClusterRole;
import org.apache.seata.common.metadata.Node;
import org.apache.seata.common.metadata.namingserver.NamingServerNode;
import org.apache.seata.common.metadata.namingserver.Unit;
Expand Down Expand Up @@ -152,7 +154,9 @@ public Result<String> createGroup(String namespace, String vGroup, String cluste
LOGGER.error("no instance in cluster {}", clusterName);
return new Result<>("301", "no instance in cluster" + clusterName);
} else {
Node node = nodeList.get(0);
Node node =
nodeList.stream().filter(n -> n.getRole() == ClusterRole.LEADER || n.getRole() == ClusterRole.MEMBER)
.collect(Collectors.toList()).get(0);
String controlHost = node.getControl().getHost();
int controlPort = node.getControl().getPort();
String httpUrl = NamingServerConstants.HTTP_PREFIX + controlHost + NamingServerConstants.IP_PORT_SPLIT_CHAR
Expand Down Expand Up @@ -231,6 +235,14 @@ public void notifyClusterChange(String vGroup, String namespace, String clusterN
});
}

public boolean registerInstances(List<NamingServerNode> node, String namespace, String clusterName) {
boolean result = true;
for (NamingServerNode namingServerNode : node) {
result = registerInstance(namingServerNode, namespace, clusterName, namingServerNode.getUnit());
}
return result;
}

public boolean registerInstance(NamingServerNode node, String namespace, String clusterName, String unitName) {
try {
Map<String, ClusterData> clusterDataHashMap =
Expand All @@ -245,16 +257,14 @@ public boolean registerInstance(NamingServerNode node, String namespace, String
// if extended metadata includes vgroup mapping relationship, add it in clusterData
if (mappingObj instanceof Map) {
Map<String, String> vGroups = (Map<String, String>)mappingObj;
if (!CollectionUtils.isEmpty(vGroups)) {
vGroups.forEach((k, v) -> {
// In non-raft mode, a unit is one-to-one with a node, and the unitName is stored on the node.
// In raft mode, the unitName is equal to the raft-group, so the node's unitName cannot be used.
boolean changed = addGroup(namespace, clusterName, StringUtils.isBlank(v) ? unitName : v, k);
if (hasChanged || changed) {
notifyClusterChange(k, namespace, clusterName, unitName, node.getTerm());
}
});
}
vGroups.forEach((k, v) -> {
// In non-raft mode, a unit is one-to-one with a node, and the unitName is stored on the node.
// In raft mode, the unitName is equal to the raft-group, so the node's unitName cannot be used.
boolean changed = addGroup(namespace, clusterName, StringUtils.isBlank(v) ? unitName : v, k);
if (hasChanged || changed) {
notifyClusterChange(k, namespace, clusterName, unitName, node.getTerm());
}
});
}
instanceLiveTable.put(
new InetSocketAddress(node.getTransaction().getHost(), node.getTransaction().getPort()),
Expand Down Expand Up @@ -295,8 +305,8 @@ public boolean unregisterInstance(String namespace, String clusterName, String u

public List<Cluster> getClusterListByVgroup(String vGroup, String namespace) {
// find the cluster where the transaction group is located
HashMap<String/* VGroup */, ConcurrentMap<String/* namespace */,NamespaceBO>> concurrentVgroupMap = new HashMap<>(vGroupMap.asMap());
ConcurrentMap<String/* namespace */, NamespaceBO> vgroupNamespaceMap = concurrentVgroupMap.get(vGroup);
Map<String/* VGroup */, ConcurrentMap<String/* namespace */,NamespaceBO>> concurrentVgroupMap = new HashMap<>(vGroupMap.asMap());
Map<String/* namespace */, NamespaceBO> vgroupNamespaceMap = concurrentVgroupMap.get(vGroup);
List<Cluster> clusterList = new ArrayList<>();
if (!CollectionUtils.isEmpty(vgroupNamespaceMap)) {
NamespaceBO namespaceBO = vgroupNamespaceMap.get(namespace);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,14 @@
*/
package org.apache.seata.spring.boot.autoconfigure.loader;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;

import org.apache.seata.common.holder.ObjectHolder;
import org.apache.seata.common.util.CollectionUtils;
import org.apache.seata.common.util.StringUtils;
Expand All @@ -29,14 +37,6 @@
import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.core.env.PropertiesPropertySource;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;

import static org.apache.seata.common.ConfigurationKeys.FILE_ROOT_PREFIX_CONFIG;
import static org.apache.seata.common.ConfigurationKeys.FILE_ROOT_PREFIX_REGISTRY;
import static org.apache.seata.common.ConfigurationKeys.METRICS_PREFIX;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public class ServerRaftProperties {

private String serverAddr;

private String group;
private String group = "default";

private Boolean autoJoin = false;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,6 @@
*/
package io.seata.server.cluster.raft.snapshot;

import static org.apache.seata.common.ConfigurationKeys.SERVER_RAFT_COMPRESSOR;
import static org.apache.seata.common.DefaultValues.DEFAULT_RAFT_COMPRESSOR;
import static org.apache.seata.common.DefaultValues.DEFAULT_RAFT_SERIALIZATION;


import java.io.Serializable;

import org.apache.seata.common.util.StringUtils;
Expand All @@ -29,6 +24,10 @@
import org.apache.seata.core.protocol.Version;
import org.apache.seata.core.serializer.SerializerType;

import static org.apache.seata.common.ConfigurationKeys.SERVER_RAFT_COMPRESSOR;
import static org.apache.seata.common.DefaultValues.DEFAULT_RAFT_COMPRESSOR;
import static org.apache.seata.common.DefaultValues.DEFAULT_RAFT_SERIALIZATION;

@Deprecated
public class RaftSnapshot implements Serializable {

Expand Down
9 changes: 4 additions & 5 deletions server/src/main/java/org/apache/seata/server/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.seata.server;

import java.util.Optional;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
Expand All @@ -31,11 +32,10 @@
import org.apache.seata.core.rpc.netty.NettyRemotingServer;
import org.apache.seata.core.rpc.netty.NettyServerConfig;
import org.apache.seata.server.coordinator.DefaultCoordinator;
import org.apache.seata.server.instance.ServerInstanceFactory;
import org.apache.seata.server.instance.SeataInstanceStrategy;
import org.apache.seata.server.lock.LockerManagerFactory;
import org.apache.seata.server.metrics.MetricsManager;
import org.apache.seata.server.session.SessionHolder;

import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
Expand All @@ -53,7 +53,7 @@
public class Server {

@Resource
ServerInstanceFactory serverInstanceFactory;
SeataInstanceStrategy seataInstanceStrategy;

/**
* The entry point of application.
Expand Down Expand Up @@ -103,8 +103,7 @@ public void start(String[] args) {
LockerManagerFactory.init();
coordinator.init();
nettyRemotingServer.setHandler(coordinator);

serverInstanceFactory.serverInstanceInit();
Optional.ofNullable(seataInstanceStrategy).ifPresent(SeataInstanceStrategy::init);
// let ServerRunner do destroy instead ShutdownHook, see https://github.com/seata/seata/issues/4028
ServerRunner.addDisposable(coordinator);
nettyRemotingServer.init();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.seata.discovery.registry.FileRegistryServiceImpl;
import org.apache.seata.discovery.registry.MultiRegistryFactory;
import org.apache.seata.discovery.registry.RegistryService;
import org.apache.seata.discovery.registry.namingserver.NamingserverRegistryServiceImpl;
import org.apache.seata.server.cluster.raft.processor.PutNodeInfoRequestProcessor;
import org.apache.seata.server.cluster.raft.serializer.JacksonBoltSerializer;
import org.apache.seata.server.store.StoreConfig;
Expand Down Expand Up @@ -98,7 +99,8 @@ public static void init() {
} else {
if (RAFT_MODE) {
for (RegistryService<?> instance : MultiRegistryFactory.getInstances()) {
if (!(instance instanceof FileRegistryServiceImpl)) {
if (!(instance instanceof FileRegistryServiceImpl)
&& !(instance instanceof NamingserverRegistryServiceImpl)) {
throw new IllegalArgumentException("Raft store mode not support other Registration Center");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@
import org.apache.seata.common.util.StringUtils;
import org.apache.seata.core.serializer.SerializerType;
import org.apache.seata.server.cluster.raft.context.SeataClusterContext;
import org.apache.seata.server.cluster.raft.execute.vgroup.VGroupAddExecute;
import org.apache.seata.server.cluster.raft.execute.vgroup.VGroupRemoveExecute;
import org.apache.seata.server.cluster.raft.processor.request.PutNodeMetadataRequest;
import org.apache.seata.server.cluster.raft.processor.response.PutNodeMetadataResponse;
import org.apache.seata.server.cluster.raft.snapshot.metadata.LeaderMetadataSnapshotFile;
Expand Down Expand Up @@ -88,11 +90,13 @@
import static org.apache.seata.common.Constants.OBJECT_KEY_SPRING_APPLICATION_CONTEXT;
import static org.apache.seata.server.cluster.raft.sync.msg.RaftSyncMsgType.ADD_BRANCH_SESSION;
import static org.apache.seata.server.cluster.raft.sync.msg.RaftSyncMsgType.ADD_GLOBAL_SESSION;
import static org.apache.seata.server.cluster.raft.sync.msg.RaftSyncMsgType.ADD_VGROUP_MAPPING;
import static org.apache.seata.server.cluster.raft.sync.msg.RaftSyncMsgType.REFRESH_CLUSTER_METADATA;
import static org.apache.seata.server.cluster.raft.sync.msg.RaftSyncMsgType.RELEASE_BRANCH_SESSION_LOCK;
import static org.apache.seata.server.cluster.raft.sync.msg.RaftSyncMsgType.RELEASE_GLOBAL_SESSION_LOCK;
import static org.apache.seata.server.cluster.raft.sync.msg.RaftSyncMsgType.REMOVE_BRANCH_SESSION;
import static org.apache.seata.server.cluster.raft.sync.msg.RaftSyncMsgType.REMOVE_GLOBAL_SESSION;
import static org.apache.seata.server.cluster.raft.sync.msg.RaftSyncMsgType.REMOVE_VGROUP_MAPPING;
import static org.apache.seata.server.cluster.raft.sync.msg.RaftSyncMsgType.UPDATE_BRANCH_SESSION_STATUS;
import static org.apache.seata.server.cluster.raft.sync.msg.RaftSyncMsgType.UPDATE_GLOBAL_SESSION_STATUS;

Expand Down Expand Up @@ -152,6 +156,8 @@ public RaftStateMachine(String group) {
EXECUTES.put(REMOVE_GLOBAL_SESSION, new RemoveGlobalSessionExecute());
EXECUTES.put(UPDATE_BRANCH_SESSION_STATUS, new UpdateBranchSessionExecute());
EXECUTES.put(RELEASE_BRANCH_SESSION_LOCK, new BranchReleaseLockExecute());
EXECUTES.put(REMOVE_VGROUP_MAPPING, new VGroupRemoveExecute());
EXECUTES.put(ADD_VGROUP_MAPPING, new VGroupAddExecute());
this.scheduledFuture =
RESYNC_METADATA_POOL.scheduleAtFixedRate(() -> syncCurrentNodeInfo(group), 10, 10, TimeUnit.SECONDS);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.seata.server.cluster.raft.execute;

import org.apache.seata.server.session.SessionHolder;
import org.apache.seata.server.store.VGroupMappingStoreManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -30,4 +32,6 @@ public abstract class AbstractRaftMsgExecute implements RaftMsgExecute<Boolean>

protected RaftLockManager raftLockManager = (RaftLockManager)LockerManagerFactory.getLockManager();

protected VGroupMappingStoreManager raftVGroupMappingStoreManager = SessionHolder.getRootVGroupMappingManager();

}
Loading

0 comments on commit 3c050a2

Please sign in to comment.