diff --git a/exist-core/src/main/java/org/exist/collections/triggers/TriggerStatePerThread.java b/exist-core/src/main/java/org/exist/collections/triggers/TriggerStatePerThread.java index 0faad32731d..b5c99909861 100644 --- a/exist-core/src/main/java/org/exist/collections/triggers/TriggerStatePerThread.java +++ b/exist-core/src/main/java/org/exist/collections/triggers/TriggerStatePerThread.java @@ -21,6 +21,8 @@ */ package org.exist.collections.triggers; +import org.exist.storage.txn.Txn; +import org.exist.storage.txn.TxnListener; import org.exist.xmldb.XmldbURI; import javax.annotation.Nullable; @@ -30,6 +32,8 @@ import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.function.BiConsumer; +import java.util.function.Consumer; /** * Avoid infinite recursions in Triggers by preventing the same trigger @@ -39,10 +43,10 @@ */ public class TriggerStatePerThread { - private static final ConcurrentMap> THREAD_LOCAL_STATES = new ConcurrentHashMap<>(); + private static final ConcurrentMap> TRIGGER_STATES = new ConcurrentHashMap<>(); - public static void setAndTest(final Trigger trigger, final TriggerPhase triggerPhase, final TriggerEvent triggerEvent, final XmldbURI src, final @Nullable XmldbURI dst) throws CyclicTriggerException { - final Deque states = getStates(); + public static void setAndTest(final Txn txn, final Trigger trigger, final TriggerPhase triggerPhase, final TriggerEvent triggerEvent, final XmldbURI src, final @Nullable XmldbURI dst) throws CyclicTriggerException { + final Deque states = getStates(txn); if (states.isEmpty()) { if (triggerPhase != TriggerPhase.BEFORE) { @@ -115,11 +119,11 @@ public CyclicTriggerException(final String message) { } } - public static void clearIfFinished(final TriggerPhase phase) { + public static void clearIfFinished(final Txn txn, final TriggerPhase phase) { if (phase == TriggerPhase.AFTER) { int depth = 0; - final Deque states = getStates(); + final Deque states = getStates(txn); for (final Iterator it = states.descendingIterator(); it.hasNext(); ) { final TriggerState state = it.next(); switch (state.triggerPhase) { @@ -135,24 +139,45 @@ public static void clearIfFinished(final TriggerPhase phase) { } if (depth == 0) { - clear(); + clear(txn); } } } - public static void clear() { - THREAD_LOCAL_STATES.remove(Thread.currentThread()); + public static void clear(final Txn txn) { + TRIGGER_STATES.remove(txn); } - public static boolean isEmpty() { - return getStates().isEmpty(); + public static boolean isEmpty(final Txn txn) { + return getStates(txn).isEmpty(); } - private static Deque getStates() { - return THREAD_LOCAL_STATES.computeIfAbsent(Thread.currentThread(), thread -> new ArrayDeque<>()); + public static void forEach(BiConsumer> action) { + TRIGGER_STATES.forEach(action); } - record TriggerState(Trigger trigger, TriggerPhase triggerPhase, TriggerEvent triggerEvent, XmldbURI src, + private static Deque getStates(final Txn txn) { + return TRIGGER_STATES.computeIfAbsent(txn, TriggerStatePerThread::initStates); + } + + private static Deque initStates(final Txn txn) { + txn.registerListener(new TransactionCleanUp(txn, TriggerStatePerThread::clear)); + return new ArrayDeque<>(); + } + + public record TransactionCleanUp(Txn txn, Consumer consumer) implements TxnListener { + @Override + public void commit() { + consumer.accept(txn); + } + + @Override + public void abort() { + consumer.accept(txn); + } + } + + public record TriggerState(Trigger trigger, TriggerPhase triggerPhase, TriggerEvent triggerEvent, XmldbURI src, @Nullable XmldbURI dst, boolean possiblyCyclic) { @Override diff --git a/exist-core/src/main/java/org/exist/collections/triggers/XQueryTrigger.java b/exist-core/src/main/java/org/exist/collections/triggers/XQueryTrigger.java index ddde971e207..887d69e6eb1 100644 --- a/exist-core/src/main/java/org/exist/collections/triggers/XQueryTrigger.java +++ b/exist-core/src/main/java/org/exist/collections/triggers/XQueryTrigger.java @@ -227,7 +227,7 @@ private void prepare(final TriggerEvent event, final DBBroker broker, final Txn // avoid infinite recursion try { - TriggerStatePerThread.setAndTest(this, TriggerPhase.BEFORE, event, src, dst); + TriggerStatePerThread.setAndTest(transaction,this, TriggerPhase.BEFORE, event, src, dst); } catch (final TriggerStatePerThread.CyclicTriggerException e) { LOG.warn(e.getMessage()); return; @@ -241,7 +241,7 @@ private void prepare(final TriggerEvent event, final DBBroker broker, final Txn declareExternalVariables(context, TriggerPhase.BEFORE, event, src, dst, isCollection); } catch (final XPathException | IOException | PermissionDeniedException e) { - TriggerStatePerThread.clear(); + TriggerStatePerThread.clear(transaction); throw new TriggerException(PREPARE_EXCEPTION_MESSAGE, e); } @@ -255,7 +255,7 @@ private void prepare(final TriggerEvent event, final DBBroker broker, final Txn LOG.debug("Trigger fired for prepare"); } } catch (final XPathException | PermissionDeniedException e) { - TriggerStatePerThread.clear(); + TriggerStatePerThread.clear(transaction); throw new TriggerException(PREPARE_EXCEPTION_MESSAGE, e); } finally { context.runCleanupTasks(); @@ -271,7 +271,7 @@ private void finish(final TriggerEvent event, final DBBroker broker, final Txn t // avoid infinite recursion try { - TriggerStatePerThread.setAndTest(this, TriggerPhase.AFTER, event, src, dst); + TriggerStatePerThread.setAndTest(transaction,this, TriggerPhase.AFTER, event, src, dst); } catch (final TriggerStatePerThread.CyclicTriggerException e) { LOG.warn(e.getMessage()); return; @@ -305,7 +305,7 @@ private void finish(final TriggerEvent event, final DBBroker broker, final Txn t context.runCleanupTasks(); } - TriggerStatePerThread.clearIfFinished(TriggerPhase.AFTER); + TriggerStatePerThread.clearIfFinished(transaction, TriggerPhase.AFTER); if (LOG.isDebugEnabled()) { LOG.debug("Trigger fired for finish"); @@ -393,10 +393,11 @@ private CompiledXQuery getScript(final DBBroker broker, final Txn transaction) t } private void execute(final TriggerPhase phase, final TriggerEvent event, final DBBroker broker, final Txn transaction, final QName functionName, final XmldbURI src, final XmldbURI dst) throws TriggerException { + System.err.format("phase: %s, event: %s, tx: %s, thread: %s", phase, event, transaction, Thread.currentThread()).println(); // avoid infinite recursion try { - TriggerStatePerThread.setAndTest(this, phase, event, src, dst); + TriggerStatePerThread.setAndTest(transaction, this, phase, event, src, dst); } catch (final TriggerStatePerThread.CyclicTriggerException e) { LOG.warn("Skipping Trigger: {}", e.getMessage()); return; @@ -414,7 +415,7 @@ private void execute(final TriggerPhase phase, final TriggerEvent event, final D return; } } catch (final TriggerException e) { - TriggerStatePerThread.clear(); + TriggerStatePerThread.clear(transaction); throw e; } @@ -454,14 +455,14 @@ private void execute(final TriggerPhase phase, final TriggerEvent event, final D } } - TriggerStatePerThread.clear(); + TriggerStatePerThread.clear(transaction); throw new TriggerException(PREPARE_EXCEPTION_MESSAGE, e); } finally { compiledQuery.reset(); context.runCleanupTasks(); } - TriggerStatePerThread.clearIfFinished(phase); + TriggerStatePerThread.clearIfFinished(transaction, phase); if (LOG.isDebugEnabled()) { if (phase == TriggerPhase.AFTER) {