diff --git a/Cargo.lock b/Cargo.lock index 672fec9c08..b9ae87f30a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -250,7 +250,7 @@ dependencies = [ "hex", "http", "hyper", - "ring", + "ring 0.16.20", "time", "tokio", "tower", @@ -938,9 +938,9 @@ dependencies = [ [[package]] name = "const-oid" -version = "0.7.1" +version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e4c78c047431fee22c1a7bb92e00ad095a02a983affe4d8a72e2a2c62c1b94f3" +checksum = "28c122c3980598d243d63d9a704629a2d748d101f278052ff068be5a4423ab6f" [[package]] name = "core-foundation" @@ -1040,16 +1040,6 @@ dependencies = [ "winapi", ] -[[package]] -name = "crypto-bigint" -version = "0.3.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "03c6a1d5fa1de37e071642dfa44ec552ca5b299adb128fab16138e24b548fd21" -dependencies = [ - "generic-array", - "subtle", -] - [[package]] name = "crypto-common" version = "0.1.6" @@ -1390,13 +1380,13 @@ checksum = "c61ceff48ed7e0e66d428a569d36485a091c39fe118ee1220217655f6b814fa9" [[package]] name = "der" -version = "0.5.1" +version = "0.7.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6919815d73839e7ad218de758883aae3a257ba6759ce7a9992501efbb53d705c" +checksum = "fffa369a668c8af7dbf8b5e56c9f744fbd399949ed171606040001947de40b1c" dependencies = [ "const-oid", - "crypto-bigint", "pem-rfc7468", + "zeroize", ] [[package]] @@ -1732,8 +1722,7 @@ dependencies = [ [[package]] name = "google-cloud-auth" version = "0.13.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "af1087f1fbd2dd3f58c17c7574ddd99cd61cbbbc2c4dc81114b8687209b196cb" +source = "git+https://github.com/yoshidan/google-cloud-rust?rev=65251957f6aed2eb8adb62b9a9a1b9996150d265#65251957f6aed2eb8adb62b9a9a1b9996150d265" dependencies = [ "async-trait", "base64 0.21.5", @@ -1754,8 +1743,7 @@ dependencies = [ [[package]] name = "google-cloud-metadata" version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cc279bfb50487d7bcd900e8688406475fc750fe474a835b2ab9ade9eb1fc90e2" +source = "git+https://github.com/yoshidan/google-cloud-rust?rev=65251957f6aed2eb8adb62b9a9a1b9996150d265#65251957f6aed2eb8adb62b9a9a1b9996150d265" dependencies = [ "reqwest", "thiserror", @@ -1764,11 +1752,11 @@ dependencies = [ [[package]] name = "google-cloud-storage" -version = "0.14.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bea96059b656bc5f3332865c02cadc532238b269bd29b2fde64d8ecb878d1b13" +version = "0.15.0" +source = "git+https://github.com/yoshidan/google-cloud-rust?rev=65251957f6aed2eb8adb62b9a9a1b9996150d265#65251957f6aed2eb8adb62b9a9a1b9996150d265" dependencies = [ "async-stream", + "async-trait", "base64 0.21.5", "bytes", "futures-util", @@ -1778,10 +1766,10 @@ dependencies = [ "hex", "once_cell", "percent-encoding", + "pkcs8", "regex", "reqwest", - "ring", - "rsa", + "ring 0.17.6", "serde", "serde_json", "sha2", @@ -1794,9 +1782,8 @@ dependencies = [ [[package]] name = "google-cloud-token" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0fcd62eb34e3de2f085bcc33a09c3e17c4f65650f36d53eb328b00d63bcb536a" +version = "0.1.2" +source = "git+https://github.com/yoshidan/google-cloud-rust?rev=65251957f6aed2eb8adb62b9a9a1b9996150d265#65251957f6aed2eb8adb62b9a9a1b9996150d265" dependencies = [ "async-trait", ] @@ -2131,7 +2118,7 @@ checksum = "6971da4d9c3aa03c3d8f3ff0f4155b534aad021292003895a469716b2a230378" dependencies = [ "base64 0.21.5", "pem", - "ring", + "ring 0.16.20", "serde", "serde_json", "simple_asn1", @@ -2142,9 +2129,6 @@ name = "lazy_static" version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" -dependencies = [ - "spin", -] [[package]] name = "lexical-core" @@ -2216,12 +2200,6 @@ version = "0.2.149" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a08173bc88b7955d1b3145aa561539096c421ac8debde8cbc3612ec635fee29b" -[[package]] -name = "libm" -version = "0.2.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4ec2a862134d2a7d32d7983ddcdd1c4923530833c9f2ea1a44fc5fa473989058" - [[package]] name = "linux-raw-sys" version = "0.4.10" @@ -2417,23 +2395,6 @@ dependencies = [ "num-traits", ] -[[package]] -name = "num-bigint-dig" -version = "0.8.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dc84195820f291c7697304f3cbdadd1cb7199c0efc917ff5eafd71225c136151" -dependencies = [ - "byteorder", - "lazy_static", - "libm", - "num-integer", - "num-iter", - "num-traits", - "rand 0.8.5", - "smallvec", - "zeroize", -] - [[package]] name = "num-complex" version = "0.4.4" @@ -2464,17 +2425,6 @@ dependencies = [ "num-traits", ] -[[package]] -name = "num-iter" -version = "0.1.43" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7d03e6c028c5dc5cac6e2dec0efda81fc887605bb3d884578bb6d6bf7514e252" -dependencies = [ - "autocfg", - "num-integer", - "num-traits", -] - [[package]] name = "num-rational" version = "0.4.1" @@ -2493,7 +2443,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39e3200413f237f41ab11ad6d161bc7239c84dcb631773ccd7de3dfe4b5c267c" dependencies = [ "autocfg", - "libm", ] [[package]] @@ -2679,9 +2628,9 @@ dependencies = [ [[package]] name = "pem-rfc7468" -version = "0.3.1" +version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "01de5d978f34aa4b2296576379fcc416034702fd94117c56ffd8a1a767cefb30" +checksum = "88b39c9bfcfc231068454382784bb460aae594343fb030d46e9f50a645418412" dependencies = [ "base64ct", ] @@ -2762,26 +2711,14 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" -[[package]] -name = "pkcs1" -version = "0.3.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a78f66c04ccc83dd4486fd46c33896f4e17b24a7a3a6400dedc48ed0ddd72320" -dependencies = [ - "der", - "pkcs8", - "zeroize", -] - [[package]] name = "pkcs8" -version = "0.8.0" +version = "0.10.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7cabda3fb821068a9a4fab19a683eac3af12edf0f34b94a8be53c4972b8149d0" +checksum = "f950b2377845cebe5cf8b5165cb3cc1a5e0fa5cfa3e1f7f55707d8fd82e0a7b7" dependencies = [ "der", "spki", - "zeroize", ] [[package]] @@ -3118,30 +3055,24 @@ dependencies = [ "cc", "libc", "once_cell", - "spin", - "untrusted", + "spin 0.5.2", + "untrusted 0.7.1", "web-sys", "winapi", ] [[package]] -name = "rsa" -version = "0.6.1" +name = "ring" +version = "0.17.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4cf22754c49613d2b3b119f0e5d46e34a2c628a937e3024b8762de4e7d8c710b" +checksum = "684d5e6e18f669ccebf64a92236bb7db9a34f07be010e3627368182027180866" dependencies = [ - "byteorder", - "digest", - "num-bigint-dig", - "num-integer", - "num-iter", - "num-traits", - "pkcs1", - "pkcs8", - "rand_core 0.6.4", - "smallvec", - "subtle", - "zeroize", + "cc", + "getrandom 0.2.10", + "libc", + "spin 0.9.8", + "untrusted 0.9.0", + "windows-sys", ] [[package]] @@ -3455,11 +3386,17 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" +[[package]] +name = "spin" +version = "0.9.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" + [[package]] name = "spki" -version = "0.5.4" +version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "44d01ac02a6ccf3e07db148d2be087da624fea0221a16152ed01f0496a6b0a27" +checksum = "d91ed6c858b01f942cd56b37a94b3e0a1798290327d1236e4d9cf4eaca44d29d" dependencies = [ "base64ct", "der", @@ -3866,6 +3803,12 @@ version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a" +[[package]] +name = "untrusted" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" + [[package]] name = "url" version = "2.4.1" diff --git a/src/daft-io/Cargo.toml b/src/daft-io/Cargo.toml index fe6bfdfc07..6f27620574 100644 --- a/src/daft-io/Cargo.toml +++ b/src/daft-io/Cargo.toml @@ -17,7 +17,6 @@ common-io-config = {path = "../common/io-config", default-features = false} daft-core = {path = "../daft-core", default-features = false} futures = {workspace = true} globset = "0.4" -google-cloud-storage = {version = "0.14.0", default-features = false, features = ["default-tls", "auth"]} hyper = "0.14.27" hyper-tls = "0.5.0" itertools = {workspace = true} @@ -35,6 +34,15 @@ tokio = {workspace = true} tokio-stream = {workspace = true} url = {workspace = true} +[dependencies.google-cloud-storage] +default-features = false +features = ["default-tls", "auth"] +# branch = "main" +git = "https://github.com/yoshidan/google-cloud-rust" +package = "google-cloud-storage" +rev = "65251957f6aed2eb8adb62b9a9a1b9996150d265" +version = "0.15.0" + [dependencies.reqwest] default-features = false features = ["stream", "native-tls"] diff --git a/src/daft-io/src/google_cloud.rs b/src/daft-io/src/google_cloud.rs index 2abc62c9c7..34cdde825f 100644 --- a/src/daft-io/src/google_cloud.rs +++ b/src/daft-io/src/google_cloud.rs @@ -19,7 +19,6 @@ use crate::object_io::FileMetadata; use crate::object_io::FileType; use crate::object_io::LSResult; use crate::object_io::ObjectSource; -use crate::s3_like; use crate::stats::IOStatsRef; use crate::stream_utils::io_stats_on_bytestream; use crate::GetResult; @@ -109,10 +108,7 @@ impl From for super::Error { } } -enum GCSClientWrapper { - Native(Client), - S3Compat(Arc), -} +struct GCSClientWrapper(Client); fn parse_uri(uri: &url::Url) -> super::Result<(&str, &str)> { let bucket = match uri.host_str() { @@ -136,79 +132,65 @@ impl GCSClientWrapper { ) -> super::Result { let uri = url::Url::parse(uri).with_context(|_| InvalidUrlSnafu { path: uri })?; let (bucket, key) = parse_uri(&uri)?; - match self { - GCSClientWrapper::Native(client) => { - let req = GetObjectRequest { - bucket: bucket.into(), - object: key.into(), - ..Default::default() - }; - use google_cloud_storage::http::objects::download::Range as GRange; - let (grange, size) = if let Some(range) = range { - ( - GRange(Some(range.start as u64), Some(range.end as u64)), - Some(range.len()), - ) - } else { - (GRange::default(), None) - }; - let owned_uri = uri.to_string(); - let response = client - .download_streamed_object(&req, &grange) - .await - .context(UnableToOpenFileSnafu { - path: uri.to_string(), - })?; - let response = response.map_err(move |e| { - UnableToReadBytesSnafu:: { - path: owned_uri.clone(), - } - .into_error(e) - .into() - }); - if let Some(is) = io_stats.as_ref() { - is.mark_get_requests(1) - } - Ok(GetResult::Stream( - io_stats_on_bytestream(response, io_stats), - size, - None, - )) - } - GCSClientWrapper::S3Compat(client) => { - let uri = format!("s3://{}/{}", bucket, key); - client.get(&uri, range, io_stats).await + let client = &self.0; + let req = GetObjectRequest { + bucket: bucket.into(), + object: key.into(), + ..Default::default() + }; + use google_cloud_storage::http::objects::download::Range as GRange; + let (grange, size) = if let Some(range) = range { + ( + GRange(Some(range.start as u64), Some(range.end as u64)), + Some(range.len()), + ) + } else { + (GRange::default(), None) + }; + let owned_uri = uri.to_string(); + let response = client + .download_streamed_object(&req, &grange) + .await + .context(UnableToOpenFileSnafu { + path: uri.to_string(), + })?; + let response = response.map_err(move |e| { + UnableToReadBytesSnafu:: { + path: owned_uri.clone(), } + .into_error(e) + .into() + }); + if let Some(is) = io_stats.as_ref() { + is.mark_get_requests(1) } + Ok(GetResult::Stream( + io_stats_on_bytestream(response, io_stats), + size, + None, + )) } async fn get_size(&self, uri: &str, io_stats: Option) -> super::Result { let uri = url::Url::parse(uri).with_context(|_| InvalidUrlSnafu { path: uri })?; let (bucket, key) = parse_uri(&uri)?; - match self { - GCSClientWrapper::Native(client) => { - let req = GetObjectRequest { - bucket: bucket.into(), - object: key.into(), - ..Default::default() - }; - - let response = client - .get_object(&req) - .await - .context(UnableToOpenFileSnafu { - path: uri.to_string(), - })?; - if let Some(is) = io_stats.as_ref() { - is.mark_head_requests(1) - } - Ok(response.size as usize) - } - GCSClientWrapper::S3Compat(client) => { - let uri = format!("s3://{}/{}", bucket, key); - client.get_size(&uri, io_stats).await - } + let client = &self.0; + let req = GetObjectRequest { + bucket: bucket.into(), + object: key.into(), + ..Default::default() + }; + + let response = client + .get_object(&req) + .await + .context(UnableToOpenFileSnafu { + path: uri.to_string(), + })?; + if let Some(is) = io_stats.as_ref() { + is.mark_head_requests(1) } + Ok(response.size as usize) } #[allow(clippy::too_many_arguments)] async fn _ls_impl( @@ -271,76 +253,69 @@ impl GCSClientWrapper { ) -> super::Result { let uri = url::Url::parse(path).with_context(|_| InvalidUrlSnafu { path })?; let (bucket, key) = parse_uri(&uri)?; - match self { - GCSClientWrapper::Native(client) => { - if posix { - // Attempt to forcefully ls the key as a directory (by ensuring a "/" suffix) - let forced_directory_key = if key.is_empty() { - "".to_string() - } else { - format!("{}{GCS_DELIMITER}", key.trim_end_matches(GCS_DELIMITER)) - }; - let forced_directory_ls_result = self - ._ls_impl( - client, - bucket, - forced_directory_key.as_str(), - Some(GCS_DELIMITER), - continuation_token, - page_size, - io_stats.as_ref(), - ) - .await?; - - // If no items were obtained, then this is actually a file and we perform a second ls to obtain just the file's - // details as the one-and-only-one entry - if forced_directory_ls_result.files.is_empty() { - let mut file_result = self - ._ls_impl( - client, - bucket, - key, - Some(GCS_DELIMITER), - continuation_token, - page_size, - io_stats.as_ref(), - ) - .await?; - - // Only retain exact matches (since the API does prefix lists by default) - let target_path = format!("{GCS_SCHEME}://{bucket}/{key}"); - file_result.files.retain(|fm| fm.filepath == target_path); - - // Not dir and not file, so it is missing - if file_result.files.is_empty() { - return Err(Error::NotFound { - path: path.to_string(), - } - .into()); - } - - Ok(file_result) - } else { - Ok(forced_directory_ls_result) - } - } else { - self._ls_impl( + let client = &self.0; + + if posix { + // Attempt to forcefully ls the key as a directory (by ensuring a "/" suffix) + let forced_directory_key = if key.is_empty() { + "".to_string() + } else { + format!("{}{GCS_DELIMITER}", key.trim_end_matches(GCS_DELIMITER)) + }; + let forced_directory_ls_result = self + ._ls_impl( + client, + bucket, + forced_directory_key.as_str(), + Some(GCS_DELIMITER), + continuation_token, + page_size, + io_stats.as_ref(), + ) + .await?; + + // If no items were obtained, then this is actually a file and we perform a second ls to obtain just the file's + // details as the one-and-only-one entry + if forced_directory_ls_result.files.is_empty() { + let mut file_result = self + ._ls_impl( client, bucket, key, - None, // Force a prefix-listing + Some(GCS_DELIMITER), continuation_token, page_size, io_stats.as_ref(), ) - .await + .await?; + + // Only retain exact matches (since the API does prefix lists by default) + let target_path = format!("{GCS_SCHEME}://{bucket}/{key}"); + file_result.files.retain(|fm| fm.filepath == target_path); + + // Not dir and not file, so it is missing + if file_result.files.is_empty() { + return Err(Error::NotFound { + path: path.to_string(), + } + .into()); } + + Ok(file_result) + } else { + Ok(forced_directory_ls_result) } - GCSClientWrapper::S3Compat(client) => { - client - .ls(path, posix, continuation_token, page_size, io_stats) - .await - } + } else { + self._ls_impl( + client, + bucket, + key, + None, // Force a prefix-listing + continuation_token, + page_size, + io_stats.as_ref(), + ) + .await } } } @@ -350,40 +325,29 @@ pub(crate) struct GCSSource { } impl GCSSource { - async fn build_s3_compat_client() -> super::Result> { - let s3_config = common_io_config::S3Config { - anonymous: true, - endpoint_url: Some("https://storage.googleapis.com".to_string()), - ..Default::default() - }; - let s3_client = s3_like::S3LikeSource::get_client(&s3_config).await?; - Ok(GCSSource { - client: GCSClientWrapper::S3Compat(s3_client), - } - .into()) - } pub async fn get_client(config: &GCSConfig) -> super::Result> { - if config.anonymous { - GCSSource::build_s3_compat_client().await - } else { - let config = ClientConfig::default() + let config = if !config.anonymous { + let attempted = ClientConfig::default() .with_auth() .await .context(UnableToLoadCredentialsSnafu {}); - match config { - Ok(config) => { - let client = Client::new(config); - Ok(GCSSource { - client: GCSClientWrapper::Native(client), - } - .into()) - } + + match attempted { + Ok(attempt) => attempt, Err(err) => { log::warn!("Google Cloud Storage Credentials not provided or found when making client. Reverting to Anonymous mode.\nDetails\n{err}"); - GCSSource::build_s3_compat_client().await + ClientConfig::default().anonymous() } } + } else { + ClientConfig::default().anonymous() + }; + + let client = Client::new(config); + Ok(GCSSource { + client: GCSClientWrapper(client), } + .into()) } }