Skip to content

Latest commit

 

History

History
412 lines (314 loc) · 18 KB

README.adoc

File metadata and controls

412 lines (314 loc) · 18 KB

Spring for Apache Pulsar

This project provides a basic Spring friendly API for developing Apache Pulsar applications.

Most of the ideas in this project are borrowed from the Spring for Apache Kafka project, thus a familiarity with the Spring support available in Spring for Apache Kafka would help a lot.

  • IMPORTANT: This is a WIP project. Many of the support available at the moment are prototypes and experimental.

Building the project

Minimum Supported Versions

JDK - JDK 17

Apache Pulsar - 2.10.0

Spring Boot - 3.0.0

Spring Framework - 6.0.0

./gradlew clean build

The build will produce two artifacts — spring-pulsar and spring-pulsar-boot-autoconfigure

Spring Boot Auto Configuration

We recommend using the library spring-pulsar in association with Spring Boot and therefore should also use spring-pulsar-boot-autoconfigure. If you simply use the module spring-pulsar-boot-autoconfigure, then that will transitively include spring-pulsar.

Pulsar Client

When using spring-pulsar-boot-autoconfigure, you get the PulsarClient auto-configured. This is done through a factory bean called PulsarClientFactoryBean, which takes a configuration object PulsarClientConfiguration. By default, the application tries to connect a local Pulsar instance available at pulsar://localhost:6650.

The properties listed below are available to be set on the Pulsar client.

The same defaults expected by the Pulsar client are honored when no configuration provided for these properties.

spring.pulsar.client.serviceUrl
spring.pulsar.client.authPluginClassName
spring.pulsar.client.authParams
spring.pulsar.client.operationTimeoutMs
spring.pulsar.client.statsIntervalSeconds
spring.pulsar.client.numIoThreads
spring.pulsar.client.useTcpNoDelayuseTls
spring.pulsar.client.tlsAllowInsecureConnection
spring.pulsar.client.tlsHostnameVerificationEnable
spring.pulsar.client.concurrentLookupRequest
spring.pulsar.client.maxLookupRequest
spring.pulsar.client.maxNumberOfRejectedRequestPerConnection
spring.pulsar.client.keepAliveIntervalSeconds
spring.pulsar.client.connectionTimeoutMs
spring.pulsar.client.requestTimeoutMs
spring.pulsar.client.initialBackoffIntervalNanos
spring.pulsar.client.maxBackoffIntervalNanos;

Pulsar Producer

On the Pulsar producer side, Sprig Boot auto-configuration will provide a PulsarTemplate which is backed by a PulsarProducerFactory. We will see more details of these components later in this document, but in this section, let us look at the configuration properties available on the producer.

These properties must be prefixed with spring.pulsar.producer.

The same defaults expected by the Pulsar producer are honored when no configuration provided for these properties.

spring.pulsar.producer.topicName
spring.pulsar.producer.producerName
spring.pulsar.producer.sendTimeoutMs
spring.pulsar.producer.blockIfQueueFull
spring.pulsar.producer.maxPendingMessages
spring.pulsar.producer.maxPendingMessagesAcrossPartitions
spring.pulsar.producer.messageRoutingMode
spring.pulsar.producer.hashingScheme
spring.pulsar.producer.cryptoFailureAction
spring.pulsar.producer.batchingMaxPublishDelayMicros
spring.pulsar.producer.batchingMaxMessages
spring.pulsar.producer.batchingEnabled
spring.pulsar.producer.chunkingEnabled
spring.pulsar.producer.compressionType
spring.pulsar.producer.initialSubscriptionName
spring.pulsar.producer.accessMode

Pulsar Consumer

When it comes to Pulsar consumer, we recommend the end user applications to make use of the PulsarListener annotation. In order to use PulsarListener, you need to use the EnablePulsar annotation. When using the Spring Boot support, it automatically enables this annotation and configures all the components necessary for PulsarListener such as the message listener infrastructure which is responsible for creating the Pulsar consumer. PulsarMessageListenerContainer uses a PulsarConsumerFactory in order to create and manage the Pulsar consumer. This consumer factory is auto-configured through Spring Boot. Following are the consumer properties that you can configure through the Boot support.

