diff --git a/src/main/java/datastax/astra/migrate/GoogleSecretManagerHelper.java b/src/main/java/datastax/astra/migrate/GoogleSecretManagerHelper.java new file mode 100644 index 00000000..2b2080ed --- /dev/null +++ b/src/main/java/datastax/astra/migrate/GoogleSecretManagerHelper.java @@ -0,0 +1,101 @@ +package datastax.astra.migrate; + +import java.nio.charset.Charset; + +import org.json.JSONException; +import org.json.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.cloud.secretmanager.v1.AccessSecretVersionResponse; +import com.google.cloud.secretmanager.v1.SecretManagerServiceClient; +import com.google.cloud.secretmanager.v1.SecretVersionName; +import com.google.common.hash.Hashing; +import com.google.protobuf.ByteString; + +public class GoogleSecretManagerHelper { + + private static GoogleSecretManagerHelper googleSecretManagerHelper; + public Logger logger = LoggerFactory.getLogger(this.getClass().getName()); + public static final String ASTRA_CLIENT_ID_KEY = "client_id"; + public static final String ASTRA_CLIENT_SECRET_KEY = "secret"; + public static final String ASTRA_SECRET_VERSION_ID = "latest"; + + private String astraClusterUserName; + private String astraAuthenToken; + + private GoogleSecretManagerHelper() { + logger.info("Private constructor GoogleSecretManagerHelper invoked"); + } + + public static GoogleSecretManagerHelper getInstance() { + if (googleSecretManagerHelper == null) { + synchronized (GoogleSecretManagerHelper.class) { + if (googleSecretManagerHelper == null) { + googleSecretManagerHelper = new GoogleSecretManagerHelper(); + } + } + } + return googleSecretManagerHelper; + } + + public void populateAstraCredentials(String astraProjectId, String astraSecretManagerSecredId) throws Exception { + try { + JSONObject astraSecretJson = getAstraClientSecretJson(astraProjectId, astraSecretManagerSecredId); + astraClusterUserName = astraSecretJson.getString(ASTRA_CLIENT_ID_KEY); + astraAuthenToken = astraSecretJson.getString(ASTRA_CLIENT_SECRET_KEY); + } catch (JSONException e) { + throw new Exception("Error Occured while extracting client Id/secret from Json:" + e); + } catch (Exception e) { + throw e; + } + } + + public String getAstraClusterUserName() { + return astraClusterUserName; + } + + public String getAstraAuthenToken() { + return astraAuthenToken; + } + + private JSONObject getAstraClientSecretJson(String projectId, String secretId) throws Exception { + JSONObject astraSecretJson = new JSONObject(); + if (secretId == null || projectId == null) { + logger.error( + "Required Google Secret Manager properties are missing, please check."); + } else { + astraSecretJson = getClientSecretJson(projectId, secretId); + } + return astraSecretJson; + } + + private JSONObject getClientSecretJson(String projectId, String secretId) throws Exception { + JSONObject clientSecJsonObj = new JSONObject(); + ByteString payloadData = accessSecretVersion(projectId, secretId, ASTRA_SECRET_VERSION_ID); + if (ByteString.EMPTY != payloadData) + clientSecJsonObj = new JSONObject(payloadData.toString(Charset.defaultCharset())); + return clientSecJsonObj; + } + + private ByteString accessSecretVersion(String projectId, String secretId, String version) throws Exception { + ByteString payloadData = ByteString.EMPTY; + try (SecretManagerServiceClient client = SecretManagerServiceClient.create()) { + SecretVersionName secretVersionName = SecretVersionName.of(projectId, secretId, version); + // Access the secret version. + AccessSecretVersionResponse response = client.accessSecretVersion(secretVersionName); + payloadData = response.getPayload().getData(); + long payloadChecksum = response.getPayload().getDataCrc32C() & 0xFFFFFFFFL; + if (!validChecksum(payloadData, payloadChecksum)) + throw new Exception( + "The checksum received is invalid for Google Secret Paylod. Corrupted Data Detected!!"); + } + return payloadData; + } + + private boolean validChecksum(ByteString data, long crc32Checksum) { + long checksum = Hashing.crc32c().hashBytes(data.toByteArray()).asInt() & 0xFFFFFFFFL; // to convert it into long appended 0xFFFFFFFFL + return checksum == crc32Checksum; + } + +} diff --git a/src/main/scala/datastax/astra/migrate/AbstractJob.scala b/src/main/scala/datastax/astra/migrate/AbstractJob.scala index d2760f08..a864471f 100644 --- a/src/main/scala/datastax/astra/migrate/AbstractJob.scala +++ b/src/main/scala/datastax/astra/migrate/AbstractJob.scala @@ -5,20 +5,26 @@ import org.apache.spark.SparkConf class AbstractJob extends BaseJob { + val ASTRA_SECRET_VERSION_ID: String = "latest" + abstractLogger.info("PARAM -- Min Partition: " + minPartition) abstractLogger.info("PARAM -- Max Partition: " + maxPartition) abstractLogger.info("PARAM -- Split Size: " + splitSize) - abstractLogger.info("PARAM -- Coverage Percent: " + coveragePercent) + abstractLogger.info("PARAM -- Coverage Percent: " + coveragePercent) var sourceConnection = getConnection(true, sourceScbPath, sourceHost, sourceUsername, sourcePassword, - sourceTrustStorePath, sourceTrustStorePassword, sourceTrustStoreType, sourceKeyStorePath, sourceKeyStorePassword, sourceEnabledAlgorithms); + sourceTrustStorePath, sourceTrustStorePassword, sourceTrustStoreType, sourceKeyStorePath, sourceKeyStorePassword, + sourceEnabledAlgorithms, gsmProjectId, secretName); var destinationConnection = getConnection(false, destinationScbPath, destinationHost, destinationUsername, destinationPassword, - destinationTrustStorePath, destinationTrustStorePassword, destinationTrustStoreType, destinationKeyStorePath, destinationKeyStorePassword, destinationEnabledAlgorithms); + destinationTrustStorePath, destinationTrustStorePassword, destinationTrustStoreType, destinationKeyStorePath, + destinationKeyStorePassword, destinationEnabledAlgorithms, gsmProjectId, secretName); private def getConnection(isSource: Boolean, scbPath: String, host: String, username: String, password: String, trustStorePath: String, trustStorePassword: String, trustStoreType: String, - keyStorePath: String, keyStorePassword: String, enabledAlgorithms: String): CassandraConnector = { + keyStorePath: String, keyStorePassword: String, enabledAlgorithms: String, gsmProjectId: String, + secretName: String): CassandraConnector = { + var connType: String = "Source" if (!isSource) { connType = "Destination" @@ -26,8 +32,33 @@ class AbstractJob extends BaseJob { var config: SparkConf = sContext.getConf if (scbPath.nonEmpty) { + abstractLogger.info(connType + ": Connecting to Astra using SCB: " + scbPath); + abstractLogger.info("GSM Project Id Value : "+gsmProjectId) + abstractLogger.info("Secret Name : "+secretName) + if(gsmProjectId.nonEmpty && secretName.nonEmpty){ + abstractLogger.info(connType + ": Connecting to Astra using secure manager : "); + GoogleSecretManagerHelper.getInstance().populateAstraCredentials(gsmProjectId,secretName) + val id: String = GoogleSecretManagerHelper.getInstance().getAstraClusterUserName() + val secret: String = GoogleSecretManagerHelper.getInstance().getAstraAuthenToken() + + if(id.nonEmpty && secret.nonEmpty){ + abstractLogger.info("Connecting using GSM"); + return CassandraConnector(config + .set("spark.cassandra.auth.username", id) + .set("spark.cassandra.auth.password", secret) + .set("spark.cassandra.input.consistency.level", consistencyLevel) + .set("spark.cassandra.connection.config.cloud.path", scbPath)) + }else{ + abstractLogger.info("Connecting using USERNAME and PASSWORD"); + return CassandraConnector(config + .set("spark.cassandra.auth.username", username) + .set("spark.cassandra.auth.password", password) + .set("spark.cassandra.input.consistency.level", consistencyLevel) + .set("spark.cassandra.connection.config.cloud.path", scbPath)) + } + } return CassandraConnector(config .set("spark.cassandra.auth.username", username) .set("spark.cassandra.auth.password", password) diff --git a/src/main/scala/datastax/astra/migrate/BaseJob.scala b/src/main/scala/datastax/astra/migrate/BaseJob.scala index 1d816652..fc7e7e81 100644 --- a/src/main/scala/datastax/astra/migrate/BaseJob.scala +++ b/src/main/scala/datastax/astra/migrate/BaseJob.scala @@ -42,6 +42,9 @@ class BaseJob extends App { val destinationKeyStorePassword = Util.getSparkPropOrEmpty(sc, "spark.target.keyStore.password") val destinationEnabledAlgorithms = Util.getSparkPropOrEmpty(sc, "spark.target.enabledAlgorithms") + val gsmProjectId = Util.getSparkPropOrEmpty(sc, "spark.validator.gsmprojectid") + val secretName = Util.getSparkPropOrEmpty(sc, "spark.validator.secretname") + val minPartition = new BigInteger(Util.getSparkPropOr(sc, "spark.origin.minPartition", "-9223372036854775808")) val maxPartition = new BigInteger(Util.getSparkPropOr(sc, "spark.origin.maxPartition", "9223372036854775807")) val coveragePercent = Util.getSparkPropOr(sc, "spark.coveragePercent", "100")