Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

multiple gpu support in worker and loader #63

Merged
merged 16 commits into from
Jul 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .github/workflows/python-app.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ permissions:
jobs:
build:

runs-on: ubuntu-latest
runs-on: self-hosted

steps:
- uses: actions/checkout@v3
Expand All @@ -30,3 +30,6 @@ jobs:
- name: Test with pytest
run: |
cd tests && python pipe_test.py
- name: Test worker
run: |
cd tests && python test_worker.py
16 changes: 16 additions & 0 deletions docker/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
FROM nvidia/cuda:12.0.0-devel-ubuntu22.04
RUN apt-get update
RUN apt-get install -y curl libicu70 python3
# needed by opencv
RUN apt-get -y install ffmpeg libsm6 libxext6
RUN useradd -ms /bin/bash tester
USER tester
WORKDIR /tester
# BUG FIX: `RUN mkdir actions-runner && cd actions-runner` does not persist the
# `cd` across RUN steps, so the runner archive was downloaded and unpacked into
# /tester while the created directory stayed empty. WORKDIR both creates the
# directory and makes it the cwd for all following steps.
WORKDIR /tester/actions-runner
RUN curl -o actions-runner-linux-x64-2.317.0.tar.gz -L https://github.com/actions/runner/releases/download/v2.317.0/actions-runner-linux-x64-2.317.0.tar.gz
RUN echo "9e883d210df8c6028aff475475a457d380353f9d01877d51cc01a17b2a91161d actions-runner-linux-x64-2.317.0.tar.gz" | shasum -a 256 -c
RUN tar xzf ./actions-runner-linux-x64-2.317.0.tar.gz
# GITHUB TOKEN
# NOTE(review): a build ARG ends up in the image history; prefer registering at
# container start or a BuildKit secret for a real deployment.
ARG token=vasya
RUN ./config.sh --url https://github.com/singnet/metafusion --token $token
ENTRYPOINT ["/bin/sh"]
181 changes: 148 additions & 33 deletions multigen/loader.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
from typing import Type
from typing import Type, List
import random
import copy
from contextlib import nullcontext
import torch
import logging
import threading
import psutil
import sys
import diffusers

