diff --git a/.github/dependabot.yml b/.github/dependabot.yml new file mode 100644 index 0000000..120c689 --- /dev/null +++ b/.github/dependabot.yml @@ -0,0 +1,6 @@ +version: 2 +updates: + - package-ecosystem: "github-actions" + directory: "/" + schedule: + interval: "weekly" \ No newline at end of file diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml new file mode 100644 index 0000000..4f13950 --- /dev/null +++ b/.github/workflows/publish.yml @@ -0,0 +1,23 @@ +name: Publish package to GitHub Packages +on: + release: + types: [created] +jobs: + publish: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + - uses: coursier/cache-action@v6 + - uses: actions/setup-java@v2.3.1 + with: + distribution: temurin + java-version: 11 + check-latest: true + - name: Set output + id: vars + run: echo ::set-output name=tag::${GITHUB_REF#refs/*/} + - name: Publish package + run: sbt clean test publish + env: + CREATED_TAG: ${{ steps.vars.outputs.tag }} + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} diff --git a/.github/workflows/scala-steward.yml b/.github/workflows/scala-steward.yml new file mode 100644 index 0000000..333a0b2 --- /dev/null +++ b/.github/workflows/scala-steward.yml @@ -0,0 +1,16 @@ +# This workflow will launch everyday at 00:00 +on: + schedule: + - cron: '0 0 * * *' + +name: Launch Scala Steward + +jobs: + scala-steward: + runs-on: ubuntu-latest + name: Launch Scala Steward + steps: + - name: Launch Scala Steward + uses: scala-steward-org/scala-steward-action@v2 + with: + github-token: ${{ secrets.SCALA_STEWARD }} \ No newline at end of file diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml new file mode 100644 index 0000000..0ef33e8 --- /dev/null +++ b/.github/workflows/test.yml @@ -0,0 +1,19 @@ +name: Tests +on: + push: + branches: [main, master] + pull_request: + branches: ["*"] +jobs: + test: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + - uses: coursier/cache-action@v6 + - uses: actions/setup-java@v2.3.1 + with: + distribution: temurin + java-version: 11 + check-latest: true + - name: Tests + run: sbt clean test diff --git a/.gitignore b/.gitignore index 9c07d4a..2e992c4 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,111 @@ +### VisualStudioCode template +.vscode/* +!.vscode/settings.json +!.vscode/tasks.json +!.vscode/launch.json +!.vscode/extensions.json +*.code-workspace + +# Local History for Visual Studio Code +.history/ + +### JetBrains template +# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio, WebStorm and Rider +# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839 + +# User-specific stuff +.idea/**/workspace.xml +.idea/**/tasks.xml +.idea/**/usage.statistics.xml +.idea/**/dictionaries +.idea/**/shelf + +# Generated files +.idea/**/contentModel.xml + +# Sensitive or high-churn files +.idea/**/dataSources/ +.idea/**/dataSources.ids +.idea/**/dataSources.local.xml +.idea/**/sqlDataSources.xml +.idea/**/dynamic.xml +.idea/**/uiDesigner.xml +.idea/**/dbnavigator.xml + +# Gradle +.idea/**/gradle.xml +.idea/**/libraries + +.idea/ + +# Gradle and Maven with auto-import +# When using Gradle or Maven with auto-import, you should exclude module files, +# since they will be recreated, and may cause churn. Uncomment if using +# auto-import. +# .idea/artifacts +# .idea/compiler.xml +# .idea/jarRepositories.xml +# .idea/modules.xml +# .idea/*.iml +# .idea/modules +# *.iml +# *.ipr + +# CMake +cmake-build-*/ + +# Mongo Explorer plugin +.idea/**/mongoSettings.xml + +# File-based project format +*.iws + +# IntelliJ +out/ + +# mpeltonen/sbt-idea plugin +.idea_modules/ + +# JIRA plugin +atlassian-ide-plugin.xml + +# Cursive Clojure plugin +.idea/replstate.xml + +# Crashlytics plugin (for Android Studio and IntelliJ) +com_crashlytics_export_strings.xml +crashlytics.properties +crashlytics-build.properties +fabric.properties + +# Editor-based Rest Client +.idea/httpRequests + +# Android studio 3.1+ serialized cache file +.idea/caches/build_file_checksums.ser + +### Scala template *.class *.log + +### SBT template +# Simple Build Tool +# http://www.scala-sbt.org/release/docs/Getting-Started/Directories.html#configuring-version-control + +dist/* +target/ +lib_managed/ +src_managed/ +project/boot/ +project/plugins/project/ +.history +.cache +.lib/ + +### Metals template + # Generated Metals (Scala Language Server) files + # Reference: https://scalameta.org/metals/ +.metals/ +.bloop/ +project/metals.sbt +.bsp/ diff --git a/.scalafmt.conf b/.scalafmt.conf new file mode 100644 index 0000000..0257832 --- /dev/null +++ b/.scalafmt.conf @@ -0,0 +1,25 @@ +version = "3.0.8" +runner.dialect = scala213 +maxColumn = 140 +align.preset = most +continuationIndent.defnSite = 2 +assumeStandardLibraryStripMargin = true +docstrings.style = Asterisk +lineEndings = preserve +includeCurlyBraceInSelectChains = false +danglingParentheses.preset = true +optIn.annotationNewlines = true +newlines.alwaysBeforeMultilineDef = false +trailingCommas = preserve + +rewrite.rules = [RedundantBraces, SortModifiers] + +rewrite.sortModifiers.order = [ + "implicit", "override", "private", "protected", "final", "sealed", "abstract", "lazy" +] +rewrite.redundantBraces.generalExpressions = false +rewriteTokens = { + "⇒": "=>" + "→": "->" + "←": "<-" +} diff --git a/README.md b/README.md index e553d51..b0e6d4c 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,87 @@ # my_custom_deserializers -Kafka deserializer examples + +Kafka deserializers examples to use to demo/test the Conduktor "custom deserializers" feature. + +You can download the latest jar containing these deserializers here: TODO Jules + +To learn how to use the "custom deserializer" feature provided in Condukot, you can read the documentation of the feature: https://docs.conduktor.io/features/consuming-data/custom-deserializers + +## Deserializers implementation details + +Here's the list of the deserializers available in this jar and a quick explanation of each deserializer behaviour. +These deserializers are for demo/test purpose only. + +### io.example.conduktor.custom.deserializers.constant.ConstantString + +This deserializer completely ignores the messages it receives from Kafka. +For each message it receives from Kafka, it returns `this is a message`. + +### io.example.conduktor.custom.deserializers.constant.ConstantChar + +This deserializer completely ignores the messages it receives from Kafka. +For each message it receives from Kafka, it returns the letter `c`. + +### io.example.conduktor.custom.deserializers.constant.ConstantInt + +This deserializer completely ignores the messages it receives from Kafka. +For each message it receives from Kafka, it returns the value `1`. + +### io.example.conduktor.custom.deserializers.constant.ConstantDouble + +This deserializer completely ignores the messages it receives from Kafka. +For each message it receives from Kafka, it returns the value `1.234`. + +### io.example.conduktor.custom.deserializers.constant.ConstantFloat + +This deserializer completely ignores the messages it receives from Kafka. +For each message it receives from Kafka, it returns the value `1.456`. + +### io.example.conduktor.custom.deserializers.constant.ConstantShort + +This deserializer completely ignores the messages it receives from Kafka. +For each message it receives from Kafka, it returns the value `2`. + +### io.example.conduktor.custom.deserializers.constant.ConstantBoolean + +This deserializer completely ignores the messages it receives from Kafka. +For each message it receives from Kafka, it returns the value `true`. + +### io.example.conduktor.custom.deserializers.constant.ConstantByte + +This deserializer completely ignores the messages it receives from Kafka. +For each message it receives from Kafka, it returns the value `6`. + +### io.example.conduktor.custom.deserializers.constant.ConstantNull + +This deserializer completely ignores the messages it receives from Kafka. +For each message it receives from Kafka, it returns `null`. + +### io.example.conduktor.custom.deserializers.MyCustomDeserializer + +This deserializer transforms the data (bytes) it receives from Kafka into a `String` (text), +then sees if it matches then following format: +``` +-- this is the serialized data +``` +- If the message received from Kakfa effectively starts with a `--` characters sequence then followed by some text, +it creates a new instance of a data structure named `MyMessage`, that contains only one field named `value` and is of type `String`, as following: + ```scala + MyMessage(value = "this is the serialized data") + ``` + + In Conduktor, this data structure will be interpreted and displayed as JSON: + ```json + { "value": "this is the serialized data" } + ``` + +- If the message received from Kafka doesn't match the expected format, then the deserializer fails with an error message: + ``` + Invalid format received for message: + ``` + +This simple example is here to demonstrate 2 things: + - What's happening when a custom deserializer fails to deserialize a message. + - Give a simple example of "deserialization" (the message has to respect of certain format so that the deserializer can extract the data) + + + diff --git a/build.sbt b/build.sbt new file mode 100644 index 0000000..322e133 --- /dev/null +++ b/build.sbt @@ -0,0 +1,20 @@ +name := "my_custom_deserializers" +version := sys.env.getOrElse("CREATED_TAG", "0.1") +scalaVersion := "2.13.7" +libraryDependencies += "org.apache.kafka" % "kafka-clients" % "2.8.1" + +// ## Github Packages publish configs +// More info, see: https://gist.github.com/guizmaii/2ca47b74ad8e26c772d7df6ada8ddb00 +val GITHUB_OWNER = "conduktor" +val GITHUB_PROJECT = "my_custom_deserializers" + +ThisBuild / publishTo := Some( + s"GitHub $GITHUB_OWNER Apache Maven Packages" at s"https://maven.pkg.github.com/$GITHUB_OWNER/$GITHUB_PROJECT" +) +ThisBuild / publishMavenStyle := true +ThisBuild / credentials += Credentials( + "GitHub Package Registry", + "maven.pkg.github.com", + GITHUB_OWNER, + System.getenv("GITHUB_TOKEN") +) diff --git a/project/build.properties b/project/build.properties new file mode 100644 index 0000000..215ddd2 --- /dev/null +++ b/project/build.properties @@ -0,0 +1 @@ +sbt.version = 1.5.5 \ No newline at end of file diff --git a/project/plugins.sbt b/project/plugins.sbt new file mode 100644 index 0000000..f338c2a --- /dev/null +++ b/project/plugins.sbt @@ -0,0 +1 @@ +addSbtPlugin("io.github.davidgregory084" % "sbt-tpolecat" % "0.1.20") diff --git a/src/main/scala/io/example/conduktor/custom/deserializers/MyCustomDeserializer.scala b/src/main/scala/io/example/conduktor/custom/deserializers/MyCustomDeserializer.scala new file mode 100644 index 0000000..04ca7b6 --- /dev/null +++ b/src/main/scala/io/example/conduktor/custom/deserializers/MyCustomDeserializer.scala @@ -0,0 +1,19 @@ +package io.example.conduktor.custom.deserializers + +import org.apache.kafka.common.serialization.{Deserializer, Serializer} + +import java.nio.charset.StandardCharsets + +final case class MyMessage(value: String) + +final class MyCustomSerializer extends Serializer[MyMessage] { + override def serialize(topic: String, data: MyMessage): Array[Byte] = s"-- ${data.value}".getBytes(StandardCharsets.UTF_8) +} + +final class MyCustomDeserializer extends Deserializer[MyMessage] { + override def deserialize(topic: String, data: Array[Byte]): MyMessage = + new String(data, StandardCharsets.UTF_8) match { + case s"-- $data" => MyMessage(value = data) + case m => throw new RuntimeException(s"Invalid format received for message: $m") + } +} diff --git a/src/main/scala/io/example/conduktor/custom/deserializers/constant/Deserializers.scala b/src/main/scala/io/example/conduktor/custom/deserializers/constant/Deserializers.scala new file mode 100644 index 0000000..3054ab6 --- /dev/null +++ b/src/main/scala/io/example/conduktor/custom/deserializers/constant/Deserializers.scala @@ -0,0 +1,31 @@ +package io.example.conduktor.custom.deserializers.constant + +import org.apache.kafka.common.serialization.Deserializer + +class ConstantString extends Deserializer[String] { + override def deserialize(topic: String, data: Array[Byte]): String = "this is a message" +} +class ConstantChar extends Deserializer[Char] { + override def deserialize(topic: String, data: Array[Byte]): Char = 'c' +} +class ConstantInt extends Deserializer[Int] { + override def deserialize(topic: String, data: Array[Byte]): Int = 1 +} +class ConstantDouble extends Deserializer[Double] { + override def deserialize(topic: String, data: Array[Byte]): Double = 1.234 +} +class ConstantFloat extends Deserializer[Float] { + override def deserialize(topic: String, data: Array[Byte]): Float = 1.456f +} +class ConstantShort extends Deserializer[Short] { + override def deserialize(topic: String, data: Array[Byte]): Short = 2 +} +class ConstantBoolean extends Deserializer[Boolean] { + override def deserialize(topic: String, data: Array[Byte]): Boolean = true +} +class ConstantByte extends Deserializer[Byte] { + override def deserialize(topic: String, data: Array[Byte]): Byte = 6.toByte +} +class ConstantNull extends Deserializer[Any] { + override def deserialize(topic: String, data: Array[Byte]): Any = null +}