The same defaults expected by the Pulsar consumer are honored when no configuration provided for these properties.

spring.pulsar.consumer.topicNames
spring.pulsar.consumer.topicsPattern
spring.pulsar.consumer.subscriptionName
spring.pulsar.consumer.subscriptionType
spring.pulsar.consumer.receiverQueueSize
spring.pulsar.consumer.acknowledgementsGroupTimeMicros
spring.pulsar.consumer.negativeAckRedeliveryDelayMicros
spring.pulsar.consumer.maxTotalReceiverQueueSizeAcrossPartitions
spring.pulsar.consumer.consumerName
spring.pulsar.consumer.ackTimeoutMillis
spring.pulsar.consumer.tickDurationMillis
spring.pulsar.consumer.priorityLevel
spring.pulsar.consumer.cryptoFailureAction
spring.pulsar.consumer.properties
spring.pulsar.consumer.readCompacted
spring.pulsar.consumer.subscriptionInitialPosition
spring.pulsar.consumer.patternAutoDiscoveryPeriod
spring.pulsar.consumer.regexSubscriptionMode
spring.pulsar.consumer.autoUpdatePartitions
spring.pulsar.consumer.replicateSubscriptionState
spring.pulsar.consumer.autoAckOldestChunkedMessageOnQueueFull
spring.pulsar.consumer.maxPendingChunkedMessageexpireTimeOfIncompleteChunkedMessageMillis
spring.pulsar.consumer.maxPendingChunkedMessageexpireTimeOfIncompleteChunkedMessageMillis

Sample Applications

In this section, we will show a number of various applications to demonstrate how you can use the Spring for Apache Pulsar library using Spring Boot.

Here are the maven coordinates of the artifacts needed for using this library.

<dependency>
    <groupId>org.springframework.pulsar</groupId>
    <artifactId>spring-pulsar</artifactId>
    <version>0.1.0-SNAPSHOT</version>
</dependency>

<dependency>
    <groupId>org.springframework.pulsar</groupId>
    <artifactId>spring-pulsar-boot-autoconfiguration</artifactId>
    <version>0.1.0-SNAPSHOT</version>
</dependency>

If you simply include the spring-pulsar-boot-autoconfiguration module, then that will transitively include the spring-pulsar module.

Quick Primer on Publishing to and Consuming from an Apache Pulsar Topic

In order to publish to a Pulsar topic, the easiest way is to use the PulsarTemplate API. Similarly, the easiest way to consume from a Pulsar topic is to use the PulsarListener annotation.

The following is a complete Spring Boot application that publishes to a Pulsar topic called hello-pulsar and then consumes from it.

@SpringBootApplication
public class PulsarBootHelloWorld {

	public static void main(String[] args) {
		SpringApplication.run(PulsarBootHelloWorld.class, args);
	}

	@Bean
	public ApplicationRunner runner(PulsarTemplate<String> pulsarTemplate) {
		pulsarTemplate.setDefaultTopicName("hello-pulsar");
		return args -> {
			for (int i = 0; i < 10; i ++) {
				pulsarTemplate.send("This is message " + (i + 1));
			}

		};
	}

	@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
	public void listen(String message) {
		System.out.println("Message Received: " + message);
	}
}

Although this is a very trivial application, it conveys several important concepts nonetheless. Let’s take a look at them.

PulsarTemplate is autoconfigured by Spring Boot, and we are injecting that in the application. There is a producer factory behind the scenes and properties can be provided to it using spring.pulsar.producer…​ (See above for more details). When injecting, note that, we can provide a specific type - String in this case. If we have a need for a different PulsarTemplate with a different type in the same application, then we can add another injection with that type. PulsarTemplate currently requires the application to set a default topic. Once the topic is set, then we can call the various send methods on it. In this example, we are calling the very basic send method that calls the synchronous send from the Pulsar producer internally that returns the MessageId. We are ignoring the return value in this quick sample application. PulsarTemplate also provides API’s for sending asynchronously among other send methods. We will look at the details of it, later on in the reference documentation.

