diff --git a/kafka-manager/src/main/scala/it/pagopa/interop/commons/queue/kafka/impl/KafkaPublisherImpl.scala b/kafka-manager/src/main/scala/it/pagopa/interop/commons/queue/kafka/impl/KafkaPublisherImpl.scala index 0fe79dee..f0dd1ecc 100644 --- a/kafka-manager/src/main/scala/it/pagopa/interop/commons/queue/kafka/impl/KafkaPublisherImpl.scala +++ b/kafka-manager/src/main/scala/it/pagopa/interop/commons/queue/kafka/impl/KafkaPublisherImpl.scala @@ -28,19 +28,20 @@ class KafkaPublisherImpl( .withBootstrapServers(bootstrapServers) .withProperties(properties) - private val sendProducer = SendProducer(producerSettings)(system.toClassic) + private val sendProducer: SendProducer[String, String] = SendProducer(producerSettings)(system.toClassic) def send[T](message: T)(implicit messageSerializer: JsonWriter[T]): Future[String] = { val messageString = message.toJson.compactPrint val producerRecord = new ProducerRecord[String, String](topic, messageString) - val result = sendProducer.send(producerRecord) - result.onComplete { - case Failure(exception) => logger.error(s"Error on sending $messageString on kafka - ${exception.getMessage}") + sendProducer.send(producerRecord).transformWith { + case Failure(exception) => + logger.error(s"Error on sending $messageString on kafka - ${exception.getMessage}") + Future.failed(exception) case Success(recordMetadata) => logger.debug("Published message [{}] to topic/partition {}/{}", messageString, topic, recordMetadata.partition) logger.info("Published message to topic/partition {}/{}", topic, recordMetadata.partition) + Future.successful("OK") } - Future.successful("OK") } } diff --git a/src/main/scala/it/pagopa/interop/partymanagement/model/persistence/PartyPersistentContractsProjection.scala b/src/main/scala/it/pagopa/interop/partymanagement/model/persistence/PartyPersistentContractsProjection.scala index dc22b9d6..bc0d0da3 100644 --- a/src/main/scala/it/pagopa/interop/partymanagement/model/persistence/PartyPersistentContractsProjection.scala +++ b/src/main/scala/it/pagopa/interop/partymanagement/model/persistence/PartyPersistentContractsProjection.scala @@ -37,6 +37,8 @@ import spray.json.RootJsonFormat import java.util.UUID import scala.concurrent.{ExecutionContext, Future} import scala.util.{Failure, Success} +import akka.projection.HandlerRecoveryStrategy +import scala.concurrent.duration._ class PartyPersistentContractsProjection( system: ActorSystem[_], @@ -55,16 +57,18 @@ class PartyPersistentContractsProjection( def projection(tag: String): ExactlyOnceProjection[Offset, EventEnvelope[Event]] = { implicit val as: ActorSystem[_] = system - SlickProjection.exactlyOnce( - projectionId = ProjectionId("party-contracts-projections", tag), - sourceProvider = sourceProvider(tag), - handler = () => - new ProjectionContractsHandler(system, sharding, entity, relationshipService, institutionService)( - tag, - datalakeContractsPublisher - ), - databaseConfig = dbConfig - ) + SlickProjection + .exactlyOnce( + projectionId = ProjectionId("party-contracts-projections", tag), + sourceProvider = sourceProvider(tag), + handler = () => + new ProjectionContractsHandler(system, sharding, entity, relationshipService, institutionService)( + tag, + datalakeContractsPublisher + ), + databaseConfig = dbConfig + ) + .withRecoveryStrategy(HandlerRecoveryStrategy.retryAndFail(6, 10.second)) } }