From 6abe6044343ad3f6b167d4d1427b8be6d625f93a Mon Sep 17 00:00:00 2001
From: itsankitjain-google
Date: Tue, 18 Jan 2022 18:34:33 +0530
Subject: [PATCH 1/6] testing with older image version of dataproc

---
 .../test/java/io/cdap/cdap/app/etl/gcp/DataprocETLTestBase.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/integration-test-remote/src/test/java/io/cdap/cdap/app/etl/gcp/DataprocETLTestBase.java b/integration-test-remote/src/test/java/io/cdap/cdap/app/etl/gcp/DataprocETLTestBase.java
index e8920936..0ff0eda8 100644
--- a/integration-test-remote/src/test/java/io/cdap/cdap/app/etl/gcp/DataprocETLTestBase.java
+++ b/integration-test-remote/src/test/java/io/cdap/cdap/app/etl/gcp/DataprocETLTestBase.java
@@ -162,7 +162,7 @@ private void createProfile(String profileName) throws Exception {
     properties.add(ofProperty("stackdriverLoggingEnabled", "true"));
     properties.add(ofProperty("stackdriverMonitoringEnabled", "true"));
     properties.add(ofProperty("idleTTL", "60"));
-    properties.add(ofProperty("imageVersion", "1.5-debian10"));
+    properties.add(ofProperty("imageVersion", "2.0.24-debian10"));

     JsonObject provisioner = new JsonObject();
     provisioner.addProperty("name", "gcp-dataproc");

From b4cb7d7728b178a6ac9830007432c3f19702dfdf Mon Sep 17 00:00:00 2001
From: itsankitjain-google
Date: Tue, 18 Jan 2022 19:30:30 +0530
Subject: [PATCH 2/6] testing with older image version of dataproc

---
 .../test/java/io/cdap/cdap/app/etl/gcp/DataprocETLTestBase.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/integration-test-remote/src/test/java/io/cdap/cdap/app/etl/gcp/DataprocETLTestBase.java b/integration-test-remote/src/test/java/io/cdap/cdap/app/etl/gcp/DataprocETLTestBase.java
index 0ff0eda8..172a1bc9 100644
--- a/integration-test-remote/src/test/java/io/cdap/cdap/app/etl/gcp/DataprocETLTestBase.java
+++ b/integration-test-remote/src/test/java/io/cdap/cdap/app/etl/gcp/DataprocETLTestBase.java
@@ -162,7 +162,7 @@ private void createProfile(String profileName) throws Exception {
     properties.add(ofProperty("stackdriverLoggingEnabled", "true"));
     properties.add(ofProperty("stackdriverMonitoringEnabled", "true"));
     properties.add(ofProperty("idleTTL", "60"));
-    properties.add(ofProperty("imageVersion", "2.0.24-debian10"));
+    properties.add(ofProperty("imageVersion", "2.0.25-debian10"));

     JsonObject provisioner = new JsonObject();
     provisioner.addProperty("name", "gcp-dataproc");

From f4561830622b3f45cd855175fbfc7bd9a48079ea Mon Sep 17 00:00:00 2001
From: Sanjana Sandeep
Date: Wed, 9 Feb 2022 10:42:51 -0800
Subject: [PATCH 3/6] test for connection support in bq transformation pushdown

---
 .../etl/gcp/GoogleBigQuerySQLEngineTest.java | 25 +++++++++++++++----
 1 file changed, 20 insertions(+), 5 deletions(-)

diff --git a/integration-test-remote/src/test/java/io/cdap/cdap/app/etl/gcp/GoogleBigQuerySQLEngineTest.java b/integration-test-remote/src/test/java/io/cdap/cdap/app/etl/gcp/GoogleBigQuerySQLEngineTest.java
index 75683108..c9aee69d 100644
--- a/integration-test-remote/src/test/java/io/cdap/cdap/app/etl/gcp/GoogleBigQuerySQLEngineTest.java
+++ b/integration-test-remote/src/test/java/io/cdap/cdap/app/etl/gcp/GoogleBigQuerySQLEngineTest.java
@@ -51,6 +51,7 @@ import io.cdap.common.http.HttpMethod;
 import io.cdap.common.http.HttpResponse;
 import io.cdap.common.http.ObjectResponse;
+import io.cdap.plugin.common.ConfigUtil;
 import io.cdap.plugin.common.Properties;
 import org.apache.avro.Schema.Parser;
 import org.apache.avro.file.DataFileStream;
@@ -58,6 +59,7 @@ import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.GenericRecordBuilder;
 import org.apache.avro.io.DatumReader;
+import org.apache.parquet.Strings;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -66,6 +68,7 @@ import java.io.IOException;
 import java.net.HttpURLConnection;
 import java.net.URL;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
@@ -78,6 +81,7 @@ public class GoogleBigQuerySQLEngineTest extends DataprocETLTestBase {
   private static final String BQ_SQLENGINE_PLUGIN_NAME = "BigQueryPushdownEngine";
   private static final String BIG_QUERY_DATASET = "bq_dataset_joiner_test";
+  private static final String CONNECTION_NAME = String.format("test_bq_%s", GoogleBigQueryUtils.getUUID());

   @Override
   protected void innerSetup() throws Exception {
@@ -90,6 +94,7 @@ protected void innerSetup() throws Exception {
         return false;
       }
     }, 5, TimeUnit.MINUTES, 3, TimeUnit.SECONDS);
+    createConnection(CONNECTION_NAME, "BigQuery");
   }

   @Override
@@ -101,10 +106,22 @@ protected void innerTearDown() throws Exception {
   })
   @Test
   public void testSQLEngineJoinSpark() throws Exception {
-    testSQLEngineJoin(Engine.SPARK);
+    testSQLEngineJoin(Engine.SPARK, false);
+    testSQLEngineJoin(Engine.SPARK, true);
   }

