diff --git a/wrangler-transform/src/main/java/io/cdap/wrangler/Precondition.java b/wrangler-transform/src/main/java/io/cdap/wrangler/Precondition.java
index 6fae9d470..22fbecd26 100644
--- a/wrangler-transform/src/main/java/io/cdap/wrangler/Precondition.java
+++ b/wrangler-transform/src/main/java/io/cdap/wrangler/Precondition.java
@@ -16,7 +16,10 @@
 
 package io.cdap.wrangler;
 
+import io.cdap.cdap.api.exception.ErrorType;
+import io.cdap.cdap.api.exception.ProgramFailureException;
 import io.cdap.wrangler.api.Row;
 import org.apache.commons.jexl3.scripting.JexlScriptEngine;
+import org.slf4j.LoggerFactory;
 
 import javax.script.Bindings;
@@ -61,7 +64,7 @@ public ScriptContext createContext() {
     return context;
   }
 
-  public boolean apply(Row row) throws PreconditionException {
+  public boolean apply(Row row) {
     Bindings bindings = new SimpleBindings();
     for (int i = 0; i < row.width(); ++i) {
       bindings.put(row.getColumn(i), row.getValue(i));
@@ -72,20 +75,36 @@ public boolean apply(Row row) throws PreconditionException {
       scriptContext.setBindings(bindings, ScriptContext.ENGINE_SCOPE);
       Object result = script.eval(scriptContext);
       if (!(result instanceof Boolean)) {
-        throw new PreconditionException(
-          String.format("Precondition '%s' does not result in true or false.", condition)
-        );
+        String errorMessage = String.format("Precondition '%s' does not result in true or false.",
+          condition);
+        throw WranglerUtil.getProgramFailureExceptionDetailsFromChain(
+          new PreconditionException(errorMessage), errorMessage, ErrorType.USER, false);
       }
       return (Boolean) result;
     } catch (ScriptException e) {
       // Generally JexlException wraps the original exception, so it's good idea
       // to check if there is a inner exception, if there is wrap it in 'DirectiveExecutionException'
       // else just print the error message.
-      if (e.getCause() != null) {
-        throw new PreconditionException(e.getCause().getMessage());
-      } else {
-        throw new PreconditionException(e.getMessage());
-      }
+      throw getProgramFailureExceptionDueToPrecondition(condition, e);
     }
   }
+
+  /**
+   * Logs a precondition evaluation failure and converts it into a
+   * {@link ProgramFailureException}.
+   *
+   * @param condition the precondition expression that failed
+   * @param e the script exception raised while evaluating the expression
+   * @return the exception to be thrown by the caller
+   */
+  private static ProgramFailureException getProgramFailureExceptionDueToPrecondition(
+    String condition, ScriptException e) {
+    // Prefer the message of the wrapped exception, when there is one.
+    String errorMessage = e.getCause() != null ? e.getCause().getMessage() : e.getMessage();
+    LoggerFactory.getLogger(Precondition.class).error(
+      "Error in evaluating precondition '{}'. {}: {}", condition, e.getClass().getName(),
+      errorMessage);
+    return WranglerUtil.getProgramFailureExceptionDetailsFromChain(
+      new PreconditionException(errorMessage), null, ErrorType.SYSTEM, false);
+  }
 }
diff --git a/wrangler-transform/src/main/java/io/cdap/wrangler/Wrangler.java b/wrangler-transform/src/main/java/io/cdap/wrangler/Wrangler.java
index d5e57ae69..fdcaee943 100644
--- a/wrangler-transform/src/main/java/io/cdap/wrangler/Wrangler.java
+++ b/wrangler-transform/src/main/java/io/cdap/wrangler/Wrangler.java
@@ -24,6 +24,9 @@
 import io.cdap.cdap.api.annotation.Plugin;
 import io.cdap.cdap.api.data.format.StructuredRecord;
 import io.cdap.cdap.api.data.schema.Schema;
+import io.cdap.cdap.api.exception.ErrorCategory;
+import io.cdap.cdap.api.exception.ErrorType;
+import io.cdap.cdap.api.exception.ErrorUtils;
 import io.cdap.cdap.api.metrics.Metrics;
 import io.cdap.cdap.api.plugin.PluginConfig;
 import io.cdap.cdap.api.plugin.PluginProperties;
@@ -53,6 +56,7 @@
 import io.cdap.wrangler.api.EntityCountMetric;
 import io.cdap.wrangler.api.ErrorRecord;
 import io.cdap.wrangler.api.ExecutorContext;
+import io.cdap.wrangler.api.RecipeException;
 import io.cdap.wrangler.api.RecipeParser;
 import io.cdap.wrangler.api.RecipePipeline;
 import io.cdap.wrangler.api.RecipeSymbol;
@@ -243,9 +247,13 @@ public void configurePipeline(PipelineConfigurer configurer) {
           }
         }
       } catch (CompileException e) {
-        collector.addFailure("Compilation error occurred : " + e.getMessage(), null);
+        collector.addFailure(
+          String.format("Compilation error occurred, %s: %s", e.getClass().getName(),
+            e.getMessage()), null);
       } catch (DirectiveParseException e) {
-        collector.addFailure(e.getMessage(), null);
+        collector.addFailure(
+          String.format("Error parsing directive, %s: %s", e.getClass().getName(),
+            e.getMessage()), null);
       }
 
       // Based on the configuration create output schema.
