Skip to content

Commit

Permalink
MongoDB: Process JSON files from HTTP resource, using https+bson://
Browse files Browse the repository at this point in the history
  • Loading branch information
amotl committed Sep 11, 2024
1 parent 4b08e51 commit 93d9d3c
Show file tree
Hide file tree
Showing 7 changed files with 82 additions and 6 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
## Unreleased
- MongoDB: Unlock processing multiple collections, either from server database,
or from filesystem directory
- MongoDB: Unlock processing JSON files from HTTP resource, using `https+bson://`

## 2024/09/10 v0.0.22
- MongoDB: Rename columns with leading underscores to use double leading underscores
Expand Down
9 changes: 9 additions & 0 deletions cratedb_toolkit/api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,15 @@ def load_table(self, resource: InputOutputResource, target: TableAddress, transf
progress=True,
)

elif source_url_obj.scheme.startswith("http"):
if "+bson" in source_url_obj.scheme or "+mongodb" in source_url_obj.scheme:
mongodb_copy_generic(
str(source_url_obj),
target_url,
transformation=transformation,
progress=True,
)

elif source_url_obj.scheme.startswith("influxdb"):
from cratedb_toolkit.io.influxdb import influxdb_copy

Expand Down
35 changes: 33 additions & 2 deletions cratedb_toolkit/io/mongodb/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ def query(self):


@define
class MongoDBFileAdapter(MongoDBAdapterBase):
class MongoDBFilesystemAdapter(MongoDBAdapterBase):
_path: Path = field(init=False)

def setup(self):
Expand Down Expand Up @@ -113,6 +113,35 @@ def query(self):
return batches(data, self.batch_size)


@define
class MongoDBResourceAdapter(MongoDBAdapterBase):
_url: URL = field(init=False)

def setup(self):
self._url = self.address.uri
if "+bson" in self._url.scheme:
self._url.scheme = self._url.scheme.replace("+bson", "")

def get_collections(self) -> t.List[str]:
raise NotImplementedError("HTTP+BSON loader does not support directory inquiry yet")

def record_count(self, filter_=None) -> int:
return -1

def query(self):
if self.offset:
raise NotImplementedError("Using offsets is not supported by Polars' NDJSON reader")
if self._url.path.endswith(".json") or self._url.path.endswith(".jsonl") or self._url.path.endswith(".ndjson"):
data = pl.read_ndjson(
str(self._url), batch_size=self.batch_size, n_rows=self.limit or None, ignore_errors=True
).to_dicts()
elif self._url.path.endswith(".bson"):
raise NotImplementedError("HTTP+BSON loader does not support .bson files yet. SIC")
else:
raise ValueError(f"Unsupported file type: {self._url}")
return batches(data, self.batch_size)


@define
class MongoDBServerAdapter(MongoDBAdapterBase):
_mongodb_client: pymongo.MongoClient = field(init=False)
Expand Down Expand Up @@ -142,7 +171,9 @@ def query(self):

def mongodb_adapter_factory(mongodb_uri: URL) -> MongoDBAdapterBase:
if mongodb_uri.scheme.startswith("file"):
return MongoDBFileAdapter.from_url(mongodb_uri)
return MongoDBFilesystemAdapter.from_url(mongodb_uri)
elif mongodb_uri.scheme.startswith("http"):
return MongoDBResourceAdapter.from_url(mongodb_uri)
elif mongodb_uri.scheme.startswith("mongodb"):
return MongoDBServerAdapter.from_url(mongodb_uri)
raise ValueError("Unable to create MongoDB adapter")
2 changes: 2 additions & 0 deletions cratedb_toolkit/io/mongodb/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@ def mongodb_copy(source_url, target_url, transformation: t.Union[Path, None] = N
logger.info(f"Inquiring collections at {source_url}")
mongodb_uri = URL(source_url)
cratedb_uri = URL(target_url)
if Path(mongodb_uri.path).is_absolute() and mongodb_uri.path[-1] != "/":
mongodb_uri.path += "/"
if cratedb_uri.path[-1] != "/":
cratedb_uri.path += "/"
mongodb_query_parameters = mongodb_uri.query_params
Expand Down
2 changes: 1 addition & 1 deletion cratedb_toolkit/util/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ def decode_database_table(url: str) -> t.Tuple[str, t.Union[str, None]]:
if url_.scheme == "crate" and not database:
database = url_.query_params.get("schema")
if database is None and table is None:
if url_.scheme.startswith("file"):
if url_.scheme.startswith("file") or url_.scheme.startswith("http"):
_, database, table = url_.path.rsplit("/", 2)

# If table name is coming from a filesystem, strip suffix, e.g. `books-relaxed.ndjson`.
Expand Down
9 changes: 6 additions & 3 deletions doc/io/mongodb/loader.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,16 @@ Load data from MongoDB JSON/BSON files, for example produced by the
`mongoexport` or `mongodump` programs.

```shell
# Extended JSON, full path.
# Extended JSON, filesystem, full path.
ctk load table "file+bson:///path/to/mongodb-json-files/datasets/books.json"

# Extended JSON, multiple files.
# Extended JSON, HTTP resource.
ctk load table "https+bson://github.com/ozlerhakan/mongodb-json-files/raw/master/datasets/books.json"

# Extended JSON, filesystem, multiple files.
ctk load table "file+bson:///path/to/mongodb-json-files/datasets/*.json"

# BSON, compressed, relative path.
# BSON, filesystem, relative path, compressed.
ctk load table "file+bson:./var/data/testdrive/books.bson.gz"
```

Expand Down
30 changes: 30 additions & 0 deletions tests/io/mongodb/test_copy.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ def test_mongodb_copy_server_database(caplog, cratedb, mongodb):
Verify MongoDB -> CrateDB data transfer for all collections in a database.
"""

# Reset two database tables.
cratedb.database.run_sql('DROP TABLE IF EXISTS testdrive."demo1";')
cratedb.database.run_sql('DROP TABLE IF EXISTS testdrive."demo2";')

# Define source and target URLs.
mongodb_url = f"{mongodb.get_connection_url()}/testdrive"
cratedb_url = f"{cratedb.get_connection_url()}/testdrive"
Expand Down Expand Up @@ -181,3 +185,29 @@ def test_mongodb_copy_filesystem_bson(caplog, cratedb):
)
timestamp_type = type_result[0]["type"]
assert timestamp_type == "bigint"


def test_mongodb_copy_http_json_relaxed(caplog, cratedb):
"""
Verify MongoDB Extended JSON -> CrateDB data transfer, when source file is on HTTP.
"""

# Define source and target URLs.
json_resource = "https+bson://github.com/ozlerhakan/mongodb-json-files/raw/master/datasets/books.json"
cratedb_url = f"{cratedb.get_connection_url()}/testdrive/demo"

# Run transfer command.
mongodb_copy(json_resource, cratedb_url)

# Verify metadata in target database.
assert cratedb.database.table_exists("testdrive.demo") is True
assert cratedb.database.refresh_table("testdrive.demo") is True
assert cratedb.database.count_records("testdrive.demo") == 431

# Verify content in target database.
results = cratedb.database.run_sql("SELECT * FROM testdrive.demo WHERE data['_id'] = 1;", records=True)
assert results[0]["data"]["authors"] == [
"W. Frank Ableson",
"Charlie Collins",
"Robi Sen",
]

0 comments on commit 93d9d3c

Please sign in to comment.