From 679a2e01ec985c9743e338bb418c9d4d5c998323 Mon Sep 17 00:00:00 2001 From: justabug Date: Mon, 24 Feb 2025 10:58:45 +0800 Subject: [PATCH 1/3] optimize: Throwing IOException in sendSyncRequest (#7162) --- .../org/apache/seata/core/rpc/RemotingServer.java | 7 ++++--- .../core/rpc/netty/AbstractNettyRemotingServer.java | 13 +++++++------ .../org/apache/seata/mockserver/call/CallRm.java | 5 +++-- 3 files changed, 14 insertions(+), 11 deletions(-) diff --git a/core/src/main/java/org/apache/seata/core/rpc/RemotingServer.java b/core/src/main/java/org/apache/seata/core/rpc/RemotingServer.java index c3141603a07..f38e301b852 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/RemotingServer.java +++ b/core/src/main/java/org/apache/seata/core/rpc/RemotingServer.java @@ -21,6 +21,7 @@ import org.apache.seata.core.rpc.processor.RemotingProcessor; import org.apache.seata.core.protocol.MessageType; +import java.io.IOException; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeoutException; @@ -41,7 +42,7 @@ public interface RemotingServer { * @return client result message * @throws TimeoutException TimeoutException */ - Object sendSyncRequest(String resourceId, String clientId, Object msg, boolean tryOtherApp) throws TimeoutException; + Object sendSyncRequest(String resourceId, String clientId, Object msg, boolean tryOtherApp) throws TimeoutException, IOException; /** * server send sync request. @@ -51,7 +52,7 @@ public interface RemotingServer { * @return client result message * @throws TimeoutException TimeoutException */ - Object sendSyncRequest(Channel channel, Object msg) throws TimeoutException; + Object sendSyncRequest(Channel channel, Object msg) throws TimeoutException, IOException; /** * server send async request. @@ -59,7 +60,7 @@ public interface RemotingServer { * @param channel client channel * @param msg transaction message {@code org.apache.seata.core.protocol} */ - void sendAsyncRequest(Channel channel, Object msg); + void sendAsyncRequest(Channel channel, Object msg) throws IOException; /** * server send async response. diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingServer.java b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingServer.java index 67df2ea8494..fc6a7f21dd3 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingServer.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingServer.java @@ -16,6 +16,7 @@ */ package org.apache.seata.core.rpc.netty; +import java.io.IOException; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeoutException; @@ -68,28 +69,28 @@ public AbstractNettyRemotingServer(ThreadPoolExecutor messageExecutor, NettyServ @Override public Object sendSyncRequest(String resourceId, String clientId, Object msg, boolean tryOtherApp) - throws TimeoutException { + throws TimeoutException, IOException { Channel channel = ChannelManager.getChannel(resourceId, clientId, tryOtherApp); if (channel == null) { - throw new RuntimeException("rm client is not connected. dbkey:" + resourceId + ",clientId:" + clientId); + throw new IOException("rm client is not connected. dbkey:" + resourceId + ",clientId:" + clientId); } RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC); return super.sendSync(channel, rpcMessage, NettyServerConfig.getRpcRequestTimeout()); } @Override - public Object sendSyncRequest(Channel channel, Object msg) throws TimeoutException { + public Object sendSyncRequest(Channel channel, Object msg) throws TimeoutException, IOException { if (channel == null) { - throw new RuntimeException("client is not connected"); + throw new IOException("client is not connected"); } RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC); return super.sendSync(channel, rpcMessage, NettyServerConfig.getRpcRequestTimeout()); } @Override - public void sendAsyncRequest(Channel channel, Object msg) { + public void sendAsyncRequest(Channel channel, Object msg) throws IOException { if (channel == null) { - throw new RuntimeException("client is not connected"); + throw new IOException("client is not connected"); } RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_ONEWAY); super.sendAsync(channel, rpcMessage); diff --git a/mock-server/src/main/java/org/apache/seata/mockserver/call/CallRm.java b/mock-server/src/main/java/org/apache/seata/mockserver/call/CallRm.java index 8a347246f33..be48ab4c63b 100644 --- a/mock-server/src/main/java/org/apache/seata/mockserver/call/CallRm.java +++ b/mock-server/src/main/java/org/apache/seata/mockserver/call/CallRm.java @@ -16,6 +16,7 @@ */ package org.apache.seata.mockserver.call; +import java.io.IOException; import java.util.concurrent.TimeoutException; import io.netty.channel.Channel; @@ -49,7 +50,7 @@ public static BranchStatus branchCommit(RemotingServer remotingServer, MockBranc BranchCommitResponse response = (BranchCommitResponse)remotingServer.sendSyncRequest( branchSession.getResourceId(), branchSession.getClientId(), request, false); return response.getBranchStatus(); - } catch (TimeoutException e) { + } catch (TimeoutException | IOException e) { throw new RuntimeException(e); } } @@ -68,7 +69,7 @@ public static BranchStatus branchRollback(RemotingServer remotingServer, MockBra BranchRollbackResponse response = (BranchRollbackResponse)remotingServer.sendSyncRequest( branchSession.getResourceId(), branchSession.getClientId(), request, false); return response.getBranchStatus(); - } catch (TimeoutException e) { + } catch (TimeoutException | IOException e) { throw new RuntimeException(e); } } From 3c050a226d442f90fdd4f5b8d7110a5a1df81ef2 Mon Sep 17 00:00:00 2001 From: funkye Date: Mon, 24 Feb 2025 14:43:38 +0800 Subject: [PATCH 2/3] feature: support raft mode registry to namingserver (#7114) --- changes/en-us/2.x.md | 1 + changes/zh-cn/2.x.md | 2 + .../seata/common/metadata/Instance.java | 22 +++ .../apache/seata/common/metadata/Node.java | 6 +- .../namingserver/NamingServerNode.java | 2 - .../NamingserverRegistryServiceImpl.java | 3 + .../controller/NamingController.java | 15 ++ .../namingserver/entity/bo/NamespaceBO.java | 3 +- .../namingserver/manager/NamingManager.java | 36 +++-- .../loader/SeataPropertiesLoader.java | 16 +- .../server/raft/ServerRaftProperties.java | 2 +- .../cluster/raft/snapshot/RaftSnapshot.java | 9 +- .../java/org/apache/seata/server/Server.java | 9 +- .../cluster/raft/RaftServerManager.java | 4 +- .../server/cluster/raft/RaftStateMachine.java | 6 + .../raft/execute/AbstractRaftMsgExecute.java | 4 + .../raft/execute/vgroup/VGroupAddExecute.java | 35 +++++ .../execute/vgroup/VGroupRemoveExecute.java | 35 +++++ .../cluster/raft/snapshot/RaftSnapshot.java | 6 + .../snapshot/vgroup/VGroupSnapshotFile.java | 89 +++++++++++ .../raft/sync/msg/RaftSyncMsgType.java | 15 +- .../raft/sync/msg/RaftVGroupSyncMsg.java | 41 +++++ .../config/SeataNamingserverWebConfig.java | 35 +++++ .../config/ServerInstanceStrategyConfig.java | 41 +++++ .../seata/server/filter/RaftGroupFilter.java | 52 +++++++ .../AbstractSeataInstanceStrategy.java | 93 +++++++++++ ...tory.java => GeneralInstanceStrategy.java} | 63 ++------ .../instance/RaftServerInstanceStrategy.java | 108 +++++++++++++ .../instance/SeataInstanceStrategy.java | 40 +++++ .../seata/server/session/SessionHolder.java | 6 +- .../DataBaseVGroupMappingStoreManager.java | 3 +- .../store/FileVGroupMappingStoreManager.java | 24 +-- .../sore/RaftVGroupMappingStoreManager.java | 147 ++++++++++++++++++ .../store/RedisVGroupMappingStoreManager.java | 2 +- .../store/VGroupMappingStoreManager.java | 5 +- ...ata.server.store.VGroupMappingStoreManager | 3 +- .../FileVGroupMappingStoreManagerTest.java | 8 +- 37 files changed, 875 insertions(+), 116 deletions(-) create mode 100644 server/src/main/java/org/apache/seata/server/cluster/raft/execute/vgroup/VGroupAddExecute.java create mode 100644 server/src/main/java/org/apache/seata/server/cluster/raft/execute/vgroup/VGroupRemoveExecute.java create mode 100644 server/src/main/java/org/apache/seata/server/cluster/raft/snapshot/vgroup/VGroupSnapshotFile.java create mode 100644 server/src/main/java/org/apache/seata/server/cluster/raft/sync/msg/RaftVGroupSyncMsg.java create mode 100644 server/src/main/java/org/apache/seata/server/config/SeataNamingserverWebConfig.java create mode 100644 server/src/main/java/org/apache/seata/server/config/ServerInstanceStrategyConfig.java create mode 100644 server/src/main/java/org/apache/seata/server/filter/RaftGroupFilter.java create mode 100644 server/src/main/java/org/apache/seata/server/instance/AbstractSeataInstanceStrategy.java rename server/src/main/java/org/apache/seata/server/instance/{ServerInstanceFactory.java => GeneralInstanceStrategy.java} (55%) create mode 100644 server/src/main/java/org/apache/seata/server/instance/RaftServerInstanceStrategy.java create mode 100644 server/src/main/java/org/apache/seata/server/instance/SeataInstanceStrategy.java create mode 100644 server/src/main/java/org/apache/seata/server/storage/raft/sore/RaftVGroupMappingStoreManager.java diff --git a/changes/en-us/2.x.md b/changes/en-us/2.x.md index e3b7cc4db18..d05d99853b2 100644 --- a/changes/en-us/2.x.md +++ b/changes/en-us/2.x.md @@ -9,6 +9,7 @@ Add changes here for all PR submitted to the 2.x branch. - [[#7037](https://github.com/apache/incubator-seata/pull/7037)] support fury undolog parser - [[#7069](https://github.com/apache/incubator-seata/pull/7069)] Raft cluster mode supports address translation - [[#7038](https://github.com/apache/incubator-seata/pull/7038)] support fury serializer +- [[#7114](https://github.com/apache/incubator-seata/pull/7114)] support raft mode registry to namingserver - [[#7133](https://github.com/apache/incubator-seata/pull/7133)] Implement scheduled handling for end status transaction - [[#7171](https://github.com/apache/incubator-seata/pull/7171)] support EpollEventLoopGroup in client diff --git a/changes/zh-cn/2.x.md b/changes/zh-cn/2.x.md index 34e1f5f694b..3ba8323a69d 100644 --- a/changes/zh-cn/2.x.md +++ b/changes/zh-cn/2.x.md @@ -9,9 +9,11 @@ - [[#7037](https://github.com/apache/incubator-seata/pull/7037)] 支持UndoLog的fury序列化方式 - [[#7069](https://github.com/apache/incubator-seata/pull/7069)] Raft集群模式支持地址转换 - [[#7038](https://github.com/apache/incubator-seata/pull/7038)] 支持Fury序列化器 +- [[#7114](https://github.com/apache/incubator-seata/pull/7114)] 支持raft集群注册至namingserver - [[#7133](https://github.com/apache/incubator-seata/pull/7133)] 实现对残留的end状态事务定时处理 - [[#7171](https://github.com/apache/incubator-seata/pull/7171)] 客户端支持 EpollEventLoopGroup + ### bugfix: - [[#7104](https://github.com/apache/incubator-seata/pull/7104)] 修复SeataApplicationListener在低版本springboot未实现supportsSourceType方法的问题 diff --git a/common/src/main/java/org/apache/seata/common/metadata/Instance.java b/common/src/main/java/org/apache/seata/common/metadata/Instance.java index e588df0e98f..57ae7ff97fb 100644 --- a/common/src/main/java/org/apache/seata/common/metadata/Instance.java +++ b/common/src/main/java/org/apache/seata/common/metadata/Instance.java @@ -20,7 +20,9 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Objects; @@ -44,6 +46,10 @@ public static Instance getInstance() { return SingletonHolder.SERVER_INSTANCE; } + public static List getInstances() { + return SingletonHolder.SERVER_INSTANCES; + } + public String getNamespace() { return namespace; @@ -164,8 +170,24 @@ public String toJsonString(ObjectMapper objectMapper) { } } + public Instance clone() { + Instance instance = new Instance(); + instance.setNamespace(namespace); + instance.setClusterName(clusterName); + instance.setUnit(unit); + instance.setControl(control); + instance.setTransaction(transaction); + instance.setWeight(weight); + instance.setHealthy(healthy); + instance.setTerm(term); + instance.setTimestamp(timestamp); + instance.setMetadata(metadata); + return instance; + } + private static class SingletonHolder { private static final Instance SERVER_INSTANCE = new Instance(); + private static final List SERVER_INSTANCES = new ArrayList<>(); } diff --git a/common/src/main/java/org/apache/seata/common/metadata/Node.java b/common/src/main/java/org/apache/seata/common/metadata/Node.java index bcc85a96962..014cb0fdcb8 100644 --- a/common/src/main/java/org/apache/seata/common/metadata/Node.java +++ b/common/src/main/java/org/apache/seata/common/metadata/Node.java @@ -33,9 +33,9 @@ public class Node { private Endpoint internal; - private double weight = 1.0; - private boolean healthy = true; - private long timeStamp; + protected double weight = 1.0; + protected boolean healthy = true; + protected long timeStamp; private String group; private ClusterRole role = ClusterRole.MEMBER; diff --git a/common/src/main/java/org/apache/seata/common/metadata/namingserver/NamingServerNode.java b/common/src/main/java/org/apache/seata/common/metadata/namingserver/NamingServerNode.java index 507bd943b58..2eeca0e8d35 100644 --- a/common/src/main/java/org/apache/seata/common/metadata/namingserver/NamingServerNode.java +++ b/common/src/main/java/org/apache/seata/common/metadata/namingserver/NamingServerNode.java @@ -22,8 +22,6 @@ public class NamingServerNode extends Node { - private double weight = 1.0; - private boolean healthy = true; private long term; private String unit; diff --git a/discovery/seata-discovery-namingserver/src/main/java/org/apache/seata/discovery/registry/namingserver/NamingserverRegistryServiceImpl.java b/discovery/seata-discovery-namingserver/src/main/java/org/apache/seata/discovery/registry/namingserver/NamingserverRegistryServiceImpl.java index 88386253fa6..8e5b2c21e02 100644 --- a/discovery/seata-discovery-namingserver/src/main/java/org/apache/seata/discovery/registry/namingserver/NamingserverRegistryServiceImpl.java +++ b/discovery/seata-discovery-namingserver/src/main/java/org/apache/seata/discovery/registry/namingserver/NamingserverRegistryServiceImpl.java @@ -156,6 +156,9 @@ public void register(Instance instance) throws Exception { doRegister(instance, getNamingAddrs()); } + public void doRegister(List instance, List urlList) { + + } public void doRegister(Instance instance, List urlList) { for (String urlSuffix : urlList) { // continue if name server node is unhealthy diff --git a/namingserver/src/main/java/org/apache/seata/namingserver/controller/NamingController.java b/namingserver/src/main/java/org/apache/seata/namingserver/controller/NamingController.java index a1630d3a464..672c378b9d5 100644 --- a/namingserver/src/main/java/org/apache/seata/namingserver/controller/NamingController.java +++ b/namingserver/src/main/java/org/apache/seata/namingserver/controller/NamingController.java @@ -70,6 +70,21 @@ public Result registerInstance(@RequestParam String namespace, return result; } + @PostMapping("/batchRegister") + public Result batchRegisterInstance(@RequestParam String namespace, + @RequestParam String clusterName, + @RequestBody List nodes) { + Result result = new Result<>(); + boolean isSuccess = namingManager.registerInstances(nodes, namespace, clusterName); + if (isSuccess) { + result.setMessage("node has registered successfully!"); + } else { + result.setCode("500"); + result.setMessage("node registered unsuccessfully!"); + } + return result; + } + @PostMapping("/unregister") public Result unregisterInstance(@RequestParam String namespace, @RequestParam String clusterName, @RequestParam String unit, @RequestBody NamingServerNode registerBody) { diff --git a/namingserver/src/main/java/org/apache/seata/namingserver/entity/bo/NamespaceBO.java b/namingserver/src/main/java/org/apache/seata/namingserver/entity/bo/NamespaceBO.java index e5694b02403..dfba1e64476 100644 --- a/namingserver/src/main/java/org/apache/seata/namingserver/entity/bo/NamespaceBO.java +++ b/namingserver/src/main/java/org/apache/seata/namingserver/entity/bo/NamespaceBO.java @@ -21,7 +21,6 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import org.apache.seata.common.metadata.Cluster; import org.apache.seata.common.util.StringUtils; import org.apache.seata.namingserver.entity.pojo.ClusterData; @@ -37,7 +36,7 @@ public Map getClusterMap() { return clusterMap; } - public List getCluster(ConcurrentMap clusterDataMap) { + public List getCluster(Map clusterDataMap) { List list = new ArrayList<>(); clusterMap.forEach((clusterName, unitNameSet) -> { ClusterData clusterData = clusterDataMap.get(clusterName); diff --git a/namingserver/src/main/java/org/apache/seata/namingserver/manager/NamingManager.java b/namingserver/src/main/java/org/apache/seata/namingserver/manager/NamingManager.java index 9a6b790220d..7e6b74328e0 100644 --- a/namingserver/src/main/java/org/apache/seata/namingserver/manager/NamingManager.java +++ b/namingserver/src/main/java/org/apache/seata/namingserver/manager/NamingManager.java @@ -32,6 +32,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; import javax.annotation.PostConstruct; import com.github.benmanes.caffeine.cache.Caffeine; @@ -42,6 +43,7 @@ import org.apache.http.entity.ContentType; import org.apache.http.protocol.HTTP; import org.apache.seata.common.metadata.Cluster; +import org.apache.seata.common.metadata.ClusterRole; import org.apache.seata.common.metadata.Node; import org.apache.seata.common.metadata.namingserver.NamingServerNode; import org.apache.seata.common.metadata.namingserver.Unit; @@ -152,7 +154,9 @@ public Result createGroup(String namespace, String vGroup, String cluste LOGGER.error("no instance in cluster {}", clusterName); return new Result<>("301", "no instance in cluster" + clusterName); } else { - Node node = nodeList.get(0); + Node node = + nodeList.stream().filter(n -> n.getRole() == ClusterRole.LEADER || n.getRole() == ClusterRole.MEMBER) + .collect(Collectors.toList()).get(0); String controlHost = node.getControl().getHost(); int controlPort = node.getControl().getPort(); String httpUrl = NamingServerConstants.HTTP_PREFIX + controlHost + NamingServerConstants.IP_PORT_SPLIT_CHAR @@ -231,6 +235,14 @@ public void notifyClusterChange(String vGroup, String namespace, String clusterN }); } + public boolean registerInstances(List node, String namespace, String clusterName) { + boolean result = true; + for (NamingServerNode namingServerNode : node) { + result = registerInstance(namingServerNode, namespace, clusterName, namingServerNode.getUnit()); + } + return result; + } + public boolean registerInstance(NamingServerNode node, String namespace, String clusterName, String unitName) { try { Map clusterDataHashMap = @@ -245,16 +257,14 @@ public boolean registerInstance(NamingServerNode node, String namespace, String // if extended metadata includes vgroup mapping relationship, add it in clusterData if (mappingObj instanceof Map) { Map vGroups = (Map)mappingObj; - if (!CollectionUtils.isEmpty(vGroups)) { - vGroups.forEach((k, v) -> { - // In non-raft mode, a unit is one-to-one with a node, and the unitName is stored on the node. - // In raft mode, the unitName is equal to the raft-group, so the node's unitName cannot be used. - boolean changed = addGroup(namespace, clusterName, StringUtils.isBlank(v) ? unitName : v, k); - if (hasChanged || changed) { - notifyClusterChange(k, namespace, clusterName, unitName, node.getTerm()); - } - }); - } + vGroups.forEach((k, v) -> { + // In non-raft mode, a unit is one-to-one with a node, and the unitName is stored on the node. + // In raft mode, the unitName is equal to the raft-group, so the node's unitName cannot be used. + boolean changed = addGroup(namespace, clusterName, StringUtils.isBlank(v) ? unitName : v, k); + if (hasChanged || changed) { + notifyClusterChange(k, namespace, clusterName, unitName, node.getTerm()); + } + }); } instanceLiveTable.put( new InetSocketAddress(node.getTransaction().getHost(), node.getTransaction().getPort()), @@ -295,8 +305,8 @@ public boolean unregisterInstance(String namespace, String clusterName, String u public List getClusterListByVgroup(String vGroup, String namespace) { // find the cluster where the transaction group is located - HashMap> concurrentVgroupMap = new HashMap<>(vGroupMap.asMap()); - ConcurrentMap vgroupNamespaceMap = concurrentVgroupMap.get(vGroup); + Map> concurrentVgroupMap = new HashMap<>(vGroupMap.asMap()); + Map vgroupNamespaceMap = concurrentVgroupMap.get(vGroup); List clusterList = new ArrayList<>(); if (!CollectionUtils.isEmpty(vgroupNamespaceMap)) { NamespaceBO namespaceBO = vgroupNamespaceMap.get(namespace); diff --git a/seata-spring-autoconfigure/seata-spring-autoconfigure-core/src/main/java/org/apache/seata/spring/boot/autoconfigure/loader/SeataPropertiesLoader.java b/seata-spring-autoconfigure/seata-spring-autoconfigure-core/src/main/java/org/apache/seata/spring/boot/autoconfigure/loader/SeataPropertiesLoader.java index 77d27827c0b..4496c3dc98b 100644 --- a/seata-spring-autoconfigure/seata-spring-autoconfigure-core/src/main/java/org/apache/seata/spring/boot/autoconfigure/loader/SeataPropertiesLoader.java +++ b/seata-spring-autoconfigure/seata-spring-autoconfigure-core/src/main/java/org/apache/seata/spring/boot/autoconfigure/loader/SeataPropertiesLoader.java @@ -16,6 +16,14 @@ */ package org.apache.seata.spring.boot.autoconfigure.loader; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; + import org.apache.seata.common.holder.ObjectHolder; import org.apache.seata.common.util.CollectionUtils; import org.apache.seata.common.util.StringUtils; @@ -29,14 +37,6 @@ import org.springframework.core.env.ConfigurableEnvironment; import org.springframework.core.env.PropertiesPropertySource; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Properties; - import static org.apache.seata.common.ConfigurationKeys.FILE_ROOT_PREFIX_CONFIG; import static org.apache.seata.common.ConfigurationKeys.FILE_ROOT_PREFIX_REGISTRY; import static org.apache.seata.common.ConfigurationKeys.METRICS_PREFIX; diff --git a/seata-spring-autoconfigure/seata-spring-autoconfigure-server/src/main/java/org/apache/seata/spring/boot/autoconfigure/properties/server/raft/ServerRaftProperties.java b/seata-spring-autoconfigure/seata-spring-autoconfigure-server/src/main/java/org/apache/seata/spring/boot/autoconfigure/properties/server/raft/ServerRaftProperties.java index 4a3c3649b4d..6164d6daa96 100644 --- a/seata-spring-autoconfigure/seata-spring-autoconfigure-server/src/main/java/org/apache/seata/spring/boot/autoconfigure/properties/server/raft/ServerRaftProperties.java +++ b/seata-spring-autoconfigure/seata-spring-autoconfigure-server/src/main/java/org/apache/seata/spring/boot/autoconfigure/properties/server/raft/ServerRaftProperties.java @@ -29,7 +29,7 @@ public class ServerRaftProperties { private String serverAddr; - private String group; + private String group = "default"; private Boolean autoJoin = false; diff --git a/server/src/main/java/io/seata/server/cluster/raft/snapshot/RaftSnapshot.java b/server/src/main/java/io/seata/server/cluster/raft/snapshot/RaftSnapshot.java index 40c7752a520..d0b6bcb5446 100644 --- a/server/src/main/java/io/seata/server/cluster/raft/snapshot/RaftSnapshot.java +++ b/server/src/main/java/io/seata/server/cluster/raft/snapshot/RaftSnapshot.java @@ -16,11 +16,6 @@ */ package io.seata.server.cluster.raft.snapshot; -import static org.apache.seata.common.ConfigurationKeys.SERVER_RAFT_COMPRESSOR; -import static org.apache.seata.common.DefaultValues.DEFAULT_RAFT_COMPRESSOR; -import static org.apache.seata.common.DefaultValues.DEFAULT_RAFT_SERIALIZATION; - - import java.io.Serializable; import org.apache.seata.common.util.StringUtils; @@ -29,6 +24,10 @@ import org.apache.seata.core.protocol.Version; import org.apache.seata.core.serializer.SerializerType; +import static org.apache.seata.common.ConfigurationKeys.SERVER_RAFT_COMPRESSOR; +import static org.apache.seata.common.DefaultValues.DEFAULT_RAFT_COMPRESSOR; +import static org.apache.seata.common.DefaultValues.DEFAULT_RAFT_SERIALIZATION; + @Deprecated public class RaftSnapshot implements Serializable { diff --git a/server/src/main/java/org/apache/seata/server/Server.java b/server/src/main/java/org/apache/seata/server/Server.java index 39648dae824..b93ec5fb080 100644 --- a/server/src/main/java/org/apache/seata/server/Server.java +++ b/server/src/main/java/org/apache/seata/server/Server.java @@ -16,6 +16,7 @@ */ package org.apache.seata.server; +import java.util.Optional; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -31,11 +32,10 @@ import org.apache.seata.core.rpc.netty.NettyRemotingServer; import org.apache.seata.core.rpc.netty.NettyServerConfig; import org.apache.seata.server.coordinator.DefaultCoordinator; -import org.apache.seata.server.instance.ServerInstanceFactory; +import org.apache.seata.server.instance.SeataInstanceStrategy; import org.apache.seata.server.lock.LockerManagerFactory; import org.apache.seata.server.metrics.MetricsManager; import org.apache.seata.server.session.SessionHolder; - import org.springframework.beans.factory.config.ConfigurableListableBeanFactory; import org.springframework.context.ApplicationListener; import org.springframework.stereotype.Component; @@ -53,7 +53,7 @@ public class Server { @Resource - ServerInstanceFactory serverInstanceFactory; + SeataInstanceStrategy seataInstanceStrategy; /** * The entry point of application. @@ -103,8 +103,7 @@ public void start(String[] args) { LockerManagerFactory.init(); coordinator.init(); nettyRemotingServer.setHandler(coordinator); - - serverInstanceFactory.serverInstanceInit(); + Optional.ofNullable(seataInstanceStrategy).ifPresent(SeataInstanceStrategy::init); // let ServerRunner do destroy instead ShutdownHook, see https://github.com/seata/seata/issues/4028 ServerRunner.addDisposable(coordinator); nettyRemotingServer.init(); diff --git a/server/src/main/java/org/apache/seata/server/cluster/raft/RaftServerManager.java b/server/src/main/java/org/apache/seata/server/cluster/raft/RaftServerManager.java index 47e7ee4432b..e5bfcb1056e 100644 --- a/server/src/main/java/org/apache/seata/server/cluster/raft/RaftServerManager.java +++ b/server/src/main/java/org/apache/seata/server/cluster/raft/RaftServerManager.java @@ -45,6 +45,7 @@ import org.apache.seata.discovery.registry.FileRegistryServiceImpl; import org.apache.seata.discovery.registry.MultiRegistryFactory; import org.apache.seata.discovery.registry.RegistryService; +import org.apache.seata.discovery.registry.namingserver.NamingserverRegistryServiceImpl; import org.apache.seata.server.cluster.raft.processor.PutNodeInfoRequestProcessor; import org.apache.seata.server.cluster.raft.serializer.JacksonBoltSerializer; import org.apache.seata.server.store.StoreConfig; @@ -98,7 +99,8 @@ public static void init() { } else { if (RAFT_MODE) { for (RegistryService instance : MultiRegistryFactory.getInstances()) { - if (!(instance instanceof FileRegistryServiceImpl)) { + if (!(instance instanceof FileRegistryServiceImpl) + && !(instance instanceof NamingserverRegistryServiceImpl)) { throw new IllegalArgumentException("Raft store mode not support other Registration Center"); } } diff --git a/server/src/main/java/org/apache/seata/server/cluster/raft/RaftStateMachine.java b/server/src/main/java/org/apache/seata/server/cluster/raft/RaftStateMachine.java index 6ba9d66673b..89f85e9b298 100644 --- a/server/src/main/java/org/apache/seata/server/cluster/raft/RaftStateMachine.java +++ b/server/src/main/java/org/apache/seata/server/cluster/raft/RaftStateMachine.java @@ -56,6 +56,8 @@ import org.apache.seata.common.util.StringUtils; import org.apache.seata.core.serializer.SerializerType; import org.apache.seata.server.cluster.raft.context.SeataClusterContext; +import org.apache.seata.server.cluster.raft.execute.vgroup.VGroupAddExecute; +import org.apache.seata.server.cluster.raft.execute.vgroup.VGroupRemoveExecute; import org.apache.seata.server.cluster.raft.processor.request.PutNodeMetadataRequest; import org.apache.seata.server.cluster.raft.processor.response.PutNodeMetadataResponse; import org.apache.seata.server.cluster.raft.snapshot.metadata.LeaderMetadataSnapshotFile; @@ -88,11 +90,13 @@ import static org.apache.seata.common.Constants.OBJECT_KEY_SPRING_APPLICATION_CONTEXT; import static org.apache.seata.server.cluster.raft.sync.msg.RaftSyncMsgType.ADD_BRANCH_SESSION; import static org.apache.seata.server.cluster.raft.sync.msg.RaftSyncMsgType.ADD_GLOBAL_SESSION; +import static org.apache.seata.server.cluster.raft.sync.msg.RaftSyncMsgType.ADD_VGROUP_MAPPING; import static org.apache.seata.server.cluster.raft.sync.msg.RaftSyncMsgType.REFRESH_CLUSTER_METADATA; import static org.apache.seata.server.cluster.raft.sync.msg.RaftSyncMsgType.RELEASE_BRANCH_SESSION_LOCK; import static org.apache.seata.server.cluster.raft.sync.msg.RaftSyncMsgType.RELEASE_GLOBAL_SESSION_LOCK; import static org.apache.seata.server.cluster.raft.sync.msg.RaftSyncMsgType.REMOVE_BRANCH_SESSION; import static org.apache.seata.server.cluster.raft.sync.msg.RaftSyncMsgType.REMOVE_GLOBAL_SESSION; +import static org.apache.seata.server.cluster.raft.sync.msg.RaftSyncMsgType.REMOVE_VGROUP_MAPPING; import static org.apache.seata.server.cluster.raft.sync.msg.RaftSyncMsgType.UPDATE_BRANCH_SESSION_STATUS; import static org.apache.seata.server.cluster.raft.sync.msg.RaftSyncMsgType.UPDATE_GLOBAL_SESSION_STATUS; @@ -152,6 +156,8 @@ public RaftStateMachine(String group) { EXECUTES.put(REMOVE_GLOBAL_SESSION, new RemoveGlobalSessionExecute()); EXECUTES.put(UPDATE_BRANCH_SESSION_STATUS, new UpdateBranchSessionExecute()); EXECUTES.put(RELEASE_BRANCH_SESSION_LOCK, new BranchReleaseLockExecute()); + EXECUTES.put(REMOVE_VGROUP_MAPPING, new VGroupRemoveExecute()); + EXECUTES.put(ADD_VGROUP_MAPPING, new VGroupAddExecute()); this.scheduledFuture = RESYNC_METADATA_POOL.scheduleAtFixedRate(() -> syncCurrentNodeInfo(group), 10, 10, TimeUnit.SECONDS); } diff --git a/server/src/main/java/org/apache/seata/server/cluster/raft/execute/AbstractRaftMsgExecute.java b/server/src/main/java/org/apache/seata/server/cluster/raft/execute/AbstractRaftMsgExecute.java index 922b58bc848..8d1a14c0fd2 100644 --- a/server/src/main/java/org/apache/seata/server/cluster/raft/execute/AbstractRaftMsgExecute.java +++ b/server/src/main/java/org/apache/seata/server/cluster/raft/execute/AbstractRaftMsgExecute.java @@ -16,6 +16,8 @@ */ package org.apache.seata.server.cluster.raft.execute; +import org.apache.seata.server.session.SessionHolder; +import org.apache.seata.server.store.VGroupMappingStoreManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,4 +32,6 @@ public abstract class AbstractRaftMsgExecute implements RaftMsgExecute protected RaftLockManager raftLockManager = (RaftLockManager)LockerManagerFactory.getLockManager(); + protected VGroupMappingStoreManager raftVGroupMappingStoreManager = SessionHolder.getRootVGroupMappingManager(); + } diff --git a/server/src/main/java/org/apache/seata/server/cluster/raft/execute/vgroup/VGroupAddExecute.java b/server/src/main/java/org/apache/seata/server/cluster/raft/execute/vgroup/VGroupAddExecute.java new file mode 100644 index 00000000000..47ccf9e12b6 --- /dev/null +++ b/server/src/main/java/org/apache/seata/server/cluster/raft/execute/vgroup/VGroupAddExecute.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.server.cluster.raft.execute.vgroup; + +import org.apache.seata.server.cluster.raft.execute.AbstractRaftMsgExecute; +import org.apache.seata.server.cluster.raft.sync.msg.RaftBaseMsg; +import org.apache.seata.server.cluster.raft.sync.msg.RaftVGroupSyncMsg; +import org.apache.seata.server.storage.raft.sore.RaftVGroupMappingStoreManager; + +/** + */ +public class VGroupAddExecute extends AbstractRaftMsgExecute { + + @Override + public Boolean execute(RaftBaseMsg syncMsg) throws Throwable { + RaftVGroupSyncMsg vGroupSyncMsg = (RaftVGroupSyncMsg)syncMsg; + ((RaftVGroupMappingStoreManager)raftVGroupMappingStoreManager).localAddVGroup(vGroupSyncMsg.getMappingDO()); + return true; + } + +} diff --git a/server/src/main/java/org/apache/seata/server/cluster/raft/execute/vgroup/VGroupRemoveExecute.java b/server/src/main/java/org/apache/seata/server/cluster/raft/execute/vgroup/VGroupRemoveExecute.java new file mode 100644 index 00000000000..42d2e8dfe96 --- /dev/null +++ b/server/src/main/java/org/apache/seata/server/cluster/raft/execute/vgroup/VGroupRemoveExecute.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.server.cluster.raft.execute.vgroup; + +import org.apache.seata.server.cluster.raft.execute.AbstractRaftMsgExecute; +import org.apache.seata.server.cluster.raft.sync.msg.RaftBaseMsg; +import org.apache.seata.server.cluster.raft.sync.msg.RaftVGroupSyncMsg; +import org.apache.seata.server.storage.raft.sore.RaftVGroupMappingStoreManager; + +/** + */ +public class VGroupRemoveExecute extends AbstractRaftMsgExecute { + + @Override + public Boolean execute(RaftBaseMsg syncMsg) throws Throwable { + RaftVGroupSyncMsg vGroupSyncMsg = (RaftVGroupSyncMsg)syncMsg; + ((RaftVGroupMappingStoreManager)raftVGroupMappingStoreManager).localRemoveVGroup(vGroupSyncMsg.getMappingDO().getVGroup()); + return true; + } + +} diff --git a/server/src/main/java/org/apache/seata/server/cluster/raft/snapshot/RaftSnapshot.java b/server/src/main/java/org/apache/seata/server/cluster/raft/snapshot/RaftSnapshot.java index e8c1cc68f31..da97df7f5b9 100644 --- a/server/src/main/java/org/apache/seata/server/cluster/raft/snapshot/RaftSnapshot.java +++ b/server/src/main/java/org/apache/seata/server/cluster/raft/snapshot/RaftSnapshot.java @@ -128,6 +128,12 @@ public enum SnapshotType { * session snapshot */ session("session"), + + /** + * vgroup mapping + */ + vgroup_mapping("vgroup_mapping"), + /** * leader metadata snapshot */ diff --git a/server/src/main/java/org/apache/seata/server/cluster/raft/snapshot/vgroup/VGroupSnapshotFile.java b/server/src/main/java/org/apache/seata/server/cluster/raft/snapshot/vgroup/VGroupSnapshotFile.java new file mode 100644 index 00000000000..229eeada589 --- /dev/null +++ b/server/src/main/java/org/apache/seata/server/cluster/raft/snapshot/vgroup/VGroupSnapshotFile.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.server.cluster.raft.snapshot.vgroup; + +import java.io.File; +import java.io.IOException; +import java.io.Serializable; +import java.util.Map; +import com.alipay.sofa.jraft.Status; +import com.alipay.sofa.jraft.error.RaftError; +import com.alipay.sofa.jraft.storage.snapshot.SnapshotReader; +import com.alipay.sofa.jraft.storage.snapshot.SnapshotWriter; +import org.apache.seata.core.store.MappingDO; +import org.apache.seata.server.cluster.raft.snapshot.RaftSnapshot; +import org.apache.seata.server.cluster.raft.snapshot.StoreSnapshotFile; +import org.apache.seata.server.session.SessionHolder; +import org.apache.seata.server.storage.raft.sore.RaftVGroupMappingStoreManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class VGroupSnapshotFile implements Serializable, StoreSnapshotFile { + + private final static Logger LOGGER = LoggerFactory.getLogger(VGroupSnapshotFile.class); + + public static final String ROOT_MAPPING_MANAGER_NAME = "vgroup_mapping.json"; + + String group; + + public VGroupSnapshotFile(String group) { + this.group = group; + } + + @Override + public Status save(SnapshotWriter writer) { + RaftSnapshot raftSnapshot = new RaftSnapshot(); + RaftVGroupMappingStoreManager raftVGroupMappingStoreManager = + (RaftVGroupMappingStoreManager)SessionHolder.getRootVGroupMappingManager(); + Map map = raftVGroupMappingStoreManager.loadVGroupsByUnit(group); + raftSnapshot.setBody(map); + raftSnapshot.setType(RaftSnapshot.SnapshotType.vgroup_mapping); + String path = new StringBuilder(writer.getPath()).append(File.separator).append(ROOT_MAPPING_MANAGER_NAME).toString(); + try { + if (save(raftSnapshot, path)) { + if (writer.addFile(ROOT_MAPPING_MANAGER_NAME)) { + return Status.OK(); + } else { + return new Status(RaftError.EIO, "Fail to add file to writer"); + } + } + } catch (IOException e) { + LOGGER.error("Fail to save groupId: {} snapshot {}", group, path, e); + } + return new Status(RaftError.EIO, "Fail to save groupId: " + group + " snapshot %s", path); + } + + @Override + public boolean load(SnapshotReader reader) { + if (reader.getFileMeta(ROOT_MAPPING_MANAGER_NAME) == null) { + LOGGER.error("Fail to find data file in {}", reader.getPath()); + return false; + } + String path = new StringBuilder(reader.getPath()).append(File.separator).append(ROOT_MAPPING_MANAGER_NAME).toString(); + try { + Map map = (Map)load(path); + RaftVGroupMappingStoreManager raftVGroupMappingStoreManager = + (RaftVGroupMappingStoreManager)SessionHolder.getRootVGroupMappingManager(); + raftVGroupMappingStoreManager.localAddVGroups(map, group); + return true; + } catch (final Exception e) { + LOGGER.error("fail to load snapshot from {}", path, e); + return false; + } + } + +} diff --git a/server/src/main/java/org/apache/seata/server/cluster/raft/sync/msg/RaftSyncMsgType.java b/server/src/main/java/org/apache/seata/server/cluster/raft/sync/msg/RaftSyncMsgType.java index f74cdb88c52..fc9829d323b 100644 --- a/server/src/main/java/org/apache/seata/server/cluster/raft/sync/msg/RaftSyncMsgType.java +++ b/server/src/main/java/org/apache/seata/server/cluster/raft/sync/msg/RaftSyncMsgType.java @@ -55,5 +55,18 @@ public enum RaftSyncMsgType { /** * refresh cluster metadata */ - REFRESH_CLUSTER_METADATA; + REFRESH_CLUSTER_METADATA, + + /** + * add vgroup mapping + */ + ADD_VGROUP_MAPPING, + /** + * remove vgroup mapping + */ + REMOVE_VGROUP_MAPPING, + /** + * update vgroup mapping + */ + UPDATE_VGROUP_MAPPING,; } diff --git a/server/src/main/java/org/apache/seata/server/cluster/raft/sync/msg/RaftVGroupSyncMsg.java b/server/src/main/java/org/apache/seata/server/cluster/raft/sync/msg/RaftVGroupSyncMsg.java new file mode 100644 index 00000000000..a41e8961a14 --- /dev/null +++ b/server/src/main/java/org/apache/seata/server/cluster/raft/sync/msg/RaftVGroupSyncMsg.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.server.cluster.raft.sync.msg; + +import org.apache.seata.core.store.MappingDO; + +public class RaftVGroupSyncMsg extends RaftBaseMsg { + + MappingDO mappingDO; + + public RaftVGroupSyncMsg() { + } + + public RaftVGroupSyncMsg(MappingDO mappingDO, RaftSyncMsgType raftSyncMsgType) { + this.msgType = raftSyncMsgType; + this.mappingDO = mappingDO; + } + + public MappingDO getMappingDO() { + return mappingDO; + } + + public void setMappingDO(MappingDO mappingDO) { + this.mappingDO = mappingDO; + } + +} diff --git a/server/src/main/java/org/apache/seata/server/config/SeataNamingserverWebConfig.java b/server/src/main/java/org/apache/seata/server/config/SeataNamingserverWebConfig.java new file mode 100644 index 00000000000..21f045aecdf --- /dev/null +++ b/server/src/main/java/org/apache/seata/server/config/SeataNamingserverWebConfig.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.server.config; + +import org.apache.seata.server.filter.RaftGroupFilter; +import org.springframework.boot.web.servlet.FilterRegistrationBean; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class SeataNamingserverWebConfig { + + @Bean + public FilterRegistrationBean raftGroupFilter() { + FilterRegistrationBean registrationBean = new FilterRegistrationBean<>(); + registrationBean.setFilter(new RaftGroupFilter()); + registrationBean.addUrlPatterns("/vgroup/v1/*"); + return registrationBean; + } + +} diff --git a/server/src/main/java/org/apache/seata/server/config/ServerInstanceStrategyConfig.java b/server/src/main/java/org/apache/seata/server/config/ServerInstanceStrategyConfig.java new file mode 100644 index 00000000000..7cf53fd2ff0 --- /dev/null +++ b/server/src/main/java/org/apache/seata/server/config/ServerInstanceStrategyConfig.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.server.config; + +import org.apache.seata.common.util.StringUtils; +import org.apache.seata.server.instance.GeneralInstanceStrategy; +import org.apache.seata.server.instance.RaftServerInstanceStrategy; +import org.apache.seata.server.instance.SeataInstanceStrategy; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class ServerInstanceStrategyConfig { + + @Value("${sessionMode:file}") + String sessionMode; + + @Bean + public SeataInstanceStrategy seataInstanceStrategy() { + if (StringUtils.equalsIgnoreCase("raft", sessionMode)) { + return new RaftServerInstanceStrategy(); + } + return new GeneralInstanceStrategy(); + } + +} diff --git a/server/src/main/java/org/apache/seata/server/filter/RaftGroupFilter.java b/server/src/main/java/org/apache/seata/server/filter/RaftGroupFilter.java new file mode 100644 index 00000000000..1a52c300ac5 --- /dev/null +++ b/server/src/main/java/org/apache/seata/server/filter/RaftGroupFilter.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.server.filter; + +import javax.servlet.Filter; +import javax.servlet.FilterChain; +import javax.servlet.FilterConfig; +import javax.servlet.ServletException; +import javax.servlet.ServletRequest; +import javax.servlet.ServletResponse; +import javax.servlet.http.HttpServletRequest; +import java.io.IOException; +import org.apache.seata.server.cluster.raft.context.SeataClusterContext; + +public class RaftGroupFilter implements Filter { + + @Override + public void init(FilterConfig filterConfig) throws ServletException {} + + @Override + public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) + throws IOException, ServletException { + HttpServletRequest httpRequest = (HttpServletRequest)request; + String unit = httpRequest.getParameter("unit"); + if (unit != null) { + SeataClusterContext.bindGroup(unit); + } + try { + chain.doFilter(request, response); + } finally { + SeataClusterContext.unbindGroup(); + } + } + + @Override + public void destroy() {} + +} diff --git a/server/src/main/java/org/apache/seata/server/instance/AbstractSeataInstanceStrategy.java b/server/src/main/java/org/apache/seata/server/instance/AbstractSeataInstanceStrategy.java new file mode 100644 index 00000000000..c657358c265 --- /dev/null +++ b/server/src/main/java/org/apache/seata/server/instance/AbstractSeataInstanceStrategy.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.server.instance; + +import java.util.Optional; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import javax.annotation.Resource; +import org.apache.seata.common.metadata.Instance; +import org.apache.seata.common.thread.NamedThreadFactory; +import org.apache.seata.common.util.StringUtils; +import org.apache.seata.server.session.SessionHolder; +import org.apache.seata.server.store.VGroupMappingStoreManager; +import org.apache.seata.spring.boot.autoconfigure.properties.registry.RegistryNamingServerProperties; +import org.apache.seata.spring.boot.autoconfigure.properties.registry.RegistryProperties; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.boot.autoconfigure.web.ServerProperties; +import org.springframework.context.ApplicationContext; + + +import static org.apache.seata.common.ConfigurationKeys.NAMING_SERVER; + +public abstract class AbstractSeataInstanceStrategy implements SeataInstanceStrategy { + + @Resource + protected RegistryProperties registryProperties; + + protected ServerProperties serverProperties; + + @Resource + protected ApplicationContext applicationContext; + + @Resource + protected RegistryNamingServerProperties registryNamingServerProperties; + + protected final Logger logger = LoggerFactory.getLogger(getClass()); + protected static volatile ScheduledExecutorService EXECUTOR_SERVICE; + + protected AtomicBoolean init = new AtomicBoolean(false); + @PostConstruct + public void postConstruct() { + this.serverProperties = applicationContext.getBean(ServerProperties.class); + } + + @Override + public void init() { + if (!StringUtils.equals(registryProperties.getType(), NAMING_SERVER)) { + return; + } + Instance instance = serverInstanceInit(); + if (init.compareAndSet(false, true)) { + VGroupMappingStoreManager vGroupMappingStoreManager = SessionHolder.getRootVGroupMappingManager(); + // load vgroup mapping relationship + instance.addMetadata("vGroup", vGroupMappingStoreManager.loadVGroups()); + EXECUTOR_SERVICE = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("scheduledExcuter", 1, true)); + EXECUTOR_SERVICE.scheduleAtFixedRate(() -> { + try { + if (instance.getTerm() > 0) { + SessionHolder.getRootVGroupMappingManager().notifyMapping(); + } + } catch (Exception e) { + logger.error("Naming server register Exception", e); + } + }, registryNamingServerProperties.getHeartbeatPeriod(), registryNamingServerProperties.getHeartbeatPeriod(), + TimeUnit.MILLISECONDS); + } + } + + @PreDestroy + public void destroy() { + Optional.ofNullable(EXECUTOR_SERVICE).ifPresent(ScheduledExecutorService::shutdown); + } + +} diff --git a/server/src/main/java/org/apache/seata/server/instance/ServerInstanceFactory.java b/server/src/main/java/org/apache/seata/server/instance/GeneralInstanceStrategy.java similarity index 55% rename from server/src/main/java/org/apache/seata/server/instance/ServerInstanceFactory.java rename to server/src/main/java/org/apache/seata/server/instance/GeneralInstanceStrategy.java index 2a50b8da25f..18b167d3364 100644 --- a/server/src/main/java/org/apache/seata/server/instance/ServerInstanceFactory.java +++ b/server/src/main/java/org/apache/seata/server/instance/GeneralInstanceStrategy.java @@ -16,51 +16,25 @@ */ package org.apache.seata.server.instance; +import java.util.UUID; + import org.apache.seata.common.XID; import org.apache.seata.common.holder.ObjectHolder; import org.apache.seata.common.metadata.Instance; import org.apache.seata.common.metadata.Node; -import org.apache.seata.common.thread.NamedThreadFactory; -import org.apache.seata.common.util.StringUtils; -import org.apache.seata.server.Server; -import org.apache.seata.server.ServerRunner; -import org.apache.seata.server.session.SessionHolder; import org.apache.seata.server.store.StoreConfig; -import org.apache.seata.server.store.VGroupMappingStoreManager; -import org.apache.seata.spring.boot.autoconfigure.properties.registry.RegistryNamingServerProperties; -import org.apache.seata.spring.boot.autoconfigure.properties.registry.RegistryProperties; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.springframework.core.env.ConfigurableEnvironment; import org.springframework.core.env.EnumerablePropertySource; import org.springframework.core.env.PropertySource; -import org.springframework.stereotype.Component; - -import javax.annotation.Resource; -import java.util.Objects; -import java.util.UUID; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; import static org.apache.seata.common.ConfigurationKeys.META_PREFIX; -import static org.apache.seata.common.ConfigurationKeys.NAMING_SERVER; import static org.apache.seata.common.Constants.OBJECT_KEY_SPRING_CONFIGURABLE_ENVIRONMENT; +public class GeneralInstanceStrategy extends AbstractSeataInstanceStrategy { -@Component("serverInstanceFactory") -public class ServerInstanceFactory { - @Resource - private RegistryProperties registryProperties; - - protected static volatile ScheduledExecutorService EXECUTOR_SERVICE; - - @Resource - private RegistryNamingServerProperties registryNamingServerProperties; + @Override + public Instance serverInstanceInit() { - private static final Logger LOGGER = LoggerFactory.getLogger(Server.class); - - public void serverInstanceInit() { ConfigurableEnvironment environment = (ConfigurableEnvironment)ObjectHolder.INSTANCE.getObject(OBJECT_KEY_SPRING_CONFIGURABLE_ENVIRONMENT); @@ -83,8 +57,7 @@ public void serverInstanceInit() { instance.setTerm(System.currentTimeMillis()); // load node Endpoint - instance.setControl(new Node.Endpoint(XID.getIpAddress(), - Integer.parseInt(Objects.requireNonNull(environment.getProperty("server.port"))), "http")); + instance.setControl(new Node.Endpoint(XID.getIpAddress(), serverProperties.getPort(), "http")); // load metadata for (PropertySource propertySource : environment.getPropertySources()) { @@ -98,22 +71,12 @@ public void serverInstanceInit() { } } } - instance.setTransaction(new Node.Endpoint(XID.getIpAddress(), XID.getPort(), "netty")); - if (StringUtils.equals(registryProperties.getType(), NAMING_SERVER)) { - VGroupMappingStoreManager vGroupMappingStoreManager = SessionHolder.getRootVGroupMappingManager(); - // load vgroup mapping relationship - instance.addMetadata("vGroup", vGroupMappingStoreManager.loadVGroups()); - EXECUTOR_SERVICE = - new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("heartbeat-namingserver", 1, true)); - EXECUTOR_SERVICE.scheduleAtFixedRate(() -> { - try { - vGroupMappingStoreManager.notifyMapping(); - } catch (Exception e) { - LOGGER.error("Naming server register Exception", e); - } - }, registryNamingServerProperties.getHeartbeatPeriod(), registryNamingServerProperties.getHeartbeatPeriod(), - TimeUnit.MILLISECONDS); - ServerRunner.addDisposable(EXECUTOR_SERVICE::shutdown); - } + return instance; } + + @Override + public Type type() { + return Type.GENERAL; + } + } diff --git a/server/src/main/java/org/apache/seata/server/instance/RaftServerInstanceStrategy.java b/server/src/main/java/org/apache/seata/server/instance/RaftServerInstanceStrategy.java new file mode 100644 index 00000000000..ab8ccd651cd --- /dev/null +++ b/server/src/main/java/org/apache/seata/server/instance/RaftServerInstanceStrategy.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.server.instance; + +import javax.annotation.Resource; +import org.apache.seata.common.XID; +import org.apache.seata.common.holder.ObjectHolder; +import org.apache.seata.common.metadata.ClusterRole; +import org.apache.seata.common.metadata.Node; +import org.apache.seata.common.metadata.Instance; +import org.apache.seata.server.cluster.listener.ClusterChangeEvent; +import org.apache.seata.server.cluster.listener.ClusterChangeListener; +import org.apache.seata.server.cluster.raft.RaftServerManager; +import org.apache.seata.server.cluster.raft.RaftStateMachine; +import org.apache.seata.server.session.SessionHolder; +import org.apache.seata.server.store.StoreConfig; +import org.apache.seata.spring.boot.autoconfigure.properties.server.raft.ServerRaftProperties; +import org.springframework.context.event.EventListener; +import org.springframework.core.Ordered; +import org.springframework.core.env.ConfigurableEnvironment; +import org.springframework.core.env.EnumerablePropertySource; +import org.springframework.core.env.PropertySource; +import org.springframework.scheduling.annotation.Async; + + +import static org.apache.seata.common.ConfigurationKeys.META_PREFIX; +import static org.apache.seata.common.Constants.OBJECT_KEY_SPRING_CONFIGURABLE_ENVIRONMENT; + +public class RaftServerInstanceStrategy extends AbstractSeataInstanceStrategy + implements ClusterChangeListener, Ordered { + + @Resource + ServerRaftProperties raftProperties; + + @Override + public Instance serverInstanceInit() { + ConfigurableEnvironment environment = + (ConfigurableEnvironment) ObjectHolder.INSTANCE.getObject(OBJECT_KEY_SPRING_CONFIGURABLE_ENVIRONMENT); + + // load node properties + Instance instance = Instance.getInstance(); + // load namespace + String namespace = registryNamingServerProperties.getNamespace(); + instance.setNamespace(namespace); + // load cluster name + String clusterName = registryNamingServerProperties.getCluster(); + instance.setClusterName(clusterName); + String unit = raftProperties.getGroup(); + instance.setUnit(unit); + // load cluster type + String clusterType = String.valueOf(StoreConfig.getSessionMode()); + instance.addMetadata("cluster-type", "raft".equals(clusterType) ? clusterType : "default"); + RaftStateMachine stateMachine = RaftServerManager.getRaftServer(unit).getRaftStateMachine(); + long term = RaftServerManager.getRaftServer(unit).getRaftStateMachine().getCurrentTerm().get(); + instance.setTerm(term); + instance.setRole(stateMachine.isLeader() ? ClusterRole.LEADER : ClusterRole.FOLLOWER); + // load node Endpoint + instance.setControl(new Node.Endpoint(XID.getIpAddress(), serverProperties.getPort(), "http")); + + // load metadata + for (PropertySource propertySource : environment.getPropertySources()) { + if (propertySource instanceof EnumerablePropertySource) { + EnumerablePropertySource enumerablePropertySource = (EnumerablePropertySource)propertySource; + for (String propertyName : enumerablePropertySource.getPropertyNames()) { + if (propertyName.startsWith(META_PREFIX)) { + instance.addMetadata(propertyName.substring(META_PREFIX.length()), + enumerablePropertySource.getProperty(propertyName)); + } + } + } + } + return instance; + } + + @Override + public Type type() { + return Type.RAFT; + } + + @Override + public int getOrder() { + return Ordered.LOWEST_PRECEDENCE - 1; + } + + @Override + @EventListener + @Async + public void onChangeEvent(ClusterChangeEvent event) { + Instance.getInstance().setTerm(event.getTerm()); + Instance.getInstance().setRole(event.isLeader() ? ClusterRole.LEADER : ClusterRole.FOLLOWER); + SessionHolder.getRootVGroupMappingManager().notifyMapping(); + } + +} diff --git a/server/src/main/java/org/apache/seata/server/instance/SeataInstanceStrategy.java b/server/src/main/java/org/apache/seata/server/instance/SeataInstanceStrategy.java new file mode 100644 index 00000000000..56b52529ac3 --- /dev/null +++ b/server/src/main/java/org/apache/seata/server/instance/SeataInstanceStrategy.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.server.instance; + +import org.apache.seata.common.metadata.Instance; + +public interface SeataInstanceStrategy { + + Instance serverInstanceInit(); + + void init(); + + Type type(); + + enum Type { + /** + * General type. + */ + GENERAL, + /** + * Raft type. + */ + RAFT + } + +} diff --git a/server/src/main/java/org/apache/seata/server/session/SessionHolder.java b/server/src/main/java/org/apache/seata/server/session/SessionHolder.java index 227edd53e29..d94dd0933cb 100644 --- a/server/src/main/java/org/apache/seata/server/session/SessionHolder.java +++ b/server/src/main/java/org/apache/seata/server/session/SessionHolder.java @@ -109,16 +109,14 @@ public static void init(SessionMode sessionMode) { ROOT_VGROUP_MAPPING_MANAGER = EnhancedServiceLoader.load(VGroupMappingStoreManager.class, SessionMode.DB.getName()); } else if (SessionMode.RAFT.equals(sessionMode) || SessionMode.FILE.equals(sessionMode)) { - RaftServerManager.init(); - if (CollectionUtils.isNotEmpty(RaftServerManager.getRaftServers())) { - sessionMode = SessionMode.RAFT; - } if (SessionMode.RAFT.equals(sessionMode)) { String group = CONFIG.getConfig(ConfigurationKeys.SERVER_RAFT_GROUP, DEFAULT_SEATA_GROUP); ROOT_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, SessionMode.RAFT.getName(), new Object[]{ROOT_SESSION_MANAGER_NAME}); SESSION_MANAGER_MAP = new HashMap<>(); SESSION_MANAGER_MAP.put(group, ROOT_SESSION_MANAGER); + ROOT_VGROUP_MAPPING_MANAGER = EnhancedServiceLoader.load(VGroupMappingStoreManager.class, SessionMode.RAFT.getName()); + RaftServerManager.init(); RaftServerManager.start(); } else { String vGroupMappingStorePath = CONFIG.getConfig(ConfigurationKeys.STORE_FILE_DIR, diff --git a/server/src/main/java/org/apache/seata/server/storage/db/store/DataBaseVGroupMappingStoreManager.java b/server/src/main/java/org/apache/seata/server/storage/db/store/DataBaseVGroupMappingStoreManager.java index 20072d8945c..a39863da2ff 100644 --- a/server/src/main/java/org/apache/seata/server/storage/db/store/DataBaseVGroupMappingStoreManager.java +++ b/server/src/main/java/org/apache/seata/server/storage/db/store/DataBaseVGroupMappingStoreManager.java @@ -29,6 +29,7 @@ import javax.sql.DataSource; import java.util.HashMap; import java.util.List; +import java.util.Map; @LoadLevel(name = "db") public class DataBaseVGroupMappingStoreManager implements VGroupMappingStoreManager { @@ -54,7 +55,7 @@ public boolean removeVGroup(String vGroup) { } @Override - public HashMap loadVGroups() { + public Map loadVGroups() { List mappingDOS = vGroupMappingDataBaseDAO.queryMappingDO(); Instance instance = Instance.getInstance(); HashMap mappings = new HashMap<>(); diff --git a/server/src/main/java/org/apache/seata/server/storage/file/store/FileVGroupMappingStoreManager.java b/server/src/main/java/org/apache/seata/server/storage/file/store/FileVGroupMappingStoreManager.java index 93879dc16bb..3a6f161c2ca 100644 --- a/server/src/main/java/org/apache/seata/server/storage/file/store/FileVGroupMappingStoreManager.java +++ b/server/src/main/java/org/apache/seata/server/storage/file/store/FileVGroupMappingStoreManager.java @@ -16,6 +16,14 @@ */ package org.apache.seata.server.storage.file.store; +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.seata.common.loader.LoadLevel; import org.apache.seata.config.Configuration; @@ -28,13 +36,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; -import java.io.IOException; -import java.util.HashMap; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; - @LoadLevel(name = "file") public class FileVGroupMappingStoreManager implements VGroupMappingStoreManager { private static final Logger LOGGER = LoggerFactory.getLogger(FileVGroupMappingStoreManager.class); @@ -50,6 +51,7 @@ public class FileVGroupMappingStoreManager implements VGroupMappingStoreManager protected static final Configuration CONFIG = ConfigurationFactory.getInstance(); + ObjectMapper objectMapper = new ObjectMapper(); public FileVGroupMappingStoreManager() { } @@ -92,7 +94,7 @@ public boolean removeVGroup(String vGroup) { } @Override - public HashMap readVGroups() { + public Map readVGroups() { Lock readLock = lock.readLock(); readLock.lock(); try { @@ -103,7 +105,7 @@ public HashMap readVGroups() { } @Override - public HashMap loadVGroups() { + public Map loadVGroups() { try { File fileToLoad = new File(storePath); if (!fileToLoad.exists()) { @@ -123,7 +125,6 @@ public HashMap loadVGroups() { String fileContent = FileUtils.readFileToString(fileToLoad, "UTF-8"); if (!fileContent.isEmpty()) { - ObjectMapper objectMapper = new ObjectMapper(); vGroupMapping = objectMapper.readValue(fileContent, new TypeReference>() { }); } @@ -138,9 +139,8 @@ public HashMap loadVGroups() { public boolean save(HashMap vGroupMapping) { try { - ObjectMapper objectMapper = new ObjectMapper(); String jsonMapping = objectMapper.writeValueAsString(vGroupMapping); - FileUtils.writeStringToFile(new File(storePath), jsonMapping, "UTF-8"); + FileUtils.writeStringToFile(new File(storePath), jsonMapping, StandardCharsets.UTF_8); return true; } catch (IOException e) { LOGGER.error("mapping relationship saved failed! ", e); diff --git a/server/src/main/java/org/apache/seata/server/storage/raft/sore/RaftVGroupMappingStoreManager.java b/server/src/main/java/org/apache/seata/server/storage/raft/sore/RaftVGroupMappingStoreManager.java new file mode 100644 index 00000000000..6faa711b45c --- /dev/null +++ b/server/src/main/java/org/apache/seata/server/storage/raft/sore/RaftVGroupMappingStoreManager.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.server.storage.raft.sore; + +import java.net.InetSocketAddress; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import com.alipay.sofa.jraft.Closure; +import org.apache.seata.common.XID; +import org.apache.seata.common.loader.LoadLevel; +import org.apache.seata.common.metadata.ClusterRole; +import org.apache.seata.common.metadata.Instance; +import org.apache.seata.core.store.MappingDO; +import org.apache.seata.discovery.registry.MultiRegistryFactory; +import org.apache.seata.discovery.registry.RegistryService; +import org.apache.seata.server.cluster.raft.RaftServerManager; +import org.apache.seata.server.cluster.raft.sync.msg.RaftSyncMsgType; +import org.apache.seata.server.cluster.raft.sync.msg.RaftVGroupSyncMsg; +import org.apache.seata.server.cluster.raft.util.RaftTaskUtil; +import org.apache.seata.server.store.VGroupMappingStoreManager; + +@LoadLevel(name = "raft") +public class RaftVGroupMappingStoreManager implements VGroupMappingStoreManager { + + private final static Map> VGROUP_MAPPING = + new HashMap<>(); + + + public boolean localAddVGroup(MappingDO mappingDO) { + return VGROUP_MAPPING.computeIfAbsent(mappingDO.getUnit(), k -> new HashMap<>()).put(mappingDO.getVGroup(), + mappingDO) != null; + } + + public void localAddVGroups(Map vGroups, String unit) { + VGROUP_MAPPING.computeIfAbsent(unit, k -> new HashMap<>()).putAll(vGroups); + } + + @Override + public boolean addVGroup(MappingDO mappingDO) { + CompletableFuture completableFuture = new CompletableFuture<>(); + Closure closure = status -> { + if (status.isOk()) { + completableFuture.complete(localAddVGroup(mappingDO)); + } else { + completableFuture.complete(false); + } + }; + RaftVGroupSyncMsg raftVGroupSyncMsg = new RaftVGroupSyncMsg(mappingDO, RaftSyncMsgType.ADD_VGROUP_MAPPING); + try { + RaftTaskUtil.createTask(closure, raftVGroupSyncMsg, completableFuture); + return completableFuture.get(); + } catch (Exception e) { + if (e instanceof RuntimeException) { + throw (RuntimeException)e; + } + throw new RuntimeException(e); + } + } + + @Override + public boolean removeVGroup(String vGroup) { + CompletableFuture completableFuture = new CompletableFuture<>(); + Closure closure = status -> { + if (status.isOk()) { + completableFuture.complete(localRemoveVGroup(vGroup)); + } else { + completableFuture.complete(false); + } + }; + MappingDO mappingDO = new MappingDO(); + mappingDO.setVGroup(vGroup); + RaftVGroupSyncMsg raftVGroupSyncMsg = new RaftVGroupSyncMsg(mappingDO, RaftSyncMsgType.REMOVE_VGROUP_MAPPING); + try { + RaftTaskUtil.createTask(closure, raftVGroupSyncMsg, completableFuture); + return completableFuture.get(); + } catch (Exception e) { + if (e instanceof RuntimeException) { + throw (RuntimeException)e; + } + throw new RuntimeException(e); + } + } + + public boolean localRemoveVGroup(String vGroup) { + VGROUP_MAPPING.forEach((unit, vgroup) -> vgroup.remove(vGroup)); + return true; + } + + @Override + public Map loadVGroups() { + Map result = new HashMap<>(); + VGROUP_MAPPING.forEach((unit, vgroup) -> { + for (String group : vgroup.keySet()) { + result.put(group, unit); + } + }); + return result; + } + + public Map loadVGroupsByUnit(String unit) { + return VGROUP_MAPPING.getOrDefault(unit, Collections.emptyMap()); + } + + @Override + public Map readVGroups() { + return loadVGroups(); + } + + @Override + public void notifyMapping() { + Instance instance = Instance.getInstance(); + Map map = this.readVGroups(); + instance.addMetadata("vGroup", map); + for (String group : RaftServerManager.groups()) { + Instance node = instance.clone(); + node.setRole(RaftServerManager.isLeader(group) ? ClusterRole.LEADER : ClusterRole.FOLLOWER); + Instance.getInstances().add(node); + } + try { + InetSocketAddress address = new InetSocketAddress(XID.getIpAddress(), XID.getPort()); + for (RegistryService registryService : MultiRegistryFactory.getInstances()) { + registryService.register(address); + } + } catch (Exception e) { + throw new RuntimeException("vGroup mapping relationship notified failed! ", e); + } finally { + Instance.getInstances().clear(); + } + } + +} diff --git a/server/src/main/java/org/apache/seata/server/storage/redis/store/RedisVGroupMappingStoreManager.java b/server/src/main/java/org/apache/seata/server/storage/redis/store/RedisVGroupMappingStoreManager.java index 4422bdc33f8..7b9245bf308 100644 --- a/server/src/main/java/org/apache/seata/server/storage/redis/store/RedisVGroupMappingStoreManager.java +++ b/server/src/main/java/org/apache/seata/server/storage/redis/store/RedisVGroupMappingStoreManager.java @@ -64,7 +64,7 @@ public boolean removeVGroup(String vGroup) { } @Override - public HashMap loadVGroups() { + public Map loadVGroups() { Instance instance = Instance.getInstance(); String namespace = REDIS_PREFIX + instance.getNamespace(); String clusterName = instance.getClusterName(); diff --git a/server/src/main/java/org/apache/seata/server/store/VGroupMappingStoreManager.java b/server/src/main/java/org/apache/seata/server/store/VGroupMappingStoreManager.java index f2b2b8bd4f1..178e53d1c0d 100644 --- a/server/src/main/java/org/apache/seata/server/store/VGroupMappingStoreManager.java +++ b/server/src/main/java/org/apache/seata/server/store/VGroupMappingStoreManager.java @@ -23,7 +23,6 @@ import org.apache.seata.discovery.registry.RegistryService; import java.net.InetSocketAddress; -import java.util.HashMap; import java.util.Map; public interface VGroupMappingStoreManager { @@ -46,9 +45,9 @@ public interface VGroupMappingStoreManager { * * @return Key:vGroup,Value:unit */ - HashMap loadVGroups(); + Map loadVGroups(); - default HashMap readVGroups() { + default Map readVGroups() { return loadVGroups(); } diff --git a/server/src/main/resources/META-INF/services/org.apache.seata.server.store.VGroupMappingStoreManager b/server/src/main/resources/META-INF/services/org.apache.seata.server.store.VGroupMappingStoreManager index 25265e50995..3f087458d82 100644 --- a/server/src/main/resources/META-INF/services/org.apache.seata.server.store.VGroupMappingStoreManager +++ b/server/src/main/resources/META-INF/services/org.apache.seata.server.store.VGroupMappingStoreManager @@ -16,4 +16,5 @@ # org.apache.seata.server.storage.db.store.DataBaseVGroupMappingStoreManager org.apache.seata.server.storage.file.store.FileVGroupMappingStoreManager -org.apache.seata.server.storage.redis.store.RedisVGroupMappingStoreManager \ No newline at end of file +org.apache.seata.server.storage.redis.store.RedisVGroupMappingStoreManager +org.apache.seata.server.storage.raft.sore.RaftVGroupMappingStoreManager diff --git a/server/src/test/java/org/apache/seata/server/storage/file/store/FileVGroupMappingStoreManagerTest.java b/server/src/test/java/org/apache/seata/server/storage/file/store/FileVGroupMappingStoreManagerTest.java index e0343118a60..4de867f67aa 100644 --- a/server/src/test/java/org/apache/seata/server/storage/file/store/FileVGroupMappingStoreManagerTest.java +++ b/server/src/test/java/org/apache/seata/server/storage/file/store/FileVGroupMappingStoreManagerTest.java @@ -28,6 +28,8 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.HashMap; +import java.util.Map; + import static org.junit.jupiter.api.Assertions.*; import static org.mockito.Mockito.*; @@ -58,7 +60,7 @@ public void testAddVGroupSuccess() { assertTrue(fileVGroupMappingStoreManager.addVGroup(mappingDO)); - HashMap vGroups = fileVGroupMappingStoreManager.loadVGroups(); + Map vGroups = fileVGroupMappingStoreManager.loadVGroups(); assertEquals(UNIT, vGroups.get(VGROUP_NAME)); } @@ -71,7 +73,7 @@ public void testRemoveVGroupSuccess() { fileVGroupMappingStoreManager.addVGroup(mappingDO); assertTrue(fileVGroupMappingStoreManager.removeVGroup(VGROUP_NAME)); - HashMap vGroups = fileVGroupMappingStoreManager.loadVGroups(); + Map vGroups = fileVGroupMappingStoreManager.loadVGroups(); assertNull(vGroups.get(VGROUP_NAME)); } @@ -82,7 +84,7 @@ public void testLoadVGroups() throws IOException { File file = new File(STORE_PATH); FileUtils.writeStringToFile(file, "{\"testVGroup\":\"testUnit\"}", StandardCharsets.UTF_8); - HashMap actualMapping = fileVGroupMappingStoreManager.loadVGroups(); + Map actualMapping = fileVGroupMappingStoreManager.loadVGroups(); assertEquals(expectedMapping, actualMapping); } From d6674666f0d621d3d1e2bd84578d27395ff706f1 Mon Sep 17 00:00:00 2001 From: funkye Date: Mon, 24 Feb 2025 14:57:21 +0800 Subject: [PATCH 3/3] feature: migrate the console to the naming server (#7157) --- changes/en-us/2.x.md | 2 + changes/zh-cn/2.x.md | 1 + .../static/console-fe/src/locales/en-us.ts | 6 + .../static/console-fe/src/locales/zh-cn.ts | 6 + .../pages/GlobalLockInfo/GlobalLockInfo.tsx | 119 ++++++++++++-- .../pages/TransactionInfo/TransactionInfo.tsx | 126 +++++++++++++-- .../console-fe/src/service/globalLockInfo.ts | 22 +++ .../console-fe/src/service/transactionInfo.ts | 93 +++++++++-- dependencies/pom.xml | 6 + namingserver/pom.xml | 13 ++ .../namingserver/NamingserverApplication.java | 2 +- .../seata/namingserver/config/WebConfig.java | 79 +++++++++ .../namingserver/contants/NamingConstant.java | 29 ++++ .../controller/NamingController.java | 19 ++- .../namingserver/entity/vo/NamespaceVO.java | 43 +++++ .../filter/CachedBodyHttpServletRequest.java | 77 +++++++++ .../filter/ConsoleRemotingFilter.java | 151 ++++++++++++++++++ .../namingserver/manager/NamingManager.java | 35 +++- .../src/main/resources/application.yml | 12 +- .../src/test/resources/application.yml | 12 +- server/pom.xml | 2 + .../file/GlobalSessionFileServiceImpl.java | 4 + .../console/param/GlobalSessionParam.java | 25 ++- 23 files changed, 831 insertions(+), 53 deletions(-) create mode 100644 namingserver/src/main/java/org/apache/seata/namingserver/config/WebConfig.java create mode 100644 namingserver/src/main/java/org/apache/seata/namingserver/contants/NamingConstant.java create mode 100644 namingserver/src/main/java/org/apache/seata/namingserver/entity/vo/NamespaceVO.java create mode 100644 namingserver/src/main/java/org/apache/seata/namingserver/filter/CachedBodyHttpServletRequest.java create mode 100644 namingserver/src/main/java/org/apache/seata/namingserver/filter/ConsoleRemotingFilter.java diff --git a/changes/en-us/2.x.md b/changes/en-us/2.x.md index d05d99853b2..d61607b66be 100644 --- a/changes/en-us/2.x.md +++ b/changes/en-us/2.x.md @@ -9,10 +9,12 @@ Add changes here for all PR submitted to the 2.x branch. - [[#7037](https://github.com/apache/incubator-seata/pull/7037)] support fury undolog parser - [[#7069](https://github.com/apache/incubator-seata/pull/7069)] Raft cluster mode supports address translation - [[#7038](https://github.com/apache/incubator-seata/pull/7038)] support fury serializer +- [[#7157](https://github.com/apache/incubator-seata/pull/7157)] migrate the console to the naming server - [[#7114](https://github.com/apache/incubator-seata/pull/7114)] support raft mode registry to namingserver - [[#7133](https://github.com/apache/incubator-seata/pull/7133)] Implement scheduled handling for end status transaction - [[#7171](https://github.com/apache/incubator-seata/pull/7171)] support EpollEventLoopGroup in client + ### bugfix: - [[#7104](https://github.com/apache/incubator-seata/pull/7104)] fix impl of supportsSourceType is not defined diff --git a/changes/zh-cn/2.x.md b/changes/zh-cn/2.x.md index 3ba8323a69d..2cf5634e609 100644 --- a/changes/zh-cn/2.x.md +++ b/changes/zh-cn/2.x.md @@ -9,6 +9,7 @@ - [[#7037](https://github.com/apache/incubator-seata/pull/7037)] 支持UndoLog的fury序列化方式 - [[#7069](https://github.com/apache/incubator-seata/pull/7069)] Raft集群模式支持地址转换 - [[#7038](https://github.com/apache/incubator-seata/pull/7038)] 支持Fury序列化器 +- [[#7157](https://github.com/apache/incubator-seata/pull/7157)] 将console迁移至namingserver中 - [[#7114](https://github.com/apache/incubator-seata/pull/7114)] 支持raft集群注册至namingserver - [[#7133](https://github.com/apache/incubator-seata/pull/7133)] 实现对残留的end状态事务定时处理 - [[#7171](https://github.com/apache/incubator-seata/pull/7171)] 客户端支持 EpollEventLoopGroup diff --git a/console/src/main/resources/static/console-fe/src/locales/en-us.ts b/console/src/main/resources/static/console-fe/src/locales/en-us.ts index 6949c4514a6..449c61a3284 100644 --- a/console/src/main/resources/static/console-fe/src/locales/en-us.ts +++ b/console/src/main/resources/static/console-fe/src/locales/en-us.ts @@ -53,6 +53,9 @@ const enUs: ILocale = { subTitle: 'list', createTimeLabel: 'CreateTime', selectFilerPlaceholder: 'Please select filter criteria', + selectNamespaceFilerPlaceholder: 'Please select namespace', + selectClusterFilerPlaceholder: 'Please select cluster', + selectVGroupFilerPlaceholder: 'Please select vgroup', inputFilterPlaceholder: 'Please enter filter criteria', branchSessionSwitchLabel: 'Whether to include branch sessions', resetButtonLabel: 'Reset', @@ -77,6 +80,9 @@ const enUs: ILocale = { subTitle: 'list', createTimeLabel: 'CreateTime', inputFilterPlaceholder: 'Please enter filter criteria', + selectNamespaceFilerPlaceholder: 'Please select namespace', + selectClusterFilerPlaceholder: 'Please select cluster', + selectVGroupFilerPlaceholder: 'Please select vgroup', resetButtonLabel: 'Reset', searchButtonLabel: 'Search', operateTitle: 'operate', diff --git a/console/src/main/resources/static/console-fe/src/locales/zh-cn.ts b/console/src/main/resources/static/console-fe/src/locales/zh-cn.ts index a4ea5e29c15..089802559ae 100644 --- a/console/src/main/resources/static/console-fe/src/locales/zh-cn.ts +++ b/console/src/main/resources/static/console-fe/src/locales/zh-cn.ts @@ -53,6 +53,9 @@ const zhCn: ILocale = { subTitle: '基础列表页', createTimeLabel: '创建时间', selectFilerPlaceholder: '请选择筛选条件', + selectNamespaceFilerPlaceholder: '请选择命名空间', + selectClusterFilerPlaceholder: '请选择集群', + selectVGroupFilerPlaceholder: '请选择事务分组', inputFilterPlaceholder: '请输入筛选条件', branchSessionSwitchLabel: '是否包含分支事务', resetButtonLabel: '重置', @@ -77,6 +80,9 @@ const zhCn: ILocale = { subTitle: '基础列表页', createTimeLabel: '创建时间', inputFilterPlaceholder: '请输入筛选条件', + selectNamespaceFilerPlaceholder: '请选择命名空间', + selectClusterFilerPlaceholder: '请选择集群', + selectVGroupFilerPlaceholder: '请选择事务分组', resetButtonLabel: '重置', searchButtonLabel: '搜索', operateTitle: '操作', diff --git a/console/src/main/resources/static/console-fe/src/pages/GlobalLockInfo/GlobalLockInfo.tsx b/console/src/main/resources/static/console-fe/src/pages/GlobalLockInfo/GlobalLockInfo.tsx index 31e235769f8..a356a506a4b 100644 --- a/console/src/main/resources/static/console-fe/src/pages/GlobalLockInfo/GlobalLockInfo.tsx +++ b/console/src/main/resources/static/console-fe/src/pages/GlobalLockInfo/GlobalLockInfo.tsx @@ -15,7 +15,19 @@ * limitations under the License. */ import React from 'react'; -import { ConfigProvider, Table, Button, DatePicker, Form, Icon, Pagination, Input, Dialog, Message } from '@alicloud/console-components'; +import { + ConfigProvider, + Table, + Button, + DatePicker, + Form, + Icon, + Pagination, + Input, + Dialog, + Message, + Select +} from '@alicloud/console-components'; import Actions, { LinkButton } from '@alicloud/console-components-actions'; import { withRouter } from 'react-router-dom'; import Page from '@/components/Page'; @@ -28,6 +40,7 @@ import moment from 'moment'; import './index.scss'; import {get} from "lodash"; import {enUsKey, getCurrentLanguage} from "@/reducers/locale"; +import {fetchNamespace} from "@/service/transactionInfo"; const { RangePicker } = DatePicker; const FormItem = Form.Item; @@ -35,6 +48,9 @@ const FormItem = Form.Item; type GlobalLockInfoState = { list: Array; total: number; + namespaceOptions: Map; + clusters: Array; + vgroups: Array; loading: boolean; globalLockParam: GlobalLockParam; } @@ -55,17 +71,23 @@ class GlobalLockInfo extends React.Component { pageSize: 10, pageNum: 1, }, + namespaceOptions: new Map(), + clusters: [], + vgroups: [], } componentDidMount = () => { // @ts-ignore const { query } = this.props.history.location; if (query !== undefined) { - const { xid } = query; - if (xid !== undefined) { + const { xid,vgroup ,namespace,cluster} = query; + if (xid !== undefined && vgroup !== undefined) { this.setState({ globalLockParam: { xid, + vgroup, + namespace, + cluster, pageSize: 10, pageNum: 1, }, @@ -73,10 +95,42 @@ class GlobalLockInfo extends React.Component { return; } } - // search once by default anyway - this.search(); + this.loadNamespaces(); + } + loadNamespaces = async () => { + try { + const namespaces = await fetchNamespace(); + const namespaceOptions = new Map(); + Object.keys(namespaces).forEach(namespaceKey => { + const namespaceData = namespaces[namespaceKey]; + namespaceOptions.set(namespaceKey, { + clusters: namespaceData.clusters, + vgroups: namespaceData.vgroups, + }); + }); + if (namespaceOptions.size > 0) { + // Set default namespace to the first option + const firstNamespace = Array.from(namespaceOptions.keys())[0]; + const selectedNamespace = namespaceOptions.get(firstNamespace); + this.setState({ + namespaceOptions, + globalLockParam: { + ...this.state.globalLockParam, + namespace: firstNamespace, + cluster: selectedNamespace ? selectedNamespace.clusters[0] : undefined, + }, + clusters: selectedNamespace ? selectedNamespace.clusters : [], + }); + this.search(); + } else { + this.setState({ + namespaceOptions, + }); + } + } catch (error) { + console.error('Failed to fetch namespaces:', error); + } } - resetSearchFilter = () => { this.setState({ globalLockParam: { @@ -128,10 +182,19 @@ class GlobalLockInfo extends React.Component { } searchFilterOnChange = (key:string, val:string) => { - this.setState({ - globalLockParam: Object.assign(this.state.globalLockParam, - { [key]: val }), - }); + if (key === 'namespace') { + const selectedNamespace = this.state.namespaceOptions.get(val); + this.setState({ + clusters: selectedNamespace ? selectedNamespace.clusters : [], + vgroups: selectedNamespace ? selectedNamespace.vgroups : [], + globalLockParam: Object.assign(this.state.globalLockParam, {[key]: val}), + }); + } else { + this.setState({ + globalLockParam: Object.assign(this.state.globalLockParam, + {[key]: val}), + }); + } } paginationOnChange = (current: number, e: {}) => { @@ -194,6 +257,9 @@ class GlobalLockInfo extends React.Component { const { locale = {} } = this.props; const { title, subTitle, createTimeLabel, inputFilterPlaceholder, + selectNamespaceFilerPlaceholder, + selectClusterFilerPlaceholder, + selectVGroupFilerPlaceholder, searchButtonLabel, resetButtonLabel, operateTitle, @@ -248,7 +314,38 @@ class GlobalLockInfo extends React.Component { onChange={(value: string) => { this.searchFilterOnChange('branchId', value); }} /> - + + { + this.searchFilterOnChange('cluster', value); + }} + dataSource={this.state.clusters.map(value => ({ label: value, value }))} + value={this.state.globalLockParam.cluster} + /> + + + { + this.searchFilterOnChange('namespace', value); + }} + dataSource={Array.from(this.state.namespaceOptions.keys()).map(key => ({ label: key, value: key }))} + value={this.state.globalSessionParam.namespace} + /> + + + { + this.searchFilterOnChange('vgroup', value); + }} + dataSource={this.state.vgroups.map(value => ({ label: value, value }))} + /> + {/* {branch session switch} */} { + const result = await request.get('/naming/namespace', { + method: 'get', + }); + return result.data; +} + export default async function fetchData(params:GlobalLockParam):Promise { let result = await request('/console/globalLock/query', { method: 'get', params, + headers: { + 'x-seata-namespace': params.namespace, + 'x-seata-cluster': params.cluster, + }, }); return result; @@ -42,6 +56,10 @@ export async function deleteData(params: GlobalLockParam): Promise { let result = await request('/console/globalLock/delete', { method: 'delete', params, + headers: { + 'x-seata-namespace': params.namespace, + 'x-seata-cluster': params.cluster, + }, }); return result; } @@ -56,6 +74,10 @@ export async function checkData(params: GlobalLockParam): Promise { xid, branchId }, + headers: { + 'x-seata-namespace': params.namespace, + 'x-seata-cluster': params.cluster, + }, }); return result; } diff --git a/console/src/main/resources/static/console-fe/src/service/transactionInfo.ts b/console/src/main/resources/static/console-fe/src/service/transactionInfo.ts index 64cdaea80ad..932ad1bf360 100644 --- a/console/src/main/resources/static/console-fe/src/service/transactionInfo.ts +++ b/console/src/main/resources/static/console-fe/src/service/transactionInfo.ts @@ -24,6 +24,9 @@ export type GlobalSessionParam = { withBranch: boolean, pageSize: number, pageNum: number, + namespace?: string, + cluster?: string, + vgroup?: string, timeStart?: number, timeEnd?: number }; @@ -33,13 +36,27 @@ export type BranchSessionParam = { branchId?: string, applicationId?: string, status?: number, + namespace?: string, + cluster?: string, + vgroup?: string, transactionName?: string, }; +export async function fetchNamespace():Promise { + const result = await request.get('/naming/namespace', { + method: 'get', + }); + return result.data; +} + export default async function fetchData(params:GlobalSessionParam):Promise { let result = await request('/console/globalSession/query', { method: 'get', params, + headers: { + 'x-seata-namespace': params.namespace, + 'x-seata-cluster': params.cluster, + }, }); return result; @@ -47,10 +64,16 @@ export default async function fetchData(params:GlobalSessionParam):Promise export async function deleteGlobalData(params: GlobalSessionParam): Promise { const xid = params.xid + const vgroup = params.vgroup let result = await request('/console/globalSession/deleteGlobalSession', { method: 'delete', params: { - xid + xid, + vgroup + }, + headers: { + 'x-seata-namespace': params.namespace, + 'x-seata-cluster': params.cluster, }, }); return result; @@ -58,10 +81,16 @@ export async function deleteGlobalData(params: GlobalSessionParam): Promise export async function forceDeleteGlobalData(params: GlobalSessionParam): Promise { const xid = params.xid + const vgroup = params.vgroup let result = await request('/console/globalSession/forceDeleteGlobalSession', { method: 'delete', params: { - xid + xid, + vgroup + }, + headers: { + 'x-seata-namespace': params.namespace, + 'x-seata-cluster': params.cluster, }, }); return result; @@ -69,10 +98,16 @@ export async function forceDeleteGlobalData(params: GlobalSessionParam): Promise export async function stopGlobalData(params: GlobalSessionParam): Promise { const xid = params.xid + const vgroup = params.vgroup let result = await request('/console/globalSession/stopGlobalSession', { method: 'PUT', params: { - xid + xid, + vgroup + }, + headers: { + 'x-seata-namespace': params.namespace, + 'x-seata-cluster': params.cluster, }, }); return result; @@ -80,10 +115,16 @@ export async function stopGlobalData(params: GlobalSessionParam): Promise { export async function startGlobalData(params: GlobalSessionParam): Promise { const xid = params.xid + const vgroup = params.vgroup let result = await request('/console/globalSession/startGlobalSession', { method: 'PUT', params: { - xid + xid, + vgroup + }, + headers: { + 'x-seata-namespace': params.namespace, + 'x-seata-cluster': params.cluster, }, }); return result; @@ -91,10 +132,12 @@ export async function startGlobalData(params: GlobalSessionParam): Promise export async function sendGlobalCommitOrRollback(params: BranchSessionParam): Promise { const xid = params.xid + const vgroup = params.vgroup let result = await request('/console/globalSession/sendCommitOrRollback', { method: 'PUT', params: { - xid + xid, + vgroup }, }); return result; @@ -102,10 +145,16 @@ export async function sendGlobalCommitOrRollback(params: BranchSessionParam): Pr export async function changeGlobalData(params: GlobalSessionParam): Promise { const xid = params.xid + const vgroup = params.vgroup let result = await request('/console/globalSession/changeGlobalStatus', { method: 'PUT', params: { - xid + xid, + vgroup + }, + headers: { + 'x-seata-namespace': params.namespace, + 'x-seata-cluster': params.cluster, }, }); return result; @@ -114,11 +163,17 @@ export async function changeGlobalData(params: GlobalSessionParam): Promise export async function deleteBranchData(params: BranchSessionParam): Promise { const xid = params.xid const branchId = params.branchId + const vgroup = params.vgroup let result = await request('/console/branchSession/deleteBranchSession', { method: 'delete', params: { xid, - branchId + branchId, + vgroup + }, + headers: { + 'x-seata-namespace': params.namespace, + 'x-seata-cluster': params.cluster, }, }); return result; @@ -127,11 +182,17 @@ export async function deleteBranchData(params: BranchSessionParam): Promise export async function forceDeleteBranchData(params: BranchSessionParam): Promise { const xid = params.xid const branchId = params.branchId + const vgroup = params.vgroup let result = await request('/console/branchSession/forceDeleteBranchSession', { method: 'delete', params: { xid, - branchId + branchId, + vgroup + }, + headers: { + 'x-seata-namespace': params.namespace, + 'x-seata-cluster': params.cluster, }, }); return result; @@ -140,11 +201,17 @@ export async function forceDeleteBranchData(params: BranchSessionParam): Promise export async function stopBranchData(params: BranchSessionParam): Promise { const xid = params.xid const branchId = params.branchId + const vgroup = params.vgroup let result = await request('/console/branchSession/stopBranchSession', { method: 'PUT', params: { xid, - branchId + branchId, + vgroup + }, + headers: { + 'x-seata-namespace': params.namespace, + 'x-seata-cluster': params.cluster, }, }); return result; @@ -153,11 +220,17 @@ export async function stopBranchData(params: BranchSessionParam): Promise { export async function startBranchData(params: BranchSessionParam): Promise { const xid = params.xid const branchId = params.branchId + const vgroup = params.vgroup let result = await request('/console/branchSession/startBranchSession', { method: 'PUT', params: { xid, - branchId + branchId, + vgroup + }, + headers: { + 'x-seata-namespace': params.namespace, + 'x-seata-cluster': params.cluster, }, }); return result; diff --git a/dependencies/pom.xml b/dependencies/pom.xml index d5483d88cfd..fb787080a70 100644 --- a/dependencies/pom.xml +++ b/dependencies/pom.xml @@ -47,6 +47,7 @@ 1.2.1 1.2 2.6 + 2.8.0 1.0 0.11 3.7.2 @@ -885,6 +886,11 @@ ${fury.version} + commons-io + commons-io + ${commons-io.version} + + com.fasterxml.jackson.core jackson-databind ${jackson.version} diff --git a/namingserver/pom.xml b/namingserver/pom.xml index fdc7c9f12c1..0f07aa44874 100644 --- a/namingserver/pom.xml +++ b/namingserver/pom.xml @@ -67,6 +67,15 @@ + + commons-io + commons-io + + + ${project.groupId} + seata-console + ${project.version} + org.codehaus.janino janino @@ -167,6 +176,10 @@ org.apache.commons commons-lang3 + + org.apache.httpcomponents + httpasyncclient + diff --git a/namingserver/src/main/java/org/apache/seata/namingserver/NamingserverApplication.java b/namingserver/src/main/java/org/apache/seata/namingserver/NamingserverApplication.java index dbdc1a47fce..9b3f501d4e3 100644 --- a/namingserver/src/main/java/org/apache/seata/namingserver/NamingserverApplication.java +++ b/namingserver/src/main/java/org/apache/seata/namingserver/NamingserverApplication.java @@ -19,7 +19,7 @@ import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; -@SpringBootApplication +@SpringBootApplication(scanBasePackages = {"org.apache.seata"}) public class NamingserverApplication { public static void main(String[] args) { diff --git a/namingserver/src/main/java/org/apache/seata/namingserver/config/WebConfig.java b/namingserver/src/main/java/org/apache/seata/namingserver/config/WebConfig.java new file mode 100644 index 00000000000..2398ab154c6 --- /dev/null +++ b/namingserver/src/main/java/org/apache/seata/namingserver/config/WebConfig.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.namingserver.config; + +import javax.servlet.Filter; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; +import org.apache.seata.namingserver.filter.ConsoleRemotingFilter; +import org.apache.seata.namingserver.manager.NamingManager; +import org.springframework.boot.web.servlet.FilterRegistrationBean; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.Ordered; +import org.springframework.http.client.HttpComponentsAsyncClientHttpRequestFactory; +import org.springframework.http.client.HttpComponentsClientHttpRequestFactory; +import org.springframework.web.client.AsyncRestTemplate; +import org.springframework.web.client.RestTemplate; + + +import static org.apache.seata.namingserver.contants.NamingConstant.DEFAULT_CONNECTION_MAX_PER_ROUTE; +import static org.apache.seata.namingserver.contants.NamingConstant.DEFAULT_CONNECTION_MAX_TOTAL; +import static org.apache.seata.namingserver.contants.NamingConstant.DEFAULT_REQUEST_TIMEOUT; + +@Configuration +public class WebConfig { + + @Bean + public RestTemplate restTemplate() { + // Create a connection manager with custom settings + PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager(); + connectionManager.setMaxTotal(DEFAULT_CONNECTION_MAX_TOTAL); // Maximum total connections + connectionManager.setDefaultMaxPerRoute(DEFAULT_CONNECTION_MAX_PER_ROUTE); // Maximum connections per route + // Create an HttpClient with the connection manager + CloseableHttpClient httpClient = HttpClients.custom().setConnectionManager(connectionManager).build(); + // Create a request factory with the HttpClient + HttpComponentsClientHttpRequestFactory requestFactory = new HttpComponentsClientHttpRequestFactory(httpClient); + requestFactory.setConnectTimeout(DEFAULT_REQUEST_TIMEOUT); // Connection timeout in milliseconds + requestFactory.setReadTimeout(DEFAULT_REQUEST_TIMEOUT); // Read timeout in milliseconds + // Create and return a RestTemplate with the custom request factory + return new RestTemplate(requestFactory); + } + + @Bean + public AsyncRestTemplate asyncRestTemplate(RestTemplate restTemplate) { + HttpComponentsAsyncClientHttpRequestFactory asyncClientHttpRequestFactory = + new HttpComponentsAsyncClientHttpRequestFactory(); + asyncClientHttpRequestFactory.setConnectionRequestTimeout(DEFAULT_REQUEST_TIMEOUT); // Connection request timeout in milliseconds + asyncClientHttpRequestFactory.setConnectTimeout(DEFAULT_REQUEST_TIMEOUT); // Connection timeout in milliseconds + asyncClientHttpRequestFactory.setReadTimeout(DEFAULT_REQUEST_TIMEOUT); // Read timeout in milliseconds + return new AsyncRestTemplate(asyncClientHttpRequestFactory, restTemplate); + } + + @Bean + public FilterRegistrationBean consoleRemotingFilter(NamingManager namingManager, + AsyncRestTemplate asyncRestTemplate) { + ConsoleRemotingFilter consoleRemotingFilter = new ConsoleRemotingFilter(namingManager, asyncRestTemplate); + FilterRegistrationBean registration = new FilterRegistrationBean<>(); + registration.setFilter(consoleRemotingFilter); + registration.addUrlPatterns("/*"); + registration.setOrder(Ordered.HIGHEST_PRECEDENCE); + return registration; + } + +} diff --git a/namingserver/src/main/java/org/apache/seata/namingserver/contants/NamingConstant.java b/namingserver/src/main/java/org/apache/seata/namingserver/contants/NamingConstant.java new file mode 100644 index 00000000000..2e5a1b59d47 --- /dev/null +++ b/namingserver/src/main/java/org/apache/seata/namingserver/contants/NamingConstant.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.namingserver.contants; + +public interface NamingConstant { + + String CONSOLE_PATTERN = "^/api/.*/console/.*"; + + int DEFAULT_REQUEST_TIMEOUT = 5000; + + int DEFAULT_CONNECTION_MAX_TOTAL = 100; + + int DEFAULT_CONNECTION_MAX_PER_ROUTE = 20; + +} diff --git a/namingserver/src/main/java/org/apache/seata/namingserver/controller/NamingController.java b/namingserver/src/main/java/org/apache/seata/namingserver/controller/NamingController.java index 672c378b9d5..e194dbe73f6 100644 --- a/namingserver/src/main/java/org/apache/seata/namingserver/controller/NamingController.java +++ b/namingserver/src/main/java/org/apache/seata/namingserver/controller/NamingController.java @@ -20,6 +20,8 @@ import org.apache.seata.common.metadata.namingserver.MetaResponse; import org.apache.seata.common.metadata.namingserver.NamingServerNode; import org.apache.seata.common.result.Result; +import org.apache.seata.common.result.SingleResult; +import org.apache.seata.namingserver.entity.vo.NamespaceVO; import org.apache.seata.namingserver.listener.Watcher; import org.apache.seata.namingserver.manager.ClusterWatcherManager; import org.apache.seata.namingserver.manager.NamingManager; @@ -39,14 +41,15 @@ import javax.servlet.AsyncContext; import javax.servlet.http.HttpServletRequest; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; @RestController -@RequestMapping("/naming/v1") +@RequestMapping(value = {"/naming/v1", "/api/v1/naming"}) public class NamingController { - private static final Logger LOGGER = LoggerFactory.getLogger(NamingController.class); + private static final Logger LOGGER = LoggerFactory.getLogger(NamingController.class); @Resource private NamingManager namingManager; @@ -111,10 +114,8 @@ public MetaResponse discovery(@RequestParam String vGroup, @RequestParam String } @PostMapping("/addGroup") - public Result addGroup(@RequestParam String namespace, - @RequestParam String clusterName, - @RequestParam String unitName, - @RequestParam String vGroup) { + public Result addGroup(@RequestParam String namespace, @RequestParam String clusterName, String unitName, + @RequestParam String vGroup) { Result addGroupResult = namingManager.createGroup(namespace, vGroup, clusterName, unitName); if (!addGroupResult.isSuccess()) { @@ -128,7 +129,6 @@ public Result changeGroup(@RequestParam String namespace, @RequestParam String clusterName, @RequestParam String unitName, @RequestParam String vGroup) { - Result addGroupResult = namingManager.changeGroup(namespace, vGroup, clusterName, unitName); if (!addGroupResult.isSuccess()) { return addGroupResult; @@ -136,6 +136,11 @@ public Result changeGroup(@RequestParam String namespace, return new Result<>("200", "change vGroup " + vGroup + "to cluster " + clusterName + " successfully!"); } + @GetMapping("/namespace") + public SingleResult> namespaces() { + return namingManager.namespace(); + } + /** * @param clientTerm The timestamp of the subscription saved on the client side * @param vGroup The name of the transaction group diff --git a/namingserver/src/main/java/org/apache/seata/namingserver/entity/vo/NamespaceVO.java b/namingserver/src/main/java/org/apache/seata/namingserver/entity/vo/NamespaceVO.java new file mode 100644 index 00000000000..91b53404ac3 --- /dev/null +++ b/namingserver/src/main/java/org/apache/seata/namingserver/entity/vo/NamespaceVO.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.namingserver.entity.vo; + +import java.util.ArrayList; +import java.util.List; + +public class NamespaceVO { + + List clusters = new ArrayList<>(); + + List vgroups = new ArrayList<>(); + + public List getClusters() { + return clusters; + } + + public void setClusters(List clusters) { + this.clusters = clusters; + } + + public List getVgroups() { + return vgroups; + } + + public void setVgroups(List vgroups) { + this.vgroups = vgroups; + } +} diff --git a/namingserver/src/main/java/org/apache/seata/namingserver/filter/CachedBodyHttpServletRequest.java b/namingserver/src/main/java/org/apache/seata/namingserver/filter/CachedBodyHttpServletRequest.java new file mode 100644 index 00000000000..db74bfddbc8 --- /dev/null +++ b/namingserver/src/main/java/org/apache/seata/namingserver/filter/CachedBodyHttpServletRequest.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.namingserver.filter; + +import javax.servlet.ReadListener; +import javax.servlet.ServletInputStream; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletRequestWrapper; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import org.apache.commons.io.IOUtils; + +public class CachedBodyHttpServletRequest extends HttpServletRequestWrapper { + private final byte[] cachedBody; + + public CachedBodyHttpServletRequest(HttpServletRequest request) throws IOException { + super(request); + InputStream requestInputStream = request.getInputStream(); + this.cachedBody = toByteArray(requestInputStream); + } + + @Override + public ServletInputStream getInputStream() throws IOException { + return new CachedBodyServletInputStream(this.cachedBody); + } + + private byte[] toByteArray(InputStream input) throws IOException { + return IOUtils.toByteArray(input); + } + + public byte[] getCachedBody() { + return cachedBody; + } + + private static class CachedBodyServletInputStream extends ServletInputStream { + private final ByteArrayInputStream inputStream; + + public CachedBodyServletInputStream(byte[] cachedBody) { + this.inputStream = new ByteArrayInputStream(cachedBody); + } + + @Override + public boolean isFinished() { + return inputStream.available() == 0; + } + + @Override + public boolean isReady() { + return true; + } + + @Override + public void setReadListener(ReadListener readListener) { + throw new UnsupportedOperationException(); + } + + @Override + public int read() throws IOException { + return inputStream.read(); + } + } +} diff --git a/namingserver/src/main/java/org/apache/seata/namingserver/filter/ConsoleRemotingFilter.java b/namingserver/src/main/java/org/apache/seata/namingserver/filter/ConsoleRemotingFilter.java new file mode 100644 index 00000000000..7c24593d033 --- /dev/null +++ b/namingserver/src/main/java/org/apache/seata/namingserver/filter/ConsoleRemotingFilter.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.namingserver.filter; + +import java.io.IOException; +import java.net.URI; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.ThreadLocalRandom; +import java.util.regex.Pattern; +import javax.servlet.AsyncContext; +import javax.servlet.Filter; +import javax.servlet.FilterChain; +import javax.servlet.ServletException; +import javax.servlet.ServletOutputStream; +import javax.servlet.ServletRequest; +import javax.servlet.ServletResponse; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import org.apache.seata.common.metadata.Node; +import org.apache.seata.common.util.CollectionUtils; +import org.apache.seata.common.util.StringUtils; +import org.apache.seata.namingserver.manager.NamingManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.http.HttpEntity; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpMethod; +import org.springframework.http.ResponseEntity; +import org.springframework.util.concurrent.ListenableFuture; +import org.springframework.util.concurrent.ListenableFutureCallback; +import org.springframework.web.client.AsyncRestTemplate; + + +import static org.apache.seata.namingserver.contants.NamingConstant.CONSOLE_PATTERN; + +public class ConsoleRemotingFilter implements Filter { + + private final NamingManager namingManager; + + private final AsyncRestTemplate asyncRestTemplate; + + private final Pattern urlPattern = Pattern.compile(CONSOLE_PATTERN); + + private final Logger logger = LoggerFactory.getLogger(ConsoleRemotingFilter.class); + + public ConsoleRemotingFilter(NamingManager namingManager, AsyncRestTemplate asyncRestTemplate) { + this.namingManager = namingManager; + this.asyncRestTemplate = asyncRestTemplate; + } + + @Override + public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) + throws IOException, ServletException { + if (servletRequest instanceof HttpServletRequest) { + if (urlPattern.matcher(((HttpServletRequest)servletRequest).getRequestURI()).matches()) { + CachedBodyHttpServletRequest request = + new CachedBodyHttpServletRequest((HttpServletRequest)servletRequest); + HttpServletResponse response = (HttpServletResponse)servletResponse; + String namespace = request.getHeader("x-seata-namespace"); + String cluster = request.getHeader("x-seata-cluster"); + String vgroup = request.getParameter("vgroup"); + if (StringUtils.isNotBlank(namespace) + && (StringUtils.isNotBlank(cluster) || StringUtils.isNotBlank(vgroup))) { + List list = null; + if (StringUtils.isNotBlank(vgroup)) { + list = namingManager.getInstancesByVgroupAndNamespace(namespace, vgroup); + } else if (StringUtils.isNotBlank(cluster)) { + list = namingManager.getInstances(namespace, cluster); + } + if (CollectionUtils.isNotEmpty(list)) { + // Randomly select a node from the list + Node node = list.get(ThreadLocalRandom.current().nextInt(list.size())); + Node.Endpoint controlEndpoint = node.getControl(); + + if (controlEndpoint != null) { + // Construct the target URL + String targetUrl = "http://" + controlEndpoint.getHost() + ":" + controlEndpoint.getPort() + + request.getRequestURI() + + (request.getQueryString() != null ? "?" + request.getQueryString() : ""); + + // Copy headers from the original request + HttpHeaders headers = new HttpHeaders(); + Collections.list(request.getHeaderNames()) + .forEach(headerName -> headers.add(headerName, request.getHeader(headerName))); + + // Create the HttpEntity with headers and body + HttpEntity httpEntity = new HttpEntity<>(request.getCachedBody(), headers); + + // Forward the request + AsyncContext asyncContext = servletRequest.startAsync(); + asyncContext.setTimeout(5000L); + ListenableFuture> responseEntityFuture = asyncRestTemplate.exchange( + URI.create(targetUrl), Objects.requireNonNull(HttpMethod.resolve(request.getMethod())), + httpEntity, byte[].class); + responseEntityFuture.addCallback(new ListenableFutureCallback>() { + @Override + public void onFailure(Throwable ex) { + try { + logger.error(ex.getMessage(), ex); + response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR); + } finally { + asyncContext.complete(); + } + } + + @Override + public void onSuccess(ResponseEntity responseEntity) { + // Copy response headers and status code + responseEntity.getHeaders().forEach((key, value) -> { + value.forEach(v -> response.addHeader(key, v)); + }); + response.setStatus(responseEntity.getStatusCodeValue()); + // Write response body + Optional.ofNullable(responseEntity.getBody()).ifPresent(body -> { + try (ServletOutputStream outputStream = response.getOutputStream()) { + outputStream.write(body); + outputStream.flush(); + } catch (IOException e) { + logger.error(e.getMessage(), e); + } + }); + asyncContext.complete(); + } + }); + return; + } + } + } + } + } + filterChain.doFilter(servletRequest, servletResponse); + } + +} diff --git a/namingserver/src/main/java/org/apache/seata/namingserver/manager/NamingManager.java b/namingserver/src/main/java/org/apache/seata/namingserver/manager/NamingManager.java index 7e6b74328e0..069f321881c 100644 --- a/namingserver/src/main/java/org/apache/seata/namingserver/manager/NamingManager.java +++ b/namingserver/src/main/java/org/apache/seata/namingserver/manager/NamingManager.java @@ -48,11 +48,13 @@ import org.apache.seata.common.metadata.namingserver.NamingServerNode; import org.apache.seata.common.metadata.namingserver.Unit; import org.apache.seata.common.result.Result; +import org.apache.seata.common.result.SingleResult; import org.apache.seata.common.util.HttpClientUtil; import org.apache.seata.common.NamingServerConstants; import org.apache.seata.common.util.StringUtils; import org.apache.seata.namingserver.entity.bo.ClusterBO; import org.apache.seata.namingserver.entity.bo.NamespaceBO; +import org.apache.seata.namingserver.entity.vo.NamespaceVO; import org.apache.seata.namingserver.listener.ClusterChangeEvent; import org.apache.seata.namingserver.entity.pojo.ClusterData; import org.apache.seata.namingserver.entity.vo.monitor.ClusterVO; @@ -74,7 +76,7 @@ public class NamingManager { private static final Logger LOGGER = LoggerFactory.getLogger(NamingManager.class); private final ConcurrentMap instanceLiveTable; - private volatile LoadingCache> vGroupMap; + private volatile LoadingCache> vGroupMap; private final ConcurrentMap> namespaceClusterDataMap; @@ -328,6 +330,15 @@ public List getInstances(String namespace, String clusterName) { return clusterData.getInstanceList(); } + public List getInstancesByVgroupAndNamespace(String namespace, String vgroup) { + List clusters = getClusterListByVgroup(vgroup, namespace); + if (CollectionUtils.isEmpty(clusters)) { + return Collections.emptyList(); + } else { + return getInstances(namespace, clusters.get(0).getClusterName()); + } + } + public void instanceHeartBeatCheck() { for (String namespace : namespaceClusterDataMap.keySet()) { for (ClusterData clusterData : namespaceClusterDataMap.get(namespace).values()) { @@ -405,4 +416,26 @@ public Result changeGroup(String namespace, String vGroup, String cluste return Optional.ofNullable(result.get()).orElseGet(() -> new Result<>("200", "change vGroup successfully!")); } + public SingleResult> namespace() { + // namespace->cluster->vgroups + Map namespaceVOs = new HashMap<>(); + Map> currentVGourpMap = + new HashMap<>(vGroupMap.asMap()); + if (currentVGourpMap.isEmpty()) { + namespaceClusterDataMap.forEach((namespace, clusterDataMap) -> { + NamespaceVO namespaceVO = new NamespaceVO(); + namespaceVO.setClusters(new ArrayList<>(clusterDataMap.keySet())); + namespaceVOs.put(namespace, namespaceVO); + }); + return SingleResult.success(namespaceVOs); + } + currentVGourpMap.forEach((vGroup, namespaceMap) -> namespaceMap + .forEach((namespace, namespaceBO) -> namespaceBO.getClusterMap().forEach((clusterName, clusterBO) -> { + NamespaceVO namespaceVO = namespaceVOs.computeIfAbsent(namespace, value -> new NamespaceVO()); + namespaceVO.getClusters().add(clusterName); + namespaceVO.getVgroups().add(vGroup); + }))); + return SingleResult.success(namespaceVOs); + } + } diff --git a/namingserver/src/main/resources/application.yml b/namingserver/src/main/resources/application.yml index ca73a0f2abd..03817e23a84 100644 --- a/namingserver/src/main/resources/application.yml +++ b/namingserver/src/main/resources/application.yml @@ -25,6 +25,16 @@ logging: config: classpath:logback-spring.xml file: path: ${log.home:${user.home}/logs/seata} +console: + user: + username: seata + password: seata heartbeat: threshold: 90000 - period: 60000 \ No newline at end of file + period: 60000 +seata: + security: + secretKey: SeataSecretKey0c382ef121d778043159209298fd40bf3850a017 + tokenValidityInMilliseconds: 1800000 + ignore: + urls: /,/**/*.css,/**/*.js,/**/*.html,/**/*.map,/**/*.svg,/**/*.png,/**/*.jpeg,/**/*.ico,/api/v1/auth/login,/version.json,/health,/error,/naming/v1/** diff --git a/namingserver/src/test/resources/application.yml b/namingserver/src/test/resources/application.yml index 0aaca022b8f..573dde0b268 100644 --- a/namingserver/src/test/resources/application.yml +++ b/namingserver/src/test/resources/application.yml @@ -25,6 +25,16 @@ logging: config: classpath:logback-spring.xml file: path: ${log.home:${user.home}/logs/seata} +console: + user: + username: seata + password: seata heartbeat: threshold: 5000 - period: 5000 \ No newline at end of file + period: 5000 +seata: + security: + secretKey: SeataSecretKey0c382ef121d778043159209298fd40bf3850a017 + tokenValidityInMilliseconds: 1800000 + ignore: + urls: /,/**/*.css,/**/*.js,/**/*.html,/**/*.map,/**/*.svg,/**/*.png,/**/*.jpeg,/**/*.ico,/api/v1/auth/login,/version.json,/health,/error,/naming/v1/** diff --git a/server/pom.xml b/server/pom.xml index dba1d3f5ff6..1f3d9d6cdf3 100644 --- a/server/pom.xml +++ b/server/pom.xml @@ -203,6 +203,7 @@ seata-metrics-all ${project.version} + com.alibaba diff --git a/server/src/main/java/org/apache/seata/server/console/impl/file/GlobalSessionFileServiceImpl.java b/server/src/main/java/org/apache/seata/server/console/impl/file/GlobalSessionFileServiceImpl.java index 5d62e352e59..c3c93082013 100644 --- a/server/src/main/java/org/apache/seata/server/console/impl/file/GlobalSessionFileServiceImpl.java +++ b/server/src/main/java/org/apache/seata/server/console/impl/file/GlobalSessionFileServiceImpl.java @@ -87,6 +87,10 @@ private Predicate obtainPredicate(GlobalSessionParam para && // transactionName (isBlank(param.getTransactionName()) || session.getTransactionName().contains(param.getTransactionName())) + && + + // vgroup + (isBlank(param.getVgroup()) || session.getTransactionServiceGroup().equals(param.getVgroup())) && // timeStart diff --git a/server/src/main/java/org/apache/seata/server/console/param/GlobalSessionParam.java b/server/src/main/java/org/apache/seata/server/console/param/GlobalSessionParam.java index 78ba2b6a4e9..88e61aef8bd 100644 --- a/server/src/main/java/org/apache/seata/server/console/param/GlobalSessionParam.java +++ b/server/src/main/java/org/apache/seata/server/console/param/GlobalSessionParam.java @@ -43,6 +43,12 @@ public class GlobalSessionParam extends BaseParam implements Serializable { * the transaction name */ private String transactionName; + + /** + * the vgroup + */ + private String vgroup; + /** * if with branch * true: with branch session @@ -90,14 +96,19 @@ public void setWithBranch(boolean withBranch) { this.withBranch = withBranch; } + public String getVgroup() { + return vgroup; + } + + public void setVgroup(String vgroup) { + this.vgroup = vgroup; + } + @Override public String toString() { - return "GlobalSessionParam{" + - "xid='" + xid + '\'' + - ", applicationId='" + applicationId + '\'' + - ", status=" + status + - ", transactionName='" + transactionName + '\'' + - ", withBranch=" + withBranch + - '}'; + return "GlobalSessionParam{" + "xid='" + xid + '\'' + ", applicationId='" + applicationId + '\'' + ", status=" + + status + ", transactionName='" + transactionName + '\'' + ", vgroup='" + vgroup + '\'' + ", withBranch=" + + withBranch + '}'; } + }