diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java index 4e9263a488..6480d7f288 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java @@ -316,8 +316,7 @@ private void initRoleStateMachine(Id serverId) { conf.get( RoleElectionOptions.BASE_TIMEOUT_MILLISECOND)); ClusterRoleStore roleStore = new StandardClusterRoleStore(this.params); - this.roleElectionStateMachine = new StandardRoleElectionStateMachine(roleConfig, - roleStore); + this.roleElectionStateMachine = new StandardRoleElectionStateMachine(roleConfig, roleStore); } @Override @@ -1007,7 +1006,7 @@ public void create(String configPath, GlobalMasterInfo nodeInfo) { this.initBackend(); this.serverStarted(nodeInfo); - // Write config to disk file + // Write config to the disk file String confPath = ConfigUtil.writeToFile(configPath, this.name(), this.configuration()); this.configuration.file(confPath); @@ -1349,7 +1348,7 @@ public String schedulerType() { private class TinkerPopTransaction extends AbstractThreadLocalTransaction { - // Times opened from upper layer + // Times opened from the upper layer private final AtomicInteger refs; // Flag opened of each thread private final ThreadLocal opened; diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/HugeServerInfo.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/HugeServerInfo.java index a304b4f75b..71feb3f688 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/HugeServerInfo.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/HugeServerInfo.java @@ -20,7 +20,6 @@ import java.util.ArrayList; import java.util.Date; import java.util.HashMap; -import java.util.Iterator; import java.util.List; import java.util.Map; @@ -47,14 +46,13 @@ public class HugeServerInfo { // Unit millisecond - private static final long EXPIRED_INTERVAL = - TaskManager.SCHEDULE_PERIOD * 10; + private static final long EXPIRED_INTERVAL = TaskManager.SCHEDULE_PERIOD * 10; - private Id id; private NodeRole role; + private Date updateTime; private int maxLoad; private int load; - private Date updateTime; + private final Id id; private transient boolean updated = false; @@ -114,6 +112,10 @@ public void increaseLoad(int delta) { this.updated = true; } + public long expireTime() { + return this.updateTime.getTime() + EXPIRED_INTERVAL; + } + public Date updateTime() { return this.updateTime; } @@ -200,8 +202,7 @@ public Map asMap() { public static HugeServerInfo fromVertex(Vertex vertex) { HugeServerInfo serverInfo = new HugeServerInfo((Id) vertex.id()); - for (Iterator> iter = vertex.properties(); - iter.hasNext(); ) { + for (var iter = vertex.properties(); iter.hasNext(); ) { VertexProperty prop = iter.next(); serverInfo.property(prop.key(), prop.value()); } @@ -246,7 +247,7 @@ public static final class Schema { public static final String SERVER = P.SERVER; - protected final HugeGraphParams graph; + private final HugeGraphParams graph; public Schema(HugeGraphParams graph) { this.graph = graph; @@ -264,8 +265,7 @@ public void initSchemaIfNeeded() { VertexLabel label = graph.schema().vertexLabel(SERVER) .properties(properties) .useCustomizeStringId() - .nullableKeys(P.ROLE, P.MAX_LOAD, - P.LOAD, P.UPDATE_TIME) + .nullableKeys(P.ROLE, P.MAX_LOAD, P.LOAD, P.UPDATE_TIME) .enableLabelIndex(true) .build(); this.graph.schemaTransaction().addVertexLabel(label); @@ -273,7 +273,6 @@ public void initSchemaIfNeeded() { private String[] initProperties() { List props = new ArrayList<>(); - props.add(createPropertyKey(P.ROLE, DataType.BYTE)); props.add(createPropertyKey(P.MAX_LOAD, DataType.INT)); props.add(createPropertyKey(P.LOAD, DataType.INT)); @@ -283,8 +282,7 @@ private String[] initProperties() { } public boolean existVertexLabel(String label) { - return this.graph.schemaTransaction() - .getVertexLabel(label) != null; + return this.graph.schemaTransaction().getVertexLabel(label) != null; } @SuppressWarnings("unused") @@ -296,8 +294,7 @@ private String createPropertyKey(String name, DataType dataType) { return this.createPropertyKey(name, dataType, Cardinality.SINGLE); } - private String createPropertyKey(String name, DataType dataType, - Cardinality cardinality) { + private String createPropertyKey(String name, DataType dataType, Cardinality cardinality) { SchemaManager schema = this.graph.graph().schema(); PropertyKey propertyKey = schema.propertyKey(name) .dataType(dataType) diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/ServerInfoManager.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/ServerInfoManager.java index de0d08b03a..bcef869017 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/ServerInfoManager.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/ServerInfoManager.java @@ -67,8 +67,7 @@ public class ServerInfoManager { private volatile boolean onlySingleNode; private volatile boolean closed; - public ServerInfoManager(HugeGraphParams graph, - ExecutorService dbExecutor) { + public ServerInfoManager(HugeGraphParams graph, ExecutorService dbExecutor) { E.checkNotNull(graph, "graph"); E.checkNotNull(dbExecutor, "db executor"); @@ -107,9 +106,20 @@ public synchronized void initServerInfo(GlobalMasterInfo nodeInfo) { Id serverId = nodeInfo.nodeId(); HugeServerInfo existed = this.serverInfo(serverId); + if (existed != null && existed.alive()) { + final long now = DateUtil.now().getTime(); + if (existed.expireTime() > now + 30 * 1000) { + LOG.info("The node time maybe skew very much: {}", existed); + throw new HugeException("The server with name '%s' maybe skew very much", serverId); + } + try { + Thread.sleep(existed.expireTime() - now + 1); + } catch (InterruptedException e) { + throw new HugeException("Interrupted when waiting for server info expired", e); + } + } E.checkArgument(existed == null || !existed.alive(), - "The server with name '%s' already in cluster", - serverId); + "The server with name '%s' already in cluster", serverId); if (nodeInfo.nodeRole().master()) { String page = this.supportsPaging() ? PageInfo.PAGE_NONE : null; @@ -185,13 +195,12 @@ public synchronized void heartbeat() { /* ServerInfo is missing */ if (this.selfNodeId() == null) { // Ignore if ServerInfo is not initialized - LOG.info("ServerInfo is missing: {}, may not be initialized yet"); + LOG.info("ServerInfo is missing: {}, may not be initialized yet", this.selfNodeId()); return; } if (this.selfIsMaster()) { - // On master node, just wait for ServerInfo re-init - LOG.warn("ServerInfo is missing: {}, may be cleared before", - this.selfNodeId()); + // On the master node, just wait for ServerInfo re-init + LOG.warn("ServerInfo is missing: {}, may be cleared before", this.selfNodeId()); return; } /* @@ -232,12 +241,10 @@ protected synchronized HugeServerInfo pickWorkerNode(Collection if (!server.alive()) { continue; } - if (server.role().master()) { master = server; continue; } - hasWorkerNode = true; if (!server.suitableFor(task, now)) { continue; @@ -254,13 +261,12 @@ protected synchronized HugeServerInfo pickWorkerNode(Collection this.onlySingleNode = singleNode; } - // Only schedule to master if there is no workers and master is suitable + // Only schedule to master if there are no workers and master are suitable if (!hasWorkerNode) { if (master != null && master.suitableFor(task, now)) { serverWithMinLoad = master; } } - return serverWithMinLoad; } @@ -286,8 +292,7 @@ private Id save(HugeServerInfo serverInfo) { throw new HugeException("Schema is missing for %s '%s'", HugeServerInfo.P.SERVER, serverInfo); } - HugeVertex vertex = this.tx().constructVertex(false, - serverInfo.asArray()); + HugeVertex vertex = this.tx().constructVertex(false, serverInfo.asArray()); // Add or update server info in backend store vertex = this.tx().addVertex(vertex); return vertex.id(); @@ -301,8 +306,7 @@ private int save(Collection serverInfos) { } HugeServerInfo.Schema schema = HugeServerInfo.schema(this.graph); if (!schema.existVertexLabel(HugeServerInfo.P.SERVER)) { - throw new HugeException("Schema is missing for %s", - HugeServerInfo.P.SERVER); + throw new HugeException("Schema is missing for %s", HugeServerInfo.P.SERVER); } // Save server info in batch GraphTransaction tx = this.tx(); diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/StandardTaskScheduler.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/StandardTaskScheduler.java index 8afe11dff2..1395888611 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/StandardTaskScheduler.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/StandardTaskScheduler.java @@ -120,7 +120,7 @@ private TaskTransaction tx() { if (this.taskTx == null) { BackendStore store = this.graph.loadSystemStore(); TaskTransaction tx = new TaskTransaction(this.graph, store); - assert this.taskTx == null; // may be reentrant? + assert this.taskTx == null; // maybe reentrant? this.taskTx = tx; } } @@ -196,7 +196,7 @@ public Future schedule(HugeTask task) { if (this.serverManager().onlySingleNode() && !task.computer()) { /* - * Speed up for single node, submit task immediately, + * Speed up for single node, submit the task immediately, * this code can be removed without affecting code logic */ task.status(TaskStatus.QUEUED); @@ -205,7 +205,7 @@ public Future schedule(HugeTask task) { return this.submitTask(task); } else { /* - * Just set SCHEDULING status and save task, + * Just set the SCHEDULING status and save the task, * it will be scheduled by periodic scheduler worker */ task.status(TaskStatus.SCHEDULING); @@ -276,11 +276,11 @@ public synchronized void cancel(HugeTask task) { assert this.serverManager().selfIsMaster(); if (!task.server().equals(this.serverManager().selfNodeId())) { /* - * Remove task from memory if it's running on worker node, - * but keep task in memory if it's running on master node. - * cancel-scheduling will read task from backend store, if + * Remove the task from memory if it's running on worker node, + * but keep the task in memory if it's running on master node. + * Cancel-scheduling will read the task from backend store, if * removed this instance from memory, there will be two task - * instances with same id, and can't cancel the real task that + * instances with the same id, and can't cancel the real task that * is running but removed from memory. */ this.remove(task); @@ -301,12 +301,10 @@ public ServerInfoManager serverManager() { protected synchronized void scheduleTasksOnMaster() { // Master server schedule all scheduling tasks to suitable worker nodes - Collection serverInfos = this.serverManager() - .allServerInfos(); + Collection serverInfos = this.serverManager().allServerInfos(); String page = this.supportsPaging() ? PageInfo.PAGE_NONE : null; do { - Iterator> tasks = this.tasks(TaskStatus.SCHEDULING, - PAGE_SIZE, page); + Iterator> tasks = this.tasks(TaskStatus.SCHEDULING, PAGE_SIZE, page); while (tasks.hasNext()) { HugeTask task = tasks.next(); if (task.server() != null) { @@ -318,12 +316,10 @@ protected synchronized void scheduleTasksOnMaster() { return; } - HugeServerInfo server = this.serverManager().pickWorkerNode( - serverInfos, task); + HugeServerInfo server = this.serverManager().pickWorkerNode(serverInfos, task); if (server == null) { LOG.info("The master can't find suitable servers to " + - "execute task '{}', wait for next schedule", - task.id()); + "execute task '{}', wait for next schedule", task.id()); continue; } @@ -336,8 +332,7 @@ protected synchronized void scheduleTasksOnMaster() { // Update server load in memory, it will be saved at the ending server.increaseLoad(task.load()); - LOG.info("Scheduled task '{}' to server '{}'", - task.id(), server.id()); + LOG.info("Scheduled task '{}' to server '{}'", task.id(), server.id()); } if (page != null) { page = PageInfo.pageInfo(tasks); @@ -351,8 +346,7 @@ protected synchronized void scheduleTasksOnMaster() { protected void executeTasksOnWorker(Id server) { String page = this.supportsPaging() ? PageInfo.PAGE_NONE : null; do { - Iterator> tasks = this.tasks(TaskStatus.SCHEDULED, - PAGE_SIZE, page); + Iterator> tasks = this.tasks(TaskStatus.SCHEDULED, PAGE_SIZE, page); while (tasks.hasNext()) { HugeTask task = tasks.next(); this.initTaskCallable(task); @@ -381,8 +375,7 @@ protected void executeTasksOnWorker(Id server) { protected void cancelTasksOnWorker(Id server) { String page = this.supportsPaging() ? PageInfo.PAGE_NONE : null; do { - Iterator> tasks = this.tasks(TaskStatus.CANCELLING, - PAGE_SIZE, page); + Iterator> tasks = this.tasks(TaskStatus.CANCELLING, PAGE_SIZE, page); while (tasks.hasNext()) { HugeTask task = tasks.next(); Id taskServer = task.server(); @@ -557,10 +550,10 @@ public HugeTask delete(Id id, boolean force) { HugeTask task = this.task(id); /* - * The following is out of date when task running on worker node: + * The following is out of date when the task running on worker node: * HugeTask task = this.tasks.get(id); * Tasks are removed from memory after completed at most time, - * but there is a tiny gap between tasks are completed and + * but there is a tiny gap between tasks is completed and * removed from memory. * We assume tasks only in memory may be incomplete status, * in fact, it is also possible to appear on the backend tasks @@ -621,7 +614,7 @@ private HugeTask waitUntilTaskCompleted(Id id, long seconds, throw e; } if (task.completed()) { - // Wait for task result being set after status is completed + // Wait for the task result being set after the status is completed sleep(intervalMs); return task; }