diff --git a/pom.xml b/pom.xml index 5afd8425..edc158ca 100644 --- a/pom.xml +++ b/pom.xml @@ -1,217 +1,241 @@ - - 4.0.0 + + 4.0.0 - datastax.astra.migrate - cassandra-data-migrator - 3.0.0 - jar + datastax.astra.migrate + cassandra-data-migrator + 3.0.0 + jar - - UTF-8 - 2.12.17 - 2.12 - 3.3.1 - 3.2.12 - 3.2.0 - 3.11.13 - 4.13.2 - + + UTF-8 + 2.12.17 + 2.12 + 3.3.1 + 3.2.12 + 3.2.0 + 3.11.13 + 4.13.2 + - - - github - GitHub Packages - https://maven.pkg.github.com/datastax/cassandra-data-migrator - - + + + github + GitHub Packages + https://maven.pkg.github.com/datastax/cassandra-data-migrator + + - - - com.google.guava - guava - 31.1-jre - - - org.scala-lang - scala-library - ${scala.version} - - - org.apache.spark - spark-core_${scala.main.version} - ${spark.version} - - - log4j - log4j - - - - - org.apache.spark - spark-sql_${scala.main.version} - ${spark.version} - - - org.apache.spark - spark-hive_${scala.main.version} - ${spark.version} - - - log4j - log4j - - - log4j - apache-log4j-extras - - - - - com.datastax.spark - spark-cassandra-connector_${scala.main.version} - ${connector.version} - - - com.github.jnr - jnr-posix - 3.1.15 - + + + com.google.guava + guava + 31.1-jre + + + org.scala-lang + scala-library + ${scala.version} + + + org.apache.spark + spark-core_${scala.main.version} + ${spark.version} + + + log4j + log4j + + + + + org.apache.spark + spark-sql_${scala.main.version} + ${spark.version} + + + org.apache.spark + spark-hive_${scala.main.version} + ${spark.version} + + + log4j + log4j + + + log4j + apache-log4j-extras + + + + + com.datastax.spark + spark-cassandra-connector_${scala.main.version} + ${connector.version} + + + com.github.jnr + jnr-posix + 3.1.15 + + + com.google.cloud + google-cloud-secretmanager + 2.13.0 + + + org.json + json + 20180813 + + + org.apache.logging.log4j + log4j-api + 2.19.0 + + + org.apache.logging.log4j + log4j-core + 2.19.0 + + + org.apache.logging.log4j + log4j-to-slf4j + 2.19.0 + + + + org.scalatest + scalatest_${scala.main.version} + ${scalatest.version} + test + + + junit + junit + ${junit.version} + test + + + org.apache.cassandra + cassandra-all + ${cassandra.version} + test + + + + org.slf4j + log4j-over-slf4j + + + + + - - org.apache.logging.log4j - log4j-api - 2.19.0 - - - org.apache.logging.log4j - log4j-core - 2.19.0 - - - org.apache.logging.log4j - log4j-to-slf4j - 2.19.0 - + + + + src/resources + + + + + net.alchim31.maven + scala-maven-plugin + 3.2.2 + + + process-sources + + compile + testCompile + - - - org.scalatest - scalatest_${scala.main.version} - ${scalatest.version} - test - - - junit - junit - ${junit.version} - test - - - org.apache.cassandra - cassandra-all - ${cassandra.version} - test - - - - org.slf4j - log4j-over-slf4j - - - - - - - - - - src/resources - - - - - net.alchim31.maven - scala-maven-plugin - 3.2.2 - - - process-sources - - compile - testCompile - - - - - - - org.apache.maven.plugins - maven-shade-plugin - 2.4.3 - - - - package - - shade - - - - - - - *:* - - META-INF/*.SF - META-INF/*.DSA - META-INF/*.RSA - - - - - - - - - - - org.apache.maven.plugins - maven-surefire-plugin - 2.7 - - true - - - - - org.scalatest - scalatest-maven-plugin - 1.0 - - ${project.build.directory}/surefire-reports - . - WDF TestSuite.txt - - - - test - - test - - - - - - - org.apache.maven.plugins - maven-compiler-plugin - 3.8.1 - - 1.8 - 1.8 - - - - - + + + + + org.apache.maven.plugins + maven-shade-plugin + 3.4.1 + + + package + + shade + + + + + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + com.google.common. + org.shaded.datastax.google.common. + + + com.google.thirdparty. + org.shaded.datastax.google.thirdparty. + + + com.google.protobuf. + org.shaded.datastax.protobuf. + + + + + + + + + + org.apache.maven.plugins + maven-surefire-plugin + 2.7 + + true + + + + + org.scalatest + scalatest-maven-plugin + 1.0 + + ${project.build.directory}/surefire-reports + . + WDF TestSuite.txt + + + + test + + test + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.8.1 + + 1.8 + 1.8 + + + + + \ No newline at end of file diff --git a/src/main/java/datastax/astra/migrate/GoogleSecretManagerHelper.java b/src/main/java/datastax/astra/migrate/GoogleSecretManagerHelper.java new file mode 100644 index 00000000..2b2080ed --- /dev/null +++ b/src/main/java/datastax/astra/migrate/GoogleSecretManagerHelper.java @@ -0,0 +1,101 @@ +package datastax.astra.migrate; + +import java.nio.charset.Charset; + +import org.json.JSONException; +import org.json.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.cloud.secretmanager.v1.AccessSecretVersionResponse; +import com.google.cloud.secretmanager.v1.SecretManagerServiceClient; +import com.google.cloud.secretmanager.v1.SecretVersionName; +import com.google.common.hash.Hashing; +import com.google.protobuf.ByteString; + +public class GoogleSecretManagerHelper { + + private static GoogleSecretManagerHelper googleSecretManagerHelper; + public Logger logger = LoggerFactory.getLogger(this.getClass().getName()); + public static final String ASTRA_CLIENT_ID_KEY = "client_id"; + public static final String ASTRA_CLIENT_SECRET_KEY = "secret"; + public static final String ASTRA_SECRET_VERSION_ID = "latest"; + + private String astraClusterUserName; + private String astraAuthenToken; + + private GoogleSecretManagerHelper() { + logger.info("Private constructor GoogleSecretManagerHelper invoked"); + } + + public static GoogleSecretManagerHelper getInstance() { + if (googleSecretManagerHelper == null) { + synchronized (GoogleSecretManagerHelper.class) { + if (googleSecretManagerHelper == null) { + googleSecretManagerHelper = new GoogleSecretManagerHelper(); + } + } + } + return googleSecretManagerHelper; + } + + public void populateAstraCredentials(String astraProjectId, String astraSecretManagerSecredId) throws Exception { + try { + JSONObject astraSecretJson = getAstraClientSecretJson(astraProjectId, astraSecretManagerSecredId); + astraClusterUserName = astraSecretJson.getString(ASTRA_CLIENT_ID_KEY); + astraAuthenToken = astraSecretJson.getString(ASTRA_CLIENT_SECRET_KEY); + } catch (JSONException e) { + throw new Exception("Error Occured while extracting client Id/secret from Json:" + e); + } catch (Exception e) { + throw e; + } + } + + public String getAstraClusterUserName() { + return astraClusterUserName; + } + + public String getAstraAuthenToken() { + return astraAuthenToken; + } + + private JSONObject getAstraClientSecretJson(String projectId, String secretId) throws Exception { + JSONObject astraSecretJson = new JSONObject(); + if (secretId == null || projectId == null) { + logger.error( + "Required Google Secret Manager properties are missing, please check."); + } else { + astraSecretJson = getClientSecretJson(projectId, secretId); + } + return astraSecretJson; + } + + private JSONObject getClientSecretJson(String projectId, String secretId) throws Exception { + JSONObject clientSecJsonObj = new JSONObject(); + ByteString payloadData = accessSecretVersion(projectId, secretId, ASTRA_SECRET_VERSION_ID); + if (ByteString.EMPTY != payloadData) + clientSecJsonObj = new JSONObject(payloadData.toString(Charset.defaultCharset())); + return clientSecJsonObj; + } + + private ByteString accessSecretVersion(String projectId, String secretId, String version) throws Exception { + ByteString payloadData = ByteString.EMPTY; + try (SecretManagerServiceClient client = SecretManagerServiceClient.create()) { + SecretVersionName secretVersionName = SecretVersionName.of(projectId, secretId, version); + // Access the secret version. + AccessSecretVersionResponse response = client.accessSecretVersion(secretVersionName); + payloadData = response.getPayload().getData(); + long payloadChecksum = response.getPayload().getDataCrc32C() & 0xFFFFFFFFL; + if (!validChecksum(payloadData, payloadChecksum)) + throw new Exception( + "The checksum received is invalid for Google Secret Paylod. Corrupted Data Detected!!"); + } + return payloadData; + } + + private boolean validChecksum(ByteString data, long crc32Checksum) { + long checksum = Hashing.crc32c().hashBytes(data.toByteArray()).asInt() & 0xFFFFFFFFL; // to convert it into long appended 0xFFFFFFFFL + return checksum == crc32Checksum; + } + +} diff --git a/src/main/scala/datastax/astra/migrate/AbstractJob.scala b/src/main/scala/datastax/astra/migrate/AbstractJob.scala index d2760f08..a864471f 100644 --- a/src/main/scala/datastax/astra/migrate/AbstractJob.scala +++ b/src/main/scala/datastax/astra/migrate/AbstractJob.scala @@ -5,20 +5,26 @@ import org.apache.spark.SparkConf class AbstractJob extends BaseJob { + val ASTRA_SECRET_VERSION_ID: String = "latest" + abstractLogger.info("PARAM -- Min Partition: " + minPartition) abstractLogger.info("PARAM -- Max Partition: " + maxPartition) abstractLogger.info("PARAM -- Split Size: " + splitSize) - abstractLogger.info("PARAM -- Coverage Percent: " + coveragePercent) + abstractLogger.info("PARAM -- Coverage Percent: " + coveragePercent) var sourceConnection = getConnection(true, sourceScbPath, sourceHost, sourceUsername, sourcePassword, - sourceTrustStorePath, sourceTrustStorePassword, sourceTrustStoreType, sourceKeyStorePath, sourceKeyStorePassword, sourceEnabledAlgorithms); + sourceTrustStorePath, sourceTrustStorePassword, sourceTrustStoreType, sourceKeyStorePath, sourceKeyStorePassword, + sourceEnabledAlgorithms, gsmProjectId, secretName); var destinationConnection = getConnection(false, destinationScbPath, destinationHost, destinationUsername, destinationPassword, - destinationTrustStorePath, destinationTrustStorePassword, destinationTrustStoreType, destinationKeyStorePath, destinationKeyStorePassword, destinationEnabledAlgorithms); + destinationTrustStorePath, destinationTrustStorePassword, destinationTrustStoreType, destinationKeyStorePath, + destinationKeyStorePassword, destinationEnabledAlgorithms, gsmProjectId, secretName); private def getConnection(isSource: Boolean, scbPath: String, host: String, username: String, password: String, trustStorePath: String, trustStorePassword: String, trustStoreType: String, - keyStorePath: String, keyStorePassword: String, enabledAlgorithms: String): CassandraConnector = { + keyStorePath: String, keyStorePassword: String, enabledAlgorithms: String, gsmProjectId: String, + secretName: String): CassandraConnector = { + var connType: String = "Source" if (!isSource) { connType = "Destination" @@ -26,8 +32,33 @@ class AbstractJob extends BaseJob { var config: SparkConf = sContext.getConf if (scbPath.nonEmpty) { + abstractLogger.info(connType + ": Connecting to Astra using SCB: " + scbPath); + abstractLogger.info("GSM Project Id Value : "+gsmProjectId) + abstractLogger.info("Secret Name : "+secretName) + if(gsmProjectId.nonEmpty && secretName.nonEmpty){ + abstractLogger.info(connType + ": Connecting to Astra using secure manager : "); + GoogleSecretManagerHelper.getInstance().populateAstraCredentials(gsmProjectId,secretName) + val id: String = GoogleSecretManagerHelper.getInstance().getAstraClusterUserName() + val secret: String = GoogleSecretManagerHelper.getInstance().getAstraAuthenToken() + + if(id.nonEmpty && secret.nonEmpty){ + abstractLogger.info("Connecting using GSM"); + return CassandraConnector(config + .set("spark.cassandra.auth.username", id) + .set("spark.cassandra.auth.password", secret) + .set("spark.cassandra.input.consistency.level", consistencyLevel) + .set("spark.cassandra.connection.config.cloud.path", scbPath)) + }else{ + abstractLogger.info("Connecting using USERNAME and PASSWORD"); + return CassandraConnector(config + .set("spark.cassandra.auth.username", username) + .set("spark.cassandra.auth.password", password) + .set("spark.cassandra.input.consistency.level", consistencyLevel) + .set("spark.cassandra.connection.config.cloud.path", scbPath)) + } + } return CassandraConnector(config .set("spark.cassandra.auth.username", username) .set("spark.cassandra.auth.password", password) diff --git a/src/main/scala/datastax/astra/migrate/BaseJob.scala b/src/main/scala/datastax/astra/migrate/BaseJob.scala index 1d816652..fc7e7e81 100644 --- a/src/main/scala/datastax/astra/migrate/BaseJob.scala +++ b/src/main/scala/datastax/astra/migrate/BaseJob.scala @@ -42,6 +42,9 @@ class BaseJob extends App { val destinationKeyStorePassword = Util.getSparkPropOrEmpty(sc, "spark.target.keyStore.password") val destinationEnabledAlgorithms = Util.getSparkPropOrEmpty(sc, "spark.target.enabledAlgorithms") + val gsmProjectId = Util.getSparkPropOrEmpty(sc, "spark.validator.gsmprojectid") + val secretName = Util.getSparkPropOrEmpty(sc, "spark.validator.secretname") + val minPartition = new BigInteger(Util.getSparkPropOr(sc, "spark.origin.minPartition", "-9223372036854775808")) val maxPartition = new BigInteger(Util.getSparkPropOr(sc, "spark.origin.maxPartition", "9223372036854775807")) val coveragePercent = Util.getSparkPropOr(sc, "spark.coveragePercent", "100")