Skip to content

Commit

Permalink
Merge pull request #1892 from cdapio/PLUGIN-1807
Browse files Browse the repository at this point in the history
[PLUGIN-1807] Introduce inputFormat getters so they can be overridden by child classes
  • Loading branch information
itsankit-google authored Oct 4, 2024
2 parents 12d4727 + 44a2734 commit 2cc85a5
Showing 1 changed file with 26 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -120,11 +120,7 @@ public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
// are not macro
Schema schema = config.getSchema();

PluginProperties.Builder builder = PluginProperties.builder();
builder.addAll(config.getRawProperties().getProperties());

ValidatingInputFormat validatingInputFormat =
pipelineConfigurer.usePlugin(ValidatingInputFormat.PLUGIN_TYPE, fileFormat, fileFormat, builder.build());
ValidatingInputFormat validatingInputFormat = getValidatingInputFormat(pipelineConfigurer);
FormatContext context = new FormatContext(collector, null);

if (validatingInputFormat != null && schema == null && !config.containsMacro("schema")) {
Expand All @@ -148,25 +144,17 @@ public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
pipelineConfigurer.getStageConfigurer().setOutputSchema(schema);
}

/**
 * Looks up the {@link ValidatingInputFormat} plugin for the configured format while the
 * pipeline is being configured.
 *
 * <p>The configured format name is passed for both the second and third arguments of
 * {@code usePlugin} (presumably the plugin name and the plugin id; confirm against the
 * {@code usePlugin} signature), together with the raw properties of the source config.
 * The method is {@code protected} so that child classes can override how the format is resolved.
 *
 * @param pipelineConfigurer configurer used to look up the format plugin
 * @return whatever {@code usePlugin} returns; the caller in {@code configurePipeline}
 *     null-checks the result, so it may be {@code null}
 */
protected ValidatingInputFormat getValidatingInputFormat(PipelineConfigurer pipelineConfigurer) {
  String formatName = config.getFormatName();
  return pipelineConfigurer.usePlugin(
    ValidatingInputFormat.PLUGIN_TYPE, formatName, formatName, config.getRawProperties());
}

@Override
public void prepareRun(BatchSourceContext context) throws Exception {
FailureCollector collector = context.getFailureCollector();
config.validate(collector);
String fileFormat = config.getFormatName();
ValidatingInputFormat validatingInputFormat;
try {
validatingInputFormat = context.newPluginInstance(fileFormat);
} catch (InvalidPluginConfigException e) {
Set<String> properties = new HashSet<>(e.getMissingProperties());
for (InvalidPluginProperty invalidProperty: e.getInvalidProperties()) {
properties.add(invalidProperty.getName());
}
String errorMessage = String.format("Format '%s' cannot be used because properties %s were not provided or " +
"were invalid when the pipeline was deployed. Set the format to a " +
"different value, or re-create the pipeline with all required properties.",
fileFormat, properties);
throw new IllegalArgumentException(errorMessage, e);
}
ValidatingInputFormat validatingInputFormat = getInputFormatForRun(context);

FormatContext formatContext = new FormatContext(collector, null);
Schema schema = context.getOutputSchema() == null ?
Expand Down Expand Up @@ -239,6 +227,25 @@ public void prepareRun(BatchSourceContext context) throws Exception {
context.setInput(Input.of(config.getReferenceName(), new SourceInputFormatProvider(inputFormatClass, conf)));
}

/**
 * Creates the {@link ValidatingInputFormat} plugin instance for the configured format when a
 * run is being prepared.
 *
 * <p>If instantiation fails with an {@link InvalidPluginConfigException}, the names of all
 * missing and invalid properties are gathered into one set and reported through an
 * {@link IllegalArgumentException} that keeps the original exception as its cause.
 * The method is {@code protected} so that child classes can override how the format is resolved.
 *
 * @param context batch source context used to instantiate the format plugin
 * @return the plugin instance returned by {@code context.newPluginInstance} for the format name
 * @throws InstantiationException declared by this method; presumably propagated from
 *     {@code newPluginInstance} (confirm against its signature)
 * @throws IllegalArgumentException if the plugin configuration is reported as invalid
 */
protected ValidatingInputFormat getInputFormatForRun(BatchSourceContext context)
  throws InstantiationException {
  String formatName = config.getFormatName();
  try {
    return context.newPluginInstance(formatName);
  } catch (InvalidPluginConfigException ex) {
    // Merge missing and invalid property names so the message lists every offending property.
    Set<String> offendingProperties = new HashSet<>(ex.getMissingProperties());
    for (InvalidPluginProperty invalid : ex.getInvalidProperties()) {
      offendingProperties.add(invalid.getName());
    }
    throw new IllegalArgumentException(
      String.format("Format '%s' cannot be used because properties %s "
                      + "were not provided or were invalid when the pipeline was deployed. "
                      + "Set the format to a different value, "
                      + "or re-create the pipeline with all required properties.",
                    formatName, offendingProperties),
      ex);
  }
}

@Override
public void transform(KeyValue<NullWritable, StructuredRecord> input,
Emitter<StructuredRecord> emitter) throws Exception {
Expand Down

0 comments on commit 2cc85a5

Please sign in to comment.