Skip to content

Commit

Permalink
Source MongoDB fetch authorized collections only (airbytehq#9238)
Browse files Browse the repository at this point in the history
* fix for jdk 17

* Source MongoDB show authorized collections

* add javadoc

* fixed checkstyle

* add CHANGELOG

* fix checkstyle

* refactoring

* bump version and fix checkstyle

Co-authored-by: vmaltsev <[email protected]>
  • Loading branch information
VitaliiMaltsev and vmaltsev authored Jan 13, 2022
1 parent ecfc9e1 commit 2b0d0bd
Show file tree
Hide file tree
Showing 9 changed files with 47 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"sourceDefinitionId": "b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e",
"name": "MongoDb",
"dockerRepository": "airbyte/source-mongodb-v2",
"dockerImageTag": "0.1.10",
"dockerImageTag": "0.1.11",
"documentationUrl": "https://docs.airbyte.io/integrations/sources/mongodb-v2",
"icon": "mongodb.svg"
}
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,7 @@
- name: MongoDb
sourceDefinitionId: b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e
dockerRepository: airbyte/source-mongodb-v2
dockerImageTag: 0.1.10
dockerImageTag: 0.1.11
documentationUrl: https://docs.airbyte.io/integrations/sources/mongodb-v2
icon: mongodb.svg
sourceType: database
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.DestinationSyncMode;