@@ -254,8 +262,9 @@ public void configurePipeline(PipelineConfigurer configurer) {
           oSchema = Schema.parseJson(config.schema);
         }
       } catch (IOException e) {
-        collector.addFailure("Invalid output schema.", null)
-          .withConfigProperty(Config.NAME_SCHEMA).withStacktrace(e.getStackTrace());
+        collector.addFailure(
+          String.format("Invalid output schema, %s: %s", e.getClass().getName(), e.getMessage()),
+          null).withConfigProperty(Config.NAME_SCHEMA).withStacktrace(e.getStackTrace());
       }
 
       // Check if jexl pre-condition is not null or empty and if so compile expression.
@@ -265,7 +274,9 @@ && checkPreconditionNotEmpty(false)) {
           try {
             new Precondition(config.getPreconditionJEXL());
           } catch (PreconditionException e) {
-            collector.addFailure(e.getMessage(), null).withConfigProperty(Config.NAME_PRECONDITION);
+            collector.addFailure(String.format("Error compiling precondition expression, %s: %s",
+                e.getClass().getName(), e.getMessage()), null)
+              .withConfigProperty(Config.NAME_PRECONDITION);
           }
         }
       }
@@ -276,8 +287,11 @@ && checkPreconditionNotEmpty(false)) {
         }
       }
     } catch (Exception e) {
-      LOG.error(e.getMessage());
-      collector.addFailure("Error occurred : " + e.getMessage(), null).withStacktrace(e.getStackTrace());
+      LOG.error("Error occurred during configuration of the plugin, {}: {}", e.getClass().getName(),
+        e.getMessage());
+      collector.addFailure(
+        String.format("Error occurred during configuration of the plugin, %s: %s",
+          e.getClass().getName(), e.getMessage()), null).withStacktrace(e.getStackTrace());
     }
   }
 
@@ -319,7 +333,12 @@ public void prepareRun(StageSubmitterContext context) throws Exception {
     // Parse the recipe and extract all the instances of directives
     // to be processed for extracting lineage.
     RecipeParser recipe = getRecipeParser(context);
-    List<Directive> directives = recipe.parse();
+    List<Directive> directives;
+    try {
+      directives = recipe.parse();
+    } catch (RecipeException e) {
+      throw WranglerUtil.getProgramFailureExceptionDetailsFromChain(e, null, ErrorType.SYSTEM, false);
+    }
     emitDirectiveMetrics(directives, context.getMetrics());
 
     LineageOperations lineageOperations = new LineageOperations(input, output, directives);
@@ -345,10 +364,11 @@ public void initialize(TransformContext context) throws Exception {
     try {
       oSchema = Schema.parseJson(config.schema);
     } catch (IOException e) {
-      throw new IllegalArgumentException(
-        String.format("Stage:%s - Format of output schema specified is invalid. Please check the format.",
-          context.getStageName()), e
-      );
+      String errorMessage = String.format("Error in stage '%s'. Format of output schema specified "
+          + "is invalid. Please check the format. %s: %s", context.getStageName(),
+        e.getClass().getName(), e.getMessage());
+      throw WranglerUtil.getProgramFailureExceptionDetailsFromChain(
+        new IllegalArgumentException(errorMessage, e), errorMessage, ErrorType.USER, false);
     }
 
     // Check if jexl pre-condition is not null or empty and if so compile expression.
@@ -358,7 +378,7 @@ && checkPreconditionNotEmpty(false)) {
         try {
           condition = new Precondition(config.getPreconditionJEXL());
         } catch (PreconditionException e) {
-          throw new IllegalArgumentException(e.getMessage(), e);
+          throw WranglerUtil.getProgramFailureExceptionDetailsFromChain(e, null, ErrorType.SYSTEM, false);
         }
       }
     }
