forked from OrleansContrib/Orleankka
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathInfrastructure.cs
71 lines (57 loc) · 1.83 KB
/
Infrastructure.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Orleankka;
using Orleankka.Meta;
namespace Example
{
/// <summary>
/// Base actor that separates incoming messages into commands and queries and
/// forwards each kind to its own abstract handler.
/// </summary>
public abstract class CqsActor : Actor
{
    /// <summary>
    /// Routes <paramref name="message"/> to <c>HandleCommand</c> when it is a
    /// <c>Command</c>, or to <c>HandleQuery</c> when it is a <c>Query</c>.
    /// The command check runs first.
    /// </summary>
    /// <param name="message">The message delivered to this actor.</param>
    /// <returns>The task returned by the matching handler.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="message"/> is null.</exception>
    /// <exception cref="InvalidOperationException">
    /// <paramref name="message"/> is neither a <c>Command</c> nor a <c>Query</c>.
    /// </exception>
    public override Task<object> OnReceive(object message)
    {
        // A null message would otherwise fall through both type checks and blow up
        // with a NullReferenceException on message.GetType() while building the
        // error text below; report the real problem instead.
        if (message == null)
            throw new ArgumentNullException(nameof(message));

        var cmd = message as Command;
        if (cmd != null)
            return HandleCommand(cmd);

        var query = message as Query;
        if (query != null)
            return HandleQuery(query);

        throw new InvalidOperationException("Unknown message type: " + message.GetType());
    }

    /// <summary>Handles a message that was recognized as a command.</summary>
    /// <param name="cmd">The command to handle.</param>
    /// <returns>A task whose result is passed back as the result of <c>OnReceive</c>.</returns>
    protected abstract Task<object> HandleCommand(Command cmd);

    /// <summary>Handles a message that was recognized as a query.</summary>
    /// <param name="query">The query to handle.</param>
    /// <returns>A task whose result is passed back as the result of <c>OnReceive</c>.</returns>
    protected abstract Task<object> HandleQuery(Query query);
}
/// <summary>
/// CQS actor whose command handling produces events. Each produced event is
/// dispatched back to the actor and then pushed, wrapped in an
/// <c>EventEnvelope&lt;TEvent&gt;</c>, to a stream.
/// </summary>
public abstract class EventSourcedActor : CqsActor
{
    // Stream that receives the enveloped events; assigned in OnActivate.
    StreamRef stream;

    /// <summary>
    /// Obtains the stream via <c>System.StreamOf("sms", "{type name}-{Id}")</c>,
    /// where the type name is the concrete actor type, then runs the base activation.
    /// </summary>
    /// <returns>The task returned by the base <c>OnActivate</c>.</returns>
    public override Task OnActivate()
    {
        stream = System.StreamOf("sms", $"{GetType().Name}-{Id}");
        return base.OnActivate();
    }

    /// <summary>Forwards the query to <c>Dispatch</c> and returns its task unchanged.</summary>
    /// <param name="query">The query to dispatch.</param>
    protected override Task<object> HandleQuery(Query query)
    {
        return Dispatch(query);
    }

    /// <summary>
    /// Dispatches the command to obtain its events, then, for each event in order,
    /// dispatches the event and pushes it to the stream.
    /// </summary>
    /// <param name="cmd">The command to dispatch.</param>
    /// <returns>
    /// The events produced for the command. A result that is already an
    /// <c>ICollection&lt;Event&gt;</c> is returned as-is; any other sequence is
    /// returned as a <c>List&lt;Event&gt;</c> snapshot.
    /// </returns>
    protected override async Task<object> HandleCommand(Command cmd)
    {
        var produced = await Dispatch<IEnumerable<Event>>(cmd);

        // The result is enumerated here AND handed back to the caller. If it is a
        // lazily evaluated sequence (e.g. a yield-based iterator), every additional
        // enumeration would re-run the producing code after the events were already
        // dispatched and pushed. Snapshot such sequences exactly once; collections
        // that are already materialized are kept so their runtime type is unchanged.
        var events = produced as ICollection<Event> ?? new List<Event>(produced);

        foreach (var @event in events)
        {
            // Order matters: the event is dispatched before it is pushed to the stream.
            await Dispatch(@event);
            await Project(@event);
        }

        return events;
    }

    // Wraps the event in its envelope and pushes the envelope to the stream.
    Task Project(Event @event)
    {
        var envelope = Wrap(@event);
        return stream.Push(envelope);
    }

    // Creates an EventEnvelope<T> closed over the event's runtime type, passing
    // (Id, @event) as constructor arguments via reflection.
    object Wrap(Event @event)
    {
        var envelopeType = typeof(EventEnvelope<>).MakeGenericType(@event.GetType());
        return Activator.CreateInstance(envelopeType, Id, @event);
    }

    // Not read or assigned anywhere in this class; exposed for derived actors.
    protected ActorRef Projection { get; set; }
}
}