Skip to content

Commit

Permalink
modifying execute API to get column nullability state
Browse files Browse the repository at this point in the history
  • Loading branch information
minurajeeve committed Nov 28, 2023
1 parent 5790e4b commit f9fcbc1
Show file tree
Hide file tree
Showing 10 changed files with 109 additions and 12 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright © 2016-2019 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.wrangler.api;

/**
* A Null Handling specific exception used for communicating issues with Null Handling in a column.
*/
public class NullHandlingException extends Exception {
public NullHandlingException(Exception e) {
super(e);
}

public NullHandlingException(String message) {
super(message);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.cdap.wrangler.api.ReportErrorAndProceed;
import io.cdap.wrangler.api.Row;
import io.cdap.wrangler.api.TransientVariableScope;
import io.cdap.wrangler.proto.workspace.v2.Workspace.UserDefinedAction;
import io.cdap.wrangler.schema.DirectiveOutputSchemaGenerator;
import io.cdap.wrangler.schema.DirectiveSchemaResolutionContext;
import io.cdap.wrangler.schema.TransientStoreKeys;
Expand All @@ -40,6 +41,8 @@
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import javax.annotation.Nullable;

Expand All @@ -56,10 +59,13 @@ public final class RecipePipelineExecutor implements RecipePipeline<Row, Structu
private final RecipeParser recipeParser;
private final ExecutorContext context;
private List<Directive> directives;
private HashMap<String, UserDefinedAction> nullabilityMap;

public RecipePipelineExecutor(RecipeParser recipeParser, @Nullable ExecutorContext context) {
public RecipePipelineExecutor(RecipeParser recipeParser, @Nullable ExecutorContext context,
HashMap<String, UserDefinedAction> nullabilityMap) {
this.context = context;
this.recipeParser = recipeParser;
this.nullabilityMap = nullabilityMap;
}

/**
Expand Down
6 changes: 3 additions & 3 deletions wrangler-core/src/test/java/io/cdap/wrangler/TestingRig.java
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public static List<Row> execute(String[] recipe, List<Row> rows, ExecutorContext

String migrate = new MigrateToV2(recipe).migrate();
RecipeParser parser = new GrammarBasedParser(Contexts.SYSTEM, migrate, registry);
return new RecipePipelineExecutor(parser, context).execute(rows);
return new RecipePipelineExecutor(parser, context, null).execute(rows);
}

/**
Expand All @@ -112,7 +112,7 @@ public static Pair<List<Row>, List<Row>> executeWithErrors(String[] recipe, List

String migrate = new MigrateToV2(recipe).migrate();
RecipeParser parser = new GrammarBasedParser(Contexts.SYSTEM, migrate, registry);
RecipePipeline pipeline = new RecipePipelineExecutor(parser, context);
RecipePipeline pipeline = new RecipePipelineExecutor(parser, context, null);
List<Row> results = pipeline.execute(rows);
List<Row> errors = pipeline.errors();
return new Pair<>(results, errors);
Expand All @@ -126,7 +126,7 @@ public static RecipePipeline execute(String[] recipe)

String migrate = new MigrateToV2(recipe).migrate();
RecipeParser parser = new GrammarBasedParser(Contexts.SYSTEM, migrate, registry);
return new RecipePipelineExecutor(parser, new TestingPipelineContext());
return new RecipePipelineExecutor(parser, new TestingPipelineContext(), null);
}

public static RecipeParser parse(String[] recipe) throws DirectiveParseException, DirectiveLoadException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

package io.cdap.wrangler.proto.workspace.v2;

import io.cdap.wrangler.proto.workspace.v2.Workspace.UserDefinedAction;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;

/**
Expand All @@ -26,10 +28,14 @@
public class DirectiveExecutionRequest {
private final List<String> directives;
private final int limit;
private final HashMap<String, UserDefinedAction> nullabilityMap;

public DirectiveExecutionRequest(List<String> directives, int limit) {

public DirectiveExecutionRequest(List<String> directives, int limit,
HashMap<String, UserDefinedAction> nullabilityMap) {
this.directives = directives;
this.limit = limit;
this.nullabilityMap = nullabilityMap;
}

public int getLimit() {
Expand All @@ -39,4 +45,8 @@ public int getLimit() {
public List<String> getDirectives() {
return directives == null ? Collections.emptyList() : directives;
}

public HashMap<String, UserDefinedAction> getNullabilityMap() {
return nullabilityMap;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.gson.JsonObject;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Objects;
import javax.annotation.Nullable;
Expand All @@ -38,6 +39,8 @@ public class Workspace {
// this is for insights page in UI
private final JsonObject insights;

private HashMap<String, UserDefinedAction> nullabilityMap;

private Workspace(String workspaceName, String workspaceId, List<String> directives,
long createdTimeMillis, long updatedTimeMillis, @Nullable SampleSpec sampleSpec,
JsonObject insights) {
Expand All @@ -48,6 +51,7 @@ private Workspace(String workspaceName, String workspaceId, List<String> directi
this.updatedTimeMillis = updatedTimeMillis;
this.sampleSpec = sampleSpec;
this.insights = insights;
this.nullabilityMap = new HashMap<>();
}

public String getWorkspaceName() {
Expand Down Expand Up @@ -79,6 +83,15 @@ public JsonObject getInsights() {
return insights;
}

public HashMap<String, UserDefinedAction> getColumnMappings() {
return nullabilityMap;
}

public void setColumnMappings(
HashMap<String, UserDefinedAction> nullabilityMap) {
this.nullabilityMap = nullabilityMap;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down Expand Up @@ -164,4 +177,15 @@ public Workspace build() {
insights);
}
}

/**
* UserDefinedAction enum.
*/
public enum UserDefinedAction {
NO_ACTION,
SKIP_ROW,
SEND_TO_ERROR_COLLECTOR,
ERROR_PIPELINE,
NULLABLE
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@
import io.cdap.wrangler.proto.workspace.ColumnValidationResult;
import io.cdap.wrangler.proto.workspace.WorkspaceValidationResult;
import io.cdap.wrangler.proto.workspace.v2.DirectiveExecutionResponse;
import io.cdap.wrangler.proto.workspace.v2.SampleSpec;
import io.cdap.wrangler.proto.workspace.v2.Workspace;
import io.cdap.wrangler.proto.workspace.v2.Workspace.UserDefinedAction;
import io.cdap.wrangler.registry.CompositeDirectiveRegistry;
import io.cdap.wrangler.registry.DirectiveRegistry;
import io.cdap.wrangler.registry.SystemDirectiveRegistry;
Expand Down Expand Up @@ -118,7 +121,8 @@ protected <E extends Exception> List<Row> executeDirectives(
String namespace,
List<String> directives,
List<Row> sample,
GrammarWalker.Visitor<E> grammarVisitor) throws DirectiveParseException, E, RecipeException {
GrammarWalker.Visitor<E> grammarVisitor,
Workspace workspace) throws DirectiveParseException, E, RecipeException {

if (directives.isEmpty()) {
return sample;
Expand All @@ -139,8 +143,11 @@ protected <E extends Exception> List<Row> executeDirectives(
new ConfigDirectiveContext(DirectiveConfig.EMPTY));
try (RecipePipelineExecutor executor = new RecipePipelineExecutor(parser,
new ServicePipelineContext(
namespace, ExecutorContext.Environment.SERVICE,
getContext(), TRANSIENT_STORE))) {
namespace,
ExecutorContext.Environment.SERVICE,
getContext(),
TRANSIENT_STORE),
workspace.getColumnMappings())) {
List<Row> result = executor.execute(sample);

List<ErrorRecordBase> errors = executor.errors()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1095,7 +1095,7 @@ private <E extends Exception> List<Row> executeDirectives(NamespacedId id, List<
// Extract rows from the workspace.
List<Row> rows = fromWorkspace(workspace);
return executeDirectives(id.getNamespace().getName(), directives, sample.apply(rows),
grammarVisitor);
grammarVisitor, null);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public void run(RunnableTaskContext runnableTaskContext) throws Exception {
namespace,
ExecutorContext.Environment.SERVICE,
systemAppContext,
new DefaultTransientStore()))) {
new DefaultTransientStore()), null)) {
rows = executor.execute(rows);
List<ErrorRecordBase> errors = executor.errors().stream()
.filter(ErrorRecordBase::isShownInWrangler)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import io.cdap.wrangler.proto.workspace.v2.ServiceResponse;
import io.cdap.wrangler.proto.workspace.v2.StageSpec;
import io.cdap.wrangler.proto.workspace.v2.Workspace;
import io.cdap.wrangler.proto.workspace.v2.Workspace.UserDefinedAction;
import io.cdap.wrangler.proto.workspace.v2.WorkspaceCreationRequest;
import io.cdap.wrangler.proto.workspace.v2.WorkspaceDetail;
import io.cdap.wrangler.proto.workspace.v2.WorkspaceId;
Expand Down Expand Up @@ -472,6 +473,12 @@ private DirectiveExecutionResponse execute(NamespaceSummary ns, HttpServiceReque

WorkspaceDetail detail = wsStore.getWorkspaceDetail(workspaceId);
UserDirectivesCollector userDirectivesCollector = new UserDirectivesCollector();
HashMap<String, UserDefinedAction> nullabilityMap = executionRequest.getNullabilityMap() == null ?
new HashMap<>() : executionRequest.getNullabilityMap();
if (!nullabilityMap.isEmpty()) {
//change nullabilityMap in Workspace Object
changeNullability(nullabilityMap, workspaceId);
}
List<Row> result = executeDirectives(ns.getName(), directives, detail,
userDirectivesCollector);
DirectiveExecutionResponse response = generateExecutionResponse(result,
Expand All @@ -484,6 +491,18 @@ private DirectiveExecutionResponse execute(NamespaceSummary ns, HttpServiceReque
return response;
}

private void changeNullability(HashMap<String, UserDefinedAction> columnMappings,
WorkspaceId workspaceId) throws Exception {
try {
Workspace workspace = wsStore.getWorkspace(workspaceId);
workspace.setColumnMappings(columnMappings);
wsStore.updateWorkspace(workspaceId, workspace);
} catch (Exception e) {
throw new RuntimeException("Error in setting nullabilityMap of columns ", e);
}
}


/**
* Get source specs, contains some hacky way on dealing with the csv parser
*/
Expand Down Expand Up @@ -580,7 +599,7 @@ private <E extends Exception> List<Row> executeLocally(String namespace, List<St
// load the udd
composite.reload(namespace);
return executeDirectives(namespace, directives, new ArrayList<>(detail.getSample()),
grammarVisitor);
grammarVisitor, detail.getWorkspace());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public static RecipePipeline pipeline(Class<? extends Directive> directive, Test

String migrate = new MigrateToV2(recipe.toArray()).migrate();
RecipeParser parser = new GrammarBasedParser(Contexts.SYSTEM, migrate, registry);
return new RecipePipelineExecutor(parser, null);
return new RecipePipelineExecutor(parser, null, null);
}

public static RecipeParser parser(Class<? extends Directive> directive, String[] recipe)
Expand Down

0 comments on commit f9fcbc1

Please sign in to comment.