
Commit

#516 Fix JDBC source and other sources error handling.
yruslan committed Nov 11, 2024
1 parent 2d1f104 commit e86255d
Showing 7 changed files with 126 additions and 13 deletions.
@@ -41,13 +41,18 @@ object ExternalChannelFactoryReflect {
       throw new IllegalArgumentException(s"A class should be specified for the $channelType at '$parentPath'.")
     }
     val clazz = conf.getString(FACTORY_CLASS_KEY)
-    try {
-      val factory = ClassLoaderUtils.loadSingletonClassOfType[ExternalChannelFactoryV2[T]](clazz)
-      factory.apply(conf, workflowConfig, parentPath, spark)
+    val factory = try {
+      ClassLoaderUtils.loadSingletonClassOfType[ExternalChannelFactoryV2[T]](clazz)
     } catch {
       case _: Throwable =>
-        val factory = ClassLoaderUtils.loadSingletonClassOfType[ExternalChannelFactory[T]](clazz)
-        factory.apply(conf, parentPath, spark)
+        ClassLoaderUtils.loadSingletonClassOfType[ExternalChannelFactory[T]](clazz)
     }
+
+    factory match {
+      case fv2: ExternalChannelFactoryV2[T] =>
+        fv2.apply(conf, workflowConfig, parentPath, spark)
+      case fv1: ExternalChannelFactory[T] =>
+        fv1.apply(conf, parentPath, spark)
+    }
   }
 
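The hunk above is the core of the fix: class loading and factory invocation are now two separate steps, so an exception thrown inside a V2 factory's own apply() is re-thrown to the caller instead of triggering a misleading fallback to the V1 interface. A minimal standalone sketch of that pattern, with simplified stand-in types (FactoryV1, FactoryV2 and create below are illustrative, not Pramen's actual API):

// Illustrative sketch only: load via the newer interface first, fall back to the older one
// on load failure, and decide which apply() signature to call only after loading succeeded.
object FactoryFallbackSketch {
  trait FactoryV1 { def apply(conf: Map[String, String]): String }
  trait FactoryV2 { def apply(conf: Map[String, String], workflowConf: Map[String, String]): String }

  def create(loadV2: () => AnyRef,
             loadV1: () => AnyRef,
             conf: Map[String, String],
             workflowConf: Map[String, String]): String = {
    // Only the loading step is wrapped in try/catch; failures inside apply() propagate unchanged.
    val factory = try loadV2() catch { case _: Throwable => loadV1() }

    factory match {
      case v2: FactoryV2 => v2.apply(conf, workflowConf)
      case v1: FactoryV1 => v1.apply(conf)
    }
  }

  def main(args: Array[String]): Unit = {
    val v1Only = new FactoryV1 { def apply(conf: Map[String, String]): String = conf("key") }
    // Loading as V2 fails, so the V1 factory is used; any error its apply() throws surfaces as-is.
    println(create(() => throw new ClassNotFoundException("no V2 factory"), () => v1Only, Map("key" -> "channel"), Map.empty))
  }
}

Separating the two steps is what the new tests below rely on when they assert that a mock factory's "Test exception" reaches the caller unchanged.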
@@ -75,7 +75,11 @@ object TableReaderJdbcConfig {
 
     val infoDateTypeStr = ConfigUtils.getOptionString(conf, INFORMATION_DATE_TYPE).getOrElse("date")
 
-    val infoDateType = SqlColumnType.fromStringStrict(infoDateTypeStr, parent)
+    val infoDateType = if (hasInformationDate) {
+      SqlColumnType.fromStringStrict(infoDateTypeStr, parent)
+    } else {
+      SqlColumnType.DATE
+    }
 
     val saveTimestampsAsDates = ConfigUtils.getOptionBoolean(conf, JDBC_TIMESTAMPS_AS_DATES).getOrElse(false)
 
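In TableReaderJdbcConfig the information date type string is now validated only when the information date column is actually enabled; snapshot-style sources fall back to a default type instead of failing on an irrelevant value. A rough self-contained sketch of that guard, assuming the Typesafe Config library and using simplified stand-ins for SqlColumnType and the real configuration key constants:

import com.typesafe.config.{Config, ConfigFactory}

// Sketch only: ColumnType, the key names and the default are simplified stand-ins.
object InfoDateTypeSketch {
  sealed trait ColumnType
  case object Date extends ColumnType
  case object Number extends ColumnType

  def fromStringStrict(s: String): ColumnType = s match {
    case "date"   => Date
    case "number" => Number
    case other    => throw new IllegalArgumentException(s"Unknown information type '$other'")
  }

  def infoDateType(conf: Config): ColumnType = {
    val hasInfoDate = conf.hasPath("has.information.date.column") && conf.getBoolean("has.information.date.column")
    val typeStr = if (conf.hasPath("information.date.type")) conf.getString("information.date.type") else "date"

    // Strict parsing applies only when the column is used; otherwise a bad value is ignored.
    if (hasInfoDate) fromStringStrict(typeStr) else Date
  }

  def main(args: Array[String]): Unit = {
    val snapshot = ConfigFactory.parseString(
      """has.information.date.column = false
        |information.date.type = "wrong"
        |""".stripMargin)
    println(infoDateType(snapshot)) // prints Date; no exception despite the invalid type string
  }
}

This mirrors what the new jdbc3 source and the "minimal snapshot configuration" test further down exercise: a wrong information.date.type no longer breaks a source that does not use an information date column at all.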
@@ -70,6 +70,42 @@ class ExternalChannelFactorySuite extends AnyWordSpec with SparkTestBase {
 
       assert(ex.getMessage.contains("A class should be specified for the dummy"))
     }
+
+    "re-throw an exception from the V1 factory call" in {
+      val workflowConf = ConfigFactory.parseString(
+        """test = "test"
+          |""".stripMargin)
+
+
+      val conf = ConfigFactory.parseString(
+        """factory.class = "za.co.absa.pramen.core.mocks.ExternalChannelV2Mock"
+          |fail = true
+          |""".stripMargin)
+
+      val ex = intercept[RuntimeException] {
+        ExternalChannelFactoryReflect.fromConfig[ExternalChannelV2Mock](conf, workflowConf, "", "dummy")
+      }
+
+      assert(ex.getMessage == "Test exception")
+    }
+
+    "re-throw an exception from the V2 factory call" in {
+      val workflowConf = ConfigFactory.parseString(
+        """test = "test"
+          |""".stripMargin)
+
+
+      val conf = ConfigFactory.parseString(
+        """factory.class = "za.co.absa.pramen.core.mocks.ExternalChannelMock"
+          |fail = true
+          |""".stripMargin)
+
+      val ex = intercept[RuntimeException] {
+        ExternalChannelFactoryReflect.fromConfig[ExternalChannelMock](conf, workflowConf, "", "dummy")
+      }
+
+      assert(ex.getMessage == "Test exception")
+    }
   }
 
   "fromConfigByName" should
@@ -25,10 +25,12 @@ class ExternalChannelMock(conf: Config, val value1: String, val value2: String)
 }
 
 object ExternalChannelMock extends ExternalChannelFactory[ExternalChannelMock] {
+  val CONFIG_FAIL_KEY = "fail"
   val CONFIG_KEY1 = "key1"
   val CONFIG_KEY2 = "key2"
 
   override def apply(conf: Config, parentPath: String, spark: SparkSession): ExternalChannelMock = {
+    if (conf.hasPath(CONFIG_FAIL_KEY)) throw new RuntimeException("Test exception")
     val value1 = conf.getString(CONFIG_KEY1)
     val value2 = conf.getString(CONFIG_KEY2)
     new ExternalChannelMock(conf, value1, value2)
@@ -25,12 +25,15 @@ class ExternalChannelV2Mock(conf: Config, val workflowConfig: Config, val value1
 }
 
 object ExternalChannelV2Mock extends ExternalChannelFactoryV2[ExternalChannelV2Mock] {
+  val CONFIG_FAIL_KEY = "fail"
   val CONFIG_KEY1 = "key1"
   val CONFIG_KEY2 = "key2"
 
   override def apply(conf: Config, workflowConfig: Config, parentPath: String, spark: SparkSession): ExternalChannelV2Mock = {
+    if (conf.hasPath(CONFIG_FAIL_KEY)) throw new RuntimeException("Test exception")
     val value1 = conf.getString(CONFIG_KEY1)
     val value2 = conf.getString(CONFIG_KEY2)
+
     new ExternalChannelV2Mock(conf, workflowConfig, value1, value2)
   }
 }
@@ -85,6 +85,35 @@ class JdbcSourceSuite extends AnyWordSpec with BeforeAndAfterAll with SparkTestB
     | information.date.type = "date"
     | information.date.app.format = "yyyy-MM-DD"
     | information.date.sql.format = "YYYY-mm-DD"
     | },
+    | {
+    | name = "jdbc3"
+    | factory.class = "za.co.absa.pramen.core.source.JdbcSource"
+    | jdbc {
+    | driver = "driver3"
+    | connection.string = "url3"
+    | user = "user3"
+    | password = "password3"
+    | }
+    |
+    | has.information.date.column = false
+    | information.date.type = "wrong"
+    | },
+    | {
+    | name = "jdbc4"
+    | factory.class = "za.co.absa.pramen.core.source.JdbcSource"
+    | jdbc {
+    | driver = "driver4"
+    | connection.string = "url4"
+    | user = "user4"
+    | password = "password4"
+    | }
+    |
+    | has.information.date.column = true
+    | information.date.type = "wrong"
+    | information.date.column = "INFO_DATE"
+    | information.date.app.format = "yyyy-MM-DD"
+    | information.date.sql.format = "YYYY-mm-DD"
+    | }
     | ]
     | }
@@ -116,6 +145,24 @@ class JdbcSourceSuite extends AnyWordSpec with BeforeAndAfterAll with SparkTestB
       assert(fetchedConfig.hasPath(DISABLE_COUNT_QUERY))
     }
 
+    "be able to get a source if the source is snapshot and info date column type is wrong" in {
+      val src = ExternalChannelFactoryReflect.fromConfigByName[Source](conf, None, "pramen.sources", "Jdbc3", "source").asInstanceOf[JdbcSource]
+      val (fetchedConfig, index) = ExternalChannelFactoryReflect.getConfigByName(conf, None, "pramen.sources", "jdbc2", "source")
+
+      assert(src.jdbcReaderConfig.jdbcConfig.driver == "driver3")
+      assert(!src.jdbcReaderConfig.hasInfoDate)
+      assert(index == 1)
+      assert(fetchedConfig.hasPath(DISABLE_COUNT_QUERY))
+    }
+
+    "throw a proper exception when info date column type is wrong" in {
+      val ex = intercept[IllegalArgumentException] {
+        ExternalChannelFactoryReflect.fromConfigByName[Source](conf, None, "pramen.sources", "Jdbc4", "source").asInstanceOf[JdbcSource]
+      }
+
+      assert(ex.getMessage.contains("nknown information type 'wrong'"))
+    }
+
     "be able to get a source by its name from the manager" in {
       val src = SourceManager.getSourceByName("Jdbc2", conf, None).asInstanceOf[JdbcSource]
 
@@ -59,8 +59,6 @@ class TableReaderJdbcSuite extends AnyWordSpec with BeforeAndAfterAll with Spark
     | has.information.date.column = false
     |
     | information.date.column = "INFO_DATE"
-    | information.date.type = "number"
-    | information.date.format = "yyyy-MM-DD"
     |
     | offset.column {
     | name = "ts"
@@ -95,7 +93,7 @@ class TableReaderJdbcSuite extends AnyWordSpec with BeforeAndAfterAll with Spark
     | has.information.date.column = true
     |
     | information.date.column = "INFO_DATE"
-    | information.date.type = "date"
+    | information.date.type = "number"
     |}""".stripMargin)
 
     "be able to be constructed properly from config" in {
@@ -108,9 +106,6 @@ class TableReaderJdbcSuite extends AnyWordSpec with BeforeAndAfterAll with Spark
       assert(jdbc.jdbcConfig.primaryUrl.get == url)
       assert(jdbc.jdbcConfig.user.contains(user))
       assert(jdbc.jdbcConfig.password.contains(password))
-      assert(jdbc.infoDateColumn == "INFO_DATE")
-      assert(jdbc.infoDateType == SqlColumnType.NUMBER)
-      assert(jdbc.infoDateFormat == "yyyy-MM-DD")
       assert(jdbc.offsetInfoOpt.nonEmpty)
       assert(jdbc.offsetInfoOpt.get.offsetColumn == "ts")
       assert(jdbc.offsetInfoOpt.get.offsetType.dataTypeString == "datetime")
@@ -150,7 +145,7 @@ class TableReaderJdbcSuite extends AnyWordSpec with BeforeAndAfterAll with Spark
       assert(jdbc.jdbcConfig.user.contains(user))
       assert(jdbc.jdbcConfig.password.contains(password))
       assert(jdbc.infoDateColumn == "INFO_DATE")
-      assert(jdbc.infoDateType == SqlColumnType.DATE)
+      assert(jdbc.infoDateType == SqlColumnType.NUMBER)
       assert(jdbc.infoDateFormat == "yyyy-MM-dd")
       assert(jdbc.offsetInfoOpt.isEmpty)
       assert(jdbc.hasInfoDate)
@@ -202,6 +197,27 @@ class TableReaderJdbcSuite extends AnyWordSpec with BeforeAndAfterAll with Spark
       assert(!jdbc.hasInfoDate)
     }
 
+    "ensure jdbc minimal snapshot configuration works even if wrong format for the info date is specified" in {
+      val testConfig = ConfigFactory.parseString(
+        s"""reader {
+           | jdbc {
+           | driver = "$driver"
+           | connection.string = "$url"
+           | user = "$user"
+           | password = "$password"
+           | }
+           |
+           | has.information.date.column = false
+           | information.date.column = "INFO_DATE"
+           | information.date.type = "wrong"
+           |}""".stripMargin)
+      val reader = TableReaderJdbc(testConfig.getConfig("reader"), testConfig.getConfig("reader"), "reader")
+
+      val jdbc = reader.getJdbcConfig
+
+      assert(!jdbc.hasInfoDate)
+    }
+
     "ensure jdbc minimal event configuration works" in {
       val testConfig = ConfigFactory.parseString(
         s"""reader {