Skip to content

Commit

Permalink
spike/CAS-1341-CAS2-Prison-Transfer-Event-Emit-POC
Browse files Browse the repository at this point in the history
This is a POC branch to demonstrate how to set up a listener for the domain events topic.

Message Listener listens to the inboundQueue which is filtered by the allocation and handover event types and prints out the message.

You can run it locally and send messages using the scripts found in test-messages in the test resources folder.

It has a simple Integration test that checks that the listener was called and also a tests to check the health of the queue.
  • Loading branch information
besscerule committed Jan 21, 2025
1 parent 2bea89b commit f422f8d
Show file tree
Hide file tree
Showing 10 changed files with 162 additions and 1 deletion.
4 changes: 4 additions & 0 deletions helm_deploy/hmpps-approved-premises-api/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ generic-service:
elasticache-redis:
SPRING_DATA_REDIS_HOST: "primary_endpoint_address"
SPRING_DATA_REDIS_PASSWORD: "auth_token"
inbound-queue-sqs-instance-output:
HMPPS_SQS_QUEUES_INBOUNDQUEUE_QUEUE_NAME: "inboundqueue"
inbound-queue-sqs-dl-instance-output:
HMPPS_SQS_QUEUES_INBOUNDQUEUE_DLQ_NAME: "mainDlq"

allowlist:
office: "217.33.148.210/32"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package uk.gov.justice.digital.hmpps.approvedpremisesapi.service.cas2

import io.awspring.cloud.sqs.annotation.SqsListener
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.springframework.stereotype.Service

@Service
class MessageListener {
private companion object {
private val log: Logger = LoggerFactory.getLogger(this::class.java)
}

@SqsListener("inboundqueue", factory = "hmppsQueueContainerFactoryProxy")
fun processMessage(message: Message) {
log.info("received event: {}", message)

print(message)
}
}

@SuppressWarnings("ConstructorParameterNaming")
data class MessageAttribute(val Value: String, val Type: String)
typealias EventType = MessageAttribute

class MessageAttributes() : HashMap<String, MessageAttribute>() {
constructor(attribute: EventType) : this() {
put(attribute.Value, attribute)
}
}

@SuppressWarnings("ConstructorParameterNaming")
data class Message(
val Message: String,
val MessageId: String,
val MessageAttributes: MessageAttributes,
)
6 changes: 6 additions & 0 deletions src/main/resources/application-local.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ spring:

hmpps.sqs:
provider: localstack
queues:
inboundqueue:
queueName: inbound-queue
dlqName: inbound-dlq
subscribeTopicId: domainevents
subscribeFilter: '{"eventType":["offender-management.allocation.changed", "offender-management.handover.changed"]}'
topics:
domainevents:
arn: arn:aws:sns:eu-west-2:000000000000:domainevents
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package uk.gov.justice.digital.hmpps.approvedpremisesapi.integration.cas2

import com.ninjasquad.springmockk.SpykBean
import io.mockk.verify
import org.junit.jupiter.api.Test
import org.springframework.beans.factory.annotation.Autowired
import uk.gov.justice.digital.hmpps.approvedpremisesapi.integration.IntegrationTestBase
import uk.gov.justice.digital.hmpps.approvedpremisesapi.integration.setup.putMessageOnQueue
import uk.gov.justice.digital.hmpps.approvedpremisesapi.service.cas2.MessageListener
import uk.gov.justice.hmpps.sqs.HmppsQueueService
import uk.gov.justice.hmpps.sqs.MissingQueueException
import java.util.concurrent.TimeUnit

