Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(server) Fix cacheEventListener in CachedSystemTransactionV2 #2619

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -107,44 +107,11 @@ private void listenChanges() {
};
this.graphParams().loadGraphStore().provider().listen(this.storeEventListener);

// Listen cache event: "cache"(invalid cache item)
this.cacheEventListener = event -> {
LOG.debug("Graph {} received schema cache event: {}",
this.graph(), event);
Object[] args = event.args();
E.checkArgument(args.length > 0 && args[0] instanceof String,
"Expect event action argument");
if (Cache.ACTION_INVALID.equals(args[0])) {
event.checkArgs(String.class, HugeType.class, Id.class);
HugeType type = (HugeType) args[1];
Id id = (Id) args[2];
this.arrayCaches.remove(type, id);

id = generateId(type, id);
Object value = this.idCache.get(id);
if (value != null) {
// Invalidate id cache
this.idCache.invalidate(id);

// Invalidate name cache
SchemaElement schema = (SchemaElement) value;
Id prefixedName = generateId(schema.type(),
schema.name());
this.nameCache.invalidate(prefixedName);
}
this.resetCachedAll(type);
return true;
} else if (Cache.ACTION_CLEAR.equals(args[0])) {
event.checkArgs(String.class, HugeType.class);
this.clearCache(false);
return true;
}
return false;
};
EventHub schemaEventHub = this.graphParams().schemaEventHub();
if (!schemaEventHub.containsListener(Events.CACHE)) {
schemaEventHub.listen(Events.CACHE, this.cacheEventListener);
}
Comment on lines -145 to -147
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deleting these codes will cause the test to report the error: Not found listener for cache...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks, i can't find the specific error information. could you tell me how to locate wehre the error occurred ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For detailed information, see the failed test case here 👉 https://github.com/apache/incubator-hugegraph/actions/runs/10431704040/job/28891653277?pr=2619

image

// Listen cache event: "cache.clear", ...
MetaManager.instance().listenSchemaCacheClear((e) -> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removing this.cacheEventListener will cause resource leak when unlistenChanges()

this.clearCache(true);
LOG.debug("Graph {} Schema cache cleared", this.graph());
});
}

public void clearCache(boolean notify) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -59,8 +61,13 @@

import com.google.common.collect.ImmutableMap;

import org.apache.hugegraph.util.Log;
import org.slf4j.Logger;

public class MetaManager {

private static final Logger LOG = Log.logger(MetaManager.class);
private static final String SCHEMA_CLEAR_KEY = "SCHEMA_CLEAR_KEY";
public static final String META_PATH_DELIMITER = "/";
public static final String META_PATH_JOIN = "-";

Expand Down Expand Up @@ -130,6 +137,7 @@ public class MetaManager {
private KafkaMetaManager kafkaMetaManager;
private SchemaTemplateMetaManager schemaTemplateManager;
private LockMetaManager lockMetaManager;
private ConcurrentHashMap<String, AtomicBoolean> listenerInitialized;

private MetaManager() {
}
Expand Down Expand Up @@ -187,6 +195,7 @@ private void initManagers(String cluster) {
this.kafkaMetaManager = new KafkaMetaManager(this.metaDriver, cluster);
this.schemaTemplateManager = new SchemaTemplateMetaManager(this.metaDriver, cluster);
this.lockMetaManager = new LockMetaManager(this.metaDriver, cluster);
this.listenerInitialized = new ConcurrentHashMap<>();
}

public <T> void listenGraphSpaceAdd(Consumer<T> consumer) {
Expand Down Expand Up @@ -242,7 +251,19 @@ public <T> void listenGraphClear(Consumer<T> consumer) {
}

public <T> void listenSchemaCacheClear(Consumer<T> consumer) {
this.graphMetaManager.listenSchemaCacheClear(consumer);
if (isListenerInitialized(SCHEMA_CLEAR_KEY)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure why registering listener when isListenerInitialized(), do you mean isListenerNotInitialized()?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes,it is isListenerNotInitialized()

this.graphMetaManager.listenSchemaCacheClear(consumer);
LOG.debug("Schema cache clear listener registered");
}
}

public Boolean isListenerInitialized(String listenerName) {
listenerInitialized.putIfAbsent(listenerName, new AtomicBoolean(false));
AtomicBoolean flag = listenerInitialized.get(listenerName);
if (!flag.get()) {
return flag.compareAndSet(false, true);
}
return false;
}

public <T> void listenGraphCacheClear(Consumer<T> consumer) {
Expand Down
Loading