From 6bf9ebacb23170dbc0bd24682368f9607b89d931 Mon Sep 17 00:00:00 2001 From: andriikorotkov <88329385+andriikorotkov@users.noreply.github.com> Date: Tue, 23 Nov 2021 13:56:25 +0200 Subject: [PATCH] :bug: Updated source-mongodb-v2 performance and updated cursor for timestamp type (#8161) * updated source-mongodb-v2 performance * updated code style * fixed remarks * fixed remarks * fixed remarks * updated strict encrypt source mongodb version * updated source mongodb work with empty collections * updated source mongodb timestamp cursor * updated mongodb source performance * fix code style * fix code style * updated tests and documentation * updated tests and documentation * updated tests and documentation * added vudangngoc changes * updated code style * updated code style --- .../b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e.json | 2 +- .../resources/seed/source_definitions.yaml | 2 +- .../src/main/resources/seed/source_specs.yaml | 2 +- .../io/airbyte/db/mongodb/MongoUtils.java | 88 +++++++++++-------- .../source-mongodb-strict-encrypt/Dockerfile | 2 +- ...godbSourceStrictEncryptAcceptanceTest.java | 13 ++- .../connectors/source-mongodb-v2/Dockerfile | 2 +- .../MongoDbSourceAbstractAcceptanceTest.java | 5 +- .../MongoDbSourceAtlasAcceptanceTest.java | 8 +- ...MongoDbSourceStandaloneAcceptanceTest.java | 8 +- docs/integrations/sources/mongodb-v2.md | 1 + 11 files changed, 82 insertions(+), 51 deletions(-) diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e.json b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e.json index 92c06b63504b..197b375d60ca 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e.json @@ -2,7 +2,7 @@ "sourceDefinitionId": 
"b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e", "name": "MongoDb", "dockerRepository": "airbyte/source-mongodb-v2", - "dockerImageTag": "0.1.6", + "dockerImageTag": "0.1.7", "documentationUrl": "https://docs.airbyte.io/integrations/sources/mongodb-v2", "icon": "mongodb.svg" } diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 110274935ab5..c186c714b686 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -365,7 +365,7 @@ - name: MongoDb sourceDefinitionId: b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e dockerRepository: airbyte/source-mongodb-v2 - dockerImageTag: 0.1.6 + dockerImageTag: 0.1.7 documentationUrl: https://docs.airbyte.io/integrations/sources/mongodb-v2 icon: mongodb.svg sourceType: database diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index 255226f7b410..ee8d8c42dd01 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -3754,7 +3754,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-mongodb-v2:0.1.6" +- dockerImage: "airbyte/source-mongodb-v2:0.1.7" spec: documentationUrl: "https://docs.airbyte.io/integrations/sources/mongodb-v2" changelogUrl: "https://docs.airbyte.io/integrations/sources/mongodb-v2" diff --git a/airbyte-db/lib/src/main/java/io/airbyte/db/mongodb/MongoUtils.java b/airbyte-db/lib/src/main/java/io/airbyte/db/mongodb/MongoUtils.java index f40d9d79e35d..7da014d476ad 100644 --- a/airbyte-db/lib/src/main/java/io/airbyte/db/mongodb/MongoUtils.java +++ b/airbyte-db/lib/src/main/java/io/airbyte/db/mongodb/MongoUtils.java @@ -18,13 +18,12 @@ import io.airbyte.commons.util.MoreIterators; import 
io.airbyte.db.DataTypeUtils; import io.airbyte.protocol.models.JsonSchemaPrimitive; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; import org.bson.BsonBinary; import org.bson.BsonDateTime; import org.bson.BsonDocument; @@ -49,7 +48,6 @@ public class MongoUtils { private static final String MISSING_TYPE = "missing"; private static final String NULL_TYPE = "null"; - private static final String TYPE = "type"; private static final String AIRBYTE_SUFFIX = "_aibyte_transform"; public static JsonSchemaPrimitive getType(final BsonType dataType) { @@ -76,7 +74,7 @@ public static Object getBsonValue(final BsonType type, final String value) { case INT64 -> new BsonInt64(Long.parseLong(value)); case DOUBLE -> new BsonDouble(Double.parseDouble(value)); case DECIMAL128 -> Decimal128.parse(value); - case TIMESTAMP -> new BsonTimestamp(Long.parseLong(value)); + case TIMESTAMP -> new BsonTimestamp(new DateTime(value).getValue()); case DATE_TIME -> new BsonDateTime(new DateTime(value).getValue()); case OBJECT_ID -> new ObjectId(value); case SYMBOL -> new Symbol(value); @@ -121,7 +119,12 @@ private static ObjectNode readDocument(final BsonReader reader, final ObjectNode private static void transformToStringIfMarked(final ObjectNode jsonNodes, final List columnNames, final String fieldName) { if (columnNames.contains(fieldName + AIRBYTE_SUFFIX)) { - jsonNodes.put(fieldName, jsonNodes.get(fieldName).asText()); + JsonNode data = jsonNodes.get(fieldName); + if (data != null) { + jsonNodes.put(fieldName, data.asText()); + } else { + LOGGER.error("Field list out of sync, Document doesn't contain field: {}", fieldName); + } } } @@ -184,9 +187,8 @@ public static Map getUniqueFields(final MongoCollection { var types = getTypes(collection, key); - addUniqueType(result, collection, key, types); + addUniqueType(result, key, types); }); - 
return result; } @@ -202,42 +204,58 @@ private static List getFieldsName(MongoCollection collection) } } + private static ArrayList getTypes(MongoCollection collection, String name) { + var fieldName = "$" + name; + AggregateIterable output = collection.aggregate(Arrays.asList( + new Document("$project", new Document("_id", 0).append("fieldType", new Document("$type", fieldName))), + new Document("$group", new Document("_id", new Document("fieldType", "$fieldType")) + .append("count", new Document("$sum", 1))))); + var listOfTypes = new ArrayList(); + var cursor = output.cursor(); + while (cursor.hasNext()) { + var type = ((Document) cursor.next().get("_id")).get("fieldType").toString(); + if (!type.equals(MISSING_TYPE) && !type.equals(NULL_TYPE)) { + listOfTypes.add(type); + } + } + if (listOfTypes.isEmpty()) { + listOfTypes.add(NULL_TYPE); + } + return listOfTypes; + } + private static void addUniqueType(Map map, - MongoCollection collection, String fieldName, - Set types) { + List types) { if (types.size() != 1) { map.put(fieldName + AIRBYTE_SUFFIX, BsonType.STRING); } else { - var document = collection.find(new Document(fieldName, - new Document("$type", types.stream().findFirst().get()))).first(); - var bsonDoc = toBsonDocument(document); - try (final BsonReader reader = new BsonDocumentReader(bsonDoc)) { - reader.readStartDocument(); - while (reader.readBsonType() != BsonType.END_OF_DOCUMENT) { - if (reader.readName().equals(fieldName)) { - final var fieldType = reader.getCurrentBsonType(); - map.put(fieldName, fieldType); - } - reader.skipValue(); - } - reader.readEndDocument(); - } + var type = types.get(0); + map.put(fieldName, getBsonTypeByTypeAlias(type)); } } - private static Set getTypes(MongoCollection collection, String fieldName) { - var searchField = "$" + fieldName; - var docTypes = collection.aggregate(List.of( - new Document("$project", new Document(TYPE, new Document("$type", searchField))))).cursor(); - Set types = new HashSet<>(); - while 
(docTypes.hasNext()) { - var type = String.valueOf(docTypes.next().get(TYPE)); - if (!MISSING_TYPE.equals(type) && !NULL_TYPE.equals(type)) { - types.add(type); - } - } - return types.isEmpty() ? Set.of(NULL_TYPE) : types; + private static BsonType getBsonTypeByTypeAlias(String typeAlias) { + return switch (typeAlias) { + case "double" -> BsonType.DOUBLE; + case "string" -> BsonType.STRING; + case "objectId" -> BsonType.OBJECT_ID; + case "array" -> BsonType.ARRAY; + case "binData" -> BsonType.BINARY; + case "bool" -> BsonType.BOOLEAN; + case "date" -> BsonType.DATE_TIME; + case "null" -> BsonType.NULL; + case "regex" -> BsonType.REGULAR_EXPRESSION; + case "dbPointer" -> BsonType.DB_POINTER; + case "javascript" -> BsonType.JAVASCRIPT; + case "symbol" -> BsonType.SYMBOL; + case "javascriptWithScope" -> BsonType.JAVASCRIPT_WITH_SCOPE; + case "int" -> BsonType.INT32; + case "timestamp" -> BsonType.TIMESTAMP; + case "long" -> BsonType.INT64; + case "decimal" -> BsonType.DECIMAL128; + default -> BsonType.STRING; + }; } private static BsonDocument toBsonDocument(final Document document) { diff --git a/airbyte-integrations/connectors/source-mongodb-strict-encrypt/Dockerfile b/airbyte-integrations/connectors/source-mongodb-strict-encrypt/Dockerfile index 3017fb613eff..82970c9054e5 100644 --- a/airbyte-integrations/connectors/source-mongodb-strict-encrypt/Dockerfile +++ b/airbyte-integrations/connectors/source-mongodb-strict-encrypt/Dockerfile @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar RUN tar xf ${APPLICATION}.tar --strip-components=1 -LABEL io.airbyte.version=0.1.4 +LABEL io.airbyte.version=0.1.5 LABEL io.airbyte.name=airbyte/source-mongodb-strict-encrypt diff --git a/airbyte-integrations/connectors/source-mongodb-strict-encrypt/src/test-integration/java/io/airbyte/integrations/source/mongodb/MongodbSourceStrictEncryptAcceptanceTest.java 
b/airbyte-integrations/connectors/source-mongodb-strict-encrypt/src/test-integration/java/io/airbyte/integrations/source/mongodb/MongodbSourceStrictEncryptAcceptanceTest.java index 9204afdfb9e0..3b73693008ef 100644 --- a/airbyte-integrations/connectors/source-mongodb-strict-encrypt/src/test-integration/java/io/airbyte/integrations/source/mongodb/MongodbSourceStrictEncryptAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-mongodb-strict-encrypt/src/test-integration/java/io/airbyte/integrations/source/mongodb/MongodbSourceStrictEncryptAcceptanceTest.java @@ -90,9 +90,11 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Exc final MongoCollection collection = database.createCollection(COLLECTION_NAME); final var doc1 = new Document("id", "0001").append("name", "Test") - .append("test", 10).append("test_array", new BsonArray(List.of(new BsonString("test"), new BsonString("mongo")))); - final var doc2 = new Document("id", "0002").append("name", "Mongo").append("test", "test_value"); - final var doc3 = new Document("id", "0003").append("name", "Source").append("test", null); + .append("test", 10).append("test_array", new BsonArray(List.of(new BsonString("test"), new BsonString("mongo")))) + .append("double_test", 100.12).append("int_test", 100); + final var doc2 = new Document("id", "0002").append("name", "Mongo").append("test", "test_value").append("int_test", 201); + final var doc3 = new Document("id", "0003").append("name", "Source").append("test", null) + .append("double_test", 212.11).append("int_test", 302); collection.insertMany(List.of(doc1, doc2, doc3)); } @@ -122,7 +124,10 @@ protected ConfiguredAirbyteCatalog getConfiguredCatalog() throws Exception { Field.of("id", JsonSchemaPrimitive.STRING), Field.of("name", JsonSchemaPrimitive.STRING), Field.of("test", JsonSchemaPrimitive.STRING), - Field.of("test_array", JsonSchemaPrimitive.ARRAY)) + Field.of("test_array", JsonSchemaPrimitive.ARRAY), + Field.of("empty_test", 
JsonSchemaPrimitive.STRING), + Field.of("double_test", JsonSchemaPrimitive.NUMBER), + Field.of("int_test", JsonSchemaPrimitive.NUMBER)) .withSupportedSyncModes(Lists.newArrayList(SyncMode.INCREMENTAL)) .withDefaultCursorField(List.of("_id"))))); } diff --git a/airbyte-integrations/connectors/source-mongodb-v2/Dockerfile b/airbyte-integrations/connectors/source-mongodb-v2/Dockerfile index 38e130382134..34093aad65aa 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/Dockerfile +++ b/airbyte-integrations/connectors/source-mongodb-v2/Dockerfile @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar RUN tar xf ${APPLICATION}.tar --strip-components=1 -LABEL io.airbyte.version=0.1.6 +LABEL io.airbyte.version=0.1.7 LABEL io.airbyte.name=airbyte/source-mongodb-v2 diff --git a/airbyte-integrations/connectors/source-mongodb-v2/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/MongoDbSourceAbstractAcceptanceTest.java b/airbyte-integrations/connectors/source-mongodb-v2/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/MongoDbSourceAbstractAcceptanceTest.java index bda0ad6a8214..965d1749922d 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/MongoDbSourceAbstractAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-mongodb-v2/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/MongoDbSourceAbstractAcceptanceTest.java @@ -59,7 +59,10 @@ protected ConfiguredAirbyteCatalog getConfiguredCatalog() throws Exception { Field.of("id", JsonSchemaPrimitive.STRING), Field.of("name", JsonSchemaPrimitive.STRING), Field.of("test", JsonSchemaPrimitive.STRING), - Field.of("test_array", JsonSchemaPrimitive.ARRAY)) + Field.of("test_array", JsonSchemaPrimitive.ARRAY), + Field.of("empty_test", JsonSchemaPrimitive.STRING), + Field.of("double_test", 
JsonSchemaPrimitive.NUMBER), + Field.of("int_test", JsonSchemaPrimitive.NUMBER)) .withSupportedSyncModes(Lists.newArrayList(SyncMode.INCREMENTAL)) .withDefaultCursorField(List.of("_id"))))); } diff --git a/airbyte-integrations/connectors/source-mongodb-v2/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/MongoDbSourceAtlasAcceptanceTest.java b/airbyte-integrations/connectors/source-mongodb-v2/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/MongoDbSourceAtlasAcceptanceTest.java index 5974760cc2a7..9cb8b5f062b3 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/MongoDbSourceAtlasAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-mongodb-v2/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/MongoDbSourceAtlasAcceptanceTest.java @@ -57,9 +57,11 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Exc final MongoCollection collection = database.createCollection(COLLECTION_NAME); final var doc1 = new Document("id", "0001").append("name", "Test") - .append("test", 10).append("test_array", new BsonArray(List.of(new BsonString("test"), new BsonString("mongo")))); - final var doc2 = new Document("id", "0002").append("name", "Mongo").append("test", "test_value"); - final var doc3 = new Document("id", "0003").append("name", "Source").append("test", null); + .append("test", 10).append("test_array", new BsonArray(List.of(new BsonString("test"), new BsonString("mongo")))) + .append("double_test", 100.12).append("int_test", 100); + final var doc2 = new Document("id", "0002").append("name", "Mongo").append("test", "test_value").append("int_test", 201); + final var doc3 = new Document("id", "0003").append("name", "Source").append("test", null) + .append("double_test", 212.11).append("int_test", 302); collection.insertMany(List.of(doc1, 
doc2, doc3)); } diff --git a/airbyte-integrations/connectors/source-mongodb-v2/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/MongoDbSourceStandaloneAcceptanceTest.java b/airbyte-integrations/connectors/source-mongodb-v2/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/MongoDbSourceStandaloneAcceptanceTest.java index e72715457b5f..5c41a244ee04 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/MongoDbSourceStandaloneAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-mongodb-v2/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/MongoDbSourceStandaloneAcceptanceTest.java @@ -49,9 +49,11 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Exc final MongoCollection collection = database.createCollection(COLLECTION_NAME); final var doc1 = new Document("id", "0001").append("name", "Test") - .append("test", 10).append("test_array", new BsonArray(List.of(new BsonString("test"), new BsonString("mongo")))); - final var doc2 = new Document("id", "0002").append("name", "Mongo").append("test", "test_value"); - final var doc3 = new Document("id", "0003").append("name", "Source").append("test", null); + .append("test", 10).append("test_array", new BsonArray(List.of(new BsonString("test"), new BsonString("mongo")))) + .append("double_test", 100.12).append("int_test", 100); + final var doc2 = new Document("id", "0002").append("name", "Mongo").append("test", "test_value").append("int_test", 201); + final var doc3 = new Document("id", "0003").append("name", "Source").append("test", null) + .append("double_test", 212.11).append("int_test", 302); collection.insertMany(List.of(doc1, doc2, doc3)); } diff --git a/docs/integrations/sources/mongodb-v2.md b/docs/integrations/sources/mongodb-v2.md index 2906a059736e..6a68dd868085 100644 --- 
a/docs/integrations/sources/mongodb-v2.md +++ b/docs/integrations/sources/mongodb-v2.md @@ -102,6 +102,7 @@ For more information regarding configuration parameters, please see [MongoDb Doc | Version | Date | Pull Request | Subject | | :--- | :--- | :--- | :--- | +| 0.1.7 | 2021-11-22 | [8161](https://github.com/airbytehq/airbyte/pull/8161) | Updated Performance and updated cursor for timestamp type | | 0.1.5 | 2021-11-17 | [8046](https://github.com/airbytehq/airbyte/pull/8046) | Added milliseconds to convert timestamp to datetime format | | 0.1.4 | 2021-11-15 | [7982](https://github.com/airbytehq/airbyte/pull/7982) | Updated Performance | | 0.1.3 | 2021-10-19 | [7160](https://github.com/airbytehq/airbyte/pull/7160) | Fixed nested document parsing |