Skip to content

Commit

Permalink
[Reverse Replication] Adding load test for custom transformation (Goo…
Browse files Browse the repository at this point in the history
…gleCloudPlatform#2061)

* [Reverse Replication] Adding load test for custom transformation

* minor fix

* addressing review comments
  • Loading branch information
shreyakhajanchi authored Feb 11, 2025
1 parent c0cadc4 commit ccc6edf
Show file tree
Hide file tree
Showing 5 changed files with 591 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// TODO: Rename the class since its being used in both Live and Reverse replication tests and in
// both ITs and LTs
public class CustomTransformationWithShardForLiveIT implements ISpannerMigrationTransformer {

private static final Logger LOG = LoggerFactory.getLogger(CustomShardIdFetcher.class);
Expand Down Expand Up @@ -199,6 +201,21 @@ public MigrationTransformationResponse toSourceRow(MigrationTransformationReques
throw new InvalidTransformationException(e);
}

MigrationTransformationResponse response =
new MigrationTransformationResponse(responseRow, false);
return response;
} else if (request.getTableName().equals("Person")) {
Map<String, Object> responseRow = new HashMap<>();
Map<String, Object> requestRow = request.getRequestRow();
String firstName1 = requestRow.get("first_name1").toString();
String lastName1 = requestRow.get("last_name1").toString();
String firstName2 = requestRow.get("first_name2").toString();
String lastName2 = requestRow.get("last_name2").toString();
String firstName3 = requestRow.get("first_name3").toString();
String lastName3 = requestRow.get("last_name3").toString();
responseRow.put("full_name1", "\'" + firstName1 + " " + lastName1 + "\'");
responseRow.put("full_name2", "\'" + firstName2 + " " + lastName2 + "\'");
responseRow.put("full_name3", "\'" + firstName3 + " " + lastName3 + "\'");
MigrationTransformationResponse response =
new MigrationTransformationResponse(responseRow, false);
return response;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
/*
* Copyright (C) 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.templates;

import static org.apache.beam.it.gcp.artifacts.utils.ArtifactUtils.getFullGcsPath;
import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatPipeline;
import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatResult;

import com.google.cloud.teleport.metadata.TemplateLoadTest;
import com.google.cloud.teleport.v2.spanner.migrations.transformation.CustomTransformation;
import com.google.common.io.Resources;
import java.io.IOException;
import java.text.ParseException;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import org.apache.beam.it.common.PipelineLauncher;
import org.apache.beam.it.common.PipelineOperator;
import org.apache.beam.it.common.TestProperties;
import org.apache.beam.it.gcp.datagenerator.DataGenerator;
import org.apache.beam.it.jdbc.JDBCResourceManager;
import org.apache.beam.it.jdbc.MySQLResourceManager;
import org.apache.beam.it.jdbc.conditions.JDBCRowsCheck;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category(TemplateLoadTest.class)
@TemplateLoadTest(SpannerToSourceDb.class)
@RunWith(JUnit4.class)
public class SpannerToMySqlCustomTransformationLT extends SpannerToSourceDbLTBase {
private static final Logger LOG = LoggerFactory.getLogger(SpannerToMySqlSourceLT.class);

private String generatorSchemaPath;
private final String artifactBucket = TestProperties.artifactBucket();
private final String spannerDdlResource = "SpannerToMySqlSourceLT/spanner-schema.sql";
private final String sessionFileResource = "SpannerToMySqlCustomTransformationLT/session.json";
private final String dataGeneratorSchemaResource =
"SpannerToMySqlSourceLT/datagenerator-schema.json";
private final String table = "Person";
private final int maxWorkers = 50;
private final int numWorkers = 20;
private PipelineLauncher.LaunchInfo jobInfo;
private PipelineLauncher.LaunchInfo readerJobInfo;
private final int numShards = 1;

@Before
public void setup() throws IOException, InterruptedException {
setupResourceManagers(spannerDdlResource, sessionFileResource, artifactBucket);
setupMySQLResourceManager(numShards);
generatorSchemaPath =
getFullGcsPath(
artifactBucket,
gcsResourceManager
.uploadArtifact(
"input/schema.json",
Resources.getResource(dataGeneratorSchemaResource).getPath())
.name());

createMySQLSchema(jdbcResourceManagers);
CustomTransformation customTransformation =
CustomTransformation.builder(
"input/customShard.jar", "com.custom.CustomTransformationWithShardForLiveIT")
.build();
createAndUploadJarToGcs(gcsResourceManager);
jobInfo = launchDataflowJob(artifactBucket, numWorkers, maxWorkers, customTransformation);
}

@After
public void tearDown() {
cleanupResourceManagers();
}

@Test
public void reverseReplication1KTpsWithCustomTransformation()
throws IOException, ParseException, InterruptedException {
// Start data generator
DataGenerator dataGenerator =
DataGenerator.builderWithSchemaLocation(testName, generatorSchemaPath)
.setQPS("1000")
.setMessagesLimit(String.valueOf(300000))
.setSpannerInstanceName(spannerResourceManager.getInstanceId())
.setSpannerDatabaseName(spannerResourceManager.getDatabaseId())
.setSpannerTableName(table)
.setNumWorkers("50")
.setMaxNumWorkers("100")
.setSinkType("SPANNER")
.setProjectId(project)
.setBatchSizeBytes("0")
.build();

dataGenerator.execute(Duration.ofMinutes(90));
assertThatPipeline(jobInfo).isRunning();

JDBCRowsCheck check =
JDBCRowsCheck.builder(jdbcResourceManagers.get(0), table)
.setMinRows(300000)
.setMaxRows(300000)
.build();

PipelineOperator.Result result =
pipelineOperator.waitForCondition(
createConfig(jobInfo, Duration.ofMinutes(10), Duration.ofSeconds(30)), check);

// Assert Conditions
assertThatResult(result).meetsConditions();

PipelineOperator.Result result1 =
pipelineOperator.cancelJobAndFinish(createConfig(jobInfo, Duration.ofMinutes(20)));

assertThatResult(result1).isLaunchFinished();

exportMetrics(jobInfo, numShards);
}

private void createMySQLSchema(List<JDBCResourceManager> jdbcResourceManagers) {
if (!(jdbcResourceManagers.get(0) instanceof MySQLResourceManager)) {
throw new IllegalArgumentException(jdbcResourceManagers.get(0).getClass().getSimpleName());
}
MySQLResourceManager jdbcResourceManager = (MySQLResourceManager) jdbcResourceManagers.get(0);
HashMap<String, String> columns = new HashMap<>();
columns.put("first_name1", "varchar(500)");
columns.put("last_name1", "varchar(500)");
columns.put("full_name1", "varchar(1500)");
columns.put("first_name2", "varchar(500)");
columns.put("last_name2", "varchar(500)");
columns.put("full_name2", "varchar(1500)");
columns.put("first_name3", "varchar(500)");
columns.put("last_name3", "varchar(500)");
columns.put("full_name3", "varchar(1500)");
columns.put("ID", "varchar(100) NOT NULL");

JDBCResourceManager.JDBCSchema schema = new JDBCResourceManager.JDBCSchema(columns, "ID");

jdbcResourceManager.createTable(table, schema);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public void setup() throws IOException {
.name());

createMySQLSchema(jdbcResourceManagers);
jobInfo = launchDataflowJob(artifactBucket, numWorkers, maxWorkers);
jobInfo = launchDataflowJob(artifactBucket, numWorkers, maxWorkers, null);
}

@After
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.google.cloud.teleport.v2.templates;

import com.google.cloud.teleport.v2.spanner.migrations.shard.Shard;
import com.google.cloud.teleport.v2.spanner.migrations.transformation.CustomTransformation;
import com.google.common.base.MoreObjects;
import com.google.common.io.Resources;
import com.google.gson.Gson;
Expand All @@ -35,6 +36,7 @@
import org.apache.beam.it.common.PipelineLauncher.LaunchConfig;
import org.apache.beam.it.common.PipelineLauncher.LaunchInfo;
import org.apache.beam.it.common.TestProperties;
import org.apache.beam.it.common.utils.IORedirectUtil;
import org.apache.beam.it.common.utils.ResourceManagerUtils;
import org.apache.beam.it.gcp.TemplateLoadTestBase;
import org.apache.beam.it.gcp.artifacts.utils.ArtifactUtils;
Expand Down Expand Up @@ -186,7 +188,11 @@ public void createAndUploadShardConfigToGcs(
}

public PipelineLauncher.LaunchInfo launchDataflowJob(
String artifactBucket, int numWorkers, int maxWorkers) throws IOException {
String artifactBucket,
int numWorkers,
int maxWorkers,
CustomTransformation customTransformation)
throws IOException {
// default parameters

Map<String, String> params =
Expand All @@ -210,6 +216,13 @@ public PipelineLauncher.LaunchInfo launchDataflowJob(
}
};

if (customTransformation != null) {
params.put(
"transformationJarPath",
getGcsPath(artifactBucket, customTransformation.jarPath(), gcsResourceManager));
params.put("transformationClassName", customTransformation.classPath());
}

LaunchConfig.Builder options =
LaunchConfig.builder(getClass().getSimpleName(), TEMPLATE_SPEC_PATH);
options
Expand Down Expand Up @@ -272,6 +285,20 @@ public void exportMetrics(PipelineLauncher.LaunchInfo jobInfo, int numShards)
exportMetricsToBigQuery(jobInfo, metrics);
}

protected void createAndUploadJarToGcs(GcsResourceManager gcsResourceManager)
throws IOException, InterruptedException {
String[] shellCommand = {"/bin/bash", "-c", "cd ../spanner-custom-shard"};
Process exec = Runtime.getRuntime().exec(shellCommand);
IORedirectUtil.redirectLinesLog(exec.getInputStream(), LOG);
IORedirectUtil.redirectLinesLog(exec.getErrorStream(), LOG);
if (exec.waitFor() != 0) {
throw new RuntimeException("Error staging template, check Maven logs.");
}
gcsResourceManager.uploadArtifact(
"input/customShard.jar",
"../spanner-custom-shard/target/spanner-custom-shard-1.0-SNAPSHOT.jar");
}

public void getResourceManagerMetrics(Map<String, Double> metrics) {
pubsubResourceManager.collectMetrics(metrics);
}
Expand Down
Loading

0 comments on commit ccc6edf

Please sign in to comment.