From bb08ab6bd7df9365b5fd26ca27503a8c1ee8eb8e Mon Sep 17 00:00:00 2001 From: tgmof <31918716+tgmof@users.noreply.github.com> Date: Wed, 16 Nov 2022 10:05:51 +0100 Subject: [PATCH] Fix record conversion for Arrays Issue summary: I cannot use the Wrangler or any other provided XML plugin for an (a priori) simple use case, which consists of importing nested/repeated XML data (data that has repeated columns, i.e. JSON arrays) into any sink. Steps to reproduce: 1. Create a pipeline GCS->Wrangler->any sink (with the input path in GCS set as a runtime variable). 2. Use the following sample to create the output schema (with the xml-to-json transform) and run the pipeline with this file. 65.95 3.98 65.95 2022-10-03T11:01:48 3. Observe that the pipeline succeeds. 4. Change the source to a new file in which the previously repeated element occurs only once: 65.95 2022-10-03T11:01:48 5. Observe that the pipeline fails with the "Unable to decode array 'body_MyRoot_SomeField'" error. Why this PR? Because there is no general way to know whether an XML document contains repeated columns, so every field must be expected to be potentially repeated. Why I think it's a good idea to do that in the standard CDAP code: 1. Correct me if I'm wrong, but RecordConvertor.java is meant to convert the input runtime data to match the output schema. It is NOT meant to "VALIDATE the input against the output schema". 2. An array is a "high-level" data type, since it is always filled with elements that have a type themselves (or it has no elements, in which case there is no issue in the first place). Thus, wrapping the value with Collections.singletonList(object) is pretty much the "array equivalent" of calling Double.parseDouble(value) (which this code already does), i.e. we basically cast the input to match the output schema.
--- .../main/java/io/cdap/wrangler/utils/RecordConvertor.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/wrangler-core/src/main/java/io/cdap/wrangler/utils/RecordConvertor.java b/wrangler-core/src/main/java/io/cdap/wrangler/utils/RecordConvertor.java index 1b7e995a4..90b55266a 100644 --- a/wrangler-core/src/main/java/io/cdap/wrangler/utils/RecordConvertor.java +++ b/wrangler-core/src/main/java/io/cdap/wrangler/utils/RecordConvertor.java @@ -37,6 +37,7 @@ import java.time.ZonedDateTime; import java.time.format.DateTimeParseException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; import javax.annotation.Nullable; @@ -434,10 +435,9 @@ private List decodeArray(String name, Object object, Schema schema) thro return decodeArray(name, (List) object, schema); } else if (object instanceof JsonArray) { return decodeArray(name, (JsonArray) object, schema); + } else { + return decodeArray(name, Collections.singletonList(object), schema); } - throw new RecordConvertorException( - String.format("Unable to decode array '%s'", name) - ); } private List decodeArray(String name, JsonArray list, Schema schema) throws RecordConvertorException {