diff --git a/CHANGES.md b/CHANGES.md index f8a0d44..e4681e7 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -2,6 +2,7 @@ ## Unreleased +- `ctk load table`: Added support for MongoDB Change Streams ## 2024/07/08 v0.0.15 - IO: Added the `if-exists` query parameter by updating to influxio 0.4.0. diff --git a/cratedb_toolkit/api/main.py b/cratedb_toolkit/api/main.py index 84a8424..5ea2e5f 100644 --- a/cratedb_toolkit/api/main.py +++ b/cratedb_toolkit/api/main.py @@ -122,11 +122,17 @@ def load_table(self, resource: InputOutputResource, target: TableAddress): logger.error(msg) raise OperationFailed(msg) elif source_url.startswith("mongodb"): - from cratedb_toolkit.io.mongodb.api import mongodb_copy + if "+cdc" in source_url: + source_url = source_url.replace("+cdc", "") + from cratedb_toolkit.io.mongodb.api import mongodb_relay_cdc - if not mongodb_copy(source_url, target_url, progress=True): - msg = "Data loading failed" - logger.error(msg) - raise OperationFailed(msg) + mongodb_relay_cdc(source_url, target_url, progress=True) + else: + from cratedb_toolkit.io.mongodb.api import mongodb_copy + + if not mongodb_copy(source_url, target_url, progress=True): + msg = "Data loading failed" + logger.error(msg) + raise OperationFailed(msg) else: raise NotImplementedError("Importing resource not implemented yet") diff --git a/cratedb_toolkit/io/mongodb/api.py b/cratedb_toolkit/io/mongodb/api.py index 46e163f..a8f73bf 100644 --- a/cratedb_toolkit/io/mongodb/api.py +++ b/cratedb_toolkit/io/mongodb/api.py @@ -1,6 +1,7 @@ import argparse import logging +from cratedb_toolkit.io.mongodb.cdc import MongoDBCDCRelayCrateDB from cratedb_toolkit.io.mongodb.core import export, extract, translate from cratedb_toolkit.model import DatabaseAddress from cratedb_toolkit.util.cr8 import cr8_insert_json @@ -68,3 +69,41 @@ def mongodb_copy(source_url, target_url, progress: bool = False): cr8_insert_json(infile=buffer, hosts=cratedb_address.httpuri, table=cratedb_table_address.fullname) return True + + 
+def mongodb_relay_cdc(source_url, target_url, progress: bool = False): + """ + Synopsis + -------- + export CRATEDB_SQLALCHEMY_URL=crate://crate@localhost:4200/testdrive/demo-cdc + ctk load table mongodb+cdc://localhost:27017/testdrive/demo + + Backlog + ------- + TODO: Run on multiple collections. + TODO: Run on the whole database. + TODO: Accept parameters like `if_exists="append,replace"`. + TODO: Propagate parameters like `scan="full"`. + """ + logger.info("Running MongoDB CDC relay") + + # Decode both database URLs: the source yields the MongoDB URI plus database/collection, the target yields the CrateDB URI plus table address. + mongodb_address = DatabaseAddress.from_string(source_url) + mongodb_uri, mongodb_collection_address = mongodb_address.decode() + mongodb_database = mongodb_collection_address.schema + mongodb_collection = mongodb_collection_address.table + + cratedb_address = DatabaseAddress.from_string(target_url) + cratedb_uri, cratedb_table_address = cratedb_address.decode() + + # Configure machinery. NOTE(review): `progress` is accepted but not forwarded to the relay. + relay = MongoDBCDCRelayCrateDB( + mongodb_url=str(mongodb_uri), + mongodb_database=mongodb_database, + mongodb_collection=mongodb_collection, + cratedb_sqlalchemy_url=str(cratedb_uri), + cratedb_table=cratedb_table_address.fullname, + ) + + # Invoke machinery. This call blocks: the relay keeps consuming the change stream in an endless loop. + relay.start() diff --git a/cratedb_toolkit/io/mongodb/cdc.py b/cratedb_toolkit/io/mongodb/cdc.py new file mode 100644 index 0000000..83c7958 --- /dev/null +++ b/cratedb_toolkit/io/mongodb/cdc.py @@ -0,0 +1,62 @@ +""" +Basic relaying of a MongoDB Change Stream into CrateDB table. 
+ +Documentation: +- https://github.com/daq-tools/commons-codec/blob/main/doc/mongodb.md +- https://www.mongodb.com/docs/manual/changeStreams/ +- https://www.mongodb.com/developer/languages/python/python-change-streams/ +""" + +import logging + +import pymongo +import sqlalchemy as sa +from commons_codec.transform.mongodb import MongoDBCDCTranslatorCrateDB + +from cratedb_toolkit.util import DatabaseAdapter + +logger = logging.getLogger(__name__) + + +class MongoDBCDCRelayCrateDB: + """ + Relay MongoDB Change Stream into CrateDB table. + """ + + def __init__( + self, + mongodb_url: str, + mongodb_database: str, + mongodb_collection: str, + cratedb_sqlalchemy_url: str, + cratedb_table: str, + ): + self.cratedb_adapter = DatabaseAdapter(cratedb_sqlalchemy_url, echo=True) + self.mongodb_client: pymongo.MongoClient = pymongo.MongoClient(mongodb_url) + self.mongodb_collection = self.mongodb_client[mongodb_database][mongodb_collection] + self.table_name = self.cratedb_adapter.quote_relation_name(cratedb_table) + self.cdc = MongoDBCDCTranslatorCrateDB(table_name=self.table_name) + + def start(self): + """ + Subscribe to change stream events, convert to SQL, and submit to CrateDB. + """ + with self.cratedb_adapter.engine.connect() as connection: + connection.execute(sa.text(self.cdc.sql_ddl)) + for sql in self.cdc_to_sql(): + if sql: + connection.execute(sa.text(sql)) + connection.execute(sa.text(f"REFRESH TABLE {self.table_name};")) + + def cdc_to_sql(self): + """ + Subscribe to change stream events, and emit corresponding SQL statements. + """ + # Note that `.watch()` will block until events are ready for consumption, so + # this is not a busy loop. Also note that the routine doesn't perform any sensible + # error handling yet. 
+ while True: + with self.mongodb_collection.watch(full_document="updateLookup") as change_stream: + for change in change_stream: + logger.info("MongoDB Change Stream event: %s", change) + yield self.cdc.to_sql(change) diff --git a/cratedb_toolkit/model.py b/cratedb_toolkit/model.py index 754bb48..c4c4dad 100644 --- a/cratedb_toolkit/model.py +++ b/cratedb_toolkit/model.py @@ -85,12 +85,15 @@ class TableAddress: @property def fullname(self): - if self.schema is None and self.table is None: + if self.table is None: raise ValueError("Uninitialized table address can not be serialized") if self.schema and self.table: - return f'"{self.schema}"."{self.table}"' + schema = self.schema.strip('"') + table = self.table.strip('"') + return f'"{schema}"."{table}"' else: - return f'"{self.table}"' + table = self.table.strip('"') + return f'"{table}"' @dataclasses.dataclass diff --git a/doc/io/mongodb/cdc.md b/doc/io/mongodb/cdc.md new file mode 100644 index 0000000..d5abeb3 --- /dev/null +++ b/doc/io/mongodb/cdc.md @@ -0,0 +1,268 @@ +(mongodb-cdc-relay)= +# MongoDB CDC Relay + +## About +Relay a [MongoDB Change Stream] into a [CrateDB] table using a one-stop command +`ctk load table mongodb+cdc://...`, or `mongodb+srv+cdc://` for MongoDB Atlas. + +You can use it in order to facilitate convenient data transfers to be used +within data pipelines or ad hoc operations. It can be used as a CLI interface, +and as a library. + + +## Install +```shell +pip install --upgrade 'cratedb-toolkit[mongodb]' +``` + +:::{tip} +The tutorial also uses the programs `crash`, `mongosh`, and `atlas`. `crash` +will be installed with CrateDB Toolkit, but `mongosh` and `atlas` must be +installed by other means. If you are using Docker anyway, please use those +command aliases to provide them to your environment without actually needing +to install them. 
+ +```shell +alias mongosh='docker run -i --rm --network=host mongo:7 mongosh' +``` + +The `atlas` program needs to store authentication information between invocations, +therefore you need to supply a storage volume. +```shell +mkdir atlas-config +alias atlas='docker run --rm -it --volume=$(pwd)/atlas-config:/root mongodb/atlas atlas' +``` +::: + + +## Usage + +(mongodb-cdc-workstation)= +### Workstation +The guidelines assume that both services, CrateDB and MongoDB, are listening on +`localhost`. +Please find guidelines on how to provide them on your workstation using +Docker or Podman in the {ref}`mongodb-cdc-services-standalone` section below. +```shell +export MONGODB_URL=mongodb://localhost/testdrive +export MONGODB_URL_CTK=mongodb+cdc://localhost/testdrive/demo +export CRATEDB_SQLALCHEMY_URL=crate://crate@localhost/testdrive/demo-cdc +ctk load table "${MONGODB_URL_CTK}" +``` + +Insert a document into the MongoDB collection, and update it. +```shell +mongosh "${MONGODB_URL}" --eval 'db.demo.insertOne({"foo": "bar"})' +mongosh "${MONGODB_URL}" --eval 'db.demo.updateOne({"foo": "bar"}, { $set: { status: "D" } })' +``` + +Query data in CrateDB. +```shell +crash --command 'SELECT * FROM "testdrive"."demo-cdc";' +``` + +Invoke a delete operation, and check data in CrateDB once more. +```shell +mongosh "${MONGODB_URL}" --eval 'db.demo.deleteOne({"foo": "bar"})' +crash --command 'SELECT * FROM "testdrive"."demo-cdc";' +``` + +(mongodb-cdc-cloud)= +### Cloud +The guidelines assume usage of cloud variants for both services, CrateDB Cloud +and MongoDB Atlas. +Please find guidelines on how to provision relevant cloud resources +in the {ref}`mongodb-cdc-services-cloud` section below. + +:::{rubric} Invoke pipeline +::: +A canonical invocation for ingesting MongoDB Atlas Change Streams into +CrateDB Cloud. 
+ +```shell +export MONGODB_URL=mongodb+srv://user:password@testdrive.jaxmmfp.mongodb.net/testdrive +export MONGODB_URL_CTK=mongodb+srv+cdc://user:password@testdrive.jaxmmfp.mongodb.net/testdrive/demo +export CRATEDB_HTTP_URL="https://admin:dZ...6LqB@testdrive.eks1.eu-west-1.aws.cratedb.net:4200/" +export CRATEDB_SQLALCHEMY_URL="crate://admin:dZ...6LqB@testdrive.eks1.eu-west-1.aws.cratedb.net:4200/testdrive/demo-cdc?ssl=true" +``` +```shell +ctk load table "${MONGODB_URL_CTK}" +``` + +:::{note} +Please note the `mongodb+srv://` and `mongodb+srv+cdc://` URL schemes, and the +`ssl=true` query parameter. Both are needed to establish connectivity with +MongoDB Atlas and CrateDB. +::: + +:::{rubric} Trigger CDC events +::: +Inserting a document into the MongoDB collection, and updating it, will trigger two CDC events. +```shell +mongosh "${MONGODB_URL}" --eval 'db.demo.insertOne({"foo": "bar"})' +mongosh "${MONGODB_URL}" --eval 'db.demo.updateOne({"foo": "bar"}, { $set: { status: "D" } })' +``` + +:::{rubric} Query data in CrateDB +::: +```shell +crash --hosts "${CRATEDB_HTTP_URL}" --command 'SELECT * FROM "testdrive"."demo-cdc";' +``` + + +## Appendix + +### Database Operations +A few operations that are handy when exploring this exercise. + +Reset MongoDB collection. +```shell +mongosh "${MONGODB_URL}" --eval 'db.demo.drop()' +``` + +Reset CrateDB table. +```shell +crash --command 'DELETE FROM "testdrive"."demo-cdc";' +``` + +Display documents in MongoDB collection. +```shell +mongosh "${MONGODB_URL}" --eval 'db.demo.find()' +``` + +(mongodb-cdc-services-standalone)= +### Standalone Services +Quickly start CrateDB and MongoDB using Docker or Podman. + +#### CrateDB +Start CrateDB. +```shell +docker run --rm -it --name=cratedb --publish=4200:4200 --env=CRATE_HEAP_SIZE=2g \ + crate:5.7 -Cdiscovery.type=single-node +``` + +#### MongoDB +Start MongoDB. 
+Please note that change streams are only available for replica sets and +sharded clusters, so let's define a replica set by using the +`--replSet rs-testdrive` option when starting the MongoDB server. +```shell +docker run -it --rm --name=mongodb --publish=27017:27017 \ + mongo:7 mongod --replSet rs-testdrive +``` + +Now, initialize the replica set, by using the `mongosh` command to invoke +the `rs.initiate()` operation. +```shell +export MONGODB_URL="mongodb://localhost/" +docker run -i --rm --network=host mongo:7 mongosh ${MONGODB_URL} <<EOF + +config = { + _id: "rs-testdrive", + members: [{ _id : 0, host : "localhost:27017"}] +}; +rs.initiate(config); + +EOF +``` + + +(mongodb-cdc-services-cloud)= +### Cloud Services +Quickly provision [CrateDB Cloud] and [MongoDB Atlas]. + +#### CrateDB Cloud +To provision a database cluster, use either the [croud CLI], or the +[CrateDB Cloud Web Console]. + +Invoke CLI login. +```shell +croud login +``` +Create organization. +```shell +croud organizations create --name samplecroudorganization +``` +Create project. +```shell +croud projects create --name sampleproject +``` +Deploy cluster. +```shell +croud clusters deploy \ + --product-name crfree \ + --tier default \ + --cluster-name testdrive \ + --subscription-id 782dfc00-7b25-4f48-8381-b1b096dd1619 \ + --project-id 952cd102-91c1-4837-962a-12ecb71a6ba8 \ + --version 5.8.0 \ + --username admin \ + --password "as6da9ddasfaad7i902jcv780dmcba" +``` + +Finally, use the cluster's username and password to populate +`CRATEDB_HTTP_URL` and `CRATEDB_SQLALCHEMY_URL` at {ref}`mongodb-cdc-cloud` properly. + +When shutting down your workbench, you may want to clean up any cloud resources +you just used. +```shell +croud clusters delete --cluster-id CLUSTER_ID +``` + +#### MongoDB Atlas +To provision a database cluster, use either the [Atlas CLI], or the +Atlas User Interface. + +Create an API key. 
+```shell +atlas projects apiKeys create --desc "Ondemand Testdrive" --role GROUP_OWNER +``` +```text +API Key '889727cb5bfe8830d0f8a203' created. +Public API Key bksttjep +Private API Key 9f8c1c41-b5f7-4d2a-b1a0-a1d2ef457796 +``` +Enter authentication key information. +```shell +atlas config init +``` +Create database cluster. +```shell +atlas clusters create testdrive --provider AWS --region EU_CENTRAL_1 --tier M0 --tag env=dev +``` +Inquire connection string. +```shell +atlas clusters connectionStrings describe testdrive +``` +```text +mongodb+srv://testdrive.jaxmmfp.mongodb.net +``` + +Finally, create a "Database Access" user and use the credentials to populate +`MONGODB_URL` and `MONGODB_URL_CTK` at {ref}`mongodb-cdc-cloud` properly. + +When shutting down your workbench, you may want to clean up any cloud resources +you just used. +```shell +atlas clusters delete testdrive +``` + + +## Backlog +:::{todo} +- Improve UX/DX. +- Provide `ctk shell`. +- Provide [SDK and CLI for CrateDB Cloud Cluster APIs]. + +[SDK and CLI for CrateDB Cloud Cluster APIs]: https://github.com/crate-workbench/cratedb-toolkit/pull/81 +::: + + +[Atlas CLI]: https://www.mongodb.com/docs/atlas/cli/ +[commons-codec]: https://pypi.org/project/commons-codec/ +[CrateDB]: https://cratedb.com/docs/guide/home/ +[CrateDB Cloud]: https://cratedb.com/docs/cloud/ +[MongoDB Atlas]: https://www.mongodb.com/atlas +[MongoDB Change Stream]: https://www.mongodb.com/docs/manual/changeStreams/ +[croud CLI]: https://cratedb.com/docs/cloud/en/latest/tutorials/deploy/croud.html +[CrateDB Cloud Web Console]: https://cratedb.com/docs/cloud/en/latest/tutorials/quick-start.html#deploy-cluster diff --git a/doc/io/mongodb/index.md b/doc/io/mongodb/index.md index deef1bf..9f8bfb7 100644 --- a/doc/io/mongodb/index.md +++ b/doc/io/mongodb/index.md @@ -9,5 +9,9 @@ Using the MongoDB subsystem, you can transfer data from and to MongoDB. 
:maxdepth: 1 loader -migr8 +cdc ``` + +:::{note} +The MongoDB Table Loader is an improvement of the traditional {doc}`migr8`. +::: diff --git a/doc/io/mongodb/migr8.md b/doc/io/mongodb/migr8.md index ecbc48f..48ab352 100644 --- a/doc/io/mongodb/migr8.md +++ b/doc/io/mongodb/migr8.md @@ -1,5 +1,9 @@ +--- +orphan: true +--- + (migr8)= -# migr8 +# migr8 migration utility ## About diff --git a/pyproject.toml b/pyproject.toml index c0ec396..85832ae 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -150,6 +150,7 @@ io = [ "sqlalchemy>=2", ] mongodb = [ + "commons-codec[mongodb] @ git+https://github.com/daq-tools/commons-codec.git@mongodb", "cratedb-toolkit[io]", "orjson<4,>=3.3.1", "pymongo<5,>=3.10.1",