diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedSchemaTransactionV2.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedSchemaTransactionV2.java index e6a5e78533..7fc6b6e681 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedSchemaTransactionV2.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedSchemaTransactionV2.java @@ -107,44 +107,11 @@ private void listenChanges() { }; this.graphParams().loadGraphStore().provider().listen(this.storeEventListener); - // Listen cache event: "cache"(invalid cache item) - this.cacheEventListener = event -> { - LOG.debug("Graph {} received schema cache event: {}", - this.graph(), event); - Object[] args = event.args(); - E.checkArgument(args.length > 0 && args[0] instanceof String, - "Expect event action argument"); - if (Cache.ACTION_INVALID.equals(args[0])) { - event.checkArgs(String.class, HugeType.class, Id.class); - HugeType type = (HugeType) args[1]; - Id id = (Id) args[2]; - this.arrayCaches.remove(type, id); - - id = generateId(type, id); - Object value = this.idCache.get(id); - if (value != null) { - // Invalidate id cache - this.idCache.invalidate(id); - - // Invalidate name cache - SchemaElement schema = (SchemaElement) value; - Id prefixedName = generateId(schema.type(), - schema.name()); - this.nameCache.invalidate(prefixedName); - } - this.resetCachedAll(type); - return true; - } else if (Cache.ACTION_CLEAR.equals(args[0])) { - event.checkArgs(String.class, HugeType.class); - this.clearCache(false); - return true; - } - return false; - }; - EventHub schemaEventHub = this.graphParams().schemaEventHub(); - if (!schemaEventHub.containsListener(Events.CACHE)) { - schemaEventHub.listen(Events.CACHE, this.cacheEventListener); - } + // Listen cache event: "cache.clear", ... + MetaManager.instance().listenSchemaCacheClear((e) -> { + this.clearCache(true); + LOG.debug("Graph {} Schema cache cleared", this.graph()); + }); } public void clearCache(boolean notify) { diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/meta/MetaManager.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/meta/MetaManager.java index faa1367e3c..bfedd4ed48 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/meta/MetaManager.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/meta/MetaManager.java @@ -25,6 +25,8 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; import org.apache.commons.lang3.StringUtils; @@ -59,8 +61,13 @@ import com.google.common.collect.ImmutableMap; +import org.apache.hugegraph.util.Log; +import org.slf4j.Logger; + public class MetaManager { + private static final Logger LOG = Log.logger(MetaManager.class); + private static final String SCHEMA_CLEAR_KEY = "SCHEMA_CLEAR_KEY"; public static final String META_PATH_DELIMITER = "/"; public static final String META_PATH_JOIN = "-"; @@ -130,6 +137,7 @@ public class MetaManager { private KafkaMetaManager kafkaMetaManager; private SchemaTemplateMetaManager schemaTemplateManager; private LockMetaManager lockMetaManager; + private ConcurrentHashMap listenerInitialized; private MetaManager() { } @@ -187,6 +195,7 @@ private void initManagers(String cluster) { this.kafkaMetaManager = new KafkaMetaManager(this.metaDriver, cluster); this.schemaTemplateManager = new SchemaTemplateMetaManager(this.metaDriver, cluster); this.lockMetaManager = new LockMetaManager(this.metaDriver, cluster); + this.listenerInitialized = new ConcurrentHashMap<>(); } public void listenGraphSpaceAdd(Consumer consumer) { @@ -242,7 +251,19 @@ public void listenGraphClear(Consumer consumer) { } public void listenSchemaCacheClear(Consumer consumer) { - this.graphMetaManager.listenSchemaCacheClear(consumer); + if (isListenerInitialized(SCHEMA_CLEAR_KEY)) { + this.graphMetaManager.listenSchemaCacheClear(consumer); + LOG.debug("Schema cache clear listener registered"); + } + } + + public Boolean isListenerInitialized(String listenerName) { + listenerInitialized.putIfAbsent(listenerName, new AtomicBoolean(false)); + AtomicBoolean flag = listenerInitialized.get(listenerName); + if (!flag.get()) { + return flag.compareAndSet(false, true); + } + return false; } public void listenGraphCacheClear(Consumer consumer) {