Skip to content

Commit

Permalink
Use multiple processes to accept requests
Browse files Browse the repository at this point in the history
  • Loading branch information
Habush committed Aug 13, 2019
1 parent 33684a7 commit 19a1ef2
Showing 1 changed file with 67 additions and 16 deletions.
83 changes: 67 additions & 16 deletions service/moses_service_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,29 @@
import base64
import pandas as pd
import tempfile
from config import get_logger
import multiprocessing
import socket
import contextlib

_ONE_DAY_IN_SECONDS = 60 * 60 * 24
_PROCESS_COUNT = multiprocessing.cpu_count()
_THREAD_CONCURRENCY = _PROCESS_COUNT


class MosesService(moses_service_pb2_grpc.MosesServiceServicer):

def StartAnalysis(self, request, context):
logger = logging.getLogger("mozi_snet")
session_id = uuid.uuid4()
mnemonic = encode(session_id)
logger = get_logger(mnemonic)

crossval_opts = {"folds": request.crossValOpts.folds, "testSize": request.crossValOpts.testSize, "randomSeed": request.crossValOpts.randomSeed}
crossval_opts = {"folds": request.crossValOpts.folds, "testSize": request.crossValOpts.testSize,
"randomSeed": request.crossValOpts.randomSeed}
moses_opts, dataset, target_feature = request.mosesOpts, request.dataset, request.targetFeature
filter_opts = {"score": request.filter.score, "value": request.filter.value}
logger.info(f"Received request with Moses Options: {moses_opts}\n Cross Validation Options: {crossval_opts}\n")

session_id = uuid.uuid4()
mnemonic = encode(session_id)

if is_valid_dataset(dataset, target_feature):
start_analysis.delay(id=session_id, moses_options=moses_opts, crossval_options=crossval_opts,
filter_opts=filter_opts, dataset=dataset, mnemonic=mnemonic,
Expand All @@ -46,7 +52,8 @@ def StartAnalysis(self, request, context):
context.set_details(f"Invalid dataset.Dataset doesn't contain a column with named {target_feature} or has "
f"invalid characters")
logger.error("Error occurred while validating request")
return Result(resultUrl="", description=f"Validation error occurred. Dataset doesn't contain a column with named {target_feature} or has invalid characters")
return Result(resultUrl="",
description=f"Validation error occurred. Dataset doesn't contain a column with named {target_feature} or has invalid characters")


def is_valid_dataset(b_string, target_feature):
Expand Down Expand Up @@ -79,23 +86,67 @@ def is_valid_dataset(b_string, target_feature):

def serve(port=GRPC_PORT):
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10), options=[
('grpc.max_send_message_length', -1),
('grpc.max_receive_message_length', -1)
('grpc.max_send_message_length', -1),
('grpc.max_receive_message_length', -1)
])
moses_service_pb2_grpc.add_MosesServiceServicer_to_server(MosesService(), server)
server.add_insecure_port(f"[::]:{port}")
return server


if __name__ == "__main__":
setup_logging()
if len(sys.argv) == 2:
server = serve(sys.argv[1])
else:
server = serve()
server.start()
def _wait_forever(server):
try:
while True:
time.sleep(_ONE_DAY_IN_SECONDS)
except KeyboardInterrupt:
server.stop(0)
server.stop(None)


@contextlib.contextmanager
def _reserve_port():
"""Find and reserve a port for all subprocesses to use."""
sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 0:
raise RuntimeError("Failed to set SO_REUSEPORT.")
sock.bind(('', int(GRPC_PORT)))
try:
yield sock.getsockname()[1]
finally:
sock.close()


def _run_server(bind_address):
"""Start a server in a subprocess"""
options = (("grpc.so_reuseport", 1), ('grpc.max_send_message_length', -1),
('grpc.max_receive_message_length', -1))
# WARNING: This example takes advantage of SO_REUSEPORT. Due to the
# limitations of manylinux1, none of our precompiled Linux wheels currently
# support this option. (https://github.com/grpc/grpc/issues/18210). To take
# advantage of this feature, install from source with
# `pip install grpcio --no-binary grpcio`.

server = grpc.server(
futures.ThreadPoolExecutor(max_workers=_THREAD_CONCURRENCY),
options=options
)
moses_service_pb2_grpc.add_MosesServiceServicer_to_server(MosesService(), server)
server.add_insecure_port(bind_address)
server.start()
_wait_forever(server)


if __name__ == "__main__":
setup_logging()
with _reserve_port() as port:
address = "0.0.0.0:{}".format(port)
workers = []
for _ in range(_PROCESS_COUNT):
worker = multiprocessing.Process(
target=_run_server, args=(address,)
)
worker.start()
workers.append(workers)

for worker in workers:
worker.join()

0 comments on commit 19a1ef2

Please sign in to comment.