Proposal: open ended zarrs #39

Open
jacobtomlinson opened this issue Nov 5, 2018 · 21 comments
Labels
protocol-extension Protocol extension related issue

Comments

@jacobtomlinson

Problem

We store gridded fields which are continuously being appended to and having old data expunged.

An example of this would be a weather forecast dataset which is being modified hourly as new simulation data is generated and old expired data is removed. Currently we are storing this data as many netCDF files in S3 which expire after 24 hours. However, the downside to this is that we have to maintain a metadata index somewhere describing all the files (around half a million at any one time).

We would like to explore storing this data using zarr, but have concerns about how to logically partition the data. Ideally we would like to maintain one zarr group which stores each field as a zarr array, however we are facing some challenges with this:

  • Every time we append data to the array we have to update the attributes with a new shape. Data is ingested in parallel and we are storing it in S3, so maintaining a file lock on the metadata is not possible and updating the object may not replicate fast enough.
  • We wish to remove old data from the beginning of the array. Currently it is my understanding that this would require re-indexing all chunks each time as well as rewriting the metadata.

Proposal

One idea we've had to resolve this is to change the way the attributes are stored and chunks are loaded.

  • We could make storing the shape of the array optional. This would mean that when we append new chunks we would not need to update the metadata.
  • If zarr attempts to load a chunk which does not exist (either because it hasn't been created yet or it has been expunged) it could return a NaN array of the correct shape.

In practice this would mean that I could load my weather forecast zarr and request a slice of temperature data for 12pm the following day from the 48 simulations run up until that time. This would result in an array with NaN values for the old runs which have been removed and the new runs which are yet to happen, but it would contain the data which does currently exist, including the very latest simulation available.

This would result in ever-increasing indices, which would eventually hit limits on object key length. One solution would be to occasionally create a parallel zarr with a reset index, ingest into both, and eventually remove the older one once the newer one reaches a certain age. This would result in some duplication of data but would avoid the problem.
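
To make the second bullet above concrete, here is a rough sketch of what a reader could do without any shape metadata. This is plain Python against a dict-like store with a hypothetical key layout and uncompressed chunks (not real zarr API): derive the candidate chunk keys from the requested time range and a fixed chunk size, and leave NaN wherever a key is absent from the store.

```python
import math

import numpy as np


def read_open_ended(store, t_start, t_stop, chunk_len, chunk_shape, dtype="f4"):
    """Read a time slice from an 'open ended' array with no shape metadata.

    Chunk keys are derived purely from the requested indices and a fixed chunk
    size; any key missing from the store (expired, or not yet written) is left
    as NaN. Assumes uncompressed chunks and a made-up key layout for simplicity.
    """
    out = np.full((t_stop - t_start,) + tuple(chunk_shape[1:]), np.nan, dtype=dtype)
    for i in range(t_start // chunk_len, math.ceil(t_stop / chunk_len)):
        key = f"temperature/{i}.0.0"       # hypothetical key layout: one chunk per run
        if key not in store:
            continue                       # expired or future chunk -> stays NaN
        chunk = np.frombuffer(store[key], dtype=dtype).reshape(chunk_shape)
        lo, hi = i * chunk_len, (i + 1) * chunk_len
        a, b = max(lo, t_start), min(hi, t_stop)
        out[a - t_start : b - t_start] = chunk[a - lo : b - lo]
    return out
```

The spatial part of chunk_shape would still come from static metadata; the point is only that nothing needs to record where the time axis starts or ends.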

Conclusion

I would be really keen to hear feedback from the community on this. I discussed it last week with @mrocklin and @jhamman. Would be good to hear more thoughts on this.

@alimanfoo
Member

This is an intriguing use case and proposal!

One quick comment re removing old data from the beginning of an array, there is an open proposal (https://github.com/zarr-developers/zarr/issues/267) which would enable this without having to move any data around or reorganise chunks. That proposal is actually intended to allow arrays to be prepended to, i.e., to grow in the "negative" direction, but the same mechanism would I think allow you to shrink too, i.e., to remove data from the beginning of one or more dimensions. I haven't pushed on that because it would add a little complexity to the implementation and I didn't want to do that without a firm use case, but would be happy to revisit if it would be useful for you.

Regarding supporting parallel appends, could you provide a little more detail about how you'd like this to work? I can't quite imagine how you could have multiple workers appending to a dimension of an array in parallel without at least some kind of synchronisation, i.e., ensuring the appends occur in some consistent order and don't get mixed up with each other. The actual writing of data can be parallelised, but your workers would have to somehow synchronise with each other to at least agree on which region of the array each will write to, so you don't end up with two workers overwriting each other. Am I making sense?

@jacobtomlinson
Author

You're making total sense! I'll check out that issue, @mrocklin did mention it the other day but I'll have a closer look.

You raise an interesting point; I don't have any answers at the moment. In terms of appending, we already have prior knowledge of the shape of the target array, and we know it will be orthogonal. We also know the shape and size of the incoming data. The issue would be if a chunk contains data which came from more than one ingested file, as this would require multiple parallel tasks to write to the same chunk. We could work around this by ensuring the chunk size of the zarr is the same as the chunks of the incoming data, but this may not be ideal.
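
To illustrate that constraint, a quick hypothetical check an ingest task could run before writing: if an incoming file's index range starts and ends on zarr chunk boundaries then it maps onto whole chunks, so no two tasks ever write to the same chunk.

```python
def covers_whole_chunks(start, stop, chunk_len):
    """True if the half-open index range [start, stop) lines up with zarr chunk
    boundaries, so a single ingest task owns every chunk it writes."""
    return start % chunk_len == 0 and stop % chunk_len == 0


# Hypothetical numbers: 24-step runs landing in 24-step chunks -> safe to write without locks.
assert covers_whole_chunks(48, 72, chunk_len=24)
# A file straddling a chunk boundary would need to coordinate with its neighbour.
assert not covers_whole_chunks(40, 64, chunk_len=24)
```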

@alimanfoo
Member

alimanfoo commented Nov 5, 2018 via email

@jacobtomlinson
Author

For expiring data, that is what I was trying to propose. The chunks would be removed automatically by the underlying storage system; in our case we would have an S3 bucket policy which deletes files older than 24 hours (unless they are named .zattrs, etc.). We would need zarr to fail gracefully when trying to access these chunks and return a fill-value array.
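
For illustration, a minimal sketch of the behaviour I'm after, using a plain in-memory dict standing in for the S3 bucket (zarr-python 2.x, as I understand it, already returns the fill value for chunks that are missing from the store, provided they fall within the declared shape):

```python
import numpy as np
import zarr

store = {}                                     # in-memory dict standing in for S3
z = zarr.open(store=store, mode="w", shape=(48, 4), chunks=(1, 4),
              dtype="f4", fill_value=np.nan)
z[:] = np.arange(48 * 4, dtype="f4").reshape(48, 4)

# Simulate the 24-hour expiry policy removing the oldest chunk objects.
del store["0.0"]
del store["1.0"]

print(z[0])   # [nan nan nan nan] -- expired chunk comes back as the fill value
print(z[2])   # [ 8.  9. 10. 11.] -- data that still exists is untouched
```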

@alimanfoo
Member

alimanfoo commented Nov 5, 2018 via email

@jacobtomlinson
Author

Ah that's great!

@jakirkham
Member

For parallel appends, what if you use resize first and then write to the new chunks in parallel? AIUI (though @alimanfoo please correct me if I'm wrong) the resize operation is mainly a metadata change, so it should be quite fast. This can then be followed with da.store, specifying the region, to write the new chunks in parallel.

Note: It's important to rechunk any Dask Array to match the Zarr Array chunking to allow lock-free parallel writes.
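
For what it's worth, a minimal sketch of that pattern with a hypothetical local array and made-up shapes (the resize call would still have to happen once per append, which is the coordination point being discussed):

```python
import dask.array as da
import numpy as np
import zarr

# Hypothetical stand-in for the forecast variable: (time, lat, lon), one run per chunk.
z = zarr.open("example.zarr", mode="w", shape=(24, 180, 360),
              chunks=(24, 180, 360), dtype="f4", fill_value=np.nan)

new_run = da.random.random((24, 180, 360), chunks=(24, 180, 360)).astype("f4")

old_len = z.shape[0]
z.resize(old_len + new_run.shape[0], *z.shape[1:])   # metadata-only: no chunk data moves
da.store(new_run.rechunk(z.chunks), z,               # matching chunks -> no locking needed
         regions=(slice(old_len, None), slice(None), slice(None)),
         lock=False)
```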

@alimanfoo
Member

alimanfoo commented Nov 7, 2018 via email

@jacobtomlinson
Author

This all makes sense! However I think this is more about finding a way to avoid rewriting the metadata.

As you've highlighted, writing the metadata sequentially is tricky: it introduces a bottleneck and will almost certainly behave unpredictably, especially when coupled with an object store like S3, whose eventual consistency means you never quite know whether your update has taken effect yet. Best practice for services like S3 is to never update objects once they have been created.

@alimanfoo
Member

alimanfoo commented Nov 7, 2018 via email

@jacobtomlinson
Author

I appreciate that knowing where the end of the array is is a useful thing, but is it a hard requirement? I can imagine many use cases where knowing the end isn't required.

@alimanfoo
Member

alimanfoo commented Nov 7, 2018 via email

@jacobtomlinson
Author

Ah interesting! So have a really big zarr, with no actual chunks on disk, and then fill in the missing blanks as we ingest data. That could work as an interim solution.

Going back to the open-ended idea: we know where the data sits on all dimensions as we ingest it, so we could work out which chunk to write to based on the starting point and the chunk size.
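
A rough sketch of that interim approach, with made-up sizes, paths and origin: declare a generously sized run axis once, then let each ingest task derive its write offset purely from the run timestamp so that it never needs to touch the metadata again.

```python
from datetime import datetime, timezone

import numpy as np
import zarr

ORIGIN = datetime(2018, 1, 1, tzinfo=timezone.utc)   # assumed fixed origin for run index 0
RUN_INTERVAL_HOURS = 1                               # assumed: a new simulation every hour

# Created once, with a deliberately oversized run axis; no chunk objects exist
# until data is actually written, so the "empty" space costs nothing.
z = zarr.open("forecast.zarr", mode="a", shape=(24 * 365 * 10, 48, 180, 360),
              chunks=(1, 48, 180, 360), dtype="f4", fill_value=np.nan)


def write_run(run_time, data):
    """Each ingest task computes its own run index from the timestamp, so no
    coordination or metadata update is needed; one run maps onto exactly one chunk."""
    i = int((run_time - ORIGIN).total_seconds() // 3600) // RUN_INTERVAL_HOURS
    z[i] = data


write_run(datetime(2018, 11, 5, 12, tzinfo=timezone.utc),
          np.zeros((48, 180, 360), dtype="f4"))
```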

@jakirkham
Member

Especially when you couple it with an object store like S3 which uses eventual consistency which would result in you never quite knowing if your update has taken place yet.

When is "eventual"? 😄 The metadata could be pulled into memory for the length of the session and serialized only at the end. This could be useful for other reasons as well.

cc @martindurant

Best practice for services like S3 is to never update objects once they have been created.

This is a good point. I figured we would eventually want immutable key-value stores. Maybe this is worth making a first-class thing in Zarr. Perhaps it could wrap an existing store and resolve collisions for it. WDYT?

@jacobtomlinson
Author

When is "eventual"? 😄

That's exactly the point: we don't know. I've seen it range from milliseconds to minutes, but in theory it could take even longer. This is how an object store scales so well: it makes no promises about consistency, which means all the servers that make up the service can work independently rather than being bottlenecked by a central write server.

The metadata could be pulled into memory for the length of the session and serialized only at the end.

We have an AWS Lambda service ingesting our data, working from queues of files, so there could be hundreds or thousands of ingestion tasks executing at any one time. There isn't really an 'end', as data is produced constantly, so there is no session in which to hold the metadata.

One solution could be to use a transactional database to store metadata in, but that introduces a bottleneck.

Figured we would eventually want immutable key-value stores. Maybe this is worth making a first class thing in Zarr.

That sounds good.

@jacobtomlinson
Author

I think @tam203 and @niallrobinson have been looking at this. How are you getting on?

@tam203

tam203 commented Dec 13, 2018

@jacobtomlinson I've been mainly looking at implementing this feature request zarr-developers/zarr/#267, which allows indexing from somewhere other than 0. By doing this, data can be added to the end or removed from the beginning, and as long as this represents whole chunks there is no need to rewrite any chunk data; updating the metadata in .zarray is enough to incorporate the new chunks.

Currently I'm looking to do this by implementing a custom Zarr store which wraps another store and applies any origin offset in the getitem method when required. If this proves useful I'll look at putting it into Zarr proper, but this workaround means I don't need to change the internals of Zarr.
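
Something along these lines, as a minimal sketch against the zarr v2 key layout rather than the actual implementation (key listing is left unshifted for brevity):

```python
from collections.abc import MutableMapping


class OriginOffsetStore(MutableMapping):
    """Hypothetical wrapper that shifts chunk indices on the first dimension by a
    fixed origin (in whole chunks), so logical chunk 0 maps onto an arbitrary
    physical chunk. Metadata keys (.zarray, .zattrs, ...) pass straight through."""

    def __init__(self, store, origin_chunks):
        self._store = store
        self._origin = origin_chunks

    def _map(self, key):
        *prefix, leaf = key.rsplit("/", 1)
        idx = leaf.split(".")
        if not idx[0].lstrip("-").isdigit():      # metadata or non-chunk key: untouched
            return key
        idx[0] = str(int(idx[0]) + self._origin)
        return "/".join(prefix + [".".join(idx)])

    def __getitem__(self, key):
        return self._store[self._map(key)]

    def __setitem__(self, key, value):
        self._store[self._map(key)] = value

    def __delitem__(self, key):
        del self._store[self._map(key)]

    def __iter__(self):
        return iter(self._store)

    def __len__(self):
        return len(self._store)
```

Bumping the origin whenever whole leading chunks expire would then give the "remove from the beginning" behaviour without moving any data; the shape recorded in the metadata would still need adjusting, which is the part covered by the zarr-developers/zarr/#267 proposal.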

@jhamman
Member

jhamman commented Dec 14, 2018

ping @rsignell-usgs who was looking for this issue.

@jakirkham
Member

It’d be interesting to hear if anyone has thoughts on a good way to implement an immutable store interface for Zarr. From our discussion it sounds like that gets at the core of the issue (e.g. writes should only happen once per key-value pair). Though feel free to correct me if I’ve missed something in this analysis.
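
One possible starting point, purely as a sketch: a wrapper around any existing store that refuses to write a key more than once, so concurrent writers can never silently clobber each other. The awkward part, as discussed above, is that metadata keys do get rewritten on resize, so they would need either an exemption or some versioning scheme.

```python
from collections.abc import MutableMapping


class WriteOnceStore(MutableMapping):
    """Hypothetical 'immutable' store wrapper: each key may be written at most once."""

    def __init__(self, store):
        self._store = store

    def __setitem__(self, key, value):
        if key in self._store:
            raise KeyError(f"refusing to overwrite existing key: {key!r}")
        self._store[key] = value

    def __delitem__(self, key):
        raise KeyError("deletes are not allowed on a write-once store")

    def __getitem__(self, key):
        return self._store[key]

    def __iter__(self):
        return iter(self._store)

    def __len__(self):
        return len(self._store)
```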

@tam203

tam203 commented Mar 15, 2019

Cross-posting from #9, sorry to those who have already seen this.

We've now implemented some of the work that I talked about above. It implements a proposed spec change that allows prepending, appending and 'rolling' easily and safely.

The store/zarr isn't immutable, but the chunk indexes effectively are: when you prepend/roll you don't have to move or rewrite any chunks. Another property is that, while the metadata does need updating, it is safe under eventual consistency, in that if chunk and metadata updates/reads arrive out of order you still get a valid zarr.

Anyway, more about all this is in the blog post: https://medium.com/informatics-lab/creating-a-data-format-for-high-momentum-datasets-a394fa48b671

There is also a Binder if you want to jump straight in.

Cheers.

@alimanfoo alimanfoo transferred this issue from zarr-developers/zarr-python Jun 20, 2019
@jstriebel
Member

cross-referencing #138 and #46

@jstriebel jstriebel added the protocol-extension Protocol extension related issue label Nov 16, 2022