diff --git a/CHANGELOG.md b/CHANGELOG.md index 09df0d1..c132586 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,17 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## Unreleased + +### Changed + +- (Prefixed)ClusterHelper is only used for truly multi-source datasets now (atm it's only MovieGraphBenchmark's multi setting) + +### Fixed + +- Added left/right triples to MovieGraphBenchmark for consistency in binary case +- Addressed property/class links problem in OAEI + ## [0.3.0] - 2024-03-25 ### Added diff --git a/sylloge/base.py b/sylloge/base.py index 1aa8efd..c130523 100644 --- a/sylloge/base.py +++ b/sylloge/base.py @@ -3,7 +3,7 @@ import os import pathlib from abc import abstractmethod -from collections import OrderedDict, defaultdict +from collections import defaultdict from dataclasses import dataclass from glob import glob from typing import ( @@ -16,6 +16,7 @@ Literal, Mapping, Optional, + OrderedDict, Sequence, Tuple, Union, @@ -26,9 +27,9 @@ import dask.dataframe as dd import pandas as pd import pystow -from eche import ClusterHelper, PrefixedClusterHelper +from eche import PrefixedClusterHelper from pystow.utils import read_zipfile_csv -from slugify import slugify +from slugify import slugify # type: ignore[import-untyped] from .dask_utils import read_dask_df_archive_csv from .my_typing import ( @@ -39,6 +40,7 @@ LABEL_RELATION, LABEL_TAIL, DataFrameType, + LinkType, ) from .utils import fix_dataclass_init_docs @@ -66,27 +68,27 @@ def triples(self) -> int: @fix_dataclass_init_docs @dataclass -class TrainTestValSplit: +class TrainTestValSplit(Generic[LinkType]): """Dataclass holding split of gold standard entity links.""" #: entity links for training - train: ClusterHelper + train: LinkType #: entity links for testing - test: ClusterHelper + test: LinkType #: entity links for validation - val: ClusterHelper + val: LinkType @fix_dataclass_init_docs @dataclass -class MultiSourceEADataset(Generic[DataFrameType]): +class MultiSourceEADataset(Generic[DataFrameType, LinkType]): """Dataset class holding information of the alignment class.""" rel_triples: List[DataFrameType] attr_triples: List[DataFrameType] - ent_links: ClusterHelper + ent_links: LinkType dataset_names: Tuple[str, ...] 
- folds: Optional[Sequence[TrainTestValSplit]] = None + folds: Optional[Sequence[TrainTestValSplit[LinkType]]] = None @property def _canonical_name(self) -> str: @@ -152,7 +154,7 @@ def __repr__(self) -> str: return f"{self.__class__.__name__}({ds_stat_repr})" -class ParquetEADataset(MultiSourceEADataset[DataFrameType]): +class ParquetEADataset(MultiSourceEADataset[DataFrameType, LinkType]): """Dataset class holding information of the alignment task.""" _REL_TRIPLES_PATH: str = "rel_triples" @@ -166,11 +168,11 @@ class ParquetEADataset(MultiSourceEADataset[DataFrameType]): @overload def __init__( - self: "ParquetEADataset[pd.DataFrame]", + self: "ParquetEADataset[pd.DataFrame, LinkType]", *, rel_triples: Sequence[DataFrameType], attr_triples: Sequence[DataFrameType], - ent_links: DataFrameType, + ent_links: LinkType, dataset_names: Tuple[str, ...], ds_prefix_tuples: Optional[Tuple[str, ...]] = None, folds: Optional[Sequence[TrainTestValSplit]] = None, @@ -180,11 +182,11 @@ def __init__( @overload def __init__( - self: "ParquetEADataset[dd.DataFrame]", + self: "ParquetEADataset[dd.DataFrame, LinkType]", *, rel_triples: Sequence[DataFrameType], attr_triples: Sequence[DataFrameType], - ent_links: DataFrameType, + ent_links: LinkType, dataset_names: Tuple[str, ...], ds_prefix_tuples: Optional[Tuple[str, ...]] = None, folds: Optional[Sequence[TrainTestValSplit]] = None, @@ -197,7 +199,7 @@ def __init__( *, rel_triples: Sequence[DataFrameType], attr_triples: Sequence[DataFrameType], - ent_links: DataFrameType, + ent_links: LinkType, dataset_names: Tuple[str, ...], ds_prefix_tuples: Optional[Tuple[str, ...]] = None, folds: Optional[Sequence[TrainTestValSplit]] = None, @@ -239,6 +241,12 @@ def __repr__(self) -> str: def _triple_path_modifier(self, triple_list: List) -> List[str]: return [f"_{idx}_" for idx in range(len(triple_list))] + def _write_links(self, links: LinkType, path: pathlib.Path, **kwargs): + if isinstance(links, PrefixedClusterHelper): + links.to_file(path, write_cluster_id=False) + else: + links.to_parquet(f"{path}_parquet", **kwargs) + def to_parquet(self, path: Union[str, pathlib.Path], **kwargs): """Write dataset to path as several parquet files. 
@@ -269,7 +277,7 @@ def to_parquet(self, path: Union[str, pathlib.Path], **kwargs): path.joinpath(f"{table_prefix}{infix}parquet"), **kwargs ) - self.ent_links.to_file(path.joinpath(self.__class__._ENT_LINKS_PATH)) + self._write_links(self.ent_links, path.joinpath(self.__class__._ENT_LINKS_PATH)) # write folds if self.folds: @@ -285,15 +293,32 @@ def to_parquet(self, path: Union[str, pathlib.Path], **kwargs): self.__class__._VAL_LINKS_PATH, ], ): - fold_links.to_file( - fold_dir.joinpath(link_path), write_cluster_id=False - ) + self._write_links(fold_links, fold_dir.joinpath(link_path)) + + @classmethod + def _read_ch_or_df_links( + cls, + path: pathlib.Path, + use_cluster_helper: bool, + read_parquet_fn: Optional[Callable[[Any], DataFrameType]] = None, + ds_prefixes: Optional[OrderedDict[str, str]] = None, + **kwargs, + ) -> LinkType: + if use_cluster_helper: + assert ds_prefixes is not None + return PrefixedClusterHelper.from_file(path, ds_prefixes=ds_prefixes) # type: ignore[return-value] + if read_parquet_fn is None: + raise ValueError( + "Need to supply read_parquet_fn if not using ClusterHelper" + ) + return read_parquet_fn(f"{path}_parquet", **kwargs) # type: ignore[return-value] @classmethod def _read_parquet_values( cls, path: Union[str, pathlib.Path], backend: BACKEND_LITERAL = "pandas", + use_cluster_helper: bool = False, **kwargs, ) -> Tuple[Dict[str, Any], Dict[str, Any]]: if not isinstance(path, pathlib.Path): @@ -337,12 +362,13 @@ def _read_parquet_values( tables[table].append( read_parquet_fn(path.joinpath(table_path), **kwargs) ) - if ds_prefixes is None: - ent_links = ClusterHelper.from_file(path.joinpath(cls._ENT_LINKS_PATH)) - else: - ent_links = PrefixedClusterHelper.from_file( - path.joinpath(cls._ENT_LINKS_PATH), ds_prefixes=ds_prefixes - ) + + ent_links = cls._read_ch_or_df_links( + path.joinpath(cls._ENT_LINKS_PATH), + read_parquet_fn=read_parquet_fn, + ds_prefixes=ds_prefixes, + use_cluster_helper=use_cluster_helper, + ) # read folds fold_path = path.joinpath(cls._FOLD_DIR) @@ -351,7 +377,7 @@ def _read_parquet_values( folds = [] for tmp_fold_dir in sorted(sub_dir for sub_dir in os.listdir(fold_path)): fold_dir = fold_path.joinpath(tmp_fold_dir) - train_test_val: Dict[str, ClusterHelper] = {} + train_test_val: Dict[str, LinkType] = {} for links, link_path in zip( ["train", "test", "val"], [ @@ -360,14 +386,12 @@ def _read_parquet_values( cls._VAL_LINKS_PATH, ], ): - if ds_prefixes is None: - train_test_val[links] = ClusterHelper.from_file( - fold_dir.joinpath(link_path) - ) - else: - train_test_val[links] = PrefixedClusterHelper.from_file( - fold_dir.joinpath(link_path), ds_prefixes=ds_prefixes - ) + train_test_val[links] = cls._read_ch_or_df_links( + fold_dir.joinpath(link_path), + read_parquet_fn=read_parquet_fn, + ds_prefixes=ds_prefixes, + use_cluster_helper=use_cluster_helper, + ) folds.append(TrainTestValSplit(**train_test_val)) return ( { @@ -387,8 +411,9 @@ def read_parquet( cls, path: Union[str, pathlib.Path], backend: Literal["dask"], + use_cluster_helper: Literal[False], **kwargs, - ) -> "ParquetEADataset[dd.DataFrame]": + ) -> "ParquetEADataset[dd.DataFrame, dd.DataFrame]": ... @overload @@ -396,9 +421,32 @@ def read_parquet( def read_parquet( cls, path: Union[str, pathlib.Path], - backend: Literal["pandas"] = "pandas", + backend: Literal["pandas"], + use_cluster_helper: Literal[False], + **kwargs, + ) -> "ParquetEADataset[pd.DataFrame, pd.DataFrame]": + ... 
+ + @overload + @classmethod + def read_parquet( + cls, + path: Union[str, pathlib.Path], + backend: Literal["dask"], + use_cluster_helper: Literal[True], + **kwargs, + ) -> "ParquetEADataset[dd.DataFrame, PrefixedClusterHelper]": + ... + + @overload + @classmethod + def read_parquet( + cls, + path: Union[str, pathlib.Path], + backend: Literal["pandas"], + use_cluster_helper: Literal[True], **kwargs, - ) -> "ParquetEADataset[pd.DataFrame]": + ) -> "ParquetEADataset[pd.DataFrame, PrefixedClusterHelper]": ... @classmethod @@ -406,6 +454,7 @@ def read_parquet( cls, path: Union[str, pathlib.Path], backend: BACKEND_LITERAL = "pandas", + use_cluster_helper: bool = True, **kwargs, ) -> "ParquetEADataset": """Read dataset from parquet files in given `path`. @@ -417,6 +466,7 @@ def read_parquet( :param path: Directory with files :param backend: Whether to use pandas or dask for reading :param kwargs: passed on to the respective read function + :param use_cluster_helper: if True uses ClusterHelper to load links :return: EADataset read from parquet .. seealso:: :func:`to_parquet` @@ -429,29 +479,59 @@ def read_parquet( return instance -class CacheableEADataset(ParquetEADataset[DataFrameType]): +class CacheableEADataset(ParquetEADataset[DataFrameType, LinkType]): @overload def __init__( - self: "CacheableEADataset[pd.DataFrame]", + self: "CacheableEADataset[pd.DataFrame, pd.DataFrame]", *, cache_path: Union[str, pathlib.Path], use_cache: bool = True, parquet_load_options: Optional[Mapping] = None, parquet_store_options: Optional[Mapping] = None, backend: Literal["pandas"], + use_cluster_helper: Literal[False], **init_kwargs, ): ... @overload def __init__( - self: "CacheableEADataset[dd.DataFrame]", + self: "CacheableEADataset[dd.DataFrame, dd.DataFrame]", *, cache_path: Union[str, pathlib.Path], use_cache: bool = True, parquet_load_options: Optional[Mapping] = None, parquet_store_options: Optional[Mapping] = None, backend: Literal["dask"], + use_cluster_helper: Literal[False], + **init_kwargs, + ): + ... + + @overload + def __init__( + self: "CacheableEADataset[pd.DataFrame, PrefixedClusterHelper]", + *, + cache_path: Union[str, pathlib.Path], + use_cache: bool = True, + parquet_load_options: Optional[Mapping] = None, + parquet_store_options: Optional[Mapping] = None, + backend: Literal["pandas"], + use_cluster_helper: Literal[True], + **init_kwargs, + ): + ... + + @overload + def __init__( + self: "CacheableEADataset[dd.DataFrame, PrefixedClusterHelper]", + *, + cache_path: Union[str, pathlib.Path], + use_cache: bool = True, + parquet_load_options: Optional[Mapping] = None, + parquet_store_options: Optional[Mapping] = None, + backend: Literal["dask"], + use_cluster_helper: Literal[True], **init_kwargs, ): ... @@ -464,6 +544,7 @@ def __init__( parquet_load_options: Optional[Mapping] = None, parquet_store_options: Optional[Mapping] = None, backend: BACKEND_LITERAL = "pandas", + use_cluster_helper: bool = False, **init_kwargs, ): """EADataset that uses caching after initial read. 
@@ -473,6 +554,7 @@ def __init__( :param parquet_load_options: handed through to parquet loading function :param parquet_store_options: handed through to parquet writing function :param backend: Whether to use pandas or dask for reading/writing + :param use_cluster_helper: if True uses ClusterHelper to load links :param init_kwargs: other arguments for creating the EADataset instance """ if isinstance(cache_path, str): @@ -480,6 +562,7 @@ def __init__( self.cache_path = cache_path self.parquet_load_options = parquet_load_options or {} self.parquet_store_options = parquet_store_options or {} + self.use_cluster_helper = use_cluster_helper update_cache = False additional_kwargs: Dict[str, Any] = {} @@ -497,7 +580,7 @@ def __init__( if self.cache_path.exists() and os.listdir(self.cache_path): logger.info(f"Loading from cache at {self.cache_path}") ea_ds_kwargs, new_additional_kwargs = self.load_from_cache( - backend=backend + backend=backend, use_cluster_helper=self.use_cluster_helper ) init_kwargs.update(ea_ds_kwargs) additional_kwargs.update(new_additional_kwargs) @@ -534,10 +617,10 @@ def create_cache_path( return cache_path.joinpath(inner_cache_path) def load_from_cache( - self, backend: BACKEND_LITERAL = "pandas" + self, backend: BACKEND_LITERAL = "pandas", use_cluster_helper: bool = False ) -> Tuple[Dict[str, Any], Dict[str, Any]]: return self.__class__._read_parquet_values( - path=self.cache_path, backend=backend + path=self.cache_path, backend=backend, use_cluster_helper=use_cluster_helper ) @abstractmethod @@ -548,12 +631,12 @@ def store_cache(self): self.to_parquet(self.cache_path, **self.parquet_store_options) -class ZipEADataset(CacheableEADataset[DataFrameType]): +class ZipEADataset(CacheableEADataset[DataFrameType, LinkType]): """Dataset created from zip file which is downloaded.""" @overload def __init__( - self: "ZipEADataset[pd.DataFrame]", + self: "ZipEADataset[pd.DataFrame, pd.DataFrame]", *, cache_path: pathlib.Path, zip_path: str, @@ -564,13 +647,14 @@ def __init__( file_names_attr_triples: Sequence[str] = ("attr_triples_1", "attr_triples_2"), file_name_ent_links: str = "ent_links", backend: Literal["pandas"], + use_cluster_helper: Literal[False], use_cache: bool = True, ): ... @overload def __init__( - self: "ZipEADataset[dd.DataFrame]", + self: "ZipEADataset[dd.DataFrame, dd.DataFrame]", *, cache_path: pathlib.Path, zip_path: str, @@ -581,6 +665,43 @@ def __init__( file_names_attr_triples: Sequence[str] = ("attr_triples_1", "attr_triples_2"), file_name_ent_links: str = "ent_links", backend: Literal["dask"], + use_cluster_helper: Literal[False], + use_cache: bool = True, + ): + ... + + @overload + def __init__( + self: "ZipEADataset[pd.DataFrame, PrefixedClusterHelper]", + *, + cache_path: pathlib.Path, + zip_path: str, + inner_path: pathlib.PurePosixPath, + dataset_names: Tuple[str, ...], + ds_prefix_tuples: Optional[Tuple[str, ...]] = None, + file_names_rel_triples: Sequence[str] = ("rel_triples_1", "rel_triples_2"), + file_names_attr_triples: Sequence[str] = ("attr_triples_1", "attr_triples_2"), + file_name_ent_links: str = "ent_links", + backend: Literal["pandas"], + use_cluster_helper: Literal[True], + use_cache: bool = True, + ): + ... 
+ + @overload + def __init__( + self: "ZipEADataset[dd.DataFrame, PrefixedClusterHelper]", + *, + cache_path: pathlib.Path, + zip_path: str, + inner_path: pathlib.PurePosixPath, + dataset_names: Tuple[str, ...], + ds_prefix_tuples: Optional[Tuple[str, ...]] = None, + file_names_rel_triples: Sequence[str] = ("rel_triples_1", "rel_triples_2"), + file_names_attr_triples: Sequence[str] = ("attr_triples_1", "attr_triples_2"), + file_name_ent_links: str = "ent_links", + backend: Literal["dask"], + use_cluster_helper: Literal[True], use_cache: bool = True, ): ... @@ -597,6 +718,7 @@ def __init__( file_names_attr_triples: Sequence[str] = ("attr_triples_1", "attr_triples_2"), file_name_ent_links: str = "ent_links", backend: BACKEND_LITERAL = "pandas", + use_cluster_helper: bool = False, use_cache: bool = True, ): """Initialize ZipEADataset. @@ -622,12 +744,13 @@ def __init__( if ds_prefix_tuples is not None else None ) - super().__init__( # type: ignore[misc] + super().__init__( # type: ignore[misc, call-overload] dataset_names=dataset_names, cache_path=cache_path, backend=backend, # type: ignore[arg-type] use_cache=use_cache, ds_prefix_tuples=ds_prefix_tuples, + use_cluster_helper=use_cluster_helper, # type: ignore[arg-type] ) def initial_read(self, backend: BACKEND_LITERAL) -> Dict[str, Any]: @@ -652,23 +775,19 @@ def _read_links( file_name: Union[str, pathlib.Path], sep: str = "\t", encoding: str = "utf8", - ) -> ClusterHelper: - if self._ds_prefixes is not None: - return PrefixedClusterHelper.from_zipped_file( - path=self.zip_path, - inner_path=str(inner_folder.joinpath(file_name)), - has_cluster_id=False, - sep=sep, - encoding=encoding, - ds_prefixes=self._ds_prefixes, - ) - return ClusterHelper.from_zipped_file( + backend: BACKEND_LITERAL = "pandas", + ) -> LinkType: + if not self.use_cluster_helper: + return self._read_triples(file_name, backend, True) # type: ignore[return-value] + assert self._ds_prefixes is not None + return PrefixedClusterHelper.from_zipped_file( path=self.zip_path, inner_path=str(inner_folder.joinpath(file_name)), has_cluster_id=False, sep=sep, encoding=encoding, - ) + ds_prefixes=self._ds_prefixes, + ) # type: ignore[return-value] @overload def _read_triples( @@ -718,12 +837,58 @@ def _read_triples( return cast(DataFrameType, trip) -class ZipEADatasetWithPreSplitFolds(ZipEADataset[DataFrameType]): +class ZipEADatasetWithPreSplitFolds(ZipEADataset[DataFrameType, LinkType]): """Dataset with pre-split folds created from zip file which is downloaded.""" @overload def __init__( - self: "ZipEADatasetWithPreSplitFolds[pd.DataFrame]", + self: "ZipEADatasetWithPreSplitFolds[pd.DataFrame, pd.DataFrame]", + *, + cache_path: pathlib.Path, + zip_path: str, + inner_path: pathlib.PurePosixPath, + dataset_names: Tuple[str, ...], + ds_prefix_tuples: Optional[Tuple[str, ...]] = None, + file_names_rel_triples: Sequence[str] = ("rel_triples_1", "rel_triples_2"), + file_names_attr_triples: Sequence[str] = ("attr_triples_1", "attr_triples_2"), + file_name_ent_links: str = "ent_links", + backend: Literal["pandas"], + use_cluster_helper: Literal[False], + directory_name_folds: str = "721_5fold", + directory_names_individual_folds: Sequence[str] = ("1", "2", "3", "4", "5"), + file_name_test_links: str = "test_links", + file_name_train_links: str = "train_links", + file_name_valid_links: str = "valid_links", + use_cache: bool = True, + ): + ... 
+ + @overload + def __init__( + self: "ZipEADatasetWithPreSplitFolds[dd.DataFrame, dd.DataFrame]", + *, + cache_path: pathlib.Path, + zip_path: str, + inner_path: pathlib.PurePosixPath, + dataset_names: Tuple[str, ...], + ds_prefix_tuples: Optional[Tuple[str, ...]] = None, + file_names_rel_triples: Sequence[str] = ("rel_triples_1", "rel_triples_2"), + file_names_attr_triples: Sequence[str] = ("attr_triples_1", "attr_triples_2"), + file_name_ent_links: str = "ent_links", + backend: Literal["dask"], + use_cluster_helper: Literal[False], + directory_name_folds: str = "721_5fold", + directory_names_individual_folds: Sequence[str] = ("1", "2", "3", "4", "5"), + file_name_test_links: str = "test_links", + file_name_train_links: str = "train_links", + file_name_valid_links: str = "valid_links", + use_cache: bool = True, + ): + ... + + @overload + def __init__( + self: "ZipEADatasetWithPreSplitFolds[pd.DataFrame, PrefixedClusterHelper]", *, cache_path: pathlib.Path, zip_path: str, @@ -734,6 +899,7 @@ def __init__( file_names_attr_triples: Sequence[str] = ("attr_triples_1", "attr_triples_2"), file_name_ent_links: str = "ent_links", backend: Literal["pandas"], + use_cluster_helper: Literal[True], directory_name_folds: str = "721_5fold", directory_names_individual_folds: Sequence[str] = ("1", "2", "3", "4", "5"), file_name_test_links: str = "test_links", @@ -745,7 +911,7 @@ def __init__( @overload def __init__( - self: "ZipEADatasetWithPreSplitFolds[dd.DataFrame]", + self: "ZipEADatasetWithPreSplitFolds[dd.DataFrame, PrefixedClusterHelper]", *, cache_path: pathlib.Path, zip_path: str, @@ -756,6 +922,7 @@ def __init__( file_names_attr_triples: Sequence[str] = ("attr_triples_1", "attr_triples_2"), file_name_ent_links: str = "ent_links", backend: Literal["dask"], + use_cluster_helper: Literal[True], directory_name_folds: str = "721_5fold", directory_names_individual_folds: Sequence[str] = ("1", "2", "3", "4", "5"), file_name_test_links: str = "test_links", @@ -777,6 +944,7 @@ def __init__( file_names_attr_triples: Sequence[str] = ("attr_triples_1", "attr_triples_2"), file_name_ent_links: str = "ent_links", backend: BACKEND_LITERAL = "pandas", + use_cluster_helper: bool = False, directory_name_folds: str = "721_5fold", directory_names_individual_folds: Sequence[str] = ("1", "2", "3", "4", "5"), file_name_test_links: str = "test_links", @@ -810,12 +978,13 @@ def __init__( self.file_name_test_links = file_name_test_links self.file_name_valid_links = file_name_valid_links - super().__init__( # type: ignore[misc] + super().__init__( # type: ignore[misc, call-overload] dataset_names=dataset_names, zip_path=zip_path, inner_path=inner_path, cache_path=cache_path, backend=backend, # type: ignore[arg-type] + use_cluster_helper=use_cluster_helper, use_cache=use_cache, file_names_rel_triples=file_names_rel_triples, file_names_attr_triples=file_names_attr_triples, @@ -827,17 +996,32 @@ def initial_read(self, backend: BACKEND_LITERAL) -> Dict[str, Any]: base_dict = super().initial_read(backend=backend) folds = [] for fold in self.directory_names_individual_folds: - fold_folder = self.inner_path.joinpath( - pathlib.Path(self.directory_name_folds).joinpath(fold) + fold_folder = pathlib.Path(self.directory_name_folds).joinpath(fold) + train = self._read_triples( + str(fold_folder.joinpath(self.file_name_train_links)), + is_links=True, + backend=backend, + ) + test = self._read_triples( + str(fold_folder.joinpath(self.file_name_test_links)), + is_links=True, + backend=backend, + ) + val = self._read_triples( + 
str(fold_folder.joinpath(self.file_name_valid_links)), + is_links=True, + backend=backend, + ) + folds.append( + cast( + TrainTestValSplit[LinkType], + TrainTestValSplit(train=train, test=test, val=val), + ) ) - train = self._read_links(fold_folder, self.file_name_train_links) - test = self._read_links(fold_folder, self.file_name_test_links) - val = self._read_links(fold_folder, self.file_name_valid_links) - folds.append(TrainTestValSplit(train=train, test=test, val=val)) return {**base_dict, "folds": folds} -class BinaryEADataset(MultiSourceEADataset[DataFrameType]): +class BinaryEADataset(MultiSourceEADataset[DataFrameType, LinkType]): """Binary class to get left and right triples easier.""" @property @@ -866,7 +1050,7 @@ def __repr__(self) -> str: class BinaryParquetEADataset( - ParquetEADataset[DataFrameType], BinaryEADataset[DataFrameType] + ParquetEADataset[DataFrameType, LinkType], BinaryEADataset[DataFrameType, LinkType] ): """Binary version of ParquetEADataset.""" @@ -874,14 +1058,19 @@ def __repr__(self) -> str: return self._binary_repr_adjustment(super().__repr__()) -class BinaryCacheableEADataset(CacheableEADataset[DataFrameType], BinaryEADataset): +class BinaryCacheableEADataset( + CacheableEADataset[DataFrameType, LinkType], + BinaryEADataset[DataFrameType, LinkType], +): """Binary version of CacheableEADataset.""" def __repr__(self) -> str: return self._binary_repr_adjustment(super().__repr__()) -class BinaryZipEADataset(ZipEADataset[DataFrameType], BinaryEADataset): +class BinaryZipEADataset( + ZipEADataset[DataFrameType, LinkType], BinaryEADataset[DataFrameType, LinkType] +): """Binary version of ZipEADataset.""" def __repr__(self) -> str: @@ -889,7 +1078,8 @@ def __repr__(self) -> str: class BinaryZipEADatasetWithPreSplitFolds( - ZipEADatasetWithPreSplitFolds[DataFrameType], BinaryEADataset + ZipEADatasetWithPreSplitFolds[DataFrameType, LinkType], + BinaryEADataset[DataFrameType, LinkType], ): """Binary version of ZipEADataset.""" diff --git a/sylloge/med_bbk_loader.py b/sylloge/med_bbk_loader.py index 87b6be1..f4c8d35 100644 --- a/sylloge/med_bbk_loader.py +++ b/sylloge/med_bbk_loader.py @@ -14,7 +14,7 @@ MED_BBK_MODULE = BASE_DATASET_MODULE.module("med_bbk") -class MED_BBK(BinaryZipEADataset[DataFrameType]): +class MED_BBK(BinaryZipEADataset[DataFrameType, DataFrameType]): """Class containing the MED-BBK dataset. Published in `Zhang, Z. et. al. 
(2020) An Industry Evaluation of Embedding-based Entity Alignment `_,
@@ -76,6 +76,7 @@ def __init__(
             inner_path=pathlib.PurePosixPath(inner_path),
             backend=backend,  # type: ignore[arg-type]
             dataset_names=("MED", "BBK"),
+            use_cluster_helper=False,
         )
 
     def initial_read(self, backend: BACKEND_LITERAL) -> Dict[str, Any]:
diff --git a/sylloge/moviegraph_benchmark_loader.py b/sylloge/moviegraph_benchmark_loader.py
index c898130..e0024cd 100644
--- a/sylloge/moviegraph_benchmark_loader.py
+++ b/sylloge/moviegraph_benchmark_loader.py
@@ -1,10 +1,12 @@
+import logging
 import os
 import pathlib
-from typing import Dict, Literal, Optional, Tuple, Union
+from typing import Dict, List, Literal, Optional, Tuple, Union, cast, overload
 
 import pandas as pd
 from eche import PrefixedClusterHelper
 from moviegraphbenchmark import load_data
+from moviegraphbenchmark.loading import ERData
 
 from .base import (
     BACKEND_LITERAL,
@@ -12,7 +14,9 @@
     CacheableEADataset,
     TrainTestValSplit,
 )
+from .my_typing import EA_SIDES, LinkType
 
+logger = logging.getLogger(__name__)
 MOVIEGRAPH_MODULE = BASE_DATASET_MODULE.module("moviegraphbenchmark")
 
 # graph pairs
@@ -45,31 +49,59 @@
 }
 
 
-class MovieGraphBenchmark(CacheableEADataset[pd.DataFrame]):
+class MovieGraphBenchmark(CacheableEADataset[pd.DataFrame, LinkType]):
     """Class containing the movie graph benchmark.
 
     Published in `Obraczka, D. et. al. (2021) Embedding-Assisted Entity Resolution for Knowledge Graphs `_,
     *Proceedings of the 2nd International Workshop on Knowledge Graph Construction co-located with 18th Extended Semantic Web Conference*
     """
 
-    ent_links: PrefixedClusterHelper
+    ent_links: LinkType
+    folds: Optional[List[TrainTestValSplit[LinkType]]]
+
+    @overload
+    def __init__(
+        self: "MovieGraphBenchmark[PrefixedClusterHelper]",
+        graph_pair: GraphPair = "imdb-tmdb",
+        use_cache: bool = True,
+        cache_path: Optional[Union[str, pathlib.Path]] = None,
+        use_cluster_helper: Literal[True] = True,
+    ):
+        ...
+
+    @overload
+    def __init__(
+        self: "MovieGraphBenchmark[pd.DataFrame]",
+        graph_pair: GraphPair = "imdb-tmdb",
+        use_cache: bool = True,
+        cache_path: Optional[Union[str, pathlib.Path]] = None,
+        use_cluster_helper: Literal[False] = False,
+    ):
+        ...
 
     def __init__(
         self,
         graph_pair: GraphPair = "imdb-tmdb",
        use_cache: bool = True,
         cache_path: Optional[Union[str, pathlib.Path]] = None,
+        use_cluster_helper: bool = True,
     ):
         """Initialize a MovieGraphBenchmark dataset.
 
         :param graph_pair: which graph pair to use of "imdb-tdmb","imdb-tvdb" or "tmdb-tvdb" or "multi" for multi-source setting
         :param use_cache: whether to use cache or not
         :param cache_path: Path where cache will be stored/loaded
+        :param use_cluster_helper: if True uses ClusterHelper to load links
         :raises ValueError: if unknown graph pair
         """
         # Input validation.
         if graph_pair not in GRAPH_PAIRS:
             raise ValueError(f"Invalid graph pair: Allowed are: {GRAPH_PAIRS}")
+        if not use_cluster_helper and graph_pair == MULTI:
+            logger.info(
+                "Must use ClusterHelper with multi setting! Will ignore the supplied option and use ClusterHelper!"
+ ) + use_cluster_helper = True self.graph_pair = graph_pair @@ -81,12 +114,13 @@ def __init__( if graph_pair != MULTI else ("imdb", "tmdb", "tvdb") ) - super().__init__( + super().__init__( # type: ignore[misc, call-overload] cache_path=actual_cache_path, use_cache=use_cache, backend="pandas", dataset_names=ds_names, ds_prefix_tuples=GP_TO_DS_PREFIX[graph_pair], + use_cluster_helper=use_cluster_helper, ) if graph_pair != MULTI: self.rel_triples_left = self.rel_triples[0] @@ -94,27 +128,31 @@ def __init__( self.attr_triples_left = self.attr_triples[0] self.attr_triples_right = self.attr_triples[1] - def initial_read(self, backend: BACKEND_LITERAL): - assert self._ds_prefixes - data_path = str(MOVIEGRAPH_MODULE.base) - if self.graph_pair == MULTI: - ds = load_data(pair=IMDB_TMDB, data_path=data_path) - ds2 = load_data(pair=TMDB_TVDB, data_path=data_path) - rel_triples = [ds.rel_triples_1, ds.rel_triples_2, ds2.rel_triples_2] - attr_triples = [ds.attr_triples_1, ds.attr_triples_2, ds2.attr_triples_2] - ent_links = PrefixedClusterHelper.from_file( - os.path.join(data_path, "multi_source_cluster"), - ds_prefixes=self._ds_prefixes, - ) - else: - ds = load_data(pair=self.graph_pair, data_path=data_path) - rel_triples = [ds.rel_triples_1, ds.rel_triples_2] - attr_triples = [ds.attr_triples_1, ds.attr_triples_2] - ent_links = PrefixedClusterHelper.from_file( + def _read_links(self, data_path: str) -> Union[PrefixedClusterHelper, pd.DataFrame]: + if self.use_cluster_helper: + assert self._ds_prefixes + return PrefixedClusterHelper.from_file( os.path.join(data_path, self.graph_pair, "cluster"), ds_prefixes=self._ds_prefixes, - ) - folds = [ + ) # type: ignore[return-value] + return pd.read_csv( + os.path.join(data_path, self.graph_pair, "ent_links"), + sep="\t", + names=EA_SIDES, + ) # type: ignore[return-value] + + def _create_folds(self, ds: ERData) -> Optional[List[TrainTestValSplit]]: + if self.graph_pair == MULTI: + return None + if not self.use_cluster_helper: + return [ + TrainTestValSplit( + train=fold.train_links, test=fold.test_links, val=fold.valid_links + ) + for fold in ds.folds + ] + assert self._ds_prefixes is not None + return [ TrainTestValSplit( train=PrefixedClusterHelper.from_numpy( fold.train_links.to_numpy(), ds_prefixes=self._ds_prefixes @@ -128,10 +166,31 @@ def initial_read(self, backend: BACKEND_LITERAL): ) for fold in ds.folds ] + + def initial_read(self, backend: BACKEND_LITERAL): + assert self._ds_prefixes + data_path = str(MOVIEGRAPH_MODULE.base) + if self.graph_pair == MULTI: + ds = load_data(pair=IMDB_TMDB, data_path=data_path) + ds2 = load_data(pair=TMDB_TVDB, data_path=data_path) + rel_triples = [ds.rel_triples_1, ds.rel_triples_2, ds2.rel_triples_2] + attr_triples = [ds.attr_triples_1, ds.attr_triples_2, ds2.attr_triples_2] + ent_links: Union[ + PrefixedClusterHelper, pd.DataFrame + ] = PrefixedClusterHelper.from_file( + os.path.join(data_path, "multi_source_cluster"), + ds_prefixes=self._ds_prefixes, + ) + else: + ds = load_data(pair=self.graph_pair, data_path=data_path) + rel_triples = [ds.rel_triples_1, ds.rel_triples_2] + attr_triples = [ds.attr_triples_1, ds.attr_triples_2] + ent_links = self._read_links(data_path) + folds = self._create_folds(ds) return { "rel_triples": rel_triples, "attr_triples": attr_triples, - "ent_links": ent_links, + "ent_links": cast(LinkType, ent_links), "folds": folds, } diff --git a/sylloge/my_typing.py b/sylloge/my_typing.py index 6814c60..71e3a3e 100644 --- a/sylloge/my_typing.py +++ b/sylloge/my_typing.py @@ -2,6 +2,7 @@ import 
dask.dataframe as dd import pandas as pd +from eche import PrefixedClusterHelper # borrowed from pykeen.typing Target = Literal["head", "relation", "tail"] @@ -16,3 +17,4 @@ BACKEND_LITERAL = Literal["pandas", "dask"] DataFrameType = TypeVar("DataFrameType", pd.DataFrame, dd.DataFrame) +LinkType = TypeVar("LinkType", pd.DataFrame, dd.DataFrame, PrefixedClusterHelper) diff --git a/sylloge/oaei_loader.py b/sylloge/oaei_loader.py index 033b5f5..df96cbc 100644 --- a/sylloge/oaei_loader.py +++ b/sylloge/oaei_loader.py @@ -7,7 +7,6 @@ import dask.dataframe as dd import pandas as pd -from eche import PrefixedClusterHelper from .base import ( BASE_DATASET_MODULE, @@ -124,7 +123,7 @@ def fault_tolerant_parse_nt( return subj, pred, obj, triple_type -class OAEI(BinaryCacheableEADataset[dd.DataFrame]): +class OAEI(BinaryCacheableEADataset[dd.DataFrame, dd.DataFrame]): """The OAEI (Ontology Alignment Evaluation Initiative) Knowledge Graph Track tasks contain graphs created from fandom wikis. Five integration tasks are available: @@ -308,6 +307,7 @@ def __init__( dataset_names=(left_name, right_name), backend="dask", ds_prefix_tuples=TASK_NAME_TO_PREFIX[task], + use_cluster_helper=False, ) def initial_read(self, backend: BACKEND_LITERAL): @@ -332,10 +332,7 @@ def initial_read(self, backend: BACKEND_LITERAL): return { "rel_triples": [left_rel, right_rel], "attr_triples": [left_attr, right_attr], - "ent_links": PrefixedClusterHelper.from_numpy( - entity_mapping_df.to_numpy(), - ds_prefixes=self._ds_prefixes, - ), + "ent_links": dd.from_pandas(entity_mapping_df, npartitions=1), } @property @@ -469,13 +466,14 @@ def _read_parquet_values( cls, path: Union[str, pathlib.Path], backend: BACKEND_LITERAL = "pandas", + use_cluster_helper=False, **kwargs, ) -> Tuple[Dict[str, Any], Dict[str, Any]]: if not isinstance(path, pathlib.Path): path = pathlib.Path(path) init_args, additional_init_args = super()._read_parquet_values( - path=path, backend=backend, **kwargs + path=path, backend=backend, use_cluster_helper=False, **kwargs ) for table, table_path in zip( diff --git a/sylloge/open_ea_loader.py b/sylloge/open_ea_loader.py index 4ae8ea9..80b1e03 100644 --- a/sylloge/open_ea_loader.py +++ b/sylloge/open_ea_loader.py @@ -36,7 +36,7 @@ GRAPH_VERSIONS = (V1, V2) -class OpenEA(BinaryZipEADatasetWithPreSplitFolds[DataFrameType]): +class OpenEA(BinaryZipEADatasetWithPreSplitFolds[DataFrameType, DataFrameType]): """Class containing the OpenEA dataset family. Published in `Sun, Z. et. al. 
(2020) A Benchmarking Study of Embedding-based Entity Alignment for Knowledge Graphs `_, @@ -151,6 +151,7 @@ def __init__( backend=backend, # type: ignore[arg-type] dataset_names=OpenEA._GRAPH_PAIR_TO_DS_NAMES[graph_pair], ds_prefix_tuples=OpenEA._GRAPH_PAIR_TO_PREFIXES[graph_pair], + use_cluster_helper=False, ) @property diff --git a/tests/mocks.py b/tests/mocks.py index 643a53f..ecfb368 100644 --- a/tests/mocks.py +++ b/tests/mocks.py @@ -144,7 +144,7 @@ def mock_read_zipfile_csv(self, inner_path: str, **kwargs) -> pd.DataFrame: relation_triples=False, seed=self.seed, ) - if "ent_links" in inner_path: + if "links" in inner_path: return dummy_df( (int(self.statistic.num_ent_links * self.fraction), 2), content_length=100, diff --git a/tests/test_base.py b/tests/test_base.py index 3df9f76..6b22443 100644 --- a/tests/test_base.py +++ b/tests/test_base.py @@ -1,131 +1,15 @@ -import pathlib -from typing import Dict - +import dask.dataframe as dd +import pandas as pd import pytest from eche import PrefixedClusterHelper -from mocks import ResourceMocker -from util import EATaskStatistics from sylloge import ( MED_BBK, OAEI, MovieGraphBenchmark, OpenEA, - ZipEADatasetWithPreSplitFolds, ) -statistics_with_params = [ - ( - { - "dataset_names": ("ds1", "ds2"), - "ds_prefix_tuples": ("left", "right"), - }, - EATaskStatistics( - num_rel_triples=(100, 200), - num_attr_triples=(150, 200), - num_ent_links=100, - num_intra_ent_links=(0, 0), - ), - ), - ( - { - "dataset_names": ("ds1", "ds2", "ds3"), - "ds_prefix_tuples": ("left", "right", "middle"), - "file_names_rel_triples": ( - "rel_triples_1", - "rel_triples_2", - "rel_triples_3", - ), - "file_names_attr_triples": ( - "attr_triples_1", - "attr_triples_2", - "attr_triples_3", - ), - }, - EATaskStatistics( - num_rel_triples=(100, 200, 120), - num_attr_triples=(150, 200, 130), - num_ent_links=100, - num_intra_ent_links=(0, 0, 0), - ), - ), - ( - { - "dataset_names": tuple(f"ds_{idx}" for idx in range(20)), - "ds_prefix_tuples": tuple(f"ds_{idx}" for idx in range(20)), - "file_names_rel_triples": tuple( - f"rel_triples_{idx}" for idx in range(1, 21) - ), - "file_names_attr_triples": tuple( - f"rel_triples_{idx}" for idx in range(1, 21) - ), - }, - EATaskStatistics( - num_rel_triples=tuple(100 + idx for idx in range(20)), - num_attr_triples=tuple(100 + idx for idx in range(20)), - num_ent_links=100, - num_intra_ent_links=(0,) * 19, - ), - ), -] - - -@pytest.mark.parametrize(("params", "statistic"), statistics_with_params) -@pytest.mark.parametrize("backend", ["pandas", "dask"]) -def test_zip_ea_dataset_with_pre_split_folds( - params: Dict, statistic: EATaskStatistics, backend, mocker, tmp_path -): - rm = ResourceMocker(statistic=statistic) - mocker.patch( - "sylloge.base.ClusterHelper.from_zipped_file", - rm.mock_cluster_helper_from_zipped_file, - ) - mocker.patch( - "sylloge.base.PrefixedClusterHelper.from_zipped_file", - rm.mock_cluster_helper_from_zipped_file, - ) - if len(statistic.num_rel_triples) == 2: - mocker.patch("sylloge.base.read_zipfile_csv", rm.mock_read_zipfile_csv) - else: - mocker.patch("sylloge.base.read_zipfile_csv", rm.mock_read_zipfile_csv_multi) - - if len(statistic.num_rel_triples) == 2: - mocker.patch( - "sylloge.base.read_dask_df_archive_csv", rm.mock_read_dask_df_archive_csv - ) - else: - mocker.patch( - "sylloge.base.read_dask_df_archive_csv", - rm.mock_read_dask_df_archive_csv_multi, - ) - for use_cache, cache_exists in [(False, False), (True, False), (True, True)]: - if cache_exists: - # ensure these methods don't get called - 
mocker.patch("sylloge.base.read_zipfile_csv", rm.assert_not_called) - mocker.patch( - "sylloge.base.ClusterHelper.from_zipped_file", - rm.mock_cluster_helper_from_zipped_file, - ) - mocker.patch( - "sylloge.base.PrefixedClusterHelper.from_zipped_file", - rm.mock_cluster_helper_from_zipped_file, - ) - mocker.patch("sylloge.base.read_dask_df_archive_csv", rm.assert_not_called) - ds = ZipEADatasetWithPreSplitFolds( - backend=backend, - use_cache=use_cache, - cache_path=tmp_path, - zip_path="mocked", - inner_path=pathlib.PurePosixPath("mocked"), - **params, - ) - for idx, _ in enumerate(statistic.num_rel_triples): - assert len(ds.rel_triples[idx]) == statistic.num_rel_triples[idx] - assert len(ds.attr_triples[idx]) == statistic.num_attr_triples[idx] - assert len(ds.ent_links) == statistic.num_ent_links - assert ds.folds is not None - assert len(ds.folds) == 5 - @pytest.mark.slow() @pytest.mark.parametrize( @@ -152,6 +36,7 @@ def test_zip_ea_dataset_with_pre_split_folds( (MovieGraphBenchmark, {"graph_pair": "imdb-tmdb"}), (MovieGraphBenchmark, {"graph_pair": "imdb-tvdb"}), (MovieGraphBenchmark, {"graph_pair": "tmdb-tvdb"}), + (MovieGraphBenchmark, {"graph_pair": "multi"}), (MED_BBK, {}), (OAEI, {}), (OAEI, {"task": "marvelcinematicuniverse-marvel"}), @@ -161,9 +46,10 @@ def test_zip_ea_dataset_with_pre_split_folds( (OAEI, {"task": "starwars-swtor"}), ], ) -def test_all_load_from_existing_cache(cls, args): +@pytest.mark.parametrize("existing_cache", [True, False]) +def test_all_load_from_existing(cls, args, existing_cache, tmp_path): """Smoketest loading.""" - ds = cls(**args) + ds = cls(**args) if existing_cache else cls(**args, cache_path=tmp_path) assert ds.__repr__() is not None assert ds.canonical_name assert ds.rel_triples is not None @@ -172,6 +58,9 @@ def test_all_load_from_existing_cache(cls, args): assert ds.attr_triples is not None assert ds.ent_links is not None assert ds.dataset_names is not None - if cls != MED_BBK: - assert ds._ds_prefixes is not None + if cls == MovieGraphBenchmark: assert isinstance(ds.ent_links, PrefixedClusterHelper) + elif cls == OAEI: + assert isinstance(ds.ent_links, dd.DataFrame) + else: + assert isinstance(ds.ent_links, pd.DataFrame) diff --git a/tests/test_moviebenchmark.py b/tests/test_moviebenchmark.py index d406677..fa0dcee 100644 --- a/tests/test_moviebenchmark.py +++ b/tests/test_moviebenchmark.py @@ -84,13 +84,14 @@ def test_movie_benchmark(params: Dict, statistic: EATaskStatistics): total_links = statistic.num_ent_links + sum(statistic.num_intra_ent_links) assert ds.ent_links.number_of_links == total_links assert ds.ent_links.number_of_no_intra_links == statistic.num_ent_links - assert ds.folds is not None - if params["graph_pair"] == IMDB_TMDB: - _test_fold_number(ds.folds, (434, 1520, 218)) - elif params["graph_pair"] == IMDB_TVDB: - _test_fold_number(ds.folds, (584, 2043, 292)) - elif params["graph_pair"] == TMDB_TVDB: - _test_fold_number(ds.folds, (682, 2388, 341), (20, 70, 10)) + if params["graph_pair"] != MULTI: + assert ds.folds is not None + if params["graph_pair"] == IMDB_TMDB: + _test_fold_number(ds.folds, (434, 1520, 218)) + elif params["graph_pair"] == IMDB_TVDB: + _test_fold_number(ds.folds, (584, 2043, 292)) + elif params["graph_pair"] == TMDB_TVDB: + _test_fold_number(ds.folds, (682, 2388, 341), (20, 70, 10)) @pytest.mark.parametrize(("params", "statistic"), statistics_with_params) @@ -117,7 +118,6 @@ def test_movie_benchmark_mock( assert ds.attr_triples is not None assert ds.attr_triples is not None assert ds.ent_links is not None 
- assert ds.folds is not None if ds.graph_pair == IMDB_TMDB: assert ds.dataset_names == ("imdb", "tmdb") elif ds.graph_pair == IMDB_TVDB: @@ -126,11 +126,12 @@ def test_movie_benchmark_mock( assert ds.dataset_names == ("tmdb", "tvdb") elif ds.graph_pair == MULTI: assert ds.dataset_names == ("imdb", "tmdb", "tvdb") - for fold in ds.folds: - assert fold.train is not None - assert fold.test is not None - assert fold.val is not None assert ds._ds_prefixes is not None if ds.graph_pair != MULTI: assert "triples_left" in ds.__repr__() assert ds.rel_triples_left is not None + assert ds.folds is not None + for fold in ds.folds: + assert fold.train is not None + assert fold.test is not None + assert fold.val is not None diff --git a/tests/test_oaei.py b/tests/test_oaei.py index 260bbd4..e775fc4 100644 --- a/tests/test_oaei.py +++ b/tests/test_oaei.py @@ -21,12 +21,7 @@ def test_oaei_mock(task, mocker, tmp_path): "sylloge.oaei_loader.read_dask_bag_from_archive_text", rm.mock_read_dask_bag_from_archive_text, ) - mocker.patch("sylloge.oaei_loader.OAEI_MODULE.ensure", rm.mock_ensure) - mocker.patch( - "sylloge.base.PrefixedClusterHelper.from_file", - rm.mock_cluster_helper_from_zipped_file, - ) for use_cache, cache_exists in [(False, False), (True, False), (True, True)]: if cache_exists: # ensure this method doesn't get called diff --git a/tests/test_open_ea.py b/tests/test_open_ea.py index dee2a59..69f7089 100644 --- a/tests/test_open_ea.py +++ b/tests/test_open_ea.py @@ -192,10 +192,6 @@ def test_open_ea_mock( "sylloge.base.read_dask_df_archive_csv", rm.mock_read_dask_df_archive_csv ) mocker.patch("sylloge.open_ea_loader.OPEN_EA_MODULE.ensure", rm.mock_ensure) - mocker.patch( - "sylloge.base.PrefixedClusterHelper.from_zipped_file", - rm.mock_cluster_helper_from_zipped_file, - ) # TODO mock zip ensure for use_cache, cache_exists in [(False, False), (True, False), (True, True)]: if cache_exists:
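
Not part of the diff itself, but a minimal usage sketch of the new `use_cluster_helper` flag for reviewers, following the `MovieGraphBenchmark` overloads added above (the first call downloads and prepares the benchmark data; `use_cache=False` only keeps the sketch independent of any existing parquet cache):

```python
import pandas as pd
from eche import PrefixedClusterHelper

from sylloge import MovieGraphBenchmark

# Default (use_cluster_helper=True): entity links are loaded as a PrefixedClusterHelper.
ds_ch = MovieGraphBenchmark(graph_pair="imdb-tmdb", use_cache=False)
assert isinstance(ds_ch.ent_links, PrefixedClusterHelper)

# With use_cluster_helper=False the binary setting keeps plain DataFrame links.
ds_df = MovieGraphBenchmark(graph_pair="imdb-tmdb", use_cluster_helper=False, use_cache=False)
assert isinstance(ds_df.ent_links, pd.DataFrame)

# The multi-source setting requires the cluster helper, so the flag is ignored there.
ds_multi = MovieGraphBenchmark(graph_pair="multi", use_cluster_helper=False, use_cache=False)
assert isinstance(ds_multi.ent_links, PrefixedClusterHelper)
assert ds_multi.folds is None  # no pre-split folds in the multi setting
```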