Skip to content

Commit

Permalink
Add seed job progress logs
Browse files Browse the repository at this point in the history
When a seed job processes a large CSV, nothing is logged by default to report progress, making it hard to determine whether the seed job is still running and how far through processing it is. This commit adds a log line every 10,000 rows to help track progress.
  • Loading branch information
davidatkinsuk committed Jan 21, 2025
1 parent 2bea89b commit f88f069
Showing 1 changed file with 25 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -121,33 +121,46 @@ class SeedService(
// so we first do a full pass but only deserializing each row
seedLogger.info("Processing CSV file ${Path.of(job.resolveCsvPath()).absolutePathString()}")
enforcePresenceOfRequiredHeaders(job, resolveCsvPath)
ensureCsvCanBeDeserialized(job, resolveCsvPath)
val rowCount = ensureCsvCanBeDeserialized(job, resolveCsvPath)

job.preSeed()
val rowsProcessed = processCsv(job, resolveCsvPath)
val rowsProcessed = processCsv(
job = job,
resolveCsvPath = resolveCsvPath,
rowCount = rowCount,
)
job.postSeed()

return rowsProcessed
}

@SuppressWarnings("TooGenericExceptionThrown")
private fun <T> processCsv(job: SeedJob<T>, resolveCsvPath: SeedJob<T>.() -> String): Int {
var rowNumber = 1
@SuppressWarnings("TooGenericExceptionThrown", "MagicNumber")
private fun <T> processCsv(
job: SeedJob<T>,
resolveCsvPath: SeedJob<T>.() -> String,
rowCount: Int,
): Int {
var rowNumber = 0
val errors = mutableListOf<String>()

seedLogger.info("Processing $rowCount rows")

try {
csvReader().open(job.resolveCsvPath()) {
readAllWithHeaderAsSequence().forEach { row ->
rowNumber += 1
val deserializedRow = job.deserializeRow(row)
try {
job.processRow(deserializedRow)
} catch (exception: RuntimeException) {
val rootCauseException = findRootCause(exception)
errors.add("Error on row $rowNumber: ${exception.message} ${if (rootCauseException != null) rootCauseException.message else "no exception cause"}")
seedLogger.error("Error on row $rowNumber:", exception)
} finally {
if ((rowNumber % 10_000) == 0) {
seedLogger.info("Have processed $rowNumber of $rowCount rows")
}
}

rowNumber += 1
}
}
} catch (exception: Exception) {
Expand Down Expand Up @@ -178,21 +191,20 @@ class SeedService(
}
}

private fun <T> ensureCsvCanBeDeserialized(job: SeedJob<T>, resolveCsvPath: SeedJob<T>.() -> String) {
private fun <T> ensureCsvCanBeDeserialized(job: SeedJob<T>, resolveCsvPath: SeedJob<T>.() -> String): Int {
seedLogger.info("Validating that CSV can be fully read")
var rowNumber = 1
var rowNumber = 0
val errors = mutableListOf<String>()

try {
csvReader().open(job.resolveCsvPath()) {
readAllWithHeaderAsSequence().forEach { row ->
rowNumber += 1
try {
job.deserializeRow(row)
} catch (exception: Exception) {
errors += "Unable to deserialize CSV at row: $rowNumber: ${exception.message} ${exception.stackTrace.joinToString("\n")}"
}

rowNumber += 1
}
}
} catch (exception: Exception) {
Expand All @@ -202,5 +214,7 @@ class SeedService(
if (errors.any()) {
throw RuntimeException("There were issues deserializing the CSV:\n${errors.joinToString(", \n")}")
}

return rowNumber
}
}

0 comments on commit f88f069

Please sign in to comment.