Skip to content

Commit

Permalink
Changes applied by UKG/Anant team
Browse files Browse the repository at this point in the history
  • Loading branch information
faizalrub-datastax committed Mar 16, 2023
1 parent 4d706b2 commit 45d6d00
Show file tree
Hide file tree
Showing 25 changed files with 3,014 additions and 2,200 deletions.
430 changes: 217 additions & 213 deletions pom.xml

Large diffs are not rendered by default.

581 changes: 281 additions & 300 deletions src/main/java/datastax/astra/migrate/AbstractJobSession.java

Large diffs are not rendered by default.

228 changes: 118 additions & 110 deletions src/main/java/datastax/astra/migrate/BaseJobSession.java
Original file line number Diff line number Diff line change
@@ -1,110 +1,118 @@
package datastax.astra.migrate;

import com.datastax.oss.driver.api.core.ConsistencyLevel;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.shaded.guava.common.util.concurrent.RateLimiter;
import org.apache.spark.SparkConf;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * Shared state and helpers for migration job sessions: prepared statements,
 * consistency levels, per-executor rate limiters, column-type metadata, and
 * row-data extraction used by concrete job session implementations.
 */
public abstract class BaseJobSession {

    protected PreparedStatement sourceSelectStatement;
    protected PreparedStatement astraSelectStatement;
    protected PreparedStatement astraInsertStatement;
    protected ConsistencyLevel readConsistencyLevel;
    protected ConsistencyLevel writeConsistencyLevel;

    // Read/Write Rate limiter
    // Determine the total throughput for the entire cluster in terms of writes/sec,
    // reads/sec
    // then do the following to set the values as they are only applicable per JVM
    // (hence spark Executor)...
    // Rate = Total Throughput (write/read per sec) / Total Executors
    protected RateLimiter readLimiter;
    protected RateLimiter writeLimiter;
    protected Integer maxRetries = 10;

    protected CqlSession sourceSession;
    protected CqlSession astraSession;
    protected List<MigrateDataType> selectColTypes = new ArrayList<>();
    protected List<MigrateDataType> idColTypes = new ArrayList<>();
    protected List<Integer> updateSelectMapping = new ArrayList<>();

    protected Integer batchSize = 1;
    protected Integer fetchSizeInRows = 1000;
    protected Integer printStatsAfter = 100000;

    protected Boolean writeTimeStampFilter = Boolean.FALSE;
    protected Long minWriteTimeStampFilter = 0L;
    protected Long maxWriteTimeStampFilter = Long.MAX_VALUE;
    protected Long customWritetime = 0L;

    protected List<Integer> writeTimeStampCols = new ArrayList<>();
    protected List<Integer> ttlCols = new ArrayList<>();
    protected Boolean isCounterTable;

    protected String sourceKeyspaceTable;
    protected String astraKeyspaceTable;

    protected Boolean hasRandomPartitioner;
    protected Boolean filterData;
    protected String filterColName;
    protected String filterColType;
    protected Integer filterColIndex;
    protected String filterColValue;

    protected String[] allCols;
    protected String tsReplaceValStr;
    protected long tsReplaceVal;

    /**
     * Reads the read/write consistency levels from the Spark configuration.
     *
     * @param sc Spark configuration holding {@code spark.consistency.read} and
     *           {@code spark.consistency.write}
     */
    protected BaseJobSession(SparkConf sc) {
        readConsistencyLevel = Util.mapToConsistencyLevel(Util.getSparkPropOrEmpty(sc, "spark.consistency.read"));
        writeConsistencyLevel = Util.mapToConsistencyLevel(Util.getSparkPropOrEmpty(sc, "spark.consistency.write"));
    }

    /**
     * Builds a composite key string from the row's id columns, joined by " %% ".
     *
     * @param sourceRow row to extract the id-column values from
     * @return the concatenated key, empty when there are no id columns
     */
    public String getKey(Row sourceRow) {
        // StringBuilder: single-threaded use, no need for StringBuffer's locking.
        StringBuilder key = new StringBuilder();
        for (int index = 0; index < idColTypes.size(); index++) {
            MigrateDataType dataType = idColTypes.get(index);
            if (index == 0) {
                key.append(getData(dataType, index, sourceRow));
            } else {
                // Separate appends avoid building an intermediate String per column.
                key.append(" %% ").append(getData(dataType, index, sourceRow));
            }
        }

        return key.toString();
    }