We use the PulsarListener annotation to consume from the topic. We are providing the subscription and topic names as annotation arguments. These are optional and the application may prefer to set them using the spring.pulsar.consumer.. properties provided above. However, if properties are provided as annotation arguments, they get precedence. There are four consumer level properties that you can set on the PulsarListener annotation - they are topics, topicPattern, subscriptionName and subscriptionType. When providing on the annotation, they get preference regardless of what is set through consumer properties.

By default, PulsarListener uses an exclusive subscription type and this can be changed by using the subscriptionType property on the annotation or through the consumer property.

PulsarListner internally creates a message listener container. The message listener container is responsible for creating the Pulsar Consumer using a consumer factory. When the consumer receives data, the container calls a message listener. This message listener is an adapter around the user provided method - the listen method in this case. The message listener container delegates to the adapter with the received data and the adapter then invokes the user method.

In order to complete the discussion about this sample application, we also need to chime in on how data conversion is done. In the user method, we receive the data as String, but we don’t specify any schema types. Internally, the framework by default relies on Pulsar’s schema mechanism to convert the data to the required type. The framework detects that you are expecting the String type and then infer the schema type based on that information. Then it provides that schema to the consumer. For all the primitive types in Java, the framework does this inference. For any complex types, such as JSON, AVRO etc. the framework cannot do this inference and the user needs to provide the schema type on the annotation using the schemaType property.

In the example above, and in the ones that follows below, we provide both producer and consumer as part of the same application. However, in real apps, you might find it as convenient to separate them especially if you are following a microservice style architecture.

Partitioned topics - Publishing and Consuming.

In the sample below, we are publishing to a topic called hello-pulsar-partitioned. It is a topic that is partitioned and for this sample we assume that the topic is already created with three partitions.

@SpringBootApplication
public class PulsarBootPartitioned {

	public static void main(String[] args) {
		SpringApplication.run(PulsarBootPartitioned.class, "--spring.pulsar.producer.messageRoutingMode=CustomPartition");
	}

	@Bean
	public ApplicationRunner runner(PulsarTemplate<String> pulsarTemplate) {
		pulsarTemplate.setDefaultTopicName("hello-pulsar-partitioned");
		return args -> {
			for (int i = 0; i < 10; i++) {
				pulsarTemplate.sendAsync("hello john doe 0 ", new FooRouter());
				pulsarTemplate.sendAsync("hello alice doe 1", new BarRouter());
				pulsarTemplate.sendAsync("hello buzz doe 2", new BuzzRouter());
			}
		};
	}

	@PulsarListener(subscriptionName = "hello-pulsar-partitioned-subscription", topics = "hello-pulsar-partitioned")
	public void listen(String message) {
		System.out.println("Message Received: " + message);
	}

    static class FooRouter implements MessageRouter {

		@Override
		public int choosePartition(Message<?> msg, TopicMetadata metadata) {
			return 0;
		}
	}

	static class BarRouter implements MessageRouter {

		@Override
		public int choosePartition(Message<?> msg, TopicMetadata metadata) {
			return 1;
		}
	}

	static class BuzzRouter implements MessageRouter {

		@Override
		public int choosePartition(Message<?> msg, TopicMetadata metadata) {
			return 2;
		}
	}

}

A few things require explanation in the application above. We are publishing to a partitioned topic and we would like to publish some data segment to a specific partition. If you leave it to Pulsar’s default, it follows a round-robin mode of partition assignments, and we would like to override that. In order to do that, we are providing a message router object with the send method. Look at the three message routers implemented. FooRouter always sends data to partition 0, BarRouter to partition 1 and BuzzRouter to partition 2. Also note that, we are now using the sendAsync method of PulsarTemplate that returns a CompletableFuture. When running the application, we also need to set the messageRoutingMode on the producer to CustomPartition (spring.pulsar.producer.messageRoutingMode).

On the consumer side, we are using a PulsarListener with the exclusive subscription type. This means that data from all the partitions will end up in the same consumer and there is no ordering guarantee.

What can we do if we want each partition to be consumed by a single distinct consumer? We can switch to the failover subscription mode and add three separate consumers.

Here is an example.

