Skip to content

Commit

Permalink
🐛 Updated source-mongodb-v2 performance (airbytehq#7982)
Browse files Browse the repository at this point in the history
* updated source-mongodb-v2 performance

* updated code style

* fixed remarks

* fixed remarks

* fixed remarks

* updated strict encrypt source mongodb version
  • Loading branch information
andriikorotkov authored Nov 18, 2021
1 parent 2c4b1f5 commit 8e0e0b4
Show file tree
Hide file tree
Showing 10 changed files with 90 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"sourceDefinitionId": "b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e",
"name": "MongoDb",
"dockerRepository": "airbyte/source-mongodb-v2",
"dockerImageTag": "0.1.3",
"dockerImageTag": "0.1.4",
"documentationUrl": "https://docs.airbyte.io/integrations/sources/mongodb-v2",
"icon": "mongodb.svg"
}
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@
- name: MongoDb
sourceDefinitionId: b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e
dockerRepository: airbyte/source-mongodb-v2
dockerImageTag: 0.1.3
dockerImageTag: 0.1.4
documentationUrl: https://docs.airbyte.io/integrations/sources/mongodb-v2
icon: mongodb.svg
sourceType: database
Expand Down
79 changes: 59 additions & 20 deletions airbyte-db/lib/src/main/java/io/airbyte/db/mongodb/MongoUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,19 @@
import com.google.api.client.util.DateTime;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.mongodb.client.AggregateIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.util.MoreIterators;
import io.airbyte.db.DataTypeUtils;
import io.airbyte.protocol.models.JsonSchemaPrimitive;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.bson.BsonBinary;
import org.bson.BsonDateTime;
import org.bson.BsonDocument;
Expand All @@ -44,7 +47,9 @@ public class MongoUtils {

private static final Logger LOGGER = LoggerFactory.getLogger(MongoUtils.class);

private static final int DISCOVERY_BATCH_SIZE = 10000;
private static final String MISSING_TYPE = "missing";
private static final String NULL_TYPE = "null";
private static final String TYPE = "type";
private static final String AIRBYTE_SUFFIX = "_aibyte_transform";

public static JsonSchemaPrimitive getType(final BsonType dataType) {
Expand Down Expand Up @@ -89,7 +94,7 @@ private static void formatDocument(final Document document, final ObjectNode obj
try (final BsonReader reader = new BsonDocumentReader(bsonDocument)) {
readDocument(reader, objectNode, columnNames);
} catch (final Exception e) {
LOGGER.error("Exception while parsing BsonDocument: ", e.getMessage());
LOGGER.error("Exception while parsing BsonDocument: {}", e.getMessage());
throw new RuntimeException(e);
}
}
Expand Down Expand Up @@ -174,34 +179,68 @@ private static ObjectNode readField(final BsonReader reader,
* @return map of unique fields and its type
*/
public static Map<String, BsonType> getUniqueFields(final MongoCollection<Document> collection) {
final Map<String, BsonType> uniqueFields = new HashMap<>();
try (final MongoCursor<Document> cursor = collection.find().batchSize(DISCOVERY_BATCH_SIZE).iterator()) {
while (cursor.hasNext()) {
final BsonDocument document = toBsonDocument(cursor.next());
try (final BsonReader reader = new BsonDocumentReader(document)) {
reader.readStartDocument();
while (reader.readBsonType() != BsonType.END_OF_DOCUMENT) {
final var fieldName = reader.readName();

Map<String, BsonType> result = new HashMap<>();
var allkeys = getFieldsName(collection);
allkeys.forEach(key -> {
var types = getTypes(collection, key);
addUniqueType(result, collection, key, types);
});

return result;
}

private static List<String> getFieldsName(MongoCollection<Document> collection) {
AggregateIterable<Document> output = collection.aggregate(Arrays.asList(
new Document("$project", new Document("arrayofkeyvalue", new Document("$objectToArray", "$$ROOT"))),
new Document("$unwind", "$arrayofkeyvalue"),
new Document("$group", new Document("_id", null).append("allkeys", new Document("$addToSet", "$arrayofkeyvalue.k")))));
return (List) output.cursor().next().get("allkeys");
}

private static void addUniqueType(Map<String, BsonType> map,
MongoCollection<Document> collection,
String fieldName,
Set<String> types) {
if (types.size() != 1) {
map.put(fieldName + AIRBYTE_SUFFIX, BsonType.STRING);
} else {
var document = collection.find(new Document(fieldName,
new Document("$type", types.stream().findFirst().get()))).first();
var bsonDoc = toBsonDocument(document);
try (final BsonReader reader = new BsonDocumentReader(bsonDoc)) {
reader.readStartDocument();
while (reader.readBsonType() != BsonType.END_OF_DOCUMENT) {
if (reader.readName().equals(fieldName)) {
final var fieldType = reader.getCurrentBsonType();
reader.skipValue();
if (uniqueFields.containsKey(fieldName) && fieldType.compareTo(uniqueFields.get(fieldName)) != 0) {
uniqueFields.replace(fieldName + AIRBYTE_SUFFIX, BsonType.STRING);
} else {
uniqueFields.put(fieldName, fieldType);
}
map.put(fieldName, fieldType);
}
reader.readEndDocument();
reader.skipValue();
}
reader.readEndDocument();
}
}
}

private static Set<String> getTypes(MongoCollection<Document> collection, String fieldName) {
var searchField = "$" + fieldName;
var docTypes = collection.aggregate(List.of(
new Document("$project", new Document(TYPE, new Document("$type", searchField))))).cursor();
Set<String> types = new HashSet<>();
while (docTypes.hasNext()) {
var type = String.valueOf(docTypes.next().get(TYPE));
if (!MISSING_TYPE.equals(type) && !NULL_TYPE.equals(type)) {
types.add(type);
}
}
return uniqueFields;
return types.isEmpty() ? Set.of(NULL_TYPE) : types;
}

private static BsonDocument toBsonDocument(final Document document) {
try {
return document.toBsonDocument();
} catch (final Exception e) {
LOGGER.error("Exception while converting Document to BsonDocument: ", e.getMessage());
LOGGER.error("Exception while converting Document to BsonDocument: {}", e.getMessage());
throw new RuntimeException(e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.1.1
LABEL io.airbyte.version=0.1.2
LABEL io.airbyte.name=airbyte/source-mongodb-strict-encrypt
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.List;

import org.bson.BsonArray;
import org.bson.BsonString;
import org.bson.Document;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -77,7 +80,7 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Exc
.put("auth_source", "admin")
.build());

final String connectionString = String.format("mongodb://%s:%s@%s:%s/%s?authSource=admin&ssl=true",
final String connectionString = String.format("mongodb://%s:%s@%s:%s/%s?authSource=admin&directConnection=false&ssl=true",
config.get("user").asText(),
config.get("password").asText(),
config.get("instance_type").get("host").asText(),
Expand All @@ -87,9 +90,10 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Exc
database = new MongoDatabase(connectionString, DATABASE_NAME);

final MongoCollection<Document> collection = database.createCollection(COLLECTION_NAME);
final var doc1 = new Document("id", "0001").append("name", "Test");
final var doc2 = new Document("id", "0002").append("name", "Mongo");
final var doc3 = new Document("id", "0003").append("name", "Source");
final var doc1 = new Document("id", "0001").append("name", "Test")
.append("test", 10).append("test_array", new BsonArray(List.of(new BsonString("test"), new BsonString("mongo"))));
final var doc2 = new Document("id", "0002").append("name", "Mongo").append("test", "test_value");
final var doc3 = new Document("id", "0003").append("name", "Source").append("test", null);

collection.insertMany(List.of(doc1, doc2, doc3));
}
Expand Down Expand Up @@ -117,7 +121,9 @@ protected ConfiguredAirbyteCatalog getConfiguredCatalog() throws Exception {
DATABASE_NAME + "." + COLLECTION_NAME,
Field.of("_id", JsonSchemaPrimitive.STRING),
Field.of("id", JsonSchemaPrimitive.STRING),
Field.of("name", JsonSchemaPrimitive.STRING))
Field.of("name", JsonSchemaPrimitive.STRING),
Field.of("test", JsonSchemaPrimitive.STRING),
Field.of("test_array", JsonSchemaPrimitive.ARRAY))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.INCREMENTAL))
.withDefaultCursorField(List.of("_id")))));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.1.3
LABEL io.airbyte.version=0.1.4
LABEL io.airbyte.name=airbyte/source-mongodb-v2
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@ protected ConfiguredAirbyteCatalog getConfiguredCatalog() throws Exception {
DATABASE_NAME + "." + COLLECTION_NAME,
Field.of("_id", JsonSchemaPrimitive.STRING),
Field.of("id", JsonSchemaPrimitive.STRING),
Field.of("name", JsonSchemaPrimitive.STRING))
Field.of("name", JsonSchemaPrimitive.STRING),
Field.of("test", JsonSchemaPrimitive.STRING),
Field.of("test_array", JsonSchemaPrimitive.ARRAY))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.INCREMENTAL))
.withDefaultCursorField(List.of("_id")))));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import org.bson.BsonArray;
import org.bson.BsonString;
import org.bson.Document;

