diff --git a/integration-test-remote/src/test/java/io/cdap/cdap/app/etl/batch/DedupAggregatorTest.java b/integration-test-remote/src/test/java/io/cdap/cdap/app/etl/batch/DedupAggregatorTest.java
new file mode 100644
index 000000000..bd66cf5f6
--- /dev/null
+++ b/integration-test-remote/src/test/java/io/cdap/cdap/app/etl/batch/DedupAggregatorTest.java
@@ -0,0 +1,226 @@
+/*
+ * Copyright © 2016 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.cdap.app.etl.batch;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.reflect.TypeToken;
+import io.cdap.cdap.api.Resources;
+import io.cdap.cdap.api.common.Bytes;
+import io.cdap.cdap.api.data.schema.Schema;
+import io.cdap.cdap.api.dataset.table.Put;
+import io.cdap.cdap.api.dataset.table.Table;
+import io.cdap.cdap.app.etl.ETLTestBase;
+import io.cdap.cdap.app.etl.dataset.DatasetAccessApp;
+import io.cdap.cdap.app.etl.dataset.SnapshotFilesetService;
+import io.cdap.cdap.datapipeline.SmartWorkflow;
+import io.cdap.cdap.etl.api.Engine;
+import io.cdap.cdap.etl.api.batch.BatchAggregator;
+import io.cdap.cdap.etl.api.batch.BatchSink;
+import io.cdap.cdap.etl.api.batch.BatchSource;
+import io.cdap.cdap.etl.proto.v2.ETLBatchConfig;
+import io.cdap.cdap.etl.proto.v2.ETLPlugin;
+import io.cdap.cdap.etl.proto.v2.ETLStage;
+import io.cdap.cdap.proto.ProgramRunStatus;
+import io.cdap.cdap.proto.artifact.AppRequest;
+import io.cdap.cdap.proto.id.ApplicationId;
+import io.cdap.cdap.test.ApplicationManager;
+import io.cdap.cdap.test.DataSetManager;
+import io.cdap.cdap.test.ServiceManager;
+import io.cdap.cdap.test.WorkflowManager;
+import io.cdap.cdap.test.suite.category.RequiresSpark;
+import io.cdap.common.http.HttpMethod;
+import io.cdap.common.http.HttpResponse;
+import io.cdap.common.http.ObjectResponse;
+import io.cdap.plugin.common.Properties;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.avro.io.DatumReader;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Tests DedupAggregator.
+ */
+public class DedupAggregatorTest extends ETLTestBase {
+  public static final String SMARTWORKFLOW_NAME = SmartWorkflow.NAME;
+  public static final String USER_SOURCE = "userSource";
+  public static final String USER_SINK = "userSink";
+
+  public static final Schema USER_SCHEMA = Schema.recordOf(
+    "user",
+    Schema.Field.of("Lastname", Schema.of(Schema.Type.STRING)),
+    Schema.Field.of("Firstname", Schema.of(Schema.Type.STRING)),
+    Schema.Field.of("profession", Schema.of(Schema.Type.STRING)),
Schema.Field.of("age", Schema.of(Schema.Type.INT))); + + private static final Map CONFIG_MAP = new ImmutableMap.Builder() + .put("uniqueFields", "profession") + .put("filterOperation", "age:Min") + .build(); + + @Category({ + RequiresSpark.class + }) + @Test + public void testDeduplicateSpark() throws Exception { + testDeduplicate(Engine.SPARK); + } + + private void testDeduplicate(Engine spark) throws Exception { + ETLStage userSourceStage = + new ETLStage("users", new ETLPlugin("Table", + BatchSource.PLUGIN_TYPE, + ImmutableMap.of( + Properties.BatchReadableWritable.NAME, USER_SOURCE, + Properties.Table.PROPERTY_SCHEMA, USER_SCHEMA.toString()), null)); + + ETLStage userSinkStage = new ETLStage(USER_SINK, new ETLPlugin("SnapshotAvro", BatchSink.PLUGIN_TYPE, + ImmutableMap.builder() + .put(Properties.BatchReadableWritable.NAME, USER_SINK) + .put("schema", USER_SCHEMA.toString()) + .build(), null)); + + ETLStage userGroupStage = new ETLStage("KeyAggregate", new ETLPlugin("Deduplicate", + BatchAggregator.PLUGIN_TYPE, + CONFIG_MAP, null)); + + + ETLBatchConfig config = ETLBatchConfig.builder("* * * * *") + .addStage(userSourceStage) + .addStage(userSinkStage) + .addStage(userGroupStage) + .addConnection(userSourceStage.getName(), userGroupStage.getName()) + .addConnection(userGroupStage.getName(), userSinkStage.getName()) + .setDriverResources(new Resources(2048)) + .setResources(new Resources(2048)) + .build(); + + + ingestInputData(USER_SOURCE); + + AppRequest request = getBatchAppRequestV2(config); + ApplicationId appId = TEST_NAMESPACE.app("deduplicate-test"); + ApplicationManager appManager = deployApplication(appId, request); + + WorkflowManager workflowManager = appManager.getWorkflowManager(SmartWorkflow.NAME); + startAndWaitForRun(workflowManager, ProgramRunStatus.COMPLETED, 10, TimeUnit.MINUTES); + + // Deploy an application with a service to get partitionedFileset data for verification + ApplicationManager applicationManager = deployApplication(DatasetAccessApp.class); + ServiceManager serviceManager = applicationManager.getServiceManager + (SnapshotFilesetService.class.getSimpleName()); + startAndWaitForRun(serviceManager, ProgramRunStatus.RUNNING); + + org.apache.avro.Schema avroOutputSchema = new org.apache.avro.Schema.Parser().parse(USER_SCHEMA.toString()); + // output has these records: + // 1: shelton, alex, professor, 45 + // 3: schuster, chris, accountant, 23 + // 5: gamal , ali , engineer, 28 + GenericRecord record1 = new GenericRecordBuilder(avroOutputSchema) + .set("Lastname", "Shelton") + .set("Firstname", "Alex") + .set("profession", "professor") + .set("age", 45) + .build(); + + GenericRecord record2 = new GenericRecordBuilder(avroOutputSchema) + .set("Lastname", "Schuster") + .set("Firstname", "Chris") + .set("profession", "accountant") + .set("age", 23) + .build(); + + GenericRecord record3 = new GenericRecordBuilder(avroOutputSchema) + .set("Lastname", "Gamal") + .set("Firstname", "Ali") + .set("profession", "engineer") + .set("age", 28) + .build(); + + Set expected = ImmutableSet.of(record1, record2, record3); + // verfiy output + Assert.assertEquals(expected, readOutput(serviceManager, USER_SINK, USER_SCHEMA)); + } + + private Set readOutput(ServiceManager serviceManager, String sink, Schema schema) + throws IOException { + URL pfsURL = new URL(serviceManager.getServiceURL(PROGRAM_START_STOP_TIMEOUT_SECONDS, TimeUnit.SECONDS), + String.format("read/%s", sink)); + HttpResponse response = getRestClient().execute(HttpMethod.GET, pfsURL, 
+                                                    getClientConfig().getAccessToken());
+
+    Assert.assertEquals(HttpURLConnection.HTTP_OK, response.getResponseCode());
+
+    Map<String, byte[]> map = ObjectResponse.<Map<String, byte[]>>fromJsonBody(
+      response, new TypeToken<Map<String, byte[]>>() { }.getType()).getResponseObject();
+
+    return parseOutput(map, schema);
+  }
+
+  private Set<GenericRecord> parseOutput(Map<String, byte[]> contents, Schema schema) throws IOException {
+    org.apache.avro.Schema avroSchema = new org.apache.avro.Schema.Parser().parse(schema.toString());
+    Set<GenericRecord> records = new HashSet<>();
+    for (Map.Entry<String, byte[]> entry : contents.entrySet()) {
+      DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(avroSchema);
+      try (DataFileStream<GenericRecord> fileStream = new DataFileStream<>(
+        new ByteArrayInputStream(entry.getValue()), datumReader)) {
+        for (GenericRecord record : fileStream) {
+          records.add(record);
+        }
+      }
+    }
+    return records;
+  }
+
+  private void ingestInputData(String inputDatasetName) throws Exception {
+    // 1: shelton, alex, professor, 45
+    // 2: seitz, bob, professor, 50
+    // 3: schuster, chris, accountant, 23
+    // 4: bolt, henry, engineer, 30
+    // 5: gamal, ali, engineer, 28
+    DataSetManager<Table> inputManager = getTableDataset(inputDatasetName);
+    Table inputTable = inputManager.get();
+    putValues(inputTable, 1, "Shelton", "Alex", "professor", 45);
+    putValues(inputTable, 2, "Seitz", "Bob", "professor", 50);
+    putValues(inputTable, 3, "Schuster", "Chris", "accountant", 23);
+    putValues(inputTable, 4, "Bolt", "Henry", "engineer", 30);
+    putValues(inputTable, 5, "Gamal", "Ali", "engineer", 28);
+    inputManager.flush();
+  }
+
+  private void putValues(Table inputTable, int index, String lastname, String firstname, String profession,
+                         int age) {
+    Put put = new Put(Bytes.toBytes(index));
+    put.add("Lastname", lastname);
+    put.add("Firstname", firstname);
+    put.add("profession", profession);
+    put.add("age", age);
+    inputTable.put(put);
+  }
+}
diff --git a/integration-test-remote/src/test/java/io/cdap/cdap/app/etl/gcp/GoogleBigQuerySQLEngineTest.java b/integration-test-remote/src/test/java/io/cdap/cdap/app/etl/gcp/GoogleBigQuerySQLEngineTest.java
index c13dba19a..465012422 100644
--- a/integration-test-remote/src/test/java/io/cdap/cdap/app/etl/gcp/GoogleBigQuerySQLEngineTest.java
+++ b/integration-test-remote/src/test/java/io/cdap/cdap/app/etl/gcp/GoogleBigQuerySQLEngineTest.java
@@ -16,23 +16,30 @@
 package io.cdap.cdap.app.etl.gcp;
 
