diff --git a/docs/GoogleSheets-batchsource.md b/docs/GoogleSheets-batchsource.md index fe10937..617d25c 100644 --- a/docs/GoogleSheets-batchsource.md +++ b/docs/GoogleSheets-batchsource.md @@ -150,6 +150,9 @@ Names Row contains less number of columns. **Read Buffer Size:** Number of rows the source reads with a single API request. Default value is 100. +**Enable Cleansing Column Names:** Toggle that specifies whether to cleanse column names containing special characters. +Special characters will be replaced by underscores. + ### Steps to Generate OAuth2 Credentials 1. Create credentials for the Client ID and Client Secret properties [here](https://console.cloud.google.com/apis/credentials). 2. On the Create OAuth client ID page, under Authorized redirect URIs, specify a URI of `http://localhost:8080`. diff --git a/src/main/java/io/cdap/plugin/google/sheets/source/GoogleSheetsSource.java b/src/main/java/io/cdap/plugin/google/sheets/source/GoogleSheetsSource.java index 91f12cd..f7b14dd 100644 --- a/src/main/java/io/cdap/plugin/google/sheets/source/GoogleSheetsSource.java +++ b/src/main/java/io/cdap/plugin/google/sheets/source/GoogleSheetsSource.java @@ -27,6 +27,7 @@ import io.cdap.cdap.etl.api.Emitter; import io.cdap.cdap.etl.api.FailureCollector; import io.cdap.cdap.etl.api.PipelineConfigurer; +import io.cdap.cdap.etl.api.StageConfigurer; import io.cdap.cdap.etl.api.batch.BatchSource; import io.cdap.cdap.etl.api.batch.BatchSourceContext; import io.cdap.plugin.common.LineageRecorder; @@ -51,11 +52,14 @@ public GoogleSheetsSource(GoogleSheetsSourceConfig config) { @Override public void configurePipeline(PipelineConfigurer pipelineConfigurer) { + StageConfigurer stageConfigurer = pipelineConfigurer.getStageConfigurer(); FailureCollector failureCollector = pipelineConfigurer.getStageConfigurer().getFailureCollector(); config.validate(failureCollector); failureCollector.getOrThrowException(); - pipelineConfigurer.getStageConfigurer().setOutputSchema(config.getSchema(failureCollector)); + Schema configuredSchema = config.getSchema(failureCollector); + failureCollector.getOrThrowException(); + stageConfigurer.setOutputSchema(configuredSchema); } @Override diff --git a/src/main/java/io/cdap/plugin/google/sheets/source/GoogleSheetsSourceConfig.java b/src/main/java/io/cdap/plugin/google/sheets/source/GoogleSheetsSourceConfig.java index 53c816b..03939c8 100644 --- a/src/main/java/io/cdap/plugin/google/sheets/source/GoogleSheetsSourceConfig.java +++ b/src/main/java/io/cdap/plugin/google/sheets/source/GoogleSheetsSourceConfig.java @@ -99,6 +99,8 @@ public class GoogleSheetsSourceConfig extends GoogleFilteringSourceConfig { private static final Logger LOG = LoggerFactory.getLogger(GoogleSheetsSourceConfig.class); private static final Pattern CELL_ADDRESS = Pattern.compile("^([A-Z]+)([0-9]+)$"); private static final Pattern NOT_VALID_PATTERN = Pattern.compile("[^A-Za-z0-9_]+"); + private static final Pattern COLUMN_NAME = Pattern.compile("^[A-Za-z_][A-Za-z0-9_-]*$"); + public static final String CLEANSE_COLUMN_NAMES = "columnNameCleansingEnabled"; private static LinkedHashMap dataSchemaInfo = new LinkedHashMap<>(); @Name(SHEETS_TO_PULL) @@ -118,7 +120,7 @@ public class GoogleSheetsSourceConfig extends GoogleFilteringSourceConfig { @Name(NAME_SCHEMA) @Description("The schema of the table to read.") @Macro - private transient Schema schema = null; + private String schema; @Name(FORMATTING) @Description("Output format for numeric sheet cells. " + @@ -234,6 +236,13 @@ public class GoogleSheetsSourceConfig extends GoogleFilteringSourceConfig { @Macro private String sheetFieldName; + @Nullable + @Name(CLEANSE_COLUMN_NAMES) + @Description("Toggle that specifies whether to cleanse column names containing special characters. " + + "Special characters will be replaced by underscores.") + @Macro + private Boolean columnNameCleansingEnabled; + public GoogleSheetsSourceConfig(String referenceName, @Nullable String sheetsIdentifiers, String formatting, Boolean skipEmptyData, String columnNamesSelection, @Nullable Integer customColumnNamesRow, String metadataFieldName, @@ -242,7 +251,7 @@ public GoogleSheetsSourceConfig(String referenceName, @Nullable String sheetsIde @Nullable String lastDataColumn, @Nullable String lastDataRow, @Nullable String metadataCells, @Nullable Integer readBufferSize, @Nullable Boolean addNameFields, @Nullable String spreadsheetFieldName, - @Nullable String sheetFieldName) { + @Nullable String sheetFieldName, @Nullable String schema) { super(referenceName); this.sheetsIdentifiers = sheetsIdentifiers; @@ -262,6 +271,7 @@ public GoogleSheetsSourceConfig(String referenceName, @Nullable String sheetsIde this.addNameFields = addNameFields; this.spreadsheetFieldName = spreadsheetFieldName; this.sheetFieldName = sheetFieldName; + this.schema = schema; } @@ -281,9 +291,15 @@ public Schema getSchema(FailureCollector collector) { "Perhaps no validation step was executed before schema generation.") .withConfigProperty(SCHEMA); } - schema = SchemaBuilder.buildSchema(this, new ArrayList<>(dataSchemaInfo.values())); + return SchemaBuilder.buildSchema(this, new ArrayList<>(dataSchemaInfo.values())); } - return schema; + try { + return Strings.isNullOrEmpty(schema) ? null : Schema.parseJson(schema); + } catch (IOException e) { + collector.addFailure("Invalid schema: " + e.getMessage(), + null).withConfigProperty(NAME_SCHEMA); + } + throw collector.getOrThrowException(); } private boolean shouldGetSchema() { @@ -609,7 +625,7 @@ private LinkedHashMap processColumns(List processSubHeaders(int startIndex, int leng if (StringUtils.isEmpty(subHeaderTitle)) { subHeaderTitle = ColumnAddressConverter.getColumnName(i + 1); } - subHeaderTitle = checkTitleFormat(subHeaderTitle, seenFieldNames); + subHeaderTitle = checkTitleFormat(subHeaderTitle, seenFieldNames, i); } else { subHeaderTitle = ColumnAddressConverter.getColumnName(i + 1); } @@ -662,7 +678,14 @@ private List processSubHeaders(int startIndex, int leng return subHeaders; } - private String checkTitleFormat(String title, Map seenFieldNames) { + private String checkTitleFormat(String title, Map seenFieldNames, int columnIndex) { + if (getColumnNameCleansingEnabled()) { + return applyFileTitleConvention(title, seenFieldNames); + } + return applySheetTitleConvention(title, columnIndex); + } + + private String applyFileTitleConvention(String title, Map seenFieldNames) { final String replacementChar = "_"; StringBuilder cleanFieldNameBuilder = new StringBuilder(); @@ -692,6 +715,17 @@ private String checkTitleFormat(String title, Map seenFieldName return cleanFieldNameBuilder.toString(); } + private String applySheetTitleConvention(String title, int columnIndex) { + if (!COLUMN_NAME.matcher(title).matches()) { + String defaultColumnName = ColumnAddressConverter.getColumnName(columnIndex + 1); + LOG.warn(String.format("Original column name '%s' doesn't satisfy column name requirements '%s', " + + "the default column name '%s' will be used.", title, COLUMN_NAME.pattern(), + defaultColumnName)); + return defaultColumnName; + } + return title; + } + private Schema getDataCellSchema(List dataRow, int index, String headerName) { Schema dataSchema = Schema.of(Schema.Type.STRING); if (dataRow != null && dataRow.size() > index) { @@ -950,6 +984,11 @@ public boolean getAutoDetectRowsAndColumns() { return Boolean.TRUE.equals(autoDetectRowsAndColumns); } + @Nullable + public boolean getColumnNameCleansingEnabled() { + return Boolean.TRUE.equals(columnNameCleansingEnabled); + } + public Integer getLastDataColumn() { return lastDataColumn == null ? 0 : Integer.parseInt(lastDataColumn); } @@ -1051,8 +1090,8 @@ public void setSheetsIdentifiers(String sheetsIdentifiers) { this.sheetsIdentifiers = sheetsIdentifiers; } - public void setSchema(String schema) throws IOException { - this.schema = Schema.parseJson(schema); + public void setSchema(String schema) { + this.schema = schema; } public void setFormatting(String formatting) { @@ -1143,6 +1182,10 @@ public void setEndDate(String endDate) { this.endDate = endDate; } + public void setColumnNameCleansingEnabled(@Nullable Boolean columnNameCleansingEnabled) { + this.columnNameCleansingEnabled = columnNameCleansingEnabled; + } + public static GoogleSheetsSourceConfig of(JsonObject properties) throws IOException { GoogleSheetsSourceConfig googleSheetsSourceConfig = GoogleSheetsSourceConfig @@ -1326,6 +1369,11 @@ public static GoogleSheetsSourceConfig of(JsonObject properties) throws IOExcept properties.get(GoogleSheetsSourceConfig.AUTO_DETECT_ROWS_AND_COLUMNS).getAsBoolean()); } + if (properties.has(GoogleSheetsSourceConfig.CLEANSE_COLUMN_NAMES)) { + googleSheetsSourceConfig.setColumnNameCleansingEnabled( + properties.get(GoogleSheetsSourceConfig.CLEANSE_COLUMN_NAMES).getAsBoolean()); + } + return googleSheetsSourceConfig; } } diff --git a/src/test/java/io/cdap/plugin/google/sheets/source/GoogleSheetsSourceConfigTest.java b/src/test/java/io/cdap/plugin/google/sheets/source/GoogleSheetsSourceConfigTest.java index d227aa8..65d8b63 100644 --- a/src/test/java/io/cdap/plugin/google/sheets/source/GoogleSheetsSourceConfigTest.java +++ b/src/test/java/io/cdap/plugin/google/sheets/source/GoogleSheetsSourceConfigTest.java @@ -321,6 +321,7 @@ public void testProcessColumnsWithLastDataColumnLessThanColumnsRowSize() @Test public void testProcessColumnsInvalidTitles() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException { + config.setColumnNameCleansingEnabled(true); Method processColumnsMethod = config.getClass().getDeclaredMethod("processColumns", List.class, List.class, List.class, List.class, int.class, FailureCollector.class); processColumnsMethod.setAccessible(true); @@ -380,6 +381,7 @@ private void setFieldValue(String fieldName, Object fieldValue) throws NoSuchFie @Test public void testProcessColumnsSameCaseSensitiveTitles() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException { + config.setColumnNameCleansingEnabled(true); Method processColumnsMethod = config.getClass().getDeclaredMethod("processColumns", List.class, List.class, List.class, List.class, int.class, FailureCollector.class); @@ -413,4 +415,60 @@ public void testProcessColumnsSameCaseSensitiveTitles() Assert.assertEquals("Title_with_space_2", columns.get(1).getHeaderTitle()); Assert.assertEquals("Title_with_space_3", columns.get(2).getHeaderTitle()); } + + @Test + public void testProcessColumnsInvalidTitlesOldSchema() + throws NoSuchMethodException, InvocationTargetException, IllegalAccessException { + Method processColumnsMethod = config.getClass().getDeclaredMethod("processColumns", List.class, + List.class, List.class, List.class, int.class, + FailureCollector.class); + processColumnsMethod.setAccessible(true); + + List columnsRow = new ArrayList<>(); + columnsRow.add(new CellData().setFormattedValue("a")); + columnsRow.add(new CellData().setFormattedValue("title with space")); + columnsRow.add(new CellData()); + + List subColumnsRow = new ArrayList<>(); + subColumnsRow.add(new CellData().setFormattedValue("no header value")); + subColumnsRow.add(new CellData().setFormattedValue("9titleWithFirstNumber")); + subColumnsRow.add(new CellData().setFormattedValue("d")); + + List dataRow = new ArrayList<>(); + dataRow.add(new CellData().setUserEnteredValue(new ExtendedValue().setStringValue("aa"))); + dataRow.add(new CellData().setUserEnteredValue(new ExtendedValue().setNumberValue(13d))); + dataRow.add(new CellData().setUserEnteredValue(new ExtendedValue().setBoolValue(true))); + + List columnMerges = new ArrayList<>(); + columnMerges.add(new GridRange().setStartRowIndex(0).setEndRowIndex(1).setStartColumnIndex(1).setEndColumnIndex(3)); + + FailureCollector collector = new DefaultFailureCollector("", Collections.EMPTY_MAP); + + int lastDataColumn = 3; + + LinkedHashMap columns = + (LinkedHashMap) processColumnsMethod.invoke(config, columnsRow, + subColumnsRow, dataRow, + columnMerges, lastDataColumn, + collector); + + Assert.assertEquals(2, columns.size()); + Assert.assertTrue(columns.keySet().containsAll(Arrays.asList(0, 1))); + + // check simple column + Assert.assertEquals("a", columns.get(0).getHeaderTitle()); + Assert.assertTrue(columns.get(0).getSubColumns().isEmpty()); + + // check complex columns, top header should have column name as name + Assert.assertEquals("B", columns.get(1).getHeaderTitle()); + List subColumns = columns.get(1).getSubColumns(); + Assert.assertFalse(subColumns.isEmpty()); + + // check sub-columns + Assert.assertEquals(2, subColumns.size()); + Assert.assertEquals("B", subColumns.get(0).getHeaderTitle()); + Assert.assertTrue(subColumns.get(0).getSubColumns().isEmpty()); + Assert.assertEquals("d", subColumns.get(1).getHeaderTitle()); + Assert.assertTrue(subColumns.get(0).getSubColumns().isEmpty()); + } } diff --git a/widgets/GoogleSheets-batchsource.json b/widgets/GoogleSheets-batchsource.json index 5d87a1e..f4137d2 100644 --- a/widgets/GoogleSheets-batchsource.json +++ b/widgets/GoogleSheets-batchsource.json @@ -429,11 +429,44 @@ "default": "100", "min": "1" } + }, + { + "widget-type": "toggle", + "label": "Enable Cleansing Column Names", + "name": "columnNameCleansingEnabled", + "widget-attributes": { + "on": { + "value": "true", + "label": "Yes" + }, + "off": { + "value": "false", + "label": "No" + }, + "default": "true" + } } ] } ], - "outputs": [], + "outputs": [ + { + "name": "schema", + "label": "schema", + "widget-type": "schema", + "widget-attributes": { + "schema-types": [ + "boolean", + "long", + "double", + "bytes", + "string", + "array" + ], + "schema-default-type": "string" + } + } + ], "filters": [ { "name": "Select modification date range",