Skip to content

Commit

Permalink
Google Secret Manager Support
Browse files Browse the repository at this point in the history
  • Loading branch information
dikshatew committed Apr 11, 2023
1 parent 45d6d00 commit 5669f52
Show file tree
Hide file tree
Showing 3 changed files with 139 additions and 4 deletions.
101 changes: 101 additions & 0 deletions src/main/java/datastax/astra/migrate/GoogleSecretManagerHelper.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package datastax.astra.migrate;

import java.nio.charset.Charset;

import org.json.JSONException;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.cloud.secretmanager.v1.AccessSecretVersionResponse;
import com.google.cloud.secretmanager.v1.SecretManagerServiceClient;
import com.google.cloud.secretmanager.v1.SecretVersionName;
import com.google.common.hash.Hashing;
import com.google.protobuf.ByteString;

public class GoogleSecretManagerHelper {

private static GoogleSecretManagerHelper googleSecretManagerHelper;
public Logger logger = LoggerFactory.getLogger(this.getClass().getName());
public static final String ASTRA_CLIENT_ID_KEY = "client_id";
public static final String ASTRA_CLIENT_SECRET_KEY = "secret";
public static final String ASTRA_SECRET_VERSION_ID = "latest";

private String astraClusterUserName;
private String astraAuthenToken;

private GoogleSecretManagerHelper() {
logger.info("Private constructor GoogleSecretManagerHelper invoked");
}

public static GoogleSecretManagerHelper getInstance() {
if (googleSecretManagerHelper == null) {
synchronized (GoogleSecretManagerHelper.class) {
if (googleSecretManagerHelper == null) {
googleSecretManagerHelper = new GoogleSecretManagerHelper();
}
}
}
return googleSecretManagerHelper;
}

public void populateAstraCredentials(String astraProjectId, String astraSecretManagerSecredId) throws Exception {
try {
JSONObject astraSecretJson = getAstraClientSecretJson(astraProjectId, astraSecretManagerSecredId);
astraClusterUserName = astraSecretJson.getString(ASTRA_CLIENT_ID_KEY);
astraAuthenToken = astraSecretJson.getString(ASTRA_CLIENT_SECRET_KEY);
} catch (JSONException e) {
throw new Exception("Error Occured while extracting client Id/secret from Json:" + e);
} catch (Exception e) {
throw e;
}
}

public String getAstraClusterUserName() {
return astraClusterUserName;
}

public String getAstraAuthenToken() {
return astraAuthenToken;
}

private JSONObject getAstraClientSecretJson(String projectId, String secretId) throws Exception {
JSONObject astraSecretJson = new JSONObject();
if (secretId == null || projectId == null) {
logger.error(
"Required Google Secret Manager properties are missing, please check.");
} else {
astraSecretJson = getClientSecretJson(projectId, secretId);
}
return astraSecretJson;
}

private JSONObject getClientSecretJson(String projectId, String secretId) throws Exception {
JSONObject clientSecJsonObj = new JSONObject();
ByteString payloadData = accessSecretVersion(projectId, secretId, ASTRA_SECRET_VERSION_ID);
if (ByteString.EMPTY != payloadData)
clientSecJsonObj = new JSONObject(payloadData.toString(Charset.defaultCharset()));
return clientSecJsonObj;
}

private ByteString accessSecretVersion(String projectId, String secretId, String version) throws Exception {
ByteString payloadData = ByteString.EMPTY;
try (SecretManagerServiceClient client = SecretManagerServiceClient.create()) {
SecretVersionName secretVersionName = SecretVersionName.of(projectId, secretId, version);
// Access the secret version.
AccessSecretVersionResponse response = client.accessSecretVersion(secretVersionName);
payloadData = response.getPayload().getData();
long payloadChecksum = response.getPayload().getDataCrc32C() & 0xFFFFFFFFL;
if (!validChecksum(payloadData, payloadChecksum))
throw new Exception(
"The checksum received is invalid for Google Secret Paylod. Corrupted Data Detected!!");
}
return payloadData;
}

private boolean validChecksum(ByteString data, long crc32Checksum) {
long checksum = Hashing.crc32c().hashBytes(data.toByteArray()).asInt() & 0xFFFFFFFFL; // to convert it into long appended 0xFFFFFFFFL
return checksum == crc32Checksum;
}

}
39 changes: 35 additions & 4 deletions src/main/scala/datastax/astra/migrate/AbstractJob.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,29 +5,60 @@ import org.apache.spark.SparkConf