    /**
     * Parses a comma-separated type spec (e.g. "1,2,5%1") into MigrateDataTypes.
     *
     * @param types comma-separated list of type codes
     * @return one MigrateDataType per code, in input order
     */
    public List<MigrateDataType> getTypes(String types) {
        List<MigrateDataType> dataTypes = new ArrayList<>();
        for (String type : types.split(",")) {
            dataTypes.add(new MigrateDataType(type));
        }

        return dataTypes;
    }

    /**
     * Extracts the value at {@code index} from the row, using the collection
     * accessors for Map/List/Set types. For counter tables, a null Long counter
     * is normalized to 0 so it can be written as a counter delta.
     *
     * @param dataType  expected type (with element subtypes for collections)
     * @param index     zero-based column position in the row
     * @param sourceRow row to read from
     * @return the column value, possibly null for non-counter columns
     */
    public Object getData(MigrateDataType dataType, int index, Row sourceRow) {
        if (dataType.typeClass == Map.class) {
            return sourceRow.getMap(index, dataType.subTypes.get(0), dataType.subTypes.get(1));
        } else if (dataType.typeClass == List.class) {
            return sourceRow.getList(index, dataType.subTypes.get(0));
        } else if (dataType.typeClass == Set.class) {
            return sourceRow.getSet(index, dataType.subTypes.get(0));
        } else if (isCounterTable && dataType.typeClass == Long.class) {
            Object data = sourceRow.get(index, dataType.typeClass);
            if (data == null) {
                // Autoboxed 0L instead of the deprecated new Long(0) constructor.
                return 0L;
            }
        }

        return sourceRow.get(index, dataType.typeClass);
    }
}
package datastax.astra.migrate;

import com.datastax.oss.driver.api.core.ConsistencyLevel;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.shaded.guava.common.util.concurrent.RateLimiter;
import org.apache.spark.SparkConf;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * Shared state and helpers for migration job sessions: prepared statements,
 * consistency levels, per-executor rate limiters, column-type metadata,
 * default TTL/writetime settings, exception-log locations, and row-data
 * extraction used by concrete job session implementations.
 */
public abstract class BaseJobSession {

    protected PreparedStatement sourceSelectStatement;
    protected PreparedStatement sourceSelectLatestStatement;
    protected PreparedStatement astraSelectStatement;
    protected PreparedStatement astraInsertStatement;
    protected ConsistencyLevel readConsistencyLevel;
    protected ConsistencyLevel writeConsistencyLevel;

    // Read/Write Rate limiter
    // Determine the total throughput for the entire cluster in terms of writes/sec,
    // reads/sec
    // then do the following to set the values as they are only applicable per JVM
    // (hence spark Executor)...
    // Rate = Total Throughput (write/read per sec) / Total Executors
    protected RateLimiter readLimiter;
    protected RateLimiter writeLimiter;
    protected Integer maxRetries = 10;
    protected Integer maxRetriesRowFailure = 2;

    protected CqlSession sourceSession;
    protected CqlSession astraSession;
    protected List<MigrateDataType> selectColTypes = new ArrayList<>();
    protected List<MigrateDataType> idColTypes = new ArrayList<>();
    protected List<Integer> updateSelectMapping = new ArrayList<>();

    protected Integer batchSize = 1;
    protected Integer fetchSizeInRows = 1000;
    protected Integer printStatsAfter = 100000;

    protected Boolean writeTimeStampFilter = Boolean.FALSE;
    protected Long minWriteTimeStampFilter = 0L;
    protected Long maxWriteTimeStampFilter = Long.MAX_VALUE;
    protected Long customWritetime = 0L;

    protected List<Integer> writeTimeStampCols = new ArrayList<>();
    protected List<Integer> ttlCols = new ArrayList<>();
    protected Boolean isCounterTable;

