Skip to content

Commit

Permalink
🐛 Updated source-mongodb-v2 performance and updated cursor for timest…
Browse files Browse the repository at this point in the history
…amp type (airbytehq#8161)

* updated source-mongodb-v2 performance

* updated code style

* fixed remarks

* fixed remarks

* fixed remarks

* updated strict encrypt source mongodb version

* updated source mongodb work with empty collections

* updated source mongodb timestamp cursor

* updated mongodb source performance

* fix code style

* fix code style

* updated tests and documentation

* updated tests and documentation

* updated tests and documentation

* added vudangngoc changes

* updated code style

* updated code style
  • Loading branch information
andriikorotkov authored Nov 23, 2021
1 parent 7daa6a3 commit 6bf9eba
Show file tree
Hide file tree
Showing 11 changed files with 82 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"sourceDefinitionId": "b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e",
"name": "MongoDb",
"dockerRepository": "airbyte/source-mongodb-v2",
"dockerImageTag": "0.1.6",
"dockerImageTag": "0.1.7",
"documentationUrl": "https://docs.airbyte.io/integrations/sources/mongodb-v2",
"icon": "mongodb.svg"
}
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@
- name: MongoDb
sourceDefinitionId: b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e
dockerRepository: airbyte/source-mongodb-v2
dockerImageTag: 0.1.6
dockerImageTag: 0.1.7
documentationUrl: https://docs.airbyte.io/integrations/sources/mongodb-v2
icon: mongodb.svg
sourceType: database
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3754,7 +3754,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-mongodb-v2:0.1.6"
- dockerImage: "airbyte/source-mongodb-v2:0.1.7"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/mongodb-v2"
changelogUrl: "https://docs.airbyte.io/integrations/sources/mongodb-v2"
Expand Down
88 changes: 53 additions & 35 deletions airbyte-db/lib/src/main/java/io/airbyte/db/mongodb/MongoUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,12 @@
import io.airbyte.commons.util.MoreIterators;
import io.airbyte.db.DataTypeUtils;
import io.airbyte.protocol.models.JsonSchemaPrimitive;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.bson.BsonBinary;
import org.bson.BsonDateTime;
import org.bson.BsonDocument;
Expand All @@ -49,7 +48,6 @@ public class MongoUtils {

private static final String MISSING_TYPE = "missing";
private static final String NULL_TYPE = "null";
private static final String TYPE = "type";
private static final String AIRBYTE_SUFFIX = "_aibyte_transform";

public static JsonSchemaPrimitive getType(final BsonType dataType) {
Expand All @@ -76,7 +74,7 @@ public static Object getBsonValue(final BsonType type, final String value) {
case INT64 -> new BsonInt64(Long.parseLong(value));
case DOUBLE -> new BsonDouble(Double.parseDouble(value));
case DECIMAL128 -> Decimal128.parse(value);
case TIMESTAMP -> new BsonTimestamp(Long.parseLong(value));
case TIMESTAMP -> new BsonTimestamp(new DateTime(value).getValue());
case DATE_TIME -> new BsonDateTime(new DateTime(value).getValue());
case OBJECT_ID -> new ObjectId(value);
case SYMBOL -> new Symbol(value);
Expand Down Expand Up @@ -121,7 +119,12 @@ private static ObjectNode readDocument(final BsonReader reader, final ObjectNode

private static void transformToStringIfMarked(final ObjectNode jsonNodes, final List<String> columnNames, final String fieldName) {
if (columnNames.contains(fieldName + AIRBYTE_SUFFIX)) {
jsonNodes.put(fieldName, jsonNodes.get(fieldName).asText());
JsonNode data = jsonNodes.get(fieldName);
if (data != null) {
jsonNodes.put(fieldName, data.asText());
} else {
LOGGER.error("Field list out of sync, Document doesn't contain field: {}", fieldName);
}
}
}

Expand Down Expand Up @@ -184,9 +187,8 @@ public static Map<String, BsonType> getUniqueFields(final MongoCollection<Docume
var allkeys = getFieldsName(collection);
allkeys.forEach(key -> {
var types = getTypes(collection, key);
addUniqueType(result, collection, key, types);
addUniqueType(result, key, types);
});

return result;
}

Expand All @@ -202,42 +204,58 @@ private static List<String> getFieldsName(MongoCollection<Document> collection)
}
}

/**
 * Collects the distinct {@code $type} aliases observed for one field across a collection.
 *
 * <p>Runs a server-side aggregation that projects the {@code $type} of the field for each
 * document and groups by that value, so one result row comes back per distinct type.
 * The {@code "missing"} and {@code "null"} aliases are skipped, so documents that omit the
 * field or store an explicit null do not count as an additional type.
 *
 * @param collection the collection to inspect
 * @param name the field name, without the leading {@code $}
 * @return the type aliases found for the field; a single-element list containing
 *         {@code "null"} when no other type was found (never empty)
 */
private static ArrayList<String> getTypes(MongoCollection<Document> collection, String name) {
  var fieldName = "$" + name;
  AggregateIterable<Document> output = collection.aggregate(Arrays.asList(
      new Document("$project", new Document("_id", 0).append("fieldType", new Document("$type", fieldName))),
      new Document("$group", new Document("_id", new Document("fieldType", "$fieldType"))
          .append("count", new Document("$sum", 1)))));
  var listOfTypes = new ArrayList<String>();
  // The cursor is Closeable; close it even if iteration throws so it is not leaked.
  try (var cursor = output.cursor()) {
    while (cursor.hasNext()) {
      var type = ((Document) cursor.next().get("_id")).get("fieldType").toString();
      if (!type.equals(MISSING_TYPE) && !type.equals(NULL_TYPE)) {
        listOfTypes.add(type);
      }
    }
  }
  // Nothing but missing/null values: report the field as null-typed rather than type-less.
  if (listOfTypes.isEmpty()) {
    listOfTypes.add(NULL_TYPE);
  }
  return listOfTypes;
}

private static void addUniqueType(Map<String, BsonType> map,
MongoCollection<Document> collection,
String fieldName,
Set<String> types) {
List<String> types) {
if (types.size() != 1) {
map.put(fieldName + AIRBYTE_SUFFIX, BsonType.STRING);
} else {
var document = collection.find(new Document(fieldName,
new Document("$type", types.stream().findFirst().get()))).first();
var bsonDoc = toBsonDocument(document);
try (final BsonReader reader = new BsonDocumentReader(bsonDoc)) {
reader.readStartDocument();
while (reader.readBsonType() != BsonType.END_OF_DOCUMENT) {
if (reader.readName().equals(fieldName)) {
final var fieldType = reader.getCurrentBsonType();
map.put(fieldName, fieldType);
}
reader.skipValue();
}
reader.readEndDocument();
}
var type = types.get(0);
map.put(fieldName, getBsonTypeByTypeAlias(type));
}
}

private static Set<String> getTypes(MongoCollection<Document> collection, String fieldName) {
var searchField = "$" + fieldName;
var docTypes = collection.aggregate(List.of(
new Document("$project", new Document(TYPE, new Document("$type", searchField))))).cursor();
Set<String> types = new HashSet<>();
while (docTypes.hasNext()) {
var type = String.valueOf(docTypes.next().get(TYPE));
if (!MISSING_TYPE.equals(type) && !NULL_TYPE.equals(type)) {
types.add(type);
}
}
return types.isEmpty() ? Set.of(NULL_TYPE) : types;
/**
 * Translates a type alias string into the matching {@link BsonType} constant.
 *
 * <p>Any alias that is not listed below falls back to {@link BsonType#STRING}.
 *
 * @param typeAlias the alias to translate, e.g. {@code "int"} or {@code "objectId"}
 * @return the corresponding {@link BsonType}, or {@link BsonType#STRING} for unlisted aliases
 */
private static BsonType getBsonTypeByTypeAlias(String typeAlias) {
  switch (typeAlias) {
    case "double":
      return BsonType.DOUBLE;
    case "string":
      return BsonType.STRING;
    case "objectId":
      return BsonType.OBJECT_ID;
    case "array":
      return BsonType.ARRAY;
    case "binData":
      return BsonType.BINARY;
    case "bool":
      return BsonType.BOOLEAN;
    case "date":
      return BsonType.DATE_TIME;
    case "null":
      return BsonType.NULL;
    case "regex":
      return BsonType.REGULAR_EXPRESSION;
    case "dbPointer":
      return BsonType.DB_POINTER;
    case "javascript":
      return BsonType.JAVASCRIPT;
    case "symbol":
      return BsonType.SYMBOL;
    case "javascriptWithScope":
      return BsonType.JAVASCRIPT_WITH_SCOPE;
    case "int":
      return BsonType.INT32;
    case "timestamp":
      return BsonType.TIMESTAMP;
    case "long":
      return BsonType.INT64;
    case "decimal":
      return BsonType.DECIMAL128;
    default:
      // Unlisted aliases are treated as strings.
      return BsonType.STRING;
  }
}

private static BsonDocument toBsonDocument(final Document document) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.1.4
LABEL io.airbyte.version=0.1.5
LABEL io.airbyte.name=airbyte/source-mongodb-strict-encrypt
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,11 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Exc

final MongoCollection<Document> collection = database.createCollection(COLLECTION_NAME);
final var doc1 = new Document("id", "0001").append("name", "Test")
.append("test", 10).append("test_array", new BsonArray(List.of(new BsonString("test"), new BsonString("mongo"))));
final var doc2 = new Document("id", "0002").append("name", "Mongo").append("test", "test_value");
final var doc3 = new Document("id", "0003").append("name", "Source").append("test", null);
.append("test", 10).append("test_array", new BsonArray(List.of(new BsonString("test"), new BsonString("mongo"))))
.append("double_test", 100.12).append("int_test", 100);
final var doc2 = new Document("id", "0002").append("name", "Mongo").append("test", "test_value").append("int_test", 201);
final var doc3 = new Document("id", "0003").append("name", "Source").append("test", null)
.append("double_test", 212.11).append("int_test", 302);

collection.insertMany(List.of(doc1, doc2, doc3));
}
Expand Down Expand Up @@ -122,7 +124,10 @@ protected ConfiguredAirbyteCatalog getConfiguredCatalog() throws Exception {
Field.of("id", JsonSchemaPrimitive.STRING),
Field.of("name", JsonSchemaPrimitive.STRING),
Field.of("test", JsonSchemaPrimitive.STRING),
Field.of("test_array", JsonSchemaPrimitive.ARRAY))
Field.of("test_array", JsonSchemaPrimitive.ARRAY),
Field.of("empty_test", JsonSchemaPrimitive.STRING),
Field.of("double_test", JsonSchemaPrimitive.NUMBER),
Field.of("int_test", JsonSchemaPrimitive.NUMBER))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.INCREMENTAL))
.withDefaultCursorField(List.of("_id")))));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.1.6
LABEL io.airbyte.version=0.1.7
LABEL io.airbyte.name=airbyte/source-mongodb-v2
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,10 @@ protected ConfiguredAirbyteCatalog getConfiguredCatalog() throws Exception {
Field.of("id", JsonSchemaPrimitive.STRING),
Field.of("name", JsonSchemaPrimitive.STRING),
Field.of("test", JsonSchemaPrimitive.STRING),
Field.of("test_array", JsonSchemaPrimitive.ARRAY))
Field.of("test_array", JsonSchemaPrimitive.ARRAY),
Field.of("empty_test", JsonSchemaPrimitive.STRING),
Field.of("double_test", JsonSchemaPrimitive.NUMBER),
Field.of("int_test", JsonSchemaPrimitive.NUMBER))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.INCREMENTAL))
.withDefaultCursorField(List.of("_id")))));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,11 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Exc

