Skip to content

Commit

Permalink
Added kafka error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
TonioGela committed Oct 27, 2022
1 parent 604c9a6 commit ebfb8be
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,20 @@ class KafkaPublisherImpl(
.withBootstrapServers(bootstrapServers)
.withProperties(properties)

private val sendProducer = SendProducer(producerSettings)(system.toClassic)
private val sendProducer: SendProducer[String, String] = SendProducer(producerSettings)(system.toClassic)

def send[T](message: T)(implicit messageSerializer: JsonWriter[T]): Future[String] = {
val messageString = message.toJson.compactPrint
val producerRecord = new ProducerRecord[String, String](topic, messageString)
val result = sendProducer.send(producerRecord)

result.onComplete {
case Failure(exception) => logger.error(s"Error on sending $messageString on kafka - ${exception.getMessage}")
sendProducer.send(producerRecord).transformWith {
case Failure(exception) =>
logger.error(s"Error on sending $messageString on kafka - ${exception.getMessage}")
Future.failed(exception)
case Success(recordMetadata) =>
logger.debug("Published message [{}] to topic/partition {}/{}", messageString, topic, recordMetadata.partition)
logger.info("Published message to topic/partition {}/{}", topic, recordMetadata.partition)
Future.successful("OK")
}
Future.successful("OK")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ import spray.json.RootJsonFormat
import java.util.UUID
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
import akka.projection.HandlerRecoveryStrategy
import scala.concurrent.duration._

class PartyPersistentContractsProjection(
system: ActorSystem[_],
Expand All @@ -55,16 +57,18 @@ class PartyPersistentContractsProjection(

def projection(tag: String): ExactlyOnceProjection[Offset, EventEnvelope[Event]] = {
implicit val as: ActorSystem[_] = system
SlickProjection.exactlyOnce(
projectionId = ProjectionId("party-contracts-projections", tag),
sourceProvider = sourceProvider(tag),
handler = () =>
new ProjectionContractsHandler(system, sharding, entity, relationshipService, institutionService)(
tag,
datalakeContractsPublisher
),
databaseConfig = dbConfig
)
SlickProjection
.exactlyOnce(
projectionId = ProjectionId("party-contracts-projections", tag),
sourceProvider = sourceProvider(tag),
handler = () =>
new ProjectionContractsHandler(system, sharding, entity, relationshipService, institutionService)(
tag,
datalakeContractsPublisher
),
databaseConfig = dbConfig
)
.withRecoveryStrategy(HandlerRecoveryStrategy.retryAndFail(6, 10.second))
}
}

Expand Down

0 comments on commit ebfb8be

Please sign in to comment.