Skip to content

Commit

Permalink
Source MongoDb: added support via TLS/SSL (airbytehq#6364)
Browse files Browse the repository at this point in the history
* updated ssl configs;
fixed replica connection;
fixed incremental read;

* added MongoDbSourceAtlasAcceptanceTest with ssl enabled

* updated docs, moved TLS option for standalone instance type, enabled it by default for other types

* Update mongodb-v2.md

* updated README.md

* updated spec.json

* Code review changes

* updated ci_credentials.sh
  • Loading branch information
irynakruk authored Sep 24, 2021
1 parent 3378ba4 commit 5551698
Show file tree
Hide file tree
Showing 14 changed files with 273 additions and 82 deletions.
1 change: 1 addition & 0 deletions .github/workflows/publish-command.yml
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ jobs:
DESTINATION_DATABRICKS_CREDS: ${{ secrets.DESTINATION_DATABRICKS_CREDS }}
MSSQL_SSH_KEY_TEST_CREDS: ${{ secrets.MSSQL_SSH_KEY_TEST_CREDS }}
MSSQL_SSH_PWD_TEST_CREDS: ${{ secrets.MSSQL_SSH_PWD_TEST_CREDS }}
MONGODB_TEST_CREDS: ${{ secrets.MONGODB_TEST_CREDS }}
- run: |
echo "$SPEC_CACHE_SERVICE_ACCOUNT_KEY" > spec_cache_key_file.json && docker login -u airbytebot -p ${DOCKER_PASSWORD}
./tools/integrations/manage.sh publish airbyte-integrations/${{ github.event.inputs.connector }} ${{ github.event.inputs.run-tests }} --publish_spec_to_cache
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/test-command.yml
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ jobs:
DESTINATION_DATABRICKS_CREDS: ${{ secrets.DESTINATION_DATABRICKS_CREDS }}
MSSQL_SSH_KEY_TEST_CREDS: ${{ secrets.MSSQL_SSH_KEY_TEST_CREDS }}
MSSQL_SSH_PWD_TEST_CREDS: ${{ secrets.MSSQL_SSH_PWD_TEST_CREDS }}
MONGODB_TEST_CREDS: ${{ secrets.MONGODB_TEST_CREDS }}
- run: |
./tools/bin/ci_integration_test.sh ${{ github.event.inputs.connector }}
name: test ${{ github.event.inputs.connector }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"sourceDefinitionId": "b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e",
"name": "MongoDb",
"dockerRepository": "airbyte/source-mongodb-v2",
"dockerImageTag": "0.1.0",
"dockerImageTag": "0.1.1",
"documentationUrl": "https://docs.airbyte.io/integrations/sources/mongodb-v2",
"icon": "mongodb.svg"
}
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,7 @@
- sourceDefinitionId: b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e
name: MongoDb
dockerRepository: airbyte/source-mongodb-v2
dockerImageTag: 0.1.0
dockerImageTag: 0.1.1
documentationUrl: https://docs.airbyte.io/integrations/sources/mongodb-v2
icon: mongodb.svg
sourceType: database
Expand Down
29 changes: 29 additions & 0 deletions airbyte-db/lib/src/main/java/io/airbyte/db/mongodb/MongoUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.api.client.util.DateTime;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import io.airbyte.commons.json.Jsons;
Expand All @@ -36,13 +37,21 @@
import java.util.List;
import java.util.Map;
import org.bson.BsonBinary;
import org.bson.BsonDateTime;
import org.bson.BsonDocument;
import org.bson.BsonDocumentReader;
import org.bson.BsonDouble;
import org.bson.BsonInt32;
import org.bson.BsonInt64;
import org.bson.BsonReader;
import org.bson.BsonString;
import org.bson.BsonTimestamp;
import org.bson.BsonType;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.bson.types.Decimal128;
import org.bson.types.ObjectId;
import org.bson.types.Symbol;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -70,6 +79,26 @@ public static JsonNode toJsonNode(final Document document, final List<String> co
return objectNode;
}

public static Object getBsonValue(BsonType type, String value) {
try {
return switch (type) {
case INT32 -> new BsonInt32(Integer.parseInt(value));
case INT64 -> new BsonInt64(Long.parseLong(value));
case DOUBLE -> new BsonDouble(Double.parseDouble(value));
case DECIMAL128 -> Decimal128.parse(value);
case TIMESTAMP -> new BsonTimestamp(Long.parseLong(value));
case DATE_TIME -> new BsonDateTime(new DateTime(value).getValue());
case OBJECT_ID -> new ObjectId(value);
case SYMBOL -> new Symbol(value);
case STRING -> new BsonString(value);
default -> value;
};
} catch (Exception e) {
LOGGER.error("Failed to get BsonValue for field type " + type, e.getMessage());
return value;
}
}

private static void readBson(final Document document, final ObjectNode o, final List<String> columnNames) {
final BsonDocument bsonDocument = toBsonDocument(document);
try (BsonReader reader = new BsonDocumentReader(bsonDocument)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.1.0
LABEL io.airbyte.version=0.1.1
LABEL io.airbyte.name=airbyte/source-mongodb-v2
25 changes: 24 additions & 1 deletion airbyte-integrations/connectors/source-mongodb-v2/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,30 @@ the Dockerfile.
We use `JUnit` for Java tests.

### Test Configuration
No specific configuration needed for testing, MongoDb Test Container is used.

No specific configuration needed for testing Standalone MongoDb instance, MongoDb Test Container is used.
In order to test the MongoDb Atlas or Replica set, you need to provide configuration parameters.

## Community Contributor

As a community contributor, you will need to have an Atlas cluster to test MongoDb source.

1. Create `secrets/credentials.json` file
1. Insert below json to the file with your configuration
```
{
"database": "database_name",
"user": "user",
"password": "password",
"cluster_url": "cluster_url"
}
```
## Airbyte Employee
1. Access the `MONGODB_TEST_CREDS` secret on LastPass
1. Create a file with the contents at `secrets/credentials.json`
#### Acceptance Tests
To run acceptance and custom integration tests:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,9 @@ public class MongoDbSource extends AbstractDbSource<BsonType, MongoDatabase> {

private static final Logger LOGGER = LoggerFactory.getLogger(MongoDbSource.class);

private static final String MONGODB_SERVER_URL = "mongodb://%s%s:%s/?";
private static final String MONGODB_CLUSTER_URL = "mongodb+srv://%s%s/%s?retryWrites=true&w=majority&";
private static final String MONGODB_REPLICA_URL = "mongodb://%s%s/?replicaSet=%s&";
private static final String MONGODB_SERVER_URL = "mongodb://%s%s:%s/%s?authSource=%s&ssl=%s";
private static final String MONGODB_CLUSTER_URL = "mongodb+srv://%s%s/%s?authSource=%s&retryWrites=true&w=majority&tls=true";
private static final String MONGODB_REPLICA_URL = "mongodb://%s%s/%s?authSource=%s&directConnection=false&ssl=true";
private static final String USER = "user";
private static final String PASSWORD = "password";
private static final String INSTANCE_TYPE = "instance_type";
Expand All @@ -90,26 +90,30 @@ public JsonNode toDatabaseConfig(JsonNode config) {
? String.format("%s:%s@", config.get(USER).asText(), config.get(PASSWORD).asText())
: StringUtils.EMPTY;

StringBuilder connectionStrBuilder = new StringBuilder();
JsonNode instanceConfig = config.get(INSTANCE_TYPE);
String instanceConnectUrl;
if (instanceConfig.has(HOST) && instanceConfig.has(PORT)) {
instanceConnectUrl = String.format(MONGODB_SERVER_URL,
credentials, instanceConfig.get(HOST).asText(), instanceConfig.get(PORT).asText());
// Standalone MongoDb Instance
var tls = config.has(TLS) ? config.get(TLS).asBoolean() : instanceConfig.get(TLS).asBoolean(); // for backward compatibility
connectionStrBuilder.append(String.format(MONGODB_SERVER_URL, credentials, instanceConfig.get(HOST).asText(), instanceConfig.get(PORT).asText(),
config.get(DATABASE).asText(), config.get(AUTH_SOURCE).asText(), tls));
} else if (instanceConfig.has(CLUSTER_URL)) {
instanceConnectUrl = String.format(MONGODB_CLUSTER_URL,
credentials, instanceConfig.get(CLUSTER_URL).asText(), config.get(DATABASE).asText());
// MongoDB Atlas
connectionStrBuilder.append(
String.format(MONGODB_CLUSTER_URL, credentials, instanceConfig.get(CLUSTER_URL).asText(), config.get(DATABASE).asText(),
config.get(AUTH_SOURCE).asText()));
} else {
instanceConnectUrl = String.format(MONGODB_REPLICA_URL,
credentials, instanceConfig.get(SERVER_ADDRESSES).asText(), config.get(REPLICA_SET).asText());
}

String options = "authSource=".concat(config.get(AUTH_SOURCE).asText());
if (config.get(TLS).asBoolean()) {
options.concat("&tls=true");
// Replica Set & Shard
connectionStrBuilder.append(
String.format(MONGODB_REPLICA_URL, credentials, instanceConfig.get(SERVER_ADDRESSES).asText(), config.get(DATABASE).asText(),
config.get(AUTH_SOURCE).asText()));
if (instanceConfig.has(REPLICA_SET)) {
connectionStrBuilder.append(String.format("&replicaSet=%s", instanceConfig.get(REPLICA_SET).asText()));
}
}

return Jsons.jsonNode(ImmutableMap.builder()
.put("connectionString", instanceConnectUrl + options)
.put("connectionString", connectionStrBuilder.toString())
.put("database", config.get(DATABASE).asText())
.build());
}
Expand Down Expand Up @@ -207,7 +211,7 @@ public AutoCloseableIterator<JsonNode> queryTableIncremental(MongoDatabase datab
String cursorField,
BsonType cursorFieldType,
String cursor) {
Bson greaterComparison = gt(cursorField, cursor);
Bson greaterComparison = gt(cursorField, MongoUtils.getBsonValue(cursorFieldType, cursor));
return queryTable(database, columnNames, tableName, greaterComparison);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@
"title": "MongoDb Source Spec",
"type": "object",
"required": ["database"],
"additionalProperties": false,
"additionalProperties": true,
"properties": {
"instance_type": {
"type": "object",
"title": "MongoDb instance type",
"description": "MongoDb instance to connect to.",
"description": "MongoDb instance to connect to. For MongoDB Atlas and Replica Set TLS connection is used by default.",
"order": 0,
"oneOf": [
{
Expand All @@ -34,13 +34,20 @@
"default": 27017,
"examples": ["27017"],
"order": 1
},
"tls": {
"title": "TLS connection",
"type": "boolean",
"description": "Indicates whether TLS encryption protocol will be used to connect to MongoDB. It is recommended to use TLS connection if possible. For more information see <a href=\"https://docs.airbyte.io/integrations/sources/mongodb-v2\">documentation</a>.",
"default": false,
"order": 2
}
}
},
{
"title": "Replica Set",
"additionalProperties": false,
"required": ["server_addresses", "replica_set"],
"required": ["server_addresses"],
"properties": {
"server_addresses": {
"title": "Server addresses",
Expand Down Expand Up @@ -98,13 +105,6 @@
"default": "admin",
"examples": ["admin"],
"order": 4
},
"tls": {
"title": "TLS connection",
"type": "boolean",
"description": "If this switch is enabled, TLS connections will be used to connect to MongoDB.",
"default": false,
"order": 5
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,11 @@
package io.airbyte.integrations.io.airbyte.integration_tests.sources;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.mongodb.client.MongoCollection;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.db.mongodb.MongoDatabase;
import io.airbyte.integrations.standardtest.source.SourceAcceptanceTest;
import io.airbyte.integrations.standardtest.source.TestDestinationEnv;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
Expand All @@ -44,15 +41,14 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import org.bson.Document;
import org.testcontainers.containers.MongoDBContainer;
import org.testcontainers.utility.DockerImageName;

public class MongoDbSourceAcceptanceTest extends SourceAcceptanceTest {
public abstract class MongoDbSourceAbstractAcceptanceTest extends SourceAcceptanceTest {

private MongoDBContainer mongoDBContainer;
private JsonNode config;
private MongoDatabase database;
protected static final String DATABASE_NAME = "test";
protected static final String COLLECTION_NAME = "acceptance_test";

protected JsonNode config;
protected MongoDatabase database;

@Override
protected String getImageName() {
Expand All @@ -64,43 +60,6 @@ protected JsonNode getConfig() throws Exception {
return config;
}

@Override
protected void setupEnvironment(TestDestinationEnv environment) throws Exception {
mongoDBContainer = new MongoDBContainer(DockerImageName.parse("mongo:4.0.10"));
mongoDBContainer.start();

final JsonNode instanceConfig = Jsons.jsonNode(ImmutableMap.builder()
.put("host", mongoDBContainer.getHost())
.put("port", mongoDBContainer.getFirstMappedPort())
.build());

config = Jsons.jsonNode(ImmutableMap.builder()
.put("instance_type", instanceConfig)
.put("database", "test")
.put("auth_source", "admin")
.put("tls", false)
.build());

String connectionString = String.format("mongodb://%s:%s/",
mongoDBContainer.getHost(),
mongoDBContainer.getFirstMappedPort());

database = new MongoDatabase(connectionString, "test");

MongoCollection<Document> collection = database.createCollection("acceptance_test");
var doc1 = new Document("id", "0001").append("name", "Test");
var doc2 = new Document("id", "0002").append("name", "Mongo");
var doc3 = new Document("id", "0003").append("name", "Source");

collection.insertMany(List.of(doc1, doc2, doc3));
}

@Override
protected void tearDown(TestDestinationEnv testEnv) throws Exception {
database.close();
mongoDBContainer.close();
}

@Override
protected ConnectorSpecification getSpec() throws Exception {
return Jsons.deserialize(MoreResources.readResource("spec.json"), ConnectorSpecification.class);
Expand All @@ -115,7 +74,7 @@ protected ConfiguredAirbyteCatalog getConfiguredCatalog() throws Exception {
.withDestinationSyncMode(DestinationSyncMode.APPEND)
.withCursorField(List.of("_id"))
.withStream(CatalogHelpers.createAirbyteStream(
"test.acceptance_test",
DATABASE_NAME + "." + COLLECTION_NAME,
Field.of("_id", JsonSchemaPrimitive.STRING),
Field.of("id", JsonSchemaPrimitive.STRING),
Field.of("name", JsonSchemaPrimitive.STRING))
Expand Down
Loading

0 comments on commit 5551698

Please sign in to comment.