Skip to content

Commit

Permalink
🎉🐛: Source mongoDB: implement building JsonSchema with 'properties' f…
Browse files Browse the repository at this point in the history
…or fields with type 'object' (airbytehq#12428)

* mongodb: build JsonSchema with 'properties'

* add tests

* bump version

* auto-bump connector version

Co-authored-by: Octavia Squidington III <[email protected]>
  • Loading branch information
yurii-bidiuk and octavia-squidington-iii authored May 5, 2022
1 parent b752d82 commit b3194b2
Show file tree
Hide file tree
Showing 15 changed files with 292 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,7 @@
- name: MongoDb
sourceDefinitionId: b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e
dockerRepository: airbyte/source-mongodb-v2
dockerImageTag: 0.1.13
dockerImageTag: 0.1.14
documentationUrl: https://docs.airbyte.io/integrations/sources/mongodb-v2
icon: mongodb.svg
sourceType: database
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5262,7 +5262,7 @@
path_in_connector_config:
- "credentials"
- "client_secret"
- dockerImage: "airbyte/source-mongodb-v2:0.1.13"
- dockerImage: "airbyte/source-mongodb-v2:0.1.14"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/mongodb-v2"
changelogUrl: "https://docs.airbyte.io/integrations/sources/mongodb-v2"
Expand Down
69 changes: 55 additions & 14 deletions airbyte-db/lib/src/main/java/io/airbyte/db/mongodb/MongoUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,14 @@
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.util.MoreIterators;
import io.airbyte.db.DataTypeUtils;
import io.airbyte.protocol.models.CommonField;
import io.airbyte.protocol.models.JsonSchemaType;
import io.airbyte.protocol.models.TreeNode;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import org.bson.BsonBinary;
import org.bson.BsonDateTime;
import org.bson.BsonDocument;
Expand Down Expand Up @@ -94,6 +95,16 @@ public static Object getBsonValue(final BsonType type, final String value) {
}
}

public static CommonField<BsonType> nodeToCommonField(TreeNode<CommonField<BsonType>> node) {
CommonField<BsonType> field = node.getData();
if (node.hasChildren()) {
List<CommonField<BsonType>> subFields = node.getChildren().stream().map(MongoUtils::nodeToCommonField).toList();
return new CommonField<>(field.getName(), field.getType(), subFields);
} else {
return new CommonField<>(field.getName(), field.getType());
}
}