import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -51,11 +49,11 @@ public class SnowflakeInternalStagingConsumerFactory {
private final String CURRENT_SYNC_PATH = UUID.randomUUID().toString();

public AirbyteMessageConsumer create(final Consumer<AirbyteMessage> outputRecordCollector,
final JdbcDatabase database,
final SnowflakeStagingSqlOperations sqlOperations,
final SnowflakeSQLNameTransformer namingResolver,
final JsonNode config,
final ConfiguredAirbyteCatalog catalog) {
final JdbcDatabase database,
final SnowflakeStagingSqlOperations sqlOperations,
final SnowflakeSQLNameTransformer namingResolver,
final JsonNode config,
final ConfiguredAirbyteCatalog catalog) {
final List<WriteConfig> writeConfigs = createWriteConfigs(namingResolver, config, catalog);

return new BufferedStreamConsumer(
Expand Down Expand Up @@ -135,10 +133,10 @@ private static AirbyteStreamNameNamespacePair toNameNamespacePair(final WriteCon
}

private RecordWriter recordWriterFunction(final JdbcDatabase database,
final SqlOperations snowflakeSqlOperations,
final List<WriteConfig> writeConfigs,
final ConfiguredAirbyteCatalog catalog,
final SnowflakeSQLNameTransformer namingResolver) {
final SqlOperations snowflakeSqlOperations,
final List<WriteConfig> writeConfigs,
final ConfiguredAirbyteCatalog catalog,
final SnowflakeSQLNameTransformer namingResolver) {
final Map<AirbyteStreamNameNamespacePair, WriteConfig> pairToWriteConfig =
writeConfigs.stream()
.collect(Collectors.toUnmodifiableMap(
Expand All @@ -160,9 +158,9 @@ private RecordWriter recordWriterFunction(final JdbcDatabase database,
}

private OnCloseFunction onCloseFunction(final JdbcDatabase database,
final SnowflakeStagingSqlOperations sqlOperations,
final List<WriteConfig> writeConfigs,
final SnowflakeSQLNameTransformer namingResolver) {
final SnowflakeStagingSqlOperations sqlOperations,
final List<WriteConfig> writeConfigs,
final SnowflakeSQLNameTransformer namingResolver) {
return (hasFailed) -> {
if (!hasFailed) {
final List<String> queryList = new ArrayList<>();
Expand All @@ -176,14 +174,14 @@ private OnCloseFunction onCloseFunction(final JdbcDatabase database,

final String path = namingResolver.getStagingPath(schemaName, dstTableName, CURRENT_SYNC_PATH);
LOGGER.info("Uploading data from stage: stream {}. schema {}, tmp table {}, stage path {}", writeConfig.getStreamName(), schemaName,
srcTableName,
path);
srcTableName,
path);
try {
sqlOperations.copyIntoTmpTableFromStage(database, path, srcTableName, schemaName);
} catch (SQLException e){
} catch (Exception e) {
sqlOperations.cleanUpStage(database, path);
LOGGER.info("Cleaning stage path {}", path);
throw new RuntimeException("Failed to upload data from stage "+ path, e);
throw new RuntimeException("Failed to upload data from stage " + path, e);
}

sqlOperations.createTableIfNotExists(database, schemaName, dstTableName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ public String getStageName(String schemaName, String outputTableName) {
}

/**
 * Builds the staging path used for a single sync of one table.
 *
 * <p>The path is the stage name for the schema/table, followed by {@code /staged/} and the
 * per-sync path segment, with the whole string upper-cased.
 *
 * @param schemaName schema passed through to {@code getStageName}
 * @param tableName table passed through to {@code getStageName}
 * @param currentSyncPath path segment identifying the current sync
 * @return the upper-cased staging path
 */
public String getStagingPath(String schemaName, String tableName, String currentSyncPath) {
final String stagingPath = getStageName(schemaName, tableName) + "/staged/" + currentSyncPath;
// Locale.ROOT keeps the result independent of the JVM default locale (e.g. Turkish 'i' -> 'İ').
return stagingPath.toUpperCase(java.util.Locale.ROOT);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -85,4 +85,5 @@ public void cleanUpStage(JdbcDatabase database, String path) throws SQLException
/**
 * Checks whether a schema with the given name is present in the result of {@code SHOW_SCHEMAS}.
 *
 * <p>The comparison against each returned {@code NAME} value ignores case.
 *
 * @param database database the {@code SHOW_SCHEMAS} query is run against
 * @param outputSchema schema name to look for
 * @return {@code true} if any returned schema name equals {@code outputSchema}, ignoring case
 * @throws Exception if running the query fails
 */
public boolean isSchemaExists(JdbcDatabase database, String outputSchema) throws Exception {
return database.query(SHOW_SCHEMAS)
.map(schemaRow -> schemaRow.get(NAME).asText())
.anyMatch(outputSchema::equalsIgnoreCase);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-mongodb-v2

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.1.10
LABEL io.airbyte.version=0.1.11
LABEL io.airbyte.name=airbyte/source-mongodb-v2
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ dependencies {
implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs)
implementation project(':airbyte-integrations:connectors:source-relational-db')

implementation 'org.mongodb:mongodb-driver-sync:4.3.0'
implementation 'org.mongodb:mongodb-driver-sync:4.4.0'

testImplementation 'org.testcontainers:mongodb:1.15.3'

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public List<CheckedConsumer<MongoDatabase, Exception>> getCheckOperations(final
throws Exception {
final List<CheckedConsumer<MongoDatabase, Exception>> checkList = new ArrayList<>();
checkList.add(database -> {
if (database.getCollectionNames().isEmpty()) {
if (getAuthorizedCollections(database).isEmpty()) {
throw new Exception("Unable to execute any operation on the source!");
} else {
LOGGER.info("The source passed the basic operation test!");
Expand All @@ -114,7 +114,7 @@ protected List<TableInfo<CommonField<BsonType>>> discoverInternal(final MongoDat
throws Exception {
final List<TableInfo<CommonField<BsonType>>> tableInfos = new ArrayList<>();

for (final String collectionName : database.getCollectionNames()) {
for (final String collectionName : getAuthorizedCollections(database)) {
final MongoCollection<Document> collection = database.getCollection(collectionName);
final Map<String, BsonType> uniqueFields = MongoUtils.getUniqueFields(collection);

Expand All @@ -135,6 +135,27 @@ protected List<TableInfo<CommonField<BsonType>>> discoverInternal(final MongoDat
return tableInfos;
}

/**
 * Returns the names of the collections in {@code database} that the connected user is authorized
 * to access.
 *
 * <p>Runs {@code listCollections} with {@code authorizedCollections: true} and
 * {@code nameOnly: true}, restricted by {@code filter: {type: "collection"}}, and collects the
 * {@code name} of every entry in the cursor's {@code firstBatch}.
 *
 * @param database wrapper whose underlying database the command is run against
 * @return the collection names found in the first batch of the command's cursor
 */
private Set<String> getAuthorizedCollections(MongoDatabase database) {
/*
 * db.runCommand ({listCollections: 1.0, authorizedCollections: true, nameOnly: true }) the command
 * returns only those collections for which the user has privileges. For example, if a user has find
 * action on specific collections, the command returns only those collections; or, if a user has
 * find or any other action, on the database resource, the command lists all collections in the
 * database.
 */
final Document command = new Document("listCollections", 1)
.append("authorizedCollections", true)
.append("nameOnly", true)
// The filter has to be part of the command (not appended to the response) and has to be a
// document rather than a string, otherwise it is never applied by the server.
.append("filter", new Document("type", "collection"));
final Document response = database.getDatabase().runCommand(command);
// NOTE(review): only "firstBatch" is read; if the server can return further batches for
// databases with many collections, the remaining names would be missed - confirm.
return response.toBsonDocument()
.get("cursor").asDocument()
.getArray("firstBatch")
.stream()
.map(bsonValue -> bsonValue.asDocument().getString("name").getValue())
.collect(Collectors.toSet());

}

@Override
protected List<TableInfo<CommonField<BsonType>>> discoverInternal(final MongoDatabase database, final String schema) throws Exception {
// MongoDb doesn't support schemas
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/mongodb-v2.md
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ For more information regarding configuration parameters, please see [MongoDb Doc

| Version | Date | Pull Request | Subject |
| :--- | :--- | :--- | :--- |
| 0.1.11 | 2022-01-10 | [9238](https://github.com/airbytehq/airbyte/pull/9238) | Return only those collections for which the user has privileges |
| 0.1.10 | 2021-12-30 | [9202](https://github.com/airbytehq/airbyte/pull/9202) | Update connector fields title/description |
| 0.1.9 | 2021-12-07 | [8491](https://github.com/airbytehq/airbyte/pull/8491) | Configure 10000 limit doc reading during Discovery step |
| 0.1.8 | 2021-11-29 | [8306](https://github.com/airbytehq/airbyte/pull/8306) | Added milliseconds for date format for cursor |
Expand Down

0 comments on commit 2b0d0bd

Please sign in to comment.