-
Notifications
You must be signed in to change notification settings - Fork 56
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
modifying execute API to get column nullability state #686
base: develop
Are you sure you want to change the base?
Changes from 1 commit
080216e
d9cf5bc
6ae414e
b1fa542
aa0e088
e4a3a43
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
/* | ||
* Copyright © 2016-2019 Cask Data, Inc. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not | ||
* use this file except in compliance with the License. You may obtain a copy of | ||
* the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | ||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the | ||
* License for the specific language governing permissions and limitations under | ||
* the License. | ||
*/ | ||
|
||
package io.cdap.wrangler.api; | ||
|
||
/** | ||
* A Null Handling specific exception used for communicating issues with Null Handling in a column. | ||
*/ | ||
public class NullHandlingException extends Exception { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't see this being thrown or caught anywhere, how are we expecting to use it? Would be better to leave this out of the PR and include it in whatever PR actually uses it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It was for future use, removed |
||
public NullHandlingException(Exception e) { | ||
super(e); | ||
} | ||
|
||
public NullHandlingException(String message) { | ||
super(message); | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -30,6 +30,7 @@ | |
import io.cdap.wrangler.api.ReportErrorAndProceed; | ||
import io.cdap.wrangler.api.Row; | ||
import io.cdap.wrangler.api.TransientVariableScope; | ||
import io.cdap.wrangler.proto.workspace.v2.Workspace.UserDefinedAction; | ||
import io.cdap.wrangler.schema.DirectiveOutputSchemaGenerator; | ||
import io.cdap.wrangler.schema.DirectiveSchemaResolutionContext; | ||
import io.cdap.wrangler.schema.TransientStoreKeys; | ||
|
@@ -40,6 +41,8 @@ | |
import org.slf4j.LoggerFactory; | ||
|
||
import java.util.ArrayList; | ||
import java.util.HashMap; | ||
import java.util.Iterator; | ||
import java.util.List; | ||
import javax.annotation.Nullable; | ||
|
||
|
@@ -56,10 +59,13 @@ public final class RecipePipelineExecutor implements RecipePipeline<Row, Structu | |
private final RecipeParser recipeParser; | ||
private final ExecutorContext context; | ||
private List<Directive> directives; | ||
private HashMap<String, UserDefinedAction> nullabilityMap; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. HashMap -> Map. In general, when declaring variables we should be using the interface and not the specific implementation. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this should also be final. Basically anything passed into the constructor should be final There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
|
||
public RecipePipelineExecutor(RecipeParser recipeParser, @Nullable ExecutorContext context) { | ||
public RecipePipelineExecutor(RecipeParser recipeParser, @Nullable ExecutorContext context, | ||
HashMap<String, UserDefinedAction> nullabilityMap) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. HashMap -> Map There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
this.context = context; | ||
this.recipeParser = recipeParser; | ||
this.nullabilityMap = nullabilityMap; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should do a defensive copy to make sure the map cannot change. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
} | ||
|
||
/** | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -89,7 +89,7 @@ public static List<Row> execute(String[] recipe, List<Row> rows, ExecutorContext | |
|
||
String migrate = new MigrateToV2(recipe).migrate(); | ||
RecipeParser parser = new GrammarBasedParser(Contexts.SYSTEM, migrate, registry); | ||
return new RecipePipelineExecutor(parser, context).execute(rows); | ||
return new RecipePipelineExecutor(parser, context, null).execute(rows); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should not pass null for anything that is not annotated as There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
} | ||
|
||
/** | ||
|
@@ -112,7 +112,7 @@ public static Pair<List<Row>, List<Row>> executeWithErrors(String[] recipe, List | |
|
||
String migrate = new MigrateToV2(recipe).migrate(); | ||
RecipeParser parser = new GrammarBasedParser(Contexts.SYSTEM, migrate, registry); | ||
RecipePipeline pipeline = new RecipePipelineExecutor(parser, context); | ||
RecipePipeline pipeline = new RecipePipelineExecutor(parser, context, null); | ||
List<Row> results = pipeline.execute(rows); | ||
List<Row> errors = pipeline.errors(); | ||
return new Pair<>(results, errors); | ||
|
@@ -126,7 +126,7 @@ public static RecipePipeline execute(String[] recipe) | |
|
||
String migrate = new MigrateToV2(recipe).migrate(); | ||
RecipeParser parser = new GrammarBasedParser(Contexts.SYSTEM, migrate, registry); | ||
return new RecipePipelineExecutor(parser, new TestingPipelineContext()); | ||
return new RecipePipelineExecutor(parser, new TestingPipelineContext(), null); | ||
} | ||
|
||
public static RecipeParser parse(String[] recipe) throws DirectiveParseException, DirectiveLoadException { | ||
|
harshdeeppruthi marked this conversation as resolved.
Show resolved
Hide resolved
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,7 +17,9 @@ | |
|
||
package io.cdap.wrangler.proto.workspace.v2; | ||
|
||
import io.cdap.wrangler.proto.workspace.v2.Workspace.UserDefinedAction; | ||
import java.util.Collections; | ||
import java.util.HashMap; | ||
import java.util.List; | ||
|
||
/** | ||
|
@@ -26,10 +28,14 @@ | |
public class DirectiveExecutionRequest { | ||
private final List<String> directives; | ||
private final int limit; | ||
private final HashMap<String, UserDefinedAction> nullabilityMap; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is very similar to using send-to-error and filter directives. Would like to understand why we need to introduce a new way to do almost the same thing. |
||
|
||
public DirectiveExecutionRequest(List<String> directives, int limit) { | ||
|
||
public DirectiveExecutionRequest(List<String> directives, int limit, | ||
HashMap<String, UserDefinedAction> nullabilityMap) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. HashMap -> Map There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
this.directives = directives; | ||
this.limit = limit; | ||
this.nullabilityMap = nullabilityMap; | ||
} | ||
|
||
public int getLimit() { | ||
|
@@ -39,4 +45,8 @@ public int getLimit() { | |
public List<String> getDirectives() { | ||
return directives == null ? Collections.emptyList() : directives; | ||
} | ||
|
||
public HashMap<String, UserDefinedAction> getNullabilityMap() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. HashMap -> Map There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
return nullabilityMap; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. need to check for null here, similar to getDirectives(). This object is created by deserializing the HTTP request body, so it's possible the caller is not setting this field. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,6 +20,7 @@ | |
import com.google.gson.JsonObject; | ||
|
||
import java.util.ArrayList; | ||
import java.util.HashMap; | ||
import java.util.List; | ||
import java.util.Objects; | ||
import javax.annotation.Nullable; | ||
|
@@ -38,6 +39,8 @@ public class Workspace { | |
// this is for insights page in UI | ||
private final JsonObject insights; | ||
|
||
private HashMap<String, UserDefinedAction> nullabilityMap; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. HashMap -> Map (same for the rest of the class) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this should also be final There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
|
||
private Workspace(String workspaceName, String workspaceId, List<String> directives, | ||
long createdTimeMillis, long updatedTimeMillis, @Nullable SampleSpec sampleSpec, | ||
JsonObject insights) { | ||
|
@@ -48,6 +51,7 @@ private Workspace(String workspaceName, String workspaceId, List<String> directi | |
this.updatedTimeMillis = updatedTimeMillis; | ||
this.sampleSpec = sampleSpec; | ||
this.insights = insights; | ||
this.nullabilityMap = new HashMap<>(); | ||
} | ||
|
||
public String getWorkspaceName() { | ||
|
@@ -79,6 +83,15 @@ public JsonObject getInsights() { | |
return insights; | ||
} | ||
|
||
public HashMap<String, UserDefinedAction> getColumnMappings() { | ||
harshdeeppruthi marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return nullabilityMap; | ||
} | ||
|
||
public void setColumnMappings( | ||
HashMap<String, UserDefinedAction> nullabilityMap) { | ||
this.nullabilityMap = nullabilityMap; | ||
} | ||
|
||
@Override | ||
public boolean equals(Object o) { | ||
if (this == o) { | ||
|
@@ -164,4 +177,15 @@ public Workspace build() { | |
insights); | ||
} | ||
} | ||
|
||
/** | ||
* UserDefinedAction enum. | ||
*/ | ||
public enum UserDefinedAction { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should move this to its own class instead of being nested. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
NO_ACTION, | ||
SKIP_ROW, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. a better name would be FILTER There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
SEND_TO_ERROR_COLLECTOR, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ERROR_COLLECTOR is a pipeline specific concept, this can just be SEND_TO_ERROR There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. But by SEND_TO_ERROR_COLLECTOR, I actually mean the pipeline specific error collector hence the name. |
||
ERROR_PIPELINE, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. pipeline is not a great name, as wrangler can be used outside of a pipeline when making HTTP requests. This can just be ERROR There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
NULLABLE | ||
harshdeeppruthi marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -67,6 +67,7 @@ | |
import io.cdap.wrangler.proto.workspace.v2.ServiceResponse; | ||
import io.cdap.wrangler.proto.workspace.v2.StageSpec; | ||
import io.cdap.wrangler.proto.workspace.v2.Workspace; | ||
import io.cdap.wrangler.proto.workspace.v2.Workspace.UserDefinedAction; | ||
import io.cdap.wrangler.proto.workspace.v2.WorkspaceCreationRequest; | ||
import io.cdap.wrangler.proto.workspace.v2.WorkspaceDetail; | ||
import io.cdap.wrangler.proto.workspace.v2.WorkspaceId; | ||
|
@@ -472,6 +473,12 @@ private DirectiveExecutionResponse execute(NamespaceSummary ns, HttpServiceReque | |
|
||
WorkspaceDetail detail = wsStore.getWorkspaceDetail(workspaceId); | ||
UserDirectivesCollector userDirectivesCollector = new UserDirectivesCollector(); | ||
HashMap<String, UserDefinedAction> nullabilityMap = executionRequest.getNullabilityMap() == null ? | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this logic should be in the getNullabilityMap() class There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
new HashMap<>() : executionRequest.getNullabilityMap(); | ||
if (!nullabilityMap.isEmpty()) { | ||
//change nullabilityMap in Workspace Object | ||
changeNullability(nullabilityMap, workspaceId); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should update the workspace after executing the directives (this is already happening), not before. Otherwise the execution can fail and now there's a partially updated workspace. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We need the updated nullabilityMap before directives are executed. |
||
} | ||
List<Row> result = executeDirectives(ns.getName(), directives, detail, | ||
userDirectivesCollector); | ||
DirectiveExecutionResponse response = generateExecutionResponse(result, | ||
|
@@ -484,6 +491,18 @@ private DirectiveExecutionResponse execute(NamespaceSummary ns, HttpServiceReque | |
return response; | ||
} | ||
|
||
private void changeNullability(HashMap<String, UserDefinedAction> columnMappings, | ||
WorkspaceId workspaceId) throws Exception { | ||
try { | ||
Workspace workspace = wsStore.getWorkspace(workspaceId); | ||
workspace.setColumnMappings(columnMappings); | ||
wsStore.updateWorkspace(workspaceId, workspace); | ||
} catch (Exception e) { | ||
throw new RuntimeException("Error in setting nullabilityMap of columns ", e); | ||
} | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. indentation There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
|
||
|
||
/** | ||
* Get source specs, contains some hacky way on dealing with the csv parser | ||
*/ | ||
|
@@ -580,7 +599,7 @@ private <E extends Exception> List<Row> executeLocally(String namespace, List<St | |
// load the udd | ||
composite.reload(namespace); | ||
return executeDirectives(namespace, directives, new ArrayList<>(detail.getSample()), | ||
grammarVisitor); | ||
grammarVisitor, detail.getWorkspace()); | ||
} | ||
|
||
/** | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
2024