Proposal: open ended zarrs #39
Comments
This is an intriguing use case and proposal! One quick comment re removing old data from the beginning of an array: there is an open proposal (https://github.com/zarr-developers/zarr/issues/267) which would enable this without having to move any data around or reorganise chunks. That proposal is actually intended to allow arrays to be prepended to, i.e., to grow in the "negative" direction, but the same mechanism would, I think, allow you to shrink too, i.e., to remove data from the beginning of one or more dimensions. I haven't pushed on that because it would add a little complexity to the implementation and I didn't want to do that without a firm use case, but I would be happy to revisit it if it would be useful for you.
Regarding supporting parallel appends, could you provide a little more detail about how you'd like this to work? I can't quite imagine how you could have multiple workers appending to a dimension of an array in parallel without at least some kind of synchronisation, i.e., ensuring the appends occur in some consistent order and don't get mixed up with each other. The actual writing of data can be parallelised, but your workers would have to somehow synchronise with each other to at least agree on which region of the array each will write to, so you don't end up with two workers overwriting each other. Am I making sense?
You're making total sense! I'll check out that issue; @mrocklin did mention it the other day, but I'll have a closer look. You raise an interesting point, and I don't have any answers at the moment. In terms of appending, we already have prior knowledge of the shape of the target array, and we know it will be orthogonal. We also know the shape and size of the incoming data. The issue would be if a chunk contains data which came from more than one ingested file, as this would require multiple parallel tasks to write to the same chunk. We could work around this by ensuring the chunk size of the zarr is the same as the chunks of the incoming data, but this may not be ideal.
Of course another possible solution to removing old data would be to have
some program that is aware of what chunks need to be deleted and deletes
them directly from the underlying store without changing array metadata.
Then the shape of the array would stay the same, but some regions of the
array would become empty and return whatever fill value had been set. I
guess it depends on whether you want the old regions to be removed without
a trace, or to just remove data but leave some record of the old regions
behind in array metadata.
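For illustration, a minimal sketch of that idea using the Python zarr API with a local DirectoryStore (the array name, shape and chunking are assumed; an S3-backed store behaves the same way when reading missing chunks):

```python
import numpy as np
import zarr

store = zarr.DirectoryStore("forecast.zarr")  # hypothetical example array
z = zarr.open(store=store, mode="a", shape=(48, 100, 100),
              chunks=(1, 100, 100), dtype="f4", fill_value=np.nan)
z[:] = 1.0

# Delete the first time step's chunk directly from the underlying store;
# the array metadata (shape, chunks, fill_value) is left untouched.
del store["0.0.0"]

print(z.shape)      # still (48, 100, 100)
print(z[0, 0, 0])   # nan -- the missing chunk falls back to the fill value
print(z[1, 0, 0])   # 1.0 -- other chunks are unaffected
```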
For expiring data that was what I was trying to propose. The chunks would be removed automatically by the underlying storage system. In our case we would have an S3 bucket policy which deletes files older than 24 hours (unless they are named .zattrs, etc.). We would need zarr to fail gracefully when trying to access these chunks and return a fill value array.
Cool, yes that will work gracefully with the current implementation. When a
user tries to read a region of an array, if any of the underlying chunks
are missing then the returned data will be filled with the fill value.
I.e., you can safely delete chunks.
Ah that's great!
For parallel appends, what if you use resize first and then write to the new chunks in parallel? AIUI (though @alimanfoo please correct me if I'm wrong) the resize operation is mainly a metadata change, so it should be quite fast. This can be followed easily with da.store and the specified region to write the new chunks in parallel. Note: it's important to rechunk any Dask Array to match the Zarr Array chunking to allow lock-free parallel writes.
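To make that concrete, a rough sketch of the resize-then-parallel-write pattern (the array name, shapes and chunking below are illustrative assumptions, not values from this thread):

```python
import numpy as np
import zarr
import dask.array as da

# Existing target array, chunked one time step per chunk.
z = zarr.open("forecast.zarr", mode="a", shape=(48, 100, 100),
              chunks=(1, 100, 100), dtype="f4", fill_value=np.nan)

# Incoming data for one new time step, rechunked to match the Zarr chunks
# so every Dask task writes whole chunks and no locking is needed.
new = da.random.random((1, 100, 100), chunks=(1, 100, 100)).astype("f4")

old_len = z.shape[0]
z.resize(old_len + new.shape[0], 100, 100)  # metadata-only change

# Write only the new region, in parallel across chunks.
region = (slice(old_len, old_len + new.shape[0]), slice(None), slice(None))
da.store(new, z, regions=region, lock=False)
```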
Yes jakirkham's comment captures it nicely.
In conceptual terms, an append operation consists of three tasks. First you
have to read the array metadata to find out the current shape of the array.
Then you write the array metadata to set the new shape. Then you write the
data into the new region of the array. This final task has multiple
subtasks, one for each chunk overlapping the new region.
Say you want to execute some append operations, let's number them 1, 2, 3,
etc. The first append operation has tasks 1-read-metadata,
1-write-metadata, 1-write-data, etc.
The metadata tasks have to be synchronized. I.e., 1-write-metadata has to
finish before 2-read-metadata can start. Otherwise you could end up with
two append operations writing data into the same region of the array.
However, writing data does not have to be synchronized. I.e., you don't
have to wait for 1-write-data to finish before you can start 2-write-data.
Also you can obviously run all the subtasks within all data writing tasks
in parallel, as long as they are writing to different chunks.
In other words, there are potentially two levels of parallelism you can
achieve. One is that within a single append operation, you can write all
the chunks in parallel. This is what jakirkham's comment says. The second
is that you can also overlap the data writing tasks from multiple append
operations.
Or put another way, you only need to synchronize the metadata read/write in order to update the shape of (i.e., resize) the array correctly.
The next question is how to synchronise the metadata read/write. There are several ways. You could do it in your own code, by simply executing all resize operations in a single thread/process, and only parallelizing the chunk writing (as in jakirkham's comment). You could also get zarr to help you with this, via locking. The only advantage to using locking here is if you want to implement the second level of parallelism I mentioned above, i.e., have multiple append operations running concurrently.
Hoping that makes sense. Best solution really depends on factors like how
often you want to append data, and whether you want to have multiple
sources able to append independently (i.e., without knowledge of what other
sources are doing) or whether you can coordinate all appends via a single
controlling process somewhere.
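A hedged sketch of the second option (zarr-managed locking); it assumes a shared filesystem for the lock files, so it would not help directly on an object store like S3:

```python
import numpy as np
import zarr

# File-based locks shared between processes (assumes a shared filesystem).
sync = zarr.ProcessSynchronizer("forecast.sync")

z = zarr.open("forecast.zarr", mode="a", shape=(0, 100, 100),
              chunks=(1, 100, 100), dtype="f4", fill_value=np.nan,
              synchronizer=sync)

# Each worker can call append() independently: the synchronizer serialises
# the shape (metadata) update, while the chunk writes themselves can still
# proceed in parallel across workers.
z.append(np.random.random((1, 100, 100)).astype("f4"), axis=0)
```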
This all makes sense! However, I think this is more about finding a way to avoid rewriting the metadata. As you've highlighted, writing the metadata sequentially is tricky: it introduces a bottleneck and will almost certainly behave unpredictably, especially when coupled with an object store like S3, which is eventually consistent, so you never quite know whether your update has taken place yet. Best practice for services like S3 is to never update objects once they have been created.
Good point re eventual consistency of object stores. But even if you allow a zarr array to have effectively infinite-size dimensions, you still need something somewhere that keeps track of where the current "end" of the array is. I.e., you move the problem outside zarr, but it doesn't go away.
I wonder if there are broadly two options here. One option would be to store zarr array metadata on object storage, and allow dimensions to be unbounded, then via some other mechanism outside of zarr keep track of where the current array end is. Another option would be to manage everything within zarr: have dimensions with finite size, put chunks on object storage but metadata on some kind of transactional storage that supports synchronization. This is doable, as a zarr array can have data (chunks) and metadata coming from different stores.
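A rough sketch of that second option, assuming s3fs for the chunk store (the bucket name and paths are made up for illustration):

```python
import s3fs
import zarr

# Chunks go to object storage (eventually consistent, massively parallel).
s3 = s3fs.S3FileSystem()
chunk_store = s3fs.S3Map(root="my-bucket/forecast.zarr", s3=s3)

# Metadata goes somewhere consistent/transactional -- here just a local
# directory, but it could be any MutableMapping backed by a database.
meta_store = zarr.DirectoryStore("forecast-meta.zarr")

z = zarr.open(store=meta_store, chunk_store=chunk_store, mode="a",
              shape=(48, 100, 100), chunks=(1, 100, 100), dtype="f4")
```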
I appreciate that knowing where the end of the array is can be useful, but is it a hard requirement? I can imagine many use cases where knowing the end isn't required.
Ah sorry, I assumed we were talking about use cases that involve appending, which in my mind means "sticking more data on at the end".
Yes, I can imagine use cases where you don't need to know where the end is, as some other mechanism is used for deciding where to write data within some effectively unbounded dimension.
Just to have something concrete to talk about, for the use case you had in mind, if you don't keep track of the end, how do you make decisions about where to write data within an unbounded dimension?
Btw, you could have effectively unbounded dimensions now by setting the size of a dimension to some very large number. Would that suffice?
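For example (sizes are illustrative only), zarr only materialises chunks that are actually written, so an over-allocated shape costs nothing in storage:

```python
import numpy as np
import zarr

z = zarr.open("big.zarr", mode="a", shape=(10_000_000, 100, 100),
              chunks=(1, 100, 100), dtype="f4", fill_value=np.nan)

print(z.nchunks_initialized)  # 0 -- no chunk objects exist yet

# Write one time step somewhere in the middle of the huge dimension.
z[123_456] = np.random.random((100, 100)).astype("f4")

print(z.nchunks_initialized)  # 1 -- only the chunk that was written
```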
Ah interesting! So have a really big zarr, with no actual chunks on disk, and then fill in the missing blanks as we ingest data. That could work as an interim solution.
Going back to open-ended: we know where the data is on all dimensions as we ingest it, so we could work out which chunk to write to based on the starting point and the chunk size.
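As an illustration of that bookkeeping (the origin time and chunking below are assumptions, not values from this thread):

```python
from datetime import datetime, timezone

TIME_ORIGIN = datetime(2018, 1, 1, tzinfo=timezone.utc)  # assumed axis origin
STEPS_PER_CHUNK = 1                                       # assumed chunking

def time_index(when):
    """Index along an hourly time dimension for a given forecast hour."""
    return int((when - TIME_ORIGIN).total_seconds() // 3600)

idx = time_index(datetime(2018, 11, 7, 12, tzinfo=timezone.utc))
chunk = idx // STEPS_PER_CHUNK  # which chunk along the time axis this hits

# Each ingestion task can then write z[idx] independently, provided chunk
# boundaries line up with the data each task holds.
print(idx, chunk)
```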
When is "eventual"? 😄 The metadata could be pulled into memory for the length of the session and serialized only at the end. This could be useful for other reasons as well.
This is a good point. Figured we would eventually want immutable key-value stores. Maybe this is worth making a first-class thing in Zarr. Perhaps it could wrap an existing store and resolve collisions for it. WDYT?
That's exactly the point. We don't know; I've seen it range from milliseconds to minutes, but in theory it could take even longer. This is how an object store scales so well: it makes no promises about consistency, which means all the servers that make up the service can work independently without being bottlenecked by a central write server.
We have an AWS Lambda service ingesting our data, working from queues of files. There could be hundreds or thousands of ingestion tasks executing at any one time, and there isn't really an 'end' as data is produced constantly, so there is no session to hold the metadata in. One solution could be to use a transactional database to store metadata in, but that introduces a bottleneck.
That sounds good.
I think @tam203 and @niallrobinson have been looking at this. How are you getting on?
@jacobtomlinson I've been mainly looking at implementing this feature request (zarr-developers/zarr#267), which allows indexing from somewhere other than 0. By doing this, data can be added to the end or removed from the beginning, and as long as this represents whole chunks there is no need to move or rewrite any chunks; updating the metadata in .zarray is enough to incorporate the new chunks. Currently I'm looking to do this by implementing a custom Zarr store which wraps another store and applies any origin offset in the __getitem__ method when required. If this proves its utility I'll look at putting it into Zarr proper, but this workaround allows me to avoid changing the internals of Zarr.
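To illustrate the shape of that workaround (a hypothetical sketch of the approach, not the actual implementation being described), a wrapper store could translate chunk keys by a per-dimension origin before delegating to the wrapped store:

```python
from collections.abc import MutableMapping

class OriginOffsetStore(MutableMapping):
    """Hypothetical wrapper that shifts chunk keys by an origin offset."""

    def __init__(self, store, origin):
        self.store = store    # any zarr-compatible MutableMapping
        self.origin = origin  # e.g. (24, 0, 0) after dropping 24 time steps

    def _translate(self, key):
        # Chunk keys look like "3.0.0" (optionally "path/3.0.0"); metadata
        # keys such as ".zarray" or ".zattrs" pass through unchanged.
        prefix, _, leaf = key.rpartition("/")
        parts = leaf.split(".")
        if len(parts) == len(self.origin) and all(
            p.lstrip("-").isdigit() for p in parts
        ):
            leaf = ".".join(str(int(p) + o) for p, o in zip(parts, self.origin))
        return f"{prefix}/{leaf}" if prefix else leaf

    def __getitem__(self, key):
        return self.store[self._translate(key)]

    def __setitem__(self, key, value):
        self.store[self._translate(key)] = value

    def __delitem__(self, key):
        del self.store[self._translate(key)]

    def __iter__(self):
        return iter(self.store)

    def __len__(self):
        return len(self.store)
```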
ping @rsignell-usgs who was looking for this issue. |
It’d be interesting to hear if anyone has thoughts on a good way to implement an immutable store interface for Zarr. From our discussion it sounds like that gets at the core of the issue (e.g. writes should only happen once per key-value pair). Though feel free to correct me if I’ve missed something in this analysis.
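One possible direction, as a hedged sketch only (a real version would need to exempt or special-case metadata keys like .zarray, which are rewritten on resize):

```python
from collections.abc import MutableMapping

class WriteOnceStore(MutableMapping):
    """Hypothetical wrapper that forbids overwriting existing keys."""

    def __init__(self, store):
        self.store = store

    def __setitem__(self, key, value):
        if key in self.store:
            raise KeyError(f"refusing to overwrite existing key: {key}")
        self.store[key] = value

    def __getitem__(self, key):
        return self.store[key]

    def __delitem__(self, key):
        raise KeyError("store is write-once; deletes are not allowed")

    def __iter__(self):
        return iter(self.store)

    def __len__(self):
        return len(self.store)
```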
Cross-posting from #9, sorry for those who have already seen this. We've now implemented some of the work that I talked about above. It implements a proposed spec change that allows prepending, appending and 'rolling' easily and safely. The store/zarr isn't immutable, but the chunk indexes effectively are: when you prepend/roll you don't have to move or rewrite any chunks. Another property is that, while the metadata still needs updating, the format is safe under eventual consistency, in that if chunk and metadata updates/reads arrive out of order you still get a valid zarr.
Anyway, more about all this in the blog post: https://medium.com/informatics-lab/creating-a-data-format-for-high-momentum-datasets-a394fa48b671
There is also a binder if you want to jump straight in.
Cheers.
Problem
We store gridded fields which are continuously being appended to and having old data expunged.
An example of this would be a weather forecast dataset which is modified hourly as new simulation data is generated and old, expired data is removed. Currently we store this data as many netCDF files in S3 which expire after 24 hours. However, the downside to this is that we have to maintain a metadata index somewhere describing all the files (around half a million at any one time).
We would like to explore storing this data using zarr, but have concerns about how to logically partition the data. Ideally we would like to maintain one zarr group which stores each field as a zarr array; however, we are facing some challenges with this:
- Every time we append data to the array we have to update the attributes with a new shape. Data is ingested in parallel and we are storing it in S3, so maintaining a file lock on the metadata is not possible and updating the object may not replicate fast enough.
- We wish to remove old data from the beginning of the array. Currently it is my understanding that this would require re-indexing all chunks each time as well as rewriting the metadata.
Proposal
One idea we've had to resolve this is to change the way the attributes are stored and chunks are loaded.
- We could make storing the shape of the array optional. This would mean that when we append new chunks we would not need to update the metadata.
- If zarr attempts to load a chunk which does not exist (either because it hasn't been created yet or it has been expunged) it could return a NaN array of the correct shape.
In practice this would mean that I could load my weather forecast zarr and request a slice of temperature data, for 12pm the following day, from the 48 simulations run up until that time. This would result in an array with NaN values for old runs which have been removed and new runs which are yet to be run, but it would contain the data which does currently exist, including the very latest simulation available.
This would result in ever-increasing indices, which would eventually hit limits on object key length. One solution would be to occasionally create a parallel zarr which resets the index, ingest into both, and eventually remove the older one when the new one reaches a certain age. This would result in some duplication of data but would avoid the problem.
Conclusion
I would be really keen to hear feedback from the community on this. I discussed it last week with @mrocklin and @jhamman. Would be good to hear more thoughts on this.