Appendix A: Swapping Out the Infrastructure: Do Everything with CSVs

This appendix is intended as a little illustration of the benefits of the Repository, Unit of Work, and Service Layer patterns. It’s intended to follow on from [chapter_06_uow].

Just as we finish building out our Flask API and getting it ready for release, the business come to us, apologetically, saying they’re not ready to use our API. Could we instead build a thing that just reads batches and orders from a couple of CSVs and outputs a third CSV with the allocations?

Ordinarily this is the kind of thing that might have a team cursing and spitting and making notes for their memoirs. But not us! Oh no, we’ve ensured that our infrastructure concerns are nicely decoupled from our domain model and service layer. Switching to CSVs will be a simple matter of writing a couple of new Repository and UnitOfWork classes, and then we’ll be able to reuse all of our logic from the domain layer and the service layer.

Here’s an E2E test to show you how the CSVs flow in and out:

Example 1. A first CSV test (tests/e2e/test_csv.py)
def test_cli_app_reads_csvs_with_batches_and_orders_and_outputs_allocations(
        make_csv
):
    sku1, sku2 = random_ref('s1'), random_ref('s2')
    batch1, batch2, batch3 = random_ref('b1'), random_ref('b2'), random_ref('b3')
    order_ref = random_ref('o')
    make_csv('batches.csv', [
        ['ref', 'sku', 'qty', 'eta'],
        [batch1, sku1, 100, ''],
        [batch2, sku2, 100, '2011-01-01'],
        [batch3, sku2, 100, '2011-01-02'],
    ])
    orders_csv = make_csv('orders.csv', [
        ['orderid', 'sku', 'qty'],
        [order_ref, sku1, 3],
        [order_ref, sku2, 12],
    ])

    run_cli_script(orders_csv.parent)

    expected_output_csv = orders_csv.parent / 'allocations.csv'
    with open(expected_output_csv) as f:
        rows = list(csv.reader(f))
    assert rows == [
        ['orderid', 'sku', 'qty', 'batchref'],
        [order_ref, sku1, '3', batch1],
        [order_ref, sku2, '12', batch2],
    ]
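The test leans on three helpers that aren’t shown here: random_ref, the make_csv fixture, and run_cli_script. A minimal sketch of what they might look like follows. The bodies are assumptions for illustration, as are the make_csv_in name and the default script path; only the helper names and the way they are called come from the test.

```python
import csv
import subprocess
import sys
import uuid
from pathlib import Path


def random_ref(prefix):
    # A short random suffix keeps refs unique across test runs.
    return f"{prefix}-{uuid.uuid4().hex[:10]}"


def make_csv_in(folder):
    """Build a make_csv(filename, rows) helper that writes into `folder`."""
    def make_csv(filename, rows):
        path = Path(folder) / filename
        with path.open('w', newline='') as f:
            csv.writer(f).writerows(rows)
        return path  # the tests use .parent to find the folder
    return make_csv


def run_cli_script(folder, script=Path('src/bin/allocate-from-csv')):
    # Assumed location of the script, relative to the project root.
    subprocess.run([sys.executable, str(script), str(folder)], check=True)
```

Under pytest, the make_csv fixture could simply return make_csv_in(tmp_path), so that every test gets its own temporary folder.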

Diving in and implementing without thinking about repositories and all that jazz, you might start with something like this:

Example 2. A first cut of our CSV reader/writer (src/bin/allocate-from-csv)
#!/usr/bin/env python
import csv
import sys
from datetime import datetime
from pathlib import Path

from allocation import model

def load_batches(batches_path):
    batches = []
    with batches_path.open() as inf:
        reader = csv.DictReader(inf)
        for row in reader:
            if row['eta']:
                eta = datetime.strptime(row['eta'], '%Y-%m-%d').date()
            else:
                eta = None
            batches.append(model.Batch(
                ref=row['ref'],
                sku=row['sku'],
                qty=int(row['qty']),
                eta=eta
            ))
    return batches



def main(folder):
    batches_path = Path(folder) / 'batches.csv'
    orders_path = Path(folder) / 'orders.csv'
    allocations_path = Path(folder) / 'allocations.csv'

    batches = load_batches(batches_path)

    with orders_path.open() as inf, allocations_path.open('w') as outf:
        reader = csv.DictReader(inf)
        writer = csv.writer(outf)
        # Include qty so the output matches the columns the test expects.
        writer.writerow(['orderid', 'sku', 'qty', 'batchref'])
        for row in reader:
            orderid, sku = row['orderid'], row['sku']
            qty = int(row['qty'])
            line = model.OrderLine(orderid, sku, qty)
            batchref = model.allocate(line, batches)
            writer.writerow([line.orderid, line.sku, line.qty, batchref])



if __name__ == '__main__':
    main(sys.argv[1])

It’s not looking too bad! And we’re reusing our domain model objects and our domain service…

But it’s not going to work. The script starts from a blank slate on every run, so it would happily allocate stock that an earlier run had already promised to someone else. Existing allocations also need to be part of our permanent CSV storage. We can write a second test to force us to improve things:

Example 3. And another one, with existing allocations (tests/e2e/test_csv.py)
def test_cli_app_also_reads_existing_allocations_and_can_append_to_them(
        make_csv
):
    sku = random_ref('s')
    batch1, batch2 = random_ref('b1'), random_ref('b2')
    old_order, new_order = random_ref('o1'), random_ref('o2')
    make_csv('batches.csv', [
        ['ref', 'sku', 'qty', 'eta'],
        [batch1, sku, 10, '2011-01-01'],
        [batch2, sku, 10, '2011-01-02'],
    ])
    make_csv('allocations.csv', [
        ['orderid', 'sku', 'qty', 'batchref'],
        [old_order, sku, 10, batch1],
    ])
    orders_csv = make_csv('orders.csv', [
        ['orderid', 'sku', 'qty'],
        [new_order, sku, 7],
    ])

    run_cli_script(orders_csv.parent)

    expected_output_csv = orders_csv.parent / 'allocations.csv'
    with open(expected_output_csv) as f:
        rows = list(csv.reader(f))
    assert rows == [
        ['orderid', 'sku', 'qty', 'batchref'],
        [old_order, sku, '10', batch1],
        [new_order, sku, '7', batch2],
    ]
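To see why this test expects the new order to land on the second batch, it helps to work out how much of each batch is still free once the existing allocations are counted. The sketch below does that with nothing but the csv module. The available_quantities function and the literal refs are made up for illustration and are not part of the codebase.

```python
import csv
import io
from collections import Counter

# Stand-ins for the files the test creates, with fixed refs for readability.
BATCHES_CSV = """ref,sku,qty,eta
batch1,sku,10,2011-01-01
batch2,sku,10,2011-01-02
"""
ALLOCATIONS_CSV = """orderid,sku,qty,batchref
old_order,sku,10,batch1
"""


def available_quantities(batches_file, allocations_file):
    """Return {batch ref: quantity not yet allocated}."""
    allocated = Counter()
    for row in csv.DictReader(allocations_file):
        allocated[row['batchref']] += int(row['qty'])
    return {
        row['ref']: int(row['qty']) - allocated[row['ref']]
        for row in csv.DictReader(batches_file)
    }


available = available_quantities(
    io.StringIO(BATCHES_CSV), io.StringIO(ALLOCATIONS_CSV)
)
print(available)  # {'batch1': 0, 'batch2': 10}
```

The earlier batch is already full, so an order for 7 can only go to batch2. A script that never reads allocations.csv would see 10 free units in batch1 and get it wrong.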

And we could keep hacking about, adding extra lines to that load_batches function and some way of tracking and saving new allocations…

But we already have a model for doing that! It’s called our Repository and our Unit of Work.

All we need to do ("all we need to do") is reimplement those same abstractions, but with CSVs underlying them, instead of a database. And as you’ll see, it’s actually quite straightforward.
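As a reminder of what "those same abstractions" ask of us, here is a rough reconstruction of the two base classes. Treat it as an assumption rather than the real thing: the actual AbstractRepository and AbstractUnitOfWork live in the earlier chapters and may differ in detail. The method names are taken from how the CSV versions and the service layer use them.

```python
import abc


class AbstractRepository(abc.ABC):
    """Assumed shape of the repository port; the real one may differ."""

    @abc.abstractmethod
    def add(self, batch):
        raise NotImplementedError

    @abc.abstractmethod
    def get(self, reference):
        raise NotImplementedError

    @abc.abstractmethod
    def list(self):
        raise NotImplementedError


class AbstractUnitOfWork(abc.ABC):
    """Assumed shape of the Unit of Work port; the real one may differ."""

    batches: AbstractRepository

    def __enter__(self):
        return self

    def __exit__(self, *args):
        # Roll back on the way out; assumed harmless after a commit.
        self.rollback()

    @abc.abstractmethod
    def commit(self):
        raise NotImplementedError

    @abc.abstractmethod
    def rollback(self):
        raise NotImplementedError
```

Anything that fills in those five methods can stand in for the database-backed versions, which is all the CSV implementation has to do.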

Implementing a Repository and Unit of Work for CSVs

Here’s what a CSV-based repository could look like. It abstracts away all the logic for reading CSVs from disk, including the fact that it has to read two different CSVs (one for batches and one for allocations), and exposes the familiar .list() API, which gives us the illusion of an in-memory collection of domain objects.

Example 4. A repository that uses CSV as its storage mechanism (src/allocation/service_layer/csv_uow.py)
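import csv
from datetime import datetime
from pathlib import Path

# model and repository are the project's own modules from earlier chapters.


class CsvRepository(repository.AbstractRepository):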

    def __init__(self, folder):
        self._batches_path = Path(folder) / 'batches.csv'
        self._allocations_path = Path(folder) / 'allocations.csv'
        self._batches = {}  # type: Dict[str, model.Batch]
        self._load()

    def get(self, reference):
        return self._batches.get(reference)

    def add(self, batch):
        self._batches[batch.reference] = batch

    def _load(self):
        with self._batches_path.open() as f:
            reader = csv.DictReader(f)
            for row in reader:
                ref, sku = row['ref'], row['sku']
                qty = int(row['qty'])
                if row['eta']:
                    eta = datetime.strptime(row['eta'], '%Y-%m-%d').date()
                else:
                    eta = None
                self._batches[ref] = model.Batch(
                    ref=ref, sku=sku, qty=qty, eta=eta
                )
        if not self._allocations_path.exists():
            return  # first run: nothing has been allocated yet
        with self._allocations_path.open() as f:
            reader = csv.DictReader(f)
            for row in reader:
                batchref, orderid, sku = row['batchref'], row['orderid'], row['sku']
                qty = int(row['qty'])
                line = model.OrderLine(orderid, sku, qty)
                batch = self._batches[batchref]
                batch._allocations.add(line)

    def list(self):
        return list(self._batches.values())

And here’s what a Unit of Work for CSVs would look like:

Example 5. A Unit of Work for CSVs: commit = csv.writer. (src/allocation/service_layer/csv_uow.py)
class CsvUnitOfWork(unit_of_work.AbstractUnitOfWork):

    def __init__(self, folder):
        self.batches = CsvRepository(folder)

    def commit(self):
        with self.batches._allocations_path.open('w') as f:
            writer = csv.writer(f)
            writer.writerow(['orderid', 'sku', 'qty', 'batchref'])
            for batch in self.batches.list():
                for line in batch._allocations:
                    writer.writerow(
                        [line.orderid, line.sku, line.qty, batch.reference]
                    )

    def rollback(self):
        pass

And once we have that, our CLI app for reading and writing batches and allocations to CSV is just pared down to what it should be: a bit of code for reading order lines, and a bit of code that invokes our existing service layer:

Example 6. Allocation with CSVs in 9 lines (src/bin/allocate-from-csv)
def main(folder):
    orders_path = Path(folder) / 'orders.csv'
    uow = csv_uow.CsvUnitOfWork(folder)
    with orders_path.open() as f:
        reader = csv.DictReader(f)
        for row in reader:
            orderid, sku = row['orderid'], row['sku']
            qty = int(row['qty'])
            services.allocate(orderid, sku, qty, uow)
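If you wanted to test the order-parsing half of main() without touching the service layer, one option is to pull it out into a generator. This is only a sketch: read_order_lines is a hypothetical helper, and the sample rows are invented.

```python
import csv
import io


def read_order_lines(f):
    """Yield (orderid, sku, qty) tuples from an open orders.csv file."""
    for row in csv.DictReader(f):
        # DictReader hands back strings, so qty needs converting.
        yield row['orderid'], row['sku'], int(row['qty'])


orders_csv = io.StringIO("orderid,sku,qty\norder1,sku1,3\norder1,sku2,12\n")
lines = list(read_order_lines(orders_csv))
print(lines)  # [('order1', 'sku1', 3), ('order1', 'sku2', 12)]
```

main() would then loop over read_order_lines(f) and pass each tuple, plus the uow, to services.allocate.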

Ta-da! NOW ARE Y’ALL IMPRESSED OR WHAT?

much love, Bob and Harry.