Skip to content

Commit

Permalink
Release v0.9 (#35)
Browse files Browse the repository at this point in the history
* added python 3.6 to tox

* Set dev version

* Ignore dist by git

* Merge from master

* show info on django_makemigrations when running make

* added index to ts field for django storage

* Respect aggregate type when querying events (#25)

* added pytest-watch to devel reqs

* travis should test against python 3.6

* updated tox config

* started fixing #24

* Fixed django implementation of get_events

* fixed sqlalchemy storage get_events method

* added test

* added tox to devel reqs

* updated travis and tox

* added migration

* Updated tox config

* start using aggregate type everywhere

* allow to pass args to pytest

* fixed sqlalchemy

* fixed local storage

* added changelog

* Renamed publish to handle_event (#27)

* Renamed publish to handle_event

* Updated changelog

* moved test fixture to conftest

* aggregate version should be 1 after first mutation

* use CQ_ instead of SES_

* use own import_string

* added test for aggregate version bump

* rollback imports at __init__

* added ability to upcast events - fixes #26 (#30)

* added ability to upcast events - fixes #26

* updated changelog

* refactor

* add revision when storing an event

* updated "make django_makemigrations"

* storages should include revision info

* added test

* typo

* Declarative app repos (#31)

* allow to define repos for app declaratively

* updated changelog

* declare repo

* Iter all events (#34)

* implemented LocalStorage.iter_all_events

* updated changelog

* implemented iter_all_events for Django storage

* implemented iter_all_events for sqlalchemy storage

* allow to replay events (fixes #22)

* started working on events replays

* move replay events to storage

* removed unused function

* better info on exception

* removed unused import

* fixed handler for Django test app

* removed print

* updated changelog

* fixed issue of registering multiple mutators under base Aggregate class (#36)

* upcasters should be defined at aggregate class (#37)

* freeze v0.9

* upcast events when rehydrating an aggregate (#38)

* replay_events should respect upcasters param (#39)
  • Loading branch information
lukaszb authored May 4, 2017
1 parent f1fde19 commit 4ca803c
Show file tree
Hide file tree
Showing 37 changed files with 760 additions and 112 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
.tox
dist
venv
*.pyc
*.log
Expand Down
11 changes: 6 additions & 5 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
language: python

python:
- "3.5"
- "3.6"

sudo: false

env:
- TOX_ENV=py35-plain
- TOX_ENV=py35-sqla
- TOX_ENV=py35-d109
- TOX_ENV=py35-d110
- TOX_ENV=py36-plain
- TOX_ENV=py36-sqla
- TOX_ENV=py36-d109
- TOX_ENV=py36-d110
- TOX_ENV=py36-d111

matrix:
fast_finish: true
Expand Down
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
Release 0.9
===========

- Added ability to replay events (use `Storage.replay_events()` function)
- Added ability to upcast events
- `Storage.get_events` now accepts both `aggregate_type` and `aggregate_id`
- Renamed `cq.handlers.publish` to `cq.handlers.handle_event`
- Repositories can now be declared using `repos` dictionary at the `BaseApp` subclasses
- Implemented `Storage.iter_all_events` method
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ PYTHON ?= python3
export VIRTUAL_ENV := $(realpath .)/venv
export TOX_DIR := $(realpath .)/.tox
export PATH := $(VIRTUAL_ENV)/bin:$(PATH)
export DJANGO_MANAGE := $(TOX_DIR)/py35-d110/bin/python examples/djangoapp/manage.py
export DJANGO_MANAGE := $(TOX_DIR)/py36-d111/bin/python examples/djangoapp/manage.py
unexport WORKON_HOME PIP_RESPECT_VIRTUALENV PIP_VIRTUALENV_BASE

help:
Expand All @@ -15,6 +15,7 @@ help:
@echo ' make clean'
@echo ' make test'
@echo ' make tdd'
@echo ' make django_makemigrations'


# Top-level phony targets
Expand Down
9 changes: 9 additions & 0 deletions conftest.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,12 @@
from cq.storages import LocalMemoryStorage
import pytest


@pytest.fixture
def local_storage():
return LocalMemoryStorage()


def pytest_addoption(parser):
parser.addoption('--repeat', default=1, type='int', metavar='repeat',
help='Repeat each test specified number of times')
Expand Down
38 changes: 33 additions & 5 deletions cq/aggregates.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
import cq.events
from cq.events import upcaster


__all__ = ['upcaster', 'Aggregate', 'Repository', 'register_mutator']


class Aggregate:
mutators = {}

def __init__(self, id):
self.id = id
self.version = 1
self.version = 0

def __repr__(self):
return '<%s: %s>' % (self.__class__.__name__, self.id)
Expand All @@ -27,10 +33,17 @@ def get_mutator(self, name):
def get_name(cls):
return cls.__name__

@classmethod
def get_upcasters(cls):
return getattr(cls, 'upcasters', [])


def register_mutator(aggregate_class, event_name):

def outer(method):
if not hasattr(aggregate_class, 'mutators'):
aggregate_class.mutators = {}

if event_name in aggregate_class.mutators:
msg = "Mutator for action %s is already registered for %s" % (event_name, aggregate_class)
raise RuntimeError(msg)
Expand All @@ -57,11 +70,26 @@ def __init__(self, storage):
msg = "Repository must define aggregate. Please set %s.aggregate to Aggregate subclass"
raise RuntimeError(msg % self)

def store(self, name, aggregate_id, data=None):
return self.storage.store(name, aggregate_id, data)
def store(self, name, aggregate_id, data=None, revision=1):
event = self.storage.store(
aggregate_type=self.aggregate_class.get_name(),
name=name,
aggregate_id=aggregate_id,
data=data,
revision=revision,
)
event = self.upcast_event(event)
# this way whenever we store an event it would already be upcasted for a handler
return event

def upcast_event(self, event):
upcasters = self.aggregate_class.get_upcasters()
event = cq.events.upcast(event, upcasters)
return event

def get_events(self, aggregate_id):
return self.storage.get_events(aggregate_id)
events = self.storage.get_events(self.get_aggregate_name(), aggregate_id)
return [self.upcast_event(event) for event in events]

def get_aggregate(self, aggregate_id):
aggregate = self.aggregate_class(aggregate_id)
Expand Down
29 changes: 29 additions & 0 deletions cq/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,30 @@
from . import genuuid
from . import settings
from .aggregates import Repository
from .events import upcaster


__all__ = ['BaseApp', 'command', 'query', 'upcaster']


def command(method):
method.is_command = True
return method


def query(method):
method.is_query = True
return method


class BaseApp:
storage_class = None
storage_kwargs = {}
repos = {}

def __init__(self):
self.storage = self.get_storage()
self.create_repos()

def get_storage(self):
storage_class = self.get_storage_class()
Expand All @@ -34,3 +50,16 @@ def get_repo_for_aggregate(self, aggregate_class):
name = '%sRepository' % aggregate_class
repo_class = type(name, (Repository,), {'aggregate_class': aggregate_class})
return repo_class(storage=self.storage)

def get_commands(self):
objects = (getattr(self, attr) for attr in dir(self))
return [obj for obj in objects if getattr(obj, 'is_command', False)]

def get_queries(self):
objects = (getattr(self, attr) for attr in dir(self))
return [obj for obj in objects if getattr(obj, 'is_query', False)]

def create_repos(self):
for repo_name, aggregate in self.repos.items():
repo = self.get_repo_for_aggregate(aggregate)
setattr(self, repo_name, repo)
23 changes: 23 additions & 0 deletions cq/compat.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from importlib import import_module


# bluntly copied from Django

def import_string(dotted_path):
"""
Import a dotted module path and return the attribute/class designated by the
last name in the path. Raise ImportError if the import failed.
"""
try:
module_path, class_name = dotted_path.rsplit('.', 1)
except ValueError as err:
raise ImportError("%s doesn't look like a module path" % dotted_path) from err

module = import_module(module_path)

try:
return getattr(module, class_name)
except AttributeError as err:
raise ImportError('Module "%s" does not define a "%s" attribute/class' % (
module_path, class_name)
) from err
21 changes: 21 additions & 0 deletions cq/contrib/django/migrations/0002_ts_field_index.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# -*- coding: utf-8 -*-
# Generated by Django 1.10.3 on 2017-03-29 23:39
from __future__ import unicode_literals

from django.db import migrations, models
import django.utils.timezone


class Migration(migrations.Migration):

dependencies = [
('cq', '0001_initial'),
]

operations = [
migrations.AlterField(
model_name='event',
name='ts',
field=models.DateTimeField(db_index=True, default=django.utils.timezone.now),
),
]
38 changes: 38 additions & 0 deletions cq/contrib/django/migrations/0003_event_aggregate_type.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from __future__ import unicode_literals
from django.db import migrations, models


def split_event_names(apps, schema_editor):
Event = apps.get_model('cq', 'Event')
for event in Event.objects.all():
event.aggregate_type, event.name = event.name.split('.', 1)
event.save(update_fields=['aggregate_type', 'name'])


def join_event_names(apps, schema_editor):
Event = apps.get_model('cq', 'Event')
for event in Event.objects.all():
event.aggregate_type, event.name = event.name.split('.', 1)
event.name = '%s.%s' % (event.aggregate_type, event.name)


class Migration(migrations.Migration):

dependencies = [
('cq', '0002_ts_field_index'),
]

operations = [
migrations.AddField(
model_name='event',
name='aggregate_type',
field=models.CharField(db_index=True, max_length=128, null=True),
),
migrations.RunPython(split_event_names, join_event_names),
# now we can make aggregate_type non-nullable
migrations.AlterField(
model_name='event',
name='aggregate_type',
field=models.CharField(db_index=True, max_length=128),
),
]
20 changes: 20 additions & 0 deletions cq/contrib/django/migrations/0004_event_revision.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# -*- coding: utf-8 -*-
# Generated by Django 1.11 on 2017-04-30 04:26
from __future__ import unicode_literals

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
('cq', '0003_event_aggregate_type'),
]

operations = [
migrations.AddField(
model_name='event',
name='revision',
field=models.PositiveIntegerField(default=1),
),
]
6 changes: 4 additions & 2 deletions cq/contrib/django/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@

class Event(models.Model):
id = models.CharField(max_length=128, primary_key=True, default=genuuid)
ts = models.DateTimeField(default=timezone.now)
name = models.CharField(max_length=128, db_index=True)
ts = models.DateTimeField(default=timezone.now, db_index=True)
aggregate_id = models.CharField(max_length=128, db_index=True)
aggregate_type = models.CharField(max_length=128, db_index=True)
name = models.CharField(max_length=128, db_index=True)
data = jsonfield.JSONField(null=True)
revision = models.PositiveIntegerField(default=1)

def __str__(self):
return '%s | %s | %s' % (self.name, self.aggregate_id, self.ts)
Expand Down
16 changes: 13 additions & 3 deletions cq/contrib/django/storages.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,16 @@ class DjangoStorage(Storage):
def append(self, event):
obj = to_model(event)
obj.save()
return obj
return from_model(obj)

def get_events(self, aggregate_id):
qs = EventModel.objects.filter(aggregate_id=aggregate_id).order_by('ts')
def iter_all_events(self):
return (from_model(e) for e in EventModel.objects.order_by('ts').iterator())

def get_events(self, aggregate_type, aggregate_id=None):
qs = EventModel.objects.filter(aggregate_type=aggregate_type)
if aggregate_id:
qs = qs.filter(aggregate_id=aggregate_id)
qs = qs.order_by('ts')
return (from_model(e) for e in qs)

def book_unique(self, namespace, value, aggregate_id):
Expand All @@ -37,7 +43,9 @@ def to_model(event):
id=event.id,
name=event.name,
aggregate_id=event.aggregate_id,
aggregate_type=event.aggregate_type,
data=event.data,
revision=event.revision,
)


Expand All @@ -46,6 +54,8 @@ def from_model(instance):
id=instance.id,
name=instance.name,
aggregate_id=instance.aggregate_id,
aggregate_type=instance.aggregate_type,
data=instance.data,
ts=instance.ts,
revision=instance.revision,
)
20 changes: 17 additions & 3 deletions cq/contrib/sqlalchemy/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ class EventModel(Base):
id = sqlalchemy.Column(sqlalchemy.String(128), primary_key=True, default=genuuid)
name = sqlalchemy.Column(sqlalchemy.String(255), index=True)
aggregate_id = sqlalchemy.Column(sqlalchemy.String(255), index=True)
aggregate_type = sqlalchemy.Column(sqlalchemy.String(128), index=True)
data = sqlalchemy.Column(sqlalchemy.Text(), default='{}')
ts = sqlalchemy.Column(sqlalchemy.DateTime(), index=True)
revision = sqlalchemy.Column(sqlalchemy.Integer(), default=1)


class SqlAlchemyStorage(Storage):
Expand All @@ -32,12 +34,19 @@ def append(self, event):
session = self.get_session()
session.add(obj)
session.commit()
return obj
return from_model(obj)

def get_events(self, aggregate_id):
def iter_all_events(self):
session = self.get_session()
return (from_model(e) for e in session.query(EventModel).order_by(EventModel.ts))

def get_events(self, aggregate_type, aggregate_id):
session = self.get_session()
# TODO: should be ordered by version, not ts (otoh ts should also work)
query = session.query(EventModel).filter(EventModel.aggregate_id==aggregate_id).order_by(EventModel.ts)
query = session.query(EventModel).filter(EventModel.aggregate_type == aggregate_type)
if aggregate_id:
query = query.filter(EventModel.aggregate_id == aggregate_id)
query = query.order_by(EventModel.ts)
return (from_model(e) for e in query)

def book_unique(self, namespace, value, aggregate_id=None):
Expand Down Expand Up @@ -79,16 +88,21 @@ def to_model(event):
return EventModel(
id=event.id,
name=event.name,
aggregate_type=event.aggregate_type,
aggregate_id=event.aggregate_id,
data=json.dumps(event.data),
ts=event.ts,
revision=event.revision,
)


def from_model(instance):
return Event(
id=instance.id,
aggregate_type=instance.aggregate_type,
name=instance.name,
aggregate_id=instance.aggregate_id,
data=json.loads(instance.data),
ts=instance.ts,
revision=instance.revision,
)
Loading

0 comments on commit 4ca803c

Please sign in to comment.