From 58187326623763c63943377f9ec55cacb0e7e9c8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Thu, 28 Nov 2024 11:09:00 +0100 Subject: [PATCH] [Minor] Use std::thread::available_parallelism instead of `num_cpus` (#13579) * Use std::thread::available_parallelism * Use std::thread::available_parallelism * Use std::thread::available_parallelism * Use std::thread::available_parallelism * Use std::thread::available_parallelism * Use std::thread::available_parallelism --- Cargo.toml | 1 - benchmarks/Cargo.toml | 1 - benchmarks/src/bin/external_aggr.rs | 8 +++++++- benchmarks/src/bin/h2o.rs | 6 +++++- benchmarks/src/imdb/run.rs | 8 +++++++- benchmarks/src/sort.rs | 8 +++++++- benchmarks/src/sort_tpch.rs | 8 +++++++- benchmarks/src/tpch/run.rs | 8 +++++++- benchmarks/src/util/options.rs | 10 +++++++++- benchmarks/src/util/run.rs | 6 +++++- datafusion-cli/Cargo.lock | 12 ------------ datafusion/common/Cargo.toml | 1 - datafusion/common/src/config.rs | 6 ++++-- datafusion/core/Cargo.toml | 1 - .../aggregation_fuzzer/context_generator.rs | 6 ++++-- datafusion/core/tests/sql/mod.rs | 9 ++++++++- datafusion/sqllogictest/Cargo.toml | 1 - datafusion/sqllogictest/bin/sqllogictests.rs | 8 +++++++- 18 files changed, 77 insertions(+), 31 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 93af308ff041..76bc50d59a08 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -129,7 +129,6 @@ hashbrown = { version = "0.14.5", features = ["raw"] } indexmap = "2.0.0" itertools = "0.13" log = "^0.4" -num_cpus = "1.13.0" object_store = { version = "0.11.0", default-features = false } parking_lot = "0.12" parquet = { version = "53.3.0", default-features = false, features = [ diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml index 7f29f7471b6f..ad8debaf2fa3 100644 --- a/benchmarks/Cargo.toml +++ b/benchmarks/Cargo.toml @@ -42,7 +42,6 @@ env_logger = { workspace = true } futures = { workspace = true } log = { workspace = true } mimalloc = { version = "0.1", optional = true, default-features = false } -num_cpus = { workspace = true } parquet = { workspace = true, default-features = true } serde = { version = "1.0.136", features = ["derive"] } serde_json = { workspace = true } diff --git a/benchmarks/src/bin/external_aggr.rs b/benchmarks/src/bin/external_aggr.rs index 6438593a20a0..8e51a0787fc6 100644 --- a/benchmarks/src/bin/external_aggr.rs +++ b/benchmarks/src/bin/external_aggr.rs @@ -18,9 +18,11 @@ //! 
external_aggr binary entrypoint use std::collections::HashMap; +use std::num::NonZero; use std::path::PathBuf; use std::sync::Arc; use std::sync::OnceLock; +use std::thread::available_parallelism; use structopt::StructOpt; use arrow::record_batch::RecordBatch; @@ -325,7 +327,11 @@ impl ExternalAggrConfig { } fn partitions(&self) -> usize { - self.common.partitions.unwrap_or(num_cpus::get()) + self.common.partitions.unwrap_or( + available_parallelism() + .unwrap_or(NonZero::new(1).unwrap()) + .get(), + ) } /// Parse memory limit from string to number of bytes diff --git a/benchmarks/src/bin/h2o.rs b/benchmarks/src/bin/h2o.rs index 1ddeb786a591..a91af463fa0f 100644 --- a/benchmarks/src/bin/h2o.rs +++ b/benchmarks/src/bin/h2o.rs @@ -27,8 +27,10 @@ use datafusion::datasource::MemTable; use datafusion::prelude::CsvReadOptions; use datafusion::{arrow::util::pretty, error::Result, prelude::SessionContext}; use datafusion_benchmarks::util::BenchmarkRun; +use std::num::NonZero; use std::path::PathBuf; use std::sync::Arc; +use std::thread::available_parallelism; use structopt::StructOpt; use tokio::time::Instant; @@ -91,7 +93,9 @@ async fn group_by(opt: &GroupBy) -> Result<()> { .with_listing_options(ListingOptions::new(Arc::new(CsvFormat::default()))) .with_schema(Arc::new(schema)); let csv = ListingTable::try_new(listing_config)?; - let partition_size = num_cpus::get(); + let partition_size = available_parallelism() + .unwrap_or(NonZero::new(1).unwrap()) + .get(); let memtable = MemTable::load(Arc::new(csv), Some(partition_size), &ctx.state()).await?; ctx.register_table("x", Arc::new(memtable))?; diff --git a/benchmarks/src/imdb/run.rs b/benchmarks/src/imdb/run.rs index 47c356990881..3d85658bb6ff 100644 --- a/benchmarks/src/imdb/run.rs +++ b/benchmarks/src/imdb/run.rs @@ -15,8 +15,10 @@ // specific language governing permissions and limitations // under the License. +use std::num::NonZero; use std::path::PathBuf; use std::sync::Arc; +use std::thread::available_parallelism; use super::{get_imdb_table_schema, get_query_sql, IMDB_TABLES}; use crate::util::{BenchmarkRun, CommonOpt}; @@ -468,7 +470,11 @@ impl RunOpt { } fn partitions(&self) -> usize { - self.common.partitions.unwrap_or(num_cpus::get()) + self.common.partitions.unwrap_or( + available_parallelism() + .unwrap_or(NonZero::new(1).unwrap()) + .get(), + ) } } diff --git a/benchmarks/src/sort.rs b/benchmarks/src/sort.rs index f4b707611cfb..9bc3d82d34af 100644 --- a/benchmarks/src/sort.rs +++ b/benchmarks/src/sort.rs @@ -15,8 +15,10 @@ // specific language governing permissions and limitations // under the License. +use std::num::NonZero; use std::path::PathBuf; use std::sync::Arc; +use std::thread::available_parallelism; use crate::util::{AccessLogOpt, BenchmarkRun, CommonOpt}; @@ -147,7 +149,11 @@ impl RunOpt { rundata.start_new_case(title); for i in 0..self.common.iterations { let config = SessionConfig::new().with_target_partitions( - self.common.partitions.unwrap_or(num_cpus::get()), + self.common.partitions.unwrap_or( + available_parallelism() + .unwrap_or(NonZero::new(1).unwrap()) + .get(), + ), ); let ctx = SessionContext::new_with_config(config); let (rows, elapsed) = diff --git a/benchmarks/src/sort_tpch.rs b/benchmarks/src/sort_tpch.rs index 4b83b3b8889a..341474a39e11 100644 --- a/benchmarks/src/sort_tpch.rs +++ b/benchmarks/src/sort_tpch.rs @@ -22,8 +22,10 @@ //! runs end-to-end sort queries and test the performance on multiple CPU cores. 
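Note: every benchmark hunk above (external_aggr.rs, h2o.rs, imdb/run.rs, sort.rs) inlines the same fallback expression. If that duplication ever becomes a nuisance, it could live in one small shared helper; the sketch below is illustrative only, and the helper name num_cpus_or_one is made up rather than part of this patch.

    // Hypothetical shared helper (not in this patch): one place for the
    // "available parallelism, or 1 if it cannot be determined" fallback.
    use std::num::NonZero;
    use std::thread::available_parallelism;

    pub fn num_cpus_or_one() -> usize {
        available_parallelism()
            .unwrap_or(NonZero::new(1).unwrap())
            .get()
    }

Call sites such as self.common.partitions.unwrap_or(num_cpus_or_one()) would then shrink back to a single line.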
use futures::StreamExt; +use std::num::NonZero; use std::path::PathBuf; use std::sync::Arc; +use std::thread::available_parallelism; use structopt::StructOpt; use datafusion::datasource::file_format::parquet::ParquetFormat; @@ -315,6 +317,10 @@ impl RunOpt { } fn partitions(&self) -> usize { - self.common.partitions.unwrap_or(num_cpus::get()) + self.common.partitions.unwrap_or( + available_parallelism() + .unwrap_or(NonZero::new(1).unwrap()) + .get(), + ) } } diff --git a/benchmarks/src/tpch/run.rs b/benchmarks/src/tpch/run.rs index 9ff1f72d8606..f6f754caa14a 100644 --- a/benchmarks/src/tpch/run.rs +++ b/benchmarks/src/tpch/run.rs @@ -15,8 +15,10 @@ // specific language governing permissions and limitations // under the License. +use std::num::NonZero; use std::path::PathBuf; use std::sync::Arc; +use std::thread::available_parallelism; use super::{ get_query_sql, get_tbl_tpch_table_schema, get_tpch_table_schema, TPCH_TABLES, @@ -296,7 +298,11 @@ impl RunOpt { } fn partitions(&self) -> usize { - self.common.partitions.unwrap_or(num_cpus::get()) + self.common.partitions.unwrap_or( + available_parallelism() + .unwrap_or(NonZero::new(1).unwrap()) + .get(), + ) } } diff --git a/benchmarks/src/util/options.rs b/benchmarks/src/util/options.rs index b9398e5b522f..50b8ac42fa83 100644 --- a/benchmarks/src/util/options.rs +++ b/benchmarks/src/util/options.rs @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +use std::{num::NonZero, thread::available_parallelism}; + use datafusion::prelude::SessionConfig; use structopt::StructOpt; @@ -48,7 +50,13 @@ impl CommonOpt { /// Modify the existing config appropriately pub fn update_config(&self, config: SessionConfig) -> SessionConfig { config - .with_target_partitions(self.partitions.unwrap_or(num_cpus::get())) + .with_target_partitions( + self.partitions.unwrap_or( + available_parallelism() + .unwrap_or(NonZero::new(1).unwrap()) + .get(), + ), + ) .with_batch_size(self.batch_size) } } diff --git a/benchmarks/src/util/run.rs b/benchmarks/src/util/run.rs index 5ee6691576b4..fd081826f385 100644 --- a/benchmarks/src/util/run.rs +++ b/benchmarks/src/util/run.rs @@ -20,7 +20,9 @@ use serde::{Serialize, Serializer}; use serde_json::Value; use std::{ collections::HashMap, + num::NonZero, path::Path, + thread::available_parallelism, time::{Duration, SystemTime}, }; @@ -68,7 +70,9 @@ impl RunContext { Self { benchmark_version: env!("CARGO_PKG_VERSION").to_owned(), datafusion_version: DATAFUSION_VERSION.to_owned(), - num_cpus: num_cpus::get(), + num_cpus: available_parallelism() + .unwrap_or(NonZero::new(1).unwrap()) + .get(), start_time: SystemTime::now(), arguments: std::env::args().skip(1).collect::>(), } diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index d2a92fea311e..04b0b0d22cfd 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -1222,7 +1222,6 @@ dependencies = [ "itertools", "log", "num-traits", - "num_cpus", "object_store", "parking_lot", "parquet", @@ -1296,7 +1295,6 @@ dependencies = [ "hashbrown 0.14.5", "indexmap", "libc", - "num_cpus", "object_store", "parquet", "paste", @@ -2747,16 +2745,6 @@ dependencies = [ "libm", ] -[[package]] -name = "num_cpus" -version = "1.16.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43" -dependencies = [ - "hermit-abi", - "libc", -] - [[package]] name = "object" version = "0.36.5" diff --git a/datafusion/common/Cargo.toml 
b/datafusion/common/Cargo.toml index 9f2db95721f5..d76848dfe95e 100644 --- a/datafusion/common/Cargo.toml +++ b/datafusion/common/Cargo.toml @@ -58,7 +58,6 @@ half = { workspace = true } hashbrown = { workspace = true } indexmap = { workspace = true } libc = "0.2.140" -num_cpus = { workspace = true } object_store = { workspace = true, optional = true } parquet = { workspace = true, optional = true, default-features = true } paste = "1.0.15" diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 1ad10d164868..e91568075f43 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -20,7 +20,9 @@ use std::any::Any; use std::collections::{BTreeMap, HashMap}; use std::fmt::{self, Display}; +use std::num::NonZero; use std::str::FromStr; +use std::thread::available_parallelism; use crate::error::_config_err; use crate::parsers::CompressionTypeVariant; @@ -250,7 +252,7 @@ config_namespace! { /// concurrency. /// /// Defaults to the number of CPU cores on the system - pub target_partitions: usize, default = num_cpus::get() + pub target_partitions: usize, default = available_parallelism().unwrap_or(NonZero::new(1).unwrap()).get() /// The default time zone /// @@ -266,7 +268,7 @@ config_namespace! { /// This is mostly use to plan `UNION` children in parallel. /// /// Defaults to the number of CPU cores on the system - pub planning_concurrency: usize, default = num_cpus::get() + pub planning_concurrency: usize, default = available_parallelism().unwrap_or(NonZero::new(1).unwrap()).get() /// When set to true, skips verifying that the schema produced by /// planning the input of `LogicalPlan::Aggregate` exactly matches the diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index fc7b96cf9e13..268e0fb17f7b 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -116,7 +116,6 @@ glob = "0.3.0" itertools = { workspace = true } log = { workspace = true } num-traits = { version = "0.2", optional = true } -num_cpus = { workspace = true } object_store = { workspace = true } parking_lot = { workspace = true } parquet = { workspace = true, optional = true, default-features = true } diff --git a/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/context_generator.rs b/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/context_generator.rs index af454bee7ce8..33d13fc1b605 100644 --- a/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/context_generator.rs +++ b/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/context_generator.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use std::{cmp, sync::Arc}; +use std::{cmp, num::NonZero, sync::Arc, thread::available_parallelism}; use datafusion::{ datasource::MemTable, @@ -73,7 +73,9 @@ impl SessionContextGenerator { ]; let max_batch_size = cmp::max(1, dataset_ref.total_rows_num); - let max_target_partitions = num_cpus::get(); + let max_target_partitions = available_parallelism() + .unwrap_or(NonZero::new(1).unwrap()) + .get(); Self { dataset: dataset_ref, diff --git a/datafusion/core/tests/sql/mod.rs b/datafusion/core/tests/sql/mod.rs index 177427b47d21..c6902a11d8db 100644 --- a/datafusion/core/tests/sql/mod.rs +++ b/datafusion/core/tests/sql/mod.rs @@ -15,7 +15,9 @@ // specific language governing permissions and limitations // under the License. 
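Note on the config.rs defaults above: num_cpus::get() always returns at least 1, whereas std::thread::available_parallelism() is fallible and returns io::Result<NonZeroUsize>, so the new defaults fall back to 1 via unwrap_or(NonZero::new(1).unwrap()) when the parallelism cannot be determined. Explicitly configured values still take precedence over these defaults, as the benchmark code elsewhere in this patch relies on; a minimal sketch of that override path (the function name is illustrative, not part of the patch):

    // Sketch: an explicit setting still overrides the new default; the
    // available_parallelism()-based value only applies when nothing is set.
    use datafusion::prelude::{SessionConfig, SessionContext};

    fn context_with_fixed_partitions(n: usize) -> SessionContext {
        // n is expected to be non-zero
        let config = SessionConfig::new().with_target_partitions(n);
        SessionContext::new_with_config(config)
    }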
+use std::num::NonZero; use std::sync::Arc; +use std::thread::available_parallelism; use arrow::{ array::*, datatypes::*, record_batch::RecordBatch, @@ -259,7 +261,12 @@ impl ExplainNormalizer { // convert things like partitioning=RoundRobinBatch(16) // to partitioning=RoundRobinBatch(NUM_CORES) - let needle = format!("RoundRobinBatch({})", num_cpus::get()); + let needle = format!( + "RoundRobinBatch({})", + available_parallelism() + .unwrap_or(NonZero::new(1).unwrap()) + .get() + ); replacements.push((needle, "RoundRobinBatch(NUM_CORES)".to_string())); Self { replacements } diff --git a/datafusion/sqllogictest/Cargo.toml b/datafusion/sqllogictest/Cargo.toml index ed2b9c49715e..849003f8eeac 100644 --- a/datafusion/sqllogictest/Cargo.toml +++ b/datafusion/sqllogictest/Cargo.toml @@ -70,7 +70,6 @@ postgres = [ [dev-dependencies] env_logger = { workspace = true } -num_cpus = { workspace = true } tokio = { workspace = true, features = ["rt-multi-thread"] } [[test]] diff --git a/datafusion/sqllogictest/bin/sqllogictests.rs b/datafusion/sqllogictest/bin/sqllogictests.rs index c3e739d146c6..9f8aac85a03a 100644 --- a/datafusion/sqllogictest/bin/sqllogictests.rs +++ b/datafusion/sqllogictest/bin/sqllogictests.rs @@ -17,7 +17,9 @@ use std::ffi::OsStr; use std::fs; +use std::num::NonZero; use std::path::{Path, PathBuf}; +use std::thread::available_parallelism; use clap::Parser; use datafusion_sqllogictest::{DataFusion, TestContext}; @@ -112,7 +114,11 @@ async fn run_tests() -> Result<()> { .join() }) // run up to num_cpus streams in parallel - .buffer_unordered(num_cpus::get()) + .buffer_unordered( + available_parallelism() + .unwrap_or(NonZero::new(1).unwrap()) + .get(), + ) .flat_map(|result| { // Filter out any Ok() leaving only the DataFusionErrors futures::stream::iter(match result {
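Note: the buffer_unordered change above preserves the existing behaviour of running up to one sqllogictest stream per available core. For reference, a self-contained illustration of that pattern follows; it is only a sketch, not part of the patch, and assumes futures and tokio (with the macros and rt-multi-thread features) as dependencies, with squaring the numbers 1 through 8 standing in for the real test futures.

    // Standalone sketch: drive a stream of futures with at most
    // `available_parallelism()` of them in flight at once.
    use futures::StreamExt;
    use std::num::NonZero;
    use std::thread::available_parallelism;

    #[tokio::main]
    async fn main() {
        let concurrency = available_parallelism()
            .unwrap_or(NonZero::new(1).unwrap())
            .get();

        let results: Vec<u64> = futures::stream::iter(1u64..=8)
            .map(|n| async move { n * n }) // placeholder per-item work
            .buffer_unordered(concurrency) // bounded in flight, completes out of order
            .collect()
            .await;

        println!("{results:?}");
    }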