From e707a6e7d14f72051ddf933214108e32187e1765 Mon Sep 17 00:00:00 2001 From: vikasrathee-cs Date: Fri, 8 Nov 2024 17:55:38 +0530 Subject: [PATCH] Added mime type check for sheet files. --- .../google/common/APIRequestRetryer.java | 21 ++++----- .../common/GoogleDriveFilteringClient.java | 7 ++- .../source/GoogleSheetsFilteringClient.java | 45 +++++++++++++++++++ .../source/GoogleSheetsInputFormat.java | 2 +- .../source/GoogleSheetsSourceConfig.java | 2 +- .../sheets/source/SheetTransformer.java | 3 +- .../google/common/APIRequestRetryerTest.java | 6 +-- 7 files changed, 68 insertions(+), 18 deletions(-) create mode 100644 src/main/java/io/cdap/plugin/google/sheets/source/GoogleSheetsFilteringClient.java diff --git a/src/main/java/io/cdap/plugin/google/common/APIRequestRetryer.java b/src/main/java/io/cdap/plugin/google/common/APIRequestRetryer.java index 334498f..6149078 100644 --- a/src/main/java/io/cdap/plugin/google/common/APIRequestRetryer.java +++ b/src/main/java/io/cdap/plugin/google/common/APIRequestRetryer.java @@ -51,9 +51,9 @@ public abstract class APIRequestRetryer { protected static final int LIMIT_RATE_EXCEEDED_CODE = 403; protected static final int BACKEND_ERROR_CODE = 500; protected static final int SERVICE_UNAVAILABLE_CODE = 503; - private static final int MAX_RETRY_WAIT = 200; - private static final int MAX_RETRY_COUNT = 8; - private static final int MAX_RETRY_JITTER_WAIT = 100; + private static final int MAX_RETRY_WAIT_SECONDS = 300; + private static final int MAX_RETRY_COUNT = 10; + private static final int MAX_RETRY_JITTER_WAIT_MS = 100; protected static final String TOO_MANY_REQUESTS_MESSAGE = "Too Many Requests"; protected static final String LIMIT_RATE_EXCEEDED_MESSAGE = "Rate Limit Exceeded"; protected static final String FORBIDDEN_STATUS_MESSAGE = "Forbidden"; @@ -76,7 +76,7 @@ public void onRetry(Attempt attempt) { GoogleJsonResponseException e = (GoogleJsonResponseException) exceptionCause; LOG.warn(String.format( "Error code: '%d', message: '%s'. Attempt: '%d'. Delay since first: '%d'. Description: '%s'.", - e.getDetails().getCode(), + e.getStatusCode(), e.getStatusMessage(), attempt.getAttemptNumber(), attempt.getDelaySinceFirstAttempt(), @@ -96,8 +96,8 @@ public void onRetry(Attempt attempt) { .retryIfException(APIRequestRetryer::checkThrowable) .retryIfExceptionOfType(SocketTimeoutException.class) .withWaitStrategy(WaitStrategies.join( - new TrueExponentialWaitStrategy(1000, TimeUnit.SECONDS.toMillis(MAX_RETRY_WAIT)), - WaitStrategies.randomWait(MAX_RETRY_JITTER_WAIT, TimeUnit.MILLISECONDS))) + new TrueExponentialWaitStrategy(1000, TimeUnit.SECONDS.toMillis(MAX_RETRY_WAIT_SECONDS)), + WaitStrategies.randomWait(MAX_RETRY_JITTER_WAIT_MS, TimeUnit.MILLISECONDS))) .withStopStrategy(StopStrategies.stopAfterAttempt(MAX_RETRY_COUNT)) .withRetryListener(listener) .build(); @@ -125,21 +125,22 @@ private static boolean checkHttpResponseException(Throwable t) { private static boolean isTooManyRequestsError(GoogleJsonResponseException e) { List possibleMessages = Arrays.asList(TOO_MANY_REQUESTS_MESSAGE, LIMIT_RATE_EXCEEDED_MESSAGE); - return e.getDetails().getCode() == TOO_MANY_REQUESTS_CODE && possibleMessages.contains(e.getStatusMessage()); + return e.getStatusCode() == TOO_MANY_REQUESTS_CODE && possibleMessages.contains( + e.getStatusMessage()); } private static boolean isRateLimitError(GoogleJsonResponseException e) { - return e.getDetails().getCode() == LIMIT_RATE_EXCEEDED_CODE + return e.getStatusCode() == LIMIT_RATE_EXCEEDED_CODE && (LIMIT_RATE_EXCEEDED_MESSAGE.equals(e.getStatusMessage()) || e.getDetails().getMessage().contains(LIMIT_RATE_EXCEEDED_MESSAGE)); } private static boolean isBackendError(GoogleJsonResponseException e) { - return e.getDetails().getCode() == BACKEND_ERROR_CODE; + return e.getStatusCode() == BACKEND_ERROR_CODE; } private static boolean isServiceUnavailableError(GoogleJsonResponseException e) { - return e.getDetails().getCode() == SERVICE_UNAVAILABLE_CODE; + return e.getStatusCode() == SERVICE_UNAVAILABLE_CODE; } private static boolean isRateLimitError(HttpResponseException e) { diff --git a/src/main/java/io/cdap/plugin/google/common/GoogleDriveFilteringClient.java b/src/main/java/io/cdap/plugin/google/common/GoogleDriveFilteringClient.java index a666d3f..c306e56 100644 --- a/src/main/java/io/cdap/plugin/google/common/GoogleDriveFilteringClient.java +++ b/src/main/java/io/cdap/plugin/google/common/GoogleDriveFilteringClient.java @@ -72,9 +72,10 @@ public List getFilesSummary(List exportedTypes, int filesNum int retrievedFiles = 0; int actualFilesNumber = filesNumber; if (IdentifierType.FILE_IDENTIFIER.equals(config.getIdentifierType())) { - files.add(service.files().get(config.getFileIdentifier()).setSupportsAllDrives(true).execute()); + files.add(getFilesSummaryByFileId()); return files; } + Drive.Files.List request = service.files().list() .setSupportsAllDrives(true) .setIncludeItemsFromAllDrives(true) @@ -99,6 +100,10 @@ public List getFilesSummary(List exportedTypes, int filesNum }); } + protected File getFilesSummaryByFileId() throws IOException, ExecutionException { + return service.files().get(config.getFileIdentifier()).setSupportsAllDrives(true).execute(); + } + private String generateFilter(List exportedTypes) throws InterruptedException { StringBuilder sb = new StringBuilder(); diff --git a/src/main/java/io/cdap/plugin/google/sheets/source/GoogleSheetsFilteringClient.java b/src/main/java/io/cdap/plugin/google/sheets/source/GoogleSheetsFilteringClient.java new file mode 100644 index 0000000..624b61e --- /dev/null +++ b/src/main/java/io/cdap/plugin/google/sheets/source/GoogleSheetsFilteringClient.java @@ -0,0 +1,45 @@ +/* + * Copyright © 2024 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.google.sheets.source; + +import com.google.api.services.drive.model.File; +import io.cdap.plugin.google.common.GoogleDriveFilteringClient; + +import java.io.IOException; +import java.util.concurrent.ExecutionException; + +/** + * Client for getting File information via Google Sheets API. + */ +public class GoogleSheetsFilteringClient extends GoogleDriveFilteringClient { + + public GoogleSheetsFilteringClient(GoogleSheetsSourceConfig config) throws IOException { + super(config); + } + + @Override + protected File getFilesSummaryByFileId() throws IOException, ExecutionException { + File file = service.files().get(config.getFileIdentifier()).setSupportsAllDrives(true).execute(); + if (!file.getMimeType().equalsIgnoreCase(DRIVE_SPREADSHEETS_MIME)) { + throw new ExecutionException( + String.format("File with id: '%s' has a MIME_TYPE '%s' and is not a Google Sheets File.", + file.getMimeType(), + config.getFileIdentifier()), null); + } + return file; + } +} diff --git a/src/main/java/io/cdap/plugin/google/sheets/source/GoogleSheetsInputFormat.java b/src/main/java/io/cdap/plugin/google/sheets/source/GoogleSheetsInputFormat.java index 8796f69..5fd1f40 100644 --- a/src/main/java/io/cdap/plugin/google/sheets/source/GoogleSheetsInputFormat.java +++ b/src/main/java/io/cdap/plugin/google/sheets/source/GoogleSheetsInputFormat.java @@ -57,7 +57,7 @@ public List getSplits(JobContext jobContext) throws IOException { GoogleSheetsInputFormatProvider.GSON.fromJson(headersJson, headersType); // get all sheets files according to filter - GoogleDriveFilteringClient driveFilteringClient = new GoogleDriveFilteringClient(googleSheetsSourceConfig); + GoogleDriveFilteringClient driveFilteringClient = new GoogleSheetsFilteringClient(googleSheetsSourceConfig); List spreadsheetsFiles; try { spreadsheetsFiles = driveFilteringClient.getFilesSummary(Collections.singletonList(ExportedType.SPREADSHEETS)); diff --git a/src/main/java/io/cdap/plugin/google/sheets/source/GoogleSheetsSourceConfig.java b/src/main/java/io/cdap/plugin/google/sheets/source/GoogleSheetsSourceConfig.java index 03939c8..161835d 100644 --- a/src/main/java/io/cdap/plugin/google/sheets/source/GoogleSheetsSourceConfig.java +++ b/src/main/java/io/cdap/plugin/google/sheets/source/GoogleSheetsSourceConfig.java @@ -334,7 +334,7 @@ public ValidationResult validate(FailureCollector collector) { GoogleDriveFilteringClient driveClient; GoogleSheetsSourceClient sheetsSourceClient; try { - driveClient = new GoogleDriveFilteringClient(this); + driveClient = new GoogleSheetsFilteringClient(this); sheetsSourceClient = new GoogleSheetsSourceClient(this); } catch (IOException e) { collector.addFailure("Exception during drive and sheets connections instantiating.", null); diff --git a/src/main/java/io/cdap/plugin/google/sheets/source/SheetTransformer.java b/src/main/java/io/cdap/plugin/google/sheets/source/SheetTransformer.java index e017988..33bdba9 100644 --- a/src/main/java/io/cdap/plugin/google/sheets/source/SheetTransformer.java +++ b/src/main/java/io/cdap/plugin/google/sheets/source/SheetTransformer.java @@ -69,7 +69,8 @@ public static StructuredRecord transform(RowRecord rowRecord, Schema schema, boo builder.set(metadataRecordName, rowRecord.getMetadata()); } else { ComplexSingleValueColumn complexSingleValueColumn = rowRecord.getHeaderedCells().get(name); - if (complexSingleValueColumn.getData() == null && complexSingleValueColumn.getSubColumns().isEmpty()) { + if (complexSingleValueColumn == null || (complexSingleValueColumn.getData() == null + && complexSingleValueColumn.getSubColumns().isEmpty())) { builder.set(name, null); } else { processCellData(builder, field, complexSingleValueColumn); diff --git a/src/test/java/io/cdap/plugin/google/common/APIRequestRetryerTest.java b/src/test/java/io/cdap/plugin/google/common/APIRequestRetryerTest.java index 00ebb31..98c6b1c 100644 --- a/src/test/java/io/cdap/plugin/google/common/APIRequestRetryerTest.java +++ b/src/test/java/io/cdap/plugin/google/common/APIRequestRetryerTest.java @@ -33,14 +33,12 @@ @PrepareForTest({GoogleJsonResponseException.class}) public class APIRequestRetryerTest { private static final int UNPROCESSED_CODE = 504; - private static final int RETRY_NUMBER = 8; + private static final int RETRY_NUMBER = 10; @Test public void testRetryCount() throws ExecutionException { GoogleJsonResponseException exception = PowerMockito.mock(GoogleJsonResponseException.class); - GoogleJsonError googleJsonError = new GoogleJsonError(); - googleJsonError.setCode(APIRequestRetryer.TOO_MANY_REQUESTS_CODE); - PowerMockito.when(exception.getDetails()).thenReturn(googleJsonError); + PowerMockito.when(exception.getStatusCode()).thenReturn(APIRequestRetryer.TOO_MANY_REQUESTS_CODE); PowerMockito.when(exception.getStatusMessage()).thenReturn(APIRequestRetryer.TOO_MANY_REQUESTS_MESSAGE);