@@ -367,7 +387,11 @@ && checkPreconditionNotEmpty(false)) {
       // Create the pipeline executor with context being set.
       pipeline = new RecipePipelineExecutor(recipe, ctx);
     } catch (Exception e) {
-      throw new Exception(String.format("Stage:%s - %s", getContext().getStageName(), e.getMessage()), e);
+      String errorMessage = String.format(
+        "Error in stage '%s'. Please check the configuration or input data. %s: %s",
+        context.getStageName(), e.getClass().getName(), e.getMessage());
+      throw WranglerUtil.getProgramFailureExceptionDetailsFromChain(e, errorMessage,
+        ErrorType.SYSTEM, false);
     }
 
     String defaultStrategy = context.getArguments().get(ERROR_STRATEGY_DEFAULT);
@@ -437,8 +461,11 @@ && checkPreconditionNotEmpty(false)) {
           }
           if (WRANGLER_FAIL_PIPELINE_FOR_ERROR.isEnabled(getContext())
             && onErrorStrategy.equalsIgnoreCase(ON_ERROR_FAIL_PIPELINE)) {
-            throw new Exception(
-              String.format("Errors in Wrangler Transformation - %s", errorMessages));
+            // There is no underlying exception here, so build the failure directly.
+            String errorReason = String.format("Errors in Wrangler Transformation - %s", errorMessages);
+            throw ErrorUtils.getProgramFailureException(
+              new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorReason, errorReason,
+              ErrorType.SYSTEM, true, null);
           }
         }
       } catch (Exception e) {
@@ -457,8 +484,10 @@ && checkPreconditionNotEmpty(false)) {
               getContext().getStageName(), e.getMessage()),
             "value", String.valueOf(errorCounter)
           ));
-          throw new Exception(String.format("Stage:%s - Failing pipeline due to error : %s",
-            getContext().getStageName(), e.getMessage()), e);
+          String errorMessage = String.format("Pipeline failed at stage:%s, %s: %s",
+            getContext().getStageName(), e.getClass().getName(), e.getMessage());
+          throw WranglerUtil.getProgramFailureExceptionDetailsFromChain(e, errorMessage,
+            ErrorType.SYSTEM, true);
         }
         // If it's 'skip-on-error' we continue processing and don't emit any error records.
         return;
