From 8e0e0b4f7998a2b6c70fa6d395a88a26a1169094 Mon Sep 17 00:00:00 2001 From: andriikorotkov <88329385+andriikorotkov@users.noreply.github.com> Date: Thu, 18 Nov 2021 12:05:58 +0200 Subject: [PATCH] :bug: Updated source-mongodb-v2 performance (#7982) * updated source-mongodb-v2 performance * updated code style * fixed remarks * fixed remarks * fixed remarks * updated strict encrypt source mongodb version --- .../b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e.json | 2 +- .../resources/seed/source_definitions.yaml | 2 +- .../io/airbyte/db/mongodb/MongoUtils.java | 79 ++++++++++++++----- .../source-mongodb-strict-encrypt/Dockerfile | 2 +- ...godbSourceStrictEncryptAcceptanceTest.java | 16 ++-- .../connectors/source-mongodb-v2/Dockerfile | 2 +- .../MongoDbSourceAbstractAcceptanceTest.java | 4 +- .../MongoDbSourceAtlasAcceptanceTest.java | 9 ++- ...MongoDbSourceStandaloneAcceptanceTest.java | 9 ++- docs/integrations/sources/mongodb-v2.md | 1 + 10 files changed, 90 insertions(+), 36 deletions(-) diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e.json b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e.json index 25b676bac981..763684f7fa83 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e.json @@ -2,7 +2,7 @@ "sourceDefinitionId": "b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e", "name": "MongoDb", "dockerRepository": "airbyte/source-mongodb-v2", - "dockerImageTag": "0.1.3", + "dockerImageTag": "0.1.4", "documentationUrl": "https://docs.airbyte.io/integrations/sources/mongodb-v2", "icon": "mongodb.svg" } diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index c3e8317dc3e7..48bf6af69806 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -359,7 +359,7 @@ - name: MongoDb sourceDefinitionId: b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e dockerRepository: airbyte/source-mongodb-v2 - dockerImageTag: 0.1.3 + dockerImageTag: 0.1.4 documentationUrl: https://docs.airbyte.io/integrations/sources/mongodb-v2 icon: mongodb.svg sourceType: database diff --git a/airbyte-db/lib/src/main/java/io/airbyte/db/mongodb/MongoUtils.java b/airbyte-db/lib/src/main/java/io/airbyte/db/mongodb/MongoUtils.java index 8f93e26251be..c969f7fddd2c 100644 --- a/airbyte-db/lib/src/main/java/io/airbyte/db/mongodb/MongoUtils.java +++ b/airbyte-db/lib/src/main/java/io/airbyte/db/mongodb/MongoUtils.java @@ -12,16 +12,19 @@ import com.google.api.client.util.DateTime; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; +import com.mongodb.client.AggregateIterable; import com.mongodb.client.MongoCollection; -import com.mongodb.client.MongoCursor; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.util.MoreIterators; import io.airbyte.db.DataTypeUtils; import io.airbyte.protocol.models.JsonSchemaPrimitive; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import org.bson.BsonBinary; import org.bson.BsonDateTime; import org.bson.BsonDocument; @@ -44,7 +47,9 @@ public class MongoUtils { private static final Logger LOGGER = LoggerFactory.getLogger(MongoUtils.class); - private static final int DISCOVERY_BATCH_SIZE = 10000; + private static final String MISSING_TYPE = "missing"; + private static final String NULL_TYPE = "null"; + private static final String TYPE = "type"; private static final String AIRBYTE_SUFFIX = "_aibyte_transform"; public static JsonSchemaPrimitive getType(final BsonType dataType) { @@ -89,7 +94,7 @@ private static void formatDocument(final Document document, final ObjectNode obj try (final BsonReader reader = new BsonDocumentReader(bsonDocument)) { readDocument(reader, objectNode, columnNames); } catch (final Exception e) { - LOGGER.error("Exception while parsing BsonDocument: ", e.getMessage()); + LOGGER.error("Exception while parsing BsonDocument: {}", e.getMessage()); throw new RuntimeException(e); } } @@ -174,34 +179,68 @@ private static ObjectNode readField(final BsonReader reader, * @return map of unique fields and its type */ public static Map getUniqueFields(final MongoCollection collection) { - final Map uniqueFields = new HashMap<>(); - try (final MongoCursor cursor = collection.find().batchSize(DISCOVERY_BATCH_SIZE).iterator()) { - while (cursor.hasNext()) { - final BsonDocument document = toBsonDocument(cursor.next()); - try (final BsonReader reader = new BsonDocumentReader(document)) { - reader.readStartDocument(); - while (reader.readBsonType() != BsonType.END_OF_DOCUMENT) { - final var fieldName = reader.readName(); + + Map result = new HashMap<>(); + var allkeys = getFieldsName(collection); + allkeys.forEach(key -> { + var types = getTypes(collection, key); + addUniqueType(result, collection, key, types); + }); + + return result; + } + + private static List getFieldsName(MongoCollection collection) { + AggregateIterable output = collection.aggregate(Arrays.asList( + new Document("$project", new Document("arrayofkeyvalue", new Document("$objectToArray", "$$ROOT"))), + new Document("$unwind", "$arrayofkeyvalue"), + new Document("$group", new Document("_id", null).append("allkeys", new Document("$addToSet", "$arrayofkeyvalue.k"))))); + return (List) output.cursor().next().get("allkeys"); + } + + private static void addUniqueType(Map map, + MongoCollection collection, + String fieldName, + Set types) { + if (types.size() != 1) { + map.put(fieldName + AIRBYTE_SUFFIX, BsonType.STRING); + } else { + var document = collection.find(new Document(fieldName, + new Document("$type", types.stream().findFirst().get()))).first(); + var bsonDoc = toBsonDocument(document); + try (final BsonReader reader = new BsonDocumentReader(bsonDoc)) { + reader.readStartDocument(); + while (reader.readBsonType() != BsonType.END_OF_DOCUMENT) { + if (reader.readName().equals(fieldName)) { final var fieldType = reader.getCurrentBsonType(); - reader.skipValue(); - if (uniqueFields.containsKey(fieldName) && fieldType.compareTo(uniqueFields.get(fieldName)) != 0) { - uniqueFields.replace(fieldName + AIRBYTE_SUFFIX, BsonType.STRING); - } else { - uniqueFields.put(fieldName, fieldType); - } + map.put(fieldName, fieldType); } - reader.readEndDocument(); + reader.skipValue(); } + reader.readEndDocument(); + } + } + } + + private static Set getTypes(MongoCollection collection, String fieldName) { + var searchField = "$" + fieldName; + var docTypes = collection.aggregate(List.of( + new Document("$project", new Document(TYPE, new Document("$type", searchField))))).cursor(); + Set types = new HashSet<>(); + while (docTypes.hasNext()) { + var type = String.valueOf(docTypes.next().get(TYPE)); + if (!MISSING_TYPE.equals(type) && !NULL_TYPE.equals(type)) { + types.add(type); } } - return uniqueFields; + return types.isEmpty() ? Set.of(NULL_TYPE) : types; } private static BsonDocument toBsonDocument(final Document document) { try { return document.toBsonDocument(); } catch (final Exception e) { - LOGGER.error("Exception while converting Document to BsonDocument: ", e.getMessage()); + LOGGER.error("Exception while converting Document to BsonDocument: {}", e.getMessage()); throw new RuntimeException(e); } } diff --git a/airbyte-integrations/connectors/source-mongodb-strict-encrypt/Dockerfile b/airbyte-integrations/connectors/source-mongodb-strict-encrypt/Dockerfile index f0188fa585ec..87ef2134436d 100644 --- a/airbyte-integrations/connectors/source-mongodb-strict-encrypt/Dockerfile +++ b/airbyte-integrations/connectors/source-mongodb-strict-encrypt/Dockerfile @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar RUN tar xf ${APPLICATION}.tar --strip-components=1 -LABEL io.airbyte.version=0.1.1 +LABEL io.airbyte.version=0.1.2 LABEL io.airbyte.name=airbyte/source-mongodb-strict-encrypt diff --git a/airbyte-integrations/connectors/source-mongodb-strict-encrypt/src/test-integration/java/io/airbyte/integrations/source/mongodb/MongodbSourceStrictEncryptAcceptanceTest.java b/airbyte-integrations/connectors/source-mongodb-strict-encrypt/src/test-integration/java/io/airbyte/integrations/source/mongodb/MongodbSourceStrictEncryptAcceptanceTest.java index c7cda18cc7d7..a4f2176523f0 100644 --- a/airbyte-integrations/connectors/source-mongodb-strict-encrypt/src/test-integration/java/io/airbyte/integrations/source/mongodb/MongodbSourceStrictEncryptAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-mongodb-strict-encrypt/src/test-integration/java/io/airbyte/integrations/source/mongodb/MongodbSourceStrictEncryptAcceptanceTest.java @@ -29,6 +29,9 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; + +import org.bson.BsonArray; +import org.bson.BsonString; import org.bson.Document; import org.junit.jupiter.api.Test; @@ -77,7 +80,7 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Exc .put("auth_source", "admin") .build()); - final String connectionString = String.format("mongodb://%s:%s@%s:%s/%s?authSource=admin&ssl=true", + final String connectionString = String.format("mongodb://%s:%s@%s:%s/%s?authSource=admin&directConnection=false&ssl=true", config.get("user").asText(), config.get("password").asText(), config.get("instance_type").get("host").asText(), @@ -87,9 +90,10 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Exc database = new MongoDatabase(connectionString, DATABASE_NAME); final MongoCollection collection = database.createCollection(COLLECTION_NAME); - final var doc1 = new Document("id", "0001").append("name", "Test"); - final var doc2 = new Document("id", "0002").append("name", "Mongo"); - final var doc3 = new Document("id", "0003").append("name", "Source"); + final var doc1 = new Document("id", "0001").append("name", "Test") + .append("test", 10).append("test_array", new BsonArray(List.of(new BsonString("test"), new BsonString("mongo")))); + final var doc2 = new Document("id", "0002").append("name", "Mongo").append("test", "test_value"); + final var doc3 = new Document("id", "0003").append("name", "Source").append("test", null); collection.insertMany(List.of(doc1, doc2, doc3)); } @@ -117,7 +121,9 @@ protected ConfiguredAirbyteCatalog getConfiguredCatalog() throws Exception { DATABASE_NAME + "." + COLLECTION_NAME, Field.of("_id", JsonSchemaPrimitive.STRING), Field.of("id", JsonSchemaPrimitive.STRING), - Field.of("name", JsonSchemaPrimitive.STRING)) + Field.of("name", JsonSchemaPrimitive.STRING), + Field.of("test", JsonSchemaPrimitive.STRING), + Field.of("test_array", JsonSchemaPrimitive.ARRAY)) .withSupportedSyncModes(Lists.newArrayList(SyncMode.INCREMENTAL)) .withDefaultCursorField(List.of("_id"))))); } diff --git a/airbyte-integrations/connectors/source-mongodb-v2/Dockerfile b/airbyte-integrations/connectors/source-mongodb-v2/Dockerfile index 4d824fd24a30..2781dc6458b1 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/Dockerfile +++ b/airbyte-integrations/connectors/source-mongodb-v2/Dockerfile @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar RUN tar xf ${APPLICATION}.tar --strip-components=1 -LABEL io.airbyte.version=0.1.3 +LABEL io.airbyte.version=0.1.4 LABEL io.airbyte.name=airbyte/source-mongodb-v2 diff --git a/airbyte-integrations/connectors/source-mongodb-v2/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/MongoDbSourceAbstractAcceptanceTest.java b/airbyte-integrations/connectors/source-mongodb-v2/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/MongoDbSourceAbstractAcceptanceTest.java index 6fcf627a59e3..bda0ad6a8214 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/MongoDbSourceAbstractAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-mongodb-v2/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/MongoDbSourceAbstractAcceptanceTest.java @@ -57,7 +57,9 @@ protected ConfiguredAirbyteCatalog getConfiguredCatalog() throws Exception { DATABASE_NAME + "." + COLLECTION_NAME, Field.of("_id", JsonSchemaPrimitive.STRING), Field.of("id", JsonSchemaPrimitive.STRING), - Field.of("name", JsonSchemaPrimitive.STRING)) + Field.of("name", JsonSchemaPrimitive.STRING), + Field.of("test", JsonSchemaPrimitive.STRING), + Field.of("test_array", JsonSchemaPrimitive.ARRAY)) .withSupportedSyncModes(Lists.newArrayList(SyncMode.INCREMENTAL)) .withDefaultCursorField(List.of("_id"))))); } diff --git a/airbyte-integrations/connectors/source-mongodb-v2/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/MongoDbSourceAtlasAcceptanceTest.java b/airbyte-integrations/connectors/source-mongodb-v2/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/MongoDbSourceAtlasAcceptanceTest.java index 52f7b7850cdb..5974760cc2a7 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/MongoDbSourceAtlasAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-mongodb-v2/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/MongoDbSourceAtlasAcceptanceTest.java @@ -15,6 +15,8 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.List; +import org.bson.BsonArray; +import org.bson.BsonString; import org.bson.Document; public class MongoDbSourceAtlasAcceptanceTest extends MongoDbSourceAbstractAcceptanceTest { @@ -54,9 +56,10 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Exc database = new MongoDatabase(connectionString, DATABASE_NAME); final MongoCollection collection = database.createCollection(COLLECTION_NAME); - final var doc1 = new Document("id", "0001").append("name", "Test"); - final var doc2 = new Document("id", "0002").append("name", "Mongo"); - final var doc3 = new Document("id", "0003").append("name", "Source"); + final var doc1 = new Document("id", "0001").append("name", "Test") + .append("test", 10).append("test_array", new BsonArray(List.of(new BsonString("test"), new BsonString("mongo")))); + final var doc2 = new Document("id", "0002").append("name", "Mongo").append("test", "test_value"); + final var doc3 = new Document("id", "0003").append("name", "Source").append("test", null); collection.insertMany(List.of(doc1, doc2, doc3)); } diff --git a/airbyte-integrations/connectors/source-mongodb-v2/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/MongoDbSourceStandaloneAcceptanceTest.java b/airbyte-integrations/connectors/source-mongodb-v2/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/MongoDbSourceStandaloneAcceptanceTest.java index 1f1ae4ae2b59..e72715457b5f 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/MongoDbSourceStandaloneAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-mongodb-v2/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/MongoDbSourceStandaloneAcceptanceTest.java @@ -13,6 +13,8 @@ import io.airbyte.db.mongodb.MongoDatabase; import io.airbyte.integrations.standardtest.source.TestDestinationEnv; import java.util.List; +import org.bson.BsonArray; +import org.bson.BsonString; import org.bson.Document; import org.testcontainers.containers.MongoDBContainer; import org.testcontainers.utility.DockerImageName; @@ -46,9 +48,10 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Exc database = new MongoDatabase(connectionString, DATABASE_NAME); final MongoCollection collection = database.createCollection(COLLECTION_NAME); - final var doc1 = new Document("id", "0001").append("name", "Test"); - final var doc2 = new Document("id", "0002").append("name", "Mongo"); - final var doc3 = new Document("id", "0003").append("name", "Source"); + final var doc1 = new Document("id", "0001").append("name", "Test") + .append("test", 10).append("test_array", new BsonArray(List.of(new BsonString("test"), new BsonString("mongo")))); + final var doc2 = new Document("id", "0002").append("name", "Mongo").append("test", "test_value"); + final var doc3 = new Document("id", "0003").append("name", "Source").append("test", null); collection.insertMany(List.of(doc1, doc2, doc3)); } diff --git a/docs/integrations/sources/mongodb-v2.md b/docs/integrations/sources/mongodb-v2.md index 9a066a0f27c6..2490ee696469 100644 --- a/docs/integrations/sources/mongodb-v2.md +++ b/docs/integrations/sources/mongodb-v2.md @@ -102,6 +102,7 @@ For more information regarding configuration parameters, please see [MongoDb Doc | Version | Date | Pull Request | Subject | | :--- | :--- | :--- | :--- | +| 0.1.4 | 2021-11-15 | [7982](https://github.com/airbytehq/airbyte/pull/7982) | Updated Performance | | 0.1.3 | 2021-10-19 | [7160](https://github.com/airbytehq/airbyte/pull/7160) | Fixed nested document parsing | | 0.1.2 | 2021-10-07 | [6860](https://github.com/airbytehq/airbyte/pull/6860) | Added filter to avoid MongoDb system collections | | 0.1.1 | 2021-09-21 | [6364](https://github.com/airbytehq/airbyte/pull/6364) | Source MongoDb: added support via TLS/SSL |