    protected String sourceKeyspaceTable;
    protected String astraKeyspaceTable;

    protected Boolean hasRandomPartitioner;
    protected Boolean filterData;
    protected String filterColName;
    protected String filterColType;
    protected Integer filterColIndex;
    protected String filterColValue;

    protected Boolean enableDefaultTTL = Boolean.FALSE;
    protected Integer defaultTTL = 7776000; // default TTL as 90 days (in seconds)

    protected Boolean enableDefaultWriteTime = Boolean.FALSE;
    // Default writetime; 1640998861000 is the epoch-millisecond value for
    // Saturday, January 1, 2022 2:01:01 AM GMT+01:00.
    // NOTE(review): the original comment said "epoch microseconds", but the value
    // is milliseconds for that date — confirm which unit consumers expect.
    protected Long defaultWriteTime = 1640998861000L;

    // Directories/file used to record token ranges and rows that failed migration.
    protected String tokenRangeExceptionDir;
    protected String rowExceptionDir;
    protected String exceptionFileName;

    /**
     * Reads the read/write consistency levels from the Spark configuration.
     *
     * @param sc Spark configuration holding {@code spark.consistency.read} and
     *           {@code spark.consistency.write}
     */
    protected BaseJobSession(SparkConf sc) {
        readConsistencyLevel = Util.mapToConsistencyLevel(Util.getSparkPropOrEmpty(sc, "spark.consistency.read"));
        writeConsistencyLevel = Util.mapToConsistencyLevel(Util.getSparkPropOrEmpty(sc, "spark.consistency.write"));
    }

    /**
     * Builds a composite key string from the row's id columns, joined by " %% ".
     *
     * @param sourceRow row to extract the id-column values from
     * @return the concatenated key, empty when there are no id columns
     */
    public String getKey(Row sourceRow) {
        // StringBuilder: single-threaded use, no need for StringBuffer's locking.
        StringBuilder key = new StringBuilder();
        for (int index = 0; index < idColTypes.size(); index++) {
            MigrateDataType dataType = idColTypes.get(index);
            if (index == 0) {
                key.append(getData(dataType, index, sourceRow));
            } else {
                // Separate appends avoid building an intermediate String per column.
                key.append(" %% ").append(getData(dataType, index, sourceRow));
            }
        }

        return key.toString();
    }

    /**
     * Parses a comma-separated type spec (e.g. "1,2,5%1") into MigrateDataTypes.
     *
     * @param types comma-separated list of type codes
     * @return one MigrateDataType per code, in input order
     */
    public List<MigrateDataType> getTypes(String types) {
        List<MigrateDataType> dataTypes = new ArrayList<>();
        for (String type : types.split(",")) {
            dataTypes.add(new MigrateDataType(type));
        }

        return dataTypes;
    }

    /**
     * Extracts the value at {@code index} from the row, using the collection
     * accessors for Map/List/Set types. For counter tables, a null Long counter
     * is normalized to 0 so it can be written as a counter delta.
     *
     * @param dataType  expected type (with element subtypes for collections)
     * @param index     zero-based column position in the row
     * @param sourceRow row to read from
     * @return the column value, possibly null for non-counter columns
     */
    public Object getData(MigrateDataType dataType, int index, Row sourceRow) {
        if (dataType.typeClass == Map.class) {
            return sourceRow.getMap(index, dataType.subTypes.get(0), dataType.subTypes.get(1));
        } else if (dataType.typeClass == List.class) {
            return sourceRow.getList(index, dataType.subTypes.get(0));
        } else if (dataType.typeClass == Set.class) {
            return sourceRow.getSet(index, dataType.subTypes.get(0));
        } else if (isCounterTable && dataType.typeClass == Long.class) {
            Object data = sourceRow.get(index, dataType.typeClass);
            if (data == null) {
                // Autoboxed 0L instead of the deprecated new Long(0) constructor.
                return 0L;
            }
        }

        return sourceRow.get(index, dataType.typeClass);
    }
}
Loading

0 comments on commit 45d6d00

Please sign in to comment.