try add redis worker & benchmark
Meteorix committed Jul 31, 2019
1 parent 45ae8dc commit f54830a
Showing 10 changed files with 97 additions and 81 deletions.
2 changes: 1 addition & 1 deletion example/benchmark.lua
@@ -1,3 +1,3 @@
wrk.method = "POST"
wrk.body = "s=happy birthday to"
wrk.body = "s=Happy birthday to"
wrk.headers["Content-Type"] = "application/x-www-form-urlencoded"
11 changes: 10 additions & 1 deletion example/bert_model.py
@@ -1,8 +1,8 @@
# coding=utf-8
# Created by Meteorix at 2019/7/30
import os
import torch
from pytorch_transformers import *
from service_streamer import ManagedModel


class Model(object):
@@ -51,6 +51,15 @@ def predict(self, batch):
return batch_outputs


class ManagedBertModel(ManagedModel):

def init_model(self):
self.model = Model()

def predict(self, batch):
return self.model.predict(batch)


if __name__ == "__main__":
import logging
logging.basicConfig(level=logging.INFO)
16 changes: 11 additions & 5 deletions example/flask_example.py
@@ -1,9 +1,9 @@
# coding=utf-8
# Created by Meteorix at 2019/7/30

from multiprocessing import freeze_support
import multiprocessing as mp
from flask import Flask, request, jsonify
from service_streamer import ThreadedStreamer, Streamer
from service_streamer import ThreadedStreamer, Streamer, RedisStreamer
from bert_model import Model


@@ -27,8 +27,14 @@ def stream_predict():


if __name__ == "__main__":
freeze_support()
model = Model()
mp.freeze_support()
# for ThreadedStreamer/Streamer
# mp.set_start_method("spawn", force=True)
# model = Model()
# streamer = ThreadedStreamer(model.predict, batch_size=64, max_latency=0.1)
streamer = Streamer(model.predict, batch_size=64, max_latency=0.1)
# streamer = Streamer(model.predict, batch_size=64, max_latency=0.1)

# for RedisStreamer
streamer = RedisStreamer()

app.run(port=5005, threaded=True, debug=False)
example/flask_multigpu_example.py
@@ -2,23 +2,15 @@
# Created by Meteorix at 2019/7/30
from gevent import monkey; monkey.patch_all()
from flask import Flask, request, jsonify
from service_streamer import ManagedModel, Streamer
from bert_model import Model
from service_streamer import Streamer
from bert_model import ManagedBertModel


app = Flask(__name__)
model = None
streamer = None


class ManagedBertModel(ManagedModel):

def init_model(self):
self.model = Model()

def predict(self, batch):
return self.model.predict(batch)


@app.route("/naive", methods=["POST"])
def naive_predict():
19 changes: 5 additions & 14 deletions example/future_example.py
@@ -4,25 +4,16 @@
import time
import multiprocessing as mp
from tqdm import tqdm
from service_streamer import ThreadedStreamer, Streamer, ManagedModel
from bert_model import Model


class ManagedBertModel(ManagedModel):

def init_model(self):
self.model = Model()

def predict(self, batch):
return self.model.predict(batch)
from service_streamer import ThreadedStreamer, Streamer
from bert_model import Model, ManagedBertModel


def main():
max_batch = 64
model = Model()
text = "Who was Jim Henson ? Jim Henson was a puppeteer"
# streamer = Streamer(ManagedBertModel, batch_size=max_batch, max_latency=0.1, worker_num=1, cuda_devices=(0, 1, 2, 3))
streamer = ThreadedStreamer(model.predict, batch_size=max_batch, max_latency=0.1)
text = "Happy birthday to"
# streamer = ThreadedStreamer(model.predict, batch_size=max_batch, max_latency=0.1)
streamer = Streamer(ManagedBertModel, batch_size=max_batch, max_latency=0.1, worker_num=1, cuda_devices=(0, 1, 2, 3))
num_times = 3000


10 changes: 10 additions & 0 deletions example/redis_worker_example.py
@@ -0,0 +1,10 @@
# coding=utf-8
# Created by Meteorix at 2019/7/30
from multiprocessing import freeze_support
from service_streamer import run_redis_workers_forever
from bert_model import ManagedBertModel


if __name__ == "__main__":
freeze_support()
run_redis_workers_forever(ManagedBertModel, 64, 0.1, worker_num=4, cuda_devices=(0, 1, 2, 3))
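
For reference, here is a minimal client-side sketch of how these workers would be used. It is an illustration, not part of this commit: it relies only on the zero-argument `RedisStreamer()` constructor and the `predict` call that appear in the updated `example/flask_example.py`, and it assumes a redis server is reachable at its default address and that `redis_worker_example.py` has already been started.

```python
# hypothetical client process; assumes a local redis server is reachable at its
# default address and that redis_worker_example.py is already running
from service_streamer import RedisStreamer

streamer = RedisStreamer()
# requests are queued through redis, batched by the gpu workers,
# and the outputs are routed back to this client process
outputs = streamer.predict(["Happy birthday to"])
print(outputs)
```
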
46 changes: 25 additions & 21 deletions readme.md
@@ -75,50 +75,55 @@ outpus = streamer.predict(batch_inputs)
In real projects the web server's throughput (QPS) is far higher than the GPU model's, so we support pairing one web server with multiple GPU worker processes.

```python
import multiprocessing; multiprocessing.set_start_method("spawn", force=True)
from service_streamer import Streamer

streamer = Streamer(model.predict, 64, 0.1)
# spawn 4 gpu worker processes
streamer = Streamer(model.predict, 64, 0.1, worker_num=4)
outputs = streamer.predict(batch)
```
``Streamer`` uses redis by default for inter-process communication and queueing, distributing large numbers of requests across multiple GPU workers,
``Streamer`` by default runs the gpu workers in ``spawn``-ed subprocesses, using inter-process queues for communication and queueing, and distributes large numbers of requests across multiple workers,
then passes the results of the model's batch predict back to the corresponding web server, which returns them in the matching http response.

The approach above is simple to define, but the main process also initializes the model, wasting an extra copy of GPU memory, and the model can only run on a single GPU.
So we provide the ```ManagedModel``` class, which makes lazy model initialization and device placement easy and supports multiple GPUs.

```python
from service_streamer import RedisWorker, GpuWorkerManager
class ManagedBertModel(ManagedModel):

class GpuWorkers(GpuWorkerManager):
def init_model(self):
self.model = Model()

@staticmethod
def gpu_worker(index, gpu_num):
os.environ["CUDA_VISIBLE_DEVICES"] = str(index % gpu_num)
streamer = RedisWorker(model.predict, 64, max_latency=0.1)
streamer.run_forever()
def predict(self, batch):
return self.model.predict(batch)

if __name__ == "__main__":
GpuWorkers().run_workers(worker_num=8, gpu_num=4)

# spawn 4 gpu worker processes, spread evenly across GPUs 0/1/2/3
streamer = Streamer(ManagedBertModel, 64, 0.1, worker_num=4, cuda_devices=(0, 1, 2, 3))
outputs = streamer.predict(batch)
```
We also provide a simple GPU worker management script; defined as above, it launches 8 GPU workers spread evenly across 4 GPUs.

#### Distributed web server

Sometimes your web server has to do some cpu-intensive work, such as image or text preprocessing, before handing requests to the gpu workers for the model.
In that case the web server's cpu often becomes the performance bottleneck, so we also provide a mode that pairs multiple web servers with (one or more) gpu workers.
The cpu often becomes the performance bottleneck, so we also provide a mode that pairs multiple web servers with (one or more) gpu workers.

When all of your web servers run on the same machine, you do not even need to change the ``streamer`` code.
Just deploy them like any python web server, using ``gunicorn`` or ``uwsgi`` for reverse proxying and load balancing.

When your web servers / gpu workers are not on the same machine, the change is also simple: specify a single redis address shared by all web servers and gpu workers.
Use ```RedisStreamer``` and specify a single redis address shared by all web servers and gpu workers.

```python
# todo
streamer = Streamer(model.predict, 64, 0.1, redis_broker="172.22.22.22:6379")
streamer = RedisStreamer(model.predict, 64, 0.1, redis_broker="172.22.22.22:6379")
```

Then, just like deploying any python web server, use ``gunicorn`` or ``uwsgi`` for reverse proxying and load balancing.

This way every request is load-balanced across the web servers for cpu preprocessing, then spread evenly across the gpu workers for the model's predict.
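
For the gpu worker side of such a deployment, a minimal launcher sketch follows. It mirrors the new `example/redis_worker_example.py` from this commit; how each machine is pointed at the shared redis instance is not shown in this commit, so treat the broker configuration as an assumption handled elsewhere.

```python
# gpu_server.py -- hypothetical worker-side launcher for a multi-machine setup.
# Mirrors example/redis_worker_example.py; reaching the shared redis instance
# from this machine is assumed to be configured outside this sketch.
from multiprocessing import freeze_support

from service_streamer import run_redis_workers_forever
from bert_model import ManagedBertModel

if __name__ == "__main__":
    freeze_support()
    # 4 workers on this machine, spread across GPUs 0-3, batching up to 64
    # requests per forward pass with a 0.1s max batching latency
    run_redis_workers_forever(ManagedBertModel, 64, 0.1,
                              worker_num=4, cuda_devices=(0, 1, 2, 3))
```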

### Using the low-level Future API

If you have used any concurrency library, `future` should look familiar.
When your use case is not a web service but you still want ``service_streamer`` for queueing or distributed GPU computation, you can call the Future API directly.

```python
from service_streamer import ThreadedStreamer as Streamer
streamer = Streamer(model.predict, 64, 0.1)
@@ -169,7 +174,7 @@ All the code and bench scripts are in [example](./example).
### multiple gpu workers

Here we compare the performance of multiple gpu workers behind a single web server process, to verify the overhead of the communication and load-balancing mechanisms.
* The multi-threaded Flask server has become the bottleneck, so a gevent server is used instead; see [flask_example_multigpu.py](example/flask_example_multigpu.py)
The multi-threaded Flask server has become the performance bottleneck, so a gevent server is used instead; see [flask_multigpu_example.py](example/flask_multigpu_example.py)

| gpu_worker_num | Naive | ThreadedStreamer |Streamer|RedisStreamer
|-|-|-|-|-|
@@ -178,8 +183,7 @@ All the code and bench scripts are in [example](./example).
|4|||426.60||

* Because of the Python GIL, multiple workers make no sense for ```ThreadedStreamer```, so only the single-gpu-worker figure is measured for comparison.
* With more than 2 gpu workers, ```Streamer```'s throughput does not scale linearly.
This is due to flask's performance: the server process's cpu utilization reaches 100%, so the bottleneck is the cpu rather than the gpu.
* With more than 2 gpu workers, ```Streamer```'s throughput does not scale linearly. This is due to flask's performance: the server process's cpu utilization reaches 100%, so the bottleneck is the cpu rather than the gpu.

### multiple gpu workers future api

4 changes: 2 additions & 2 deletions service_streamer/__init__.py
@@ -1,5 +1,5 @@
# coding=utf-8
# Created by Meteorix at 2019/7/22

from .service_streamer import ThreadedStreamer, Streamer, RedisStreamer, RedisWorker
from .managed_model import ManagedModel, GpuWorkerManager
from .service_streamer import ThreadedStreamer, Streamer, RedisStreamer, RedisWorker, run_redis_workers_forever
from .managed_model import ManagedModel
21 changes: 0 additions & 21 deletions service_streamer/managed_model.py
@@ -20,24 +20,3 @@ def init_model(self):

def predict(self, batch):
raise NotImplementedError


class GpuWorkerManager(object):

@staticmethod
def gpu_worker(worker_index, gpu_num):
devices = str(worker_index % gpu_num)
os.environ["CUDA_VISIBLE_DEVICES"] = devices
print("gpu worker starting...pid: %d cuda: %s" % (os.getpid(), devices))
# define your gpu stream worker here
print("gpu worker exits...")

def run_workers_forever(self, worker_num, gpu_num):
procs = []
for i in range(worker_num):
p = Process(target=self.gpu_worker, args=(i, gpu_num,))
p.start()
procs.append(p)

for p in procs:
p.join()
37 changes: 31 additions & 6 deletions service_streamer/service_streamer.py
@@ -13,9 +13,6 @@
from .managed_model import ManagedModel


mp.set_start_method("spawn", force=True)


class Future(object):
def __init__(self, task_id, task_size, future_cache_ref):
self._id = task_id
@@ -94,7 +91,7 @@ def _input(self, batch: List) -> int:
self._future_cache[task_id] = future

for model_input in batch:
# print(f"sending_request: PID:{os.getpid()} task_id:{task_id} request_id: {request_id}")
print(f"sending_request: PID:{os.getpid()} task_id:{task_id} request_id: {request_id}")
self._send_request(task_id, request_id, model_input)
request_id += 1

@@ -310,15 +307,26 @@ def _recv_response(self, timeout=1):


class RedisWorker(_BaseStreamWorker):
def __init__(self, predict_function, batch_size, max_latency=0.1):
super().__init__(predict_function, batch_size, max_latency)
def __init__(self, model_class, batch_size, max_latency=0.1):
assert issubclass(model_class, ManagedModel)
super().__init__(model_class, batch_size, max_latency)

self._redis = _RedisServer(0)
self._requests_queue = Queue()

self.back_thread = threading.Thread(target=self._loop_recv_request, name="thread_recv_request")
self.back_thread.daemon = True
self.back_thread.start()

def run_forever(self, gpu_id=None):
print("[gpu worker %d] init model on gpu:%s" % (os.getpid(), gpu_id))
model_class = self._predict
self._model = model_class(gpu_id)
self._model.init_model()
self._predict = self._model.predict

super().run_forever()

def _loop_recv_request(self):
print(self, "start loop_recv_request")
while True:
@@ -342,6 +350,23 @@ def _send_response(self, client_id, task_id, request_id, model_output):
self._redis.send_response(client_id, task_id, request_id, model_output)


def run_redis_workers_forever(model_class, batch_size, max_latency=0.1, worker_num=1, cuda_devices=None):
procs = []
for i in range(worker_num):
if cuda_devices is not None:
gpu_id = cuda_devices[i % len(cuda_devices)]
args = (gpu_id,)
else:
args = None
redis_worker = RedisWorker(model_class, batch_size, max_latency)
p = mp.Process(target=redis_worker.run_forever, args=args, name="stream_worker", daemon=True)
p.start()
procs.append(p)

for p in procs:
p.join()


class _RedisAgent(object):
def __init__(self, redis_id):
self._redis_id = redis_id
