diff --git a/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/api/profile/GraphsAPI.java b/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/api/profile/GraphsAPI.java index f45c228baf..7fd17c65d9 100644 --- a/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/api/profile/GraphsAPI.java +++ b/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/api/profile/GraphsAPI.java @@ -29,6 +29,8 @@ import org.apache.hugegraph.auth.HugePermission; import org.apache.hugegraph.config.HugeConfig; import org.apache.hugegraph.core.GraphManager; +import org.apache.hugegraph.pd.client.KvClient; +import org.apache.hugegraph.pd.grpc.kv.WatchResponse; import org.apache.hugegraph.type.define.GraphMode; import org.apache.hugegraph.type.define.GraphReadMode; import org.apache.hugegraph.util.E; @@ -66,6 +68,8 @@ public class GraphsAPI extends API { private static final String CONFIRM_CLEAR = "I'm sure to delete all data"; private static final String CONFIRM_DROP = "I'm sure to drop the graph"; + private KvClient client; + @GET @Timed @Produces(APPLICATION_JSON_WITH_CHARSET) diff --git a/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/core/GraphManager.java b/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/core/GraphManager.java index daae180a36..ebb75600bb 100644 --- a/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/core/GraphManager.java +++ b/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/core/GraphManager.java @@ -29,6 +29,7 @@ import org.apache.commons.configuration2.PropertiesConfiguration; import org.apache.commons.lang3.StringUtils; +import org.apache.hugegraph.variables.CheckList; import org.apache.hugegraph.HugeFactory; import org.apache.hugegraph.HugeGraph; import org.apache.hugegraph.auth.AuthManager; @@ -53,6 +54,10 @@ import org.apache.hugegraph.masterelection.StandardRoleListener; import org.apache.hugegraph.metrics.MetricsUtil; import org.apache.hugegraph.metrics.ServerReporter; +import org.apache.hugegraph.pd.client.KvClient; +import org.apache.hugegraph.pd.client.PDConfig; +import org.apache.hugegraph.pd.common.PDException; +import org.apache.hugegraph.pd.grpc.kv.WatchResponse; import org.apache.hugegraph.rpc.RpcClientProvider; import org.apache.hugegraph.rpc.RpcConsumerConfig; import org.apache.hugegraph.rpc.RpcProviderConfig; @@ -66,12 +71,14 @@ import org.apache.hugegraph.util.ConfigUtil; import org.apache.hugegraph.util.E; import org.apache.hugegraph.util.Events; +import org.apache.hugegraph.util.JsonUtil; import org.apache.hugegraph.util.Log; import org.apache.tinkerpop.gremlin.server.auth.AuthenticationException; import org.apache.tinkerpop.gremlin.server.util.MetricManager; import org.apache.tinkerpop.gremlin.structure.Graph; import org.apache.tinkerpop.gremlin.structure.Transaction; import org.apache.tinkerpop.gremlin.structure.util.GraphFactory; +import org.junit.Before; import org.slf4j.Logger; import com.alipay.sofa.rpc.config.ServerConfig; @@ -94,6 +101,7 @@ public final class GraphManager { private final HugeConfig conf; private final EventHub eventHub; + public GraphManager(HugeConfig conf, EventHub hub) { this.graphsDir = conf.get(ServerOptions.GRAPHS); this.graphs = new ConcurrentHashMap<>(); @@ -172,6 +180,8 @@ public HugeGraph cloneGraph(String name, String newName, String configText) { } public HugeGraph createGraph(String name, String configText) { + + E.checkArgument(this.conf.get(ServerOptions.ENABLE_DYNAMIC_CREATE_DROP), "Not allowed to create graph '%s' dynamically, " + "please set `enable_dynamic_create_drop` to true.", @@ -180,11 +190,13 @@ public HugeGraph createGraph(String name, String configText) { "The graph name can't be null or empty"); E.checkArgument(!this.graphs().contains(name), "The graph name '%s' has existed", name); - PropertiesConfiguration propConfig = ConfigUtil.buildConfig(configText); HugeConfig config = new HugeConfig(propConfig); this.checkOptions(config); + return this.createGraph(config, name); + } + public HugeGraph createGraphRecover(HugeConfig config, String name) { return this.createGraph(config, name); } @@ -581,7 +593,7 @@ private void notifyAndWaitEvent(String event, HugeGraph graph) { } } - private HugeGraph createGraph(HugeConfig config, String name) { + private HugeGraph createGraph(HugeConfig config, String name){ HugeGraph graph = null; try { // Create graph instance diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/HugeGraph.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/HugeGraph.java index 13d654d174..8d2c277afd 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/HugeGraph.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/HugeGraph.java @@ -35,6 +35,7 @@ import org.apache.hugegraph.config.TypedOption; import org.apache.hugegraph.masterelection.GlobalMasterInfo; import org.apache.hugegraph.masterelection.RoleElectionStateMachine; +import org.apache.hugegraph.pd.common.PDException; import org.apache.hugegraph.rpc.RpcServiceConfig4Client; import org.apache.hugegraph.rpc.RpcServiceConfig4Server; import org.apache.hugegraph.schema.EdgeLabel; diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java index eb991c0f68..d0fa695a58 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java @@ -73,6 +73,11 @@ import org.apache.hugegraph.memory.MemoryManager; import org.apache.hugegraph.memory.util.RoundUtil; import org.apache.hugegraph.meta.MetaManager; +import org.apache.hugegraph.pd.client.KvClient; +import org.apache.hugegraph.pd.client.PDConfig; +import org.apache.hugegraph.pd.common.PDException; +import org.apache.hugegraph.pd.grpc.kv.KResponse; +import org.apache.hugegraph.pd.grpc.kv.WatchResponse; import org.apache.hugegraph.perf.PerfUtil.Watched; import org.apache.hugegraph.rpc.RpcServiceConfig4Client; import org.apache.hugegraph.rpc.RpcServiceConfig4Server; @@ -99,8 +104,10 @@ import org.apache.hugegraph.util.DateUtil; import org.apache.hugegraph.util.E; import org.apache.hugegraph.util.Events; +import org.apache.hugegraph.util.JsonUtil; import org.apache.hugegraph.util.LockUtil; import org.apache.hugegraph.util.Log; +import org.apache.hugegraph.variables.CheckList; import org.apache.hugegraph.variables.HugeVariables; import org.apache.tinkerpop.gremlin.process.computer.GraphComputer; import org.apache.tinkerpop.gremlin.structure.Edge; @@ -182,6 +189,8 @@ public class StandardHugeGraph implements HugeGraph { private final String schedulerType; + private KvClient client = new KvClient<>(PDConfig.of("localhost:8686")); + private CheckList checkList; public StandardHugeGraph(HugeConfig config) { this.params = new StandardHugeGraphParams(); this.configuration = config; @@ -216,6 +225,7 @@ public StandardHugeGraph(HugeConfig config) { this.mode = GraphMode.NONE; this.readMode = GraphReadMode.OLTP_ONLY; this.schedulerType = config.get(CoreOptions.SCHEDULER_TYPE); + this.checkList = new CheckList(this.name, this.configuration); MemoryManager.setMemoryMode( MemoryManager.MemoryMode.fromValue(config.get(CoreOptions.MEMORY_MODE))); @@ -294,8 +304,10 @@ public void serverStarted(GlobalMasterInfo nodeInfo) { LOG.info("Init server info [{}-{}] for graph '{}'...", nodeInfo.nodeId(), nodeInfo.nodeRole(), this.name); - this.serverInfoManager().initServerInfo(nodeInfo); - + if(!this.checkList.isInitServerInfo()) { + this.serverInfoManager().initServerInfo(nodeInfo); + this.checkList.setInitServerInfo(); + } this.initRoleStateMachine(nodeInfo.nodeId()); // TODO: check necessary? @@ -435,9 +447,18 @@ public void truncateBackend() { @Override public void initSystemInfo() { try { - this.taskScheduler().init(); - this.serverInfoManager().init(); - this.authManager().init(); + if(!this.checkList.isTaskSchedulerInit()) { + this.taskScheduler().init(); + this.checkList.setTaskSchedulerInit(); + } + if(!this.checkList.isServerInfoManagerInit()) { + this.serverInfoManager().init(); + this.checkList.setTaskSchedulerInit(); + } + if(!this.checkList.isAuthManagerInit()) { + this.authManager().init(); + this.checkList.setAuthManagerInit(); + } } finally { this.closeTx(); } @@ -1011,14 +1032,16 @@ public synchronized void close() throws Exception { } @Override - public void create(String configPath, GlobalMasterInfo nodeInfo) { - this.initBackend(); - this.serverStarted(nodeInfo); + public void create(String configPath, GlobalMasterInfo nodeInfo){ + this.checkList.setConfigPath(configPath); + this.checkList.setNodeInfo(nodeInfo); + this.initBackend(); + this.serverStarted(nodeInfo); - // Write config to the disk file - String confPath = ConfigUtil.writeToFile(configPath, this.name(), - this.configuration()); - this.configuration.file(confPath); + // Write config to disk file + String confPath = ConfigUtil.writeToFile(configPath, this.name(), + this.configuration()); + this.configuration.file(confPath); } @Override diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/variables/CheckList.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/variables/CheckList.java new file mode 100644 index 0000000000..cb580f59e6 --- /dev/null +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/variables/CheckList.java @@ -0,0 +1,118 @@ +package org.apache.hugegraph.variables; + +import org.apache.hugegraph.config.HugeConfig; +import org.apache.hugegraph.masterelection.GlobalMasterInfo; +import org.apache.hugegraph.pd.client.KvClient; +import org.apache.hugegraph.pd.client.PDConfig; +import org.apache.hugegraph.pd.common.PDException; +import org.apache.hugegraph.pd.grpc.kv.WatchResponse; +import org.apache.hugegraph.util.JsonUtil; +import org.junit.Before; + +import java.io.Serializable; + +public class CheckList implements Serializable { + + private String name; + + private HugeConfig config; + + private boolean taskSchedulerInit; + + private boolean serverInfoManagerInit; + + private boolean authManagerInit; + + private boolean initServerInfo; + + private String configPath; + + private GlobalMasterInfo nodeInfo; + + private KvClient client; + + + private final String preFix = "graph_creat_tx_"; + + @Before + public void setUp() { + this.client = new KvClient<>(PDConfig.of("localhost:8686")); + } + + public CheckList(String name, HugeConfig config) { + this.name = name; + this.config = config; + } + + public String getName() { + return name; + } + + public boolean isTaskSchedulerInit() { + return taskSchedulerInit; + } + + public void setTaskSchedulerInit() { + this.taskSchedulerInit = true; + String json = JsonUtil.toJson(this); + tryPut(json); + } + + public boolean isServerInfoManagerInit() { + return serverInfoManagerInit; + } + + public void setServerInfoManagerInit() { + this.serverInfoManagerInit = true; + String json = JsonUtil.toJson(this); + tryPut(json); + } + + public boolean isAuthManagerInit() { + return authManagerInit; + } + + public void setAuthManagerInit() { + this.authManagerInit = true; + String json = JsonUtil.toJson(this); + tryPut(json); + } + + public boolean isInitServerInfo() { + return initServerInfo; + } + + public void setInitServerInfo() { + this.initServerInfo = true; + String json = JsonUtil.toJson(this); + tryPut(json); + } + + private void tryPut(String json) { + try { + client.put(preFix + name, json); + } catch (PDException e) { + throw new RuntimeException(e); + } + } + + public String getConfigPath() { + return configPath; + } + + public void setConfigPath(String configPath) { + this.configPath = configPath; + } + + public GlobalMasterInfo getNodeInfo() { + return nodeInfo; + } + + public void setNodeInfo(GlobalMasterInfo nodeInfo) { + this.nodeInfo = nodeInfo; + } + + public HugeConfig getConfig() { + return config; + } +} diff --git a/hugegraph-server/hugegraph-dist/src/main/java/org/apache/hugegraph/TxScanner.java b/hugegraph-server/hugegraph-dist/src/main/java/org/apache/hugegraph/TxScanner.java new file mode 100644 index 0000000000..3bfa0c3438 --- /dev/null +++ b/hugegraph-server/hugegraph-dist/src/main/java/org/apache/hugegraph/TxScanner.java @@ -0,0 +1,43 @@ +package org.apache.hugegraph; + +import jakarta.ws.rs.core.Context; +import org.apache.hugegraph.config.HugeConfig; +import org.apache.hugegraph.core.GraphManager; +import org.apache.hugegraph.masterelection.GlobalMasterInfo; +import org.apache.hugegraph.pd.client.KvClient; +import org.apache.hugegraph.pd.common.PDException; +import org.apache.hugegraph.pd.grpc.kv.ScanPrefixResponse; +import org.apache.hugegraph.pd.grpc.kv.WatchResponse; +import org.apache.hugegraph.util.ConfigUtil; +import org.apache.hugegraph.util.Events; +import org.apache.hugegraph.util.JsonUtil; +import org.apache.hugegraph.variables.CheckList; +import org.apache.tinkerpop.gremlin.structure.util.GraphFactory; + + + +public class TxScanner { + private final String prefix = "graph_creat_tx_"; + + @Context + GraphManager manager; + + private KvClient client; + + public TxScanner(KvClient client) { + } + + + public void scan() { + try { + ScanPrefixResponse response = this.client.scanPrefix(prefix); + for(String key : response.getKvsMap().keySet()) { + String value = response.getKvsMap().get(key); + CheckList checkList = JsonUtil.fromJson(value, CheckList.class); + HugeGraph graph = manager.createGraphRecover(checkList.getConfig(), checkList.getName()); + } + } catch (PDException e) { + throw new RuntimeException(e); + } + } +} diff --git "a/\350\256\276\350\256\241\346\226\207\346\241\243.md" "b/\350\256\276\350\256\241\346\226\207\346\241\243.md" new file mode 100644 index 0000000000..4dac72d30f --- /dev/null +++ "b/\350\256\276\350\256\241\346\226\207\346\241\243.md" @@ -0,0 +1,239 @@ +## 事务日志——checklist + +使用checklist来执行事务日志的记录,checklist以类的形式保存,记录检查点,每执行完一个节点需要做对应的记录。 + +checklist的结构如下: + +```java +package org.apache.hugegraph.variables; + +import org.apache.hugegraph.config.HugeConfig; +import org.apache.hugegraph.masterelection.GlobalMasterInfo; + +import java.io.Serializable; + +public class CheckList implements Serializable { + + private String name; + private String configText; + + private HugeConfig config; + + private boolean initBackended; + + private boolean serverStarted; + + private String stage; + + private String configPath; + + private GlobalMasterInfo nodeInfo; + + boolean toCheck; + String context; + private boolean isBuild; + + public void setBuild(boolean build) { + isBuild = build; + } + + public CheckList(String name, String context) { + this.name = name; + this.context = context; + } + + public HugeConfig getConfig() { + return config; + } + + public void setConfig(HugeConfig config) { + this.config = config; + } + + public boolean isInitBackended() { + return initBackended; + } + + public void setInitBackended(boolean initBackended) { + this.initBackended = initBackended; + } + + public boolean isServerStarted() { + return serverStarted; + } + + public void setServerStarted(boolean serverStarted) { + this.serverStarted = serverStarted; + } + + public String getStage() { + return stage; + } + + public void setStage(String stage) { + this.stage = stage; + } + + public String getConfigPath() { + return configPath; + } + + public void setConfigPath(String configPath) { + this.configPath = configPath; + } + + public GlobalMasterInfo getNodeInfo() { + return nodeInfo; + } + + public void setNodeInfo(GlobalMasterInfo nodeInfo) { + this.nodeInfo = nodeInfo; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } +} + +``` + + + +主要涉及到的函数有GraphsAPI类中的create、drop两个函数,以create为例,在原来的流程之上,在检查点处保存log信息: + +```java + public void create(String configPath, GlobalMasterInfo nodeInfo){ + //CheckList checkList = new CheckList(); + KResponse result = null; + try { + result = client.get(this.name); + String json = result.getValue(); + CheckList checkList = JsonUtil.fromJson(json, CheckList.class); + + this.initBackend(); + checkList.setInitBackended(true); + checkList.setStage("initBackend"); + client.put(name, JsonUtil.toJson(checkList)); + this.serverStarted(nodeInfo); + checkList.setServerStarted(true); + checkList.setStage("setServerStarted"); + client.put(name, JsonUtil.toJson(checkList)); + + + // Write config to disk file + String confPath = ConfigUtil.writeToFile(configPath, this.name(), + this.configuration()); + this.configuration.file(confPath); + } +``` + + + +除此之外,还有一些上下文信息需要保存,便于重新恢复: + +```java +checkList.setConfig(config); +``` + + + +序列化与日志存放,分别借用了hugegraph中的Jsonutil与pd中的kvclient: + +```java +private KvClient client; +CheckList checkList = JsonUtil.fromJson(json, CheckList.class); +client.put(name, JsonUtil.toJson(checkList)); +``` + + + +恢复机制:在HugeGraphServer中,启动时添加对事务的扫描,扫描的机制为,使用KvClient的前缀扫描,遇到没有执行完的,进行恢复执行。 + +```java +package org.apache.hugegraph; + +import org.apache.hugegraph.config.HugeConfig; +import org.apache.hugegraph.core.GraphManager; +import org.apache.hugegraph.masterelection.GlobalMasterInfo; +import org.apache.hugegraph.pd.client.KvClient; +import org.apache.hugegraph.pd.common.PDException; +import org.apache.hugegraph.pd.grpc.kv.ScanPrefixResponse; +import org.apache.hugegraph.pd.grpc.kv.WatchResponse; +import org.apache.hugegraph.util.ConfigUtil; +import org.apache.hugegraph.util.Events; +import org.apache.hugegraph.util.JsonUtil; +import org.apache.hugegraph.variables.CheckList; +import org.apache.tinkerpop.gremlin.structure.util.GraphFactory; + + + +public class TxScanner { + private final String prefix = "graph_creat_tx"; + + private KvClient client; + + public TxScanner(KvClient client) { + } + + + public void scan() { + try { + ScanPrefixResponse response = this.client.scanPrefix(prefix); + for(String key : response.getKvsMap().keySet()) { + String value = response.getKvsMap().get(key); + CheckList checkList = JsonUtil.fromJson(value, CheckList.class); + switch (checkList.getStage()) { + case "config": { + configContinue(checkList); + } + case "initBackend" : { + HugeConfig config = checkList.getConfig(); + HugeGraph graph = (HugeGraph) GraphFactory.open(config); + GlobalMasterInfo globalMasterInfo = checkList.getNodeInfo(); + graph.serverStarted(globalMasterInfo); + // Write config to disk file + String confPath = ConfigUtil.writeToFile(checkList.getConfigPath(), graph.name(), + (HugeConfig)graph.configuration()); + } + case "setServerStarted" : { + HugeConfig config = checkList.getConfig(); + HugeGraph graph = (HugeGraph) GraphFactory.open(config); + String confPath = ConfigUtil.writeToFile(checkList.getConfigPath(), graph.name(), + (HugeConfig)graph.configuration()); + } + case "finish" : { + client.delete(prefix + checkList.getName()); + } + } + } + } catch (PDException e) { + throw new RuntimeException(e); + } + + } + + private void configContinue(CheckList checkList) { + HugeConfig config = checkList.getConfig(); + HugeGraph graph = (HugeGraph) GraphFactory.open(config); + try { + // Create graph instance + graph = (HugeGraph) GraphFactory.open(config); + String configPath = checkList.getConfigPath(); + GlobalMasterInfo globalMasterInfo = checkList.getNodeInfo(); + // Init graph and start it + graph.create(configPath, globalMasterInfo); + } catch (Throwable e) { + throw e; + } + + } + +} +``` + + +