diff --git a/helm_deploy/hmpps-approved-premises-api/values.yaml b/helm_deploy/hmpps-approved-premises-api/values.yaml index c5f9c01da2..7afc13b257 100644 --- a/helm_deploy/hmpps-approved-premises-api/values.yaml +++ b/helm_deploy/hmpps-approved-premises-api/values.yaml @@ -64,6 +64,10 @@ generic-service: elasticache-redis: SPRING_DATA_REDIS_HOST: "primary_endpoint_address" SPRING_DATA_REDIS_PASSWORD: "auth_token" + inbound-queue-sqs-instance-output: + HMPPS_SQS_QUEUES_INBOUNDQUEUE_QUEUE_NAME: "inboundqueue" + inbound-queue-sqs-dl-instance-output: + HMPPS_SQS_QUEUES_INBOUNDQUEUE_DLQ_NAME: "mainDlq" allowlist: office: "217.33.148.210/32" diff --git a/src/main/kotlin/uk/gov/justice/digital/hmpps/approvedpremisesapi/service/cas2/MessageListener.kt b/src/main/kotlin/uk/gov/justice/digital/hmpps/approvedpremisesapi/service/cas2/MessageListener.kt new file mode 100644 index 0000000000..97d6cec696 --- /dev/null +++ b/src/main/kotlin/uk/gov/justice/digital/hmpps/approvedpremisesapi/service/cas2/MessageListener.kt @@ -0,0 +1,37 @@ +package uk.gov.justice.digital.hmpps.approvedpremisesapi.service.cas2 + +import io.awspring.cloud.sqs.annotation.SqsListener +import org.slf4j.Logger +import org.slf4j.LoggerFactory +import org.springframework.stereotype.Service + +@Service +class MessageListener { + private companion object { + private val log: Logger = LoggerFactory.getLogger(this::class.java) + } + + @SqsListener("inboundqueue", factory = "hmppsQueueContainerFactoryProxy") + fun processMessage(message: Message) { + log.info("received event: {}", message) + + print(message) + } +} + +@SuppressWarnings("ConstructorParameterNaming") +data class MessageAttribute(val Value: String, val Type: String) +typealias EventType = MessageAttribute + +class MessageAttributes() : HashMap() { + constructor(attribute: EventType) : this() { + put(attribute.Value, attribute) + } +} + +@SuppressWarnings("ConstructorParameterNaming") +data class Message( + val Message: String, + val MessageId: String, + val MessageAttributes: MessageAttributes, +) diff --git a/src/main/resources/application-local.yml b/src/main/resources/application-local.yml index b4247510bc..a25f3416e0 100644 --- a/src/main/resources/application-local.yml +++ b/src/main/resources/application-local.yml @@ -19,6 +19,12 @@ spring: hmpps.sqs: provider: localstack + queues: + inboundqueue: + queueName: inbound-queue + dlqName: inbound-dlq + subscribeTopicId: domainevents + subscribeFilter: '{"eventType":["offender-management.allocation.changed", "offender-management.handover.changed"]}' topics: domainevents: arn: arn:aws:sns:eu-west-2:000000000000:domainevents diff --git a/src/test/kotlin/uk/gov/justice/digital/hmpps/approvedpremisesapi/integration/cas2/InboundQueueTest.kt b/src/test/kotlin/uk/gov/justice/digital/hmpps/approvedpremisesapi/integration/cas2/InboundQueueTest.kt new file mode 100644 index 0000000000..a79bbca834 --- /dev/null +++ b/src/test/kotlin/uk/gov/justice/digital/hmpps/approvedpremisesapi/integration/cas2/InboundQueueTest.kt @@ -0,0 +1,39 @@ +package uk.gov.justice.digital.hmpps.approvedpremisesapi.integration.cas2 + +import com.ninjasquad.springmockk.SpykBean +import io.mockk.verify +import org.junit.jupiter.api.Test +import org.springframework.beans.factory.annotation.Autowired +import uk.gov.justice.digital.hmpps.approvedpremisesapi.integration.IntegrationTestBase +import uk.gov.justice.digital.hmpps.approvedpremisesapi.integration.setup.putMessageOnQueue +import uk.gov.justice.digital.hmpps.approvedpremisesapi.service.cas2.MessageListener +import uk.gov.justice.hmpps.sqs.HmppsQueueService +import uk.gov.justice.hmpps.sqs.MissingQueueException +import java.util.concurrent.TimeUnit + +class InboundQueueTest : IntegrationTestBase() { + + @Autowired + private lateinit var hmppsQueueService: HmppsQueueService + + @SpykBean + private lateinit var mockMessageListener: MessageListener + + private val inboundQueue by lazy { + hmppsQueueService.findByQueueId("inboundqueue") ?: throw MissingQueueException("HmppsQueue inboundqueue not found") + } + + private val inboundQueueClient by lazy { inboundQueue.sqsClient } + + fun putMessageOnInboundQueue() = putMessageOnQueue( + inboundQueueClient, + inboundQueue.queueUrl, + ) + + @Test + fun `Put Message on Inbound Queue Request is successful`() { + putMessageOnInboundQueue() + TimeUnit.MILLISECONDS.sleep(10000) + verify(exactly = 1) { mockMessageListener.processMessage(any()) } + } +} diff --git a/src/test/kotlin/uk/gov/justice/digital/hmpps/approvedpremisesapi/integration/health/QueueHealthCheckTest.kt b/src/test/kotlin/uk/gov/justice/digital/hmpps/approvedpremisesapi/integration/health/QueueHealthCheckTest.kt new file mode 100644 index 0000000000..afdbf22f53 --- /dev/null +++ b/src/test/kotlin/uk/gov/justice/digital/hmpps/approvedpremisesapi/integration/health/QueueHealthCheckTest.kt @@ -0,0 +1,30 @@ +package uk.gov.justice.digital.hmpps.approvedpremisesapi.integration.health + +import org.junit.jupiter.api.Test +import org.springframework.test.context.bean.override.mockito.MockitoSpyBean +import uk.gov.justice.digital.hmpps.approvedpremisesapi.integration.IntegrationTestBase +import uk.gov.justice.hmpps.sqs.HmppsSqsProperties +import uk.gov.justice.hmpps.sqs.MissingQueueException + +class QueueHealthCheckTest : IntegrationTestBase() { + + @MockitoSpyBean + private lateinit var hmppsSqsPropertiesSpy: HmppsSqsProperties + + fun HmppsSqsProperties.inboundQueueConfig() = queues["inboundqueue"] + ?: throw MissingQueueException("inboundqueue has not been loaded from configuration properties") + + @Test + fun `Inbound queue health ok`() { + webTestClient.get().uri("/health").exchange().expectStatus().isOk.expectBody().jsonPath("status").isEqualTo("UP") + .jsonPath("components.inboundqueue-health.status").isEqualTo("UP") + .jsonPath("components.inboundqueue-health.details.queueName") + .isEqualTo(hmppsSqsPropertiesSpy.inboundQueueConfig().queueName) + .jsonPath("components.inboundqueue-health.details.messagesOnQueue").isEqualTo(0) + .jsonPath("components.inboundqueue-health.details.messagesInFlight").isEqualTo(0) + .jsonPath("components.inboundqueue-health.details.dlqName") + .isEqualTo(hmppsSqsPropertiesSpy.inboundQueueConfig().dlqName) + .jsonPath("components.inboundqueue-health.details.dlqStatus").isEqualTo("UP") + .jsonPath("components.inboundqueue-health.details.messagesOnDlq").isEqualTo(0) + } +} diff --git a/src/test/kotlin/uk/gov/justice/digital/hmpps/approvedpremisesapi/integration/setup/QueueHelper.kt b/src/test/kotlin/uk/gov/justice/digital/hmpps/approvedpremisesapi/integration/setup/QueueHelper.kt new file mode 100644 index 0000000000..9dfac411a7 --- /dev/null +++ b/src/test/kotlin/uk/gov/justice/digital/hmpps/approvedpremisesapi/integration/setup/QueueHelper.kt @@ -0,0 +1,16 @@ +package uk.gov.justice.digital.hmpps.approvedpremisesapi.integration.setup + +import software.amazon.awssdk.services.sqs.SqsAsyncClient +import software.amazon.awssdk.services.sqs.model.SendMessageRequest +import java.nio.file.Files +import java.nio.file.Paths + +fun putMessageOnQueue(client: SqsAsyncClient, queueUrl: String) { + val message = getMessage() + val sendMessageRequest = SendMessageRequest.builder().queueUrl(queueUrl).messageBody(message).build() + client.sendMessage(sendMessageRequest).get() +} + +private fun getMessage(): String { + return Files.readString(Paths.get("src/test/resources/fixtures/sqs/deallocation.json")) +} diff --git a/src/test/resources/application-test.yml b/src/test/resources/application-test.yml index 29192597d6..136b699206 100644 --- a/src/test/resources/application-test.yml +++ b/src/test/resources/application-test.yml @@ -62,7 +62,13 @@ hmpps.sqs: dlqName: ${random.uuid} queueName: ${random.uuid} subscribeTopicId: domainevents - + inboundqueue: + dlqName: ${random.uuid} + queueName: ${random.uuid} + subscribeTopicId: domainevents + subscribeFilter: '{"eventType":["offender-management.allocation.changed", "offender-management.handover.changed"]}' + dlqMaxReceiveCount: 1 + errorVisibilityTimeout: 0 domain-events: cas1: emit-enabled: true diff --git a/src/test/resources/fixtures/sqs/deallocation.json b/src/test/resources/fixtures/sqs/deallocation.json new file mode 100644 index 0000000000..2258e50918 --- /dev/null +++ b/src/test/resources/fixtures/sqs/deallocation.json @@ -0,0 +1,13 @@ +{ + "Type": "Notification", + "MessageId": "63ba0b4b-340e-4853-b343-6429965a3961", + "TopicArn": "dynamically set per test", + "Message": "{\"eventType\":\"offender-management.allocation.changed\",\"detailUrl\":\"http://localhost:8080/api/pom-allocation/A0123BY/3\",\"occurredAt\":\"2023-10-10T14:25:19.102Z\",\"personReference\":{\"identifiers\":[{\"type\":\"NOMS\",\"value\":\"A0123BY\"}]}}", + "Timestamp": "2023-10-10T14:25:20.000Z", + "MessageAttributes": { + "eventType": { + "Type": "String", + "Value": "offender-management.allocation.changed" + } + } +} \ No newline at end of file diff --git a/src/test/resources/test-messages/offender-management-allocation-changed.sh b/src/test/resources/test-messages/offender-management-allocation-changed.sh new file mode 100644 index 0000000000..4855c06c8e --- /dev/null +++ b/src/test/resources/test-messages/offender-management-allocation-changed.sh @@ -0,0 +1,5 @@ +#!/usr/bin/env bash +aws --endpoint-url=http://localhost:4566 sns publish \ + --topic-arn arn:aws:sns:eu-west-2:000000000000:domainevents \ + --message-attributes '{"eventType" : { "DataType":"String", "StringValue":"offender-management.allocation.changed"}}' \ + --message '{"type":"offender-management.allocation.changed","id":"1","contents":"allocation_message_contents"}' \ No newline at end of file diff --git a/src/test/resources/test-messages/offender-management-handover-changed.sh b/src/test/resources/test-messages/offender-management-handover-changed.sh new file mode 100644 index 0000000000..0f09d1908e --- /dev/null +++ b/src/test/resources/test-messages/offender-management-handover-changed.sh @@ -0,0 +1,5 @@ +#!/usr/bin/env bash +aws --endpoint-url=http://localhost:4566 sns publish \ + --topic-arn arn:aws:sns:eu-west-2:000000000000:domainevents \ + --message-attributes '{"eventType" : { "DataType":"String", "StringValue":"offender-management.handover.changed"}}' \ + --message '{"type":"offender-management.handover.changed","id":"2","contents":"handover_message_contents"}' \ No newline at end of file