From 0439a8298b4d091e1a2a631dfa63b9e4c077f9f3 Mon Sep 17 00:00:00 2001
From: Flook Peter
Date: Fri, 25 Oct 2024 10:24:57 +0800
Subject: [PATCH 1/2] Add support for ODCS v3

---
 ...taContractStandardDataSourceMetadata.scala |  97 ++-------
 .../OpenDataContractStandardV2Mapper.scala    | 102 +++++++++
 .../OpenDataContractStandardV3Mapper.scala    |  82 +++++++
 .../OpenDataContractStandardV3Models.scala    | 202 ++++++++++++++++++
 ...OpenDataContractStandardV2MapperTest.scala |   7 +
 ...OpenDataContractStandardV3MapperTest.scala |  93 ++++++++
 6 files changed, 498 insertions(+), 85 deletions(-)
 create mode 100644 app/src/main/scala/io/github/datacatering/datacaterer/core/generator/metadata/datasource/opendatacontractstandard/OpenDataContractStandardV2Mapper.scala
 create mode 100644 app/src/main/scala/io/github/datacatering/datacaterer/core/generator/metadata/datasource/opendatacontractstandard/OpenDataContractStandardV3Mapper.scala
 create mode 100644 app/src/main/scala/io/github/datacatering/datacaterer/core/generator/metadata/datasource/opendatacontractstandard/model/OpenDataContractStandardV3Models.scala
 create mode 100644 app/src/test/scala/io/github/datacatering/datacaterer/core/generator/metadata/datasource/opendatacontractstandard/OpenDataContractStandardV2MapperTest.scala
 create mode 100644 app/src/test/scala/io/github/datacatering/datacaterer/core/generator/metadata/datasource/opendatacontractstandard/OpenDataContractStandardV3MapperTest.scala

diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/metadata/datasource/opendatacontractstandard/OpenDataContractStandardDataSourceMetadata.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/metadata/datasource/opendatacontractstandard/OpenDataContractStandardDataSourceMetadata.scala
index b73134e..be8887b 100644
--- a/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/metadata/datasource/opendatacontractstandard/OpenDataContractStandardDataSourceMetadata.scala
+++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/metadata/datasource/opendatacontractstandard/OpenDataContractStandardDataSourceMetadata.scala
@@ -5,7 +5,7 @@ import io.github.datacatering.datacaterer.api.model.Constants._
 import io.github.datacatering.datacaterer.api.model.{ArrayType, DataType, DateType, DoubleType, FloatType, IntegerType, LongType, StringType, StructType, TimestampType}
 import io.github.datacatering.datacaterer.core.exception.{InvalidDataContractFileFormatException, MissingDataContractFilePathException}
 import io.github.datacatering.datacaterer.core.generator.metadata.datasource.database.FieldMetadata
-import io.github.datacatering.datacaterer.core.generator.metadata.datasource.opendatacontractstandard.model.{OpenDataContractStandard, OpenDataContractStandardColumn, OpenDataContractStandardDataset}
+import io.github.datacatering.datacaterer.core.generator.metadata.datasource.opendatacontractstandard.model.{OpenDataContractStandard, OpenDataContractStandardColumn, OpenDataContractStandardDataset, OpenDataContractStandardV3}
 import io.github.datacatering.datacaterer.core.generator.metadata.datasource.{DataSourceMetadata, SubDataSourceMetadata}
 import io.github.datacatering.datacaterer.core.util.ObjectMapperUtil
 import org.apache.log4j.Logger
@@ -20,8 +20,6 @@ case class OpenDataContractStandardDataSourceMetadata(
                                                        connectionConfig: Map[String, String],
                                                      ) extends DataSourceMetadata {

-  private val LOGGER = Logger.getLogger(getClass.getName)
-
   override val hasSourceData: Boolean = false
   override def toStepName(options: Map[String, String]): String = {
@@ -38,93 +36,22 @@ case class OpenDataContractStandardDataSourceMetadata(
     optDataContractFile match {
       case Some(dataContractPath) =>
         val dataContractFile = new File(dataContractPath)
-        val tryParseYaml = Try(ObjectMapperUtil.yamlObjectMapper.readValue(dataContractFile, classOf[OpenDataContractStandard]))
-        tryParseYaml match {
-          case Failure(exception) => throw InvalidDataContractFileFormatException(dataContractPath, exception)
+        val tryParseYamlV2 = Try(ObjectMapperUtil.yamlObjectMapper.readValue(dataContractFile, classOf[OpenDataContractStandard]))
+        tryParseYamlV2 match {
+          case Failure(_) =>
+            //not a v2 contract, try to parse as a v3 model
+            val tryParseYamlV3 = Try(ObjectMapperUtil.yamlObjectMapper.readValue(dataContractFile, classOf[OpenDataContractStandardV3]))
+            tryParseYamlV3 match {
+              case Failure(exception) => throw InvalidDataContractFileFormatException(dataContractPath, exception)
+              case Success(value) =>
+                value.schema.getOrElse(Array()).map(schema => OpenDataContractStandardV3Mapper.toSubDataSourceMetadata(value, schema, connectionConfig))
+            }
           case Success(value) =>
             // val optSchemaName = connectionConfig.get(DATA_CONTRACT_SCHEMA)
            //TODO filter for schema if schema name is defined, otherwise return back all schemas
-            value.dataset.map(dataset => {
-              val readOptions = getDataSourceOptions(value, dataset)
-              val columnMetadata = dataset.columns.map(cols => {
-                val mappedColumns = cols.map(column => {
-                  val dataType = getDataType(dataset, column)
-                  val metadata = getBaseColumnMetadata(column, dataType)
-                  FieldMetadata(column.column, readOptions, metadata)
-                }).toList
-                sparkSession.createDataset(mappedColumns)
-              })
-              SubDataSourceMetadata(readOptions, columnMetadata)
-            })
+            value.dataset.map(dataset => OpenDataContractStandardV2Mapper.toSubDataSourceMetadata(value, dataset, connectionConfig))
         }
       case None => throw MissingDataContractFilePathException(name, format)
     }
   }
-
-  private def getDataSourceOptions(contract: OpenDataContractStandard, dataset: OpenDataContractStandardDataset): Map[String, String] = {
-    //identifier should be based on schema name
-    val baseMap = Map(METADATA_IDENTIFIER -> contract.uuid)
-    //Tables, BigQuery, serverName, jdbc:driver, myDatabase, user, pw
-    //(contract.`type`, contract.sourceSystem, contract.server, contract.driver, contract.database, contract.username, contract.password)
-    val serverMap = contract.server.map(server => Map(URL -> server)).getOrElse(Map()) //TODO assume database would be part of server url?
Probably not - val credentialsMap = Map(USERNAME -> contract.username, PASSWORD -> contract.password) //TODO don't need to get credentials as it should be part of data source connection details - .filter(_._2.nonEmpty) - .map(kv => (kv._1, kv._2.getOrElse(""))) - val dataSourceMap = if (contract.driver.map(_.toLowerCase).contains(CASSANDRA_NAME) && contract.database.nonEmpty) { - Map( - FORMAT -> CASSANDRA, - CASSANDRA_KEYSPACE -> contract.database.getOrElse(""), - CASSANDRA_TABLE -> dataset.table - ) - } else if (contract.driver.map(_.toLowerCase).contains(JDBC)) { - Map( - FORMAT -> JDBC, - DRIVER -> contract.driver.getOrElse(""), - JDBC_TABLE -> dataset.table - ) - } else { - //TODO data source type will probably change in v3 - contract.`type`.map(_.toLowerCase) match { - case Some(CSV) | Some(JSON) | Some(PARQUET) | Some(ORC) | Some(KAFKA) | Some(HTTP) | Some(SOLACE) => - Map(FORMAT -> contract.`type`.get.toLowerCase) //TODO need to get PATH from somewhere - case _ => - LOGGER.warn(s"Defaulting to format CSV since contract type is not supported, name=${contract.datasetName}, type=${contract.`type`.getOrElse("")}") - Map(FORMAT -> CSV) //TODO default to CSV for now - // case _ => throw new RuntimeException(s"Unable to determine data source type from ODCS file, name=${contract.datasetName}, type=${contract.`type`.getOrElse("")}") - } - } - - baseMap ++ connectionConfig - } - - private def getBaseColumnMetadata(column: OpenDataContractStandardColumn, dataType: DataType): Map[String, String] = { - Map( - FIELD_DATA_TYPE -> dataType.toString(), - IS_NULLABLE -> column.isNullable.getOrElse(false).toString, - ENABLED_NULL -> column.isNullable.getOrElse(false).toString, - IS_PRIMARY_KEY -> column.isPrimaryKey.getOrElse(false).toString, - PRIMARY_KEY_POSITION -> column.primaryKeyPosition.getOrElse("-1").toString, - IS_PRIMARY_KEY -> column.isPrimaryKey.getOrElse(false).toString, - PRIMARY_KEY_POSITION -> column.primaryKeyPosition.getOrElse("-1").toString, - CLUSTERING_POSITION -> column.clusterKeyPosition.getOrElse("-1").toString, - IS_UNIQUE -> column.isUnique.getOrElse(false).toString, - ) - } - - private def getDataType(dataset: OpenDataContractStandardDataset, column: OpenDataContractStandardColumn): DataType = { - column.logicalType.toLowerCase match { - case "string" => StringType - case "integer" => IntegerType - case "double" => DoubleType - case "float" => FloatType - case "long" => LongType - case "date" => DateType - case "timestamp" => TimestampType - case "array" => new ArrayType(StringType) //TODO: wait for how inner data type of array will be defined - case "object" => new StructType(List()) //TODO: wait for how nested data structures will be made - case x => - LOGGER.warn(s"Unable to find corresponding known data type for column in ODCS file, defaulting to string, dataset=${dataset.table} column=$column, data-type=$x") - StringType - } - } } diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/metadata/datasource/opendatacontractstandard/OpenDataContractStandardV2Mapper.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/metadata/datasource/opendatacontractstandard/OpenDataContractStandardV2Mapper.scala new file mode 100644 index 0000000..3baa4fb --- /dev/null +++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/metadata/datasource/opendatacontractstandard/OpenDataContractStandardV2Mapper.scala @@ -0,0 +1,102 @@ +package 
io.github.datacatering.datacaterer.core.generator.metadata.datasource.opendatacontractstandard
+
+import io.github.datacatering.datacaterer.api.model.Constants._
+import io.github.datacatering.datacaterer.api.model.{ArrayType, DataType, DateType, DoubleType, FloatType, IntegerType, LongType, StringType, StructType, TimestampType}
+import io.github.datacatering.datacaterer.core.generator.metadata.datasource.SubDataSourceMetadata
+import io.github.datacatering.datacaterer.core.generator.metadata.datasource.database.FieldMetadata
+import io.github.datacatering.datacaterer.core.generator.metadata.datasource.opendatacontractstandard.model.{OpenDataContractStandard, OpenDataContractStandardColumn, OpenDataContractStandardDataset}
+import org.apache.log4j.Logger
+import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
+
+object OpenDataContractStandardV2Mapper {
+
+  private val LOGGER = Logger.getLogger(getClass.getName)
+
+  implicit val columnMetadataEncoder: Encoder[FieldMetadata] = Encoders.kryo[FieldMetadata]
+
+  def toSubDataSourceMetadata(
+                               value: OpenDataContractStandard,
+                               dataset: OpenDataContractStandardDataset,
+                               connectionConfig: Map[String, String]
+                             )(implicit sparkSession: SparkSession): SubDataSourceMetadata = {
+    val readOptions = getDataSourceOptions(value, dataset, connectionConfig)
+    val columnMetadata = dataset.columns.map(cols => {
+      val mappedColumns = cols.map(column => {
+        val dataType = getDataType(dataset, column)
+        val metadata = getBaseColumnMetadata(column, dataType)
+        FieldMetadata(column.column, readOptions, metadata)
+      }).toList
+      sparkSession.createDataset(mappedColumns)
+    })
+    SubDataSourceMetadata(readOptions, columnMetadata)
+  }
+
+  private def getDataSourceOptions(
+                                    contract: OpenDataContractStandard,
+                                    dataset: OpenDataContractStandardDataset,
+                                    connectionConfig: Map[String, String]
+                                  ): Map[String, String] = {
+    //identifier should be based on schema name
+    val baseMap = Map(METADATA_IDENTIFIER -> contract.uuid)
+    //Tables, BigQuery, serverName, jdbc:driver, myDatabase, user, pw
+    //(contract.`type`, contract.sourceSystem, contract.server, contract.driver, contract.database, contract.username, contract.password)
+    val serverMap = contract.server.map(server => Map(URL -> server)).getOrElse(Map())
+    val credentialsMap = Map(USERNAME -> contract.username, PASSWORD -> contract.password)
+      .filter(_._2.nonEmpty)
+      .map(kv => (kv._1, kv._2.getOrElse("")))
+    val dataSourceMap = if (contract.driver.map(_.toLowerCase).contains(CASSANDRA_NAME) && contract.database.nonEmpty) {
+      Map(
+        FORMAT -> CASSANDRA,
+        CASSANDRA_KEYSPACE -> contract.database.getOrElse(""),
+        CASSANDRA_TABLE -> dataset.table
+      )
+    } else if (contract.driver.map(_.toLowerCase).contains(JDBC)) {
+      Map(
+        FORMAT -> JDBC,
+        DRIVER -> contract.driver.getOrElse(""),
+        JDBC_TABLE -> dataset.table
+      )
+    } else {
+      contract.`type`.map(_.toLowerCase) match {
+        case Some(CSV) | Some(JSON) | Some(PARQUET) | Some(ORC) | Some(KAFKA) | Some(HTTP) | Some(SOLACE) =>
+          Map(FORMAT -> contract.`type`.get.toLowerCase)
+        case _ =>
+          LOGGER.warn(s"Defaulting to format CSV since contract type is not supported, name=${contract.datasetName}, type=${contract.`type`.getOrElse("")}")
+          Map(FORMAT -> CSV)
+        // case _ => throw new RuntimeException(s"Unable to determine data source type from ODCS file, name=${contract.datasetName}, type=${contract.`type`.getOrElse("")}")
+      }
+    }
+
+    baseMap ++ connectionConfig //serverMap, credentialsMap and dataSourceMap are derived above but not yet merged into the returned options
+  }
+
+  private def getBaseColumnMetadata(column: OpenDataContractStandardColumn, dataType: DataType):
Map[String, String] = { + Map( + FIELD_DATA_TYPE -> dataType.toString(), + IS_NULLABLE -> column.isNullable.getOrElse(false).toString, + ENABLED_NULL -> column.isNullable.getOrElse(false).toString, + IS_PRIMARY_KEY -> column.isPrimaryKey.getOrElse(false).toString, + PRIMARY_KEY_POSITION -> column.primaryKeyPosition.getOrElse("-1").toString, + CLUSTERING_POSITION -> column.clusterKeyPosition.getOrElse("-1").toString, + IS_UNIQUE -> column.isUnique.getOrElse(false).toString, + ) + } + + private def getDataType(dataset: OpenDataContractStandardDataset, column: OpenDataContractStandardColumn): DataType = { + column.logicalType.toLowerCase match { + case "string" => StringType + case "integer" => IntegerType + case "double" => DoubleType + case "float" => FloatType + case "long" => LongType + case "date" => DateType + case "timestamp" => TimestampType + case "array" => new ArrayType(StringType) //TODO: wait for how inner data type of array will be defined + case "object" => new StructType(List()) //TODO: wait for how nested data structures will be made + case x => + LOGGER.warn(s"Unable to find corresponding known data type for column in ODCS file, defaulting to string, dataset=${dataset.table} column=$column, data-type=$x") + StringType + } + } + +} diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/metadata/datasource/opendatacontractstandard/OpenDataContractStandardV3Mapper.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/metadata/datasource/opendatacontractstandard/OpenDataContractStandardV3Mapper.scala new file mode 100644 index 0000000..f8534c0 --- /dev/null +++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/metadata/datasource/opendatacontractstandard/OpenDataContractStandardV3Mapper.scala @@ -0,0 +1,82 @@ +package io.github.datacatering.datacaterer.core.generator.metadata.datasource.opendatacontractstandard + +import io.github.datacatering.datacaterer.api.model.Constants.{ENABLED_NULL, FIELD_DATA_TYPE, IS_NULLABLE, IS_PRIMARY_KEY, IS_UNIQUE, METADATA_IDENTIFIER, PRIMARY_KEY_POSITION} +import io.github.datacatering.datacaterer.api.model.{ArrayType, BooleanType, DataType, DateType, DoubleType, IntegerType, StringType, StructType} +import io.github.datacatering.datacaterer.core.generator.metadata.datasource.SubDataSourceMetadata +import io.github.datacatering.datacaterer.core.generator.metadata.datasource.database.FieldMetadata +import io.github.datacatering.datacaterer.core.generator.metadata.datasource.opendatacontractstandard.model.{LogicalTypeEnum, OpenDataContractStandardElementV3, OpenDataContractStandardSchemaV3, OpenDataContractStandardV3} +import org.apache.log4j.Logger +import org.apache.spark.sql.{Encoder, Encoders, SparkSession} + +object OpenDataContractStandardV3Mapper { + + private val LOGGER = Logger.getLogger(getClass.getName) + + implicit val columnMetadataEncoder: Encoder[FieldMetadata] = Encoders.kryo[FieldMetadata] + + def toSubDataSourceMetadata( + value: OpenDataContractStandardV3, + schema: OpenDataContractStandardSchemaV3, + connectionConfig: Map[String, String] + )(implicit sparkSession: SparkSession): SubDataSourceMetadata = { + val readOptions = getDataSourceOptions(value, connectionConfig) + val propertyMetadata = schema.properties.map(props => { + val mappedProperties = props.map(property => toFieldMetadata(readOptions, property)).toList + sparkSession.createDataset(mappedProperties) + }) + SubDataSourceMetadata(readOptions, propertyMetadata) + } + + private def toFieldMetadata( 
+                               readOptions: Map[String, String],
+                               property: OpenDataContractStandardElementV3
+                             ): FieldMetadata = {
+    val dataType = getDataType(property)
+    val metadata = getBasePropertyMetadata(property, dataType)
+    val nestedFields = if (dataType.isInstanceOf[StructType]) {
+      property.properties.getOrElse(Array())
+        .map(prop2 => toFieldMetadata(readOptions, prop2))
+        .toList
+    } else List()
+
+    FieldMetadata(property.name, readOptions, metadata, nestedFields)
+  }
+
+  private def getDataSourceOptions(
+                                    contract: OpenDataContractStandardV3,
+                                    connectionConfig: Map[String, String]
+                                  ): Map[String, String] = {
+    val baseMap = Map(METADATA_IDENTIFIER -> contract.id) //TODO identifier should also incorporate the schema name so multiple schemas do not collide
+    baseMap ++ connectionConfig
+  }
+
+  private def getBasePropertyMetadata(property: OpenDataContractStandardElementV3, dataType: DataType): Map[String, String] = {
+    Map(
+      FIELD_DATA_TYPE -> dataType.toString(),
+      IS_NULLABLE -> (!property.required.getOrElse(false)).toString, //a field marked as required in ODCS v3 is not nullable
+      ENABLED_NULL -> (!property.required.getOrElse(false)).toString,
+      IS_PRIMARY_KEY -> property.primaryKey.getOrElse(false).toString,
+      PRIMARY_KEY_POSITION -> property.primaryKeyPosition.getOrElse("-1").toString,
+      IS_UNIQUE -> property.unique.getOrElse(false).toString,
+    )
+  }
+
+  private def getDataType(element: OpenDataContractStandardElementV3): DataType = {
+    element.logicalType match {
+      case LogicalTypeEnum.string => StringType
+      case LogicalTypeEnum.date => DateType
+      case LogicalTypeEnum.number => DoubleType
+      case LogicalTypeEnum.integer => IntegerType
+      case LogicalTypeEnum.array => new ArrayType(StringType)
+      case LogicalTypeEnum.`object` =>
+        val innerType = element.properties.getOrElse(Array())
+          .map(prop => prop.name -> getDataType(prop))
+          .toList
+        new StructType(innerType)
+      case LogicalTypeEnum.boolean => BooleanType
+      case x =>
+        LOGGER.warn(s"Unable to find corresponding known data type for column in ODCS file, defaulting to string, property=$element, data-type=$x")
+        StringType
+    }
+  }
+}
diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/metadata/datasource/opendatacontractstandard/model/OpenDataContractStandardV3Models.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/metadata/datasource/opendatacontractstandard/model/OpenDataContractStandardV3Models.scala
new file mode 100644
index 0000000..22c33e1
--- /dev/null
+++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/metadata/datasource/opendatacontractstandard/model/OpenDataContractStandardV3Models.scala
@@ -0,0 +1,202 @@
+package io.github.datacatering.datacaterer.core.generator.metadata.datasource.opendatacontractstandard.model
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties
+
+
+object KindEnum extends Enumeration {
+  val DataContract = Value
+}
+
+
+object ApiVersionEnum extends Enumeration {
+  val `v3.0.0`, `v2.2.2`, `v2.2.1`, `v2.2.0` = Value
+}
+
+object ServerTypeEnum extends Enumeration {
+  val api, athena, azure, bigquery, clickhouse, databricks, denodo, dremio, duckdb, glue, cloudsql, db2, informix, kafka, kinesis, local, mysql, oracle, postgresql, postgres, presto, pubsub, redshift, s3, sftp, snowflake, sqlserver, synapse, trino, vertica, custom = Value
+}
+
+object LogicalTypeEnum extends Enumeration {
+  val string, date, number, integer, `object`, array, boolean = Value
+}
+
+object DataQualityTypeEnum extends Enumeration {
+  val text, library, sql, custom = Value
+}
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+case class OpenDataContractStandardV3(
+                                       apiVersion: ApiVersionEnum.Value,
+
id: String,
+                                       kind: KindEnum.Value,
+                                       status: String,
+                                       version: String,
+                                       contractCreatedTs: Option[String] = None,
+                                       customProperties: Option[List[OpenDataContractStandardCustomProperty]] = None,
+                                       dataProduct: Option[String] = None,
+                                       description: Option[OpenDataContractStandardDescription] = None,
+                                       domain: Option[String] = None,
+                                       name: Option[String] = None,
+                                       price: Option[OpenDataContractStandardPrice] = None,
+                                       roles: Option[Array[OpenDataContractStandardRole]] = None,
+                                       schema: Option[Array[OpenDataContractStandardSchemaV3]] = None,
+                                       servers: Option[List[OpenDataContractStandardServerV3]] = None,
+                                       slaDefaultElement: Option[String] = None,
+                                       slaProperties: Option[Array[OpenDataContractStandardServiceLevelAgreementProperty]] = None,
+                                       support: Option[List[OpenDataContractStandardSupport]] = None,
+                                       tags: Option[Array[String]] = None,
+                                       team: Option[Array[OpenDataContractStandardTeam]] = None,
+                                       tenant: Option[String] = None,
+                                       `type`: Option[String] = None,
+                                     )

+/**
+ * Data source details of where data is physically stored.
+ *
+ * @param server Identifier of the server.
+ * @param type Type of the server.
+ * @param description Description of the server.
+ * @param environment Environment of the server.
+ * @param roles List of roles that have access to the server.
+ * @param customProperties A list of key/value pairs for custom properties.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+case class OpenDataContractStandardServerV3(
+                                             server: String,
+                                             `type`: ServerTypeEnum.Value,
+                                             description: Option[String],
+                                             environment: Option[String],
+                                             roles: Option[Array[OpenDataContractStandardRole]],
+                                             customProperties: Option[Array[OpenDataContractStandardCustomProperty]]
+                                           )

+@JsonIgnoreProperties(ignoreUnknown = true)
+case class OpenDataContractStandardSchemaV3(
+                                             name: String,
+                                             authoritativeDefinitions: Option[Array[OpenDataContractStandardAuthoritativeDefinition]] = None,
+                                             businessName: Option[String] = None,
+                                             customProperties: Option[Array[OpenDataContractStandardCustomProperty]] = None,
+                                             dataGranularityDescription: Option[String] = None,
+                                             description: Option[String] = None,
+                                             logicalType: Option[String] = None,
+                                             physicalName: Option[String] = None,
+                                             physicalType: Option[String] = None,
+                                             properties: Option[Array[OpenDataContractStandardElementV3]] = None,
+                                             priorTableName: Option[String] = None,
+                                             quality: Option[Array[OpenDataContractStandardDataQualityV3]] = None,
+                                             tags: Option[Array[String]] = None,
+                                           )

+@JsonIgnoreProperties(ignoreUnknown = true)
+case class OpenDataContractStandardElementV3(
+                                              name: String,
+                                              logicalType: LogicalTypeEnum.Value,
+                                              physicalType: String,
+                                              authoritativeDefinitions: Option[Array[OpenDataContractStandardAuthoritativeDefinition]] = None,
+                                              businessName: Option[String] = None,
+                                              criticalDataElement: Option[Boolean] = None,
+                                              classification: Option[String] = None,
+                                              customProperties: Option[Array[OpenDataContractStandardCustomProperty]] = None,
+                                              description: Option[String] = None,
+                                              encryptedName: Option[String] = None,
+                                              examples: Option[Array[Any]] = None,
+                                              logicalTypeOptions: Option[OpenDataContractStandardLogicalTypeOptionsV3] = None,
+                                              partitioned: Option[Boolean] = None,
+                                              partitionKeyPosition: Option[Int] = None,
+                                              properties: Option[Array[OpenDataContractStandardElementV3]] = None,
+                                              primaryKey: Option[Boolean] = None,
+                                              primaryKeyPosition: Option[Int] = None,
+                                              quality: Option[Array[OpenDataContractStandardDataQualityV3]] = None,
+                                              required: Option[Boolean] = None,
+                                              tags: Option[Array[String]] = None,
+
transformDescription: Option[String] = None,
+                                              transformLogic: Option[String] = None,
+                                              transformSourceObjects: Option[Array[String]] = None,
+                                              unique: Option[Boolean] = None,
+                                            )

+@JsonIgnoreProperties(ignoreUnknown = true)
+case class OpenDataContractStandardLogicalTypeOptionsV3(
+                                                         exclusiveMaximum: Option[Boolean] = None,
+                                                         exclusiveMinimum: Option[Boolean] = None,
+                                                         format: Option[String] = None,
+                                                         maximum: Option[Any] = None,
+                                                         maxItems: Option[Int] = None,
+                                                         maxLength: Option[Int] = None,
+                                                         maxProperties: Option[Int] = None,
+                                                         minimum: Option[Any] = None,
+                                                         minItems: Option[Int] = None,
+                                                         minLength: Option[Int] = None,
+                                                         minProperties: Option[Int] = None,
+                                                         multipleOf: Option[Int] = None,
+                                                         pattern: Option[String] = None,
+                                                         required: Option[Array[String]] = None,
+                                                         uniqueItems: Option[Boolean] = None,
+                                                       )

+@JsonIgnoreProperties(ignoreUnknown = true)
+case class OpenDataContractStandardDataQualityV3(
+                                                  `type`: DataQualityTypeEnum.Value,
+                                                  authoritativeDefinitions: Option[Array[OpenDataContractStandardAuthoritativeDefinition]] = None,
+                                                  businessImpact: Option[String] = None,
+                                                  code: Option[String] = None,
+                                                  column: Option[String] = None,
+                                                  columns: Option[String] = None,
+                                                  customProperties: Option[Array[OpenDataContractStandardCustomProperty]] = None,
+                                                  description: Option[String] = None,
+                                                  dimension: Option[String] = None,
+                                                  engine: Option[String] = None,
+                                                  implementation: Option[String] = None,
+                                                  method: Option[String] = None,
+                                                  mustBe: Option[Any] = None,
+                                                  mustBeBetween: Option[Array[Double]] = None,
+                                                  mustBeGreaterThan: Option[Double] = None,
+                                                  mustBeGreaterOrEqualTo: Option[Double] = None,
+                                                  mustBeLessThan: Option[Double] = None,
+                                                  mustBeLessOrEqualTo: Option[Double] = None,
+                                                  mustNotBe: Option[Any] = None,
+                                                  mustNotBeBetween: Option[Array[Double]] = None,
+                                                  name: Option[String] = None,
+                                                  query: Option[String] = None,
+                                                  rule: Option[String] = None,
+                                                  scheduler: Option[String] = None,
+                                                  schedule: Option[String] = None,
+                                                  severity: Option[String] = None,
+                                                  tags: Option[Array[String]] = None,
+                                                  toolRuleName: Option[String] = None,
+                                                  unit: Option[String] = None,
+                                                )

+/**
+ * @param channel Channel name or identifier.
+ * @param url Access URL using normal [URL scheme](https://en.wikipedia.org/wiki/URL#Syntax) (https, mailto, etc.).
+ * @param description Description of the channel, free text.
+ * @param tool Name of the tool, value can be `email`, `slack`, `teams`, `discord`, `ticket`, or `other`.
+ * @param scope Scope can be: `interactive`, `announcements`, `issues`.
+ * @param invitationUrl Some tools use an invitation URL for requesting or subscribing. Follows the [URL scheme](https://en.wikipedia.org/wiki/URL#Syntax).
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+case class OpenDataContractStandardSupport(
+                                            channel: String,
+                                            url: String,
+                                            description: Option[String] = None,
+                                            tool: Option[String] = None,
+                                            scope: Option[String] = None,
+                                            invitationUrl: Option[String] = None
+                                          )

+/**
+ * @param username The user's username or email.
+ * @param role The user's job role; examples might be owner, data steward. There is no limit on the role.
+ * @param dateIn The date when the user joined the team.
+ * @param dateOut The date when the user ceased to be part of the team.
+ * @param replacedByUsername The username of the user who replaced the previous user.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+case class OpenDataContractStandardTeam(
+                                         dateIn: Option[String] = None,
+                                         dateOut: Option[String] = None,
+                                         username: Option[String] = None,
+                                         replacedByUsername: Option[String] = None,
+                                         role: Option[String] = None,
+                                       )
diff --git a/app/src/test/scala/io/github/datacatering/datacaterer/core/generator/metadata/datasource/opendatacontractstandard/OpenDataContractStandardV2MapperTest.scala b/app/src/test/scala/io/github/datacatering/datacaterer/core/generator/metadata/datasource/opendatacontractstandard/OpenDataContractStandardV2MapperTest.scala
new file mode 100644
index 0000000..fab81c2
--- /dev/null
+++ b/app/src/test/scala/io/github/datacatering/datacaterer/core/generator/metadata/datasource/opendatacontractstandard/OpenDataContractStandardV2MapperTest.scala
@@ -0,0 +1,7 @@
+package io.github.datacatering.datacaterer.core.generator.metadata.datasource.opendatacontractstandard
+
+import io.github.datacatering.datacaterer.core.util.SparkSuite
+
+class OpenDataContractStandardV2MapperTest extends SparkSuite {
+
+}
diff --git a/app/src/test/scala/io/github/datacatering/datacaterer/core/generator/metadata/datasource/opendatacontractstandard/OpenDataContractStandardV3MapperTest.scala b/app/src/test/scala/io/github/datacatering/datacaterer/core/generator/metadata/datasource/opendatacontractstandard/OpenDataContractStandardV3MapperTest.scala
new file mode 100644
index 0000000..6a21af7
--- /dev/null
+++ b/app/src/test/scala/io/github/datacatering/datacaterer/core/generator/metadata/datasource/opendatacontractstandard/OpenDataContractStandardV3MapperTest.scala
@@ -0,0 +1,93 @@
+package io.github.datacatering.datacaterer.core.generator.metadata.datasource.opendatacontractstandard
+
+import io.github.datacatering.datacaterer.api.model.Constants.FIELD_DATA_TYPE
+import io.github.datacatering.datacaterer.core.generator.metadata.datasource.opendatacontractstandard.model.{ApiVersionEnum, KindEnum, LogicalTypeEnum, OpenDataContractStandardDescription, OpenDataContractStandardElementV3, OpenDataContractStandardSchemaV3, OpenDataContractStandardV3}
+import io.github.datacatering.datacaterer.core.util.SparkSuite
+
+class OpenDataContractStandardV3MapperTest extends SparkSuite {
+
+  private val mapper = OpenDataContractStandardV3Mapper
+  private val baseContract = createBaseContract
+
+  test("OpenDataContractStandardV3Mapper correctly maps to SubDataSourceMetadata") {
+    val value = OpenDataContractStandardSchemaV3(
+      name = "Test Dataset",
+      properties = Some(Array(
+        OpenDataContractStandardElementV3(
+          name = "field1",
+          logicalType = LogicalTypeEnum.string,
+          physicalType = "string",
+        )
+      ))
+    )
+    val connectionConfig = Map("key" -> "value")
+
+    val result = mapper.toSubDataSourceMetadata(baseContract, value, connectionConfig)(sparkSession)
+
+    assert(result.optFieldMetadata.isDefined)
+    val fields = result.optFieldMetadata.get.collect()
+    assertResult(1)(fields.length)
+    val headField = fields.head
+    assertResult("field1")(headField.field)
+    assertResult("string")(headField.metadata(FIELD_DATA_TYPE))
+  }
+
+  test("OpenDataContractStandardV3Mapper handles dataset with multiple fields") {
+    val schemaFields = Array(
+      OpenDataContractStandardElementV3(name = "field1", logicalType = LogicalTypeEnum.string, physicalType = "string"),
+      OpenDataContractStandardElementV3(name = "field2", logicalType = LogicalTypeEnum.integer, physicalType = "int"),
+      OpenDataContractStandardElementV3(name = "field3", logicalType = LogicalTypeEnum.boolean, physicalType =
"bool"), + OpenDataContractStandardElementV3(name = "field4", logicalType = LogicalTypeEnum.number, physicalType = "double"), + OpenDataContractStandardElementV3(name = "field5", logicalType = LogicalTypeEnum.date, physicalType = "timestamp"), + OpenDataContractStandardElementV3(name = "field6", logicalType = LogicalTypeEnum.array, physicalType = "array"), + OpenDataContractStandardElementV3( + name = "field7", + logicalType = LogicalTypeEnum.`object`, + physicalType = "struct", + properties = Some(Array( + OpenDataContractStandardElementV3(name = "name", logicalType = LogicalTypeEnum.string, physicalType = "string"), + OpenDataContractStandardElementV3(name = "age", logicalType = LogicalTypeEnum.integer, physicalType = "int"), + )) + ) + ) + val value = OpenDataContractStandardSchemaV3(name = "my big data", properties = Some(schemaFields)) + val connectionConfig = Map("format" -> "parquet") + + val result = mapper.toSubDataSourceMetadata(baseContract, value, connectionConfig)(sparkSession) + + assert(result.optFieldMetadata.isDefined) + val fields = result.optFieldMetadata.get.collect() + assertResult(7)(fields.length) + assertResult("field1")(fields.head.field) + assertResult("string")(fields.head.metadata(FIELD_DATA_TYPE)) + assertResult("field2")(fields(1).field) + assertResult("integer")(fields(1).metadata(FIELD_DATA_TYPE)) + assertResult("field3")(fields(2).field) + assertResult("boolean")(fields(2).metadata(FIELD_DATA_TYPE)) + assertResult("field4")(fields(3).field) + assertResult("double")(fields(3).metadata(FIELD_DATA_TYPE)) + assertResult("field5")(fields(4).field) + assertResult("date")(fields(4).metadata(FIELD_DATA_TYPE)) + assertResult("field6")(fields(5).field) + assertResult("array")(fields(5).metadata(FIELD_DATA_TYPE)) + assertResult("field7")(fields(6).field) + assertResult("struct")(fields(6).metadata(FIELD_DATA_TYPE)) + assertResult(2)(fields(6).nestedFields.size) + assertResult("name")(fields(6).nestedFields.head.field) + assertResult("string")(fields(6).nestedFields.head.metadata(FIELD_DATA_TYPE)) + assertResult("age")(fields(6).nestedFields(1).field) + assertResult("integer")(fields(6).nestedFields(1).metadata(FIELD_DATA_TYPE)) + } + + private def createBaseContract: OpenDataContractStandardV3 = { + OpenDataContractStandardV3( + apiVersion = ApiVersionEnum.`v3.0.0`, + id = "abc123", + kind = KindEnum.DataContract, + status = "current", + version = "1.0", + name = Some("Test Dataset"), + description = Some(OpenDataContractStandardDescription(purpose = Some("Test Description"))) + ) + } +} From f57a1b8ace68bba843184dc1a8414b418db061ec Mon Sep 17 00:00:00 2001 From: Flook Peter Date: Fri, 25 Oct 2024 13:45:42 +0800 Subject: [PATCH 2/2] Don't allow for additional properties for top level ODCS v2, add in end-to-end test for ODCS v3 --- .../OpenDataContractStandardModels.scala | 8 +- .../OpenDataContractStandardV3Models.scala | 33 ++- .../metadata/odcs/full-example-v3.odcs.yaml | 233 ++++++++++++++++++ ...ntractStandardDataSourceMetadataTest.scala | 38 +-- gradle.properties | 2 +- 5 files changed, 289 insertions(+), 25 deletions(-) create mode 100644 app/src/test/resources/sample/metadata/odcs/full-example-v3.odcs.yaml diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/metadata/datasource/opendatacontractstandard/model/OpenDataContractStandardModels.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/metadata/datasource/opendatacontractstandard/model/OpenDataContractStandardModels.scala index 6199501..25a96f0 100644 
--- a/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/metadata/datasource/opendatacontractstandard/model/OpenDataContractStandardModels.scala +++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/metadata/datasource/opendatacontractstandard/model/OpenDataContractStandardModels.scala @@ -2,7 +2,6 @@ package io.github.datacatering.datacaterer.core.generator.metadata.datasource.op import com.fasterxml.jackson.annotation.JsonIgnoreProperties -@JsonIgnoreProperties(ignoreUnknown = true) case class OpenDataContractStandard( dataset: Array[OpenDataContractStandardDataset], datasetName: String, @@ -12,9 +11,12 @@ case class OpenDataContractStandard( uuid: String, version: String, apiVersion: Option[String] = None, + contractCreatedTs: Option[String] = None, + customProperties: Option[Array[OpenDataContractStandardCustomProperty]] = None, database: Option[String] = None, datasetDomain: Option[String] = None, datasetKind: Option[String] = None, + datasetProject: Option[String] = None, description: Option[OpenDataContractStandardDescription] = None, driver: Option[String] = None, driverVersion: Option[String] = None, @@ -28,12 +30,14 @@ case class OpenDataContractStandard( tags: Option[Array[String]] = None, tenant: Option[String] = None, `type`: Option[String] = None, + schedulerAppName: Option[String] = None, server: Option[String] = None, slaDefaultColumn: Option[String] = None, slaProperties: Option[Array[OpenDataContractStandardServiceLevelAgreementProperty]] = None, sourceSystem: Option[String] = None, sourcePlatform: Option[String] = None, stakeholders: Option[Array[OpenDataContractStandardStakeholder]] = None, + systemInstance: Option[String] = None, username: Option[String] = None, userConsumptionMode: Option[String] = None, ) @@ -143,5 +147,5 @@ case class OpenDataContractStandardAuthoritativeDefinition( @JsonIgnoreProperties(ignoreUnknown = true) case class OpenDataContractStandardCustomProperty( property: String, - value: String, + value: Any, ) \ No newline at end of file diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/metadata/datasource/opendatacontractstandard/model/OpenDataContractStandardV3Models.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/metadata/datasource/opendatacontractstandard/model/OpenDataContractStandardV3Models.scala index 22c33e1..5034c3b 100644 --- a/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/metadata/datasource/opendatacontractstandard/model/OpenDataContractStandardV3Models.scala +++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/metadata/datasource/opendatacontractstandard/model/OpenDataContractStandardV3Models.scala @@ -1,34 +1,51 @@ package io.github.datacatering.datacaterer.core.generator.metadata.datasource.opendatacontractstandard.model import com.fasterxml.jackson.annotation.JsonIgnoreProperties +import com.fasterxml.jackson.core.`type`.TypeReference +import com.fasterxml.jackson.module.scala.JsonScalaEnumeration +import io.github.datacatering.datacaterer.core.generator.metadata.datasource.opendatacontractstandard.model object KindEnum extends Enumeration { + type KindEnum = Value val DataContract = Value } - +class KindEnumCls extends TypeReference[KindEnum.type] object ApiVersionEnum extends Enumeration { - val `v3.0.0`, `v2.2.2`, `v2.2.1`, `v2.2.0` = Value + type ApiVersionEnum = Value + val `v3.0.0`: model.ApiVersionEnum.Value = Value(0, "v3.0.0") + val `v2.2.2`: model.ApiVersionEnum.Value = Value(1, 
"v2.2.2") + val `v2.2.1`: model.ApiVersionEnum.Value = Value(2, "v2.2.1") + val `v2.2.0`: model.ApiVersionEnum.Value = Value(3, "v2.2.0") } +class ApiVersionEnumCls extends TypeReference[ApiVersionEnum.type] object ServerTypeEnum extends Enumeration { - val api, athena, azure, bigquery, clickhouse, databricks, denodo, dremio, duckdb, glue, cloudsql, db2, informix, kafka, kinesis, local, mysql, oracle, postgresql, postgres, presto, pubsub, redshift, s3, sftp, snowflake, sqlserver, synapse, trino, vertica, custom = Value + type ServerTypeEnum = Value + val api, athena, azure, bigquery, clickhouse, databricks, denodo, dremio, duckdb, glue, cloudsql, db2, informix, + kafka, kinesis, local, mysql, oracle, postgresql, postgres, presto, pubsub, redshift, s3, sftp, snowflake, + sqlserver, synapse, trino, vertica, custom = Value } +class ServerTypeEnumCls extends TypeReference[ServerTypeEnum.type] object LogicalTypeEnum extends Enumeration { + type LogicalTypeEnum = Value val string, date, number, integer, `object`, array, boolean = Value } +class LogicalTypeEnumCls extends TypeReference[LogicalTypeEnum.type] object DataQualityTypeEnum extends Enumeration { + type DataQualityTypeEnum = Value val text, library, sql, custom = Value } +class DataQualityTypeEnumCls extends TypeReference[DataQualityTypeEnum.type] @JsonIgnoreProperties(ignoreUnknown = true) case class OpenDataContractStandardV3( - apiVersion: ApiVersionEnum.Value, + @JsonScalaEnumeration(classOf[ApiVersionEnumCls]) apiVersion: ApiVersionEnum.ApiVersionEnum, id: String, - kind: KindEnum.Value, + @JsonScalaEnumeration(classOf[KindEnumCls]) kind: KindEnum.KindEnum, status: String, version: String, contractCreatedTs: Option[String] = None, @@ -63,7 +80,7 @@ case class OpenDataContractStandardV3( @JsonIgnoreProperties(ignoreUnknown = true) case class OpenDataContractStandardServerV3( server: String, - `type`: ServerTypeEnum.Value, + @JsonScalaEnumeration(classOf[ServerTypeEnumCls]) `type`: ServerTypeEnum.ServerTypeEnum, description: Option[String], environment: Option[String], roles: Option[Array[OpenDataContractStandardRole]], @@ -90,7 +107,7 @@ case class OpenDataContractStandardSchemaV3( @JsonIgnoreProperties(ignoreUnknown = true) case class OpenDataContractStandardElementV3( name: String, - logicalType: LogicalTypeEnum.Value, + @JsonScalaEnumeration(classOf[LogicalTypeEnumCls]) logicalType: LogicalTypeEnum.LogicalTypeEnum, physicalType: String, authoritativeDefinitions: Option[Array[OpenDataContractStandardAuthoritativeDefinition]] = None, businessName: Option[String] = None, @@ -136,7 +153,7 @@ case class OpenDataContractStandardLogicalTypeOptionsV3( @JsonIgnoreProperties(ignoreUnknown = true) case class OpenDataContractStandardDataQualityV3( - `type`: DataQualityTypeEnum.Value, + @JsonScalaEnumeration(classOf[DataQualityTypeEnumCls]) `type`: DataQualityTypeEnum.DataQualityTypeEnum, authoritativeDefinitions: Option[Array[OpenDataContractStandardAuthoritativeDefinition]] = None, businessImpact: Option[String] = None, code: Option[String] = None, diff --git a/app/src/test/resources/sample/metadata/odcs/full-example-v3.odcs.yaml b/app/src/test/resources/sample/metadata/odcs/full-example-v3.odcs.yaml new file mode 100644 index 0000000..3dfffe2 --- /dev/null +++ b/app/src/test/resources/sample/metadata/odcs/full-example-v3.odcs.yaml @@ -0,0 +1,233 @@ +# What's this data contract about? 
+domain: seller # Domain +dataProduct: my quantum # Data product name +version: 1.1.0 # Version (follows semantic versioning) +status: current +id: 53581432-6c55-4ba2-a65f-72344a91553a + +# Lots of information +description: + purpose: Views built on top of the seller tables. + limitations: Data based on seller perspective, no buyer information + usage: Predict sales over time +tenant: ClimateQuantumInc + +kind: DataContract +apiVersion: v3.0.0 # Standard version (follows semantic versioning) + +# Infrastructure & servers +servers: + - server: my-postgres + type: postgres + host: localhost + port: 5432 + database: pypl-edw + schema: pp_access_views + +# Dataset, schema and quality +schema: + - name: tbl + physicalName: tbl_1 + physicalType: table + description: Provides core payment metrics + authoritativeDefinitions: + - url: https://catalog.data.gov/dataset/air-quality + type: businessDefinition + - url: https://youtu.be/jbY1BKFj9ec + type: videoTutorial + tags: [ ] + dataGranularityDescription: Aggregation on columns txn_ref_dt, pmt_txn_id + properties: + - name: txn_ref_dt + primaryKey: false + primaryKeyPosition: -1 + businessName: transaction reference date + logicalType: date + physicalType: date + required: false + description: Reference date for transaction + partitioned: true + partitionKeyPosition: 1 + criticalDataElement: false + tags: [ ] + classification: public + transformSourceObjects: + - table_name_1 + - table_name_2 + - table_name_3 + transformLogic: sel t1.txn_dt as txn_ref_dt from table_name_1 as t1, table_name_2 as t2, table_name_3 as t3 where t1.txn_dt=date-3 + transformDescription: defines the logic in business terms; logic for dummies + examples: + - "2022-10-03" + - "2020-01-28" + customProperties: + - property: anonymizationStrategy + value: none + - name: rcvr_id + primaryKey: true + primaryKeyPosition: 1 + businessName: receiver id + logicalType: string + physicalType: varchar(18) + required: false + description: A description for column rcvr_id. 
+ partitioned: false + partitionKeyPosition: -1 + criticalDataElement: false + tags: [ ] + classification: restricted + - name: rcvr_cntry_code + primaryKey: false + primaryKeyPosition: -1 + businessName: receiver country code + logicalType: string + physicalType: varchar(2) + required: false + description: Country code + partitioned: false + partitionKeyPosition: -1 + criticalDataElement: false + tags: [ ] + classification: public + authoritativeDefinitions: + - url: https://collibra.com/asset/742b358f-71a5-4ab1-bda4-dcdba9418c25 + type: businessDefinition + - url: https://github.com/myorg/myrepo + type: transformationImplementation + - url: jdbc:postgresql://localhost:5432/adventureworks/tbl_1/rcvr_cntry_code + type: implementation + encryptedName: rcvr_cntry_code_encrypted + quality: + - rule: nullCheck + description: column should not contain null values + dimension: completeness # dropdown 7 values + type: library + severity: error + businessImpact: operational + schedule: 0 20 * * * + scheduler: cron + customProperties: + - property: FIELD_NAME + value: + - property: COMPARE_TO + value: + - property: COMPARISON_TYPE + value: Greater than + quality: + - rule: countCheck + type: library + description: Ensure row count is within expected volume range + dimension: completeness + method: reconciliation + severity: error + businessImpact: operational + schedule: 0 20 * * * + scheduler: cron + customProperties: + - property: business-key + value: + - txn_ref_dt + - rcvr_id + + +# Pricing +price: + priceAmount: 9.95 + priceCurrency: USD + priceUnit: megabyte + + +# Team +team: + - username: ceastwood + role: Data Scientist + dateIn: "2022-08-02" + dateOut: "2022-10-01" + replacedByUsername: mhopper + - username: mhopper + role: Data Scientist + dateIn: "2022-10-01" + - username: daustin + role: Owner + comment: Keeper of the grail + dateIn: "2022-10-01" + + +# Roles +roles: + - role: microstrategy_user_opr + access: read + firstLevelApprovers: Reporting Manager + secondLevelApprovers: 'mandolorian' + - role: bq_queryman_user_opr + access: read + firstLevelApprovers: Reporting Manager + secondLevelApprovers: na + - role: risk_data_access_opr + access: read + firstLevelApprovers: Reporting Manager + secondLevelApprovers: 'dathvador' + - role: bq_unica_user_opr + access: write + firstLevelApprovers: Reporting Manager + secondLevelApprovers: 'mickey' + +# SLA +slaDefaultElement: tab1.txn_ref_dt +slaProperties: + - property: latency # Property, see list of values in DP QoS + value: 4 + unit: d # d, day, days for days; y, yr, years for years + element: tab1.txn_ref_dt # This would not be needed as it is the same table.column as the default one + - property: generalAvailability + value: "2022-05-12T09:30:10-08:00" + - property: endOfSupport + value: "2032-05-12T09:30:10-08:00" + - property: endOfLife + value: "2042-05-12T09:30:10-08:00" + - property: retention + value: 3 + unit: y + element: tab1.txn_ref_dt + - property: frequency + value: 1 + valueExt: 1 + unit: d + element: tab1.txn_ref_dt + - property: timeOfAvailability + value: 09:00-08:00 + element: tab1.txn_ref_dt + driver: regulatory # Describes the importance of the SLA: [regulatory|analytics|operational|...] 
+  - property: timeOfAvailability
+    value: 08:00-08:00
+    element: tab1.txn_ref_dt
+    driver: analytics
+
+
+# Support
+support:
+  - channel: '#product-help' # Simple Slack communication channel
+    tool: slack
+    url: https://aidaug.slack.com/archives/C05UZRSBKLY
+  - channel: datacontract-ann # Simple distribution list
+    tool: email
+    url: mailto:datacontract-ann@bitol.io
+  - channel: Feedback # Product Feedback
+    description: General Product Feedback (Public)
+    url: https://product-feedback.com
+
+# Tags
+tags:
+  - transactions
+
+
+# Custom properties
+customProperties:
+  - property: refRulesetName
+    value: gcsc.ruleset.name
+  - property: somePropertyName
+    value: property.value
+  - property: dataprocClusterName # Used for specific applications like Elevate
+    value: [ cluster name ]
+
+contractCreatedTs: "2022-11-15T02:59:43+00:00"
diff --git a/app/src/test/scala/io/github/datacatering/datacaterer/core/generator/metadata/datasource/opendatacontractstandard/OpenDataContractStandardDataSourceMetadataTest.scala b/app/src/test/scala/io/github/datacatering/datacaterer/core/generator/metadata/datasource/opendatacontractstandard/OpenDataContractStandardDataSourceMetadataTest.scala
index d0c9f4e..404a432 100644
--- a/app/src/test/scala/io/github/datacatering/datacaterer/core/generator/metadata/datasource/opendatacontractstandard/OpenDataContractStandardDataSourceMetadataTest.scala
+++ b/app/src/test/scala/io/github/datacatering/datacaterer/core/generator/metadata/datasource/opendatacontractstandard/OpenDataContractStandardDataSourceMetadataTest.scala
@@ -1,6 +1,7 @@
 package io.github.datacatering.datacaterer.core.generator.metadata.datasource.opendatacontractstandard
 
-import io.github.datacatering.datacaterer.api.model.Constants.{CLUSTERING_POSITION, DATA_CONTRACT_FILE, ENABLED_NULL, FIELD_DATA_TYPE, FORMAT, IS_NULLABLE, IS_PRIMARY_KEY, IS_UNIQUE, METADATA_IDENTIFIER, PASSWORD, PRIMARY_KEY_POSITION, URL, USERNAME}
+import io.github.datacatering.datacaterer.api.model.Constants.{CLUSTERING_POSITION, DATA_CONTRACT_FILE, ENABLED_NULL, FIELD_DATA_TYPE, IS_NULLABLE, IS_PRIMARY_KEY, IS_UNIQUE, METADATA_IDENTIFIER, PRIMARY_KEY_POSITION}
+import io.github.datacatering.datacaterer.core.generator.metadata.datasource.SubDataSourceMetadata
 import io.github.datacatering.datacaterer.core.util.SparkSuite
 import org.junit.runner.RunWith
 import org.scalatestplus.junit.JUnitRunner
@@ -13,13 +14,23 @@ class OpenDataContractStandardDataSourceMetadataTest extends SparkSuite {
     val odcsMetadata = OpenDataContractStandardDataSourceMetadata("odcs", "parquet", connectionConfig)
     val result = odcsMetadata.getSubDataSourcesMetadata
 
+    validateResult(connectionConfig, result)
+  }
+
+  test("Can convert ODCS v3.0.0 file to column metadata") {
+    val connectionConfig = Map(DATA_CONTRACT_FILE -> "src/test/resources/sample/metadata/odcs/full-example-v3.odcs.yaml")
+    val odcsMetadata = OpenDataContractStandardDataSourceMetadata("odcs", "parquet", connectionConfig)
+    val result = odcsMetadata.getSubDataSourcesMetadata
+
+    validateResult(connectionConfig, result, isVersion2 = false)
+  }
+
+  private def validateResult(
+                              connectionConfig: Map[String, String],
+                              result: Array[SubDataSourceMetadata],
+                              isVersion2: Boolean = true
+                            ): Unit = {
     assertResult(1)(result.length)
-    val expectedReadOptions = Map(
-      URL -> "localhost:5432",
-      USERNAME -> "${env.username}",
-      PASSWORD -> "${env.password}",
-      FORMAT -> "csv",
-    )
     connectionConfig.foreach(kv => assert(result.head.readOptions(kv._1) == kv._2))
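+    // METADATA_IDENTIFIER is populated from the contract's uuid (ODCS v2) or id (ODCS v3)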
     assertResult(true)(result.head.readOptions.contains(METADATA_IDENTIFIER))
     assertResult(true)(result.head.optFieldMetadata.isDefined)
@@ -28,42 +39,41 @@ class OpenDataContractStandardDataSourceMetadataTest extends SparkSuite {
 
     assertResult(true)(resultCols.exists(_.field == "txn_ref_dt"))
     val txnDateCol = resultCols.filter(_.field == "txn_ref_dt").head
+    val txnCluster = if (isVersion2) Map(CLUSTERING_POSITION -> "-1") else Map()
     val expectedTxnDateMetadata = Map(
       IS_PRIMARY_KEY -> "false",
-      IS_NULLABLE -> "false",
-      ENABLED_NULL -> "false",
+      IS_NULLABLE -> (if (isVersion2) "false" else "true"),
+      ENABLED_NULL -> (if (isVersion2) "false" else "true"),
       IS_UNIQUE -> "false",
       PRIMARY_KEY_POSITION -> "-1",
-      CLUSTERING_POSITION -> "-1",
       FIELD_DATA_TYPE -> "date"
-    )
+    ) ++ txnCluster
     assertResult(expectedTxnDateMetadata)(txnDateCol.metadata)
 
     assertResult(true)(resultCols.exists(_.field == "rcvr_id"))
     val rcvrIdCol = resultCols.filter(_.field == "rcvr_id").head
+    val rcvrIdCluster = if (isVersion2) Map(CLUSTERING_POSITION -> "1") else Map()
     val expectedRcvrIdMetadata = Map(
       IS_PRIMARY_KEY -> "true",
-      IS_NULLABLE -> "false",
-      ENABLED_NULL -> "false",
+      IS_NULLABLE -> (if (isVersion2) "false" else "true"),
+      ENABLED_NULL -> (if (isVersion2) "false" else "true"),
      IS_UNIQUE -> "false",
       PRIMARY_KEY_POSITION -> "1",
-      CLUSTERING_POSITION -> "1",
       FIELD_DATA_TYPE -> "string"
-    )
+    ) ++ rcvrIdCluster
     assertResult(expectedRcvrIdMetadata)(rcvrIdCol.metadata)
 
     assertResult(true)(resultCols.exists(_.field == "rcvr_cntry_code"))
     val countryCodeCol = resultCols.filter(_.field == "rcvr_cntry_code").head
+    val countryCodeCluster = if (isVersion2) Map(CLUSTERING_POSITION -> "-1") else Map()
     val expectedCountryCodeMetadata = Map(
       IS_PRIMARY_KEY -> "false",
-      IS_NULLABLE -> "false",
-      ENABLED_NULL -> "false",
+      IS_NULLABLE -> (if (isVersion2) "false" else "true"),
+      ENABLED_NULL -> (if (isVersion2) "false" else "true"),
       IS_UNIQUE -> "false",
       PRIMARY_KEY_POSITION -> "-1",
-      CLUSTERING_POSITION -> "-1",
       FIELD_DATA_TYPE -> "string"
-    )
+    ) ++ countryCodeCluster
     assertResult(expectedCountryCodeMetadata)(countryCodeCol.metadata)
   }
-
 }
diff --git a/gradle.properties b/gradle.properties
index de7e51c..18550c9 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -1,5 +1,5 @@
 groupId=io.github.data-catering
-version=0.12.1
+version=0.12.2
 scalaVersion=2.12
 scalaSpecificVersion=2.12.19