Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(server): enhance transaction in graph server #2686

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import org.apache.hugegraph.auth.HugePermission;
import org.apache.hugegraph.config.HugeConfig;
import org.apache.hugegraph.core.GraphManager;
import org.apache.hugegraph.pd.client.KvClient;
import org.apache.hugegraph.pd.grpc.kv.WatchResponse;
import org.apache.hugegraph.type.define.GraphMode;
import org.apache.hugegraph.type.define.GraphReadMode;
import org.apache.hugegraph.util.E;
Expand Down Expand Up @@ -66,6 +68,8 @@ public class GraphsAPI extends API {
private static final String CONFIRM_CLEAR = "I'm sure to delete all data";
private static final String CONFIRM_DROP = "I'm sure to drop the graph";

private KvClient<WatchResponse> client;

@GET
@Timed
@Produces(APPLICATION_JSON_WITH_CHARSET)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import org.apache.commons.configuration2.PropertiesConfiguration;
import org.apache.commons.lang3.StringUtils;
import org.apache.hugegraph.variables.CheckList;
import org.apache.hugegraph.HugeFactory;
import org.apache.hugegraph.HugeGraph;
import org.apache.hugegraph.auth.AuthManager;
Expand All @@ -53,6 +54,10 @@
import org.apache.hugegraph.masterelection.StandardRoleListener;
import org.apache.hugegraph.metrics.MetricsUtil;
import org.apache.hugegraph.metrics.ServerReporter;
import org.apache.hugegraph.pd.client.KvClient;
import org.apache.hugegraph.pd.client.PDConfig;
import org.apache.hugegraph.pd.common.PDException;
import org.apache.hugegraph.pd.grpc.kv.WatchResponse;
import org.apache.hugegraph.rpc.RpcClientProvider;
import org.apache.hugegraph.rpc.RpcConsumerConfig;
import org.apache.hugegraph.rpc.RpcProviderConfig;
Expand All @@ -66,12 +71,14 @@
import org.apache.hugegraph.util.ConfigUtil;
import org.apache.hugegraph.util.E;
import org.apache.hugegraph.util.Events;
import org.apache.hugegraph.util.JsonUtil;
import org.apache.hugegraph.util.Log;
import org.apache.tinkerpop.gremlin.server.auth.AuthenticationException;
import org.apache.tinkerpop.gremlin.server.util.MetricManager;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.Transaction;
import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
import org.junit.Before;
import org.slf4j.Logger;

import com.alipay.sofa.rpc.config.ServerConfig;
Expand All @@ -94,6 +101,7 @@ public final class GraphManager {
private final HugeConfig conf;
private final EventHub eventHub;


public GraphManager(HugeConfig conf, EventHub hub) {
this.graphsDir = conf.get(ServerOptions.GRAPHS);
this.graphs = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -172,6 +180,8 @@ public HugeGraph cloneGraph(String name, String newName, String configText) {
}

public HugeGraph createGraph(String name, String configText) {


E.checkArgument(this.conf.get(ServerOptions.ENABLE_DYNAMIC_CREATE_DROP),
"Not allowed to create graph '%s' dynamically, " +
"please set `enable_dynamic_create_drop` to true.",
Expand All @@ -180,11 +190,13 @@ public HugeGraph createGraph(String name, String configText) {
"The graph name can't be null or empty");
E.checkArgument(!this.graphs().contains(name),
"The graph name '%s' has existed", name);

PropertiesConfiguration propConfig = ConfigUtil.buildConfig(configText);
HugeConfig config = new HugeConfig(propConfig);
this.checkOptions(config);
return this.createGraph(config, name);
}

public HugeGraph createGraphRecover(HugeConfig config, String name) {
return this.createGraph(config, name);
}

Expand Down Expand Up @@ -581,7 +593,7 @@ private void notifyAndWaitEvent(String event, HugeGraph graph) {
}
}

private HugeGraph createGraph(HugeConfig config, String name) {
private HugeGraph createGraph(HugeConfig config, String name){
HugeGraph graph = null;
try {
// Create graph instance
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.hugegraph.config.TypedOption;
import org.apache.hugegraph.masterelection.GlobalMasterInfo;
import org.apache.hugegraph.masterelection.RoleElectionStateMachine;
import org.apache.hugegraph.pd.common.PDException;
import org.apache.hugegraph.rpc.RpcServiceConfig4Client;
import org.apache.hugegraph.rpc.RpcServiceConfig4Server;
import org.apache.hugegraph.schema.EdgeLabel;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@
import org.apache.hugegraph.memory.MemoryManager;
import org.apache.hugegraph.memory.util.RoundUtil;
import org.apache.hugegraph.meta.MetaManager;
import org.apache.hugegraph.pd.client.KvClient;
import org.apache.hugegraph.pd.client.PDConfig;
import org.apache.hugegraph.pd.common.PDException;
import org.apache.hugegraph.pd.grpc.kv.KResponse;
import org.apache.hugegraph.pd.grpc.kv.WatchResponse;
import org.apache.hugegraph.perf.PerfUtil.Watched;
import org.apache.hugegraph.rpc.RpcServiceConfig4Client;
import org.apache.hugegraph.rpc.RpcServiceConfig4Server;
Expand All @@ -99,8 +104,10 @@
import org.apache.hugegraph.util.DateUtil;
import org.apache.hugegraph.util.E;
import org.apache.hugegraph.util.Events;
import org.apache.hugegraph.util.JsonUtil;
import org.apache.hugegraph.util.LockUtil;
import org.apache.hugegraph.util.Log;
import org.apache.hugegraph.variables.CheckList;
import org.apache.hugegraph.variables.HugeVariables;
import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
import org.apache.tinkerpop.gremlin.structure.Edge;
Expand Down Expand Up @@ -182,6 +189,8 @@ public class StandardHugeGraph implements HugeGraph {

private final String schedulerType;

private KvClient<WatchResponse> client = new KvClient<>(PDConfig.of("localhost:8686"));
private CheckList checkList;
public StandardHugeGraph(HugeConfig config) {
this.params = new StandardHugeGraphParams();
this.configuration = config;
Expand Down Expand Up @@ -216,6 +225,7 @@ public StandardHugeGraph(HugeConfig config) {
this.mode = GraphMode.NONE;
this.readMode = GraphReadMode.OLTP_ONLY;
this.schedulerType = config.get(CoreOptions.SCHEDULER_TYPE);
this.checkList = new CheckList(this.name, this.configuration);

MemoryManager.setMemoryMode(
MemoryManager.MemoryMode.fromValue(config.get(CoreOptions.MEMORY_MODE)));
Expand Down Expand Up @@ -294,8 +304,10 @@ public void serverStarted(GlobalMasterInfo nodeInfo) {

LOG.info("Init server info [{}-{}] for graph '{}'...",
nodeInfo.nodeId(), nodeInfo.nodeRole(), this.name);
this.serverInfoManager().initServerInfo(nodeInfo);

if(!this.checkList.isInitServerInfo()) {
this.serverInfoManager().initServerInfo(nodeInfo);
this.checkList.setInitServerInfo();
}
this.initRoleStateMachine(nodeInfo.nodeId());

// TODO: check necessary?
Expand Down Expand Up @@ -435,9 +447,18 @@ public void truncateBackend() {
@Override
public void initSystemInfo() {
try {
this.taskScheduler().init();
this.serverInfoManager().init();
this.authManager().init();
if(!this.checkList.isTaskSchedulerInit()) {
this.taskScheduler().init();
this.checkList.setTaskSchedulerInit();
}
if(!this.checkList.isServerInfoManagerInit()) {
this.serverInfoManager().init();
this.checkList.setTaskSchedulerInit();
}
if(!this.checkList.isAuthManagerInit()) {
this.authManager().init();
this.checkList.setAuthManagerInit();
}
} finally {
this.closeTx();
}
Expand Down Expand Up @@ -1011,14 +1032,16 @@ public synchronized void close() throws Exception {
}

@Override
public void create(String configPath, GlobalMasterInfo nodeInfo) {
this.initBackend();
this.serverStarted(nodeInfo);
public void create(String configPath, GlobalMasterInfo nodeInfo){
this.checkList.setConfigPath(configPath);
this.checkList.setNodeInfo(nodeInfo);
this.initBackend();
this.serverStarted(nodeInfo);

// Write config to the disk file
String confPath = ConfigUtil.writeToFile(configPath, this.name(),
this.configuration());
this.configuration.file(confPath);
// Write config to disk file
String confPath = ConfigUtil.writeToFile(configPath, this.name(),
this.configuration());
this.configuration.file(confPath);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package org.apache.hugegraph.variables;

import org.apache.hugegraph.config.HugeConfig;
import org.apache.hugegraph.masterelection.GlobalMasterInfo;
import org.apache.hugegraph.pd.client.KvClient;
import org.apache.hugegraph.pd.client.PDConfig;
import org.apache.hugegraph.pd.common.PDException;
import org.apache.hugegraph.pd.grpc.kv.WatchResponse;
import org.apache.hugegraph.util.JsonUtil;
import org.junit.Before;

import java.io.Serializable;

public class CheckList implements Serializable {

private String name;

private HugeConfig config;

private boolean taskSchedulerInit;

private boolean serverInfoManagerInit;

private boolean authManagerInit;

private boolean initServerInfo;

private String configPath;

private GlobalMasterInfo nodeInfo;

private KvClient<WatchResponse> client;


private final String preFix = "graph_creat_tx_";

@Before
public void setUp() {
this.client = new KvClient<>(PDConfig.of("localhost:8686"));
}

public CheckList(String name, HugeConfig config) {
this.name = name;
this.config = config;
}

public String getName() {
return name;
}

public boolean isTaskSchedulerInit() {
return taskSchedulerInit;
}

public void setTaskSchedulerInit() {
this.taskSchedulerInit = true;
String json = JsonUtil.toJson(this);
tryPut(json);
}

public boolean isServerInfoManagerInit() {
return serverInfoManagerInit;
}

public void setServerInfoManagerInit() {
this.serverInfoManagerInit = true;
String json = JsonUtil.toJson(this);
tryPut(json);
}

public boolean isAuthManagerInit() {
return authManagerInit;
}

public void setAuthManagerInit() {
this.authManagerInit = true;
String json = JsonUtil.toJson(this);
tryPut(json);
}

public boolean isInitServerInfo() {
return initServerInfo;
}

public void setInitServerInfo() {
this.initServerInfo = true;
String json = JsonUtil.toJson(this);
tryPut(json);
}

private void tryPut(String json) {
try {
client.put(preFix + name, json);
} catch (PDException e) {
throw new RuntimeException(e);
}
}

public String getConfigPath() {
return configPath;
}

public void setConfigPath(String configPath) {
this.configPath = configPath;
}

public GlobalMasterInfo getNodeInfo() {
return nodeInfo;
}

public void setNodeInfo(GlobalMasterInfo nodeInfo) {
this.nodeInfo = nodeInfo;
}

public HugeConfig getConfig() {
return config;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package org.apache.hugegraph;

import jakarta.ws.rs.core.Context;
import org.apache.hugegraph.config.HugeConfig;
import org.apache.hugegraph.core.GraphManager;
import org.apache.hugegraph.masterelection.GlobalMasterInfo;
import org.apache.hugegraph.pd.client.KvClient;
import org.apache.hugegraph.pd.common.PDException;
import org.apache.hugegraph.pd.grpc.kv.ScanPrefixResponse;
import org.apache.hugegraph.pd.grpc.kv.WatchResponse;
import org.apache.hugegraph.util.ConfigUtil;
import org.apache.hugegraph.util.Events;
import org.apache.hugegraph.util.JsonUtil;
import org.apache.hugegraph.variables.CheckList;
import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;



public class TxScanner {
private final String prefix = "graph_creat_tx_";

@Context
GraphManager manager;

private KvClient<WatchResponse> client;

public TxScanner(KvClient<WatchResponse> client) {
}


public void scan() {
try {
ScanPrefixResponse response = this.client.scanPrefix(prefix);
for(String key : response.getKvsMap().keySet()) {
String value = response.getKvsMap().get(key);
CheckList checkList = JsonUtil.fromJson(value, CheckList.class);
HugeGraph graph = manager.createGraphRecover(checkList.getConfig(), checkList.getName());
}
} catch (PDException e) {
throw new RuntimeException(e);
}
}
}
Loading