This repository has been archived by the owner on Jul 30, 2024. It is now read-only.

Migrate from Akka to Pekko (#206)
* migrate akka to pekko

---------

Signed-off-by: Grigorii Chudnov <[email protected]>
gchudnov authored Aug 14, 2023
1 parent e2660e8 commit 6d6641d
Showing 30 changed files with 387 additions and 331 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
@@ -22,7 +22,7 @@ jobs:
strategy:
matrix:
os: [ubuntu-latest]
scala: [2.12.17, 2.13.10]
scala: [2.12.18, 2.13.11]
java: [temurin@11]
runs-on: ${{ matrix.os }}
steps:
88 changes: 40 additions & 48 deletions README.md
@@ -8,12 +8,12 @@ Kanadi is [Nakadi](https://github.com/zalando/nakadi) client for Scala.

### Goals

* Uses [akka-http](http://doc.akka.io/docs/akka-http/current/scala.html) as the underlying HTTP client for streaming
* Uses [pekko-http](https://pekko.apache.org/docs/pekko-http/current/) as the underlying HTTP client for streaming
* Uses [circe](https://github.com/circe/circe) for dealing with JSON
* Uses [akka-stream-json](https://github.com/mdedetrich/akka-streams-json) for streaming of JSON
* Uses [pekko-streams-circe](https://github.com/mdedetrich/pekko-streams-circe) for streaming of JSON

The underlying goal is to have a high performance/throughput Nakadi client that uses streaming all the way down
(including json) that is easily configurable via akka-http.
(including json) that is easily configurable via pekko-http.

### Non-Goals

@@ -28,10 +28,10 @@ means we need to consider the lifecycle and dependencies of certain task types (
types have had a good history when it comes to binary compatibility or lifecycle longevity). Although `Future` has its
drawbacks, it's ultra stable and portable across the Scala ecosystem.
* Using another streaming library "X". Although there are other very good implementations of streaming libraries (such
as [Monix](https://monix.io/)), [akka-stream](https://doc.akka.io/docs/akka/2.5/stream/index.html) is still the best when
as [Monix](https://monix.io/)), [pekko-stream](https://pekko.apache.org/docs/pekko/current/stream/index.html) is still the best when
it comes to binary compatibility. It also comes with a lot of support for monitoring. Note that interoperating between
streams is possible with any other stream that follows the [Reactive Streams](http://www.reactive-streams.org/) protocol.
* Scala 2.11.x support since akka 2.6 does not support Scala 2.11
* Scala 2.11.x support since pekko does not support Scala 2.11

### Current status

@@ -43,19 +43,11 @@ a year, including handling events from Zalando's main website search.

### Installation

Kanadi is currently deployed to OSS Sonatype. For Circe 0.12.x use Kanadi 0.5.x (also supports Scala 2.13.x)
Kanadi is currently deployed to OSS Sonatype.

```sbt
libraryDependencies ++= Seq(
"org.zalando" %% "kanadi" % "0.9.0"
)
```

For Circe 0.12.x use Kanadi 0.6.x

```sbt
libraryDependencies ++= Seq(
"org.zalando" %% "kanadi" % "0.6.0"
"org.zalando" %% "kanadi" % "0.20.0"
)
```

@@ -311,15 +303,15 @@ Note that we have defined the implicits in the `SomeEvent`'s companion object so
automatically get picked up without any imports

Then we will define a subscription, which we need in order to consume events in the high-level API.
We will also import the required imports that we need for akka-http (these need to be available as implicits)
We will also import the required imports that we need for pekko-http (these need to be available as implicits)

```scala
import io.circe._
import io.circe.syntax._

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.stream.ActorMaterializer
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.http.scaladsl.Http
import org.apache.pekko.stream.ActorMaterializer
import com.typesafe.config.ConfigFactory
import org.zalando.kanadi.api.Subscription
import org.zalando.kanadi.api.Subscriptions
Expand Down Expand Up @@ -368,9 +360,9 @@ Next we will define a callback, this will get executed whenever we consume an ev
import io.circe._
import io.circe.syntax._

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.stream.ActorMaterializer
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.http.scaladsl.Http
import org.apache.pekko.stream.ActorMaterializer
import com.typesafe.config.ConfigFactory
import org.zalando.kanadi.api.Subscription
import org.zalando.kanadi.api.Subscriptions
Expand Down Expand Up @@ -442,9 +434,9 @@ delays can be configured using `KanadiHttpConfig.noEmptySlotsCursorResetRetryDel
import io.circe._
import io.circe.syntax._

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.stream.ActorMaterializer
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.http.scaladsl.Http
import org.apache.pekko.stream.ActorMaterializer
import com.typesafe.config.ConfigFactory
import org.zalando.kanadi.api.Subscription
import org.zalando.kanadi.api.Subscriptions
@@ -524,9 +516,9 @@ uses an exponential backoff which can be configured with `kanadi.exponential-bac
`reference.conf` for information on the settings). If the maximum number of retries is reached, then `Events.publish`
will fail with the original `Events.Errors.EventValidation` exception.

#### Modifying the akka-stream source
#### Modifying the pekko-stream source

It is possible to modify the underlying akka stream when using `Subscriptions.eventsStreamed` or
It is possible to modify the underlying pekko stream when using `Subscriptions.eventsStreamed` or
`Subscriptions.eventsStreamedManaged` by setting the `modifySourceFunction` parameter. One common use case for doing
this is to provide some manual throttling, i.e. you can do something like this

@@ -535,9 +527,9 @@ this is to provide some manual throttling, i.e. you can do something like this
import io.circe._
import io.circe.syntax._

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.stream.{ActorMaterializer, ThrottleMode}
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.http.scaladsl.Http
import org.apache.pekko.stream.{ActorMaterializer, ThrottleMode}
import com.typesafe.config.ConfigFactory
import org.zalando.kanadi.api.Subscription
import org.zalando.kanadi.api.Subscriptions
@@ -601,8 +593,8 @@ object Main extends App with Config {

The above will provide throttling for each batch of events. Note that a better way to handle this case of throttling
directly may be to use the source stream directly (detailed [below](#using-the-source-stream-directly)). This is because
akka-streams are fully
[backpressured](https://doc.akka.io/docs/akka/2.5.3/scala/stream/stream-flows-and-basics.html#back-pressure-explained).
pekko-streams are fully
[backpressured](https://pekko.apache.org/docs/pekko/current/stream/stream-flows-and-basics.html#back-pressure-explained).
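
As a rough illustration of throttling a backpressured stream, the sketch below applies Pekko's `throttle` operator to a stand-in `Source` of strings. The `ThrottleSketch` object, the batch values and the rate of one element per 100 ms are assumptions made for illustration only; a real Kanadi stream would emit subscription events rather than strings.

```scala
import org.apache.pekko.NotUsed
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.ThrottleMode
import org.apache.pekko.stream.scaladsl.{Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object ThrottleSketch {
  // Hypothetical stand-in for a stream of event batches.
  // ThrottleMode.Shaping slows the stream down (via backpressure) instead of failing it.
  def throttled(batches: List[String]): Source[String, NotUsed] =
    Source(batches).throttle(1, 100.millis, 1, ThrottleMode.Shaping)

  // Runs the throttled stream to completion and collects what it emitted.
  def run(batches: List[String]): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("throttle-sketch")
    try Await.result(throttled(batches).runWith(Sink.seq[String]), 10.seconds)
    finally system.terminate()
  }
}
```

Because the operator only shapes the rate, every element still arrives, in its original order; upstream is simply asked for elements more slowly.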

Cursors within an `eventsStreamedManaged` are committed automatically for batches which contain the `events` property
(`eventCallbackData.subscriptionEvent.events`). Whenever the end of the stream is reached and no new events are received,
@@ -612,7 +604,7 @@ takes the boolean flag `eventBatch` as fourth parameter. If it is set to `true`

#### Using the source stream directly

There is also a `Subscriptions.eventsStreamedSource` method which exposes the Nakadi stream as an akka-stream `Source`,
There is also a `Subscriptions.eventsStreamedSource` method which exposes the Nakadi stream as a pekko-stream `Source`,
this allows you to directly combine the Nakadi `Source` as part of another graph.

NOTE: When you use this method directly, you also need to register the killswitch using the
@@ -621,7 +613,7 @@ method to kill a stream). Also `Subscriptions.eventsStreamedSourceManaged` metho
reconnect for the `Subscriptions.Errors.NoEmptySlotsOrCursorReset(_)` however it will **NOT** handle server disconnects,
this needs to be done manually by calling `Subscriptions.eventsStreamedSourceManaged` since the stream is not
materialized yet (all you have is a reference to an uninitialized stream), read
[here](https://doc.akka.io/docs/akka/2.5.4/scala/general/stream/stream-design.html#resulting-implementation-constraints)
[here](https://pekko.apache.org/docs/pekko/current/general/stream/stream-design.html#resulting-implementation-constraints)
for more info.
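
To make the kill switch idea concrete, here is a minimal sketch using plain Pekko Streams, not Kanadi's own registration method. It attaches `KillSwitches.single` to an endless stand-in `Source` and shuts it down from the outside. The `KillSwitchSketch` object, the tick source and the timings are hypothetical; with Kanadi you would still need to register the kill switch as described in the note above.

```scala
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.KillSwitches
import org.apache.pekko.stream.scaladsl.{Keep, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object KillSwitchSketch {
  // Returns how many elements were seen before the stream was shut down.
  def run(): Int = {
    implicit val system: ActorSystem = ActorSystem("killswitch-sketch")
    try {
      // Stand-in for the Nakadi source: an endless stream that ticks every 10 ms.
      val (killSwitch, counted) =
        Source
          .tick(0.millis, 10.millis, "batch")
          .viaMat(KillSwitches.single)(Keep.right)
          .toMat(Sink.fold[Int, String](0)((count, _) => count + 1))(Keep.both)
          .run()

      Thread.sleep(200)     // let a few elements flow through
      killSwitch.shutdown() // completes the stream from the outside
      Await.result(counted, 5.seconds)
    } finally system.terminate()
  }
}
```

The kill switch only exists once the stream has been materialized with `run()`, which is why a reference to an unmaterialized `Source` is not enough to stop or restart anything.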

You also have to be very careful of how you handle errors particularly in the context of committing cursors. Since you
@@ -643,22 +635,22 @@ parsing the entire JSON payload, Kanadi will combine the data to form a
as the basic cursor/event information (so you are still able to commit the cursor token).

So that the stream doesn't get in an endless loop when dealing with decoding errors, we use a
`akka.stream.Supervision.Decider`. The provided default `Supervision.Decider` will log details about the JSON parsing
`org.apache.pekko.stream.Supervision.Decider`. The provided default `Supervision.Decider` will log details about the JSON parsing
error as well as committing the cursor. It is provided as an `implicit val` that is located
in `org.zalando.kanadi.api.Subscriptions.defaultEventStreamSupervisionDecider`.

You can also provide an alternate supervision decider (make sure that you don't import the default one).

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.stream.ActorMaterializer
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.http.scaladsl.Http
import org.apache.pekko.stream.ActorMaterializer
import com.typesafe.config.ConfigFactory
import org.zalando.kanadi.models.FlowId
import org.zalando.kanadi.models.{SubscriptionId, StreamId}
import org.zalando.kanadi.api.{Subscriptions, SubscriptionCursor}
import org.zalando.kanadi.api.Subscriptions.EventStreamContext
import akka.stream.Supervision
import org.apache.pekko.stream.Supervision
import org.zalando.kanadi.Config

object Main extends App with Config {
@@ -672,7 +664,7 @@ object Main extends App with Config {
import org.zalando.kanadi.models.FlowId
import org.zalando.kanadi.models.{SubscriptionId, StreamId}
import org.zalando.kanadi.api.{Subscriptions, SubscriptionCursor}
import akka.stream.Supervision
import org.apache.pekko.stream.Supervision


implicit val mySupervisionDecider: Subscriptions.EventStreamSupervisionDecider =
@@ -703,15 +695,15 @@ Then to use this `mySupervisionDecider` you can simply provide it as a parameter
```scala
import io.circe._
import io.circe.syntax._
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.stream.ActorMaterializer
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.http.scaladsl.Http
import org.apache.pekko.stream.ActorMaterializer
import com.typesafe.config.ConfigFactory
import org.zalando.kanadi.models.FlowId
import org.zalando.kanadi.models.{SubscriptionId, StreamId}
import org.zalando.kanadi.api.{Subscriptions, SubscriptionCursor}
import org.zalando.kanadi.api.Subscriptions.EventStreamContext
import akka.stream.Supervision
import org.apache.pekko.stream.Supervision
import org.zalando.kanadi.Config
import org.zalando.kanadi.api.Event
import org.zalando.kanadi.api.Subscription
Expand Down Expand Up @@ -743,7 +735,7 @@ object Main extends App with Config {
import org.zalando.kanadi.models.FlowId
import org.zalando.kanadi.models.{SubscriptionId, StreamId}
import org.zalando.kanadi.api.{Subscriptions, SubscriptionCursor}
import akka.stream.Supervision
import org.apache.pekko.stream.Supervision

implicit val mySupervisionDecider: Subscriptions.EventStreamSupervisionDecider =
Subscriptions.EventStreamSupervisionDecider {
@@ -867,7 +859,7 @@ When listening to events via Nakadi (with `org.zalando.kanadi.api.Subscriptions.
Kanadi will make a single dedicated Http connection rather than using the standard connection pool. This is so we don't
starve the connection pool with long living requests.

For this reason, it may be necessary to increase the `akka.http.host-connection-pool.max-open-requests` setting,
For this reason, it may be necessary to increase the `pekko.http.host-connection-pool.max-open-requests` setting,
although it should be fine if you only have a single stream open for listening to events.
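
One way to raise the limit is to override it in configuration before creating the `ActorSystem`. The sketch below assumes the setting lives under Pekko's `pekko.http` configuration namespace; the `PoolConfigSketch` object and the value `64` are illustrative assumptions, not recommendations from Kanadi.

```scala
import com.typesafe.config.{Config, ConfigFactory}

object PoolConfigSketch {
  // Override the pool limit and fall back to the regular application config
  // for everything else. 64 is an illustrative value (it must be a power of 2).
  val config: Config = ConfigFactory
    .parseString("pekko.http.host-connection-pool.max-open-requests = 64")
    .withFallback(ConfigFactory.load())

  // Read the value back to confirm that the override took effect.
  val maxOpenRequests: Int =
    config.getInt("pekko.http.host-connection-pool.max-open-requests")
}
```

The resulting `config` can then be passed when creating the actor system, for example `ActorSystem("kanadi", PoolConfigSketch.config)`; the same key can equally be set in `application.conf`.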

### Testing
@@ -903,7 +895,7 @@ def createSpanFromMetadata(metadata: Metadata, tracer: Tracer, operationName: St

### Monitoring

Since this project uses akka-http, you can use either
Since this project uses pekko-http, you can use either
[Cinnamon](https://developer.lightbend.com/docs/telemetry/current/home.html)
(for lightbend enterprise subscriptions) or [Kamon](https://github.com/kamon-io/kamon-akka-http) to monitor the Nakadi
streams on a deeper level as well as the http requests.
@@ -916,7 +908,7 @@ scope of the project to just processing Nakadi events. When new features are add
to add them as soon as possible into Kanadi.
* Migrate README.md to github pages as a better documentation format.
* Migrate to ScalaTest for testing as it has better `Future` support.
* Investigate using akka-http's `akka.http.scaladsl.util.FastFuture` rather than normal `Future` in `for` comprehensions
* Investigate using pekko-http's `org.apache.pekko.http.scaladsl.util.FastFuture` rather than normal `Future` in `for` comprehensions
to improve performance a bit. Note that Scala 2.13.x will make this redundant, see this [PR](https://github.com/scala/scala/pull/7470)

### Known Bugs