diff --git a/docs/src/main/paradox/transactions.md b/docs/src/main/paradox/transactions.md
index acb016694..8d3b1e602 100644
--- a/docs/src/main/paradox/transactions.md
+++ b/docs/src/main/paradox/transactions.md
@@ -3,7 +3,7 @@ project.description: Alpakka has support for Kafka Transactions which provide gu
 ---
 
 # Transactions
 
-Kafka Transactions provide guarantees that messages processed in a consume-transform-produce workflow (consumed from a source topic, transformed, and produced to a destination topic) are processed exactly once or not at all. This is achieved through coordination between the Kafka consumer group coordinator, transaction coordinator, and the consumer and producer clients used in the user application. The Kafka producer marks messages that are consumed from the source topic as "committed" only once the transformed messages are successfully produced to the sink.
+Kafka Transactions provide guarantees that messages processed in a consume-transform-produce workflow (consumed from a source topic, transformed, and produced to a destination topic) are processed exactly once or not at all. This is achieved through coordination between the Kafka consumer group coordinator, transaction coordinator, and the consumer and producer clients used in the user application. The Kafka producer marks messages that are consumed from the source topic as "committed" only once the transformed messages are successfully produced to the sink.
 
 For full details on how transactions are achieved in Kafka you may wish to review the Kafka Improvement Proposal [KIP-98: Exactly Once Delivery and Transactional Messaging](https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging) and its associated [design document](https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF0wSw9ra8/edit#heading=h.xq0ee1vnpz4o).
@@ -55,18 +55,17 @@ For more details see [KIP-447](https://cwiki.apache.org/confluence/display/KAFKA
 
 The @apidoc[Transactional.sink](Transactional$) is similar to the @apidoc[Producer.committableSink](Producer$) in that messages will be automatically committed as part of a transaction. The @apidoc[Transactional.flow](Transactional$) or @apidoc[Transactional.sink](Transactional$) are required when connecting a consumer to a producer to achieve a transactional workflow.
 
 They override producer properties `enable.idempotence` to `true` and `max.in.flight.requests.per.connection` to `1` as required by the Kafka producer to enable transactions.
-
-A `transactional.id` must be defined and unique for each instance of the application.
+The `transaction.timeout.ms` is set to 10s as recommended in KIP-447.
 
 ## Consume-Transform-Produce Workflow
 
-Kafka transactions are handled transparently to the user. The @apidoc[Transactional.source](Transactional$) will enforce that a consumer group id is specified and the @apidoc[Transactional.flow](Transactional$) or @apidoc[Transactional.sink](Transactional$) will enforce that a `transactional.id` is specified. All other Kafka consumer and producer properties required to enable transactions are overridden.
+Kafka transactions are handled transparently to the user. The @apidoc[Transactional.source](Transactional$) will enforce that a consumer group id is specified. All other Kafka consumer and producer properties required to enable transactions are overridden.
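As a rough sketch of the consume-transform-produce shape described above, using the Scala DSL. The topic names, the `transactional-id` string, the settings parameters, and whether `Transactional.sink` still takes a transactional id argument are illustrative assumptions, not values these docs prescribe.

```scala
import akka.Done
import akka.kafka.{ConsumerSettings, ProducerMessage, ProducerSettings, Subscriptions}
import akka.kafka.scaladsl.Consumer.DrainingControl
import akka.kafka.scaladsl.Transactional
import akka.stream.Materializer
import akka.stream.scaladsl.Keep
import org.apache.kafka.clients.producer.ProducerRecord

// consumerSettings must carry a consumer group id; the transactional stages apply the
// producer overrides (idempotence, max in-flight, transaction timeout) described above.
def runTransactionalCopy(consumerSettings: ConsumerSettings[String, String],
                         producerSettings: ProducerSettings[String, String])(
    implicit mat: Materializer): DrainingControl[Done] =
  Transactional
    .source(consumerSettings, Subscriptions.topics("source-topic")) // illustrative topic
    .map { msg =>
      // Attach the partition offset so it is committed in the same transaction
      // as the produced record.
      ProducerMessage.single(
        new ProducerRecord("sink-topic", msg.record.key, msg.record.value), // illustrative topic
        msg.partitionOffset
      )
    }
    .toMat(Transactional.sink(producerSettings, "transactional-id"))(Keep.both) // illustrative id
    .mapMaterializedValue(DrainingControl.apply)
    .run()
```

Materializing the graph starts consuming from the source topic; the produced records and the consumed offsets are then committed together, one transaction per commit interval.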
 
 Transactions are committed on an interval which can be controlled with the producer config `akka.kafka.producer.eos-commit-interval`, similar to how exactly once works with Kafka Streams. The default value is `100ms`. The larger the commit interval, the more records will need to be reprocessed in the event of a failure that aborts the transaction.
 
-When the stream is materialized the producer will initialize the transaction for the provided `transactional.id` and a transaction will begin. Every commit interval (`eos-commit-interval`) we check if there are any offsets available to commit. If offsets exist then we suspend backpressured demand while we drain all outstanding messages that have not yet been successfully acknowledged (if any) and then commit the transaction. After the commit succeeds a new transaction is begun and we re-initialize demand for upstream messages.
+When the stream is materialized and the producer sees the first message, it will initialize a transaction. Every commit interval (`eos-commit-interval`) we check if there are any offsets available to commit. If offsets exist then we suspend backpressured demand while we drain all outstanding messages that have not yet been successfully acknowledged (if any) and then commit the transaction. After the commit succeeds a new transaction is begun and we re-initialize demand for upstream messages.
 
-Messages are also drained from the stream when the consumer gets a rebalance of partitions. In that case, the consumer will wait in the `onPartitionsRevoked` callback until all of the messages have been drained from the stream and the transaction is committed before allowing the rebalance to continue. The amount of total time the consumer will wait for draining is controlled by the `akka.kafka.consumer.commit-timeout`, and the interval between checks is controlled by the `akka.kafka.consuner.eos-draining-check-interval` configuration settings.
+Messages are also drained from the stream when the consumer gets a rebalance of partitions. In that case, the consumer will wait in the `onPartitionsRevoked` callback until all of the messages have been drained from the stream and the transaction is committed before allowing the rebalance to continue. The total amount of time the consumer will wait for draining is controlled by the `akka.kafka.consumer.commit-timeout`, and the interval between checks is controlled by the `akka.kafka.consumer.eos-draining-check-interval` configuration settings.
 
 To gracefully shut down the stream and commit the current transaction you must call `shutdown()` on the @apidoc[(javadsl|scaladsl).Consumer.Control] materialized value to await all produced message acknowledgements and commit the final transaction.
@@ -107,15 +106,13 @@ There are several scenarios that this library's implementation of Kafka transact
 
 All of the scenarios covered in the @ref[At-Least-Once Delivery documentation](atleastonce.md) (Multiple Effects per Commit, Non-Sequential Processing, and Conditional Message Processing) are applicable to transactional scenarios as well.
 
-Only one application instance per `transactional.id` is allowed. If two application instances with the same `transactional.id` are run at the same time then the instance that registers with Kafka's transaction coordinator second will throw a @javadoc[ProducerFencedException](org.apache.kafka.common.errors.ProducerFencedException) so it doesn't interfere with transactions in process by the first instance.
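The following sketch illustrates the `eos-commit-interval` tuning and the graceful shutdown described in the workflow section above. The `withEosCommitInterval` settings call, the 50 ms interval, and the one-minute timeout are assumptions for illustration; `DrainingControl` is the materialized value from the earlier sketch.

```scala
import akka.Done
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Consumer.DrainingControl
import scala.concurrent.{Await, ExecutionContext}
import scala.concurrent.duration._

// A smaller commit interval means fewer records to reprocess after an aborted
// transaction, at the cost of more frequent commits (illustrative value).
def tuneCommitInterval(settings: ProducerSettings[String, String]): ProducerSettings[String, String] =
  settings.withEosCommitInterval(50.millis)

// Stop consuming, wait for outstanding produced messages to be acknowledged,
// commit the final transaction, and complete the stream.
def stopGracefully(control: DrainingControl[Done])(implicit ec: ExecutionContext): Done =
  Await.result(control.drainAndShutdown(), 1.minute) // illustrative timeout
```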
 
 To distribute multiple transactional workflows for the same subscription the user must manually subdivide the subscription across multiple instances of the application. This may be handled internally in future versions.
-
 Any state in the transformation logic is not part of a transaction. It's left to the user to rebuild state when applying stateful operations with transactions. It's possible to encode state into messages produced to topics during a transaction. For example, you could produce messages to a topic that represents an event log as part of a transaction. This event log can be replayed to reconstitute the correct state before the stateful stream resumes consuming again at startup.
 
-Any side effects that occur in the transformation logic is not part of a transaction (i.e. writes to an database).
+Any side effects that occur in the transformation logic are not part of a transaction (e.g. writes to a database).
 
 The exactly-once semantics are guaranteed only when your flow consumes from and produces to the same Kafka cluster. Producing to partitions from a 3rd-party source or consuming partitions from one Kafka cluster and producing to another Kafka cluster are not supported.
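One possible way to encode state into an event-log topic within the same transaction, as mentioned in the caveat above, is to emit several records per incoming message with `ProducerMessage.multi`. This is a sketch rather than a pattern these docs prescribe; the topic names and the `transform`/`encodeState` helpers are invented placeholders.

```scala
import akka.kafka.{ConsumerMessage, ProducerMessage}
import org.apache.kafka.clients.producer.ProducerRecord
import scala.collection.immutable

// Both records carry the same partition offset pass-through, so they are produced
// and the source offset is committed atomically within one transaction.
def transformWithEventLog(msg: ConsumerMessage.TransactionalMessage[String, String])
    : ProducerMessage.Envelope[String, String, ConsumerMessage.PartitionOffset] =
  ProducerMessage.multi(
    immutable.Seq(
      new ProducerRecord("sink-topic", msg.record.key, transform(msg.record.value)), // illustrative
      new ProducerRecord("state-event-log", msg.record.key, encodeState(msg.record.value)) // illustrative
    ),
    msg.partitionOffset
  )

// Placeholders for application-specific logic.
def transform(value: String): String = value.toUpperCase
def encodeState(value: String): String = s"""{"lastValue":"$value"}"""
```

Replaying the `state-event-log` topic at startup then reconstitutes the state before the stream resumes consuming.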