diff --git a/release/nightly_tests/dataset/multi_node_autoscaling_compute.yaml b/release/nightly_tests/dataset/autoscaling_cpu_compute.yaml
similarity index 95%
rename from release/nightly_tests/dataset/multi_node_autoscaling_compute.yaml
rename to release/nightly_tests/dataset/autoscaling_cpu_compute.yaml
index 7b3612d3b4b8..8bfcc772f1c5 100644
--- a/release/nightly_tests/dataset/multi_node_autoscaling_compute.yaml
+++ b/release/nightly_tests/dataset/autoscaling_cpu_compute.yaml
@@ -2,7 +2,7 @@
 cloud_id: {{env["ANYSCALE_CLOUD_ID"]}}
 region: us-west-2
 
-max_workers: 0
+max_workers: 10
 
 head_node_type:
   name: head_node
diff --git a/release/nightly_tests/dataset/autoscaling_gpu_compute.yaml b/release/nightly_tests/dataset/autoscaling_gpu_compute.yaml
new file mode 100644
index 000000000000..2f53d7ed7fab
--- /dev/null
+++ b/release/nightly_tests/dataset/autoscaling_gpu_compute.yaml
@@ -0,0 +1,19 @@
+# This config matches the default config for Anyscale workspaces with autoscaling,
+# except that it uses GPU instances instead of CPU instances.
+cloud_id: {{env["ANYSCALE_CLOUD_ID"]}}
+region: us-west-2
+
+max_workers: 10
+
+head_node_type:
+  name: head_node
+  instance_type: m5.2xlarge
+  resources:
+    cpu: 0
+
+worker_node_types:
+  - name: worker_node
+    instance_type: g4dn.2xlarge
+    min_workers: 0
+    max_workers: 10
+    use_spot: false
diff --git a/release/nightly_tests/dataset/autoscaling_hetero_compute.yaml b/release/nightly_tests/dataset/autoscaling_hetero_compute.yaml
new file mode 100644
index 000000000000..e93a71d3027c
--- /dev/null
+++ b/release/nightly_tests/dataset/autoscaling_hetero_compute.yaml
@@ -0,0 +1,23 @@
+cloud_id: {{env["ANYSCALE_CLOUD_ID"]}}
+region: us-west-2
+
+max_workers: 20
+
+head_node_type:
+  name: head_node
+  instance_type: m5.2xlarge
+  resources:
+    cpu: 0
+
+worker_node_types:
+  - name: worker_node_gpu
+    instance_type: g4dn.2xlarge
+    min_workers: 0
+    max_workers: 10
+    use_spot: false
+
+  - name: worker_node_cpu
+    instance_type: m5.2xlarge
+    min_workers: 0
+    max_workers: 10
+    use_spot: false
diff --git a/release/nightly_tests/dataset/compute_hetero_10x10_aws.yaml b/release/nightly_tests/dataset/compute_hetero_10x10_aws.yaml
deleted file mode 100644
index 415636b3d9a8..000000000000
--- a/release/nightly_tests/dataset/compute_hetero_10x10_aws.yaml
+++ /dev/null
@@ -1,21 +0,0 @@
-cloud_id: {{env["ANYSCALE_CLOUD_ID"]}}
-region: us-west-2
-
-max_workers: 19
-
-head_node_type:
-  name: head_node
-  instance_type: g4dn.12xlarge
-
-worker_node_types:
-  - name: worker_node_gpu
-    instance_type: g4dn.12xlarge
-    max_workers: 9
-    min_workers: 9
-    use_spot: false
-
-  - name: worker_node_cpu
-    instance_type: m5.16xlarge
-    max_workers: 10
-    min_workers: 10
-    use_spot: false
\ No newline at end of file
diff --git a/release/nightly_tests/dataset/gpu_batch_inference.py b/release/nightly_tests/dataset/gpu_batch_inference.py
index d9ccc705e141..32308a24e1f7 100644
--- a/release/nightly_tests/dataset/gpu_batch_inference.py
+++ b/release/nightly_tests/dataset/gpu_batch_inference.py
@@ -88,16 +88,17 @@ def __call__(self, batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
     start_time_without_metadata_fetching = time.time()
 
     if smoke_test:
-        actor_pool_size = 4
+        compute = ActorPoolStrategy(size=4)
         num_gpus = 0
     else:
-        actor_pool_size = int(ray.cluster_resources().get("GPU"))
+        # Autoscale to use as many GPUs as possible.
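+        # ActorPoolStrategy(min_size=1, max_size=None) creates an autoscaling
+        # actor pool: one inference actor starts immediately, and Ray Data keeps
+        # requesting more GPU actors as the cluster autoscaler adds nodes.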
+        compute = ActorPoolStrategy(min_size=1, max_size=None)
         num_gpus = 1
 
     ds = ds.map_batches(preprocess)
     ds = ds.map_batches(
         Predictor,
         batch_size=BATCH_SIZE,
-        compute=ActorPoolStrategy(size=actor_pool_size),
+        compute=compute,
         num_gpus=num_gpus,
         fn_constructor_kwargs={"model": model_ref},
         max_concurrency=2,
diff --git a/release/release_data_tests.yaml b/release/release_data_tests.yaml
index 9ea55e27af8e..8bf6285d4108 100644
--- a/release/release_data_tests.yaml
+++ b/release/release_data_tests.yaml
@@ -7,8 +7,9 @@
   cluster:
     byod:
+      # 'type: gpu' means to use the 'ray-ml' image.
       type: gpu
-    cluster_compute: multi_node_autoscaling_compute.yaml
+    cluster_compute: autoscaling_cpu_compute.yaml
 
 ###############
 # Reading tests
 ###############
 
@@ -33,31 +34,6 @@
     timeout: 600
     script: python read_and_consume_benchmark.py s3://ray-benchmark-data/parquet/10TiB --format parquet --count
 
-- name: stable_diffusion_benchmark
-  group: data-tests
-  working_dir: nightly_tests/dataset
-
-  frequency: nightly
-  team: data
-
-  cluster:
-    byod:
-      type: gpu
-      post_build_script: byod_stable_diffusion.sh
-    cluster_compute: stable_diffusion_benchmark_compute.yaml
-
-  run:
-    timeout: 1800
-    script: python stable_diffusion_benchmark.py
-
-  variations:
-    - __suffix__: aws
-    - __suffix__: gce
-      env: gce
-      frequency: manual
-      cluster:
-        cluster_compute: stable_diffusion_benchmark_compute_gce.yaml
-
 - name: streaming_data_ingest_benchmark_1tb
   group: data-tests
   working_dir: nightly_tests/dataset
@@ -545,177 +521,49 @@
     cluster_compute: shuffle/datasets_large_scale_compute_small_instances_gce.yaml
 
-############################
-# Batch Inference Benchmarks
-############################
-
-# 10 GB image classification raw images with 1 GPU.
-# 1 g4dn.4xlarge
-- name: torch_batch_inference_1_gpu_10gb_raw
-  group: data-tests
-  working_dir: nightly_tests/dataset
-
-  frequency: nightly
-  team: data
-  cluster:
-    byod:
-      type: gpu
-    cluster_compute: compute_gpu_1_cpu_16_aws.yaml
-
-  run:
-    timeout: 500
-    script: python gpu_batch_inference.py --data-directory=10G-image-data-synthetic-raw --data-format raw
-
-  alert: default
-
-  variations:
-    - __suffix__: aws
-    - __suffix__: gce
-      env: gce
-      frequency: manual
-      cluster:
-        cluster_compute: compute_gpu_1_cpu_16_gce.yaml
-
-# 10 GB image classification parquet with 1 GPU.
-# 1 g4dn.4xlarge
-- name: torch_batch_inference_1_gpu_10gb_parquet
-  group: data-tests
-  working_dir: nightly_tests/dataset
-
-  frequency: nightly
-  team: data
-  cluster:
-    byod:
-      type: gpu
-    cluster_compute: compute_gpu_1_cpu_16_aws.yaml
-
-  run:
-    timeout: 500
-    script: python gpu_batch_inference.py --data-directory=10G-image-data-synthetic-raw-parquet --data-format parquet
-
-  alert: default
-
-  variations:
-    - __suffix__: aws
-    - __suffix__: gce
-      env: gce
-      frequency: manual
-      cluster:
-        cluster_compute: compute_gpu_1_cpu_16_gce.yaml
-
+#######################
+# Batch inference tests
+#######################
 
-# 300 GB image classification raw images with 16 GPUs
-# 4 g4dn.12xlarge
-- name: torch_batch_inference_16_gpu_300gb_raw
-  group: data-tests
-  working_dir: nightly_tests/dataset
+# 300 GB image classification parquet data with up to 10 GPUs.
+# Up to 10 g4dn.2xlarge instances (1 GPU each).
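+# The compute config starts from min_workers: 0, so this test also exercises
+# scale-up from a head-node-only cluster.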
+- name: batch_inference
 
-  frequency: nightly
-  team: data
   cluster:
-    byod:
-      type: gpu
-    cluster_compute: compute_gpu_4x4_aws.yaml
+    cluster_compute: autoscaling_gpu_compute.yaml
 
   run:
-    timeout: 1000
-    script: python gpu_batch_inference.py --data-directory 300G-image-data-synthetic-raw --data-format raw
-
-  wait_for_nodes:
-    num_nodes: 4
-
-  alert: default
-
-  variations:
-    - __suffix__: aws
-    - __suffix__: gce
-      env: gce
-      frequency: manual
-      cluster:
-        cluster_compute: compute_gpu_4x4_gce.yaml
-
+    timeout: 1800
+    script: >
+      python gpu_batch_inference.py
+      --data-directory 300G-image-data-synthetic-raw-parquet --data-format parquet
 
-- name: chaos_torch_batch_inference_16_gpu_300gb_raw
-  group: data-tests
+- name: batch_inference_chaos
+  stable: false
+  # Don't use 'nightly_tests/dataset' as the working directory, because this test
+  # also needs to run the 'setup_chaos.py' script.
   working_dir: nightly_tests
-  stable: false
 
-  frequency: nightly
-  team: data
   cluster:
-    byod:
-      type: gpu
-    cluster_compute: dataset/compute_gpu_4x4_aws.yaml
+    cluster_compute: dataset/autoscaling_gpu_compute.yaml
 
   run:
-    timeout: 1000
+    timeout: 1800
     prepare: python setup_chaos.py --max-to-kill 2 --kill-delay 30
-    script: python dataset/gpu_batch_inference.py --data-directory 300G-image-data-synthetic-raw --data-format raw
-
-  wait_for_nodes:
-    num_nodes: 4
-
-  alert: default
-
-  variations:
-    - __suffix__: aws
-    - __suffix__: gce
-      env: gce
-      frequency: manual
-      cluster:
-        cluster_compute: dataset/compute_gpu_4x4_gce.yaml
-
-
-# 300 GB image classification parquet data with 16 GPUs
-# 4 g4dn.12xlarge
-- name: torch_batch_inference_16_gpu_300gb_parquet
-  group: data-tests
-  working_dir: nightly_tests/dataset
-
-  frequency: nightly
-  team: data
-
-  cluster:
-    byod:
-      type: gpu
-    cluster_compute: compute_gpu_4x4_aws.yaml
-
-  run:
-    timeout: 1000
-    script: python gpu_batch_inference.py --data-directory 300G-image-data-synthetic-raw-parquet --data-format parquet
-
-  wait_for_nodes:
-    num_nodes: 4
-
-  alert: default
-
-  variations:
-    - __suffix__: aws
-    - __suffix__: gce
-      env: gce
-      frequency: manual
-      cluster:
-        cluster_compute: compute_gpu_4x4_gce.yaml
+    script: >
+      python dataset/gpu_batch_inference.py
+      --data-directory 300G-image-data-synthetic-raw-parquet --data-format parquet
 
-# 10 TB image classification parquet data with heterogenous cluster
-# 10 g4dn.12xlarge, 10 m5.16xlarge
+# 10 TB image classification parquet data with an autoscaling heterogeneous cluster.
+# Up to 10 g4dn.2xlarge and 10 m5.2xlarge instances.
-- name: torch_batch_inference_hetero_10tb_parquet
-  group: data-tests
-  working_dir: nightly_tests/dataset
-
+- name: batch_inference_hetero
   frequency: weekly
-  team: data
 
   cluster:
-    byod:
-      type: gpu
-    cluster_compute: compute_hetero_10x10_aws.yaml
+    cluster_compute: autoscaling_hetero_compute.yaml
 
   run:
-    timeout: 2000
-    script: python gpu_batch_inference.py --data-directory 10T-image-data-synthetic-raw-parquet --data-format parquet
-
-  wait_for_nodes:
-    num_nodes: 20
-
-  alert: default
+    timeout: 3600
+    script: >
+      python gpu_batch_inference.py
+      --data-directory 10T-image-data-synthetic-raw-parquet --data-format parquet
diff --git a/src/ray/util/shared_lru.h b/src/ray/util/shared_lru.h
index 8132e38b6f12..39c43231ac4d 100644
--- a/src/ray/util/shared_lru.h
+++ b/src/ray/util/shared_lru.h
@@ -27,11 +27,10 @@
 //   // Check and consume `val`.
 //
 // TODO(hjiang):
-// 1. Add template arguments for key hash and key equal, to pass into absl::flat_hash_map.
-// 2. Provide a key hash wrapper to save a copy.
-// 3. flat hash map supports heterogeneous lookup, expose `KeyLike` templated interface.
-// 4. Add a `GetOrCreate` interface, which takes factory function to creation value.
-// 5. For thread-safe cache, add a sharded container wrapper to reduce lock contention.
+// 1. Write a wrapper around KeyHash and KeyEq that takes std::reference_wrapper, so
+// keys are stored only in the std::list and referenced from the absl::flat_hash_map.
+// 2. Add a `GetOrCreate` interface, which takes a factory function to create the value.
+// 3. For the thread-safe cache, add a sharded container wrapper to reduce lock contention.
 
 #pragma once
 
@@ -39,7 +38,6 @@
 #include
 #include
 #include
-#include
 #include
 #include
 
@@ -90,7 +88,8 @@ class SharedLruCache final {
   // Delete the entry with key `key`. Return true if the entry was found for
   // `key`, false if the entry was not found. In both cases, no entry with
   // key `key` exists after the call.
-  bool Delete(const Key &key) {
+  template <typename KeyLike>
+  bool Delete(KeyLike &&key) {
     auto it = cache_.find(key);
     if (it == cache_.end()) {
       return false;
     }
@@ -100,8 +99,10 @@
     return true;
   }
 
-  // Look up the entry with key `key`. Return nullptr if key doesn't exist.
-  std::shared_ptr<Val> Get(const Key &key) {
+  // Look up the entry with key `key`. Return nullptr if the key doesn't exist;
+  // if found, return a copy of the value.
+  template <typename KeyLike>
+  std::shared_ptr<Val> Get(KeyLike &&key) {
     const auto cache_iter = cache_.find(key);
     if (cache_iter == cache_.end()) {
       return nullptr;
     }
@@ -173,16 +174,18 @@ class ThreadSafeSharedLruCache final {
   // Delete the entry with key `key`. Return true if the entry was found for
   // `key`, false if the entry was not found. In both cases, no entry with
   // key `key` exists after the call.
-  bool Delete(const Key &key) {
+  template <typename KeyLike>
+  bool Delete(KeyLike &&key) {
     std::lock_guard lck(mu_);
-    return cache_.Delete(key);
+    return cache_.Delete(std::forward<KeyLike>(key));
   }
 
-  // Look up the entry with key `key`. Return std::nullopt if key doesn't exist.
-  // If found, return a copy for the value.
-  std::shared_ptr<Val> Get(const Key &key) {
+  // Look up the entry with key `key`. Return nullptr if the key doesn't exist;
+  // if found, return a copy of the value.
+  template <typename KeyLike>
+  std::shared_ptr<Val> Get(KeyLike &&key) {
     std::lock_guard lck(mu_);
-    return cache_.Get(key);
+    return cache_.Get(std::forward<KeyLike>(key));
   }
 
   // Clear the cache.
diff --git a/src/ray/util/tests/shared_lru_test.cc b/src/ray/util/tests/shared_lru_test.cc
index 7c47f4d1daf0..673395a82f3b 100644
--- a/src/ray/util/tests/shared_lru_test.cc
+++ b/src/ray/util/tests/shared_lru_test.cc
@@ -23,6 +23,19 @@ namespace ray::utils::container {
 namespace {
 constexpr size_t kTestCacheSz = 1;
+
+class TestClassWithHashAndEq {
+ public:
+  explicit TestClassWithHashAndEq(std::string data) : data_(std::move(data)) {}
+  bool operator==(const TestClassWithHashAndEq &rhs) const { return data_ == rhs.data_; }
+  template <typename H>
+  friend H AbslHashValue(H h, const TestClassWithHashAndEq &obj) {
+    return H::combine(std::move(h), obj.data_);
+  }
+
+ private:
+  std::string data_;
+};
 }  // namespace
 
 TEST(SharedLruCache, PutAndGet) {
@@ -34,22 +47,21 @@ TEST(SharedLruCache, PutAndGet) {
 
   // Check put and get.
   cache.Put("1", std::make_shared<std::string>("1"));
-  val = cache.Get("1");
+  val = cache.Get(std::string_view{"1"});
   EXPECT_NE(val, nullptr);
   EXPECT_EQ(*val, "1");
 
   // Check key eviction.
   cache.Put("2", std::make_shared<std::string>("2"));
-  val = cache.Get("1");
+  val = cache.Get(std::string_view{"1"});
   EXPECT_EQ(val, nullptr);
-  val = cache.Get("2");
+  val = cache.Get(std::string_view{"2"});
   EXPECT_NE(val, nullptr);
   EXPECT_EQ(*val, "2");
 
   // Check deletion.
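+  // The cache capacity is kTestCacheSz (1), so the Put of "2" above evicted
+  // "1"; deleting "1" should report that no entry was found.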
-  EXPECT_FALSE(cache.Delete("1"));
-  EXPECT_TRUE(cache.Delete("2"));
-  val = cache.Get("2");
+  EXPECT_FALSE(cache.Delete(std::string_view{"1"}));
+  EXPECT_TRUE(cache.Delete(std::string_view{"2"}));
+  val = cache.Get(std::string_view{"2"});
   EXPECT_EQ(val, nullptr);
 }
 
@@ -73,4 +85,13 @@ TEST(SharedLruConstCache, TypeAliasAssertion) {
   std::is_same_v<SharedLruConstCache<std::string, std::string>, SharedLruCache<std::string, const std::string>>);
 }
 
+TEST(SharedLruConstCache, CustomizedKey) {
+  TestClassWithHashAndEq obj1{"hello"};
+  TestClassWithHashAndEq obj2{"hello"};
+  SharedLruCache<TestClassWithHashAndEq, std::string> cache{2};
+  cache.Put(obj1, std::make_shared<std::string>("val"));
+  auto val = cache.Get(obj2);
+  ASSERT_NE(val, nullptr);
+  EXPECT_EQ(*val, "val");
+}
+
 }  // namespace ray::utils::container
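For context, a minimal sketch of how the heterogeneous lookup added above is meant to be used, assuming the `ray::utils::container` namespace and the `SharedLruCache<Key, Val>` interface exercised by the tests; the standalone `main` wrapper and include path are illustrative, not part of the diff:

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <string_view>

#include "ray/util/shared_lru.h"

int main() {
  // A cache holding at most two entries, mapping std::string keys to
  // shared_ptr-owned std::string values (constructor arg as in the tests).
  ray::utils::container::SharedLruCache<std::string, std::string> cache{2};
  cache.Put("config", std::make_shared<std::string>("v1"));

  // Heterogeneous lookup: Get/Delete are templated on KeyLike, and
  // absl::flat_hash_map's default hash and equality are transparent for
  // string types, so probing with std::string_view builds no temporary
  // std::string.
  std::shared_ptr<std::string> val = cache.Get(std::string_view{"config"});
  assert(val != nullptr && *val == "v1");

  // Delete accepts the same KeyLike argument types.
  assert(cache.Delete(std::string_view{"config"}));
  assert(cache.Get(std::string_view{"config"}) == nullptr);
  return 0;
}
```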