diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/metadata/datasource/opendatacontractstandard/OpenDataContractStandardDataSourceMetadata.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/metadata/datasource/opendatacontractstandard/OpenDataContractStandardDataSourceMetadata.scala index b73134e..be8887b 100644 --- a/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/metadata/datasource/opendatacontractstandard/OpenDataContractStandardDataSourceMetadata.scala +++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/metadata/datasource/opendatacontractstandard/OpenDataContractStandardDataSourceMetadata.scala @@ -5,7 +5,7 @@ import io.github.datacatering.datacaterer.api.model.Constants._ import io.github.datacatering.datacaterer.api.model.{ArrayType, DataType, DateType, DoubleType, FloatType, IntegerType, LongType, StringType, StructType, TimestampType} import io.github.datacatering.datacaterer.core.exception.{InvalidDataContractFileFormatException, MissingDataContractFilePathException} import io.github.datacatering.datacaterer.core.generator.metadata.datasource.database.FieldMetadata -import io.github.datacatering.datacaterer.core.generator.metadata.datasource.opendatacontractstandard.model.{OpenDataContractStandard, OpenDataContractStandardColumn, OpenDataContractStandardDataset} +import io.github.datacatering.datacaterer.core.generator.metadata.datasource.opendatacontractstandard.model.{OpenDataContractStandard, OpenDataContractStandardColumn, OpenDataContractStandardDataset, OpenDataContractStandardV3} import io.github.datacatering.datacaterer.core.generator.metadata.datasource.{DataSourceMetadata, SubDataSourceMetadata} import io.github.datacatering.datacaterer.core.util.ObjectMapperUtil import org.apache.log4j.Logger @@ -20,8 +20,6 @@ case class OpenDataContractStandardDataSourceMetadata( connectionConfig: Map[String, String], ) extends DataSourceMetadata { - private val LOGGER = Logger.getLogger(getClass.getName) - override val hasSourceData: Boolean = false override def toStepName(options: Map[String, String]): String = { @@ -38,93 +36,22 @@ case class OpenDataContractStandardDataSourceMetadata( optDataContractFile match { case Some(dataContractPath) => val dataContractFile = new File(dataContractPath) - val tryParseYaml = Try(ObjectMapperUtil.yamlObjectMapper.readValue(dataContractFile, classOf[OpenDataContractStandard])) - tryParseYaml match { - case Failure(exception) => throw InvalidDataContractFileFormatException(dataContractPath, exception) + val tryParseYamlV2 = Try(ObjectMapperUtil.yamlObjectMapper.readValue(dataContractFile, classOf[OpenDataContractStandard])) + tryParseYamlV2 match { + case Failure(_) => + // v2 parse failed (the v2 model no longer ignores unknown properties), so fall back to the v3 model + val tryParseYamlV3 = Try(ObjectMapperUtil.yamlObjectMapper.readValue(dataContractFile, classOf[OpenDataContractStandardV3])) + tryParseYamlV3 match { + case Failure(exception) => throw InvalidDataContractFileFormatException(dataContractPath, exception) + case Success(value) => + value.schema.getOrElse(Array()).map(schema => OpenDataContractStandardV3Mapper.toSubDataSourceMetadata(value, schema, connectionConfig)) + } case Success(value) => // val optSchemaName = connectionConfig.get(DATA_CONTRACT_SCHEMA) //TODO filter for schema if schema name is defined, otherwise return all schemas - value.dataset.map(dataset => { - val readOptions = getDataSourceOptions(value, dataset) - val columnMetadata = dataset.columns.map(cols => { - val 
mappedColumns = cols.map(column => { - val dataType = getDataType(dataset, column) - val metadata = getBaseColumnMetadata(column, dataType) - FieldMetadata(column.column, readOptions, metadata) - }).toList - sparkSession.createDataset(mappedColumns) - }) - SubDataSourceMetadata(readOptions, columnMetadata) - }) + value.dataset.map(dataset => OpenDataContractStandardV2Mapper.toSubDataSourceMetadata(value, dataset, connectionConfig)) } case None => throw MissingDataContractFilePathException(name, format) } } - - private def getDataSourceOptions(contract: OpenDataContractStandard, dataset: OpenDataContractStandardDataset): Map[String, String] = { - //identifier should be based on schema name - val baseMap = Map(METADATA_IDENTIFIER -> contract.uuid) - //Tables, BigQuery, serverName, jdbc:driver, myDatabase, user, pw - //(contract.`type`, contract.sourceSystem, contract.server, contract.driver, contract.database, contract.username, contract.password) - val serverMap = contract.server.map(server => Map(URL -> server)).getOrElse(Map()) //TODO assume database would be part of server url? Probably not - val credentialsMap = Map(USERNAME -> contract.username, PASSWORD -> contract.password) //TODO don't need to get credentials as it should be part of data source connection details - .filter(_._2.nonEmpty) - .map(kv => (kv._1, kv._2.getOrElse(""))) - val dataSourceMap = if (contract.driver.map(_.toLowerCase).contains(CASSANDRA_NAME) && contract.database.nonEmpty) { - Map( - FORMAT -> CASSANDRA, - CASSANDRA_KEYSPACE -> contract.database.getOrElse(""), - CASSANDRA_TABLE -> dataset.table - ) - } else if (contract.driver.map(_.toLowerCase).contains(JDBC)) { - Map( - FORMAT -> JDBC, - DRIVER -> contract.driver.getOrElse(""), - JDBC_TABLE -> dataset.table - ) - } else { - //TODO data source type will probably change in v3 - contract.`type`.map(_.toLowerCase) match { - case Some(CSV) | Some(JSON) | Some(PARQUET) | Some(ORC) | Some(KAFKA) | Some(HTTP) | Some(SOLACE) => - Map(FORMAT -> contract.`type`.get.toLowerCase) //TODO need to get PATH from somewhere - case _ => - LOGGER.warn(s"Defaulting to format CSV since contract type is not supported, name=${contract.datasetName}, type=${contract.`type`.getOrElse("")}") - Map(FORMAT -> CSV) //TODO default to CSV for now - // case _ => throw new RuntimeException(s"Unable to determine data source type from ODCS file, name=${contract.datasetName}, type=${contract.`type`.getOrElse("")}") - } - } - - baseMap ++ connectionConfig - } - - private def getBaseColumnMetadata(column: OpenDataContractStandardColumn, dataType: DataType): Map[String, String] = { - Map( - FIELD_DATA_TYPE -> dataType.toString(), - IS_NULLABLE -> column.isNullable.getOrElse(false).toString, - ENABLED_NULL -> column.isNullable.getOrElse(false).toString, - IS_PRIMARY_KEY -> column.isPrimaryKey.getOrElse(false).toString, - PRIMARY_KEY_POSITION -> column.primaryKeyPosition.getOrElse("-1").toString, - IS_PRIMARY_KEY -> column.isPrimaryKey.getOrElse(false).toString, - PRIMARY_KEY_POSITION -> column.primaryKeyPosition.getOrElse("-1").toString, - CLUSTERING_POSITION -> column.clusterKeyPosition.getOrElse("-1").toString, - IS_UNIQUE -> column.isUnique.getOrElse(false).toString, - ) - } - - private def getDataType(dataset: OpenDataContractStandardDataset, column: OpenDataContractStandardColumn): DataType = { - column.logicalType.toLowerCase match { - case "string" => StringType - case "integer" => IntegerType - case "double" => DoubleType - case "float" => FloatType - case "long" => LongType - case "date" => 
DateType - case "timestamp" => TimestampType - case "array" => new ArrayType(StringType) //TODO: wait for how inner data type of array will be defined - case "object" => new StructType(List()) //TODO: wait for how nested data structures will be made - case x => - LOGGER.warn(s"Unable to find corresponding known data type for column in ODCS file, defaulting to string, dataset=${dataset.table} column=$column, data-type=$x") - StringType - } - } } diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/metadata/datasource/opendatacontractstandard/OpenDataContractStandardV2Mapper.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/metadata/datasource/opendatacontractstandard/OpenDataContractStandardV2Mapper.scala new file mode 100644 index 0000000..3baa4fb --- /dev/null +++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/metadata/datasource/opendatacontractstandard/OpenDataContractStandardV2Mapper.scala @@ -0,0 +1,102 @@ +package io.github.datacatering.datacaterer.core.generator.metadata.datasource.opendatacontractstandard + +import io.github.datacatering.datacaterer.api.model.Constants._ +import io.github.datacatering.datacaterer.api.model.{ArrayType, DataType, DateType, DoubleType, FloatType, IntegerType, LongType, StringType, StructType, TimestampType} +import io.github.datacatering.datacaterer.core.generator.metadata.datasource.SubDataSourceMetadata +import io.github.datacatering.datacaterer.core.generator.metadata.datasource.database.FieldMetadata +import io.github.datacatering.datacaterer.core.generator.metadata.datasource.opendatacontractstandard.model.{OpenDataContractStandard, OpenDataContractStandardColumn, OpenDataContractStandardDataset} +import org.apache.log4j.Logger +import org.apache.spark.sql.{Encoder, Encoders, SparkSession} + +object OpenDataContractStandardV2Mapper { + + private val LOGGER = Logger.getLogger(getClass.getName) + + implicit val columnMetadataEncoder: Encoder[FieldMetadata] = Encoders.kryo[FieldMetadata] + + def toSubDataSourceMetadata( + value: OpenDataContractStandard, + dataset: OpenDataContractStandardDataset, + connectionConfig: Map[String, String] + )(implicit sparkSession: SparkSession): SubDataSourceMetadata = { + val readOptions = getDataSourceOptions(value, dataset, connectionConfig) + val columnMetadata = dataset.columns.map(cols => { + val mappedColumns = cols.map(column => { + val dataType = getDataType(dataset, column) + val metadata = getBaseColumnMetadata(column, dataType) + FieldMetadata(column.column, readOptions, metadata) + }).toList + sparkSession.createDataset(mappedColumns) + }) + SubDataSourceMetadata(readOptions, columnMetadata) + } + + private def getDataSourceOptions( + contract: OpenDataContractStandard, + dataset: OpenDataContractStandardDataset, + connectionConfig: Map[String, String] + ): Map[String, String] = { + //identifier should be based on schema name + val baseMap = Map(METADATA_IDENTIFIER -> contract.uuid) + //Tables, BigQuery, serverName, jdbc:driver, myDatabase, user, pw + //(contract.`type`, contract.sourceSystem, contract.server, contract.driver, contract.database, contract.username, contract.password) + val serverMap = contract.server.map(server => Map(URL -> server)).getOrElse(Map()) + val credentialsMap = Map(USERNAME -> contract.username, PASSWORD -> contract.password) + .filter(_._2.nonEmpty) + .map(kv => (kv._1, kv._2.getOrElse(""))) + val dataSourceMap = if (contract.driver.map(_.toLowerCase).contains(CASSANDRA_NAME) && 
contract.database.nonEmpty) { + Map( + FORMAT -> CASSANDRA, + CASSANDRA_KEYSPACE -> contract.database.getOrElse(""), + CASSANDRA_TABLE -> dataset.table + ) + } else if (contract.driver.map(_.toLowerCase).contains(JDBC)) { + Map( + FORMAT -> JDBC, + DRIVER -> contract.driver.getOrElse(""), + JDBC_TABLE -> dataset.table + ) + } else { + contract.`type`.map(_.toLowerCase) match { + case Some(CSV) | Some(JSON) | Some(PARQUET) | Some(ORC) | Some(KAFKA) | Some(HTTP) | Some(SOLACE) => + Map(FORMAT -> contract.`type`.get.toLowerCase) + case _ => + LOGGER.warn(s"Defaulting to format CSV since contract type is not supported, name=${contract.datasetName}, type=${contract.`type`.getOrElse("")}") + Map(FORMAT -> CSV) + // case _ => throw new RuntimeException(s"Unable to determine data source type from ODCS file, name=${contract.datasetName}, type=${contract.`type`.getOrElse("")}") + } + } + + baseMap ++ connectionConfig //TODO serverMap, credentialsMap and dataSourceMap are currently unused; connection details and format are expected to come from connectionConfig + } + + private def getBaseColumnMetadata(column: OpenDataContractStandardColumn, dataType: DataType): Map[String, String] = { + Map( + FIELD_DATA_TYPE -> dataType.toString(), + IS_NULLABLE -> column.isNullable.getOrElse(false).toString, + ENABLED_NULL -> column.isNullable.getOrElse(false).toString, + IS_PRIMARY_KEY -> column.isPrimaryKey.getOrElse(false).toString, + PRIMARY_KEY_POSITION -> column.primaryKeyPosition.getOrElse("-1").toString, + CLUSTERING_POSITION -> column.clusterKeyPosition.getOrElse("-1").toString, + IS_UNIQUE -> column.isUnique.getOrElse(false).toString, + ) + } + + private def getDataType(dataset: OpenDataContractStandardDataset, column: OpenDataContractStandardColumn): DataType = { + column.logicalType.toLowerCase match { + case "string" => StringType + case "integer" => IntegerType + case "double" => DoubleType + case "float" => FloatType + case "long" => LongType + case "date" => DateType + case "timestamp" => TimestampType + case "array" => new ArrayType(StringType) //TODO: wait for how inner data type of array will be defined + case "object" => new StructType(List()) //TODO: wait for how nested data structures will be made + case x => + LOGGER.warn(s"Unable to find corresponding known data type for column in ODCS file, defaulting to string, dataset=${dataset.table} column=$column, data-type=$x") + StringType + } + } + +} diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/metadata/datasource/opendatacontractstandard/OpenDataContractStandardV3Mapper.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/metadata/datasource/opendatacontractstandard/OpenDataContractStandardV3Mapper.scala new file mode 100644 index 0000000..f8534c0 --- /dev/null +++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/metadata/datasource/opendatacontractstandard/OpenDataContractStandardV3Mapper.scala @@ -0,0 +1,82 @@ +package io.github.datacatering.datacaterer.core.generator.metadata.datasource.opendatacontractstandard + +import io.github.datacatering.datacaterer.api.model.Constants.{ENABLED_NULL, FIELD_DATA_TYPE, IS_NULLABLE, IS_PRIMARY_KEY, IS_UNIQUE, METADATA_IDENTIFIER, PRIMARY_KEY_POSITION} +import io.github.datacatering.datacaterer.api.model.{ArrayType, BooleanType, DataType, DateType, DoubleType, IntegerType, StringType, StructType} +import io.github.datacatering.datacaterer.core.generator.metadata.datasource.SubDataSourceMetadata +import io.github.datacatering.datacaterer.core.generator.metadata.datasource.database.FieldMetadata +import 
io.github.datacatering.datacaterer.core.generator.metadata.datasource.opendatacontractstandard.model.{LogicalTypeEnum, OpenDataContractStandardElementV3, OpenDataContractStandardSchemaV3, OpenDataContractStandardV3} +import org.apache.log4j.Logger +import org.apache.spark.sql.{Encoder, Encoders, SparkSession} + +object OpenDataContractStandardV3Mapper { + + private val LOGGER = Logger.getLogger(getClass.getName) + + implicit val columnMetadataEncoder: Encoder[FieldMetadata] = Encoders.kryo[FieldMetadata] + + def toSubDataSourceMetadata( + value: OpenDataContractStandardV3, + schema: OpenDataContractStandardSchemaV3, + connectionConfig: Map[String, String] + )(implicit sparkSession: SparkSession): SubDataSourceMetadata = { + val readOptions = getDataSourceOptions(value, connectionConfig) + val propertyMetadata = schema.properties.map(props => { + val mappedProperties = props.map(property => toFieldMetadata(readOptions, property)).toList + sparkSession.createDataset(mappedProperties) + }) + SubDataSourceMetadata(readOptions, propertyMetadata) + } + + private def toFieldMetadata( + readOptions: Map[String, String], + property: OpenDataContractStandardElementV3 + ): FieldMetadata = { + val dataType = getDataType(property) + val metadata = getBasePropertyMetadata(property, dataType) + val nestedFields = if (dataType.isInstanceOf[StructType]) { + property.properties.getOrElse(Array()) + .map(prop2 => toFieldMetadata(readOptions, prop2)) + .toList + } else List() + + FieldMetadata(property.name, readOptions, metadata, nestedFields) + } + + private def getDataSourceOptions( + contract: OpenDataContractStandardV3, + connectionConfig: Map[String, String] + ): Map[String, String] = { + val baseMap = Map(METADATA_IDENTIFIER -> contract.id) //TODO map server/connection details from the contract's servers once needed + baseMap ++ connectionConfig + } + + private def getBasePropertyMetadata(property: OpenDataContractStandardElementV3, dataType: DataType): Map[String, String] = { + Map( + FIELD_DATA_TYPE -> dataType.toString(), + IS_NULLABLE -> property.required.getOrElse(false).toString, + ENABLED_NULL -> property.required.getOrElse(false).toString, + IS_PRIMARY_KEY -> property.primaryKey.getOrElse(false).toString, + PRIMARY_KEY_POSITION -> property.primaryKeyPosition.getOrElse("-1").toString, + IS_UNIQUE -> property.unique.getOrElse(false).toString, + ) + } + + private def getDataType(element: OpenDataContractStandardElementV3): DataType = { + element.logicalType match { + case LogicalTypeEnum.string => StringType + case LogicalTypeEnum.date => DateType + case LogicalTypeEnum.number => DoubleType + case LogicalTypeEnum.integer => IntegerType + case LogicalTypeEnum.array => new ArrayType(StringType) + case LogicalTypeEnum.`object` => + val innerType = element.properties.getOrElse(Array()) + .map(prop => prop.name -> getDataType(prop)) + .toList + new StructType(innerType) + case LogicalTypeEnum.boolean => BooleanType + case x => + LOGGER.warn(s"Unable to find corresponding known data type for property in ODCS file, defaulting to string, property=$element, data-type=$x") + StringType + } + } +} diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/metadata/datasource/opendatacontractstandard/model/OpenDataContractStandardModels.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/metadata/datasource/opendatacontractstandard/model/OpenDataContractStandardModels.scala index 6199501..25a96f0 --- 
a/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/metadata/datasource/opendatacontractstandard/model/OpenDataContractStandardModels.scala +++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/metadata/datasource/opendatacontractstandard/model/OpenDataContractStandardModels.scala @@ -2,7 +2,6 @@ package io.github.datacatering.datacaterer.core.generator.metadata.datasource.op import com.fasterxml.jackson.annotation.JsonIgnoreProperties -@JsonIgnoreProperties(ignoreUnknown = true) case class OpenDataContractStandard( dataset: Array[OpenDataContractStandardDataset], datasetName: String, @@ -12,9 +11,12 @@ case class OpenDataContractStandard( uuid: String, version: String, apiVersion: Option[String] = None, + contractCreatedTs: Option[String] = None, + customProperties: Option[Array[OpenDataContractStandardCustomProperty]] = None, database: Option[String] = None, datasetDomain: Option[String] = None, datasetKind: Option[String] = None, + datasetProject: Option[String] = None, description: Option[OpenDataContractStandardDescription] = None, driver: Option[String] = None, driverVersion: Option[String] = None, @@ -28,12 +30,14 @@ case class OpenDataContractStandard( tags: Option[Array[String]] = None, tenant: Option[String] = None, `type`: Option[String] = None, + schedulerAppName: Option[String] = None, server: Option[String] = None, slaDefaultColumn: Option[String] = None, slaProperties: Option[Array[OpenDataContractStandardServiceLevelAgreementProperty]] = None, sourceSystem: Option[String] = None, sourcePlatform: Option[String] = None, stakeholders: Option[Array[OpenDataContractStandardStakeholder]] = None, + systemInstance: Option[String] = None, username: Option[String] = None, userConsumptionMode: Option[String] = None, ) @@ -143,5 +147,5 @@ case class OpenDataContractStandardAuthoritativeDefinition( @JsonIgnoreProperties(ignoreUnknown = true) case class OpenDataContractStandardCustomProperty( property: String, - value: String, + value: Any, ) \ No newline at end of file diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/metadata/datasource/opendatacontractstandard/model/OpenDataContractStandardV3Models.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/metadata/datasource/opendatacontractstandard/model/OpenDataContractStandardV3Models.scala new file mode 100644 index 0000000..5034c3b --- /dev/null +++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/metadata/datasource/opendatacontractstandard/model/OpenDataContractStandardV3Models.scala @@ -0,0 +1,219 @@ +package io.github.datacatering.datacaterer.core.generator.metadata.datasource.opendatacontractstandard.model + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties +import com.fasterxml.jackson.core.`type`.TypeReference +import com.fasterxml.jackson.module.scala.JsonScalaEnumeration + + + +object KindEnum extends Enumeration { + type KindEnum = Value + val DataContract = Value +} +class KindEnumCls extends TypeReference[KindEnum.type] + +object ApiVersionEnum extends Enumeration { + type ApiVersionEnum = Value + val `v3.0.0` = Value(0, "v3.0.0") + val `v2.2.2` = Value(1, "v2.2.2") + val `v2.2.1` = Value(2, "v2.2.1") + val `v2.2.0` = Value(3, "v2.2.0") +} +class ApiVersionEnumCls extends 
TypeReference[ApiVersionEnum.type] + +object ServerTypeEnum extends Enumeration { + type ServerTypeEnum = Value + val api, athena, azure, bigquery, clickhouse, databricks, denodo, dremio, duckdb, glue, cloudsql, db2, informix, + kafka, kinesis, local, mysql, oracle, postgresql, postgres, presto, pubsub, redshift, s3, sftp, snowflake, + sqlserver, synapse, trino, vertica, custom = Value +} +class ServerTypeEnumCls extends TypeReference[ServerTypeEnum.type] + +object LogicalTypeEnum extends Enumeration { + type LogicalTypeEnum = Value + val string, date, number, integer, `object`, array, boolean = Value +} +class LogicalTypeEnumCls extends TypeReference[LogicalTypeEnum.type] + +object DataQualityTypeEnum extends Enumeration { + type DataQualityTypeEnum = Value + val text, library, sql, custom = Value +} +class DataQualityTypeEnumCls extends TypeReference[DataQualityTypeEnum.type] + +@JsonIgnoreProperties(ignoreUnknown = true) +case class OpenDataContractStandardV3( + @JsonScalaEnumeration(classOf[ApiVersionEnumCls]) apiVersion: ApiVersionEnum.ApiVersionEnum, + id: String, + @JsonScalaEnumeration(classOf[KindEnumCls]) kind: KindEnum.KindEnum, + status: String, + version: String, + contractCreatedTs: Option[String] = None, + customProperties: Option[List[OpenDataContractStandardCustomProperty]] = None, + dataProduct: Option[String] = None, + description: Option[OpenDataContractStandardDescription] = None, + domain: Option[String] = None, + name: Option[String] = None, + price: Option[OpenDataContractStandardPrice] = None, + roles: Option[Array[OpenDataContractStandardRole]] = None, + schema: Option[Array[OpenDataContractStandardSchemaV3]] = None, + servers: Option[List[OpenDataContractStandardServerV3]] = None, + slaDefaultElement: Option[String] = None, + slaProperties: Option[Array[OpenDataContractStandardServiceLevelAgreementProperty]] = None, + support: Option[List[OpenDataContractStandardSupport]] = None, + tags: Option[Array[String]] = None, + team: Option[Array[OpenDataContractStandardTeam]] = None, + tenant: Option[String] = None, + `type`: Option[String] = None, + ) + +/** + * Data source details of where data is physically stored. + * + * @param server Identifier of the server. + * @param type Type of the server. + * @param description Description of the server. + * @param environment Environment of the server. + * @param roles List of roles that have access to the server. + * @param customProperties A list of key/value pairs for custom properties. 
+ */ +@JsonIgnoreProperties(ignoreUnknown = true) +case class OpenDataContractStandardServerV3( + server: String, + @JsonScalaEnumeration(classOf[ServerTypeEnumCls]) `type`: ServerTypeEnum.ServerTypeEnum, + description: Option[String], + environment: Option[String], + roles: Option[Array[OpenDataContractStandardRole]], + customProperties: Option[Array[OpenDataContractStandardCustomProperty]] + ) + +@JsonIgnoreProperties(ignoreUnknown = true) +case class OpenDataContractStandardSchemaV3( + name: String, + authoritativeDefinitions: Option[Array[OpenDataContractStandardAuthoritativeDefinition]] = None, + businessName: Option[String] = None, + customProperties: Option[Array[OpenDataContractStandardCustomProperty]] = None, + dataGranularityDescription: Option[String] = None, + description: Option[String] = None, + logicalType: Option[String] = None, + physicalName: Option[String] = None, + physicalType: Option[String] = None, + properties: Option[Array[OpenDataContractStandardElementV3]] = None, + priorTableName: Option[String] = None, + quality: Option[Array[OpenDataContractStandardDataQualityV3]] = None, + tags: Option[Array[String]] = None, + ) + +@JsonIgnoreProperties(ignoreUnknown = true) +case class OpenDataContractStandardElementV3( + name: String, + @JsonScalaEnumeration(classOf[LogicalTypeEnumCls]) logicalType: LogicalTypeEnum.LogicalTypeEnum, + physicalType: String, + authoritativeDefinitions: Option[Array[OpenDataContractStandardAuthoritativeDefinition]] = None, + businessName: Option[String] = None, + criticalDataElement: Option[Boolean] = None, + classification: Option[String] = None, + customProperties: Option[Array[OpenDataContractStandardCustomProperty]] = None, + description: Option[String] = None, + encryptedName: Option[String] = None, + examples: Option[Array[Any]] = None, + logicalTypeOptions: Option[OpenDataContractStandardLogicalTypeOptionsV3] = None, + partitioned: Option[Boolean] = None, + partitionKeyPosition: Option[Int] = None, + properties: Option[Array[OpenDataContractStandardElementV3]] = None, + primaryKey: Option[Boolean] = None, + primaryKeyPosition: Option[Int] = None, + quality: Option[Array[OpenDataContractStandardDataQualityV3]] = None, + required: Option[Boolean] = None, + tags: Option[Array[String]] = None, + transformDescription: Option[String] = None, + transformLogic: Option[String] = None, + transformSourceObjects: Option[Array[String]] = None, + unique: Option[Boolean] = None, + ) + +@JsonIgnoreProperties(ignoreUnknown = true) +case class OpenDataContractStandardLogicalTypeOptionsV3( + exclusiveMaximum: Option[Boolean] = None, + exclusiveMinimum: Option[Boolean] = None, + format: Option[String] = None, + maximum: Option[Any] = None, + maxItems: Option[Int] = None, + maxLength: Option[Int] = None, + maxProperties: Option[Int] = None, + minimum: Option[Any] = None, + minItems: Option[Int] = None, + minLength: Option[Int] = None, + minProperties: Option[Int] = None, + multipleOf: Option[Int] = None, + pattern: Option[String] = None, + required: Option[Array[String]] = None, + uniqueItems: Option[Boolean] = None, + ) + +@JsonIgnoreProperties(ignoreUnknown = true) +case class OpenDataContractStandardDataQualityV3( + @JsonScalaEnumeration(classOf[DataQualityTypeEnumCls]) `type`: DataQualityTypeEnum.DataQualityTypeEnum, + authoritativeDefinitions: Option[Array[OpenDataContractStandardAuthoritativeDefinition]] = None, + businessImpact: Option[String] = None, + code: Option[String] = None, + column: Option[String] = None, + columns: Option[String] = None, + 
customProperties: Option[Array[OpenDataContractStandardCustomProperty]] = None, + description: Option[String] = None, + dimension: Option[String] = None, + engine: Option[String] = None, + implementation: Option[String] = None, + method: Option[String] = None, + mustBe: Option[Any] = None, + mustBeBetween: Option[Array[Double]] = None, + mustBeGreaterThan: Option[Double] = None, + mustBeGreaterOrEqualTo: Option[Double] = None, + mustBeLessThan: Option[Double] = None, + mustBeLessOrEqualTo: Option[Double] = None, + mustNotBe: Option[Any] = None, + mustNotBeBetween: Option[Array[Double]] = None, + name: Option[String] = None, + query: Option[String] = None, + rule: Option[String] = None, + scheduler: Option[String] = None, + schedule: Option[String] = None, + severity: Option[String] = None, + tags: Option[Array[String]] = None, + toolRuleName: Option[String] = None, + unit: Option[String] = None, + ) + +/** + * @param channel Channel name or identifier. + * @param url Access URL using normal [URL scheme](https://en.wikipedia.org/wiki/URL#Syntax) (https, mailto, etc.). + * @param description Description of the channel, free text. + * @param tool Name of the tool, value can be `email`, `slack`, `teams`, `discord`, `ticket`, or `other`. + * @param scope Scope can be: `interactive`, `announcements`, `issues`. + * @param invitationUrl Some tools uses invitation URL for requesting or subscribing. Follows the [URL scheme](https://en.wikipedia.org/wiki/URL#Syntax). + */ +@JsonIgnoreProperties(ignoreUnknown = true) +case class OpenDataContractStandardSupport( + channel: String, + url: String, + description: Option[String] = None, + tool: Option[String] = None, + scope: Option[String] = None, + invitationUrl: Option[String] = None + ) + +/** + * @param username The user's username or email. + * @param role The user's job role; Examples might be owner, data steward. There is no limit on the role. + * @param dateIn The date when the user joined the team. + * @param dateOut The date when the user ceased to be part of the team. + * @param replacedByUsername The username of the user who replaced the previous user. + */ +@JsonIgnoreProperties(ignoreUnknown = true) +case class OpenDataContractStandardTeam( + dateIn: Option[String] = None, + dateOut: Option[String] = None, + username: Option[String] = None, + replacedByUsername: Option[String] = None, + role: Option[String] = None, + ) diff --git a/app/src/test/resources/sample/metadata/odcs/full-example-v3.odcs.yaml b/app/src/test/resources/sample/metadata/odcs/full-example-v3.odcs.yaml new file mode 100644 index 0000000..3dfffe2 --- /dev/null +++ b/app/src/test/resources/sample/metadata/odcs/full-example-v3.odcs.yaml @@ -0,0 +1,233 @@ +# What's this data contract about? +domain: seller # Domain +dataProduct: my quantum # Data product name +version: 1.1.0 # Version (follows semantic versioning) +status: current +id: 53581432-6c55-4ba2-a65f-72344a91553a + +# Lots of information +description: + purpose: Views built on top of the seller tables. 
+ limitations: Data based on seller perspective, no buyer information + usage: Predict sales over time +tenant: ClimateQuantumInc + +kind: DataContract +apiVersion: v3.0.0 # Standard version (follows semantic versioning) + +# Infrastructure & servers +servers: + - server: my-postgres + type: postgres + host: localhost + port: 5432 + database: pypl-edw + schema: pp_access_views + +# Dataset, schema and quality +schema: + - name: tbl + physicalName: tbl_1 + physicalType: table + description: Provides core payment metrics + authoritativeDefinitions: + - url: https://catalog.data.gov/dataset/air-quality + type: businessDefinition + - url: https://youtu.be/jbY1BKFj9ec + type: videoTutorial + tags: [ ] + dataGranularityDescription: Aggregation on columns txn_ref_dt, pmt_txn_id + properties: + - name: txn_ref_dt + primaryKey: false + primaryKeyPosition: -1 + businessName: transaction reference date + logicalType: date + physicalType: date + required: false + description: Reference date for transaction + partitioned: true + partitionKeyPosition: 1 + criticalDataElement: false + tags: [ ] + classification: public + transformSourceObjects: + - table_name_1 + - table_name_2 + - table_name_3 + transformLogic: sel t1.txn_dt as txn_ref_dt from table_name_1 as t1, table_name_2 as t2, table_name_3 as t3 where t1.txn_dt=date-3 + transformDescription: defines the logic in business terms; logic for dummies + examples: + - "2022-10-03" + - "2020-01-28" + customProperties: + - property: anonymizationStrategy + value: none + - name: rcvr_id + primaryKey: true + primaryKeyPosition: 1 + businessName: receiver id + logicalType: string + physicalType: varchar(18) + required: false + description: A description for column rcvr_id. + partitioned: false + partitionKeyPosition: -1 + criticalDataElement: false + tags: [ ] + classification: restricted + - name: rcvr_cntry_code + primaryKey: false + primaryKeyPosition: -1 + businessName: receiver country code + logicalType: string + physicalType: varchar(2) + required: false + description: Country code + partitioned: false + partitionKeyPosition: -1 + criticalDataElement: false + tags: [ ] + classification: public + authoritativeDefinitions: + - url: https://collibra.com/asset/742b358f-71a5-4ab1-bda4-dcdba9418c25 + type: businessDefinition + - url: https://github.com/myorg/myrepo + type: transformationImplementation + - url: jdbc:postgresql://localhost:5432/adventureworks/tbl_1/rcvr_cntry_code + type: implementation + encryptedName: rcvr_cntry_code_encrypted + quality: + - rule: nullCheck + description: column should not contain null values + dimension: completeness # dropdown 7 values + type: library + severity: error + businessImpact: operational + schedule: 0 20 * * * + scheduler: cron + customProperties: + - property: FIELD_NAME + value: + - property: COMPARE_TO + value: + - property: COMPARISON_TYPE + value: Greater than + quality: + - rule: countCheck + type: library + description: Ensure row count is within expected volume range + dimension: completeness + method: reconciliation + severity: error + businessImpact: operational + schedule: 0 20 * * * + scheduler: cron + customProperties: + - property: business-key + value: + - txn_ref_dt + - rcvr_id + + +# Pricing +price: + priceAmount: 9.95 + priceCurrency: USD + priceUnit: megabyte + + +# Team +team: + - username: ceastwood + role: Data Scientist + dateIn: "2022-08-02" + dateOut: "2022-10-01" + replacedByUsername: mhopper + - username: mhopper + role: Data Scientist + dateIn: "2022-10-01" + - username: daustin + 
role: Owner + comment: Keeper of the grail + dateIn: "2022-10-01" + + +# Roles +roles: + - role: microstrategy_user_opr + access: read + firstLevelApprovers: Reporting Manager + secondLevelApprovers: 'mandolorian' + - role: bq_queryman_user_opr + access: read + firstLevelApprovers: Reporting Manager + secondLevelApprovers: na + - role: risk_data_access_opr + access: read + firstLevelApprovers: Reporting Manager + secondLevelApprovers: 'dathvador' + - role: bq_unica_user_opr + access: write + firstLevelApprovers: Reporting Manager + secondLevelApprovers: 'mickey' + +# SLA +slaDefaultElement: tab1.txn_ref_dt +slaProperties: + - property: latency # Property, see list of values in DP QoS + value: 4 + unit: d # d, day, days for days; y, yr, years for years + element: tab1.txn_ref_dt # This would not be needed as it is the same table.column as the default one + - property: generalAvailability + value: "2022-05-12T09:30:10-08:00" + - property: endOfSupport + value: "2032-05-12T09:30:10-08:00" + - property: endOfLife + value: "2042-05-12T09:30:10-08:00" + - property: retention + value: 3 + unit: y + element: tab1.txn_ref_dt + - property: frequency + value: 1 + valueExt: 1 + unit: d + element: tab1.txn_ref_dt + - property: timeOfAvailability + value: 09:00-08:00 + element: tab1.txn_ref_dt + driver: regulatory # Describes the importance of the SLA: [regulatory|analytics|operational|...] + - property: timeOfAvailability + value: 08:00-08:00 + element: tab1.txn_ref_dt + driver: analytics + + +# Support +support: + - channel: '#product-help' # Simple Slack communication channel + tool: slack + url: https://aidaug.slack.com/archives/C05UZRSBKLY + - channel: datacontract-ann # Simple distribution list + tool: email + url: mailto:datacontract-ann@bitol.io + - channel: Feedback # Product Feedback + description: General Product Feedback (Public) + url: https://product-feedback.com + +# Tags +tags: + - transactions + + +# Custom properties +customProperties: + - property: refRulesetName + value: gcsc.ruleset.name + - property: somePropertyName + value: property.value + - property: dataprocClusterName # Used for specific applications like Elevate + value: [ cluster name ] + +contractCreatedTs: "2022-11-15T02:59:43+00:00" diff --git a/app/src/test/scala/io/github/datacatering/datacaterer/core/generator/metadata/datasource/opendatacontractstandard/OpenDataContractStandardDataSourceMetadataTest.scala b/app/src/test/scala/io/github/datacatering/datacaterer/core/generator/metadata/datasource/opendatacontractstandard/OpenDataContractStandardDataSourceMetadataTest.scala index d0c9f4e..404a432 100644 --- a/app/src/test/scala/io/github/datacatering/datacaterer/core/generator/metadata/datasource/opendatacontractstandard/OpenDataContractStandardDataSourceMetadataTest.scala +++ b/app/src/test/scala/io/github/datacatering/datacaterer/core/generator/metadata/datasource/opendatacontractstandard/OpenDataContractStandardDataSourceMetadataTest.scala @@ -1,6 +1,7 @@ package io.github.datacatering.datacaterer.core.generator.metadata.datasource.opendatacontractstandard -import io.github.datacatering.datacaterer.api.model.Constants.{CLUSTERING_POSITION, DATA_CONTRACT_FILE, ENABLED_NULL, FIELD_DATA_TYPE, FORMAT, IS_NULLABLE, IS_PRIMARY_KEY, IS_UNIQUE, METADATA_IDENTIFIER, PASSWORD, PRIMARY_KEY_POSITION, URL, USERNAME} +import io.github.datacatering.datacaterer.api.model.Constants.{CLUSTERING_POSITION, DATA_CONTRACT_FILE, ENABLED_NULL, FIELD_DATA_TYPE, IS_NULLABLE, IS_PRIMARY_KEY, IS_UNIQUE, METADATA_IDENTIFIER, 
PRIMARY_KEY_POSITION} +import io.github.datacatering.datacaterer.core.generator.metadata.datasource.SubDataSourceMetadata import io.github.datacatering.datacaterer.core.util.SparkSuite import org.junit.runner.RunWith import org.scalatestplus.junit.JUnitRunner @@ -13,13 +14,23 @@ class OpenDataContractStandardDataSourceMetadataTest extends SparkSuite { val odcsMetadata = OpenDataContractStandardDataSourceMetadata("odcs", "parquet", connectionConfig) val result = odcsMetadata.getSubDataSourcesMetadata + validateResult(connectionConfig, result) + } + + test("Can convert ODCS v3.0.0 file to column metadata") { + val connectionConfig = Map(DATA_CONTRACT_FILE -> "src/test/resources/sample/metadata/odcs/full-example-v3.odcs.yaml") + val odcsMetadata = OpenDataContractStandardDataSourceMetadata("odcs", "parquet", connectionConfig) + val result = odcsMetadata.getSubDataSourcesMetadata + + validateResult(connectionConfig, result, false) + } + + private def validateResult( + connectionConfig: Map[String, String], + result: Array[SubDataSourceMetadata], + isVersion2: Boolean = true + ) = { assertResult(1)(result.length) - val expectedReadOptions = Map( - URL -> "localhost:5432", - USERNAME -> "${env.username}", - PASSWORD -> "${env.password}", - FORMAT -> "csv", - ) connectionConfig.foreach(kv => assert(result.head.readOptions(kv._1) == kv._2)) assertResult(true)(result.head.readOptions.contains(METADATA_IDENTIFIER)) assertResult(true)(result.head.optFieldMetadata.isDefined) @@ -28,42 +39,41 @@ class OpenDataContractStandardDataSourceMetadataTest extends SparkSuite { assertResult(true)(resultCols.exists(_.field == "txn_ref_dt")) val txnDateCol = resultCols.filter(_.field == "txn_ref_dt").head + val txnCluster = if (isVersion2) Map(CLUSTERING_POSITION -> "-1") else Map() val expectedTxnDateMetadata = Map( IS_PRIMARY_KEY -> "false", IS_NULLABLE -> "false", ENABLED_NULL -> "false", IS_UNIQUE -> "false", PRIMARY_KEY_POSITION -> "-1", - CLUSTERING_POSITION -> "-1", FIELD_DATA_TYPE -> "date" - ) + ) ++ txnCluster assertResult(expectedTxnDateMetadata)(txnDateCol.metadata) assertResult(true)(resultCols.exists(_.field == "rcvr_id")) val rcvrIdCol = resultCols.filter(_.field == "rcvr_id").head + val rcvrIdCluster = if (isVersion2) Map(CLUSTERING_POSITION -> "1") else Map() val expectedRcvrIdMetadata = Map( IS_PRIMARY_KEY -> "true", IS_NULLABLE -> "false", ENABLED_NULL -> "false", IS_UNIQUE -> "false", PRIMARY_KEY_POSITION -> "1", - CLUSTERING_POSITION -> "1", FIELD_DATA_TYPE -> "string" - ) + ) ++ rcvrIdCluster assertResult(expectedRcvrIdMetadata)(rcvrIdCol.metadata) assertResult(true)(resultCols.exists(_.field == "rcvr_cntry_code")) val countryCodeCol = resultCols.filter(_.field == "rcvr_cntry_code").head + val countryCodeCluster = if (isVersion2) Map(CLUSTERING_POSITION -> "-1") else Map() val expectedCountryCodeMetadata = Map( IS_PRIMARY_KEY -> "false", IS_NULLABLE -> "false", ENABLED_NULL -> "false", IS_UNIQUE -> "false", PRIMARY_KEY_POSITION -> "-1", - CLUSTERING_POSITION -> "-1", FIELD_DATA_TYPE -> "string" - ) + ) ++ countryCodeCluster assertResult(expectedCountryCodeMetadata)(countryCodeCol.metadata) } - } diff --git a/app/src/test/scala/io/github/datacatering/datacaterer/core/generator/metadata/datasource/opendatacontractstandard/OpenDataContractStandardV2MapperTest.scala b/app/src/test/scala/io/github/datacatering/datacaterer/core/generator/metadata/datasource/opendatacontractstandard/OpenDataContractStandardV2MapperTest.scala new file mode 100644 index 0000000..fab81c2 --- /dev/null +++ 
b/app/src/test/scala/io/github/datacatering/datacaterer/core/generator/metadata/datasource/opendatacontractstandard/OpenDataContractStandardV2MapperTest.scala @@ -0,0 +1,7 @@ +package io.github.datacatering.datacaterer.core.generator.metadata.datasource.opendatacontractstandard + +import io.github.datacatering.datacaterer.core.util.SparkSuite + +class OpenDataContractStandardV2MapperTest extends SparkSuite { + +} diff --git a/app/src/test/scala/io/github/datacatering/datacaterer/core/generator/metadata/datasource/opendatacontractstandard/OpenDataContractStandardV3MapperTest.scala b/app/src/test/scala/io/github/datacatering/datacaterer/core/generator/metadata/datasource/opendatacontractstandard/OpenDataContractStandardV3MapperTest.scala new file mode 100644 index 0000000..6a21af7 --- /dev/null +++ b/app/src/test/scala/io/github/datacatering/datacaterer/core/generator/metadata/datasource/opendatacontractstandard/OpenDataContractStandardV3MapperTest.scala @@ -0,0 +1,93 @@ +package io.github.datacatering.datacaterer.core.generator.metadata.datasource.opendatacontractstandard + +import io.github.datacatering.datacaterer.api.model.Constants.FIELD_DATA_TYPE +import io.github.datacatering.datacaterer.core.generator.metadata.datasource.opendatacontractstandard.model.{ApiVersionEnum, KindEnum, LogicalTypeEnum, OpenDataContractStandardDescription, OpenDataContractStandardElementV3, OpenDataContractStandardSchemaV3, OpenDataContractStandardV3} +import io.github.datacatering.datacaterer.core.util.SparkSuite + +class OpenDataContractStandardV3MapperTest extends SparkSuite { + + private val mapper = OpenDataContractStandardV3Mapper + private val baseContract = createBaseContract + + test("OpenDataContractStandardV3Mapper correctly map to SubDataSourceMetadata") { + val value = OpenDataContractStandardSchemaV3( + name = "Test Dataset", + properties = Some(Array( + OpenDataContractStandardElementV3( + name = "field1", + logicalType = LogicalTypeEnum.string, + physicalType = "string", + ) + )) + ) + val connectionConfig = Map("key" -> "value") + + val result = mapper.toSubDataSourceMetadata(baseContract, value, connectionConfig)(sparkSession) + + assert(result.optFieldMetadata.isDefined) + val fields = result.optFieldMetadata.get.collect() + assertResult(1)(fields.length) + val headField = fields.head + assertResult("field1")(headField.field) + assertResult("string")(headField.metadata(FIELD_DATA_TYPE)) + } + + test("OpenDataContractStandardV3Mapper handle dataset with multiple fields") { + val schemaFields = Array( + OpenDataContractStandardElementV3(name = "field1", logicalType = LogicalTypeEnum.string, physicalType = "string"), + OpenDataContractStandardElementV3(name = "field2", logicalType = LogicalTypeEnum.integer, physicalType = "int"), + OpenDataContractStandardElementV3(name = "field3", logicalType = LogicalTypeEnum.boolean, physicalType = "bool"), + OpenDataContractStandardElementV3(name = "field4", logicalType = LogicalTypeEnum.number, physicalType = "double"), + OpenDataContractStandardElementV3(name = "field5", logicalType = LogicalTypeEnum.date, physicalType = "timestamp"), + OpenDataContractStandardElementV3(name = "field6", logicalType = LogicalTypeEnum.array, physicalType = "array"), + OpenDataContractStandardElementV3( + name = "field7", + logicalType = LogicalTypeEnum.`object`, + physicalType = "struct", + properties = Some(Array( + OpenDataContractStandardElementV3(name = "name", logicalType = LogicalTypeEnum.string, physicalType = "string"), + OpenDataContractStandardElementV3(name = 
"age", logicalType = LogicalTypeEnum.integer, physicalType = "int"), + )) + ) + ) + val value = OpenDataContractStandardSchemaV3(name = "my big data", properties = Some(schemaFields)) + val connectionConfig = Map("format" -> "parquet") + + val result = mapper.toSubDataSourceMetadata(baseContract, value, connectionConfig)(sparkSession) + + assert(result.optFieldMetadata.isDefined) + val fields = result.optFieldMetadata.get.collect() + assertResult(7)(fields.length) + assertResult("field1")(fields.head.field) + assertResult("string")(fields.head.metadata(FIELD_DATA_TYPE)) + assertResult("field2")(fields(1).field) + assertResult("integer")(fields(1).metadata(FIELD_DATA_TYPE)) + assertResult("field3")(fields(2).field) + assertResult("boolean")(fields(2).metadata(FIELD_DATA_TYPE)) + assertResult("field4")(fields(3).field) + assertResult("double")(fields(3).metadata(FIELD_DATA_TYPE)) + assertResult("field5")(fields(4).field) + assertResult("date")(fields(4).metadata(FIELD_DATA_TYPE)) + assertResult("field6")(fields(5).field) + assertResult("array")(fields(5).metadata(FIELD_DATA_TYPE)) + assertResult("field7")(fields(6).field) + assertResult("struct")(fields(6).metadata(FIELD_DATA_TYPE)) + assertResult(2)(fields(6).nestedFields.size) + assertResult("name")(fields(6).nestedFields.head.field) + assertResult("string")(fields(6).nestedFields.head.metadata(FIELD_DATA_TYPE)) + assertResult("age")(fields(6).nestedFields(1).field) + assertResult("integer")(fields(6).nestedFields(1).metadata(FIELD_DATA_TYPE)) + } + + private def createBaseContract: OpenDataContractStandardV3 = { + OpenDataContractStandardV3( + apiVersion = ApiVersionEnum.`v3.0.0`, + id = "abc123", + kind = KindEnum.DataContract, + status = "current", + version = "1.0", + name = Some("Test Dataset"), + description = Some(OpenDataContractStandardDescription(purpose = Some("Test Description"))) + ) + } +} diff --git a/gradle.properties b/gradle.properties index de7e51c..18550c9 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,5 +1,5 @@ groupId=io.github.data-catering -version=0.12.1 +version=0.12.2 scalaVersion=2.12 scalaSpecificVersion=2.12.19