diff --git a/stable/python/_kubeless.py b/stable/python/_kubeless.py index 07ff81025..9e0edfebb 100755 --- a/stable/python/_kubeless.py +++ b/stable/python/_kubeless.py @@ -3,21 +3,31 @@ import importlib import io import os -import queue import sys - +import threading import bottle import prometheus_client as prom - +import queue # The reason this file has an underscore prefix in its name is to avoid a # name collision with the user-defined module. +module_name = os.getenv('MOD_NAME') +if module_name is None: + print('MOD_NAME have to be provided', flush=True) + exit(1) current_mod = os.path.basename(__file__).split('.')[0] -if os.getenv('MOD_NAME') == current_mod: - raise ValueError(f'Module cannot be named {current_mod}') +if module_name == current_mod: + print('Module cannot be named {} as current module'.format(current_mod), flush=True) + exit(2) sys.path.append('/kubeless') -mod = importlib.import_module(os.getenv('MOD_NAME')) + +mod = importlib.import_module(module_name) +func_name = os.getenv('FUNC_HANDLER') +if func_name is None: + print('FUNC_HANDLER have to be provided', flush=True) + exit(3) + func = getattr(mod, os.getenv('FUNC_HANDLER')) func_port = os.getenv('FUNC_PORT', 8080) timeout = float(os.getenv('FUNC_TIMEOUT', 180)) @@ -67,13 +77,6 @@ def __setstate__(self, env): setattr(self, 'environ', env) -def funcWrap(q, event, c): - try: - q.put(func(event, c)) - except Exception as inst: - q.put(inst) - - @app.get('/healthz') def healthz(): return 'OK' @@ -85,6 +88,11 @@ def metrics(): return prom.generate_latest(prom.REGISTRY) +@app.error(500) +def exception_handler(): + return 'Internal server error' + + @app.route('/<:re:.*>', method=['GET', 'POST', 'PATCH', 'DELETE']) def handler(): req = bottle.request @@ -106,24 +114,17 @@ def handler(): func_calls.labels(method).inc() with func_errors.labels(method).count_exceptions(): with func_hist.labels(method).time(): - q = ctx.Queue() - p = ctx.Process(target=funcWrap, args=(q, event, function_context)) - p.start() - + que = queue.Queue() + t = threading.Thread(target=lambda q, e: q.put(func(e,function_context)), args=(que,event)) + t.start() try: - res = q.get(block=True, timeout=timeout) + res = que.get(block=True, timeout=timeout) except queue.Empty: - p.terminate() - p.join() return bottle.HTTPError(408, "Timeout while processing the function") else: - p.join() - if isinstance(res, Exception) and not isinstance(res, bottle.HTTPResponse): - logging.error("Function returned an exception: %s", res) - raise res + t.join() return res - def preload(): """This is a no-op function used to start the forkserver.""" pass