-
Notifications
You must be signed in to change notification settings - Fork 528
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix(server) Fix cacheEventListener in CachedSystemTransactionV2 #2619
base: master
Are you sure you want to change the base?
Changes from all commits
f23f1ca
8be1a1e
ffd4d16
ebafb8e
c7248c0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -107,44 +107,11 @@ private void listenChanges() { | |
}; | ||
this.graphParams().loadGraphStore().provider().listen(this.storeEventListener); | ||
|
||
// Listen cache event: "cache"(invalid cache item) | ||
this.cacheEventListener = event -> { | ||
LOG.debug("Graph {} received schema cache event: {}", | ||
this.graph(), event); | ||
Object[] args = event.args(); | ||
E.checkArgument(args.length > 0 && args[0] instanceof String, | ||
"Expect event action argument"); | ||
if (Cache.ACTION_INVALID.equals(args[0])) { | ||
event.checkArgs(String.class, HugeType.class, Id.class); | ||
HugeType type = (HugeType) args[1]; | ||
Id id = (Id) args[2]; | ||
this.arrayCaches.remove(type, id); | ||
|
||
id = generateId(type, id); | ||
Object value = this.idCache.get(id); | ||
if (value != null) { | ||
// Invalidate id cache | ||
this.idCache.invalidate(id); | ||
|
||
// Invalidate name cache | ||
SchemaElement schema = (SchemaElement) value; | ||
Id prefixedName = generateId(schema.type(), | ||
schema.name()); | ||
this.nameCache.invalidate(prefixedName); | ||
} | ||
this.resetCachedAll(type); | ||
return true; | ||
} else if (Cache.ACTION_CLEAR.equals(args[0])) { | ||
event.checkArgs(String.class, HugeType.class); | ||
this.clearCache(false); | ||
return true; | ||
} | ||
return false; | ||
}; | ||
EventHub schemaEventHub = this.graphParams().schemaEventHub(); | ||
if (!schemaEventHub.containsListener(Events.CACHE)) { | ||
schemaEventHub.listen(Events.CACHE, this.cacheEventListener); | ||
} | ||
// Listen cache event: "cache.clear", ... | ||
MetaManager.instance().listenSchemaCacheClear((e) -> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. removing this.cacheEventListener will cause resource leak when unlistenChanges() |
||
this.clearCache(true); | ||
LOG.debug("Graph {} Schema cache cleared", this.graph()); | ||
}); | ||
} | ||
|
||
public void clearCache(boolean notify) { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -25,6 +25,8 @@ | |
import java.util.Map; | ||
import java.util.Optional; | ||
import java.util.Set; | ||
import java.util.concurrent.ConcurrentHashMap; | ||
import java.util.concurrent.atomic.AtomicBoolean; | ||
import java.util.function.Consumer; | ||
|
||
import org.apache.commons.lang3.StringUtils; | ||
|
@@ -59,8 +61,13 @@ | |
|
||
import com.google.common.collect.ImmutableMap; | ||
|
||
import org.apache.hugegraph.util.Log; | ||
import org.slf4j.Logger; | ||
|
||
public class MetaManager { | ||
|
||
private static final Logger LOG = Log.logger(MetaManager.class); | ||
private static final String SCHEMA_CLEAR_KEY = "SCHEMA_CLEAR_KEY"; | ||
public static final String META_PATH_DELIMITER = "/"; | ||
public static final String META_PATH_JOIN = "-"; | ||
|
||
|
@@ -130,6 +137,7 @@ public class MetaManager { | |
private KafkaMetaManager kafkaMetaManager; | ||
private SchemaTemplateMetaManager schemaTemplateManager; | ||
private LockMetaManager lockMetaManager; | ||
private ConcurrentHashMap<String, AtomicBoolean> listenerInitialized; | ||
|
||
private MetaManager() { | ||
} | ||
|
@@ -187,6 +195,7 @@ private void initManagers(String cluster) { | |
this.kafkaMetaManager = new KafkaMetaManager(this.metaDriver, cluster); | ||
this.schemaTemplateManager = new SchemaTemplateMetaManager(this.metaDriver, cluster); | ||
this.lockMetaManager = new LockMetaManager(this.metaDriver, cluster); | ||
this.listenerInitialized = new ConcurrentHashMap<>(); | ||
} | ||
|
||
public <T> void listenGraphSpaceAdd(Consumer<T> consumer) { | ||
|
@@ -242,7 +251,19 @@ public <T> void listenGraphClear(Consumer<T> consumer) { | |
} | ||
|
||
public <T> void listenSchemaCacheClear(Consumer<T> consumer) { | ||
this.graphMetaManager.listenSchemaCacheClear(consumer); | ||
if (isListenerInitialized(SCHEMA_CLEAR_KEY)) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. not sure why registering listener when isListenerInitialized(), do you mean isListenerNotInitialized()? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes,it is isListenerNotInitialized() |
||
this.graphMetaManager.listenSchemaCacheClear(consumer); | ||
LOG.debug("Schema cache clear listener registered"); | ||
} | ||
} | ||
|
||
public Boolean isListenerInitialized(String listenerName) { | ||
listenerInitialized.putIfAbsent(listenerName, new AtomicBoolean(false)); | ||
AtomicBoolean flag = listenerInitialized.get(listenerName); | ||
if (!flag.get()) { | ||
return flag.compareAndSet(false, true); | ||
} | ||
return false; | ||
} | ||
|
||
public <T> void listenGraphCacheClear(Consumer<T> consumer) { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Deleting these codes will cause the test to report the error: Not found listener for cache...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks, i can't find the specific error information. could you tell me how to locate wehre the error occurred ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For detailed information, see the failed test case here 👉 https://github.com/apache/incubator-hugegraph/actions/runs/10431704040/job/28891653277?pr=2619