Removed deprecated partition-file feature (#274)
* Removed deprecated partition-file feature
* Updated release notes
* Enforced improved coverage after the recent test additions & removal of deprecated functions
pravinbhat authored Jul 22, 2024
1 parent 2cd422e commit 35bf514
Showing 39 changed files with 202 additions and 595 deletions.
1 change: 0 additions & 1 deletion Dockerfile
@@ -25,7 +25,6 @@ COPY ./src /assets/src
COPY ./pom.xml /assets/pom.xml
COPY ./src/resources/cdm.properties /assets/
COPY ./src/resources/cdm-detailed.properties /assets/
COPY ./src/resources/partitions.csv /assets/
COPY ./scripts/get-latest-maven-version.sh ./get-latest-maven-version.sh

RUN chmod +x ./get-latest-maven-version.sh && \
12 changes: 8 additions & 4 deletions README.md
@@ -49,6 +49,13 @@ Exception in thread "main" java.lang.NoSuchMethodError: scala.runtime.Statics.re
Note:
- The above command generates a log file `logfile_name_*.txt` to avoid log output on the console.
- Update the memory options (driver & executor memory) based on your use-case
- To track details of a run in the `target` keyspace, pass param `--conf spark.cdm.trackRun=true`
- To filter and migrate data only in a specific token range, you can pass the two additional params below to the `Migration` or `Validation` jobs

```
--conf spark.cdm.filter.cassandra.partition.min=<token-range-min>
--conf spark.cdm.filter.cassandra.partition.max=<token-range-max>
```
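
For example, a Migration run that records progress in the `target` keyspace and processes only one token range could be launched as follows. This is a sketch: the keyspace/table placeholder, memory settings, and jar name mirror the other commands in this README and are illustrative, not prescriptive.

```
./spark-submit --properties-file cdm.properties \
--conf spark.cdm.schema.origin.keyspaceTable="<keyspacename>.<tablename>" \
--conf spark.cdm.trackRun=true \
--conf spark.cdm.filter.cassandra.partition.min=<token-range-min> \
--conf spark.cdm.filter.cassandra.partition.max=<token-range-max> \
--master "local[*]" --driver-memory 25G --executor-memory 25G \
--class com.datastax.cdm.job.Migrate cassandra-data-migrator-4.x.x.jar &> logfile_name_$(date +%Y%m%d_%H_%M).txt
```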

# Steps for Data-Validation:
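
A minimal sketch of the validation invocation, following the same spark-submit pattern as the other commands in this README (placeholders are illustrative):

```
./spark-submit --properties-file cdm.properties \
--conf spark.cdm.schema.origin.keyspaceTable="<keyspacename>.<tablename>" \
--master "local[*]" --driver-memory 25G --executor-memory 25G \
--class com.datastax.cdm.job.DiffData cassandra-data-migrator-4.x.x.jar &> logfile_name_$(date +%Y%m%d_%H_%M).txt
```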

@@ -84,7 +91,7 @@ Note:
- The validation job will never delete records from the target, i.e., it only adds or updates data on the target

# Rerun (previously incomplete) Migration or Validation
- You can rerun a Migration or Validation job to complete a previous run that could have stopped for any reason. This mode will skip any token-ranges from the previous run that were migrated or validated successfully. This is done by passing the `spark.cdm.trackRun.previousRunId` param as shown below
- You can rerun/resume a Migration or Validation job to complete a previous run that could have stopped (or completed with some errors) for any reason. This mode will skip any token-ranges from the previous run that were migrated (or validated) successfully. This is done by passing the `spark.cdm.trackRun.previousRunId` param as shown below

```
./spark-submit --properties-file cdm.properties \
--conf spark.cdm.trackRun.previousRunId=<prev-run-id> \
--master "local[*]" --driver-memory 25G --executor-memory 25G \
--class com.datastax.cdm.job.<Migrate|DiffData> cassandra-data-migrator-4.x.x.jar &> logfile_name_$(date +%Y%m%d_%H_%M).txt
```
Note:
- This feature replaces and improves upon an older similar feature (using param `spark.cdm.tokenrange.partitionFile`) that is now deprecated and will be removed soon.

# Perform large-field Guardrail violation checks
- The tool can also identify large fields in a table that may break your cluster guardrails (e.g., AstraDB has a 10MB limit for a single large field); run it with `--class com.datastax.cdm.job.GuardrailCheck` as shown below
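
A minimal sketch of the guardrail invocation, assuming the threshold is set via the `spark.cdm.feature.guardrail.colSizeInKB` property (that property name and its value in KB are assumptions here; placeholders are illustrative):

```
./spark-submit --properties-file cdm.properties \
--conf spark.cdm.schema.origin.keyspaceTable="<keyspacename>.<tablename>" \
--conf spark.cdm.feature.guardrail.colSizeInKB=10000 \
--master "local[*]" --driver-memory 25G --executor-memory 25G \
--class com.datastax.cdm.job.GuardrailCheck cassandra-data-migrator-4.x.x.jar &> logfile_name_$(date +%Y%m%d_%H_%M).txt
```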
@@ -125,7 +130,6 @@ Note:
- Validate migration accuracy and performance using a smaller randomized data-set
- Supports adding custom fixed `writetime`
- Track run information (start-time, end-time, status, etc.) in tables (`cdm_run_info` and `cdm_run_details`) on the target keyspace
- Validation - Log partitions range level exceptions, use the exceptions file as input for rerun

# Things to know
- Each run (Migration or Validation) can be tracked (when enabled). You can find a summary and details in the `cdm_run_info` and `cdm_run_details` tables in the target keyspace, and can query them directly as sketched below.
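
A quick way to inspect a run's outcome; a sketch, assuming `cqlsh` connectivity to the target cluster (host and keyspace placeholders are illustrative, and the table schemas are not shown in this diff):

```
cqlsh <target-host> -e "SELECT * FROM <target_keyspace>.cdm_run_info;"
cqlsh <target-host> -e "SELECT * FROM <target_keyspace>.cdm_run_details LIMIT 20;"
```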
3 changes: 3 additions & 0 deletions RELEASE.md
@@ -1,4 +1,7 @@
# Release Notes
## [4.3.3] - 2024-07-22
- Removed deprecated functionality related to processing token-ranges via partition-file

## [4.3.2] - 2024-07-19
- Removed deprecated functionality related to retry

24 changes: 0 additions & 24 deletions SIT/features/06_partition_range/breakData.cql

This file was deleted.

4 changes: 0 additions & 4 deletions SIT/features/06_partition_range/cdm.txt

This file was deleted.

27 changes: 0 additions & 27 deletions SIT/features/06_partition_range/execute.sh

This file was deleted.

15 changes: 0 additions & 15 deletions SIT/features/06_partition_range/expected.cql

This file was deleted.

7 changes: 0 additions & 7 deletions SIT/features/06_partition_range/expected.out

This file was deleted.

20 changes: 0 additions & 20 deletions SIT/features/06_partition_range/migrate.properties

This file was deleted.

This file was deleted.

This file was deleted.

2 changes: 0 additions & 2 deletions SIT/features/06_partition_range/partitions.csv

This file was deleted.

22 changes: 0 additions & 22 deletions SIT/features/06_partition_range/setup.cql

This file was deleted.

2 changes: 1 addition & 1 deletion pom.xml
@@ -277,7 +277,7 @@
        <limit>
          <counter>LINE</counter>
          <value>MISSEDCOUNT</value>
          <maximum>1500</maximum>
          <maximum>1400</maximum>
        </limit>
      </limits>
    </rule>
2 changes: 0 additions & 2 deletions rat-excludes.txt
@@ -13,8 +13,6 @@ Dockerfile
.github/**
scripts/*
test-backup/feature/*
src/resources/partitions.csv
src/resources/primary_key_rows.csv
src/resources/log4j.xml
SIT/*/*/*.assert
SIT/*/*/*.out
158 changes: 79 additions & 79 deletions src/main/java/com/datastax/cdm/job/AbstractJobSession.java
@@ -33,83 +33,83 @@

public abstract class AbstractJobSession<T> extends BaseJobSession {

    public Logger logger = LoggerFactory.getLogger(this.getClass().getName());
    protected EnhancedSession originSession;
    protected EnhancedSession targetSession;
    protected Guardrail guardrailFeature;
    protected boolean guardrailEnabled;
    protected boolean appendPartitionOnDiff = SplitPartitions.appendPartitionOnDiff(propertyHelper);
    protected String partitionFileInput = SplitPartitions.getPartitionFileInput(propertyHelper);
    protected String partitionFileOutput = SplitPartitions.getPartitionFileOutput(propertyHelper);
    protected JobCounter jobCounter;
    protected Long printStatsAfter;
    protected Boolean trackRun = false;
    protected TrackRun trackRunFeature;

    protected AbstractJobSession(CqlSession originSession, CqlSession targetSession, SparkConf sc) {
        this(originSession, targetSession, sc, false);
    }

    protected AbstractJobSession(CqlSession originSession, CqlSession targetSession, SparkConf sc, boolean isJobMigrateRowsFromFile) {
        super(sc);

        if (originSession == null) {
            return;
        }

        this.printStatsAfter = propertyHelper.getLong(KnownProperties.PRINT_STATS_AFTER);
        if (!propertyHelper.meetsMinimum(KnownProperties.PRINT_STATS_AFTER, printStatsAfter, 1L)) {
            logger.warn(KnownProperties.PRINT_STATS_AFTER + " must be greater than 0. Setting to default value of " + KnownProperties.getDefaultAsString(KnownProperties.PRINT_STATS_AFTER));
            propertyHelper.setProperty(KnownProperties.PRINT_STATS_AFTER, KnownProperties.getDefault(KnownProperties.PRINT_STATS_AFTER));
            printStatsAfter = propertyHelper.getLong(KnownProperties.PRINT_STATS_AFTER);
        }
        this.jobCounter = new JobCounter(printStatsAfter, propertyHelper.getBoolean(KnownProperties.PRINT_STATS_PER_PART));

        rateLimiterOrigin = RateLimiter.create(propertyHelper.getInteger(KnownProperties.PERF_RATELIMIT_ORIGIN));
        rateLimiterTarget = RateLimiter.create(propertyHelper.getInteger(KnownProperties.PERF_RATELIMIT_TARGET));
        trackRun = propertyHelper.getBoolean(KnownProperties.TRACK_RUN);

        logger.info("PARAM -- Partition file input: {}", partitionFileInput);
        logger.info("PARAM -- Partition file output: {}", partitionFileOutput);
        logger.info("PARAM -- Origin Rate Limit: {}", rateLimiterOrigin.getRate());
        logger.info("PARAM -- Target Rate Limit: {}", rateLimiterTarget.getRate());

        this.originSession = new EnhancedSession(propertyHelper, originSession, true);
        this.targetSession = new EnhancedSession(propertyHelper, targetSession, false);
        this.originSession.getCqlTable().setOtherCqlTable(this.targetSession.getCqlTable());
        this.targetSession.getCqlTable().setOtherCqlTable(this.originSession.getCqlTable());
        this.originSession.getCqlTable().setFeatureMap(featureMap);
        this.targetSession.getCqlTable().setFeatureMap(featureMap);

        boolean allFeaturesValid = true;
        for (Feature f : featureMap.values()) {
            if (!f.initializeAndValidate(this.originSession.getCqlTable(), this.targetSession.getCqlTable())) {
                allFeaturesValid = false;
                logger.error("Feature {} is not valid. Please check the configuration.", f.getClass().getName());
            }
        }
        if (!allFeaturesValid) {
            throw new RuntimeException("One or more features are not valid. Please check the configuration.");
        }

        PKFactory pkFactory = new PKFactory(propertyHelper, this.originSession.getCqlTable(), this.targetSession.getCqlTable());
        this.originSession.setPKFactory(pkFactory);
        this.targetSession.setPKFactory(pkFactory);

        // Guardrail is referenced by many jobs, and is evaluated against the target table
        this.guardrailFeature = (Guardrail) this.targetSession.getCqlTable().getFeature(Featureset.GUARDRAIL_CHECK);
        this.guardrailEnabled = this.guardrailFeature.isEnabled();
    }

    public abstract void processSlice(T slice);
    public synchronized void initCdmRun(Collection<SplitPartitions.Partition> parts, TrackRun trackRunFeature) {}

    public synchronized void printCounts(boolean isFinal) {
        if (isFinal) {
            jobCounter.printFinal(trackRun, trackRunFeature);
        } else {
            jobCounter.printProgress();
        }
    }
    public Logger logger = LoggerFactory.getLogger(this.getClass().getName());
    protected EnhancedSession originSession;
    protected EnhancedSession targetSession;
    protected Guardrail guardrailFeature;
    protected boolean guardrailEnabled;
    protected JobCounter jobCounter;
    protected Long printStatsAfter;
    protected TrackRun trackRunFeature;

    protected AbstractJobSession(CqlSession originSession, CqlSession targetSession, SparkConf sc) {
        this(originSession, targetSession, sc, false);
    }

    protected AbstractJobSession(CqlSession originSession, CqlSession targetSession, SparkConf sc,
            boolean isJobMigrateRowsFromFile) {
        super(sc);

        if (originSession == null) {
            return;
        }

        this.printStatsAfter = propertyHelper.getLong(KnownProperties.PRINT_STATS_AFTER);
        if (!propertyHelper.meetsMinimum(KnownProperties.PRINT_STATS_AFTER, printStatsAfter, 1L)) {
            logger.warn(KnownProperties.PRINT_STATS_AFTER + " must be greater than 0. Setting to default value of "
                    + KnownProperties.getDefaultAsString(KnownProperties.PRINT_STATS_AFTER));
            propertyHelper.setProperty(KnownProperties.PRINT_STATS_AFTER,
                    KnownProperties.getDefault(KnownProperties.PRINT_STATS_AFTER));
            printStatsAfter = propertyHelper.getLong(KnownProperties.PRINT_STATS_AFTER);
        }
        this.jobCounter = new JobCounter(printStatsAfter,
                propertyHelper.getBoolean(KnownProperties.PRINT_STATS_PER_PART));

        rateLimiterOrigin = RateLimiter.create(propertyHelper.getInteger(KnownProperties.PERF_RATELIMIT_ORIGIN));
        rateLimiterTarget = RateLimiter.create(propertyHelper.getInteger(KnownProperties.PERF_RATELIMIT_TARGET));

        logger.info("PARAM -- Origin Rate Limit: {}", rateLimiterOrigin.getRate());
        logger.info("PARAM -- Target Rate Limit: {}", rateLimiterTarget.getRate());

        this.originSession = new EnhancedSession(propertyHelper, originSession, true);
        this.targetSession = new EnhancedSession(propertyHelper, targetSession, false);
        this.originSession.getCqlTable().setOtherCqlTable(this.targetSession.getCqlTable());
        this.targetSession.getCqlTable().setOtherCqlTable(this.originSession.getCqlTable());
        this.originSession.getCqlTable().setFeatureMap(featureMap);
        this.targetSession.getCqlTable().setFeatureMap(featureMap);

        boolean allFeaturesValid = true;
        for (Feature f : featureMap.values()) {
            if (!f.initializeAndValidate(this.originSession.getCqlTable(), this.targetSession.getCqlTable())) {
                allFeaturesValid = false;
                logger.error("Feature {} is not valid. Please check the configuration.", f.getClass().getName());
            }
        }
        if (!allFeaturesValid) {
            throw new RuntimeException("One or more features are not valid. Please check the configuration.");
        }

        PKFactory pkFactory = new PKFactory(propertyHelper, this.originSession.getCqlTable(),
                this.targetSession.getCqlTable());
        this.originSession.setPKFactory(pkFactory);
        this.targetSession.setPKFactory(pkFactory);

        // Guardrail is referenced by many jobs, and is evaluated against the target
        // table
        this.guardrailFeature = (Guardrail) this.targetSession.getCqlTable().getFeature(Featureset.GUARDRAIL_CHECK);
        this.guardrailEnabled = this.guardrailFeature.isEnabled();
    }

    public abstract void processSlice(T slice);

    public synchronized void initCdmRun(Collection<SplitPartitions.Partition> parts, TrackRun trackRunFeature) {
    }

    public synchronized void printCounts(boolean isFinal) {
        if (isFinal) {
            jobCounter.printFinal(trackRunFeature);
        } else {
            jobCounter.printProgress();
        }
    }
}