Add SQL preconditions for Wrangler #550

Draft · wants to merge 1 commit into base: develop
66 changes: 66 additions & 0 deletions wrangler-transform/pom.xml
@@ -10,6 +10,10 @@
<artifactId>wrangler-transform</artifactId>
<name>Wrangler Transform</name>

<properties>
<spark.version>2.3.1</spark.version>
</properties>

<dependencies>
<dependency>
<groupId>io.cdap.wrangler</groupId>
@@ -28,6 +32,68 @@
<version>${cdap.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.cdap.cdap</groupId>
<artifactId>cdap-etl-api-spark</artifactId>
<version>${cdap.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.cdap.cdap</groupId>
<artifactId>cdap-api-spark2_2.11</artifactId>
<version>${cdap.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
</exclusion>
<exclusion>
<groupId>com.esotericsoftware.reflectasm</groupId>
<artifactId>reflectasm</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
</exclusion>
<exclusion>
<groupId>org.tachyonproject</groupId>
<artifactId>tachyon-client</artifactId>
</exclusion>
<exclusion>
<groupId>org.scala-lang</groupId>
<artifactId>scala-compiler</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jetty.orbit</groupId>
<artifactId>javax.servlet</artifactId>
</exclusion>
<exclusion>
<groupId>net.java.dev.jets3t</groupId>
<artifactId>jets3t</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
58 changes: 58 additions & 0 deletions wrangler-transform/src/main/java/io/cdap/wrangler/SQLWrangler.java
@@ -0,0 +1,58 @@
/*
* Copyright © 2021 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package io.cdap.wrangler;

import io.cdap.cdap.api.annotation.RuntimeImplementation;
import io.cdap.cdap.etl.api.dl.DLDataSet;
import io.cdap.cdap.etl.api.dl.DLExpressionFactory;
import io.cdap.cdap.etl.api.dl.DLPluginContext;
import io.cdap.cdap.etl.api.dl.SimpleDLPluginRuntimeImplementation;
import io.cdap.cdap.etl.api.engine.sql.StandardSQLCapabilities;
import io.cdap.wrangler.Wrangler.Config;

/**
* Wrangler implementation with BQ pushdown. Currently only the SQL precondition is implemented.
*/
@RuntimeImplementation(pluginClass = Wrangler.class, order = Wrangler.ORDER_SQL)
public class SQLWrangler implements SimpleDLPluginRuntimeImplementation {

private final Config config;

public SQLWrangler(Config config) {
this.config = config;
if (!Config.EL_SQL.equalsIgnoreCase(config.getExpressionLanguage())) {
// Fall back to the next runtime implementation
throw new IllegalStateException("This implementation only runs when the expression language is SQL");
}
if (config.getDirectives() != null && !config.getDirectives().trim().isEmpty()) {
throw new IllegalStateException("This implementation only runs when the directives list is empty");
}
}

@Override
public void initialize(DLPluginContext context) throws Exception {
if (!context.getDLContext().getExpressionFactory(StandardSQLCapabilities.SQL).isPresent()) {
throw new IllegalStateException("Expression language does not implement SQL");
}
}

@Override
public DLDataSet transform(DLPluginContext context, DLDataSet input) {
DLExpressionFactory expressionFactory = context.getDLContext()
.getExpressionFactory(StandardSQLCapabilities.SQL).get();
return input.filter(expressionFactory.compile(config.getPrecondition()));
}
}
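
Note (illustrative sketch, not part of the diff): the snippet below shows how this SQL implementation is expected to be selected. The constructor only succeeds when the config uses the `sql` expression language and has an empty directives list; otherwise it throws `IllegalStateException` so the framework can fall back to the next runtime implementation in order. The harness class name and the sample precondition are assumptions.

```java
import io.cdap.wrangler.SQLWrangler;
import io.cdap.wrangler.Wrangler;

public class SQLWranglerSelectionSketch {
  public static void main(String[] args) {
    // Config with the SQL expression language and an empty directives list:
    // the only combination this implementation accepts.
    Wrangler.Config config = new Wrangler.Config(
        "age > 21 AND country = 'US'", // precondition, compiled by the SQL expression factory
        "",                            // directives must be empty for this implementation
        null,                          // udds
        "body",                        // field
        null,                          // schema
        "fail-pipeline",               // on-error strategy
        "sql");                        // expression-language selects SQLWrangler
    SQLWrangler sqlWrangler = new SQLWrangler(config);
    System.out.println("Constructed: " + sqlWrangler.getClass().getSimpleName());
    // With "jexl" (or any non-SQL value) the constructor throws IllegalStateException,
    // so the framework can fall back to SparkWrangler and then to the default Wrangler transform.
  }
}
```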
63 changes: 63 additions & 0 deletions wrangler-transform/src/main/java/io/cdap/wrangler/SparkWrangler.java
@@ -0,0 +1,63 @@
/*
* Copyright © 2021 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package io.cdap.wrangler;

import io.cdap.cdap.api.annotation.RuntimeImplementation;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.spark.sql.DataFrames;
import io.cdap.cdap.etl.api.batch.SparkCompute;
import io.cdap.cdap.etl.api.batch.SparkExecutionPluginContext;
import io.cdap.wrangler.Wrangler.Config;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.StructType;

/**
* Wrangler implementation with Spark SQL. Currently it only supports the precondition.
*/
@RuntimeImplementation(pluginClass = Wrangler.class, order = Wrangler.ORDER_SPARK)
public class SparkWrangler extends SparkCompute<StructuredRecord, StructuredRecord> {

private final Wrangler.Config config;

public SparkWrangler(Config config) {
this.config = config;
if (!Config.EL_SQL.equalsIgnoreCase(config.getExpressionLanguage())) {
// Fall back to the next runtime implementation
throw new IllegalStateException("This implementation only runs when the expression language is SQL");
}
if (config.getDirectives() != null && !config.getDirectives().trim().isEmpty()) {
throw new IllegalStateException("This implementation only runs when the directives list is empty");
}
}

@Override
public JavaRDD<StructuredRecord> transform(SparkExecutionPluginContext context,
JavaRDD<StructuredRecord> input) throws Exception {
SparkSession session = SparkSession.builder().sparkContext(context.getSparkContext().sc())
.getOrCreate();
StructType inputSchema = DataFrames.toDataType(context.getInputSchema());
Schema outputSchema = context.getOutputSchema();
JavaRDD<Row> rowRDD = input.map(record -> DataFrames.toRow(record, inputSchema));
Dataset<Row> df = session.createDataFrame(rowRDD, inputSchema);
Dataset<Row> result = df.filter(config.getPrecondition());
return result.javaRDD()
.map(r -> DataFrames.fromRow(r, outputSchema));
}
}
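
Note (illustrative sketch, not part of the diff): the Spark path boils down to `Dataset.filter` with the precondition string treated as a Spark SQL expression, so rows matching the expression are kept. A minimal standalone equivalent, with an assumed input path and precondition:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class PreconditionFilterSketch {
  public static void main(String[] args) {
    SparkSession session = SparkSession.builder()
        .appName("precondition-filter")
        .master("local[*]")
        .getOrCreate();
    // Any boolean Spark SQL expression can serve as the precondition.
    String precondition = "age > 21 AND country = 'US'";
    Dataset<Row> df = session.read().json("people.json"); // assumed sample input
    Dataset<Row> kept = df.filter(precondition);          // same filter call SparkWrangler uses
    kept.show();
    session.stop();
  }
}
```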
36 changes: 35 additions & 1 deletion wrangler-transform/src/main/java/io/cdap/wrangler/Wrangler.java
@@ -21,6 +21,7 @@
import io.cdap.cdap.api.annotation.Macro;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.api.annotation.RuntimeImplementation;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.plugin.PluginConfig;
@@ -82,6 +83,7 @@
@Plugin(type = "transform")
@Name("Wrangler")
@Description("Wrangler - An interactive tool for data cleansing and transformation.")
@RuntimeImplementation(pluginClass = Wrangler.class, order = Wrangler.ORDER_TRANSFORM)
public class Wrangler extends Transform<StructuredRecord, StructuredRecord> {
private static final Logger LOG = LoggerFactory.getLogger(Wrangler.class);

@@ -92,6 +94,10 @@ public class Wrangler extends Transform<StructuredRecord, StructuredRecord> {
private static final String ON_ERROR_DEFAULT = "fail-pipeline";
private static final String ERROR_STRATEGY_DEFAULT = "wrangler.error.strategy.default";

static final int ORDER_SQL = 1;
static final int ORDER_SPARK = 2;
static final int ORDER_TRANSFORM = 3;
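// Lower ORDER values are tried first: the SQL (pushdown) implementation, then Spark, then this default transform.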

// Plugin configuration.
private final Config config;

@@ -288,6 +294,10 @@ public void prepareRun(StageSubmitterContext context) throws Exception {
public void initialize(TransformContext context) throws Exception {
super.initialize(context);

if (Config.EL_SQL.equalsIgnoreCase(config.getExpressionLanguage())) {
throw new IllegalStateException("SQL is not supported in MR implementation");
}

// Parse DSL and initialize the wrangle pipeline.
store = new DefaultTransientStore();
RecipeParser recipe = getRecipeParser(context);
@@ -480,6 +490,9 @@ public static class Config extends PluginConfig {
static final String NAME_UDD = "udd";
static final String NAME_SCHEMA = "schema";
static final String NAME_ON_ERROR = "on-error";
static final String NAME_EL = "expression-language";

static final String EL_SQL = "sql";

@Name(NAME_PRECONDITION)
@Description("Precondition expression specifying filtering before applying directives (true to filter)")
@@ -513,14 +526,21 @@ public static class Config extends PluginConfig {
@Nullable
private final String onError;

@Name(NAME_EL)
@Description("Expression language to use (jexl, sql). Currently applies to precondition only.")
@Macro
@Nullable
private String expressionLanguage;

public Config(String precondition, String directives, String udds,
String field, String schema, String onError) {
String field, String schema, String onError, String expressionLanguage) {
this.precondition = precondition;
this.directives = directives;
this.udds = udds;
this.field = field;
this.schema = schema;
this.onError = onError;
this.expressionLanguage = expressionLanguage;
}

/**
@@ -529,6 +549,20 @@ public Config(String precondition, String directives, String udds,
public String getOnError() {
return onError == null ? ON_ERROR_DEFAULT : onError;
}

@Nullable
public String getExpressionLanguage() {
return expressionLanguage;
}

public String getPrecondition() {
return precondition;
}

@Nullable
public String getDirectives() {
return directives;
}
}
}