Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

modifying execute API to get column nullability state #686

Open
wants to merge 6 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright © 2016-2019 Cask Data, Inc.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2024

*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.wrangler.api;

/**
* A Null Handling specific exception used for communicating issues with Null Handling in a column.
*/
public class NullHandlingException extends Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see this being thrown or caught anywhere, how are we expecting to use it? Would be better to leave this out of the PR and include it in whatever PR actually uses it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was for future use, removed

public NullHandlingException(Exception e) {
super(e);
}

public NullHandlingException(String message) {
super(message);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.cdap.wrangler.api.ReportErrorAndProceed;
import io.cdap.wrangler.api.Row;
import io.cdap.wrangler.api.TransientVariableScope;
import io.cdap.wrangler.proto.workspace.v2.Workspace.UserDefinedAction;
import io.cdap.wrangler.schema.DirectiveOutputSchemaGenerator;
import io.cdap.wrangler.schema.DirectiveSchemaResolutionContext;
import io.cdap.wrangler.schema.TransientStoreKeys;
Expand All @@ -40,6 +41,8 @@
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import javax.annotation.Nullable;

Expand All @@ -56,10 +59,13 @@ public final class RecipePipelineExecutor implements RecipePipeline<Row, Structu
private final RecipeParser recipeParser;
private final ExecutorContext context;
private List<Directive> directives;
private HashMap<String, UserDefinedAction> nullabilityMap;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

HashMap -> Map. In general, when declaring variables we should be using the interface and not the specific implementation.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should also be final. Basically anything passed into the constructor should be final

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


public RecipePipelineExecutor(RecipeParser recipeParser, @Nullable ExecutorContext context) {
public RecipePipelineExecutor(RecipeParser recipeParser, @Nullable ExecutorContext context,
HashMap<String, UserDefinedAction> nullabilityMap) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

HashMap -> Map

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

this.context = context;
this.recipeParser = recipeParser;
this.nullabilityMap = nullabilityMap;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should do a defensive copy to make sure the map cannot change.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

}

/**
Expand Down
6 changes: 3 additions & 3 deletions wrangler-core/src/test/java/io/cdap/wrangler/TestingRig.java
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public static List<Row> execute(String[] recipe, List<Row> rows, ExecutorContext

String migrate = new MigrateToV2(recipe).migrate();
RecipeParser parser = new GrammarBasedParser(Contexts.SYSTEM, migrate, registry);
return new RecipePipelineExecutor(parser, context).execute(rows);
return new RecipePipelineExecutor(parser, context, null).execute(rows);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should not pass null for anything that is not annotated as @Nullable. Also should normally not pass null for any collection and instead pass an empty collection.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

}

/**
Expand All @@ -112,7 +112,7 @@ public static Pair<List<Row>, List<Row>> executeWithErrors(String[] recipe, List

String migrate = new MigrateToV2(recipe).migrate();
RecipeParser parser = new GrammarBasedParser(Contexts.SYSTEM, migrate, registry);
RecipePipeline pipeline = new RecipePipelineExecutor(parser, context);
RecipePipeline pipeline = new RecipePipelineExecutor(parser, context, null);
List<Row> results = pipeline.execute(rows);
List<Row> errors = pipeline.errors();
return new Pair<>(results, errors);
Expand All @@ -126,7 +126,7 @@ public static RecipePipeline execute(String[] recipe)

String migrate = new MigrateToV2(recipe).migrate();
RecipeParser parser = new GrammarBasedParser(Contexts.SYSTEM, migrate, registry);
return new RecipePipelineExecutor(parser, new TestingPipelineContext());
return new RecipePipelineExecutor(parser, new TestingPipelineContext(), null);
}

public static RecipeParser parse(String[] recipe) throws DirectiveParseException, DirectiveLoadException {
Expand Down
harshdeeppruthi marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

package io.cdap.wrangler.proto.workspace.v2;

import io.cdap.wrangler.proto.workspace.v2.Workspace.UserDefinedAction;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;

/**
Expand All @@ -26,10 +28,14 @@
public class DirectiveExecutionRequest {
private final List<String> directives;
private final int limit;
private final HashMap<String, UserDefinedAction> nullabilityMap;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is very similar to using send-to-error and filter directives. Would like to understand why we need to introduce a new way to do almost the same thing.


public DirectiveExecutionRequest(List<String> directives, int limit) {

public DirectiveExecutionRequest(List<String> directives, int limit,
HashMap<String, UserDefinedAction> nullabilityMap) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

HashMap -> Map

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

this.directives = directives;
this.limit = limit;
this.nullabilityMap = nullabilityMap;
}

public int getLimit() {
Expand All @@ -39,4 +45,8 @@ public int getLimit() {
public List<String> getDirectives() {
return directives == null ? Collections.emptyList() : directives;
}

public HashMap<String, UserDefinedAction> getNullabilityMap() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

HashMap -> Map

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

return nullabilityMap;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need to check for null here, similar to getDirectives(). This object is created by deserializing the HTTP request body, so it's possible the caller is not setting this field.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.gson.JsonObject;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Objects;
import javax.annotation.Nullable;
Expand All @@ -38,6 +39,8 @@ public class Workspace {
// this is for insights page in UI
private final JsonObject insights;

private HashMap<String, UserDefinedAction> nullabilityMap;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

HashMap -> Map (same for the rest of the class)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should also be final

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


private Workspace(String workspaceName, String workspaceId, List<String> directives,
long createdTimeMillis, long updatedTimeMillis, @Nullable SampleSpec sampleSpec,
JsonObject insights) {
Expand All @@ -48,6 +51,7 @@ private Workspace(String workspaceName, String workspaceId, List<String> directi
this.updatedTimeMillis = updatedTimeMillis;
this.sampleSpec = sampleSpec;
this.insights = insights;
this.nullabilityMap = new HashMap<>();
}

public String getWorkspaceName() {
Expand Down Expand Up @@ -79,6 +83,15 @@ public JsonObject getInsights() {
return insights;
}

public HashMap<String, UserDefinedAction> getColumnMappings() {
harshdeeppruthi marked this conversation as resolved.
Show resolved Hide resolved
return nullabilityMap;
}

public void setColumnMappings(
HashMap<String, UserDefinedAction> nullabilityMap) {
this.nullabilityMap = nullabilityMap;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down Expand Up @@ -164,4 +177,15 @@ public Workspace build() {
insights);
}
}

/**
* UserDefinedAction enum.
*/
public enum UserDefinedAction {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should move this to its own class instead of being nested.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

NO_ACTION,
SKIP_ROW,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a better name would be FILTER

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

SEND_TO_ERROR_COLLECTOR,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ERROR_COLLECTOR is a pipeline specific concept, this can just be SEND_TO_ERROR

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But by SEND_TO_ERROR_COLLECTOR, I actually mean the pipeline specific error collector hence the name.

ERROR_PIPELINE,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pipeline is not a great name, as wrangler can be used outside of a pipeline when making HTTP requests. This can just be ERROR

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

NULLABLE
harshdeeppruthi marked this conversation as resolved.
Show resolved Hide resolved
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@
import io.cdap.wrangler.proto.workspace.ColumnValidationResult;
import io.cdap.wrangler.proto.workspace.WorkspaceValidationResult;
import io.cdap.wrangler.proto.workspace.v2.DirectiveExecutionResponse;
import io.cdap.wrangler.proto.workspace.v2.SampleSpec;
import io.cdap.wrangler.proto.workspace.v2.Workspace;
import io.cdap.wrangler.proto.workspace.v2.Workspace.UserDefinedAction;
import io.cdap.wrangler.registry.CompositeDirectiveRegistry;
import io.cdap.wrangler.registry.DirectiveRegistry;
import io.cdap.wrangler.registry.SystemDirectiveRegistry;
Expand Down Expand Up @@ -118,7 +121,8 @@ protected <E extends Exception> List<Row> executeDirectives(
String namespace,
List<String> directives,
List<Row> sample,
GrammarWalker.Visitor<E> grammarVisitor) throws DirectiveParseException, E, RecipeException {
GrammarWalker.Visitor<E> grammarVisitor,
Workspace workspace) throws DirectiveParseException, E, RecipeException {
harshdeeppruthi marked this conversation as resolved.
Show resolved Hide resolved

if (directives.isEmpty()) {
return sample;
Expand All @@ -139,8 +143,11 @@ protected <E extends Exception> List<Row> executeDirectives(
new ConfigDirectiveContext(DirectiveConfig.EMPTY));
try (RecipePipelineExecutor executor = new RecipePipelineExecutor(parser,
new ServicePipelineContext(
namespace, ExecutorContext.Environment.SERVICE,
getContext(), TRANSIENT_STORE))) {
namespace,
ExecutorContext.Environment.SERVICE,
getContext(),
TRANSIENT_STORE),
workspace.getColumnMappings())) {
List<Row> result = executor.execute(sample);

List<ErrorRecordBase> errors = executor.errors()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1095,7 +1095,7 @@ private <E extends Exception> List<Row> executeDirectives(NamespacedId id, List<
// Extract rows from the workspace.
List<Row> rows = fromWorkspace(workspace);
return executeDirectives(id.getNamespace().getName(), directives, sample.apply(rows),
grammarVisitor);
grammarVisitor, null);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public void run(RunnableTaskContext runnableTaskContext) throws Exception {
namespace,
ExecutorContext.Environment.SERVICE,
systemAppContext,
new DefaultTransientStore()))) {
new DefaultTransientStore()), null)) {
harshdeeppruthi marked this conversation as resolved.
Show resolved Hide resolved
rows = executor.execute(rows);
List<ErrorRecordBase> errors = executor.errors().stream()
.filter(ErrorRecordBase::isShownInWrangler)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import io.cdap.wrangler.proto.workspace.v2.ServiceResponse;
import io.cdap.wrangler.proto.workspace.v2.StageSpec;
import io.cdap.wrangler.proto.workspace.v2.Workspace;
import io.cdap.wrangler.proto.workspace.v2.Workspace.UserDefinedAction;
import io.cdap.wrangler.proto.workspace.v2.WorkspaceCreationRequest;
import io.cdap.wrangler.proto.workspace.v2.WorkspaceDetail;
import io.cdap.wrangler.proto.workspace.v2.WorkspaceId;
Expand Down Expand Up @@ -472,6 +473,12 @@ private DirectiveExecutionResponse execute(NamespaceSummary ns, HttpServiceReque

WorkspaceDetail detail = wsStore.getWorkspaceDetail(workspaceId);
UserDirectivesCollector userDirectivesCollector = new UserDirectivesCollector();
HashMap<String, UserDefinedAction> nullabilityMap = executionRequest.getNullabilityMap() == null ?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this logic should be in the getNullabilityMap() class

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

new HashMap<>() : executionRequest.getNullabilityMap();
if (!nullabilityMap.isEmpty()) {
//change nullabilityMap in Workspace Object
changeNullability(nullabilityMap, workspaceId);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should update the workspace after executing the directives (this is already happening), not before. Otherwise the execution can fail and now there's a partially updated workspace.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need the updated nullabilityMap before directives are executed.

}
List<Row> result = executeDirectives(ns.getName(), directives, detail,
userDirectivesCollector);
DirectiveExecutionResponse response = generateExecutionResponse(result,
Expand All @@ -484,6 +491,18 @@ private DirectiveExecutionResponse execute(NamespaceSummary ns, HttpServiceReque
return response;
}

private void changeNullability(HashMap<String, UserDefinedAction> columnMappings,
WorkspaceId workspaceId) throws Exception {
try {
Workspace workspace = wsStore.getWorkspace(workspaceId);
workspace.setColumnMappings(columnMappings);
wsStore.updateWorkspace(workspaceId, workspace);
} catch (Exception e) {
throw new RuntimeException("Error in setting nullabilityMap of columns ", e);
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

indentation

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done



/**
* Get source specs, contains some hacky way on dealing with the csv parser
*/
Expand Down Expand Up @@ -580,7 +599,7 @@ private <E extends Exception> List<Row> executeLocally(String namespace, List<St
// load the udd
composite.reload(namespace);
return executeDirectives(namespace, directives, new ArrayList<>(detail.getSample()),
grammarVisitor);
grammarVisitor, detail.getWorkspace());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public static RecipePipeline pipeline(Class<? extends Directive> directive, Test

String migrate = new MigrateToV2(recipe.toArray()).migrate();
RecipeParser parser = new GrammarBasedParser(Contexts.SYSTEM, migrate, registry);
return new RecipePipelineExecutor(parser, null);
return new RecipePipelineExecutor(parser, null, null);
}

public static RecipeParser parser(Class<? extends Directive> directive, String[] recipe)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import io.cdap.wrangler.parser.NoOpDirectiveContext;
import io.cdap.wrangler.parser.RecipeCompiler;
import io.cdap.wrangler.proto.Contexts;
import io.cdap.wrangler.proto.workspace.v2.Workspace.UserDefinedAction;
import io.cdap.wrangler.registry.CompositeDirectiveRegistry;
import io.cdap.wrangler.registry.DirectiveInfo;
import io.cdap.wrangler.registry.DirectiveRegistry;
Expand Down Expand Up @@ -365,7 +366,7 @@ && checkPreconditionNotEmpty(false)) {

try {
// Create the pipeline executor with context being set.
pipeline = new RecipePipelineExecutor(recipe, ctx);
pipeline = new RecipePipelineExecutor(recipe, ctx, null);
} catch (Exception e) {
throw new Exception(String.format("Stage:%s - %s", getContext().getStageName(), e.getMessage()), e);
}
Expand Down
Loading