final MongoCollection<Document> collection = database.createCollection(COLLECTION_NAME);
final var doc1 = new Document("id", "0001").append("name", "Test")
.append("test", 10).append("test_array", new BsonArray(List.of(new BsonString("test"), new BsonString("mongo"))));
final var doc2 = new Document("id", "0002").append("name", "Mongo").append("test", "test_value");
final var doc3 = new Document("id", "0003").append("name", "Source").append("test", null);
.append("test", 10).append("test_array", new BsonArray(List.of(new BsonString("test"), new BsonString("mongo"))))
.append("double_test", 100.12).append("int_test", 100);
final var doc2 = new Document("id", "0002").append("name", "Mongo").append("test", "test_value").append("int_test", 201);
final var doc3 = new Document("id", "0003").append("name", "Source").append("test", null)
.append("double_test", 212.11).append("int_test", 302);

collection.insertMany(List.of(doc1, doc2, doc3));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,11 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Exc

final MongoCollection<Document> collection = database.createCollection(COLLECTION_NAME);
final var doc1 = new Document("id", "0001").append("name", "Test")
.append("test", 10).append("test_array", new BsonArray(List.of(new BsonString("test"), new BsonString("mongo"))));
final var doc2 = new Document("id", "0002").append("name", "Mongo").append("test", "test_value");
final var doc3 = new Document("id", "0003").append("name", "Source").append("test", null);
.append("test", 10).append("test_array", new BsonArray(List.of(new BsonString("test"), new BsonString("mongo"))))
.append("double_test", 100.12).append("int_test", 100);
final var doc2 = new Document("id", "0002").append("name", "Mongo").append("test", "test_value").append("int_test", 201);
final var doc3 = new Document("id", "0003").append("name", "Source").append("test", null)
.append("double_test", 212.11).append("int_test", 302);

collection.insertMany(List.of(doc1, doc2, doc3));
}
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/mongodb-v2.md
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ For more information regarding configuration parameters, please see [MongoDb Doc

| Version | Date | Pull Request | Subject |
| :--- | :--- | :--- | :--- |
| 0.1.7 | 2021-11-22 | [8161](https://github.com/airbytehq/airbyte/pull/8161) | Updated Performance and updated cursor for timestamp type |
| 0.1.5 | 2021-11-17 | [8046](https://github.com/airbytehq/airbyte/pull/8046) | Added milliseconds to convert timestamp to datetime format |
| 0.1.4 | 2021-11-15 | [7982](https://github.com/airbytehq/airbyte/pull/7982) | Updated Performance |
| 0.1.3 | 2021-10-19 | [7160](https://github.com/airbytehq/airbyte/pull/7160) | Fixed nested document parsing |
Expand Down

0 comments on commit 6bf9eba

Please sign in to comment.