class AbstractJob extends BaseJob {

val ASTRA_SECRET_VERSION_ID: String = "latest"

abstractLogger.info("PARAM -- Min Partition: " + minPartition)
abstractLogger.info("PARAM -- Max Partition: " + maxPartition)
abstractLogger.info("PARAM -- Split Size: " + splitSize)
abstractLogger.info("PARAM -- Coverage Percent: " + coveragePercent)
abstractLogger.info("PARAM -- Coverage Percent: " + coveragePercent)

var sourceConnection = getConnection(true, sourceScbPath, sourceHost, sourceUsername, sourcePassword,
sourceTrustStorePath, sourceTrustStorePassword, sourceTrustStoreType, sourceKeyStorePath, sourceKeyStorePassword, sourceEnabledAlgorithms);
sourceTrustStorePath, sourceTrustStorePassword, sourceTrustStoreType, sourceKeyStorePath, sourceKeyStorePassword,
sourceEnabledAlgorithms, gsmProjectId, secretName);

var destinationConnection = getConnection(false, destinationScbPath, destinationHost, destinationUsername, destinationPassword,
destinationTrustStorePath, destinationTrustStorePassword, destinationTrustStoreType, destinationKeyStorePath, destinationKeyStorePassword, destinationEnabledAlgorithms);
destinationTrustStorePath, destinationTrustStorePassword, destinationTrustStoreType, destinationKeyStorePath,
destinationKeyStorePassword, destinationEnabledAlgorithms, gsmProjectId, secretName);

private def getConnection(isSource: Boolean, scbPath: String, host: String, username: String, password: String,
trustStorePath: String, trustStorePassword: String, trustStoreType: String,
keyStorePath: String, keyStorePassword: String, enabledAlgorithms: String): CassandraConnector = {
keyStorePath: String, keyStorePassword: String, enabledAlgorithms: String, gsmProjectId: String,
secretName: String): CassandraConnector = {

var connType: String = "Source"
if (!isSource) {
connType = "Destination"
}

var config: SparkConf = sContext.getConf
if (scbPath.nonEmpty) {

abstractLogger.info(connType + ": Connecting to Astra using SCB: " + scbPath);
abstractLogger.info("GSM Project Id Value : "+gsmProjectId)
abstractLogger.info("Secret Name : "+secretName)
if(gsmProjectId.nonEmpty && secretName.nonEmpty){

abstractLogger.info(connType + ": Connecting to Astra using secure manager : ");
GoogleSecretManagerHelper.getInstance().populateAstraCredentials(gsmProjectId,secretName)
val id: String = GoogleSecretManagerHelper.getInstance().getAstraClusterUserName()
val secret: String = GoogleSecretManagerHelper.getInstance().getAstraAuthenToken()

if(id.nonEmpty && secret.nonEmpty){
abstractLogger.info("Connecting using GSM");
return CassandraConnector(config
.set("spark.cassandra.auth.username", id)
.set("spark.cassandra.auth.password", secret)
.set("spark.cassandra.input.consistency.level", consistencyLevel)
.set("spark.cassandra.connection.config.cloud.path", scbPath))
}else{
abstractLogger.info("Connecting using USERNAME and PASSWORD");
return CassandraConnector(config
.set("spark.cassandra.auth.username", username)
.set("spark.cassandra.auth.password", password)
.set("spark.cassandra.input.consistency.level", consistencyLevel)
.set("spark.cassandra.connection.config.cloud.path", scbPath))
}
}
return CassandraConnector(config
.set("spark.cassandra.auth.username", username)
.set("spark.cassandra.auth.password", password)
Expand Down
3 changes: 3 additions & 0 deletions src/main/scala/datastax/astra/migrate/BaseJob.scala
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ class BaseJob extends App {
val destinationKeyStorePassword = Util.getSparkPropOrEmpty(sc, "spark.target.keyStore.password")
val destinationEnabledAlgorithms = Util.getSparkPropOrEmpty(sc, "spark.target.enabledAlgorithms")

val gsmProjectId = Util.getSparkPropOrEmpty(sc, "spark.validator.gsmprojectid")
val secretName = Util.getSparkPropOrEmpty(sc, "spark.validator.secretname")

val minPartition = new BigInteger(Util.getSparkPropOr(sc, "spark.origin.minPartition", "-9223372036854775808"))
val maxPartition = new BigInteger(Util.getSparkPropOr(sc, "spark.origin.maxPartition", "9223372036854775807"))
val coveragePercent = Util.getSparkPropOr(sc, "spark.coveragePercent", "100")
Expand Down

0 comments on commit 5669f52

Please sign in to comment.