From 8fdf09c91bc20b206c17f62373edffe577bb77bc Mon Sep 17 00:00:00 2001
From: david-masters
Date: Mon, 23 Sep 2024 17:06:49 -0400
Subject: [PATCH] QU-1512: Avro decoding for v2 ingest

# Description

Adds Avro as a decodable frame format for v2 ingest: a new `AvroDecoder`
backed by an async-loading `AvroSchemaCache`, a `DataFoldableFrom[GenericRecord]`
instance for folding decoded records, and an `AvroIngestFormat` entity in the
v2 API.

GitOrigin-RevId: ad6aefac0505dfd2090ca1773ccd8ea2df6fb8d1
---
 build.sbt                                     |   1 +
 project/Dependencies.scala                    |   1 +
 .../thatdot/quine/routes/IngestRoutes.scala   |   1 +
 .../com/thatdot/quine/app/QuineApp.scala      |   5 +-
 .../app/ingest2/codec/FrameDecoder.scala      |  37 ++++-
 .../app/ingest2/core/DataFoldableFrom.scala   |  44 ++++++
 .../app/ingest2/source/DecodedSource.scala    |  14 +-
 .../app/ingest2/sources/FileSource.scala      |   4 +-
 .../quine/app/routes/IngestStreamState.scala  |   9 +-
 .../app/serialization/AvroSchemaCache.scala   |  51 +++++++
 .../serialization/ProtobufSchemaCache.scala   |   4 +-
 ...bufSchemaError.scala => SchemaError.scala} |  11 ++
 .../v2api/endpoints/V2IngestEntities.scala    |   8 +
 .../quine/ingest2/core/AvroDecoderTest.scala  | 138 ++++++++++++++++++
 14 files changed, 315 insertions(+), 13 deletions(-)
 create mode 100644 quine/src/main/scala/com/thatdot/quine/app/serialization/AvroSchemaCache.scala
 rename quine/src/main/scala/com/thatdot/quine/app/serialization/{ProtobufSchemaError.scala => SchemaError.scala} (74%)
 create mode 100644 quine/src/test/scala/com/thatdot/quine/ingest2/core/AvroDecoderTest.scala

diff --git a/build.sbt b/build.sbt
index 74182b47..e7e66485 100644
--- a/build.sbt
+++ b/build.sbt
@@ -311,6 +311,7 @@ lazy val `quine`: Project = project
       "org.webjars.npm" % "sugar-date" % sugarV,
       "org.webjars.npm" % "vis-network" % visNetworkV,
       "org.xerial.snappy" % "snappy-java" % snappyV,
+      "org.apache.avro" % "avro" % avroV,
     ),
   )
   .enablePlugins(WebScalaJSBundlerPlugin)
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index 468feb45..7b0b2c72 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -83,4 +83,5 @@ object Dependencies {
   val circeOpticsV = "0.15.0"
   val visNetworkV = "8.2.0"
   val webjarsLocatorV = "0.52"
+  val avroV = "1.12.0"
 }
diff --git a/quine-endpoints/src/main/scala/com/thatdot/quine/routes/IngestRoutes.scala b/quine-endpoints/src/main/scala/com/thatdot/quine/routes/IngestRoutes.scala
index f3f5cefd..cde7644d 100644
--- a/quine-endpoints/src/main/scala/com/thatdot/quine/routes/IngestRoutes.scala
+++ b/quine-endpoints/src/main/scala/com/thatdot/quine/routes/IngestRoutes.scala
@@ -725,6 +725,7 @@ object FileIngestFormat {
     require(delimiter != escapeChar, "Different characters must be used for `delimiter` and `escapeChar`.")
     require(quoteChar != escapeChar, "Different characters must be used for `quoteChar` and `escapeChar`.")
   }
+
 }
 
 @title("File Ingest Mode")
diff --git a/quine/src/main/scala/com/thatdot/quine/app/QuineApp.scala b/quine/src/main/scala/com/thatdot/quine/app/QuineApp.scala
index ef6b83a2..a856489d 100644
--- a/quine/src/main/scala/com/thatdot/quine/app/QuineApp.scala
+++ b/quine/src/main/scala/com/thatdot/quine/app/QuineApp.scala
@@ -21,7 +21,7 @@ import com.thatdot.quine.app.ingest.serialization.{CypherParseProtobuf, CypherTo
 import com.thatdot.quine.app.ingest.{IngestSrcDef, QuineIngestSource}
 import com.thatdot.quine.app.ingest2.source.DecodedSource
 import com.thatdot.quine.app.routes._
-import com.thatdot.quine.app.serialization.ProtobufSchemaCache
+import com.thatdot.quine.app.serialization.{AvroSchemaCache, ProtobufSchemaCache}
 import com.thatdot.quine.app.v2api.endpoints.V2IngestEntities.{IngestConfiguration => V2IngestConfiguration}
 import com.thatdot.quine.compiler.cypher
 import com.thatdot.quine.compiler.cypher.{CypherStandingWiretap, registerUserDefinedProcedure}
@@ -406,6 +406,7 @@ final class QuineApp(graph: GraphService)(implicit val logConfig: LogConfig)
   }
 
   private[this] val protobufSchemaCache: ProtobufSchemaCache = new ProtobufSchemaCache.AsyncLoading(graph.dispatchers)
+  private[this] val avroSchemaCache: AvroSchemaCache = new AvroSchemaCache.AsyncLoading(graph.dispatchers)
 
   def addIngestStream(
     name: String,
@@ -521,7 +522,7 @@
           metrics,
           meter,
           graph,
-        )(protobufSchemaCache, logConfig)
+        )(protobufSchemaCache, avroSchemaCache, logConfig)
 
         trySource.map { quineIngestSrc =>
           val streamSource = quineIngestSrc.stream(
diff --git a/quine/src/main/scala/com/thatdot/quine/app/ingest2/codec/FrameDecoder.scala b/quine/src/main/scala/com/thatdot/quine/app/ingest2/codec/FrameDecoder.scala
index ab7f3015..f0c4cbc4 100644
--- a/quine/src/main/scala/com/thatdot/quine/app/ingest2/codec/FrameDecoder.scala
+++ b/quine/src/main/scala/com/thatdot/quine/app/ingest2/codec/FrameDecoder.scala
@@ -10,11 +10,15 @@ import scala.util.{Success, Try}
 
 import com.google.protobuf.{Descriptors, DynamicMessage}
 import io.circe.{Json, parser}
+import org.apache.avro.Schema
+import org.apache.avro.file.SeekableByteArrayInput
+import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
+import org.apache.avro.io.DecoderFactory
 import org.apache.commons.csv.CSVFormat
 
 import com.thatdot.quine.app.ingest2.core.{DataFoldableFrom, DataFolderTo}
 import com.thatdot.quine.app.ingest2.sources.DEFAULT_CHARSET
-import com.thatdot.quine.app.serialization.ProtobufSchemaCache
+import com.thatdot.quine.app.serialization.{AvroSchemaCache, ProtobufSchemaCache}
 import com.thatdot.quine.app.v2api.endpoints.V2IngestEntities
 import com.thatdot.quine.app.v2api.endpoints.V2IngestEntities.{IngestFormat => V2IngestFormat}
 import com.thatdot.quine.graph.cypher
@@ -88,6 +92,32 @@ case class ProtobufDecoder(query: String, parameter: String = "that", schemaUrl:
 
 }
 
+case class AvroDecoder(schemaUrl: String)(implicit schemaCache: AvroSchemaCache) extends FrameDecoder[GenericRecord] {
+
+  // this is a blocking call, but it should only actually block until the first time the schema is successfully
+  // loaded.
+  //
+  // This was left as blocking because lifting the effect to a broader context would mean either:
+  // - making ingest startup async, which would require extensive changes to QuineApp, startup, and potentially
+  //   clustering protocols, OR
+  // - making the decode-bytes step of ingest async, which violates the Kafka API's expectation that an
+  //   `org.apache.kafka.common.serialization.Deserializer` is synchronous.
+  val schema: Schema = Await.result(
+    schemaCache.getSchema(filenameOrUrl(schemaUrl)),
+    Duration.Inf,
+  )
+
+  val foldable: DataFoldableFrom[GenericRecord] = DataFoldableFrom.avroDataFoldable
+
+  def decode(bytes: Array[Byte]): Try[GenericRecord] = Try {
+    val datumReader = new GenericDatumReader[GenericRecord](schema)
+    val inputStream = new SeekableByteArrayInput(bytes)
+    val decoder = DecoderFactory.get.binaryDecoder(inputStream, null)
+    datumReader.read(null, decoder)
+  }
+
+}
+
 case class CsvVecDecoder(delimiterChar: Char, quoteChar: Char, escapeChar: Char, charset: Charset = DEFAULT_CHARSET)
     extends FrameDecoder[Iterable[String]] {
 
@@ -132,7 +162,9 @@ case class CsvMapDecoder(
 }
 
 object FrameDecoder {
-  def apply(format: V2IngestFormat)(implicit protobufCache: ProtobufSchemaCache): FrameDecoder[_] = format match {
+  def apply(
+    format: V2IngestFormat,
+  )(implicit protobufCache: ProtobufSchemaCache, avroCache: AvroSchemaCache): FrameDecoder[_] = format match {
     case V2IngestEntities.JsonIngestFormat => JsonDecoder
     case V2IngestEntities.CsvIngestFormat(headers, delimiter, quote, escape) =>
       headers match {
@@ -152,6 +184,7 @@
       ProtobufDecoder("query TBD", "paramter TBD", schemaUrl, typeName) //Query,Parameter tbd
     case V2IngestEntities.RawIngestFormat => CypherRawDecoder
     case V2IngestEntities.DropFormat => DropDecoder
+    case V2IngestEntities.AvroIngestFormat(schemaUrl) => AvroDecoder(schemaUrl = schemaUrl)
   }
 
   def apply(format: StreamedRecordFormat)(implicit protobufCache: ProtobufSchemaCache): FrameDecoder[_] =
diff --git a/quine/src/main/scala/com/thatdot/quine/app/ingest2/core/DataFoldableFrom.scala b/quine/src/main/scala/com/thatdot/quine/app/ingest2/core/DataFoldableFrom.scala
index cd4645f2..94492b5d 100644
--- a/quine/src/main/scala/com/thatdot/quine/app/ingest2/core/DataFoldableFrom.scala
+++ b/quine/src/main/scala/com/thatdot/quine/app/ingest2/core/DataFoldableFrom.scala
@@ -3,6 +3,8 @@ package com.thatdot.quine.app.ingest2.core
 
 import scala.collection.{SeqView, View, mutable}
+import scala.jdk.CollectionConverters._
 
 import io.circe.{Json, JsonNumber, JsonObject}
+import org.apache.avro.generic.{GenericArray, GenericEnumSymbol, GenericFixed, GenericRecord}
 
 import com.thatdot.quine.graph.cypher.Expr
 import com.thatdot.quine.util.Log.{LazySafeLogging, Safe, SafeLoggableInterpolator}
@@ -213,4 +214,46 @@
     }
   }
 
+  implicit val avroDataFoldable: DataFoldableFrom[GenericRecord] = new DataFoldableFrom[GenericRecord] {
+
+    private def foldMapLike[B](kv: Iterable[(String, Any)], folder: DataFolderTo[B]): B = {
+      val mapBuilder = folder.mapBuilder()
+      kv.foreach { case (k, v) => mapBuilder.add(k, foldField(v, folder)) }
+      mapBuilder.finish()
+    }
+
+    // All of the underlying types for Avro were taken from here: https://stackoverflow.com/questions/34070028/get-a-typed-value-from-an-avro-genericrecord/34234039#34234039
+    private def foldField[B](field: Any, folder: DataFolderTo[B]): B = field match {
+      case b: java.lang.Boolean if b => folder.trueValue
+      case b: java.lang.Boolean if !b => folder.falseValue
+      case i: java.lang.Integer => folder.integer(i.longValue)
+      case i: java.lang.Long => folder.integer(i)
+      case f: java.lang.Float => folder.floating(f.doubleValue)
+      case d: java.lang.Double => folder.floating(d)
+      case bytes: java.nio.ByteBuffer => folder.bytes(bytes.array)
+      case str: CharSequence => folder.string(str.toString)
+      case record: GenericRecord =>
+        foldMapLike(
+          record.getSchema.getFields.asScala.collect {
+            case k if record.hasField(k.name) => (k.name, record.get(k.name))
+          },
+          folder,
+        )
+      case map: java.util.Map[_, _] => foldMapLike(map.asScala.map { case (k, v) => (k.toString, v) }, folder)
+      case symbol: GenericEnumSymbol[_] => folder.string(symbol.toString)
+      case array: GenericArray[_] =>
+        val vector = folder.vectorBuilder()
+        array.forEach(elem => vector.add(foldField(elem, folder)))
+        vector.finish()
+      case fixed: GenericFixed => folder.bytes(fixed.bytes)
+      case null => folder.nullValue
+      case other =>
+        throw new IllegalArgumentException(
+          s"Got an unexpected value: ${other} of type: ${other.getClass.getName} from avro. This shouldn't happen...",
+        )
+    }
+
+    override def fold[B](record: GenericRecord, folder: DataFolderTo[B]): B = foldField(record, folder)
+  }
+
 }
diff --git a/quine/src/main/scala/com/thatdot/quine/app/ingest2/source/DecodedSource.scala b/quine/src/main/scala/com/thatdot/quine/app/ingest2/source/DecodedSource.scala
index da241528..dad39421 100644
--- a/quine/src/main/scala/com/thatdot/quine/app/ingest2/source/DecodedSource.scala
+++ b/quine/src/main/scala/com/thatdot/quine/app/ingest2/source/DecodedSource.scala
@@ -20,7 +20,7 @@ import com.thatdot.quine.app.ingest2.sources.S3Source.s3Source
 import com.thatdot.quine.app.ingest2.sources.StandardInputSource.stdInSource
 import com.thatdot.quine.app.ingest2.sources._
 import com.thatdot.quine.app.routes.{IngestMeter, IngestMetered}
-import com.thatdot.quine.app.serialization.ProtobufSchemaCache
+import com.thatdot.quine.app.serialization.{AvroSchemaCache, ProtobufSchemaCache}
 import com.thatdot.quine.app.v2api.endpoints.V2IngestEntities.{
   CsvIngestFormat,
   IngestFormat,
@@ -353,7 +353,10 @@ object DecodedSource extends LazySafeLogging {
   import com.thatdot.quine.app.v2api.endpoints.V2IngestEntities._
 
   //V2 configuration
-  def apply(src: FramedSource, format: IngestFormat)(implicit protobufCache: ProtobufSchemaCache): DecodedSource =
+  def apply(src: FramedSource, format: IngestFormat)(implicit
+    protobufCache: ProtobufSchemaCache,
+    avroCache: AvroSchemaCache,
+  ): DecodedSource =
     src.toDecoded(FrameDecoder(format))
 
   def apply(
@@ -361,7 +364,12 @@
     config: IngestConfiguration,
     meter: IngestMeter,
     system: ActorSystem,
-  )(implicit protobufCache: ProtobufSchemaCache, ec: ExecutionContext, logConfig: LogConfig): DecodedSource =
+  )(implicit
+    protobufCache: ProtobufSchemaCache,
+    avroCache: AvroSchemaCache,
+    ec: ExecutionContext,
+    logConfig: LogConfig,
+  ): DecodedSource =
     config.source match {
       case FileIngest(path, mode, maximumLineSize, startOffset, limit, charset, recordDecoders) =>
         FileSource.decodedSourceFromFileStream(
diff --git a/quine/src/main/scala/com/thatdot/quine/app/ingest2/sources/FileSource.scala b/quine/src/main/scala/com/thatdot/quine/app/ingest2/sources/FileSource.scala
index ea203d7b..ed5eea07 100644
--- a/quine/src/main/scala/com/thatdot/quine/app/ingest2/sources/FileSource.scala
+++ b/quine/src/main/scala/com/thatdot/quine/app/ingest2/sources/FileSource.scala
@@ -129,8 +129,8 @@ object FileSource extends LazyLogging {
       ).decodedSource
 
     // TODO Protobuf, Raw, Drop not supported on file types since there is no way to delimit them:
-    case V2IngestEntities.ProtobufIngestFormat(_, _) | V2IngestEntities.RawIngestFormat |
-        V2IngestEntities.DropFormat =>
+    case V2IngestEntities.AvroIngestFormat(_) | V2IngestEntities.ProtobufIngestFormat(_, _) |
+        V2IngestEntities.RawIngestFormat | V2IngestEntities.DropFormat =>
      throw new UnsupportedOperationException(s"Ingest format $format not supported on file-like sources")
sources") } diff --git a/quine/src/main/scala/com/thatdot/quine/app/routes/IngestStreamState.scala b/quine/src/main/scala/com/thatdot/quine/app/routes/IngestStreamState.scala index be2c998f..d6307d79 100644 --- a/quine/src/main/scala/com/thatdot/quine/app/routes/IngestStreamState.scala +++ b/quine/src/main/scala/com/thatdot/quine/app/routes/IngestStreamState.scala @@ -7,7 +7,7 @@ import org.apache.pekko.util.Timeout import com.thatdot.quine.app.NamespaceNotFoundException import com.thatdot.quine.app.ingest.QuineIngestSource import com.thatdot.quine.app.ingest2.source.{DecodedSource, QuineValueIngestQuery} -import com.thatdot.quine.app.serialization.ProtobufSchemaCache +import com.thatdot.quine.app.serialization.{AvroSchemaCache, ProtobufSchemaCache} import com.thatdot.quine.app.v2api.endpoints.V2IngestEntities.{IngestConfiguration => V2IngestConfiguration} import com.thatdot.quine.graph.{CypherOpsGraph, MemberIdx, NamespaceId, defaultNamespaceId, namespaceToString} import com.thatdot.quine.routes._ @@ -121,7 +121,11 @@ trait IngestStreamState { metrics: IngestMetrics, meter: IngestMeter, graph: CypherOpsGraph, - )(implicit protobufCache: ProtobufSchemaCache, logConfig: LogConfig): Try[QuineIngestSource] = + )(implicit + protobufCache: ProtobufSchemaCache, + avroCache: AvroSchemaCache, + logConfig: LogConfig, + ): Try[QuineIngestSource] = ingestStreams.get(intoNamespace) match { // TODO Note for review comparison: v1 version fails silently here. // TODO Also, shouldn't this just add the namespace if it's not found? @@ -138,6 +142,7 @@ trait IngestStreamState { //TODO should return ValidatedNel[IngestName, DecodedSource] val decodedSource: DecodedSource = DecodedSource.apply(name, settings, meter, graph.system)( protobufCache, + avroCache, graph.materializer.executionContext, logConfig, ) diff --git a/quine/src/main/scala/com/thatdot/quine/app/serialization/AvroSchemaCache.scala b/quine/src/main/scala/com/thatdot/quine/app/serialization/AvroSchemaCache.scala new file mode 100644 index 00000000..70a187dc --- /dev/null +++ b/quine/src/main/scala/com/thatdot/quine/app/serialization/AvroSchemaCache.scala @@ -0,0 +1,51 @@ +package com.thatdot.quine.app.serialization + +import java.net.URL + +import scala.concurrent.{ExecutionContext, Future, blocking} +import scala.util.Using + +import com.github.blemale.scaffeine.{AsyncLoadingCache, Scaffeine} +import org.apache.avro.Schema + +import com.thatdot.quine.app.serialization.AvroSchemaError.{InvalidAvroSchema, UnreachableAvroSchema} +import com.thatdot.quine.util.ComputeAndBlockingExecutionContext + +/** Provides common utilities for its inheritors to parse avro objects. + */ +trait AvroSchemaCache { + def getSchema(schemaUrl: URL): Future[Schema] + +} +object AvroSchemaCache { + class AsyncLoading(val ecs: ComputeAndBlockingExecutionContext) extends AvroSchemaCache { + private val avroSchemaCache: AsyncLoadingCache[URL, Schema] = + Scaffeine() + .maximumSize(5) + .buildAsyncFuture { schemaUrl => + // NB if this Future fails (with an error), the cache will not store the schema. + // This allows the user to retry the schema resolution after updating their environment + resolveSchema(schemaUrl)(ecs.blockingDispatcherEC) + } + + /** Invalidate the schema for the given URI. This will cause the next call to [[avroSchemaCache.get]] + * to re-parse the schema. This may be desirable when, for example, a message type lookup fails, even if the + * schema lookup succeeds (so that the user can update their schema file to include the missing type). 
+      */
+    def flush(uri: URL): Unit =
+      avroSchemaCache.put(uri, Future.successful(null))
+
+    def getSchema(schemaUrl: URL): Future[Schema] =
+      avroSchemaCache.get(schemaUrl)
+
+    // NB a Schema.Parser remembers every named type it has parsed and rejects redefinitions,
+    // so a fresh parser is used per resolution rather than sharing one across schema files.
+    private[this] def resolveSchema(uri: URL)(blockingEc: ExecutionContext): Future[Schema] =
+      Future(blocking {
+        Using.resource(uri.openStream())(new Schema.Parser().parse(_))
+      })(blockingEc).recoverWith {
+        case e: org.apache.avro.SchemaParseException => Future.failed(new InvalidAvroSchema(uri, e))
+        case e: java.io.IOException => Future.failed(new UnreachableAvroSchema(uri, e))
+      }(blockingEc)
+  }
+}
diff --git a/quine/src/main/scala/com/thatdot/quine/app/serialization/ProtobufSchemaCache.scala b/quine/src/main/scala/com/thatdot/quine/app/serialization/ProtobufSchemaCache.scala
index 6aa113d5..6fd43f11 100644
--- a/quine/src/main/scala/com/thatdot/quine/app/serialization/ProtobufSchemaCache.scala
+++ b/quine/src/main/scala/com/thatdot/quine/app/serialization/ProtobufSchemaCache.scala
@@ -61,7 +61,8 @@ object ProtobufSchemaCache {
      * to re-parse the schema. This may be desirable when, for example, a message type lookup fails, even if the
      * schema lookup succeeds (so that the user can update their schema file to include the missing type).
      */
-    def flush(uri: URL): Unit = parsedDescriptorCache.put(uri, Future.successful(null))
+    def flush(uri: URL): Unit =
+      parsedDescriptorCache.put(uri, Future.successful(null))
 
     def getSchema(schemaUrl: URL): Future[DynamicSchema] = parsedDescriptorCache.get(schemaUrl)
 
@@ -108,5 +109,4 @@
       new AmbiguousMessageType(messageType, messagesFoundByShortName)
     }
   }
-
 }
diff --git a/quine/src/main/scala/com/thatdot/quine/app/serialization/ProtobufSchemaError.scala b/quine/src/main/scala/com/thatdot/quine/app/serialization/SchemaError.scala
similarity index 74%
rename from quine/src/main/scala/com/thatdot/quine/app/serialization/ProtobufSchemaError.scala
rename to quine/src/main/scala/com/thatdot/quine/app/serialization/SchemaError.scala
index 5ef85d5c..171a34b1 100644
--- a/quine/src/main/scala/com/thatdot/quine/app/serialization/ProtobufSchemaError.scala
+++ b/quine/src/main/scala/com/thatdot/quine/app/serialization/SchemaError.scala
@@ -3,10 +3,21 @@ package com.thatdot.quine.app.serialization
 import java.net.URL
 
 sealed trait ProtobufSchemaError extends IllegalArgumentException
+sealed trait AvroSchemaError extends IllegalArgumentException
 
 sealed trait ProtobufSchemaMessageTypeException extends ProtobufSchemaError {
   def typeName: String
 }
 
+object AvroSchemaError {
+  class UnreachableAvroSchema(val fileUri: URL, cause: java.io.IOException)
+      extends IllegalArgumentException(s"Unreachable Avro schema file: $fileUri", cause)
+      with AvroSchemaError
+  class InvalidAvroSchema(val fileUri: URL, cause: Throwable)
+      extends IllegalArgumentException(s"Invalid Avro schema file: $fileUri", cause)
+      with AvroSchemaError
+
+}
+
 object ProtobufSchemaError {
   class UnreachableProtobufSchema(val fileUri: URL, cause: java.io.IOException)
     extends IllegalArgumentException(s"Unreachable protobuf schema file: $fileUri", cause)
diff --git a/quine/src/main/scala/com/thatdot/quine/app/v2api/endpoints/V2IngestEntities.scala b/quine/src/main/scala/com/thatdot/quine/app/v2api/endpoints/V2IngestEntities.scala
index b2333a2f..c30d9242 100644
--- a/quine/src/main/scala/com/thatdot/quine/app/v2api/endpoints/V2IngestEntities.scala
+++ b/quine/src/main/scala/com/thatdot/quine/app/v2api/endpoints/V2IngestEntities.scala
@@ -222,6 +222,14 @@ object V2IngestEntities {
     typeName: String,
   ) extends IngestFormat
 
+  @title("Avro format")
+  case class AvroIngestFormat(
+    @description(
+      "URL (or local filename) of the Avro schema file to load.",
+    )
+    schemaUrl: String,
+  ) extends IngestFormat
+
   case object RawIngestFormat extends IngestFormat
 
   case object DropFormat extends IngestFormat
diff --git a/quine/src/test/scala/com/thatdot/quine/ingest2/core/AvroDecoderTest.scala b/quine/src/test/scala/com/thatdot/quine/ingest2/core/AvroDecoderTest.scala
new file mode 100644
index 00000000..19ca288a
--- /dev/null
+++ b/quine/src/test/scala/com/thatdot/quine/ingest2/core/AvroDecoderTest.scala
@@ -0,0 +1,138 @@
+package com.thatdot.quine.ingest2.core
+
+import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets
+
+import scala.collection.immutable.{SortedMap, TreeMap}
+import scala.jdk.CollectionConverters._
+
+import org.apache.avro.Schema
+import org.apache.avro.generic.GenericData
+import org.scalatest.funspec.AnyFunSpec
+import org.scalatest.matchers.should.Matchers
+
+import com.thatdot.quine.app.ingest2.core.DataFoldableFrom
+
+class AvroDecoderTest extends AnyFunSpec with Matchers {
+
+  def canonicalize(v: Any): Any = v match {
+    case b: Array[_] => b.toVector
+    case m: Map[_, _] => m.view.mapValues(canonicalize).toMap
+    case m: java.util.Map[_, _] => m.asScala.view.mapValues(canonicalize).toMap
+    case bytes: ByteBuffer => bytes.array().toVector
+    case _ => v
+  }
+
+  it("Avro - simple types") {
+    val schema1 = new Schema.Parser().parse("""
+      |{
+      |  "type": "record",
+      |  "name": "testRecord",
+      |  "fields": [
+      |    {"name": "astring", "type": "string"},
+      |    {"name": "anull", "type": "null"},
+      |    {"name": "abool", "type": "boolean"},
+      |    {"name": "aint", "type": "int"},
+      |    {"name": "along", "type": "long"},
+      |    {"name": "afloat", "type": "float"},
+      |    {"name": "adouble", "type": "double"},
+      |    {"name": "abytes", "type": "bytes"}
+      |  ]
+      |}
+      |""".stripMargin)
+    val record1: GenericData.Record = new GenericData.Record(schema1)
+    val fieldVals = SortedMap[String, Any](
+      ("astring" -> "string1"),
+      ("anull" -> null),
+      ("abool" -> true),
+      ("aint" -> 100),
+      ("along" -> Long.MaxValue),
+      ("afloat" -> 101F),
+      ("adouble" -> Double.MaxValue),
+      ("abytes" -> ByteBuffer.wrap("some bytes".getBytes(StandardCharsets.UTF_8))),
+    )
+    fieldVals.foreach { case (s, v) => record1.put(s, v) }
+    val result =
+      DataFoldableFrom.avroDataFoldable.fold(record1, FoldableTestData.toAnyDataFolder).asInstanceOf[TreeMap[Any, Any]]
+    assert(canonicalize(result) == canonicalize(fieldVals))
+  }
+
+  it("Avro - record of records") {
+    val schema1 = new Schema.Parser().parse("""
+      |{
+      |  "name": "multi",
+      |  "type": "record",
+      |  "fields": [
+      |    {
+      |      "name": "left",
+      |      "type": {
+      |        "name": "leftT",
+      |        "type": "record",
+      |        "fields": [ {"name": "leftA", "type": "string"}, {"name": "leftB", "type": "int"} ]
+      |      }
+      |    },
+      |    {
+      |      "name": "right",
+      |      "type": {
+      |        "name": "rightT",
+      |        "type": "record",
+      |        "fields": [ {"name": "rightA", "type": "boolean"}, {"name": "rightB", "type": "string"} ]
+      |      }
+      |    }
+      |  ]
+      |}
+      |""".stripMargin)
+    val left: GenericData.Record = new GenericData.Record(schema1.getField("left").schema())
+    left.put("leftA", "a string")
+    left.put("leftB", 101)
+    val right: GenericData.Record = new GenericData.Record(schema1.getField("right").schema)
+    right.put("rightA", false)
+    right.put("rightB", "another string")
+    val record: GenericData.Record = new GenericData.Record(schema1)
+    record.put("left", left)
+    record.put("right", right)
+    val result = DataFoldableFrom.avroDataFoldable.fold(record, FoldableTestData.toAnyDataFolder)
+    assert(
+      result == TreeMap[String, TreeMap[String, Any]](
+        ("left" -> TreeMap[String, Any](("leftA" -> "a string"), ("leftB" -> 101))),
+        ("right" -> TreeMap[String, Any](("rightA" -> false), ("rightB" -> "another string"))),
+      ),
+    )
+  }
+
+  it("Avro - array of maps") {
+    val schema1 = new Schema.Parser().parse("""
+      | {
+      |   "name": "ArrayOfMaps",
+      |   "type": "record",
+      |   "fields": [{
+      |     "name": "alist",
+      |     "type": {
+      |       "type": "array",
+      |       "items": {
+      |         "type": "map",
+      |         "values": "long"
+      |       }
+      |     }
+      |   }]
+      | }
+      |""".stripMargin)
+    val record: GenericData.Record = new GenericData.Record(schema1)
+    val maps: List[java.util.Map[String, Long]] = List(
+      Map(("k1a" -> 101L), ("k1b" -> 102L)).asJava,
+      Map(("k2a" -> 102L), ("k2b" -> 103L)).asJava,
+    )
+    record.put(
+      "alist",
+      new GenericData.Array[java.util.Map[String, Long]](schema1.getField("alist").schema(), maps.asJava),
+    )
+    val result = DataFoldableFrom.avroDataFoldable.fold(record, FoldableTestData.toAnyDataFolder)
+    assert(
+      canonicalize(result) == Map(
+        ("alist" -> List(
+          Map(("k1a" -> 101), ("k1b" -> 102)),
+          Map(("k2a" -> 102), ("k2b" -> 103)),
+        )),
+      ),
+    )
+  }
+}
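
---

Reviewer note on the wire format `AvroDecoder.decode` accepts: it reads a single, bare binary-encoded Avro datum. It does not expect an Avro object-container-file header, and it does not strip Confluent Schema Registry framing (a magic byte plus a 4-byte schema id), so registry-framed Kafka messages would need that 5-byte prefix removed first. A minimal round-trip sketch using only the Avro classes this patch already depends on (the `Example` schema and the `AvroRoundTrip` object are illustrative, not part of the patch):

```scala
import java.io.ByteArrayOutputStream

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.{DecoderFactory, EncoderFactory}

object AvroRoundTrip {
  val schema: Schema = new Schema.Parser().parse(
    """{"type": "record", "name": "Example", "fields": [
      |  {"name": "id", "type": "long"},
      |  {"name": "name", "type": "string"}
      |]}""".stripMargin,
  )

  // Produce the bare binary encoding that AvroDecoder.decode consumes
  def encode(record: GenericRecord): Array[Byte] = {
    val out = new ByteArrayOutputStream()
    val encoder = EncoderFactory.get.binaryEncoder(out, null)
    new GenericDatumWriter[GenericRecord](schema).write(record, encoder)
    encoder.flush() // the binary encoder buffers internally; flush before taking the bytes
    out.toByteArray
  }

  // Mirrors the decode path in this patch: GenericDatumReader + binaryDecoder
  def decode(bytes: Array[Byte]): GenericRecord = {
    val decoder = DecoderFactory.get.binaryDecoder(bytes, null)
    new GenericDatumReader[GenericRecord](schema).read(null, decoder)
  }

  def main(args: Array[String]): Unit = {
    val record = new GenericData.Record(schema)
    record.put("id", 42L)
    record.put("name", "quine")
    assert(decode(encode(record)).get("id") == 42L)
  }
}
```

One related detail: Avro decodes string fields as `org.apache.avro.util.Utf8` rather than `java.lang.String`; the `case str: CharSequence` branch in `avroDataFoldable` is what covers that.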
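Because `AvroDecoder` resolves its schema through the implicit `AvroSchemaCache` (blocking via `Await.result`), unit tests can exercise `decode` without touching the network or filesystem by supplying a fixed cache. A sketch; the `FixedAvroSchemaCache` helper is hypothetical and not part of this patch:

```scala
import java.net.URL

import scala.concurrent.Future

import org.apache.avro.Schema

import com.thatdot.quine.app.serialization.AvroSchemaCache

// Hypothetical test double: serves one pre-parsed schema for any URL,
// so AvroDecoder's blocking Await.result returns immediately.
class FixedAvroSchemaCache(schema: Schema) extends AvroSchemaCache {
  def getSchema(schemaUrl: URL): Future[Schema] = Future.successful(schema)
}
```

With `implicit val cache: AvroSchemaCache = new FixedAvroSchemaCache(schema)` in scope, a test can construct `AvroDecoder("example.avsc")` (the URL is never dereferenced by the stub) and feed it bytes produced as in the round-trip sketch above.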
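A behavior of `avroDataFoldable` worth a follow-up test: Avro unions have no runtime wrapper, so a field typed `["null", "long"]` surfaces as either `null` or a `java.lang.Long`, and the existing null and `java.lang.Long` branches of `foldField` handle it without a dedicated union case. A hypothetical check in the style of `AvroDecoderTest` (assuming that file's imports and the `FoldableTestData` helper):

```scala
// Sketch only: a nullable-union field folds through the existing branches
val schema = new Schema.Parser().parse("""
  |{
  |  "type": "record",
  |  "name": "withUnion",
  |  "fields": [{"name": "maybe", "type": ["null", "long"], "default": null}]
  |}
  |""".stripMargin)
val record = new GenericData.Record(schema)
record.put("maybe", null) // exercises the union's "null" branch
val folded = DataFoldableFrom.avroDataFoldable.fold(record, FoldableTestData.toAnyDataFolder)
// expected: a single-entry map from "maybe" to the folder's null representation
```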