@@ -553,18 +582,32 @@ private boolean checkPreconditionNotEmpty(Boolean isConditionSQL) {
    * @throws DirectiveLoadException
    * @throws DirectiveParseException
    */
-  private RecipeParser getRecipeParser(StageContext context)
-    throws DirectiveLoadException, DirectiveParseException {
+  private RecipeParser getRecipeParser(StageContext context) {
 
     registry = new CompositeDirectiveRegistry(SystemDirectiveRegistry.INSTANCE, new UserDirectiveRegistry(context));
-    registry.reload(context.getNamespace());
+    try {
+      registry.reload(context.getNamespace());
+    } catch (DirectiveLoadException e) {
+      LOG.error("Failed to reload the directive registry for namespace '{}' at stage '{}'. "
+          + "Please verify the namespace and ensure the directives are correctly configured. "
+          + "{}: {}", context.getNamespace(), context.getStageName(), e.getClass().getName(),
+        e.getMessage());
+      throw WranglerUtil.getProgramFailureExceptionDetailsFromChain(e, null, ErrorType.USER, false);
+    }
 
     String directives = config.getDirectives();
     if (config.getUDDs() != null && !config.getUDDs().trim().isEmpty()) {
       directives = String.format("#pragma load-directives %s;%s", config.getUDDs(), config.getDirectives());
     }
 
-    return new GrammarBasedParser(context.getNamespace(), new MigrateToV2(directives).migrate(), registry);
+    try {
+      return new GrammarBasedParser(context.getNamespace(), new MigrateToV2(directives).migrate(), registry);
+    } catch (DirectiveParseException e) {
+      LOG.error("Failed to parse directives for namespace '{}' at stage '{}'. Please verify "
+          + "the directives and ensure they are correctly formatted. {}: {}",
+        context.getNamespace(), context.getStageName(), e.getClass().getName(), e.getMessage());
+      throw WranglerUtil.getProgramFailureExceptionDetailsFromChain(e, null, ErrorType.USER, false);
+    }
   }
 
   @Override
@@ -573,6 +616,10 @@ public Relation transform(RelationalTranformContext relationalTranformContext, R
       && checkPreconditionNotEmpty(true)) {
 
       if (!Feature.WRANGLER_PRECONDITION_SQL.isEnabled(relationalTranformContext)) {
-        throw new RuntimeException("SQL Precondition feature is not available");
+        // There is no underlying exception here, so build the failure directly.
+        String errorMessage = "SQL Precondition feature is not available";
+        throw ErrorUtils.getProgramFailureException(
+          new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage, errorMessage,
+          ErrorType.SYSTEM, true, null);
       }
 
@@ -598,10 +645,17 @@
    * @param directives a list of Wrangler directives
    * @param metrics CDAP {@link Metrics} object using which metrics can be emitted
    */
-  private void emitDirectiveMetrics(List<Directive> directives, Metrics metrics) throws DirectiveLoadException {
+  private void emitDirectiveMetrics(List<Directive> directives, Metrics metrics) {
     for (Directive directive : directives) {
       // skip emitting metrics if the directive is not system directive
-      if (registry.get(Contexts.SYSTEM, directive.define().getDirectiveName()) == null) {
-        continue;
+      try {
+        if (registry.get(Contexts.SYSTEM, directive.define().getDirectiveName()) == null) {
+          continue;
+        }
+      } catch (DirectiveLoadException e) {
+        LOG.error("Error loading system directive '{}'. {}: {}",
+          directive.define().getDirectiveName(), e.getClass().getName(), e.getMessage());
+        throw WranglerUtil.getProgramFailureExceptionDetailsFromChain(e, null, ErrorType.SYSTEM,
+          false);
       }
 
diff --git a/wrangler-transform/src/main/java/io/cdap/wrangler/WranglerUtil.java b/wrangler-transform/src/main/java/io/cdap/wrangler/WranglerUtil.java
new file mode 100644
index 000000000..5840a06e0
--- /dev/null
+++ b/wrangler-transform/src/main/java/io/cdap/wrangler/WranglerUtil.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright © 2016-2019 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.wrangler;
+
+import com.google.common.base.Throwables;
+import io.cdap.cdap.api.exception.ErrorCategory;
+import io.cdap.cdap.api.exception.ErrorType;
+import io.cdap.cdap.api.exception.ErrorUtils;
+import io.cdap.cdap.api.exception.ProgramFailureException;
+import io.cdap.wrangler.api.DirectiveLoadException;
+import io.cdap.wrangler.api.DirectiveParseException;
+import io.cdap.wrangler.api.RecipeException;
+
+/**
+ * Utility methods for converting exceptions caught in the Wrangler plugin into
+ * {@link ProgramFailureException}s.
+ */
+public final class WranglerUtil {
+
+  private WranglerUtil() {
+    throw new IllegalStateException("Utility class");
+  }
+
+  /**
+   * Builds a {@link ProgramFailureException} from a failure caught in the Wrangler plugin.
+   *
+   * <p>A {@link ProgramFailureException} already in the causal chain is returned as is, and the
+   * first Wrangler exception found in the chain is wrapped using its own message.
+   *
+   * @param e the failure to inspect; may be {@code null} when there is no underlying exception
+   * @param errorMessage the reason and message used when the chain holds no known exception
+   * @param errorType the error type to report
+   * @param dependency the dependency flag to report
+   * @return the exception to be thrown by the caller
+   */
+  public static ProgramFailureException getProgramFailureExceptionDetailsFromChain(Throwable e,
+    String errorMessage, ErrorType errorType, boolean dependency) {
+    // Throwables.getCausalChain rejects null, so only walk the chain when there is one.
+    if (e != null) {
+      for (Throwable t : Throwables.getCausalChain(e)) {
+        if (t instanceof ProgramFailureException) {
+          // Avoid double wrap
+          return (ProgramFailureException) t;
+        }
+        if (t instanceof DirectiveLoadException || t instanceof DirectiveParseException
+          || t instanceof RecipeException || t instanceof PreconditionException) {
+          return getProgramFailureException(t.getMessage(), errorType, dependency, t);
+        }
+      }
+    }
+    // If no predefined exception found in the causal chain, return generic program failure exception
+    return getProgramFailureException(errorMessage, errorType, dependency, e);
+  }
+
+  /**
+   * Creates a plugin-category {@link ProgramFailureException} that uses {@code errorMessage} as
+   * both the error reason and the error message.
+   */
+  private static ProgramFailureException getProgramFailureException(String errorMessage,
+    ErrorType errorType, boolean dependency, Throwable cause) {
+    return ErrorUtils.getProgramFailureException(
+      new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage, errorMessage,
+      errorType, dependency, cause);
+  }
+}