-
Notifications
You must be signed in to change notification settings - Fork 73
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
How to do grid wide processing per partition? #123
Comments
The question would be what you intend to do with the You can run entry processors (or aggregators) against specific partitions using a |
Wow super fast responce "Gridman" - thanks! Will investigate that direction further! |
Another approach to target a single partition is to create a class that implements |
That also sounds worth looking into. I actually have multiple use-cases on my "radar" so different ones may come in handy... What is by the way the simplest way to find the configured number of partitions programmatically? |
To get the partition count programatically you need to be in a cluster member. Then you can get the service for the cache you are interested in, cast it to a |
I created a special cache that weill not contain any data and only will be used to be able to invoke processors using "dummy" partition aware keys (one for each configured partition) and using that I was able to invoke an entry processor once for each partition. In order to do something meaningful I do however need to get to the binary data of my real caches for each partition. Once again I am a bit lost in all the managers, contexts and other Coherence classes I can get to :-( I assume I should turn the entry passed into the processor (invoked on the non-exisitng partition aware key) into a binary entry (hopefully this is possible also for a at this point non-exisitng entry!) but where to go from there? |
Which version of Coherence are you using, because some simpler ways to get related cache entries were added in newer versions. |
I do these experiments with latest version.
…On Wed, Apr 24, 2024, 18:04 Jonathan Knight ***@***.***> wrote:
Which version of Coherence are you using, because some simpler ways to get
related cache entries were added in newer versions.
—
Reply to this email directly, view it on GitHub
<#123 (comment)>,
or unsubscribe
<https://github.com/notifications/unsubscribe-auth/AADXQF77GQNT4MEZZOOK7JDY67JYJAVCNFSM6AAAAABGWVAQMSVHI2DSMVQWIX3LMV43OSLTON2WKQ3PNVWWK3TUHMZDANZVGMYDEOJYGI>
.
You are receiving this because you modified the open/close state.Message
ID: ***@***.***>
|
OK, here is a snippet for an entry processor that gets entries from another cache
One thing to be aware of if you intend to enlist multiple other cache entries and you might have multiple instances of this entry processor executing that might try to enlist the same entries. You need to avoid deadlocks - Coherence will throw an exception if two EPs try to enlist the same entries but in different orders to avoid a deadlock. But to really make sure, it is safest to sort the keys that you will enlist, so all EP instance lock keys in the same order, then they will just wait for each other. For read only entries, this does not matter as they are not locked. |
I updated the comment above because the new APIs to get a related entry |
Thanks a lot for the nice example - will build on it!
Are there any article or other resources where one can learn more about
this type of low level Coherence programming. The Java docs are a bit brief
and in situations where you need to cast it is even harder to figure it
out....
…On Wed, Apr 24, 2024, 18:30 Jonathan Knight ***@***.***> wrote:
I updated the comment above because the new APIs to get a related entry
binaryEntry.getAssociatedEntry() take the key as a non-serialised value
(whereas the other APIs on the BackingMapContext take Binary keys).
—
Reply to this email directly, view it on GitHub
<#123 (comment)>,
or unsubscribe
<https://github.com/notifications/unsubscribe-auth/AADXQF5WJIHV65EQ7X4TJN3Y67MYXAVCNFSM6AAAAABGWVAQMSVHI2DSMVQWIX3LMV43OSLTON2WKQ3PNVWWK3TUHMZDANZVGM3TIMRZGE>
.
You are receiving this because you modified the open/close state.Message
ID: ***@***.***>
|
Unfortunately this is one of those areas that has no documentation apart from the JavaDoc. |
As for your question about what I need to do in the processor - the project I am first looking at right now is the one I wrote a bit about in the discussion section i.e. to perform an as quick as possible dump / restore of all my partitioned caches to/from AWS S3 objects (one per partition) to enable fast creation of test environments with saved cache content. Even though the Coherence snapshot feature exists and is possible to extend I am thinking of at least initially just do a quick POC to see how much faster one can dump and more importantly restore the partitioned back maps of our near caches using low level programming compared to the traditional load from database we do today. If this becomes as fast as I hope I can look into if it would make sense to integrate it with Coherence snapshot framework (would be nice but may require too much work). My thinking for this POC is so far as follows:
**From the examples you have shared I now know how to get hold of individual entries but I still do not know how to get hold of:
I have seen that I from a binary entry can get its backing map (but this method is deprecated so not good to rely on going forward) and this is just for the current cache. As I do not necessarily know any key in the other caches I cant use the "associatedEntry" method as a first step and through that get hold of the other backing maps... Also for the restore there are not yet any entries in any of the caches and once again I do not in the general case know any keys that belong to any particular partition so I need a method to get hold of a mutable backing map without a key. My hypotesis is that this should be a lot faster than what we do today where we:
With the new method we would:
|
The biggest issue I can see is that while you are creating or restoring your own "snapshot" what happens if other requests come in to read or mutate data. They will either fail or read inconsistent data. The Coherence snapshot process suspends the cache service while data is being snapshotted or restored, so there can be no requests while this is in progress. Although there is a management API to allow you to suspend and unsuspend a service, you could not use this because once the service is suspended you would not be able to execute your entry processors against it. |
Thanks for input - I envision we will only create and restore data to the
cache when system is for sure not receiving any traffic i.e. when preparing
a consistent test environment "template" (a database dump and corresponding
S3 cache dump) after traffic is disabled and in the process of
creating/starting a new environment (before traffic is enabled).
I was also thinking about how the coherence snapshot could run when cache
service was suspended but assuming above this is not required for my POC
(or even future production use).
…On Fri, Apr 26, 2024 at 3:13 PM Jonathan Knight ***@***.***> wrote:
The biggest issue I can see is that while you are creating or restoring
your own "snapshot" what happens if other requests come in to read or
mutate data. They will either fail or read inconsistent data. The Coherence
snapshot process suspends the cache service while data is being snapshotted
or restored, so there can be no requests while this is in progress.
Although there is a management API to allow you to suspend and unsuspend a
service, you could not use this because once the service is suspended you
would not be able to execute your entry processors against it.
—
Reply to this email directly, view it on GitHub
<#123 (comment)>,
or unsubscribe
<https://github.com/notifications/unsubscribe-auth/AADXQF2D5OADRKAZMTMT52DY7JHIXAVCNFSM6AAAAABGWVAQMSVHI2DSMVQWIX3LMV43OSLTON2WKQ3PNVWWK3TUHMZDANZZGM3TGMZVGI>
.
You are receiving this because you modified the open/close state.Message
ID: ***@***.***>
|
I completed a first naive implementation of first part of the MVP to create a snapshot of a single partitioned cache into S3 - by using a partition filter I sent an EntryProcessor (returning null) to each partition that used the streaming support in S3 to upload data in a buffered fashion (to avoid having to create a possibly large byte array of the content of a partition that if performed in parallel most likely would result in OOM). As I understand it I need to make one call to cache.invokeAll for each partition or else the processor will only be invoked once on each node with all the entries from all the partitions on that node so right now I spawned off a virtual thread for each partition (I run with 31 right now) that each make a cache.invokeAll call each. The performance even with this first unoptimized way of uploading the data is great - running on my laptop locally (not even in AWS) I easily bottlenecked by 1Gbit Internet connection (to Ireland from Sweden) with the parallel S3 object uploads and I suspect this will be significantly better in AWS where my VMs / containers will have ~5Gbit EACH and network latency will be a lot lower. Ideally I would however have liked to instead in parallel send only one processor of some kind to each storage enabled node and have it upload one S3 object for each partition owned by that node for ALL partitioned caches (or perhaps the caches specified in the constructor to the processor) - this way multiple possibly small caches will not each produce a large number of S3 objects but I have not figured out if this is even possible... Next I need to restore the S3 objects and I am still trying to figure out how to do this - for this I do not necessarily (unless say locking in Coherence requires it) invoke one processor per partition - rather I would like to once again invoke one processor per storage enabled node that in parallel would stream the persisted binary keys/values from the S3 objects corresponding to the owned partitions and insert them directly into the backing maps. The methods for retrieving backing maps I have found so far seem to be deprecated in favor of methods working with single entries so I have still to figure out how to insert binary entries into an empty cache backing map.... Any suggestions are warmly appreciated! |
I continue the prototype implementation using the deprecated method getBackingMap as it is perfect (even seems needed for this use case) in the hope it will not be removed without replacing it with something similar but better:
As aa side note it seems an odd choice that the getAssociatedEntry requires a non-serialized key - this would really slow things down in my case as I right now is operating on only serialized keys and values and having to de-serialize all the keys of the whole cache just to satisfy this method would be unfortunate... Will by the way take a break from working on this due to vacation travel but after that get back to it again :-) |
Hi, are you sure you don't want to just use SnapshotArchiver? It does what you describe... You don't have to wrestle partition ownership and handle entry/key associations for per-member processing, iterate through backing maps or deal with serialization/conversion. Among others. Just give that a shot and see if it meets your SLAs. Otherwise to do what you describe in an efficient manner would likely require going down to the message level. See how BinaryMap works, it is the client for all distributed cache operations. Be aware that using internals code will expose you to possible changes we will make. For you that means possible reverse engineer sessions and rewrites. It seems you already have the S3 downstream processing down, you're almost there. |
The documentation describes quite clearly how to run processors of different kinds for a specific key, for all keys or for each cache node but what about if I want to process the data in each partition separately?
Lacking any "out of the box" solution to do this my current idea is to use an AbstractInvocable that will run on each storage enabled node and then let it find the partitions of the desired cache(s) on that node and do whatever I want to do with them one by one or in parallel (for instance using virtual threads).
I have so far not found how to find the number of partitions on each node (not necessarily perfectly evenly balanced between nodes) or indeed the actual keys/values for each partition :-(
So far I have written this code for my Invocable
Is this the right way to access what I want and if so how do I get further? Cant really find the right methods from here?
The text was updated successfully, but these errors were encountered: