
How shall we surface acknowledged Sources in Akka Streams? #13

Open
rkuhn opened this issue Mar 11, 2016 · 10 comments

Comments

@rkuhn
Contributor

rkuhn commented Mar 11, 2016

These are some initial thoughts, exemplified on a hypothetical streaming Kafka connector API. Use-cases that need to be supported:

  • auto-commit (i.e. at-most-once delivery to the stream)
  • manual commit (i.e. at-least-once delivery to some stream section after which the commit is generated)
  • manual confirmed commit (i.e. at-least-once delivery to a section with at-most-once follow-up effects after the commit)

Option 1: Streams all the way down

Here we use in-stream data elements exclusively.

class KafkaSource extends
  GraphStage[FanOutShape2[Confirmation, Message, ConfirmationAck]]

The primary output is the Message, which has an associated Confirmation member that will eventually be fed back. After confirmation has successfully moved the cursor, a ConfirmationAck is produced.

Pros

  • streams only—once it is wired, it will work

Cons

  • complex wiring required, does not work without the GraphDSL (see the sketch after this list)
  • slightly convoluted and rather unintuitive type signature for KafkaSource
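
For illustration, the wiring this shape requires might look like the following sketch in the GraphDSL (confirmationFor stands for whatever user logic turns a processed Message into its Confirmation; it is assumed here):

import akka.stream.ClosedShape
import akka.stream.scaladsl.{Flow, GraphDSL, RunnableGraph, Sink}

val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
  import GraphDSL.Implicits._
  val kafka   = b.add(new KafkaSource)                     // FanOutShape2[Confirmation, Message, ConfirmationAck]
  val process = b.add(Flow[Message].map(confirmationFor))  // user processing, emits Confirmations
  kafka.out0 ~> process ~> kafka.in                        // feed the confirmations back into the source
  kafka.out1 ~> Sink.ignore                                // ConfirmationAcks, e.g. for monitoring
  ClosedShape
})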

Option 2: Using materialized values

Here we provide multiple independent pieces in order to allow usage in normal Flow syntax.

class KafkaSource extends
  GraphStageWithMaterializedValue[SourceShape[Message], KafkaReader]
class KafkaConfirmation extends
  GraphStageWithMaterializedValue[FlowShape[Message, Confirmation], KafkaConfirmer]
trait KafkaReader {
  def wireUp(c: KafkaConfirmer): Unit
}

Here the idea is that users can use the linear Source & Flow DSL as in:

KafkaSource().map(...).viaMat(KafkaConfirmation())(_ wireUp _)

This only works well if the whole setup is created in a place that has this kind of overview. The KafkaReader will forward the confirmations to the KafkaSource using the GraphStage’s async-callback facility; all Kafka interaction is owned by the KafkaSource.
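
A minimal sketch of that idea, assuming a setTarget method on KafkaConfirmer (that method, and the elided Kafka interaction, are illustrative only):

import akka.stream.{Attributes, Outlet, SourceShape}
import akka.stream.stage.{AsyncCallback, GraphStageLogic, GraphStageWithMaterializedValue, OutHandler}

class KafkaSource extends GraphStageWithMaterializedValue[SourceShape[Message], KafkaReader] {
  val out: Outlet[Message] = Outlet("KafkaSource.out")
  override val shape: SourceShape[Message] = SourceShape(out)

  override def createLogicAndMaterializedValue(attr: Attributes): (GraphStageLogic, KafkaReader) = {
    val logic = new GraphStageLogic(shape) {
      // the async callback is the only safe entry point into a running stage
      val confirmed: AsyncCallback[Confirmation] =
        getAsyncCallback[Confirmation](c => ()) // moving the Kafka cursor elided in this sketch
      setHandler(out, new OutHandler {
        override def onPull(): Unit = () // polling Kafka elided in this sketch
      })
    }
    val reader = new KafkaReader {
      // KafkaConfirmer.setTarget is an assumed method: the confirmer pushes
      // every Confirmation it receives into the source's callback
      override def wireUp(c: KafkaConfirmer): Unit = c.setTarget(logic.confirmed.invoke)
    }
    (logic, reader)
  }
}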

Pros

  • allows usage of linear DSL while retaining reusable blueprint character (current reactive-kafka solution goes in this direction but requires pre-materialized Source and Flow that are exposed using RS interfaces).

Cons

  • forgetting to set up the wiring will be hard to detect in a timely/direct fashion
  • putting yet more weight on a potentially overused feature

Option 3: Using side-effects

While the correct message and confirmation types need to travel through streams to the right places in all solutions, this one encapsulates the concern of confirmation within the data elements themselves.

class KafkaSource extends GraphStage[SourceShape[Message]]
trait Message {
  def confirm(): Future[Done]
}

This allows usage as a bog-standard Source.
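
For example, at-least-once processing could then be written as follows (a sketch; process is an assumed user function returning a Future):

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}

implicit val system = ActorSystem()
implicit val mat = ActorMaterializer()
import system.dispatcher

// at-least-once: confirm a message only after it has been processed
Source.fromGraph(new KafkaSource)
  .mapAsync(parallelism = 1)(m => process(m).flatMap(_ => m.confirm()))
  .runWith(Sink.ignore)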

Pros

  • easy to set up

Cons

  • data elements retain a reference to the Source and are not (de)serializable

Backpressure of confirmations is a potential problem, which can probably be handled by conflating them (since a confirmation moves a cursor, confirmations are cumulative).
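
A sketch of that conflation, assuming Confirmation exposes the offset it would commit:

import akka.NotUsed
import akka.stream.scaladsl.Flow

// commits are cumulative: committing offset n also covers every m <= n,
// so under backpressure we only need to keep the highest offset seen
val conflateConfirmations: Flow[Confirmation, Confirmation, NotUsed] =
  Flow[Confirmation].conflate((a, b) => if (a.offset >= b.offset) a else b)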

Other API Options / Considerations

Factory Methods

Any of the above can be combined with factory methods that turn a Flow[Message, Confirmation] into a Source[ConfirmationAck], but this has several downsides:

  • it limits what can be expressed since only ConfirmationAck emerges from the overall stream
  • it violates the blueprint principle in the sense that Kafka pieces are not freely composable if offered in this fashion
  • it goes against our recommendation that libraries shall provide stream topologies, not consume them—this violation is only acceptable for framework-like usage as in Akka HTTP server
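
For concreteness, such a factory method might look like the following sketch, built on top of the Option 1 shape (the method name is illustrative):

import akka.NotUsed
import akka.stream.SourceShape
import akka.stream.scaladsl.{Flow, GraphDSL, Source}

// consumes the processing Flow and exposes only the ConfirmationAcks
def confirmedSource(processing: Flow[Message, Confirmation, _]): Source[ConfirmationAck, NotUsed] =
  Source.fromGraph(GraphDSL.create() { implicit b =>
    import GraphDSL.Implicits._
    val kafka = b.add(new KafkaSource)   // FanOutShape2 from Option 1
    kafka.out0 ~> b.add(processing) ~> kafka.in
    SourceShape(kafka.out1)
  })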

Adapters

It would be possible to turn a stream of confirmable messages (Option 3) into a “Flow” that forwards immutable messages and wants a stream of confirmations back. This adapter would make it possible to get around the serialization limitations.
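
One possible sketch of such an adapter, using a shared registry keyed by offset (Payload, Message.offset, Message.payload and Confirmation.offset are all assumed here):

import java.util.concurrent.ConcurrentHashMap
import akka.NotUsed
import akka.stream.scaladsl.{Flow, Sink}

def adapter(): (Flow[Message, Payload, NotUsed], Sink[Confirmation, NotUsed]) = {
  val pending = new ConcurrentHashMap[Long, Message]()
  // forward only the serializable payload, remembering the confirmable original
  val forward = Flow[Message].map { m => pending.put(m.offset, m); m.payload }
  // route returned confirmations back to the original message's confirm()
  val confirmSide = Sink
    .foreach[Confirmation](c => Option(pending.remove(c.offset)).foreach(_.confirm()))
    .mapMaterializedValue(_ => NotUsed)
  (forward, confirmSide)
}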

Fan-Out of confirmable messages

Broadcasting confirmable messages to multiple destinations is troublesome: which of their confirmations shall be routed back to the source in order to move the cursor? There is no general solution to this problem; we may just not do anything about it. If we do something, the drawback will be that the messages need to become mutable and thread-safe, because the required confirmation count should match the fan-out factor.
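
If we did do something, it could be a counting wrapper along these lines (hypothetical; it assumes the fan-out factor is known when the message is wrapped):

import java.util.concurrent.atomic.AtomicInteger

// confirm the underlying message only once all fan-out branches have confirmed
final class CountedMessage(underlying: Message, fanOut: Int) {
  private val remaining = new AtomicInteger(fanOut)
  def confirm(): Unit =
    if (remaining.decrementAndGet() == 0) { underlying.confirm(); () }
}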

@13h3r

13h3r commented Mar 11, 2016

Option 1

complex wiring required, does not work without the GraphDSL
Let's provide simple wrappers that expose the complex shape as a Source or as a Flow[Confirmation, Msg]. Why would this not work?
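
Mechanically, such a wrapper might look like this (a sketch that simply drops the acknowledgements):

import akka.NotUsed
import akka.stream.FlowShape
import akka.stream.scaladsl.{Flow, GraphDSL, Sink}

val asFlow: Flow[Confirmation, Message, NotUsed] =
  Flow.fromGraph(GraphDSL.create() { implicit b =>
    import GraphDSL.Implicits._
    val kafka = b.add(new KafkaSource)
    kafka.out1 ~> Sink.ignore   // ConfirmationAcks are dropped by this wrapper
    FlowShape(kafka.in, kafka.out0)
  })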

Option 2

This only works well if the whole setup is created in a place that has this kind of overview. The KafkaReader will forward the confirmations to the KafkaSource using the GraphStage’s async-callback facility; all Kafka interaction is owned by the KafkaSource.

There is a problem with shutdown. If the Kafka client is owned by the source, at which moment can it be closed? If it is closed when the source completes, then the client is unable to commit messages that are currently being processed.

Option 3
Same problem with shutdown: when can we close the Kafka client? Should we somehow track the status of the emitted messages?

@rkuhn
Contributor Author

rkuhn commented Mar 11, 2016

The “simple wrappers” proposal does not work because the complexity is essential; it cannot be hidden. In particular, it is impossible to break up the complex shape and offer it as a Source plus Flow—that is exactly what does not work. The kafka2 code right now does something like this, but it cheats by pre-materializing and using RS interfaces, which is undesirable because the resulting pieces are not reusable.

@rkuhn
Contributor Author

rkuhn commented Mar 11, 2016

Concerning termination: that would need to be communicated back to the Source when it hits the confirmation input. This is of course trivial in Option 1, but the equivalent thing can be done in the other options as well.

@13h3r

13h3r commented Mar 11, 2016

The “simple wrappers” proposal does not work because the complexity is essential; it cannot be hidden. In particular, it is impossible to break up the complex shape and offer it as a Source plus Flow,

Sure, the complex shape cannot be split into a Flow and a Source. But most of the time you do not care about commit confirmation, and a Flow[Message, Commit] will be enough. I am talking about providing easy ways to get just a Flow instead of the complex shape. If you need full-featured processing, then use the GraphDSL and/or factory methods (I already have the same idea in my user-level code).

that is exactly what does not work. The kafka2 code right now does something like this, but it cheats by pre-materializing and using RS interfaces, which is undesirable because the resulting pieces are not reusable

Could you please point me to where kafka2 uses RS and which parts are not reusable?

@13h3r

13h3r commented Mar 11, 2016

Concerning termination: that would need to be communicated back to the Source when it hits the confirmation input. This is of course trivial in Option 1, but the equivalent thing can be done in the other options as well.

Could you explain what kind of communication you are talking about? Communication at the graph level? At the akka-stream implementation level? Or some other way?

I am asking because I tried to implement it that way and failed because of shape reuse, lifecycles, and the absence of a way to communicate.

@ktoso
Member

ktoso commented Mar 12, 2016

Could you please point me to where kafka2 uses RS and which parts are not reusable?

Highlighting the pre-materialization bit, which we think is very undesirable, as Akka Streams are meant to be "reusable blueprints, which one materializes". The stream below allocates resources before it is actually run:

val consumerWithOffsetSink = kafka.consumeWithOffsetSink(consumerProperties) // <<< already "alive"!

Source.fromPublisher(consumerWithOffsetSink.publisher)
  .map(processMessage(_))
  .to(consumerWithOffsetSink.offsetCommitSink) // the pre-materialized pieces above are needed to inject into the fluent API in 2 places
  .run() // we want resources to be allocated only when run() is called

@13h3r

13h3r commented Mar 12, 2016

@ktoso got it. I will try to explain the current situation in more detail, to be clear.

Right now there are three APIs in reactive-kafka's master:

  • the RS API (we have already agreed to drop it)
  • the graph stage API (com.softwaremill.react.kafka), originally developed by @kciesielski
  • the "new API" (com.softwaremill.react.kafka2, also referred to as kafka2), developed by me

All of these APIs provide Kafka reading/writing capabilities. I started kafka2 because the current graph stage API does not fully conform to akka-stream design principles. The problems you pointed out are problems of the current graph stage API, and because of them I started to design the kafka2 API. @kciesielski and I agreed that the kafka2 API will replace the current graph stage API in the near future.

As far as I can see, Option 1 references the kafka2 API, which should not have problems with akka-stream design or stage reuse. If it still does, could you please point them out?

Thanks for reading; I hope this explains the current state of reactive-kafka :)

@13h3r

13h3r commented Mar 13, 2016

@rkuhn There are also two (optional) additional aspects we may want to take into account:

  • tracking assigned topics and partitions: Kafka supports multiple clients reading from the same topic and balances them automatically, and on the client side you may be notified via a ConsumerRebalanceListener. This may also include manual topic and partition assignment after the flow has started.
  • manually setting the offset for a topic and partition after the flow has started.

These cases are less important, but we may want to check solutions against them.

@patriknw
Member

I think manually setting the topic, the partition assignment, and the offset should be done before materialization, i.e. the user can do it in the ConsumerProvider. Changing them afterwards should not be necessary.

The ConsumerRebalanceListener should perhaps be part of the materialized value, possibly also the KafkaConsumer (or a subset of its API), to be able to grab the current position, which is a typical use case for the ConsumerRebalanceListener according to the docs.

@13h3r

13h3r commented Mar 16, 2016

@patriknw I did not get how a ConsumerRebalanceListener can be used to get the position. It is invoked when a partition is assigned to or revoked from the client. The client may set the offset of a partition or save the current offset, but that can be done without a ConsumerRebalanceListener at all.

Also, we can expose a subset of the KafkaConsumer API, but it should be done via callbacks and in an asynchronous manner, because the Kafka client is not thread-safe.

Something like this:

  override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = {
    val consumer = consumerProvider()
    // skipped
    val logic = new TimerGraphStageLogic(shape) {
      // existing logic
      // async callback: ensures the (non-thread-safe) consumer is only touched from inside the stage
      val committedOffset = getAsyncCallback[(TopicPartition, Promise[OffsetAndMetadata])] {
        case (partition, promise) =>
          promise.success(consumer.committed(partition))
          ()
      }
    }
    val control = new Control {
      def committedOffset(partition: TopicPartition) = {
        val p = Promise[OffsetAndMetadata]()
        logic.committedOffset.invoke((partition, p))
        p.future
      }
      override def stop(): Unit = ??? // skipped
    }
    (logic, control)
  }
