diff --git a/CHANGES.md b/CHANGES.md index 8bca014e..b345f385 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -2,6 +2,7 @@ ## Unreleased +- MongoDB Full: Refactor transformation subsystem to `commons-codec` ## 2024/09/16 v0.0.23 - MongoDB: Unlock processing multiple collections, either from server database, diff --git a/cratedb_toolkit/io/mongodb/cdc.py b/cratedb_toolkit/io/mongodb/cdc.py index 67be74e1..30c0c86f 100644 --- a/cratedb_toolkit/io/mongodb/cdc.py +++ b/cratedb_toolkit/io/mongodb/cdc.py @@ -11,7 +11,7 @@ import pymongo import sqlalchemy as sa -from commons_codec.transform.mongodb import MongoDBCDCTranslatorCrateDB +from commons_codec.transform.mongodb import MongoDBCDCTranslator from cratedb_toolkit.util import DatabaseAdapter @@ -35,7 +35,7 @@ def __init__( self.mongodb_client: pymongo.MongoClient = pymongo.MongoClient(mongodb_url) self.mongodb_collection = self.mongodb_client[mongodb_database][mongodb_collection] self.table_name = self.cratedb_adapter.quote_relation_name(cratedb_table) - self.cdc = MongoDBCDCTranslatorCrateDB(table_name=self.table_name) + self.cdc = MongoDBCDCTranslator(table_name=self.table_name) def start(self): """ diff --git a/cratedb_toolkit/io/mongodb/copy.py b/cratedb_toolkit/io/mongodb/copy.py index 3c312b78..a88ff004 100644 --- a/cratedb_toolkit/io/mongodb/copy.py +++ b/cratedb_toolkit/io/mongodb/copy.py @@ -4,17 +4,13 @@ import sqlalchemy as sa from boltons.urlutils import URL -from commons_codec.model import SQLOperation -from commons_codec.transform.mongodb import MongoDBCDCTranslatorCrateDB -from pymongo.cursor import Cursor +from commons_codec.transform.mongodb import MongoDBCrateDBConverter, MongoDBFullLoadTranslator from tqdm import tqdm from tqdm.contrib.logging import logging_redirect_tqdm from zyp.model.collection import CollectionAddress from cratedb_toolkit.io.core import BulkProcessor from cratedb_toolkit.io.mongodb.adapter import mongodb_adapter_factory -from cratedb_toolkit.io.mongodb.export import CrateDBConverter -from cratedb_toolkit.io.mongodb.model import DocumentDict from cratedb_toolkit.io.mongodb.transform import TransformationManager from cratedb_toolkit.model import DatabaseAddress from cratedb_toolkit.sqlalchemy.patch import monkeypatch_executemany @@ -23,45 +19,6 @@ logger = logging.getLogger(__name__) -class MongoDBFullLoadTranslator(MongoDBCDCTranslatorCrateDB): - """ - Translate a MongoDB document into a CrateDB document. - """ - - def __init__(self, table_name: str, converter: CrateDBConverter, tm: TransformationManager = None): - super().__init__(table_name=table_name) - self.converter = converter - self.tm = tm - - @staticmethod - def get_document_key(record: t.Dict[str, t.Any]) -> str: - """ - Return value of document key (MongoDB document OID) from CDC record. - - "documentKey": {"_id": ObjectId("669683c2b0750b2c84893f3e")} - """ - return record["_id"] - - def to_sql(self, data: t.Union[DocumentDict, t.List[DocumentDict]]) -> SQLOperation: - """ - Produce CrateDB SQL INSERT batch operation from multiple MongoDB documents. - """ - if not isinstance(data, Cursor) and not isinstance(data, list): - data = [data] - - # Define SQL INSERT statement. - sql = f"INSERT INTO {self.table_name} ({self.ID_COLUMN}, {self.DATA_COLUMN}) VALUES (:oid, :record);" - - # Converge multiple MongoDB documents into SQL parameters for `executemany` operation. - parameters: t.List[DocumentDict] = [] - for document in data: - record = self.converter.convert(self.decode_bson(document)) - oid: str = self.get_document_key(record) - parameters.append({"oid": oid, "record": record}) - - return SQLOperation(sql, parameters) - - class MongoDBFullLoad: """ Copy MongoDB collection into CrateDB table. @@ -102,8 +59,8 @@ def __init__( transformation = tm.project.get(address=address) except KeyError: pass - self.converter = CrateDBConverter(transformation=transformation) - self.translator = MongoDBFullLoadTranslator(table_name=self.cratedb_table, converter=self.converter, tm=tm) + self.converter = MongoDBCrateDBConverter(transformation=transformation) + self.translator = MongoDBFullLoadTranslator(table_name=self.cratedb_table, converter=self.converter) self.on_error = on_error self.progress = progress diff --git a/cratedb_toolkit/io/mongodb/export.py b/cratedb_toolkit/io/mongodb/export.py index dbeae37c..cfa17e01 100644 --- a/cratedb_toolkit/io/mongodb/export.py +++ b/cratedb_toolkit/io/mongodb/export.py @@ -24,102 +24,25 @@ Export the documents from a MongoDB collection as JSON, to be ingested into CrateDB. """ -import base64 -import calendar import logging import typing as t -from uuid import UUID import bsonjs -import dateutil.parser as dateparser import orjson as json import pymongo.collection -from attr import Factory -from attrs import define -from zyp.model.collection import CollectionTransformation +from commons_codec.transform.mongodb import MongoDBCrateDBConverter -from cratedb_toolkit.io.mongodb.model import DocumentDict from cratedb_toolkit.io.mongodb.transform import TransformationManager from cratedb_toolkit.io.mongodb.util import sanitize_field_names logger = logging.getLogger(__name__) -def date_converter(value): - if isinstance(value, int): - return value - dt = dateparser.parse(value) - return calendar.timegm(dt.utctimetuple()) * 1000 - - -def timestamp_converter(value): - if len(str(value)) <= 10: - return value * 1000 - return value - - -type_converter = { - "date": date_converter, - "timestamp": timestamp_converter, - "undefined": lambda x: None, -} - - -@define -class CrateDBConverter: - transformation: CollectionTransformation = Factory(CollectionTransformation) - - def convert(self, data: DocumentDict) -> t.Dict[str, t.Any]: - """ - Decode MongoDB Extended JSON, considering CrateDB specifics. - """ - return self.extract_value(data) - - def extract_value(self, value: t.Any, parent_type: t.Optional[str] = None) -> t.Any: - """ - Decode MongoDB Extended JSON. - - - https://www.mongodb.com/docs/manual/reference/mongodb-extended-json-v1/ - - https://www.mongodb.com/docs/manual/reference/mongodb-extended-json/ - """ - if isinstance(value, dict): - # Custom adjustments to compensate shape anomalies in source data. - self.apply_special_treatments(value) - if len(value) == 1: - if "$binary" in value and value["$binary"]["subType"] in ["03", "04"]: - decoded = str(UUID(bytes=base64.b64decode(value["$binary"]["base64"]))) - return self.extract_value(decoded, parent_type) - for k, v in value.items(): - if k.startswith("$"): - return self.extract_value(v, k.lstrip("$")) - return {k.lstrip("$"): self.extract_value(v, parent_type) for (k, v) in value.items()} - if isinstance(value, list): - return [self.extract_value(v, parent_type) for v in value] - if parent_type: - converter = type_converter.get(parent_type) - if converter: - return converter(value) - return value - - def apply_special_treatments(self, value: t.Any): - """ - Apply special treatments to value that can't be described otherwise up until now. - # Ignore certain items including anomalies that are not resolved, yet. - - TODO: Needs an integration test feeding two records instead of just one. - """ - - if self.transformation is None or self.transformation.treatment is None: - return None - - return self.transformation.treatment.apply(value) - - def convert(d): """ Decode MongoDB Extended JSON, considering CrateDB specifics. """ - converter = CrateDBConverter() + converter = MongoDBCrateDBConverter() newdict = {} for k, v in sanitize_field_names(d).items(): newdict[k] = converter.convert(v) diff --git a/pyproject.toml b/pyproject.toml index 7bb0fc0d..4511576e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -164,7 +164,7 @@ kinesis = [ "lorrystream[carabas]>=0.0.6", ] mongodb = [ - "commons-codec[mongodb,zyp]>=0.0.15", + "commons-codec[mongodb,zyp] @ git+https://github.com/crate/commons-codec.git@mongodb-full-next", "cratedb-toolkit[io]", "orjson<4,>=3.3.1", "pymongo<5,>=3.10.1", diff --git a/tests/io/mongodb/test_copy.py b/tests/io/mongodb/test_copy.py index fb3932b5..a4350a2c 100644 --- a/tests/io/mongodb/test_copy.py +++ b/tests/io/mongodb/test_copy.py @@ -237,9 +237,7 @@ def test_mongodb_copy_filesystem_json_canonical(caplog, cratedb): "SELECT pg_typeof(data['publishedDate']) AS type FROM testdrive.demo;", records=True ) timestamp_type = type_result[0]["type"] - - # FIXME: Why does the "canonical format" yield worse results? - assert timestamp_type == "text" + assert timestamp_type == "bigint" def test_mongodb_copy_filesystem_bson(caplog, cratedb): diff --git a/tests/io/mongodb/test_export.py b/tests/io/mongodb/test_export.py deleted file mode 100644 index 46666a08..00000000 --- a/tests/io/mongodb/test_export.py +++ /dev/null @@ -1,107 +0,0 @@ -import pytest -from zyp.model.collection import CollectionTransformation -from zyp.model.treatment import Treatment - -from cratedb_toolkit.io.mongodb.export import CrateDBConverter - -pytestmark = pytest.mark.mongodb - - -def test_convert_basic(): - data_in = { - "_id": { - "$oid": "56027fcae4b09385a85f9344", - }, - "value": { - "id": 42, - "date": {"$date": "2015-09-23T10:32:42.33Z"}, - "list": [ - {"id": "foo", "value": "something"}, - {"id": "bar", "value": {"$date": "2015-09-24T10:32:42.33Z"}}, - ], - }, - } - - data_out = { - "_id": "56027fcae4b09385a85f9344", - "value": { - "date": 1443004362000, - "id": 42, - "list": [ - {"id": "foo", "value": "something"}, - {"id": "bar", "value": 1443090762000}, - ], - }, - } - converter = CrateDBConverter() - assert converter.convert(data_in) == data_out - - -def test_convert_with_treatment_ignore_complex_list(): - data_in = { - "_id": { - "$oid": "56027fcae4b09385a85f9344", - }, - "value": { - "id": 42, - "date": {"$date": "2015-09-23T10:32:42.33Z"}, - "some_complex_list": [ - {"id": "foo", "value": "something"}, - {"id": "bar", "value": {"$date": "2015-09-24T10:32:42.33Z"}}, - ], - }, - } - - data_out = { - "_id": "56027fcae4b09385a85f9344", - "value": { - "date": 1443004362000, - "id": 42, - }, - } - - treatment = Treatment(ignore_complex_lists=True) - converter = CrateDBConverter(transformation=CollectionTransformation(treatment=treatment)) - assert converter.convert(data_in) == data_out - - -def test_convert_with_treatment_all_options(): - data_in = { - "_id": { - "$oid": "56027fcae4b09385a85f9344", - }, - "ignore_toplevel": 42, - "value": { - "id": 42, - "date": {"$date": "2015-09-23T10:32:42.33Z"}, - "ignore_nested": 42, - }, - "to_list": 42, - "to_string": 42, - "to_dict_scalar": 42, - "to_dict_list": [{"user": 42}], - } - - data_out = { - "_id": "56027fcae4b09385a85f9344", - "value": { - "date": 1443004362000, - "id": 42, - }, - "to_list": [42], - "to_string": "42", - "to_dict_scalar": {"id": 42}, - "to_dict_list": [{"user": {"id": 42}}], - } - treatment = Treatment( - ignore_complex_lists=False, - ignore_field=["ignore_toplevel", "ignore_nested"], - convert_list=["to_list"], - convert_string=["to_string"], - convert_dict=[ - {"name": "to_dict_scalar", "wrapper_name": "id"}, - {"name": "user", "wrapper_name": "id"}, - ], - ) - converter = CrateDBConverter(transformation=CollectionTransformation(treatment=treatment)) - assert converter.convert(data_in) == data_out