+import com.google.cloud.bigquery.BigQuery;
+import com.google.cloud.bigquery.DatasetInfo;
+import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
 import com.google.common.reflect.TypeToken;
 import io.cdap.cdap.api.Resources;
 import io.cdap.cdap.api.common.Bytes;
 import io.cdap.cdap.api.data.schema.Schema;
 import io.cdap.cdap.api.dataset.table.Put;
 import io.cdap.cdap.api.dataset.table.Table;
+import io.cdap.cdap.app.etl.batch.DedupAggregatorTest;
 import io.cdap.cdap.app.etl.dataset.DatasetAccessApp;
 import io.cdap.cdap.app.etl.dataset.SnapshotFilesetService;
 import io.cdap.cdap.common.ArtifactNotFoundException;
 import io.cdap.cdap.datapipeline.SmartWorkflow;
 import io.cdap.cdap.etl.api.Engine;
+import io.cdap.cdap.etl.api.batch.BatchAggregator;
 import io.cdap.cdap.etl.api.batch.BatchJoiner;
 import io.cdap.cdap.etl.api.batch.BatchSink;
 import io.cdap.cdap.etl.api.batch.BatchSource;
 import io.cdap.cdap.etl.api.engine.sql.BatchSQLEngine;
+import io.cdap.cdap.etl.proto.ArtifactSelectorConfig;
 import io.cdap.cdap.etl.proto.v2.ETLBatchConfig;
 import io.cdap.cdap.etl.proto.v2.ETLPlugin;
 import io.cdap.cdap.etl.proto.v2.ETLStage;
