@@ -20,6 +20,7 @@ import java.util.Optional
20
20
21
21
import akka .NotUsed
22
22
import akka .actor .ActorSystem
23
+ import akka .event .{Logging , LoggingAdapter }
23
24
import akka .stream .scaladsl .Flow
24
25
import com .google .protobuf .{Descriptors , Any => JavaPbAny }
25
26
import com .google .protobuf .any .{Any => ScalaPbAny }
@@ -35,14 +36,16 @@ import io.cloudstate.javasupport.impl.{
35
36
ResolvedEntityFactory ,
36
37
ResolvedServiceMethod
37
38
}
39
+ import io .cloudstate .protocol .entity .{Command , Failure }
38
40
import io .cloudstate .protocol .event_sourced .EventSourcedStreamIn .Message .{
39
41
Command => InCommand ,
40
42
Empty => InEmpty ,
41
43
Event => InEvent ,
42
44
Init => InInit
43
45
}
44
- import io .cloudstate .protocol .event_sourced .EventSourcedStreamOut .Message .{Reply => OutReply }
46
+ import io .cloudstate .protocol .event_sourced .EventSourcedStreamOut .Message .{Failure => OutFailure , Reply => OutReply }
45
47
import io .cloudstate .protocol .event_sourced ._
48
+ import scala .util .control .NonFatal
46
49
47
50
final class EventSourcedStatefulService (val factory : EventSourcedEntityFactory ,
48
51
override val descriptor : Descriptors .ServiceDescriptor ,
@@ -65,11 +68,53 @@ final class EventSourcedStatefulService(val factory: EventSourcedEntityFactory,
65
68
this
66
69
}
67
70
71
+ object EventSourcedImpl {
72
+ final case class EntityException (entityId : String , commandId : Long , commandName : String , message : String )
73
+ extends RuntimeException (message)
74
+
75
+ object EntityException {
76
+ def apply (message : String ): EntityException =
77
+ EntityException (entityId = " " , commandId = 0 , commandName = " " , message)
78
+
79
+ def apply (command : Command , message : String ): EntityException =
80
+ EntityException (command.entityId, command.id, command.name, message)
81
+
82
+ def apply (context : CommandContext , message : String ): EntityException =
83
+ EntityException (context.entityId, context.commandId, context.commandName, message)
84
+ }
85
+
86
+ object ProtocolException {
87
+ def apply (message : String ): EntityException =
88
+ EntityException (entityId = " " , commandId = 0 , commandName = " " , " Protocol error: " + message)
89
+
90
+ def apply (init : EventSourcedInit , message : String ): EntityException =
91
+ EntityException (init.entityId, commandId = 0 , commandName = " " , " Protocol error: " + message)
92
+
93
+ def apply (command : Command , message : String ): EntityException =
94
+ EntityException (command.entityId, command.id, command.name, " Protocol error: " + message)
95
+ }
96
+
97
+ def failure (cause : Throwable ): Failure = cause match {
98
+ case e : EntityException => Failure (e.commandId, e.message)
99
+ case e => Failure (description = " Unexpected failure: " + e.getMessage)
100
+ }
101
+
102
+ def failureMessage (cause : Throwable ): String = cause match {
103
+ case EntityException (entityId, commandId, commandName, _) =>
104
+ val commandDescription = if (commandId != 0 ) s " for command [ $commandName] " else " "
105
+ val entityDescription = if (entityId.nonEmpty) s " entity [ $entityId] " else " entity"
106
+ s " Terminating $entityDescription due to unexpected failure $commandDescription"
107
+ case _ => " Terminating entity due to unexpected failure"
108
+ }
109
+ }
110
+
68
111
final class EventSourcedImpl (_system : ActorSystem ,
69
112
_services : Map [String , EventSourcedStatefulService ],
70
113
rootContext : Context ,
71
114
configuration : Configuration )
72
115
extends EventSourced {
116
+ import EventSourcedImpl ._
117
+
73
118
private final val system = _system
74
119
private final val services = _services.iterator
75
120
.map({
@@ -79,6 +124,8 @@ final class EventSourcedImpl(_system: ActorSystem,
79
124
})
80
125
.toMap
81
126
127
+ private val log = Logging (system.eventStream, this .getClass)
128
+
82
129
/**
83
130
* The stream. One stream will be established per active entity.
84
131
* Once established, the first message sent will be Init, which contains the entity ID, and,
@@ -99,18 +146,17 @@ final class EventSourcedImpl(_system: ActorSystem,
99
146
case (Seq (EventSourcedStreamIn (InInit (init), _)), source) =>
100
147
source.via(runEntity(init))
101
148
case _ =>
102
- // todo better error
103
- throw new RuntimeException (" Expected Init message" )
149
+ throw ProtocolException (" Expected Init message" )
104
150
}
105
151
.recover {
106
- case e =>
107
- // FIXME translate to failure message
108
- throw e
152
+ case error =>
153
+ log.error(error, failureMessage(error))
154
+ EventSourcedStreamOut ( OutFailure (failure(error)))
109
155
}
110
156
111
157
private def runEntity (init : EventSourcedInit ): Flow [EventSourcedStreamIn , EventSourcedStreamOut , NotUsed ] = {
112
158
val service =
113
- services.getOrElse(init.serviceName, throw new RuntimeException ( s " Service not found: ${init.serviceName}" ))
159
+ services.getOrElse(init.serviceName, throw ProtocolException (init, s " Service not found: ${init.serviceName}" ))
114
160
val handler = service.factory.create(new EventSourcedContextImpl (init.entityId))
115
161
val thisEntityId = init.entityId
116
162
@@ -137,33 +183,35 @@ final class EventSourcedImpl(_system: ActorSystem,
137
183
(event.sequence, None )
138
184
case ((sequence, _), InCommand (command)) =>
139
185
if (thisEntityId != command.entityId)
140
- throw new IllegalStateException (" Receiving entity is not the intended recipient of command" )
141
- val cmd = ScalaPbAny .toJavaProto(command.payload.get)
142
- val context = new CommandContextImpl (thisEntityId,
143
- sequence,
144
- command.name,
145
- command.id,
146
- service.anySupport,
147
- handler,
148
- service.snapshotEvery)
186
+ throw ProtocolException (command, " Receiving entity is not the intended recipient of command" )
187
+ val cmd =
188
+ ScalaPbAny .toJavaProto(command.payload.getOrElse(throw ProtocolException (command, " No command payload" )))
189
+ val context =
190
+ new CommandContextImpl (thisEntityId, sequence, command.name, command.id, service.anySupport, log)
149
191
150
192
val reply = try {
151
- handler.handleCommand(cmd, context) // FIXME is this allowed to throw
193
+ handler.handleCommand(cmd, context)
152
194
} catch {
153
- case FailInvoked =>
154
- Optional .empty[ JavaPbAny ]()
155
- // Ignore, error already captured
195
+ case FailInvoked => Optional .empty[ JavaPbAny ]() // Ignore, error already captured
196
+ case e : EntityException => throw e
197
+ case NonFatal (error) => throw EntityException (command, " Unexpected failure: " + error.getMessage)
156
198
} finally {
157
199
context.deactivate() // Very important!
158
200
}
159
201
160
202
val clientAction = context.createClientAction(reply, false )
161
203
162
204
if (! context.hasError) {
163
- val endSequenceNumber = sequence + context.events.size
205
+ // apply events from successful command to local entity state
206
+ context.events.zipWithIndex.foreach {
207
+ case (event, i) =>
208
+ handler.handleEvent(ScalaPbAny .toJavaProto(event), new EventContextImpl (thisEntityId, sequence + i + 1 ))
209
+ }
164
210
211
+ val endSequenceNumber = sequence + context.events.size
212
+ val performSnapshot = (endSequenceNumber / service.snapshotEvery) > (sequence / service.snapshotEvery)
165
213
val snapshot =
166
- if (context. performSnapshot) {
214
+ if (performSnapshot) {
167
215
val s = handler.snapshot(new SnapshotContext with AbstractContext {
168
216
override def entityId : String = entityId
169
217
override def sequenceNumber : Long = endSequenceNumber
@@ -195,9 +243,9 @@ final class EventSourcedImpl(_system: ActorSystem,
195
243
))
196
244
}
197
245
case (_, InInit (i)) =>
198
- throw new IllegalStateException ( " Entity already inited" )
246
+ throw ProtocolException (init, " Entity already inited" )
199
247
case (_, InEmpty ) =>
200
- throw new IllegalStateException ( " Received empty/unknown message" )
248
+ throw ProtocolException (init, " Received empty/unknown message" )
201
249
}
202
250
.collect {
203
251
case (_, Some (message)) => EventSourcedStreamOut (message)
@@ -213,25 +261,22 @@ final class EventSourcedImpl(_system: ActorSystem,
213
261
override val commandName : String ,
214
262
override val commandId : Long ,
215
263
val anySupport : AnySupport ,
216
- val handler : EventSourcedEntityHandler ,
217
- val snapshotEvery : Int )
264
+ val log : LoggingAdapter )
218
265
extends CommandContext
219
266
with AbstractContext
220
267
with AbstractClientActionContext
221
268
with AbstractEffectContext
222
269
with ActivatableContext {
223
270
224
271
final var events : Vector [ScalaPbAny ] = Vector .empty
225
- final var performSnapshot : Boolean = false
226
272
227
273
override def emit (event : AnyRef ): Unit = {
228
274
checkActive()
229
- val encoded = anySupport.encodeScala(event)
230
- val nextSequenceNumber = sequenceNumber + events.size + 1
231
- handler.handleEvent(ScalaPbAny .toJavaProto(encoded), new EventContextImpl (entityId, nextSequenceNumber))
232
- events :+= encoded
233
- performSnapshot = (snapshotEvery > 0 ) && (performSnapshot || (nextSequenceNumber % snapshotEvery == 0 ))
275
+ events :+= anySupport.encodeScala(event)
234
276
}
277
+
278
+ override protected def logError (message : String ): Unit =
279
+ log.error(" Fail invoked for command [{}] for entity [{}]: {}" , commandName, entityId, message)
235
280
}
236
281
237
282
class EventSourcedContextImpl (override final val entityId : String ) extends EventSourcedContext with AbstractContext
0 commit comments