Events and the Message Bus

So far we’ve spent a lot of time and energy on a simple problem that we could easily have solved with Django. You might be asking if the increased testability and expressiveness are really worth all the effort.

In practice, though, we find that it’s not the obvious features that make a mess of our codebases: it’s the goop around the edge. It’s reporting, and permissions and workflows that touch a zillion objects.

Our example will be a typical notification requirement: when we can’t allocate an order because we’re out of stock, we should alert the buying team. They’ll go and fix the problem by buying more stock, and all will be well.

For a first version, our product owner says we can just send the alert by email.

Let’s see how our architecture holds up once we need to plug in some of the mundane stuff that makes up so much of our systems.

We’ll start by doing the simplest, most expeditious thing, and talk about why it’s exactly this kind of decision that leads us to Big Ball of Mud.

Then we’ll show how to use Domain Events to separate side-effects from our use cases, and how to build a simple Message Bus for triggering behavior based on those events. We’ll show a few different options for creating those events and how to pass them to the message bus, and finally we’ll show how the Unit of Work can be modified to connect the two together elegantly, as previewed in Events flowing through the system.

Figure 1. Events flowing through the system
Tip

You can find our code for this chapter at github.com/cosmicpython/code/tree/chapter_08_events_and_message_bus.

git clone https://github.com/cosmicpython/code.git && cd code
git checkout chapter_08_events_and_message_bus
# or, if you want to code along, checkout the previous chapter:
git checkout chapter_07_aggregate

Avoid Making a Mess

So. Email alerts when we run out of stock. When we have a new requirement like this, one that’s not really to do with the core domain, it’s all too easy to start dumping these things into our web controllers:

First, Avoid Making a Mess of Our Web Controllers

Example 1. Just whack it in the endpoint, what could go wrong? (src/allocation/entrypoints/flask_app.py)
@app.route("/allocate", methods=['POST'])
def allocate_endpoint():
    line = model.OrderLine(
        request.json['orderid'],
        request.json['sku'],
        request.json['qty'],
    )
    try:
        uow = unit_of_work.SqlAlchemyUnitOfWork()
        batchref = services.allocate(line, uow)
    except (model.OutOfStock, services.InvalidSku) as e:
        send_mail(
            'out of stock',
            '[email protected]',
            f'{line.orderid} - {line.sku}'
        )
        return jsonify({'message': str(e)}), 400

    return jsonify({'batchref': batchref}), 201

As a one-off hack, this might be okay, but it’s easy to see how we can quickly end up in a mess by patching things in this way. Sending emails isn’t the job of our HTTP layer, and we’d like to be able to unit test this new feature.

…​ And Let’s Not Make a Mess of Our Model Either

Assuming we don’t want to put this code into our web controllers, because we want them to be as thin as possible, we may look at putting it right at the source, in the model:

Example 2. Email-sending code in our model isn’t lovely either (src/allocation/domain/model.py)
    def allocate(self, line: OrderLine) -> str:
        try:
            batch = next(
                b for b in sorted(self.batches) if b.can_allocate(line)
            )
            #...
        except StopIteration:
            email.send_mail('[email protected]', f'Out of stock for {line.sku}')
            raise OutOfStock(f'Out of stock for sku {line.sku}')

But that’s even worse! We don’t want our model to have any dependencies on infrastructure concerns like email.send_mail.

This email sending thing is unwelcome goop messing up the nice clean flow of our system. What we’d like is to keep our domain model focused on the rule "You can’t allocate more stuff than is actually available."

The domain model’s job is to know that we’re out of stock, but the responsibility of sending an alert belongs elsewhere. We should be able to turn this feature on or off, or to switch to SMS notifications instead, without needing to change the rules of our domain model.

…​ Or the Service Layer!

The requirement "Try to allocate some stock, and send an email if it fails" is an example of workflow orchestration: it’s a set of steps that the system has to follow to achieve a goal.

We’ve written a service layer to manage orchestration for us, but even here the feature feels out of place:

Example 3. And in the services layer it’s out of place (src/allocation/service_layer/services.py)
def allocate(
        orderid: str, sku: str, qty: int,
        uow: unit_of_work.AbstractUnitOfWork
) -> str:
    line = OrderLine(orderid, sku, qty)
    with uow:
        product = uow.products.get(sku=line.sku)
        if product is None:
            raise InvalidSku(f'Invalid sku {line.sku}')
        try:
            batchref = product.allocate(line)
            uow.commit()
            return batchref
        except model.OutOfStock:
            email.send_mail('[email protected]', f'Out of stock for {line.sku}')
            raise

Catching an exception and re-raising it? I mean, it could be worse, but it’s definitely making us unhappy. Why is it so hard to find a suitable home for this code?

Single Responsibility Principle

Really this is a violation of the single responsibility principle[1]. Our use case is allocation. Our endpoint, service function, and domain methods are all called allocate, not allocate_and_send_mail_if_out_of_stock.

Tip
Rule of thumb: if you can’t describe what your function does without using words like "then" or "and," you might be violating the SRP.

One formulation of the SRP is that each class should only have a single reason to change. When we switch from email to SMS, we shouldn’t have to update our "allocate" function, because that’s clearly a separate responsibility.

To solve the problem, we’re going to split the orchestration[2] into separate steps, so that the different concerns don’t get tangled up.

We’d also like to keep the service layer free of implementation details. We want to apply the Dependency Inversion Principle to notifications, so that our service layer depends on an abstraction, in the same way as we avoid depending on the database by using a UnitOfWork.
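As a sketch of what that abstraction might look like (the class and method names here are our own invention, not code from this chapter):

```python
import abc


class AbstractNotifications(abc.ABC):
    """The port: the service layer depends only on this interface."""

    @abc.abstractmethod
    def send(self, destination: str, message: str) -> None:
        raise NotImplementedError


class EmailNotifications(AbstractNotifications):
    """One concrete adapter among many possible ones (SMS, Slack, ...)."""

    def send(self, destination: str, message: str) -> None:
        # a real implementation would use smtplib or an email API here
        print(f"sending email to {destination}: {message}")


class FakeNotifications(AbstractNotifications):
    """A test double that records messages instead of sending them."""

    def __init__(self) -> None:
        self.sent = []  # type: list

    def send(self, destination: str, message: str) -> None:
        self.sent.append((destination, message))
```

Swapping email for SMS would then mean writing a new adapter class; the allocate service itself never changes.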

All Aboard the Message Bus!

The patterns we’re going to introduce here are Domain Events and the Message Bus. There are a few different ways to implement them, so we’ll show a couple before settling on the one we like best.

The Model Records Events

First, rather than being concerned about emails, our model will be in charge of recording "events"--facts about things that have happened. We’ll use a Message Bus to respond to events, and invoke some new operation.

Events Are Simple Dataclasses

An Event is a kind of value object. Events don’t have any behavior, because they’re pure data structures. We always name events in the language of the domain, and we think of them as part of our domain model.

We could store them in model.py, but we may as well keep them in their own file. (This might be a good time to consider refactoring out a directory called "domain," so we have domain/model.py and domain/events.py.)

Example 4. Event classes (src/allocation/domain/events.py)
from dataclasses import dataclass

class Event:  #(1)
    pass

@dataclass
class OutOfStock(Event):  #(2)
    sku: str
  1. Once we have a number of events we’ll find it useful to have a parent class that can store common attributes. It’s also useful for type hints in our message bus, as we’ll see shortly.

  2. dataclasses are great for domain events too.

The Model Raises Events

When our domain model records a fact that happened, we say it "raises" an event.

Here’s what it will look like from the outside: if we ask Product to allocate but it can’t, it should raise an event:

Example 5. Test Our Aggregate Raises Events (tests/unit/test_product.py)
def test_records_out_of_stock_event_if_cannot_allocate():
    batch = Batch('batch1', 'SMALL-FORK', 10, eta=today)
    product = Product(sku="SMALL-FORK", batches=[batch])
    product.allocate(OrderLine('order1', 'SMALL-FORK', 10))

    allocation = product.allocate(OrderLine('order2', 'SMALL-FORK', 1))
    assert product.events[-1] == events.OutOfStock(sku="SMALL-FORK")  #(1)
    assert allocation is None
  1. Our Aggregate will expose a new attribute called .events which will contain a list of facts about what has happened, in the form of Event objects.

Here’s what it looks like on the inside:

Example 6. The model raises a domain event (src/allocation/domain/model.py)
class Product:

    def __init__(self, sku: str, batches: List[Batch], version_number: int = 0):
        self.sku = sku
        self.batches = batches
        self.version_number = version_number
        self.events = []  # type: List[events.Event]  #(1)

    def allocate(self, line: OrderLine) -> Optional[str]:
        try:
            batch = next(
                b for b in sorted(self.batches) if b.can_allocate(line)
            )
            #...
        except StopIteration:
            self.events.append(events.OutOfStock(line.sku))  #(2)
            # raise OutOfStock(f'Out of stock for sku {line.sku}')  #(3)
            return None
  1. Here’s our new .events attribute in use.

  2. Rather than invoking some email-sending code directly, we record those events at the place they occur, using only the language of the domain.

  3. We’re also going to stop raising an exception for the out-of-stock case. The event will do the job the exception was doing.

Note
We’re actually addressing a code smell we had until now, which is that we were using exceptions for control flow. In general, if you’re implementing domain events, don’t raise exceptions to describe the same domain concept. As we’ll see later when we handle events in the Unit of Work, it’s confusing to have to reason about events and exceptions together.

The Message Bus Maps Events to Handlers

A message bus basically says "when I see this event, I should invoke the following handler function". In other words, it’s a simple publish-subscribe system. Handlers are subscribed to receive events, which we publish to the bus. It sounds harder than it is, and we usually implement it with a dict:

Example 7. Simple message bus (src/allocation/service_layer/messagebus.py)
def handle(event: events.Event):
    for handler in HANDLERS[type(event)]:
        handler(event)


def send_out_of_stock_notification(event: events.OutOfStock):
    email.send_mail(
        '[email protected]',
        f'Out of stock for {event.sku}',
    )


HANDLERS = {
    events.OutOfStock: [send_out_of_stock_notification],

}  # type: Dict[Type[events.Event], List[Callable]]
Note
Note that the messagebus as implemented doesn’t give us concurrency since only one handler will run at a time. Our objective isn’t to support parallel threads, but to separate tasks conceptually, and keep each unit of work as small as possible. This helps us to understand the code base because the "recipe" for how to run each use-case is written in a single place. See Is this like Celery?.
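To make the synchronous behavior concrete, here’s a tiny self-contained sketch (the handler names and log list are invented for illustration):

```python
from dataclasses import dataclass


@dataclass
class OutOfStock:
    sku: str


log = []


def notify_buyers(event):
    log.append(f"email about {event.sku}")


def update_dashboard(event):
    log.append(f"dashboard: {event.sku}")


HANDLERS = {OutOfStock: [notify_buyers, update_dashboard]}


def handle(event):
    for handler in HANDLERS[type(event)]:  # each handler finishes before
        handler(event)                     # the next one starts


handle(OutOfStock(sku="SMALL-FORK"))
# log now records both side-effects, in registration order
```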
Is this like Celery?

Celery is a popular tool in the Python world for deferring self-contained chunks of work to an asynchronous task queue. The messagebus we’re presenting here is very different, so the short answer is no; our messagebus has more in common with a Node.js app, a UI event loop, or an actor framework.

If you do have a requirement for moving work off the main thread, you can still use our event-based metaphors, but we would suggest you use external events for that. There’s more discussion in [chapter_11_external_events_tradeoffs], but essentially, if you implement a way of persisting events to a centralized store, then you can subscribe other containers or other microservices to them. Then you can extend the same concept of using events to separate responsibilities across units of work within a single process / service, to being across multiple processes—​which may be different containers within the same service, or totally different microservices.

If you follow us in this approach, then your API for distributing tasks is your event classes—​or a JSON representation of them. This gives you a lot of flexibility in who you distribute tasks to; they need not necessarily be Python services. Celery’s API for distributing tasks is essentially "function name plus arguments," which is more restrictive, and Python-only.
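For instance, serializing an event for the wire might look like this (the envelope format below is our own invention, purely illustrative):

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class OutOfStock:
    sku: str


def to_json(event) -> str:
    # envelope: the event class name, plus the dataclass fields as payload
    return json.dumps({"type": type(event).__name__, "payload": asdict(event)})


message = to_json(OutOfStock(sku="SMALL-FORK"))
# a subscriber in any language can dispatch on the "type" field
```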

Option 1: The Service Layer Takes Events from the Model and Puts Them on the Message Bus

Our domain model raises events, and our message bus will call the right handlers whenever an event happens. Now all we need is to connect the two. We need something to catch events from the model and pass them to the message bus—​the "publishing" step.

The simplest way to do this is by adding some code into our service layer.

Example 8. The service layer with an explicit message bus (src/allocation/service_layer/services.py)
from . import messagebus
...

def allocate(
        orderid: str, sku: str, qty: int,
        uow: unit_of_work.AbstractUnitOfWork
) -> str:
    line = OrderLine(orderid, sku, qty)
    with uow:
        product = uow.products.get(sku=line.sku)
        if product is None:
            raise InvalidSku(f'Invalid sku {line.sku}')
        try:  #(1)
            batchref = product.allocate(line)
            uow.commit()
            return batchref
        finally:  #(1)
            for event in product.events:
                messagebus.handle(event)  #(2)
  1. We keep the try/finally from our ugly earlier implementation (we haven’t got rid of all exceptions yet, just OutOfStock).

  2. But now instead of depending directly on some email infrastructure, the service layer is just in charge of passing events from the model up to the message bus.

That already avoids some of the ugliness that we had in our naive implementation, and we have several systems that work like this, in which the service layer explicitly collects events from aggregates, and passes them to the messagebus.

Option 2: The Service Layer Raises Its Own Events

Another variant on this which we’ve used is that you can have the service layer in charge of creating and raising events directly, rather than having them raised by the domain model.

Example 9. Service layer calls messagebus.handle directly (src/allocation/service_layer/services.py)
def allocate(
        orderid: str, sku: str, qty: int,
        uow: unit_of_work.AbstractUnitOfWork
) -> str:
    line = OrderLine(orderid, sku, qty)
    with uow:
        product = uow.products.get(sku=line.sku)
        if product is None:
            raise InvalidSku(f'Invalid sku {line.sku}')
        batchref = product.allocate(line)
        uow.commit()

        if batchref is None:
            messagebus.handle(events.OutOfStock(line.sku))
        return batchref

Again, we have applications in production that implement the pattern in this way. What works for you will depend on the particular trade-offs you face, but we’d like to show you what we think is the most elegant solution, in which we put the unit of work in charge of collecting and raising events.

Option 3: The Unit of Work Can Publish Events to the Message Bus

The UoW already has a try/finally, and it knows about all the aggregates currently in play because it provides access to the Repository. So it’s a good place to spot events and pass them to the message bus:

Example 10. The UoW meets the message bus (src/allocation/service_layer/unit_of_work.py)
class AbstractUnitOfWork(abc.ABC):
    ...

    def commit(self):
        self._commit()  #(1)
        self.publish_events()  #(2)

    def publish_events(self):  #(2)
        for product in self.products.seen:  #(3)
            while product.events:
                event = product.events.pop(0)
                messagebus.handle(event)

    @abc.abstractmethod
    def _commit(self):
        raise NotImplementedError

...

class SqlAlchemyUnitOfWork(AbstractUnitOfWork):
    ...

    def _commit(self):  #(1)
        self.session.commit()
  1. We’ll change our commit method to require a private ._commit() method from subclasses.

  2. After committing, we run through all the objects that our repository has seen and pass their events to the message bus.

  3. That relies on the repository keeping track of aggregates that have been loaded using a new attribute, .seen, as we’ll see in the next listing.

Note
Are you wondering about error-handling, what happens if one of the handlers fails? We’ll discuss that in detail in [chapter_10_commands].
Example 11. Repository tracks aggregates that pass through it (src/allocation/adapters/repository.py)
class AbstractRepository(abc.ABC):

    def __init__(self):
        self.seen = set()  # type: Set[model.Product]  #(1)

    def add(self, product: model.Product):  #(2)
        self._add(product)
        self.seen.add(product)

    def get(self, sku) -> model.Product:  #(3)
        product = self._get(sku)
        if product:
            self.seen.add(product)
        return product

    @abc.abstractmethod
    def _add(self, product: model.Product):  #(2)
        raise NotImplementedError

    @abc.abstractmethod  #(3)
    def _get(self, sku) -> model.Product:
        raise NotImplementedError



class SqlAlchemyRepository(AbstractRepository):

    def __init__(self, session):
        super().__init__()
        self.session = session

    def _add(self, product):  #(2)
        self.session.add(product)

    def _get(self, sku):  #(3)
        return self.session.query(model.Product).filter_by(sku=sku).first()
  1. For the UoW to be able to publish new events, it needs to be able to ask the repository for which Product objects have been used during this session. We use a set called .seen to store them. That means our implementations need to call super().__init__().

  2. The parent add() method adds things to .seen, and now requires subclasses to implement ._add().

  3. Similarly, .get() delegates to a ._get() function, to be implemented by subclasses, in order to capture objects seen.

Note
The use of ._underscorey() methods and subclassing is definitely not the only way you could implement these patterns. Have a go at the exercise for the reader in this chapter and experiment with some alternatives.

Once the UoW and repository collaborate in this way to automatically keep track of live objects and process their events, the service layer can now be totally free of event-handling concerns:

Example 12. Service layer is clean again (src/allocation/service_layer/services.py)
def allocate(
        orderid: str, sku: str, qty: int,
        uow: unit_of_work.AbstractUnitOfWork
) -> str:
    line = OrderLine(orderid, sku, qty)
    with uow:
        product = uow.products.get(sku=line.sku)
        if product is None:
            raise InvalidSku(f'Invalid sku {line.sku}')
        batchref = product.allocate(line)
        uow.commit()
        return batchref

We do also have to remember to change the fakes in the service layer and make them call super() in the right places, and implement underscorey methods, but the changes are minimal:

Example 13. Service-layer fakes need tweaking. (tests/unit/test_services.py)
class FakeRepository(repository.AbstractRepository):

    def __init__(self, products):
        super().__init__()
        self._products = set(products)

    def _add(self, product):
        self._products.add(product)

    def _get(self, sku):
        return next((p for p in self._products if p.sku == sku), None)

...

class FakeUnitOfWork(unit_of_work.AbstractUnitOfWork):
    ...

    def _commit(self):
        self.committed = True

You may be starting to worry that maintaining these fakes is going to be a maintenance burden. There’s no doubt that it’s work, but in our experience it’s not a lot of work. Once your project is up and running, the interfaces for your repository and UoW abstractions really don’t change much. And if you’re using ABCs, they’ll help remind you when things get out of sync.
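As a quick illustration of an ABC catching a fake that has drifted out of sync (a stripped-down sketch, not the full repository from this chapter):

```python
import abc


class AbstractRepository(abc.ABC):
    @abc.abstractmethod
    def _add(self, product):
        raise NotImplementedError

    @abc.abstractmethod
    def _get(self, sku):
        raise NotImplementedError


class StaleFake(AbstractRepository):
    """Implements _add but forgot _get: out of sync with the ABC."""

    def _add(self, product):
        pass


try:
    StaleFake()
except TypeError as err:
    print(err)  # Python refuses to instantiate the incomplete fake
```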

Exercise for the Reader

Are you finding all those ._add() and ._commit() methods "super-gross", in the words of our beloved tech reviewer Hynek? Does it "make you want to beat Harry around the head with a plushie snake?" Hey, our code listings are only meant to be examples, not the perfect solution! Why not go see if you can do better?

One composition over inheritance way to go would be to implement a wrapper class:

Example 14. A wrapper adds functionality and then delegates (src/adapters/repository.py)
class TrackingRepository:
    seen: Set[model.Product]

    def __init__(self, repo: AbstractRepository):
        self.seen = set()  # type: Set[model.Product]
        self._repo = repo

    def add(self, product: model.Product):  #(1)
        self._repo.add(product)  #(1)
        self.seen.add(product)

    def get(self, sku) -> model.Product:
        product = self._repo.get(sku)
        if product:
            self.seen.add(product)
        return product
  1. By wrapping the repository, we can call the actual .add() and .get() methods, avoiding weird underscore methods.

See if you can apply a similar pattern to our Unit of Work class, to get rid of those Java-ey _commit() methods too.

Tip
Switching all the ABCs to typing.Protocol is a good way to force yourself to avoid using inheritance.
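As a sketch of how that might look for the repository (with a minimal Product stand-in rather than the real domain class):

```python
from typing import Optional, Protocol, Set


class Product:
    """Stand-in for the real domain class, just enough for the sketch."""

    def __init__(self, sku: str):
        self.sku = sku


class Repository(Protocol):
    """Any object with compatible add/get methods satisfies this protocol,
    no inheritance required."""

    def add(self, product: Product) -> None: ...
    def get(self, sku: str) -> Optional[Product]: ...


class TrackingRepository:
    """Wraps any Repository and tracks the aggregates it hands out."""

    def __init__(self, repo: Repository):
        self.seen: Set[Product] = set()
        self._repo = repo

    def add(self, product: Product) -> None:
        self._repo.add(product)
        self.seen.add(product)

    def get(self, sku: str) -> Optional[Product]:
        product = self._repo.get(sku)
        if product:
            self.seen.add(product)
        return product
```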

Let us know if you come up with something nice!

Wrap-Up

Domain events give us a way to handle workflows in our system. We often find, listening to our domain experts, that they express requirements in a causal or temporal way, for example "When we try to allocate stock, but there’s none available, then we should send an email to the buying team".

The magic words "When X then Y" often tell us about an event that we can make concrete in our system. Treating events as first-class things in our model helps us to make our code more testable and observable, and helps to isolate concerns.

Events are useful for more than just sending emails, though. In [chapter_05_high_gear_low_gear] we spent a lot of time convincing you that you should define aggregates, or boundaries where we guarantee consistency. People often ask "what should I do if I need to change multiple aggregates as part of a request?" Now we have the tools we need to answer the question.

If we have two things that can be transactionally isolated (e.g., an Order and a Product), then we can make them eventually consistent by using events. When an Order is cancelled, we should find the products that were allocated to it and remove the allocations.
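Sketched as an event plus a handler (all names invented for illustration; a real handler would work with repositories and a unit of work):

```python
from dataclasses import dataclass


@dataclass
class OrderCancelled:
    orderid: str


def remove_allocations_for_cancelled_order(event, allocations):
    """Keep only allocations that don't belong to the cancelled order.

    `allocations` stands in for whatever lookup a real handler would use:
    here, a dict mapping (orderid, sku) -> batchref.
    """
    return {
        key: batchref
        for key, batchref in allocations.items()
        if key[0] != event.orderid
    }
```

Because the clean-up runs in its own handler, it happens in its own unit of work; the two aggregates converge eventually rather than inside one big transaction.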

In the next chapter, we’ll look at this idea in more detail as we build a more complex workflow with our new message bus.

Recap: Domain Events and the Message Bus
Events can help with the Single Responsibility Principle

Code gets tangled up when we mix multiple concerns in one place. Events can help us to keep things tidy by separating primary use-cases from secondary ones. We also use events for communicating between aggregates so that we don’t need to run long-running transactions that lock against multiple tables.

A Message Bus routes messages to handlers

You can think of a message bus as a dict that maps from events to their consumers. It doesn’t "know" anything about the meaning of events, it’s just a piece of dumb infrastructure for getting messages around the system.

Option 1: Service Layer raises events and passes them to Message Bus

The simplest way to start using events in your system is to raise them from handlers, by calling bus.handle(some_new_event) after you commit your unit of work.

Option 2: Domain Model raises events, Service Layer passes them to Message Bus

The logic about when to raise an event really should live with the model, so we can improve our system’s design and testability by raising events from the domain model. It’s easy for our handlers to collect events off the model objects after commit and pass them to the bus.

Option 3: Unit of Work collects events from Aggregates and passes them to Message Bus

Adding bus.handle(aggregate.events) to every handler is annoying, so we can tidy up by making our unit of work responsible for raising events that were raised by loaded objects. This is the most complex design and might rely on ORM magic, but it’s clean and easy to use once it’s set up.

Table 1. Domain Events: The Trade-Offs
Pros Cons
  • A message bus gives us a nice way to separate responsibilities when we have to take multiple actions in response to a request.

  • Event Handlers are nicely decoupled from the "core" application logic, making it easy to change their implementation later.

  • Domain events are a great way to model the real world, and we can use them as part of the business language we use when modeling with stakeholders.

  • The Message Bus is an additional thing to wrap your head around; the implementation in which the unit of work raises events for us is neat but also magic. It’s not obvious when we call commit that we’re also going to go and send emails to people.

  • What’s more, that hidden event-handling code executes synchronously, meaning your service-layer function doesn’t finish until all the handlers for any events are finished. That could potentially cause unexpected performance problems in your web endpoints (adding asynchronous processing is possible but makes things even more confusing.)

  • More generally, event-driven workflows can be confusing because once things are split across a chain of multiple handlers, there is no single place in the system where you can understand how a request will be fulfilled.

  • You also open yourself up to the possibility of circular dependencies between your event handlers, and infinite loops.


1. the S from SOLID
2. Our tech reviewer Ed likes to say that the move from imperative to event-based flow control changes what used to be orchestration into choreography.