-  private void testSQLEngineJoin(Engine engine) throws Exception {
+  private Map<String, String> getProps(boolean useConnection) {
+    String connectionId = String.format("${conn(%s)}", CONNECTION_NAME);
+    Map<String, String> props = new HashMap<>();
+    if (useConnection) {
+      props.put(ConfigUtil.NAME_CONNECTION, connectionId);
+      props.put(ConfigUtil.NAME_USE_CONNECTION, "true");
+    }
+    props.put("dataset", BIG_QUERY_DATASET);
+    return new ImmutableMap.Builder<String, String>().putAll(props).build();
+  }
+
+  private void testSQLEngineJoin(Engine engine, boolean useConnection) throws Exception {
     String filmDatasetName = "film-sqlenginejoinertest";
     String filmCategoryDatasetName = "film-category-sqlenginejoinertest";
     String filmActorDatasetName = "film-actor-sqlenginejoinertest";
@@ -184,9 +201,7 @@ private void testSQLEngineJoin(Engine engine) throws Exception {
       new ETLTransformationPushdown(
         new ETLPlugin(BQ_SQLENGINE_PLUGIN_NAME,
                       BatchSQLEngine.PLUGIN_TYPE,
-                      ImmutableMap.of(
-                        "dataset", BIG_QUERY_DATASET
-                      )
+                      getProps(useConnection)
         )
       );

From 978b44295bdfc168310a7cc675a269c95018ba10 Mon Sep 17 00:00:00 2001
From: Sanjana Sandeep
Date: Wed, 9 Feb 2022 10:42:51 -0800
Subject: [PATCH 4/6] test for connection support in bq transformation pushdown

---
 .../etl/gcp/GoogleBigQuerySQLEngineTest.java | 26 +++++++++++++++----
 1 file changed, 21 insertions(+), 5 deletions(-)

diff --git a/integration-test-remote/src/test/java/io/cdap/cdap/app/etl/gcp/GoogleBigQuerySQLEngineTest.java b/integration-test-remote/src/test/java/io/cdap/cdap/app/etl/gcp/GoogleBigQuerySQLEngineTest.java
index 75683108..97ad1c7d 100644
--- a/integration-test-remote/src/test/java/io/cdap/cdap/app/etl/gcp/GoogleBigQuerySQLEngineTest.java
+++ b/integration-test-remote/src/test/java/io/cdap/cdap/app/etl/gcp/GoogleBigQuerySQLEngineTest.java
@@ -51,6 +51,7 @@ import io.cdap.common.http.HttpMethod;
 import io.cdap.common.http.HttpResponse;
 import io.cdap.common.http.ObjectResponse;
+import io.cdap.plugin.common.ConfigUtil;
 import io.cdap.plugin.common.Properties;
 import org.apache.avro.Schema.Parser;
 import org.apache.avro.file.DataFileStream;
@@ -58,6 +59,7 @@ import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.GenericRecordBuilder;
 import
org.apache.avro.io.DatumReader; +import org.apache.parquet.Strings; import org.junit.Assert; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -66,6 +68,7 @@ import java.io.IOException; import java.net.HttpURLConnection; import java.net.URL; +import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; @@ -78,6 +81,7 @@ public class GoogleBigQuerySQLEngineTest extends DataprocETLTestBase { private static final String BQ_SQLENGINE_PLUGIN_NAME = "BigQueryPushdownEngine"; private static final String BIG_QUERY_DATASET = "bq_dataset_joiner_test"; + private static final String CONNECTION_NAME = String.format("test_bq_%s", GoogleBigQueryUtils.getUUID()); @Override protected void innerSetup() throws Exception { @@ -90,10 +94,12 @@ protected void innerSetup() throws Exception { return false; } }, 5, TimeUnit.MINUTES, 3, TimeUnit.SECONDS); + createConnection(CONNECTION_NAME, "BigQuery"); } @Override protected void innerTearDown() throws Exception { + deleteConnection(CONNECTION_NAME); } @Category({ @@ -101,10 +107,22 @@ protected void innerTearDown() throws Exception { }) @Test public void testSQLEngineJoinSpark() throws Exception { - testSQLEngineJoin(Engine.SPARK); + testSQLEngineJoin(Engine.SPARK, false); + testSQLEngineJoin(Engine.SPARK, true); } - private void testSQLEngineJoin(Engine engine) throws Exception { + private Map getProps(boolean useConnection) { + String connectionId = String.format("${conn(%s)}", CONNECTION_NAME); + Map props = new HashMap<>(); + if (useConnection) { + props.put(ConfigUtil.NAME_CONNECTION, connectionId); + props.put(ConfigUtil.NAME_USE_CONNECTION, "true"); + } + props.put("dataset", BIG_QUERY_DATASET); + return new ImmutableMap.Builder().putAll(props).build(); + } + + private void testSQLEngineJoin(Engine engine, boolean useConnection) throws Exception { String filmDatasetName = "film-sqlenginejoinertest"; String filmCategoryDatasetName = "film-category-sqlenginejoinertest"; String filmActorDatasetName = "film-actor-sqlenginejoinertest"; @@ -184,9 +202,7 @@ private void testSQLEngineJoin(Engine engine) throws Exception { new ETLTransformationPushdown( new ETLPlugin(BQ_SQLENGINE_PLUGIN_NAME, BatchSQLEngine.PLUGIN_TYPE, - ImmutableMap.of( - "dataset", BIG_QUERY_DATASET - ) + getProps(useConnection) ) ); From 59108d92e4ecddcaad8235d288a8c129fdb90dd5 Mon Sep 17 00:00:00 2001 From: Sanjana Sandeep Date: Wed, 16 Mar 2022 12:21:39 -0700 Subject: [PATCH 5/6] Integration tests for deduplicate and groupby in bq pushdown --- .../app/etl/batch/DedupAggregatorTest.java | 226 ++++++++ .../etl/gcp/GoogleBigQuerySQLEngineTest.java | 548 +++++++++++++++++- .../io/cdap/cdap/test/suite/AllTests.java | 2 + 3 files changed, 774 insertions(+), 2 deletions(-) create mode 100644 integration-test-remote/src/test/java/io/cdap/cdap/app/etl/batch/DedupAggregatorTest.java diff --git a/integration-test-remote/src/test/java/io/cdap/cdap/app/etl/batch/DedupAggregatorTest.java b/integration-test-remote/src/test/java/io/cdap/cdap/app/etl/batch/DedupAggregatorTest.java new file mode 100644 index 00000000..bd66cf5f --- /dev/null +++ b/integration-test-remote/src/test/java/io/cdap/cdap/app/etl/batch/DedupAggregatorTest.java @@ -0,0 +1,226 @@ +/* + * Copyright © 2016 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. 
You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.app.etl.batch; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.reflect.TypeToken; +import io.cdap.cdap.api.Resources; +import io.cdap.cdap.api.common.Bytes; +import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.cdap.api.dataset.table.Put; +import io.cdap.cdap.api.dataset.table.Table; +import io.cdap.cdap.app.etl.ETLTestBase; +import io.cdap.cdap.app.etl.dataset.DatasetAccessApp; +import io.cdap.cdap.app.etl.dataset.SnapshotFilesetService; +import io.cdap.cdap.datapipeline.SmartWorkflow; +import io.cdap.cdap.etl.api.Engine; +import io.cdap.cdap.etl.api.batch.BatchAggregator; +import io.cdap.cdap.etl.api.batch.BatchSink; +import io.cdap.cdap.etl.api.batch.BatchSource; +import io.cdap.cdap.etl.proto.v2.ETLBatchConfig; +import io.cdap.cdap.etl.proto.v2.ETLPlugin; +import io.cdap.cdap.etl.proto.v2.ETLStage; +import io.cdap.cdap.proto.ProgramRunStatus; +import io.cdap.cdap.proto.artifact.AppRequest; +import io.cdap.cdap.proto.id.ApplicationId; +import io.cdap.cdap.test.ApplicationManager; +import io.cdap.cdap.test.DataSetManager; +import io.cdap.cdap.test.ServiceManager; +import io.cdap.cdap.test.WorkflowManager; +import io.cdap.cdap.test.suite.category.RequiresSpark; +import io.cdap.common.http.HttpMethod; +import io.cdap.common.http.HttpResponse; +import io.cdap.common.http.ObjectResponse; +import io.cdap.plugin.common.Properties; +import org.apache.avro.file.DataFileStream; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.GenericRecordBuilder; +import org.apache.avro.io.DatumReader; +import org.junit.Assert; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.net.HttpURLConnection; +import java.net.URL; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +/** + * Tests DedupAggregator + */ +public class DedupAggregatorTest extends ETLTestBase { + public static final String SMARTWORKFLOW_NAME = SmartWorkflow.NAME; + public static final String USER_SOURCE = "userSource"; + public static final String USER_SINK = "userSink"; + + public static final Schema USER_SCHEMA = Schema.recordOf( + "user", + Schema.Field.of("Lastname", Schema.of(Schema.Type.STRING)), + Schema.Field.of("Firstname", Schema.of(Schema.Type.STRING)), + Schema.Field.of("profession", Schema.of(Schema.Type.STRING)), + Schema.Field.of("age", Schema.of(Schema.Type.INT))); + + private static final Map CONFIG_MAP = new ImmutableMap.Builder() + .put("uniqueFields", "profession") + .put("filterOperation", "age:Min") + .build(); + + @Category({ + RequiresSpark.class + }) + @Test + public void testDeduplicateSpark() throws Exception { + testDeduplicate(Engine.SPARK); + } + + private void testDeduplicate(Engine spark) throws Exception { + ETLStage userSourceStage = + new ETLStage("users", new ETLPlugin("Table", + BatchSource.PLUGIN_TYPE, + ImmutableMap.of( + 
Properties.BatchReadableWritable.NAME, USER_SOURCE, + Properties.Table.PROPERTY_SCHEMA, USER_SCHEMA.toString()), null)); + + ETLStage userSinkStage = new ETLStage(USER_SINK, new ETLPlugin("SnapshotAvro", BatchSink.PLUGIN_TYPE, + ImmutableMap.builder() + .put(Properties.BatchReadableWritable.NAME, USER_SINK) + .put("schema", USER_SCHEMA.toString()) + .build(), null)); + + ETLStage userGroupStage = new ETLStage("KeyAggregate", new ETLPlugin("Deduplicate", + BatchAggregator.PLUGIN_TYPE, + CONFIG_MAP, null)); + + + ETLBatchConfig config = ETLBatchConfig.builder("* * * * *") + .addStage(userSourceStage) + .addStage(userSinkStage) + .addStage(userGroupStage) + .addConnection(userSourceStage.getName(), userGroupStage.getName()) + .addConnection(userGroupStage.getName(), userSinkStage.getName()) + .setDriverResources(new Resources(2048)) + .setResources(new Resources(2048)) + .build(); + + + ingestInputData(USER_SOURCE); + + AppRequest request = getBatchAppRequestV2(config); + ApplicationId appId = TEST_NAMESPACE.app("deduplicate-test"); + ApplicationManager appManager = deployApplication(appId, request); + + WorkflowManager workflowManager = appManager.getWorkflowManager(SmartWorkflow.NAME); + startAndWaitForRun(workflowManager, ProgramRunStatus.COMPLETED, 10, TimeUnit.MINUTES); + + // Deploy an application with a service to get partitionedFileset data for verification + ApplicationManager applicationManager = deployApplication(DatasetAccessApp.class); + ServiceManager serviceManager = applicationManager.getServiceManager + (SnapshotFilesetService.class.getSimpleName()); + startAndWaitForRun(serviceManager, ProgramRunStatus.RUNNING); + + org.apache.avro.Schema avroOutputSchema = new org.apache.avro.Schema.Parser().parse(USER_SCHEMA.toString()); + // output has these records: + // 1: shelton, alex, professor, 45 + // 3: schuster, chris, accountant, 23 + // 5: gamal , ali , engineer, 28 + GenericRecord record1 = new GenericRecordBuilder(avroOutputSchema) + .set("Lastname", "Shelton") + .set("Firstname", "Alex") + .set("profession", "professor") + .set("age", 45) + .build(); + + GenericRecord record2 = new GenericRecordBuilder(avroOutputSchema) + .set("Lastname", "Schuster") + .set("Firstname", "Chris") + .set("profession", "accountant") + .set("age", 23) + .build(); + + GenericRecord record3 = new GenericRecordBuilder(avroOutputSchema) + .set("Lastname", "Gamal") + .set("Firstname", "Ali") + .set("profession", "engineer") + .set("age", 28) + .build(); + + Set expected = ImmutableSet.of(record1, record2, record3); + // verfiy output + Assert.assertEquals(expected, readOutput(serviceManager, USER_SINK, USER_SCHEMA)); + } + + private Set readOutput(ServiceManager serviceManager, String sink, Schema schema) + throws IOException { + URL pfsURL = new URL(serviceManager.getServiceURL(PROGRAM_START_STOP_TIMEOUT_SECONDS, TimeUnit.SECONDS), + String.format("read/%s", sink)); + HttpResponse response = getRestClient().execute(HttpMethod.GET, pfsURL, getClientConfig().getAccessToken()); + + Assert.assertEquals(HttpURLConnection.HTTP_OK, response.getResponseCode()); + + Map map = ObjectResponse.>fromJsonBody( + response, new TypeToken>() { }.getType()).getResponseObject(); + + return parseOutput(map, schema); + } + + private Set parseOutput(Map contents, Schema schema) throws IOException { + org.apache.avro.Schema avroSchema = new org.apache.avro.Schema.Parser().parse(schema.toString()); + Set records = new HashSet<>(); + for (Map.Entry entry : contents.entrySet()) { + DatumReader datumReader = new 
GenericDatumReader<>(avroSchema); + try (DataFileStream fileStream = new DataFileStream<>( + new ByteArrayInputStream(entry.getValue()), datumReader)) { + for (GenericRecord record : fileStream) { + records.add(record); + } + } + } + return records; + } + + private void ingestInputData(String inputDatasetName) throws Exception { + // 1: shelton, alex, professor, 45 + // 2: seitz, bob, professor, 50 + // 3: schuster, chris, accountant, 23 + // 4: bolt , henry , engineer, 30 + // 5: gamal , ali , engineer, 28 + DataSetManager inputManager = getTableDataset(inputDatasetName); + Table inputTable = inputManager.get(); + putValues(inputTable, 1, "Shelton", "Alex", "professor", 45); + putValues(inputTable, 2, "Seitz", "Bob", "professor", 50); + putValues(inputTable, 3, "Schuster", "Chris", "accountant", 23); + putValues(inputTable, 4, "Bolt", "Henry", "engineer", 30); + putValues(inputTable, 5, "Gamal", "Ali", "engineer", 28); + inputManager.flush(); + } + + private void putValues(Table inputTable, int index, String lastname, String firstname, String profession, + int age) { + Put put = new Put(Bytes.toBytes(index)); + put.add("Lastname", lastname); + put.add("Firstname", firstname); + put.add("profession", profession); + put.add("age", age); + inputTable.put(put); + } +} diff --git a/integration-test-remote/src/test/java/io/cdap/cdap/app/etl/gcp/GoogleBigQuerySQLEngineTest.java b/integration-test-remote/src/test/java/io/cdap/cdap/app/etl/gcp/GoogleBigQuerySQLEngineTest.java index c13dba19..43ee8663 100644 --- a/integration-test-remote/src/test/java/io/cdap/cdap/app/etl/gcp/GoogleBigQuerySQLEngineTest.java +++ b/integration-test-remote/src/test/java/io/cdap/cdap/app/etl/gcp/GoogleBigQuerySQLEngineTest.java @@ -16,23 +16,29 @@ package io.cdap.cdap.app.etl.gcp; +import com.google.common.base.Joiner; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; import com.google.common.reflect.TypeToken; import io.cdap.cdap.api.Resources; import io.cdap.cdap.api.common.Bytes; import io.cdap.cdap.api.data.schema.Schema; import io.cdap.cdap.api.dataset.table.Put; import io.cdap.cdap.api.dataset.table.Table; +import io.cdap.cdap.app.etl.batch.DedupAggregatorTest; import io.cdap.cdap.app.etl.dataset.DatasetAccessApp; import io.cdap.cdap.app.etl.dataset.SnapshotFilesetService; import io.cdap.cdap.common.ArtifactNotFoundException; import io.cdap.cdap.datapipeline.SmartWorkflow; import io.cdap.cdap.etl.api.Engine; +import io.cdap.cdap.etl.api.batch.BatchAggregator; import io.cdap.cdap.etl.api.batch.BatchJoiner; import io.cdap.cdap.etl.api.batch.BatchSink; import io.cdap.cdap.etl.api.batch.BatchSource; import io.cdap.cdap.etl.api.engine.sql.BatchSQLEngine; +import io.cdap.cdap.etl.proto.ArtifactSelectorConfig; import io.cdap.cdap.etl.proto.v2.ETLBatchConfig; import io.cdap.cdap.etl.proto.v2.ETLPlugin; import io.cdap.cdap.etl.proto.v2.ETLStage; @@ -67,8 +73,10 @@ import java.io.IOException; import java.net.HttpURLConnection; import java.net.URL; +import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -81,6 +89,67 @@ public class GoogleBigQuerySQLEngineTest extends DataprocETLTestBase { private static final String BQ_SQLENGINE_PLUGIN_NAME = "BigQueryPushdownEngine"; private static final String BIG_QUERY_DATASET = "bq_dataset_joiner_test"; private static 
final String CONNECTION_NAME = String.format("test_bq_%s", GoogleBigQueryUtils.getUUID()); + public static final String SMARTWORKFLOW_NAME = SmartWorkflow.NAME; + public static final String PURCHASE_SOURCE = "purchaseSource"; + public static final String USER_CONDITION_SOURCE = "userConditionSource"; + public static final String USER_CONDITION_SINK = "userConditionSink"; + public static final String ITEM_SINK = "itemSink"; + public static final String USER_SINK = "userSink"; + public static final String DEDUPLICATE_SOURCE = "userSource"; + public static final String DEDUPLICATE_SINK = "userSink"; + + private static final Map CONFIG_MAP = new ImmutableMap.Builder() + .put("uniqueFields", "profession") + .put("filterOperation", "age:Min") + .build(); + + private static final List CONDITIONAL_AGGREGATES = ImmutableList.of( + "highestPrice:maxIf(price):condition(city.equals('LA'))", + "averageDonutPrice:avgIf(price):condition(item.equals('doughnut'))", + "totalPurchasesInTokyo:sumIf(price):condition(city.equals('Tokyo'))", + "anyPurchaseInBerlin:anyIf(item):condition(city.equals('Berlin'))", + "doughnutsSold:countIf(item):condition(item.equals('doughnut'))", + "lowestPrice:minIf(price):condition(!item.equals('bagel'))" + ); + + public static final Schema PURCHASE_SCHEMA = Schema.recordOf( + "purchase", + Schema.Field.of("ts", Schema.of(Schema.Type.LONG)), + Schema.Field.of("user", Schema.of(Schema.Type.STRING)), + Schema.Field.of("item", Schema.of(Schema.Type.STRING)), + Schema.Field.of("price", Schema.of(Schema.Type.LONG))); + + public static final Schema ITEM_SCHEMA = Schema.recordOf( + "item", + Schema.Field.of("item", Schema.of(Schema.Type.STRING)), + Schema.Field.of("totalPurchases", Schema.of(Schema.Type.LONG)), + Schema.Field.of("latestPurchase", Schema.of(Schema.Type.LONG))); + + public static final Schema USER_SCHEMA = Schema.recordOf( + "user", + Schema.Field.of("user", Schema.of(Schema.Type.STRING)), + Schema.Field.of("totalPurchases", Schema.of(Schema.Type.LONG)), + Schema.Field.of("totalSpent", Schema.of(Schema.Type.LONG))); + + private static final Schema USER_CONDITION_SCHEMA = Schema.recordOf( + "user_condition", + Schema.Field.of("name", Schema.of(Schema.Type.STRING)), + Schema.Field.of("age", Schema.of(Schema.Type.DOUBLE)), + Schema.Field.of("isMember", Schema.of(Schema.Type.BOOLEAN)), + Schema.Field.of("city", Schema.of(Schema.Type.STRING)), + Schema.Field.of("item", Schema.of(Schema.Type.STRING)), + Schema.Field.of("price", Schema.of(Schema.Type.DOUBLE))); + + private static final Schema USER_CONDITION_OUTPUT_SCHEMA = Schema.recordOf( + "user_condition", + Schema.Field.of("name", Schema.of(Schema.Type.STRING)), + Schema.Field.of("highestPrice", Schema.of(Schema.Type.DOUBLE)), + Schema.Field.of("averageDonutPrice", Schema.of(Schema.Type.DOUBLE)), + Schema.Field.of("totalPurchasesInTokyo", Schema.of(Schema.Type.DOUBLE)), + Schema.Field.of("anyPurchaseInBerlin", Schema.of(Schema.Type.STRING)), + Schema.Field.of("doughnutsSold", Schema.of(Schema.Type.INT)), + Schema.Field.of("lowestPrice", Schema.of(Schema.Type.DOUBLE)) + ); @Override protected void innerSetup() throws Exception { @@ -110,7 +179,28 @@ public void testSQLEngineJoinSpark() throws Exception { testSQLEngineJoin(Engine.SPARK, true); } - private Map getProps(boolean useConnection) { + @Category({ + RequiresSpark.class + }) + @Test + public void testSQLEngineGroupBySpark() throws Exception { + testSQLEngineGroupBy(Engine.SPARK, false); + testSQLEngineGroupBy(Engine.SPARK, true); + + 
testSQLEngineGroupByCondition(Engine.SPARK, false); + testSQLEngineGroupByCondition(Engine.SPARK, true); + } + + @Category({ + RequiresSpark.class + }) + @Test + public void testSQLEngineDeduplicateSpark() throws Exception { + testSQLEngineDeduplicate(Engine.SPARK, false); + testSQLEngineDeduplicate(Engine.SPARK, true); + } + + private Map getProps(boolean useConnection, String includedStages) { String connectionId = String.format("${conn(%s)}", CONNECTION_NAME); Map props = new HashMap<>(); if (useConnection) { @@ -118,9 +208,105 @@ private Map getProps(boolean useConnection) { props.put(ConfigUtil.NAME_USE_CONNECTION, "true"); } props.put("dataset", BIG_QUERY_DATASET); + if (includedStages != null) { + props.put("includedStages", includedStages); + } return new ImmutableMap.Builder().putAll(props).build(); } + private void testSQLEngineDeduplicate(Engine engine, boolean useConnection) throws Exception { + ETLStage userSourceStage = + new ETLStage("users", new ETLPlugin("Table", + BatchSource.PLUGIN_TYPE, + ImmutableMap.of( + Properties.BatchReadableWritable.NAME, DEDUPLICATE_SOURCE, + Properties.Table.PROPERTY_SCHEMA, DedupAggregatorTest.USER_SCHEMA.toString()), + null)); + + ETLStage userSinkStage = new ETLStage(DEDUPLICATE_SINK, new ETLPlugin("SnapshotAvro", BatchSink.PLUGIN_TYPE, + ImmutableMap.builder() + .put(Properties.BatchReadableWritable.NAME, DEDUPLICATE_SINK) + .put("schema", DedupAggregatorTest.USER_SCHEMA.toString()) + .build(), null)); + + ArtifactSelectorConfig selectorConfig = new ArtifactSelectorConfig(null, + "google-cloud", + "[0.18.0-SNAPSHOT, 1.0.0-SNAPSHOT)"); + + ETLStage userGroupStage = new ETLStage("KeyAggregate", new ETLPlugin("Deduplicate", + BatchAggregator.PLUGIN_TYPE, + CONFIG_MAP, null)); + + ETLTransformationPushdown transformationPushdown = + new ETLTransformationPushdown( + new ETLPlugin(BQ_SQLENGINE_PLUGIN_NAME, + BatchSQLEngine.PLUGIN_TYPE, + getProps(useConnection, "KeyAggregate"), + selectorConfig + ) + ); + + ETLBatchConfig config = ETLBatchConfig.builder("* * * * *") + .addStage(userSourceStage) + .addStage(userSinkStage) + .addStage(userGroupStage) + .addConnection(userSourceStage.getName(), userGroupStage.getName()) + .addConnection(userGroupStage.getName(), userSinkStage.getName()) + .setDriverResources(new Resources(2048)) + .setResources(new Resources(2048)) + .setEngine(engine) + .setPushdownEnabled(true) + .setTransformationPushdown(transformationPushdown) + .build(); + + + ingestInputDataDeduplicate(DEDUPLICATE_SOURCE); + + AppRequest request = getBatchAppRequestV2(config); + ApplicationId appId = TEST_NAMESPACE.app("bq-sqlengine-deduplicate-test"); + ApplicationManager appManager = deployApplication(appId, request); + + WorkflowManager workflowManager = appManager.getWorkflowManager(SmartWorkflow.NAME); + startAndWaitForRun(workflowManager, ProgramRunStatus.COMPLETED, 10, TimeUnit.MINUTES); + + // Deploy an application with a service to get partitionedFileset data for verification + ApplicationManager applicationManager = deployApplication(DatasetAccessApp.class); + ServiceManager serviceManager = applicationManager.getServiceManager + (SnapshotFilesetService.class.getSimpleName()); + startAndWaitForRun(serviceManager, ProgramRunStatus.RUNNING); + + org.apache.avro.Schema avroOutputSchema = new org.apache.avro.Schema.Parser() + .parse(DedupAggregatorTest.USER_SCHEMA.toString()); + // output has these records: + // 1: shelton, alex, professor, 45 + // 3: schuster, chris, accountant, 23 + // 5: gamal , ali , engineer, 28 + GenericRecord 
record1 = new GenericRecordBuilder(avroOutputSchema) + .set("Lastname", "Shelton") + .set("Firstname", "Alex") + .set("profession", "professor") + .set("age", 45) + .build(); + + GenericRecord record2 = new GenericRecordBuilder(avroOutputSchema) + .set("Lastname", "Schuster") + .set("Firstname", "Chris") + .set("profession", "accountant") + .set("age", 23) + .build(); + + GenericRecord record3 = new GenericRecordBuilder(avroOutputSchema) + .set("Lastname", "Gamal") + .set("Firstname", "Ali") + .set("profession", "engineer") + .set("age", 28) + .build(); + + Set expected = ImmutableSet.of(record1, record2, record3); + // verfiy output + Assert.assertEquals(expected, readOutput(serviceManager, DEDUPLICATE_SINK, DedupAggregatorTest.USER_SCHEMA)); + } + private void testSQLEngineJoin(Engine engine, boolean useConnection) throws Exception { String filmDatasetName = "film-sqlenginejoinertest"; String filmCategoryDatasetName = "film-category-sqlenginejoinertest"; @@ -201,7 +387,7 @@ private void testSQLEngineJoin(Engine engine, boolean useConnection) throws Exce new ETLTransformationPushdown( new ETLPlugin(BQ_SQLENGINE_PLUGIN_NAME, BatchSQLEngine.PLUGIN_TYPE, - getProps(useConnection) + getProps(useConnection, null) ) ); @@ -305,6 +491,364 @@ private void testSQLEngineJoin(Engine engine, boolean useConnection) throws Exce Assert.assertEquals(expected, readOutput(serviceManager, joinedDatasetName, outputSchema)); } + private void testSQLEngineGroupBy(Engine engine, boolean useConnection) throws Exception { + ETLStage purchaseStage = + new ETLStage("purchases", new ETLPlugin("Table", + BatchSource.PLUGIN_TYPE, + ImmutableMap.of( + Properties.BatchReadableWritable.NAME, PURCHASE_SOURCE, + Properties.Table.PROPERTY_SCHEMA, PURCHASE_SCHEMA.toString()), null)); + + ETLStage userSinkStage = getSink(USER_SINK, USER_SCHEMA); + + ETLStage itemSinkStage = getSink(ITEM_SINK, ITEM_SCHEMA); + + + ETLStage userGroupStage = getGroupStage("userGroup", "user", + "totalPurchases:count(*), totalSpent:sum(price)"); + + + ETLStage itemGroupStage = getGroupStage("itemGroup", "item", + "totalPurchases:count(user), latestPurchase:max(ts)"); + + List includedStagesList = + Lists.newArrayList("userGroup", "itemGroup"); + String includedStages = Joiner.on("\u0001").join(includedStagesList); + + ArtifactSelectorConfig selectorConfig = new ArtifactSelectorConfig(null, + "google-cloud", + "[0.18.0-SNAPSHOT, 1.0.0-SNAPSHOT)"); + + ETLTransformationPushdown transformationPushdown = + new ETLTransformationPushdown( + new ETLPlugin(BQ_SQLENGINE_PLUGIN_NAME, + BatchSQLEngine.PLUGIN_TYPE, + getProps(useConnection, includedStages), + selectorConfig + ) + ); + + ETLBatchConfig config = ETLBatchConfig.builder("* * * * *") + .addStage(purchaseStage) + .addStage(userSinkStage) + .addStage(itemSinkStage) + .addStage(userGroupStage) + .addStage(itemGroupStage) + .addConnection(purchaseStage.getName(), userGroupStage.getName()) + .addConnection(purchaseStage.getName(), itemGroupStage.getName()) + .addConnection(userGroupStage.getName(), userSinkStage.getName()) + .addConnection(itemGroupStage.getName(), itemSinkStage.getName()) + .setEngine(engine) + .setPushdownEnabled(true) + .setTransformationPushdown(transformationPushdown) + .setDriverResources(new Resources(2048)) + .setResources(new Resources(2048)) + .build(); + + AppRequest request = getBatchAppRequestV2(config); + ApplicationId appId = TEST_NAMESPACE.app("bq-sqlengine-groupby-test"); + ApplicationManager appManager = deployApplication(appId, request); + + // ingest data + 
ingestData(PURCHASE_SOURCE); + + // run the pipeline + WorkflowManager workflowManager = appManager.getWorkflowManager(SmartWorkflow.NAME); + startAndWaitForRun(workflowManager, ProgramRunStatus.COMPLETED, 15, TimeUnit.MINUTES); + + // Deploy an application with a service to get partitionedFileset data for verification + ApplicationManager applicationManager = deployApplication(DatasetAccessApp.class); + ServiceManager serviceManager = applicationManager.getServiceManager(SnapshotFilesetService.class.getSimpleName()); + startAndWaitForRun(serviceManager, ProgramRunStatus.RUNNING); + + Map> groupedUsers = readOutputGroupBy(serviceManager, USER_SINK, USER_SCHEMA); + Map> groupedItems = readOutputGroupBy(serviceManager, ITEM_SINK, ITEM_SCHEMA); + + verifyOutput(groupedUsers, groupedItems); + } + + private void testSQLEngineGroupByCondition(Engine engine, boolean useConnection) throws Exception { + ETLStage sourceStage = + new ETLStage("source", new ETLPlugin("Table", + BatchSource.PLUGIN_TYPE, + ImmutableMap.of( + Properties.BatchReadableWritable.NAME, USER_CONDITION_SOURCE, + Properties.Table.PROPERTY_SCHEMA, USER_CONDITION_SCHEMA.toString()), + null)); + + ETLStage groupStage = getGroupStage("group", "name", String.join(",", CONDITIONAL_AGGREGATES)); + + ETLStage sinkStage = getSink(USER_CONDITION_SINK, USER_CONDITION_OUTPUT_SCHEMA); + + ETLTransformationPushdown transformationPushdown = + new ETLTransformationPushdown( + new ETLPlugin(BQ_SQLENGINE_PLUGIN_NAME, + BatchSQLEngine.PLUGIN_TYPE, + getProps(useConnection, "group") + ) + ); + + ETLBatchConfig config = ETLBatchConfig.builder() + .addStage(sourceStage) + .addStage(groupStage) + .addStage(sinkStage) + .addConnection(sourceStage.getName(), groupStage.getName()) + .addConnection(groupStage.getName(), sinkStage.getName()) + .setDriverResources(new Resources(2048)) + .setResources(new Resources(2048)) + .setEngine(engine) + .setPushdownEnabled(true) + .setTransformationPushdown(transformationPushdown) + .build(); + + AppRequest request = getBatchAppRequestV2(config); + ApplicationId appId = TEST_NAMESPACE.app("bq-sqlengine-groupby-condition-test"); + ApplicationManager appManager = deployApplication(appId, request); + + // Deploy an application with a service to get partitionedFileset data for verification + ApplicationManager applicationManager = deployApplication(DatasetAccessApp.class); + ServiceManager serviceManager = applicationManager.getServiceManager( + SnapshotFilesetService.class.getSimpleName()); + startAndWaitForRun(serviceManager, ProgramRunStatus.RUNNING); + ingestConditionData(USER_CONDITION_SOURCE); + + // run the pipeline + WorkflowManager workflowManager = appManager.getWorkflowManager(SMARTWORKFLOW_NAME); + startAndWaitForRun(workflowManager, ProgramRunStatus.COMPLETED, 15, TimeUnit.MINUTES); + + + Map> groups = parseConditionOutput(serviceManager); + verifyConditionOutput(groups); + } + + private void ingestConditionData(String conditionDatasetName) throws Exception { + DataSetManager
manager = getTableDataset(conditionDatasetName); + Table table = manager.get(); + putConditionValues(table, 1, "Ben", 23, true, "Berlin", "doughnut", 1.5); + putConditionValues(table, 2, "Ben", 23, true, "LA", "pretzel", 2.05); + putConditionValues(table, 3, "Ben", 23, true, "Berlin", "doughnut", 0.75); + putConditionValues(table, 4, "Ben", 23, true, "Tokyo", "pastry", 3.25); + putConditionValues(table, 5, "Emma", 18, false, "Tokyo", "doughnut", 1.75); + putConditionValues(table, 6, "Emma", 18, false, "LA", "bagel", 2.95); + putConditionValues(table, 7, "Emma", 18, false, "Berlin", "pretzel", 2.05); + putConditionValues(table, 8, "Ron", 22, true, "LA", "bagel", 2.95); + putConditionValues(table, 9, "Ron", 22, true, "Tokyo", "pretzel", 0.5); + putConditionValues(table, 10, "Ron", 22, true, "Berlin", "doughnut", 1.75); + + manager.flush(); + } + + private void putConditionValues(Table table, int id, String name, double age, boolean isMember, String city, + String item, double price) { + Put put = new Put(Bytes.toBytes(id)); + put.add("name", name); + put.add("age", age); + put.add("isMember", isMember); + put.add("city", city); + put.add("item", item); + put.add("price", price); + table.put(put); + } + + private void verifyConditionOutput(Map> groups) { + Assert.assertEquals(3, groups.size()); + + List groupedValues = groups.get("Ben"); + Assert.assertEquals(2.05, groupedValues.get(0)); + Assert.assertEquals(1.125, groupedValues.get(1)); + Assert.assertEquals(3.25, groupedValues.get(2)); + Assert.assertEquals("doughnut", groupedValues.get(3).toString()); + Assert.assertEquals(2, groupedValues.get(4)); + Assert.assertEquals(0.75, groupedValues.get(5)); + + groupedValues = groups.get("Ron"); + Assert.assertEquals(2.95, groupedValues.get(0)); + Assert.assertEquals(1.75, groupedValues.get(1)); + Assert.assertEquals(0.5, groupedValues.get(2)); + Assert.assertEquals("doughnut", groupedValues.get(3).toString()); + Assert.assertEquals(1, groupedValues.get(4)); + Assert.assertEquals(0.5, groupedValues.get(5)); + + groupedValues = groups.get("Emma"); + Assert.assertEquals(2.95, groupedValues.get(0)); + Assert.assertEquals(1.75, groupedValues.get(1)); + Assert.assertEquals(1.75, groupedValues.get(2)); + Assert.assertEquals("pretzel", groupedValues.get(3).toString()); + Assert.assertEquals(1, groupedValues.get(4)); + Assert.assertEquals(1.75, groupedValues.get(5)); + } + + private Map> parseConditionOutput(ServiceManager serviceManager) throws Exception { + URL pfsURL = new URL(serviceManager.getServiceURL(PROGRAM_START_STOP_TIMEOUT_SECONDS, TimeUnit.SECONDS), + String.format("read/%s", USER_CONDITION_SINK)); + HttpResponse response = getRestClient().execute(HttpMethod.GET, pfsURL, getClientConfig().getAccessToken()); + + Assert.assertEquals(HttpURLConnection.HTTP_OK, response.getResponseCode()); + + Map map = ObjectResponse.>fromJsonBody( + response, new TypeToken>() { + }.getType()).getResponseObject(); + + org.apache.avro.Schema avroSchema = new org.apache.avro.Schema.Parser() + .parse(USER_CONDITION_OUTPUT_SCHEMA.toString()); + + Map> group = new HashMap<>(); + + for (Map.Entry entry : map.entrySet()) { + DatumReader datumReader = new GenericDatumReader<>(avroSchema); + DataFileStream fileStream = new DataFileStream<>( + new ByteArrayInputStream(entry.getValue()), datumReader); + for (GenericRecord record : fileStream) { + List fields = USER_CONDITION_OUTPUT_SCHEMA.getFields(); + List values = new ArrayList<>(); + values.add(record.get(fields.get(1).getName())); + 
values.add(record.get(fields.get(2).getName())); + values.add(record.get(fields.get(3).getName())); + values.add(record.get(fields.get(4).getName())); + values.add(record.get(fields.get(5).getName())); + values.add(record.get(fields.get(6).getName())); + + group.put(record.get(fields.get(0).getName()).toString(), values); + } + fileStream.close(); + } + return group; + } + + private Map> readOutputGroupBy(ServiceManager serviceManager, String sink, Schema schema) + throws IOException { + URL pfsURL = new URL(serviceManager.getServiceURL(PROGRAM_START_STOP_TIMEOUT_SECONDS, TimeUnit.SECONDS), + String.format("read/%s", sink)); + HttpResponse response = getRestClient().execute(HttpMethod.GET, pfsURL, getClientConfig().getAccessToken()); + + Assert.assertEquals(HttpURLConnection.HTTP_OK, response.getResponseCode()); + + Map map = ObjectResponse.>fromJsonBody( + response, new TypeToken>() { + }.getType()).getResponseObject(); + + return parseOutputGroupBy(map, schema); + } + + private Map> parseOutputGroupBy(Map contents, Schema schema) throws IOException { + org.apache.avro.Schema avroSchema = new org.apache.avro.Schema.Parser().parse(schema.toString()); + Map> group = new HashMap<>(); + + for (Map.Entry entry : contents.entrySet()) { + DatumReader datumReader = new GenericDatumReader<>(avroSchema); + DataFileStream fileStream = new DataFileStream<>( + new ByteArrayInputStream(entry.getValue()), datumReader); + for (GenericRecord record : fileStream) { + List fields = schema.getFields(); + List values = new ArrayList<>(); + values.add((Long) record.get(fields.get(1).getName())); + values.add((Long) record.get(fields.get(2).getName())); + group.put(record.get(fields.get(0).getName()).toString(), values); + } + fileStream.close(); + } + return group; + } + + private void ingestData(String purchasesDatasetName) throws Exception { + // write input data + // 1: 1234567890000, samuel, island, 1000 + // 2: 1234567890001, samuel, shirt, 15 + // 3. 1234567890001, samuel, pie, 20 + // 4. 1234567890002, john, pie, 25 + // 5. 1234567890003, john, shirt, 30 + DataSetManager
purchaseManager = getTableDataset(purchasesDatasetName); + Table purchaseTable = purchaseManager.get(); + // 1: 1234567890000, samuel, island, 1000 + putValues(purchaseTable, 1, 1234567890000L, "samuel", "island", 1000L); + putValues(purchaseTable, 2, 1234567890001L, "samuel", "shirt", 15L); + putValues(purchaseTable, 3, 1234567890001L, "samuel", "pie", 20L); + putValues(purchaseTable, 4, 1234567890002L, "john", "pie", 25L); + putValues(purchaseTable, 5, 1234567890003L, "john", "shirt", 30L); + purchaseManager.flush(); + } + + private void ingestInputDataDeduplicate(String inputDatasetName) throws Exception { + // 1: shelton, alex, professor, 45 + // 2: seitz, bob, professor, 50 + // 3: schuster, chris, accountant, 23 + // 4: bolt , henry , engineer, 30 + // 5: gamal , ali , engineer, 28 + DataSetManager
inputManager = getTableDataset(inputDatasetName); + Table inputTable = inputManager.get(); + putValuesDeduplicate(inputTable, 1, "Shelton", "Alex", "professor", 45); + putValuesDeduplicate(inputTable, 2, "Seitz", "Bob", "professor", 50); + putValuesDeduplicate(inputTable, 3, "Schuster", "Chris", "accountant", 23); + putValuesDeduplicate(inputTable, 4, "Bolt", "Henry", "engineer", 30); + putValuesDeduplicate(inputTable, 5, "Gamal", "Ali", "engineer", 28); + inputManager.flush(); + } + + private void putValuesDeduplicate(Table inputTable, int index, String lastname, String firstname, String profession, + int age) { + Put put = new Put(Bytes.toBytes(index)); + put.add("Lastname", lastname); + put.add("Firstname", firstname); + put.add("profession", profession); + put.add("age", age); + inputTable.put(put); + } + + private void putValues(Table purchaseTable, int index, long timestamp, + String user, String item, long price) { + Put put = new Put(Bytes.toBytes(index)); + put.add("ts", timestamp); + put.add("user", user); + put.add("item", item); + put.add("price", price); + purchaseTable.put(put); + } + + private void verifyOutput(Map> groupedUsers, Map> groupedItems) { + // users table should have: + // samuel: 3, 1000 + 15 + 20 + List groupedValues = groupedUsers.get("samuel"); + Assert.assertEquals(groupedValues.get(0).longValue(), 3L); + Assert.assertEquals(Math.abs(groupedValues.get(1).longValue() - 1000L - 15L - 20L), 0L); + // john: 2, 25 + 30 + groupedValues = groupedUsers.get("john"); + Assert.assertEquals(groupedValues.get(0).longValue(), 2L); + Assert.assertEquals(Math.abs(groupedValues.get(1).longValue() - 25L - 30L), 0L); + + // items table should have: + // island: 1, 1234567890000 + groupedValues = groupedItems.get("island"); + Assert.assertEquals(groupedValues.get(0).longValue(), 1L); + Assert.assertEquals(groupedValues.get(1).longValue(), 1234567890000L); + + // pie: 2, 1234567890002 + groupedValues = groupedItems.get("pie"); + Assert.assertEquals(groupedValues.get(0).longValue(), 2L); + Assert.assertEquals(groupedValues.get(1).longValue(), 1234567890002L); + + // shirt: 2, 1234567890003 + groupedValues = groupedItems.get("shirt"); + Assert.assertEquals(groupedValues.get(0).longValue(), 2L); + Assert.assertEquals(groupedValues.get(1).longValue(), 1234567890003L); + } + + private ETLStage getSink(String name, Schema schema) { + return new ETLStage(name, new ETLPlugin("SnapshotAvro", BatchSink.PLUGIN_TYPE, + ImmutableMap.builder() + .put(Properties.BatchReadableWritable.NAME, name) + .put("schema", schema.toString()) + .build(), null)); + } + + private ETLStage getGroupStage(String name, String field, String condition) { + return new ETLStage(name, + new ETLPlugin("GroupByAggregate", + BatchAggregator.PLUGIN_TYPE, + ImmutableMap.of( + "groupByFields", field, + "aggregates", condition), null)); + } + private Set readOutput(ServiceManager serviceManager, String sink, Schema schema) throws IOException { URL pfsURL = new URL(serviceManager.getServiceURL(PROGRAM_START_STOP_TIMEOUT_SECONDS, TimeUnit.SECONDS), diff --git a/integration-test-remote/src/test/java/io/cdap/cdap/test/suite/AllTests.java b/integration-test-remote/src/test/java/io/cdap/cdap/test/suite/AllTests.java index 3e53638f..ffc9671b 100644 --- a/integration-test-remote/src/test/java/io/cdap/cdap/test/suite/AllTests.java +++ b/integration-test-remote/src/test/java/io/cdap/cdap/test/suite/AllTests.java @@ -21,6 +21,7 @@ import io.cdap.cdap.app.etl.batch.BatchAggregatorTest; import 
io.cdap.cdap.app.etl.batch.BatchCubeSinkTest; import io.cdap.cdap.app.etl.batch.BatchJoinerTest; +import io.cdap.cdap.app.etl.batch.DedupAggregatorTest; import io.cdap.cdap.app.etl.batch.ETLMapReduceTest; import io.cdap.cdap.app.etl.batch.ExcelInputReaderTest; import io.cdap.cdap.app.etl.batch.HivePluginTest; @@ -65,6 +66,7 @@ BatchJoinerTest.class, DatasetTest.class, DataStreamsTest.class, + DedupAggregatorTest.class, ETLMapReduceTest.class, ExcelInputReaderTest.class, FileSetTest.class, From 44d05b245ce519c7a908556ae91e5df16523afdc Mon Sep 17 00:00:00 2001 From: Sanjana Sandeep Date: Wed, 16 Mar 2022 12:21:39 -0700 Subject: [PATCH 6/6] Integration tests for deduplicate and groupby in bq pushdown --- .../app/etl/batch/DedupAggregatorTest.java | 226 ++++++++++ .../etl/gcp/GoogleBigQuerySQLEngineTest.java | 405 +++++++++++++++++- .../io/cdap/cdap/test/suite/AllTests.java | 2 + 3 files changed, 630 insertions(+), 3 deletions(-) create mode 100644 integration-test-remote/src/test/java/io/cdap/cdap/app/etl/batch/DedupAggregatorTest.java diff --git a/integration-test-remote/src/test/java/io/cdap/cdap/app/etl/batch/DedupAggregatorTest.java b/integration-test-remote/src/test/java/io/cdap/cdap/app/etl/batch/DedupAggregatorTest.java new file mode 100644 index 00000000..bd66cf5f --- /dev/null +++ b/integration-test-remote/src/test/java/io/cdap/cdap/app/etl/batch/DedupAggregatorTest.java @@ -0,0 +1,226 @@ +/* + * Copyright © 2016 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package io.cdap.cdap.app.etl.batch; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.reflect.TypeToken; +import io.cdap.cdap.api.Resources; +import io.cdap.cdap.api.common.Bytes; +import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.cdap.api.dataset.table.Put; +import io.cdap.cdap.api.dataset.table.Table; +import io.cdap.cdap.app.etl.ETLTestBase; +import io.cdap.cdap.app.etl.dataset.DatasetAccessApp; +import io.cdap.cdap.app.etl.dataset.SnapshotFilesetService; +import io.cdap.cdap.datapipeline.SmartWorkflow; +import io.cdap.cdap.etl.api.Engine; +import io.cdap.cdap.etl.api.batch.BatchAggregator; +import io.cdap.cdap.etl.api.batch.BatchSink; +import io.cdap.cdap.etl.api.batch.BatchSource; +import io.cdap.cdap.etl.proto.v2.ETLBatchConfig; +import io.cdap.cdap.etl.proto.v2.ETLPlugin; +import io.cdap.cdap.etl.proto.v2.ETLStage; +import io.cdap.cdap.proto.ProgramRunStatus; +import io.cdap.cdap.proto.artifact.AppRequest; +import io.cdap.cdap.proto.id.ApplicationId; +import io.cdap.cdap.test.ApplicationManager; +import io.cdap.cdap.test.DataSetManager; +import io.cdap.cdap.test.ServiceManager; +import io.cdap.cdap.test.WorkflowManager; +import io.cdap.cdap.test.suite.category.RequiresSpark; +import io.cdap.common.http.HttpMethod; +import io.cdap.common.http.HttpResponse; +import io.cdap.common.http.ObjectResponse; +import io.cdap.plugin.common.Properties; +import org.apache.avro.file.DataFileStream; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.GenericRecordBuilder; +import org.apache.avro.io.DatumReader; +import org.junit.Assert; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.net.HttpURLConnection; +import java.net.URL; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +/** + * Tests DedupAggregator + */ +public class DedupAggregatorTest extends ETLTestBase { + public static final String SMARTWORKFLOW_NAME = SmartWorkflow.NAME; + public static final String USER_SOURCE = "userSource"; + public static final String USER_SINK = "userSink"; + + public static final Schema USER_SCHEMA = Schema.recordOf( + "user", + Schema.Field.of("Lastname", Schema.of(Schema.Type.STRING)), + Schema.Field.of("Firstname", Schema.of(Schema.Type.STRING)), + Schema.Field.of("profession", Schema.of(Schema.Type.STRING)), + Schema.Field.of("age", Schema.of(Schema.Type.INT))); + + private static final Map CONFIG_MAP = new ImmutableMap.Builder() + .put("uniqueFields", "profession") + .put("filterOperation", "age:Min") + .build(); + + @Category({ + RequiresSpark.class + }) + @Test + public void testDeduplicateSpark() throws Exception { + testDeduplicate(Engine.SPARK); + } + + private void testDeduplicate(Engine spark) throws Exception { + ETLStage userSourceStage = + new ETLStage("users", new ETLPlugin("Table", + BatchSource.PLUGIN_TYPE, + ImmutableMap.of( + Properties.BatchReadableWritable.NAME, USER_SOURCE, + Properties.Table.PROPERTY_SCHEMA, USER_SCHEMA.toString()), null)); + + ETLStage userSinkStage = new ETLStage(USER_SINK, new ETLPlugin("SnapshotAvro", BatchSink.PLUGIN_TYPE, + ImmutableMap.builder() + .put(Properties.BatchReadableWritable.NAME, USER_SINK) + .put("schema", USER_SCHEMA.toString()) + .build(), null)); + + ETLStage userGroupStage = new ETLStage("KeyAggregate", new 
ETLPlugin("Deduplicate", + BatchAggregator.PLUGIN_TYPE, + CONFIG_MAP, null)); + + + ETLBatchConfig config = ETLBatchConfig.builder("* * * * *") + .addStage(userSourceStage) + .addStage(userSinkStage) + .addStage(userGroupStage) + .addConnection(userSourceStage.getName(), userGroupStage.getName()) + .addConnection(userGroupStage.getName(), userSinkStage.getName()) + .setDriverResources(new Resources(2048)) + .setResources(new Resources(2048)) + .build(); + + + ingestInputData(USER_SOURCE); + + AppRequest request = getBatchAppRequestV2(config); + ApplicationId appId = TEST_NAMESPACE.app("deduplicate-test"); + ApplicationManager appManager = deployApplication(appId, request); + + WorkflowManager workflowManager = appManager.getWorkflowManager(SmartWorkflow.NAME); + startAndWaitForRun(workflowManager, ProgramRunStatus.COMPLETED, 10, TimeUnit.MINUTES); + + // Deploy an application with a service to get partitionedFileset data for verification + ApplicationManager applicationManager = deployApplication(DatasetAccessApp.class); + ServiceManager serviceManager = applicationManager.getServiceManager + (SnapshotFilesetService.class.getSimpleName()); + startAndWaitForRun(serviceManager, ProgramRunStatus.RUNNING); + + org.apache.avro.Schema avroOutputSchema = new org.apache.avro.Schema.Parser().parse(USER_SCHEMA.toString()); + // output has these records: + // 1: shelton, alex, professor, 45 + // 3: schuster, chris, accountant, 23 + // 5: gamal , ali , engineer, 28 + GenericRecord record1 = new GenericRecordBuilder(avroOutputSchema) + .set("Lastname", "Shelton") + .set("Firstname", "Alex") + .set("profession", "professor") + .set("age", 45) + .build(); + + GenericRecord record2 = new GenericRecordBuilder(avroOutputSchema) + .set("Lastname", "Schuster") + .set("Firstname", "Chris") + .set("profession", "accountant") + .set("age", 23) + .build(); + + GenericRecord record3 = new GenericRecordBuilder(avroOutputSchema) + .set("Lastname", "Gamal") + .set("Firstname", "Ali") + .set("profession", "engineer") + .set("age", 28) + .build(); + + Set expected = ImmutableSet.of(record1, record2, record3); + // verfiy output + Assert.assertEquals(expected, readOutput(serviceManager, USER_SINK, USER_SCHEMA)); + } + + private Set readOutput(ServiceManager serviceManager, String sink, Schema schema) + throws IOException { + URL pfsURL = new URL(serviceManager.getServiceURL(PROGRAM_START_STOP_TIMEOUT_SECONDS, TimeUnit.SECONDS), + String.format("read/%s", sink)); + HttpResponse response = getRestClient().execute(HttpMethod.GET, pfsURL, getClientConfig().getAccessToken()); + + Assert.assertEquals(HttpURLConnection.HTTP_OK, response.getResponseCode()); + + Map map = ObjectResponse.>fromJsonBody( + response, new TypeToken>() { }.getType()).getResponseObject(); + + return parseOutput(map, schema); + } + + private Set parseOutput(Map contents, Schema schema) throws IOException { + org.apache.avro.Schema avroSchema = new org.apache.avro.Schema.Parser().parse(schema.toString()); + Set records = new HashSet<>(); + for (Map.Entry entry : contents.entrySet()) { + DatumReader datumReader = new GenericDatumReader<>(avroSchema); + try (DataFileStream fileStream = new DataFileStream<>( + new ByteArrayInputStream(entry.getValue()), datumReader)) { + for (GenericRecord record : fileStream) { + records.add(record); + } + } + } + return records; + } + + private void ingestInputData(String inputDatasetName) throws Exception { + // 1: shelton, alex, professor, 45 + // 2: seitz, bob, professor, 50 + // 3: schuster, chris, accountant, 23 + // 
4: bolt , henry , engineer, 30 + // 5: gamal , ali , engineer, 28 + DataSetManager
inputManager = getTableDataset(inputDatasetName); + Table inputTable = inputManager.get(); + putValues(inputTable, 1, "Shelton", "Alex", "professor", 45); + putValues(inputTable, 2, "Seitz", "Bob", "professor", 50); + putValues(inputTable, 3, "Schuster", "Chris", "accountant", 23); + putValues(inputTable, 4, "Bolt", "Henry", "engineer", 30); + putValues(inputTable, 5, "Gamal", "Ali", "engineer", 28); + inputManager.flush(); + } + + private void putValues(Table inputTable, int index, String lastname, String firstname, String profession, + int age) { + Put put = new Put(Bytes.toBytes(index)); + put.add("Lastname", lastname); + put.add("Firstname", firstname); + put.add("profession", profession); + put.add("age", age); + inputTable.put(put); + } +} diff --git a/integration-test-remote/src/test/java/io/cdap/cdap/app/etl/gcp/GoogleBigQuerySQLEngineTest.java b/integration-test-remote/src/test/java/io/cdap/cdap/app/etl/gcp/GoogleBigQuerySQLEngineTest.java index c13dba19..2ef849d0 100644 --- a/integration-test-remote/src/test/java/io/cdap/cdap/app/etl/gcp/GoogleBigQuerySQLEngineTest.java +++ b/integration-test-remote/src/test/java/io/cdap/cdap/app/etl/gcp/GoogleBigQuerySQLEngineTest.java @@ -16,23 +16,29 @@ package io.cdap.cdap.app.etl.gcp; +import com.google.common.base.Joiner; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; import com.google.common.reflect.TypeToken; import io.cdap.cdap.api.Resources; import io.cdap.cdap.api.common.Bytes; import io.cdap.cdap.api.data.schema.Schema; import io.cdap.cdap.api.dataset.table.Put; import io.cdap.cdap.api.dataset.table.Table; +import io.cdap.cdap.app.etl.batch.DedupAggregatorTest; import io.cdap.cdap.app.etl.dataset.DatasetAccessApp; import io.cdap.cdap.app.etl.dataset.SnapshotFilesetService; import io.cdap.cdap.common.ArtifactNotFoundException; import io.cdap.cdap.datapipeline.SmartWorkflow; import io.cdap.cdap.etl.api.Engine; +import io.cdap.cdap.etl.api.batch.BatchAggregator; import io.cdap.cdap.etl.api.batch.BatchJoiner; import io.cdap.cdap.etl.api.batch.BatchSink; import io.cdap.cdap.etl.api.batch.BatchSource; import io.cdap.cdap.etl.api.engine.sql.BatchSQLEngine; +import io.cdap.cdap.etl.proto.ArtifactSelectorConfig; import io.cdap.cdap.etl.proto.v2.ETLBatchConfig; import io.cdap.cdap.etl.proto.v2.ETLPlugin; import io.cdap.cdap.etl.proto.v2.ETLStage; @@ -67,8 +73,10 @@ import java.io.IOException; import java.net.HttpURLConnection; import java.net.URL; +import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -81,6 +89,44 @@ public class GoogleBigQuerySQLEngineTest extends DataprocETLTestBase { private static final String BQ_SQLENGINE_PLUGIN_NAME = "BigQueryPushdownEngine"; private static final String BIG_QUERY_DATASET = "bq_dataset_joiner_test"; private static final String CONNECTION_NAME = String.format("test_bq_%s", GoogleBigQueryUtils.getUUID()); + public static final String PURCHASE_SOURCE = "purchaseSource"; + public static final String ITEM_SINK = "itemSink"; + public static final String USER_SINK = "userSink"; + public static final String DEDUPLICATE_SOURCE = "userSource"; + public static final String DEDUPLICATE_SINK = "userSink"; + + private static final Map CONFIG_MAP = new ImmutableMap.Builder() + .put("uniqueFields", "profession") + .put("filterOperation", "age:Min") + 
+    .build();
+
+  private static final List<String> CONDITIONAL_AGGREGATES = ImmutableList.of(
+    "highestPrice:maxIf(price):condition(city.equals('LA'))",
+    "averageDonutPrice:avgIf(price):condition(item.equals('doughnut'))",
+    "totalPurchasesInTokyo:sumIf(price):condition(city.equals('Tokyo'))",
+    "anyPurchaseInBerlin:anyIf(item):condition(city.equals('Berlin'))",
+    "doughnutsSold:countIf(item):condition(item.equals('doughnut'))",
+    "lowestPrice:minIf(price):condition(!item.equals('bagel'))"
+  );
+
+  public static final Schema PURCHASE_SCHEMA = Schema.recordOf(
+    "purchase",
+    Schema.Field.of("ts", Schema.of(Schema.Type.LONG)),
+    Schema.Field.of("user", Schema.of(Schema.Type.STRING)),
+    Schema.Field.of("item", Schema.of(Schema.Type.STRING)),
+    Schema.Field.of("price", Schema.of(Schema.Type.LONG)));
+
+  public static final Schema ITEM_SCHEMA = Schema.recordOf(
+    "item",
+    Schema.Field.of("item", Schema.of(Schema.Type.STRING)),
+    Schema.Field.of("totalPurchases", Schema.of(Schema.Type.LONG)),
+    Schema.Field.of("latestPurchase", Schema.of(Schema.Type.LONG)));
+
+  public static final Schema USER_SCHEMA = Schema.recordOf(
+    "user",
+    Schema.Field.of("user", Schema.of(Schema.Type.STRING)),
+    Schema.Field.of("totalPurchases", Schema.of(Schema.Type.LONG)),
+    Schema.Field.of("totalSpent", Schema.of(Schema.Type.LONG)));
 
   @Override
   protected void innerSetup() throws Exception {
@@ -110,17 +156,131 @@ public void testSQLEngineJoinSpark() throws Exception {
     testSQLEngineJoin(Engine.SPARK, true);
   }
-  private Map<String, String> getProps(boolean useConnection) {
+  @Category({
+    RequiresSpark.class
+  })
+  @Test
+  public void testSQLEngineGroupBySpark() throws Exception {
+    testSQLEngineGroupBy(Engine.SPARK, false);
+    testSQLEngineGroupBy(Engine.SPARK, true);
+  }
+
+  @Category({
+    RequiresSpark.class
+  })
+  @Test
+  public void testSQLEngineDeduplicateSpark() throws Exception {
+    testSQLEngineDeduplicate(Engine.SPARK, false);
+    testSQLEngineDeduplicate(Engine.SPARK, true);
+  }
+
+  private Map<String, String> getProps(boolean useConnection, String includedStages) {
     String connectionId = String.format("${conn(%s)}", CONNECTION_NAME);
     Map<String, String> props = new HashMap<>();
     if (useConnection) {
       props.put(ConfigUtil.NAME_CONNECTION, connectionId);
       props.put(ConfigUtil.NAME_USE_CONNECTION, "true");
-    }
+    }
     props.put("dataset", BIG_QUERY_DATASET);
+    if (includedStages != null) {
+      props.put("includedStages", includedStages);
+    }
     return new ImmutableMap.Builder<String, String>().putAll(props).build();
   }
 
+  private void testSQLEngineDeduplicate(Engine engine, boolean useConnection) throws Exception {
+    ETLStage userSourceStage =
+      new ETLStage("users", new ETLPlugin("Table",
+                                          BatchSource.PLUGIN_TYPE,
+                                          ImmutableMap.of(
+                                            Properties.BatchReadableWritable.NAME, DEDUPLICATE_SOURCE,
+                                            Properties.Table.PROPERTY_SCHEMA, DedupAggregatorTest.USER_SCHEMA.toString()),
+                                          null));
+
+    ETLStage userSinkStage = new ETLStage(DEDUPLICATE_SINK, new ETLPlugin("SnapshotAvro", BatchSink.PLUGIN_TYPE,
+      ImmutableMap.<String, String>builder()
+        .put(Properties.BatchReadableWritable.NAME, DEDUPLICATE_SINK)
+        .put("schema", DedupAggregatorTest.USER_SCHEMA.toString())
+        .build(), null));
+
+    ArtifactSelectorConfig selectorConfig = new ArtifactSelectorConfig(null,
+                                                                       "google-cloud",
+                                                                       "[0.18.0-SNAPSHOT, 1.0.0-SNAPSHOT)");
+
+    ETLStage userGroupStage = new ETLStage("KeyAggregate", new ETLPlugin("Deduplicate",
+                                                                         BatchAggregator.PLUGIN_TYPE,
+                                                                         CONFIG_MAP, null));
+
+    ETLTransformationPushdown transformationPushdown =
+      new ETLTransformationPushdown(
+        new ETLPlugin(BQ_SQLENGINE_PLUGIN_NAME,
+                      BatchSQLEngine.PLUGIN_TYPE,
+                      getProps(useConnection, "KeyAggregate"),
+                      selectorConfig
+        )
+      );
+
+    ETLBatchConfig config = ETLBatchConfig.builder("* * * * *")
+      .addStage(userSourceStage)
+      .addStage(userSinkStage)
+      .addStage(userGroupStage)
+      .addConnection(userSourceStage.getName(), userGroupStage.getName())
+      .addConnection(userGroupStage.getName(), userSinkStage.getName())
+      .setDriverResources(new Resources(2048))
+      .setResources(new Resources(2048))
+      .setEngine(engine)
+      .setPushdownEnabled(true)
+      .setTransformationPushdown(transformationPushdown)
+      .build();
+
+
+    ingestInputDataDeduplicate(DEDUPLICATE_SOURCE);
+
+    AppRequest<ETLBatchConfig> request = getBatchAppRequestV2(config);
+    ApplicationId appId = TEST_NAMESPACE.app("bq-sqlengine-deduplicate-test");
+    ApplicationManager appManager = deployApplication(appId, request);
+
+    WorkflowManager workflowManager = appManager.getWorkflowManager(SmartWorkflow.NAME);
+    startAndWaitForRun(workflowManager, ProgramRunStatus.COMPLETED, 10, TimeUnit.MINUTES);
+
+    // Deploy an application with a service to get partitionedFileset data for verification
+    ApplicationManager applicationManager = deployApplication(DatasetAccessApp.class);
+    ServiceManager serviceManager = applicationManager.getServiceManager
+      (SnapshotFilesetService.class.getSimpleName());
+    startAndWaitForRun(serviceManager, ProgramRunStatus.RUNNING);
+
+    org.apache.avro.Schema avroOutputSchema = new org.apache.avro.Schema.Parser()
+      .parse(DedupAggregatorTest.USER_SCHEMA.toString());
+    // output has these records:
+    // 1: shelton, alex, professor, 45
+    // 3: schuster, chris, accountant, 23
+    // 5: gamal , ali , engineer, 28
+    GenericRecord record1 = new GenericRecordBuilder(avroOutputSchema)
+      .set("Lastname", "Shelton")
+      .set("Firstname", "Alex")
+      .set("profession", "professor")
+      .set("age", 45)
+      .build();
+
+    GenericRecord record2 = new GenericRecordBuilder(avroOutputSchema)
+      .set("Lastname", "Schuster")
+      .set("Firstname", "Chris")
+      .set("profession", "accountant")
+      .set("age", 23)
+      .build();
+
+    GenericRecord record3 = new GenericRecordBuilder(avroOutputSchema)
+      .set("Lastname", "Gamal")
+      .set("Firstname", "Ali")
+      .set("profession", "engineer")
+      .set("age", 28)
+      .build();
+
+    Set<GenericRecord> expected = ImmutableSet.of(record1, record2, record3);
+    // verify output
+    Assert.assertEquals(expected, readOutput(serviceManager, DEDUPLICATE_SINK, DedupAggregatorTest.USER_SCHEMA));
+  }
+
   private void testSQLEngineJoin(Engine engine, boolean useConnection) throws Exception {
     String filmDatasetName = "film-sqlenginejoinertest";
     String filmCategoryDatasetName = "film-category-sqlenginejoinertest";
     String filmActorDatasetName = "film-actor-sqlenginejoinertest";
@@ -201,7 +361,7 @@ private void testSQLEngineJoin(Engine engine, boolean useConnection) throws Exce
       new ETLTransformationPushdown(
         new ETLPlugin(BQ_SQLENGINE_PLUGIN_NAME,
                       BatchSQLEngine.PLUGIN_TYPE,
-                      getProps(useConnection)
+                      getProps(useConnection, null)
         )
       );
 
@@ -305,6 +465,245 @@ private void testSQLEngineJoin(Engine engine, boolean useConnection) throws Exce
     Assert.assertEquals(expected, readOutput(serviceManager, joinedDatasetName, outputSchema));
   }
 
+  private void testSQLEngineGroupBy(Engine engine, boolean useConnection) throws Exception {
+    ETLStage purchaseStage =
+      new ETLStage("purchases", new ETLPlugin("Table",
+                                              BatchSource.PLUGIN_TYPE,
+                                              ImmutableMap.of(
+                                                Properties.BatchReadableWritable.NAME, PURCHASE_SOURCE,
+                                                Properties.Table.PROPERTY_SCHEMA, PURCHASE_SCHEMA.toString()), null));
+
+    ETLStage userSinkStage = getSink(USER_SINK, USER_SCHEMA);
+
+    ETLStage itemSinkStage = getSink(ITEM_SINK, ITEM_SCHEMA);
+
+
+    ETLStage userGroupStage = getGroupStage("userGroup", "user",
+                                            "totalPurchases:count(item), totalSpent:sum(price)");
+
+
+    ETLStage itemGroupStage = getGroupStage("itemGroup", "item",
+                                            "totalPurchases:count(user), latestPurchase:max(ts)");
+
+    List<String> includedStagesList =
+      Lists.newArrayList("userGroup", "itemGroup");
+    String includedStages = Joiner.on("\u0001").join(includedStagesList);
+
+    ArtifactSelectorConfig selectorConfig = new ArtifactSelectorConfig(null,
+                                                                       "google-cloud",
+                                                                       "[0.18.0-SNAPSHOT, 1.0.0-SNAPSHOT)");
+
+    ETLTransformationPushdown transformationPushdown =
+      new ETLTransformationPushdown(
+        new ETLPlugin(BQ_SQLENGINE_PLUGIN_NAME,
+                      BatchSQLEngine.PLUGIN_TYPE,
+                      getProps(useConnection, includedStages),
+                      selectorConfig
+        )
+      );
+
+    ETLBatchConfig config = ETLBatchConfig.builder("* * * * *")
+      .addStage(purchaseStage)
+      .addStage(userSinkStage)
+      .addStage(itemSinkStage)
+      .addStage(userGroupStage)
+      .addStage(itemGroupStage)
+      .addConnection(purchaseStage.getName(), userGroupStage.getName())
+      .addConnection(purchaseStage.getName(), itemGroupStage.getName())
+      .addConnection(userGroupStage.getName(), userSinkStage.getName())
+      .addConnection(itemGroupStage.getName(), itemSinkStage.getName())
+      .setEngine(engine)
+      .setPushdownEnabled(true)
+      .setTransformationPushdown(transformationPushdown)
+      .setDriverResources(new Resources(2048))
+      .setResources(new Resources(2048))
+      .build();
+
+    AppRequest<ETLBatchConfig> request = getBatchAppRequestV2(config);
+    ApplicationId appId = TEST_NAMESPACE.app("bq-sqlengine-groupby-test");
+    ApplicationManager appManager = deployApplication(appId, request);
+
+    // ingest data
+    ingestData(PURCHASE_SOURCE);
+
+    // run the pipeline
+    WorkflowManager workflowManager = appManager.getWorkflowManager(SmartWorkflow.NAME);
+    startAndWaitForRun(workflowManager, ProgramRunStatus.COMPLETED, 15, TimeUnit.MINUTES);
+
+    // Deploy an application with a service to get partitionedFileset data for verification
+    ApplicationManager applicationManager = deployApplication(DatasetAccessApp.class);
+    ServiceManager serviceManager = applicationManager.getServiceManager(SnapshotFilesetService.class.getSimpleName());
+    startAndWaitForRun(serviceManager, ProgramRunStatus.RUNNING);
+
+    Map<String, List<Long>> groupedUsers = readOutputGroupBy(serviceManager, USER_SINK, USER_SCHEMA);
+    Map<String, List<Long>> groupedItems = readOutputGroupBy(serviceManager, ITEM_SINK, ITEM_SCHEMA);
+
+    verifyOutput(groupedUsers, groupedItems);
+  }
+
+  private void ingestConditionData(String conditionDatasetName) throws Exception {
+    DataSetManager<Table> manager = getTableDataset(conditionDatasetName);
+    Table table = manager.get();
+    putConditionValues(table, 1, "Ben", 23, true, "Berlin", "doughnut", 1.5);
+    putConditionValues(table, 2, "Ben", 23, true, "LA", "pretzel", 2.05);
+    putConditionValues(table, 3, "Ben", 23, true, "Berlin", "doughnut", 0.75);
+    putConditionValues(table, 4, "Ben", 23, true, "Tokyo", "pastry", 3.25);
+    putConditionValues(table, 5, "Emma", 18, false, "Tokyo", "doughnut", 1.75);
+    putConditionValues(table, 6, "Emma", 18, false, "LA", "bagel", 2.95);
+    putConditionValues(table, 7, "Emma", 18, false, "Berlin", "pretzel", 2.05);
+    putConditionValues(table, 8, "Ron", 22, true, "LA", "bagel", 2.95);
+    putConditionValues(table, 9, "Ron", 22, true, "Tokyo", "pretzel", 0.5);
+    putConditionValues(table, 10, "Ron", 22, true, "Berlin", "doughnut", 1.75);
+
+    manager.flush();
+  }
+
+  private void putConditionValues(Table table, int id, String name, double age, boolean isMember, String city,
+                                  String item, double price) {
+    Put put = new Put(Bytes.toBytes(id));
+    put.add("name", name);
+    put.add("age", age);
+    put.add("isMember", isMember);
+    put.add("city", city);
+    put.add("item", item);
+    put.add("price", price);
+    table.put(put);
+  }
+
+  private Map<String, List<Long>> readOutputGroupBy(ServiceManager serviceManager, String sink, Schema schema)
+    throws IOException {
+    URL pfsURL = new URL(serviceManager.getServiceURL(PROGRAM_START_STOP_TIMEOUT_SECONDS, TimeUnit.SECONDS),
+                         String.format("read/%s", sink));
+    HttpResponse response = getRestClient().execute(HttpMethod.GET, pfsURL, getClientConfig().getAccessToken());
+
+    Assert.assertEquals(HttpURLConnection.HTTP_OK, response.getResponseCode());
+
+    Map<String, byte[]> map = ObjectResponse.<Map<String, byte[]>>fromJsonBody(
+      response, new TypeToken<Map<String, byte[]>>() {
+      }.getType()).getResponseObject();
+
+    return parseOutputGroupBy(map, schema);
+  }
+
+  private Map<String, List<Long>> parseOutputGroupBy(Map<String, byte[]> contents, Schema schema) throws IOException {
+    org.apache.avro.Schema avroSchema = new org.apache.avro.Schema.Parser().parse(schema.toString());
+    Map<String, List<Long>> group = new HashMap<>();
+
+    for (Map.Entry<String, byte[]> entry : contents.entrySet()) {
+      DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(avroSchema);
+      DataFileStream<GenericRecord> fileStream = new DataFileStream<>(
+        new ByteArrayInputStream(entry.getValue()), datumReader);
+      for (GenericRecord record : fileStream) {
+        List<Schema.Field> fields = schema.getFields();
+        List<Long> values = new ArrayList<>();
+        values.add((Long) record.get(fields.get(1).getName()));
+        values.add((Long) record.get(fields.get(2).getName()));
+        group.put(record.get(fields.get(0).getName()).toString(), values);
+      }
+      fileStream.close();
+    }
+    return group;
+  }
+
+  private void ingestData(String purchasesDatasetName) throws Exception {
+    // write input data
+    // 1: 1234567890000, samuel, island, 1000
+    // 2: 1234567890001, samuel, shirt, 15
+    // 3. 1234567890001, samuel, pie, 20
+    // 4. 1234567890002, john, pie, 25
+    // 5. 1234567890003, john, shirt, 30
+    DataSetManager<Table> purchaseManager = getTableDataset(purchasesDatasetName);
+    Table purchaseTable = purchaseManager.get();
+    // 1: 1234567890000, samuel, island, 1000
+    putValues(purchaseTable, 1, 1234567890000L, "samuel", "island", 1000L);
+    putValues(purchaseTable, 2, 1234567890001L, "samuel", "shirt", 15L);
+    putValues(purchaseTable, 3, 1234567890001L, "samuel", "pie", 20L);
+    putValues(purchaseTable, 4, 1234567890002L, "john", "pie", 25L);
+    putValues(purchaseTable, 5, 1234567890003L, "john", "shirt", 30L);
+    purchaseManager.flush();
+  }
+
+  private void ingestInputDataDeduplicate(String inputDatasetName) throws Exception {
+    // 1: shelton, alex, professor, 45
+    // 2: seitz, bob, professor, 50
+    // 3: schuster, chris, accountant, 23
+    // 4: bolt , henry , engineer, 30
+    // 5: gamal , ali , engineer, 28
+    DataSetManager<Table> inputManager = getTableDataset(inputDatasetName);
+    Table inputTable = inputManager.get();
+    putValuesDeduplicate(inputTable, 1, "Shelton", "Alex", "professor", 45);
+    putValuesDeduplicate(inputTable, 2, "Seitz", "Bob", "professor", 50);
+    putValuesDeduplicate(inputTable, 3, "Schuster", "Chris", "accountant", 23);
+    putValuesDeduplicate(inputTable, 4, "Bolt", "Henry", "engineer", 30);
+    putValuesDeduplicate(inputTable, 5, "Gamal", "Ali", "engineer", 28);
+    inputManager.flush();
+  }
+
+  private void putValuesDeduplicate(Table inputTable, int index, String lastname, String firstname, String profession,
+                                    int age) {
+    Put put = new Put(Bytes.toBytes(index));
+    put.add("Lastname", lastname);
+    put.add("Firstname", firstname);
+    put.add("profession", profession);
+    put.add("age", age);
+    inputTable.put(put);
+  }
+
+  private void putValues(Table purchaseTable, int index, long timestamp,
+                         String user, String item, long price) {
+    Put put = new Put(Bytes.toBytes(index));
+    put.add("ts", timestamp);
+    put.add("user", user);
+    put.add("item", item);
+    put.add("price", price);
+    purchaseTable.put(put);
+  }
+
+  private void verifyOutput(Map<String, List<Long>> groupedUsers, Map<String, List<Long>> groupedItems) {
+    // users table should have:
+    // samuel: 3, 1000 + 15 + 20
+    List<Long> groupedValues = groupedUsers.get("samuel");
+    Assert.assertEquals(groupedValues.get(0).longValue(), 3L);
+    Assert.assertEquals(Math.abs(groupedValues.get(1).longValue() - 1000L - 15L - 20L), 0L);
+    // john: 2, 25 + 30
+    groupedValues = groupedUsers.get("john");
+    Assert.assertEquals(groupedValues.get(0).longValue(), 2L);
+    Assert.assertEquals(Math.abs(groupedValues.get(1).longValue() - 25L - 30L), 0L);
+
+    // items table should have:
+    // island: 1, 1234567890000
+    groupedValues = groupedItems.get("island");
+    Assert.assertEquals(groupedValues.get(0).longValue(), 1L);
+    Assert.assertEquals(groupedValues.get(1).longValue(), 1234567890000L);
+
+    // pie: 2, 1234567890002
+    groupedValues = groupedItems.get("pie");
+    Assert.assertEquals(groupedValues.get(0).longValue(), 2L);
+    Assert.assertEquals(groupedValues.get(1).longValue(), 1234567890002L);
+
+    // shirt: 2, 1234567890003
+    groupedValues = groupedItems.get("shirt");
+    Assert.assertEquals(groupedValues.get(0).longValue(), 2L);
+    Assert.assertEquals(groupedValues.get(1).longValue(), 1234567890003L);
+  }
+
+  private ETLStage getSink(String name, Schema schema) {
+    return new ETLStage(name, new ETLPlugin("SnapshotAvro", BatchSink.PLUGIN_TYPE,
+      ImmutableMap.<String, String>builder()
+        .put(Properties.BatchReadableWritable.NAME, name)
+        .put("schema", schema.toString())
+        .build(), null));
+  }
+
+  private ETLStage getGroupStage(String name, String field, String condition) {
+    return new ETLStage(name,
+                        new ETLPlugin("GroupByAggregate",
+                                      BatchAggregator.PLUGIN_TYPE,
+                                      ImmutableMap.of(
+                                        "groupByFields", field,
+                                        "aggregates", condition), null));
+  }
+
   private Set<GenericRecord> readOutput(ServiceManager serviceManager, String sink, Schema schema)
     throws IOException {
     URL pfsURL = new URL(serviceManager.getServiceURL(PROGRAM_START_STOP_TIMEOUT_SECONDS, TimeUnit.SECONDS),
diff --git a/integration-test-remote/src/test/java/io/cdap/cdap/test/suite/AllTests.java b/integration-test-remote/src/test/java/io/cdap/cdap/test/suite/AllTests.java
index 3e53638f..ffc9671b 100644
--- a/integration-test-remote/src/test/java/io/cdap/cdap/test/suite/AllTests.java
+++ b/integration-test-remote/src/test/java/io/cdap/cdap/test/suite/AllTests.java
@@ -21,6 +21,7 @@
 import io.cdap.cdap.app.etl.batch.BatchAggregatorTest;
 import io.cdap.cdap.app.etl.batch.BatchCubeSinkTest;
 import io.cdap.cdap.app.etl.batch.BatchJoinerTest;
+import io.cdap.cdap.app.etl.batch.DedupAggregatorTest;
 import io.cdap.cdap.app.etl.batch.ETLMapReduceTest;
 import io.cdap.cdap.app.etl.batch.ExcelInputReaderTest;
 import io.cdap.cdap.app.etl.batch.HivePluginTest;
@@ -65,6 +66,7 @@
   BatchJoinerTest.class,
   DatasetTest.class,
   DataStreamsTest.class,
+  DedupAggregatorTest.class,
   ETLMapReduceTest.class,
   ExcelInputReaderTest.class,
   FileSetTest.class,