Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[🍒][PLUGIN-1785] Column name cleansing done as per other file plugins and made schema non transient. #55

Merged
merged 3 commits into from
May 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions docs/GoogleSheets-batchsource.md
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,9 @@ Names Row contains less number of columns.

**Read Buffer Size:** Number of rows the source reads with a single API request. Default value is 100.

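**Enable Cleansing Column Names:** Toggle that specifies whether to cleanse column names that contain special characters.
When enabled, every run of characters other than letters, digits, and underscores is replaced by a single underscore,
names that start with a digit are prefixed with `col_`, and names that collide after cleansing (ignoring case) get a
numeric suffix such as `_2`. When disabled, a column whose name is not a valid field name falls back to its sheet
column letter (for example `B`).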

### Steps to Generate OAuth2 Credentials
1. Create credentials for the Client ID and Client Secret properties [here](https://console.cloud.google.com/apis/credentials).
2. On the Create OAuth client ID page, under Authorized redirect URIs, specify a URI of `http://localhost:8080`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.cdap.cdap.etl.api.Emitter;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.PipelineConfigurer;
import io.cdap.cdap.etl.api.StageConfigurer;
import io.cdap.cdap.etl.api.batch.BatchSource;
import io.cdap.cdap.etl.api.batch.BatchSourceContext;
import io.cdap.plugin.common.LineageRecorder;
Expand All @@ -51,11 +52,13 @@ public GoogleSheetsSource(GoogleSheetsSourceConfig config) {

@Override
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
StageConfigurer stageConfigurer = pipelineConfigurer.getStageConfigurer();
FailureCollector failureCollector = pipelineConfigurer.getStageConfigurer().getFailureCollector();
config.validate(failureCollector);
failureCollector.getOrThrowException();

pipelineConfigurer.getStageConfigurer().setOutputSchema(config.getSchema(failureCollector));
Schema configuredSchema = config.getSchema(failureCollector);
stageConfigurer.setOutputSchema(configuredSchema);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,9 @@ public class GoogleSheetsSourceConfig extends GoogleFilteringSourceConfig {
public static final String CONFIGURATION_PARSE_PROPERTY_NAME = "properties";
private static final Logger LOG = LoggerFactory.getLogger(GoogleSheetsSourceConfig.class);
private static final Pattern CELL_ADDRESS = Pattern.compile("^([A-Z]+)([0-9]+)$");
private static final Pattern NOT_VALID_PATTERN = Pattern.compile("[^A-Za-z0-9_]+");
private static final Pattern COLUMN_NAME = Pattern.compile("^[A-Za-z_][A-Za-z0-9_-]*$");
public static final String CLEANSE_COLUMN_NAMES = "columnNameCleansingEnabled";
private static LinkedHashMap<Integer, ColumnComplexSchemaInfo> dataSchemaInfo = new LinkedHashMap<>();

@Name(SHEETS_TO_PULL)
Expand All @@ -118,7 +120,7 @@ public class GoogleSheetsSourceConfig extends GoogleFilteringSourceConfig {
@Name(NAME_SCHEMA)
@Description("The schema of the table to read.")
@Macro
private transient Schema schema = null;
private String schema;

@Name(FORMATTING)
@Description("Output format for numeric sheet cells. " +
Expand Down Expand Up @@ -234,6 +236,13 @@ public class GoogleSheetsSourceConfig extends GoogleFilteringSourceConfig {
@Macro
private String sheetFieldName;

@Nullable
@Name(CLEANSE_COLUMN_NAMES)
@Description("Toggle that specifies whether to cleanse column names containing special characters. " +
"Special characters will be replaced by underscores.")
@Macro
private Boolean columnNameCleansingEnabled;

public GoogleSheetsSourceConfig(String referenceName, @Nullable String sheetsIdentifiers, String formatting,
Boolean skipEmptyData, String columnNamesSelection,
@Nullable Integer customColumnNamesRow, String metadataFieldName,
Expand All @@ -242,7 +251,7 @@ public GoogleSheetsSourceConfig(String referenceName, @Nullable String sheetsIde
@Nullable String lastDataColumn, @Nullable String lastDataRow,
@Nullable String metadataCells, @Nullable Integer readBufferSize,
@Nullable Boolean addNameFields, @Nullable String spreadsheetFieldName,
@Nullable String sheetFieldName) {
@Nullable String sheetFieldName, @Nullable String schema) {

super(referenceName);
this.sheetsIdentifiers = sheetsIdentifiers;
Expand All @@ -262,6 +271,7 @@ public GoogleSheetsSourceConfig(String referenceName, @Nullable String sheetsIde
this.addNameFields = addNameFields;
this.spreadsheetFieldName = spreadsheetFieldName;
this.sheetFieldName = sheetFieldName;
this.schema = schema;
}


Expand All @@ -281,9 +291,15 @@ public Schema getSchema(FailureCollector collector) {
"Perhaps no validation step was executed before schema generation.")
.withConfigProperty(SCHEMA);
}
schema = SchemaBuilder.buildSchema(this, new ArrayList<>(dataSchemaInfo.values()));
return SchemaBuilder.buildSchema(this, new ArrayList<>(dataSchemaInfo.values()));
}
return schema;
try {
return Strings.isNullOrEmpty(schema) ? null : Schema.parseJson(schema);
} catch (IOException e) {
collector.addFailure("Invalid schema: " + e.getMessage(),
null).withConfigProperty(NAME_SCHEMA);
}
throw collector.getOrThrowException();
}

private boolean shouldGetSchema() {
Expand Down Expand Up @@ -593,7 +609,7 @@ private LinkedHashMap<Integer, ColumnComplexSchemaInfo> processColumns(List<Cell
int lastDataColumn,
FailureCollector collector) {
LinkedHashMap<Integer, ColumnComplexSchemaInfo> columnHeaders = new LinkedHashMap<>();

final Map<String, Integer> seenFieldNames = new HashMap<>();
List<String> headerTitles = new ArrayList<>();
for (int i = 0; i < Math.min(columnsRow.size(), lastDataColumn); i++) {
CellData columnHeaderCell = columnsRow.get(i);
Expand All @@ -609,7 +625,7 @@ private LinkedHashMap<Integer, ColumnComplexSchemaInfo> processColumns(List<Cell
}
String title = columnHeaderCell.getFormattedValue();
if (StringUtils.isNotEmpty(title)) {
title = checkTitleFormat(title, i);
title = checkTitleFormat(title, seenFieldNames, i);

// for merge we should analyse sub headers for data schemas
if (isMergeHead) {
Expand All @@ -634,6 +650,7 @@ private LinkedHashMap<Integer, ColumnComplexSchemaInfo> processColumns(List<Cell
private List<ColumnComplexSchemaInfo> processSubHeaders(int startIndex, int length, List<CellData> subColumnsRow,
List<CellData> dataRow, FailureCollector collector) {
List<ColumnComplexSchemaInfo> subHeaders = new ArrayList<>();
final Map<String, Integer> seenFieldNames = new HashMap<>();
List<String> titles = new ArrayList<>();
for (int i = startIndex; i < startIndex + length; i++) {
String subHeaderTitle;
Expand All @@ -642,7 +659,7 @@ private List<ColumnComplexSchemaInfo> processSubHeaders(int startIndex, int leng
if (StringUtils.isEmpty(subHeaderTitle)) {
subHeaderTitle = ColumnAddressConverter.getColumnName(i + 1);
}
subHeaderTitle = checkTitleFormat(subHeaderTitle, i);
subHeaderTitle = checkTitleFormat(subHeaderTitle, seenFieldNames, i);
} else {
subHeaderTitle = ColumnAddressConverter.getColumnName(i + 1);
}
Expand All @@ -661,11 +678,49 @@ private List<ColumnComplexSchemaInfo> processSubHeaders(int startIndex, int leng
return subHeaders;
}

private String checkTitleFormat(String title, int columnIndex) {
/**
 * Normalises a header title using the naming convention selected in the configuration.
 *
 * @param title the raw header text read from the sheet
 * @param seenFieldNames names already produced for the current header row; only consulted (and updated)
 *                       when column-name cleansing is enabled
 * @param columnIndex zero-based index of the column; only used when cleansing is disabled
 * @return the title produced by {@code applyFileTitleConvention} when cleansing is enabled, otherwise the
 *         one produced by {@code applySheetTitleConvention}
 */
private String checkTitleFormat(String title, Map<String, Integer> seenFieldNames, int columnIndex) {
  return getColumnNameCleansingEnabled()
    ? applyFileTitleConvention(title, seenFieldNames)
    : applySheetTitleConvention(title, columnIndex);
}

/**
 * Turns an arbitrary header title into a cleansed field name that is unique within {@code seenFieldNames}.
 *
 * <p>The title is trimmed, every run of characters matched by {@code NOT_VALID_PATTERN} is replaced by a
 * single underscore, an empty title becomes {@code BLANK}, and a title starting with a digit is prefixed
 * with {@code col_}. Names are compared case-insensitively; the second and later occurrences of a name
 * receive a numeric suffix ({@code _2}, {@code _3}, ...).
 *
 * <p>Every returned name is recorded in {@code seenFieldNames}, including suffixed ones, so a generated
 * name such as {@code a_2} cannot clash with a header that is literally called {@code a_2}.
 *
 * @param title the raw header text; must not be {@code null}
 * @param seenFieldNames mutable map from lower-cased field name to the number of times it has been used;
 *                       updated by this call
 * @return the cleansed, de-duplicated field name
 */
private String applyFileTitleConvention(String title, Map<String, Integer> seenFieldNames) {
  final String replacementChar = "_";

  // Leading/trailing whitespace carries no meaning in a header cell.
  String trimmedTitle = title.trim();

  StringBuilder cleanFieldNameBuilder = new StringBuilder();
  if (trimmedTitle.isEmpty()) {
    // Nothing left to derive a name from.
    cleanFieldNameBuilder.append("BLANK");
  } else if (Character.isDigit(trimmedTitle.charAt(0))) {
    // Prefix names that would otherwise start with a digit.
    cleanFieldNameBuilder.append("col_");
  }

  // Replace all invalid characters with the replacement char.
  cleanFieldNameBuilder.append(NOT_VALID_PATTERN.matcher(trimmedTitle).replaceAll(replacementChar));
  String cleanFieldName = cleanFieldNameBuilder.toString();

  // Locale.ROOT keeps the case-insensitive key independent of the JVM default locale (e.g. Turkish 'I').
  // Fully qualified so that no additional import is required.
  String lookupKey = cleanFieldName.toLowerCase(java.util.Locale.ROOT);
  int count = seenFieldNames.getOrDefault(lookupKey, 0) + 1;
  if (count == 1) {
    seenFieldNames.put(lookupKey, count);
    return cleanFieldName;
  }

  // The name was used before: append the occurrence count, skipping any suffix that is itself taken.
  String candidate = cleanFieldName + replacementChar + count;
  String candidateKey = candidate.toLowerCase(java.util.Locale.ROOT);
  while (seenFieldNames.containsKey(candidateKey)) {
    count++;
    candidate = cleanFieldName + replacementChar + count;
    candidateKey = candidate.toLowerCase(java.util.Locale.ROOT);
  }
  seenFieldNames.put(lookupKey, count);
  // Remember the generated name so a later literal header with the same text is de-duplicated too.
  seenFieldNames.put(candidateKey, 1);
  return candidate;
}

private String applySheetTitleConvention(String title, int columnIndex) {
if (!COLUMN_NAME.matcher(title).matches()) {
String defaultColumnName = ColumnAddressConverter.getColumnName(columnIndex + 1);
LOG.warn(String.format("Original column name '%s' doesn't satisfy column name requirements '%s', " +
"the default column name '%s' will be used.", title, COLUMN_NAME.pattern(), defaultColumnName));
"the default column name '%s' will be used.", title, COLUMN_NAME.pattern(),
defaultColumnName));
return defaultColumnName;
}
return title;
Expand Down Expand Up @@ -929,6 +984,11 @@ public boolean getAutoDetectRowsAndColumns() {
return Boolean.TRUE.equals(autoDetectRowsAndColumns);
}

/**
 * Reports whether column-name cleansing is switched on.
 *
 * @return {@code true} only when the property is explicitly set to {@code true}; an unset ({@code null})
 *         or {@code false} value yields {@code false}
 */
public boolean getColumnNameCleansingEnabled() {
  // The return type is a primitive, so it can never be null; the null case is folded into false here.
  return Boolean.TRUE.equals(columnNameCleansingEnabled);
}

public Integer getLastDataColumn() {
return lastDataColumn == null ? 0 : Integer.parseInt(lastDataColumn);
}
Expand Down Expand Up @@ -1030,8 +1090,8 @@ public void setSheetsIdentifiers(String sheetsIdentifiers) {
this.sheetsIdentifiers = sheetsIdentifiers;
}

public void setSchema(String schema) throws IOException {
this.schema = Schema.parseJson(schema);
public void setSchema(String schema) {
this.schema = schema;
}

public void setFormatting(String formatting) {
Expand Down Expand Up @@ -1122,6 +1182,10 @@ public void setEndDate(String endDate) {
this.endDate = endDate;
}

/**
 * Sets the flag that controls column-name cleansing.
 *
 * @param columnNameCleansingEnabled the new value; may be {@code null}
 */
public void setColumnNameCleansingEnabled(@Nullable Boolean columnNameCleansingEnabled) {
this.columnNameCleansingEnabled = columnNameCleansingEnabled;
}

public static GoogleSheetsSourceConfig of(JsonObject properties) throws IOException {

GoogleSheetsSourceConfig googleSheetsSourceConfig = GoogleSheetsSourceConfig
Expand Down Expand Up @@ -1305,6 +1369,11 @@ public static GoogleSheetsSourceConfig of(JsonObject properties) throws IOExcept
properties.get(GoogleSheetsSourceConfig.AUTO_DETECT_ROWS_AND_COLUMNS).getAsBoolean());
}

if (properties.has(GoogleSheetsSourceConfig.CLEANSE_COLUMN_NAMES)) {
googleSheetsSourceConfig.setColumnNameCleansingEnabled(
properties.get(GoogleSheetsSourceConfig.CLEANSE_COLUMN_NAMES).getAsBoolean());
}

return googleSheetsSourceConfig;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,7 @@ public void testProcessColumnsWithLastDataColumnLessThanColumnsRowSize()
@Test
public void testProcessColumnsInvalidTitles()
throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
config.setColumnNameCleansingEnabled(true);
Method processColumnsMethod = config.getClass().getDeclaredMethod("processColumns", List.class,
List.class, List.class, List.class, int.class, FailureCollector.class);
processColumnsMethod.setAccessible(true);
Expand Down Expand Up @@ -359,13 +360,13 @@ public void testProcessColumnsInvalidTitles()
Assert.assertTrue(columns.get(0).getSubColumns().isEmpty());

// check complex columns, top header should have column name as name
Assert.assertEquals("B", columns.get(1).getHeaderTitle());
Assert.assertEquals("title_with_space", columns.get(1).getHeaderTitle());
List<ColumnComplexSchemaInfo> subColumns = columns.get(1).getSubColumns();
Assert.assertFalse(subColumns.isEmpty());

// check sub-columns
Assert.assertEquals(2, subColumns.size());
Assert.assertEquals("B", subColumns.get(0).getHeaderTitle());
Assert.assertEquals("col_9titleWithFirstNumber", subColumns.get(0).getHeaderTitle());
Assert.assertTrue(subColumns.get(0).getSubColumns().isEmpty());
Assert.assertEquals("d", subColumns.get(1).getHeaderTitle());
Assert.assertTrue(subColumns.get(0).getSubColumns().isEmpty());
Expand All @@ -376,4 +377,98 @@ private void setFieldValue(String fieldName, Object fieldValue) throws NoSuchFie
metadataKeyCellsField.setAccessible(true);
metadataKeyCellsField.set(config, fieldValue);
}

/**
 * Checks that, with column-name cleansing switched on, headers that cleanse to the same name
 * (ignoring case) are told apart by a running numeric suffix.
 */
@Test
public void testProcessColumnsSameCaseSensitiveTitles()
  throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
  config.setColumnNameCleansingEnabled(true);

  // processColumns is private, so it is reached through reflection.
  Method processColumns = config.getClass().getDeclaredMethod(
    "processColumns", List.class, List.class, List.class, List.class, int.class, FailureCollector.class);
  processColumns.setAccessible(true);

  // Three headers that differ only by case or by the separator character.
  List<CellData> headerRow = new ArrayList<>();
  for (String header : new String[] {"title with space", "Title with space", "Title%with%space"}) {
    headerRow.add(new CellData().setFormattedValue(header));
  }

  // One data cell per header, each of a different value type.
  List<CellData> valueRow = new ArrayList<>();
  valueRow.add(new CellData().setUserEnteredValue(new ExtendedValue().setStringValue("aa")));
  valueRow.add(new CellData().setUserEnteredValue(new ExtendedValue().setNumberValue(13d)));
  valueRow.add(new CellData().setUserEnteredValue(new ExtendedValue().setBoolValue(true)));

  List<GridRange> merges = new ArrayList<>();
  FailureCollector failures = new DefaultFailureCollector("", Collections.EMPTY_MAP);
  int lastDataColumn = 3;

  // No sub-header row is supplied (null) because there are no merged header cells.
  Object rawResult = processColumns.invoke(config, headerRow, null, valueRow, merges, lastDataColumn, failures);
  LinkedHashMap<Integer, ColumnComplexSchemaInfo> result =
    (LinkedHashMap<Integer, ColumnComplexSchemaInfo>) rawResult;

  Assert.assertEquals(3, result.size());
  Assert.assertTrue(result.keySet().containsAll(Arrays.asList(0, 1, 2)));

  // The first occurrence keeps its cleansed name; later ones keep their own casing plus a counter.
  Assert.assertEquals("title_with_space", result.get(0).getHeaderTitle());
  Assert.assertEquals("Title_with_space_2", result.get(1).getHeaderTitle());
  Assert.assertEquals("Title_with_space_3", result.get(2).getHeaderTitle());
}

/**
 * Exercises {@code processColumns} without enabling column-name cleansing in this test and checks
 * that titles which do not satisfy the column-name pattern are replaced by the sheet column name
 * (here {@code B}) rather than being cleansed.
 *
 * <p>NOTE(review): this relies on the shared {@code config} fixture not having cleansing enabled;
 * confirm against the test setup.
 */
@Test
public void testProcessColumnsInvalidTitlesOldSchema()
  throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
  // processColumns is private, so it is reached through reflection.
  Method processColumnsMethod = config.getClass().getDeclaredMethod("processColumns", List.class,
                                                                    List.class, List.class, List.class, int.class,
                                                                    FailureCollector.class);
  processColumnsMethod.setAccessible(true);

  List<CellData> columnsRow = new ArrayList<>();
  columnsRow.add(new CellData().setFormattedValue("a"));
  columnsRow.add(new CellData().setFormattedValue("title with space"));
  columnsRow.add(new CellData());

  List<CellData> subColumnsRow = new ArrayList<>();
  subColumnsRow.add(new CellData().setFormattedValue("no header value"));
  subColumnsRow.add(new CellData().setFormattedValue("9titleWithFirstNumber"));
  subColumnsRow.add(new CellData().setFormattedValue("d"));

  List<CellData> dataRow = new ArrayList<>();
  dataRow.add(new CellData().setUserEnteredValue(new ExtendedValue().setStringValue("aa")));
  dataRow.add(new CellData().setUserEnteredValue(new ExtendedValue().setNumberValue(13d)));
  dataRow.add(new CellData().setUserEnteredValue(new ExtendedValue().setBoolValue(true)));

  // The header in column index 1 is merged across column indexes 1 and 2.
  List<GridRange> columnMerges = new ArrayList<>();
  columnMerges.add(new GridRange()
                     .setStartRowIndex(0)
                     .setEndRowIndex(1)
                     .setStartColumnIndex(1)
                     .setEndColumnIndex(3));

  // Typed empty map instead of the raw Collections.EMPTY_MAP constant.
  FailureCollector collector = new DefaultFailureCollector("", Collections.emptyMap());

  int lastDataColumn = 3;

  LinkedHashMap<Integer, ColumnComplexSchemaInfo> columns =
    (LinkedHashMap<Integer, ColumnComplexSchemaInfo>) processColumnsMethod.invoke(config, columnsRow,
                                                                                  subColumnsRow, dataRow,
                                                                                  columnMerges, lastDataColumn,
                                                                                  collector);

  Assert.assertEquals(2, columns.size());
  Assert.assertTrue(columns.keySet().containsAll(Arrays.asList(0, 1)));

  // check simple column
  Assert.assertEquals("a", columns.get(0).getHeaderTitle());
  Assert.assertTrue(columns.get(0).getSubColumns().isEmpty());

  // check complex columns, top header should have column name as name
  Assert.assertEquals("B", columns.get(1).getHeaderTitle());
  List<ColumnComplexSchemaInfo> subColumns = columns.get(1).getSubColumns();
  Assert.assertFalse(subColumns.isEmpty());

  // check sub-columns
  Assert.assertEquals(2, subColumns.size());
  Assert.assertEquals("B", subColumns.get(0).getHeaderTitle());
  Assert.assertTrue(subColumns.get(0).getSubColumns().isEmpty());
  Assert.assertEquals("d", subColumns.get(1).getHeaderTitle());
  // Verify the second sub-column as well; the previous version re-checked index 0 here.
  Assert.assertTrue(subColumns.get(1).getSubColumns().isEmpty());
}
}
Loading