Skip to content

Commit

Permalink
Fixed connection issue caused when using different types of origin an…
Browse files Browse the repository at this point in the history
…d target clusters (e.g. Cassandra/DSE with host/port and Astra with SCB). (#334)
  • Loading branch information
pravinbhat authored Nov 27, 2024
1 parent bcfff40 commit 3665573
Show file tree
Hide file tree
Showing 8 changed files with 31 additions and 22 deletions.
3 changes: 3 additions & 0 deletions RELEASE.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
# Release Notes
## [5.1.3] - 2024-11-27
- Bug fix: Fixed a connection issue that occurred when using different types of origin and target clusters (e.g. Cassandra/DSE with host/port and Astra with SCB).

## [5.1.2] - 2024-11-26
- Bug fix: The SCB file on some Spark worker nodes may get deleted before the connection is established, which may cause a connection exception on that worker node. Added a static async SCB delete delay to address such issues.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,13 @@ public TargetUpsertRunDetailsStatement(CqlSession session, String keyspaceTable)
// TODO: Remove this code block after a few releases; it's only added for backward compatibility
try {
this.session.execute("ALTER TABLE " + cdmKsTabInfo + " ADD status TEXT");
} catch (Exception e) { // ignore if column already exists
logger.debug("Column 'status' already exists in table {}", cdmKsTabInfo);
}
try {
this.session.execute("ALTER TABLE " + cdmKsTabDetails + " ADD run_info TEXT");
} catch (Exception e) {
// ignore if column already exists
logger.trace("Column 'status' already exists in table {}", cdmKsTabInfo);
} catch (Exception e) { // ignore if column already exists
logger.debug("Column 'run_info' already exists in table {}", cdmKsTabDetails);
}

boundInitInfoStatement = bindStatement("INSERT INTO " + cdmKsTabInfo
Expand Down
12 changes: 5 additions & 7 deletions src/main/scala/com/datastax/cdm/job/BaseJob.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ package com.datastax.cdm.job

import com.datastax.cdm.properties.{KnownProperties, PropertyHelper}
import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.slf4j.LoggerFactory
Expand All @@ -40,7 +40,6 @@ abstract class BaseJob[T: ClassTag] extends App {

var spark: SparkSession = _
var sContext: SparkContext = _
var sc: SparkConf = _
var propertyHelper: PropertyHelper = _

var consistencyLevel: String = _
Expand Down Expand Up @@ -69,8 +68,7 @@ abstract class BaseJob[T: ClassTag] extends App {
.appName(jobName)
.getOrCreate()
sContext = spark.sparkContext
sc = sContext.getConf
propertyHelper = PropertyHelper.getInstance(sc);
propertyHelper = PropertyHelper.getInstance(sContext.getConf);

runId = propertyHelper.getLong(KnownProperties.RUN_ID)
prevRunId = propertyHelper.getLong(KnownProperties.PREV_RUN_ID)
Expand All @@ -79,9 +77,9 @@ abstract class BaseJob[T: ClassTag] extends App {
runId = System.nanoTime();
}
consistencyLevel = propertyHelper.getString(KnownProperties.READ_CL)
connectionFetcher = new ConnectionFetcher(sc, propertyHelper)
originConnection = connectionFetcher.getConnection(Side.ORIGIN, consistencyLevel, runId)
targetConnection = connectionFetcher.getConnection(Side.TARGET, consistencyLevel, runId)
connectionFetcher = new ConnectionFetcher(propertyHelper)
originConnection = connectionFetcher.getConnection(sContext.getConf, Side.ORIGIN, consistencyLevel, runId)
targetConnection = connectionFetcher.getConnection(sContext.getConf, Side.TARGET, consistencyLevel, runId)

val hasRandomPartitioner: Boolean = {
val partitionerName = originConnection.withSessionDo(_.getMetadata.getTokenMap.get().getPartitionerName)
Expand Down
4 changes: 2 additions & 2 deletions src/main/scala/com/datastax/cdm/job/ConnectionFetcher.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import com.datastax.cdm.data.DataUtility.generateSCB
import com.datastax.cdm.data.PKFactory.Side

// TODO: CDM-31 - add localDC configuration support
class ConnectionFetcher(config: SparkConf, propertyHelper: IPropertyHelper) extends Serializable {
class ConnectionFetcher(propertyHelper: IPropertyHelper) extends Serializable {
val logger: Logger = LoggerFactory.getLogger(this.getClass.getName)

def getConnectionDetails(side: Side): ConnectionDetails = {
Expand Down Expand Up @@ -63,7 +63,7 @@ class ConnectionFetcher(config: SparkConf, propertyHelper: IPropertyHelper) exte
}
}

def getConnection(side: Side, consistencyLevel: String, runId: Long): CassandraConnector = {
def getConnection(config: SparkConf, side: Side, consistencyLevel: String, runId: Long): CassandraConnector = {
val connectionDetails = getConnectionDetails(side)

logger.info("PARAM -- SSL Enabled: "+connectionDetails.sslEnabled);
Expand Down
8 changes: 5 additions & 3 deletions src/main/scala/com/datastax/cdm/job/DiffData.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ package com.datastax.cdm.job

import com.datastax.cdm.feature.TrackRun
import com.datastax.cdm.data.PKFactory.Side
import com.datastax.cdm.properties.{KnownProperties, PropertyHelper}
import com.datastax.cdm.properties.KnownProperties
import com.datastax.cdm.job.IJobSessionFactory.JobType

object DiffData extends BasePartitionJob {
Expand All @@ -34,6 +34,8 @@ object DiffData extends BasePartitionJob {
var ma = new CDMMetricsAccumulator(jobType)
sContext.register(ma, "CDMMetricsAccumulator")

val bcOriginConfig = sContext.broadcast(sContext.getConf)
val bcTargetConfig = sContext.broadcast(sContext.getConf)
val bcConnectionFetcher = sContext.broadcast(connectionFetcher)
val bcPropHelper = sContext.broadcast(propertyHelper)
val bcJobFactory = sContext.broadcast(jobFactory)
Expand All @@ -42,8 +44,8 @@ object DiffData extends BasePartitionJob {

slices.foreach(slice => {
if (null == originConnection) {
originConnection = bcConnectionFetcher.value.getConnection(Side.ORIGIN, bcPropHelper.value.getString(KnownProperties.READ_CL), bcRunId.value)
targetConnection = bcConnectionFetcher.value.getConnection(Side.TARGET, bcPropHelper.value.getString(KnownProperties.READ_CL), bcRunId.value)
originConnection = bcConnectionFetcher.value.getConnection(bcOriginConfig.value, Side.ORIGIN, bcPropHelper.value.getString(KnownProperties.READ_CL), bcRunId.value)
targetConnection = bcConnectionFetcher.value.getConnection(bcTargetConfig.value, Side.TARGET, bcPropHelper.value.getString(KnownProperties.READ_CL), bcRunId.value)
trackRunFeature = targetConnection.withSessionDo(targetSession => new TrackRun(targetSession, bcKeyspaceTableValue.value))
}
originConnection.withSessionDo(originSession =>
Expand Down
5 changes: 3 additions & 2 deletions src/main/scala/com/datastax/cdm/job/GuardrailCheck.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
package com.datastax.cdm.job

import com.datastax.cdm.data.PKFactory.Side
import com.datastax.cdm.properties.{KnownProperties, PropertyHelper}
import com.datastax.cdm.properties.KnownProperties
import com.datastax.cdm.job.IJobSessionFactory.JobType

object GuardrailCheck extends BasePartitionJob {
Expand All @@ -32,13 +32,14 @@ object GuardrailCheck extends BasePartitionJob {
var ma = new CDMMetricsAccumulator(jobType)
sContext.register(ma, "CDMMetricsAccumulator")

val bcOriginConfig = sContext.broadcast(sContext.getConf)
val bcConnectionFetcher = sContext.broadcast(connectionFetcher)
val bcPropHelper = sContext.broadcast(propertyHelper)
val bcJobFactory = sContext.broadcast(jobFactory)

slices.foreach(slice => {
if (null == originConnection) {
originConnection = bcConnectionFetcher.value.getConnection(Side.ORIGIN, bcPropHelper.value.getString(KnownProperties.READ_CL), 0)
originConnection = bcConnectionFetcher.value.getConnection(bcOriginConfig.value, Side.ORIGIN, bcPropHelper.value.getString(KnownProperties.READ_CL), 0)
}
originConnection.withSessionDo(originSession =>
bcJobFactory.value.getInstance(originSession, null, bcPropHelper.value)
Expand Down
10 changes: 6 additions & 4 deletions src/main/scala/com/datastax/cdm/job/Migrate.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package com.datastax.cdm.job
import com.datastax.cdm.feature.TrackRun
import com.datastax.cdm.job.CDMMetricsAccumulator
import com.datastax.cdm.data.PKFactory.Side
import com.datastax.cdm.properties.{KnownProperties, PropertyHelper}
import com.datastax.cdm.properties.KnownProperties
import com.datastax.cdm.job.IJobSessionFactory.JobType

object Migrate extends BasePartitionJob {
Expand All @@ -34,7 +34,9 @@ object Migrate extends BasePartitionJob {
jobFactory.getInstance(originSession, targetSession, propertyHelper).initCdmRun(runId, prevRunId, parts, trackRunFeature, jobType)))
var ma = new CDMMetricsAccumulator(jobType)
sContext.register(ma, "CDMMetricsAccumulator")


val bcOriginConfig = sContext.broadcast(sContext.getConf)
val bcTargetConfig = sContext.broadcast(sContext.getConf)
val bcConnectionFetcher = sContext.broadcast(connectionFetcher)
val bcPropHelper = sContext.broadcast(propertyHelper)
val bcJobFactory = sContext.broadcast(jobFactory)
Expand All @@ -43,8 +45,8 @@ object Migrate extends BasePartitionJob {

slices.foreach(slice => {
if (null == originConnection) {
originConnection = bcConnectionFetcher.value.getConnection(Side.ORIGIN, bcPropHelper.value.getString(KnownProperties.READ_CL), bcRunId.value)
targetConnection = bcConnectionFetcher.value.getConnection(Side.TARGET, bcPropHelper.value.getString(KnownProperties.READ_CL), bcRunId.value)
originConnection = bcConnectionFetcher.value.getConnection(bcOriginConfig.value, Side.ORIGIN, bcPropHelper.value.getString(KnownProperties.READ_CL), bcRunId.value)
targetConnection = bcConnectionFetcher.value.getConnection(bcTargetConfig.value, Side.TARGET, bcPropHelper.value.getString(KnownProperties.READ_CL), bcRunId.value)
trackRunFeature = targetConnection.withSessionDo(targetSession => new TrackRun(targetSession, bcKeyspaceTableValue.value))
}
originConnection.withSessionDo(originSession =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public void setup() {
commonSetupWithoutDefaultClassVariables();
MockitoAnnotations.openMocks(this);

cf = new ConnectionFetcher(conf, propertyHelper);
cf = new ConnectionFetcher(propertyHelper);
}

@Test
Expand Down

0 comments on commit 3665573

Please sign in to comment.