Skip to content

Commit

Permalink
PIN-5100: add new route GET -> /events/producerKeys
Browse files Browse the repository at this point in the history
  • Loading branch information
beetlecrunch committed Sep 6, 2024
1 parent 1e1fad7 commit 24514c5
Show file tree
Hide file tree
Showing 8 changed files with 144 additions and 8 deletions.
1 change: 1 addition & 0 deletions src/main/resources/application-standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ notifier {
notification-schema-name = ${NOTIFICATION_POSTGRES_SCHEMA_NAME}
tables {
key-notification-table-name = ${KEY_NOTIFICATION_POSTGRES_TABLE_NAME}
producer-key-notification-table-name = ${PRODUCER_KEY_NOTIFICATION_POSTGRES_TABLE_NAME}
}
connectionPool = "HikariCP"
dataSourceClass = "org.postgresql.ds.PGSimpleDataSource"
Expand Down
1 change: 1 addition & 0 deletions src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ notifier {
notification-schema-name = ${NOTIFICATION_POSTGRES_SCHEMA_NAME}
tables {
key-notification-table-name = ${KEY_NOTIFICATION_POSTGRES_TABLE_NAME}
producer-key-notification-table-name = ${PRODUCER_KEY_NOTIFICATION_POSTGRES_TABLE_NAME}
}
connectionPool = "HikariCP"
dataSourceClass = "org.postgresql.ds.PGSimpleDataSource"
Expand Down
49 changes: 48 additions & 1 deletion src/main/resources/interface-specification.yml
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,54 @@ paths:
content:
application/problem+json:
schema:
$ref: '#/components/schemas/Problem'
$ref: '#/components/schemas/Problem'
'/events/producerKeys':
parameters:
- $ref: '#/components/parameters/CorrelationIdHeader'
- $ref: '#/components/parameters/IpAddress'
get:
tags:
- events
summary: Get list of the producer keys events
description: Retrieves the list of the keys events
operationId: getProducerKeysEvents
parameters:
- name: lastEventId
in: query
description: returns events starting from this last received id
required: true
schema:
type: integer
format: int64
default: 0
- name: limit
in: query
description: the number of events returned by this response
schema:
type: integer
format: int32
minimum: 1
maximum: 500
default: 100
responses:
'200':
description: Events
content:
application/json:
schema:
$ref: '#/components/schemas/Events'
'400':
description: Bad request
content:
application/problem+json:
schema:
$ref: '#/components/schemas/Problem'
'401':
description: Unauthorized
content:
application/problem+json:
schema:
$ref: '#/components/schemas/Problem'
components:
parameters:
CorrelationIdHeader:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,21 @@ import it.pagopa.interop.commons.utils.AkkaUtils.getOrganizationIdFutureUUID
import it.pagopa.interop.commons.utils.errors.ServiceCode
import it.pagopa.interop.notifier.api.EventsApiService
import it.pagopa.interop.notifier.api.impl.ResponseHandlers.{
getAllAgreementsEventsFromIdResponse,
getAllEservicesFromIdResponse,
getEventsFromIdResponse,
getKeyEventsResponse,
getAllAgreementsEventsFromIdResponse
getProducerKeyEventsResponse
}
import it.pagopa.interop.notifier.database.{
AuthorizationEventsDao,
KeyEventRecord,
ProducerKeyEventRecord,
ProducerKeyEventsDao
}
import it.pagopa.interop.notifier.database.{AuthorizationEventsDao, KeyEventRecord}
import it.pagopa.interop.notifier.model._
import it.pagopa.interop.notifier.model.Adapters._
import it.pagopa.interop.notifier.service.converters.{allOrganizations, agreementsPartition}
import it.pagopa.interop.notifier.model._
import it.pagopa.interop.notifier.service.converters.{agreementsPartition, allOrganizations}
import it.pagopa.interop.notifier.service.impl.DynamoNotificationService

import scala.concurrent.{ExecutionContext, Future}
Expand Down Expand Up @@ -99,18 +105,46 @@ final class EventsServiceApiImpl(dynamoNotificationService: DynamoNotificationSe
toEntityMarshallerEvents: ToEntityMarshaller[Events],
toEntityMarshallerProblem: ToEntityMarshaller[Problem]
): Route = {
val operationLabel = s"Retrieving $limit keys messages from id $lastEventId"
val operationLabel = s"Retrieving $limit producer keys messages from id $lastEventId"
val result: Future[Events] = AuthorizationEventsDao
.select(lastEventId, limit)
.map(convertToEvents)
.map(convertKeyRecordToEvents)

onComplete(result) {
getKeyEventsResponse[Events](operationLabel)(getKeysEvents200)
}

}

private def convertToEvents(records: Seq[KeyEventRecord]): Events = {
private def convertKeyRecordToEvents(records: Seq[KeyEventRecord]): Events = {
val events: Seq[Event] = records.map(record =>
Event(
eventId = record.eventId,
eventType = record.eventType.toString,
objectType = ObjectType.KEY,
objectId = Map("kid" -> record.kid)
)
)

Events(lastEventId = records.lastOption.map(_.eventId), events = events)
}

override def getProducerKeysEvents(lastEventId: Long, limit: Int)(implicit
contexts: Seq[(String, String)],
toEntityMarshallerEvents: ToEntityMarshaller[Events],
toEntityMarshallerProblem: ToEntityMarshaller[Problem]
): Route = {
val operationLabel = s"Retrieving $limit keys messages from id $lastEventId"
val result: Future[Events] = ProducerKeyEventsDao
.select(lastEventId, limit)
.map(convertProducerKeyRecordToEvents)

onComplete(result) {
getProducerKeyEventsResponse[Events](operationLabel)(getProducerKeysEvents200)
}
}

private def convertProducerKeyRecordToEvents(records: Seq[ProducerKeyEventRecord]): Events = {
val events: Seq[Event] = records.map(record =>
Event(
eventId = record.eventId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,13 @@ object ResponseHandlers extends AkkaResponses {
case Failure(ex) => internalServerError(ex, logMessage)
}

def getProducerKeyEventsResponse[T](logMessage: String)(
success: T => Route
)(result: Try[T])(implicit contexts: Seq[(String, String)], logger: LoggerTakingImplicit[ContextFieldsToLog]): Route =
result match {
case Success(s) => success(s)
case Failure(ex) => internalServerError(ex, logMessage)
}
def getAllAgreementsEventsFromIdResponse[T](logMessage: String)(
success: T => Route
)(result: Try[T])(implicit contexts: Seq[(String, String)], logger: LoggerTakingImplicit[ContextFieldsToLog]): Route =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,10 @@ object ApplicationConfiguration {
s"$schema.$table"
}

val postgresProducerKeysNotificationTable: String = {
val schema = config.getString("notifier.postgres.notification-schema-name")
val table = config.getString("notifier.postgres.tables.producer-key-notification-table-name")
s"$schema.$table"
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package it.pagopa.interop.notifier.database

import it.pagopa.interop.notifier.service.converters.EventType
import it.pagopa.interop.notifier.service.converters.EventType.EventType
import slick.jdbc.GetResult

final case class ProducerKeyEventRecord(eventId: Long, kid: String, eventType: EventType)

object ProducerKeyEventRecord {
implicit val eventResult: GetResult[ProducerKeyEventRecord] =
GetResult(r => ProducerKeyEventRecord(r.<<, r.<<, EventType.withName(r.<<)))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package it.pagopa.interop.notifier.database

import com.typesafe.scalalogging.Logger
import it.pagopa.interop.notifier.common.system.ApplicationConfiguration
import it.pagopa.interop.notifier.common.system.ApplicationConfiguration.postgresProducerKeysNotificationTable
import slick.jdbc.JdbcBackend.Database
import slick.jdbc.PostgresProfile.api._
import slick.sql.SqlStreamingAction

import scala.concurrent.Future

object ProducerKeyEventsDao {

private val logger: Logger = Logger(this.getClass)

private final val postgresqlDB: Database =
Database.forConfig(path = "notifier.postgres", config = ApplicationConfiguration.config)

def select(lastEventId: Long, limit: Int): Future[Vector[ProducerKeyEventRecord]] = {
logger.debug(s"Getting keys events from lastEventId ${lastEventId.toString} with limit ${limit.toString}")

val statement: SqlStreamingAction[Vector[ProducerKeyEventRecord], ProducerKeyEventRecord, Effect] =
sql"SELECT event_id, kid, event_type FROM #$postgresProducerKeysNotificationTable WHERE event_id > $lastEventId ORDER BY event_id ASC LIMIT $limit"
.as[ProducerKeyEventRecord]
postgresqlDB.run(statement)
}

}

0 comments on commit 24514c5

Please sign in to comment.