Read journal for DynamoDB #50

Open
manocha-aman opened this issue May 20, 2016 · 17 comments

Comments

@manocha-aman

To be able to use events generated on the read side using Akka Persistence Query.

@patriknw
Member

Contributions here would be very interesting

@manocha-aman
Author

Thanks for your reply Patrik,
I will be very happy to contribute. I will check how it can be done.

On another note, do you think DynamoDB triggers which launch AWS Lambda functions can be used to populate views?

@joost-de-vries
Contributor

@patriknw We're looking into implementing this and offering a PR.
Do you have any specific requirements that we should keep in mind? F.i. wrt Dynamodb performance.
Also; do you have a suggestion which implementation of Akka Persistence Query would be the best example to take as inspiration?

@patriknw
Member

Sounds good. One consideration might be the cost aspect of DynamoDB queries.

The cassandra plugin is good, but it's using ActorPublisher which should probably be implemented with GraphStage instead.

Another thing: the cassandra plugin is polling. If DynamoDB has some push notification, that would be nice to use.

@joost-de-vries
Contributor

@patriknw There's something called Dynamodb Streams which publishes the events on a Dynamodb table if so configured. The aws client provides an api with a so called iterator. That still sounds like polling. But I haven't analysed the source code to see if they use a streaming protocol...

Regarding your remark about using GraphStage: there's now an Akka Streams Dynamodb implementation.
I guess we could use that as a basis if we'd go the polling route, right?

@patriknw
Member

ok

@joost-de-vries
Contributor

A quick update: I'm pretty far with implementing support for snapshots. I'm doing this on top of a new project I started so progress is not that fast but it is steady.

@joost-de-vries
Contributor

After implementing snapshot support #57 I started working on a read journal.

I first looked into using the Dynamodb Streams api to implement this. Once you create a stream, writes are published to that stream and remain available for 24 hours. So the stream has to exist from the moment data is first written to Dynamodb, and a consumer has to run the whole time data is being written; otherwise data is lost after 24 hours. The Akka Persistence Query api, by contrast, leaves room for instantiating the read journal and querying it at any time.
So I don't think the Dynamodb Streams api is suitable for implementing a read journal.
(Just as well, since they are rather hand wavy about how to properly support resharding and appropriate parallelism, and direct you to their Kinesis adapter.)

So I've started implementing this by querying Dynamodb itself in a polling manner.
The GraphLogic for filtering out 'uncommitted' batches looks rather inscrutable so I hope I'll be able to reuse that as is.
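
To make the retention concern concrete, here is a minimal in-memory sketch. It does not use the Dynamodb Streams api; `RetainedStream`, `append` and `read_from_start` are hypothetical names, and the only fact taken from above is the 24 hour window. It models a stream that forgets records after 24 hours and shows that a reader attached later sees only the recent writes, while a query against the table still sees everything.

```python
from dataclasses import dataclass, field

RETENTION_HOURS = 24  # retention window described above


@dataclass
class RetainedStream:
    """Toy model of a change stream with a fixed retention window (hypothetical)."""
    records: list = field(default_factory=list)  # (hour_written, event) pairs

    def append(self, hour, event):
        self.records.append((hour, event))

    def read_from_start(self, now_hour):
        # Only records younger than the retention window are still readable.
        return [e for (h, e) in self.records if now_hour - h < RETENTION_HOURS]


table = []  # stands in for the journal table, which keeps everything
stream = RetainedStream()

for hour, event in [(0, "evt-1"), (10, "evt-2"), (30, "evt-3")]:
    table.append(event)
    stream.append(hour, event)

# A read journal instantiated at hour 40 only sees what the stream still retains.
from_stream = stream.read_from_start(now_hour=40)
from_table = list(table)
```

In this toy run the late reader gets only the last event from the stream, which is why a stream alone cannot serve a query that may start at any time.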

@joost-de-vries
Contributor

At Scala Days @patriknw mentioned that it would be nice to do the query up to now using the standard polling api and from there to use Dynamodb Streams.
That's a good idea.
I'll implement it first with polling only. And the use of Dynamodb Streams can be added as a separate optimisation.
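
A rough sketch of that hybrid idea, assuming nothing about the real apis: `catch_up_then_tail`, `query_journal` and `stream_records` are hypothetical stand-ins. The bounded query delivers what is already stored, then the stream takes over, and sequence numbers are used to drop events that both sources deliver.

```python
def catch_up_then_tail(query_journal, stream_records, persistence_id):
    """Yield events from a bounded journal query, then from a change stream.

    query_journal and stream_records are hypothetical stand-ins: the first
    returns (sequence_nr, payload) pairs already stored, the second is an
    iterable of pairs arriving afterwards (possibly overlapping the query).
    """
    last_seq = 0
    for seq, payload in query_journal(persistence_id):
        last_seq = seq
        yield seq, payload
    for seq, payload in stream_records:
        if seq <= last_seq:
            continue  # already delivered by the catch-up query
        last_seq = seq
        yield seq, payload


stored = {"pid-1": [(1, "a"), (2, "b"), (3, "c")]}
live = [(3, "c"), (4, "d"), (5, "e")]  # overlaps with the stored events

events = list(catch_up_then_tail(lambda pid: stored[pid], live, "pid-1"))
```

The overlap handling is the part that matters: the stream has to be opened before the catch-up query finishes, so some duplicates are expected and must be filtered.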

@joost-de-vries
Contributor

joost-de-vries commented Jul 31, 2017

@johanandren @ktoso Since I finished the snapshot support I'm gearing up to implement query.

There are three things I'd like to discuss before I start:

  1. Can we break this up into multiple PRs somehow? Perhaps create a query branch and implement features step by step, PR by PR, and have code review discussions piecemeal. The reason I mention this is that it's my current understanding that the optimal implementation of persistenceIds, currentPersistenceIds, eventsByPersistenceId, currentEventsByPersistenceId, eventsByTag and currentEventsByTag is not trivial on Dynamodb, and I think it may be quite a bit of work that requires design choices. And it's better to handle the work and design choices piecemeal, I find.

  2. I'd like to discuss the intended execution plan of queries before implementing them. Would this issue be the best venue for that? Or creating a temporary markdown file in the query branch?

  3. In issue #61 I raise the question of futures vs streams and traits vs functions with implicits, and code reuse. The outcome of that discussion is obviously relevant for implementing this issue. F.i. the recovery logic is very close to eventsByPersistenceId.

@joost-de-vries
Contributor

@johanandren @ktoso
I've been doing a lot of thinking about how to implement the query journal in a performant way. Here's some code exploring how to implement currentEventsByTag.
The major decision is whether we keep storing the individual events only in the journal, or whether we also store them in an 'eventsByTag' table. The latter makes it much easier and more performant to get all events in an ordered way. But if I'm not mistaken it would also very much undo the high throughput optimisations of the current implementation, because the partition key would be the tag. And that would get much hotter than persistenceId as partition key. And even that got too hot.
So I implemented it another way, using the events in the journal table and an extra 'tags' table that's only updated every 100 events.
Please let me know what you think of this as a direction.
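
The hot partition argument can be illustrated with a small count. This is only a sketch with a made-up workload (1000 persistence ids, 10 events each, one shared tag); it says nothing about real Dynamodb throughput limits, it just compares how many writes land on the busiest key under each choice of partition key.

```python
from collections import Counter

# Hypothetical workload: 1000 persistent actors, 10 events each, all tagged "order".
events = [(f"pid-{p}", seq, "order") for p in range(1000) for seq in range(1, 11)]

# Writes per partition if the partition key is the persistence id.
writes_by_persistence_id = Counter(pid for pid, _, _ in events)
# Writes per partition if the partition key is the tag.
writes_by_tag = Counter(tag for _, _, tag in events)

hottest_pid_partition = max(writes_by_persistence_id.values())
hottest_tag_partition = max(writes_by_tag.values())
```

With the tag as partition key every write in this workload hits the same key, while keying by persistence id spreads the same writes over 1000 keys.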

@aquamatthias

@joost-de-vries I am currently evaluating dynamodb as akka persistence storage engine and as such also very interested in the read side. What is the status? Is there any progress?

@joost-de-vries
Contributor

@aquamatthias I think the best way to implement this is to use Dynamodb Streams. So I've done a bit of work on the side of Alpakka to get that going.
Unfortunately Dynamodb Streams api is not that well documented.
Currently I haven't got much time to work on it.

@lutzh

lutzh commented Feb 12, 2018

It seems new versions of the akka-persistence-cassandra plugin (from 0.80) no longer use a Cassandra materialized view, but instead introduce a separate table tag_views. I'm wondering if this model is maybe easier to port to dynamodb, and if anyone has given this any thought already?

@coreyoconnor
Member

The j5ik2o dynamodb plugin implementation looks to use a polled scan request lifted to a stream.

@coreyoconnor
Member

Looking into this further. I agree with akka/akka-persistence-dynamodb#50 (comment)

  • DynamoDB streams are efficient for following recent updates but not sufficient by themselves. The added operations cost is also worth keeping in mind.
  • I suspect DynamoDB streams are not a hard requirement. A polling based solution can, with lower efficiency, satisfy the functional requirements. I also suspect a polling based solution adds no extra operations cost and would not block a later migration to DynamoDB streams, which would buy a bit of speed for a bit of cost.
  • this looks to require a global secondary index which is eventually consistent.
  • I haven't considered tags at all.

some odd secondary indices may be required ;)

In terms of implementation process, I suspect it will look like so:

  1. prep
  2. currentEventsByPersistenceId
  3. currentPersistenceIds
  4. eventsByPersistenceId
  5. persistenceIds
  6. ... consider tags

Each would be a separate PR, with at least a snapshot release for each.
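
As a sketch of how steps 2 and 4 could relate under a polling approach: the bounded "current" query pages through what is stored and stops, and the live query re-runs it from the last seen sequence number. Everything here is an assumption for illustration (`current_events`, `live_events`, the in-memory `journal` dict, and the `on_idle` hook that stands in for a refresh interval); none of it is plugin code.

```python
def current_events(journal, persistence_id, from_seq, page_size=2):
    """Bounded query: page through what is stored now, then stop."""
    next_seq = from_seq
    while True:
        stored = journal.get(persistence_id, [])
        page = [e for e in stored if e[0] >= next_seq][:page_size]
        if not page:
            return
        yield from page
        next_seq = page[-1][0] + 1


def live_events(journal, persistence_id, from_seq, polls, on_idle=lambda: None):
    """Live query approximated by re-running the bounded query `polls` times."""
    next_seq = from_seq
    for _ in range(polls):
        for seq, payload in current_events(journal, persistence_id, next_seq):
            next_seq = seq + 1
            yield seq, payload
        on_idle()  # a real implementation would wait for a refresh interval here


journal = {"pid-1": [(1, "a"), (2, "b"), (3, "c")]}
snapshot = list(current_events(journal, "pid-1", from_seq=1))

late_writes = [(4, "d")]

def writer():
    # Simulates another writer persisting an event between polls.
    if late_writes:
        journal["pid-1"].append(late_writes.pop())

followed = list(live_events(journal, "pid-1", from_seq=1, polls=2, on_idle=writer))
```

Building the live query on top of the bounded one is one reason to do the `current*` variants first in the ordering above.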

