Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(server): support MemoryManagement for graph query framework #2649

Merged
merged 51 commits into from
Nov 5, 2024
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
7a652be
feat: framework for memoryManagement
Pengzna Aug 25, 2024
ccc918b
wip: memory pool and manager framework
Pengzna Aug 25, 2024
b266358
wip: memory allocation
Pengzna Oct 8, 2024
ff10c3a
wip: memory reclaim
Pengzna Oct 9, 2024
26b0cfe
rename class
Pengzna Oct 9, 2024
f6aeace
fix review
Pengzna Oct 9, 2024
72e5bf3
remove useless allocator
Pengzna Oct 9, 2024
48f4817
netty allocator
Pengzna Oct 10, 2024
d906d04
revert config.properties
Pengzna Oct 13, 2024
b308be0
fix and improvement for allocation and deallocation
Pengzna Oct 22, 2024
09367a1
Merge remote-tracking branch 'refs/remotes/base/master' into memory/m…
Pengzna Oct 22, 2024
ea9a459
move monitor
Pengzna Oct 22, 2024
f552fd2
Revert "move monitor"
Pengzna Oct 22, 2024
8a2c65c
improve memory arbitration
Pengzna Oct 22, 2024
c37f869
suspend query when arbitration & kill query when OOM
Pengzna Oct 23, 2024
5904909
fix review
Pengzna Oct 23, 2024
0e70e44
fury test
Pengzna Oct 23, 2024
5d71541
offHeap magic
Pengzna Oct 23, 2024
f73f0ab
Revert "fury test"
Pengzna Oct 23, 2024
871015e
offHeap magic util
Pengzna Oct 23, 2024
8344443
complete adoption for all id
Pengzna Oct 25, 2024
d9cf408
complete property adoption
Pengzna Oct 26, 2024
aaeacb5
release ByteBuf off heap memory block
Pengzna Oct 26, 2024
ef0d629
complete allocate memory test and fix bug
Pengzna Oct 27, 2024
54d1fd8
fix some bugs: arbitration & suspend
Pengzna Oct 27, 2024
bfe75c0
complete OOM UT and fix bugs
Pengzna Oct 27, 2024
ab1bcde
complete memory management framework UT and fix all bugs
Pengzna Oct 27, 2024
91df57a
fix ut
Pengzna Oct 27, 2024
52ca7af
keep format consistent with original version
Pengzna Oct 28, 2024
ee8e125
fix review
Pengzna Oct 28, 2024
1a7d461
Merge branch 'master' into memory/management
imbajin Oct 28, 2024
de9d7a1
wip: adoption to query chain & introduce factory
Pengzna Oct 28, 2024
46066eb
fix concurrent bug when local arbitrate
Pengzna Oct 29, 2024
4f1e966
add comments
Pengzna Oct 29, 2024
7be5069
Merge remote-tracking branch 'origin/memory/management' into memory/m…
Pengzna Oct 29, 2024
231b647
feat: off-heap object factory
Pengzna Oct 29, 2024
865f1fb
fix gc child bugs and add consumer test
Pengzna Oct 29, 2024
f34e233
fix deallocate netty memory block bug & add complexId test
Pengzna Oct 29, 2024
879390b
complete all ut
Pengzna Oct 29, 2024
a96e9ee
fix all bugs
Pengzna Oct 29, 2024
7c86e84
dependency
Pengzna Oct 29, 2024
5af2cb9
add comments
Pengzna Oct 29, 2024
5e47bb0
add private constructor for singleton
Pengzna Oct 29, 2024
b77346b
add memory management config
Pengzna Oct 29, 2024
d00a8df
improve robustness
Pengzna Oct 29, 2024
fecc909
remove duplicate
Pengzna Oct 29, 2024
31f1feb
improve condition usage
Pengzna Oct 29, 2024
d4035bd
improve log
Pengzna Oct 29, 2024
d87388b
fix memory conservation bug
Pengzna Oct 29, 2024
d25396d
Revert "dependency"
Pengzna Nov 4, 2024
b334c61
revert duplicate known-dependencies.txt under huge-common
Pengzna Nov 4, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions hugegraph-server/hugegraph-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,11 @@
<groupId>org.apache.tinkerpop</groupId>
<artifactId>gremlin-driver</artifactId>
</dependency>
<dependency>
<groupId>org.apache.fury</groupId>
<artifactId>fury-core</artifactId>
<version>0.9.0-SNAPSHOT</version>
</dependency>

