From 44a2734bda85945f42b034a67da18f3d43f7d3a5 Mon Sep 17 00:00:00 2001 From: itsankit-google Date: Fri, 4 Oct 2024 08:30:03 +0000 Subject: [PATCH] Introduce inputFormat getters so it can be overriden by child classes --- .../format/plugin/AbstractFileSource.java | 45 +++++++++++-------- 1 file changed, 26 insertions(+), 19 deletions(-) diff --git a/format-common/src/main/java/io/cdap/plugin/format/plugin/AbstractFileSource.java b/format-common/src/main/java/io/cdap/plugin/format/plugin/AbstractFileSource.java index bd198ba84..ea024ffb8 100644 --- a/format-common/src/main/java/io/cdap/plugin/format/plugin/AbstractFileSource.java +++ b/format-common/src/main/java/io/cdap/plugin/format/plugin/AbstractFileSource.java @@ -120,11 +120,7 @@ public void configurePipeline(PipelineConfigurer pipelineConfigurer) { // are not macro Schema schema = config.getSchema(); - PluginProperties.Builder builder = PluginProperties.builder(); - builder.addAll(config.getRawProperties().getProperties()); - - ValidatingInputFormat validatingInputFormat = - pipelineConfigurer.usePlugin(ValidatingInputFormat.PLUGIN_TYPE, fileFormat, fileFormat, builder.build()); + ValidatingInputFormat validatingInputFormat = getValidatingInputFormat(pipelineConfigurer); FormatContext context = new FormatContext(collector, null); if (validatingInputFormat != null && schema == null && !config.containsMacro("schema")) { @@ -148,25 +144,17 @@ public void configurePipeline(PipelineConfigurer pipelineConfigurer) { pipelineConfigurer.getStageConfigurer().setOutputSchema(schema); } + protected ValidatingInputFormat getValidatingInputFormat(PipelineConfigurer pipelineConfigurer) { + return pipelineConfigurer.usePlugin(ValidatingInputFormat.PLUGIN_TYPE, config.getFormatName(), + config.getFormatName(), config.getRawProperties()); + } + @Override public void prepareRun(BatchSourceContext context) throws Exception { FailureCollector collector = context.getFailureCollector(); config.validate(collector); String fileFormat = config.getFormatName(); - ValidatingInputFormat validatingInputFormat; - try { - validatingInputFormat = context.newPluginInstance(fileFormat); - } catch (InvalidPluginConfigException e) { - Set properties = new HashSet<>(e.getMissingProperties()); - for (InvalidPluginProperty invalidProperty: e.getInvalidProperties()) { - properties.add(invalidProperty.getName()); - } - String errorMessage = String.format("Format '%s' cannot be used because properties %s were not provided or " + - "were invalid when the pipeline was deployed. Set the format to a " + - "different value, or re-create the pipeline with all required properties.", - fileFormat, properties); - throw new IllegalArgumentException(errorMessage, e); - } + ValidatingInputFormat validatingInputFormat = getInputFormatForRun(context); FormatContext formatContext = new FormatContext(collector, null); Schema schema = context.getOutputSchema() == null ? @@ -239,6 +227,25 @@ public void prepareRun(BatchSourceContext context) throws Exception { context.setInput(Input.of(config.getReferenceName(), new SourceInputFormatProvider(inputFormatClass, conf))); } + protected ValidatingInputFormat getInputFormatForRun(BatchSourceContext context) + throws InstantiationException { + String fileFormat = config.getFormatName(); + try { + return context.newPluginInstance(fileFormat); + } catch (InvalidPluginConfigException e) { + Set properties = new HashSet<>(e.getMissingProperties()); + for (InvalidPluginProperty invalidProperty: e.getInvalidProperties()) { + properties.add(invalidProperty.getName()); + } + String errorMessage = String.format("Format '%s' cannot be used because properties %s " + + "were not provided or were invalid when the pipeline was deployed. " + + "Set the format to a different value, " + + "or re-create the pipeline with all required properties.", + fileFormat, properties); + throw new IllegalArgumentException(errorMessage, e); + } + } + @Override public void transform(KeyValue input, Emitter emitter) throws Exception {