public class MongoDbSourceAtlasAcceptanceTest extends MongoDbSourceAbstractAcceptanceTest {
Expand Down Expand Up @@ -54,9 +56,10 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Exc
database = new MongoDatabase(connectionString, DATABASE_NAME);

final MongoCollection<Document> collection = database.createCollection(COLLECTION_NAME);
final var doc1 = new Document("id", "0001").append("name", "Test");
final var doc2 = new Document("id", "0002").append("name", "Mongo");
final var doc3 = new Document("id", "0003").append("name", "Source");
final var doc1 = new Document("id", "0001").append("name", "Test")
.append("test", 10).append("test_array", new BsonArray(List.of(new BsonString("test"), new BsonString("mongo"))));
final var doc2 = new Document("id", "0002").append("name", "Mongo").append("test", "test_value");
final var doc3 = new Document("id", "0003").append("name", "Source").append("test", null);

collection.insertMany(List.of(doc1, doc2, doc3));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import io.airbyte.db.mongodb.MongoDatabase;
import io.airbyte.integrations.standardtest.source.TestDestinationEnv;
import java.util.List;
import org.bson.BsonArray;
import org.bson.BsonString;
import org.bson.Document;
import org.testcontainers.containers.MongoDBContainer;
import org.testcontainers.utility.DockerImageName;
Expand Down Expand Up @@ -46,9 +48,10 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Exc
database = new MongoDatabase(connectionString, DATABASE_NAME);

final MongoCollection<Document> collection = database.createCollection(COLLECTION_NAME);
final var doc1 = new Document("id", "0001").append("name", "Test");
final var doc2 = new Document("id", "0002").append("name", "Mongo");
final var doc3 = new Document("id", "0003").append("name", "Source");
final var doc1 = new Document("id", "0001").append("name", "Test")
.append("test", 10).append("test_array", new BsonArray(List.of(new BsonString("test"), new BsonString("mongo"))));
final var doc2 = new Document("id", "0002").append("name", "Mongo").append("test", "test_value");
final var doc3 = new Document("id", "0003").append("name", "Source").append("test", null);

collection.insertMany(List.of(doc1, doc2, doc3));
}
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/mongodb-v2.md
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ For more information regarding configuration parameters, please see [MongoDb Doc

| Version | Date | Pull Request | Subject |
| :--- | :--- | :--- | :--- |
| 0.1.4 | 2021-11-15 | [7982](https://github.com/airbytehq/airbyte/pull/7982) | Updated Performance |
| 0.1.3 | 2021-10-19 | [7160](https://github.com/airbytehq/airbyte/pull/7160) | Fixed nested document parsing |
| 0.1.2 | 2021-10-07 | [6860](https://github.com/airbytehq/airbyte/pull/6860) | Added filter to avoid MongoDb system collections |
| 0.1.1 | 2021-09-21 | [6364](https://github.com/airbytehq/airbyte/pull/6364) | Source MongoDb: added support via TLS/SSL |
Expand Down

0 comments on commit 8e0e0b4

Please sign in to comment.