diff --git a/pom.xml b/pom.xml
index 8a248f35..5afd8425 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1,213 +1,217 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <modelVersion>4.0.0</modelVersion>
-
-    <groupId>datastax.astra.migrate</groupId>
-    <artifactId>cassandra-data-migrator</artifactId>
-    <version>${revision}</version>
-    <packaging>jar</packaging>
-
-    <properties>
-        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-        <revision>3.2.2</revision>
-        <scala.version>2.12.17</scala.version>
-        <scala.main.version>2.12</scala.main.version>
-        <spark.version>3.3.1</spark.version>
-        <scalatest.version>3.2.12</scalatest.version>
-        <connector.version>3.2.0</connector.version>
-        <cassandra.version>3.11.13</cassandra.version>
-        <junit.version>4.13.2</junit.version>
-    </properties>
-
-    <distributionManagement>
-        <repository>
-            <id>github</id>
-            <name>GitHub Packages</name>
-            <url>https://maven.pkg.github.com/datastax/cassandra-data-migrator</url>
-        </repository>
-    </distributionManagement>
-
-    <dependencies>
-        <dependency>
-            <groupId>org.scala-lang</groupId>
-            <artifactId>scala-library</artifactId>
-            <version>${scala.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.spark</groupId>
-            <artifactId>spark-core_${scala.main.version}</artifactId>
-            <version>${spark.version}</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>log4j</groupId>
-                    <artifactId>log4j</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.spark</groupId>
-            <artifactId>spark-sql_${scala.main.version}</artifactId>
-            <version>${spark.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.spark</groupId>
-            <artifactId>spark-hive_${scala.main.version}</artifactId>
-            <version>${spark.version}</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>log4j</groupId>
-                    <artifactId>log4j</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>log4j</groupId>
-                    <artifactId>apache-log4j-extras</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-        <dependency>
-            <groupId>com.datastax.spark</groupId>
-            <artifactId>spark-cassandra-connector_${scala.main.version}</artifactId>
-            <version>${connector.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>com.github.jnr</groupId>
-            <artifactId>jnr-posix</artifactId>
-            <version>3.1.15</version>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.logging.log4j</groupId>
-            <artifactId>log4j-api</artifactId>
-            <version>2.19.0</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.logging.log4j</groupId>
-            <artifactId>log4j-core</artifactId>
-            <version>2.19.0</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.logging.log4j</groupId>
-            <artifactId>log4j-to-slf4j</artifactId>
-            <version>2.19.0</version>
-        </dependency>
-
-
-        <dependency>
-            <groupId>org.scalatest</groupId>
-            <artifactId>scalatest_${scala.main.version}</artifactId>
-            <version>${scalatest.version}</version>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
-            <version>${junit.version}</version>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.cassandra</groupId>
-            <artifactId>cassandra-all</artifactId>
-            <version>${cassandra.version}</version>
-            <scope>test</scope>
-
-            <exclusions>
-                <exclusion>
-                    <groupId>org.slf4j</groupId>
-                    <artifactId>log4j-over-slf4j</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-    </dependencies>
-
-    <build>
-
-        <resources>
-            <resource>
-                <directory>src/resources</directory>
-            </resource>
-        </resources>
-        <plugins>
-            <plugin>
-                <groupId>net.alchim31.maven</groupId>
-                <artifactId>scala-maven-plugin</artifactId>
-                <version>4.8.0</version>
-                <executions>
-                    <execution>
-                        <phase>process-sources</phase>
-                        <goals>
-                            <goal>compile</goal>
-                            <goal>testCompile</goal>
-                        </goals>
-                    </execution>
-                </executions>
-            </plugin>
-
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-shade-plugin</artifactId>
-                <version>3.4.1</version>
-
-                <executions>
-                    <execution>
-                        <phase>package</phase>
-                        <goals>
-                            <goal>shade</goal>
-                        </goals>
-                        <configuration>
-
-
-                            <filters>
-                                <filter>
-                                    <artifact>*:*</artifact>
-                                    <excludes>
-                                        <exclude>META-INF/*.SF</exclude>
-                                        <exclude>META-INF/*.DSA</exclude>
-                                        <exclude>META-INF/*.RSA</exclude>
-                                    </excludes>
-                                </filter>
-                            </filters>
-                        </configuration>
-                    </execution>
-                </executions>
-            </plugin>
-
-
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-surefire-plugin</artifactId>
-                <version>2.22.2</version>
-                <configuration>
-                    <skipTests>true</skipTests>
-                </configuration>
-            </plugin>
-
-            <plugin>
-                <groupId>org.scalatest</groupId>
-                <artifactId>scalatest-maven-plugin</artifactId>
-                <version>2.2.0</version>
-                <configuration>
-                    <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
-                    <junitxml>.</junitxml>
-                    <filereports>WDF TestSuite.txt</filereports>
-                </configuration>
-                <executions>
-                    <execution>
-                        <id>test</id>
-                        <goals>
-                            <goal>test</goal>
-                        </goals>
-                    </execution>
-                </executions>
-            </plugin>
-
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-compiler-plugin</artifactId>
-                <version>3.10.1</version>
-                <configuration>
-                    <source>1.8</source>
-                    <target>1.8</target>
-                </configuration>
-            </plugin>
-        </plugins>
-    </build>
-</project>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>datastax.astra.migrate</groupId>
+    <artifactId>cassandra-data-migrator</artifactId>
+    <version>3.0.0</version>
+    <packaging>jar</packaging>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <scala.version>2.12.17</scala.version>
+        <scala.main.version>2.12</scala.main.version>
+        <spark.version>3.3.1</spark.version>
+        <scalatest.version>3.2.12</scalatest.version>
+        <connector.version>3.2.0</connector.version>
+        <cassandra.version>3.11.13</cassandra.version>
+        <junit.version>4.13.2</junit.version>
+    </properties>
+
+    <distributionManagement>
+        <repository>
+            <id>github</id>
+            <name>GitHub Packages</name>
+            <url>https://maven.pkg.github.com/datastax/cassandra-data-migrator</url>
+        </repository>
+    </distributionManagement>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <version>31.1-jre</version>
+        </dependency>
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-library</artifactId>
+            <version>${scala.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_${scala.main.version}</artifactId>
+            <version>${spark.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_${scala.main.version}</artifactId>
+            <version>${spark.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-hive_${scala.main.version}</artifactId>
+            <version>${spark.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>apache-log4j-extras</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>com.datastax.spark</groupId>
+            <artifactId>spark-cassandra-connector_${scala.main.version}</artifactId>
+            <version>${connector.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.github.jnr</groupId>
+            <artifactId>jnr-posix</artifactId>
+            <version>3.1.15</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-api</artifactId>
+            <version>2.19.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-core</artifactId>
+            <version>2.19.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-to-slf4j</artifactId>
+            <version>2.19.0</version>
+        </dependency>
+
+
+        <dependency>
+            <groupId>org.scalatest</groupId>
+            <artifactId>scalatest_${scala.main.version}</artifactId>
+            <version>${scalatest.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>${junit.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.cassandra</groupId>
+            <artifactId>cassandra-all</artifactId>
+            <version>${cassandra.version}</version>
+            <scope>test</scope>
+
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>log4j-over-slf4j</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+    </dependencies>
+
+    <build>
+
+        <resources>
+            <resource>
+                <directory>src/resources</directory>
+            </resource>
+        </resources>
+        <plugins>
+            <plugin>
+                <groupId>net.alchim31.maven</groupId>
+                <artifactId>scala-maven-plugin</artifactId>
+                <version>3.2.2</version>
+                <executions>
+                    <execution>
+                        <phase>process-sources</phase>
+                        <goals>
+                            <goal>compile</goal>
+                            <goal>testCompile</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>2.4.3</version>
+
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+
+
+                            <filters>
+                                <filter>
+                                    <artifact>*:*</artifact>
+                                    <excludes>
+                                        <exclude>META-INF/*.SF</exclude>
+                                        <exclude>META-INF/*.DSA</exclude>
+                                        <exclude>META-INF/*.RSA</exclude>
+                                    </excludes>
+                                </filter>
+                            </filters>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+
+
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <version>2.7</version>
+                <configuration>
+                    <skipTests>true</skipTests>
+                </configuration>
+            </plugin>
+
+            <plugin>
+                <groupId>org.scalatest</groupId>
+                <artifactId>scalatest-maven-plugin</artifactId>
+                <version>1.0</version>
+                <configuration>
+                    <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
+                    <junitxml>.</junitxml>
+                    <filereports>WDF TestSuite.txt</filereports>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>test</id>
+                        <goals>
+                            <goal>test</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.8.1</version>
+                <configuration>
+                    <source>1.8</source>
+                    <target>1.8</target>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/src/main/java/datastax/astra/migrate/AbstractJobSession.java b/src/main/java/datastax/astra/migrate/AbstractJobSession.java
index e1ad398d..df7f7a2d 100644
--- a/src/main/java/datastax/astra/migrate/AbstractJobSession.java
+++ b/src/main/java/datastax/astra/migrate/AbstractJobSession.java
@@ -1,300 +1,281 @@
-package datastax.astra.migrate;
-
-import com.datastax.oss.driver.api.core.CqlSession;
-import com.datastax.oss.driver.api.core.cql.BoundStatement;
-import com.datastax.oss.driver.api.core.cql.PreparedStatement;
-import com.datastax.oss.driver.api.core.cql.Row;
-import com.datastax.oss.driver.shaded.guava.common.util.concurrent.RateLimiter;
-import org.apache.commons.lang.StringUtils;
-import org.apache.spark.SparkConf;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.time.Duration;
-import java.time.Instant;
-import java.util.List;
-import java.util.Optional;
-import java.util.stream.IntStream;
-
-public class AbstractJobSession extends BaseJobSession {
-
- public Logger logger = LoggerFactory.getLogger(this.getClass().getName());
-
- protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession, SparkConf sc) {
- this(sourceSession, astraSession, sc, false);
- }
-
- protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession, SparkConf sc, boolean isJobMigrateRowsFromFile) {
- super(sc);
-
- if (sourceSession == null) {
- return;
- }
-
- this.sourceSession = sourceSession;
- this.astraSession = astraSession;
-
- batchSize = new Integer(Util.getSparkPropOr(sc, "spark.batchSize", "5"));
- fetchSizeInRows = new Integer(Util.getSparkPropOr(sc, "spark.read.fetch.sizeInRows", "1000"));
- printStatsAfter = new Integer(Util.getSparkPropOr(sc, "spark.printStatsAfter", "100000"));
- if (printStatsAfter < 1) {
- printStatsAfter = 100000;
- }
-
- readLimiter = RateLimiter.create(new Integer(Util.getSparkPropOr(sc, "spark.readRateLimit", "20000")));
- writeLimiter = RateLimiter.create(new Integer(Util.getSparkPropOr(sc, "spark.writeRateLimit", "40000")));
- maxRetries = Integer.parseInt(sc.get("spark.maxRetries", "0"));
-
- sourceKeyspaceTable = Util.getSparkProp(sc, "spark.origin.keyspaceTable");
- astraKeyspaceTable = Util.getSparkProp(sc, "spark.target.keyspaceTable");
-
- String ttlColsStr = Util.getSparkPropOrEmpty(sc, "spark.query.ttl.cols");
- if (null != ttlColsStr && ttlColsStr.trim().length() > 0) {
- for (String ttlCol : ttlColsStr.split(",")) {
- ttlCols.add(Integer.parseInt(ttlCol));
- }
- }
-
- String writeTimestampColsStr = Util.getSparkPropOrEmpty(sc, "spark.query.writetime.cols");
- if (null != writeTimestampColsStr && writeTimestampColsStr.trim().length() > 0) {
- for (String writeTimeStampCol : writeTimestampColsStr.split(",")) {
- writeTimeStampCols.add(Integer.parseInt(writeTimeStampCol));
- }
- }
-
- writeTimeStampFilter = Boolean
- .parseBoolean(Util.getSparkPropOr(sc, "spark.origin.writeTimeStampFilter", "false"));
- // batchsize set to 1 if there is a writeFilter
- if (writeTimeStampFilter) {
- batchSize = 1;
- }
-
- String minWriteTimeStampFilterStr =
- Util.getSparkPropOr(sc, "spark.origin.minWriteTimeStampFilter", "0");
- if (null != minWriteTimeStampFilterStr && minWriteTimeStampFilterStr.trim().length() > 1) {
- minWriteTimeStampFilter = Long.parseLong(minWriteTimeStampFilterStr);
- }
- String maxWriteTimeStampFilterStr =
- Util.getSparkPropOr(sc, "spark.origin.maxWriteTimeStampFilter", "0");
- if (null != maxWriteTimeStampFilterStr && maxWriteTimeStampFilterStr.trim().length() > 1) {
- maxWriteTimeStampFilter = Long.parseLong(maxWriteTimeStampFilterStr);
- }
-
- String customWriteTimeStr =
- Util.getSparkPropOr(sc, "spark.target.custom.writeTime", "0");
- if (null != customWriteTimeStr && customWriteTimeStr.trim().length() > 1 && StringUtils.isNumeric(customWriteTimeStr.trim())) {
- customWritetime = Long.parseLong(customWriteTimeStr);
- }
-
- logger.info("PARAM -- Read Consistency: {}", readConsistencyLevel);
- logger.info("PARAM -- Write Consistency: {}", writeConsistencyLevel);
- logger.info("PARAM -- Write Batch Size: {}", batchSize);
- logger.info("PARAM -- Max Retries: {}", maxRetries);
- logger.info("PARAM -- Read Fetch Size: {}", fetchSizeInRows);
- logger.info("PARAM -- Source Keyspace Table: {}", sourceKeyspaceTable);
- logger.info("PARAM -- Destination Keyspace Table: {}", astraKeyspaceTable);
- logger.info("PARAM -- ReadRateLimit: {}", readLimiter.getRate());
- logger.info("PARAM -- WriteRateLimit: {}", writeLimiter.getRate());
- logger.info("PARAM -- TTLCols: {}", ttlCols);
- logger.info("PARAM -- WriteTimestampFilterCols: {}", writeTimeStampCols);
- logger.info("PARAM -- WriteTimestampFilter: {}", writeTimeStampFilter);
- if (writeTimeStampFilter) {
- logger.info("PARAM -- minWriteTimeStampFilter: {} datetime is {}", minWriteTimeStampFilter,
- Instant.ofEpochMilli(minWriteTimeStampFilter / 1000));
- logger.info("PARAM -- maxWriteTimeStampFilter: {} datetime is {}", maxWriteTimeStampFilter,
- Instant.ofEpochMilli(maxWriteTimeStampFilter / 1000));
- }
-
- String selectCols = Util.getSparkProp(sc, "spark.query.origin");
- String partitionKey = Util.getSparkProp(sc, "spark.query.origin.partitionKey");
- String sourceSelectCondition = Util.getSparkPropOrEmpty(sc, "spark.query.condition");
- if (!sourceSelectCondition.isEmpty() && !sourceSelectCondition.trim().toUpperCase().startsWith("AND")) {
- sourceSelectCondition = " AND " + sourceSelectCondition;
- }
-
- final StringBuilder selectTTLWriteTimeCols = new StringBuilder();
- allCols = selectCols.split(",");
- ttlCols.forEach(col -> {
- selectTTLWriteTimeCols.append(",ttl(" + allCols[col] + ")");
- });
- writeTimeStampCols.forEach(col -> {
- selectTTLWriteTimeCols.append(",writetime(" + allCols[col] + ")");
- });
- selectColTypes = getTypes(Util.getSparkProp(sc, "spark.query.types"));
- String idCols = Util.getSparkPropOrEmpty(sc, "spark.query.target.id");
- idColTypes = selectColTypes.subList(0, idCols.split(",").length);
-
- String insertCols = Util.getSparkPropOrEmpty(sc, "spark.query.target");
- if (null == insertCols || insertCols.trim().isEmpty()) {
- insertCols = selectCols;
- }
- String insertBinds = "";
- for (String str : idCols.split(",")) {
- if (insertBinds.isEmpty()) {
- insertBinds = str + "= ?";
- } else {
- insertBinds += " and " + str + "= ?";
- }
- }
-
- String fullSelectQuery;
- if (!isJobMigrateRowsFromFile) {
- fullSelectQuery = "select " + selectCols + selectTTLWriteTimeCols + " from " + sourceKeyspaceTable +
- " where token(" + partitionKey.trim() + ") >= ? and token(" + partitionKey.trim() + ") <= ? " +
- sourceSelectCondition + " ALLOW FILTERING";
- } else {
- fullSelectQuery = "select " + selectCols + selectTTLWriteTimeCols + " from " + sourceKeyspaceTable + " where " + insertBinds;
- }
- sourceSelectStatement = sourceSession.prepare(fullSelectQuery);
- logger.info("PARAM -- Query used: {}", fullSelectQuery);
-
- astraSelectStatement = astraSession.prepare(
- "select " + insertCols + " from " + astraKeyspaceTable
- + " where " + insertBinds);
-
- hasRandomPartitioner = Boolean.parseBoolean(Util.getSparkPropOr(sc, "spark.origin.hasRandomPartitioner", "false"));
- isCounterTable = Boolean.parseBoolean(Util.getSparkPropOr(sc, "spark.counterTable", "false"));
- if (isCounterTable) {
- String updateSelectMappingStr = Util.getSparkPropOr(sc, "spark.counterTable.cql.index", "0");
- for (String updateSelectIndex : updateSelectMappingStr.split(",")) {
- updateSelectMapping.add(Integer.parseInt(updateSelectIndex));
- }
-
- String counterTableUpdate = Util.getSparkProp(sc, "spark.counterTable.cql");
- astraInsertStatement = astraSession.prepare(counterTableUpdate);
- } else {
- insertBinds = "";
- for (String str : insertCols.split(",")) {
- if (insertBinds.isEmpty()) {
- insertBinds += "?";
- } else {
- insertBinds += ", ?";
- }
- }
-
- String fullInsertQuery = "insert into " + astraKeyspaceTable + " (" + insertCols + ") VALUES (" + insertBinds + ")";
- if (!ttlCols.isEmpty()) {
- fullInsertQuery += " USING TTL ?";
- if (!writeTimeStampCols.isEmpty()) {
- fullInsertQuery += " AND TIMESTAMP ?";
- }
- } else if (!writeTimeStampCols.isEmpty()) {
- fullInsertQuery += " USING TIMESTAMP ?";
- }
- astraInsertStatement = astraSession.prepare(fullInsertQuery);
- }
-
- // Handle rows with blank values for 'timestamp' data-type in primary-key fields
- tsReplaceValStr = Util.getSparkPropOr(sc, "spark.target.replace.blankTimestampKeyUsingEpoch", "");
- if (!tsReplaceValStr.isEmpty()) {
- tsReplaceVal = Long.parseLong(tsReplaceValStr);
- }
- }
-
- public BoundStatement bindInsert(PreparedStatement insertStatement, Row sourceRow, Row astraRow) {
- BoundStatement boundInsertStatement = insertStatement.bind().setConsistencyLevel(writeConsistencyLevel);
-
- if (isCounterTable) {
- for (int index = 0; index < selectColTypes.size(); index++) {
- MigrateDataType dataType = selectColTypes.get(updateSelectMapping.get(index));
- // compute the counter delta if reading from astra for the difference
- if (astraRow != null && index < (selectColTypes.size() - idColTypes.size())) {
- boundInsertStatement = boundInsertStatement.set(index, (sourceRow.getLong(updateSelectMapping.get(index)) - astraRow.getLong(updateSelectMapping.get(index))), Long.class);
- } else {
- boundInsertStatement = boundInsertStatement.set(index, getData(dataType, updateSelectMapping.get(index), sourceRow), dataType.typeClass);
- }
- }
- } else {
- int index = 0;
- for (index = 0; index < selectColTypes.size(); index++) {
- boundInsertStatement = getBoundStatement(sourceRow, boundInsertStatement, index, selectColTypes);
- if (boundInsertStatement == null) return null;
- }
-
- if (!ttlCols.isEmpty()) {
- boundInsertStatement = boundInsertStatement.set(index, getLargestTTL(sourceRow), Integer.class);
- index++;
- }
- if (!writeTimeStampCols.isEmpty()) {
- if (customWritetime > 0) {
- boundInsertStatement = boundInsertStatement.set(index, customWritetime, Long.class);
- } else {
- boundInsertStatement = boundInsertStatement.set(index, getLargestWriteTimeStamp(sourceRow), Long.class);
- }
- }
- }
-
- // Batch insert for large records may take longer, hence 10 secs to avoid timeout errors
- return boundInsertStatement.setTimeout(Duration.ofSeconds(10));
- }
-
- public int getLargestTTL(Row sourceRow) {
- return IntStream.range(0, ttlCols.size())
- .map(i -> sourceRow.getInt(selectColTypes.size() + i)).max().getAsInt();
- }
-
- public long getLargestWriteTimeStamp(Row sourceRow) {
- return IntStream.range(0, writeTimeStampCols.size())
- .mapToLong(i -> sourceRow.getLong(selectColTypes.size() + ttlCols.size() + i)).max().getAsLong();
- }
-
- public BoundStatement selectFromAstra(PreparedStatement selectStatement, Row sourceRow) {
- BoundStatement boundSelectStatement = selectStatement.bind().setConsistencyLevel(readConsistencyLevel);
- for (int index = 0; index < idColTypes.size(); index++) {
- boundSelectStatement = getBoundStatement(sourceRow, boundSelectStatement, index, idColTypes);
- if (boundSelectStatement == null) return null;
- }
-
- return boundSelectStatement;
- }
-
- private BoundStatement getBoundStatement(Row sourceRow, BoundStatement boundSelectStatement, int index,
-                                             List<MigrateDataType> cols) {
- MigrateDataType dataTypeObj = cols.get(index);
- Object colData = getData(dataTypeObj, index, sourceRow);
-
- // Handle rows with blank values in primary-key fields
- if (index < idColTypes.size()) {
- Optional