Skip to content

Commit

Permalink
Improved Decider threaded backend
Browse files Browse the repository at this point in the history
  • Loading branch information
syrusakbary committed Jul 5, 2018
1 parent fa4eeda commit 52d7753
Show file tree
Hide file tree
Showing 5 changed files with 523 additions and 59 deletions.
3 changes: 3 additions & 0 deletions graphql/backend/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ def get_unique_document_id(query_str):


class GraphQLCachedBackend(GraphQLBackend):
"""GraphQLCachedBackend will cache the document response from the backend
given a key for that document"""

def __init__(
self,
backend, # type: GraphQLBackend
Expand Down
3 changes: 3 additions & 0 deletions graphql/backend/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ def execute_and_validate(


class GraphQLCoreBackend(GraphQLBackend):
"""GraphQLCoreBackend will return a document using the default
graphql executor"""

def __init__(self, executor=None):
# type: (Optional[Any]) -> None
self.execute_params = {"executor": executor}
Expand Down
216 changes: 197 additions & 19 deletions graphql/backend/decider.py
Original file line number Diff line number Diff line change
@@ -1,31 +1,209 @@
import atexit
import logging
import threading
import os
from time import sleep, time


from .base import GraphQLBackend, GraphQLDocument
from .cache import GraphQLCachedBackend
from ..pyutils.compat import Queue, check_threads

# Necessary for static type checking
if False:  # flake8: noqa
    from typing import List, Union, Any, Optional, Hashable, Dict, Tuple
    from ..type.schema import GraphQLSchema


# Default number of seconds the atexit shutdown hook waits for the
# background worker's queue to drain before letting the process exit.
DEFAULT_TIMEOUT = 10

logger = logging.getLogger("graphql.errors")


# Code shamelessly taken from
# https://github.com/getsentry/raven-python/blob/master/raven/transport/threaded.py
# Why create something new when we can reuse something that works?
# Attribution to the Sentry team :)
class AsyncWorker(object):
    """Run queued callbacks on a single background daemon thread.

    Jobs are ``(callback, args, kwargs)`` tuples pushed onto an internal
    queue; the worker thread drains the queue until it sees the terminator
    sentinel.  An ``atexit`` hook gives the queue a bounded amount of time
    (``shutdown_timeout`` seconds) to drain when the interpreter exits.
    """

    # Sentinel placed on the queue to tell the worker thread to exit.
    _terminator = object()

    def __init__(self, shutdown_timeout=DEFAULT_TIMEOUT):
        # type: (float) -> None
        check_threads()
        self._queue = Queue(-1)  # unbounded queue
        self._lock = threading.Lock()
        self._thread = None
        # PID the thread was started in -- lets is_alive() detect a forked
        # child, where the inherited thread object exists but is not running.
        self._thread_for_pid = None
        # Ensure the shutdown hook is registered at most once, even if the
        # worker is stopped and restarted (the original registered it on
        # every start(), stacking duplicate atexit callbacks).
        self._atexit_registered = False
        self.options = {"shutdown_timeout": shutdown_timeout}
        self.start()

    def is_alive(self):
        # type: () -> bool
        """Return True iff the worker thread is running in this process."""
        if self._thread_for_pid != os.getpid():
            return False
        # bool(...) so callers never see None when no thread was started.
        return bool(self._thread and self._thread.is_alive())

    def _ensure_thread(self):
        # type: () -> None
        """(Re)start the worker thread if it is not currently running."""
        if self.is_alive():
            return
        self.start()

    def main_thread_terminated(self):
        # type: () -> None
        """atexit hook: give the queue a bounded chance to drain, then stop."""
        with self._lock:
            if not self.is_alive():
                # thread not started or already stopped - nothing to do
                return

            # wake the processing thread up
            self._queue.put_nowait(self._terminator)

            timeout = self.options["shutdown_timeout"]

            # wait briefly, initially
            initial_timeout = min(0.1, timeout)

            if not self._timed_queue_join(initial_timeout):
                # if that didn't work, wait a bit longer
                # NB that size is an approximation, because other threads may
                # add or remove items
                size = self._queue.qsize()

                print("GraphQL async worker is attempting to process %i pending jobs" % size)
                print("Waiting up to %s seconds" % timeout)

                if os.name == "nt":
                    print("Press Ctrl-Break to quit")
                else:
                    print("Press Ctrl-C to quit")

                self._timed_queue_join(timeout - initial_timeout)

            self._thread = None

    def _timed_queue_join(self, timeout):
        # type: (float) -> bool
        """Variant of ``Queue.join`` that accepts a timeout.

        Returns True if all queued tasks finished, False on timeout.
        """
        deadline = time() + timeout
        queue = self._queue

        # Uses the queue's internal all_tasks_done condition, mirroring the
        # stdlib Queue.join implementation but with a deadline.
        queue.all_tasks_done.acquire()
        try:
            while queue.unfinished_tasks:
                delay = deadline - time()
                if delay <= 0:
                    # timed out
                    return False

                queue.all_tasks_done.wait(timeout=delay)

            return True

        finally:
            queue.all_tasks_done.release()

    def start(self):
        """
        Starts the task thread.  Idempotent: does nothing if already alive.
        """
        with self._lock:
            if not self.is_alive():
                self._thread = threading.Thread(
                    target=self._target, name="graphql.AsyncWorker"
                )
                # daemon so a stuck worker never blocks interpreter exit;
                # the attribute replaces the deprecated setDaemon() call.
                self._thread.daemon = True
                self._thread.start()
                self._thread_for_pid = os.getpid()
        if not self._atexit_registered:
            atexit.register(self.main_thread_terminated)
            self._atexit_registered = True

    def stop(self, timeout=None):
        # type: (Optional[float]) -> None
        """
        Stops the task thread. Synchronous!
        """
        with self._lock:
            if self._thread:
                self._queue.put_nowait(self._terminator)
                self._thread.join(timeout=timeout)
                self._thread = None
                self._thread_for_pid = None

    def queue(self, callback, *args, **kwargs):
        """Schedule ``callback(*args, **kwargs)`` to run on the worker."""
        self._ensure_thread()
        self._queue.put_nowait((callback, args, kwargs))

    def _target(self):
        """Worker thread main loop: drain jobs until the terminator is seen."""
        while True:
            record = self._queue.get()
            try:
                if record is self._terminator:
                    break
                callback, args, kwargs = record
                try:
                    callback(*args, **kwargs)
                except Exception:
                    # Jobs must never kill the worker thread; log and move on.
                    logger.error("Failed processing job", exc_info=True)
            finally:
                self._queue.task_done()

            # Yield to other threads between jobs.
            sleep(0)


class GraphQLDeciderBackend(GraphQLCachedBackend):
    """GraphQLDeciderBackend will offload the document generation to the
    main backend in a new thread, serving meanwhile the document from the
    fallback backend.

    On a cache miss the (presumably fast) fallback backend answers
    synchronously, while the main backend's document is generated on an
    AsyncWorker thread and replaces the cached entry once ready, so later
    requests for the same query get the main backend's document.
    """

    # Lazily-created background worker; recreated if its thread dies.
    _worker = None
    fallback_backend = None  # type: GraphQLBackend

    def __init__(
        self,
        backend,  # type: Union[List[GraphQLBackend], Tuple[GraphQLBackend, GraphQLBackend], GraphQLBackend]
        fallback_backend=None,  # type: Optional[GraphQLBackend]
        cache_map=None,  # type: Optional[Dict[Hashable, GraphQLDocument]]
        use_consistent_hash=False,  # type: bool
    ):
        # type: (...) -> None
        if not backend:
            raise Exception("Need to provide backends to decide into.")
        if isinstance(backend, (list, tuple)):
            # Legacy calling convention: a (main, fallback) two-element pair.
            if fallback_backend:
                raise Exception("Can't set a fallback backend and backends as array")
            if len(backend) != 2:
                raise Exception("Only two backends are supported for now")
            backend, fallback_backend = backend[0], backend[1]  # type: ignore
        else:
            if not fallback_backend:
                raise Exception("Need to provide a fallback backend")

        self.fallback_backend = fallback_backend  # type: ignore
        super(GraphQLDeciderBackend, self).__init__(
            backend, cache_map=cache_map, use_consistent_hash=use_consistent_hash
        )

    def queue_backend(self, key, schema, request_string):
        # type: (Hashable, GraphQLSchema, str) -> None
        """Generate the document with the main backend and cache it under key.

        Runs on the worker thread; overwrites the fallback document.
        """
        self.cache_map[key] = self.backend.document_from_string(schema, request_string)

    def get_worker(self):
        # type: () -> AsyncWorker
        """Return a live AsyncWorker, creating a fresh one if needed."""
        if self._worker is None or not self._worker.is_alive():
            self._worker = AsyncWorker()
        return self._worker

    def document_from_string(self, schema, request_string):
        # type: (GraphQLSchema, str) -> GraphQLDocument
        """This method returns a GraphQLQuery (from cache if present)"""
        key = self.get_key_for_schema_and_document_string(schema, request_string)
        if key not in self.cache_map:
            # We return from the fallback
            self.cache_map[key] = self.fallback_backend.document_from_string(
                schema, request_string
            )
            # We ensure the main backend response is in the queue
            self.get_worker().queue(self.queue_backend, key, schema, request_string)

        return self.cache_map[key]
Loading

0 comments on commit 52d7753

Please sign in to comment.