Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
Amit-CloudSufi committed Jan 31, 2025
1 parent 5267f3e commit 877c7c6
Showing 1 changed file with 104 additions and 99 deletions.
203 changes: 104 additions & 99 deletions wrangler-transform/src/main/java/io/cdap/wrangler/Wrangler.java
Original file line number Diff line number Diff line change
Expand Up @@ -171,123 +171,128 @@ public void configurePipeline(PipelineConfigurer configurer) {
super.configurePipeline(configurer);
FailureCollector collector = configurer.getStageConfigurer().getFailureCollector();

Schema iSchema = configurer.getStageConfigurer().getInputSchema();
if (!config.containsMacro(Config.NAME_FIELD) && !(config.getField().equals("*")
|| config.getField().equals("#"))) {
validateInputSchema(iSchema, collector);
}

String directives = config.getDirectives();
if (config.getUDDs() != null && !config.getUDDs().trim().isEmpty()) {
if (config.containsMacro("directives")) {
directives = String.format("#pragma load-directives %s;", config.getUDDs());
} else {
directives = String.format("#pragma load-directives %s;%s", config.getUDDs(), config.getDirectives());
try {
Schema iSchema = configurer.getStageConfigurer().getInputSchema();
if (!config.containsMacro(Config.NAME_FIELD) && !(config.getField().equals("*")
|| config.getField().equals("#"))) {
validateInputSchema(iSchema, collector);
}
}

if (!config.containsMacro(Config.NAME_PRECONDITION_LANGUAGE)) {
if (PRECONDITION_LANGUAGE_SQL.equalsIgnoreCase(config.getPreconditionLanguage())) {
if (!config.containsMacro(Config.NAME_PRECONDITION_SQL)) {
validatePrecondition(config.getPreconditionSQL(), true, collector);
}
validateSQLModeDirectives(collector);
} else {
if (!config.containsMacro(Config.NAME_PRECONDITION)) {
validatePrecondition(config.getPreconditionJEXL(), false, collector);
String directives = config.getDirectives();
if (config.getUDDs() != null && !config.getUDDs().trim().isEmpty()) {
if (config.containsMacro("directives")) {
directives = String.format("#pragma load-directives %s;", config.getUDDs());
} else {
directives = String.format("#pragma load-directives %s;%s", config.getUDDs(),
config.getDirectives());
}
}
}

// Validate the DSL by compiling the DSL. In case of macros being
// specified, the compilation will them at this phase.
Compiler compiler = new RecipeCompiler();
try {
// Compile the directive extracting the loadable plugins (a.k.a
// Directives in this context).
CompileStatus status = compiler.compile(new MigrateToV2(directives).migrate());
RecipeSymbol symbols = status.getSymbols();
if (symbols != null) {
Set<String> dynamicDirectives = symbols.getLoadableDirectives();
for (String directive : dynamicDirectives) {
Object directivePlugin = configurer.usePlugin(Directive.TYPE, directive, directive,
PluginProperties.builder().build());
if (directivePlugin == null) {
collector.addFailure(
String.format("User Defined Directive '%s' is not deployed or is not available.",
directive),
"Ensure the directive is deployed.")
.withPluginNotFound(directive, directive, Directive.TYPE)
.withConfigElement(Config.NAME_UDD, directive);
if (!config.containsMacro(Config.NAME_PRECONDITION_LANGUAGE)) {
if (PRECONDITION_LANGUAGE_SQL.equalsIgnoreCase(config.getPreconditionLanguage())) {
if (!config.containsMacro(Config.NAME_PRECONDITION_SQL)) {
validatePrecondition(config.getPreconditionSQL(), true, collector);
}
validateSQLModeDirectives(collector);
} else {
if (!config.containsMacro(Config.NAME_PRECONDITION)) {
validatePrecondition(config.getPreconditionJEXL(), false, collector);
}
}
// If the 'directives' contains macro, then we would not attempt to compile
// it.
if (!config.containsMacro(Config.NAME_DIRECTIVES)) {
// Create the registry that only interacts with system directives.
registry = SystemDirectiveRegistry.INSTANCE;

Iterator<TokenGroup> iterator = symbols.iterator();
while (iterator.hasNext()) {
TokenGroup group = iterator.next();
if (group != null) {
String directive = (String) group.get(0).value();
DirectiveInfo directiveInfo = registry.get("", directive);
if (directiveInfo == null && !dynamicDirectives.contains(directive)) {
collector.addFailure(
String.format(
"Wrangler plugin has a directive '%s' that does not exist in system or " +
"user space.", directive),
"Ensure the directive is loaded or the directive name is correct.")
.withConfigProperty(Config.NAME_DIRECTIVES);
}

// Validate the DSL by compiling the DSL. In case of macros being
// specified, the compilation will them at this phase.
Compiler compiler = new RecipeCompiler();
try {
// Compile the directive extracting the loadable plugins (a.k.a
// Directives in this context).
CompileStatus status = compiler.compile(new MigrateToV2(directives).migrate());
RecipeSymbol symbols = status.getSymbols();
if (symbols != null) {
Set<String> dynamicDirectives = symbols.getLoadableDirectives();
for (String directive : dynamicDirectives) {
Object directivePlugin = configurer.usePlugin(Directive.TYPE, directive, directive,
PluginProperties.builder().build());
if (directivePlugin == null) {
collector.addFailure(
String.format("User Defined Directive '%s' is not deployed or is not available.",
directive),
"Ensure the directive is deployed.")
.withPluginNotFound(directive, directive, Directive.TYPE)
.withConfigElement(Config.NAME_UDD, directive);
}
}
// If the 'directives' contains macro, then we would not attempt to compile
// it.
if (!config.containsMacro(Config.NAME_DIRECTIVES)) {
// Create the registry that only interacts with system directives.
registry = SystemDirectiveRegistry.INSTANCE;

Iterator<TokenGroup> iterator = symbols.iterator();
while (iterator.hasNext()) {
TokenGroup group = iterator.next();
if (group != null) {
String directive = (String) group.get(0).value();
DirectiveInfo directiveInfo = registry.get("", directive);
if (directiveInfo == null && !dynamicDirectives.contains(directive)) {
collector.addFailure(
String.format(
"Wrangler plugin has a directive '%s' that does not exist in system or " +
"user space.", directive),
"Ensure the directive is loaded or the directive name is correct.")
.withConfigProperty(Config.NAME_DIRECTIVES);
}
}
}
}
}
} catch (CompileException e) {
collector.addFailure(
String.format("Compilation error occurred, %s: %s ", e.getClass().getName(),
e.getMessage()), null);
} catch (DirectiveParseException e) {
collector.addFailure(
String.format("Error parsing directive, %s: %s", e.getClass().getName(),
e.getMessage()), null);
} catch (DirectiveLoadException e) {
collector.addFailure(
String.format("Error loading directive, %s: %s", e.getClass().getName(),
e.getMessage()), null);
}
} catch (CompileException e) {
collector.addFailure(
String.format("Compilation error occurred, %s: %s ", e.getClass().getName(),
e.getMessage()), null);
} catch (DirectiveParseException e) {
collector.addFailure(
String.format("Error parsing directive, %s: %s", e.getClass().getName(),
e.getMessage()), null);
} catch (DirectiveLoadException e) {
collector.addFailure(
String.format("Error loading directive, %s: %s", e.getClass().getName(),
e.getMessage()), null);
}

// Based on the configuration create output schema.
try {
if (!config.containsMacro(Config.NAME_SCHEMA)) {
oSchema = Schema.parseJson(config.schema);
// Based on the configuration create output schema.
try {
if (!config.containsMacro(Config.NAME_SCHEMA)) {
oSchema = Schema.parseJson(config.schema);
}
} catch (IOException e) {
collector.addFailure(
String.format("Invalid output schema %s: %s", e.getClass().getName(), e.getMessage()),
null).withConfigProperty(Config.NAME_SCHEMA).withStacktrace(e.getStackTrace());
}
} catch (IOException e) {
collector.addFailure(
String.format("Invalid output schema %s: %s", e.getClass().getName(), e.getMessage()),
null).withConfigProperty(Config.NAME_SCHEMA).withStacktrace(e.getStackTrace());
}

// Check if jexl pre-condition is not null or empty and if so compile expression.
if (!config.containsMacro(Config.NAME_PRECONDITION) && !config.containsMacro(
Config.NAME_PRECONDITION_LANGUAGE)) {
if (PRECONDITION_LANGUAGE_JEXL.equalsIgnoreCase(config.getPreconditionLanguage())
&& checkPreconditionNotEmpty(false)) {
try {
new Precondition(config.getPreconditionJEXL());
} catch (PreconditionException e) {
collector.addFailure(String.format("Error compiling precondition expression, %s: %s",
e.getClass().getName(), e.getMessage()), null)
.withConfigProperty(Config.NAME_PRECONDITION);
// Check if jexl pre-condition is not null or empty and if so compile expression.
if (!config.containsMacro(Config.NAME_PRECONDITION) && !config.containsMacro(
Config.NAME_PRECONDITION_LANGUAGE)) {
if (PRECONDITION_LANGUAGE_JEXL.equalsIgnoreCase(config.getPreconditionLanguage())
&& checkPreconditionNotEmpty(false)) {
try {
new Precondition(config.getPreconditionJEXL());
} catch (PreconditionException e) {
collector.addFailure(String.format("Error compiling precondition expression, %s: %s",
e.getClass().getName(), e.getMessage()), null)
.withConfigProperty(Config.NAME_PRECONDITION);
}
}
}
}

// Set the output schema.
if (oSchema != null) {
configurer.getStageConfigurer().setOutputSchema(oSchema);
// Set the output schema.
if (oSchema != null) {
configurer.getStageConfigurer().setOutputSchema(oSchema);
}
} catch (Exception e) {
collector.addFailure("Error occurred : " + e.getMessage(), null).withStacktrace(e.getStackTrace());
}
}

Expand Down

0 comments on commit 877c7c6

Please sign in to comment.