Skip to content

Commit ec39578

Browse files
committed
Add option for deleting all scheduled messages on startup
1 parent aa842da commit ec39578

File tree

6 files changed

+99
-1
lines changed

6 files changed

+99
-1
lines changed

activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java

+14
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,7 @@ public class BrokerService implements Service {
184184
private JmsConnector[] jmsBridgeConnectors; // these are Jms to Jms bridges
185185
// to other jms messaging systems
186186
private boolean deleteAllMessagesOnStartup;
187+
private boolean deleteAllScheduledMessagesOnStartup = false;
187188
private boolean advisorySupport = true;
188189
private boolean anonymousProducerAdvisorySupport = false;
189190
private URI vmConnectorURI;
@@ -1630,6 +1631,18 @@ public void setDeleteAllMessagesOnStartup(boolean deletePersistentMessagesOnStar
16301631
this.deleteAllMessagesOnStartup = deletePersistentMessagesOnStartup;
16311632
}
16321633

1634+
/**
1635+
* Sets whether all scheduled messages are deleted on startup
1636+
* @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
1637+
*/
1638+
public void setDeleteAllScheduledMessagesOnStartup(boolean deleteAllScheduledMessagesOnStartup) {
1639+
this.deleteAllScheduledMessagesOnStartup = deleteAllScheduledMessagesOnStartup;
1640+
}
1641+
1642+
public boolean isDeleteAllScheduledMessagesOnStartup() {
1643+
return deleteAllScheduledMessagesOnStartup;
1644+
}
1645+
16331646
public URI getVmConnectorURI() {
16341647
if (vmConnectorURI == null) {
16351648
try {
@@ -2440,6 +2453,7 @@ protected Broker addInterceptors(Broker broker) throws Exception {
24402453
if (isSchedulerSupport()) {
24412454
SchedulerBroker sb = new SchedulerBroker(this, broker, getJobSchedulerStore());
24422455
sb.setMaxRepeatAllowed(maxSchedulerRepeatAllowed);
2456+
sb.setDeleteAllScheduledMessagesOnStartup(deleteAllScheduledMessagesOnStartup);
24432457
if (isUseJmx()) {
24442458
JobSchedulerViewMBean view = new JobSchedulerView(sb.getJobScheduler());
24452459
try {

activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java

+17
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ public class SchedulerBroker extends BrokerFilter implements JobListener {
7272
private final JobSchedulerStore store;
7373
private JobScheduler scheduler;
7474
private int maxRepeatAllowed = MAX_REPEAT_ALLOWED;
75+
private boolean deleteAllScheduledMessagesOnStartup;
7576

7677
public SchedulerBroker(BrokerService brokerService, Broker next, JobSchedulerStore store) throws Exception {
7778
super(next);
@@ -212,6 +213,9 @@ public synchronized JobScheduler getJobScheduler() throws Exception {
212213
public void start() throws Exception {
213214
this.started.set(true);
214215
getInternalScheduler();
216+
if (deleteAllScheduledMessagesOnStartup) {
217+
deleteAllScheduledMessages();
218+
}
215219
super.start();
216220
}
217221

@@ -364,6 +368,11 @@ private void doSchedule(Message messageSend, Object cronValue, Object periodValu
364368
new ByteSequence(packet.data, packet.offset, packet.length), cronEntry, delay, period, repeat);
365369
}
366370

371+
private void deleteAllScheduledMessages() throws Exception {
372+
LOG.info("Deleting all scheduled messages on startup because deleteAllScheduledMessagesOnStartup configuration has been provided");
373+
getInternalScheduler().removeAllJobs();
374+
}
375+
367376
@Override
368377
public void scheduledJob(String id, ByteSequence job) {
369378
org.apache.activemq.util.ByteSequence packet = new org.apache.activemq.util.ByteSequence(job.getData(), job.getOffset(), job.getLength());
@@ -487,4 +496,12 @@ public int getMaxRepeatAllowed() {
487496
public void setMaxRepeatAllowed(int maxRepeatAllowed) {
488497
this.maxRepeatAllowed = maxRepeatAllowed;
489498
}
499+
500+
public boolean getDeleteAllScheduledMessagesOnStartup() {
501+
return deleteAllScheduledMessagesOnStartup;
502+
}
503+
504+
public void setDeleteAllScheduledMessagesOnStartup(boolean deleteAllSchedulerdMessagesOnStartup) {
505+
this.deleteAllScheduledMessagesOnStartup = deleteAllSchedulerdMessagesOnStartup;
506+
}
490507
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
package org.apache.activemq.broker.scheduler;
2+
3+
import jakarta.jms.Connection;
4+
import jakarta.jms.Message;
5+
import jakarta.jms.MessageConsumer;
6+
import jakarta.jms.MessageListener;
7+
import jakarta.jms.MessageProducer;
8+
import jakarta.jms.Session;
9+
import jakarta.jms.TextMessage;
10+
import org.apache.activemq.ScheduledMessage;
11+
import org.apache.activemq.broker.BrokerService;
12+
import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl;
13+
import org.junit.Test;
14+
import org.slf4j.Logger;
15+
import org.slf4j.LoggerFactory;
16+
17+
import java.util.concurrent.CountDownLatch;
18+
import java.util.concurrent.TimeUnit;
19+
20+
import static org.junit.Assert.assertEquals;
21+
22+
public class JmsSchedulerDeleteAllMessageOptionTest extends JobSchedulerTestSupport {
23+
24+
private static final transient Logger LOG = LoggerFactory.getLogger(JmsSchedulerDeleteAllMessageOptionTest.class);
25+
26+
@Override
27+
protected boolean shouldDeleteAllScheduledMessages() throws Exception {
28+
return true;
29+
}
30+
31+
@Test
32+
public void testDeleteAllMessageOnRestart() throws Exception {
33+
// Send a message delayed by 8 seconds
34+
Connection connection = createConnection();
35+
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
36+
connection.start();
37+
long time_ms = 10 * 1000;
38+
MessageProducer producer = session.createProducer(destination);
39+
TextMessage message = session.createTextMessage("test msg");
40+
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time_ms);
41+
producer.send(message);
42+
producer.close();
43+
// Shutdown broker
44+
restartBroker(RestartType.NORMAL);
45+
// Make sure the consumer won't get the message
46+
connection = createConnection();
47+
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
48+
MessageConsumer consumer = session.createConsumer(destination);
49+
final int COUNT = 1;
50+
final CountDownLatch latch = new CountDownLatch(COUNT);
51+
consumer.setMessageListener(new MessageListener() {
52+
@Override
53+
public void onMessage(Message message) {
54+
latch.countDown();
55+
}
56+
});
57+
latch.await(20, TimeUnit.SECONDS);
58+
assertEquals(latch.getCount(), COUNT);
59+
connection.close();
60+
}
61+
}

activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTestSupport.java

+5
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ protected BrokerService createBroker(boolean delete) throws Exception {
116116
answer.setSchedulerDirectoryFile(schedulerDirectory);
117117
answer.setSchedulerSupport(true);
118118
answer.setUseJmx(isUseJmx());
119+
answer.setDeleteAllScheduledMessagesOnStartup(shouldDeleteAllScheduledMessages());
119120
return answer;
120121
}
121122

@@ -136,4 +137,8 @@ protected void restartBroker(RestartType restartType) throws Exception {
136137
broker.start();
137138
broker.waitUntilStarted();
138139
}
140+
141+
protected boolean shouldDeleteAllScheduledMessages() throws Exception {
142+
return false;
143+
}
139144
}

activemq-unit-tests/src/test/java/org/apache/activemq/config/ConfigTest.java

+1
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,7 @@ public void testBrokerConfig() throws Exception {
170170
assertEquals("Broker Config Error (persistent)", false, broker.isPersistent());
171171
assertEquals("Broker Config Error (useShutdownHook)", false, broker.isUseShutdownHook());
172172
assertEquals("Broker Config Error (deleteAllMessagesOnStartup)", true, broker.isDeleteAllMessagesOnStartup());
173+
assertEquals("Broker Config Error (deleteAllScheduledMessagesOnStartup)", true, broker.isDeleteAllScheduledMessagesOnStartup());
173174
LOG.info("Success");
174175

175176
// Check specific vm transport

activemq-unit-tests/src/test/resources/org/apache/activemq/config/example.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
<amq:broker brokerName="brokerConfigTest" populateJMSXUserID="false"
2727
useLoggingForShutdownErrors="true" useJmx="true"
2828
persistent="false" vmConnectorURI="vm://javacoola"
29-
useShutdownHook="false" deleteAllMessagesOnStartup="true">
29+
useShutdownHook="false" deleteAllMessagesOnStartup="true" deleteAllScheduledMessagesOnStartup="true">
3030

3131
<!--
3232
|| NOTE this config file is used for unit testing the configuration mechanism

0 commit comments

Comments
 (0)