class InboundQueueTest : IntegrationTestBase() {

@Autowired
private lateinit var hmppsQueueService: HmppsQueueService

@SpykBean
private lateinit var mockMessageListener: MessageListener

private val inboundQueue by lazy {
hmppsQueueService.findByQueueId("inboundqueue") ?: throw MissingQueueException("HmppsQueue inboundqueue not found")
}

private val inboundQueueClient by lazy { inboundQueue.sqsClient }

fun putMessageOnInboundQueue() = putMessageOnQueue(
inboundQueueClient,
inboundQueue.queueUrl,
)

@Test
fun `Put Message on Inbound Queue Request is successful`() {
putMessageOnInboundQueue()
TimeUnit.MILLISECONDS.sleep(10000)
verify(exactly = 1) { mockMessageListener.processMessage(any()) }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package uk.gov.justice.digital.hmpps.approvedpremisesapi.integration.health

import org.junit.jupiter.api.Test
import org.springframework.test.context.bean.override.mockito.MockitoSpyBean
import uk.gov.justice.digital.hmpps.approvedpremisesapi.integration.IntegrationTestBase
import uk.gov.justice.hmpps.sqs.HmppsSqsProperties
import uk.gov.justice.hmpps.sqs.MissingQueueException

class QueueHealthCheckTest : IntegrationTestBase() {

@MockitoSpyBean
private lateinit var hmppsSqsPropertiesSpy: HmppsSqsProperties

fun HmppsSqsProperties.inboundQueueConfig() = queues["inboundqueue"]
?: throw MissingQueueException("inboundqueue has not been loaded from configuration properties")

@Test
fun `Inbound queue health ok`() {
webTestClient.get().uri("/health").exchange().expectStatus().isOk.expectBody().jsonPath("status").isEqualTo("UP")
.jsonPath("components.inboundqueue-health.status").isEqualTo("UP")
.jsonPath("components.inboundqueue-health.details.queueName")
.isEqualTo(hmppsSqsPropertiesSpy.inboundQueueConfig().queueName)
.jsonPath("components.inboundqueue-health.details.messagesOnQueue").isEqualTo(0)
.jsonPath("components.inboundqueue-health.details.messagesInFlight").isEqualTo(0)
.jsonPath("components.inboundqueue-health.details.dlqName")
.isEqualTo(hmppsSqsPropertiesSpy.inboundQueueConfig().dlqName)
.jsonPath("components.inboundqueue-health.details.dlqStatus").isEqualTo("UP")
.jsonPath("components.inboundqueue-health.details.messagesOnDlq").isEqualTo(0)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package uk.gov.justice.digital.hmpps.approvedpremisesapi.integration.setup

import software.amazon.awssdk.services.sqs.SqsAsyncClient
import software.amazon.awssdk.services.sqs.model.SendMessageRequest
import java.nio.file.Files
import java.nio.file.Paths

fun putMessageOnQueue(client: SqsAsyncClient, queueUrl: String) {
val message = getMessage()
val sendMessageRequest = SendMessageRequest.builder().queueUrl(queueUrl).messageBody(message).build()
client.sendMessage(sendMessageRequest).get()
}

private fun getMessage(): String {
return Files.readString(Paths.get("src/test/resources/fixtures/sqs/deallocation.json"))
}
8 changes: 7 additions & 1 deletion src/test/resources/application-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,13 @@ hmpps.sqs:
dlqName: ${random.uuid}
queueName: ${random.uuid}
subscribeTopicId: domainevents

inboundqueue:
dlqName: ${random.uuid}
queueName: ${random.uuid}
subscribeTopicId: domainevents
subscribeFilter: '{"eventType":["offender-management.allocation.changed", "offender-management.handover.changed"]}'
dlqMaxReceiveCount: 1
errorVisibilityTimeout: 0
domain-events:
cas1:
emit-enabled: true
Expand Down
13 changes: 13 additions & 0 deletions src/test/resources/fixtures/sqs/deallocation.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
{
"Type": "Notification",
"MessageId": "63ba0b4b-340e-4853-b343-6429965a3961",
"TopicArn": "dynamically set per test",
"Message": "{\"eventType\":\"offender-management.allocation.changed\",\"detailUrl\":\"http://localhost:8080/api/pom-allocation/A0123BY/3\",\"occurredAt\":\"2023-10-10T14:25:19.102Z\",\"personReference\":{\"identifiers\":[{\"type\":\"NOMS\",\"value\":\"A0123BY\"}]}}",
"Timestamp": "2023-10-10T14:25:20.000Z",
"MessageAttributes": {
"eventType": {
"Type": "String",
"Value": "offender-management.allocation.changed"
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#!/usr/bin/env bash
aws --endpoint-url=http://localhost:4566 sns publish \
--topic-arn arn:aws:sns:eu-west-2:000000000000:domainevents \
--message-attributes '{"eventType" : { "DataType":"String", "StringValue":"offender-management.allocation.changed"}}' \
--message '{"type":"offender-management.allocation.changed","id":"1","contents":"allocation_message_contents"}'
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#!/usr/bin/env bash
aws --endpoint-url=http://localhost:4566 sns publish \
--topic-arn arn:aws:sns:eu-west-2:000000000000:domainevents \
--message-attributes '{"eventType" : { "DataType":"String", "StringValue":"offender-management.handover.changed"}}' \
--message '{"type":"offender-management.handover.changed","id":"2","contents":"handover_message_contents"}'

0 comments on commit f422f8d

Please sign in to comment.