diff --git a/src/main/resources/application-standalone.conf b/src/main/resources/application-standalone.conf index 03ced9b..a139916 100644 --- a/src/main/resources/application-standalone.conf +++ b/src/main/resources/application-standalone.conf @@ -77,6 +77,7 @@ notifier { notification-schema-name = ${NOTIFICATION_POSTGRES_SCHEMA_NAME} tables { key-notification-table-name = ${KEY_NOTIFICATION_POSTGRES_TABLE_NAME} + producer-key-notification-table-name = ${PRODUCER_KEY_NOTIFICATION_POSTGRES_TABLE_NAME} } connectionPool = "HikariCP" dataSourceClass = "org.postgresql.ds.PGSimpleDataSource" diff --git a/src/main/resources/application.conf b/src/main/resources/application.conf index a909937..7b9f14a 100644 --- a/src/main/resources/application.conf +++ b/src/main/resources/application.conf @@ -32,6 +32,7 @@ notifier { notification-schema-name = ${NOTIFICATION_POSTGRES_SCHEMA_NAME} tables { key-notification-table-name = ${KEY_NOTIFICATION_POSTGRES_TABLE_NAME} + producer-key-notification-table-name = ${PRODUCER_KEY_NOTIFICATION_POSTGRES_TABLE_NAME} } connectionPool = "HikariCP" dataSourceClass = "org.postgresql.ds.PGSimpleDataSource" diff --git a/src/main/resources/interface-specification.yml b/src/main/resources/interface-specification.yml index 87b33d0..78a1e0b 100644 --- a/src/main/resources/interface-specification.yml +++ b/src/main/resources/interface-specification.yml @@ -228,7 +228,54 @@ paths: content: application/problem+json: schema: - $ref: '#/components/schemas/Problem' + $ref: '#/components/schemas/Problem' + '/events/producerKeys': + parameters: + - $ref: '#/components/parameters/CorrelationIdHeader' + - $ref: '#/components/parameters/IpAddress' + get: + tags: + - events + summary: Get list of the producer keys events + description: Retrieves the list of the keys events + operationId: getProducerKeysEvents + parameters: + - name: lastEventId + in: query + description: returns events starting from this last received id + required: true + schema: + type: integer + format: int64 + default: 0 + - name: limit + in: query + description: the number of events returned by this response + schema: + type: integer + format: int32 + minimum: 1 + maximum: 500 + default: 100 + responses: + '200': + description: Events + content: + application/json: + schema: + $ref: '#/components/schemas/Events' + '400': + description: Bad request + content: + application/problem+json: + schema: + $ref: '#/components/schemas/Problem' + '401': + description: Unauthorized + content: + application/problem+json: + schema: + $ref: '#/components/schemas/Problem' components: parameters: CorrelationIdHeader: diff --git a/src/main/scala/it/pagopa/interop/notifier/api/impl/EventsServiceApiImpl.scala b/src/main/scala/it/pagopa/interop/notifier/api/impl/EventsServiceApiImpl.scala index 725ef7a..f911826 100644 --- a/src/main/scala/it/pagopa/interop/notifier/api/impl/EventsServiceApiImpl.scala +++ b/src/main/scala/it/pagopa/interop/notifier/api/impl/EventsServiceApiImpl.scala @@ -10,15 +10,21 @@ import it.pagopa.interop.commons.utils.AkkaUtils.getOrganizationIdFutureUUID import it.pagopa.interop.commons.utils.errors.ServiceCode import it.pagopa.interop.notifier.api.EventsApiService import it.pagopa.interop.notifier.api.impl.ResponseHandlers.{ + getAllAgreementsEventsFromIdResponse, getAllEservicesFromIdResponse, getEventsFromIdResponse, getKeyEventsResponse, - getAllAgreementsEventsFromIdResponse + getProducerKeyEventsResponse +} +import it.pagopa.interop.notifier.database.{ + AuthorizationEventsDao, + KeyEventRecord, + ProducerKeyEventRecord, + ProducerKeyEventsDao } -import it.pagopa.interop.notifier.database.{AuthorizationEventsDao, KeyEventRecord} -import it.pagopa.interop.notifier.model._ import it.pagopa.interop.notifier.model.Adapters._ -import it.pagopa.interop.notifier.service.converters.{allOrganizations, agreementsPartition} +import it.pagopa.interop.notifier.model._ +import it.pagopa.interop.notifier.service.converters.{agreementsPartition, allOrganizations} import it.pagopa.interop.notifier.service.impl.DynamoNotificationService import scala.concurrent.{ExecutionContext, Future} @@ -99,10 +105,10 @@ final class EventsServiceApiImpl(dynamoNotificationService: DynamoNotificationSe toEntityMarshallerEvents: ToEntityMarshaller[Events], toEntityMarshallerProblem: ToEntityMarshaller[Problem] ): Route = { - val operationLabel = s"Retrieving $limit keys messages from id $lastEventId" + val operationLabel = s"Retrieving $limit producer keys messages from id $lastEventId" val result: Future[Events] = AuthorizationEventsDao .select(lastEventId, limit) - .map(convertToEvents) + .map(convertKeyRecordToEvents) onComplete(result) { getKeyEventsResponse[Events](operationLabel)(getKeysEvents200) @@ -110,7 +116,35 @@ final class EventsServiceApiImpl(dynamoNotificationService: DynamoNotificationSe } - private def convertToEvents(records: Seq[KeyEventRecord]): Events = { + private def convertKeyRecordToEvents(records: Seq[KeyEventRecord]): Events = { + val events: Seq[Event] = records.map(record => + Event( + eventId = record.eventId, + eventType = record.eventType.toString, + objectType = ObjectType.KEY, + objectId = Map("kid" -> record.kid) + ) + ) + + Events(lastEventId = records.lastOption.map(_.eventId), events = events) + } + + override def getProducerKeysEvents(lastEventId: Long, limit: Int)(implicit + contexts: Seq[(String, String)], + toEntityMarshallerEvents: ToEntityMarshaller[Events], + toEntityMarshallerProblem: ToEntityMarshaller[Problem] + ): Route = { + val operationLabel = s"Retrieving $limit keys messages from id $lastEventId" + val result: Future[Events] = ProducerKeyEventsDao + .select(lastEventId, limit) + .map(convertProducerKeyRecordToEvents) + + onComplete(result) { + getProducerKeyEventsResponse[Events](operationLabel)(getProducerKeysEvents200) + } + } + + private def convertProducerKeyRecordToEvents(records: Seq[ProducerKeyEventRecord]): Events = { val events: Seq[Event] = records.map(record => Event( eventId = record.eventId, diff --git a/src/main/scala/it/pagopa/interop/notifier/api/impl/ResponseHandler.scala b/src/main/scala/it/pagopa/interop/notifier/api/impl/ResponseHandler.scala index b09acfc..a3388bf 100644 --- a/src/main/scala/it/pagopa/interop/notifier/api/impl/ResponseHandler.scala +++ b/src/main/scala/it/pagopa/interop/notifier/api/impl/ResponseHandler.scala @@ -35,6 +35,13 @@ object ResponseHandlers extends AkkaResponses { case Failure(ex) => internalServerError(ex, logMessage) } + def getProducerKeyEventsResponse[T](logMessage: String)( + success: T => Route + )(result: Try[T])(implicit contexts: Seq[(String, String)], logger: LoggerTakingImplicit[ContextFieldsToLog]): Route = + result match { + case Success(s) => success(s) + case Failure(ex) => internalServerError(ex, logMessage) + } def getAllAgreementsEventsFromIdResponse[T](logMessage: String)( success: T => Route )(result: Try[T])(implicit contexts: Seq[(String, String)], logger: LoggerTakingImplicit[ContextFieldsToLog]): Route = diff --git a/src/main/scala/it/pagopa/interop/notifier/common/system/ApplicationConfiguration.scala b/src/main/scala/it/pagopa/interop/notifier/common/system/ApplicationConfiguration.scala index d84ec0e..3f3af95 100644 --- a/src/main/scala/it/pagopa/interop/notifier/common/system/ApplicationConfiguration.scala +++ b/src/main/scala/it/pagopa/interop/notifier/common/system/ApplicationConfiguration.scala @@ -48,4 +48,10 @@ object ApplicationConfiguration { s"$schema.$table" } + val postgresProducerKeysNotificationTable: String = { + val schema = config.getString("notifier.postgres.notification-schema-name") + val table = config.getString("notifier.postgres.tables.producer-key-notification-table-name") + s"$schema.$table" + } + } diff --git a/src/main/scala/it/pagopa/interop/notifier/database/ProducerKeyEventRecord.scala b/src/main/scala/it/pagopa/interop/notifier/database/ProducerKeyEventRecord.scala new file mode 100644 index 0000000..751ca61 --- /dev/null +++ b/src/main/scala/it/pagopa/interop/notifier/database/ProducerKeyEventRecord.scala @@ -0,0 +1,12 @@ +package it.pagopa.interop.notifier.database + +import it.pagopa.interop.notifier.service.converters.EventType +import it.pagopa.interop.notifier.service.converters.EventType.EventType +import slick.jdbc.GetResult + +final case class ProducerKeyEventRecord(eventId: Long, kid: String, eventType: EventType) + +object ProducerKeyEventRecord { + implicit val eventResult: GetResult[ProducerKeyEventRecord] = + GetResult(r => ProducerKeyEventRecord(r.<<, r.<<, EventType.withName(r.<<))) +} diff --git a/src/main/scala/it/pagopa/interop/notifier/database/ProducerKeyEventsDao.scala b/src/main/scala/it/pagopa/interop/notifier/database/ProducerKeyEventsDao.scala new file mode 100644 index 0000000..86d2e97 --- /dev/null +++ b/src/main/scala/it/pagopa/interop/notifier/database/ProducerKeyEventsDao.scala @@ -0,0 +1,28 @@ +package it.pagopa.interop.notifier.database + +import com.typesafe.scalalogging.Logger +import it.pagopa.interop.notifier.common.system.ApplicationConfiguration +import it.pagopa.interop.notifier.common.system.ApplicationConfiguration.postgresProducerKeysNotificationTable +import slick.jdbc.JdbcBackend.Database +import slick.jdbc.PostgresProfile.api._ +import slick.sql.SqlStreamingAction + +import scala.concurrent.Future + +object ProducerKeyEventsDao { + + private val logger: Logger = Logger(this.getClass) + + private final val postgresqlDB: Database = + Database.forConfig(path = "notifier.postgres", config = ApplicationConfiguration.config) + + def select(lastEventId: Long, limit: Int): Future[Vector[ProducerKeyEventRecord]] = { + logger.debug(s"Getting keys events from lastEventId ${lastEventId.toString} with limit ${limit.toString}") + + val statement: SqlStreamingAction[Vector[ProducerKeyEventRecord], ProducerKeyEventRecord, Effect] = + sql"SELECT event_id, kid, event_type FROM #$postgresProducerKeysNotificationTable WHERE event_id > $lastEventId ORDER BY event_id ASC LIMIT $limit" + .as[ProducerKeyEventRecord] + postgresqlDB.run(statement) + } + +}