page_type | languages | products | description | urlFragment | ||
---|---|---|---|---|---|---|
sample |
|
|
Azure Spring Cloud Stream Binder Sample project for Service Bus Topic client library |
azure-spring-cloud-sample-service-bus-topic-binder |
This code sample demonstrates how to use the Spring Cloud Stream binder for Azure Service Bus topic. The sample app has two operating modes. One way is to expose a Restful API to receive string message, another way is to automatically provide string messages. These messages are published to a service bus topic. The sample will also consume messages from the same service bus topic.
Running this sample will be charged by Azure. You can check the usage and bill at this link.
We have several ways to config the Spring Cloud Stream Binder for Azure Service Bus Topic. You can choose anyone of them.
Important
When using the Restful API to send messages, the Active profiles must contain manual
.
-
Create Azure Service Bus namespace and topic. Please see how to create.
-
Update application.yaml.
spring: cloud: azure: servicebus: connection-string: [servicebus-namespace-connection-string] stream: function: definition: consume;supply bindings: consume-in-0: destination: [servicebus-queue-name] supply-out-0: destination: [servicebus-queue-name-same-as-above] poller: fixed-delay: 1000 initial-delay: 0
-
Create a service principal for use in by your app. Please follow create service principal from Azure CLI.
-
Create Azure Service Bus namespace and queue. Please see how to create.
-
Add Role Assignment for Service Bus. See Service principal for Azure resources with Service Bus to add role assignment for Service Bus. Assign
Contributor
role for managed identity. -
Update application-sp.yaml.
spring: cloud: azure: client-id: [service-principal-id] client-secret: [service-principal-secret] tenant-id: [tenant-id] resource-group: [resource-group] servicebus: namespace: [servicebus-namespace] stream: function: definition: consume;supply bindings: consume-in-0: destination: [servicebus-queue-name] supply-out-0: destination: [servicebus-queue-name-same-as-above] poller: fixed-delay: 1000 initial-delay: 0
We should specify
spring.profiles.active=sp
to run the Spring Boot application.
Please follow create managed identity to set up managed identity.
-
Create Azure Service Bus namespace and queue. Please see how to create.
-
Add Role Assignment for Service Bus. See Managed identities for Azure resources with Service Bus to add role assignment for Service Bus. Assign
Contributor
role for managed identity.
- Update application-mi.yaml
spring: cloud: azure: msi-enabled: true client-id: [the-id-of-managed-identity] resource-group: [resource-group] subscription-id: [subscription-id] servicebus: namespace: [servicebus-namespace] stream: function: definition: consume;supply bindings: consume-in-0: destination: [servicebus-queue-name] supply-out-0: destination: [servicebus-queue-name-same-as-above] poller: fixed-delay: 1000 initial-delay: 0
We should specify
spring.profiles.active=mi
to run the Spring Boot application. For App Service, please add a configuration entry for this.
If you update the spring.cloud.azure.managed-identity.client-id
property after deploying the app, or update the role assignment for
services, please try to redeploy the app again.
You can follow Deploy a Spring Boot JAR file to Azure App Service to deploy this application to App Service
If you want to auto create the Azure Service Bus instances, make sure you add such properties (only support the service principal and managed identity cases):
spring:
cloud:
azure:
subscription-id: [subscription-id]
auto-create-resources: true
environment: Azure
region: [region]
-
Run the
mvn spring-boot:run
in the root of the code sample to get the app running. -
Send a POST request
$ curl -X POST http://localhost:8080/messages?message=hello
or when the app runs on App Service or VM
$ curl -d -X POST https://[your-app-URL]/messages?message=hello
-
Verify in your app’s logs that a similar message was posted:
New message received: 'hello' Message 'hello' successfully checkpointed
-
Delete the resources on Azure Portal to avoid unexpected charges.
The binder provides the following configuration options:
Name | Description | Required | Default |
---|---|---|---|
spring.cloud.azure.credential-file-path | Location of azure credential file | Yes | |
spring.cloud.azure.resource-group | Name of Azure resource group | Yes | |
spring.cloud.azure.region | Region name of the Azure resource group, e.g. westus | Yes | |
spring.cloud.azure.servicebus.namespace | Service Bus Namespace. Auto creating if missing | Yes | |
spring.cloud.azure.servicebus.transportType | Service Bus transportType, supported value of AMQP and AMQP_WEB_SOCKETS |
No | AMQP |
spring.cloud.azure.servicebus.retry-Options | Service Bus retry options | No | Default value of AmqpRetryOptions |
It supports the following configurations with the format of spring.cloud.stream.servicebus.queue.bindings.<channelName>.producer
.
sync
Whether the producer should act in a synchronous manner with respect to writing messages into a stream. If true, the producer will wait for a response after a send operation.
Default: false
send-timeout
Effective only if sync
is set to true. The amount of time to wait for a response after a send operation, in milliseconds.
Default: 10000
It supports the following configurations with the format of spring.cloud.stream.servicebus.queue.bindings.<channelName>.consumer
.
checkpoint-mode
The mode in which checkpoints are updated.
RECORD
, checkpoints occur after each record successfully processed by user-defined message handler without any exception.
MANUAL
, checkpoints occur on demand by the user via the Checkpointer
. You can get Checkpointer
by Message.getHeaders.get(AzureHeaders.CHECKPOINTER)
callback.
Default: RECORD
prefetch-count
Prefetch count of underlying service bus client.
Default: 1
maxConcurrentCalls
Controls the max concurrent calls of service bus message handler and the size of fixed thread pool that handles user's business logic
Default: 1
maxConcurrentSessions
Controls the maximum number of concurrent sessions to process at any given time.
Default: 1
concurrency
When sessionsEnabled
is true, controls the maximum number of concurrent sessions to process at any given time.
When sessionsEnabled
is false, controls the max concurrent calls of service bus message handler and the size of fixed thread pool that handles user's business logic.
Deprecated, replaced with maxConcurrentSessions
when sessionsEnabled
is true and maxConcurrentCalls
when sessionsEnabled
is false
Default: 1
sessionsEnabled
Controls if is a session aware consumer. Set it to true
if is a queue with sessions enabled.
Default: false
requeueRejected
Controls if is a message that trigger any exception in consumer will be force to DLQ.
Set it to true
if a message that trigger any exception in consumer will be force to DLQ.
Set it to false
if a message that trigger any exception in consumer will be re-queued.
Default: false
receiveMode
The modes for receiving messages.
PEEK_LOCK
, received message is not deleted from the queue or subscription, instead it is temporarily locked to the receiver, making it invisible to other receivers.
RECEIVE_AND_DELETE
, received message is removed from the queue or subscription and immediately deleted.
Default: PEEK_LOCK
enableAutoComplete
Enable auto-complete and auto-abandon of received messages. 'enableAutoComplete' is not needed in for RECEIVE_AND_DELETE mode.
Default: false
The following table illustrates how Spring message headers are mapped to Service Bus message headers and properties. When creat a message, developers can specify the header or property of a Service Bus message by below constants.
@Autowired
private Sinks.Many<Message<String>> many;
@PostMapping("/messages")
public ResponseEntity<String> sendMessage(@RequestParam String message) {
many.emitNext(MessageBuilder.withPayload(message)
.setHeader(SESSION_ID, "group1")
.build(),
Sinks.EmitFailureHandler.FAIL_FAST);
return ResponseEntity.ok("Sent!");
}
For some Service Bus headers that can be mapped to multiple Spring header constants, the priority of different Spring headers is listed.
Service Bus Message Headers and Properties | Spring Message Header Constants | Type | Priority Number (Descending priority) |
---|---|---|---|
ContentType | org.springframework.messaging.MessageHeaders.CONTENT_TYPE | String | N/A |
CorrelationId | com.azure.spring.integration.servicebus.converter.ServiceBusMessageHeaders.CORRELATION_ID | String | N/A |
MessageId | com.azure.spring.integration.servicebus.converter.ServiceBusMessageHeaders.MESSAGE_ID | String | 1 |
MessageId | com.azure.spring.integration.core.AzureHeaders.RAW_ID | String | 2 |
MessageId | org.springframework.messaging.MessageHeaders.ID | UUID | 3 |
PartitionKey | com.azure.spring.integration.servicebus.converter.ServiceBusMessageHeaders.PARTITION_KEY | String | N/A |
ReplyTo | org.springframework.messaging.MessageHeaders.REPLY_CHANNEL | String | N/A |
ReplyToSessionId | com.azure.spring.integration.servicebus.converter.ServiceBusMessageHeaders.REPLY_TO_SESSION_ID | String | N/A |
ScheduledEnqueueTimeUtc | com.azure.spring.integration.core.AzureHeaders.SCHEDULED_ENQUEUE_MESSAGE | Integer | 1 |
ScheduledEnqueueTimeUtc | com.azure.spring.integration.servicebus.converter.ServiceBusMessageHeaders.SCHEDULED_ENQUEUE_TIME | Instant | 2 |
SessionID | com.azure.spring.integration.servicebus.converter.ServiceBusMessageHeaders.SESSION_ID | String | N/A |
TimeToLive | com.azure.spring.integration.servicebus.converter.ServiceBusMessageHeaders.TIME_TO_LIVE | Duration | N/A |
To | com.azure.spring.integration.servicebus.converter.ServiceBusMessageHeaders.TO | String | N/A |