ODCS v3 #81

Merged
merged 3 commits on Nov 30, 2024
@@ -5,7 +5,7 @@ import io.github.datacatering.datacaterer.api.model.Constants._
 import io.github.datacatering.datacaterer.api.model.{ArrayType, DataType, DateType, DoubleType, FloatType, IntegerType, LongType, StringType, StructType, TimestampType}
 import io.github.datacatering.datacaterer.core.exception.{InvalidDataContractFileFormatException, MissingDataContractFilePathException}
 import io.github.datacatering.datacaterer.core.generator.metadata.datasource.database.FieldMetadata
-import io.github.datacatering.datacaterer.core.generator.metadata.datasource.opendatacontractstandard.model.{OpenDataContractStandard, OpenDataContractStandardColumn, OpenDataContractStandardDataset}
+import io.github.datacatering.datacaterer.core.generator.metadata.datasource.opendatacontractstandard.model.{OpenDataContractStandard, OpenDataContractStandardColumn, OpenDataContractStandardDataset, OpenDataContractStandardV3}
 import io.github.datacatering.datacaterer.core.generator.metadata.datasource.{DataSourceMetadata, SubDataSourceMetadata}
 import io.github.datacatering.datacaterer.core.util.ObjectMapperUtil
 import org.apache.log4j.Logger
@@ -20,8 +20,6 @@ case class OpenDataContractStandardDataSourceMetadata(
     connectionConfig: Map[String, String],
 ) extends DataSourceMetadata {
 
-  private val LOGGER = Logger.getLogger(getClass.getName)
-
   override val hasSourceData: Boolean = false
 
   override def toStepName(options: Map[String, String]): String = {
@@ -38,93 +36,22 @@
     optDataContractFile match {
       case Some(dataContractPath) =>
         val dataContractFile = new File(dataContractPath)
-        val tryParseYaml = Try(ObjectMapperUtil.yamlObjectMapper.readValue(dataContractFile, classOf[OpenDataContractStandard]))
-        tryParseYaml match {
-          case Failure(exception) => throw InvalidDataContractFileFormatException(dataContractPath, exception)
+        val tryParseYamlV2 = Try(ObjectMapperUtil.yamlObjectMapper.readValue(dataContractFile, classOf[OpenDataContractStandard]))
+        tryParseYamlV2 match {
+          case Failure(exception) =>
+            //try parse as v3 model
+            val tryParseYamlV3 = Try(ObjectMapperUtil.yamlObjectMapper.readValue(dataContractFile, classOf[OpenDataContractStandardV3]))
+            tryParseYamlV3 match {
+              case Failure(exception) => throw InvalidDataContractFileFormatException(dataContractPath, exception)
+              case Success(value) =>
+                value.schema.getOrElse(Array()).map(schema => OpenDataContractStandardV3Mapper.toSubDataSourceMetadata(value, schema, connectionConfig))
+            }
           case Success(value) =>
-            // val optSchemaName = connectionConfig.get(DATA_CONTRACT_SCHEMA)
-            //TODO filter for schema if schema name is defined, otherwise return back all schemas
-            value.dataset.map(dataset => {
-              val readOptions = getDataSourceOptions(value, dataset)
-              val columnMetadata = dataset.columns.map(cols => {
-                val mappedColumns = cols.map(column => {
-                  val dataType = getDataType(dataset, column)
-                  val metadata = getBaseColumnMetadata(column, dataType)
-                  FieldMetadata(column.column, readOptions, metadata)
-                }).toList
-                sparkSession.createDataset(mappedColumns)
-              })
-              SubDataSourceMetadata(readOptions, columnMetadata)
-            })
+            value.dataset.map(dataset => OpenDataContractStandardV2Mapper.toSubDataSourceMetadata(value, dataset, connectionConfig))
         }
       case None => throw MissingDataContractFilePathException(name, format)
     }
   }
 
-  private def getDataSourceOptions(contract: OpenDataContractStandard, dataset: OpenDataContractStandardDataset): Map[String, String] = {
-    //identifier should be based on schema name
-    val baseMap = Map(METADATA_IDENTIFIER -> contract.uuid)
-    //Tables, BigQuery, serverName, jdbc:driver, myDatabase, user, pw
-    //(contract.`type`, contract.sourceSystem, contract.server, contract.driver, contract.database, contract.username, contract.password)
-    val serverMap = contract.server.map(server => Map(URL -> server)).getOrElse(Map()) //TODO assume database would be part of server url? Probably not
-    val credentialsMap = Map(USERNAME -> contract.username, PASSWORD -> contract.password) //TODO don't need to get credentials as it should be part of data source connection details
-      .filter(_._2.nonEmpty)
-      .map(kv => (kv._1, kv._2.getOrElse("")))
-    val dataSourceMap = if (contract.driver.map(_.toLowerCase).contains(CASSANDRA_NAME) && contract.database.nonEmpty) {
-      Map(
-        FORMAT -> CASSANDRA,
-        CASSANDRA_KEYSPACE -> contract.database.getOrElse(""),
-        CASSANDRA_TABLE -> dataset.table
-      )
-    } else if (contract.driver.map(_.toLowerCase).contains(JDBC)) {
-      Map(
-        FORMAT -> JDBC,
-        DRIVER -> contract.driver.getOrElse(""),
-        JDBC_TABLE -> dataset.table
-      )
-    } else {
-      //TODO data source type will probably change in v3
-      contract.`type`.map(_.toLowerCase) match {
-        case Some(CSV) | Some(JSON) | Some(PARQUET) | Some(ORC) | Some(KAFKA) | Some(HTTP) | Some(SOLACE) =>
-          Map(FORMAT -> contract.`type`.get.toLowerCase) //TODO need to get PATH from somewhere
-        case _ =>
-          LOGGER.warn(s"Defaulting to format CSV since contract type is not supported, name=${contract.datasetName}, type=${contract.`type`.getOrElse("")}")
-          Map(FORMAT -> CSV) //TODO default to CSV for now
-        // case _ => throw new RuntimeException(s"Unable to determine data source type from ODCS file, name=${contract.datasetName}, type=${contract.`type`.getOrElse("")}")
-      }
-    }
-
-    baseMap ++ connectionConfig
-  }
-
-  private def getBaseColumnMetadata(column: OpenDataContractStandardColumn, dataType: DataType): Map[String, String] = {
-    Map(
-      FIELD_DATA_TYPE -> dataType.toString(),
-      IS_NULLABLE -> column.isNullable.getOrElse(false).toString,
-      ENABLED_NULL -> column.isNullable.getOrElse(false).toString,
-      IS_PRIMARY_KEY -> column.isPrimaryKey.getOrElse(false).toString,
-      PRIMARY_KEY_POSITION -> column.primaryKeyPosition.getOrElse("-1").toString,
-      IS_PRIMARY_KEY -> column.isPrimaryKey.getOrElse(false).toString,
-      PRIMARY_KEY_POSITION -> column.primaryKeyPosition.getOrElse("-1").toString,
-      CLUSTERING_POSITION -> column.clusterKeyPosition.getOrElse("-1").toString,
-      IS_UNIQUE -> column.isUnique.getOrElse(false).toString,
-    )
-  }
-
-  private def getDataType(dataset: OpenDataContractStandardDataset, column: OpenDataContractStandardColumn): DataType = {
-    column.logicalType.toLowerCase match {
-      case "string" => StringType
-      case "integer" => IntegerType
-      case "double" => DoubleType
-      case "float" => FloatType
-      case "long" => LongType
-      case "date" => DateType
-      case "timestamp" => TimestampType
-      case "array" => new ArrayType(StringType) //TODO: wait for how inner data type of array will be defined
-      case "object" => new StructType(List()) //TODO: wait for how nested data structures will be made
-      case x =>
-        LOGGER.warn(s"Unable to find corresponding known data type for column in ODCS file, defaulting to string, dataset=${dataset.table} column=$column, data-type=$x")
-        StringType
-    }
-  }
 }
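
The convert logic above now attempts a strict ODCS v2 parse and only falls back to the v3 model when that fails. Below is a minimal, self-contained sketch of that fallback pattern, assuming a Jackson YAML mapper equivalent to ObjectMapperUtil.yamlObjectMapper and the model classes from this PR on the classpath; the parseContract name, the Either return type, and the IllegalArgumentException are illustrative choices, not this PR's API.

// Sketch only: mirrors the v2-then-v3 fallback in convert(), with hypothetical names.
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory
import com.fasterxml.jackson.module.scala.DefaultScalaModule

import java.io.File
import scala.util.{Failure, Success, Try}

object ContractParseSketch {

  private val yamlMapper: ObjectMapper = new ObjectMapper(new YAMLFactory())
    .registerModule(DefaultScalaModule)

  // Try the v2 model first; only when that fails, re-read the same file as v3.
  def parseContract(file: File): Either[OpenDataContractStandard, OpenDataContractStandardV3] =
    Try(yamlMapper.readValue(file, classOf[OpenDataContractStandard])) match {
      case Success(v2) => Left(v2)
      case Failure(_) =>
        Try(yamlMapper.readValue(file, classOf[OpenDataContractStandardV3])) match {
          case Success(v3) => Right(v3)
          // only the v3 error is surfaced; the v2 error is discarded, as in the PR
          case Failure(v3Error) =>
            throw new IllegalArgumentException(s"Unable to parse ${file.getPath} as ODCS v2 or v3", v3Error)
        }
    }
}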
@@ -0,0 +1,102 @@
package io.github.datacatering.datacaterer.core.generator.metadata.datasource.opendatacontractstandard

import io.github.datacatering.datacaterer.api.model.Constants._
import io.github.datacatering.datacaterer.api.model.{ArrayType, DataType, DateType, DoubleType, FloatType, IntegerType, LongType, StringType, StructType, TimestampType}
import io.github.datacatering.datacaterer.core.generator.metadata.datasource.SubDataSourceMetadata
import io.github.datacatering.datacaterer.core.generator.metadata.datasource.database.FieldMetadata
import io.github.datacatering.datacaterer.core.generator.metadata.datasource.opendatacontractstandard.model.{OpenDataContractStandard, OpenDataContractStandardColumn, OpenDataContractStandardDataset}
import org.apache.log4j.Logger
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}

object OpenDataContractStandardV2Mapper {

  private val LOGGER = Logger.getLogger(getClass.getName)

  implicit val columnMetadataEncoder: Encoder[FieldMetadata] = Encoders.kryo[FieldMetadata]

  def toSubDataSourceMetadata(
      value: OpenDataContractStandard,
      dataset: OpenDataContractStandardDataset,
      connectionConfig: Map[String, String]
  )(implicit sparkSession: SparkSession): SubDataSourceMetadata = {
    val readOptions = getDataSourceOptions(value, dataset, connectionConfig)
    val columnMetadata = dataset.columns.map(cols => {
      val mappedColumns = cols.map(column => {
        val dataType = getDataType(dataset, column)
        val metadata = getBaseColumnMetadata(column, dataType)
        FieldMetadata(column.column, readOptions, metadata)
      }).toList
      sparkSession.createDataset(mappedColumns)
    })
    SubDataSourceMetadata(readOptions, columnMetadata)
  }

  private def getDataSourceOptions(
      contract: OpenDataContractStandard,
      dataset: OpenDataContractStandardDataset,
      connectionConfig: Map[String, String]
  ): Map[String, String] = {
    //identifier should be based on schema name
    val baseMap = Map(METADATA_IDENTIFIER -> contract.uuid)
    //Tables, BigQuery, serverName, jdbc:driver, myDatabase, user, pw
    //(contract.`type`, contract.sourceSystem, contract.server, contract.driver, contract.database, contract.username, contract.password)
    val serverMap = contract.server.map(server => Map(URL -> server)).getOrElse(Map())
    val credentialsMap = Map(USERNAME -> contract.username, PASSWORD -> contract.password)
      .filter(_._2.nonEmpty)
      .map(kv => (kv._1, kv._2.getOrElse("")))
    val dataSourceMap = if (contract.driver.map(_.toLowerCase).contains(CASSANDRA_NAME) && contract.database.nonEmpty) {
      Map(
        FORMAT -> CASSANDRA,
        CASSANDRA_KEYSPACE -> contract.database.getOrElse(""),
        CASSANDRA_TABLE -> dataset.table
      )
    } else if (contract.driver.map(_.toLowerCase).contains(JDBC)) {
      Map(
        FORMAT -> JDBC,
        DRIVER -> contract.driver.getOrElse(""),
        JDBC_TABLE -> dataset.table
      )
    } else {
      contract.`type`.map(_.toLowerCase) match {
        case Some(CSV) | Some(JSON) | Some(PARQUET) | Some(ORC) | Some(KAFKA) | Some(HTTP) | Some(SOLACE) =>
          Map(FORMAT -> contract.`type`.get.toLowerCase)
        case _ =>
          LOGGER.warn(s"Defaulting to format CSV since contract type is not supported, name=${contract.datasetName}, type=${contract.`type`.getOrElse("")}")
          Map(FORMAT -> CSV)
        // case _ => throw new RuntimeException(s"Unable to determine data source type from ODCS file, name=${contract.datasetName}, type=${contract.`type`.getOrElse("")}")
      }
    }

    baseMap ++ connectionConfig
  }

  private def getBaseColumnMetadata(column: OpenDataContractStandardColumn, dataType: DataType): Map[String, String] = {
    Map(
      FIELD_DATA_TYPE -> dataType.toString(),
      IS_NULLABLE -> column.isNullable.getOrElse(false).toString,
      ENABLED_NULL -> column.isNullable.getOrElse(false).toString,
      IS_PRIMARY_KEY -> column.isPrimaryKey.getOrElse(false).toString,
      PRIMARY_KEY_POSITION -> column.primaryKeyPosition.getOrElse("-1").toString,
      CLUSTERING_POSITION -> column.clusterKeyPosition.getOrElse("-1").toString,
      IS_UNIQUE -> column.isUnique.getOrElse(false).toString,
    )
  }

  private def getDataType(dataset: OpenDataContractStandardDataset, column: OpenDataContractStandardColumn): DataType = {
    column.logicalType.toLowerCase match {
      case "string" => StringType
      case "integer" => IntegerType
      case "double" => DoubleType
      case "float" => FloatType
      case "long" => LongType
      case "date" => DateType
      case "timestamp" => TimestampType
      case "array" => new ArrayType(StringType) //TODO: wait for how inner data type of array will be defined
      case "object" => new StructType(List()) //TODO: wait for how nested data structures will be made
      case x =>
        LOGGER.warn(s"Unable to find corresponding known data type for column in ODCS file, defaulting to string, dataset=${dataset.table} column=$column, data-type=$x")
        StringType
    }
  }

}
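
OpenDataContractStandardV2Mapper keeps the earlier behaviour of deriving the connection format, with driver checks taking precedence over the contract type. The stand-alone sketch below shows that precedence with the Constants values assumed (e.g. CASSANDRA_NAME = "cassandra", JDBC = "jdbc", CSV = "csv"); note the real mapper also builds keyspace/table and credential options but currently merges only baseMap and connectionConfig into its result.

// Self-contained sketch of the format-resolution precedence; constant values assumed.
object FormatResolutionSketch {

  private val knownContractTypes = Set("csv", "json", "parquet", "orc", "kafka", "http", "solace")

  def resolveFormat(driver: Option[String], database: Option[String], contractType: Option[String]): String = {
    val lowerDriver = driver.map(_.toLowerCase)
    if (lowerDriver.contains("cassandra") && database.nonEmpty) "cassandra" // driver wins over type
    else if (lowerDriver.contains("jdbc")) "jdbc"
    else contractType.map(_.toLowerCase)
      .filter(knownContractTypes.contains) // known contract types pass through as the format
      .getOrElse("csv")                    // anything else defaults to CSV, with a warning in the real mapper
  }
}

// e.g. FormatResolutionSketch.resolveFormat(Some("jdbc"), None, Some("tables")) == "jdbc"
//      FormatResolutionSketch.resolveFormat(None, None, Some("parquet"))        == "parquet"
//      FormatResolutionSketch.resolveFormat(None, None, Some("tables"))         == "csv"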
@@ -0,0 +1,82 @@
package io.github.datacatering.datacaterer.core.generator.metadata.datasource.opendatacontractstandard

import io.github.datacatering.datacaterer.api.model.Constants.{ENABLED_NULL, FIELD_DATA_TYPE, IS_NULLABLE, IS_PRIMARY_KEY, IS_UNIQUE, METADATA_IDENTIFIER, PRIMARY_KEY_POSITION}
import io.github.datacatering.datacaterer.api.model.{ArrayType, BooleanType, DataType, DateType, DoubleType, IntegerType, StringType, StructType}
import io.github.datacatering.datacaterer.core.generator.metadata.datasource.SubDataSourceMetadata
import io.github.datacatering.datacaterer.core.generator.metadata.datasource.database.FieldMetadata
import io.github.datacatering.datacaterer.core.generator.metadata.datasource.opendatacontractstandard.model.{LogicalTypeEnum, OpenDataContractStandardElementV3, OpenDataContractStandardSchemaV3, OpenDataContractStandardV3}
import org.apache.log4j.Logger
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}

object OpenDataContractStandardV3Mapper {

  private val LOGGER = Logger.getLogger(getClass.getName)

  implicit val columnMetadataEncoder: Encoder[FieldMetadata] = Encoders.kryo[FieldMetadata]

  def toSubDataSourceMetadata(
      value: OpenDataContractStandardV3,
      schema: OpenDataContractStandardSchemaV3,
      connectionConfig: Map[String, String]
  )(implicit sparkSession: SparkSession): SubDataSourceMetadata = {
    val readOptions = getDataSourceOptions(value, connectionConfig)
    val propertyMetadata = schema.properties.map(props => {
      val mappedProperties = props.map(property => toFieldMetadata(readOptions, property)).toList
      sparkSession.createDataset(mappedProperties)
    })
    SubDataSourceMetadata(readOptions, propertyMetadata)
  }

  private def toFieldMetadata(
      readOptions: Map[String, String],
      property: OpenDataContractStandardElementV3
  ): FieldMetadata = {
    val dataType = getDataType(property)
    val metadata = getBasePropertyMetadata(property, dataType)
    val nestedFields = if (dataType.isInstanceOf[StructType]) {
      property.properties.getOrElse(Array())
        .map(prop2 => toFieldMetadata(readOptions, prop2))
        .toList
    } else List()

    FieldMetadata(property.name, readOptions, metadata, nestedFields)
  }

  private def getDataSourceOptions(
      contract: OpenDataContractStandardV3,
      connectionConfig: Map[String, String]
  ): Map[String, String] = {
    val baseMap = Map(METADATA_IDENTIFIER -> contract.id)
    baseMap ++ connectionConfig
  }

  private def getBasePropertyMetadata(property: OpenDataContractStandardElementV3, dataType: DataType): Map[String, String] = {
    Map(
      FIELD_DATA_TYPE -> dataType.toString(),
      // a required property is not nullable, so the v3 `required` flag is inverted here
      IS_NULLABLE -> (!property.required.getOrElse(false)).toString,
      ENABLED_NULL -> (!property.required.getOrElse(false)).toString,
      IS_PRIMARY_KEY -> property.primaryKey.getOrElse(false).toString,
      PRIMARY_KEY_POSITION -> property.primaryKeyPosition.getOrElse("-1").toString,
      IS_UNIQUE -> property.unique.getOrElse(false).toString,
    )
  }

  private def getDataType(element: OpenDataContractStandardElementV3): DataType = {
    element.logicalType match {
      case LogicalTypeEnum.string => StringType
      case LogicalTypeEnum.date => DateType
      case LogicalTypeEnum.number => DoubleType
      case LogicalTypeEnum.integer => IntegerType
      case LogicalTypeEnum.array => new ArrayType(StringType)
      case LogicalTypeEnum.`object` =>
        val innerType = element.properties.getOrElse(Array())
          .map(prop => prop.name -> getDataType(prop))
          .toList
        new StructType(innerType)
      case LogicalTypeEnum.boolean => BooleanType
      case x =>
        LOGGER.warn(s"Unable to find corresponding known data type for column in ODCS file, defaulting to string, property=$element, data-type=$x")
        StringType
    }
  }
}
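
Unlike the v2 mapper, which maps "object" to an empty StructType placeholder, the v3 mapper's getDataType recurses into an object's properties to build the full StructType. A self-contained sketch of that recursion, using simplified stand-in types rather than the PR's model classes:

// Stand-in sketch of the recursive `object` handling: nested v3 properties
// become a StructType of (name, type) pairs. Element is not the PR's model class.
object V3NestedTypeSketch extends App {

  sealed trait DataType
  case object StringType extends DataType
  case object IntegerType extends DataType
  final case class StructType(fields: List[(String, DataType)]) extends DataType

  final case class Element(name: String, logicalType: String, properties: List[Element] = List())

  def toDataType(element: Element): DataType = element.logicalType match {
    case "string"  => StringType
    case "integer" => IntegerType
    case "object"  => StructType(element.properties.map(p => p.name -> toDataType(p))) // recurse into children
    case _         => StringType // unknown types default to string, as in the mapper
  }

  val address = Element("address", "object", List(
    Element("street", "string"),
    Element("postcode", "integer")
  ))
  println(toDataType(address)) // StructType(List((street,StringType), (postcode,IntegerType)))
}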
@@ -2,7 +2,6 @@ package io.github.datacatering.datacaterer.core.generator.metadata.datasource.op

import com.fasterxml.jackson.annotation.JsonIgnoreProperties

@JsonIgnoreProperties(ignoreUnknown = true)
case class OpenDataContractStandard(
  dataset: Array[OpenDataContractStandardDataset],
  datasetName: String,
@@ -12,9 +11,12 @@ case class OpenDataContractStandard(
  uuid: String,
  version: String,
  apiVersion: Option[String] = None,
  contractCreatedTs: Option[String] = None,
  customProperties: Option[Array[OpenDataContractStandardCustomProperty]] = None,
  database: Option[String] = None,
  datasetDomain: Option[String] = None,
  datasetKind: Option[String] = None,
  datasetProject: Option[String] = None,
  description: Option[OpenDataContractStandardDescription] = None,
  driver: Option[String] = None,
  driverVersion: Option[String] = None,
@@ -28,12 +30,14 @@
  tags: Option[Array[String]] = None,
  tenant: Option[String] = None,
  `type`: Option[String] = None,
  schedulerAppName: Option[String] = None,
  server: Option[String] = None,
  slaDefaultColumn: Option[String] = None,
  slaProperties: Option[Array[OpenDataContractStandardServiceLevelAgreementProperty]] = None,
  sourceSystem: Option[String] = None,
  sourcePlatform: Option[String] = None,
  stakeholders: Option[Array[OpenDataContractStandardStakeholder]] = None,
  systemInstance: Option[String] = None,
  username: Option[String] = None,
  userConsumptionMode: Option[String] = None,
)
@@ -143,5 +147,5 @@ case class OpenDataContractStandardAuthoritativeDefinition(
@JsonIgnoreProperties(ignoreUnknown = true)
case class OpenDataContractStandardCustomProperty(
  property: String,
  value: String,
  value: Any,
)
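
Two model changes round out the new parse flow. Dropping @JsonIgnoreProperties(ignoreUnknown = true) from the v2 root presumably makes v2 parsing strict, so a v3 document fails fast and falls through to the v3 parser. And widening the custom property value from String to Any lets non-string YAML scalars deserialize, as the sketch below suggests (assuming jackson-dataformat-yaml and jackson-module-scala on the classpath; CustomProperty is a stand-in for OpenDataContractStandardCustomProperty).

// Sketch of why `value: Any` matters: YAML custom property values are not always strings.
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory
import com.fasterxml.jackson.module.scala.DefaultScalaModule

case class CustomProperty(property: String, value: Any)

object CustomPropertySketch extends App {
  val mapper = new ObjectMapper(new YAMLFactory()).registerModule(DefaultScalaModule)
  val yaml = "property: retentionDays\nvalue: 30\n"
  val parsed = mapper.readValue(yaml, classOf[CustomProperty])
  println(parsed.value.getClass) // expected: class java.lang.Integer, which a String field would reject
}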