-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathcollection_generation.py
138 lines (121 loc) · 4.58 KB
/
collection_generation.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
import copy
from collections.abc import Mapping
from datetime import datetime, timezone
from typing import Any, Dict, Optional

import fsspec
import xarray as xr
import xstac

from veda_data_pipeline.utils.schemas import SpatioTemporalExtent
class GenerateCollection:
    """Build STAC collection dictionaries from dataset configurations.

    Two flavours are produced: a COG collection assembled purely from the
    dataset configuration, and a Zarr collection whose metadata is derived
    from the Zarr store itself through ``xstac``.
    """

    # Skeleton merged into every generated collection. It holds nested mutable
    # containers, so ``get_template`` hands out a deep copy, never this object.
    common = {
        "links": [],
        "extent": {
            "spatial": {"bbox": [[-180, -90, 180, 90]]},
            "temporal": {"interval": [[None, None]]},
        },
        "type": "Collection",
        "stac_version": "1.0.0",
    }

    # Dataset configuration keys that are NOT copied verbatim into the
    # collection by ``get_template``.
    keys_to_ignore = [
        "collection",
        "data_type",
        "sample_files",
        "discovery_items",
        "spatial_extent",
        "temporal_extent",
        "is_periodic",
        "time_density",
        "type",
        "transfer",
    ]

    @staticmethod
    def _field(source: Any, name: str, default: Any = None) -> Any:
        """Read ``name`` from a mapping, or from an attribute otherwise.

        The dataset configuration is handled as a mapping by ``generate_stac``
        and ``get_template``; this helper lets the Zarr code path accept the
        same mapping while still working with attribute-style objects.
        """
        if isinstance(source, Mapping):
            return source.get(name, default)
        return getattr(source, name, default)

    def get_template(self, dataset: Dict[str, Any]) -> dict:
        """Return the base collection dictionary for ``dataset``.

        Args:
            dataset (Dict[str, Any]): dataset configuration; must contain
                the ``"collection"`` key, which becomes the collection id.

        Returns:
            dict: a new dictionary made of the id, a deep copy of ``common``
            and every dataset key not listed in ``keys_to_ignore``. Empty or
            missing ``description`` and ``license`` receive default values.

        Raises:
            KeyError: if ``dataset`` has no ``"collection"`` key.
        """
        extra_fields = {
            key: dataset[key]
            for key in dataset.keys()
            if key not in GenerateCollection.keys_to_ignore
        }
        collection_dict = {
            "id": dataset["collection"],
            # Deep copy: callers assign into the nested "extent" and may
            # append to "links"; a shallow spread would leak those changes
            # into the class-level defaults and into later collections.
            **copy.deepcopy(GenerateCollection.common),
            **extra_fields,
        }

        # Default REQUIRED fields
        if not collection_dict.get("description"):
            collection_dict["description"] = dataset["collection"]
        if not collection_dict.get("license"):
            collection_dict["license"] = "proprietary"
        return collection_dict

    def _create_zarr_template(self, dataset: Dict[str, Any], store_path: str) -> dict:
        """Return the base template extended with a ``zarr`` asset.

        Args:
            dataset (Dict[str, Any]): dataset configuration; its optional
                ``xarray_kwargs`` entry is merged into ``xarray:open_kwargs``.
            store_path (str): href of the Zarr store.
        """
        template = self.get_template(dataset)
        xarray_kwargs = GenerateCollection._field(dataset, "xarray_kwargs") or {}
        template["assets"] = {
            "zarr": {
                "href": store_path,
                "title": "Zarr Array Store",
                "description": "Zarr array store with one or several arrays (variables)",
                "roles": ["data", "zarr"],
                "type": "application/vnd+zarr",
                "xarray:open_kwargs": {
                    "engine": "zarr",
                    "chunks": {},
                    **xarray_kwargs,
                },
            }
        }
        return template

    def create_zarr_collection(self, dataset: Dict[str, Any], role_arn: str) -> dict:
        """
        Creates a zarr stac collection based off of the user input

        The store location is built from the first entry of
        ``discovery_items`` (``bucket``, ``prefix`` and ``zarr_store``); the
        store is opened with xarray and handed to ``xstac.xarray_to_stac``.

        Args:
            dataset (Dict[str, Any]): dataset configuration
            role_arn (str): role arn passed to the s3 filesystem

        Raises:
            ValueError: if ``discovery_items`` is missing or empty, or its
                first entry lacks one of the store location fields.
        """
        read = GenerateCollection._field

        discovery_items = read(dataset, "discovery_items")
        if not discovery_items:
            raise ValueError(
                "A zarr dataset requires at least one entry in 'discovery_items'"
            )
        discovery = discovery_items[0]

        location = {
            name: read(discovery, name)
            for name in ("bucket", "prefix", "zarr_store")
        }
        missing = [name for name, value in location.items() if value is None]
        if missing:
            raise ValueError(
                f"discovery item is missing required field(s): {', '.join(missing)}"
            )
        store_path = (
            f"s3://{location['bucket']}/{location['prefix']}{location['zarr_store']}"
        )

        template = self._create_zarr_template(dataset, store_path)

        fs = fsspec.filesystem("s3", anon=False, role_arn=role_arn)
        store = fs.get_mapper(store_path)
        xarray_kwargs = read(dataset, "xarray_kwargs") or {}
        ds = xr.open_zarr(store, consolidated=bool(xarray_kwargs.get("consolidated")))

        collection = xstac.xarray_to_stac(
            ds,
            template,
            temporal_dimension=read(dataset, "temporal_dimension") or "time",
            x_dimension=read(dataset, "x_dimension") or "lon",
            y_dimension=read(dataset, "y_dimension") or "lat",
            reference_system=read(dataset, "reference_system") or 4326,
        )
        return collection.to_dict()

    def create_cog_collection(self, dataset: Dict[str, Any]) -> dict:
        """Return a COG collection built from the dataset configuration.

        Args:
            dataset (Dict[str, Any]): dataset configuration. Optional
                ``spatial_extent`` / ``temporal_extent`` mappings replace the
                default extents; their values are used in mapping order.

        Returns:
            dict: the collection, including a ``cog_default`` item asset.
        """
        collection_stac = self.get_template(dataset)

        # Override the extents if they exist
        if spatial_extent := dataset.get("spatial_extent"):
            collection_stac["extent"]["spatial"] = {
                "bbox": [list(spatial_extent.values())]
            }
        if temporal_extent := dataset.get("temporal_extent"):
            collection_stac["extent"]["temporal"] = {
                # Falsy bounds (e.g. an empty string) become open-ended (None).
                "interval": [[bound or None for bound in temporal_extent.values()]]
            }

        collection_stac["item_assets"] = {
            "cog_default": {
                "type": "image/tiff; application=geotiff; profile=cloud-optimized",
                "roles": ["data", "layer"],
                "title": "Default COG Layer",
                "description": "Cloud optimized default layer to display on map",
            }
        }
        return collection_stac

    def generate_stac(
        self, dataset_config: Dict[str, Any], role_arn: Optional[str] = None
    ) -> dict:
        """
        Generates a STAC collection based on the dataset and data type

        Args:
            dataset_config (Dict[str, Any]): dataset configuration
            role_arn (str): role arn for Zarr collection generation

        Returns:
            dict: a Zarr collection when ``data_type`` is ``"zarr"``,
            otherwise a COG collection (``data_type`` defaults to ``"cog"``).
        """
        data_type = dataset_config.get("data_type", "cog")
        if data_type == "zarr":
            return self.create_zarr_collection(dataset_config, role_arn)
        return self.create_cog_collection(dataset_config)