diff --git a/.github/workflows/build_test.yml b/.github/workflows/build_test.yml index 7e5b6bed2..4480cc827 100644 --- a/.github/workflows/build_test.yml +++ b/.github/workflows/build_test.yml @@ -53,6 +53,8 @@ jobs: ~/.cargo target .pixi + - name: start minio server in the background + run: pixi run start-minio & - name: Test rust run: | pixi run test-rs --release @@ -348,7 +350,7 @@ jobs: python -m pip install vegafusion-*.whl python -m pip install vegafusion_python_embed-*macosx_10_7_x86_64.whl python -m pip install pytest vega-datasets polars duckdb altair vl-convert-python scikit-image pandas==2.0 - + # Downgrade pyarrow to 10.0.1 python -m pip install pyarrow==10.0.1 - name: Test vegafusion diff --git a/.gitignore b/.gitignore index 98a0519ca..7bdee7245 100644 --- a/.gitignore +++ b/.gitignore @@ -44,3 +44,4 @@ __pycache__ /java/.idea/ /.pixi/ /python/vegafusion-jupyter/MANIFEST +/minio_data/ diff --git a/BUILD.md b/BUILD.md index 7ae2023cd..7eaaeb183 100644 --- a/BUILD.md +++ b/BUILD.md @@ -18,6 +18,13 @@ Then restart your shell. For more information on installing Pixi, see https://prefix.dev/docs/pixi/overview. 
## Build and test Rust + +Start the test minio server in a dedicated terminal + +``` +pixi run start-minio +``` + Build and test the VegaFusion Rust crates with: ``` diff --git a/Cargo.lock b/Cargo.lock index fc7593daf..9ce9cee31 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -638,12 +638,6 @@ dependencies = [ "rustc-demangle", ] -[[package]] -name = "base64" -version = "0.13.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8" - [[package]] name = "base64" version = "0.21.2" @@ -823,6 +817,7 @@ dependencies = [ "iana-time-zone", "js-sys", "num-traits", + "serde", "wasm-bindgen", "windows-targets 0.48.0", ] @@ -1032,6 +1027,16 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "13418e745008f7349ec7e449155f419a61b92b58a99cc3616942b926825ec76b" +[[package]] +name = "core-foundation" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "194a7a9e6de53fa55116934067c844d9d749312f75c6f6d0980e8c252f8c2146" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "core-foundation-sys" version = "0.8.4" @@ -1364,7 +1369,7 @@ dependencies = [ "arrow-array", "arrow-buffer", "arrow-schema", - "base64 0.21.2", + "base64", "blake2", "blake3", "chrono", @@ -1999,15 +2004,16 @@ dependencies = [ [[package]] name = "hyper-rustls" -version = "0.23.2" +version = "0.24.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1788965e61b367cd03a62950836d5cd41560c3577d90e40e0819373194d1661c" +checksum = "ec3efd23720e2049821a693cbc7e65ea87c72f1c58ff2f9522ff332b1491e590" dependencies = [ + "futures-util", "http", "hyper", - "rustls 0.20.8", + "rustls", "tokio", - "tokio-rustls 0.23.4", + "tokio-rustls", ] [[package]] @@ -2636,13 +2642,21 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"d359e231e5451f4f9fa889d56e3ce34f8724f1a61db2107739359717cf2bbf08" dependencies = [ "async-trait", + "base64", "bytes", "chrono", "futures", "humantime", + "hyper", "itertools 0.10.5", "parking_lot 0.12.1", "percent-encoding", + "quick-xml", + "rand", + "reqwest", + "ring", + "serde", + "serde_json", "snafu", "tokio", "tracing", @@ -2754,7 +2768,7 @@ dependencies = [ "arrow-ipc", "arrow-schema", "arrow-select", - "base64 0.21.2", + "base64", "brotli", "bytes", "chrono", @@ -3150,6 +3164,16 @@ version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a993555f31e5a609f617c12db6250dedcac1b0a85076912c436e6fc9b2c8e6a3" +[[package]] +name = "quick-xml" +version = "0.28.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ce5e73202a820a31f8a0ee32ada5e21029c81fd9e3ebf668a40832e4219d9d1" +dependencies = [ + "memchr", + "serde", +] + [[package]] name = "quote" version = "1.0.33" @@ -3272,11 +3296,11 @@ checksum = "c707298afce11da2efef2f600116fa93ffa7a032b5d7b628aa17711ec81383ca" [[package]] name = "reqwest" -version = "0.11.13" +version = "0.11.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "68cc60575865c7831548863cc02356512e3f1dc2f3f82cb837d7fc4cc8f3c97c" +checksum = "046cd98826c46c2ac8ddecae268eb5c2e58628688a5fc7a2643704a73faba95b" dependencies = [ - "base64 0.13.1", + "base64", "bytes", "encoding_rs", "futures-core", @@ -3294,17 +3318,20 @@ dependencies = [ "once_cell", "percent-encoding", "pin-project-lite", - "rustls 0.20.8", + "rustls", "rustls-pemfile", "serde", "serde_json", "serde_urlencoded", + "system-configuration", "tokio", - "tokio-rustls 0.23.4", + "tokio-rustls", + "tokio-util", "tower-service", "url", "wasm-bindgen", "wasm-bindgen-futures", + "wasm-streams", "web-sys", "webpki-roots", "winreg", @@ -3462,18 +3489,6 @@ dependencies = [ "windows-sys 0.45.0", ] -[[package]] -name = "rustls" -version = "0.20.8" -source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "fff78fc74d175294f4e83b28343315ffcfb114b156f0185e9741cb5570f50e2f" -dependencies = [ - "log", - "ring", - "sct", - "webpki", -] - [[package]] name = "rustls" version = "0.21.7" @@ -3492,7 +3507,7 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d194b56d58803a43635bdc398cd17e383d6f71f9182b9a192c127ca42494a59b" dependencies = [ - "base64 0.21.2", + "base64", ] [[package]] @@ -3822,6 +3837,27 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" +[[package]] +name = "system-configuration" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba3a3adc5c275d719af8cb4272ea1c4a6d668a777f37e115f6d11ddbc1c8e0e7" +dependencies = [ + "bitflags 1.3.2", + "core-foundation", + "system-configuration-sys", +] + +[[package]] +name = "system-configuration-sys" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a75fb188eb626b924683e3b95e3a48e63551fcfb51949de2f06a9d91dbee93c9" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "target-lexicon" version = "0.12.6" @@ -4010,24 +4046,13 @@ dependencies = [ "syn 2.0.31", ] -[[package]] -name = "tokio-rustls" -version = "0.23.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c43ee83903113e03984cb9e5cebe6c04a5116269e900e3ddba8f068a62adda59" -dependencies = [ - "rustls 0.20.8", - "tokio", - "webpki", -] - [[package]] name = "tokio-rustls" version = "0.24.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081" dependencies = [ - "rustls 0.21.7", + "rustls", "tokio", ] @@ -4099,7 +4124,7 @@ dependencies = [ "async-stream", "async-trait", "axum", - "base64 0.21.2", + "base64", "bytes", 
"h2", "http", @@ -4109,10 +4134,10 @@ dependencies = [ "percent-encoding", "pin-project", "prost", - "rustls 0.21.7", + "rustls", "rustls-pemfile", "tokio", - "tokio-rustls 0.24.1", + "tokio-rustls", "tokio-stream", "tower", "tower-layer", @@ -4139,7 +4164,7 @@ version = "0.10.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0fddb2a37b247e6adcb9f239f4e5cefdcc5ed526141a416b943929f13aea2cce" dependencies = [ - "base64 0.21.2", + "base64", "bytes", "http", "http-body", @@ -4357,12 +4382,13 @@ name = "vegafusion-common" version = "1.4.3" dependencies = [ "arrow", - "base64 0.21.2", + "base64", "chrono", "datafusion-common", "datafusion-expr", "datafusion-proto", "jni", + "object_store", "pyo3", "serde_json", "sqlparser 0.37.0", @@ -4467,7 +4493,7 @@ dependencies = [ "async-lock", "async-recursion", "async-trait", - "base64 0.21.2", + "base64", "bytes", "chrono", "chrono-tz", @@ -4488,6 +4514,7 @@ dependencies = [ "log", "lru", "num-traits", + "object_store", "ordered-float 3.6.0", "prost", "prost-types", @@ -4549,6 +4576,7 @@ dependencies = [ "deterministic-hash", "lazy_static", "log", + "object_store", "regex", "reqwest", "reqwest-middleware", @@ -4561,6 +4589,7 @@ dependencies = [ "tempfile", "tokio", "toml", + "url", "vegafusion-common", "vegafusion-dataframe", "vegafusion-datafusion-udfs", @@ -4724,6 +4753,19 @@ dependencies = [ "quote", ] +[[package]] +name = "wasm-streams" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4609d447824375f43e1ffbc051b50ad8f4b3ae8219680c94452ea05eb240ac7" +dependencies = [ + "futures-util", + "js-sys", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", +] + [[package]] name = "wasm-timer" version = "0.2.5" @@ -4749,24 +4791,11 @@ dependencies = [ "wasm-bindgen", ] -[[package]] -name = "webpki" -version = "0.22.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"f095d78192e208183081cc07bc5515ef55216397af48b873e5edcd72637fa1bd" -dependencies = [ - "ring", - "untrusted", -] - [[package]] name = "webpki-roots" -version = "0.22.6" +version = "0.25.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b6c71e40d7d2c34a5106301fb632274ca37242cd0c9d3e64dbece371a40a2d87" -dependencies = [ - "webpki", -] +checksum = "14247bb57be4f377dfb94c72830b8ce8fc6beac03cf4bf7b9732eadd414123fc" [[package]] name = "which" @@ -4962,11 +4991,12 @@ dependencies = [ [[package]] name = "winreg" -version = "0.10.1" +version = "0.50.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "80d0f4e272c85def139476380b12f9ac60926689dd2e01d4923222f40580869d" +checksum = "524e57b2c537c0f9b1e69f1965311ec12182b4122e45035b1508cd24d2adadb1" dependencies = [ - "winapi", + "cfg-if", + "windows-sys 0.48.0", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index d0348ca97..e69b749f8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,11 +17,12 @@ members = [ arrow = { version = "47.0.0", default_features = false } sqlparser = { version = "0.37.0" } chrono = { version = "0.4.31", default_features = false } -reqwest = { version = "=0.11.13", default-features = false } +reqwest = { version = "0.11.22", default-features = false } tokio = { version = "1.32.0" } pyo3 = { version = "0.19" } prost = { version = "0.12.1" } prost-types = { version = "0.12.1" } +object_store = { version="0.7" } datafusion = { version = "32.0.0" } datafusion-common = { version = "32.0.0", default_features = false} diff --git a/automation/start_minio.py b/automation/start_minio.py new file mode 100644 index 000000000..3af020390 --- /dev/null +++ b/automation/start_minio.py @@ -0,0 +1,131 @@ +from minio import Minio +from minio.error import S3Error +import os +import subprocess +import time +from pathlib import Path +import shutil +import pandas as pd +from csv import QUOTE_ALL +from io import BytesIO +import pyarrow as pa + +root = 
Path(__file__).parent.parent + + +def main(): + access_key = "access_key123" + secret_key = "secret_key123" + print("Starting minio") + p = start_minio_server(access_key, secret_key) + time.sleep(1) + + # Create a client with the MinIO server playground, its access key + # and secret key. + client = Minio( + "localhost:9000", + access_key=access_key, + secret_key=secret_key, + secure=False, + ) + + # Make 'data' bucket if it does not exist + print("Loading test data to the 'data' bucket") + found = client.bucket_exists("data") + if not found: + client.make_bucket("data") + else: + print("Bucket 'data' already exists") + + # Put original json object + movies_json_path = root / "vegafusion-runtime" / "tests" / "util" / "vegajs_runtime" / "data" / "movies.json" + client.fput_object( + "data", + "movies.json", + movies_json_path, + ) + + # load as pandas + df = pd.read_json(movies_json_path) + df["Title"] = df.Title.astype(str) + df["Release Date"] = pd.to_datetime(df["Release Date"]) + + # Convert to csv + f = BytesIO() + df.to_csv(f, index=False, quoting=QUOTE_ALL) + b = f.getvalue() + n = len(b) + + client.put_object( + "data", + "movies.csv", + BytesIO(b), + n + ) + + # Convert to arrow + tbl = pa.Table.from_pandas(df) + b = arrow_table_to_ipc_bytes(tbl) + client.put_object( + "data", + "movies.arrow", + BytesIO(b), + len(b) + ) + + # Convert to parquet. For some reason, uploading to minio with client.fput_object + # (as above for arrow) results in a parquet file with corrupt footer. 
+ f = BytesIO() + df.to_parquet(f) + b = f.getvalue() + n = len(b) + + client.put_object( + "data", + "movies.parquet", + BytesIO(b), + n + ) + + print("Data loaded") + print(f""" +Open dashboard at http://127.0.0.1:9000 +username: {access_key} +password: {secret_key} +""") + # Block on the server + p.wait() + + +def start_minio_server(access_key, secret_key): + # Set environment variables for access and secret keys + env = os.environ.copy() + env["MINIO_ROOT_USER"] = access_key + env["MINIO_ROOT_PASSWORD"] = secret_key + env["MINIO_REGION"] = "us-east-1" + + # Command to start MinIO server + data_dir = root / "minio_data" + shutil.rmtree(data_dir, ignore_errors=True) + cmd = ["minio", "server", "minio_data"] + + # Start MinIO server in the background + process = subprocess.Popen(cmd, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + + return process + + +def arrow_table_to_ipc_bytes(table): + bytes_buffer = BytesIO() + max_chunksize=8096 + with pa.ipc.new_file(bytes_buffer, table.schema) as f: + f.write_table(table, max_chunksize=max_chunksize) + + return bytes_buffer.getvalue() + + +if __name__ == "__main__": + try: + main() + except S3Error as exc: + print("error occurred.", exc) \ No newline at end of file diff --git a/pixi.lock b/pixi.lock index c4787f221..4e94ffdd3 100644 --- a/pixi.lock +++ b/pixi.lock @@ -4466,6 +4466,90 @@ package: noarch: python size: 11682 timestamp: 1691045097208 +- name: configparser + version: 5.3.0 + manager: conda + platform: linux-64 + dependencies: + python: '>=3.6' + url: https://conda.anaconda.org/conda-forge/noarch/configparser-5.3.0-pyhd8ed1ab_0.tar.bz2 + hash: + md5: c99fd5916160900dc5ff64204da99c4d + sha256: ce6ce9ee08437b46c284d52b076fb091cf6f2a9e12860d4a37546adbd5f53b28 + optional: false + category: main + build: pyhd8ed1ab_0 + arch: x86_64 + subdir: linux-64 + build_number: 0 + license: MIT + license_family: MIT + noarch: python + size: 22490 + timestamp: 1660952265700 +- name: configparser + version: 5.3.0 + 
manager: conda + platform: osx-64 + dependencies: + python: '>=3.6' + url: https://conda.anaconda.org/conda-forge/noarch/configparser-5.3.0-pyhd8ed1ab_0.tar.bz2 + hash: + md5: c99fd5916160900dc5ff64204da99c4d + sha256: ce6ce9ee08437b46c284d52b076fb091cf6f2a9e12860d4a37546adbd5f53b28 + optional: false + category: main + build: pyhd8ed1ab_0 + arch: x86_64 + subdir: osx-64 + build_number: 0 + license: MIT + license_family: MIT + noarch: python + size: 22490 + timestamp: 1660952265700 +- name: configparser + version: 5.3.0 + manager: conda + platform: osx-arm64 + dependencies: + python: '>=3.6' + url: https://conda.anaconda.org/conda-forge/noarch/configparser-5.3.0-pyhd8ed1ab_0.tar.bz2 + hash: + md5: c99fd5916160900dc5ff64204da99c4d + sha256: ce6ce9ee08437b46c284d52b076fb091cf6f2a9e12860d4a37546adbd5f53b28 + optional: false + category: main + build: pyhd8ed1ab_0 + arch: aarch64 + subdir: osx-arm64 + build_number: 0 + license: MIT + license_family: MIT + noarch: python + size: 22490 + timestamp: 1660952265700 +- name: configparser + version: 5.3.0 + manager: conda + platform: win-64 + dependencies: + python: '>=3.6' + url: https://conda.anaconda.org/conda-forge/noarch/configparser-5.3.0-pyhd8ed1ab_0.tar.bz2 + hash: + md5: c99fd5916160900dc5ff64204da99c4d + sha256: ce6ce9ee08437b46c284d52b076fb091cf6f2a9e12860d4a37546adbd5f53b28 + optional: false + category: main + build: pyhd8ed1ab_0 + arch: x86_64 + subdir: win-64 + build_number: 0 + license: MIT + license_family: MIT + noarch: python + size: 22490 + timestamp: 1660952265700 - name: coverage version: 7.3.2 manager: conda @@ -5869,6 +5953,90 @@ package: license: LGPL-2.1 size: 60255 timestamp: 1604417405528 +- name: future + version: 0.18.3 + manager: conda + platform: linux-64 + dependencies: + python: '>=3.8' + url: https://conda.anaconda.org/conda-forge/noarch/future-0.18.3-pyhd8ed1ab_0.conda + hash: + md5: fec8329fc739090f26a7d7803db254f1 + sha256: b3d34bf4924cb80363c1ab57ac821393f118ffaa94f05368bf4044941163b65e + 
optional: false + category: main + build: pyhd8ed1ab_0 + arch: x86_64 + subdir: linux-64 + build_number: 0 + license: MIT + license_family: MIT + noarch: python + size: 365520 + timestamp: 1673596757510 +- name: future + version: 0.18.3 + manager: conda + platform: osx-64 + dependencies: + python: '>=3.8' + url: https://conda.anaconda.org/conda-forge/noarch/future-0.18.3-pyhd8ed1ab_0.conda + hash: + md5: fec8329fc739090f26a7d7803db254f1 + sha256: b3d34bf4924cb80363c1ab57ac821393f118ffaa94f05368bf4044941163b65e + optional: false + category: main + build: pyhd8ed1ab_0 + arch: x86_64 + subdir: osx-64 + build_number: 0 + license: MIT + license_family: MIT + noarch: python + size: 365520 + timestamp: 1673596757510 +- name: future + version: 0.18.3 + manager: conda + platform: osx-arm64 + dependencies: + python: '>=3.8' + url: https://conda.anaconda.org/conda-forge/noarch/future-0.18.3-pyhd8ed1ab_0.conda + hash: + md5: fec8329fc739090f26a7d7803db254f1 + sha256: b3d34bf4924cb80363c1ab57ac821393f118ffaa94f05368bf4044941163b65e + optional: false + category: main + build: pyhd8ed1ab_0 + arch: aarch64 + subdir: osx-arm64 + build_number: 0 + license: MIT + license_family: MIT + noarch: python + size: 365520 + timestamp: 1673596757510 +- name: future + version: 0.18.3 + manager: conda + platform: win-64 + dependencies: + python: '>=3.8' + url: https://conda.anaconda.org/conda-forge/noarch/future-0.18.3-pyhd8ed1ab_0.conda + hash: + md5: fec8329fc739090f26a7d7803db254f1 + sha256: b3d34bf4924cb80363c1ab57ac821393f118ffaa94f05368bf4044941163b65e + optional: false + category: main + build: pyhd8ed1ab_0 + arch: x86_64 + subdir: win-64 + build_number: 0 + license: MIT + license_family: MIT + noarch: python + size: 365520 + timestamp: 1673596757510 - name: gcc_impl_linux-64 version: 13.2.0 manager: conda @@ -14580,6 +14748,192 @@ package: noarch: python size: 13707 timestamp: 1639515992326 +- name: minio + version: 7.1.17 + manager: conda + platform: linux-64 + dependencies: + certifi: 
'*' + configparser: '*' + future: '*' + python: '>=3.6' + python-dateutil: '*' + pytz: '*' + urllib3: '*' + url: https://conda.anaconda.org/conda-forge/noarch/minio-7.1.17-pyhd8ed1ab_0.conda + hash: + md5: 37a6599b8de3b46f8faff4aeb55b718f + sha256: 845cea4347a5b40d08caf013a7ae4aefe0112a66dd3bf0aa334068a3a5db962c + optional: false + category: main + build: pyhd8ed1ab_0 + arch: x86_64 + subdir: linux-64 + build_number: 0 + license: Apache-2.0 + license_family: Apache + noarch: python + size: 59925 + timestamp: 1695627015341 +- name: minio + version: 7.1.17 + manager: conda + platform: osx-64 + dependencies: + certifi: '*' + configparser: '*' + future: '*' + python: '>=3.6' + python-dateutil: '*' + pytz: '*' + urllib3: '*' + url: https://conda.anaconda.org/conda-forge/noarch/minio-7.1.17-pyhd8ed1ab_0.conda + hash: + md5: 37a6599b8de3b46f8faff4aeb55b718f + sha256: 845cea4347a5b40d08caf013a7ae4aefe0112a66dd3bf0aa334068a3a5db962c + optional: false + category: main + build: pyhd8ed1ab_0 + arch: x86_64 + subdir: osx-64 + build_number: 0 + license: Apache-2.0 + license_family: Apache + noarch: python + size: 59925 + timestamp: 1695627015341 +- name: minio + version: 7.1.17 + manager: conda + platform: osx-arm64 + dependencies: + certifi: '*' + configparser: '*' + future: '*' + python: '>=3.6' + python-dateutil: '*' + pytz: '*' + urllib3: '*' + url: https://conda.anaconda.org/conda-forge/noarch/minio-7.1.17-pyhd8ed1ab_0.conda + hash: + md5: 37a6599b8de3b46f8faff4aeb55b718f + sha256: 845cea4347a5b40d08caf013a7ae4aefe0112a66dd3bf0aa334068a3a5db962c + optional: false + category: main + build: pyhd8ed1ab_0 + arch: aarch64 + subdir: osx-arm64 + build_number: 0 + license: Apache-2.0 + license_family: Apache + noarch: python + size: 59925 + timestamp: 1695627015341 +- name: minio + version: 7.1.17 + manager: conda + platform: win-64 + dependencies: + certifi: '*' + configparser: '*' + future: '*' + python: '>=3.6' + python-dateutil: '*' + pytz: '*' + urllib3: '*' + url: 
https://conda.anaconda.org/conda-forge/noarch/minio-7.1.17-pyhd8ed1ab_0.conda + hash: + md5: 37a6599b8de3b46f8faff4aeb55b718f + sha256: 845cea4347a5b40d08caf013a7ae4aefe0112a66dd3bf0aa334068a3a5db962c + optional: false + category: main + build: pyhd8ed1ab_0 + arch: x86_64 + subdir: win-64 + build_number: 0 + license: Apache-2.0 + license_family: Apache + noarch: python + size: 59925 + timestamp: 1695627015341 +- name: minio-server + version: 2023.09.23.03.47.50 + manager: conda + platform: linux-64 + dependencies: {} + url: https://conda.anaconda.org/conda-forge/linux-64/minio-server-2023.09.23.03.47.50-hbcca054_0.conda + hash: + md5: ff773f379a8324cefc3a904a5fcb0c27 + sha256: 83d01977a656502d67ba01b709bdaa39730d19d4630d0965e3150fc5b76de5b1 + optional: false + category: main + build: hbcca054_0 + arch: x86_64 + subdir: linux-64 + build_number: 0 + license: AGPL-3.0-only + license_family: AGPL + size: 28514641 + timestamp: 1695661212355 +- name: minio-server + version: 2023.09.23.03.47.50 + manager: conda + platform: osx-64 + dependencies: {} + url: https://conda.anaconda.org/conda-forge/osx-64/minio-server-2023.09.23.03.47.50-h8857fd0_0.conda + hash: + md5: e5631d220e6361e9945598970cb8cbd3 + sha256: 5466cf7e24112c70405a1631f3f4fb5e86fa9e93c5beaf5ea72a9839ae75f7bb + optional: false + category: main + build: h8857fd0_0 + arch: x86_64 + subdir: osx-64 + build_number: 0 + constrains: + - __osx>=10.12 + license: AGPL-3.0-only + license_family: AGPL + size: 29438678 + timestamp: 1695669781862 +- name: minio-server + version: 2023.09.23.03.47.50 + manager: conda + platform: osx-arm64 + dependencies: {} + url: https://conda.anaconda.org/conda-forge/osx-arm64/minio-server-2023.09.23.03.47.50-hf0a4a13_0.conda + hash: + md5: 716354cbeec9ed541b75c4d1398af8e9 + sha256: e8ecdc8c0943cd01a1ac488f2fdea36ec74161ca1225f47c06b71934dea61ef2 + optional: false + category: main + build: hf0a4a13_0 + arch: aarch64 + subdir: osx-arm64 + build_number: 0 + license: AGPL-3.0-only + 
license_family: AGPL + size: 28775220 + timestamp: 1695661328725 +- name: minio-server + version: 2023.09.23.03.47.50 + manager: conda + platform: win-64 + dependencies: {} + url: https://conda.anaconda.org/conda-forge/win-64/minio-server-2023.09.23.03.47.50-h56e8100_0.conda + hash: + md5: 842697378d8ac142d1dac05a310cb9eb + sha256: 1382416ae6dc8c5fcde90f448e03860ef0da591d2aa3e06bde312ccfb8bd5607 + optional: false + category: main + build: h56e8100_0 + arch: x86_64 + subdir: win-64 + build_number: 0 + license: AGPL-3.0-only + license_family: AGPL + size: 28599729 + timestamp: 1695661919637 - name: mistune version: 3.0.2 manager: conda diff --git a/pixi.toml b/pixi.toml index 65a1f9c89..d6e3f09b4 100644 --- a/pixi.toml +++ b/pixi.toml @@ -34,6 +34,7 @@ test-py-jupyter-headless = { cmd = "export VEGAFUSION_TEST_HEADLESS=1 && cd pyth build-vegajs-runtime = { cmd = "cd vegafusion-runtime/tests/util/vegajs_runtime && npm install" } test-rs-core = "cargo test -p vegafusion-core $0" test-rs-runtime = { cmd="cargo test -p vegafusion-runtime $0", depends_on = ["build-vegajs-runtime"] } +test-rs-runtime-s3 = { cmd="VEGAFUSION_S3_TESTS=1 cargo test -p vegafusion-runtime $0", depends_on = ["build-vegajs-runtime"] } test-rs-server = "cargo test -p vegafusion-server $0" test-rs-sql = "cargo test -p vegafusion-sql $0" test-rs = { cmd = "cargo test --workspace --exclude vegafusion-python-embed --exclude vegafusion-wasm $0", depends_on = ["build-vegajs-runtime"] } @@ -50,6 +51,9 @@ build-js-embed = { cmd = "cd javascript/vegafusion-embed && npm install && npm r # VegaFusion Server build-rs-vegafusion-server = { cmd = "cargo build -p vegafusion-server --release $0" } +# minio +start-minio = "python automation/start_minio.py" + # Java build-jni = "cargo build -p vegafusion-jni --release $0" build-jar = "cd java && ./gradlew jar" @@ -120,6 +124,8 @@ rust = "1.71.1.*" jupytext = "1.15.0.*" openjdk = "20.0.0.*" build = "0.7.0.*" +minio-server = "2023.9.23.3.47.50.*" +minio = "7.1.17.*" # 
Dependencies are those required at runtime by the Python packages [dependencies] diff --git a/vegafusion-common/Cargo.toml b/vegafusion-common/Cargo.toml index 32dd2a7a8..4f373e889 100644 --- a/vegafusion-common/Cargo.toml +++ b/vegafusion-common/Cargo.toml @@ -52,3 +52,7 @@ optional = true [dependencies.base64] version = "0.21.2" optional = true + +[dependencies.object_store] +workspace = true +optional = true diff --git a/vegafusion-common/src/error.rs b/vegafusion-common/src/error.rs index c960c6a96..2815c40e6 100644 --- a/vegafusion-common/src/error.rs +++ b/vegafusion-common/src/error.rs @@ -16,6 +16,9 @@ use jni::errors::Error as JniError; #[cfg(feature = "base64")] use base64::DecodeError as Base64DecodeError; +#[cfg(feature = "object_store")] +use object_store::{path::Error as ObjectStorePathError, Error as ObjectStoreError}; + pub type Result = result::Result; #[derive(Clone, Debug, Default)] @@ -90,6 +93,10 @@ pub enum VegaFusionError { #[cfg(feature = "base64")] #[error("Base64 Decode Error: {0}\n{1}")] Base64DecodeError(Base64DecodeError, ErrorContext), + + #[cfg(feature = "object_store")] + #[error("ObjectStoreError Error: {0}\n{1}")] + ObjectStoreError(ObjectStoreError, ErrorContext), } impl VegaFusionError { @@ -175,6 +182,11 @@ impl VegaFusionError { context.contexts.push(context_fn().into()); VegaFusionError::Base64DecodeError(err, context) } + #[cfg(feature = "object_store")] + ObjectStoreError(err, mut context) => { + context.contexts.push(context_fn().into()); + VegaFusionError::ObjectStoreError(err, context) + } } } @@ -264,6 +276,10 @@ impl VegaFusionError { Base64DecodeError(err, context) => { VegaFusionError::Base64DecodeError(err.clone(), context.clone()) } + #[cfg(feature = "object_store")] + ObjectStoreError(err, context) => { + VegaFusionError::ExternalError(err.to_string(), context.clone()) + } } } } @@ -379,6 +395,23 @@ impl From for VegaFusionError { } } +#[cfg(feature = "object_store")] +impl From for VegaFusionError { + fn from(err: 
ObjectStoreError) -> Self { + Self::ObjectStoreError(err, Default::default()) + } +} + +#[cfg(feature = "object_store")] +impl From for VegaFusionError { + fn from(err: ObjectStorePathError) -> Self { + Self::ObjectStoreError( + ObjectStoreError::InvalidPath { source: err }, + Default::default(), + ) + } +} + pub trait ToExternalError { fn external>(self, context: S) -> Result; } diff --git a/vegafusion-dataframe/src/connection.rs b/vegafusion-dataframe/src/connection.rs index 23f99c2c3..328cb3471 100644 --- a/vegafusion-dataframe/src/connection.rs +++ b/vegafusion-dataframe/src/connection.rs @@ -41,4 +41,11 @@ pub trait Connection: Send + Sync + 'static { "scan_arrow_file not supported by connection", )) } + + /// Scan a Parquet file into a DataFrame + async fn scan_parquet(&self, _url: &str) -> Result> { + Err(VegaFusionError::sql_not_supported( + "scan_parquet not supported by connection", + )) + } } diff --git a/vegafusion-runtime/Cargo.toml b/vegafusion-runtime/Cargo.toml index b6884fff6..bdd03ba81 100644 --- a/vegafusion-runtime/Cargo.toml +++ b/vegafusion-runtime/Cargo.toml @@ -35,6 +35,7 @@ env_logger = "0.10.0" ordered-float = "3.6.0" reqwest-retry = "0.3.0" reqwest-middleware = "0.2.0" +object_store = { workspace=true, features = ["aws"] } [dev-dependencies] futures = "0.3.21" @@ -64,7 +65,7 @@ workspace = true [dependencies.vegafusion-common] path = "../vegafusion-common" -features = [ "json", "sqlparser", "prettyprint",] +features = [ "json", "sqlparser", "prettyprint", "object_store"] version = "1.4.3" [dependencies.vegafusion-core] diff --git a/vegafusion-runtime/src/data/tasks.rs b/vegafusion-runtime/src/data/tasks.rs index 9487831f0..10c638c31 100644 --- a/vegafusion-runtime/src/data/tasks.rs +++ b/vegafusion-runtime/src/data/tasks.rs @@ -27,6 +27,8 @@ use vegafusion_core::proto::gen::transforms::TransformPipeline; use vegafusion_core::task_graph::task::{InputVariable, TaskDependencies}; use vegafusion_core::task_graph::task_value::TaskValue; +use 
object_store::aws::AmazonS3Builder; +use object_store::ObjectStore; use reqwest_middleware::{ClientBuilder, ClientWithMiddleware}; use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware}; use vegafusion_common::arrow::datatypes::{DataType, Field, Schema}; @@ -142,6 +144,10 @@ impl TaskCall for DataUrlTask { || (file_type.is_none() && (url.ends_with(".arrow") || url.ends_with(".feather"))) { read_arrow(&url, conn).await? + } else if file_type == Some("parquet") + || (file_type.is_none() && (url.ends_with(".parquet"))) + { + read_parquet(&url, conn).await? } else { return Err(VegaFusionError::internal(format!( "Invalid url file extension {url}" @@ -634,6 +640,21 @@ async fn read_json(url: &str, conn: Arc) -> Result) -> Result) -> Result> { + conn.scan_parquet(url).await +} + pub fn make_request_client() -> ClientWithMiddleware { // Retry up to 3 times with increasing intervals between attempts. let retry_policy = ExponentialBackoff::builder().build_with_max_retries(3); diff --git a/vegafusion-runtime/tests/test_pre_transform_values.rs b/vegafusion-runtime/tests/test_pre_transform_values.rs index 5f035e033..7de4f92c7 100644 --- a/vegafusion-runtime/tests/test_pre_transform_values.rs +++ b/vegafusion-runtime/tests/test_pre_transform_values.rs @@ -1,8 +1,9 @@ #[cfg(test)] mod tests { - use crate::crate_dir; + use crate::{crate_dir, setup_s3_environment_vars}; use serde_json::json; use std::collections::HashMap; + use std::env; use std::fs; use std::sync::Arc; use vegafusion_common::data::table::VegaFusionTable; @@ -10,6 +11,7 @@ mod tests { use vegafusion_core::proto::gen::pretransform::pre_transform_values_warning::WarningType; use vegafusion_core::proto::gen::tasks::Variable; use vegafusion_core::spec::chart::ChartSpec; + use vegafusion_core::spec::values::StringOrSignalSpec; use vegafusion_runtime::data::dataset::VegaFusionDataset; use vegafusion_runtime::task_graph::runtime::VegaFusionRuntime; use 
vegafusion_sql::connection::datafusion_conn::DataFusionConnection; @@ -351,6 +353,73 @@ mod tests { +---------------------+---------------------+---------+---------+---------------+-------------+"; assert_eq!(drag_selected.pretty_format(None).unwrap(), expected); } + + #[tokio::test] + async fn test_pre_transform_dataset_s3() { + let run_s3_tests = env::var("VEGAFUSION_S3_TESTS").is_ok(); + if !run_s3_tests { + return; + } + + // Note: s3 tests require the pixi start-minio job + setup_s3_environment_vars(); + + // Load spec + let spec_path = format!("{}/tests/specs/vegalite/histogram.vg.json", crate_dir()); + let spec_str = fs::read_to_string(spec_path).unwrap(); + let mut spec: ChartSpec = serde_json::from_str(&spec_str).unwrap(); + + for file_type in ["json", "csv", "arrow", "parquet"] { + // Prefix data/movies.json with s3:// + println!("File type: {file_type}"); + spec.data[0].url = Some(StringOrSignalSpec::String(format!( + "s3://data/movies.{file_type}" + ))); + + // Initialize task graph runtime + let runtime = VegaFusionRuntime::new( + Arc::new(DataFusionConnection::default()), + Some(16), + Some(1024_i32.pow(3) as usize), + ); + + let (values, warnings) = runtime + .pre_transform_values( + &spec, + &[(Variable::new_data("source_0"), vec![])], + "UTC", + &None, + None, + Default::default(), + ) + .await + .unwrap(); + + // Check there are no warnings + assert!(warnings.is_empty()); + + // Check single returned dataset + assert_eq!(values.len(), 1); + + let dataset = values[0].as_table().cloned().unwrap(); + + let expected = "\ ++----------------------------+--------------------------------+---------+ +| bin_maxbins_10_IMDB Rating | bin_maxbins_10_IMDB Rating_end | __count | ++----------------------------+--------------------------------+---------+ +| 6.0 | 7.0 | 985 | +| 3.0 | 4.0 | 100 | +| 7.0 | 8.0 | 741 | +| 5.0 | 6.0 | 633 | +| 8.0 | 9.0 | 204 | +| 2.0 | 3.0 | 43 | +| 4.0 | 5.0 | 273 | +| 9.0 | 10.0 | 4 | +| 1.0 | 2.0 | 5 | 
++----------------------------+--------------------------------+---------+"; + assert_eq!(dataset.pretty_format(None).unwrap(), expected); + } + } } fn crate_dir() -> String { @@ -358,3 +427,11 @@ fn crate_dir() -> String { .display() .to_string() } + +fn setup_s3_environment_vars() { + std::env::set_var("AWS_DEFAULT_REGION", "us-east-1"); + std::env::set_var("AWS_ACCESS_KEY_ID", "access_key123"); + std::env::set_var("AWS_SECRET_ACCESS_KEY", "secret_key123"); + std::env::set_var("AWS_ENDPOINT", "http://127.0.0.1:9000"); + std::env::set_var("AWS_ALLOW_HTTP", "true"); +} diff --git a/vegafusion-sql/Cargo.toml b/vegafusion-sql/Cargo.toml index d92996a77..6a2689cf4 100644 --- a/vegafusion-sql/Cargo.toml +++ b/vegafusion-sql/Cargo.toml @@ -6,7 +6,7 @@ edition = "2021" description = "VegaFusion SQL dialect generation and connection implementations" [features] -datafusion-conn = [ "datafusion", "tempfile", "reqwest", "reqwest-retry", "reqwest-middleware", "vegafusion-datafusion-udfs",] +datafusion-conn = [ "datafusion", "tempfile", "reqwest", "reqwest-retry", "reqwest-middleware", "vegafusion-datafusion-udfs", "object_store", "url"] [dependencies] async-trait = "0.1.73" @@ -76,6 +76,15 @@ optional = true version = "^1.5.5" optional = true +[dependencies.object_store] +workspace = true +optional = true +features = [ "aws",] + +[dependencies.url] +version = "2.3.1" +optional = true + [dev-dependencies.async-std] version = "1.12.0" features = [ "attributes",] diff --git a/vegafusion-sql/src/connection/datafusion_conn.rs b/vegafusion-sql/src/connection/datafusion_conn.rs index 426c748af..b1cd94b73 100644 --- a/vegafusion-sql/src/connection/datafusion_conn.rs +++ b/vegafusion-sql/src/connection/datafusion_conn.rs @@ -11,8 +11,11 @@ use datafusion::execution::options::{ArrowReadOptions, ReadOptions}; use datafusion::execution::runtime_env::RuntimeEnv; use datafusion::optimizer::analyzer::inline_table_scan::InlineTableScan; use 
datafusion::optimizer::analyzer::type_coercion::TypeCoercion; -use datafusion::prelude::{CsvReadOptions as DfCsvReadOptions, SessionConfig, SessionContext}; +use datafusion::prelude::{ + CsvReadOptions as DfCsvReadOptions, ParquetReadOptions, SessionConfig, SessionContext, +}; use log::Level; +use object_store::aws::AmazonS3Builder; use reqwest_middleware::{ClientBuilder, ClientWithMiddleware}; use reqwest_retry::policies::ExponentialBackoff; use reqwest_retry::RetryTransientMiddleware; @@ -21,6 +24,7 @@ use std::fs::File; use std::io::Write; use std::path::Path; use std::sync::Arc; +use url::Url; use vegafusion_common::column::flat_col; use vegafusion_common::data::table::VegaFusionTable; use vegafusion_common::datatypes::cast_to; @@ -59,6 +63,37 @@ impl DataFusionConnection { ctx, } } + + fn create_s3_datafusion_session_context( + url: &str, + bucket_path: &str, + ) -> Result { + let s3 = AmazonS3Builder::from_env().with_url(url).build().with_context(|| + format!( + "Failed to initialize s3 connection from environment variables.\n\ + See https://docs.rs/object_store/latest/object_store/aws/struct.AmazonS3Builder.html#method.from_env" + ) + )?; + let Some((bucket, _)) = bucket_path.split_once("/") else { + return Err(VegaFusionError::specification(format!("Invalid s3 URL: {url}"))); + }; + let base_url = Url::parse(&format!("s3://{bucket}/")).expect("Should be valid URL"); + let ctx = make_datafusion_context(); + ctx.runtime_env() + .register_object_store(&base_url, Arc::new(s3)); + Ok(ctx) + } + + fn get_parquet_opts(url: &str) -> ParquetReadOptions { + let mut opts = ParquetReadOptions::default(); + let path = Path::new(url); + if let Some(ext) = path.extension().and_then(|ext| ext.to_str()) { + opts.file_extension = ext; + } else { + opts.file_extension = ""; + } + opts + } } impl Default for DataFusionConnection { @@ -165,7 +200,7 @@ impl Connection for DataFusionConnection { let path = tempdir.path().to_str().unwrap(); // Build final csv schema that combines 
the requested and inferred schemas - let final_schema = build_csv_schema(&df_csv_opts, path).await?; + let final_schema = build_csv_schema(&df_csv_opts, path, &self.ctx).await?; df_csv_opts = df_csv_opts.schema(&final_schema); // Load through VegaFusionTable so that temp file can be deleted @@ -177,12 +212,35 @@ impl Connection for DataFusionConnection { let table = table.with_ordering()?; self.scan_arrow(table).await + } else if let Some(bucket_path) = url.strip_prefix("s3://") { + let s3 = AmazonS3Builder::from_env().with_url(url).build().with_context(|| + format!( + "Failed to initialize s3 connection from environment variables.\n\ + See https://docs.rs/object_store/latest/object_store/aws/struct.AmazonS3Builder.html#method.from_env" + ) + )?; + let Some((bucket, _)) = bucket_path.split_once("/") else { + return Err(VegaFusionError::specification(format!("Invalid s3 URL: {url}"))); + }; + let base_url = Url::parse(&format!("s3://{bucket}/")).expect("Should be valid URL"); + let ctx = make_datafusion_context(); + ctx.runtime_env() + .register_object_store(&base_url, Arc::new(s3)); + + let final_schema = build_csv_schema(&df_csv_opts, url, &ctx).await?; + df_csv_opts = df_csv_opts.schema(&final_schema); + + ctx.register_csv("csv_tbl", url, df_csv_opts).await?; + let sql_conn = DataFusionConnection::new(Arc::new(ctx)); + Ok(Arc::new( + SqlDataFrame::try_new(Arc::new(sql_conn), "csv_tbl", Default::default()).await?, + )) } else { // Build final csv schema that combines the requested and inferred schemas - let final_schema = build_csv_schema(&df_csv_opts, url).await?; + let final_schema = build_csv_schema(&df_csv_opts, url, &self.ctx).await?; df_csv_opts = df_csv_opts.schema(&final_schema); - let df = self.ctx.read_csv(url, df_csv_opts).await.unwrap(); + let df = self.ctx.read_csv(url, df_csv_opts).await?; let schema: SchemaRef = Arc::new(df.schema().into()) as SchemaRef; let batches = df.collect().await?; let table = VegaFusionTable::try_new(schema, batches)?; @@ 
-229,6 +287,22 @@ impl Connection for DataFusionConnection { let table = VegaFusionTable::try_new(schema, batches)?.with_ordering()?; self.scan_arrow(table).await + } else if let Some(bucket_path) = url.strip_prefix("s3://") { + let ctx = Self::create_s3_datafusion_session_context(url, bucket_path)?; + + let mut opts = ArrowReadOptions::default(); + let path = Path::new(url); + if let Some(ext) = path.extension().and_then(|ext| ext.to_str()) { + opts.file_extension = ext; + } else { + opts.file_extension = ""; + } + + ctx.register_arrow("arrow_tbl", url, opts).await?; + let sql_conn = DataFusionConnection::new(Arc::new(ctx)); + Ok(Arc::new( + SqlDataFrame::try_new(Arc::new(sql_conn), "arrow_tbl", Default::default()).await?, + )) } else { // Assume local file let path = Path::new(url); @@ -247,6 +321,37 @@ impl Connection for DataFusionConnection { )) } } + + async fn scan_parquet(&self, url: &str) -> Result> { + if url.starts_with("http://") || url.starts_with("https://") { + Err(VegaFusionError::internal( + "The DataFusion connection does not yet support loading parquet files over http or https.\n\ + Loading parquet files from the local filesystem and from s3 is supported." 
+ )) + } else if let Some(bucket_path) = url.strip_prefix("s3://") { + let ctx = Self::create_s3_datafusion_session_context(url, bucket_path)?; + + let opts = Self::get_parquet_opts(url); + + ctx.register_parquet("parquet_tbl", url, opts).await?; + let sql_conn = DataFusionConnection::new(Arc::new(ctx)); + Ok(Arc::new( + SqlDataFrame::try_new(Arc::new(sql_conn), "parquet_tbl", Default::default()) + .await?, + )) + } else { + // Assume local file + let ctx = make_datafusion_context(); + let opts = Self::get_parquet_opts(url); + + ctx.register_parquet("parquet_tbl", url, opts).await?; + let sql_conn = DataFusionConnection::new(Arc::new(ctx)); + Ok(Arc::new( + SqlDataFrame::try_new(Arc::new(sql_conn), "parquet_tbl", Default::default()) + .await?, + )) + } + } } #[async_trait::async_trait] @@ -310,8 +415,8 @@ impl SqlConnection for DataFusionConnection { async fn build_csv_schema( csv_opts: &DfCsvReadOptions<'_>, uri: impl Into, + ctx: &SessionContext, ) -> Result { - let ctx = SessionContext::new(); let table_path = ListingTableUrl::parse(uri.into().as_str())?; let listing_options = csv_opts.to_listing_options(&ctx.copied_config()); let inferred_schema = listing_options