Skip to content
This repository has been archived by the owner on Nov 11, 2022. It is now read-only.

Commit

Permalink
Merge pull request #458 from peihe/fixup-bq-diff
Browse files Browse the repository at this point in the history
Forward port fixups in Beam PR-1032
  • Loading branch information
lukecwik authored Sep 30, 2016
2 parents ac1d97b + 1303cb7 commit fe13fb7
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -177,13 +177,11 @@ public boolean advance() throws IOException, InterruptedException {
list.setPageToken(pageToken);
}

TableDataList result =
executeWithBackOff(
list,
String.format(
"Error reading from BigQuery table %s of dataset %s.",
ref.getTableId(),
ref.getDatasetId()));
TableDataList result = executeWithBackOff(
list,
String.format(
"Error reading from BigQuery table %s of dataset %s.",
ref.getTableId(), ref.getDatasetId()));

pageToken = result.getPageToken();
iteratorOverCurrentBatch =
Expand Down Expand Up @@ -370,7 +368,7 @@ private void createDataset(String datasetId, @Nullable String location)
executeWithBackOff(
client.datasets().insert(projectId, dataset),
String.format(
"Error when trying to create the temporary dataset %s in project %s",
"Error when trying to create the temporary dataset %s in project %s.",
datasetId, projectId));
}

Expand Down Expand Up @@ -407,7 +405,7 @@ private TableReference executeQueryAndWaitForCompletion()
Job dryRunJob = new Job()
.setConfiguration(new JobConfiguration()
.setQuery(new JobConfigurationQuery()
.setQuery(query))
.setQuery(query))
.setDryRun(true));
JobStatistics jobStats = executeWithBackOff(
client.jobs().insert(projectId, dryRunJob),
Expand Down Expand Up @@ -508,7 +506,6 @@ public static <T> T executeWithBackOff(AbstractGoogleClientRequest<T> client, St
}
}
}

return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,17 @@ private static Table tableWithBasicSchema() {
new TableFieldSchema().setName("anniversary_time").setType("TIME"))));
}

private static Table noTableQuerySchema() {
return new Table()
.setSchema(
new TableSchema()
.setFields(
Arrays.asList(
new TableFieldSchema().setName("name").setType("STRING"),
new TableFieldSchema().setName("count").setType("INTEGER"),
new TableFieldSchema().setName("photo").setType("BYTES"))));
}

private static Table tableWithLocation() {
return new Table()
.setLocation("EU");
Expand Down Expand Up @@ -206,6 +217,7 @@ public void testReadFromQuery() throws IOException, InterruptedException {
assertEquals("2000-01-01", row.get("anniversary_date"));
assertEquals("2000-01-01 00:00:00.000005", row.get("anniversary_datetime"));
assertEquals("00:00:00.000005", row.get("anniversary_time"));

assertFalse(iterator.advance());
}

Expand Down Expand Up @@ -257,14 +269,13 @@ public void testReadFromQueryNoTables() throws IOException, InterruptedException
when(mockJobsGet.execute()).thenReturn(getJob);

// Mock table schema fetch.
when(mockTablesGet.execute()).thenReturn(tableWithBasicSchema());
when(mockTablesGet.execute()).thenReturn(noTableQuerySchema());

byte[] photoBytes = "photograph".getBytes();
String photoBytesEncoded = BaseEncoding.base64().encode(photoBytes);
// Mock table data fetch.
when(mockTabledataList.execute()).thenReturn(
rawDataList(rawRow("Arthur", 42, photoBytesEncoded,
"2000-01-01", "2000-01-01 00:00:00.000005", "00:00:00.000005")));
rawDataList(rawRow("Arthur", 42, photoBytesEncoded)));

// Run query and verify
String query = String.format(
Expand All @@ -277,10 +288,10 @@ public void testReadFromQueryNoTables() throws IOException, InterruptedException
TableRow row = iterator.getCurrent();

assertTrue(row.containsKey("name"));
assertTrue(row.containsKey("answer"));
assertTrue(row.containsKey("count"));
assertTrue(row.containsKey("photo"));
assertEquals("Arthur", row.get("name"));
assertEquals(42, row.get("answer"));
assertEquals(42, row.get("count"));
assertEquals(photoBytesEncoded, row.get("photo"));

assertFalse(iterator.advance());
Expand Down

0 comments on commit fe13fb7

Please sign in to comment.