Skip to content

Commit

Permalink
Performance Python runtime (vmware-archive#1232)
Browse files Browse the repository at this point in the history
  • Loading branch information
rJankowski93 committed Jun 4, 2021
1 parent d082cb1 commit f8097b0
Showing 1 changed file with 26 additions and 25 deletions.
51 changes: 26 additions & 25 deletions stable/python/_kubeless.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,31 @@
import importlib
import io
import os
import queue
import sys

import threading
import bottle
import prometheus_client as prom

import queue

# The reason this file has an underscore prefix in its name is to avoid a
# name collision with the user-defined module.
module_name = os.getenv('MOD_NAME')
if module_name is None:
print('MOD_NAME have to be provided', flush=True)
exit(1)
current_mod = os.path.basename(__file__).split('.')[0]
if os.getenv('MOD_NAME') == current_mod:
raise ValueError(f'Module cannot be named {current_mod}')
if module_name == current_mod:
print('Module cannot be named {} as current module'.format(current_mod), flush=True)
exit(2)

sys.path.append('/kubeless')
mod = importlib.import_module(os.getenv('MOD_NAME'))

mod = importlib.import_module(module_name)
func_name = os.getenv('FUNC_HANDLER')
if func_name is None:
print('FUNC_HANDLER have to be provided', flush=True)
exit(3)

func = getattr(mod, os.getenv('FUNC_HANDLER'))
func_port = os.getenv('FUNC_PORT', 8080)
timeout = float(os.getenv('FUNC_TIMEOUT', 180))
Expand Down Expand Up @@ -67,13 +77,6 @@ def __setstate__(self, env):
setattr(self, 'environ', env)


def funcWrap(q, event, c):
try:
q.put(func(event, c))
except Exception as inst:
q.put(inst)


@app.get('/healthz')
def healthz():
return 'OK'
Expand All @@ -85,6 +88,11 @@ def metrics():
return prom.generate_latest(prom.REGISTRY)


@app.error(500)
def exception_handler():
return 'Internal server error'


@app.route('/<:re:.*>', method=['GET', 'POST', 'PATCH', 'DELETE'])
def handler():
req = bottle.request
Expand All @@ -106,24 +114,17 @@ def handler():
func_calls.labels(method).inc()
with func_errors.labels(method).count_exceptions():
with func_hist.labels(method).time():
q = ctx.Queue()
p = ctx.Process(target=funcWrap, args=(q, event, function_context))
p.start()

que = queue.Queue()
t = threading.Thread(target=lambda q, e: q.put(func(e,function_context)), args=(que,event))
t.start()
try:
res = q.get(block=True, timeout=timeout)
res = que.get(block=True, timeout=timeout)
except queue.Empty:
p.terminate()
p.join()
return bottle.HTTPError(408, "Timeout while processing the function")
else:
p.join()
if isinstance(res, Exception) and not isinstance(res, bottle.HTTPResponse):
logging.error("Function returned an exception: %s", res)
raise res
t.join()
return res


def preload():
"""This is a no-op function used to start the forkserver."""
pass
Expand Down

0 comments on commit f8097b0

Please sign in to comment.