From 12eba1ee483c0ab86a47618597fe1cc6543c7001 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Grzegorz=20Ko=C5=82akowski?= Date: Wed, 13 Nov 2024 17:04:26 +0100 Subject: [PATCH 1/5] Support source lookup array results --- CHANGELOG.md | 4 + README.md | 10 ++ pom.xml | 7 ++ .../http/internal/PollingClient.java | 4 +- .../config/HttpConnectorConfigConstants.java | 3 + .../table/lookup/HttpTableLookupFunction.java | 5 +- .../lookup/JavaNetHttpPollingClient.java | 51 ++++++++-- .../JavaNetHttpPollingClientFactory.java | 2 + ...avaNetHttpPollingClientConnectionTest.java | 93 +++++++++++++++---- ...tHttpPollingClientHttpsConnectionTest.java | 8 +- .../lookup/JavaNetHttpPollingClientTest.java | 4 + .../http-array-result/HttpResult.json | 36 +++++++ 12 files changed, 189 insertions(+), 38 deletions(-) create mode 100644 src/test/resources/http-array-result/HttpResult.json diff --git a/CHANGELOG.md b/CHANGELOG.md index 6bfaef7d..8387b55e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,10 @@ ## [Unreleased] +### Added + +- Allow to fetch multiple results from REST API endpoint (`gid.connector.http.source.lookup.result-type`). + ## [0.16.0] - 2024-10-18 ### Added diff --git a/README.md b/README.md index da757118..68297e0e 100644 --- a/README.md +++ b/README.md @@ -171,6 +171,16 @@ Because of that, if AsyncIO timer passes, Flink will throw TimeoutException whic The HTTP request timeouts on the other hand will not cause Job restart. In that case, exception will be logged into application logs. To avoid job restart on timeouts caused by Lookup queries, the value of `gid.connector.http.source.lookup.request.timeout` should be smaller than `table.exec.async-lookup.timeout`. +#### Lookup multiple results + +Typically, join can return zero, one or more results. What is more, there are lots of possible REST API designs and +pagination methods. Currently, the connector supports only two simple approaches (`gid.connector.http.source.lookup.result-type`): + +- `single-value` - REST API returns single object. +- `array` - REST API returns array of objects. Pagination is not supported yet. + +Please be informed that the mechanism will be enhanced in the future. See [HTTP-118](https://github.com/getindata/flink-http-connector/issues/118). + ### HTTP Sink The following example shows the minimum Table API example to create a [HttpDynamicSink](src/main/java/com/getindata/connectors/http/internal/table/HttpDynamicSink.java) that writes JSON values to an HTTP endpoint using POST method, assuming Flink has JAR of [JSON serializer](https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/table/formats/json/) installed: diff --git a/pom.xml b/pom.xml index 5b33822d..072af900 100644 --- a/pom.xml +++ b/pom.xml @@ -77,6 +77,7 @@ under the License. ${target.java.version} 2.17.2 1.18.22 + 2.18.1 4.13.2 5.10.1 ${junit5.version} @@ -153,6 +154,12 @@ under the License. provided + + com.fasterxml.jackson.core + jackson-databind + ${jackson.version} + + org.projectlombok lombok diff --git a/src/main/java/com/getindata/connectors/http/internal/PollingClient.java b/src/main/java/com/getindata/connectors/http/internal/PollingClient.java index b87d64c8..a035e8a8 100644 --- a/src/main/java/com/getindata/connectors/http/internal/PollingClient.java +++ b/src/main/java/com/getindata/connectors/http/internal/PollingClient.java @@ -1,6 +1,6 @@ package com.getindata.connectors.http.internal; -import java.util.Optional; +import java.util.Collection; import org.apache.flink.table.data.RowData; @@ -14,5 +14,5 @@ public interface PollingClient { * @param lookupRow A {@link RowData} containing request parameters. * @return an optional result of data lookup. */ - Optional pull(RowData lookupRow); + Collection pull(RowData lookupRow); } diff --git a/src/main/java/com/getindata/connectors/http/internal/config/HttpConnectorConfigConstants.java b/src/main/java/com/getindata/connectors/http/internal/config/HttpConnectorConfigConstants.java index b501b29b..61c413c8 100644 --- a/src/main/java/com/getindata/connectors/http/internal/config/HttpConnectorConfigConstants.java +++ b/src/main/java/com/getindata/connectors/http/internal/config/HttpConnectorConfigConstants.java @@ -43,6 +43,9 @@ public final class HttpConnectorConfigConstants { public static final String LOOKUP_SOURCE_HEADER_USE_RAW = GID_CONNECTOR_HTTP + "source.lookup.use-raw-authorization-header"; + public static final String RESULT_TYPE = GID_CONNECTOR_HTTP + + "source.lookup.result-type"; + // --------- Error code handling configuration --------- public static final String HTTP_ERROR_SINK_CODE_WHITE_LIST = GID_CONNECTOR_HTTP + "sink.error.code.exclude"; diff --git a/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpTableLookupFunction.java b/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpTableLookupFunction.java index 8579a02f..8bab7451 100644 --- a/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpTableLookupFunction.java +++ b/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpTableLookupFunction.java @@ -1,8 +1,6 @@ package com.getindata.connectors.http.internal.table.lookup; import java.util.Collection; -import java.util.Collections; -import java.util.Optional; import java.util.concurrent.atomic.AtomicInteger; import lombok.AccessLevel; @@ -66,7 +64,6 @@ public void open(FunctionContext context) throws Exception { @Override public Collection lookup(RowData keyRow) { localHttpCallCounter.incrementAndGet(); - Optional result = client.pull(keyRow); - return result.map(Collections::singletonList).orElse(Collections.emptyList()); + return client.pull(keyRow); } } diff --git a/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java b/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java index ce3a31cc..3add34d8 100644 --- a/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java +++ b/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java @@ -4,9 +4,14 @@ import java.net.http.HttpClient; import java.net.http.HttpResponse; import java.net.http.HttpResponse.BodyHandlers; +import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; -import java.util.Optional; +import java.util.List; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import lombok.extern.slf4j.Slf4j; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.serialization.DeserializationSchema; @@ -19,6 +24,7 @@ import com.getindata.connectors.http.internal.status.ComposeHttpStatusCodeChecker; import com.getindata.connectors.http.internal.status.ComposeHttpStatusCodeChecker.ComposeHttpStatusCodeCheckerConfig; import com.getindata.connectors.http.internal.status.HttpStatusCodeChecker; +import static com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants.RESULT_TYPE; /** * An implementation of {@link PollingClient} that uses Java 11's {@link HttpClient}. @@ -33,18 +39,24 @@ public class JavaNetHttpPollingClient implements PollingClient { private final DeserializationSchema responseBodyDecoder; + private final ObjectMapper objectMapper; + private final HttpRequestFactory requestFactory; private final HttpPostRequestCallback httpPostRequestCallback; + private final HttpLookupConfig options; + public JavaNetHttpPollingClient( HttpClient httpClient, DeserializationSchema responseBodyDecoder, + ObjectMapper objectMapper, HttpLookupConfig options, HttpRequestFactory requestFactory) { this.httpClient = httpClient; this.responseBodyDecoder = responseBodyDecoder; + this.objectMapper = objectMapper; this.requestFactory = requestFactory; this.httpPostRequestCallback = options.getHttpPostRequestCallback(); @@ -61,21 +73,22 @@ public JavaNetHttpPollingClient( .build(); this.statusCodeChecker = new ComposeHttpStatusCodeChecker(checkerConfig); + this.options = options; } @Override - public Optional pull(RowData lookupRow) { + public Collection pull(RowData lookupRow) { try { - log.debug("Optional pull with Rowdata={}.", lookupRow); + log.debug("Collection pull with Rowdata={}.", lookupRow); return queryAndProcess(lookupRow); } catch (Exception e) { log.error("Exception during HTTP request.", e); - return Optional.empty(); + return Collections.emptyList(); } } // TODO Add Retry Policy And configure TimeOut from properties - private Optional queryAndProcess(RowData lookupData) throws Exception { + private Collection queryAndProcess(RowData lookupData) throws Exception { HttpLookupSourceRequestEntry request = requestFactory.buildLookupRequest(lookupData); HttpResponse response = httpClient.send( @@ -85,14 +98,14 @@ private Optional queryAndProcess(RowData lookupData) throws Exception { return processHttpResponse(response, request); } - private Optional processHttpResponse( + private Collection processHttpResponse( HttpResponse response, HttpLookupSourceRequestEntry request) throws IOException { this.httpPostRequestCallback.call(response, request, "endpoint", Collections.emptyMap()); if (response == null) { - return Optional.empty(); + return Collections.emptyList(); } String responseBody = response.body(); @@ -102,14 +115,14 @@ private Optional processHttpResponse( "with Server response body [%s] ", statusCode, responseBody)); if (notErrorCodeAndNotEmptyBody(responseBody, statusCode)) { - return Optional.ofNullable(responseBodyDecoder.deserialize(responseBody.getBytes())); + return deserialize(responseBody); } else { log.warn( String.format("Returned Http status code was invalid or returned body was empty. " + "Status Code [%s]", statusCode) ); - return Optional.empty(); + return Collections.emptyList(); } } @@ -122,4 +135,24 @@ private boolean notErrorCodeAndNotEmptyBody(String body, int statusCode) { HttpRequestFactory getRequestFactory() { return this.requestFactory; } + + private Collection deserialize(String responseBody) throws IOException { + byte[] rawBytes = responseBody.getBytes(); + String resultType = options.getProperties().getProperty(RESULT_TYPE, "single-value"); + if (resultType.equals("single-value")) { + return Collections.singletonList(responseBodyDecoder.deserialize(rawBytes)); + } else if (resultType.equals("array")) { + List rawObjects = + objectMapper.readValue(rawBytes, new TypeReference<>() { + }); + List result = new ArrayList<>(rawObjects.size()); + for (JsonNode rawObject : rawObjects) { + result.add(responseBodyDecoder.deserialize(rawObject.toString().getBytes())); + } + return result; + } else { + throw new IllegalStateException( + String.format("Unknown lookup source result type '%'.", resultType)); + } + } } diff --git a/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientFactory.java b/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientFactory.java index cb3c4dd2..1e79248e 100644 --- a/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientFactory.java +++ b/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientFactory.java @@ -2,6 +2,7 @@ import java.net.http.HttpClient; +import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.table.data.RowData; @@ -26,6 +27,7 @@ public JavaNetHttpPollingClient createPollClient( return new JavaNetHttpPollingClient( httpClient, schemaDecoder, + new ObjectMapper(), options, requestFactory ); diff --git a/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientConnectionTest.java b/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientConnectionTest.java index 27f62df0..f6bb491b 100644 --- a/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientConnectionTest.java +++ b/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientConnectionTest.java @@ -1,7 +1,8 @@ package com.getindata.connectors.http.internal.table.lookup; +import java.util.Collection; +import java.util.Iterator; import java.util.List; -import java.util.Optional; import java.util.Properties; import java.util.stream.Stream; @@ -52,12 +53,14 @@ import com.getindata.connectors.http.internal.utils.HttpHeaderUtils; import com.getindata.connectors.http.internal.utils.SerializationSchemaUtils; import static com.getindata.connectors.http.TestHelper.readTestFile; +import static com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants.RESULT_TYPE; import static com.getindata.connectors.http.internal.table.lookup.HttpLookupTableSourceFactory.row; @ExtendWith(MockitoExtension.class) class JavaNetHttpPollingClientConnectionTest { private static final String SAMPLES_FOLDER = "/http/"; + private static final String SAMPLES_FOLDER_ARRAY_RESULT = "/http-array-result/"; private static final String ENDPOINT = "/service"; @@ -110,6 +113,7 @@ public void setUp() { HttpConnectorConfigConstants.LOOKUP_SOURCE_HEADER_PREFIX + "Content-Type", "application/json" ); + this.properties.setProperty(RESULT_TYPE, "single-value"); } @AfterEach @@ -127,11 +131,13 @@ void shouldQuery200WithParams() { JavaNetHttpPollingClient pollingClient = setUpPollingClient(getBaseUrl()); // WHEN - RowData result = pollingClient.pull(lookupRowData).orElseThrow(); + Collection results = pollingClient.pull(lookupRowData); // THEN wireMockServer.verify(RequestPatternBuilder.forCustomMatcher(stubMapping.getRequest())); + assertThat(results).hasSize(1); + RowData result = results.iterator().next(); assertThat(result.getArity()).isEqualTo(4); assertThat(result.getString(1) .toString()).isEqualTo("Returned HTTP message for parameter PARAM, COUNTER"); @@ -157,7 +163,7 @@ void shouldQuery200WithBodyParams(String methodName) { ); // WHEN - RowData result = pollingClient.pull(lookupRowData).orElseThrow(); + Collection results = pollingClient.pull(lookupRowData); // THEN wireMockServer.verify(RequestPatternBuilder.forCustomMatcher(stubMapping.getRequest())); @@ -169,6 +175,8 @@ void shouldQuery200WithBodyParams(String methodName) { fail("Unexpected REST method."); } + assertThat(results).hasSize(1); + RowData result = results.iterator().next(); assertThat(result.getArity()).isEqualTo(4); assertThat(result.getString(1) .toString()).isEqualTo("Returned HTTP message for parameter PARAM, COUNTER"); @@ -188,6 +196,43 @@ private static Stream clientErrorCodeConfig() { ); } + @Test + void shouldQuery200WithArrayResult() { + // GIVEN + this.stubMapping = setUpServerStubArrayResult(200); + + Properties properties = new Properties(); + properties.putAll(this.properties); + properties.setProperty(RESULT_TYPE, "array"); + + // WHEN + JavaNetHttpPollingClient pollingClient = setUpPollingClient(getBaseUrl(), properties); + + // WHEN + Collection results = pollingClient.pull(lookupRowData); + + // THEN + wireMockServer.verify(RequestPatternBuilder.forCustomMatcher(stubMapping.getRequest())); + + assertThat(results).hasSize(2); + + Iterator iterator = results.iterator(); + + RowData firstResult = iterator.next(); + assertThat(firstResult.getArity()).isEqualTo(4); + RowData detailsRow1 = firstResult.getRow(3, 2); + assertThat(detailsRow1.getBoolean(0)).isEqualTo(true); // isActive + RowData nestedDetailsRow1 = detailsRow1.getRow(1, 1); + assertThat(nestedDetailsRow1.getString(0).toString()).isEqualTo("$1,729.34"); + + RowData secondResult = iterator.next(); + assertThat(secondResult.getArity()).isEqualTo(4); + RowData detailsRow2 = secondResult.getRow(3, 2); + assertThat(detailsRow2.getBoolean(0)).isEqualTo(false); // isActive + RowData nestedDetailsRow2 = detailsRow2.getRow(1, 1); + assertThat(nestedDetailsRow2.getString(0).toString()).isEqualTo("$22,001.99"); + } + @ParameterizedTest @MethodSource("clientErrorCodeConfig") void shouldHandleCodeBasedOnConfiguration( @@ -203,10 +248,10 @@ void shouldHandleCodeBasedOnConfiguration( ); // WHEN - Optional poll = pollingClient.pull(lookupRowData); + Collection results = pollingClient.pull(lookupRowData); // THEN - assertThat(poll.isEmpty()).isEqualTo(isExpectedResponseEmpty); + assertThat(results.isEmpty()).isEqualTo(isExpectedResponseEmpty); } @Test @@ -217,12 +262,12 @@ void shouldHandleServerError() { JavaNetHttpPollingClient pollingClient = setUpPollingClient(getBaseUrl()); // WHEN - Optional poll = pollingClient.pull(lookupRowData); + Collection results = pollingClient.pull(lookupRowData); // THEN wireMockServer.verify(RequestPatternBuilder.forCustomMatcher(stubMapping.getRequest())); - assertThat(poll.isEmpty()).isTrue(); + assertThat(results.isEmpty()).isTrue(); } @Test @@ -233,10 +278,10 @@ void shouldProcessWithMissingArguments() { JavaNetHttpPollingClient pollingClient = setUpPollingClient(getBaseUrl()); // WHEN - Optional poll = pollingClient.pull(null); + Collection results = pollingClient.pull(null); // THEN - assertThat(poll.isEmpty()).isTrue(); + assertThat(results.isEmpty()).isTrue(); } @ParameterizedTest @@ -252,10 +297,7 @@ public void shouldConnectWithBasicAuth(String authorizationHeaderValue, this.stubMapping = setupServerStubForBasicAuth(); Properties properties = new Properties(); - properties.setProperty( - HttpConnectorConfigConstants.LOOKUP_SOURCE_HEADER_PREFIX + "Content-Type", - "application/json" - ); + properties.putAll(this.properties); properties.setProperty( HttpConnectorConfigConstants.LOOKUP_SOURCE_HEADER_PREFIX + "Authorization", @@ -274,11 +316,13 @@ public void shouldConnectWithBasicAuth(String authorizationHeaderValue, ); // WHEN - RowData result = pollingClient.pull(lookupRowData).orElseThrow(); + Collection results = pollingClient.pull(lookupRowData); // THEN wireMockServer.verify(RequestPatternBuilder.forCustomMatcher(stubMapping.getRequest())); + assertThat(results).hasSize(1); + RowData result = results.iterator().next(); assertThat(result.getArity()).isEqualTo(4); assertThat(result.getString(1) .toString()).isEqualTo("Returned HTTP message for parameter PARAM, COUNTER"); @@ -295,13 +339,10 @@ private String getBaseUrl() { } public JavaNetHttpPollingClient setUpPollingClient(String url) { + return setUpPollingClient(url, properties); + } - Properties properties = new Properties(); - properties.setProperty( - HttpConnectorConfigConstants.LOOKUP_SOURCE_HEADER_PREFIX + "Content-Type", - "application/json" - ); - + public JavaNetHttpPollingClient setUpPollingClient(String url, Properties properties) { return setUpPollingClient(url, properties, setUpGetRequestFactory(properties)); } @@ -423,6 +464,16 @@ private StubMapping setUpServerBodyStub(String methodName) { .withBody(readTestFile(SAMPLES_FOLDER + "HttpResult.json")))); } + private StubMapping setUpServerStubArrayResult(int status) { + return wireMockServer.stubFor( + get(urlEqualTo(ENDPOINT + "?id=1&uuid=2")) + .withHeader("Content-Type", equalTo("application/json")) + .willReturn( + aResponse() + .withStatus(status) + .withBody(readTestFile(SAMPLES_FOLDER_ARRAY_RESULT + "HttpResult.json")))); + } + private StubMapping setupServerStubForBasicAuth() { return wireMockServer.stubFor(get(urlEqualTo(ENDPOINT + "?id=1&uuid=2")) .withHeader("Content-Type", equalTo("application/json")) @@ -448,6 +499,8 @@ private static Properties prepareErrorCodeProperties(String errorCodeList, Strin HttpConnectorConfigConstants.LOOKUP_SOURCE_HEADER_PREFIX + "Content-Type", "application/json"); + properties.setProperty(RESULT_TYPE, "single-value"); + return properties; } } diff --git a/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientHttpsConnectionTest.java b/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientHttpsConnectionTest.java index 801c0142..25719e63 100644 --- a/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientHttpsConnectionTest.java +++ b/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientHttpsConnectionTest.java @@ -2,6 +2,7 @@ import java.io.File; import java.time.Duration; +import java.util.Collection; import java.util.List; import java.util.Optional; import java.util.Properties; @@ -261,7 +262,7 @@ public void shouldThrowOnInvalidPath( private void testPollingClientConnection() { JavaNetHttpPollingClient pollingClient = setUpPollingClient(properties); - RowData result = pollingClient.pull(lookupRowData).orElseThrow(); + Collection result = pollingClient.pull(lookupRowData); assertResult(result); } @@ -337,8 +338,9 @@ private void setUpPollingClientFactory(String baseUrl, HeaderPreprocessor header this.pollingClientFactory = new JavaNetHttpPollingClientFactory(requestFactory); } - private void assertResult(RowData result) { - + private void assertResult(Collection results) { + assertThat(results).hasSize(1); + RowData result = results.iterator().next(); assertThat(result.getArity()).isEqualTo(4); assertThat(result.getString(1) .toString()).isEqualTo("Returned HTTP message for parameter PARAM, COUNTER"); diff --git a/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientTest.java b/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientTest.java index f5c9f4bf..afae27a6 100644 --- a/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientTest.java +++ b/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientTest.java @@ -5,6 +5,7 @@ import java.net.http.HttpRequest; import java.util.*; +import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.configuration.Configuration; @@ -63,6 +64,7 @@ public void shouldBuildClientWithoutHeaders() { JavaNetHttpPollingClient client = new JavaNetHttpPollingClient( httpClient, decoder, + new ObjectMapper(), options, new GetRequestFactory( new GenericGetQueryCreator(lookupRow), @@ -82,6 +84,7 @@ public void shouldBuildGetClientUri() { JavaNetHttpPollingClient client = new JavaNetHttpPollingClient( httpClient, decoder, + new ObjectMapper(), options, new GetRequestFactory( new GenericGetQueryCreator(lookupRow), @@ -203,6 +206,7 @@ public void shouldBuildClientWithHeaders() { JavaNetHttpPollingClient client = new JavaNetHttpPollingClient( httpClient, decoder, + new ObjectMapper(), lookupConfig, new GetRequestFactory( new GenericGetQueryCreator(lookupRow), diff --git a/src/test/resources/http-array-result/HttpResult.json b/src/test/resources/http-array-result/HttpResult.json new file mode 100644 index 00000000..c8158d0a --- /dev/null +++ b/src/test/resources/http-array-result/HttpResult.json @@ -0,0 +1,36 @@ +[ + { + "id": "COUNTER1", + "uuid": "UUID", + "msg": "Returned HTTP message for parameter PARAM, COUNTER", + "age": 30, + "eyeColor": "green", + "name": "Marva Fischer", + "gender": "female", + "details": { + "isActive": true, + "nestedDetails": { + "index": 0, + "guid": "d81fc542-6b49-4d59-8fb9-d57430d4871d", + "balance": "$1,729.34" + } + } + }, + { + "id": "COUNTER2", + "uuid": "UUID", + "msg": "Returned HTTP message for parameter PARAM, COUNTER", + "age": 40, + "eyeColor": "brown", + "name": "John Doe", + "gender": "male", + "details": { + "isActive": false, + "nestedDetails": { + "index": 0, + "guid": "d81fc542-6b49-4d59-8fb9-d57430d4871d", + "balance": "$22,001.99" + } + } + } +] \ No newline at end of file From 16277db209eff2e0945c6cbff39a5d883cee6c39 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Grzegorz=20Ko=C5=82akowski?= Date: Mon, 25 Nov 2024 13:32:19 +0100 Subject: [PATCH 2/5] Skip null deserialization results --- .../lookup/JavaNetHttpPollingClient.java | 34 +++++++++++++------ .../JavaNetHttpPollingClientFactory.java | 2 -- .../lookup/JavaNetHttpPollingClientTest.java | 4 --- 3 files changed, 23 insertions(+), 17 deletions(-) diff --git a/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java b/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java index 3add34d8..294014c1 100644 --- a/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java +++ b/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java @@ -33,16 +33,19 @@ @Slf4j public class JavaNetHttpPollingClient implements PollingClient { + private static final String RESULT_TYPE_SINGLE_VALUE = "single-value"; + private static final String RESULT_TYPE_ARRAY = "array"; + private final HttpClient httpClient; private final HttpStatusCodeChecker statusCodeChecker; private final DeserializationSchema responseBodyDecoder; - private final ObjectMapper objectMapper; - private final HttpRequestFactory requestFactory; + private final ObjectMapper objectMapper; + private final HttpPostRequestCallback httpPostRequestCallback; private final HttpLookupConfig options; @@ -50,15 +53,14 @@ public class JavaNetHttpPollingClient implements PollingClient { public JavaNetHttpPollingClient( HttpClient httpClient, DeserializationSchema responseBodyDecoder, - ObjectMapper objectMapper, HttpLookupConfig options, HttpRequestFactory requestFactory) { this.httpClient = httpClient; this.responseBodyDecoder = responseBodyDecoder; - this.objectMapper = objectMapper; this.requestFactory = requestFactory; + this.objectMapper = new ObjectMapper(); this.httpPostRequestCallback = options.getHttpPostRequestCallback(); // TODO Inject this via constructor when implementing a response processor. @@ -138,21 +140,31 @@ HttpRequestFactory getRequestFactory() { private Collection deserialize(String responseBody) throws IOException { byte[] rawBytes = responseBody.getBytes(); - String resultType = options.getProperties().getProperty(RESULT_TYPE, "single-value"); - if (resultType.equals("single-value")) { - return Collections.singletonList(responseBodyDecoder.deserialize(rawBytes)); - } else if (resultType.equals("array")) { + String resultType = + options.getProperties().getProperty(RESULT_TYPE, RESULT_TYPE_SINGLE_VALUE); + if (resultType.equals(RESULT_TYPE_SINGLE_VALUE)) { + RowData deserialized = responseBodyDecoder.deserialize(rawBytes); + // deserialize() returns null if deserialization fails + return deserialized != null + ? Collections.singletonList(deserialized) + : Collections.emptyList(); + } else if (resultType.equals(RESULT_TYPE_ARRAY)) { List rawObjects = objectMapper.readValue(rawBytes, new TypeReference<>() { }); - List result = new ArrayList<>(rawObjects.size()); + List result = new ArrayList<>(); for (JsonNode rawObject : rawObjects) { - result.add(responseBodyDecoder.deserialize(rawObject.toString().getBytes())); + RowData deserialized = + responseBodyDecoder.deserialize(rawObject.toString().getBytes()); + // deserialize() returns null if deserialization fails + if (deserialized != null) { + result.add(deserialized); + } } return result; } else { throw new IllegalStateException( - String.format("Unknown lookup source result type '%'.", resultType)); + String.format("Unknown lookup source result type '%s'.", resultType)); } } } diff --git a/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientFactory.java b/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientFactory.java index 1e79248e..cb3c4dd2 100644 --- a/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientFactory.java +++ b/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientFactory.java @@ -2,7 +2,6 @@ import java.net.http.HttpClient; -import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.table.data.RowData; @@ -27,7 +26,6 @@ public JavaNetHttpPollingClient createPollClient( return new JavaNetHttpPollingClient( httpClient, schemaDecoder, - new ObjectMapper(), options, requestFactory ); diff --git a/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientTest.java b/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientTest.java index afae27a6..f5c9f4bf 100644 --- a/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientTest.java +++ b/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientTest.java @@ -5,7 +5,6 @@ import java.net.http.HttpRequest; import java.util.*; -import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.configuration.Configuration; @@ -64,7 +63,6 @@ public void shouldBuildClientWithoutHeaders() { JavaNetHttpPollingClient client = new JavaNetHttpPollingClient( httpClient, decoder, - new ObjectMapper(), options, new GetRequestFactory( new GenericGetQueryCreator(lookupRow), @@ -84,7 +82,6 @@ public void shouldBuildGetClientUri() { JavaNetHttpPollingClient client = new JavaNetHttpPollingClient( httpClient, decoder, - new ObjectMapper(), options, new GetRequestFactory( new GenericGetQueryCreator(lookupRow), @@ -206,7 +203,6 @@ public void shouldBuildClientWithHeaders() { JavaNetHttpPollingClient client = new JavaNetHttpPollingClient( httpClient, decoder, - new ObjectMapper(), lookupConfig, new GetRequestFactory( new GenericGetQueryCreator(lookupRow), From 913c8b55b219644a6564524b635924dad9adc2ff Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Grzegorz=20Ko=C5=82akowski?= Date: Mon, 25 Nov 2024 13:39:48 +0100 Subject: [PATCH 3/5] Add test for null values in array --- ...avaNetHttpPollingClientConnectionTest.java | 43 +++++++++++++++++++ .../HttpResult.json | 20 +++++++++ 2 files changed, 63 insertions(+) create mode 100644 src/test/resources/http-array-result-with-nulls/HttpResult.json diff --git a/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientConnectionTest.java b/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientConnectionTest.java index f6bb491b..95cc0f9f 100644 --- a/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientConnectionTest.java +++ b/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientConnectionTest.java @@ -61,6 +61,8 @@ class JavaNetHttpPollingClientConnectionTest { private static final String SAMPLES_FOLDER = "/http/"; private static final String SAMPLES_FOLDER_ARRAY_RESULT = "/http-array-result/"; + private static final String SAMPLES_FOLDER_ARRAY_RESULT_WITH_NULLS = + "/http-array-result-with-nulls/"; private static final String ENDPOINT = "/service"; @@ -233,6 +235,36 @@ void shouldQuery200WithArrayResult() { assertThat(nestedDetailsRow2.getString(0).toString()).isEqualTo("$22,001.99"); } + @Test + void shouldQuery200WithArrayResultWithNulls() { + // GIVEN + this.stubMapping = setUpServerStubArrayResultWithNulls(200); + + Properties properties = new Properties(); + properties.putAll(this.properties); + properties.setProperty(RESULT_TYPE, "array"); + + // WHEN + JavaNetHttpPollingClient pollingClient = setUpPollingClient(getBaseUrl(), properties); + + // WHEN + Collection results = pollingClient.pull(lookupRowData); + + // THEN + wireMockServer.verify(RequestPatternBuilder.forCustomMatcher(stubMapping.getRequest())); + + assertThat(results).hasSize(1); + + Iterator iterator = results.iterator(); + + RowData firstResult = iterator.next(); + assertThat(firstResult.getArity()).isEqualTo(4); + RowData detailsRow1 = firstResult.getRow(3, 2); + assertThat(detailsRow1.getBoolean(0)).isEqualTo(true); // isActive + RowData nestedDetailsRow1 = detailsRow1.getRow(1, 1); + assertThat(nestedDetailsRow1.getString(0).toString()).isEqualTo("$1,729.34"); + } + @ParameterizedTest @MethodSource("clientErrorCodeConfig") void shouldHandleCodeBasedOnConfiguration( @@ -474,6 +506,17 @@ private StubMapping setUpServerStubArrayResult(int status) { .withBody(readTestFile(SAMPLES_FOLDER_ARRAY_RESULT + "HttpResult.json")))); } + private StubMapping setUpServerStubArrayResultWithNulls(int status) { + return wireMockServer.stubFor( + get(urlEqualTo(ENDPOINT + "?id=1&uuid=2")) + .withHeader("Content-Type", equalTo("application/json")) + .willReturn( + aResponse() + .withStatus(status) + .withBody(readTestFile( + SAMPLES_FOLDER_ARRAY_RESULT_WITH_NULLS + "HttpResult.json")))); + } + private StubMapping setupServerStubForBasicAuth() { return wireMockServer.stubFor(get(urlEqualTo(ENDPOINT + "?id=1&uuid=2")) .withHeader("Content-Type", equalTo("application/json")) diff --git a/src/test/resources/http-array-result-with-nulls/HttpResult.json b/src/test/resources/http-array-result-with-nulls/HttpResult.json new file mode 100644 index 00000000..f3815672 --- /dev/null +++ b/src/test/resources/http-array-result-with-nulls/HttpResult.json @@ -0,0 +1,20 @@ +[ + { + "id": "COUNTER1", + "uuid": "UUID", + "msg": "Returned HTTP message for parameter PARAM, COUNTER", + "age": 30, + "eyeColor": "green", + "name": "Marva Fischer", + "gender": "female", + "details": { + "isActive": true, + "nestedDetails": { + "index": 0, + "guid": "d81fc542-6b49-4d59-8fb9-d57430d4871d", + "balance": "$1,729.34" + } + } + }, + null +] \ No newline at end of file From 919d2b4ad85e5d54d8e9f6320b0819f5ce8bfbbb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Grzegorz=20Ko=C5=82akowski?= Date: Wed, 27 Nov 2024 14:16:57 +0100 Subject: [PATCH 4/5] Handle NullNode --- .../table/lookup/JavaNetHttpPollingClient.java | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java b/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java index 294014c1..e3f043bd 100644 --- a/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java +++ b/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java @@ -12,6 +12,7 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.NullNode; import lombok.extern.slf4j.Slf4j; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.serialization.DeserializationSchema; @@ -154,11 +155,13 @@ private Collection deserialize(String responseBody) throws IOException }); List result = new ArrayList<>(); for (JsonNode rawObject : rawObjects) { - RowData deserialized = - responseBodyDecoder.deserialize(rawObject.toString().getBytes()); - // deserialize() returns null if deserialization fails - if (deserialized != null) { - result.add(deserialized); + if (!(rawObject instanceof NullNode)) { + RowData deserialized = + responseBodyDecoder.deserialize(rawObject.toString().getBytes()); + // deserialize() returns null if deserialization fails + if (deserialized != null) { + result.add(deserialized); + } } } return result; From b3e48cda67c0958d93dd0062f12c4aff543d224c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Grzegorz=20Ko=C5=82akowski?= Date: Wed, 27 Nov 2024 14:31:57 +0100 Subject: [PATCH 5/5] Refactoring --- .../lookup/JavaNetHttpPollingClient.java | 47 +++++++++++-------- 1 file changed, 27 insertions(+), 20 deletions(-) diff --git a/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java b/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java index e3f043bd..8a352190 100644 --- a/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java +++ b/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java @@ -8,6 +8,7 @@ import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Optional; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.JsonNode; @@ -144,30 +145,36 @@ private Collection deserialize(String responseBody) throws IOException String resultType = options.getProperties().getProperty(RESULT_TYPE, RESULT_TYPE_SINGLE_VALUE); if (resultType.equals(RESULT_TYPE_SINGLE_VALUE)) { - RowData deserialized = responseBodyDecoder.deserialize(rawBytes); - // deserialize() returns null if deserialization fails - return deserialized != null - ? Collections.singletonList(deserialized) - : Collections.emptyList(); + return deserializeSingleValue(rawBytes); } else if (resultType.equals(RESULT_TYPE_ARRAY)) { - List rawObjects = - objectMapper.readValue(rawBytes, new TypeReference<>() { - }); - List result = new ArrayList<>(); - for (JsonNode rawObject : rawObjects) { - if (!(rawObject instanceof NullNode)) { - RowData deserialized = - responseBodyDecoder.deserialize(rawObject.toString().getBytes()); - // deserialize() returns null if deserialization fails - if (deserialized != null) { - result.add(deserialized); - } - } - } - return result; + return deserializeArray(rawBytes); } else { throw new IllegalStateException( String.format("Unknown lookup source result type '%s'.", resultType)); } } + + private List deserializeSingleValue(byte[] rawBytes) throws IOException { + return Optional.ofNullable(responseBodyDecoder.deserialize(rawBytes)) + .map(Collections::singletonList) + .orElse(Collections.emptyList()); + } + + private List deserializeArray(byte[] rawBytes) throws IOException { + List rawObjects = + objectMapper.readValue(rawBytes, new TypeReference<>() { + }); + List result = new ArrayList<>(); + for (JsonNode rawObject : rawObjects) { + if (!(rawObject instanceof NullNode)) { + RowData deserialized = + responseBodyDecoder.deserialize(rawObject.toString().getBytes()); + // deserialize() returns null if deserialization fails + if (deserialized != null) { + result.add(deserialized); + } + } + } + return result; + } }