Skip to content

Commit

Permalink
[STREAM-546] JMS 11 Support excluding ServerSideFiltering (#165)
Browse files Browse the repository at this point in the history
  • Loading branch information
sandeep-ctds authored Jan 7, 2025
1 parent 7d20a03 commit be15cbd
Show file tree
Hide file tree
Showing 78 changed files with 12,328 additions and 66 deletions.
30 changes: 17 additions & 13 deletions activemq-filters/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,11 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>pulsar-jms-activemq-filters</artifactId>
<packaging>jar</packaging>
<name>DataStax Starlight for JMS - Apache ActiveMQ Filters (waiting for next ActiveMQ release)</name>
<name>DataStax Starlight for JMS - Apache ActiveMQ Filters</name>
<properties>
<build.dir>${project.build.directory}</build.dir>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-client</artifactId>
</dependency>
<dependency>
<groupId>jakarta.jms</groupId>
<artifactId>jakarta.jms-api</artifactId>
Expand All @@ -44,9 +40,24 @@
<artifactId>slf4j-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.fusesource.hawtbuf</groupId>
<artifactId>hawtbuf</artifactId>
<version>${hawtbuf.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<version>3.2.0</version>
<configuration>
<additionalOptions>
<additionalOption>-Xdoclint:none</additionalOption>
</additionalOptions>
</configuration>
</plugin>
<plugin>
<groupId>com.github.spotbugs</groupId>
<artifactId>spotbugs-maven-plugin</artifactId>
Expand All @@ -58,14 +69,7 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<configuration>
<filters>
<filter>
<artifact>org.apache.activemq:*</artifact>
<excludes>
<exclude>org/apache/activemq/filter/**</exclude>
</excludes>
</filter>
</filters>
<filters/>
</configuration>
<executions>
<execution>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright DataStax, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq;

import java.io.IOException;

/** An exception thrown when max frame size is exceeded. */
public class MaxFrameSizeExceededException extends IOException {
private static final long serialVersionUID = -7681404582227153308L;

public MaxFrameSizeExceededException(String message) {
super(message);
}
}
27 changes: 27 additions & 0 deletions activemq-filters/src/main/java/org/apache/activemq/Message.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright DataStax, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq;

/** Represents the JMS extension methods in Apache ActiveMQ */
public interface Message extends jakarta.jms.Message {

/**
* Returns the MIME type of this mesage. This can be used in selectors to filter on the MIME types
* of the different JMS messages, or in the case of BlobMessage it allows you to create a selector
* on the MIME type of the BLOB body
*/
String getJMSXMimeType();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright DataStax, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq;

public interface ScheduledMessage {
/**
* The time in milliseconds that a message will wait before being scheduled to be delivered by the
* broker
*/
public static final String AMQ_SCHEDULED_DELAY = "AMQ_SCHEDULED_DELAY";
/**
* The time in milliseconds to wait after the start time to wait before scheduling the message
* again
*/
public static final String AMQ_SCHEDULED_PERIOD = "AMQ_SCHEDULED_PERIOD";
/** The number of times to repeat scheduling a message for delivery */
public static final String AMQ_SCHEDULED_REPEAT = "AMQ_SCHEDULED_REPEAT";
/** Use a Cron tab entry to set the schedule */
public static final String AMQ_SCHEDULED_CRON = "AMQ_SCHEDULED_CRON";
/**
* An Id that is assigned to a Scheduled Message, this value is only available once the Message is
* scheduled, Messages sent to the Browse Destination or delivered to the assigned Destination
* will have this value set.
*/
public static final String AMQ_SCHEDULED_ID = "scheduledJobId";

/**
* Special destination to send Message's to with an assigned "action" that the Scheduler should
* perform such as removing a message.
*/
public static final String AMQ_SCHEDULER_MANAGEMENT_DESTINATION = "ActiveMQ.Scheduler.Management";
/**
* Used to specify that a some operation should be performed on the Scheduled Message, the Message
* must have an assigned Id for this action to be taken.
*/
public static final String AMQ_SCHEDULER_ACTION = "AMQ_SCHEDULER_ACTION";

/** Indicates that a browse of the Scheduled Messages is being requested. */
public static final String AMQ_SCHEDULER_ACTION_BROWSE = "BROWSE";
/**
* Indicates that a Scheduled Message is to be remove from the Scheduler, the Id of the scheduled
* message must be set as a property in order for this action to have any effect.
*/
public static final String AMQ_SCHEDULER_ACTION_REMOVE = "REMOVE";
/** Indicates that all scheduled Messages should be removed. */
public static final String AMQ_SCHEDULER_ACTION_REMOVEALL = "REMOVEALL";

/**
* A property that holds the beginning of the time interval that the specified action should be
* applied within. Maps to a long value that specified time in milliseconds since UTC.
*/
public static final String AMQ_SCHEDULER_ACTION_START_TIME = "ACTION_START_TIME";
/**
* A property that holds the end of the time interval that the specified action should be applied
* within. Maps to a long value that specified time in milliseconds since UTC.
*/
public static final String AMQ_SCHEDULER_ACTION_END_TIME = "ACTION_END_TIME";
}
29 changes: 29 additions & 0 deletions activemq-filters/src/main/java/org/apache/activemq/Service.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright DataStax, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq;

/**
* The core lifecyle interface for ActiveMQ components.
*
* <p>If there was a standard way to do so, it'd be good to register this interface with Spring so
* it treats the start/stop methods as those of InitializingBean and DisposableBean
*/
public interface Service {

void start() throws Exception;

void stop() throws Exception;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright DataStax, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.region;

import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId;

/**
* Keeps track of a message that is flowing through the Broker. This object may hold a hard
* reference to the message or only hold the id of the message if the message has been persisted on
* in a MessageStore.
*/
public interface MessageReference {

MessageId getMessageId();

Message getMessageHardRef();

Message getMessage();

boolean isPersistent();

Message.MessageDestination getRegionDestination();

int getRedeliveryCounter();

void incrementRedeliveryCounter();

int getReferenceCount();

int incrementReferenceCount();

int decrementReferenceCount();

ConsumerId getTargetConsumerId();

int getSize();

long getExpiration();

String getGroupID();

int getGroupSequence();

/** Returns true if this message is expired */
boolean isExpired();

/** Returns true if this message is dropped. */
boolean isDropped();

/** @return true if the message is an advisory */
boolean isAdvisory();

boolean canProcessAsExpired();
}
Loading

0 comments on commit be15cbd

Please sign in to comment.