Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support source lookup array results #135

Merged
merged 5 commits into from
Nov 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## [Unreleased]

### Added

- Allow to fetch multiple results from REST API endpoint (`gid.connector.http.source.lookup.result-type`).

## [0.16.0] - 2024-10-18

### Added
Expand Down
10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,16 @@ Because of that, if AsyncIO timer passes, Flink will throw TimeoutException whic
The HTTP request timeouts on the other hand will not cause Job restart. In that case, exception will be logged into application logs.
To avoid job restart on timeouts caused by Lookup queries, the value of `gid.connector.http.source.lookup.request.timeout` should be smaller than `table.exec.async-lookup.timeout`.

#### Lookup multiple results

Typically, join can return zero, one or more results. What is more, there are lots of possible REST API designs and
pagination methods. Currently, the connector supports only two simple approaches (`gid.connector.http.source.lookup.result-type`):

- `single-value` - REST API returns single object.
- `array` - REST API returns array of objects. Pagination is not supported yet.
Copy link
Contributor

@davidradl davidradl Nov 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so if I have a kafka event that matches on the lookup key - is the idea that the kafka event will be enriched with an new field that is an array type. I think a SQL example would be useful to show this.

If there are 2 or more lookup key conditions in the query how will this work with the arrays. I think a SQL example would be useful to show this behaviour also.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so if I have a kafka event that matches on the lookup key - is the idea that the kafka event will be enriched with an new field that is an array type. I think a SQL example would be useful to show this.

It works differently. For a lookup key there might be multiple matches during lookup. gid.connector.http.source.lookup.result-type=array indicates that the REST API returns array of objects. As a result of lookup join, multiple rows will be returned. Thanks to the flag the connector knows whether the byte array received should be parsed as single object or an array of objects.


