Skip to content

Commit

Permalink
Adding Option for custome date format string (lensesio#901)
Browse files Browse the repository at this point in the history
  • Loading branch information
jphillips-grier committed Dec 22, 2022
1 parent bbd3c5b commit 5c60585
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,17 @@ object CassandraConfigSource {
ConfigDef.Width.MEDIUM,
CassandraConfigConstants.INITIAL_OFFSET,
)
.define(
CassandraConfigConstants.DATE_FORMAT_STRING,
Type.STRING,
CassandraConfigConstants.DATE_FORMAT_STRING_DEFAULT,
Importance.LOW,
CassandraConfigConstants.DATE_FORMAT_STRING_DOC,
"Import",
10,
ConfigDef.Width.MEDIUM,
CassandraConfigConstants.DATE_FORMAT_STRING,
)
.define(
CassandraConfigConstants.MAPPING_COLLECTION_TO_JSON,
Type.BOOLEAN,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,11 @@ object CassandraConfigConstants {
val INITIAL_OFFSET_DOC =
"The initial timestamp to start querying in Cassandra from (yyyy-MM-dd HH:mm:ss.SSS'Z'). Default 1900-01-01 00:00:00.0000000Z"

val DATE_FORMAT_STRING = s"$CONNECTOR_PREFIX.date.format.string"
val DATE_FORMAT_STRING_DEFAULT = "yyyy-MM-dd HH:mm:ss.SSS'Z'"
val DATE_FORMAT_STRING_DOC =
"The format string to use for formatting the initial offset and other "

val MAPPING_COLLECTION_TO_JSON = s"$CONNECTOR_PREFIX.mapping.collection.to.json"
val MAPPING_COLLECTION_TO_JSON_DOC = "Mapping columns with type Map, List and Set like json"
val MAPPING_COLLECTION_TO_JSON_DEFAULT = true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ case class CassandraSourceSetting(
timeSliceDuration: Long = CassandraConfigConstants.TIMESLICE_DURATION_DEFAULT,
timeSliceDelay: Long = CassandraConfigConstants.TIMESLICE_DELAY_DEFAULT,
initialOffset: String = CassandraConfigConstants.INITIAL_OFFSET_DEFAULT,
dateFormatString: String = CassandraConfigConstants.DATE_FORMAT_STRING_DEFAULT,
timeSliceMillis: Long = CassandraConfigConstants.TIME_SLICE_MILLIS_DEFAULT,
mappingCollectionToJson: Boolean = CassandraConfigConstants.MAPPING_COLLECTION_TO_JSON_DEFAULT,
connectTimeout: Int = CassandraConfigConstants.DEFAULT_CONNECT_TIMEOUT,
Expand Down Expand Up @@ -116,6 +117,7 @@ object CassandraSettings extends StrictLogging {
val timeSliceDuration = config.getLong(CassandraConfigConstants.TIMESLICE_DURATION)
val timeSliceDelay = config.getLong(CassandraConfigConstants.TIMESLICE_DELAY)
val initialOffset = config.getString(CassandraConfigConstants.INITIAL_OFFSET)
val dateFormatString = config.getString(CassandraConfigConstants.DATE_FORMAT_STRING)
val timeSliceMillis = config.getLong(CassandraConfigConstants.TIME_SLICE_MILLIS)
val mappingCollectionToJson = config.getBoolean(CassandraConfigConstants.MAPPING_COLLECTION_TO_JSON)

Expand Down Expand Up @@ -159,6 +161,7 @@ object CassandraSettings extends StrictLogging {
fetchSize = fetchSize,
timeSliceDuration = timeSliceDuration,
timeSliceDelay = timeSliceDelay,
dateFormatString = dateFormatString,
initialOffset = initialOffset,
timeSliceMillis = timeSliceMillis,
mappingCollectionToJson = mappingCollectionToJson,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,15 +64,14 @@ class CassandraTableReader(
private val cqlGenerator = new CqlGenerator(setting)

class CassandraDateFormatter {
private val dateFormatPattern = "yyyy-MM-dd HH:mm:ss.SSS'Z'"

def parse(date: String): Date = {
val dateFormatter = new SimpleDateFormat(dateFormatPattern)
val dateFormatter = new SimpleDateFormat(dateFormatString)
dateFormatter.parse(date)
}

def format(date: Date): String = {
val dateFormatter = new SimpleDateFormat(dateFormatPattern)
val dateFormatter = new SimpleDateFormat(dateFormatString)
dateFormatter.format(date)
}

Expand All @@ -82,6 +81,7 @@ class CassandraTableReader(
}
}

private val dateFormatString = setting.dateFormatString
private val dateFormatter = new CassandraDateFormatter()
private val primaryKeyCol = setting.primaryKeyColumn.getOrElse("")
private val querying = new AtomicBoolean(false)
Expand Down

0 comments on commit 5c60585

Please sign in to comment.