-
Notifications
You must be signed in to change notification settings - Fork 18
Extending
This will describe how to write a storage for queues. Because of their non-persistent nature, topics don't need storage engines. (Yes, there are also some message queues that support persistent topics, but we're not there yet.)
The API for storage engines is quite simple. basic. Here is the full API (though, as you will see later implementing is simplified by subclassing existing work):
class SampleQueueStore(object):
def enqueue(self, destination, frame):
""" Enqueue a message (frame) on destination queue. """
def dequeue(self, destination):
""" Pop a message (frame) off of the queue and return it. """
def requeue(self, destination, frame):
""" Requeue a message (frame) for specified destination. """
def size(self, destination):
""" Returns the number of messages (frames) for destination. """
def has_frames(self, destination):
""" Whether the specified destination has any messages (frames). """
def close(self):
""" Close any resources associated with this store. """
def frames(self, destination):
""" Returns an iterator over messages (frames) for destination. """
To make life simpler, you should really start by just subclassing the provided [http://packages.python.org/CoilMQ/coilmq.store.QueueStore-class.html QueueStore] class. It makes some assumptions / shortcuts so that you need to implement fewer methods.
Next most helpful thing would probably to look at the source for modules already in the [http://packages.python.org/CoilMQ/coilmq.store-module.html coilmq.store] package.
Concurrency needs special mention when writing QueueStore
subclasses. CoilMQ is a multi-threaded application. A single storage backend can certainly be accessed by several threads at the same time, so any state in your storage engine must be guarded by locks.
Let me repeat: your class must be thread-safe.
This is made relatively much simpler by the availability of a @synchronized
decorator that you can use to ensure that only a single thread can enter a method in your object at any one time.
For example:
from coilmq.store import QueueStore
from coilmq.util.concurrency import synchronized
class SampleQueueStore(QueueStore):
@synchronized
def enqueue(self, destination, frame):
""" Enqueue a message (frame) on destination queue. """
# etc.
Note that if you are extending QueueStore
, _you must call its constructor or ensure that you set the internal _lock
var to a threading.RLock
which is used by the @synchronized
decorator.
Basically, you just need to [ConfigurationReference#Queue_Storage_Backend configure the queue storage backend] in your config file. (Yes, read that thing I just linked to.)
Now, if your queue storage class needs to be configured from other options in your config file, you will probably want to follow the model used for DBM and SQLAlchemy backends and write a simple callable "factory" function that will do the setup for you and returns your QueueStore
subclass (or object that quacks like a QueuStore
).
Usually this would look something like it does for the dbm implementation:
def make_dbm():
try:
data_dir = config.get('coilmq', 'qstore.dbm.data_dir')
cp_ops = config.getint('coilmq', 'qstore.dbm.checkpoint_operations')
cp_timeout = config.getint('coilmq', 'qstore.dbm.checkpoint_timeout')
except ConfigParser.NoOptionError, e:
raise ConfigError('Missing configuration parameter: %s' % e)
if not os.path.exists(data_dir):
raise ConfigError('DBM directory does not exist: %s' % data_dir)
if not os.access(data_dir, os.W_OK | os.R_OK):
raise ConfigError('Cannot read and write DBM directory: %s' % data_dir)
store = DbmQueue(data_dir, checkpoint_operations=cp_ops, checkpoint_timeout=cp_timeout)
return store
Having this factory level of abstraction makes it a lot easier to write unit tests. (You can just configure your QueueStore
in your setUp()
method for example.)
TODO