Skip to content

Commit

Permalink
Source MongoDB v2: Fixed nested document parsing (airbytehq#7160)
Browse files Browse the repository at this point in the history
* Fixed nested document parsing: added parsing of nested documents and arrays, added data types test
  • Loading branch information
irynakruk authored Oct 25, 2021
1 parent 27df558 commit dbfd6ca
Show file tree
Hide file tree
Showing 6 changed files with 298 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"sourceDefinitionId": "b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e",
"name": "MongoDb",
"dockerRepository": "airbyte/source-mongodb-v2",
"dockerImageTag": "0.1.2",
"dockerImageTag": "0.1.3",
"documentationUrl": "https://docs.airbyte.io/integrations/sources/mongodb-v2",
"icon": "mongodb.svg"
}
Original file line number Diff line number Diff line change
Expand Up @@ -550,7 +550,7 @@
- sourceDefinitionId: b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e
name: MongoDb
dockerRepository: airbyte/source-mongodb-v2
dockerImageTag: 0.1.2
dockerImageTag: 0.1.3
documentationUrl: https://docs.airbyte.io/integrations/sources/mongodb-v2
icon: mongodb.svg
sourceType: database
Expand Down
147 changes: 89 additions & 58 deletions airbyte-db/lib/src/main/java/io/airbyte/db/mongodb/MongoUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,18 @@

package io.airbyte.db.mongodb;

import static org.bson.BsonType.ARRAY;
import static org.bson.BsonType.DOCUMENT;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.api.client.util.DateTime;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.util.MoreIterators;
import io.airbyte.db.DataTypeUtils;
import io.airbyte.protocol.models.JsonSchemaPrimitive;
import java.util.Collections;
Expand All @@ -28,7 +34,6 @@
import org.bson.BsonTimestamp;
import org.bson.BsonType;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.bson.types.Decimal128;
import org.bson.types.ObjectId;
import org.bson.types.Symbol;
Expand All @@ -46,16 +51,16 @@ public static JsonSchemaPrimitive getType(final BsonType dataType) {
return switch (dataType) {
case BOOLEAN -> JsonSchemaPrimitive.BOOLEAN;
case INT32, INT64, DOUBLE, DECIMAL128 -> JsonSchemaPrimitive.NUMBER;
case STRING, SYMBOL, BINARY, DATE_TIME, TIMESTAMP, OBJECT_ID, REGULAR_EXPRESSION, JAVASCRIPT, JAVASCRIPT_WITH_SCOPE -> JsonSchemaPrimitive.STRING;
case STRING, SYMBOL, BINARY, DATE_TIME, TIMESTAMP, OBJECT_ID, REGULAR_EXPRESSION, JAVASCRIPT -> JsonSchemaPrimitive.STRING;
case ARRAY -> JsonSchemaPrimitive.ARRAY;
case DOCUMENT -> JsonSchemaPrimitive.OBJECT;
case DOCUMENT, JAVASCRIPT_WITH_SCOPE -> JsonSchemaPrimitive.OBJECT;
default -> JsonSchemaPrimitive.STRING;
};
}

public static JsonNode toJsonNode(final Document document, final List<String> columnNames) {
final ObjectNode objectNode = (ObjectNode) Jsons.jsonNode(Collections.emptyMap());
readBson(document, objectNode, columnNames);
formatDocument(document, objectNode, columnNames);
return objectNode;
}

Expand All @@ -74,50 +79,93 @@ public static Object getBsonValue(final BsonType type, final String value) {
default -> value;
};
} catch (final Exception e) {
LOGGER.error("Failed to get BsonValue for field type " + type, e.getMessage());
LOGGER.error(String.format("Failed to get BsonValue for field type %s", type), e.getMessage());
return value;
}
}

