hozn edited this page Oct 21, 2012 · 1 revision

Writing a Storage Engine

This page describes how to write a storage engine for queues. Because topics are non-persistent by nature, they don't need storage engines. (Yes, some message queues also support persistent topics, but we're not there yet.)

Implement the QueueStore "Interface"

The API for storage engines is quite simple. Here is the full API (though, as you will see later, implementing it is simplified by subclassing existing work):

class SampleQueueStore(object):
    
    def enqueue(self, destination, frame):
        """ Enqueue a message (frame) on destination queue. """
    
    def dequeue(self, destination):
        """ Pop a message (frame) off of the queue and return it. """
    
    def requeue(self, destination, frame):
        """ Requeue a message (frame) for specified destination. """

    def size(self, destination):
        """ Returns the number of messages (frames) for destination. """

    def has_frames(self, destination):
        """ Whether the specified destination has any messages (frames). """

    def close(self):
        """ Close any resources associated with this store. """
    
    def frames(self, destination):
        """ Returns an iterator over messages (frames) for destination. """

To make life simpler, you should really start by just subclassing the provided [http://packages.python.org/CoilMQ/coilmq.store.QueueStore-class.html QueueStore] class. It makes some assumptions / shortcuts so that you need to implement fewer methods.

The next most helpful thing would probably be to look at the source for modules already in the [http://packages.python.org/CoilMQ/coilmq.store-module.html coilmq.store] package.
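As a rough illustration of the interface listed above, here is a minimal in-memory sketch. It deliberately does not import CoilMQ, so it is a standalone class rather than a real QueueStore subclass. The name InMemoryQueueStore, returning None from an empty queue, and putting requeued frames back at the front are all assumptions of this sketch, not necessarily what CoilMQ's own stores do.

```python
import threading
from collections import defaultdict, deque


class InMemoryQueueStore(object):
    """Hypothetical store that keeps frames in per-destination deques."""

    def __init__(self):
        # One re-entrant lock guards all state in this object.
        self._lock = threading.RLock()
        self._queues = defaultdict(deque)

    def enqueue(self, destination, frame):
        with self._lock:
            self._queues[destination].append(frame)

    def dequeue(self, destination):
        with self._lock:
            queue = self._queues.get(destination)
            if not queue:
                return None  # assumption: an empty queue yields None
            return queue.popleft()

    def requeue(self, destination, frame):
        # Assumption: a requeued frame goes back to the front of the queue.
        with self._lock:
            self._queues[destination].appendleft(frame)

    def size(self, destination):
        with self._lock:
            return len(self._queues.get(destination, ()))

    def has_frames(self, destination):
        return self.size(destination) > 0

    def close(self):
        # Nothing external to release; just drop the in-memory frames.
        with self._lock:
            self._queues.clear()

    def frames(self, destination):
        # Iterate over a snapshot so callers never see concurrent changes.
        with self._lock:
            return iter(list(self._queues.get(destination, ())))
```

A real engine would replace the deques with whatever persistent backend it wraps; the method names and arguments are the part taken from the API listing.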

Concurrency & Threading

Concurrency needs special mention when writing QueueStore subclasses. CoilMQ is a multi-threaded application. A single storage backend can certainly be accessed by several threads at the same time, so any state in your storage engine must be guarded by locks.

Let me repeat: your class must be thread-safe.

This is made much simpler by the @synchronized decorator, which you can use to ensure that only a single thread can enter a method of your object at any one time.

For example:

from coilmq.store import QueueStore
from coilmq.util.concurrency import synchronized

class SampleQueueStore(QueueStore):
    
    @synchronized
    def enqueue(self, destination, frame):
        """ Enqueue a message (frame) on destination queue. """

    # etc.

Note that if you are extending QueueStore, you must call its constructor or ensure that you set the internal _lock attribute to a threading.RLock, which is used by the @synchronized decorator.
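To make the role of _lock concrete, here is a sketch of how a decorator of this kind could work. This is a hypothetical stand-in written for illustration, not CoilMQ's actual implementation; the only details taken from the text above are that the lock lives in self._lock and is a threading.RLock. CountingStore and enqueue_many are made-up names.

```python
import functools
import threading


def synchronized(method):
    """Hypothetical stand-in: run the method while holding self._lock."""
    @functools.wraps(method)
    def wrapper(self, *args, **kwargs):
        with self._lock:
            return method(self, *args, **kwargs)
    return wrapper


class CountingStore(object):
    """Made-up class that only counts enqueued frames."""

    def __init__(self):
        # Without this attribute the decorator raises AttributeError.
        self._lock = threading.RLock()
        self.count = 0

    @synchronized
    def enqueue(self, destination, frame):
        # Read-modify-write that would race without the lock.
        current = self.count
        self.count = current + 1

    @synchronized
    def enqueue_many(self, destination, frames):
        # An RLock is re-entrant, so one synchronized method may call
        # another on the same object without deadlocking.
        for frame in frames:
            self.enqueue(destination, frame)
```

The re-entrancy is the reason a plain threading.Lock would not do here: enqueue_many would block forever on its own call to enqueue.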

Tell CoilMQ About Your Implementation

Basically, you just need to [ConfigurationReference#Queue_Storage_Backend configure the queue storage backend] in your config file. (Yes, read that thing I just linked to.)

Now, if your queue storage class needs to be configured from other options in your config file, you will probably want to follow the model used for the DBM and SQLAlchemy backends and write a simple callable "factory" function that does the setup for you and returns an instance of your QueueStore subclass (or an object that quacks like a QueueStore).

Usually this would look something like it does for the dbm implementation:

def make_dbm():
    try:
        data_dir = config.get('coilmq', 'qstore.dbm.data_dir')
        cp_ops = config.getint('coilmq', 'qstore.dbm.checkpoint_operations')
        cp_timeout = config.getint('coilmq', 'qstore.dbm.checkpoint_timeout')
    except ConfigParser.NoOptionError as e:
        raise ConfigError('Missing configuration parameter: %s' % e)

    if not os.path.exists(data_dir):
        raise ConfigError('DBM directory does not exist: %s' % data_dir)
    if not os.access(data_dir, os.W_OK | os.R_OK):
        raise ConfigError('Cannot read and write DBM directory: %s' % data_dir)
    
    store = DbmQueue(data_dir, checkpoint_operations=cp_ops, checkpoint_timeout=cp_timeout)
    return store

Having this factory level of abstraction makes it a lot easier to write unit tests. (You can just configure your QueueStore directly in your setUp() method, for example, without going through a config file.)
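A sketch of what such a test might look like follows. StubQueueStore and its max_size argument are hypothetical, standing in for your own store and its constructor arguments. The point is only that setUp() passes settings directly instead of reading them from a config file.

```python
import unittest


class StubQueueStore(object):
    """Hypothetical store whose constructor takes plain arguments."""

    def __init__(self, max_size):
        self.max_size = max_size
        self._frames = []

    def enqueue(self, destination, frame):
        if len(self._frames) >= self.max_size:
            raise ValueError('queue is full')
        self._frames.append((destination, frame))

    def size(self, destination):
        return sum(1 for dest, _ in self._frames if dest == destination)


class StubQueueStoreTest(unittest.TestCase):

    def setUp(self):
        # No config file needed: hand the settings to the constructor.
        self.store = StubQueueStore(max_size=2)

    def test_enqueue_increases_size(self):
        self.store.enqueue('/queue/a', 'frame-1')
        self.assertEqual(self.store.size('/queue/a'), 1)

    def test_enqueue_rejects_overflow(self):
        self.store.enqueue('/queue/a', 'frame-1')
        self.store.enqueue('/queue/a', 'frame-2')
        self.assertRaises(ValueError, self.store.enqueue, '/queue/a', 'frame-3')
```

The factory function then stays a thin layer that only translates config options into those same constructor arguments.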

Writing an Authenticator

TODO