@PulsarListener(subscriptionName = "hello-pulsar-partitioned-subscription", topics = "hello-pulsar-partitioned", subscriptionType = "failover")
public void listen1(String foo) {
    System.out.println("Message Received 1: " + foo);
}

@PulsarListener(subscriptionName = "hello-pulsar-partitioned-subscription", topics = "hello-pulsar-partitioned", subscriptionType = "failover")
public void listen2(String foo) {
    System.out.println("Message Received 2: " + foo);
}

@PulsarListener(subscriptionName = "hello-pulsar-partitioned-subscription",  topics = "hello-pulsar-partitioned", subscriptionType = "failover")
public void liste3n(String foo) {
    System.out.println("Message Received 3: " + foo);
}

When following this approach, you can see that a single partition always gets consumed by a dedicated consumer.

In the similar vein, if you want to use Pulsar’s shared consumer type, you can use the subscription type shared. Keep in mind though, that when using the shared mode, you lose any ordering guarantees as a single consumer may receive messages from all the partitions before another consumer gets a chance.

Here is an example.

@PulsarListener(subscriptionName = "hello-pulsar-shared-subscription", topics = "hello-pulsar-partitioned", subscriptionType = "shared")
public void listen1(String foo) {
    System.out.println("Message Received 1: " + foo);
}

@PulsarListener(subscriptionName = "hello-pulsar-shared-subscription", topics = "hello-pulsar-partitioned", subscriptionType = "shared")
public void listen2(String foo) {
    System.out.println("Message Received 2: " + foo);
}

Consuming Messages in Batch Mode

In the examples above, we used PulsarListener in record mode, i.e. we receive a single message on each method invocation. Sometimes, for various reasons, we may want to receive Pulsar messages in batch mode. Here is a PulsarListener that is enabled with batch consumption.

@PulsarListener(subscriptionName = "batch-subscription", topics = "hello-pulsar", batch = "true")
public void listen(List<String> records) {
    System.out.println("Message Received from batch: " + records);
}

In order to enable batch consumption, you need to set the batch property to true on PulsarListener. Then you can receive the records as a List type. Based on the actual type that the List holds, the framework tries to infer the schema to use. If the List contains a complex type, then the schemaType still needs to be provided on PulsarListener.

Accessing the Pulsar Message Object

In your PulsarListener method, you can receive the record directly as a Pulsar Message instead of the actual payload type. Here is an example.

@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
public void listen(org.apache.pulsar.client.api.Message<String> message) {
    System.out.println("Data Received: " + message.getValue());
}

Accessing the Pulsar Messages Object

When consuming messages in batch mode using PulsarListener, instead of receiving them as a `List, you can receive them as Pulsar Messages type. Here is an example.

@PulsarListener(subscriptionName = "batch-subscription", topics = "hello-pulsar", batch = "true")
public void listen(org.apache.pulsar.client.api.Messages<String> messages) {
    // Iterate on the messages
    // Each iteration gives access to a org.apache.pulsar.client.api.Message object
}

Accessing the Pulsar Consumer Object

Sometimes, it is necessary to gain direct access to the Pulsar Consumer object. Here is how you may do so.

@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
public void listen(String message, org.apache.pulsar.client.api.Consumer<String> consumer) {
    System.out.println("Message Received: " + message);
    ConsumerStats stats = consumer.getStats();
    ...
}

When accessing the Consumer object this way, make sure NOT to invoke any operations that would change the Consumer’s cursor position by invoking any receive methods. All such operations must be done by the container.

Specify schema information

As indicated above, for normal Java types (the primitive ones), Spring Pulsar framework can infer the proper Schema to use on the PulsarListener. However, for more complex types such as JSON or AVRO, you need to specify the schema type on the annotation. Here is how you provide that.

@PulsarListener(subscriptionName = "json-subscription", topics = "hello-pulsar-json", schemaType = SchemaType.JSON)
public void listen(Foo foo) {
    System.out.println("Message received: " + foo);
}

On the producer side also, for the Java primitive types, the framework can infer the Schema, but for any other types, you need set that on the PulsarTemmplate.

PulsarTemplate API details

More support to come — stay tuned…​