diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index bb398d7..5ad619a 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -22,7 +22,7 @@ jobs: strategy: matrix: os: [ubuntu-latest] - scala: [2.12.17, 2.13.10] + scala: [2.12.18, 2.13.11] java: [temurin@11] runs-on: ${{ matrix.os }} steps: diff --git a/README.md b/README.md index 9d3d9df..2bcb1c1 100644 --- a/README.md +++ b/README.md @@ -8,12 +8,12 @@ Kanadi is [Nakadi](https://github.com/zalando/nakadi) client for Scala. ### Goals -* Uses [akka-http](http://doc.akka.io/docs/akka-http/current/scala.html) as the underlying HTTP client for streaming +* Uses [pekko-http](https://pekko.apache.org/docs/pekko-http/current/) as the underlying HTTP client for streaming * Uses [circe](https://github.com/circe/circe) for dealing with JSON -* Uses [akka-stream-json](https://github.com/mdedetrich/akka-streams-json) for streaming of JSON +* Uses [pekko-stream-json](https://github.com/mdedetrich/pekko-streams-circe) for streaming of JSON The underlying goal is to have a high performance/throughput Nakadi client that uses streaming all the way down -(including json) that is easily configurable via akka-http. +(including json) that is easily configurable via pekko-http. ### Non-Goals @@ -28,10 +28,10 @@ means we need to consider the lifecycle and dependencies of certain task types ( types have had a good history when it comes to binary compatibility or lifecycle longevity). Although `Future` has its drawbacks, its ultra stable and portable across the Scala ecosystem. * Using another streaming library "X". Although there are other very good implementations of streaming libraries (such -as [Monix](https://monix.io/)), [akka-stream](https://doc.akka.io/docs/akka/2.5/stream/index.html) is still the best when +as [Monix](https://monix.io/)), [pekko-stream](https://pekko.apache.org/docs/pekko/current/stream/index.html) is still the best when it comes to binary compatibility. It also comes with a lot of support for monitoring. Note that interopting between streams is possible with any other stream that follows the [Reactive Streams](http://www.reactive-streams.org/) protocol. -* Scala 2.11.x support since akka 2.6 does not support Scala 2.11 +* Scala 2.11.x support since pekko does not support Scala 2.11 ### Current status @@ -43,19 +43,11 @@ a year, including handling events from Zalando's main website search. ### Installation -Kanadi is currently deployed to OSS Sonatype. For Circe 0.12.x use Kanadi 0.5.x (also supports Scala 2.13.x) +Kanadi is currently deployed to OSS Sonatype. ```sbt libraryDependencies ++= Seq( - "org.zalando" %% "kanadi" % "0.9.0" -) -``` - -For Circe 0.12.x use Kanadi 0.6.x - -```sbt -libraryDependencies ++= Seq( - "org.zalando" %% "kanadi" % "0.6.0" + "org.zalando" %% "kanadi" % "0.20.0" ) ``` @@ -311,15 +303,15 @@ Note that we have defined the implicits in the `SomeEvent`'s companion object so automatically get picked up without any imports Then we will define a subscription, we need to do this in order to consume an event's in the high level API. -We will also import the required imports that we need for akka-http (these need to be available as implicits) +We will also import the required imports that we need for pekko-http (these need to be available as implicits) ```scala import io.circe._ import io.circe.syntax._ -import akka.actor.ActorSystem -import akka.http.scaladsl.Http -import akka.stream.ActorMaterializer +import org.apache.pekko.actor.ActorSystem +import org.apache.pekko.http.scaladsl.Http +import org.apache.pekko.stream.ActorMaterializer import com.typesafe.config.ConfigFactory import org.zalando.kanadi.api.Subscription import org.zalando.kanadi.api.Subscriptions @@ -368,9 +360,9 @@ Next we will define a callback, this will get executed whenever we consume an ev import io.circe._ import io.circe.syntax._ -import akka.actor.ActorSystem -import akka.http.scaladsl.Http -import akka.stream.ActorMaterializer +import org.apache.pekko.actor.ActorSystem +import org.apache.pekko.http.scaladsl.Http +import org.apache.pekko.stream.ActorMaterializer import com.typesafe.config.ConfigFactory import org.zalando.kanadi.api.Subscription import org.zalando.kanadi.api.Subscriptions @@ -442,9 +434,9 @@ delays can be configured using `KanadiHttpConfig.noEmptySlotsCursorResetRetryDel import io.circe._ import io.circe.syntax._ -import akka.actor.ActorSystem -import akka.http.scaladsl.Http -import akka.stream.ActorMaterializer +import org.apache.pekko.actor.ActorSystem +import org.apache.pekko.http.scaladsl.Http +import org.apache.pekko.stream.ActorMaterializer import com.typesafe.config.ConfigFactory import org.zalando.kanadi.api.Subscription import org.zalando.kanadi.api.Subscriptions @@ -524,9 +516,9 @@ uses an exponential backoff which can be configured with `kanadi.exponential-bac `reference.conf` for information on the settings). If reach the maximum number of retries then `Events.publish` will fail with the original `Events.Errors.EventValidation` exception. -#### Modifying the akka-stream source +#### Modifying the pekko-stream source -It is possible to modify the underlying akka stream when using `Subscriptions.eventsStreamed` or +It is possible to modify the underlying pekko stream when using `Subscriptions.eventsStreamed` or `Subscriptions.eventsStreamedManaged` by setting the `modifySourceFunction` parameter. One common use case for doing this is to provide some manual throttling, i.e. you can do something like this @@ -535,9 +527,9 @@ this is to provide some manual throttling, i.e. you can do something like this import io.circe._ import io.circe.syntax._ -import akka.actor.ActorSystem -import akka.http.scaladsl.Http -import akka.stream.{ActorMaterializer, ThrottleMode} +import org.apache.pekko.actor.ActorSystem +import org.apache.pekko.http.scaladsl.Http +import org.apache.pekko.stream.{ActorMaterializer, ThrottleMode} import com.typesafe.config.ConfigFactory import org.zalando.kanadi.api.Subscription import org.zalando.kanadi.api.Subscriptions @@ -601,8 +593,8 @@ object Main extends App with Config { The above will provide throttling for each batch of events. Note that a better way to handle this case of throttling directly may be to use the source stream directly (detailed [below](#using-the-source-stream-directly)). This is because -akka-streams are fully -[backpressured](https://doc.akka.io/docs/akka/2.5.3/scala/stream/stream-flows-and-basics.html#back-pressure-explained). +pekko-streams are fully +[backpressured](https://pekko.apache.org/docs/pekko/current//stream/stream-flows-and-basics.html#back-pressure-explained). Cursors within a `eventsStreamManaged` are committed automatically for batches which contain the `events` property (`eventCallbackData.subscriptionEvent.events`). Whenever the end of the stream is reached and no new events are received, @@ -612,7 +604,7 @@ takes the boolean flag `eventBatch` as fourth parameter. If it is set to `true` #### Using the source stream directly -There is also a `Subscriptions.eventsStreamedSource` method which exposes the Nakadi stream as an akka-stream `Source`, +There is also a `Subscriptions.eventsStreamedSource` method which exposes the Nakadi stream as an pekko-stream `Source`, this allows you to directly combine the Nakadi `Source` as part of another graph. NOTE: When you use this method directly, you also need to register the killswitch using the @@ -621,7 +613,7 @@ method to kill a stream). Also `Subscriptions.eventsStreamedSourceManaged` metho reconnect for the `Subscriptions.Errors.NoEmptySlotsOrCursorReset(_)` however it will **NOT** handle server disconnects, this needs to be done manually by calling `Subscriptions.eventsStreamedSourceManaged` since the stream is not materialized yet (all you have is a reference to an uninitialized stream), read -[here](https://doc.akka.io/docs/akka/2.5.4/scala/general/stream/stream-design.html#resulting-implementation-constraints) +[here](https://pekko.apache.org/docs/pekko/current//general/stream/stream-design.html#resulting-implementation-constraints) for more info. You also have to be very careful of how you handle errors particularly in the context of committing cursors. Since you @@ -643,22 +635,22 @@ parsing the entire JSON payload, Kanadi will combine the data to form a as the basic cursor/event information (so you are still able to commit the cursor token). So that the stream doesn't get in an endless loop when dealing with decoding errors, we use a -`akka.stream.Supervision.Decider`. The provided default `Supervision.Decider` will log details about the JSON parsing +`org.apache.pekko.stream.Supervision.Decider`. The provided default `Supervision.Decider` will log details about the JSON parsing error as well as committing the cursor. It is provided as an `implicit val` that is located in `org.zalando.kanadi.api.Subscriptions.defaultEventStreamSupervisionDecider`. You can also provide an alternate supervision decider (make sure that you don't import the default one). ```scala -import akka.actor.ActorSystem -import akka.http.scaladsl.Http -import akka.stream.ActorMaterializer +import org.apache.pekko.actor.ActorSystem +import org.apache.pekko.http.scaladsl.Http +import org.apache.pekko.stream.ActorMaterializer import com.typesafe.config.ConfigFactory import org.zalando.kanadi.models.FlowId import org.zalando.kanadi.models.{SubscriptionId, StreamId} import org.zalando.kanadi.api.{Subscriptions, SubscriptionCursor} import org.zalando.kanadi.api.Subscriptions.EventStreamContext -import akka.stream.Supervision +import org.apache.pekko.stream.Supervision import org.zalando.kanadi.Config object Main extends App with Config { @@ -672,7 +664,7 @@ object Main extends App with Config { import org.zalando.kanadi.models.FlowId import org.zalando.kanadi.models.{SubscriptionId, StreamId} import org.zalando.kanadi.api.{Subscriptions, SubscriptionCursor} - import akka.stream.Supervision + import org.apache.pekko.stream.Supervision implicit val mySupervisionDecider: Subscriptions.EventStreamSupervisionDecider = @@ -703,15 +695,15 @@ Then to use this `mySupervisionDecider` you can simply provide it as a parameter ```scala import io.circe._ import io.circe.syntax._ -import akka.actor.ActorSystem -import akka.http.scaladsl.Http -import akka.stream.ActorMaterializer +import org.apache.pekko.actor.ActorSystem +import org.apache.pekko.http.scaladsl.Http +import org.apache.pekko.stream.ActorMaterializer import com.typesafe.config.ConfigFactory import org.zalando.kanadi.models.FlowId import org.zalando.kanadi.models.{SubscriptionId, StreamId} import org.zalando.kanadi.api.{Subscriptions, SubscriptionCursor} import org.zalando.kanadi.api.Subscriptions.EventStreamContext -import akka.stream.Supervision +import org.apache.pekko.stream.Supervision import org.zalando.kanadi.Config import org.zalando.kanadi.api.Event import org.zalando.kanadi.api.Subscription @@ -743,7 +735,7 @@ object Main extends App with Config { import org.zalando.kanadi.models.FlowId import org.zalando.kanadi.models.{SubscriptionId, StreamId} import org.zalando.kanadi.api.{Subscriptions, SubscriptionCursor} - import akka.stream.Supervision + import org.apache.pekko.stream.Supervision implicit val mySupervisionDecider: Subscriptions.EventStreamSupervisionDecider = Subscriptions.EventStreamSupervisionDecider { @@ -867,7 +859,7 @@ When listening to events via Nakadi (with `org.zalando.kanadi.api.Subscriptions. Kanadi will make a single dedicated Http connection rather than using the standard connection pool. This is so we don't starve the connection pool with long living requests. -For this reason, it may be necessary to increase the `akka.http.host-connection-pool.max-open-requests` setting, +For this reason, it may be necessary to increase the `org.apache.pekko.http.host-connection-pool.max-open-requests` setting, although it should be fine if you only have a single stream open for listening to events. ### Testing @@ -903,7 +895,7 @@ def createSpanFromMetadata(metadata: Metadata, tracer: Tracer, operationName: St ### Monitoring -Since this project uses akka-http, you can use either +Since this project uses pekko-http, you can use either [Cinnamon](https://developer.lightbend.com/docs/telemetry/current/home.html) (for lightbend enterprise subscriptions) or [Kamon](https://github.com/kamon-io/kamon-akka-http) to monitor the Nakadi streams on a deeper level as well as the http requests. @@ -916,7 +908,7 @@ scope of the project to just processing Nakadi events. When new features are add to add them as soon as possible into Kanadi. * Migrate README.md to github pages as a better documentation format. * Migrate to ScalaTest for testing as it has better `Future` support. -* Investigate using akka-http's `akka.http.scaladsl.util.FastFuture` rather than normal `Future` in `for` comprehensions +* Investigate using pekko-http's `org.apache.pekko.http.scaladsl.util.FastFuture` rather than normal `Future` in `for` comprehensions to improve performance a bit. Note that Scala 2.13.x will make this redundant, see this [PR](https://github.com/scala/scala/pull/7470) ### Known Bugs diff --git a/build.sbt b/build.sbt index fc824c5..5d4ea82 100644 --- a/build.sbt +++ b/build.sbt @@ -1,167 +1,12 @@ -name := """kanadi""" - -val scala212Version = "2.12.17" -val scala213Version = "2.13.10" -val currentScalaVersion = scala212Version - -val logbackVersion = "1.4.5" -val scalaLoggingVersion = "3.9.5" -val ficusVersion = "1.5.2" -val akkaVersion = "2.6.20" // NOTE: the last version with apache2 license -val akkaHttpVersion = "10.2.10" // NOTE: the last version under apache2 license -val akkaStreamsJsonVersion = "0.8.3" -val enumeratumCirceVersion = "1.7.2" -val circeVersion = "0.14.4" -val specs2Version = "4.19.2" -val heikoseebergerAkkaHttpCirceVersion = "1.39.2" - -ThisBuild / scalaVersion := currentScalaVersion - -ThisBuild / crossScalaVersions := Seq(currentScalaVersion, scala213Version) - -ThisBuild / versionScheme := Some(VersionScheme.EarlySemVer) - -organization := "org.zalando" - -Test / fork := true -Test / parallelExecution := true -Test / testForkedParallel := true - -scalacOptions ++= Seq( - "-deprecation", // Emit warning and location for usages of deprecated APIs. - "-encoding", - "utf-8", // Specify character encoding used by source files. - "-explaintypes", // Explain type errors in more detail. - "-feature", // Emit warning and location for usages of features that should be imported explicitly. - "-language:existentials", // Existential types (besides wildcard types) can be written and inferred - "-language:experimental.macros", // Allow macro definition (besides implementation and application) - "-language:higherKinds", // Allow higher-kinded types - "-language:implicitConversions", // Allow definition of implicit functions called views - "-unchecked", // Enable additional warnings where generated code depends on assumptions. - "-Xcheckinit", // Wrap field accessors to throw an exception on uninitialized access. - // "-Xfatal-warnings", // Fail the compilation if there are any warnings. - "-Xfuture", // Turn on future language features. - "-Xlint:adapted-args", // Warn if an argument list is modified to match the receiver. - "-Xlint:delayedinit-select", // Selecting member of DelayedInit. - "-Xlint:doc-detached", // A Scaladoc comment appears to be detached from its element. - "-Xlint:inaccessible", // Warn about inaccessible types in method signatures. - "-Xlint:infer-any", // Warn when a type argument is inferred to be `Any`. - "-Xlint:missing-interpolator", // A string literal appears to be missing an interpolator id. - "-Xlint:option-implicit", // Option.apply used implicit view. - "-Xlint:package-object-classes", // Class or object defined in package object. - "-Xlint:poly-implicit-overload", // Parameterized overloaded implicit methods are not visible as view bounds. - "-Xlint:private-shadow", // A private field (or class parameter) shadows a superclass field. - "-Xlint:stars-align", // Pattern sequence wildcard must align with sequence component. - "-Xlint:type-parameter-shadow", // A local type parameter shadows a type already in scope. - // "-Yno-adapted-args", // Do not adapt an argument list (either by inserting () or creating a tuple) to match the receiver. - "-Ywarn-dead-code", // Warn when dead code is identified. - "-Ywarn-numeric-widen", // Warn when numerics are widened. - "-Ywarn-value-discard" // Warn when non-Unit expression results are unused. -) - -val flagsFor12 = Seq( - "-Xlint:_", - "-Ywarn-infer-any", - "-Ypartial-unification", - "-Ywarn-inaccessible", // Warn about inaccessible types in method signatures. - "-Xlint:by-name-right-associative", // By-name parameter of right associative operator. - "-Xlint:unsound-match", // Pattern match may not be typesafe. - "-Ywarn-infer-any", // Warn when a type argument is inferred to be `Any`. - "-Xlint:nullary-override", // Warn when non-nullary `def f()' overrides nullary `def f'. - "-Xlint:nullary-unit", // Warn when nullary methods return Unit. - "-opt-inline-from:" -) - -val flagsFor13 = Seq( - "-Xlint:_", - "-opt-inline-from:" -) - -scalacOptions ++= { - CrossVersion.partialVersion(scalaVersion.value) match { - case Some((2, n)) if n == 13 => - flagsFor13 - case Some((2, n)) if n == 12 => - flagsFor12 - } -} - libraryDependencies ++= { - Seq( - "com.typesafe.akka" %% "akka-http" % akkaHttpVersion % Provided, - "com.typesafe.akka" %% "akka-slf4j" % akkaVersion % Provided, - "com.typesafe.akka" %% "akka-stream" % akkaVersion % Provided, - "org.mdedetrich" %% "censored-raw-header" % "0.7.0", - "com.beachape" %% "enumeratum-circe" % enumeratumCirceVersion, - "io.circe" %% "circe-parser" % circeVersion, - "org.mdedetrich" %% "akka-stream-circe" % akkaStreamsJsonVersion, - "org.mdedetrich" %% "akka-http-circe" % akkaStreamsJsonVersion, - "de.heikoseeberger" %% "akka-http-circe" % heikoseebergerAkkaHttpCirceVersion, - "com.iheart" %% "ficus" % ficusVersion, - "com.typesafe.scala-logging" %% "scala-logging" % scalaLoggingVersion, - "ch.qos.logback" % "logback-classic" % logbackVersion, - "org.specs2" %% "specs2-core" % specs2Version % Test - ) ++ (CrossVersion.partialVersion(scalaVersion.value) match { + Dependencies.kanadi ++ (CrossVersion.partialVersion(scalaVersion.value) match { case Some((2, n)) if n == 13 => - Seq( - "org.scala-lang.modules" %% "scala-parallel-collections" % "0.2.0" % Test - ) + Dependencies.test213Seq.map(_ % "test") case _ => Seq.empty }) } -Test / scalacOptions ++= Seq("-Yrangepos") - -homepage := Some(url("https://github.com/zalando-incubator/kanadi")) - -scmInfo := Some( - ScmInfo(url("https://github.com/zalando-incubator/kanadi"), "git@github.com:zalando-incubator/kanadi.git")) - -developers := List( - Developer("mdedetrich", "Matthew de Detrich", "mdedetrich@gmail.com", url("https://github.com/mdedetrich")), - Developer("xjrk58", "Jiri Syrovy", "jiri.syrovy@zalando.de", url("https://github.com/xjrk58")), - Developer("itachi3", "Balaji Sonachalam", "balajispsg@gmail.com", url("https://github.com/itachi3")), - Developer("Deeds67", "Pierre Marais", "pierrem@live.co.za", url("https://github.com/Deeds67")), - Developer("gouthampradhan", - "Goutham Vidya Pradhan", - "goutham.vidya.pradhan@gmail.com", - url("https://github.com/gouthampradhan")), - Developer("javierarrieta", "Javier Arrieta", "javier.arrieta@zalando.ie", url("https://github.com/javierarrieta")), - Developer("pascalh", "Pascal Hof", "pascal.hof@zalando.de", url("https://github.com/pascalh")), - Developer("gchudnov", "Grigorii Chudnov", "g.chudnov@gmail.com", url("https://github.com/gchudnov")) -) - -licenses += ("MIT", url("https://opensource.org/licenses/MIT")) - -publishMavenStyle := true - -publishTo := sonatypePublishTo.value - -def emptyStringToNone(string: String): Option[String] = - if (string.trim.isEmpty) - None - else - Some(string) - -javaOptions ++= sys.props - .get("TOKEN") - .flatMap(emptyStringToNone) - .map { token => - s"-DTOKEN=$token" - } - .toList - -envVars ++= Map("TOKEN" -> sys.env.get("TOKEN").flatMap(emptyStringToNone)).collect { case (k, Some(v)) => - (k, v) -} - -Test / publishArtifact := false - -pomIncludeRepository := (_ => false) - -resolvers += Resolver.jcenterRepo - ThisBuild / githubWorkflowBuild := Seq( WorkflowStep.Use( UseRef.Public("docker", "login-action", "v1"), @@ -216,6 +61,12 @@ releaseProcess := Seq[ReleaseStep]( pushChanges ) +lazy val root = (project in file(".")) + .settings(Settings.shared) + .settings( + name := "kanadi" + ) + // COMMANDS addCommandAlias("fmt", "all scalafmtSbt scalafmt test:scalafmt it:scalafmt") addCommandAlias("chk", "all scalafmtSbtCheck scalafmtCheck test:scalafmtCheck it:scalafmtCheck") diff --git a/project/Dependencies.scala b/project/Dependencies.scala new file mode 100644 index 0000000..1d9f8ba --- /dev/null +++ b/project/Dependencies.scala @@ -0,0 +1,55 @@ +import sbt._ + +object Dependencies { + + object versions { + val logback = "1.4.11" + val scalaLogging = "3.9.5" + val ficus = "1.5.2" + val pekko = "1.0.1" + val pekkoHttp = "1.0.0" + val pekkoStreamJson = "1.0.0" + val enumeratumCirce = "1.7.3" + val circe = "0.14.5" + val specs2 = "4.20.2" + val scalaParallelCollections = "0.2.0" + } + + private val pekkoHttp = "org.apache.pekko" %% "pekko-http" % versions.pekkoHttp + private val pekkoSlf4j = "org.apache.pekko" %% "pekko-slf4j" % versions.pekko + private val pekkoStream = "org.apache.pekko" %% "pekko-stream" % versions.pekko + private val enumeratumCirce = "com.beachape" %% "enumeratum-circe" % versions.enumeratumCirce + private val circeParser = "io.circe" %% "circe-parser" % versions.circe + private val pekkoStreamCirce = "org.mdedetrich" %% "pekko-stream-circe" % versions.pekkoStreamJson + private val pekkoHttpCirce = "org.mdedetrich" %% "pekko-http-circe" % versions.pekkoStreamJson + private val ficus = "com.iheart" %% "ficus" % versions.ficus + private val scalaLogging = "com.typesafe.scala-logging" %% "scala-logging" % versions.scalaLogging + private val logbackClassic = "ch.qos.logback" % "logback-classic" % versions.logback + private val specs2 = "org.specs2" %% "specs2-core" % versions.specs2 + + private val scalaParallelCollections = + "org.scala-lang.modules" %% "scala-parallel-collections" % versions.scalaParallelCollections + + private val testSeq = Seq(specs2) + + val test213Seq: Seq[ModuleID] = Seq(scalaParallelCollections) + + val kanadi: Seq[ModuleID] = { + val provided = Seq( + pekkoHttp, + pekkoSlf4j, + pekkoStream + ) map (_ % Provided) + val compile = Seq( + enumeratumCirce, + circeParser, + pekkoStreamCirce, + pekkoHttpCirce, + ficus, + scalaLogging, + logbackClassic + ) map (_ % "compile") + val test = testSeq map (_ % "test") + compile ++ test + } +} diff --git a/project/Settings.scala b/project/Settings.scala new file mode 100644 index 0000000..84d11d7 --- /dev/null +++ b/project/Settings.scala @@ -0,0 +1,140 @@ +import sbt.Keys.{credentials, resolvers} +import sbt.* +import sbt.Keys.* +import sbtrelease.ReleasePlugin.autoImport.* +import sbtrelease.ReleaseStateTransformations.* +import xerial.sbt.Sonatype.autoImport.* + +object Settings { + val scala212Version: String = "2.12.18" + val scala213Version: String = "2.13.11" + val currentScalaVersion: String = scala212Version + + val globalScalaVersion: String = currentScalaVersion + val supportedScalaVersions: List[String] = List(scala212Version, scala213Version) + + private val extraJavaOptions: List[String] = sys.props + .get("TOKEN") + .flatMap(Settings.emptyStringToNone) + .map { token => + s"-DTOKEN=$token" + } + .toList + + private val sharedScalacOptions = Seq( + "-deprecation", // Emit warning and location for usages of deprecated APIs. + "-encoding", + "utf-8", // Specify character encoding used by source files. + "-explaintypes", // Explain type errors in more detail. + "-feature", // Emit warning and location for usages of features that should be imported explicitly. + "-language:existentials", // Existential types (besides wildcard types) can be written and inferred + "-language:experimental.macros", // Allow macro definition (besides implementation and application) + "-language:higherKinds", // Allow higher-kinded types + "-language:implicitConversions", // Allow definition of implicit functions called views + "-unchecked", // Enable additional warnings where generated code depends on assumptions. + "-Xcheckinit", // Wrap field accessors to throw an exception on uninitialized access. + // "-Xfatal-warnings", // Fail the compilation if there are any warnings. + "-Xfuture", // Turn on future language features. + "-Xlint:adapted-args", // Warn if an argument list is modified to match the receiver. + "-Xlint:delayedinit-select", // Selecting member of DelayedInit. + "-Xlint:doc-detached", // A Scaladoc comment appears to be detached from its element. + "-Xlint:inaccessible", // Warn about inaccessible types in method signatures. + "-Xlint:infer-any", // Warn when a type argument is inferred to be `Any`. + "-Xlint:missing-interpolator", // A string literal appears to be missing an interpolator id. + "-Xlint:option-implicit", // Option.apply used implicit view. + "-Xlint:package-object-classes", // Class or object defined in package object. + "-Xlint:poly-implicit-overload", // Parameterized overloaded implicit methods are not visible as view bounds. + "-Xlint:private-shadow", // A private field (or class parameter) shadows a superclass field. + "-Xlint:stars-align", // Pattern sequence wildcard must align with sequence component. + "-Xlint:type-parameter-shadow", // A local type parameter shadows a type already in scope. + // "-Yno-adapted-args", // Do not adapt an argument list (either by inserting () or creating a tuple) to match the receiver. + "-Ywarn-dead-code", // Warn when dead code is identified. + "-Ywarn-numeric-widen", // Warn when numerics are widened. + "-Ywarn-value-discard" // Warn when non-Unit expression results are unused. + ) + + private val scalacOptionsFor12 = Seq( + "-Xlint:_", + "-Ywarn-infer-any", + "-Ypartial-unification", + "-Ywarn-inaccessible", // Warn about inaccessible types in method signatures. + "-Xlint:by-name-right-associative", // By-name parameter of right associative operator. + "-Xlint:unsound-match", // Pattern match may not be typesafe. + "-Ywarn-infer-any", // Warn when a type argument is inferred to be `Any`. + "-Xlint:nullary-override", // Warn when non-nullary `def f()' overrides nullary `def f'. + "-Xlint:nullary-unit", // Warn when nullary methods return Unit. + "-opt-inline-from:" + ) + + private val scalacOptionsFor13 = Seq( + "-Xlint:_", + "-opt-inline-from:" + ) + + val sharedResolvers: Vector[MavenRepository] = + Vector( + Resolver.mavenLocal, + Resolver.ApacheMavenSnapshotsRepo, + Resolver.jcenterRepo + ) ++ Resolver.sonatypeOssRepos("releases") + + val shared: Seq[Setting[?]] = Seq( + scalacOptions ++= sharedScalacOptions ++ { + CrossVersion.partialVersion(scalaVersion.value) match { + case Some((2, n)) if n == 13 => + scalacOptionsFor13 + case Some((2, n)) if n == 12 => + scalacOptionsFor12 + } + }, + ThisBuild / crossScalaVersions := supportedScalaVersions, + ThisBuild / scalaVersion := globalScalaVersion, + ThisBuild / turbo := true, + ThisBuild / versionScheme := Some(VersionScheme.EarlySemVer), + resolvers := Resolver.combineDefaultResolvers(sharedResolvers), + compileOrder := CompileOrder.JavaThenScala, + organization := "org.zalando", + Test / fork := true, + Test / parallelExecution := true, + Test / testForkedParallel := true, + Test / publishArtifact := false, + Test / scalacOptions ++= Seq("-Yrangepos"), + homepage := Some(url("https://github.com/zalando-incubator/kanadi")), + scmInfo := Some( + ScmInfo(url("https://github.com/zalando-incubator/kanadi"), "git@github.com:zalando-incubator/kanadi.git") + ), + developers := List( + Developer("mdedetrich", "Matthew de Detrich", "mdedetrich@gmail.com", url("https://github.com/mdedetrich")), + Developer("xjrk58", "Jiri Syrovy", "jiri.syrovy@zalando.de", url("https://github.com/xjrk58")), + Developer("itachi3", "Balaji Sonachalam", "balajispsg@gmail.com", url("https://github.com/itachi3")), + Developer("Deeds67", "Pierre Marais", "pierrem@live.co.za", url("https://github.com/Deeds67")), + Developer("gouthampradhan", + "Goutham Vidya Pradhan", + "goutham.vidya.pradhan@gmail.com", + url("https://github.com/gouthampradhan")), + Developer("javierarrieta", + "Javier Arrieta", + "javier.arrieta@zalando.ie", + url("https://github.com/javierarrieta")), + Developer("pascalh", "Pascal Hof", "pascal.hof@zalando.de", url("https://github.com/pascalh")), + Developer("gchudnov", "Grigorii Chudnov", "g.chudnov@gmail.com", url("https://github.com/gchudnov")) + ), + licenses += ("MIT", url("https://opensource.org/licenses/MIT")), + pomIncludeRepository := (_ => false), + javaOptions ++= extraJavaOptions, + envVars ++= Map("TOKEN" -> sys.env.get("TOKEN").flatMap(Settings.emptyStringToNone)).collect { case (k, Some(v)) => + (k, v) + } + ) + + val publish: Seq[Setting[?]] = Seq( + publishMavenStyle := true, + publishTo := sonatypePublishTo.value + ) + + def emptyStringToNone(string: String): Option[String] = + if (string.trim.isEmpty) + None + else + Some(string) +} diff --git a/project/build.properties b/project/build.properties index 46e43a9..52413ab 100644 --- a/project/build.properties +++ b/project/build.properties @@ -1 +1 @@ -sbt.version=1.8.2 +sbt.version=1.9.3 diff --git a/project/plugins.sbt b/project/plugins.sbt index 559d50b..a2251ba 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -1,9 +1,9 @@ addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.5.0") addSbtPlugin("com.github.sbt" % "sbt-pgp" % "2.2.1") -addSbtPlugin("org.xerial.sbt" % "sbt-sonatype" % "3.9.17") +addSbtPlugin("org.xerial.sbt" % "sbt-sonatype" % "3.9.21") addSbtPlugin("com.thoughtworks.sbt-api-mappings" % "sbt-api-mappings" % "3.0.2") -addSbtPlugin("org.scoverage" % "sbt-scoverage" % "2.0.7") -addSbtPlugin("org.scoverage" % "sbt-coveralls" % "1.3.6") +addSbtPlugin("org.scoverage" % "sbt-scoverage" % "2.0.8") +addSbtPlugin("org.scoverage" % "sbt-coveralls" % "1.3.9") addSbtPlugin("com.codecommit" % "sbt-github-actions" % "0.14.2") addSbtPlugin("com.github.sbt" % "sbt-release" % "1.1.0") addSbtPlugin("com.timushev.sbt" % "sbt-updates" % "0.6.4") diff --git a/src/main/scala/org/apache/pekko/http/scaladsl/model/headers/MaskedRawHeader.scala b/src/main/scala/org/apache/pekko/http/scaladsl/model/headers/MaskedRawHeader.scala new file mode 100644 index 0000000..682a43c --- /dev/null +++ b/src/main/scala/org/apache/pekko/http/scaladsl/model/headers/MaskedRawHeader.scala @@ -0,0 +1,28 @@ +package org.apache.pekko.http.scaladsl.model.headers + +import org.apache.pekko.http.impl.util.{Rendering, _} +import org.apache.pekko.http.javadsl.{model => jm} +import org.apache.pekko.http.scaladsl.model.HttpHeader + +/** A Header that replaces value with a mask when converted to a string Used to hide sensitive information (e.g. token + * values) in logs + */ +final case class MaskedRawHeader(name: String, value: String, mask: String) extends jm.headers.RawHeader { + override def renderInRequests = true + override def renderInResponses = true + override val lowercaseName: String = name.toRootLowerCase + + def render[R <: Rendering](r: R): r.type = + r ~~ name ~~ ':' ~~ ' ' ~~ value + + private def renderWithMask[R <: Rendering](r: R): r.type = + r ~~ name ~~ ':' ~~ ' ' ~~ mask + + override def toString(): String = + renderWithMask(new StringRendering).get +} + +object MaskedRawHeader { + def unapply[H <: HttpHeader](header: H): Option[(String, String)] = + Some(header.name -> header.value) +} diff --git a/src/main/scala/org/zalando/kanadi/api/EventTypes.scala b/src/main/scala/org/zalando/kanadi/api/EventTypes.scala index 62e0530..6a31341 100644 --- a/src/main/scala/org/zalando/kanadi/api/EventTypes.scala +++ b/src/main/scala/org/zalando/kanadi/api/EventTypes.scala @@ -3,21 +3,19 @@ package org.zalando.kanadi.api import java.net.URI import java.time.OffsetDateTime -import defaults._ -import akka.http.scaladsl.HttpExt -import akka.http.scaladsl.marshalling.Marshal -import akka.http.scaladsl.model.headers.RawHeader -import akka.http.scaladsl.model._ -import akka.http.scaladsl.unmarshalling.Unmarshal -import akka.stream.Materializer +import org.apache.pekko.http.scaladsl.HttpExt +import org.apache.pekko.http.scaladsl.marshalling.Marshal +import org.apache.pekko.http.scaladsl.model.headers.RawHeader +import org.apache.pekko.http.scaladsl.model._ +import org.apache.pekko.stream.Materializer import com.typesafe.scalalogging.{Logger, LoggerTakingImplicit} -import de.heikoseeberger.akkahttpcirce.ErrorAccumulatingCirceSupport._ import enumeratum._ import io.circe._ import io.circe.syntax._ import org.zalando.kanadi.models.HttpHeaders.XFlowID import org.zalando.kanadi.api.defaults._ import org.zalando.kanadi.models._ +import org.mdedetrich.pekko.http.support.CirceHttpSupport._ import scala.concurrent.{ExecutionContext, Future} @@ -416,8 +414,7 @@ case class EventTypes(baseUri: URI, authTokenProvider: Option[AuthTokenProvider] response <- http.singleRequest(request) result <- { if (response.status.isSuccess()) { - Unmarshal(response.entity.httpEntity.withContentType(ContentTypes.`application/json`)) - .to[List[EventType]] + unmarshalAs[List[EventType]](response.entity.httpEntity.withContentType(ContentTypes.`application/json`)) } else processNotSuccessful(request, response) } @@ -515,8 +512,7 @@ case class EventTypes(baseUri: URI, authTokenProvider: Option[AuthTokenProvider] response.discardEntityBytes() Future.successful(None) } else if (response.status.isSuccess()) { - Unmarshal(response.entity.httpEntity.withContentType(ContentTypes.`application/json`)) - .to[EventType] + unmarshalAs[EventType](response.entity.httpEntity.withContentType(ContentTypes.`application/json`)) .map(Some.apply) } else processNotSuccessful(request, response) diff --git a/src/main/scala/org/zalando/kanadi/api/Events.scala b/src/main/scala/org/zalando/kanadi/api/Events.scala index ca9a1d3..8f63a22 100644 --- a/src/main/scala/org/zalando/kanadi/api/Events.scala +++ b/src/main/scala/org/zalando/kanadi/api/Events.scala @@ -4,14 +4,13 @@ import java.net.URI import java.time.OffsetDateTime import defaults._ -import akka.http.scaladsl.HttpExt -import akka.http.scaladsl.marshalling.Marshal -import akka.http.scaladsl.model._ -import akka.http.scaladsl.model.headers.RawHeader -import akka.http.scaladsl.unmarshalling.Unmarshal -import akka.stream.Materializer +import org.apache.pekko.http.scaladsl.HttpExt +import org.apache.pekko.http.scaladsl.marshalling.Marshal +import org.apache.pekko.http.scaladsl.model._ +import org.apache.pekko.http.scaladsl.model.headers.RawHeader +import org.apache.pekko.http.scaladsl.unmarshalling.Unmarshal +import org.apache.pekko.stream.Materializer import com.typesafe.scalalogging.{Logger, LoggerTakingImplicit} -import de.heikoseeberger.akkahttpcirce.ErrorAccumulatingCirceSupport._ import enumeratum._ import io.circe.Decoder.Result import io.circe.syntax._ @@ -327,7 +326,7 @@ case class Events(baseUri: URI, authTokenProvider: Option[AuthTokenProvider] = N logger.warn( s"Events with eid's ${eventIds.map(_.id).mkString(",")} failed to submit, retrying in ${newDuration.toMillis} millis") - akka.pattern.after(newDuration, http.system.scheduler)( + org.apache.pekko.pattern.after(newDuration, http.system.scheduler)( publishWithRecover(name, events, currentNotValidEvents, fillMetadata, newDuration, count + 1)) } } @@ -373,7 +372,7 @@ case class Events(baseUri: URI, authTokenProvider: Option[AuthTokenProvider] = N val newNotValidEvents = (currentNotValidEvents ++ noNeedToRetryResponse).distinct - akka.pattern.after(newDuration, http.system.scheduler)( + org.apache.pekko.pattern.after(newDuration, http.system.scheduler)( publishWithRecover(name, eventsToRetry, newNotValidEvents, fillMetadata, newDuration, count + 1)) } case e: RuntimeException @@ -408,6 +407,8 @@ case class Events(baseUri: URI, authTokenProvider: Option[AuthTokenProvider] = N flowId: FlowId = randomFlowId(), executionContext: ExecutionContext ): Future[Unit] = { + import org.mdedetrich.pekko.http.support.CirceHttpSupport._ + val uri = baseUri_.withPath(baseUri_.path / "event-types" / name.name / "events") diff --git a/src/main/scala/org/zalando/kanadi/api/Registry.scala b/src/main/scala/org/zalando/kanadi/api/Registry.scala index 1be38d7..f979a6c 100644 --- a/src/main/scala/org/zalando/kanadi/api/Registry.scala +++ b/src/main/scala/org/zalando/kanadi/api/Registry.scala @@ -2,13 +2,11 @@ package org.zalando.kanadi.api import java.net.URI -import akka.http.scaladsl.HttpExt -import akka.http.scaladsl.model.headers.RawHeader -import akka.http.scaladsl.model.{ContentTypes, HttpMethods, HttpRequest, Uri} -import akka.http.scaladsl.unmarshalling.Unmarshal -import akka.stream.Materializer +import org.apache.pekko.http.scaladsl.HttpExt +import org.apache.pekko.http.scaladsl.model.headers.RawHeader +import org.apache.pekko.http.scaladsl.model.{ContentTypes, HttpMethods, HttpRequest, Uri} +import org.apache.pekko.stream.Materializer import com.typesafe.scalalogging.{Logger, LoggerTakingImplicit} -import de.heikoseeberger.akkahttpcirce.ErrorAccumulatingCirceSupport._ import org.zalando.kanadi.models.HttpHeaders.XFlowID import org.zalando.kanadi.models._ @@ -51,8 +49,7 @@ case class Registry(baseUri: URI, authTokenProvider: Option[AuthTokenProvider] = response <- http.singleRequest(request) result <- { if (response.status.isSuccess()) { - Unmarshal(response.entity.httpEntity.withContentType(ContentTypes.`application/json`)) - .to[List[String]] + unmarshalAs[List[String]](response.entity.httpEntity.withContentType(ContentTypes.`application/json`)) } else processNotSuccessful(request, response) } @@ -103,8 +100,8 @@ case class Registry(baseUri: URI, authTokenProvider: Option[AuthTokenProvider] = response <- http.singleRequest(request) result <- { if (response.status.isSuccess()) { - Unmarshal(response.entity.httpEntity.withContentType(ContentTypes.`application/json`)) - .to[List[PartitionStrategy]] + unmarshalAs[List[PartitionStrategy]]( + response.entity.httpEntity.withContentType(ContentTypes.`application/json`)) } else processNotSuccessful(request, response) } diff --git a/src/main/scala/org/zalando/kanadi/api/Subscriptions.scala b/src/main/scala/org/zalando/kanadi/api/Subscriptions.scala index 4277b4b..f1ab4f6 100644 --- a/src/main/scala/org/zalando/kanadi/api/Subscriptions.scala +++ b/src/main/scala/org/zalando/kanadi/api/Subscriptions.scala @@ -3,25 +3,23 @@ package org.zalando.kanadi.api import java.net.URI import java.time.OffsetDateTime import java.util.concurrent.ConcurrentHashMap -import akka.NotUsed +import org.apache.pekko.NotUsed import defaults._ -import akka.http.scaladsl.HttpExt -import akka.http.scaladsl.marshalling.Marshal -import akka.http.scaladsl.model.Uri.Query -import akka.http.scaladsl.model._ -import akka.http.scaladsl.model.headers.{Connection, RawHeader} -import akka.http.scaladsl.unmarshalling.Unmarshal -import akka.stream._ -import akka.stream.scaladsl._ -import akka.util.ByteString +import org.apache.pekko.http.scaladsl.HttpExt +import org.apache.pekko.http.scaladsl.marshalling.Marshal +import org.apache.pekko.http.scaladsl.model.Uri.Query +import org.apache.pekko.http.scaladsl.model._ +import org.apache.pekko.http.scaladsl.model.headers.{Connection, RawHeader} +import org.apache.pekko.stream._ +import org.apache.pekko.stream.scaladsl._ +import org.apache.pekko.util.ByteString import com.typesafe.scalalogging.{Logger, LoggerTakingImplicit} -import de.heikoseeberger.akkahttpcirce.ErrorAccumulatingCirceSupport._ +import org.mdedetrich.pekko.http.support.CirceHttpSupport._ import enumeratum._ import io.circe.{Decoder, Encoder, JsonObject} import io.circe.syntax._ -import org.zalando.kanadi.api.defaults._ +import org.mdedetrich.pekko.stream.support.CirceStreamSupport import org.zalando.kanadi.models._ -import org.mdedetrich.akka.stream.support.CirceStreamSupport import org.zalando.kanadi.models import scala.collection.JavaConverters._ @@ -504,7 +502,7 @@ object Subscriptions { streamKeepAliveLimit: Option[Int] = None, commitTimeout: Option[FiniteDuration] = None) - /** Nakadi stream represented as an akka-stream [[Source]] + /** Nakadi stream represented as an pekko-stream [[Source]] * @param streamId * @param source * @param request @@ -566,10 +564,10 @@ case class Subscriptions(baseUri: URI, authTokenProvider: Option[AuthTokenProvid response <- http.singleRequest(request).map(decodeCompressed) result <- { if (response.status.isSuccess()) { - Unmarshal(response.entity.httpEntity.withContentType(ContentTypes.`application/json`)) - .to[Subscription] - } else + unmarshalAs[Subscription](response.entity.httpEntity.withContentType(ContentTypes.`application/json`)) + } else { processNotSuccessful(request, response) + } } } yield result } @@ -667,8 +665,7 @@ case class Subscriptions(baseUri: URI, authTokenProvider: Option[AuthTokenProvid response <- http.singleRequest(request).map(decodeCompressed) result <- { if (response.status.isSuccess()) { - Unmarshal(response.entity.httpEntity.withContentType(ContentTypes.`application/json`)) - .to[SubscriptionQuery] + unmarshalAs[SubscriptionQuery](response.entity.httpEntity.withContentType(ContentTypes.`application/json`)) } else response.status match { case _ => processNotSuccessful(request, response) @@ -708,8 +705,7 @@ case class Subscriptions(baseUri: URI, authTokenProvider: Option[AuthTokenProvid response.discardEntityBytes() Future.successful(None) } else if (response.status.isSuccess()) { - Unmarshal(response.entity.httpEntity.withContentType(ContentTypes.`application/json`)) - .to[Subscription] + unmarshalAs[Subscription](response.entity.httpEntity.withContentType(ContentTypes.`application/json`)) .map(Some.apply) } else processNotSuccessful(request, response) @@ -784,8 +780,7 @@ case class Subscriptions(baseUri: URI, authTokenProvider: Option[AuthTokenProvid response.discardEntityBytes() Future.successful(None) case s if s.isSuccess() => - Unmarshal(response.entity.httpEntity.withContentType(ContentTypes.`application/json`)) - .to[SubscriptionCursor] + unmarshalAs[SubscriptionCursor](response.entity.httpEntity.withContentType(ContentTypes.`application/json`)) .map(x => Some(x)) case _ => processNotSuccessful(request, response) } @@ -846,8 +841,7 @@ case class Subscriptions(baseUri: URI, authTokenProvider: Option[AuthTokenProvid response.discardEntityBytes() Future.successful(None) } else if (response.status.isSuccess()) { - Unmarshal(response.entity.httpEntity.withContentType(ContentTypes.`application/json`)) - .to[CommitCursorResponse] + unmarshalAs[CommitCursorResponse](response.entity.httpEntity.withContentType(ContentTypes.`application/json`)) .map { commitCursorsResponse => logger.warn( s"SubscriptionId: ${subscriptionId.id.toString}, StreamId: ${streamId.id} At least one cursor failed to commit, details are $commitCursorsResponse") @@ -904,7 +898,6 @@ case class Subscriptions(baseUri: URI, authTokenProvider: Option[AuthTokenProvid : Graph[FlowShape[ByteString, Either[Throwable, SubscriptionEvent[T]]], NotUsed] = GraphDSL.create() { implicit b => import GraphDSL.Implicits._ - import org.mdedetrich.akka.stream.support.CirceStreamSupport implicit def successDecoder[A](implicit decoder: Decoder[A]): Decoder[Success[A]] = Decoder.instance[Success[A]] { c => @@ -1043,8 +1036,8 @@ case class Subscriptions(baseUri: URI, authTokenProvider: Option[AuthTokenProvid case _ => if (response.status.isSuccess()) { for { - string <- Unmarshal(response.entity.httpEntity.withContentType(ContentTypes.`application/json`)) - .to[String] + string <- + unmarshalAs[String](response.entity.httpEntity.withContentType(ContentTypes.`application/json`)) result <- Source( string @@ -1238,7 +1231,7 @@ case class Subscriptions(baseUri: URI, authTokenProvider: Option[AuthTokenProvid * he gets in a stream. * * This call lets you register a callback which gets execute every time an event is streamed. There are different - * types of callbacks depending on how you want to handle failure. The timeout for the akka http request is the same + * types of callbacks depending on how you want to handle failure. The timeout for the pekko http request is the same * as streamTimeout with a small buffer. Note that typically clients should be using [[eventsStreamedManaged]] as * this will handle disconnects/reconnects * @@ -1463,7 +1456,7 @@ case class Subscriptions(baseUri: URI, authTokenProvider: Option[AuthTokenProvid executionContext: ExecutionContext, eventStreamSupervisionDecider: Subscriptions.EventStreamSupervisionDecider ): Future[StreamId] = - akka.pattern + org.apache.pekko.pattern .after(reconnectDelay, http.system.scheduler)( eventsStreamedManaged[T]( subscriptionId, @@ -1488,9 +1481,10 @@ case class Subscriptions(baseUri: URI, authTokenProvider: Option[AuthTokenProvid /** Creates an event stream using [[eventsStreamed]] however also manages disconnects and reconnects from the server. * Typically clients want to use this as they don't need to handle these situations manually. * - * This uses [[akka.pattern.after]] to recreate the streams in the case of server disconnects/no empty slots and - * cursor resets. The timeouts respectively can be configured with [[HttpConfig.serverDisconnectRetryDelay]] and - * [[HttpConfig.noEmptySlotsCursorResetRetryDelay]]. The `connectionClosedCallback` parameter is still respected. + * This uses [[org.apache.pekko.pattern.after]] to recreate the streams in the case of server disconnects/no empty + * slots and cursor resets. The timeouts respectively can be configured with + * [[HttpConfig.serverDisconnectRetryDelay]] and [[HttpConfig.noEmptySlotsCursorResetRetryDelay]]. The + * `connectionClosedCallback` parameter is still respected. * * NOTE: If the connection is closed by the client explicitly using the [[closeHttpConnection]] method then * [[eventsStreamedManaged]] will not re-establish a connection. @@ -1592,7 +1586,7 @@ case class Subscriptions(baseUri: URI, authTokenProvider: Option[AuthTokenProvid }.recoverWith { case _: Subscriptions.Errors.NoEmptySlotsOrCursorReset => logger.info(s"No empty slots/cursor reset, reconnecting in ${kanadiHttpConfig.noEmptySlotsCursorResetRetryDelay .toString()}, SubscriptionId: ${subscriptionId.id.toString}") - akka.pattern.after(kanadiHttpConfig.noEmptySlotsCursorResetRetryDelay, http.system.scheduler)( + org.apache.pekko.pattern.after(kanadiHttpConfig.noEmptySlotsCursorResetRetryDelay, http.system.scheduler)( eventsStreamedSourceManaged[T]( subscriptionId, connectionClosedCallback, @@ -1652,8 +1646,7 @@ case class Subscriptions(baseUri: URI, authTokenProvider: Option[AuthTokenProvid response.discardEntityBytes() Future.successful(None) } else if (response.status.isSuccess()) { - Unmarshal(response.entity.httpEntity.withContentType(ContentTypes.`application/json`)) - .to[SubscriptionStats] + unmarshalAs[SubscriptionStats](response.entity.httpEntity.withContentType(ContentTypes.`application/json`)) .map(x => Some(x)) } else processNotSuccessful(request, response) diff --git a/src/main/scala/org/zalando/kanadi/api/SubscriptionsInterface.scala b/src/main/scala/org/zalando/kanadi/api/SubscriptionsInterface.scala index b7c54e4..7ce3923 100644 --- a/src/main/scala/org/zalando/kanadi/api/SubscriptionsInterface.scala +++ b/src/main/scala/org/zalando/kanadi/api/SubscriptionsInterface.scala @@ -1,7 +1,7 @@ package org.zalando.kanadi.api -import akka.stream.UniqueKillSwitch -import akka.stream.scaladsl.Source +import org.apache.pekko.stream.UniqueKillSwitch +import org.apache.pekko.stream.scaladsl.Source import io.circe.Decoder import org.zalando.kanadi.models.{EventTypeName, FlowId, StreamId, SubscriptionId} diff --git a/src/main/scala/org/zalando/kanadi/api/package.scala b/src/main/scala/org/zalando/kanadi/api/package.scala index 639fdd8..ccf28a9 100644 --- a/src/main/scala/org/zalando/kanadi/api/package.scala +++ b/src/main/scala/org/zalando/kanadi/api/package.scala @@ -1,13 +1,13 @@ package org.zalando.kanadi -import akka.http.scaladsl.coding._ -import akka.http.scaladsl.model.headers._ -import akka.http.scaladsl.model.{HttpEntity, HttpHeader, HttpRequest, HttpResponse} -import akka.http.scaladsl.unmarshalling.{Unmarshal, Unmarshaller} -import akka.stream.Materializer +import org.apache.pekko.http.scaladsl.model.headers._ +import org.apache.pekko.http.scaladsl.model.{HttpEntity, HttpHeader, HttpRequest, HttpResponse} +import org.apache.pekko.http.scaladsl.unmarshalling.{Unmarshal, Unmarshaller} +import org.apache.pekko.stream.Materializer import com.typesafe.scalalogging.CanLog import io.circe._ import io.circe.parser._ +import org.apache.pekko.util.ByteString import org.zalando.kanadi.models.HttpHeaders.XFlowID import org.slf4j.MDC import org.zalando.kanadi.models._ @@ -27,6 +27,8 @@ package object api { List(RawHeader(XFlowID, flowId.value), `Accept-Encoding`(HttpEncodings.gzip, HttpEncodings.deflate)) def decodeCompressed(response: HttpResponse): HttpResponse = { + import org.apache.pekko.http.scaladsl.coding._ + val decoder = response.encoding match { case HttpEncodings.gzip => Coders.Gzip @@ -42,7 +44,7 @@ package object api { private[api] def toHeader(authToken: AuthToken)(implicit kanadiHttpConfig: HttpConfig): HttpHeader = if (kanadiHttpConfig.censorAuthToken) - CensoredRawHeader("Authorization", s"Bearer ${authToken.value}", "Bearer ") + MaskedRawHeader("Authorization", s"Bearer ${authToken.value}", "Bearer ") else RawHeader("Authorization", s"Bearer ${authToken.value}") private[api] def stripAuthToken(request: HttpRequest)(implicit kanadiHttpConfig: HttpConfig): HttpRequest = { @@ -100,4 +102,13 @@ package object api { tryDecodeAsProblem = maybeStringToProblem(asString) } yield tryDecodeAsProblem.toRight(asString) + + private[api] def unmarshalAs[T: Decoder]( + entity: HttpEntity)(implicit materializer: Materializer, executionContext: ExecutionContext): Future[T] = { + val dataF = entity.dataBytes.runFold(ByteString.empty)(_ ++ _).map(_.utf8String) + dataF.flatMap { data => + val errOrDecoder = decode[T](data) + Future.fromTry(errOrDecoder.toTry) + } + } } diff --git a/src/main/scala/org/zalando/kanadi/models/CustomHeaders.scala b/src/main/scala/org/zalando/kanadi/models/CustomHeaders.scala deleted file mode 100644 index 29af8b3..0000000 --- a/src/main/scala/org/zalando/kanadi/models/CustomHeaders.scala +++ /dev/null @@ -1,8 +0,0 @@ -package org.zalando.kanadi.models - -import akka.http.scaladsl.model.headers.RawHeader - -final case class CustomHeaders(headers: Map[String, String]) extends AnyVal { - def toRawHeaders: List[RawHeader] = - headers.toList.map { case (k, v) => RawHeader(k, v) } -} diff --git a/src/main/scala/org/zalando/kanadi/models/GeneralError.scala b/src/main/scala/org/zalando/kanadi/models/GeneralError.scala index 3f53656..5e45dda 100644 --- a/src/main/scala/org/zalando/kanadi/models/GeneralError.scala +++ b/src/main/scala/org/zalando/kanadi/models/GeneralError.scala @@ -1,6 +1,6 @@ package org.zalando.kanadi.models -import akka.http.scaladsl.model.{HttpRequest, HttpResponse} +import org.apache.pekko.http.scaladsl.model.{HttpRequest, HttpResponse} class GeneralError(val problem: Problem, override val httpRequest: HttpRequest, override val httpResponse: HttpResponse) extends HttpServiceError(httpRequest, httpResponse, Right(problem)) { diff --git a/src/main/scala/org/zalando/kanadi/models/HttpServiceError.scala b/src/main/scala/org/zalando/kanadi/models/HttpServiceError.scala index 771e917..e1ac137 100644 --- a/src/main/scala/org/zalando/kanadi/models/HttpServiceError.scala +++ b/src/main/scala/org/zalando/kanadi/models/HttpServiceError.scala @@ -1,6 +1,6 @@ package org.zalando.kanadi.models -import akka.http.scaladsl.model.{HttpRequest, HttpResponse} +import org.apache.pekko.http.scaladsl.model.{HttpRequest, HttpResponse} class HttpServiceError(val httpRequest: HttpRequest, val httpResponse: HttpResponse, diff --git a/src/test/resources/logback.xml b/src/test/resources/logback-test.xml similarity index 100% rename from src/test/resources/logback.xml rename to src/test/resources/logback-test.xml diff --git a/src/test/scala/org/zalando/kanadi/AuthorizationSpec.scala b/src/test/scala/org/zalando/kanadi/AuthorizationSpec.scala index efd8694..c4316d5 100644 --- a/src/test/scala/org/zalando/kanadi/AuthorizationSpec.scala +++ b/src/test/scala/org/zalando/kanadi/AuthorizationSpec.scala @@ -2,8 +2,8 @@ package org.zalando.kanadi import java.util.UUID -import akka.actor.ActorSystem -import akka.http.scaladsl.Http +import org.apache.pekko.actor.ActorSystem +import org.apache.pekko.http.scaladsl.Http import com.typesafe.config.ConfigFactory import org.specs2.Specification import org.specs2.concurrent.ExecutionEnv diff --git a/src/test/scala/org/zalando/kanadi/BadJsonDecodingSpec.scala b/src/test/scala/org/zalando/kanadi/BadJsonDecodingSpec.scala index db9714d..9f58cc0 100644 --- a/src/test/scala/org/zalando/kanadi/BadJsonDecodingSpec.scala +++ b/src/test/scala/org/zalando/kanadi/BadJsonDecodingSpec.scala @@ -2,9 +2,9 @@ package org.zalando.kanadi import java.util.UUID -import akka.actor.ActorSystem -import akka.http.scaladsl.Http -import akka.stream.Supervision +import org.apache.pekko.actor.ActorSystem +import org.apache.pekko.http.scaladsl.Http +import org.apache.pekko.stream.Supervision import com.typesafe.config.ConfigFactory import io.circe.{Decoder, Encoder} import org.specs2.Specification @@ -185,7 +185,7 @@ class BadJsonDecodingSpec(implicit ec: ExecutionEnv) extends Specification with } yield subscriptionsClient.closeHttpConnection(subscriptionId, streamId) val waitForCloseFuture = - akka.pattern.after(6 seconds, system.scheduler)(Future.successful(subscriptionClosed)) + org.apache.pekko.pattern.after(6 seconds, system.scheduler)(Future.successful(subscriptionClosed)) val future = for { closed <- closedFuture diff --git a/src/test/scala/org/zalando/kanadi/BasicSourceSpec.scala b/src/test/scala/org/zalando/kanadi/BasicSourceSpec.scala index 2d6022e..b9698ff 100644 --- a/src/test/scala/org/zalando/kanadi/BasicSourceSpec.scala +++ b/src/test/scala/org/zalando/kanadi/BasicSourceSpec.scala @@ -2,9 +2,9 @@ package org.zalando.kanadi import java.util.UUID -import akka.actor.ActorSystem -import akka.http.scaladsl.Http -import akka.stream.scaladsl.{Keep, Sink} +import org.apache.pekko.actor.ActorSystem +import org.apache.pekko.http.scaladsl.Http +import org.apache.pekko.stream.scaladsl.{Keep, Sink} import com.typesafe.config.ConfigFactory import org.specs2.Specification import org.specs2.concurrent.ExecutionEnv diff --git a/src/test/scala/org/zalando/kanadi/BasicSpec.scala b/src/test/scala/org/zalando/kanadi/BasicSpec.scala index 7204b30..81bcb92 100644 --- a/src/test/scala/org/zalando/kanadi/BasicSpec.scala +++ b/src/test/scala/org/zalando/kanadi/BasicSpec.scala @@ -3,8 +3,8 @@ package org.zalando.kanadi import java.util.UUID import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger} -import akka.actor.ActorSystem -import akka.http.scaladsl.Http +import org.apache.pekko.actor.ActorSystem +import org.apache.pekko.http.scaladsl.Http import com.typesafe.config.ConfigFactory import org.specs2.Specification import org.specs2.concurrent.ExecutionEnv @@ -180,7 +180,7 @@ class BasicSpec(implicit ec: ExecutionEnv) extends Specification with FutureMatc } yield subscriptionsClient.closeHttpConnection(subscriptionId, streamId) val waitForCloseFuture = - akka.pattern.after(3 seconds, system.scheduler)(Future.successful(subscriptionClosed.get())) + org.apache.pekko.pattern.after(3 seconds, system.scheduler)(Future.successful(subscriptionClosed.get())) val future = for { closed <- closedFuture diff --git a/src/test/scala/org/zalando/kanadi/CommitCursorBadResponseSpec.scala b/src/test/scala/org/zalando/kanadi/CommitCursorBadResponseSpec.scala index 5d1ab02..6083bd0 100644 --- a/src/test/scala/org/zalando/kanadi/CommitCursorBadResponseSpec.scala +++ b/src/test/scala/org/zalando/kanadi/CommitCursorBadResponseSpec.scala @@ -1,9 +1,9 @@ package org.zalando.kanadi import java.util.UUID -import akka.actor.ActorSystem -import akka.http.scaladsl.Http -import akka.stream.scaladsl.Sink +import org.apache.pekko.actor.ActorSystem +import org.apache.pekko.http.scaladsl.Http +import org.apache.pekko.stream.scaladsl.Sink import com.typesafe.config.ConfigFactory import io.circe.JsonObject import org.specs2.Specification diff --git a/src/test/scala/org/zalando/kanadi/CommitCursorOmittedSpec.scala b/src/test/scala/org/zalando/kanadi/CommitCursorOmittedSpec.scala index 20d9fff..12241aa 100644 --- a/src/test/scala/org/zalando/kanadi/CommitCursorOmittedSpec.scala +++ b/src/test/scala/org/zalando/kanadi/CommitCursorOmittedSpec.scala @@ -1,9 +1,9 @@ package org.zalando.kanadi import java.util.UUID -import akka.actor.ActorSystem -import akka.http.scaladsl.Http -import akka.stream.scaladsl.Sink +import org.apache.pekko.actor.ActorSystem +import org.apache.pekko.http.scaladsl.Http +import org.apache.pekko.stream.scaladsl.Sink import com.typesafe.config.ConfigFactory import io.circe.JsonObject import org.specs2.Specification diff --git a/src/test/scala/org/zalando/kanadi/NoEmptySlotsSpec.scala b/src/test/scala/org/zalando/kanadi/NoEmptySlotsSpec.scala index 631860f..eb4251a 100644 --- a/src/test/scala/org/zalando/kanadi/NoEmptySlotsSpec.scala +++ b/src/test/scala/org/zalando/kanadi/NoEmptySlotsSpec.scala @@ -3,8 +3,8 @@ package org.zalando.kanadi import java.util.UUID import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger} -import akka.actor.ActorSystem -import akka.http.scaladsl.Http +import org.apache.pekko.actor.ActorSystem +import org.apache.pekko.http.scaladsl.Http import com.typesafe.config.ConfigFactory import org.specs2.Specification import org.specs2.concurrent.ExecutionEnv @@ -133,9 +133,9 @@ class NoEmptySlotsSpec(implicit ec: ExecutionEnv) extends Specification with Fut val future = for { subscriptionId <- currentSubscriptionId.future streamId <- eventualStreamOne - _ <- akka.pattern.after(100 millis, system.scheduler)(Future.successful(())) + _ <- org.apache.pekko.pattern.after(100 millis, system.scheduler)(Future.successful(())) streamTwo = eventualStreamTwo - _ <- akka.pattern.after(100 millis, system.scheduler)(Future.successful(())) + _ <- org.apache.pekko.pattern.after(100 millis, system.scheduler)(Future.successful(())) _ = subscriptionsClient.closeHttpConnection(subscriptionId, streamId) _ <- streamTwo } yield () diff --git a/src/test/scala/org/zalando/kanadi/OAuthFailedSpec.scala b/src/test/scala/org/zalando/kanadi/OAuthFailedSpec.scala index 47a3682..743ed78 100644 --- a/src/test/scala/org/zalando/kanadi/OAuthFailedSpec.scala +++ b/src/test/scala/org/zalando/kanadi/OAuthFailedSpec.scala @@ -1,7 +1,7 @@ package org.zalando.kanadi -import akka.actor.ActorSystem -import akka.http.scaladsl.Http +import org.apache.pekko.actor.ActorSystem +import org.apache.pekko.http.scaladsl.Http import com.typesafe.config.ConfigFactory import org.specs2.Specification import org.specs2.execute.Skipped diff --git a/src/test/scala/org/zalando/kanadi/ResetCursorsSpec.scala b/src/test/scala/org/zalando/kanadi/ResetCursorsSpec.scala index bb5d953..1ea7c9c 100644 --- a/src/test/scala/org/zalando/kanadi/ResetCursorsSpec.scala +++ b/src/test/scala/org/zalando/kanadi/ResetCursorsSpec.scala @@ -1,9 +1,9 @@ package org.zalando.kanadi import java.util.UUID -import akka.actor.ActorSystem -import akka.http.scaladsl.Http -import akka.stream.scaladsl.{Keep, Sink} +import org.apache.pekko.actor.ActorSystem +import org.apache.pekko.http.scaladsl.Http +import org.apache.pekko.stream.scaladsl.{Keep, Sink} import com.typesafe.config.ConfigFactory import org.specs2.Specification import org.specs2.concurrent.ExecutionEnv diff --git a/src/test/scala/org/zalando/kanadi/SubscriptionsSpec.scala b/src/test/scala/org/zalando/kanadi/SubscriptionsSpec.scala index 582facf..b6d7e8d 100644 --- a/src/test/scala/org/zalando/kanadi/SubscriptionsSpec.scala +++ b/src/test/scala/org/zalando/kanadi/SubscriptionsSpec.scala @@ -1,8 +1,8 @@ package org.zalando.kanadi import java.util.UUID -import akka.actor.ActorSystem -import akka.http.scaladsl.Http +import org.apache.pekko.actor.ActorSystem +import org.apache.pekko.http.scaladsl.Http import com.typesafe.config.ConfigFactory import org.specs2.Specification import org.specs2.concurrent.ExecutionEnv diff --git a/src/test/scala/org/zalando/kanadi/api/EventPublishRetrySpec.scala b/src/test/scala/org/zalando/kanadi/api/EventPublishRetrySpec.scala index 8d6878c..e0159f0 100644 --- a/src/test/scala/org/zalando/kanadi/api/EventPublishRetrySpec.scala +++ b/src/test/scala/org/zalando/kanadi/api/EventPublishRetrySpec.scala @@ -3,12 +3,11 @@ package org.zalando.kanadi.api import java.net.{ServerSocket, URI} import java.util.UUID import defaults._ -import akka.actor.ActorSystem -import akka.http.scaladsl.Http -import akka.http.scaladsl.model.StatusCodes -import akka.http.scaladsl.server.Directives._ +import org.apache.pekko.actor.ActorSystem +import org.apache.pekko.http.scaladsl.Http +import org.apache.pekko.http.scaladsl.model.StatusCodes +import org.apache.pekko.http.scaladsl.server.Directives._ import com.typesafe.config.ConfigFactory -import de.heikoseeberger.akkahttpcirce.ErrorAccumulatingCirceSupport._ import io.circe._ import org.specs2.Specification import org.specs2.concurrent.ExecutionEnv @@ -19,10 +18,11 @@ import org.zalando.kanadi.models.{EventTypeName, ExponentialBackoffConfig, FlowI import scala.concurrent.{Future, Promise} import scala.concurrent.duration._ -import net.ceedubs.ficus.readers.ArbitraryTypeReader._ import net.ceedubs.ficus.readers.namemappers.implicits.hyphenCase +import net.ceedubs.ficus.readers.ArbitraryTypeReader._ import net.ceedubs.ficus.Ficus._ import org.zalando.kanadi.api.Events.Errors +import org.mdedetrich.pekko.http.support.CirceHttpSupport._ class EventPublishRetrySpec(implicit ec: ExecutionEnv) extends Specification with FutureMatchers with Config { diff --git a/version.sbt b/version.sbt index 349c2b4..bb14533 100644 --- a/version.sbt +++ b/version.sbt @@ -1 +1 @@ -ThisBuild / version := "0.11.0-SNAPSHOT" +ThisBuild / version := "0.20.0-SNAPSHOT"