@@ -60,15 +67,22 @@
 import org.apache.avro.generic.GenericRecordBuilder;
 import org.apache.avro.io.DatumReader;
 import org.junit.Assert;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.net.HttpURLConnection;
 import java.net.URL;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
@@ -78,9 +92,50 @@
  */
 public class GoogleBigQuerySQLEngineTest extends DataprocETLTestBase {
+  private static final Logger LOG = LoggerFactory.getLogger(GoogleBigQuerySQLEngineTest.class);
   private static final String BQ_SQLENGINE_PLUGIN_NAME = "BigQueryPushdownEngine";
-  private static final String BIG_QUERY_DATASET = "bq_dataset_joiner_test";
+  private static final String BIG_QUERY_DATASET_PREFIX = "bq_pd_ds_";
   private static final String CONNECTION_NAME = String.format("test_bq_%s", GoogleBigQueryUtils.getUUID());
+  public static final String PURCHASE_SOURCE = "purchaseSource";
+  public static final String ITEM_SINK = "itemSink";
+  public static final String USER_SINK = "userSink";
+  public static final String DEDUPLICATE_SOURCE = "userSource";
+  public static final String DEDUPLICATE_SINK = "userSink";
+  public static final long MILLISECONDS_IN_A_DAY = 24 * 60 * 60 * 1000;
+  public static final DateTimeFormatter DATE_TIME_FORMAT = DateTimeFormatter.ofPattern("yyyy_MM_dd_HH_mm_ss_SSS");
+
+  private static BigQuery bq;
+  private String bigQueryDataset;
+
+
+  private static final Map<String, String> CONFIG_MAP = new ImmutableMap.Builder<String, String>()
+    .put("uniqueFields", "profession")
+    .put("filterOperation", "age:Min")
+    .build();
+
+  public static final Schema PURCHASE_SCHEMA = Schema.recordOf(
+    "purchase",
+    Schema.Field.of("ts", Schema.of(Schema.Type.LONG)),
+    Schema.Field.of("user", Schema.of(Schema.Type.STRING)),
+    Schema.Field.of("item", Schema.of(Schema.Type.STRING)),
+    Schema.Field.of("price", Schema.of(Schema.Type.LONG)));
+
+  public static final Schema ITEM_SCHEMA = Schema.recordOf(
+    "item",
+    Schema.Field.of("item", Schema.of(Schema.Type.STRING)),
+    Schema.Field.of("totalPurchases", Schema.of(Schema.Type.LONG)),
+    Schema.Field.of("latestPurchase", Schema.of(Schema.Type.LONG)));
+
+  public static final Schema USER_SCHEMA = Schema.recordOf(
+    "user",
+    Schema.Field.of("user", Schema.of(Schema.Type.STRING)),
+    Schema.Field.of("totalPurchases", Schema.of(Schema.Type.LONG)),
+    Schema.Field.of("totalSpent", Schema.of(Schema.Type.LONG)));
+
+  @BeforeClass
+  public static void testClassSetup() throws IOException {
+    bq = GoogleBigQueryUtils.getBigQuery(getProjectId(), getServiceAccountCredentials());
+  }
 
   @Override
   protected void innerSetup() throws Exception {
@@ -94,10 +149,13 @@ protected void innerSetup() throws Exception {
       }
     }, 5, TimeUnit.MINUTES, 3, TimeUnit.SECONDS);
     createConnection(CONNECTION_NAME, "BigQuery");