from diffusers import DiffusionPipeline, StableDiffusionControlNetPipeline, StableDiffusionXLControlNetPipeline
from diffusers.utils import is_accelerate_available
if is_accelerate_available():
Expand Down Expand Up @@ -33,39 +40,147 @@ class Loader:
class for loading diffusion pipelines from files.
"""
def __init__(self):
self._pipes = dict()

def load_pipeline(self, cls: Type[DiffusionPipeline], path, torch_dtype=torch.float16, device=torch.device('cuda' if torch.cuda.is_available() else 'cpu')
, **additional_args):
for key, pipe in self._pipes.items():
if key == path:
pipe = copy_pipe(pipe)
components = pipe.components
if issubclass(cls, StableDiffusionXLControlNetPipeline) or issubclass(cls, StableDiffusionControlNetPipeline):
# todo: keep controlnets in cache explicitly
if 'controlnet' in additional_args:
components.pop('controlnet')
return cls(**components, **additional_args)
# handling the case when the model in cache has controlnet in it
# but we don't need it
if 'controlnet' in components:
components.pop('controlnet')
return cls(**components, **additional_args)

if path.endswith('safetensors'):
result = cls.from_single_file(path, **additional_args)
else:
result = cls.from_pretrained(path, **additional_args)
result = result.to(torch_dtype).to(device)
self.register_pipeline(result, path)
result = copy_pipe(result)
return result
self._lock = threading.RLock()
self._cpu_pipes = dict()
# idx -> list of (model_id, pipe) pairs
self._gpu_pipes = dict()

def get_gpu(self, model_id) -> List[int]:
    """
    Return the indices of all gpus whose cache currently holds *model_id*.

    Thread-safe: reads the gpu cache under the loader's lock.
    """
    with self._lock:
        return [gpu_idx
                for gpu_idx, cached in self._gpu_pipes.items()
                for (cached_id, _) in cached
                if cached_id == model_id]

def load_pipeline(self, cls: Type[DiffusionPipeline], path, torch_dtype=torch.float16, device=None,
                  **additional_args):
    """
    Load (or fetch from cache) a pipeline of class *cls* stored at *path*.

    Lookup order: per-gpu cache for the requested device, then the cpu cache
    (deep-copied before use), then a fresh load from disk/hub.  The freshly
    prepared pipeline is cached and a shallow copy is returned so callers can
    mutate schedulers etc. without affecting the cached instance.

    :param cls: pipeline class to instantiate.
    :param path: model path; '...safetensors' goes through from_single_file,
        anything else through from_pretrained.
    :param torch_dtype: dtype the returned pipeline is converted to.
    :param device: target device; defaults to cuda when available, else cpu.
    :param additional_args: forwarded to the pipeline constructor/loader.
    :return: pipeline instance living on *device*.
    """
    with self._lock:
        logger.debug(f'looking for pipeline {cls} from {path} on {device}')
        result = None
        if device is None:
            device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        if device.type == 'cuda':
            idx = device.index
            gpu_pipes = self._gpu_pipes.get(idx, [])
            for (key, value) in gpu_pipes:
                if key == path:
                    logger.debug(f'found pipe in gpu cache {key}')
                    # gpu hit: rewrap the cached pipe, no device move needed
                    result = self.from_pipe(cls, value, additional_args)
                    logger.debug(f'created pipe from gpu cache {key} on {device}')
                    return result
        for (key, pipe) in self._cpu_pipes.items():
            if key == path:
                logger.debug(f'found pipe in cpu cache {key}')
                # deepcopy so the cached cpu weights are not moved to gpu below
                result = self.from_pipe(cls, copy.deepcopy(pipe), additional_args)
                break
        if result is None:
            logger.info(f'not found {path} in cache, loading')
            if path.endswith('safetensors'):
                result = cls.from_single_file(path, **additional_args)
            else:
                result = cls.from_pretrained(path, **additional_args)
        if device.type == 'cuda':
            # make room on the target gpu before moving the new pipeline there
            self.clear_cache(device)
        result = result.to(dtype=torch_dtype, device=device)
        self.cache_pipeline(result, path)
        result = copy_pipe(result)
        assert result.device == device
        logger.debug(f'returning {type(result)} from {path} on {result.device}')
        return result

def from_pipe(self, cls, pipe, additional_args):
    """
    Build a *cls* pipeline that reuses the components of an existing *pipe*.

    The source pipe is shallow-copied first so the cached instance is not
    mutated.  Controlnet handling: when the target class is a controlnet
    pipeline and the caller supplies its own 'controlnet' in
    *additional_args*, any cached controlnet component is dropped so the two
    don't collide in the constructor; when the target is NOT a controlnet
    pipeline, a cached controlnet component is dropped as unneeded.

    :param cls: target DiffusionPipeline subclass.
    :param pipe: source pipeline whose components are reused.
    :param additional_args: extra kwargs for the *cls* constructor.
    """
    pipe = copy_pipe(pipe)
    components = pipe.components
    if issubclass(cls, StableDiffusionXLControlNetPipeline) or issubclass(cls, StableDiffusionControlNetPipeline):
        # todo: keep controlnets in cache explicitly
        # NOTE(review): if cls is a controlnet pipeline and neither components
        # nor additional_args provide a controlnet, the constructor will fail —
        # presumably callers always pass one; confirm.
        if 'controlnet' in additional_args:
            components.pop('controlnet')
        return cls(**components, **additional_args)
    # handling the case when the model in cache has controlnet in it
    # but we don't need it
    if 'controlnet' in components:
        components.pop('controlnet')
    return cls(**components, **additional_args)

def register_pipeline(self, pipe: DiffusionPipeline, model_id):
self._pipes[model_id] = pipe
def cache_pipeline(self, pipe: DiffusionPipeline, model_id):
    """
    Store *pipe* in the cpu cache (and, for cuda pipes, the per-gpu cache).

    The cpu copy is deep-copied because Module.to is an in-place operation;
    the pipe is moved back to its original device afterwards.

    Bug fix: eviction previously called random.choice on the cache keys even
    when the cache was empty (IndexError), and only ever evicted a single
    entry no matter how much RAM was missing.  Now entries are evicted in a
    loop, re-checking available RAM, and only while the cache is non-empty.
    """
    with self._lock:
        device = pipe.device
        if model_id not in self._cpu_pipes:
            # deepcopy is needed since Module.to is an inplace operation
            size = get_model_size(pipe)
            # evict random cached pipelines until there is ~3x headroom
            while self._cpu_pipes and awailable_ram() < size * 3:
                key_to_delete = random.choice(list(self._cpu_pipes.keys()))
                self._cpu_pipes.pop(key_to_delete)
            self._cpu_pipes[model_id] = copy.deepcopy(pipe.to('cpu'))
            pipe.to(device)
        if pipe.device.type == 'cuda':
            self._store_gpu_pipe(pipe, model_id)
            logger.debug(f'storing {model_id} on {pipe.device}')

def clear_cache(self, device):
    """Drop every pipeline cached for *device* (only cuda devices are tracked)."""
    logger.debug(f'clear_cache pipelines from {device}')
    with self._lock:
        if device.type != 'cuda':
            return
        self._gpu_pipes[device.index] = []

def _store_gpu_pipe(self, pipe, model_id):
idx = pipe.device.index
# for now just clear all other pipelines
self._gpu_pipes[idx] = [(model_id, pipe)]

def remove_pipeline(self, model_id):
    # NOTE(review): this span is a diff artifact from the scraped PR page —
    # the first pop targets the old `_pipes` attribute (removed line), the
    # second the new `_cpu_pipes` (added line); only the `_cpu_pipes` pop
    # exists in the merged code.  Also note pop raises KeyError when
    # *model_id* is not cached — confirm callers expect that.
    self._pipes.pop(model_id)
    self._cpu_pipes.pop(model_id)

def get_pipeline(self, model_id, device=None):
    """
    Return the cached pipeline for *model_id*, or None on a cache miss.

    The per-gpu cache for *device* is consulted first (cuda devices only),
    then the cpu cache.  *device* defaults to cuda when available, else cpu.
    """
    with self._lock:
        if device is None:
            device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        if device.type == 'cuda':
            for (cached_id, cached_pipe) in self._gpu_pipes.get(device.index, ()):
                if cached_id == model_id:
                    return cached_pipe
        for (cached_id, cached_pipe) in self._cpu_pipes.items():
            if cached_id == model_id:
                return cached_pipe
        return None


def count_params(model):
    """
    Return the approximate memory footprint of *model*'s parameters in bytes.

    Sizes each parameter as numel * element_size, so mixed-precision models
    and dtypes beyond float16/float32 (bfloat16, float64, ...) are handled
    correctly.  The previous implementation read a `model.dtype` attribute
    that plain nn.Modules don't define and defaulted every unknown dtype to
    2 bytes per element.
    """
    return sum(param.numel() * param.element_size() for param in model.parameters())


def get_size(obj):
    """Shallow size of *obj* in bytes (sys.getsizeof; contained objects not followed)."""
    return sys.getsizeof(obj)


def get_model_size(pipeline):
    """
    Estimate the total size of *pipeline*'s components in MiB.

    nn.Module components are sized by their parameter bytes (count_params);
    everything else falls back to a shallow sys.getsizeof via get_size.
    """
    total_size = 0
    for name, component in pipeline.components.items():
        if isinstance(component, torch.nn.Module):
            total_size += count_params(component)
        elif hasattr(component, 'tokenizer'):
            # NOTE(review): count_params iterates .parameters(), which a
            # tokenizer normally doesn't have — confirm this branch is
            # reachable and returns what's intended.
            total_size += count_params(component.tokenizer)
        else:
            total_size += get_size(component)
    # bytes -> MiB; callers (cache_pipeline) compare against awailable_ram(),
    # which is also in MiB
    return total_size / (1024 * 1024)


def get_pipeline(self, model_id):
return self._pipes.get(model_id, None)
def awailable_ram():
    """Currently available system RAM in MiB (name keeps the project's spelling)."""
    return psutil.virtual_memory().available / (1024 * 1024)
26 changes: 26 additions & 0 deletions multigen/log.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import logging
import threading


def thread_id_filter(record):
    """Inject the calling thread's id into *record* as ``thread_id``.

    Returns the record itself (truthy), so the record always passes when this
    is installed as a logging filter.
    """
    setattr(record, 'thread_id', threading.get_ident())
    return record

def setup_logger(path='log_file.log'):
    """
    Configure the root logger: DEBUG records go to the file at *path*,
    INFO and above also go to the console.

    Bug fix: the file handler previously hard-coded 'log_file.log' and
    silently ignored the *path* argument.

    :param path: destination of the log file.
    """
    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    fh = logging.FileHandler(path)  # was hard-coded to 'log_file.log'
    fh.setLevel(logging.DEBUG)
    ch = logging.StreamHandler()
    ch.setLevel(logging.INFO)

    # NOTE: the format below uses the built-in %(thread)d field, so the
    # injected thread_id attribute is currently redundant — kept for
    # compatibility with formats that reference %(thread_id)d.
    ch.addFilter(thread_id_filter)
    fh.addFilter(thread_id_filter)
    formatter = logging.Formatter('%(asctime)s - %(thread)d - %(levelname)s - %(message)s')
    fh.setFormatter(formatter)
    ch.setFormatter(formatter)

    logger.addHandler(fh)
    logger.addHandler(ch)

10 changes: 6 additions & 4 deletions multigen/pipes.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ class BasePipe:
def __init__(self, model_id: str,
sd_pipe_class: Optional[Type[DiffusionPipeline]] = None,
pipe: Optional[DiffusionPipeline] = None,
model_type: Optional[ModelType] = None, lpw=False, **args):
model_type: Optional[ModelType] = None, device=None, lpw=False, **args):
"""
Constructor

Expand All @@ -86,7 +86,8 @@ def __init__(self, model_id: str,
**args:
additional arguments passed to sd_pipe_class constructor
"""
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
if device is None:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
self.pipe = pipe
self._scheduler = None
self._hypernets = []
Expand Down Expand Up @@ -125,8 +126,9 @@ def _get_model_type(self):
raise RuntimeError("unsuported model type {self.pipe.__class__}")

def _initialize_pipe(self, device):
if self.pipe.device != device:
self.pipe.to(device)
# sometimes text encoder is on a different device
# if self.pipe.device != device:
self.pipe.to(device)
# self.pipe.enable_attention_slicing()
# self.pipe.enable_vae_slicing()
self.pipe.vae.enable_tiling()
Expand Down
Loading
Loading