private static void readBson(final Document document, final ObjectNode o, final List<String> columnNames) {
private static void formatDocument(final Document document, final ObjectNode objectNode, final List<String> columnNames) {
final BsonDocument bsonDocument = toBsonDocument(document);
try (final BsonReader reader = new BsonDocumentReader(bsonDocument)) {
reader.readStartDocument();
while (reader.readBsonType() != BsonType.END_OF_DOCUMENT) {
final var fieldName = reader.readName();
final var fieldType = reader.getCurrentBsonType();

switch (fieldType) {
case BOOLEAN -> o.put(fieldName, reader.readBoolean());
case INT32 -> o.put(fieldName, reader.readInt32());
case INT64 -> o.put(fieldName, reader.readInt64());
case DOUBLE -> o.put(fieldName, reader.readDouble());
case DECIMAL128 -> o.put(fieldName, toDouble(reader.readDecimal128()));
case TIMESTAMP -> o.put(fieldName, toString(reader.readTimestamp()));
case DATE_TIME -> o.put(fieldName, DataTypeUtils.toISO8601String(reader.readDateTime()));
case BINARY -> o.put(fieldName, toByteArray(reader.readBinaryData()));
case SYMBOL -> o.put(fieldName, reader.readSymbol());
case STRING -> o.put(fieldName, reader.readString());
case OBJECT_ID -> o.put(fieldName, toString(reader.readObjectId()));
case JAVASCRIPT -> o.put(fieldName, reader.readJavaScript());
case JAVASCRIPT_WITH_SCOPE -> o.put(fieldName, reader.readJavaScriptWithScope());
case REGULAR_EXPRESSION -> o.put(fieldName, toString(reader.readRegularExpression()));
case DOCUMENT -> o.put(fieldName, documentToString(document.get(fieldName), reader));
case ARRAY -> o.put(fieldName, arrayToString(document.get(fieldName), reader));
default -> reader.skipValue();
}

if (columnNames.contains(fieldName + AIRBYTE_SUFFIX)) {
o.put(fieldName, o.get(fieldName).asText());
}
}
reader.readEndDocument();
readDocument(reader, objectNode, columnNames);
} catch (final Exception e) {
LOGGER.error("Exception while parsing BsonDocument: ", e.getMessage());
throw new RuntimeException(e);
}
}

private static ObjectNode readDocument(final BsonReader reader, final ObjectNode jsonNodes, final List<String> columnNames) {
reader.readStartDocument();
while (reader.readBsonType() != BsonType.END_OF_DOCUMENT) {
final var fieldName = reader.readName();
final var fieldType = reader.getCurrentBsonType();
if (DOCUMENT.equals(fieldType)) {
// recursion in used to parse inner documents
jsonNodes.set(fieldName, readDocument(reader, (ObjectNode) Jsons.jsonNode(Collections.emptyMap()), columnNames));
} else if (ARRAY.equals(fieldType)) {
jsonNodes.set(fieldName, readArray(reader, columnNames, fieldName));
} else {
readField(reader, jsonNodes, columnNames, fieldName, fieldType);
}
transformToStringIfMarked(jsonNodes, columnNames, fieldName);
}
reader.readEndDocument();

return jsonNodes;
}

private static void transformToStringIfMarked(final ObjectNode jsonNodes, final List<String> columnNames, final String fieldName) {
if (columnNames.contains(fieldName + AIRBYTE_SUFFIX)) {
jsonNodes.put(fieldName, jsonNodes.get(fieldName).asText());
}
}

private static JsonNode readArray(final BsonReader reader, final List<String> columnNames, final String fieldName) {
reader.readStartArray();
final var elements = Lists.newArrayList();

while (reader.readBsonType() != BsonType.END_OF_DOCUMENT) {
final var arrayFieldType = reader.getCurrentBsonType();
if (DOCUMENT.equals(arrayFieldType)) {
// recursion is used to read inner doc
elements.add(readDocument(reader, (ObjectNode) Jsons.jsonNode(Collections.emptyMap()), columnNames));
} else if (ARRAY.equals(arrayFieldType)) {
// recursion is used to read inner array
elements.add(readArray(reader, columnNames, fieldName));
} else {
final var element = readField(reader, (ObjectNode) Jsons.jsonNode(Collections.emptyMap()), columnNames, fieldName, arrayFieldType);
elements.add(element.get(fieldName));
}
}
reader.readEndArray();
return Jsons.jsonNode(MoreIterators.toList(elements.iterator()));
}

private static ObjectNode readField(final BsonReader reader,
final ObjectNode o,
final List<String> columnNames,
final String fieldName,
final BsonType fieldType) {
switch (fieldType) {
case BOOLEAN -> o.put(fieldName, reader.readBoolean());
case INT32 -> o.put(fieldName, reader.readInt32());
case INT64 -> o.put(fieldName, reader.readInt64());
case DOUBLE -> o.put(fieldName, reader.readDouble());
case DECIMAL128 -> o.put(fieldName, toDouble(reader.readDecimal128()));
case TIMESTAMP -> o.put(fieldName, DataTypeUtils.toISO8601String(reader.readTimestamp().getValue()));
case DATE_TIME -> o.put(fieldName, DataTypeUtils.toISO8601String(reader.readDateTime()));
case BINARY -> o.put(fieldName, toByteArray(reader.readBinaryData()));
case SYMBOL -> o.put(fieldName, reader.readSymbol());
case STRING -> o.put(fieldName, reader.readString());
case OBJECT_ID -> o.put(fieldName, toString(reader.readObjectId()));
case JAVASCRIPT -> o.put(fieldName, reader.readJavaScript());
case JAVASCRIPT_WITH_SCOPE -> readJavaScriptWithScope(o, reader, fieldName, columnNames);
case REGULAR_EXPRESSION -> toString(reader.readRegularExpression());
default -> reader.skipValue();
}
return o;
}

/**
* Gets 10.000 documents from collection, gathers all unique fields and its type. In case when one
* field has different types in 2 and more documents, the type is set to String.
Expand Down Expand Up @@ -151,7 +199,7 @@ public static Map<String, BsonType> getUniqueFields(final MongoCollection<Docume

private static BsonDocument toBsonDocument(final Document document) {
try {
return document.toBsonDocument(BsonDocument.class, Bson.DEFAULT_CODEC_REGISTRY);
return document.toBsonDocument();
} catch (final Exception e) {
LOGGER.error("Exception while converting Document to BsonDocument: ", e.getMessage());
throw new RuntimeException(e);
Expand All @@ -170,27 +218,10 @@ private static byte[] toByteArray(final BsonBinary value) {
return value == null ? null : value.getData();
}

// temporary method for MVP
private static String documentToString(final Object obj, final BsonReader reader) {
try {
reader.skipValue();
final Document document = (Document) obj;
return document.toJson();
} catch (final Exception e) {
LOGGER.error("Failed to convert document to a String: ", e.getMessage());
return null;
}
}

// temporary method for MVP
private static String arrayToString(final Object obj, final BsonReader reader) {
try {
reader.skipValue();
return obj.toString();
} catch (final Exception e) {
LOGGER.error("Failed to convert array to a String: ", e.getMessage());
return null;
}
private static void readJavaScriptWithScope(ObjectNode o, BsonReader reader, String fieldName, List<String> columnNames) {
var code = reader.readJavaScriptWithScope();
var scope = readDocument(reader, (ObjectNode) Jsons.jsonNode(Collections.emptyMap()), columnNames);
o.set(fieldName, Jsons.jsonNode(ImmutableMap.of("code", code, "scope", scope)));
}

public enum MongoInstanceType {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.1.2
LABEL io.airbyte.version=0.1.3
LABEL io.airbyte.name=airbyte/source-mongodb-v2
Loading

0 comments on commit dbfd6ca

Please sign in to comment.