Skip to content

Commit

Permalink
🚨 Add ability to enforce SSL in MongoDB source connector (airbytehq#1…
Browse files Browse the repository at this point in the history
…7590)

* Check and test for if we are disabling SSL/TLS in source-mongodb-strict-encrypt

* Update docs

* Address comments

* auto-bump connector version [ci skip]

Co-authored-by: Octavia Squidington III <[email protected]>
  • Loading branch information
akashkulk and octavia-squidington-iii authored Oct 6, 2022
1 parent ada1cbf commit 4b990d0
Show file tree
Hide file tree
Showing 9 changed files with 126 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -662,7 +662,7 @@
- name: MongoDb
sourceDefinitionId: b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e
dockerRepository: airbyte/source-mongodb-v2
dockerImageTag: 0.1.17
dockerImageTag: 0.1.18
documentationUrl: https://docs.airbyte.io/integrations/sources/mongodb-v2
icon: mongodb.svg
sourceType: database
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6801,7 +6801,7 @@
path_in_connector_config:
- "credentials"
- "client_secret"
- dockerImage: "airbyte/source-mongodb-v2:0.1.17"
- dockerImage: "airbyte/source-mongodb-v2:0.1.18"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/mongodb-v2"
changelogUrl: "https://docs.airbyte.io/integrations/sources/mongodb-v2"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-mongodb-strict-encrypt

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.1.10
LABEL io.airbyte.version=0.1.11
LABEL io.airbyte.name=airbyte/source-mongodb-strict-encrypt
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,15 @@

package io.airbyte.integrations.source.mongodb;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.mongodb.MongoUtils.MongoInstanceType;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.base.Source;
import io.airbyte.integrations.base.spec_modification.SpecModifyingSource;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import io.airbyte.protocol.models.AirbyteConnectionStatus.Status;
import io.airbyte.protocol.models.ConnectorSpecification;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -22,7 +26,21 @@ public MongodbSourceStrictEncrypt() {
}