Please be informed that the mechanism will be enhanced in the future. See [HTTP-118](https://github.com/getindata/flink-http-connector/issues/118).

### HTTP Sink
The following example shows the minimum Table API example to create a [HttpDynamicSink](src/main/java/com/getindata/connectors/http/internal/table/HttpDynamicSink.java) that writes JSON values to an HTTP endpoint using POST method, assuming Flink has JAR of [JSON serializer](https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/table/formats/json/) installed:

Expand Down
7 changes: 7 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ under the License.
<maven.compiler.target>${target.java.version}</maven.compiler.target>
<log4j.version>2.17.2</log4j.version>
<lombok.version>1.18.22</lombok.version>
<jackson.version>2.18.1</jackson.version>
<junit4.version>4.13.2</junit4.version>
<junit5.version>5.10.1</junit5.version>
<junit.jupiter.version>${junit5.version}</junit.jupiter.version>
Expand Down Expand Up @@ -153,6 +154,12 @@ under the License.
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
</dependency>

<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.getindata.connectors.http.internal;

import java.util.Optional;
import java.util.Collection;

import org.apache.flink.table.data.RowData;

Expand All @@ -14,5 +14,5 @@ public interface PollingClient<T> {
* @param lookupRow A {@link RowData} containing request parameters.
* @return an optional result of data lookup.
*/
Optional<T> pull(RowData lookupRow);
Collection<T> pull(RowData lookupRow);
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ public final class HttpConnectorConfigConstants {
public static final String LOOKUP_SOURCE_HEADER_USE_RAW = GID_CONNECTOR_HTTP
+ "source.lookup.use-raw-authorization-header";

public static final String RESULT_TYPE = GID_CONNECTOR_HTTP
+ "source.lookup.result-type";

// --------- Error code handling configuration ---------
public static final String HTTP_ERROR_SINK_CODE_WHITE_LIST =
GID_CONNECTOR_HTTP + "sink.error.code.exclude";
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package com.getindata.connectors.http.internal.table.lookup;

import java.util.Collection;
import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;

import lombok.AccessLevel;
Expand Down Expand Up @@ -66,7 +64,6 @@ public void open(FunctionContext context) throws Exception {
@Override
public Collection<RowData> lookup(RowData keyRow) {
localHttpCallCounter.incrementAndGet();
Optional<RowData> result = client.pull(keyRow);
return result.map(Collections::singletonList).orElse(Collections.emptyList());
return client.pull(keyRow);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,16 @@
import java.net.http.HttpClient;
import java.net.http.HttpResponse;
import java.net.http.HttpResponse.BodyHandlers;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.NullNode;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.serialization.DeserializationSchema;
Expand All @@ -19,6 +26,7 @@
import com.getindata.connectors.http.internal.status.ComposeHttpStatusCodeChecker;
import com.getindata.connectors.http.internal.status.ComposeHttpStatusCodeChecker.ComposeHttpStatusCodeCheckerConfig;
import com.getindata.connectors.http.internal.status.HttpStatusCodeChecker;
import static com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants.RESULT_TYPE;

/**
* An implementation of {@link PollingClient} that uses Java 11's {@link HttpClient}.
Expand All @@ -27,6 +35,9 @@
@Slf4j
public class JavaNetHttpPollingClient implements PollingClient<RowData> {

private static final String RESULT_TYPE_SINGLE_VALUE = "single-value";
private static final String RESULT_TYPE_ARRAY = "array";

private final HttpClient httpClient;

private final HttpStatusCodeChecker statusCodeChecker;
Expand All @@ -35,8 +46,12 @@ public class JavaNetHttpPollingClient implements PollingClient<RowData> {

private final HttpRequestFactory requestFactory;

private final ObjectMapper objectMapper;

private final HttpPostRequestCallback<HttpLookupSourceRequestEntry> httpPostRequestCallback;

private final HttpLookupConfig options;

public JavaNetHttpPollingClient(
HttpClient httpClient,
DeserializationSchema<RowData> responseBodyDecoder,
Expand All @@ -47,6 +62,7 @@ public JavaNetHttpPollingClient(
this.responseBodyDecoder = responseBodyDecoder;
this.requestFactory = requestFactory;

this.objectMapper = new ObjectMapper();
this.httpPostRequestCallback = options.getHttpPostRequestCallback();

// TODO Inject this via constructor when implementing a response processor.
Expand All @@ -61,21 +77,22 @@ public JavaNetHttpPollingClient(
.build();

this.statusCodeChecker = new ComposeHttpStatusCodeChecker(checkerConfig);
this.options = options;
}

@Override
public Optional<RowData> pull(RowData lookupRow) {
public Collection<RowData> pull(RowData lookupRow) {
try {
log.debug("Optional<RowData> pull with Rowdata={}.", lookupRow);
log.debug("Collection<RowData> pull with Rowdata={}.", lookupRow);
return queryAndProcess(lookupRow);
} catch (Exception e) {
log.error("Exception during HTTP request.", e);
return Optional.empty();
return Collections.emptyList();
}
}

// TODO Add Retry Policy And configure TimeOut from properties
private Optional<RowData> queryAndProcess(RowData lookupData) throws Exception {
private Collection<RowData> queryAndProcess(RowData lookupData) throws Exception {

HttpLookupSourceRequestEntry request = requestFactory.buildLookupRequest(lookupData);
HttpResponse<String> response = httpClient.send(
Expand All @@ -85,14 +102,14 @@ private Optional<RowData> queryAndProcess(RowData lookupData) throws Exception {
return processHttpResponse(response, request);
}

private Optional<RowData> processHttpResponse(
private Collection<RowData> processHttpResponse(
HttpResponse<String> response,
HttpLookupSourceRequestEntry request) throws IOException {

this.httpPostRequestCallback.call(response, request, "endpoint", Collections.emptyMap());

if (response == null) {
return Optional.empty();
return Collections.emptyList();
}

String responseBody = response.body();
Expand All @@ -102,14 +119,14 @@ private Optional<RowData> processHttpResponse(
"with Server response body [%s] ", statusCode, responseBody));

if (notErrorCodeAndNotEmptyBody(responseBody, statusCode)) {
return Optional.ofNullable(responseBodyDecoder.deserialize(responseBody.getBytes()));
return deserialize(responseBody);
} else {
log.warn(
String.format("Returned Http status code was invalid or returned body was empty. "
+ "Status Code [%s]", statusCode)
);

return Optional.empty();
return Collections.emptyList();
}
}

Expand All @@ -122,4 +139,42 @@ private boolean notErrorCodeAndNotEmptyBody(String body, int statusCode) {
HttpRequestFactory getRequestFactory() {
return this.requestFactory;
}

private Collection<RowData> deserialize(String responseBody) throws IOException {
byte[] rawBytes = responseBody.getBytes();
String resultType =
options.getProperties().getProperty(RESULT_TYPE, RESULT_TYPE_SINGLE_VALUE);
if (resultType.equals(RESULT_TYPE_SINGLE_VALUE)) {
return deserializeSingleValue(rawBytes);
} else if (resultType.equals(RESULT_TYPE_ARRAY)) {
return deserializeArray(rawBytes);
} else {
throw new IllegalStateException(
String.format("Unknown lookup source result type '%s'.", resultType));
}
}

private List<RowData> deserializeSingleValue(byte[] rawBytes) throws IOException {
return Optional.ofNullable(responseBodyDecoder.deserialize(rawBytes))
.map(Collections::singletonList)
.orElse(Collections.emptyList());
}

private List<RowData> deserializeArray(byte[] rawBytes) throws IOException {
List<JsonNode> rawObjects =
objectMapper.readValue(rawBytes, new TypeReference<>() {
});
List<RowData> result = new ArrayList<>();
for (JsonNode rawObject : rawObjects) {
if (!(rawObject instanceof NullNode)) {
RowData deserialized =
responseBodyDecoder.deserialize(rawObject.toString().getBytes());
// deserialize() returns null if deserialization fails
if (deserialized != null) {
result.add(deserialized);
}
}
}
return result;
}
}
Loading
Loading