+    bigQueryDataset = BIG_QUERY_DATASET_PREFIX + LocalDateTime.now().format(DATE_TIME_FORMAT);
+    createDataset(bigQueryDataset);
   }
 
   @Override
   protected void innerTearDown() throws Exception {
+    deleteDataset(bigQueryDataset);
     deleteConnection(CONNECTION_NAME);
   }
 
@@ -110,17 +168,132 @@ public void testSQLEngineJoinSpark() throws Exception {
     testSQLEngineJoin(Engine.SPARK, true);
   }
 
-  private Map<String, String> getProps(boolean useConnection) {
+  @Category({
+    RequiresSpark.class
+  })
+  @Test
+  public void testSQLEngineGroupBySpark() throws Exception {
+    testSQLEngineGroupBy(Engine.SPARK, false);
+    testSQLEngineGroupBy(Engine.SPARK, true);
+  }
+
+  @Category({
+    RequiresSpark.class
+  })
+  @Test
+  public void testSQLEngineDeduplicateSpark() throws Exception {
+    testSQLEngineDeduplicate(Engine.SPARK, false);
+    testSQLEngineDeduplicate(Engine.SPARK, true);
+  }
+
+  private Map<String, String> getProps(boolean useConnection, String includedStages) {
     String connectionId = String.format("${conn(%s)}", CONNECTION_NAME);
     Map<String, String> props = new HashMap<>();
     if (useConnection) {
       props.put(ConfigUtil.NAME_CONNECTION, connectionId);
       props.put(ConfigUtil.NAME_USE_CONNECTION, "true");
+    }
+    props.put("dataset", bigQueryDataset);
+    props.put("retainTables", "true");
+    if (includedStages != null) {
+      props.put("includedStages", includedStages);
     }
-    props.put("dataset", BIG_QUERY_DATASET);
     return new ImmutableMap.Builder<String, String>().putAll(props).build();
   }
 
+  private void testSQLEngineDeduplicate(Engine engine, boolean useConnection) throws Exception {
+    ETLStage userSourceStage =
+      new ETLStage("users", new ETLPlugin("Table",
+                                          BatchSource.PLUGIN_TYPE,
+                                          ImmutableMap.of(
+                                            Properties.BatchReadableWritable.NAME, DEDUPLICATE_SOURCE,
+                                            Properties.Table.PROPERTY_SCHEMA, DedupAggregatorTest.USER_SCHEMA.toString()),
+                                          null));
+
+    ETLStage userSinkStage = new ETLStage(DEDUPLICATE_SINK, new ETLPlugin("SnapshotAvro", BatchSink.PLUGIN_TYPE,
+      ImmutableMap.<String, String>builder()
+        .put(Properties.BatchReadableWritable.NAME, DEDUPLICATE_SINK)
+        .put("schema", DedupAggregatorTest.USER_SCHEMA.toString())
+        .build(), null));
+
+    ArtifactSelectorConfig selectorConfig = new ArtifactSelectorConfig(null,
+                                                                       "google-cloud",
+                                                                       "[0.18.0-SNAPSHOT, 1.0.0-SNAPSHOT)");
+
+    ETLStage userGroupStage = new ETLStage("KeyAggregate", new ETLPlugin("Deduplicate",
+                                                                         BatchAggregator.PLUGIN_TYPE,
+                                                                         CONFIG_MAP, null));
+
+    ETLTransformationPushdown transformationPushdown =
+      new ETLTransformationPushdown(
+        new ETLPlugin(BQ_SQLENGINE_PLUGIN_NAME,
+                      BatchSQLEngine.PLUGIN_TYPE,
+                      getProps(useConnection, "KeyAggregate"),
+                      selectorConfig
+        )
+      );
+
+    ETLBatchConfig config = ETLBatchConfig.builder("* * * * *")
+      .addStage(userSourceStage)
+      .addStage(userSinkStage)
+      .addStage(userGroupStage)
+      .addConnection(userSourceStage.getName(), userGroupStage.getName())
+      .addConnection(userGroupStage.getName(), userSinkStage.getName())
+      .setDriverResources(new Resources(2048))
+      .setResources(new Resources(2048))
+      .setEngine(engine)
+      .setPushdownEnabled(true)
+      .setTransformationPushdown(transformationPushdown)
+      .build();
+
+
+    ingestInputDataDeduplicate(DEDUPLICATE_SOURCE);
+
+    AppRequest<ETLBatchConfig> request = getBatchAppRequestV2(config);
+    ApplicationId appId = TEST_NAMESPACE.app("bq-sqlengine-deduplicate-test");
+    ApplicationManager appManager = deployApplication(appId, request);
+
+    WorkflowManager workflowManager = appManager.getWorkflowManager(SmartWorkflow.NAME);
+    startAndWaitForRun(workflowManager, ProgramRunStatus.COMPLETED, 10, TimeUnit.MINUTES);
+
+    // Deploy an application with a service to get partitionedFileset data for verification
+    ApplicationManager applicationManager = deployApplication(DatasetAccessApp.class);
+    ServiceManager serviceManager = applicationManager.getServiceManager
+      (SnapshotFilesetService.class.getSimpleName());
+    startAndWaitForRun(serviceManager, ProgramRunStatus.RUNNING);
+
+    org.apache.avro.Schema avroOutputSchema = new org.apache.avro.Schema.Parser()
+      .parse(DedupAggregatorTest.USER_SCHEMA.toString());
+    // output has these records:
+    // 1: shelton, alex, professor, 45
+    // 3: schuster, chris, accountant, 23
+    // 5: gamal, ali, engineer, 28
+    GenericRecord record1 = new GenericRecordBuilder(avroOutputSchema)
+      .set("Lastname", "Shelton")
+      .set("Firstname", "Alex")
+      .set("profession", "professor")
+      .set("age", 45)
+      .build();
+
+    GenericRecord record2 = new GenericRecordBuilder(avroOutputSchema)
+      .set("Lastname", "Schuster")
+      .set("Firstname", "Chris")
+      .set("profession", "accountant")
+      .set("age", 23)
+      .build();
+
+    GenericRecord record3 = new GenericRecordBuilder(avroOutputSchema)
+      .set("Lastname", "Gamal")
+      .set("Firstname", "Ali")
+      .set("profession", "engineer")
+      .set("age", 28)
+      .build();
+
+    Set<GenericRecord> expected = ImmutableSet.of(record1, record2, record3);
+    // verify output
+    Assert.assertEquals(expected, readOutput(serviceManager, DEDUPLICATE_SINK, DedupAggregatorTest.USER_SCHEMA));
+  }
+
   private void testSQLEngineJoin(Engine engine, boolean useConnection) throws Exception {
     String filmDatasetName = "film-sqlenginejoinertest";
     String filmCategoryDatasetName = "film-category-sqlenginejoinertest";
@@ -201,7 +374,7 @@ private void testSQLEngineJoin(Engine engine, boolean useConnection) throws Exce
       new ETLTransformationPushdown(
         new ETLPlugin(BQ_SQLENGINE_PLUGIN_NAME,
                       BatchSQLEngine.PLUGIN_TYPE,
-                      getProps(useConnection)
+                      getProps(useConnection, null)
         )
       );
 
@@ -305,6 +478,216 @@ private void testSQLEngineJoin(Engine engine, boolean useConnection) throws Exce
     Assert.assertEquals(expected, readOutput(serviceManager, joinedDatasetName, outputSchema));
   }
 
+  private void testSQLEngineGroupBy(Engine engine, boolean useConnection) throws Exception {
+    ETLStage purchaseStage =
+      new ETLStage("purchases", new ETLPlugin("Table",
+                                              BatchSource.PLUGIN_TYPE,
+                                              ImmutableMap.of(
+                                                Properties.BatchReadableWritable.NAME, PURCHASE_SOURCE,
+                                                Properties.Table.PROPERTY_SCHEMA, PURCHASE_SCHEMA.toString()), null));
+
+    ETLStage userSinkStage = getSink(USER_SINK, USER_SCHEMA);
+
+    ETLStage itemSinkStage = getSink(ITEM_SINK, ITEM_SCHEMA);
+
+
+    ETLStage userGroupStage = getGroupStage("userGroup", "user",
+                                            "totalPurchases:count(item), totalSpent:sum(price)");
+
+
+    ETLStage itemGroupStage = getGroupStage("itemGroup", "item",
+                                            "totalPurchases:count(user), latestPurchase:max(ts)");
+
+    List<String> includedStagesList =
+      Lists.newArrayList("userGroup", "itemGroup");
+    String includedStages = Joiner.on("\u0001").join(includedStagesList);
+
+    ArtifactSelectorConfig selectorConfig = new ArtifactSelectorConfig(null,
+                                                                       "google-cloud",
+                                                                       "[0.18.0-SNAPSHOT, 1.0.0-SNAPSHOT)");
+
+    ETLTransformationPushdown transformationPushdown =
+      new ETLTransformationPushdown(
+        new ETLPlugin(BQ_SQLENGINE_PLUGIN_NAME,
+                      BatchSQLEngine.PLUGIN_TYPE,
+                      getProps(useConnection, includedStages),
+                      selectorConfig
+        )
+      );
+
+    ETLBatchConfig config = ETLBatchConfig.builder("* * * * *")
+      .addStage(purchaseStage)
+      .addStage(userSinkStage)
+      .addStage(itemSinkStage)
+      .addStage(userGroupStage)
+      .addStage(itemGroupStage)
+      .addConnection(purchaseStage.getName(), userGroupStage.getName())
+      .addConnection(purchaseStage.getName(), itemGroupStage.getName())
+      .addConnection(userGroupStage.getName(), userSinkStage.getName())
+      .addConnection(itemGroupStage.getName(), itemSinkStage.getName())
+      .setEngine(engine)
+      .setPushdownEnabled(true)
+      .setTransformationPushdown(transformationPushdown)
+      .setDriverResources(new Resources(2048))
+      .setResources(new Resources(2048))
+      .build();
+
+    AppRequest<ETLBatchConfig> request = getBatchAppRequestV2(config);
+    ApplicationId appId = TEST_NAMESPACE.app("bq-sqlengine-groupby-test");
+    ApplicationManager appManager = deployApplication(appId, request);
+
+    // ingest data
+    ingestData(PURCHASE_SOURCE);
+
+    // run the pipeline
+    WorkflowManager workflowManager = appManager.getWorkflowManager(SmartWorkflow.NAME);
+    startAndWaitForRun(workflowManager, ProgramRunStatus.COMPLETED, 15, TimeUnit.MINUTES);
+
+    // Deploy an application with a service to get partitionedFileset data for verification
+    ApplicationManager applicationManager = deployApplication(DatasetAccessApp.class);
+    ServiceManager serviceManager = applicationManager.getServiceManager(SnapshotFilesetService.class.getSimpleName());
+    startAndWaitForRun(serviceManager, ProgramRunStatus.RUNNING);
+
+    Map<String, List<Long>> groupedUsers = readOutputGroupBy(serviceManager, USER_SINK, USER_SCHEMA);
+    Map<String, List<Long>> groupedItems = readOutputGroupBy(serviceManager, ITEM_SINK, ITEM_SCHEMA);
+
+    verifyOutput(groupedUsers, groupedItems);
+  }
+
+  private Map<String, List<Long>> readOutputGroupBy(ServiceManager serviceManager, String sink, Schema schema)
+    throws IOException {
+    URL pfsURL = new URL(serviceManager.getServiceURL(PROGRAM_START_STOP_TIMEOUT_SECONDS, TimeUnit.SECONDS),
+                         String.format("read/%s", sink));
+    HttpResponse response = getRestClient().execute(HttpMethod.GET, pfsURL, getClientConfig().getAccessToken());
+
+    Assert.assertEquals(HttpURLConnection.HTTP_OK, response.getResponseCode());
+
+    Map<String, byte[]> map = ObjectResponse.<Map<String, byte[]>>fromJsonBody(
+      response, new TypeToken<Map<String, byte[]>>() { }.getType()).getResponseObject();
+
+    return parseOutputGroupBy(map, schema);
+  }
+
+  private Map<String, List<Long>> parseOutputGroupBy(Map<String, byte[]> contents, Schema schema) throws IOException {
+    org.apache.avro.Schema avroSchema = new org.apache.avro.Schema.Parser().parse(schema.toString());
+    Map<String, List<Long>> group = new HashMap<>();
+
+    for (Map.Entry<String, byte[]> entry : contents.entrySet()) {
+      DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(avroSchema);
+      DataFileStream<GenericRecord> fileStream = new DataFileStream<>(
+        new ByteArrayInputStream(entry.getValue()), datumReader);
+      for (GenericRecord record : fileStream) {
+        List<Schema.Field> fields = schema.getFields();
+        List<Long> values = new ArrayList<>();
+        values.add((Long) record.get(fields.get(1).getName()));
+        values.add((Long) record.get(fields.get(2).getName()));
+        group.put(record.get(fields.get(0).getName()).toString(), values);
+      }
+      fileStream.close();
+    }
+    return group;
+  }
+
+  private void ingestData(String purchasesDatasetName) throws Exception {
+    // write input data
+    // 1: 1234567890000, samuel, island, 1000
+    // 2: 1234567890001, samuel, shirt, 15
+    // 3: 1234567890001, samuel, pie, 20
+    // 4: 1234567890002, john, pie, 25
+    // 5: 1234567890003, john, shirt, 30
+    DataSetManager<Table> purchaseManager = getTableDataset(purchasesDatasetName);
+    Table purchaseTable = purchaseManager.get();
+    // 1: 1234567890000, samuel, island, 1000
+    putValues(purchaseTable, 1, 1234567890000L, "samuel", "island", 1000L);
+    putValues(purchaseTable, 2, 1234567890001L, "samuel", "shirt", 15L);
+    putValues(purchaseTable, 3, 1234567890001L, "samuel", "pie", 20L);
+    putValues(purchaseTable, 4, 1234567890002L, "john", "pie", 25L);
+    putValues(purchaseTable, 5, 1234567890003L, "john", "shirt", 30L);
+    purchaseManager.flush();
+  }
+
+  private void ingestInputDataDeduplicate(String inputDatasetName) throws Exception {
+    // 1: shelton, alex, professor, 45
+    // 2: seitz, bob, professor, 50
+    // 3: schuster, chris, accountant, 23
+    // 4: bolt, henry, engineer, 30
+    // 5: gamal, ali, engineer, 28
+    DataSetManager<Table> inputManager = getTableDataset(inputDatasetName);
+    Table inputTable = inputManager.get();
+    putValuesDeduplicate(inputTable, 1, "Shelton", "Alex", "professor", 45);
+    putValuesDeduplicate(inputTable, 2, "Seitz", "Bob", "professor", 50);
+    putValuesDeduplicate(inputTable, 3, "Schuster", "Chris", "accountant", 23);
+    putValuesDeduplicate(inputTable, 4, "Bolt", "Henry", "engineer", 30);
+    putValuesDeduplicate(inputTable, 5, "Gamal", "Ali", "engineer", 28);
+    inputManager.flush();
+  }
+
+  private void putValuesDeduplicate(Table inputTable, int index, String lastname, String firstname, String profession,
+                                    int age) {
+    Put put = new Put(Bytes.toBytes(index));
+    put.add("Lastname", lastname);
+    put.add("Firstname", firstname);
+    put.add("profession", profession);
+    put.add("age", age);
+    inputTable.put(put);
+  }
+
+  private void putValues(Table purchaseTable, int index, long timestamp,
+                         String user, String item, long price) {
+    Put put = new Put(Bytes.toBytes(index));
+    put.add("ts", timestamp);
+    put.add("user", user);
+    put.add("item", item);
+    put.add("price", price);
+    purchaseTable.put(put);
+  }
+
+  private void verifyOutput(Map<String, List<Long>> groupedUsers, Map<String, List<Long>> groupedItems) {
+    // users table should have:
+    // samuel: 3, 1000 + 15 + 20
+    List<Long> groupedValues = groupedUsers.get("samuel");
+    Assert.assertEquals(groupedValues.get(0).longValue(), 3L);
+    Assert.assertEquals(Math.abs(groupedValues.get(1).longValue() - 1000L - 15L - 20L), 0L);
+    // john: 2, 25 + 30
+    groupedValues = groupedUsers.get("john");
+    Assert.assertEquals(groupedValues.get(0).longValue(), 2L);
+    Assert.assertEquals(Math.abs(groupedValues.get(1).longValue() - 25L - 30L), 0L);
+
+    // items table should have:
+    // island: 1, 1234567890000
+    groupedValues = groupedItems.get("island");
+    Assert.assertEquals(groupedValues.get(0).longValue(), 1L);
+    Assert.assertEquals(groupedValues.get(1).longValue(), 1234567890000L);
+
+    // pie: 2, 1234567890002
+    groupedValues = groupedItems.get("pie");
+    Assert.assertEquals(groupedValues.get(0).longValue(), 2L);
+    Assert.assertEquals(groupedValues.get(1).longValue(), 1234567890002L);
+
+    // shirt: 2, 1234567890003
+    groupedValues = groupedItems.get("shirt");
+    Assert.assertEquals(groupedValues.get(0).longValue(), 2L);
+    Assert.assertEquals(groupedValues.get(1).longValue(), 1234567890003L);
+  }
+
+  private ETLStage getSink(String name, Schema schema) {
+    return new ETLStage(name, new ETLPlugin("SnapshotAvro", BatchSink.PLUGIN_TYPE,
+      ImmutableMap.<String, String>builder()
+        .put(Properties.BatchReadableWritable.NAME, name)
+        .put("schema", schema.toString())
+        .build(), null));
+  }
+
+  private ETLStage getGroupStage(String name, String field, String condition) {
+    return new ETLStage(name,
+                        new ETLPlugin("GroupByAggregate",
+                                      BatchAggregator.PLUGIN_TYPE,
+                                      ImmutableMap.of(
+                                        "groupByFields", field,
+                                        "aggregates", condition), null));
+  }
+
   private Set<GenericRecord> readOutput(ServiceManager serviceManager, String sink, Schema schema)
     throws IOException {
     URL pfsURL = new URL(serviceManager.getServiceURL(PROGRAM_START_STOP_TIMEOUT_SECONDS, TimeUnit.SECONDS),
@@ -407,4 +790,23 @@ private void stopServiceForDataset(String datasetName) throws Exception {
       .getServiceManager(AbstractDatasetApp.DatasetService.class.getSimpleName())
       .stop();
   }
+
+  private static void createDataset(String bigQueryDataset) {
+    LOG.info("Creating bigquery dataset {}", bigQueryDataset);
+    // Create dataset with a default table expiration of 24 hours.
+    DatasetInfo datasetInfo = DatasetInfo.newBuilder(bigQueryDataset)
+      .setDefaultTableLifetime(MILLISECONDS_IN_A_DAY)
+      .setDefaultPartitionExpirationMs(MILLISECONDS_IN_A_DAY)
+      .build();
+    bq.create(datasetInfo);
+    LOG.info("Created bigquery dataset {}", bigQueryDataset);
+  }
+
+  private static void deleteDataset(String bigQueryDataset) {
+    LOG.info("Deleting bigquery dataset {}", bigQueryDataset);
+    boolean deleted = bq.delete(bigQueryDataset, BigQuery.DatasetDeleteOption.deleteContents());
+    if (deleted) {
+      LOG.info("Deleted bigquery dataset {}", bigQueryDataset);
+    }
+  }
 }
diff --git a/integration-test-remote/src/test/java/io/cdap/cdap/test/suite/AllTests.java b/integration-test-remote/src/test/java/io/cdap/cdap/test/suite/AllTests.java
index 3e53638f5..ffc9671b5 100644
--- a/integration-test-remote/src/test/java/io/cdap/cdap/test/suite/AllTests.java
+++ b/integration-test-remote/src/test/java/io/cdap/cdap/test/suite/AllTests.java
@@ -21,6 +21,7 @@
 import io.cdap.cdap.app.etl.batch.BatchAggregatorTest;
 import io.cdap.cdap.app.etl.batch.BatchCubeSinkTest;
 import io.cdap.cdap.app.etl.batch.BatchJoinerTest;
+import io.cdap.cdap.app.etl.batch.DedupAggregatorTest;
 import io.cdap.cdap.app.etl.batch.ETLMapReduceTest;
 import io.cdap.cdap.app.etl.batch.ExcelInputReaderTest;
 import io.cdap.cdap.app.etl.batch.HivePluginTest;
@@ -65,6 +66,7 @@
   BatchJoinerTest.class,
   DatasetTest.class,
   DataStreamsTest.class,
+  DedupAggregatorTest.class,
   ETLMapReduceTest.class,
   ExcelInputReaderTest.class,
   FileSetTest.class,