[CDF-22139] 🤦 Find metadata columns before starting to write to disk (#788)
* fix: find metadata columns before starting to write to disk

* style: better comment

* build: changelog
doctrino authored Jul 22, 2024
1 parent beaed99 commit 3ded40d
Showing 2 changed files with 66 additions and 8 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.cdf-tk.md
@@ -31,6 +31,8 @@ Changes are grouped as follows:
- [Feature Preview] When dumping assets to `csv`, headers are no longer repeated for every batch of 1,000 assets.
- [Feature Preview] When dumping assets to `parquet`, you can now dump more than 1000 assets without
getting the error `TypeError: write() got an unexpected keyword argument 'mode'`.
- [Feature Preview] When dumping assets to `parquet/csv`, the Toolkit now keeps all assets in memory until it has found
  all unique metadata keys. This ensures that the header is correct in the resulting `parquet/csv` file.
- In the `config.[env].yaml`, the `name` parameter in the `environment` section is no longer required.
This was supposed to be removed in `0.2.0a4`.
- If you run `cdf-tk build --env dev`, and then `cdf-tk deploy -env prod` the Toolkit will
72 changes: 64 additions & 8 deletions cognite_toolkit/_cdf_tk/prototypes/commands/dump_assets.py
@@ -11,7 +11,7 @@
import questionary
import yaml
from cognite.client import CogniteClient
from cognite.client.data_classes import AssetList, DataSetWrite, DataSetWriteList
from cognite.client.data_classes import AssetFilter, AssetList, DataSetWrite, DataSetWriteList
from cognite.client.exceptions import CogniteAPIError

from cognite_toolkit._cdf_tk.commands._base import ToolkitCommand
@@ -74,15 +74,31 @@ def execute(

parquet_engine = self._get_parquet_engine()

# To set the header correctly in the csv file/parquet table, we need to know the metadata columns at the
# beginning. Ideally, we could have used the .aggregate_unique_properties() method to look this up directly.
# Unfortunately, this method lowercases the metadata keys, which is not what we want. Therefore, we need to
# check how many metadata columns there are and continue fetching assets until we have all the metadata
# columns, before we start writing the assets to the file.
if format_ == "csv" or format_ == "parquet":
metadata_cols_count = self._get_metadata_column_count(ToolGlobals.client, data_sets, hierarchies)
else:
# For the YAML format, we don't need to know the metadata columns in advance
metadata_cols_count = 0

count = 0
for assets in ToolGlobals.client.assets(
chunk_size=1000,
asset_subtree_external_ids=hierarchies or None,
data_set_external_ids=data_set or None,
limit=limit,
for assets, metadata_columns in self._find_metadata_columns(
ToolGlobals.client.assets(
chunk_size=1000,
asset_subtree_external_ids=hierarchies or None,
data_set_external_ids=data_set or None,
limit=limit,
),
metadata_cols_count,
):
for group_name, group in self._group_by_hierarchy(ToolGlobals.client, assets):
group_write = self._to_write(ToolGlobals.client, group, expand_metadata=format_ != "yaml")
group_write = self._to_write(
ToolGlobals.client, group, expand_metadata=format_ != "yaml", metadata_columns=metadata_columns
)
clean_name = to_directory_compatible(group_name)
file_path = output_dir / AssetLoader.folder_name / f"{clean_name}.Asset.{format_}"
if file_path.exists() and format_ == "yaml":
@@ -132,6 +148,26 @@ def execute(

print(f"Dumped {len(self.data_set_by_id)} data sets to {file_path}")

@staticmethod
def _find_metadata_columns(
asset_iterator: Iterator[AssetList], metadata_column_count: int
) -> Iterator[tuple[AssetList, set[str]]]:
"""Iterates over assets until all metadata columns are found."""
metadata_columns: set[str] = set()
stored_assets = AssetList([])
for assets in asset_iterator:
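            # All metadata columns are already known; pass batches straight through without buffering.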
if len(metadata_columns) >= metadata_column_count:
yield assets, metadata_columns
continue
metadata_columns |= {key for asset in assets for key in (asset.metadata or {}).keys()}
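            # All columns found: flush the batches held back so far before yielding the current one.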
if len(metadata_columns) >= metadata_column_count:
if stored_assets:
yield stored_assets, metadata_columns
stored_assets = AssetList([])
yield assets, metadata_columns
continue
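            # Header still incomplete: keep this batch in memory until all columns have been seen.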
stored_assets.extend(assets)

@staticmethod
def _get_parquet_engine() -> str:
try:
@@ -147,6 +183,18 @@ def _get_parquet_engine() -> str:
except ImportError:
return "none"

@staticmethod
def _get_metadata_column_count(
client: CogniteClient, data_sets: list[str] | None, hierarchies: list[str] | None
) -> int:
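        # Approximate number of distinct metadata keys among the selected assets.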
return client.assets.aggregate_cardinality_properties(
"metadata",
filter=AssetFilter(
data_set_ids=[{"externalId": id_} for id_ in data_sets or []] or None,
asset_subtree_ids=[{"externalId": id_} for id_ in hierarchies or []] or None,
),
)

def _select_hierarchy_and_data_set(
self, client: CogniteClient, hierarchy: list[str] | None, data_set: list[str] | None, interactive: bool
) -> tuple[list[str], list[str]]:
@@ -209,7 +257,9 @@ def _group_by_hierarchy(self, client: CogniteClient, assets: AssetList) -> Itera
for root_id, asset in groupby(sorted(assets, key=lambda a: a.root_id), lambda a: a.root_id):
yield self._get_asset_external_id(client, root_id), AssetList(list(asset))

def _to_write(self, client: CogniteClient, assets: AssetList, expand_metadata: bool) -> list[dict[str, Any]]:
def _to_write(
self, client: CogniteClient, assets: AssetList, expand_metadata: bool, metadata_columns: set[str]
) -> list[dict[str, Any]]:
write_assets: list[dict[str, Any]] = []
for asset in assets:
write = asset.as_write().dump(camel_case=True)
@@ -221,6 +271,12 @@ def _to_write(self, client: CogniteClient, assets: AssetList, expand_metadata: b
metadata = write.pop("metadata")
for key, value in metadata.items():
write[f"metadata.{key}"] = value
missing = metadata_columns - set(metadata.keys())
for col in missing:
write[f"metadata.{col}"] = None
elif expand_metadata:
for col in metadata_columns:
write[f"metadata.{col}"] = None
write_assets.append(write)
return write_assets


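A minimal, self-contained sketch (not part of the commit) of the buffering behaviour that `_find_metadata_columns` implements, using plain dicts in place of the SDK's `AssetList`; the batch contents and `expected_column_count` below are made-up illustration values:

from collections.abc import Iterator


def find_metadata_columns(
    batches: Iterator[list[dict]], expected_column_count: int
) -> Iterator[tuple[list[dict], set[str]]]:
    """Hold batches back until all metadata keys have been seen, then yield them with the full key set."""
    columns: set[str] = set()
    buffered: list[dict] = []
    for batch in batches:
        if len(columns) >= expected_column_count:
            # Header already complete; pass the batch straight through.
            yield batch, columns
            continue
        columns |= {key for asset in batch for key in asset.get("metadata", {})}
        if len(columns) >= expected_column_count:
            if buffered:
                # Flush everything held back before yielding the current batch.
                yield buffered, columns
                buffered = []
            yield batch, columns
            continue
        # Header still incomplete; keep the batch in memory.
        buffered.extend(batch)


batches = iter(
    [
        [{"name": "pump", "metadata": {"vendor": "acme"}}],
        [{"name": "valve", "metadata": {"site": "oslo"}}],  # second key completes the header
        [{"name": "motor", "metadata": {}}],
    ]
)
for batch, columns in find_metadata_columns(batches, expected_column_count=2):
    print([a["name"] for a in batch], sorted(columns))

The first two batches are only yielded once both keys (`site`, `vendor`) are known, so the csv/parquet header can be written correctly from the start.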