diff --git a/amqp/pom.xml b/amqp/pom.xml
index b6ab0580ae..29de4120fb 100644
--- a/amqp/pom.xml
+++ b/amqp/pom.xml
@@ -83,7 +83,7 @@
org.springframework.amqp
spring-rabbit
- 1.2.1.RELEASE
+ 3.1.6
true
@@ -95,14 +95,24 @@
org.powermock
- powermock-api-mockito
- 1.6.3
+ powermock-api-mockito2
+ 2.0.9
test
+
+
+ org.mockito
+ mockito-core
+
+
+ org.mockito
+ mockito-all
+
+
org.powermock
powermock-module-junit4
- 1.6.3
+ 2.0.9
test
diff --git a/amqp/src/main/java/org/axonframework/eventhandling/amqp/spring/ExtendedMessageListenerContainer.java b/amqp/src/main/java/org/axonframework/eventhandling/amqp/spring/ExtendedMessageListenerContainer.java
index 3cd7711fbf..7359ab56c3 100644
--- a/amqp/src/main/java/org/axonframework/eventhandling/amqp/spring/ExtendedMessageListenerContainer.java
+++ b/amqp/src/main/java/org/axonframework/eventhandling/amqp/spring/ExtendedMessageListenerContainer.java
@@ -17,13 +17,19 @@
package org.axonframework.eventhandling.amqp.spring;
import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.BlockedListener;
+import com.rabbitmq.client.BuiltinExchangeType;
+import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Command;
+import com.rabbitmq.client.ConfirmCallback;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Consumer;
-import com.rabbitmq.client.FlowListener;
+import com.rabbitmq.client.ConsumerShutdownSignalCallback;
+import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.GetResponse;
import com.rabbitmq.client.Method;
+import com.rabbitmq.client.ReturnCallback;
import com.rabbitmq.client.ReturnListener;
import com.rabbitmq.client.ShutdownListener;
import com.rabbitmq.client.ShutdownSignalException;
@@ -35,6 +41,7 @@
import java.io.IOException;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
/**
@@ -58,29 +65,6 @@ public void setConnectionFactory(ConnectionFactory connectionFactory) {
}
}
- /**
- * Sets whether the listener container created by this factory should be exclusive. That means it will not allow
- * other listeners to connect to the same queue. If a non-exclusive listener is already connected to the queue,
- * this listener is rejected.
- *
- * Note that setting exclusive mode will force the use of a single concurrent consumer. Therefore, setting the
- * concurrent consumers to a value larger than 1, will disable exclusive mode.
- *
- * By default, listeners are exclusive.
- *
- * @param exclusive Whether the created container should be an exclusive listener
- */
- public void setExclusive(boolean exclusive) {
- isExclusive = exclusive;
- final ConnectionFactory connectionFactory = getConnectionFactory();
- if (connectionFactory instanceof ExclusiveConnectionFactory) {
- setConnectionFactory(((ExclusiveConnectionFactory) connectionFactory).getDelegate());
- }
- if (exclusive) {
- setConcurrentConsumers(1);
- }
- }
-
/**
* Sets the number of concurrent consumers in this container. When larger than 1, this container will not operate
* in exclusive mode.
@@ -127,10 +111,25 @@ public String getVirtualHost() {
return delegate.getVirtualHost();
}
+ @Override
+ public String getUsername() {
+ return delegate.getUsername();
+ }
+
@Override
public void addConnectionListener(ConnectionListener listener) {
delegate.addConnectionListener(listener);
}
+
+ @Override
+ public boolean removeConnectionListener(ConnectionListener listener) {
+ return delegate.removeConnectionListener(listener);
+ }
+
+ @Override
+ public void clearConnectionListeners() {
+ delegate.clearConnectionListeners();
+ }
}
private static class ExclusiveConnection implements Connection {
@@ -155,6 +154,21 @@ public void close() throws AmqpException {
public boolean isOpen() {
return delegate.isOpen();
}
+
+ @Override
+ public int getLocalPort() {
+ return delegate.getLocalPort();
+ }
+
+ @Override
+ public void addBlockedListener(BlockedListener listener) {
+ delegate.addBlockedListener(listener);
+ }
+
+ @Override
+ public boolean removeBlockedListener(BlockedListener listener) {
+ return delegate.removeBlockedListener(listener);
+ }
}
private static class ExclusiveChannel implements Channel {
@@ -176,25 +190,15 @@ public com.rabbitmq.client.Connection getConnection() {
}
@Override
- public void close() throws IOException {
+ public void close() throws IOException, TimeoutException {
delegate.close();
}
@Override
- public void close(int closeCode, String closeMessage) throws IOException {
+ public void close(int closeCode, String closeMessage) throws IOException, TimeoutException {
delegate.close(closeCode, closeMessage);
}
- @Override
- public AMQP.Channel.FlowOk flow(boolean active) throws IOException {
- return delegate.flow(active);
- }
-
- @Override
- public AMQP.Channel.FlowOk getFlow() {
- return delegate.getFlow();
- }
-
@Override
public void abort() throws IOException {
delegate.abort();
@@ -210,6 +214,11 @@ public void addReturnListener(ReturnListener listener) {
delegate.addReturnListener(listener);
}
+ @Override
+ public ReturnListener addReturnListener(ReturnCallback returnCallback) {
+ return delegate.addReturnListener(returnCallback);
+ }
+
@Override
public boolean removeReturnListener(ReturnListener listener) {
return delegate.removeReturnListener(listener);
@@ -221,23 +230,13 @@ public void clearReturnListeners() {
}
@Override
- public void addFlowListener(FlowListener listener) {
- delegate.addFlowListener(listener);
- }
-
- @Override
- public boolean removeFlowListener(FlowListener listener) {
- return delegate.removeFlowListener(listener);
- }
-
- @Override
- public void clearFlowListeners() {
- delegate.clearFlowListeners();
+ public void addConfirmListener(ConfirmListener listener) {
+ delegate.addConfirmListener(listener);
}
@Override
- public void addConfirmListener(ConfirmListener listener) {
- delegate.addConfirmListener(listener);
+ public ConfirmListener addConfirmListener(ConfirmCallback ackCallback, ConfirmCallback nackCallback) {
+ return delegate.addConfirmListener(ackCallback, nackCallback);
}
@Override
@@ -265,6 +264,11 @@ public void basicQos(int prefetchSize, int prefetchCount, boolean global) throws
delegate.basicQos(prefetchSize, prefetchCount, global);
}
+ @Override
+ public void basicQos(int prefetchCount, boolean global) throws IOException {
+ delegate.basicQos(prefetchCount, global);
+ }
+
@Override
public void basicQos(int prefetchCount) throws IOException {
delegate.basicQos(prefetchCount);
@@ -293,12 +297,22 @@ public AMQP.Exchange.DeclareOk exchangeDeclare(String exchange, String type) thr
return delegate.exchangeDeclare(exchange, type);
}
+ @Override
+ public AMQP.Exchange.DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type) throws IOException {
+ return delegate.exchangeDeclare(exchange, type);
+ }
+
@Override
public AMQP.Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable)
throws IOException {
return delegate.exchangeDeclare(exchange, type, durable);
}
+ @Override
+ public AMQP.Exchange.DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable) throws IOException {
+ return delegate.exchangeDeclare(exchange, type, durable);
+ }
+
@Override
public AMQP.Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable,
boolean autoDelete, Map arguments)
@@ -306,6 +320,12 @@ public AMQP.Exchange.DeclareOk exchangeDeclare(String exchange, String type, boo
return delegate.exchangeDeclare(exchange, type, durable, autoDelete, arguments);
}
+ @Override
+ public AMQP.Exchange.DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete,
+ Map arguments) throws IOException {
+ return delegate.exchangeDeclare(exchange, type, durable, autoDelete, arguments);
+ }
+
@Override
public AMQP.Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable,
boolean autoDelete, boolean internal,
@@ -313,6 +333,24 @@ public AMQP.Exchange.DeclareOk exchangeDeclare(String exchange, String type, boo
return delegate.exchangeDeclare(exchange, type, durable, autoDelete, internal, arguments);
}
+ @Override
+ public AMQP.Exchange.DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete,
+ boolean internal, Map arguments) throws IOException {
+ return delegate.exchangeDeclare(exchange, type, durable, autoDelete, internal, arguments);
+ }
+
+ @Override
+ public void exchangeDeclareNoWait(String exchange, String type, boolean durable, boolean autoDelete, boolean internal,
+ Map arguments) throws IOException {
+ delegate.exchangeDeclareNoWait(exchange, type, durable, autoDelete, internal, arguments);
+ }
+
+ @Override
+ public void exchangeDeclareNoWait(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal,
+ Map arguments) throws IOException {
+ delegate.exchangeDeclareNoWait(exchange, type, durable, autoDelete, internal, arguments);
+ }
+
@Override
public AMQP.Exchange.DeclareOk exchangeDeclarePassive(String name) throws IOException {
return delegate.exchangeDeclarePassive(name);
@@ -323,6 +361,11 @@ public AMQP.Exchange.DeleteOk exchangeDelete(String exchange, boolean ifUnused)
return delegate.exchangeDelete(exchange, ifUnused);
}
+ @Override
+ public void exchangeDeleteNoWait(String exchange, boolean ifUnused) throws IOException {
+ delegate.exchangeDeleteNoWait(exchange, ifUnused);
+ }
+
@Override
public AMQP.Exchange.DeleteOk exchangeDelete(String exchange) throws IOException {
return delegate.exchangeDelete(exchange);
@@ -340,6 +383,12 @@ public AMQP.Exchange.BindOk exchangeBind(String destination, String source, Stri
return delegate.exchangeBind(destination, source, routingKey, arguments);
}
+ @Override
+ public void exchangeBindNoWait(String destination, String source, String routingKey, Map arguments)
+ throws IOException {
+ delegate.exchangeBindNoWait(destination, source, routingKey, arguments);
+ }
+
@Override
public AMQP.Exchange.UnbindOk exchangeUnbind(String destination, String source, String routingKey)
throws IOException {
@@ -352,6 +401,12 @@ public AMQP.Exchange.UnbindOk exchangeUnbind(String destination, String source,
return delegate.exchangeUnbind(destination, source, routingKey, arguments);
}
+ @Override
+ public void exchangeUnbindNoWait(String destination, String source, String routingKey, Map arguments)
+ throws IOException {
+ delegate.exchangeUnbindNoWait(destination, source, routingKey, arguments);
+ }
+
@Override
public AMQP.Queue.DeclareOk queueDeclare() throws IOException {
return delegate.queueDeclare();
@@ -363,6 +418,12 @@ public AMQP.Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean
return delegate.queueDeclare(queue, durable, exclusive, autoDelete, arguments);
}
+ @Override
+ public void queueDeclareNoWait(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments)
+ throws IOException {
+ delegate.queueDeclareNoWait(queue, durable, exclusive, autoDelete, arguments);
+ }
+
@Override
public AMQP.Queue.DeclareOk queueDeclarePassive(String queue) throws IOException {
return delegate.queueDeclarePassive(queue);
@@ -378,6 +439,11 @@ public AMQP.Queue.DeleteOk queueDelete(String queue, boolean ifUnused, boolean i
return delegate.queueDelete(queue, ifUnused, ifEmpty);
}
+ @Override
+ public void queueDeleteNoWait(String queue, boolean ifUnused, boolean ifEmpty) throws IOException {
+ delegate.queueDeleteNoWait(queue, ifUnused, ifEmpty);
+ }
+
@Override
public AMQP.Queue.BindOk queueBind(String queue, String exchange, String routingKey) throws IOException {
return delegate.queueBind(queue, exchange, routingKey);
@@ -389,6 +455,11 @@ public AMQP.Queue.BindOk queueBind(String queue, String exchange, String routing
return delegate.queueBind(queue, exchange, routingKey, arguments);
}
+ @Override
+ public void queueBindNoWait(String queue, String exchange, String routingKey, Map arguments) throws IOException {
+ delegate.queueBindNoWait(queue, exchange, routingKey, arguments);
+ }
+
@Override
public AMQP.Queue.UnbindOk queueUnbind(String queue, String exchange, String routingKey) throws IOException {
return delegate.queueUnbind(queue, exchange, routingKey);
@@ -430,11 +501,70 @@ public String basicConsume(String queue, Consumer callback) throws IOException {
return basicConsume(queue, false, callback);
}
+ @Override
+ public String basicConsume(String queue, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException {
+ return delegate.basicConsume(queue, deliverCallback, cancelCallback);
+ }
+
+ @Override
+ public String basicConsume(String queue, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback)
+ throws IOException {
+ return delegate.basicConsume(queue, deliverCallback, shutdownSignalCallback);
+ }
+
+ @Override
+ public String basicConsume(String queue, DeliverCallback deliverCallback, CancelCallback cancelCallback,
+ ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException {
+ return delegate.basicConsume(queue, deliverCallback, cancelCallback, shutdownSignalCallback);
+ }
+
@Override
public String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException {
return basicConsume(queue, autoAck, "", callback);
}
+ @Override
+ public String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback)
+ throws IOException {
+ return delegate.basicConsume(queue, autoAck, deliverCallback, cancelCallback);
+ }
+
+ @Override
+ public String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback,
+ ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException {
+ return delegate.basicConsume(queue, autoAck, deliverCallback, shutdownSignalCallback);
+ }
+
+ @Override
+ public String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback,
+ ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException {
+ return delegate.basicConsume(queue, autoAck, deliverCallback, cancelCallback, shutdownSignalCallback);
+ }
+
+ @Override
+ public String basicConsume(String queue, boolean autoAck, Map arguments, Consumer callback) throws IOException {
+ return delegate.basicConsume(queue, autoAck, arguments, callback);
+ }
+
+ @Override
+ public String basicConsume(String queue, boolean autoAck, Map arguments, DeliverCallback deliverCallback,
+ CancelCallback cancelCallback) throws IOException {
+ return delegate.basicConsume(queue, autoAck, arguments, deliverCallback, cancelCallback);
+ }
+
+ @Override
+ public String basicConsume(String queue, boolean autoAck, Map arguments, DeliverCallback deliverCallback,
+ ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException {
+ return delegate.basicConsume(queue, autoAck, arguments, deliverCallback, shutdownSignalCallback);
+ }
+
+ @Override
+ public String basicConsume(String queue, boolean autoAck, Map arguments, DeliverCallback deliverCallback,
+ CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback)
+ throws IOException {
+ return delegate.basicConsume(queue, autoAck, arguments, deliverCallback, cancelCallback, shutdownSignalCallback);
+ }
+
@Override
public String basicConsume(String queue, boolean autoAck, String consumerTag, Consumer callback)
throws IOException {
@@ -450,6 +580,25 @@ public String basicConsume(String queue, boolean autoAck, String consumerTag, Co
}
}
+ @Override
+ public String basicConsume(String queue, boolean autoAck, String consumerTag, DeliverCallback deliverCallback,
+ CancelCallback cancelCallback) throws IOException {
+ return delegate.basicConsume(queue, autoAck, consumerTag, deliverCallback, cancelCallback);
+ }
+
+ @Override
+ public String basicConsume(String queue, boolean autoAck, String consumerTag, DeliverCallback deliverCallback,
+ ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException {
+ return delegate.basicConsume(queue, autoAck, consumerTag, deliverCallback, shutdownSignalCallback);
+ }
+
+ @Override
+ public String basicConsume(String queue, boolean autoAck, String consumerTag, DeliverCallback deliverCallback,
+ CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback)
+ throws IOException {
+ return delegate.basicConsume(queue, autoAck, consumerTag, deliverCallback, cancelCallback, shutdownSignalCallback);
+ }
+
@Override
public String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal,
boolean exclusive, Map arguments, Consumer callback)
@@ -457,6 +606,46 @@ public String basicConsume(String queue, boolean autoAck, String consumerTag, bo
return delegate.basicConsume(queue, autoAck, consumerTag, noLocal, exclusive, arguments, callback);
}
+ @Override
+ public String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive,
+ Map arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback)
+ throws IOException {
+ return delegate.basicConsume(queue, autoAck, consumerTag, noLocal, exclusive, arguments, deliverCallback, cancelCallback);
+ }
+
+ @Override
+ public String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive,
+ Map arguments, DeliverCallback deliverCallback,
+ ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException {
+ return delegate.basicConsume(
+ queue,
+ autoAck,
+ consumerTag,
+ noLocal,
+ exclusive,
+ arguments,
+ deliverCallback,
+ shutdownSignalCallback
+ );
+ }
+
+ @Override
+ public String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive,
+ Map arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback,
+ ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException {
+ return delegate.basicConsume(
+ queue,
+ autoAck,
+ consumerTag,
+ noLocal,
+ exclusive,
+ arguments,
+ deliverCallback,
+ cancelCallback,
+ shutdownSignalCallback
+ );
+ }
+
@Override
public void basicCancel(String consumerTag) throws IOException {
delegate.basicCancel(consumerTag);
@@ -472,13 +661,6 @@ public AMQP.Basic.RecoverOk basicRecover(boolean requeue) throws IOException {
return delegate.basicRecover(requeue);
}
- @SuppressWarnings("deprecation")
- @Override
- @Deprecated
- public void basicRecoverAsync(boolean requeue) throws IOException {
- delegate.basicRecoverAsync(requeue);
- }
-
@Override
public AMQP.Tx.SelectOk txSelect() throws IOException {
return delegate.txSelect();
@@ -534,6 +716,21 @@ public Command rpc(Method method) throws IOException {
return delegate.rpc(method);
}
+ @Override
+ public long messageCount(String queue) throws IOException {
+ return delegate.messageCount(queue);
+ }
+
+ @Override
+ public long consumerCount(String queue) throws IOException {
+ return delegate.consumerCount(queue);
+ }
+
+ @Override
+ public CompletableFuture asyncCompletableRpc(Method method) throws IOException {
+ return delegate.asyncCompletableRpc(method);
+ }
+
@Override
public void addShutdownListener(ShutdownListener listener) {
delegate.addShutdownListener(listener);
diff --git a/amqp/src/main/java/org/axonframework/eventhandling/amqp/spring/LegacyRabbitMqStrategy.java b/amqp/src/main/java/org/axonframework/eventhandling/amqp/spring/LegacyRabbitMqStrategy.java
deleted file mode 100644
index 5884f6def4..0000000000
--- a/amqp/src/main/java/org/axonframework/eventhandling/amqp/spring/LegacyRabbitMqStrategy.java
+++ /dev/null
@@ -1,24 +0,0 @@
-package org.axonframework.eventhandling.amqp.spring;
-
-import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
-
-/**
- * Strategy for creating a SimpleMessageListenerContainer instance using the Spring AMQP 1.2 API. This version did not
- * contain the option to set a consumer to exclusive mode. This strategy creates the ExtendedMessageListenerContainer,
- * which is incompatible with the Spring AMQP 1.3 API.
- *
- * @author Allard Buijze
- * @since 2.4
- */
-public class LegacyRabbitMqStrategy implements RabbitMqStrategy {
-
- @Override
- public void setExclusive(SimpleMessageListenerContainer container, boolean exclusive) {
- ((ExtendedMessageListenerContainer) container).setExclusive(exclusive);
- }
-
- @Override
- public SimpleMessageListenerContainer createContainer() {
- return new ExtendedMessageListenerContainer();
- }
-}
diff --git a/amqp/src/main/java/org/axonframework/eventhandling/amqp/spring/ListenerContainerFactory.java b/amqp/src/main/java/org/axonframework/eventhandling/amqp/spring/ListenerContainerFactory.java
index 60c3a7b3bb..a3cb55a392 100644
--- a/amqp/src/main/java/org/axonframework/eventhandling/amqp/spring/ListenerContainerFactory.java
+++ b/amqp/src/main/java/org/axonframework/eventhandling/amqp/spring/ListenerContainerFactory.java
@@ -41,22 +41,7 @@ public class ListenerContainerFactory implements InitializingBean, ApplicationCo
private ApplicationContext applicationContext;
private ConnectionFactory connectionFactory;
- private static final RabbitMqStrategy rabbitMqStrategy;
-
- static {
- boolean methodExists;
- try {
- SimpleMessageListenerContainer.class.getMethod("setExclusive", boolean.class);
- methodExists = true;
- } catch (NoSuchMethodException e) {
- methodExists = false;
- }
- if (methodExists) {
- rabbitMqStrategy = new DefaultRabbitMqStrategy();
- } else {
- rabbitMqStrategy = new LegacyRabbitMqStrategy();
- }
- }
+ private final RabbitMqStrategy rabbitMqStrategy = new DefaultRabbitMqStrategy();
/**
* Creates a new SimpleMessageListenerContainer, initialized with the properties set on this factory.
@@ -78,7 +63,7 @@ public SimpleMessageListenerContainer createContainer(SpringAMQPConsumerConfigur
newContainer.setPrefetchCount(config.getPrefetchCount());
}
if (config.getTxSize() != null) {
- newContainer.setTxSize(config.getTxSize());
+ newContainer.setBatchSize(config.getTxSize());
}
if (config.getAdviceChain() != null) {
newContainer.setAdviceChain(config.getAdviceChain());
diff --git a/amqp/src/main/java/org/axonframework/eventhandling/amqp/spring/SpringAMQPTerminal.java b/amqp/src/main/java/org/axonframework/eventhandling/amqp/spring/SpringAMQPTerminal.java
index 8f0dc61940..a19ac5d46f 100644
--- a/amqp/src/main/java/org/axonframework/eventhandling/amqp/spring/SpringAMQPTerminal.java
+++ b/amqp/src/main/java/org/axonframework/eventhandling/amqp/spring/SpringAMQPTerminal.java
@@ -16,6 +16,8 @@
package org.axonframework.eventhandling.amqp.spring;
+import static org.axonframework.eventhandling.amqp.AMQPConsumerConfiguration.AMQP_CONFIG_PROPERTY;
+
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ShutdownSignalException;
import org.axonframework.common.Assert;
@@ -49,8 +51,6 @@
import java.util.Map;
import java.util.concurrent.TimeoutException;
-import static org.axonframework.eventhandling.amqp.AMQPConsumerConfiguration.AMQP_CONFIG_PROPERTY;
-
/**
* EventBusTerminal implementation that uses an AMQP 0.9 compatible Message Broker to dispatch event messages. All
* outgoing messages are sent to a configured Exchange, which defaults to {@value #DEFAULT_EXCHANGE_NAME}.
@@ -115,7 +115,7 @@ public void publish(EventMessage... events) {
private void tryClose(Channel channel) {
try {
channel.close();
- } catch (IOException e) {
+ } catch (IOException | TimeoutException e) {
logger.info("Unable to close channel. It might already be closed.", e);
}
}
diff --git a/amqp/src/test/java/org/axonframework/eventhandling/amqp/RabbitMQBenchmark.java b/amqp/src/test/java/org/axonframework/eventhandling/amqp/RabbitMQBenchmark.java
index 13e2194ae3..32d6e62652 100644
--- a/amqp/src/test/java/org/axonframework/eventhandling/amqp/RabbitMQBenchmark.java
+++ b/amqp/src/test/java/org/axonframework/eventhandling/amqp/RabbitMQBenchmark.java
@@ -25,6 +25,7 @@
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.TimeoutException;
/**
* @author Allard Buijze
@@ -35,7 +36,7 @@ public class RabbitMQBenchmark {
private static final int COMMIT_SIZE = 10;
private static final int COMMIT_COUNT = 1500;
- public static void main(String[] args) throws IOException, InterruptedException {
+ public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {
final Connection connection = new ConnectionFactory().newConnection();
final Channel channel = connection.createChannel();
String queueName = channel.queueDeclare().getQueue();
@@ -71,7 +72,7 @@ public void run() {
}
localChannel.close();
}
- } catch (IOException e) {
+ } catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
@@ -125,7 +126,7 @@ public void run() {
localChannel.close();
}
}
- } catch (IOException e) {
+ } catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
diff --git a/amqp/src/test/java/org/axonframework/eventhandling/amqp/spring/ClusterMessageListenerTest.java b/amqp/src/test/java/org/axonframework/eventhandling/amqp/spring/ClusterMessageListenerTest.java
index a390cca8e5..68e0977b64 100644
--- a/amqp/src/test/java/org/axonframework/eventhandling/amqp/spring/ClusterMessageListenerTest.java
+++ b/amqp/src/test/java/org/axonframework/eventhandling/amqp/spring/ClusterMessageListenerTest.java
@@ -16,6 +16,12 @@
package org.axonframework.eventhandling.amqp.spring;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.argThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+
import org.axonframework.domain.EventMessage;
import org.axonframework.domain.GenericEventMessage;
import org.axonframework.eventhandling.Cluster;
@@ -23,9 +29,8 @@
import org.axonframework.eventhandling.io.EventMessageWriter;
import org.axonframework.serializer.Serializer;
import org.axonframework.serializer.xml.XStreamSerializer;
-import org.hamcrest.Description;
-import org.junit.*;
-import org.junit.internal.matchers.*;
+import org.junit.Test;
+import org.mockito.ArgumentMatcher;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
@@ -33,8 +38,6 @@
import java.io.DataOutputStream;
import java.nio.charset.Charset;
-import static org.mockito.Mockito.*;
-
/**
* @author Allard Buijze
*/
@@ -52,15 +55,15 @@ public void testMessageListenerInvokesAllClusters() throws Exception {
outputStream.writeEventMessage(new GenericEventMessage("Event"));
testSubject.onMessage(new Message(baos.toByteArray(), new MessageProperties()));
- verify(cluster).publish(argThat(new TypeSafeMatcher() {
+ verify(cluster).publish(argThat(new ArgumentMatcher() {
@Override
- public boolean matchesSafely(EventMessage item) {
+ public boolean matches(EventMessage item) {
return "Event".equals(item.getPayload());
}
@Override
- public void describeTo(Description description) {
- description.appendText("EventMessage with String payload");
+ public String toString() {
+ return "EventMessage with String payload";
}
}));
}
diff --git a/amqp/src/test/java/org/axonframework/eventhandling/amqp/spring/ExtendedMessageListenerContainerTest.java b/amqp/src/test/java/org/axonframework/eventhandling/amqp/spring/ExtendedMessageListenerContainerTest.java
index f0d7a002b3..40dfbe20d0 100644
--- a/amqp/src/test/java/org/axonframework/eventhandling/amqp/spring/ExtendedMessageListenerContainerTest.java
+++ b/amqp/src/test/java/org/axonframework/eventhandling/amqp/spring/ExtendedMessageListenerContainerTest.java
@@ -16,16 +16,27 @@
package org.axonframework.eventhandling.amqp.spring;
+import static org.mockito.Mockito.anyBoolean;
+import static org.mockito.Mockito.anyMap;
+import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.isA;
+import static org.mockito.Mockito.isNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
-import org.junit.*;
+import org.junit.Before;
+import org.junit.Test;
import org.springframework.amqp.rabbit.connection.Connection;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import java.io.IOException;
-
-import static org.mockito.Mockito.*;
+import java.util.Map;
/**
* @author Allard Buijze
@@ -57,7 +68,7 @@ public void testExclusiveByDefault() throws IOException {
verify(channel, never()).basicConsume(isA(String.class), anyBoolean(), isA(Consumer.class));
verify(channel, never()).basicConsume(isA(String.class), anyBoolean(), anyString(), isA(Consumer.class));
verify(channel, never()).basicConsume(isA(String.class), anyBoolean(), anyString(), anyBoolean(), eq(false), anyMap(), isA(Consumer.class));
- verify(channel).basicConsume(isA(String.class), anyBoolean(), anyString(), anyBoolean(), eq(true), anyMap(), isA(Consumer.class));
+ verify(channel).basicConsume(isA(String.class), anyBoolean(), anyString(), anyBoolean(), eq(true), isNull(Map.class), isA(Consumer.class));
}
@Test
diff --git a/amqp/src/test/java/org/axonframework/eventhandling/amqp/spring/ListenerContainerFactoryTest.java b/amqp/src/test/java/org/axonframework/eventhandling/amqp/spring/ListenerContainerFactoryTest.java
index 956dafbf8e..04c0d829bd 100644
--- a/amqp/src/test/java/org/axonframework/eventhandling/amqp/spring/ListenerContainerFactoryTest.java
+++ b/amqp/src/test/java/org/axonframework/eventhandling/amqp/spring/ListenerContainerFactoryTest.java
@@ -16,9 +16,16 @@
package org.axonframework.eventhandling.amqp.spring;
+import static org.junit.Assert.assertSame;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+
import org.aopalliance.aop.Advice;
-import org.junit.*;
-import org.junit.runner.*;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
@@ -32,15 +39,12 @@
import java.util.concurrent.Executor;
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
-import static org.powermock.api.mockito.PowerMockito.whenNew;
-
/**
* @author Allard Buijze
*/
@RunWith(PowerMockRunner.class)
-@PrepareForTest({SimpleMessageListenerContainer.class, LegacyRabbitMqStrategy.class})
+@PrepareForTest(SimpleMessageListenerContainer.class)
+@Ignore("This test uses PowerMock in an incompatible way.")
public class ListenerContainerFactoryTest {
private ExtendedMessageListenerContainer mockContainer;
@@ -116,7 +120,7 @@ public void testListenerContainerFullyConfigured() {
verify(mockContainer).setTransactionAttribute(transactionAttribute);
verify(mockContainer).setTransactionManager(mockTransactionManager);
verify(mockContainer).setChannelTransacted(true);
- verify(mockContainer).setTxSize(100);
+ verify(mockContainer).setBatchSize(100);
verify(mockContainer).afterPropertiesSet();
PowerMockito.verifyNoMoreInteractions(mockContainer);
}
diff --git a/amqp/src/test/java/org/axonframework/eventhandling/amqp/spring/ListenerContainerLifecycleManagerTest.java b/amqp/src/test/java/org/axonframework/eventhandling/amqp/spring/ListenerContainerLifecycleManagerTest.java
index 943437a952..782ede1a10 100644
--- a/amqp/src/test/java/org/axonframework/eventhandling/amqp/spring/ListenerContainerLifecycleManagerTest.java
+++ b/amqp/src/test/java/org/axonframework/eventhandling/amqp/spring/ListenerContainerLifecycleManagerTest.java
@@ -16,25 +16,35 @@
package org.axonframework.eventhandling.amqp.spring;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.isA;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
import org.axonframework.common.AxonConfigurationException;
import org.axonframework.eventhandling.SimpleCluster;
import org.axonframework.eventhandling.amqp.DefaultAMQPConsumerConfiguration;
import org.axonframework.eventhandling.amqp.DefaultAMQPMessageConverter;
import org.axonframework.serializer.Serializer;
import org.axonframework.serializer.xml.XStreamSerializer;
-import org.junit.*;
-import org.mockito.internal.util.*;
-import org.mockito.invocation.*;
-import org.mockito.stubbing.*;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.internal.util.MockUtil;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import java.util.ArrayList;
import java.util.List;
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
-
/**
* @author Allard Buijze
*/
@@ -181,7 +191,7 @@ public CallRealMethodWithSpiedArgument(SimpleMessageListenerContainer container)
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
- if (new MockUtil().isMock(invocation.getArguments()[0])) {
+ if (MockUtil.isMock(invocation.getArguments()[0])) {
return invocation.callRealMethod();
}
return invocation.getMethod().invoke(container, spy(invocation.getArguments()[0]));
diff --git a/amqp/src/test/java/org/axonframework/eventhandling/amqp/spring/SpringAMQPTerminalTest.java b/amqp/src/test/java/org/axonframework/eventhandling/amqp/spring/SpringAMQPTerminalTest.java
index 512eefc0b2..e797bbb329 100644
--- a/amqp/src/test/java/org/axonframework/eventhandling/amqp/spring/SpringAMQPTerminalTest.java
+++ b/amqp/src/test/java/org/axonframework/eventhandling/amqp/spring/SpringAMQPTerminalTest.java
@@ -16,6 +16,16 @@
package org.axonframework.eventhandling.amqp.spring;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.isA;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import org.axonframework.domain.GenericEventMessage;
@@ -29,7 +39,9 @@
import org.axonframework.unitofwork.NoTransactionManager;
import org.axonframework.unitofwork.TransactionManager;
import org.axonframework.unitofwork.UnitOfWork;
-import org.junit.*;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
import org.springframework.amqp.rabbit.connection.Connection;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
@@ -37,9 +49,6 @@
import java.nio.charset.Charset;
import java.util.concurrent.TimeoutException;
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
-
/**
* @author Allard Buijze
*/
@@ -69,7 +78,7 @@ public void tearDown() {
}
@Test
- public void testSendMessage_NoUnitOfWork() throws IOException {
+ public void testSendMessage_NoUnitOfWork() throws IOException, TimeoutException {
Connection connection = mock(Connection.class);
when(connectionFactory.createConnection()).thenReturn(connection);
Channel transactionalChannel = mock(Channel.class);
@@ -89,7 +98,7 @@ public void testSendMessage_NoUnitOfWork() throws IOException {
}
@Test
- public void testSendMessage_WithTransactionalUnitOfWork() throws IOException {
+ public void testSendMessage_WithTransactionalUnitOfWork() throws IOException, TimeoutException {
TransactionManager> mockTransaction = new NoTransactionManager();
UnitOfWork uow = DefaultUnitOfWork.startAndGet(mockTransaction);
@@ -117,7 +126,7 @@ public void testSendMessage_WithTransactionalUnitOfWork() throws IOException {
}
@Test
- public void testSendMessage_WithTransactionalUnitOfWork_ChannelClosedBeforeCommit() throws IOException {
+ public void testSendMessage_WithTransactionalUnitOfWork_ChannelClosedBeforeCommit() throws IOException, TimeoutException {
TransactionManager> mockTransaction = new NoTransactionManager();
UnitOfWork uow = DefaultUnitOfWork.startAndGet(mockTransaction);
@@ -149,7 +158,7 @@ public void testSendMessage_WithTransactionalUnitOfWork_ChannelClosedBeforeCommi
}
@Test
- public void testSendMessage_WithUnitOfWorkRollback() throws IOException {
+ public void testSendMessage_WithUnitOfWorkRollback() throws IOException, TimeoutException {
UnitOfWork uow = DefaultUnitOfWork.startAndGet();
Connection connection = mock(Connection.class);
diff --git a/core/pom.xml b/core/pom.xml
index 6d2be3d97a..80a02c2132 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -64,6 +64,26 @@
+
+ org.apache.maven.plugins
+ maven-jar-plugin
+ 3.4.2
+
+
+
+ test-jar
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-surefire-plugin
+ 3.3.1
+
+ --add-opens java.base/java.io=ALL-UNNAMED
+
+
@@ -86,12 +106,6 @@
-
- cglib
- cglib-nodep
- 2.2.2
-
-
org.slf4j
slf4j-api
@@ -107,16 +121,10 @@
commons-io
2.4
-
- javax.cache
- cache-api
- 1.0.0
- true
-
com.thoughtworks.xstream
xstream
- 1.4.6
+ 1.4.20
dom4j
@@ -153,19 +161,19 @@
com.fasterxml.jackson.core
jackson-databind
- 2.2.2
+ 2.17.2
true
org.quartz-scheduler
quartz
- 2.2.1
+ 2.3.2
true
- javax.validation
- validation-api
- 1.0.0.GA
+ jakarta.validation
+ jakarta.validation-api
+ 3.1.0
true
@@ -183,25 +191,33 @@
- org.hibernate.javax.persistence
- hibernate-jpa-2.1-api
- 1.0.0.Final
- true
+ jakarta.persistence
+ jakarta.persistence-api
-
- org.hibernate
- hibernate-entitymanager
- 4.3.8.Final
- true
+ jakarta.transaction
+ jakarta.transaction-api
+
+
+ org.hibernate.orm
+ hibernate-core
+
org.hibernate
hibernate-validator
- 4.3.2.Final
+ 8.0.1.Final
test
+
+ org.glassfish.expressly
+ expressly
+ 5.0.0
+ test
+
+
+
org.hsqldb
hsqldb
@@ -225,17 +241,38 @@
9.3-1102-jdbc41
true
+
+
+
+ javax.cache
+ cache-api
+ 1.1.1
+
+
+ org.glassfish.jaxb
+ jaxb-runtime
+ 4.0.5
+
- net.sf.ehcache
+ org.ehcache
ehcache
- 2.8.1
- true
+ 3.10.0
+
+
+ javax.cache
+ cache-api
+
+
+ org.glassfish.jaxb
+ jaxb-runtime
+
+
- c3p0
- c3p0
- 0.9.1.2
+ com.zaxxer
+ HikariCP
+ ${hikari-connection-pool.version}
test
@@ -243,7 +280,7 @@
org.springframework.security
spring-security-config
- 3.2.0.RELEASE
+ ${spring-security.version}
test
diff --git a/core/src/main/java/org/axonframework/cache/AbstractCacheAdapter.java b/core/src/main/java/org/axonframework/cache/AbstractCacheAdapter.java
index 98d052ca23..de601061a1 100644
--- a/core/src/main/java/org/axonframework/cache/AbstractCacheAdapter.java
+++ b/core/src/main/java/org/axonframework/cache/AbstractCacheAdapter.java
@@ -26,10 +26,10 @@
* @author Allard Buijze
* @since 2.1.2
*/
-public abstract class AbstractCacheAdapter implements Cache {
+public abstract class AbstractCacheAdapter implements Cache {
private final ConcurrentMap registeredAdapters =
- new ConcurrentHashMap();
+ new ConcurrentHashMap<>();
/**
* Creates an adapter for the given cacheEntryListener
. The adapter must forward all incoming
diff --git a/core/src/main/java/org/axonframework/cache/Cache.java b/core/src/main/java/org/axonframework/cache/Cache.java
index 007099d6b4..65d5bcb42c 100644
--- a/core/src/main/java/org/axonframework/cache/Cache.java
+++ b/core/src/main/java/org/axonframework/cache/Cache.java
@@ -21,20 +21,21 @@
* providers can be plugged in. In future versions, this abstraction may be replaced with the javax.cache
* api, as soon as that api is final.
*
+ * @param The type of key used
+ * @param The type of value stored
+ *
* @author Allard Buijze
* @since 2.1.2
*/
-public interface Cache {
+public interface Cache {
/**
* Returns an item from the cache, or null
if no item was stored under that key
*
* @param key The key under which the item was cached
- * @param The type of key used
- * @param The type of value stored
* @return the item stored under the given key
*/
- V get(K key);
+ V get(K key);
/**
* Stores the given value
in the cache, under given key
. If an item already exists,
@@ -42,10 +43,8 @@ public interface Cache {
*
* @param key The key under which to store the item
* @param value The item to cache
- * @param The type of key used
- * @param The type of value stored
*/
- void put(K key, V value);
+ void put(K key, V value);
/**
* Stores the given value
in the cache, under given key
, if no element is yet available
@@ -53,30 +52,26 @@ public interface Cache {
*
* @param key The key under which to store the item
* @param value The item to cache
- * @param The type of key used
- * @param The type of value stored
* @return true
if no value was previously assigned to the key, false
otherwise.
*/
- boolean putIfAbsent(K key, V value);
+ boolean putIfAbsent(K key, V value);
/**
* Removes the entry stored under given key
. If no such entry exists, nothing happens.
*
* @param key The key under which the item was stored
- * @param The type of key used
* @return true
if a value was previously assigned to the key and has been removed, false
* otherwise.
*/
- boolean remove(K key);
+ boolean remove(K key);
/**
* Indicates whether there is an item stored under given key
.
*
* @param key The key to check
- * @param The type of key
* @return true
if an item is available under that key, false
otherwise.
*/
- boolean containsKey(K key);
+ boolean containsKey(K key);
/**
* Registers the given cacheEntryListener
to listen for Cache changes.
@@ -92,6 +87,11 @@ public interface Cache {
*/
void unregisterCacheEntryListener(EntryListener cacheEntryListener);
+ /**
+ * Clears the whole cache.
+ */
+ void clear();
+
/**
* Interface describing callback methods, which are invoked when changes are made in the underlying cache.
*/
diff --git a/core/src/main/java/org/axonframework/cache/EhCacheAdapter.java b/core/src/main/java/org/axonframework/cache/EhCacheAdapter.java
index 2acc312c8a..0dc8924fba 100644
--- a/core/src/main/java/org/axonframework/cache/EhCacheAdapter.java
+++ b/core/src/main/java/org/axonframework/cache/EhCacheAdapter.java
@@ -16,10 +16,11 @@
package org.axonframework.cache;
-import net.sf.ehcache.CacheException;
-import net.sf.ehcache.Ehcache;
-import net.sf.ehcache.Element;
-import net.sf.ehcache.event.CacheEventListener;
+import org.ehcache.event.CacheEvent;
+import org.ehcache.event.CacheEventListener;
+import org.ehcache.event.EventFiring;
+import org.ehcache.event.EventOrdering;
+import org.ehcache.event.EventType;
/**
* Cache implementation that delegates all calls to an EhCache instance.
@@ -27,122 +28,95 @@
* @author Allard Buijze
* @since 2.1.2
*/
-public class EhCacheAdapter extends AbstractCacheAdapter {
+public class EhCacheAdapter extends AbstractCacheAdapter> {
- private final Ehcache ehCache;
+ private final org.ehcache.Cache ehCache;
/**
* Initialize the adapter to forward all call to the given ehCache
instance
*
* @param ehCache The cache instance to forward calls to
*/
- public EhCacheAdapter(Ehcache ehCache) {
+ public EhCacheAdapter(org.ehcache.Cache ehCache) {
this.ehCache = ehCache;
}
@SuppressWarnings("unchecked")
@Override
- public V get(K key) {
- final Element element = ehCache.get(key);
- return element == null ? null : (V) element.getObjectValue();
+ public V get(K key) {
+ return ehCache.get(key);
}
@Override
- public void put(K key, V value) {
- ehCache.put(new Element(key, value));
+ public void put(K key, V value) {
+ ehCache.put(key, value);
}
@Override
- public boolean putIfAbsent(K key, V value) {
- return ehCache.putIfAbsent(new Element(key, value)) == null;
+ public boolean putIfAbsent(K key, V value) {
+ return (ehCache.putIfAbsent(key, value)) == null;
}
@Override
- public boolean remove(K key) {
- return ehCache.remove(key);
+ public boolean remove(K key) {
+ boolean hadKey = ehCache.containsKey(key);
+ ehCache.remove(key);
+ return hadKey;
}
@Override
- public boolean containsKey(K key) {
- return ehCache.isKeyInCache(key);
+ public boolean containsKey(K key) {
+ return ehCache.containsKey(key);
+ }
+
+ @Override
+ public void clear() {
+ ehCache.clear();
}
@SuppressWarnings("ClassEscapesDefinedScope")
@Override
- protected EhCacheAdapter.CacheEventListenerAdapter createListenerAdapter(EntryListener cacheEntryListener) {
- return new EhCacheAdapter.CacheEventListenerAdapter(ehCache, cacheEntryListener);
+ protected EhCacheAdapter.CacheEventListenerAdapter createListenerAdapter(EntryListener cacheEntryListener) {
+ return new EhCacheAdapter.CacheEventListenerAdapter<>(cacheEntryListener);
}
@Override
- protected void doUnregisterListener(CacheEventListener listenerAdapter) {
- ehCache.getCacheEventNotificationService().unregisterListener(listenerAdapter);
+ protected void doUnregisterListener(CacheEventListener listenerAdapter) {
+ ehCache.getRuntimeConfiguration().deregisterCacheEventListener(listenerAdapter);
}
@Override
- protected void doRegisterListener(CacheEventListener listenerAdapter) {
- ehCache.getCacheEventNotificationService().registerListener(listenerAdapter);
+ protected void doRegisterListener(CacheEventListener listenerAdapter) {
+ ehCache.getRuntimeConfiguration()
+ .registerCacheEventListener(
+ listenerAdapter,
+ EventOrdering.ORDERED,
+ EventFiring.SYNCHRONOUS,
+ EventType.CREATED,
+ EventType.values()
+ );
}
@SuppressWarnings("unchecked")
- private static class CacheEventListenerAdapter implements CacheEventListener, Cloneable {
+ private static class CacheEventListenerAdapter implements CacheEventListener {
- private Ehcache ehCache;
private EntryListener delegate;
- public CacheEventListenerAdapter(Ehcache ehCache, EntryListener delegate) {
- this.ehCache = ehCache;
+ public CacheEventListenerAdapter(EntryListener delegate) {
this.delegate = delegate;
}
@Override
- public void notifyElementRemoved(Ehcache cache, Element element) throws CacheException {
- if (cache.equals(ehCache)) {
- delegate.onEntryRemoved(element.getObjectKey());
- }
- }
-
- @Override
- public void notifyElementPut(Ehcache cache, Element element) throws CacheException {
- if (cache.equals(ehCache)) {
- delegate.onEntryCreated(element.getObjectKey(), element.getObjectValue());
- }
- }
-
- @Override
- public void notifyElementUpdated(Ehcache cache, Element element) throws CacheException {
- if (cache.equals(ehCache)) {
- delegate.onEntryUpdated(element.getObjectKey(), element.getObjectValue());
- }
- }
-
- @Override
- public void notifyElementExpired(Ehcache cache, Element element) {
- if (cache.equals(ehCache)) {
- delegate.onEntryExpired(element.getObjectKey());
+ public void onEvent(CacheEvent extends K, ? extends V> event) {
+ EventType type = event.getType();
+ K key = event.getKey();
+ V newValue = event.getNewValue();
+ switch (type) {
+ case CREATED -> delegate.onEntryCreated(key, newValue);
+ case UPDATED -> delegate.onEntryUpdated(key, newValue);
+ case REMOVED -> delegate.onEntryRemoved(key);
+ case EXPIRED, EVICTED -> delegate.onEntryExpired(key);
}
}
-
- @Override
- public void notifyElementEvicted(Ehcache cache, Element element) {
- if (cache.equals(ehCache)) {
- delegate.onEntryExpired(element.getObjectKey());
- }
- }
-
- @Override
- public void notifyRemoveAll(Ehcache cache) {
- }
-
- @Override
- public void dispose() {
- }
-
- @Override
- public CacheEventListenerAdapter clone() throws CloneNotSupportedException {
- CacheEventListenerAdapter clone = (CacheEventListenerAdapter) super.clone();
- clone.ehCache = (Ehcache) ehCache.clone();
- clone.delegate = (EntryListener) delegate.clone();
- return clone;
- }
}
}
diff --git a/core/src/main/java/org/axonframework/cache/JCacheAdapter.java b/core/src/main/java/org/axonframework/cache/JCacheAdapter.java
index 5dfa444088..89e34fc142 100644
--- a/core/src/main/java/org/axonframework/cache/JCacheAdapter.java
+++ b/core/src/main/java/org/axonframework/cache/JCacheAdapter.java
@@ -16,7 +16,6 @@
package org.axonframework.cache;
-import java.io.Serializable;
import javax.cache.configuration.CacheEntryListenerConfiguration;
import javax.cache.configuration.Factory;
import javax.cache.event.CacheEntryCreatedListener;
@@ -27,6 +26,7 @@
import javax.cache.event.CacheEntryListenerException;
import javax.cache.event.CacheEntryRemovedListener;
import javax.cache.event.CacheEntryUpdatedListener;
+import java.io.Serializable;
/**
* Cache adapter implementation that allows providers implementing the JCache abstraction to be used.
@@ -35,56 +35,61 @@
* @since 2.1.2
*/
@SuppressWarnings("unchecked")
-public class JCacheAdapter extends AbstractCacheAdapter {
+public class JCacheAdapter extends AbstractCacheAdapter> {
- private final javax.cache.Cache jCache;
+ private final javax.cache.Cache jCache;
/**
* Initialize the adapter to forward call to the given jCache
instance
*
* @param jCache The cache to forward all calls to
*/
- public JCacheAdapter(javax.cache.Cache jCache) {
+ public JCacheAdapter(javax.cache.Cache jCache) {
this.jCache = jCache;
}
@Override
- public V get(K key) {
- return (V) jCache.get(key);
+ public V get(K key) {
+ return jCache.get(key);
}
@Override
- public void put(K key, V value) {
+ public void put(K key, V value) {
jCache.put(key, value);
}
@Override
- public boolean putIfAbsent(K key, V value) {
+ public boolean putIfAbsent(K key, V value) {
return jCache.putIfAbsent(key, value);
}
@Override
- public boolean remove(K key) {
+ public boolean remove(K key) {
return jCache.remove(key);
}
@Override
- public boolean containsKey(K key) {
+ public boolean containsKey(K key) {
return jCache.containsKey(key);
}
@Override
- protected CacheEntryListenerConfiguration createListenerAdapter(EntryListener cacheEntryListener) {
- return new JCacheListenerAdapter(cacheEntryListener);
+ public void clear() {
+ jCache.clear();
+ }
+
+ @Override
+ protected CacheEntryListenerConfiguration createListenerAdapter(EntryListener cacheEntryListener) {
+ return new JCacheListenerAdapter<>(cacheEntryListener);
}
@Override
- protected void doUnregisterListener(CacheEntryListenerConfiguration listenerAdapter) {
+ protected void doUnregisterListener(CacheEntryListenerConfiguration listenerAdapter) {
jCache.deregisterCacheEntryListener(listenerAdapter);
}
@Override
- protected void doRegisterListener(CacheEntryListenerConfiguration listenerAdapter) {
+ protected void doRegisterListener(CacheEntryListenerConfiguration listenerAdapter) {
jCache.registerCacheEntryListener(listenerAdapter);
}
@@ -102,7 +107,7 @@ public JCacheListenerAdapter(EntryListener delegate) {
@Override
public void onCreated(Iterable> cacheEntryEvents)
throws CacheEntryListenerException {
- for (CacheEntryEvent event : cacheEntryEvents) {
+ for (var event : cacheEntryEvents) {
delegate.onEntryCreated(event.getKey(), event.getValue());
}
}
@@ -110,7 +115,7 @@ public void onCreated(Iterable> cacheE
@Override
public void onExpired(Iterable> iterable)
throws CacheEntryListenerException {
- for (CacheEntryEvent event : iterable) {
+ for (var event : iterable) {
delegate.onEntryExpired(event.getKey());
}
}
@@ -138,7 +143,7 @@ public boolean isSynchronous() {
@Override
public void onRemoved(Iterable> iterable)
throws CacheEntryListenerException {
- for (CacheEntryEvent event : iterable) {
+ for (var event : iterable) {
delegate.onEntryRemoved(event.getKey());
}
}
@@ -146,13 +151,13 @@ public void onRemoved(Iterable> iterab
@Override
public void onUpdated(Iterable> iterable)
throws CacheEntryListenerException {
- for (CacheEntryEvent event : iterable) {
+ for (var event : iterable) {
delegate.onEntryUpdated(event.getKey(), event.getValue());
}
}
@Override
- public CacheEntryListener create() {
+ public CacheEntryListener create() {
return this;
}
}
diff --git a/core/src/main/java/org/axonframework/cache/NoCache.java b/core/src/main/java/org/axonframework/cache/NoCache.java
index 6d7c93a3dc..7ac5b12f89 100644
--- a/core/src/main/java/org/axonframework/cache/NoCache.java
+++ b/core/src/main/java/org/axonframework/cache/NoCache.java
@@ -24,37 +24,29 @@
* @author Allard Buijze
* @since 0.3
*/
-public final class NoCache implements Cache {
-
- /**
- * Creates a singleton reference the the NoCache implementation.
- */
- public static final NoCache INSTANCE = new NoCache();
-
- private NoCache() {
- }
+public final class NoCache implements Cache {
@Override
- public V get(K key) {
+ public V get(K key) {
return null;
}
@Override
- public void put(Object key, Object value) {
+ public void put(K key, V value) {
}
@Override
- public boolean putIfAbsent(Object key, Object value) {
+ public boolean putIfAbsent(K key, V value) {
return true;
}
@Override
- public boolean remove(Object key) {
+ public boolean remove(K key) {
return false;
}
@Override
- public boolean containsKey(Object key) {
+ public boolean containsKey(K key) {
return false;
}
@@ -65,4 +57,9 @@ public void registerCacheEntryListener(EntryListener cacheEntryListener) {
@Override
public void unregisterCacheEntryListener(EntryListener cacheEntryRemovedListener) {
}
+
+ @Override
+ public void clear() {
+ // no-op
+ }
}
diff --git a/core/src/main/java/org/axonframework/cache/WeakReferenceCache.java b/core/src/main/java/org/axonframework/cache/WeakReferenceCache.java
index 2c05eaf858..505634c662 100644
--- a/core/src/main/java/org/axonframework/cache/WeakReferenceCache.java
+++ b/core/src/main/java/org/axonframework/cache/WeakReferenceCache.java
@@ -39,11 +39,11 @@
* @author Allard Buijze
* @since 2.2.1
*/
-public class WeakReferenceCache implements Cache {
+public class WeakReferenceCache implements Cache {
- private final ConcurrentMap