Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[STREAM-546] JMS 11 Support excluding ServerSideFiltering #165

Merged
merged 17 commits into from
Jan 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 17 additions & 13 deletions activemq-filters/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,11 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>pulsar-jms-activemq-filters</artifactId>
<packaging>jar</packaging>
<name>DataStax Starlight for JMS - Apache ActiveMQ Filters (waiting for next ActiveMQ release)</name>
<name>DataStax Starlight for JMS - Apache ActiveMQ Filters</name>
<properties>
<build.dir>${project.build.directory}</build.dir>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-client</artifactId>
</dependency>
<dependency>
<groupId>jakarta.jms</groupId>
<artifactId>jakarta.jms-api</artifactId>
Expand All @@ -44,9 +40,24 @@
<artifactId>slf4j-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.fusesource.hawtbuf</groupId>
<artifactId>hawtbuf</artifactId>
<version>${hawtbuf.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<version>3.2.0</version>
<configuration>
<additionalOptions>
<additionalOption>-Xdoclint:none</additionalOption>
</additionalOptions>
</configuration>
</plugin>
<plugin>
<groupId>com.github.spotbugs</groupId>
<artifactId>spotbugs-maven-plugin</artifactId>
Expand All @@ -58,14 +69,7 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<configuration>
<filters>
<filter>
<artifact>org.apache.activemq:*</artifact>
<excludes>
<exclude>org/apache/activemq/filter/**</exclude>
</excludes>
</filter>
</filters>
<filters/>
</configuration>
<executions>
<execution>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright DataStax, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq;

import java.io.IOException;

/** An exception thrown when max frame size is exceeded. */
public class MaxFrameSizeExceededException extends IOException {
private static final long serialVersionUID = -7681404582227153308L;

public MaxFrameSizeExceededException(String message) {
super(message);
}
}
27 changes: 27 additions & 0 deletions activemq-filters/src/main/java/org/apache/activemq/Message.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright DataStax, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq;

/** Represents the JMS extension methods in Apache ActiveMQ */
public interface Message extends jakarta.jms.Message {

/**
* Returns the MIME type of this mesage. This can be used in selectors to filter on the MIME types
* of the different JMS messages, or in the case of BlobMessage it allows you to create a selector
* on the MIME type of the BLOB body
*/
String getJMSXMimeType();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright DataStax, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq;

public interface ScheduledMessage {
/**
* The time in milliseconds that a message will wait before being scheduled to be delivered by the
* broker
*/
public static final String AMQ_SCHEDULED_DELAY = "AMQ_SCHEDULED_DELAY";
/**
* The time in milliseconds to wait after the start time to wait before scheduling the message
* again
*/
public static final String AMQ_SCHEDULED_PERIOD = "AMQ_SCHEDULED_PERIOD";
/** The number of times to repeat scheduling a message for delivery */
public static final String AMQ_SCHEDULED_REPEAT = "AMQ_SCHEDULED_REPEAT";
/** Use a Cron tab entry to set the schedule */
public static final String AMQ_SCHEDULED_CRON = "AMQ_SCHEDULED_CRON";
/**
* An Id that is assigned to a Scheduled Message, this value is only available once the Message is
* scheduled, Messages sent to the Browse Destination or delivered to the assigned Destination
* will have this value set.
*/
public static final String AMQ_SCHEDULED_ID = "scheduledJobId";

/**
* Special destination to send Message's to with an assigned "action" that the Scheduler should
* perform such as removing a message.
*/
public static final String AMQ_SCHEDULER_MANAGEMENT_DESTINATION = "ActiveMQ.Scheduler.Management";
/**
* Used to specify that a some operation should be performed on the Scheduled Message, the Message
* must have an assigned Id for this action to be taken.
*/
public static final String AMQ_SCHEDULER_ACTION = "AMQ_SCHEDULER_ACTION";

/** Indicates that a browse of the Scheduled Messages is being requested. */
public static final String AMQ_SCHEDULER_ACTION_BROWSE = "BROWSE";
/**
* Indicates that a Scheduled Message is to be remove from the Scheduler, the Id of the scheduled
* message must be set as a property in order for this action to have any effect.
*/
public static final String AMQ_SCHEDULER_ACTION_REMOVE = "REMOVE";
/** Indicates that all scheduled Messages should be removed. */
public static final String AMQ_SCHEDULER_ACTION_REMOVEALL = "REMOVEALL";

/**
* A property that holds the beginning of the time interval that the specified action should be
* applied within. Maps to a long value that specified time in milliseconds since UTC.
*/
public static final String AMQ_SCHEDULER_ACTION_START_TIME = "ACTION_START_TIME";
/**
* A property that holds the end of the time interval that the specified action should be applied
* within. Maps to a long value that specified time in milliseconds since UTC.
*/
public static final String AMQ_SCHEDULER_ACTION_END_TIME = "ACTION_END_TIME";
}
29 changes: 29 additions & 0 deletions activemq-filters/src/main/java/org/apache/activemq/Service.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright DataStax, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq;

/**
* The core lifecyle interface for ActiveMQ components.
*
* <p>If there was a standard way to do so, it'd be good to register this interface with Spring so
* it treats the start/stop methods as those of InitializingBean and DisposableBean
*/
public interface Service {

void start() throws Exception;

void stop() throws Exception;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright DataStax, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.region;

import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId;

/**
* Keeps track of a message that is flowing through the Broker. This object may hold a hard
* reference to the message or only hold the id of the message if the message has been persisted on
* in a MessageStore.
*/
public interface MessageReference {

MessageId getMessageId();

Message getMessageHardRef();

Message getMessage();

boolean isPersistent();

Message.MessageDestination getRegionDestination();

int getRedeliveryCounter();

void incrementRedeliveryCounter();

int getReferenceCount();

int incrementReferenceCount();

int decrementReferenceCount();

ConsumerId getTargetConsumerId();

int getSize();

long getExpiration();

String getGroupID();

int getGroupSequence();

/** Returns true if this message is expired */
boolean isExpired();

/** Returns true if this message is dropped. */
boolean isDropped();

/** @return true if the message is an advisory */
boolean isAdvisory();

boolean canProcessAsExpired();
}
Loading
Loading