How shall we surface acknowledged Sources in Akka Streams? #13
Option 1

Option 2

There is a problem with shutdown: if the Kafka client is owned by the source, at which moment can it be closed? If it is closed when the source completes, then the client is unable to commit the messages that are currently being processed.

Option 3
The “simple wrappers” proposal does not work because the complexity is essential; it cannot be hidden. In particular it is impossible to break up the complex shape and offer it as a Source plus Flow: that is exactly what does not work. The kafka2 code right now does something like this, but it cheats by pre-materializing and using RS interfaces, which is undesirable because the resulting pieces are not reusable.
Concerning termination: that would need to be communicated back to the Source when it hits the confirmation input. This is of course trivial in Option 1, but the equivalent thing can be done in the other options as well.
Sure, it is impossible to split the complex shape into a Flow and a Source. But most of the time you do not care about commit confirmation and …
Could you please point me to where kafka2 uses RS interfaces and which parts are not reusable?
Could you explain what kind of communication you are talking about? Communication at the graph level? At the akka-stream implementation level? Or some other way? I am asking because I tried to implement it that way and failed because of shape reuse, lifecycles, and the absence of a way to communicate.
Highlighting the pre-materialization bit, which we think is very undesirable, as Akka Streams are "reusable blueprints, which one materializes". The stream below allocates resources before it is actually run:

```scala
val consumerWithOffsetSink = kafka.consumeWithOffsetSink(consumerProperties) // <<< already "alive"!
Source.fromPublisher(consumerWithOffsetSink.publisher)
  .map(processMessage(_))
  .to(consumerWithOffsetSink.offsetCommitSink) // the val above is needed in order to inject it into the fluent API in 2 places
  .run() // we want resources to be allocated only at run()
```
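For contrast, a minimal sketch of the blueprint-preserving alternative: a GraphStage that only creates its consumer at materialization time, so the Source value itself can be reused and materialized many times. LazyKafkaSource is a made-up name and the polling logic is skipped:

```scala
import akka.stream.{Attributes, Outlet, SourceShape}
import akka.stream.stage.{GraphStage, GraphStageLogic, OutHandler}
import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer}

// Sketch only: the consumer is created in preStart, i.e. at materialization
// time, so this Source value stays a reusable blueprint until run().
class LazyKafkaSource(consumerProvider: () => KafkaConsumer[Array[Byte], Array[Byte]])
    extends GraphStage[SourceShape[ConsumerRecord[Array[Byte], Array[Byte]]]] {

  val out = Outlet[ConsumerRecord[Array[Byte], Array[Byte]]]("LazyKafkaSource.out")
  override val shape = SourceShape(out)

  override def createLogic(attrs: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      private var consumer: KafkaConsumer[Array[Byte], Array[Byte]] = _

      override def preStart(): Unit = consumer = consumerProvider() // allocated per materialization
      override def postStop(): Unit = if (consumer ne null) consumer.close()

      setHandler(out, new OutHandler {
        override def onPull(): Unit = ??? // poll and push, skipped
      })
    }
}
```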
@ktoso got it. I will try to explain the current situation in more detail to be clear. Right now there are 3 APIs in reactive-kafka's master: …

All these APIs provide Kafka reading/writing capabilities. I started … As far as I can see Option 1 references … Thanks for reading, I hope I explained the current state of reactive-kafka :)
@rkuhn There are also two (optional) additional aspects we may want to warn about: …

These cases are less important, but we may want to check solutions against them.
I think manually setting the topic, partition assignment, and offset should be done before materialization, i.e. the user can do it in the …
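A hedged sketch of what that could look like, assuming the stage takes a consumerProvider function (as in the code in the next comment); the topic, partition, and offset values are made up, while assign and seek are the plain Kafka client API:

```scala
import java.util.Arrays
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

// The provider runs at materialization time, so this manual
// topic/partition/offset setup happens once per materialized stream.
val consumerProvider = () => {
  val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](consumerProperties)
  val partition = new TopicPartition("some-topic", 0) // manual topic/partition choice
  consumer.assign(Arrays.asList(partition))           // manual partition assignment
  consumer.seek(partition, 42L)                       // manual offset
  consumer
}
```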
@patriknw I did not get how … Also, we can expose a subset of … Something like this:

```scala
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = {
  val consumer = consumerProvider()
  // skipped
  val logic = new TimerGraphStageLogic(shape) {
    // existing logic

    // Async callback so the Control below can query the committed offset
    // from outside the stream without touching the consumer concurrently.
    val committedOffset = getAsyncCallback[(TopicPartition, Promise[OffsetAndMetadata])] {
      case (partition, promise) =>
        promise.success(consumer.committed(partition))
        ()
    }
  }
  val control = new Control {
    def committedOffset(partition: TopicPartition) = {
      val p = Promise[OffsetAndMetadata]()
      logic.committedOffset.invoke((partition, p))
      p.future
    }
    override def stop(): Unit = ??? // skipped
  }
  (logic, control)
}
```
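For illustration, a hedged sketch of how the materialized Control could then be used from the outside; KafkaSourceStage is a made-up name for a GraphStageWithMaterializedValue built around the logic above:

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import org.apache.kafka.common.TopicPartition

implicit val system = ActorSystem("kafka-control-demo")
implicit val materializer = ActorMaterializer()
import system.dispatcher

// run() yields the source's materialized value (Keep.left by default),
// i.e. the Control, which can be queried after the stream is running.
val control: Control =
  Source.fromGraph(new KafkaSourceStage(consumerProvider))
    .to(Sink.ignore)
    .run()

// Query the committed cursor through the stage's async callback.
control.committedOffset(new TopicPartition("some-topic", 0))
  .foreach(offset => println(s"committed: $offset"))
```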
These are some initial thoughts, exemplified with a hypothetical streaming Kafka connector API. Use-cases that need to be supported: …
Option 1: Streams all the way down
Here we use in-stream data elements, exclusively.
The primary output is the Message, which has an associated Confirmation member that will eventually be fed back. After confirmation has successfully moved the cursor, a ConfirmationAck is produced.
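A hedged sketch of what this implies at the type level; Message, Confirmation, ConfirmationAck, and ConfirmedSourceShape are made-up names, and depending on the Akka version a custom Shape needs further boilerplate such as copyFromPorts:

```scala
import akka.stream.{Inlet, Outlet, Shape}
import scala.collection.immutable

// Hypothetical element types matching the description above.
final case class Confirmation(offset: Long)
final case class Message(payload: Array[Byte], confirmation: Confirmation)
final case class ConfirmationAck(offset: Long)

// The shape Option 1 implies: one inlet for confirmations coming back,
// two outlets for messages and for acks of successfully moved cursors.
final case class ConfirmedSourceShape(
    messages: Outlet[Message],
    confirmations: Inlet[Confirmation],
    acks: Outlet[ConfirmationAck]) extends Shape {

  override val inlets: immutable.Seq[Inlet[_]] = List(confirmations)
  override val outlets: immutable.Seq[Outlet[_]] = List(messages, acks)

  override def deepCopy(): ConfirmedSourceShape =
    ConfirmedSourceShape(messages.carbonCopy(), confirmations.carbonCopy(), acks.carbonCopy())
}
```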
Pros
Cons
Option 2: Using materialized values
Here we provide multiple independent pieces in order to allow usage in normal Flow syntax.
Here the idea is that users can use the linear Source & Flow DSL, as in the sketch below:
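All names in this sketch (KafkaSource, KafkaReader, processMessage, attachTo) are hypothetical, not a proposed API; the point is only how materialized values could wire the confirmation path between the independent pieces:

```scala
import akka.stream.scaladsl.Keep

// KafkaSource materializes a control handle; KafkaReader (the confirmation
// sink) is attached to it so it can forward confirmations back into the
// source stage, which owns all Kafka interaction.
KafkaSource(consumerProperties)              // Source[Message, KafkaControl]
  .map(processMessage)                       // user code: Message => Confirmation
  .toMat(KafkaReader())(Keep.both)
  .mapMaterializedValue { case (control, reader) => reader.attachTo(control) }
  .run()
```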
This only works well if the whole setup is created in a place that has this kind of overview. The KafkaReader will forward the confirmations to the KafkaSource using the GraphStage’s asyncInput facility, all Kafka interaction is owned by the KafkaSource.
Pros
Cons
Option 3: Using side-effects
While the correct message and confirmation types need to travel through streams to the right places in all solutions, this one encapsulates the concern of confirmation within the data elements themselves.
This allows usage as a bog-standard Source.
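For illustration, a hedged sketch of the element type this suggests and of how it reads as a plain Source; ConfirmableMessage, KafkaSource, and process are made-up names:

```scala
import akka.Done
import akka.stream.scaladsl.Sink
import scala.concurrent.Future

// Hypothetical element type: the confirmation concern lives inside the
// element itself, as a side-effecting call on the message.
trait ConfirmableMessage {
  def offset: Long
  def payload: Array[Byte]
  def confirm(): Future[Done] // moves the cursor behind the scenes
}

// Usage as a bog-standard Source: no confirmation ports in the graph.
KafkaSource(consumerProperties)
  .map { msg => process(msg.payload); msg.confirm(); msg }
  .runWith(Sink.ignore)
```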
Pros
* easy to set up
Cons
Backpressure of confirmations is a potential problem which probably can be handled by conflating them (since a confirmation moves a cursor and therefore confirmations are cumulative).
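A minimal sketch of that conflation, assuming the hypothetical Confirmation(offset) type from the Option 1 sketch above: because confirmations are cumulative cursor moves, a backlog collapses to the single highest offset without losing information.

```scala
import akka.NotUsed
import akka.stream.scaladsl.Flow

// Collapse any backlog of confirmations to the furthest cursor position.
val conflateConfirmations: Flow[Confirmation, Confirmation, NotUsed] =
  Flow[Confirmation].conflate((acc, next) => if (next.offset > acc.offset) next else acc)
```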
Other API Options / Considerations
Factory Methods
Any of the above can be combined with factory methods that turn a `Flow[Message, Confirmation]` into a `Source[ConfirmationAck]`, but this has several downsides: …

Adapters
It would be possible to turn a stream of confirmable messages (Option 3) into a “Flow” that forwards immutable messages and wants a stream of confirmations back. This adapter would make it possible to get around the serialization limitations.
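A hedged sketch of such an adapter, reusing the hypothetical ConfirmableMessage and Confirmation types from the sketches above; ConfirmationAdapter and ImmutableMessage are made up:

```scala
import java.util.concurrent.ConcurrentHashMap
import akka.{Done, NotUsed}
import akka.stream.scaladsl.{Flow, Sink}
import scala.concurrent.Future

// Only an immutable, serializable message crosses the boundary; the
// confirmable original is retained and confirmed when its offset returns.
final case class ImmutableMessage(offset: Long, payload: Array[Byte])

class ConfirmationAdapter {
  private val pending = new ConcurrentHashMap[Long, ConfirmableMessage]

  // Outbound side: strip the side-effecting handle before forwarding.
  val outbound: Flow[ConfirmableMessage, ImmutableMessage, NotUsed] =
    Flow[ConfirmableMessage].map { m =>
      pending.put(m.offset, m)
      ImmutableMessage(m.offset, m.payload)
    }

  // Inbound side: apply returning confirmations to the retained originals.
  val inbound: Sink[Confirmation, Future[Done]] =
    Sink.foreach[Confirmation](c => Option(pending.remove(c.offset)).foreach(_.confirm()))
}
```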
Fan-Out of confirmable messages
Broadcasting confirmable messages to multiple destinations is troublesome: which of them shall be routed back to the source in order to move the cursor? There is no general solution to this problem; we may just not do anything about it. If we do something, the drawback will be that the messages need to become mutable and thread-safe, because the required confirmation count should match the fan-out factor.
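A minimal sketch of what that mutable, thread-safe variant could look like; CountedConfirmable and moveCursor are made-up names:

```scala
import java.util.concurrent.atomic.AtomicInteger

// The cursor only moves once all fanOut destinations have confirmed, so a
// broadcast of the same element to N consumers needs N confirmations.
final class CountedConfirmable[T](val payload: T, fanOut: Int, moveCursor: () => Unit) {
  private val remaining = new AtomicInteger(fanOut)
  def confirm(): Unit =
    if (remaining.decrementAndGet() == 0) moveCursor() // last confirmation wins
}
```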