From 1303cb72b4ce2b821b30cac8869da45dc44a49fc Mon Sep 17 00:00:00 2001 From: Pei He Date: Fri, 30 Sep 2016 12:54:27 -0700 Subject: [PATCH] Forward port fixups in Beam PR-1032 --- .../sdk/util/BigQueryTableRowIterator.java | 17 +++++++-------- .../util/BigQueryTableRowIteratorTest.java | 21 ++++++++++++++----- 2 files changed, 23 insertions(+), 15 deletions(-) diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIterator.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIterator.java index 552c60464b..8f4ff793dc 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIterator.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIterator.java @@ -177,13 +177,11 @@ public boolean advance() throws IOException, InterruptedException { list.setPageToken(pageToken); } - TableDataList result = - executeWithBackOff( - list, - String.format( - "Error reading from BigQuery table %s of dataset %s.", - ref.getTableId(), - ref.getDatasetId())); + TableDataList result = executeWithBackOff( + list, + String.format( + "Error reading from BigQuery table %s of dataset %s.", + ref.getTableId(), ref.getDatasetId())); pageToken = result.getPageToken(); iteratorOverCurrentBatch = @@ -370,7 +368,7 @@ private void createDataset(String datasetId, @Nullable String location) executeWithBackOff( client.datasets().insert(projectId, dataset), String.format( - "Error when trying to create the temporary dataset %s in project %s", + "Error when trying to create the temporary dataset %s in project %s.", datasetId, projectId)); } @@ -407,7 +405,7 @@ private TableReference executeQueryAndWaitForCompletion() Job dryRunJob = new Job() .setConfiguration(new JobConfiguration() .setQuery(new JobConfigurationQuery() - .setQuery(query)) + .setQuery(query)) .setDryRun(true)); JobStatistics jobStats = executeWithBackOff( client.jobs().insert(projectId, dryRunJob), @@ -508,7 +506,6 @@ public static T executeWithBackOff(AbstractGoogleClientRequest client, St } } } - return result; } diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIteratorTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIteratorTest.java index 4516d5d977..d6ac5b36ba 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIteratorTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIteratorTest.java @@ -133,6 +133,17 @@ private static Table tableWithBasicSchema() { new TableFieldSchema().setName("anniversary_time").setType("TIME")))); } + private static Table noTableQuerySchema() { + return new Table() + .setSchema( + new TableSchema() + .setFields( + Arrays.asList( + new TableFieldSchema().setName("name").setType("STRING"), + new TableFieldSchema().setName("count").setType("INTEGER"), + new TableFieldSchema().setName("photo").setType("BYTES")))); + } + private static Table tableWithLocation() { return new Table() .setLocation("EU"); @@ -206,6 +217,7 @@ public void testReadFromQuery() throws IOException, InterruptedException { assertEquals("2000-01-01", row.get("anniversary_date")); assertEquals("2000-01-01 00:00:00.000005", row.get("anniversary_datetime")); assertEquals("00:00:00.000005", row.get("anniversary_time")); + assertFalse(iterator.advance()); } @@ -257,14 +269,13 @@ public void testReadFromQueryNoTables() throws IOException, InterruptedException when(mockJobsGet.execute()).thenReturn(getJob); // Mock table schema fetch. - when(mockTablesGet.execute()).thenReturn(tableWithBasicSchema()); + when(mockTablesGet.execute()).thenReturn(noTableQuerySchema()); byte[] photoBytes = "photograph".getBytes(); String photoBytesEncoded = BaseEncoding.base64().encode(photoBytes); // Mock table data fetch. when(mockTabledataList.execute()).thenReturn( - rawDataList(rawRow("Arthur", 42, photoBytesEncoded, - "2000-01-01", "2000-01-01 00:00:00.000005", "00:00:00.000005"))); + rawDataList(rawRow("Arthur", 42, photoBytesEncoded))); // Run query and verify String query = String.format( @@ -277,10 +288,10 @@ public void testReadFromQueryNoTables() throws IOException, InterruptedException TableRow row = iterator.getCurrent(); assertTrue(row.containsKey("name")); - assertTrue(row.containsKey("answer")); + assertTrue(row.containsKey("count")); assertTrue(row.containsKey("photo")); assertEquals("Arthur", row.get("name")); - assertEquals(42, row.get("answer")); + assertEquals(42, row.get("count")); assertEquals(photoBytesEncoded, row.get("photo")); assertFalse(iterator.advance());