In this chapter we’re going to start with a fairly uncontroversial insight: reads (queries) and writes (commands) are different, so they should be treated differently. Then we’re going to push that insight as far as we can. If you’re anything like Harry, this will all seem extreme at first, but hopefully we can make the argument that it’s not totally unreasonable.
Separating reads from writes shows where we might end up. Firstly, though: why bother?
Tip
|
You can find our code for this chapter at github.com/cosmicpython/code/tree/chapter_12_cqrs.

git clone https://github.com/cosmicpython/code.git && cd code
git checkout chapter_12_cqrs
# or, if you want to code along, check out the previous chapter:
git checkout chapter_11_external_events |
We’ve spent a lot of time in this book talking about how to build software that enforces the rules of our domain. These rules, or constraints, will be different for every application and they make up the interesting core of our systems.
In this book we’ve set explicit constraints like "You can’t allocate more stock than is available", as well as implicit constraints like "each order line is allocated to a single batch".
We wrote these rules down as unit tests at the beginning of the book.
def test_allocating_to_a_batch_reduces_the_available_quantity():
batch = Batch("batch-001", "SMALL-TABLE", qty=20, eta=date.today())
line = OrderLine('order-ref', "SMALL-TABLE", 2)
batch.allocate(line)
assert batch.available_quantity == 18
...
def test_cannot_allocate_if_available_smaller_than_required():
small_batch, large_line = make_batch_and_line("ELEGANT-LAMP", 2, 20)
assert small_batch.can_allocate(large_line) is False
In order to apply these rules properly, we needed to ensure that operations were consistent, and so we introduced patterns like unit of work and aggregates that help us commit small chunks of work.
To communicate changes between those small chunks, we introduced Domain Events so that we can write rules like "When stock is damaged or lost, adjust the available quantity on the batch, and re-allocate orders if necessary".
All of this complexity exists so that we can enforce rules when we change the state of our system. We’ve built a very flexible set of tools for writing data.
What about reads, though?
At MADE.com we have a system very like the Allocation service. On a busy day we might process 100 orders in an hour, and we have a big, gnarly system for allocating stock to those orders.
In that same busy day, though, we might have 100 product views per second. Each time somebody visits a product page, or a product listing page, we need to figure out whether the product is still in stock, and how long it will take us to deliver it.
The domain is the same—we’re concerned with batches of stock, and their
arrival date, and the amount that’s still available—but the access pattern
is very different. For example, our customers won’t notice if the query
is a few seconds out of date, but if our allocate
service is inconsistent
we’ll make a mess of their orders. We can take advantage of this difference by
making our reads eventually consistent in order to make them perform better.
This idea of trading consistency against performance makes a lot of developers nervous at first, so let’s talk quickly about that.
Let’s imagine that our "Get Available Stock" query is 30 seconds out of date when Bob visits the page for ILL-JUDGED-WARDROBE. Meanwhile, though, Harry has already bought the last item. When we try to allocate Bob’s order, we’ll get a failure and we’ll need to either cancel his order, or buy more stock and delay his delivery.
People who’ve only ever worked with relational data stores get really nervous about this problem, but it’s worth considering two other scenarios to gain some perspective.
Firstly, let’s imagine that Bob and Harry both visit the page at the same time. Harry goes off to make coffee, and by the time he returns, Bob has already bought the last wardrobe. When Harry places his order, we send it to the allocation service and, because there’s not enough stock, we have to refund his payment or buy more stock and delay his delivery.
As soon as we render the product page, the data is already stale. This insight is key to understanding why reads can be safely inconsistent: we’ll always need to check the current state of our system when we come to allocate, because all distributed systems are inconsistent. As soon as you have a web server and two customers, you’ve got the potential for stale data.
Okay, let’s assume we solve that problem somehow: we magically build a totally consistent web application where nobody ever sees stale data. This time Harry gets to the page first and buys his wardrobe.
Unfortunately for him, when the warehouse staff try to dispatch his furniture it falls off the fork-lift truck and smashes into a zillion pieces. Now what?
The only thing to do is call Harry up and either refund his order or buy more stock and delay delivery.
No matter what we do, we’re always going to find that our software systems are inconsistent with reality, and so we’ll always need business processes to cope with these edge cases. It’s okay to trade consistency for performance on the read side, because stale data is essentially unavoidable.
We can think of these requirements as forming two halves of a system: the read side and the write side, shown in Read vs Write.
| | Read side | Write side |
|---|---|---|
| Behaviour | Simple read | Complex business logic |
| Cacheability | Highly cacheable | Uncacheable |
| Consistency | Can be stale | Must be transactionally consistent |
For the write side, our fancy domain architectural patterns help us to evolve our system over time, but the complexity we’ve built so far doesn’t buy anything for reading data. The service layer, the unit of work, and the clever domain model are just bloat.
If you’ve been doing web development, you’re probably familiar with the
post/redirect/get pattern. This is a technique where a web endpoint accepts an
HTTP POST, and responds with a redirect to see the result. For example, we might
accept a POST to /batches
to create a new batch, and redirect the user to
/batches/123
to see their newly created batch.
This approach fixes the problems that arise when users refresh the results page in their browser, or try to bookmark a results page. In the case of a refresh, it can lead to our users double-submitting data, buying two sofas where they only needed one. In the case of a bookmark our hapless customers will end up with a broken page when they try to GET a POST endpoint.
Both these problems happen because we’re returning data in response to a write operation. Post/Redirect/Get side-steps the issue by separating the read and write phases of our operation.
This technique is a simple example of Command-Query Separation (CQS). In CQS we follow one simple rule: functions should either modify state, or answer questions, but never both. This makes software easier to reason about: we should always be able to ask "are the lights on?" without flicking the light switch.
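The light-switch rule can be sketched in a few lines of Python (an illustrative example of ours, not code from this chapter’s repo):

```python
class LightSwitch:
    """Command-Query Separation: commands change state and return nothing;
    queries answer questions and change nothing."""

    def __init__(self):
        self._on = False

    def flick(self) -> None:
        # command: modifies state
        self._on = not self._on

    def is_on(self) -> bool:
        # query: side-effect free, safe to call any number of times
        return self._on
```

Because is_on() has no side effects, callers can invoke it, cache it, or retry it freely; flick() is the only place state changes.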
Note
|
When building APIs we can apply the same design technique by returning a 201 Created, or a 202 Accepted, with a Location header containing the URI of our new resources. What’s important here isn’t the status code we use, but the logical separation of work into a write phase and a query phase. |
As we’ll see, we can use the CQS principle to make our systems faster and more
scalable, but first, let’s fix the CQS violation in our existing code. A few
chapters ago we introduced an allocate
endpoint that takes an order and
calls our service layer to allocate some stock. At the end of the call, we
return a 200 OK and the batch id. That’s led to some ugly design flaws so that
we can get the data we need. Let’s change it to return a simple OK message, and
instead provide a new read-only endpoint to retrieve allocation state.
@pytest.mark.usefixtures('postgres_db')
@pytest.mark.usefixtures('restart_api')
def test_happy_path_returns_202_and_batch_is_allocated():
orderid = random_orderid()
sku, othersku = random_sku(), random_sku('other')
batch1, batch2, batch3 = random_batchref(1), random_batchref(2), random_batchref(3)
api_client.post_to_add_batch(batch1, sku, 100, '2011-01-02')
api_client.post_to_add_batch(batch2, sku, 100, '2011-01-01')
api_client.post_to_add_batch(batch3, othersku, 100, None)
r = api_client.post_to_allocate(orderid, sku, qty=3)
assert r.status_code == 202
r = api_client.get_allocation(orderid)
assert r.ok
assert r.json() == [
{'sku': sku, 'batchref': batch2},
]
@pytest.mark.usefixtures('postgres_db')
@pytest.mark.usefixtures('restart_api')
def test_unhappy_path_returns_400_and_error_message():
unknown_sku, orderid = random_sku(), random_orderid()
r = api_client.post_to_allocate(
orderid, unknown_sku, qty=20, expect_success=False,
)
assert r.status_code == 400
assert r.json()['message'] == f'Invalid sku {unknown_sku}'
r = api_client.get_allocation(orderid)
assert r.status_code == 404
OK, what might the Flask app look like?
from allocation import views
...
@app.route("/allocations/<orderid>", methods=['GET'])
def allocations_view_endpoint(orderid):
uow = unit_of_work.SqlAlchemyUnitOfWork()
result = views.allocations(orderid, uow) #(1)
if not result:
return 'not found', 404
return jsonify(result), 200
1. All right, a views.py, fair enough: we can keep read-only stuff in there, and it’ll be a real views.py, not like Django’s, something that knows how to build read-only views of our data…
…so we can probably just add a list method to our existing repository obj-…
from allocation.service_layer import unit_of_work
def allocations(orderid: str, uow: unit_of_work.SqlAlchemyUnitOfWork):
with uow:
results = list(uow.session.execute(
'SELECT ol.sku, b.reference'
' FROM allocations AS a'
' JOIN batches AS b ON a.batch_id = b.id'
' JOIN order_lines AS ol ON a.orderline_id = ol.id'
' WHERE ol.orderid = :orderid',
dict(orderid=orderid)
))
return [{'sku': sku, 'batchref': batchref} for sku, batchref in results]
Excuse me? Raw SQL?
If you’re anything like Harry encountering this pattern for the first time, you’ll be wondering what on earth Bob has been smoking. We’re hand-rolling our own SQL now, and converting database rows directly to dicts? After all the effort we put into building a nice domain model? And what about the Repository pattern? Isn’t that meant to be our abstraction around the database? Why don’t we reuse that?
Well, let’s explore that seemingly simpler alternative first, and see what it looks like in practice.
We’ll still keep our view in a separate views.py module; enforcing a clear distinction between reads and writes in your application is still a good idea. We apply command-query separation, and it’s easy to see which code modifies state (the event handlers) and which code just retrieves read-only state (the views).
Tip
|
Split out your read-only views from your state-modifying command and event handlers. |
Before we get into exploring various options, let’s talk about testing. Whichever approach you decide to go for, you’re probably going to need at least one integration test. Something like this:
def test_allocations_view(sqlite_session_factory):
uow = unit_of_work.SqlAlchemyUnitOfWork(sqlite_session_factory)
messagebus.handle(commands.CreateBatch('sku1batch', 'sku1', 50, None), uow) #(1)
messagebus.handle(commands.CreateBatch('sku2batch', 'sku2', 50, date.today()), uow)
messagebus.handle(commands.Allocate('order1', 'sku1', 20), uow)
messagebus.handle(commands.Allocate('order1', 'sku2', 20), uow)
# add a spurious batch and order to make sure we're getting the right ones
messagebus.handle(commands.CreateBatch('sku1batch-later', 'sku1', 50, date.today()), uow)
messagebus.handle(commands.Allocate('otherorder', 'sku1', 30), uow)
messagebus.handle(commands.Allocate('otherorder', 'sku2', 10), uow)
assert views.allocations('order1', uow) == [
{'sku': 'sku1', 'batchref': 'sku1batch'},
{'sku': 'sku2', 'batchref': 'sku2batch'},
]
1. We do the setup for the integration test using the public entrypoint to our application, the messagebus. That keeps our tests decoupled from any implementation/infrastructure details about how things get stored.
How about adding a helper method to our products repository?
from allocation import unit_of_work
def allocations(orderid: str, uow: unit_of_work.AbstractUnitOfWork):
with uow:
products = uow.products.for_order(orderid=orderid) #(1)
batches = [b for p in products for b in p.batches] #(2)
return [
{'sku': b.sku, 'batchref': b.reference}
for b in batches
if orderid in b.orderids #(3)
]
1. Our repository returns Product objects, and we need to find all the products for the SKUs in a given order, so we’ll build a new helper method called .for_order() on the repository.
2. Now we have products, but we actually want batch references, so we get all the possible batches with a list comprehension.
3. And then we filter again to get just the batches for our specific order. That in turn relies on our batch objects being able to tell us which order IDs have been allocated to them:
class Batch:
...
@property
def orderids(self):
return {l.orderid for l in self._allocations}
You can start to see that reusing our existing repository and domain model classes is not as straightforward as you might have assumed. We’ve had to add new helper methods to both, and we’re doing a bunch of looping and filtering in Python, which is work that would be much more efficiently done by the database.
So, yes, on the plus side we’re re-using our existing abstractions, but on the downside, it all feels quite clunky.
What we’re seeing here are the effects of the fact that our domain model is designed primarily for write operations, and our requirements for reads are often conceptually quite different.
This is the chinstrokey-architect justification for CQRS. As we’ve said before, a Domain Model is not a data model—we’re trying to capture the way the business works: workflow, rules around state changes, messages exchanged; concerns about how the system reacts to external events and user input. Most of this stuff is totally irrelevant for read-only operations.
Making a facile point, your domain classes will have a number of methods for modifying state, and you won’t need any of them for read-only operations.
As the complexity of your domain model grows, you will find yourself making more and more choices about how to structure that model, which make it more and more awkward to use for read operations.
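One way to picture the difference: where the write side needs a full domain class, the read side can often get away with a dumb record. Here’s a hypothetical frozen dataclass for our allocations view (our illustration; the chapter’s own code just uses plain dicts):

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class AllocationViewModel:
    """Read-side record: pure data, no behaviour, no invariants to protect."""
    orderid: str
    sku: str
    batchref: str
```

Compare that with Batch, which carries allocate(), can_allocate(), and available_quantity, none of which a product page needs.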
Tip
|
This justification for CQRS is related to the justification for Domain Model. If you’re building a simple CRUD app, then reads and writes are going to be closely related, so you don’t need a Domain Model or CQRS. But the more complex your domain, the more likely you are to need both. |
You may be thinking, OK, if our repository is clunky, and working with Products is clunky, then I can at least use my ORM and work with Batches. That’s what it’s for!
from allocation import unit_of_work, model
def allocations(orderid: str, uow: unit_of_work.AbstractUnitOfWork):
with uow:
batches = uow.session.query(model.Batch).join(
model.OrderLine, model.Batch._allocations
).filter(
model.OrderLine.orderid == orderid
)
return [
{'sku': b.sku, 'batchref': b.reference}
for b in batches
]
But is that actually any easier to write or understand than the raw SQL version from "Views do… raw SQL???" (src/allocation/views.py)? It may not look too bad up there, but we can tell you it took several attempts, and plenty of digging through the SQLAlchemy docs. SQL is just SQL.
But the ORM can also expose us to performance problems.
The so-called SELECT N+1 problem is a common performance problem with ORMs: when retrieving a list of objects, your ORM will often perform an initial query to, say, get all the IDs of the objects it needs, and then issue individual queries for each object to retrieve their attributes. This is especially likely if there are any foreign key relationships on your objects.
Note
|
In all fairness we should say that SQLAlchemy is quite good at avoiding the SELECT N+1 problem. It doesn’t display it in the above example, and you can request eager loading explicitly to avoid it when dealing with joined objects. |
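To see the shape of the SELECT N+1 problem without any ORM in the way, here’s a minimal sqlite3 sketch (table and column names are our own invention): the first function issues one query per ID, the second lets the database answer in a single round trip.

```python
import sqlite3


def batch_refs_n_plus_1(conn, batch_ids):
    # anti-pattern: N separate queries, one per id
    return [
        conn.execute(
            'SELECT reference FROM batches WHERE id = ?', (i,)
        ).fetchone()[0]
        for i in batch_ids
    ]


def batch_refs_one_query(conn, batch_ids):
    # one query: let the database do the filtering
    placeholders = ', '.join('?' for _ in batch_ids)
    rows = conn.execute(
        f'SELECT id, reference FROM batches WHERE id IN ({placeholders})',
        batch_ids,
    ).fetchall()
    refs = {row[0]: row[1] for row in rows}
    return [refs[i] for i in batch_ids]
```

With three batches the difference is invisible; with three thousand, the N+1 version makes three thousand round trips to the database.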
Beyond SELECT N+1
, you may have other reasons that you want to decouple the
way you persist state changes from the way that you retrieve current state.
A set of fully normalized relational tables is a good way to make sure that
write operations never cause data corruption. But retrieving data using lots
of JOINs can be slow. It’s common in such cases to add denormalized views,
build read replicas, or even add caching layers.
On that note: have we convinced you that our raw SQL version isn’t as weird as it first seemed? Perhaps we were exaggerating for effect? Just you wait.
So. Reasonable or not, that hardcoded SQL query is pretty ugly, right? What if we made it nicer…
def allocations(orderid: str, uow: unit_of_work.SqlAlchemyUnitOfWork):
with uow:
results = list(uow.session.execute(
'SELECT sku, batchref FROM allocations_view WHERE orderid = :orderid',
dict(orderid=orderid)
))
...
…by keeping a totally separate, denormalized datastore for our view model?
allocations_view = Table(
'allocations_view', metadata,
Column('orderid', String(255)),
Column('sku', String(255)),
Column('batchref', String(255)),
)
OK, nicer-looking SQL queries wouldn’t be a justification for anything really, but building a denormalized copy of your data that’s optimized for read operations isn’t uncommon, once you’ve reached the limits of what you can do with indexes.
Even with well-tuned indexes, a relational database uses a lot of CPU to perform joins. The fastest queries will always be SELECT * FROM MyTable WHERE key = :value.
More than raw speed, though, this approach buys us scale. When we’re writing data to a relational database, we need to make sure that we get a lock over the rows we’re changing so that we don’t run into consistency problems.
If multiple clients are changing data at the same time, we’ll have weird race conditions. When we’re reading data, though, there’s no limit to the number of clients that can concurrently execute reads. For this reason, read-only stores can be horizontally scaled out.
Tip
|
Because read replicas can be inconsistent, there’s no limit to how many we can have. If you’re struggling to scale a system with a complex data store, ask whether you could build a simpler read model. |
Keeping them up to date is the challenge! Database views (materialized or otherwise) and triggers are a common solution, but that limits you to your database. We’d like to show you how we can reuse our event-driven architecture instead.
We add a second handler to the Allocated
event:
EVENT_HANDLERS = {
events.Allocated: [
handlers.publish_allocated_event,
handlers.add_allocation_to_read_model
],
Here’s what our update-view-model code looks like:
def add_allocation_to_read_model(
event: events.Allocated, uow: unit_of_work.SqlAlchemyUnitOfWork,
):
with uow:
uow.session.execute(
'INSERT INTO allocations_view (orderid, sku, batchref)'
' VALUES (:orderid, :sku, :batchref)',
dict(orderid=event.orderid, sku=event.sku, batchref=event.batchref)
)
uow.commit()
Believe it or not, that will pretty much work! And it will work against the exact same integration tests as the rest of our options.
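The fan-out mechanism itself is simple: the messagebus calls every handler registered for an event’s type, so adding a read-model handler costs one line in the registry. A toy dispatcher (our own sketch, not the book’s messagebus) shows the idea:

```python
EVENT_HANDLERS = {}  # maps event type -> list of handler callables


def handle(event):
    """Fan one event out to every handler registered for its type."""
    for handler in EVENT_HANDLERS.get(type(event), []):
        handler(event)
```
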
(OK, you’ll also need to handle Deallocated:)
events.Deallocated: [
handlers.remove_allocation_from_read_model,
handlers.reallocate
],
...
def remove_allocation_from_read_model(
    event: events.Deallocated, uow: unit_of_work.SqlAlchemyUnitOfWork,
):
    with uow:
        uow.session.execute(
            'DELETE FROM allocations_view '
            ' WHERE orderid = :orderid AND sku = :sku',
            dict(orderid=event.orderid, sku=event.sku)
        )
        uow.commit()
Sequence diagram for read model shows the flow across the two requests: two transactions in the POST/write operation, one to update the write model and one to update the read model, which the GET/read operation can use.
[plantuml, apwp_1202, config=plantuml.cfg]
@startuml
actor User order 1
boundary Flask order 2
participant MessageBus order 3
participant "Domain Model" as Domain order 4
participant View order 9
database DB order 10

User -> Flask: POST to allocate Endpoint
Flask -> MessageBus : Allocate Command

group UoW/transaction 1
    MessageBus -> Domain : allocate()
    MessageBus -> DB: commit write model
end

group UoW/transaction 2
    Domain -> MessageBus : raise Allocated event(s)
    MessageBus -> DB : update view model
end

Flask -> User: 202 OK

User -> Flask: GET allocations endpoint
Flask -> View: get allocations
View -> DB: SELECT on view model
DB -> View: some allocations
View -> Flask: some allocations
Flask -> User: some allocations
@enduml
"What happens when it breaks" should be the first question we ask as engineers.
How do we deal with a view model that hasn’t been updated because of a bug or temporary outage? Well, this is just another case where events and commands can fail independently.
If we never updated the view model, and the ILL-JUDGED-WARDROBE was forever in
stock, that would be annoying for customers, but the allocate
service would
still fail, and we’d take action to fix the problem.
Rebuilding a view model is easy, though. Since we’re using a service layer to update our view model, we can write a tool that:

1. Queries the current state of the write side to work out what’s currently allocated.
2. Calls the add_allocation_to_read_model handler for each allocated item.
We can use this technique to create entirely new read-models from historical data.
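A hypothetical version of that rebuild tool, assuming the write-side schema from the raw-SQL view earlier in the chapter, and cheating by using one INSERT … SELECT rather than replaying events through the handler:

```python
def rebuild_allocations_view(session):
    # Throw away the read model and regenerate it from the
    # current state of the write-side tables.
    session.execute('DELETE FROM allocations_view')
    session.execute(
        'INSERT INTO allocations_view (orderid, sku, batchref)'
        ' SELECT ol.orderid, ol.sku, b.reference'
        ' FROM allocations AS a'
        ' JOIN batches AS b ON a.batch_id = b.id'
        ' JOIN order_lines AS ol ON a.orderline_id = ol.id'
    )
```

Replaying through add_allocation_to_read_model instead, as described above, keeps a single code path for both live updates and rebuilds, at the cost of being slower.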
Let’s see the flexibility that our event-driven model buys us in action: what happens if we ever decide we want to implement a read model using a totally separate storage engine, Redis?
Just watch.
def add_allocation_to_read_model(event: events.Allocated, _):
redis_eventpublisher.update_readmodel(event.orderid, event.sku, event.batchref)
def remove_allocation_from_read_model(event: events.Deallocated, _):
redis_eventpublisher.update_readmodel(event.orderid, event.sku, None)
The helpers in our Redis module are one-liners:
def update_readmodel(orderid, sku, batchref):
r.hset(orderid, sku, batchref)
def get_readmodel(orderid):
return r.hgetall(orderid)
(maybe the name redis_eventpublisher.py is a misnomer now, but you get the idea).
And the view itself changes very slightly to adapt to its new backend:
def allocations(orderid):
batches = redis_eventpublisher.get_readmodel(orderid)
return [
{'batchref': b.decode(), 'sku': s.decode()}
for s, b in batches.items()
]
And the exact same integration tests that we had before still pass, because they are written at a level of abstraction that’s decoupled from the implementation: setup puts messages on the messagebus, and the assertions are against our view.
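Because the view talks to Redis only through those two one-liner helpers, a dict-backed fake is enough to unit-test view logic without a running Redis. This class is our own illustration (it treats a None batchref as a delete, a detail the snippet above glosses over):

```python
class FakeRedisReadModel:
    """In-memory stand-in for the Redis hash: orderid -> {sku: batchref}."""

    def __init__(self):
        self._store = {}

    def update_readmodel(self, orderid, sku, batchref):
        if batchref is None:
            # mirror the deallocation case: drop the sku entry
            self._store.get(orderid, {}).pop(sku, None)
        else:
            self._store.setdefault(orderid, {})[sku] = batchref

    def get_readmodel(self, orderid):
        return dict(self._store.get(orderid, {}))
```

The real Redis client returns bytes, which is why the view calls .decode(); a fake like this sidesteps that, so keep at least one integration test against real Redis.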
Tip
|
Event handlers are a great way to manage updates to a read model, if you decide you need one. They also make it easy to change the implementation of that read model at a later date. |
Tradeoffs of various view model options proposes some pros and cons for each of our options:

| Option | Pros | Cons |
|---|---|---|
| Just use repositories | Simple, consistent approach. | Expect performance issues with complex query patterns. |
| Use custom queries with your ORM | Allows re-use of DB configuration and model definitions. | Adds another query language with its own quirks and syntax. |
| Use hand-rolled SQL | Offers fine control over performance with a standard query syntax. | Changes to the DB schema have to be made to your hand-rolled queries and your ORM definitions. Highly normalised schemas may still have performance limitations. |
| Create separate read stores with events | Read-only copies are easy to scale out. Views can be constructed when data changes so that queries are as simple as possible. | Complex technique. Harry will be forever suspicious of your tastes and motives. |
As it happens, the allocation service at MADE.com does use "full blown" CQRS, with a read model stored in Redis, and even a second layer of cache provided by Varnish. But its use cases are actually quite a bit different from what we’ve shown here. For the kind of allocation service we’re building, it seems unlikely that you’d need to use a separate read model and event handlers for updating it.
But as your domain model becomes richer and more complex, a simplified read model becomes ever more compelling.
Often, your read operations will be acting on the same conceptual objects as your write model, so using the ORM, adding some read methods to your repositories, and using Domain Model classes for your read operations is just fine.
In our book example, the read operations act on quite different conceptual
entities to our Domain Model. The allocation service thinks in terms of
Batches
for a single sku, but users care about allocations for a whole order,
with multiple skus, so using the ORM ends up being a little awkward. We’d be
quite tempted to go with the raw-SQL view we showed right at the beginning of
the chapter.
On that note, let’s sally forth into our final chapter.