From 5ea24e7f35d922ccd64b1b982348ad62652b72c8 Mon Sep 17 00:00:00 2001 From: Jeff Terrace Date: Tue, 22 Feb 2011 11:52:42 -0500 Subject: [PATCH] Updating cassandra backend with latest pycassa changes --- celery/backends/cassandra.py | 30 +++++++++++++++++++++--------- 1 file changed, 21 insertions(+), 9 deletions(-) diff --git a/celery/backends/cassandra.py b/celery/backends/cassandra.py index dc492793217..741dde5e1e6 100644 --- a/celery/backends/cassandra.py +++ b/celery/backends/cassandra.py @@ -2,7 +2,7 @@ try: import pycassa from thrift import Thrift - C = __import__('cassandra').ttypes # FIXME Namespace kludge + C = pycassa.cassandra.ttypes except ImportError: pycassa = None @@ -59,7 +59,7 @@ def __init__(self, servers=None, keyspace=None, column_family=None, if not pycassa: raise ImproperlyConfigured( "You need to install the pycassa library to use the " - "Cassandra backend. See http://github.com/vomjom/pycassa") + "Cassandra backend. See https://github.com/pycassa/pycassa") self.servers = servers or \ self.app.conf.get("CASSANDRA_SERVERS", self.servers) @@ -72,6 +72,20 @@ def __init__(self, servers=None, keyspace=None, column_family=None, self.cassandra_options = dict(cassandra_options or {}, **self.app.conf.get("CASSANDRA_OPTIONS", {})) + read_cons = self.app.conf.get("CASSANDRA_READ_CONSISTENCY", + "LOCAL_QUORUM") + write_cons = self.app.conf.get("CASSANDRA_WRITE_CONSISTENCY", + "LOCAL_QUORUM") + try: + self.read_consistency = getattr(pycassa.ConsistencyLevel, read_cons) + except AttributeError: + self.read_consistency = pycassa.ConsistencyLevel.LOCAL_QUORUM + try: + self.write_consistency = getattr(pycassa.ConsistencyLevel, + write_cons) + except AttributeError: + self.write_consistency = pycassa.ConsistencyLevel.LOCAL_QUORUM + if not self.servers or not self.keyspace or not self.column_family: raise ImproperlyConfigured( "Cassandra backend not configured.") @@ -86,7 +100,6 @@ def wrapper(*args, **kwargs): try: return func(*args, **kwargs) except (pycassa.InvalidRequestException, - pycassa.NoServerAvailable, pycassa.TimedOutException, pycassa.UnavailableException, socket.error, @@ -100,13 +113,12 @@ def wrapper(*args, **kwargs): def _get_column_family(self): if self._column_family is None: - conn = pycassa.connect(self.servers, + conn = pycassa.connect(self.keyspace, servers=self.servers, **self.cassandra_options) self._column_family = \ - pycassa.ColumnFamily(conn, self.keyspace, - self.column_family, - read_consistency_level=pycassa.ConsistencyLevel.DCQUORUM, - write_consistency_level=pycassa.ConsistencyLevel.DCQUORUM) + pycassa.ColumnFamily(conn, self.column_family, + read_consistency_level=self.read_consistency, + write_consistency_level=self.write_consistency) return self._column_family def process_cleanup(self): @@ -159,7 +171,7 @@ def cleanup(self): count=2 ** 30)) columns = cf.client.multiget_slice(cf.keyspace, self._index_keys, column_parent, slice_pred, - pycassa.ConsistencyLevel.DCQUORUM) + self.read_consistency) index_cols = [c.column.name for c in itertools.chain(*columns.values())]