<!-- jraft -->
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,16 +215,21 @@ public Number queryNumber(Query query) {
/**
* Query as an Id for cache
*/
static class QueryId implements Id {
public static class QueryId implements Id {

private String query;
private int hashCode;
protected String query;
protected int hashCode;

public QueryId(Query q) {
this.query = q.toString();
this.hashCode = q.hashCode();
}

public QueryId(String query, int hashCode) {
this.query = query;
this.hashCode = hashCode;
}

@Override
public IdType type() {
return IdType.UNKNOWN;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@
* > sortKeys > target-vertex-id }
* NOTE:
* <p>1. for edges with edgeLabelType = NORMAL: edgeLabelId = parentEdgeLabelId = subEdgeLabelId;
* for edges with edgeLabelType = PARENT: edgeLabelId = subEdgeLabelId, parentEdgeLabelId =
* edgeLabelId.fatherId
* for edges with edgeLabelType = PARENT: edgeLabelId = subEdgeLabelId, parentEdgeLabelId =
* edgeLabelId.fatherId
* <p>2.if we use `entry.type()` which is IN or OUT as a part of id,
* an edge's id will be different due to different directions (belongs
* to 2 owner vertex)
Expand All @@ -49,15 +49,14 @@ public class EdgeId implements Id {
HugeKeys.OTHER_VERTEX
};

private final Id ownerVertexId;
private final Directions direction;
private final Id edgeLabelId;
private final Id subLabelId;
private final String sortValues;
private final Id otherVertexId;

private final boolean directed;
private String cache;
protected final Id ownerVertexId;
protected final Id edgeLabelId;
protected final Id subLabelId;
protected final Id otherVertexId;
protected final Directions direction;
protected final boolean directed;
protected String sortValues;
protected String cache;
Pengzna marked this conversation as resolved.
Show resolved Hide resolved

public EdgeId(HugeVertex ownerVertex, Directions direction,
Id edgeLabelId, Id subLabelId, String sortValues, HugeVertex otherVertex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,15 +119,15 @@ public static IdType idType(Id id) {
return IdType.UNKNOWN;
}

private static int compareType(Id id1, Id id2) {
public static int compareType(Id id1, Id id2) {
return idType(id1).ordinal() - idType(id2).ordinal();
}

/****************************** id defines ******************************/

public static final class StringId implements Id {
public static class StringId implements Id {

private final String id;
protected String id;

public StringId(String id) {
E.checkArgument(!id.isEmpty(), "The id can't be empty");
Expand Down Expand Up @@ -196,11 +196,11 @@ public String toString() {
}
}

public static final class LongId extends Number implements Id {
public static class LongId extends Number implements Id {

private static final long serialVersionUID = -7732461469037400190L;

private final long id;
protected Long id;
Copy link
Contributor

@javeme javeme Oct 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we keep primitive type long because a object reference also cause 64bits memory when it's off heap, and cause more memory when it's on heap.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no, because primitive type long can't be set to null by hand. We need a data structure that can be manually set to null so that it can be GCed immediately to free up memory. Primitive long can only be GCed along with the original object.
image


public LongId(long id) {
this.id = id;
Expand Down Expand Up @@ -270,7 +270,7 @@ public String toString() {

@Override
public int intValue() {
return (int) this.id;
return this.id.intValue();
}

@Override
Expand All @@ -289,9 +289,9 @@ public double doubleValue() {
}
}

public static final class UuidId implements Id {
public static class UuidId implements Id {

private final UUID uuid;
protected UUID uuid;

public UuidId(String string) {
this(StringEncoding.uuid(string));
Expand Down Expand Up @@ -379,9 +379,9 @@ public String toString() {
/**
* This class is just used by backend store for wrapper object as Id
*/
public static final class ObjectId implements Id {
public static class ObjectId implements Id {

private final Object object;
protected Object object;

public ObjectId(Object object) {
E.checkNotNull(object, "object");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ public BinaryBackendEntry(HugeType type, byte[] bytes, boolean enablePartition)
}

// FIXME: `enablePartition` is unused here
public BinaryBackendEntry(HugeType type, byte[] bytes, boolean enablePartition, boolean isOlap) {
public BinaryBackendEntry(HugeType type, byte[] bytes, boolean enablePartition,
boolean isOlap) {
Pengzna marked this conversation as resolved.
Show resolved Hide resolved
this(type, BytesBuffer.wrap(bytes).parseOlapId(type, isOlap));
}

Expand Down Expand Up @@ -207,10 +208,10 @@ public int hashCode() {
return this.id().hashCode() ^ this.columns.size();
}

public static final class BinaryId implements Id {
public static class BinaryId implements Id {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems don't need to transform BinaryId, it's just short-term used during serialization. you cam double check

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok


private final byte[] bytes;
private final Id id;
protected byte[] bytes;
protected Id id;

public BinaryId(byte[] bytes, Id id) {
this.bytes = bytes;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hugegraph.memory;

import java.util.Map;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.hugegraph.memory.arbitrator.MemoryArbitrator;
import org.apache.hugegraph.memory.arbitrator.MemoryArbitratorImpl;
import org.apache.hugegraph.memory.pool.MemoryPool;
import org.apache.hugegraph.memory.pool.impl.QueryMemoryPool;
import org.apache.hugegraph.util.Bytes;
import org.apache.hugegraph.util.ExecutorUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MemoryManager {

private static final Logger LOG = LoggerFactory.getLogger(MemoryManager.class);
private static final int ARBITRATE_MEMORY_THREAD_NUM = 12;
private static final String QUERY_MEMORY_POOL_NAME_PREFIX = "QueryMemoryPool";
private static final String ARBITRATE_MEMORY_POOL_NAME = "ArbitrateMemoryPool";
public static final String DELIMINATOR = "_";

// TODO: read it from conf, current 1G
public static final long MAX_MEMORY_CAPACITY_IN_BYTES = Bytes.GB;
private final AtomicLong currentAvailableMemoryInBytes =
new AtomicLong(MAX_MEMORY_CAPACITY_IN_BYTES);
private final AtomicLong currentOffHeapAllocatedMemoryInBytes = new AtomicLong(0);
private final AtomicLong currentOnHeapAllocatedMemoryInBytes = new AtomicLong(0);

private final Queue<MemoryPool> queryMemoryPools =
new PriorityQueue<>((o1, o2) -> (int) (o2.getFreeBytes() - o1.getFreeBytes()));
private final Map<String, MemoryPool> threadName2TaskMemoryPoolMap = new ConcurrentHashMap<>();

private final MemoryArbitrator memoryArbitrator;
private final ExecutorService arbitrateExecutor;

private MemoryManager() {
this.memoryArbitrator = new MemoryArbitratorImpl(this);
this.arbitrateExecutor = ExecutorUtil.newFixedThreadPool(ARBITRATE_MEMORY_THREAD_NUM,
ARBITRATE_MEMORY_POOL_NAME);
}

public MemoryPool addQueryMemoryPool() {
int count = queryMemoryPools.size();
String poolName =
QUERY_MEMORY_POOL_NAME_PREFIX + DELIMINATOR + count + DELIMINATOR +
System.currentTimeMillis();
MemoryPool queryPool = new QueryMemoryPool(poolName, this);
queryMemoryPools.add(queryPool);
LOG.info("Manager added query memory pool {}", queryPool);
return queryPool;
}

public void gcQueryMemoryPool(MemoryPool pool) {
LOG.info("Manager gc query memory pool {}", pool);
queryMemoryPools.remove(pool);
long reclaimedMemory = pool.getAllocatedBytes();
pool.releaseSelf(String.format("GC query memory pool %s", pool), false);
currentAvailableMemoryInBytes.addAndGet(reclaimedMemory);
}

public long triggerLocalArbitration(MemoryPool targetPool, long neededBytes) {
LOG.info("LocalArbitration triggered by {}: needed bytes={}", targetPool, neededBytes);
Future<Long> future =
arbitrateExecutor.submit(
() -> memoryArbitrator.reclaimLocally(targetPool, neededBytes));
try {
return future.get(MemoryArbitrator.MAX_WAIT_TIME_FOR_LOCAL_RECLAIM,
TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
LOG.warn("MemoryManager: arbitration locally for {} timed out", targetPool, e);
} catch (InterruptedException | ExecutionException e) {
LOG.error("MemoryManager: arbitration locally for {} interrupted or failed",
targetPool,
e);
}
return 0;
}

public long triggerGlobalArbitration(MemoryPool requestPool, long neededBytes) {
LOG.info("GlobalArbitration triggered by {}: needed bytes={}", requestPool, neededBytes);
Future<Long> future =
arbitrateExecutor.submit(
() -> memoryArbitrator.reclaimGlobally(requestPool, neededBytes));
try {
return future.get(MemoryArbitrator.MAX_WAIT_TIME_FOR_GLOBAL_RECLAIM,
TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
LOG.warn("MemoryManager: arbitration globally for {} timed out", requestPool, e);
} catch (InterruptedException | ExecutionException e) {
LOG.error("MemoryManager: arbitration globally for {} interrupted or failed",
requestPool, e);
}
return 0;
}

public synchronized long handleRequestFromQueryPool(long size) {
// 1. check whole memory capacity.
if (currentAvailableMemoryInBytes.get() < size) {
LOG.info("There isn't enough memory for query pool to expand itself: " +
"requestSize={}, remainingCapacity={}", size,
currentAvailableMemoryInBytes.get());
return -1;
}
currentAvailableMemoryInBytes.addAndGet(-size);
LOG.info("Expand query pool successfully: " +
"requestSize={}, afterThisExpandingRemainingCapacity={}", size,
currentAvailableMemoryInBytes.get());
return size;
}

/**
* Used by task thread to find its memory pool to release self's memory resource when exiting.
*/
public MemoryPool getCorrespondingTaskMemoryPool(String threadName) {
return threadName2TaskMemoryPoolMap.getOrDefault(threadName, null);
}

public void bindCorrespondingTaskMemoryPool(String threadName, MemoryPool memoryPool) {
threadName2TaskMemoryPoolMap.computeIfAbsent(threadName, key -> memoryPool);
}

public void removeCorrespondingTaskMemoryPool(String threadName) {
threadName2TaskMemoryPoolMap.remove(threadName);
}

public Queue<MemoryPool> getCurrentQueryMemoryPools() {
return new PriorityQueue<>(queryMemoryPools);
}

public void consumeAvailableMemory(long size) {
currentAvailableMemoryInBytes.addAndGet(-size);
}

public AtomicLong getCurrentOnHeapAllocatedMemoryInBytes() {
return currentOnHeapAllocatedMemoryInBytes;
}

public AtomicLong getCurrentOffHeapAllocatedMemoryInBytes() {
return currentOffHeapAllocatedMemoryInBytes;
}

private static class MemoryManagerHolder {

private static final MemoryManager INSTANCE = new MemoryManager();

private MemoryManagerHolder() {
// empty constructor
}
}

public static MemoryManager getInstance() {
return MemoryManagerHolder.INSTANCE;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hugegraph.memory.allocator;

public interface MemoryAllocator {

Object tryToAllocate(long size);

Object forceAllocate(long size);

void returnMemoryToManager(long size);

void releaseMemoryBlock(Object memoryBlock);
}
Loading
Loading