Command-Query Responsibility Separation (CQRS)

In this chapter we’re going to start with a fairly uncontroversial insight: reads (queries) and writes (commands) are different, so they should be treated differently. Then we’re going to push that insight as far as we can. If you’re anything like Harry, this will all seem extreme at first, but hopefully we can make the argument that it’s not totally unreasonable.

Figure 1. Separating reads from writes

Figure 1 shows where we might end up. First, though, why bother?

Tip

You can find our code for this chapter at github.com/cosmicpython/code/tree/chapter_12_cqrs.

git clone https://github.com/cosmicpython/code.git && cd code
git checkout chapter_12_cqrs
# or, if you want to code along, checkout the previous chapter:
git checkout chapter_11_external_events

Domain models are for writing

We’ve spent a lot of time in this book talking about how to build software that enforces the rules of our domain. These rules, or constraints, will be different for every application and they make up the interesting core of our systems.

In this book we’ve set explicit constraints like "You can’t allocate more stock than is available", as well as implicit constraints like "each order line is allocated to a single batch".

We wrote these rules down as unit tests at the beginning of the book.

Example 1. Our basic domain tests (tests/unit/test_batches.py)
def test_allocating_to_a_batch_reduces_the_available_quantity():
    batch = Batch("batch-001", "SMALL-TABLE", qty=20, eta=date.today())
    line = OrderLine('order-ref', "SMALL-TABLE", 2)

    batch.allocate(line)

    assert batch.available_quantity == 18

...

def test_cannot_allocate_if_available_smaller_than_required():
    small_batch, large_line = make_batch_and_line("ELEGANT-LAMP", 2, 20)
    assert small_batch.can_allocate(large_line) is False

In order to apply these rules properly, we needed to ensure that operations were consistent, and so we introduced patterns like unit of work and aggregates that help us commit small chunks of work.

To communicate changes between those small chunks, we introduced Domain Events so that we can write rules like "When stock is damaged or lost, adjust the available quantity on the batch, and re-allocate orders if necessary".

All of this complexity exists so that we can enforce rules when we change the state of our system. We’ve built a very flexible set of tools for writing data.

What about reads, though?

Most users aren’t going to buy your furniture

At MADE.com we have a system very like the Allocation service. In a busy day we might process 100 orders in an hour, and we have a big gnarly system for allocating stock to those orders.

In that same busy day, though, we might have 100 product views per second. Each time somebody visits a product page, or a product listing page, we need to figure out whether the product is still in stock, and how long it will take us to deliver it.

The domain is the same—​we’re concerned with batches of stock, and their arrival date, and the amount that’s still available—​but the access pattern is very different. For example, our customers won’t notice if the query is a few seconds out of date, but if our allocate service is inconsistent we’ll make a mess of their orders. We can take advantage of this difference by making our reads eventually consistent in order to make them perform better.

Is Read Consistency Truly Attainable?

This idea of trading consistency against performance makes a lot of developers nervous at first, so let’s talk quickly about that.

Let’s imagine that our "Get Available Stock" query is 30 seconds out of date when Bob visits the page for ILL-JUDGED-WARDROBE. Meanwhile, though, Harry has already bought the last item. When we try to allocate Bob’s order, we’ll get a failure and we’ll need to either cancel his order, or buy more stock and delay his delivery.

People who’ve only ever worked with relational data stores get really nervous about this problem, but it’s worth considering two other scenarios to gain some perspective.

Firstly, let’s imagine that Bob and Harry both visit the page at the same time. Harry goes off to make coffee, and by the time he returns, Bob has already bought the last wardrobe. When Harry places his order, we send it to the allocation service and, because there’s not enough stock, we have to refund his payment or buy more stock and delay his delivery.

As soon as we render the product page, the data is already stale. This insight is key to understanding why reads can be safely inconsistent: we’ll always need to check the current state of our system when we come to allocate, because all distributed systems are inconsistent. As soon as you have a web server and two customers, you’ve got the potential for stale data.

Okay, let’s assume we solve that problem somehow: we magically build a totally consistent web application where nobody ever sees stale data. This time Harry gets to the page first and buys his wardrobe.

Unfortunately for him, when the warehouse staff try to dispatch his furniture it falls off the fork-lift truck and smashes into a zillion pieces. Now what?

The only thing to do is call Harry up and either refund his order or buy more stock and delay delivery.

No matter what we do, we’re always going to find that our software systems are inconsistent with reality, and so we’ll always need business processes to cope with these edge cases. It’s okay to trade consistency for performance on the read side, because stale data is essentially unavoidable.
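The argument in this sidebar can be condensed into a toy sketch. Everything here is hypothetical (`StockService`, `OutOfStock`, and the snapshot dict are invented for illustration and are not part of the book’s code): the read side works from a snapshot that may be stale, while the write side re-checks current state before it commits anything.

```python
class OutOfStock(Exception):
    """Raised by the write side when an allocation cannot be satisfied."""


class StockService:
    """Toy write side: the single source of truth for available stock."""

    def __init__(self, available: dict):
        self._available = dict(available)

    def available(self, sku: str) -> int:
        return self._available.get(sku, 0)

    def allocate(self, sku: str, qty: int) -> None:
        # The write side always re-checks current state before changing it.
        if self._available.get(sku, 0) < qty:
            raise OutOfStock(sku)
        self._available[sku] -= qty


service = StockService({"ILL-JUDGED-WARDROBE": 1})

# Read side: a snapshot taken when the product page was rendered.
page_snapshot = {"ILL-JUDGED-WARDROBE": service.available("ILL-JUDGED-WARDROBE")}

service.allocate("ILL-JUDGED-WARDROBE", 1)  # Harry buys the last one

# Bob's page still claims there is stock, but the write side rejects his order.
try:
    service.allocate("ILL-JUDGED-WARDROBE", 1)
    bob_result = "allocated"
except OutOfStock:
    bob_result = "out of stock"
```

The stale snapshot never causes a double allocation; it only means Bob finds out a little later than he would like.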

We can think of these requirements as forming two halves of a system: the read side and the write side, shown in Table 1.

Table 1. Read vs Write
[cols="1,2,2", options="header"]
|===
| | Read side | Write side

| Behaviour | Simple read | Complex business logic
| Cacheability | Highly cacheable | Uncacheable
| Consistency | Can be stale | Must be transactionally consistent
|===

For the write side, our fancy domain architectural patterns help us to evolve our system over time, but the complexity we’ve built so far doesn’t buy anything for reading data. The service layer, the unit of work, and the clever domain model are just bloat.

Post/Redirect/Get and CQS

If you’ve been doing web development, you’re probably familiar with the post/redirect/get pattern. This is a technique where a web endpoint accepts an HTTP POST, and responds with a redirect to see the result. For example, we might accept a POST to /batches to create a new batch, and redirect the user to /batches/123 to see their newly created batch.

This approach fixes the problems that arise when users refresh the results page in their browser, or try to bookmark it. A refresh can resubmit the form, so our users end up double-submitting data and buying two sofas where they needed only one. A bookmark leaves our hapless customers with a broken page when they try to GET a POST endpoint.

Both these problems happen because we’re returning data in response to a write operation. Post/Redirect/Get side-steps the issue by separating the read and write phases of our operation.

This technique is a simple example of Command-Query Separation (CQS). In CQS we follow one simple rule: functions should either modify state, or answer questions, but never both. This makes software easier to reason about: we should always be able to ask "are the lights on?" without flicking the light switch.
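As a minimal sketch of the rule (the `LightSwitch` and `LeakySwitch` classes are invented here purely for illustration), a query method answers a question and changes nothing, while a command method changes state and returns nothing:

```python
class LightSwitch:
    def __init__(self) -> None:
        self._on = False

    def is_on(self) -> bool:
        """Query: answers a question and changes nothing."""
        return self._on

    def flick(self) -> None:
        """Command: changes state and returns nothing."""
        self._on = not self._on


class LeakySwitch(LightSwitch):
    def flick_and_report(self) -> bool:
        """CQS violation: modifies state *and* answers a question."""
        self._on = not self._on
        return self._on
```

With `LightSwitch` we can call `is_on()` as often as we like without side effects. With `LeakySwitch.flick_and_report()`, the only way to learn the state is to change it.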

Note
When building APIs we can apply the same design technique by returning a 201 Created, or a 202 Accepted, with a Location header containing the URI of our new resources. What’s important here isn’t the status code we use, but the logical separation of work into a write phase and a query phase.
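Here is a framework-free sketch of that idea. The function names, the in-memory `ALLOCATIONS` store, and the `(body, status, headers)` return shape are all assumptions made for illustration; a real Flask view would build a response object instead.

```python
ALLOCATIONS = {}  # hypothetical in-memory store: orderid -> list of allocations


def post_allocate(orderid: str, sku: str, batchref: str) -> tuple:
    """Write phase: change state, then point the client at the read endpoint."""
    ALLOCATIONS.setdefault(orderid, []).append({"sku": sku, "batchref": batchref})
    headers = {"Location": f"/allocations/{orderid}"}
    return "OK", 202, headers


def get_allocations(orderid: str) -> tuple:
    """Query phase: return data without changing anything."""
    result = ALLOCATIONS.get(orderid)
    if not result:
        return "not found", 404, {}
    return result, 200, {}
```

The write handler never returns the allocation itself; the client follows the `Location` header and asks the read endpoint for it.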

As we’ll see, we can use the CQS principle to make our systems faster and more scalable, but first, let’s fix the CQS violation in our existing code. A few chapters ago we introduced an allocate endpoint that takes an order and calls our service layer to allocate some stock. At the end of the call, we return a 200 OK and the batch ID. Getting that data back out of a write operation has forced some ugly design flaws on us. Let’s change the endpoint to return a simple OK message with a 202 Accepted status, and instead provide a new read-only endpoint to retrieve allocation state.

Example 2. API test does a GET after the POST (tests/e2e/test_api.py)
@pytest.mark.usefixtures('postgres_db')
@pytest.mark.usefixtures('restart_api')
def test_happy_path_returns_202_and_batch_is_allocated():
    orderid = random_orderid()
    sku, othersku = random_sku(), random_sku('other')
    batch1, batch2, batch3 = random_batchref(1), random_batchref(2), random_batchref(3)
    api_client.post_to_add_batch(batch1, sku, 100, '2011-01-02')
    api_client.post_to_add_batch(batch2, sku, 100, '2011-01-01')
    api_client.post_to_add_batch(batch3, othersku, 100, None)

    r = api_client.post_to_allocate(orderid, sku, qty=3)
    assert r.status_code == 202

    r = api_client.get_allocation(orderid)
    assert r.ok
    assert r.json() == [
        {'sku': sku, 'batchref': batch2},
    ]


@pytest.mark.usefixtures('postgres_db')
@pytest.mark.usefixtures('restart_api')
def test_unhappy_path_returns_400_and_error_message():
    unknown_sku, orderid = random_sku(), random_orderid()
    r = api_client.post_to_allocate(
        orderid, unknown_sku, qty=20, expect_success=False,
    )
    assert r.status_code == 400
    assert r.json()['message'] == f'Invalid sku {unknown_sku}'

    r = api_client.get_allocation(orderid)
    assert r.status_code == 404

OK, what might the Flask app look like?

Example 3. Endpoint for viewing allocations (src/allocation/entrypoints/flask_app.py)
from allocation import views
...

@app.route("/allocations/<orderid>", methods=['GET'])
def allocations_view_endpoint(orderid):
    uow = unit_of_work.SqlAlchemyUnitOfWork()
    result = views.allocations(orderid, uow)  #(1)
    if not result:
        return 'not found', 404
    return jsonify(result), 200
  1. All right, a views.py, fair enough, we can keep read-only stuff in there, and it’ll be a real views.py, not like Django’s, something that knows how to build read-only views of our data…​

Hold On to Your Lunch, Folks

…​so we can probably just add a list method to our existing repository obj-…​

Example 4. Views do…​ raw SQL??? (src/allocation/views.py)
from sqlalchemy import text

from allocation.service_layer import unit_of_work

def allocations(orderid: str, uow: unit_of_work.SqlAlchemyUnitOfWork):
    with uow:
        # text() marks the string as raw SQL; SQLAlchemy 2.0 rejects a plain str here
        results = list(uow.session.execute(
            text(
                'SELECT ol.sku, b.reference'
                ' FROM allocations AS a'
                ' JOIN batches AS b ON a.batch_id = b.id'
                ' JOIN order_lines AS ol ON a.orderline_id = ol.id'
                ' WHERE ol.orderid = :orderid'
            ),
            dict(orderid=orderid)
        ))
    return [{'sku': sku, 'batchref': batchref} for sku, batchref in results]

Excuse me? Raw SQL?

— Our Readers

If you’re anything like Harry encountering this pattern for the first time, you’ll be wondering what on earth Bob has been smoking. We’re hand-rolling our own SQL now, and converting database rows directly to dicts? After all the effort we put into building a nice domain model? And what about the Repository pattern? Isn’t that meant to be our abstraction around the database? Why don’t we reuse that?

Well, let’s explore that seemingly simpler alternative first, and see what it looks like in practice.

We’ll still keep our view in a separate views.py module; enforcing a clear distinction between reads and writes in your application is still a good idea. We apply command-query separation, and it’s easy to see which code modifies state (the event handlers) and which code just retrieves read-only state (the views).

Tip
Split out your read-only views from your state-modifying command and event handlers.

Testing CQRS Views

Before we get into exploring various options, let’s talk about testing. Whichever approaches you decide to go for, you’re probably going to need at least one integration test. Something like this:

Example 5. An integration test for a view (tests/integration/test_views.py)
def test_allocations_view(sqlite_session_factory):
    uow = unit_of_work.SqlAlchemyUnitOfWork(sqlite_session_factory)
    messagebus.handle(commands.CreateBatch('sku1batch', 'sku1', 50, None), uow)  #(1)
    messagebus.handle(commands.CreateBatch('sku2batch', 'sku2', 50, date.today()), uow)
    messagebus.handle(commands.Allocate('order1', 'sku1', 20), uow)
    messagebus.handle(commands.Allocate('order1', 'sku2', 20), uow)
    # add a spurious batch and order to make sure we're getting the right ones
    messagebus.handle(commands.CreateBatch('sku1batch-later', 'sku1', 50, date.today()), uow)
    messagebus.handle(commands.Allocate('otherorder', 'sku1', 30), uow)
    messagebus.handle(commands.Allocate('otherorder', 'sku2', 10), uow)

    assert views.allocations('order1', uow) == [
        {'sku': 'sku1', 'batchref': 'sku1batch'},
        {'sku': 'sku2', 'batchref': 'sku2batch'},
    ]
  1. We do the setup for the integration test using the public entrypoint to our application, the messagebus. That keeps our tests decoupled from any implementation/infrastructure details about how things get stored.

"Obvious" Alternative 1: Using the Existing Repository

How about adding a helper method to our products repository?

Example 6. A simple view that uses the repository (src/allocation/views.py)
from allocation.service_layer import unit_of_work

def allocations(orderid: str, uow: unit_of_work.AbstractUnitOfWork):
    with uow:
        products = uow.products.for_order(orderid=orderid)  #(1)
        batches = [b for p in products for b in p.batches]  #(2)
        return [
            {'sku': b.sku, 'batchref': b.reference}
            for b in batches
            if orderid in b.orderids  #(3)
        ]
  1. Our repository returns product objects, and we need to find all the products for the skus in a given order, so we’ll build a new helper method called .for_order() on the repository.

  2. Now we have products but we actually want batch references, so we get all the possible batches with a list comprehension.

  3. And then we filter again to get just the batches for our specific order. That in turn relies on our batch objects being able to tell us which order IDs they have allocated to them:

Example 7. An arguably-unnecessary property on our model (src/allocation/domain/model.py)
class Batch:
    ...

    @property
    def orderids(self):
        return {l.orderid for l in self._allocations}

You can start to see that reusing our existing repository and domain model classes is not as straightforward as you might have assumed. We’ve had to add new helper methods to both, and we’re doing a bunch of looping and filtering in Python, which is work that would be much more efficiently done by the database.

So, yes, on the plus side we’re re-using our existing abstractions, but on the downside, it all feels quite clunky.

Your Domain Model is not Optimized for Read Operations

What we’re seeing here are the effects of the fact that our domain model is designed primarily for write operations, and our requirements for reads are often conceptually quite different.

This is the chinstrokey-architect justification for CQRS. As we’ve said before, a Domain Model is not a data model—​we’re trying to capture the way the business works: workflow, rules around state changes, messages exchanged; concerns about how the system reacts to external events and user input. Most of this stuff is totally irrelevant for read-only operations.

Making a facile point, your domain classes will have a number of methods for modifying state, and you won’t need any of them for read-only operations.

As the complexity of your domain model grows, you will find yourself making more and more choices about how to structure that model, which make it more and more awkward to use for read operations.

Tip
This justification for CQRS is related to the justification for Domain Model. If you’re building a simple CRUD app, then reads and writes are going to be closely related, so you don’t need a Domain Model or CQRS. But the more complex your domain, the more likely you are to need both.

"Obvious" Alternative 2: Using the ORM

You may be thinking, OK, if our repository is clunky, and working with Products is clunky, then I can at least use my ORM and work with Batches. That’s what it’s for!

Example 8. A simple view that uses the ORM (src/allocation/views.py)
from allocation.domain import model
from allocation.service_layer import unit_of_work

def allocations(orderid: str, uow: unit_of_work.AbstractUnitOfWork):
    with uow:
        batches = uow.session.query(model.Batch).join(
            model.OrderLine, model.Batch._allocations
        ).filter(
            model.OrderLine.orderid == orderid
        )
        return [
            # Batch exposes its identifier as .reference (see Example 6)
            {'sku': b.sku, 'batchref': b.reference}
            for b in batches
        ]

But is that actually any easier to write or understand than the raw SQL version from Example 4? It may not look too bad up there, but we can tell you it took several attempts, and plenty of digging through the SQLAlchemy docs. SQL is just SQL.

But the ORM can also expose us to performance problems.

SELECT N+1, and Other Performance Considerations

The so-called SELECT N+1 problem is a common performance problem with ORMs: when retrieving a list of objects, your ORM will often perform an initial query to, say, get all the IDs of the objects it needs, and then issue individual queries for each object to retrieve their attributes. This is especially likely if there are any foreign key relationships on your objects.

Note
In all fairness we should say that SQLAlchemy is quite good at avoiding the SELECT N+1 problem. It doesn’t display it in the above example, and you can request eager loading explicitly to avoid it when dealing with joined objects.
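To make the SELECT N+1 pattern concrete, here is a hand-written sketch using only the standard library’s `sqlite3`. It imitates lazy loading by hand rather than showing what any particular ORM emits. The table and column names follow the raw SQL view earlier in the chapter; the sample rows and the `count_selects` helper are assumptions for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE batches (id INTEGER PRIMARY KEY, reference TEXT, sku TEXT);
    CREATE TABLE order_lines (id INTEGER PRIMARY KEY, orderid TEXT, sku TEXT);
    CREATE TABLE allocations (orderline_id INTEGER, batch_id INTEGER);
    INSERT INTO batches VALUES (1, 'sku1batch', 'sku1'), (2, 'sku2batch', 'sku2');
    INSERT INTO order_lines VALUES (1, 'order1', 'sku1'), (2, 'order1', 'sku2');
    INSERT INTO allocations VALUES (1, 1), (2, 2);
""")

queries = []
conn.set_trace_callback(queries.append)  # log each statement SQLite executes


def allocations_n_plus_one(orderid):
    """One query for the order lines, then one more query per line."""
    lines = conn.execute(
        "SELECT id, sku FROM order_lines WHERE orderid = ? ORDER BY id", (orderid,)
    ).fetchall()
    result = []
    for line_id, sku in lines:
        row = conn.execute(
            "SELECT b.reference FROM allocations AS a"
            " JOIN batches AS b ON a.batch_id = b.id"
            " WHERE a.orderline_id = ?",
            (line_id,),
        ).fetchone()
        result.append({"sku": sku, "batchref": row[0]})
    return result


def allocations_single_query(orderid):
    """The same answer from a single joined query."""
    rows = conn.execute(
        "SELECT ol.sku, b.reference FROM allocations AS a"
        " JOIN batches AS b ON a.batch_id = b.id"
        " JOIN order_lines AS ol ON a.orderline_id = ol.id"
        " WHERE ol.orderid = ? ORDER BY ol.id",
        (orderid,),
    ).fetchall()
    return [{"sku": sku, "batchref": ref} for sku, ref in rows]


def count_selects(fn, *args):
    """Run fn and return (result, number of SELECT statements it issued)."""
    queries.clear()
    result = fn(*args)
    return result, sum(q.lstrip().upper().startswith("SELECT") for q in queries)
```

Both functions return the same allocations for `order1`, but the first one issues a SELECT per order line on top of the initial query, so its cost grows with the size of the order.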

Beyond SELECT N+1, you may have other reasons that you want to decouple the way you persist state changes from the way that you retrieve current state. A set of fully normalized relational tables is a good way to make sure that write operations never cause data corruption. But retrieving data using lots of JOINs can be slow. It’s common in such cases to add some denormalized views, build read replicas, or even add caching layers.

Time to completely jump the shark

On that note: have we convinced you that our raw SQL version isn’t as weird as it first seemed? Perhaps we were exaggerating for effect? Just you wait.

So. Reasonable or not, that hardcoded SQL query is pretty ugly, right? What if we made it nicer…​

Example 9. A much nicer query (src/allocation/views.py)
def allocations(orderid: str, uow: unit_of_work.SqlAlchemyUnitOfWork):
    with uow:
        results = list(uow.session.execute(
            # text() (from sqlalchemy import text) marks the string as raw SQL
            text('SELECT sku, batchref FROM allocations_view WHERE orderid = :orderid'),
            dict(orderid=orderid)
        ))
        ...

…​by keeping a totally separate, denormalized datastore for our view model?

Example 10. Hee hee hee, no foreign keys, just strings, YOLO. (src/allocation/adapters/orm.py)
allocations_view = Table(
    'allocations_view', metadata,
    Column('orderid', String(255)),
    Column('sku', String(255)),
    Column('batchref', String(255)),
)

OK, nicer-looking SQL queries wouldn’t be a justification for anything really, but building a denormalized copy of your data that’s optimized for read operations isn’t uncommon, once you’ve reached the limits of what you can do with indexes.

Even with well-tuned indexes, a relational database uses a lot of CPU to perform joins. The fastest queries will always be SELECT * FROM MyTable WHERE key = :value.

More than raw speed, though, this approach buys us scale. When we’re writing data to a relational database, we need to make sure that we get a lock over the rows we’re changing so that we don’t run into consistency problems.

If multiple clients are changing data at the same time, we’ll have weird race conditions. When we’re reading data, though, there’s no limit to the number of clients that can concurrently execute. For this reason read-only stores can be horizontally scaled out.

Tip
Because read replicas can be inconsistent, there’s no limit to how many we can have. If you’re struggling to scale a system with a complex data store, ask whether you could build a simpler read model.

Keeping them up to date is the challenge! Database views (materialized or otherwise) and triggers are a common solution, but that limits you to your database. We’d like to show you how we can reuse our event-driven architecture instead.

Updating a Read Model Table Using an Event Handler

We add a second handler to the Allocated event:

Example 11. Allocated event gets a new handler (src/allocation/service_layer/messagebus.py)
EVENT_HANDLERS = {
    events.Allocated: [
        handlers.publish_allocated_event,
        handlers.add_allocation_to_read_model
    ],

Here’s what our update-view-model code looks like:

Example 12. Update on allocation (src/allocation/service_layer/handlers.py)
def add_allocation_to_read_model(
        event: events.Allocated, uow: unit_of_work.SqlAlchemyUnitOfWork,
):
    with uow:
        uow.session.execute(
            # text() (from sqlalchemy import text) marks the string as raw SQL
            text(
                'INSERT INTO allocations_view (orderid, sku, batchref)'
                ' VALUES (:orderid, :sku, :batchref)'
            ),
            dict(orderid=event.orderid, sku=event.sku, batchref=event.batchref)
        )
        uow.commit()

Believe it or not, that will pretty much work! And it will work against the exact same integration tests as the rest of our options.

(OK, you’ll also need to handle Deallocated:)

Example 13. A second listener for read model updates
events.Deallocated: [
    handlers.remove_allocation_from_read_model,
    handlers.reallocate
],

...

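def remove_allocation_from_read_model(
        event: events.Deallocated, uow: unit_of_work.SqlAlchemyUnitOfWork,
):
    with uow:
        uow.session.execute(
            # text() comes from `from sqlalchemy import text`; the parameters
            # and commit below mirror add_allocation_to_read_model
            text(
                'DELETE FROM allocations_view'
                ' WHERE orderid = :orderid AND sku = :sku'
            ),
            dict(orderid=event.orderid, sku=event.sku)
        )
        uow.commit()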
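If you’d like to see the moving parts without a database, the following is a minimal in-memory sketch of the same idea. The event classes, the list-backed `read_model`, and the simplified `handle` function are stand-ins invented for illustration; the real handlers also take a unit of work and write to the allocations_view table.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Allocated:
    orderid: str
    sku: str
    batchref: str


@dataclass(frozen=True)
class Deallocated:
    orderid: str
    sku: str


read_model = []  # stand-in for the allocations_view table


def add_allocation_to_read_model(event):
    read_model.append(
        {"orderid": event.orderid, "sku": event.sku, "batchref": event.batchref}
    )


def remove_allocation_from_read_model(event):
    # Keep every row except the one matching this order and sku.
    read_model[:] = [
        row for row in read_model
        if not (row["orderid"] == event.orderid and row["sku"] == event.sku)
    ]


EVENT_HANDLERS = {
    Allocated: [add_allocation_to_read_model],
    Deallocated: [remove_allocation_from_read_model],
}


def handle(event):
    """Dispatch an event to every handler registered for its type."""
    for handler in EVENT_HANDLERS[type(event)]:
        handler(event)


def allocations(orderid):
    """The view: a plain lookup against the read model, no domain objects."""
    return [
        {"sku": row["sku"], "batchref": row["batchref"]}
        for row in read_model
        if row["orderid"] == orderid
    ]
```

The view never touches the domain model: it only reads whatever the handlers have written.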

Figure 2 shows the flow across the two requests: the POST/write operation runs two transactions, one to update the write model and one to update the read model, and the GET/read operation then queries the read model.

Figure 2. Sequence diagram for read model
[plantuml, apwp_1202, config=plantuml.cfg]
@startuml
actor User order 1
boundary Flask order 2
participant MessageBus order 3
participant "Domain Model" as Domain order 4
participant View order 9
database DB order 10

User -> Flask: POST to allocate Endpoint
Flask -> MessageBus : Allocate Command

group UoW/transaction 1
    MessageBus -> Domain : allocate()
    MessageBus -> DB: commit write model
end

group UoW/transaction 2
    Domain -> MessageBus : raise Allocated event(s)
    MessageBus -> DB : update view model
end

Flask -> User: 202 Accepted

User -> Flask: GET allocations endpoint
Flask -> View: get allocations
View -> DB: SELECT on view model
DB -> View: some allocations
View -> Flask: some allocations
Flask -> User: some allocations

@enduml

"What happens when it breaks" should be the first question we ask as engineers.

How do we deal with a view model that hasn’t been updated because of a bug or temporary outage? Well, this is just another case where events and commands can fail independently.

If we never updated the view model, and the ILL-JUDGED-WARDROBE was forever in stock, that would be annoying for customers, but the allocate service would still fail, and we’d take action to fix the problem.

Rebuilding a view model is easy, though. Since we’re using a service layer to update our view model, we can write a tool that:

  • Queries the current state of the write side to work out what’s currently allocated.

  • Calls the add_allocation_to_read_model handler for each allocated item.

We can use this technique to create entirely new read-models from historical data.
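The two steps above can be sketched with the standard library’s `sqlite3`. This is an illustration under stated assumptions, not the book’s tooling: the schema follows the SQL used earlier in the chapter, the handler here takes a connection and plain values rather than an event and a unit of work, and clearing the table first is our own addition so that stale rows don’t survive the rebuild.

```python
import sqlite3


def add_allocation_to_read_model(conn, orderid, sku, batchref):
    """Simplified stand-in for the chapter's handler."""
    conn.execute(
        "INSERT INTO allocations_view (orderid, sku, batchref) VALUES (?, ?, ?)",
        (orderid, sku, batchref),
    )


def rebuild_allocations_view(conn):
    """Regenerate the read model from the write-side tables."""
    # Step 1: ask the write side what is currently allocated.
    rows = conn.execute(
        "SELECT ol.orderid, ol.sku, b.reference"
        " FROM allocations AS a"
        " JOIN batches AS b ON a.batch_id = b.id"
        " JOIN order_lines AS ol ON a.orderline_id = ol.id"
    ).fetchall()
    with conn:  # commits on success, rolls back on error
        conn.execute("DELETE FROM allocations_view")
        # Step 2: replay each allocation through the read-model handler.
        for orderid, sku, batchref in rows:
            add_allocation_to_read_model(conn, orderid, sku, batchref)
    return len(rows)


conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE batches (id INTEGER PRIMARY KEY, reference TEXT, sku TEXT);
    CREATE TABLE order_lines (id INTEGER PRIMARY KEY, orderid TEXT, sku TEXT);
    CREATE TABLE allocations (orderline_id INTEGER, batch_id INTEGER);
    CREATE TABLE allocations_view (orderid TEXT, sku TEXT, batchref TEXT);
    INSERT INTO batches VALUES (1, 'sku1batch', 'sku1');
    INSERT INTO order_lines VALUES (1, 'order1', 'sku1');
    INSERT INTO allocations VALUES (1, 1);
    INSERT INTO allocations_view VALUES ('stale-order', 'sku9', 'gone');
""")

rebuilt = rebuild_allocations_view(conn)
view_rows = conn.execute(
    "SELECT orderid, sku, batchref FROM allocations_view"
).fetchall()
```

After the rebuild, the stale row is gone and the view holds exactly what the write side says is allocated.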

Changing our Read Model Implementation is Easy

Let’s see the flexibility that our event-driven model buys us in action, by looking at what happens if we decide to implement the read model using a totally separate storage engine, Redis.

Just watch.

Example 14. Handlers update a Redis read model (src/allocation/service_layer/handlers.py)
def add_allocation_to_read_model(event: events.Allocated, _):
    redis_eventpublisher.update_readmodel(event.orderid, event.sku, event.batchref)

def remove_allocation_from_read_model(event: events.Deallocated, _):
    redis_eventpublisher.update_readmodel(event.orderid, event.sku, None)

The helpers in our Redis module are one-liners:

Example 15. Redis read model read + update (src/allocation/adapters/redis_eventpublisher.py)
def update_readmodel(orderid, sku, batchref):
    r.hset(orderid, sku, batchref)


def get_readmodel(orderid):
    return r.hgetall(orderid)

(Maybe the name redis_eventpublisher.py is a misnomer now, but you get the idea.)

And the view itself changes very slightly to adapt to its new backend:

Example 16. View adapted to Redis (src/allocation/views.py)
def allocations(orderid):
    batches = redis_eventpublisher.get_readmodel(orderid)
    return [
        {'batchref': b.decode(), 'sku': s.decode()}
        for s, b in batches.items()
    ]

And the exact same integration tests that we had before still pass, because they are written at a level of abstraction that’s decoupled from the implementation: setup puts messages on the messagebus, and the assertions are against our view.
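If you want to poke at the shape of this read model without a Redis server, a dict-backed stand-in is enough. `FakeRedisHashes` below is hypothetical and implements only the two hash commands used above. It returns bytes, as a redis-py client does by default, which is why the view calls `.decode()`.

```python
class FakeRedisHashes:
    """Dict-backed stand-in for the two Redis hash commands used above."""

    def __init__(self):
        self._hashes = {}

    def hset(self, name, key, value):
        # Store bytes, mirroring what a default redis-py client hands back.
        self._hashes.setdefault(name, {})[key.encode()] = value.encode()

    def hgetall(self, name):
        # A missing hash reads as an empty dict.
        return dict(self._hashes.get(name, {}))


r = FakeRedisHashes()


def update_readmodel(orderid, sku, batchref):
    r.hset(orderid, sku, batchref)


def get_readmodel(orderid):
    return r.hgetall(orderid)


def allocations(orderid):
    batches = get_readmodel(orderid)
    return [
        {"batchref": b.decode(), "sku": s.decode()}
        for s, b in batches.items()
    ]
```

Each order becomes one hash keyed by order ID, with one field per sku, so the view is a single key lookup.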

Tip
Event handlers are a great way to manage updates to a read model, if you decide you need one. They also make it easy to change the implementation of that read model at a later date.

But Would You Really? CRUD versus CQRS.

Table 2 proposes some pros and cons for each of our options:

Table 2. Tradeoffs of various view model options
[cols="1,2,2", options="header"]
|===
| Option | Pros | Cons

| Just use repositories
| Simple, consistent approach.
| Expect performance issues with complex query patterns.

| Use custom queries with your ORM
| Allows reuse of DB configuration and model definitions.
| Adds another query language with its own quirks and syntax.

| Use hand-rolled SQL
| Offers fine control over performance with a standard query syntax.
| Changes to the DB schema have to be made to your hand-rolled queries and your ORM definitions. Highly normalised schemas may still have performance limitations.

| Create separate read stores with events
| Read-only copies are easy to scale out. Views can be constructed when data changes so that queries are as simple as possible.
| Complex technique. Harry will be forever suspicious of your tastes and motives.
|===

As it happens, the allocation service at MADE.com does use "full blown" CQRS, with a read model stored in Redis, and even a second layer of cache provided by Varnish. But its use cases are actually quite a bit different from what we’ve shown here. For the kind of allocation service we’re building, it seems unlikely that you’d need to use a separate read model and event handlers for updating it.

But as your domain model becomes richer and more complex, a simplified read model becomes ever more compelling.

Often, your read operations will be acting on the same conceptual objects as your write model, so using the ORM, adding some read methods to your repositories, and using Domain Model classes for your read operations is just fine.

In our book example, the read operations act on quite different conceptual entities to our Domain Model. The allocation service thinks in terms of Batches for a single sku, but users care about allocations for a whole order, with multiple skus, so using the ORM ends up being a little awkward. We’d be quite tempted to go with the raw-SQL view we showed right at the beginning of the chapter.

On that note, let’s sally forth into our final chapter.