Distributed computing with Ray #1

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open · wants to merge 10 commits into `main`

3 changes: 3 additions & 0 deletions .gitignore
@@ -3,6 +3,9 @@ __pycache__/
*.py[cod]
*$py.class


data

# C extensions
*.so

2 changes: 1 addition & 1 deletion README.md
@@ -6,7 +6,7 @@


A repository showcasing examples of using [Hub](https://github.com/activeloopai/Hub)
- [Examples go here](example)
- [Distributed processing with Ray](examples/ray)


## Getting Started with Hub 🚀
48 changes: 48 additions & 0 deletions examples/ray/README.md
@@ -0,0 +1,48 @@
# Distributed Processing with Ray


## Execute locally
Install Hub and Ray, then run the script locally to create a dataset with `num_samples` samples:
```
pip3 install -r requirements.txt
python3 transform.py --num_workers 2 --ds_out ./tmp/cars --num_samples 1000
```
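
To sanity-check the result, the dataset can be loaded back and inspected. A minimal sketch, assuming the `hub.load` API of the pinned Hub branch:
```
import hub

# Load the dataset written by transform.py and inspect one sample
ds = hub.load("./tmp/cars")
print(len(ds), ds.images[0].numpy().shape, ds.labels[0].numpy())
```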


## Execute on a cluster
#### 1. Start the cluster
This step requires AWS credentials. If you skip it, the script will start a local Ray cluster on your machine instead. You can further customize the cluster in `cluster.yaml`.
```
ray up ./cluster.yaml
```

#### 2. Execute the code (the dataset is created from the head node)
```
ray exec ./cluster.yaml "python3 ~/hub/transform.py --num_workers 2 --ds_out s3://bucket/dataset_name --num_samples 1000"
```
Once all worker nodes have joined the cluster, increase the number of workers to 6.
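For example, the same command as above with only `--num_workers` changed:
```
ray exec ./cluster.yaml "python3 ~/hub/transform.py --num_workers 6 --ds_out s3://bucket/dataset_name --num_samples 1000"
```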


#### 3. Shut down the cluster
```
ray down ./cluster.yaml
```

## Notes

* To monitor workers, resource utilization, and jobs, use:
```
ray dashboard ./cluster.yaml
```

* Update the code locally and sync it to the cluster (for syncing a single file, see the last note below)

```
ray rsync-up ./cluster.yaml
```

* Attach to the head node and execute the script there
```
ray attach ./cluster.yaml
> python3 ~/hub/transform.py --num_workers 2
```
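
* To sync a single file instead of all `file_mounts` entries, `ray rsync-up` also accepts explicit source and target paths (a sketch; assumes the optional source/target arguments of `ray rsync-up` are available in this Ray version)
```
ray rsync-up ./cluster.yaml ./transform.py '~/hub/transform.py'
```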
171 changes: 171 additions & 0 deletions examples/ray/cluster.yaml
@@ -0,0 +1,171 @@

# A unique identifier for the head node and workers of this cluster.
cluster_name: hub

# The maximum number of worker nodes to launch in addition to the head node.
max_workers: 2

# The autoscaler will scale up the cluster faster with higher upscaling speed.
# E.g., if the task requires adding more nodes, the autoscaler will gradually
# scale up the cluster in chunks of upscaling_speed*currently_running_nodes.
# This number should be > 0.
upscaling_speed: 1.0

# This executes all commands on all nodes in the docker container,
# and opens all the necessary ports to support the Ray cluster.
# Empty string means disabled.
# docker:
# image: "rayproject/ray-ml:latest-gpu" # You can change this to latest-cpu if you don't need GPU support and want a faster startup
# image: rayproject/ray:latest-gpu # use this one if you don't need ML dependencies, it's faster to pull
# container_name: "ray_container"
# If true, pulls latest version of image. Otherwise, `docker run` will only pull the image
# if no cached version is present.
# pull_before_run: True
# run_options: # Extra options to pass into "docker run"
# - --ulimit nofile=65536:65536

# Example of running a GPU head with CPU workers
# head_image: "rayproject/ray-ml:latest-gpu"
# Allow Ray to automatically detect GPUs

# worker_image: "rayproject/ray-ml:latest-cpu"
# worker_run_options: []

# If a node is idle for this many minutes, it will be removed.
idle_timeout_minutes: 5

# Cloud-provider specific configuration.
provider:
type: aws
region: us-west-2
# Availability zone(s), comma-separated, that nodes may be launched in.
# Nodes are currently spread between zones by a round-robin approach,
# however this implementation detail should not be relied upon.
# availability_zone: us-west-2a,us-west-2b
# Whether to allow node reuse. If set to False, nodes will be terminated
# instead of stopped.
cache_stopped_nodes: True # If not present, the default is True.

# How Ray will authenticate with newly launched nodes.
auth:
ssh_user: ubuntu


# By default Ray creates a new private keypair, but you can also use your own.
# If you do so, make sure to also set "KeyName" in the head and worker node
# configurations below.
# ssh_private_key: /path/to/your/key.pem

# Tell the autoscaler the allowed node types and the resources they provide.
# The key is the name of the node type, which is just for debugging purposes.
# The node config specifies the launch config and physical instance type.
available_node_types:
ray.head.default:
# The node type's CPU and GPU resources are auto-detected based on AWS instance type.
# If desired, you can override the autodetected CPU and GPU resources advertised to the autoscaler.
# You can also set custom resources.
# For example, to mark a node type as having 1 CPU, 1 GPU, and 5 units of a resource called "custom", set
# resources: {"CPU": 1, "GPU": 1, "custom": 5}
resources: {}
# Provider-specific config for this node type, e.g. instance type. By default
# Ray will auto-configure unspecified fields such as SubnetId and KeyName.
# For more documentation on available fields, see:
# http://boto3.readthedocs.io/en/latest/reference/services/ec2.html#EC2.ServiceResource.create_instances
node_config:
InstanceType: m5.large
ImageId: ami-0a2363a9cff180a64 # Deep Learning AMI (Ubuntu) Version 30
# You can provision additional disk space with a conf as follows
BlockDeviceMappings:
- DeviceName: /dev/sda1
Ebs:
VolumeSize: 100
# Additional options in the boto docs.
ray.worker.default:
# The minimum number of worker nodes of this type to launch.
# This number should be >= 0.
min_workers: 2
# The maximum number of worker nodes of this type to launch.
# This takes precedence over min_workers.
max_workers: 2
# The node type's CPU and GPU resources are auto-detected based on AWS instance type.
# If desired, you can override the autodetected CPU and GPU resources advertised to the autoscaler.
# You can also set custom resources.
# For example, to mark a node type as having 1 CPU, 1 GPU, and 5 units of a resource called "custom", set
# resources: {"CPU": 1, "GPU": 1, "custom": 5}
resources: {}
# Provider-specific config for this node type, e.g. instance type. By default
# Ray will auto-configure unspecified fields such as SubnetId and KeyName.
# For more documentation on available fields, see:
# http://boto3.readthedocs.io/en/latest/reference/services/ec2.html#EC2.ServiceResource.create_instances
node_config:
InstanceType: m5.large
ImageId: ami-0a2363a9cff180a64 # Deep Learning AMI (Ubuntu) Version 30
# Run workers on spot by default. Comment this out to use on-demand.
#InstanceMarketOptions:
# MarketType: spot
# Additional options can be found in the boto docs, e.g.
# SpotOptions:
# MaxPrice: MAX_HOURLY_PRICE
# Additional options in the boto docs.


# Specify the node type of the head node (as configured above).
head_node_type: ray.head.default

# Files or directories to copy to the head and worker nodes. The format is a
# dictionary from REMOTE_PATH: LOCAL_PATH, e.g.
file_mounts: {
"~/hub": ".",
}

# Files or directories to copy from the head node to the worker nodes. The format is a
# list of paths. The same path on the head node will be copied to the worker node.
# This behavior is a subset of the file_mounts behavior. In the vast majority of cases
# you should just use file_mounts. Only use this if you know what you're doing!
cluster_synced_files: []

# Whether changes to directories in file_mounts or cluster_synced_files in the head node
# should sync to the worker node continuously
file_mounts_sync_continuously: True

# Patterns for files to exclude when running rsync up or rsync down
rsync_exclude:
- "**/.git"
- "**/.git/**"

# Pattern files to use for filtering out files when running rsync up or rsync down. The file is searched for
# in the source directory and recursively through all subdirectories. For example, if .gitignore is provided
# as a value, the behavior will match git's behavior for finding and using .gitignore files.
rsync_filter:
- ".gitignore"

# List of commands that will be run before `setup_commands`. If docker is
# enabled, these commands will run outside the container and before docker
# is setup.
initialization_commands: []

# List of shell commands to run to set up nodes.
setup_commands:
- cd ~/hub && pip3 install -r requirements.txt

# Custom commands that will be run on the head node after common setup.
head_setup_commands: []

# Custom commands that will be run on worker nodes after common setup.
worker_setup_commands: []

# Command to start ray on the head node. You don't need to change this.
head_start_ray_commands:
- ray stop
- ray start --head --port=6379 --object-manager-port=8076 --autoscaling-config=~/ray_bootstrap_config.yaml

# Command to start ray on worker nodes. You don't need to change this.
worker_start_ray_commands:
- ray stop
- ray start --address=$RAY_HEAD_IP:6379 --object-manager-port=8076

head_node: {}
worker_nodes: {}
2 changes: 2 additions & 0 deletions examples/ray/requirements.txt
@@ -0,0 +1,2 @@
ray[default]==1.6
git+https://github.com/activeloopai/Hub.git@ray_compute
87 changes: 87 additions & 0 deletions examples/ray/transform.py
@@ -0,0 +1,87 @@
import time
import argparse
import tqdm
import numpy as np
from PIL import Image

import hub

NUM_SAMPLES = 500
NUM_WORKERS = 2
DS_OUT_PATH = "~/data/cars_out" # "s3://bucket/dataset_name"

parser = argparse.ArgumentParser(description="Hub dataset creation with Ray")
parser.add_argument(
"--num_samples",
type=int,
default=NUM_SAMPLES,
metavar="S",
help="how many samples dataset should have",
)
parser.add_argument(
"--ds_out",
type=str,
default=DS_OUT_PATH,
metavar="O",
help="dataset path to be transformed into",
)
parser.add_argument(
"--num_workers",
type=int,
default=NUM_WORKERS,
metavar="O",
help="number of workers to allocate",
)

args = parser.parse_args()


def define_dataset(path: str) -> hub.Dataset:
"""Define the dataset"""
ds = hub.empty(path, overwrite=True)

ds.create_tensor("labels", htype="class_label")
ds.create_tensor("images", htype="image", sample_compression="jpeg")
ds.create_tensor("images_downsampled")

# Define tensor with customized htype, compression and chunk_size
ds["images_downsampled"].meta.htype = ds["images"].meta.htype
ds["images_downsampled"].meta.sample_compression = ds[
"images"
].meta.sample_compression
ds["images_downsampled"].meta.max_chunk_size = 1 * 1024 * 1024

return ds


# Define the remote compute
@hub.compute
def downsample(index, samples_out):
"""Takes image from a sample_in, downsamples it and pushes to a new tensor"""
array = (255 * np.random.random((100, 100, 3))).astype(np.uint8)
img = Image.fromarray(array)
max_d = max(img.size[0], img.size[1])
min_s = min(100, max_d)
ratio = max_d // min_s
img_downsampled = img.resize((img.size[0] // ratio, img.size[1] // ratio))
array_downsampled = np.array(img_downsampled)
samples_out.images.append(array)
samples_out.images_downsampled.append(array_downsampled)
samples_out.labels.append(index)


if __name__ == "__main__":

# Define a dataset and fill in random images
ds_out = define_dataset(args.ds_out)

# Run the distributed computation
t1 = time.time()
downsample().eval(
list(range(args.num_samples)),
ds_out,
num_workers=args.num_workers,
scheduler="ray",
)
t2 = time.time()
print(f"The processing took {t2-t1}")