Skip to content

Commit

Permalink
addressing review comments -1
Browse files Browse the repository at this point in the history
  • Loading branch information
minurajeeve committed Feb 13, 2024
1 parent aa0e088 commit e4a3a43
Show file tree
Hide file tree
Showing 8 changed files with 46 additions and 76 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,18 @@
import io.cdap.wrangler.api.ReportErrorAndProceed;
import io.cdap.wrangler.api.Row;
import io.cdap.wrangler.api.TransientVariableScope;
import io.cdap.wrangler.proto.workspace.v2.Workspace.UserDefinedAction;
import io.cdap.wrangler.proto.workspace.v2.UserDefinedAction;
import io.cdap.wrangler.schema.DirectiveOutputSchemaGenerator;
import io.cdap.wrangler.schema.DirectiveSchemaResolutionContext;
import io.cdap.wrangler.schema.TransientStoreKeys;
import io.cdap.wrangler.utils.RecordConvertor;
import io.cdap.wrangler.utils.RecordConvertorException;
import io.cdap.wrangler.utils.SchemaConverter;
import java.util.HashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.List;
import javax.annotation.Nullable;

Expand All @@ -59,13 +58,13 @@ public final class RecipePipelineExecutor implements RecipePipeline<Row, Structu
private final RecipeParser recipeParser;
private final ExecutorContext context;
private List<Directive> directives;
private HashMap<String, UserDefinedAction> nullabilityMap;
private final Map<String, UserDefinedAction> nullabilityMap;

public RecipePipelineExecutor(RecipeParser recipeParser, @Nullable ExecutorContext context,
HashMap<String, UserDefinedAction> nullabilityMap) {
Map<String, UserDefinedAction> nullabilityMap) {
this.context = context;
this.recipeParser = recipeParser;
this.nullabilityMap = nullabilityMap;
this.nullabilityMap = new HashMap<>(nullabilityMap);
}

