diff --git a/stable/python/_kubeless.py b/stable/python/_kubeless.py index 6c336b9..6136cc3 100755 --- a/stable/python/_kubeless.py +++ b/stable/python/_kubeless.py @@ -5,7 +5,7 @@ import os import queue import sys - +import threading import bottle import prometheus_client as prom @@ -67,13 +67,6 @@ def __setstate__(self, env): setattr(self, 'environ', env) -def funcWrap(q, event, c): - try: - q.put(func(event, c)) - except Exception as inst: - q.put(inst) - - @app.get('/healthz') def healthz(): return 'OK' @@ -85,6 +78,11 @@ def metrics(): return prom.generate_latest(prom.REGISTRY) +@app.error(500) +def exception_handler(): + return 'Internal server error' + + @app.route('/<:re:.*>', method=['GET', 'POST', 'PATCH', 'DELETE']) def handler(): req = bottle.request @@ -104,21 +102,15 @@ def handler(): func_calls.labels(method).inc() with func_errors.labels(method).count_exceptions(): with func_hist.labels(method).time(): - q = ctx.Queue() - p = ctx.Process(target=funcWrap, args=(q, event, function_context)) - p.start() - + que = queue.Queue() + t = threading.Thread(target=lambda q, e: q.put(func(e,function_context)), args=(que,event)) + t.start() try: - res = q.get(block=True, timeout=timeout) + res = que.get(block=True, timeout=timeout) except queue.Empty: - p.terminate() - p.join() return bottle.HTTPError(408, "Timeout while processing the function") else: - p.join() - if isinstance(res, Exception) and not isinstance(res, bottle.HTTPResponse): - logging.error("Function returned an exception: %s", res) - raise res + t.join() return res