added compression feature for circe-akka-serializer #160

Merged
merged 9 commits on Jun 4, 2022
1 change: 1 addition & 0 deletions README.md
@@ -251,6 +251,7 @@ class ExampleSerializer(actorSystem: ExtendedActorSystem)
override lazy val packagePrefix = "org.project"
}
```
`CirceAkkaSerializer` can be configured to use Gzip compression when serializing payloads larger than a defined size (compression is disabled by default). See the [default reference.conf file](circe-akka-serializer/src/main/resources/reference.conf) and its comments for details.

For more guidelines on how to use the serializer,
read [Akka documentation about serialization](https://doc.akka.io/docs/akka/current/serialization.html),
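For illustration, a minimal application.conf sketch enabling the compression described above (the threshold value is illustrative, not a recommended default):

```hocon
org.virtuslab.ash.circe {
  compression {
    # gzip payloads larger than 64 KiB; smaller ones are written as plain JSON
    algorithm = gzip
    compress-larger-than = 64 KiB
  }
}
```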
18 changes: 17 additions & 1 deletion circe-akka-serializer/src/main/resources/reference.conf
@@ -1,5 +1,21 @@
# Default configuration

org.virtuslab.ash {
verbose-debug-logging = off
circe {
verbose-debug-logging = off

# Settings for compression of the payload - they decide whether the payload may be compressed
# when serializing an object. They do not affect the deserialization process - so even with
# `compression.algorithm = off`, deserialization of objects compressed with Gzip will be successful.
compression {
# Compression algorithm:
# - off : no compression
# - gzip : using common java gzip
algorithm = off

# If compression is enabled with the `algorithm` setting,
# the payload will be compressed if its size is bigger than this value.
compress-larger-than = 32 KiB
}
}
}
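As a side note, `compress-larger-than` is read with Typesafe Config's `getBytes`, which parses memory-size strings, so `32 KiB` becomes 32 * 1024 = 32768 bytes. A standalone sketch (not part of this PR):

```scala
import com.typesafe.config.ConfigFactory

object ThresholdSketch extends App {
  // Typesafe Config parses memory-size strings such as "32 KiB" into a byte count
  val conf = ConfigFactory.parseString("compress-larger-than = 32 KiB")
  println(conf.getBytes("compress-larger-than")) // 32768
}
```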
@@ -45,8 +45,18 @@ abstract class CirceAkkaSerializer[Ser <: AnyRef: ClassTag](system: ExtendedActo
with AkkaCodecs {

private lazy val log = Logging(system, getClass)
private lazy val conf = system.settings.config.getConfig("org.virtuslab.ash")
private lazy val conf = system.settings.config.getConfig("org.virtuslab.ash.circe")
private lazy val isDebugEnabled = conf.getBoolean("verbose-debug-logging") && log.isDebugEnabled
private lazy val compressionAlgorithm: Compression.Algorithm = conf.getString("compression.algorithm") match {
case "off" =>
Compression.Off
case "gzip" =>
Compression.GZip(conf.getBytes("compression.compress-larger-than"))
case other =>
throw new IllegalArgumentException(
s"Unknown compression algorithm value: [$other], possible values are: 'off' and 'gzip'")
}
protected val bufferSize: Int = 1024 * 4

override lazy val classTagEvidence: ClassTag[Ser] = implicitly[ClassTag[Ser]]
override lazy val errorCallback: String => Unit = x => log.error(x)
@@ -58,9 +68,10 @@ abstract class CirceAkkaSerializer[Ser <: AnyRef: ClassTag](system: ExtendedActo
val startTime = if (isDebugEnabled) System.nanoTime else 0L
codecsMap.get(manifest(o)) match {
case Some((encoder, _)) =>
val res = printer.print(encoder.asInstanceOf[Encoder[AnyRef]](o)).getBytes(UTF_8)
logDuration("Serialization", o, startTime, res)
res
val bytes = printer.print(encoder.asInstanceOf[Encoder[AnyRef]](o)).getBytes(UTF_8)
val result = Compression.compressIfNeeded(bytes, bufferSize, compressionAlgorithm)
logDuration("Serialization", o, startTime, result)
result
case None =>
throw new RuntimeException(
s"Serialization of [${o.getClass.getName}] failed. Call Register[A] for this class or its supertype and append result to `def codecs`.")
@@ -71,9 +82,10 @@ abstract class CirceAkkaSerializer[Ser <: AnyRef: ClassTag](system: ExtendedActo
val startTime = if (isDebugEnabled) System.nanoTime else 0L
codecsMap.get(manifestMigrationsMap.getOrElse(manifest, manifest)) match {
case Some((_, decoder)) =>
val res = parser.parseByteArray(bytes).flatMap(_.as(decoder)).fold(e => throw e, identity)
logDuration("Deserialization", res, startTime, bytes)
res
val decompressedBytes = Compression.decompressIfNeeded(bytes, bufferSize)
val result = parser.parseByteArray(decompressedBytes).flatMap(_.as(decoder)).fold(e => throw e, identity)
logDuration("Deserialization", result, startTime, bytes)
result
case None =>
throw new NotSerializableException(
s"Manifest [$manifest] did not match any known codec. If you're not currently performing a rolling upgrade, you must add a manifest migration to correct codec.")
@@ -115,4 +127,5 @@ abstract class CirceAkkaSerializer[Ser <: AnyRef: ClassTag](system: ExtendedActo
bytes.length)
}
}

}
@@ -54,15 +54,6 @@ trait CirceTraitCodec[Ser <: AnyRef] extends Codec[Ser] {

val errorCallback: String => Unit

Seq(
(codecs, "codecs"),
(manifestMigrations, "manifestMigrations"),
(packagePrefix, "packagePrefix"),
(classTagEvidence, "classTagEvidence"),
(errorCallback, "errorCallback")).foreach { x =>
assert(x._1 != null, s"${x._2} must be declared as a def or a lazy val to work correctly")
}

private val mirror = ru.runtimeMirror(getClass.getClassLoader)

protected val codecsMap: Map[String, (Encoder[_ <: Ser], Decoder[_ <: Ser])] = codecs
@@ -85,8 +76,9 @@ trait CirceTraitCodec[Ser <: AnyRef] extends Codec[Ser] {
.toMap
.withDefaultValue("")

def manifest(o: AnyRef): String = parentsUpToRegisteredTypeMap(o.getClass.getName)

/**
* Decoder apply method - decodes from Json into an object of type Ser
*/
override def apply(c: HCursor): Result[Ser] = {
c.value.asObject match {
case Some(obj) =>
@@ -103,6 +95,9 @@ trait CirceTraitCodec[Ser <: AnyRef] extends Codec[Ser] {
}
}

/**
* Encoder apply method - encodes given object of type Ser into Json
*/
override def apply(a: Ser): Json = {
val manifestString = manifest(a)
val encoder = codecsMap.get(manifestString) match {
@@ -114,6 +109,33 @@ trait CirceTraitCodec[Ser <: AnyRef] extends Codec[Ser] {
Json.obj((manifestString, encoder.asInstanceOf[Encoder[Ser]](a)))
}

def manifest(o: AnyRef): String = parentsUpToRegisteredTypeMap(o.getClass.getName)

/*
* All code below serves as a check - it verifies whether the class
* extending this trait is a valid implementation.
* doNeededChecksOnStart() gets invoked on object creation.
*/
doNeededChecksOnStart()

private def doNeededChecksOnStart(): Unit = {
checkImplementationForInvalidMemberDeclarations()
checkSerializableTypesForMissingCodec(packagePrefix)
checkCodecsForNull()
checkCodecsForDuplication()
}

private def checkImplementationForInvalidMemberDeclarations(): Unit = {
Seq(
(codecs, "codecs"),
(manifestMigrations, "manifestMigrations"),
(packagePrefix, "packagePrefix"),
(classTagEvidence, "classTagEvidence"),
(errorCallback, "errorCallback")).foreach { x =>
assert(x._1 != null, s"${x._2} must be declared as a def or a lazy val to work correctly")
}
}

private def checkSerializableTypesForMissingCodec(packagePrefix: String): Unit = {
val reflections = new Reflections(packagePrefix)
val foundSerializables = reflections.getSubTypesOf(classTag[Ser].runtimeClass).asScala.filterNot(_.isInterface)
@@ -142,8 +164,4 @@ trait CirceTraitCodec[Ser <: AnyRef] extends Codec[Ser] {
errorCallback(s"Codec for class ${x._1} has been declared multiple times with types ${x._2.mkString(",")}.")
}
}

checkSerializableTypesForMissingCodec(packagePrefix)
checkCodecsForNull()
checkCodecsForDuplication()
}
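The assertion message above ("must be declared as a def or a lazy val to work correctly") boils down to Scala's initialization order: the trait body, and therefore `doNeededChecksOnStart()`, runs before a subclass's strict vals are assigned. A minimal, library-independent sketch of that behaviour (all names here are illustrative):

```scala
trait Checked {
  def name: String
  // Runs during trait construction, i.e. before a subclass's strict vals are assigned.
  require(name != null, "name must be declared as a def or a lazy val")
}

object TraitInitOrderSketch extends App {
  new Checked { lazy val name = "ok" } // lazy val: forced on first access, check passes
  new Checked { def name = "ok" }      // def: evaluated on each call, check passes
  // new Checked { val name = "oops" } // strict val: still null when the trait body runs
}
```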
@@ -0,0 +1,79 @@
package org.virtuslab.ash.circe

import java.io.ByteArrayInputStream
import java.io.ByteArrayOutputStream
import java.util.zip.GZIPInputStream
import java.util.zip.GZIPOutputStream

import scala.annotation.tailrec

object Compression {
sealed trait Algorithm
case object Off extends Algorithm
case class GZip(greaterThan: Long) extends Algorithm
// TODO (#159): add support for LZ4 java compression
// case class LZ4(greaterThan: Long) extends Algorithm

private[circe] def compressIfNeeded(
bytes: Array[Byte],
bufferSize: Int,
compressionAlgorithm: Algorithm): Array[Byte] =
compressionAlgorithm match {
case Compression.Off => bytes
case Compression.GZip(largerThan) =>
if (bytes.length <= largerThan) {
bytes
} else {
val byteArrayOutputStream = new ByteArrayOutputStream(bufferSize)
val outputStream = new GZIPOutputStream(byteArrayOutputStream)
try outputStream.write(bytes)
finally outputStream.close()
byteArrayOutputStream.toByteArray
}
}

private[circe] def decompressIfNeeded(bytes: Array[Byte], bufferSize: Int): Array[Byte] =
if (isCompressedWithGzip(bytes)) {
val inputStream = new GZIPInputStream(new ByteArrayInputStream(bytes))
val outputStream = new ByteArrayOutputStream()
val buffer = new Array[Byte](bufferSize)

@tailrec def readChunk(): Unit =
inputStream.read(buffer) match {
case -1 => ()
case n =>
outputStream.write(buffer, 0, n)
readChunk()
}

try readChunk()
finally inputStream.close()
outputStream.toByteArray
} else {
bytes
}

/*
Since we are encoding JSON for Ser <: AnyRef types, the serialized payload can start with:
a) '{' - for JSON objects,
b) '[' - for arrays, or
c) '"' - for strings.
Thus, the first element of the `bytes` array can only be one of:
a) 123 - the decimal representation of the '{' character,
b) 91 - the decimal representation of the '[' character,
c) 34 - the decimal representation of the '"' character.

This is why isCompressedWithGzip will not return false positives for uncompressed JSON data:

bytes(0) == GZIPInputStream.GZIP_MAGIC.toByte
gets evaluated to:
bytes(0) == 35615.toByte
which gets evaluated to:
bytes(0) == 31 // where 31 is of type Byte
And since bytes(0) holds a Byte with value 123, 91 or 34, this comparison will never be true.
*/
private[circe] def isCompressedWithGzip(bytes: Array[Byte]): Boolean =
(bytes != null) && (bytes.length >= 2) &&
(bytes(0) == GZIPInputStream.GZIP_MAGIC.toByte) &&
(bytes(1) == (GZIPInputStream.GZIP_MAGIC >> 8).toByte)
}
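A rough usage sketch of the round-trip (placed in the same package because the helpers are `private[circe]`; the object name and values are illustrative and not part of the PR): payloads above the threshold get gzipped, smaller ones pass through unchanged, and `decompressIfNeeded` handles both transparently.

```scala
package org.virtuslab.ash.circe

object CompressionRoundTripSketch extends App {
  val bufferSize = 4 * 1024
  val algorithm = Compression.GZip(greaterThan = 32) // compress anything above 32 bytes

  val small = """{"small":true}""".getBytes("UTF-8")
  val large = Array.fill[Byte](10 * 1024)('a'.toByte)

  val smallOut = Compression.compressIfNeeded(small, bufferSize, algorithm)
  val largeOut = Compression.compressIfNeeded(large, bufferSize, algorithm)

  assert(smallOut.sameElements(small))               // below the threshold: passed through unchanged
  assert(Compression.isCompressedWithGzip(largeOut)) // above the threshold: gzipped
  assert(Compression.decompressIfNeeded(largeOut, bufferSize).sameElements(large))
  assert(Compression.decompressIfNeeded(smallOut, bufferSize).sameElements(small))
}
```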
24 changes: 19 additions & 5 deletions circe-akka-serializer/src/test/resources/application.conf
@@ -9,9 +9,23 @@ akka.actor {
allow-java-serialization = off
}

#Uncomment below to enable logging

# Uncomment the chosen settings below to enable debug logging or to change compression settings
#akka.loglevel = "DEBUG"
#org.virtuslab.ash {
# verbose-debug-logging = on
#}
org.virtuslab.ash {
circe {
# enables debug logging
#verbose-debug-logging = on

# settings for compression of the payload
compression {
# Compression algorithm.
# - off : no compression
# - gzip : using common java gzip
algorithm = off

# If compression is enabled with the `algorithm` setting,
# the payload will be compressed if its size is bigger than this value.
compress-larger-than = 1 KiB # value set for testing purposes - do not change
}
}
}