From a3963fecb1bf3cd1623b0f6b8a24178ade758df8 Mon Sep 17 00:00:00 2001 From: yugwon-oh Date: Tue, 17 Oct 2023 00:05:01 +0900 Subject: [PATCH] fix-baselibrary --- .../integrations/source/opensearch/OpenSearchSource.java | 6 +++--- .../integrations/source/opensearch/OpenSearchUtils.java | 9 ++++++--- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/airbyte-integrations/connectors/source-opensearch/src/main/java/io/airbyte/integrations/source/opensearch/OpenSearchSource.java b/airbyte-integrations/connectors/source-opensearch/src/main/java/io/airbyte/integrations/source/opensearch/OpenSearchSource.java index 7ee3ccf2bf4f..76044ece7f1d 100644 --- a/airbyte-integrations/connectors/source-opensearch/src/main/java/io/airbyte/integrations/source/opensearch/OpenSearchSource.java +++ b/airbyte-integrations/connectors/source-opensearch/src/main/java/io/airbyte/integrations/source/opensearch/OpenSearchSource.java @@ -8,11 +8,11 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import io.airbyte.cdk.integrations.BaseConnector; +import io.airbyte.cdk.integrations.base.IntegrationRunner; +import io.airbyte.cdk.integrations.base.Source; import io.airbyte.commons.util.AutoCloseableIterator; import io.airbyte.commons.util.AutoCloseableIterators; -import io.airbyte.integrations.BaseConnector; -import io.airbyte.integrations.base.IntegrationRunner; -import io.airbyte.integrations.base.Source; import io.airbyte.protocol.models.v0.AirbyteCatalog; import io.airbyte.protocol.models.v0.AirbyteConnectionStatus; import io.airbyte.protocol.models.v0.AirbyteMessage; diff --git a/airbyte-integrations/connectors/source-opensearch/src/main/java/io/airbyte/integrations/source/opensearch/OpenSearchUtils.java b/airbyte-integrations/connectors/source-opensearch/src/main/java/io/airbyte/integrations/source/opensearch/OpenSearchUtils.java index daaa4db946a8..8ce3cdc48991 100644 --- a/airbyte-integrations/connectors/source-opensearch/src/main/java/io/airbyte/integrations/source/opensearch/OpenSearchUtils.java +++ b/airbyte-integrations/connectors/source-opensearch/src/main/java/io/airbyte/integrations/source/opensearch/OpenSearchUtils.java @@ -5,8 +5,10 @@ package io.airbyte.integrations.source.opensearch; import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.commons.stream.AirbyteStreamUtils; import io.airbyte.commons.util.AutoCloseableIterator; import io.airbyte.commons.util.AutoCloseableIterators; +import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; import io.airbyte.protocol.models.v0.AirbyteMessage; import io.airbyte.protocol.models.v0.AirbyteRecordMessage; import io.airbyte.protocol.models.v0.AirbyteStream; @@ -14,19 +16,20 @@ import java.util.List; public class OpenSearchUtils { - public static AutoCloseableIterator getDataIterator(final OpenSearchConnection connection, final AirbyteStream stream) { + final AirbyteStreamNameNamespacePair airbyteStream = AirbyteStreamUtils.convertFromAirbyteStream(stream); return AutoCloseableIterators.lazyIterator(() -> { try { List data = connection.getRecords(stream.getName()); - return AutoCloseableIterators.fromIterator(data.iterator()); + return AutoCloseableIterators.fromIterator(data.iterator(), airbyteStream); } catch (final Exception e) { throw new RuntimeException(e); } - }); + }, airbyteStream); } + public static AutoCloseableIterator getMessageIterator(final AutoCloseableIterator recordIterator, final String streamName) { return AutoCloseableIterators.transform(recordIterator, r -> new AirbyteMessage()