/**
Expand Down
3 changes: 2 additions & 1 deletion wrangler-core/src/test/java/io/cdap/wrangler/TestingRig.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import io.cdap.wrangler.registry.CompositeDirectiveRegistry;
import io.cdap.wrangler.registry.SystemDirectiveRegistry;
import io.cdap.wrangler.schema.TransientStoreKeys;
import java.util.Collections;
import org.junit.Assert;

import java.util.Iterator;
Expand Down Expand Up @@ -89,7 +90,7 @@ public static List<Row> execute(String[] recipe, List<Row> rows, ExecutorContext

String migrate = new MigrateToV2(recipe).migrate();
RecipeParser parser = new GrammarBasedParser(Contexts.SYSTEM, migrate, registry);
return new RecipePipelineExecutor(parser, context, null).execute(rows);
return new RecipePipelineExecutor(parser, context, Collections.emptyMap()).execute(rows);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,23 @@

package io.cdap.wrangler.proto.workspace.v2;

import io.cdap.wrangler.proto.workspace.v2.Workspace.UserDefinedAction;
import io.cdap.wrangler.proto.workspace.v2.UserDefinedAction;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* Directive execution request for v2 endpoint
*/
public class DirectiveExecutionRequest {
private final List<String> directives;
private final int limit;
private final HashMap<String, UserDefinedAction> nullabilityMap;
private final Map<String, UserDefinedAction> nullabilityMap;


public DirectiveExecutionRequest(List<String> directives, int limit,
HashMap<String, UserDefinedAction> nullabilityMap) {
Map<String, UserDefinedAction> nullabilityMap) {
this.directives = directives;
this.limit = limit;
this.nullabilityMap = nullabilityMap;
Expand All @@ -46,7 +47,8 @@ public List<String> getDirectives() {
return directives == null ? Collections.emptyList() : directives;
}

public HashMap<String, UserDefinedAction> getNullabilityMap() {
return nullabilityMap;
public Map<String, UserDefinedAction> getNullabilityMap() {

return nullabilityMap == null ? Collections.emptyMap() : nullabilityMap;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package io.cdap.wrangler.proto.workspace.v2;

/**
* UserDefinedAction enum.
*/
public enum UserDefinedAction {
FILTER,
SEND_TO_ERROR_COLLECTOR,
ERROR,
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
import com.google.gson.JsonObject;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import javax.annotation.Nullable;

Expand All @@ -39,20 +41,19 @@ public class Workspace {
// this is for insights page in UI
private final JsonObject insights;

private HashMap<String, UserDefinedAction> nullabilityMap;
private final Map<String, UserDefinedAction> nullabilityMap;

private Workspace(String workspaceName, String workspaceId, List<String> directives,
long createdTimeMillis, long updatedTimeMillis, @Nullable SampleSpec sampleSpec,
JsonObject insights, HashMap<String, UserDefinedAction> nullabilityMap) {
JsonObject insights, Map<String, UserDefinedAction> nullabilityMap) {
this.workspaceName = workspaceName;
this.workspaceId = workspaceId;
this.directives = directives;
this.createdTimeMillis = createdTimeMillis;
this.updatedTimeMillis = updatedTimeMillis;
this.sampleSpec = sampleSpec;
this.insights = insights;
this.nullabilityMap = nullabilityMap == null || nullabilityMap.isEmpty() ?
new HashMap<>() : nullabilityMap;
this.nullabilityMap = Collections.unmodifiableMap(new HashMap(nullabilityMap));
}

public String getWorkspaceName() {
Expand Down Expand Up @@ -84,15 +85,10 @@ public JsonObject getInsights() {
return insights;
}

public HashMap<String, UserDefinedAction> getNullabilityMap() {
public Map<String, UserDefinedAction> getNullabilityMap() {
return nullabilityMap;
}

public void setNullabilityMap(
HashMap<String, UserDefinedAction> nullabilityMap) {
this.nullabilityMap = nullabilityMap;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down Expand Up @@ -140,7 +136,7 @@ public static class Builder {
private long updatedTimeMillis;
private SampleSpec sampleSpec;
private JsonObject insights;
private HashMap<String, UserDefinedAction> nullabilityMap;
private Map<String, UserDefinedAction> nullabilityMap;

Builder(String name, String workspaceId) {
this.workspaceName = name;
Expand Down Expand Up @@ -175,7 +171,7 @@ public Builder setInsights(JsonObject insights) {
return this;
}

public Builder setNullabilityMap (HashMap<String, UserDefinedAction> nullabilityMap) {
public Builder setNullabilityMap(Map<String, UserDefinedAction> nullabilityMap) {
this.nullabilityMap = nullabilityMap;
return this;
}
Expand All @@ -185,13 +181,4 @@ public Workspace build() {
insights, nullabilityMap);
}
}

/**
* UserDefinedAction enum.
*/
public enum UserDefinedAction {
SKIP_ROW,
SEND_TO_ERROR_COLLECTOR,
ERROR_PIPELINE,
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
import io.cdap.wrangler.proto.workspace.v2.DirectiveExecutionResponse;
import io.cdap.wrangler.proto.workspace.v2.SampleSpec;
import io.cdap.wrangler.proto.workspace.v2.Workspace;
import io.cdap.wrangler.proto.workspace.v2.Workspace.UserDefinedAction;
import io.cdap.wrangler.proto.workspace.v2.UserDefinedAction;
import io.cdap.wrangler.registry.CompositeDirectiveRegistry;
import io.cdap.wrangler.registry.DirectiveRegistry;
import io.cdap.wrangler.registry.SystemDirectiveRegistry;
Expand Down Expand Up @@ -122,7 +122,8 @@ protected <E extends Exception> List<Row> executeDirectives(
List<String> directives,
List<Row> sample,
GrammarWalker.Visitor<E> grammarVisitor,
HashMap<String, UserDefinedAction> nullabilityMap) throws DirectiveParseException, E, RecipeException {
Map<String, UserDefinedAction> nullabilityMap)
throws DirectiveParseException, E, RecipeException {

if (directives.isEmpty()) {
return sample;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@
import io.cdap.wrangler.proto.workspace.v2.ServiceResponse;
import io.cdap.wrangler.proto.workspace.v2.StageSpec;
import io.cdap.wrangler.proto.workspace.v2.Workspace;
import io.cdap.wrangler.proto.workspace.v2.Workspace.UserDefinedAction;
import io.cdap.wrangler.proto.workspace.v2.UserDefinedAction;
import io.cdap.wrangler.proto.workspace.v2.WorkspaceCreationRequest;
import io.cdap.wrangler.proto.workspace.v2.WorkspaceDetail;
import io.cdap.wrangler.proto.workspace.v2.WorkspaceId;
Expand Down Expand Up @@ -474,10 +474,9 @@ private DirectiveExecutionResponse execute(NamespaceSummary ns, HttpServiceReque

WorkspaceDetail detail = wsStore.getWorkspaceDetail(workspaceId);
UserDirectivesCollector userDirectivesCollector = new UserDirectivesCollector();
HashMap<String, UserDefinedAction> nullabilityMap = executionRequest.getNullabilityMap() == null ?
new HashMap<>() : executionRequest.getNullabilityMap();
Map<String, UserDefinedAction> nullabilityMap = executionRequest.getNullabilityMap();
if (!nullabilityMap.isEmpty()) {
//change nullabilityMap in Workspace Object
//create new workspace object with the new nullabilityMap
changeNullability(nullabilityMap, workspaceId);
}
List<Row> result = executeDirectives(ns.getName(), directives, detail,
Expand All @@ -492,16 +491,18 @@ private DirectiveExecutionResponse execute(NamespaceSummary ns, HttpServiceReque
return response;
}

private void changeNullability(HashMap<String, UserDefinedAction> columnMappings,
WorkspaceId workspaceId) throws Exception {
private void changeNullability(Map<String, UserDefinedAction> nullabilityMap,
WorkspaceId workspaceId) throws Exception {
try {
Workspace workspace = wsStore.getWorkspace(workspaceId);
workspace.setNullabilityMap(columnMappings);
wsStore.updateWorkspace(workspaceId, workspace);
Workspace newWorkspace = Workspace.builder(workspace)
.setUpdatedTimeMillis(System.currentTimeMillis())
.setNullabilityMap(nullabilityMap).build();
wsStore.updateWorkspace(workspaceId, newWorkspace);
} catch (Exception e) {
throw new RuntimeException("Error in setting nullabilityMap of columns ", e);
}
}
}


/**
Expand Down

0 comments on commit e4a3a43

Please sign in to comment.