@Override
public ConnectorSpecification modifySpec(final ConnectorSpecification originalSpec) throws Exception {
public AirbyteConnectionStatus check(final JsonNode config) throws Exception {
final JsonNode instanceConfig = config.get(MongoDbSourceUtils.INSTANCE_TYPE);
final MongoInstanceType instance = MongoInstanceType.fromValue(instanceConfig.get(MongoDbSourceUtils.INSTANCE).asText());
// If the MongoDb source connector is not set up to use a TLS connection, then we should fail the check.
if (instance.equals(MongoInstanceType.STANDALONE) && !MongoDbSourceUtils.tlsEnabledForStandaloneInstance(config, instanceConfig)) {
return new AirbyteConnectionStatus()
.withStatus(Status.FAILED)
.withMessage("TLS connection must be used to read from MongoDB.");
}

return super.check(config);
}

@Override
public ConnectorSpecification modifySpec(final ConnectorSpecification originalSpec) {
final ConnectorSpecification spec = Jsons.clone(originalSpec);
// removing tls property for a standalone instance to disable possibility to switch off a tls
// connection
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import static org.junit.jupiter.api.Assertions.assertEquals;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.mongodb.client.MongoCollection;
Expand All @@ -17,6 +18,8 @@
import io.airbyte.db.mongodb.MongoUtils.MongoInstanceType;
import io.airbyte.integrations.standardtest.source.SourceAcceptanceTest;
import io.airbyte.integrations.standardtest.source.TestDestinationEnv;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import io.airbyte.protocol.models.AirbyteConnectionStatus.Status;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
Expand Down Expand Up @@ -78,7 +81,7 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Exc
.put("auth_source", "admin")
.build());

var credentials = String.format("%s:%s@", config.get("user").asText(),
final var credentials = String.format("%s:%s@", config.get("user").asText(),
config.get(JdbcUtils.PASSWORD_KEY).asText());
final String connectionString = String.format("mongodb+srv://%s%s/%s?retryWrites=true&w=majority&tls=true",
credentials,
Expand Down Expand Up @@ -146,4 +149,24 @@ void testSpec() throws Exception {
assertEquals(expected, actual);
}

@Test
void testCheck() throws Exception {
final JsonNode instanceConfig = Jsons.jsonNode(ImmutableMap.builder()
.put("instance", MongoInstanceType.STANDALONE.getType())
.put("tls", false)
.build());

final JsonNode invalidStandaloneConfig = Jsons.clone(getConfig());

((ObjectNode) invalidStandaloneConfig).put(INSTANCE_TYPE, instanceConfig);

final AirbyteConnectionStatus actual = new MongodbSourceStrictEncrypt().check(invalidStandaloneConfig);
final AirbyteConnectionStatus expected =
new AirbyteConnectionStatus()
.withStatus(Status.FAILED)
.withMessage("TLS connection must be used to read from MongoDB.");

assertEquals(expected, actual);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-mongodb-v2

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.1.17
LABEL io.airbyte.version=0.1.18
LABEL io.airbyte.name=airbyte/source-mongodb-v2
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,6 @@
package io.airbyte.integrations.source.mongodb;

import static com.mongodb.client.model.Filters.gt;
import static org.bson.BsonType.DATE_TIME;
import static org.bson.BsonType.DECIMAL128;
import static org.bson.BsonType.DOCUMENT;
import static org.bson.BsonType.DOUBLE;
import static org.bson.BsonType.INT32;
import static org.bson.BsonType.INT64;
import static org.bson.BsonType.OBJECT_ID;
import static org.bson.BsonType.STRING;
import static org.bson.BsonType.TIMESTAMP;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -55,20 +46,6 @@ public class MongoDbSource extends AbstractDbSource<BsonType, MongoDatabase> {

private static final Logger LOGGER = LoggerFactory.getLogger(MongoDbSource.class);

private static final String MONGODB_SERVER_URL = "mongodb://%s%s:%s/%s?authSource=%s&ssl=%s";
private static final String MONGODB_CLUSTER_URL = "mongodb+srv://%s%s/%s?authSource=%s&retryWrites=true&w=majority&tls=true";
private static final String MONGODB_REPLICA_URL = "mongodb://%s%s/%s?authSource=%s&directConnection=false&ssl=true";
private static final String USER = "user";
private static final String INSTANCE_TYPE = "instance_type";
private static final String INSTANCE = "instance";
private static final String CLUSTER_URL = "cluster_url";
private static final String SERVER_ADDRESSES = "server_addresses";
private static final String REPLICA_SET = "replica_set";
private static final String AUTH_SOURCE = "auth_source";
private static final String PRIMARY_KEY = "_id";
private static final Set<BsonType> ALLOWED_CURSOR_TYPES = Set.of(DOUBLE, STRING, DOCUMENT, OBJECT_ID, DATE_TIME,
INT32, TIMESTAMP, INT64, DECIMAL128);

public static void main(final String[] args) throws Exception {
final Source source = new MongoDbSource();
LOGGER.info("starting source: {}", MongoDbSource.class);
Expand All @@ -78,8 +55,8 @@ public static void main(final String[] args) throws Exception {

@Override
public JsonNode toDatabaseConfig(final JsonNode config) {
final var credentials = config.has(USER) && config.has(JdbcUtils.PASSWORD_KEY)
? String.format("%s:%s@", config.get(USER).asText(), config.get(JdbcUtils.PASSWORD_KEY).asText())
final var credentials = config.has(MongoDbSourceUtils.USER) && config.has(JdbcUtils.PASSWORD_KEY)
? String.format("%s:%s@", config.get(MongoDbSourceUtils.USER).asText(), config.get(JdbcUtils.PASSWORD_KEY).asText())
: StringUtils.EMPTY;

return Jsons.jsonNode(ImmutableMap.builder()
Expand Down Expand Up @@ -132,7 +109,7 @@ protected List<TableInfo<CommonField<BsonType>>> discoverInternal(final MongoDat
.nameSpace(database.getName())
.name(collectionName)
.fields(fields)
.primaryKeys(List.of(PRIMARY_KEY))
.primaryKeys(List.of(MongoDbSourceUtils.PRIMARY_KEY))
.build();

tableInfos.add(tableInfo);
Expand Down Expand Up @@ -214,7 +191,7 @@ public boolean isCursorType(final BsonType bsonType) {
// when we have no cursor field here, at least id could be used as cursor here.
// This logic will be used feather when we will implement part which will show only list of possible
// cursor fields on UI
return ALLOWED_CURSOR_TYPES.contains(bsonType);
return MongoDbSourceUtils.ALLOWED_CURSOR_TYPES.contains(bsonType);
}

private AutoCloseableIterator<JsonNode> queryTable(final MongoDatabase database,
Expand All @@ -234,31 +211,30 @@ private AutoCloseableIterator<JsonNode> queryTable(final MongoDatabase database,
private String buildConnectionString(final JsonNode config, final String credentials) {
final StringBuilder connectionStrBuilder = new StringBuilder();

final JsonNode instanceConfig = config.get(INSTANCE_TYPE);
final MongoInstanceType instance = MongoInstanceType.fromValue(instanceConfig.get(INSTANCE).asText());
final JsonNode instanceConfig = config.get(MongoDbSourceUtils.INSTANCE_TYPE);
final MongoInstanceType instance = MongoInstanceType.fromValue(instanceConfig.get(MongoDbSourceUtils.INSTANCE).asText());
switch (instance) {
case STANDALONE -> {
// supports backward compatibility and secure only connector
final var tls = config.has(JdbcUtils.TLS_KEY) ? config.get(JdbcUtils.TLS_KEY).asBoolean()
: (instanceConfig.has(JdbcUtils.TLS_KEY) ? instanceConfig.get(JdbcUtils.TLS_KEY).asBoolean() : true);
connectionStrBuilder.append(
String.format(MONGODB_SERVER_URL, credentials, instanceConfig.get(JdbcUtils.HOST_KEY).asText(),
String.format(MongoDbSourceUtils.MONGODB_SERVER_URL, credentials, instanceConfig.get(JdbcUtils.HOST_KEY).asText(),
instanceConfig.get(JdbcUtils.PORT_KEY).asText(),
config.get(JdbcUtils.DATABASE_KEY).asText(), config.get(AUTH_SOURCE).asText(), tls));
config.get(JdbcUtils.DATABASE_KEY).asText(),
config.get(MongoDbSourceUtils.AUTH_SOURCE).asText(), MongoDbSourceUtils.tlsEnabledForStandaloneInstance(config, instanceConfig)));
}
case REPLICA -> {
connectionStrBuilder.append(
String.format(MONGODB_REPLICA_URL, credentials, instanceConfig.get(SERVER_ADDRESSES).asText(),
String.format(MongoDbSourceUtils.MONGODB_REPLICA_URL, credentials, instanceConfig.get(MongoDbSourceUtils.SERVER_ADDRESSES).asText(),
config.get(JdbcUtils.DATABASE_KEY).asText(),
config.get(AUTH_SOURCE).asText()));
if (instanceConfig.has(REPLICA_SET)) {
connectionStrBuilder.append(String.format("&replicaSet=%s", instanceConfig.get(REPLICA_SET).asText()));
config.get(MongoDbSourceUtils.AUTH_SOURCE).asText()));
if (instanceConfig.has(MongoDbSourceUtils.REPLICA_SET)) {
connectionStrBuilder.append(String.format("&replicaSet=%s", instanceConfig.get(MongoDbSourceUtils.REPLICA_SET).asText()));
}
}
case ATLAS -> {
connectionStrBuilder.append(
String.format(MONGODB_CLUSTER_URL, credentials, instanceConfig.get(CLUSTER_URL).asText(), config.get(JdbcUtils.DATABASE_KEY).asText(),
config.get(AUTH_SOURCE).asText()));
String.format(MongoDbSourceUtils.MONGODB_CLUSTER_URL, credentials,
instanceConfig.get(MongoDbSourceUtils.CLUSTER_URL).asText(), config.get(JdbcUtils.DATABASE_KEY).asText(),
config.get(MongoDbSourceUtils.AUTH_SOURCE).asText()));
}
default -> throw new IllegalArgumentException("Unsupported instance type: " + instance);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package io.airbyte.integrations.source.mongodb;

import static org.bson.BsonType.DATE_TIME;
import static org.bson.BsonType.DECIMAL128;
import static org.bson.BsonType.DOCUMENT;
import static org.bson.BsonType.DOUBLE;
import static org.bson.BsonType.INT32;
import static org.bson.BsonType.INT64;
import static org.bson.BsonType.OBJECT_ID;
import static org.bson.BsonType.STRING;
import static org.bson.BsonType.TIMESTAMP;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.db.jdbc.JdbcUtils;
import java.util.Set;
import org.bson.BsonType;

public final class MongoDbSourceUtils {

private MongoDbSourceUtils() {}

public static final String MONGODB_SERVER_URL = "mongodb://%s%s:%s/%s?authSource=%s&ssl=%s";
public static final String MONGODB_CLUSTER_URL = "mongodb+srv://%s%s/%s?authSource=%s&retryWrites=true&w=majority&tls=true";
public static final String MONGODB_REPLICA_URL = "mongodb://%s%s/%s?authSource=%s&directConnection=false&ssl=true";
public static final String USER = "user";
public static final String INSTANCE_TYPE = "instance_type";
public static final String INSTANCE = "instance";
public static final String CLUSTER_URL = "cluster_url";
public static final String SERVER_ADDRESSES = "server_addresses";
public static final String REPLICA_SET = "replica_set";
public static final String AUTH_SOURCE = "auth_source";
public static final String PRIMARY_KEY = "_id";
public static final Set<BsonType> ALLOWED_CURSOR_TYPES = Set.of(DOUBLE, STRING, DOCUMENT, OBJECT_ID, DATE_TIME,
INT32, TIMESTAMP, INT64, DECIMAL128);

/**
* Determines whether TLS/SSL should be enabled for a standalone instance of MongoDB.
*/
public static boolean tlsEnabledForStandaloneInstance(final JsonNode config, final JsonNode instanceConfig) {
return config.has(JdbcUtils.TLS_KEY) ? config.get(JdbcUtils.TLS_KEY).asBoolean()
: (instanceConfig.has(JdbcUtils.TLS_KEY) ? instanceConfig.get(JdbcUtils.TLS_KEY).asBoolean() : true);
}
}
39 changes: 20 additions & 19 deletions docs/integrations/sources/mongodb-v2.md
Original file line number Diff line number Diff line change
Expand Up @@ -100,22 +100,23 @@ For more information regarding configuration parameters, please see [MongoDb Doc

## Changelog

| Version | Date | Pull Request | Subject |
| :--- | :--- | :--- | :--- |
| 0.1.17 | 2022-09-08 | [16401](https://github.com/airbytehq/airbyte/pull/16401) | Fixed bug with empty strings in fields with __aibyte_transform_ |
| 0.1.16 | 2022-08-18 | [14356](https://github.com/airbytehq/airbyte/pull/14356) | DB Sources: only show a table can sync incrementally if at least one column can be used as a cursor field |
| 0.1.15 | 2022-06-17 | [13864](https://github.com/airbytehq/airbyte/pull/13864) | Updated stacktrace format for any trace message errors |
| 0.1.14 | 2022-05-05 | [12428](https://github.com/airbytehq/airbyte/pull/12428) | JsonSchema: Add properties to fields with type 'object' |
| 0.1.13 | 2022-02-21 | [10276](https://github.com/airbytehq/airbyte/pull/10276) | Create a custom codec registry to handle DBRef MongoDB objects |
| 0.1.12 | 2022-02-14 | [10256](https://github.com/airbytehq/airbyte/pull/10256) | (unpublished) Add `-XX:+ExitOnOutOfMemoryError` JVM option |
| 0.1.11 | 2022-01-10 | [9238](https://github.com/airbytehq/airbyte/pull/9238) | Return only those collections for which the user has privileges |
| 0.1.10 | 2021-12-30 | [9202](https://github.com/airbytehq/airbyte/pull/9202) | Update connector fields title/description |
| 0.1.9 | 2021-12-07 | [8491](https://github.com/airbytehq/airbyte/pull/8491) | Configure 10000 limit doc reading during Discovery step |
| 0.1.8 | 2021-11-29 | [8306](https://github.com/airbytehq/airbyte/pull/8306) | Added milliseconds for date format for cursor |
| 0.1.7 | 2021-11-22 | [8161](https://github.com/airbytehq/airbyte/pull/8161) | Updated Performance and updated cursor for timestamp type |
| 0.1.5 | 2021-11-17 | [8046](https://github.com/airbytehq/airbyte/pull/8046) | Added milliseconds to convert timestamp to datetime format |
| 0.1.4 | 2021-11-15 | [7982](https://github.com/airbytehq/airbyte/pull/7982) | Updated Performance |
| 0.1.3 | 2021-10-19 | [7160](https://github.com/airbytehq/airbyte/pull/7160) | Fixed nested document parsing |
| 0.1.2 | 2021-10-07 | [6860](https://github.com/airbytehq/airbyte/pull/6860) | Added filter to avoid MongoDb system collections |
| 0.1.1 | 2021-09-21 | [6364](https://github.com/airbytehq/airbyte/pull/6364) | Source MongoDb: added support via TLS/SSL |
| 0.1.0 | 2021-08-30 | [5530](https://github.com/airbytehq/airbyte/pull/5530) | New source: MongoDb ported to java |
| Version | Date | Pull Request | Subject |
|:--------|:-----------| :--- |:----------------------------------------------------------------------------------------------------------|
| 0.1.18 | 2022-10-05 | [17590](https://github.com/airbytehq/airbyte/pull/17590) | Add ability to enforce SSL in MongoDB connector and check logic _ |
| 0.1.17 | 2022-09-08 | [16401](https://github.com/airbytehq/airbyte/pull/16401) | Fixed bug with empty strings in fields with __aibyte_transform_ |
| 0.1.16 | 2022-08-18 | [14356](https://github.com/airbytehq/airbyte/pull/14356) | DB Sources: only show a table can sync incrementally if at least one column can be used as a cursor field |
| 0.1.15 | 2022-06-17 | [13864](https://github.com/airbytehq/airbyte/pull/13864) | Updated stacktrace format for any trace message errors |
| 0.1.14 | 2022-05-05 | [12428](https://github.com/airbytehq/airbyte/pull/12428) | JsonSchema: Add properties to fields with type 'object' |
| 0.1.13 | 2022-02-21 | [10276](https://github.com/airbytehq/airbyte/pull/10276) | Create a custom codec registry to handle DBRef MongoDB objects |
| 0.1.12 | 2022-02-14 | [10256](https://github.com/airbytehq/airbyte/pull/10256) | (unpublished) Add `-XX:+ExitOnOutOfMemoryError` JVM option |
| 0.1.11 | 2022-01-10 | [9238](https://github.com/airbytehq/airbyte/pull/9238) | Return only those collections for which the user has privileges |
| 0.1.10 | 2021-12-30 | [9202](https://github.com/airbytehq/airbyte/pull/9202) | Update connector fields title/description |
| 0.1.9 | 2021-12-07 | [8491](https://github.com/airbytehq/airbyte/pull/8491) | Configure 10000 limit doc reading during Discovery step |
| 0.1.8 | 2021-11-29 | [8306](https://github.com/airbytehq/airbyte/pull/8306) | Added milliseconds for date format for cursor |
| 0.1.7 | 2021-11-22 | [8161](https://github.com/airbytehq/airbyte/pull/8161) | Updated Performance and updated cursor for timestamp type |
| 0.1.5 | 2021-11-17 | [8046](https://github.com/airbytehq/airbyte/pull/8046) | Added milliseconds to convert timestamp to datetime format |
| 0.1.4 | 2021-11-15 | [7982](https://github.com/airbytehq/airbyte/pull/7982) | Updated Performance |
| 0.1.3 | 2021-10-19 | [7160](https://github.com/airbytehq/airbyte/pull/7160) | Fixed nested document parsing |
| 0.1.2 | 2021-10-07 | [6860](https://github.com/airbytehq/airbyte/pull/6860) | Added filter to avoid MongoDb system collections |
| 0.1.1 | 2021-09-21 | [6364](https://github.com/airbytehq/airbyte/pull/6364) | Source MongoDb: added support via TLS/SSL |
| 0.1.0 | 2021-08-30 | [5530](https://github.com/airbytehq/airbyte/pull/5530) | New source: MongoDb ported to java |

0 comments on commit 4b990d0

Please sign in to comment.