Skip to content

Commit

Permalink
MongoDB Adapter: Evaluate filtering by _id by casting to bson.ObjectId
Browse files Browse the repository at this point in the history
Native objects can't be passed using JSON, so this is a minor surrogate
representation to convey a filter that makes `ctk load table` process
a single document, identified by its OID.
  • Loading branch information
amotl committed Sep 22, 2024
1 parent ddddfb0 commit 4084f4e
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 5 deletions.
3 changes: 3 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
- MongoDB Adapter: Improved performance when computing collection cardinality
by using `collection.estimated_document_count()`
- MongoDB Full: Optionally use `limit` parameter as number of total records
- MongoDB Adapter: Evaluate `_id` filter field by upcasting to `bson.ObjectId`,
to convey a filter that makes `ctk load table` process a single document,
identified by its OID

## 2024/09/19 v0.0.24
- MongoDB Full: Refactor transformation subsystem to `commons-codec`
Expand Down
9 changes: 7 additions & 2 deletions cratedb_toolkit/io/mongodb/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import logging
import typing as t
from abc import abstractmethod
from copy import deepcopy
from functools import cached_property
from pathlib import Path

Expand Down Expand Up @@ -57,7 +58,7 @@ def batch_size(self) -> int:
return int(self.address.uri.query_params.get("batch-size", 100))

@cached_property
def filter(self) -> t.Union[t.Dict[str, t.Any], None]:
    """
    The MongoDB query filter, decoded from the `filter` URL query parameter.

    Returns:
        The parsed JSON object (a dict), or `None` when no filter was given
        (the default `"null"` decodes to `None`).
    """
    # `"null"` as the fallback makes `json.loads` yield `None` for an absent parameter.
    return json.loads(self.address.uri.query_params.get("filter", "null"))

@cached_property
Expand Down Expand Up @@ -179,8 +180,12 @@ def record_count(self, filter_=None) -> int:
return self._mongodb_collection.estimated_document_count()

def query(self):
_filter = deepcopy(self.filter)
# Evaluate `_id` filter field specially, by upcasting to `bson.ObjectId`.
if _filter and "_id" in _filter:
_filter["_id"] = bson.ObjectId(_filter["_id"])
data = (
self._mongodb_collection.find(filter=self.filter)
self._mongodb_collection.find(filter=_filter)
.batch_size(self.batch_size)
.skip(self.offset)
.limit(self.limit)
Expand Down
5 changes: 3 additions & 2 deletions doc/io/mongodb/loader.md
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,9 @@ appending the HTTP URL query parameter `batch-size` to the source URL, like

### Filter
Use the HTTP URL query parameter `filter` on the source URL, like
`&filter={"exchange": {"$eq": "NASDAQ"}}`, in order to provide a MongoDB
query filter as a JSON string.
`&filter={"exchange":{"$eq":"NASDAQ"}}`, or
`&filter={"_id":"66f0002e98c00fb8261d87c8"}`,
in order to provide a MongoDB query filter as a JSON string.
It works in the same way as `mongoexport`'s `--query` option.
For more complex query expressions, make sure to properly encode the right
value using URL/Percent Encoding.
Expand Down
51 changes: 50 additions & 1 deletion tests/io/mongodb/test_copy.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from pathlib import Path
from unittest import mock

import bson
import pymongo
import pytest
from zyp import CollectionTransformation, MokshaTransformation
Expand Down Expand Up @@ -64,7 +65,7 @@ def test_mongodb_copy_server_database(caplog, cratedb, mongodb):
assert results[0]["data"] == data_out


def test_mongodb_copy_server_collection_with_filter(caplog, cratedb, mongodb):
def test_mongodb_copy_server_collection_with_filter_timestamp(caplog, cratedb, mongodb):
"""
Verify MongoDB -> CrateDB data transfer for a specific collection, with filtering.
"""
Expand Down Expand Up @@ -102,6 +103,54 @@ def test_mongodb_copy_server_collection_with_filter(caplog, cratedb, mongodb):
assert results[0]["data"] == data_out[1]


def test_mongodb_copy_server_collection_with_filter_oid(caplog, cratedb, mongodb):
    """
    Verify MongoDB -> CrateDB data transfer for a specific collection, with filtering by oid.
    """

    # Source URL carries a `filter` query parameter addressing a single document by OID.
    oid = "66f0002e98c00fb8261d87c8"
    mongodb_url = f"{mongodb.get_connection_url()}/testdrive/demo?filter={json.dumps({'_id': oid})}"
    cratedb_url = f"{cratedb.get_connection_url()}/testdrive/demo"

    # Two source documents; only the second one matches the OID filter.
    data_in = [
        {
            "_id": bson.ObjectId("66efff8de45f4f7b695134a6"),
            "device": "Räuber",
            "temperature": 42.42,
            "timestamp": 1563051934000,
        },
        {
            "_id": bson.ObjectId(oid),
            "device": "Hotzenplotz",
            "temperature": 84.84,
            "timestamp": 1563051934100,
        },
    ]
    # Expected records: identical payloads, but the `_id` is rewritten on transfer.
    data_out = deepcopy(data_in)
    for record in data_out:
        record["_id"] = mock.ANY

    # Populate source database.
    client: pymongo.MongoClient = mongodb.get_connection_client()
    collection = client.get_database("testdrive").create_collection("demo")
    collection.insert_many(data_in)

    # Run transfer command.
    mongodb_copy(
        mongodb_url,
        cratedb_url,
    )

    # Verify: only the single document addressed by the OID filter arrived.
    cratedb.database.refresh_table("testdrive.demo")
    assert cratedb.database.count_records("testdrive.demo") == 1
    results = cratedb.database.run_sql("SELECT * FROM testdrive.demo;", records=True)
    assert results[0]["data"] == data_out[1]


def test_mongodb_copy_filesystem_folder_absolute(caplog, cratedb, mongodb):
"""
Verify MongoDB -> CrateDB data transfer for all files in a folder, with relative addressing.
Expand Down

0 comments on commit 4084f4e

Please sign in to comment.