Skip to content

Commit

Permalink
MongoDB: Filter server collection using MongoDB query expression
Browse files Browse the repository at this point in the history
  • Loading branch information
amotl committed Sep 12, 2024
1 parent d825f2f commit 42f22ee
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 6 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
- MongoDB: Unlock processing multiple collections, either from server database,
or from filesystem directory
- MongoDB: Unlock processing JSON files from HTTP resource, using `https+bson://`
- MongoDB: Optionally filter server collection using MongoDB query expression

## 2024/09/10 v0.0.22
- MongoDB: Rename columns with leading underscores to use double leading underscores
Expand Down
18 changes: 16 additions & 2 deletions cratedb_toolkit/io/mongodb/adapter.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import glob
import itertools
import json
import logging
import typing as t
from abc import abstractmethod
Expand Down Expand Up @@ -28,7 +29,7 @@ class MongoDBAdapterBase:
database_name: str
collection_name: str

_custom_query_parameters = ["batch-size", "limit", "offset"]
_custom_query_parameters = ["batch-size", "filter", "limit", "offset"]

@classmethod
def from_url(cls, url: t.Union[str, boltons.urlutils.URL, yarl.URL]):
Expand All @@ -54,6 +55,10 @@ def __attrs_post_init__(self):
def batch_size(self) -> int:
    """Number of records to read/transfer per batch; taken from the
    `batch-size` URL query parameter, defaulting to 500."""
    raw = self.address.uri.query_params.get("batch-size", 500)
    return int(raw)

@cached_property
def filter(self) -> t.Union[t.Dict[str, t.Any], None]:
    """
    MongoDB query filter expression, decoded from the `filter` URL query
    parameter, which carries a JSON string (like `mongoexport`'s `--query`
    option). Returns `None` when no filter parameter has been supplied.

    Note: The original annotation claimed `t.Union[str, None]`, but
    `json.loads` returns the parsed object (a dict for a filter expression),
    never the raw string.
    """
    # json.loads("null") yields None, so an absent parameter means "no filter".
    return json.loads(self.address.uri.query_params.get("filter", "null"))

@cached_property
def limit(self) -> int:
    """Maximum number of records to process, from the `limit` URL query
    parameter. `0` (the default) means "no limit"."""
    raw = self.address.uri.query_params.get("limit", 0)
    return int(raw)
Expand Down Expand Up @@ -100,6 +105,8 @@ def record_count(self, filter_=None) -> int:
def query(self):
if not self._path.exists():
raise FileNotFoundError(f"Resource not found: {self._path}")
if self.filter:
raise NotImplementedError("Using MongoDB filter expressions is not supported by Polars' NDJSON reader")
if self.offset:
raise NotImplementedError("Using offsets is not supported by Polars' NDJSON reader")
if self._path.suffix in [".json", ".jsonl", ".ndjson"]:
Expand Down Expand Up @@ -129,6 +136,8 @@ def record_count(self, filter_=None) -> int:
return -1

def query(self):
if self.filter:
raise NotImplementedError("Using MongoDB filter expressions is not supported by Polars' NDJSON reader")
if self.offset:
raise NotImplementedError("Using offsets is not supported by Polars' NDJSON reader")
if self._url.path.endswith(".json") or self._url.path.endswith(".jsonl") or self._url.path.endswith(".ndjson"):
Expand Down Expand Up @@ -165,7 +174,12 @@ def record_count(self, filter_=None) -> int:
return self._mongodb_collection.count_documents(filter=filter_)

def query(self):
    """
    Read documents from the MongoDB server collection, honoring the
    `filter`, `batch-size`, `offset`, and `limit` URL query parameters,
    and yield them grouped into batches.
    """
    cursor = self._mongodb_collection.find(filter=self.filter)
    cursor = cursor.batch_size(self.batch_size)
    cursor = cursor.skip(self.offset).limit(self.limit)
    return batches(cursor, self.batch_size)


Expand Down
16 changes: 12 additions & 4 deletions doc/io/mongodb/loader.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,16 +91,24 @@ The default batch size is 500. You can adjust the value by appending the HTTP
URL query parameter `batch-size` to the source URL, like
`mongodb+srv://managed.mongodb.net/ticker/stocks?batch-size=5000`.

### Offset
Use the HTTP URL query parameter `offset` on the source URL, like
`&offset=42`, in order to start processing at this record from the
beginning.
### Filter
Use the HTTP URL query parameter `filter` on the source URL, like
`&filter={"exchange": {"$eq": "NASDAQ"}}`, in order to provide a MongoDB
query filter as a JSON string.
It works in the same way as `mongoexport`'s `--query` option.
For more complex query expressions, make sure to properly encode the
parameter value using URL/percent encoding.

### Limit
Use the HTTP URL query parameter `limit` on the source URL, like
`&limit=100`, in order to limit processing to a total number of
records.

### Offset
Use the HTTP URL query parameter `offset` on the source URL, like
`&offset=42`, in order to start processing at this record from the
beginning.

## Zyp Transformations
You can use [Zyp Transformations] to change the shape of the data while being
transferred. In order to add it to the pipeline, use the `--transformation`
Expand Down
39 changes: 39 additions & 0 deletions tests/io/mongodb/test_copy.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import json
from copy import deepcopy
from unittest import mock

Expand Down Expand Up @@ -59,6 +60,44 @@ def test_mongodb_copy_server_database(caplog, cratedb, mongodb):
assert results[0]["data"] == data_out


def test_mongodb_copy_server_collection_with_filter(caplog, cratedb, mongodb):
    """
    Verify MongoDB -> CrateDB data transfer for a specific collection, with filtering.
    """

    # Define source and target URLs.
    # NOTE(review): the filter JSON is embedded into the URL without percent-encoding;
    # confirm the URL parser tolerates the raw braces/quotes/`$`.
    query_filter = json.dumps({"timestamp": {"$gt": 1563051934050}})
    source_url = f"{mongodb.get_connection_url()}/testdrive/demo?filter={query_filter}"
    target_url = f"{cratedb.get_connection_url()}/testdrive/demo"

    # Define data.
    records = [
        {"device": "Hotzenplotz", "temperature": 42.42, "timestamp": 1563051934000},
        {"device": "Hotzenplotz", "temperature": 42.42, "timestamp": 1563051934100},
    ]
    # Snapshot the expected shape before insertion, since `insert_many`
    # mutates the source documents by adding `_id` fields.
    expected = deepcopy(records)
    for item in expected:
        item.update({"_id": mock.ANY})

    # Populate source database.
    client: pymongo.MongoClient = mongodb.get_connection_client()
    collection = client.get_database("testdrive").create_collection("demo")
    collection.insert_many(records)

    # Run transfer command.
    mongodb_copy(
        source_url,
        target_url,
    )

    # Verify data in target database: only the record matching the filter arrives.
    cratedb.database.refresh_table("testdrive.demo")
    assert cratedb.database.count_records("testdrive.demo") == 1
    results = cratedb.database.run_sql("SELECT * FROM testdrive.demo;", records=True)
    assert results[0]["data"] == expected[1]


def test_mongodb_copy_filesystem_folder(caplog, cratedb, mongodb):
"""
Verify MongoDB -> CrateDB data transfer for all files in a folder.
Expand Down

0 comments on commit 42f22ee

Please sign in to comment.