private static void formatDocument(final Document document, final ObjectNode objectNode, final List<String> columnNames) {
final BsonDocument bsonDocument = toBsonDocument(document);
try (final BsonReader reader = new BsonDocumentReader(bsonDocument)) {
Expand Down Expand Up @@ -188,21 +199,52 @@ private static ObjectNode readField(final BsonReader reader,
* @param collection mongo collection
* @return map of unique fields and its type
*/
public static Map<String, BsonType> getUniqueFields(final MongoCollection<Document> collection) {
public static List<TreeNode<CommonField<BsonType>>> getUniqueFields(final MongoCollection<Document> collection) {
var allkeys = new HashSet<>(getFieldsName(collection));

Map<String, BsonType> result = new HashMap<>();
var allkeys = getFieldsName(collection);
allkeys.forEach(key -> {
return allkeys.stream().map(key -> {
var types = getTypes(collection, key);
addUniqueType(result, key, types);
var type = getUniqueType(types);
var fieldNode = new TreeNode<>(new CommonField<>(transformName(types, key), type));
if (type.equals(DOCUMENT)) {
setSubFields(collection, fieldNode, key);
}
return fieldNode;
}).toList();
}

/**
* If one field has different types in 2 and more documents, the name is transformed to
* 'name_aibyte_transform'.
*
* @param types list with field types
* @param name field name
* @return name
*/
private static String transformName(List<String> types, String name) {
return types.size() != 1 ? name + AIRBYTE_SUFFIX : name;
}

private static void setSubFields(final MongoCollection<Document> collection, TreeNode<CommonField<BsonType>> parentNode, String pathToField) {
var nestedKeys = getFieldsName(collection, pathToField);
nestedKeys.forEach(key -> {
var types = getTypes(collection, pathToField + "." + key);
var nestedType = getUniqueType(types);
var childNode = parentNode.addChild(new CommonField<>(transformName(types, key), nestedType));
if (nestedType.equals(DOCUMENT)) {
setSubFields(collection, childNode, pathToField + "." + key);
}
});
return result;
}

private static List<String> getFieldsName(MongoCollection<Document> collection) {
return getFieldsName(collection, "$ROOT");
}

private static List<String> getFieldsName(MongoCollection<Document> collection, String fieldName) {
AggregateIterable<Document> output = collection.aggregate(Arrays.asList(
new Document("$limit", DISCOVER_LIMIT),
new Document("$project", new Document("arrayofkeyvalue", new Document("$objectToArray", "$$ROOT"))),
new Document("$project", new Document("arrayofkeyvalue", new Document("$objectToArray", "$" + fieldName))),
new Document("$unwind", "$arrayofkeyvalue"),
new Document("$group", new Document("_id", null).append("allkeys", new Document("$addToSet", "$arrayofkeyvalue.k")))));
if (output.cursor().hasNext()) {
Expand Down Expand Up @@ -233,19 +275,18 @@ private static ArrayList<String> getTypes(MongoCollection<Document> collection,
return listOfTypes;
}

private static void addUniqueType(Map<String, BsonType> map,
String fieldName,
List<String> types) {
private static BsonType getUniqueType(List<String> types) {
if (types.size() != 1) {
map.put(fieldName + AIRBYTE_SUFFIX, BsonType.STRING);
return BsonType.STRING;
} else {
var type = types.get(0);
map.put(fieldName, getBsonTypeByTypeAlias(type));
return getBsonTypeByTypeAlias(type);
}
}

private static BsonType getBsonTypeByTypeAlias(String typeAlias) {
return switch (typeAlias) {
case "object" -> BsonType.DOCUMENT;
case "double" -> BsonType.DOUBLE;
case "string" -> BsonType.STRING;
case "objectId" -> BsonType.OBJECT_ID;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-mongodb-v2

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.1.13
LABEL io.airbyte.version=0.1.14
LABEL io.airbyte.name=airbyte/source-mongodb-v2
Original file line number Diff line number Diff line change
Expand Up @@ -116,11 +116,7 @@ protected List<TableInfo<CommonField<BsonType>>> discoverInternal(final MongoDat

for (final String collectionName : getAuthorizedCollections(database)) {
final MongoCollection<Document> collection = database.getCollection(collectionName);
final Map<String, BsonType> uniqueFields = MongoUtils.getUniqueFields(collection);

final List<CommonField<BsonType>> fields = uniqueFields.keySet().stream()
.map(field -> new CommonField<>(field, uniqueFields.get(field)))
.collect(Collectors.toList());
final List<CommonField<BsonType>> fields = MongoUtils.getUniqueFields(collection).stream().map(MongoUtils::nodeToCommonField).toList();

// The field name _id is reserved for use as a primary key;
final TableInfo<CommonField<BsonType>> tableInfo = TableInfo.<CommonField<BsonType>>builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ protected ConfiguredAirbyteCatalog getConfiguredCatalog() throws Exception {
Field.of("test_array", JsonSchemaType.ARRAY),
Field.of("empty_test", JsonSchemaType.STRING),
Field.of("double_test", JsonSchemaType.NUMBER),
Field.of("int_test", JsonSchemaType.NUMBER))
Field.of("int_test", JsonSchemaType.NUMBER),
Field.of("object_test", JsonSchemaType.OBJECT))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.INCREMENTAL))
.withDefaultCursorField(List.of("_id")))));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,20 @@
package io.airbyte.integrations.io.airbyte.integration_tests.sources;

import static io.airbyte.db.mongodb.MongoUtils.MongoInstanceType.ATLAS;
import static org.junit.jupiter.api.Assertions.assertEquals;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.mongodb.client.MongoCollection;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.mongodb.MongoDatabase;
import io.airbyte.integrations.standardtest.source.TestDestinationEnv;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaType;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
Expand All @@ -23,6 +30,25 @@ public class MongoDbSourceAtlasAcceptanceTest extends MongoDbSourceAbstractAccep

private static final Path CREDENTIALS_PATH = Path.of("secrets/credentials.json");

protected static final List<Field> SUB_FIELDS = List.of(
Field.of("testObject", JsonSchemaType.OBJECT, List.of(
Field.of("name", JsonSchemaType.STRING),
Field.of("testField1", JsonSchemaType.STRING),
Field.of("testInt", JsonSchemaType.NUMBER),
Field.of("thirdLevelDocument", JsonSchemaType.OBJECT, List.of(
Field.of("data", JsonSchemaType.STRING),
Field.of("intData", JsonSchemaType.NUMBER))))));

protected static final List<Field> FIELDS = List.of(
Field.of("id", JsonSchemaType.STRING),
Field.of("_id", JsonSchemaType.STRING),
Field.of("name", JsonSchemaType.STRING),
Field.of("test_aibyte_transform", JsonSchemaType.STRING),
Field.of("test_array", JsonSchemaType.ARRAY),
Field.of("int_test", JsonSchemaType.NUMBER),
Field.of("double_test", JsonSchemaType.NUMBER),
Field.of("object_test", JsonSchemaType.OBJECT, SUB_FIELDS));

@Override
protected void setupEnvironment(final TestDestinationEnv environment) throws Exception {
if (!Files.exists(CREDENTIALS_PATH)) {
Expand Down Expand Up @@ -56,12 +82,15 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Exc
database = new MongoDatabase(connectionString, DATABASE_NAME);

final MongoCollection<Document> collection = database.createCollection(COLLECTION_NAME);
final var objectDocument = new Document("testObject", new Document("name", "subName").append("testField1", "testField1").append("testInt", 10)
.append("thirdLevelDocument", new Document("data", "someData").append("intData", 1)));
final var doc1 = new Document("id", "0001").append("name", "Test")
.append("test", 10).append("test_array", new BsonArray(List.of(new BsonString("test"), new BsonString("mongo"))))
.append("double_test", 100.12).append("int_test", 100);
final var doc2 = new Document("id", "0002").append("name", "Mongo").append("test", "test_value").append("int_test", 201);
.append("double_test", 100.12).append("int_test", 100).append("object_test", objectDocument);
final var doc2 =
new Document("id", "0002").append("name", "Mongo").append("test", "test_value").append("int_test", 201).append("object_test", objectDocument);
final var doc3 = new Document("id", "0003").append("name", "Source").append("test", null)
.append("double_test", 212.11).append("int_test", 302);
.append("double_test", 212.11).append("int_test", 302).append("object_test", objectDocument);

collection.insertMany(List.of(doc1, doc2, doc3));
}
Expand All @@ -72,4 +101,14 @@ protected void tearDown(final TestDestinationEnv testEnv) throws Exception {
database.close();
}

@Override
protected void verifyCatalog(AirbyteCatalog catalog) {
final List<AirbyteStream> streams = catalog.getStreams();
// only one stream is expected; the schema that should be ignored
// must not be included in the retrieved catalog
assertEquals(1, streams.size());
final AirbyteStream actualStream = streams.get(0);
assertEquals(CatalogHelpers.fieldsToJsonSchema(FIELDS), actualStream.getJsonSchema());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,20 @@
package io.airbyte.integrations.io.airbyte.integration_tests.sources;

import static io.airbyte.db.mongodb.MongoUtils.MongoInstanceType.STANDALONE;
import static org.junit.jupiter.api.Assertions.assertEquals;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.mongodb.client.MongoCollection;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.mongodb.MongoDatabase;
import io.airbyte.integrations.standardtest.source.TestDestinationEnv;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaType;
import java.util.List;
import org.bson.BsonArray;
import org.bson.BsonString;
Expand All @@ -23,6 +30,25 @@ public class MongoDbSourceStandaloneAcceptanceTest extends MongoDbSourceAbstract

private MongoDBContainer mongoDBContainer;

private static final List<Field> SUB_FIELDS = List.of(
Field.of("testObject", JsonSchemaType.OBJECT, List.of(
Field.of("name", JsonSchemaType.STRING),
Field.of("testField1", JsonSchemaType.STRING),
Field.of("testInt", JsonSchemaType.NUMBER),
Field.of("thirdLevelDocument", JsonSchemaType.OBJECT, List.of(
Field.of("data", JsonSchemaType.STRING),
Field.of("intData", JsonSchemaType.NUMBER))))));

private static final List<Field> FIELDS = List.of(
Field.of("id", JsonSchemaType.STRING),
Field.of("_id", JsonSchemaType.STRING),
Field.of("name", JsonSchemaType.STRING),
Field.of("test_aibyte_transform", JsonSchemaType.STRING),
Field.of("test_array", JsonSchemaType.ARRAY),
Field.of("int_test", JsonSchemaType.NUMBER),
Field.of("double_test", JsonSchemaType.NUMBER),
Field.of("object_test", JsonSchemaType.OBJECT, SUB_FIELDS));

@Override
protected void setupEnvironment(final TestDestinationEnv environment) throws Exception {
mongoDBContainer = new MongoDBContainer(DockerImageName.parse("mongo:4.0.10"));
Expand All @@ -48,12 +74,15 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Exc
database = new MongoDatabase(connectionString, DATABASE_NAME);

final MongoCollection<Document> collection = database.createCollection(COLLECTION_NAME);
final var objectDocument = new Document("testObject", new Document("name", "subName").append("testField1", "testField1").append("testInt", 10)
.append("thirdLevelDocument", new Document("data", "someData").append("intData", 1)));
final var doc1 = new Document("id", "0001").append("name", "Test")
.append("test", 10).append("test_array", new BsonArray(List.of(new BsonString("test"), new BsonString("mongo"))))
.append("double_test", 100.12).append("int_test", 100);
final var doc2 = new Document("id", "0002").append("name", "Mongo").append("test", "test_value").append("int_test", 201);
.append("double_test", 100.12).append("int_test", 100).append("object_test", objectDocument);
final var doc2 =
new Document("id", "0002").append("name", "Mongo").append("test", "test_value").append("int_test", 201).append("object_test", objectDocument);
final var doc3 = new Document("id", "0003").append("name", "Source").append("test", null)
.append("double_test", 212.11).append("int_test", 302);
.append("double_test", 212.11).append("int_test", 302).append("object_test", objectDocument);

collection.insertMany(List.of(doc1, doc2, doc3));
}
Expand All @@ -64,4 +93,14 @@ protected void tearDown(final TestDestinationEnv testEnv) throws Exception {
mongoDBContainer.close();
}

@Override
protected void verifyCatalog(AirbyteCatalog catalog) {
final List<AirbyteStream> streams = catalog.getStreams();
// only one stream is expected; the schema that should be ignored
// must not be included in the retrieved catalog
assertEquals(1, streams.size());
final AirbyteStream actualStream = streams.get(0);
assertEquals(CatalogHelpers.fieldsToJsonSchema(FIELDS), actualStream.getJsonSchema());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ protected List<TableInfo<Field>> getTables(final Database database) throws Excep
assertColumnsWithSameNameAreSame(t.getNameSpace(), t.getName(), t.getFields());
final List<Field> fields = t.getFields()
.stream()
.map(f -> Field.of(f.getName(), getType(f.getType())))
.map(this::toField)
.distinct()
.collect(Collectors.toList());
final String fullyQualifiedTableName = getFullyQualifiedTableName(t.getNameSpace(), t.getName());
Expand All @@ -343,6 +343,15 @@ protected List<TableInfo<Field>> getTables(final Database database) throws Excep
.collect(Collectors.toList());
}

protected Field toField(CommonField<DataType> field) {
if (getType(field.getType()) == JsonSchemaType.OBJECT && field.getProperties() != null && !field.getProperties().isEmpty()) {
var properties = field.getProperties().stream().map(this::toField).toList();
return Field.of(field.getName(), getType(field.getType()), properties);
} else {
return Field.of(field.getName(), getType(field.getType()));
}
}

protected void assertColumnsWithSameNameAreSame(final String nameSpace, final String tableName, final List<CommonField<DataType>> columns) {
columns.stream()
.collect(Collectors.groupingBy(CommonField<DataType>::getName))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,13 @@ public static JsonNode fieldsToJsonSchema(final List<Field> fields) {
.stream()
.collect(Collectors.toMap(
Field::getName,
field -> field.getType().getJsonSchemaTypeMap())))
field -> {
if (isObjectWithSubFields(field)) {
return fieldsToJsonSchema(field.getSubFields());
} else {
return field.getType().getJsonSchemaTypeMap();
}
})))
.build());
}

Expand Down Expand Up @@ -141,4 +147,8 @@ protected static Set<String> getAllFieldNames(final JsonNode node) {
return allFieldNames;
}

private static boolean isObjectWithSubFields(Field field) {
return field.getType() == JsonSchemaType.OBJECT && field.getSubFields() != null && !field.getSubFields().isEmpty();
}

}
Loading

0 comments on commit b3194b2

Please sign in to comment.