diff --git a/src/azul/indexer/index_service.py b/src/azul/indexer/index_service.py index 20c28a7beb..1d61dcbb59 100644 --- a/src/azul/indexer/index_service.py +++ b/src/azul/indexer/index_service.py @@ -212,8 +212,7 @@ def index(self, catalog: CatalogName, bundle: Bundle) -> None: for contributions, replicas in transforms: tallies.update(self.contribute(catalog, contributions)) self.replicate(catalog, replicas) - if tallies: - self.aggregate(tallies) + self.aggregate(tallies) def delete(self, catalog: CatalogName, bundle: Bundle) -> None: """ @@ -461,6 +460,12 @@ def aggregate(self, tallies: CataloguedTallies): Also note that the input tallies can refer to entities from different catalogs. """ + # Attempting to filter by an empty array of coordinates while reading + # the aggregates will fail with a 400 error from ElasticSearch. This + # happens when indexing replica bundles for AnVIL, since they emit no + # contributions. + if not tallies: + return # Use catalog specified in each tally writer = self._create_writer(DocumentType.aggregate, catalog=None) while True: diff --git a/src/azul/plugins/metadata/anvil/bundle.py b/src/azul/plugins/metadata/anvil/bundle.py index 7e6e064f8a..4e1b73433b 100644 --- a/src/azul/plugins/metadata/anvil/bundle.py +++ b/src/azul/plugins/metadata/anvil/bundle.py @@ -130,29 +130,27 @@ def reject_joiner(self, catalog: CatalogName): pass def to_json(self) -> MutableJSON: - def serialize_entities(entities): + def to_json(entities): return { str(entity_ref): entity for entity_ref, entity in sorted(entities.items()) } return { - 'entities': serialize_entities(self.entities), - 'orphans': serialize_entities(self.orphans), + 'entities': to_json(self.entities), + 'orphans': to_json(self.orphans), 'links': [link.to_json() for link in sorted(self.links)] } @classmethod def from_json(cls, fqid: BUNDLE_FQID, json_: JSON) -> Self: - def deserialize_entities(json_entities): + def from_json(entities): return { EntityReference.parse(entity_ref): entity - for entity_ref, entity in json_entities.items() + for entity_ref, entity in entities.items() } - return cls( - fqid=fqid, - entities=deserialize_entities(json_['entities']), - links=set(map(EntityLink.from_json, json_['links'])), - orphans=deserialize_entities(json_['orphans']) - ) + return cls(fqid=fqid, + entities=from_json(json_['entities']), + links=set(map(EntityLink.from_json, json_['links'])), + orphans=from_json(json_['orphans'])) diff --git a/src/azul/plugins/metadata/anvil/indexer/transform.py b/src/azul/plugins/metadata/anvil/indexer/transform.py index 9e2f4f22ed..c5bc430299 100644 --- a/src/azul/plugins/metadata/anvil/indexer/transform.py +++ b/src/azul/plugins/metadata/anvil/indexer/transform.py @@ -170,8 +170,8 @@ def aggregator(cls, entity_type) -> EntityAggregator: def estimate(self, partition: BundlePartition) -> int: # Orphans are not considered when deciding whether to partition the - # bundle, but if the bundle is partitioned then each orphan will be - # replicated in a single partition + # bundle, but if the bundle is partitioned then each partition will only + # emit replicas for the orphans that it contains return sum(map(partial(self._contains, partition), self.bundle.entities)) def transform(self, @@ -578,13 +578,13 @@ def transform(self, ) -> Iterable[Contribution | Replica]: yield from super().transform(partition) if config.enable_replicas: - # Replicas are only emitted by the file transformer for entities - # that are linked to at least one file. 
This excludes all orphans, - # and a small number of linked entities, usually from primary - # bundles don't include any files. Some of the replicas we emit here - # will be redundant with those emitted by the file transformer, but - # these will be coalesced by the index service before they are - # written to ElasticSearch. + # The file transformer only emits replicas for entities that are + # linked to at least one file. This excludes all orphans, and a + # small number of linked entities, usually from primary bundles + # don't include any files. Some of the replicas we emit here will be + # redundant with those emitted by the file transformer, but these + # will be consolidated by the index service before they are written + # to ElasticSearch. dataset = self._only_dataset() for entity in chain(self.bundle.orphans, self.bundle.entities): if partition.contains(UUID(entity.entity_id)): diff --git a/src/azul/plugins/repository/canned/__init__.py b/src/azul/plugins/repository/canned/__init__.py index be4257c134..a554c6c3b0 100644 --- a/src/azul/plugins/repository/canned/__init__.py +++ b/src/azul/plugins/repository/canned/__init__.py @@ -26,9 +26,6 @@ from furl import ( furl, ) -from more_itertools import ( - ilen, -) from azul import ( CatalogName, @@ -165,11 +162,11 @@ def staging_area(self, url: str) -> StagingArea: def count_bundles(self, source: SOURCE_SPEC) -> int: staging_area = self.staging_area(source.spec.name) - return ilen( - links_id - for links_id in staging_area.links - if source.prefix is None or links_id.startswith(source.prefix.common) - ) + if source.prefix is None: + return len(staging_area.links) + else: + prefix = source.prefix.common + return sum(1 for links_id in staging_area.links if links_id.startswith(prefix)) def list_bundles(self, source: CannedSourceRef, diff --git a/src/azul/plugins/repository/tdr_anvil/__init__.py b/src/azul/plugins/repository/tdr_anvil/__init__.py index e841615931..b376636400 100644 --- a/src/azul/plugins/repository/tdr_anvil/__init__.py +++ b/src/azul/plugins/repository/tdr_anvil/__init__.py @@ -11,7 +11,6 @@ AbstractSet, Callable, Iterable, - Self, cast, ) import uuid @@ -80,63 +79,86 @@ class BundleType(Enum): """ - AnVIL snapshots have no inherent notion of a "bundle". During indexing, we - dynamically construct bundles by querying each table in the snapshot. This - class enumerates the tables that require special strategies for listing and - fetching their bundles. - - Primary bundles are defined by a biosample entity, termed the bundle entity. - Each primary bundle includes all of the bundle entity descendants and all of - those those entities' ancestors, which are discovered by iteratively - following foreign keys. Biosamples were chosen for this role based on a - desirable balance between the size and number of the resulting bundles as - well as the degree of overlap between them. The implementation of the graph - traversal is tightly coupled to this choice, and switching to a different - entity type would require re-implementing much of the Plugin code. Primary - bundles consist of at least one biosample (the bundle entity), exactly one - dataset, and zero or more other entities of assorted types. Primary bundles - never contain orphans because they are bijective to rows in the biosample - table. + Unlike HCA, AnVIL has no inherent notion of a "bundle". Its data model is + strictly relational: each row in a table represents an entity, each entity + has a primary key, and entities reference each other via foreign keys. 
+    During indexing, we dynamically construct bundles by querying each table in
+    the snapshot. This class enumerates the tables that require special
+    strategies for listing and fetching their bundles.
+
+    An orphan is defined as an AnVIL entity that does not appear in any of
+    Azul's `/index/{entity_type}` responses. Bundles *can* contain orphans, but they will
+    only ever manifest as replicas in our index. A *local orphan* is an entity
+    in a bundle that is not referenced anywhere in that bundle's links. Local
+    orphans may or may not be true/global orphans (because they may still be
+    referenced in *other* bundles' links), but all global orphans are always
+    local orphans. Bundles only contain local orphans from the table that
+    matches the bundle's `table_name` attribute.
+
+    Primary bundles are defined by a biosample entity, termed the *bundle
+    entity*. Each primary bundle includes all of the bundle entity's descendants
+    and all of those entities' ancestors. Descendants and ancestors are
+    discovered by iteratively following foreign keys. Biosamples were chosen to
+    act as the bundle entities for primary bundles based on a desirable balance
+    between the size and number of the resulting bundles as well as the degree
+    of overlap between them. The implementation of the graph traversal is
+    tightly coupled to this choice, and switching to a different bundle entity
+    type would require re-implementing much of the Plugin code. Primary bundles
+    consist of at least one biosample (the bundle entity), exactly one dataset
+    entity, and zero or more other entities of assorted types. Primary bundles
+    never contain local orphans because they are bijective to rows in the
+    biosample table.

     Supplementary bundles consist of batches of file entities, which may include
-    supplementary files, which lack any foreign keys that associate them with
-    any other entity. Non-supplementary files in the bundle are classified as
-    orphans. The bundle also includes a dataset entity linked to the
-    supplementary files.
-
-    Duos bundles consist of a single dataset entity. This "entity" includes only
+    supplementary files. Supplementary files lack any foreign keys that would
+    associate them with any other entity. Each supplementary bundle also
+    includes a dataset entity, and we create synthetic links between the
+    supplementary files and the dataset. Without these links, the relationship
+    between these files and their parent dataset would not be properly
+    represented in the service response. Supplementary files therefore are never
+    local or global orphans.
+
+    Normal (non-supplementary) files are not linked to the dataset and thus are
+    local orphans within these bundles. This is because these files may also
+    appear in primary bundles. If they do, then those bundles will contribute
+    them to the index alongside all of their linked entities. If they don't,
+    then they are global orphans. In either case, it would be pointless for a
+    supplementary bundle to emit contributions for them, hence we treat them as
+    orphans.
+
+    DUOS bundles consist of a single dataset entity. This "entity" includes only
     the dataset description retrieved from DUOS, while a copy of the BigQuery
     row for this dataset is also included as an orphan. We chose this design
     because there is only one dataset per snapshot, which is referenced in all
-    primary and supplementary bundles. 
Therefore, only one request to DUOS per - *snapshot* is necessary, but if `description` is retrieved at the same time - as the other dataset fields, we will make one request per *bundle* instead, + bundles. Therefore, only one request to DUOS per *snapshot* is necessary. If + the DUOS `description` were retrieved at the same time as the other fields + of the dataset entity, we would make one request per *bundle* instead, potentially overloading the DUOS service. Our solution is to retrieve - `description` only in a dedicated bundle format, once per snapshot, and - merge it with the other dataset fields during aggregation. + `description` only in a bundle of this dedicated DUOS type, once per + snapshot, and merge it with the other dataset fields during aggregation. All other bundles are replica bundles. Replica bundles consist of a batch of rows from an arbitrary BigQuery table, which may or may not be described by - the AnVIL schema. Replica bundles only include orphans and have no links. + the AnVIL schema, and the snapshot's dataset entity. Replica bundles contain + no links and thus all of their entities are local orphans. """ primary = 'anvil_biosample' supplementary = 'anvil_file' duos = 'anvil_dataset' - def is_batched(self: Self | str) -> bool: + @classmethod + def is_batched(cls, table_name: str) -> bool: """ - >>> BundleType.primary.is_batched() + True if bundles for the table of the given name represent batches of + rows, or False if each bundle represents a single row. + + >>> BundleType.is_batched(BundleType.primary.value) False >>> BundleType.is_batched('anvil_activity') True """ - if isinstance(self, str): - try: - self = BundleType(self) - except ValueError: - return True - return self not in (BundleType.primary, BundleType.duos) + return table_name not in (cls.primary.value, cls.duos.value) class TDRAnvilBundleFQIDJSON(SourcedBundleFQIDJSON): @@ -245,28 +267,28 @@ def list_bundles(self, self._assert_source(source) bundles = [] spec = source.spec + if config.duos_service_url is not None: + # We intentionally omit the WHERE clause for datasets in order to + # verify our assumption that each snapshot only contains rows for a + # single dataset. This verification is performed independently and + # concurrently for every partition, but only one partition actually + # emits the bundle. row = one(self._run_sql(f''' SELECT datarepo_row_id FROM {backtick(self._full_table_name(spec, BundleType.duos.value))} ''')) dataset_row_id = row['datarepo_row_id'] - # We intentionally omit the WHERE clause for datasets in order - # to verify our assumption that each snapshot only contains rows - # for a single dataset. This verification is performed - # independently and concurrently for every partition, but only - # one partition actually emits the bundle. 
if dataset_row_id.startswith(prefix): bundle_uuid = change_version(dataset_row_id, self.datarepo_row_uuid_version, self.bundle_uuid_version) - bundles.append(TDRAnvilBundleFQID( - uuid=bundle_uuid, - version=self._version, - source=source, - table_name=BundleType.duos.value, - batch_prefix=None, - )) + bundle_fqid = TDRAnvilBundleFQID(uuid=bundle_uuid, + version=self._version, + source=source, + table_name=BundleType.duos.value, + batch_prefix=None) + bundles.append(bundle_fqid) for row in self._run_sql(f''' SELECT datarepo_row_id FROM {backtick(self._full_table_name(spec, BundleType.primary.value))} @@ -275,13 +297,12 @@ def list_bundles(self, bundle_uuid = change_version(row['datarepo_row_id'], self.datarepo_row_uuid_version, self.bundle_uuid_version) - bundles.append(TDRAnvilBundleFQID( - uuid=bundle_uuid, - version=self._version, - source=source, - table_name=BundleType.primary.value, - batch_prefix=None, - )) + bundle_fqid = TDRAnvilBundleFQID(uuid=bundle_uuid, + version=self._version, + source=source, + table_name=BundleType.primary.value, + batch_prefix=None) + bundles.append(bundle_fqid) prefix_lengths_by_table = self._batch_tables(source.spec, prefix) for table_name, (batch_prefix_length, _) in prefix_lengths_by_table.items(): batch_prefixes = Prefix(common=prefix, @@ -351,6 +372,8 @@ def repeat(fmt): log.info('Calculating batch prefix lengths for partition %r of %d tables ' 'in source %s', prefix, len(table_names), source) # The extraneous outer 'SELECT *' works around a bug in BigQuery emulator + # FIXME: BigQuery Emulator rejects valid query + # https://github.com/DataBiosphere/azul/issues/6704 query = ' UNION ALL '.join(f'''( SELECT * FROM ( SELECT diff --git a/src/azul/service/manifest_service.py b/src/azul/service/manifest_service.py index f695a7659f..63db05c356 100644 --- a/src/azul/service/manifest_service.py +++ b/src/azul/service/manifest_service.py @@ -1985,6 +1985,8 @@ class VerbatimManifestGenerator(FileBasedManifestGenerator, metaclass=ABCMeta): @property def entity_type(self) -> str: + # Orphans only have projects/datasets as hubs, so we need to retrieve + # aggregates of those types in order to join against orphan replicas return self.implicit_hub_type if self.include_orphans else 'files' @property @@ -2003,6 +2005,9 @@ def implicit_hub_type(self) -> str: @property def include_orphans(self) -> bool: + # When filtering only by project/dataset ID, we need to include + # *everything* in the selected projects/datasets, even rows that don't + # appear anywhere in the rest of the service response. 
special_fields = self.service.metadata_plugin(self.catalog).special_fields return self.filters.explicit.keys() == {special_fields.implicit_hub_id} @@ -2110,6 +2115,7 @@ def create_file(self) -> tuple[str, Optional[str]]: plugin = self.service.metadata_plugin(self.catalog) replicas = list(self._all_replicas()) replica_schemas = plugin.verbatim_pfb_schema(replicas) + # Ensure field order is consistent for unit tests replica_schemas.sort(key=itemgetter('name')) replica_types = [s['name'] for s in replica_schemas] pfb_schema = avro_pfb.avro_pfb_schema(replica_schemas) diff --git a/test/indexer/__init__.py b/test/indexer/__init__.py index 4762118bc5..e71e2c8714 100644 --- a/test/indexer/__init__.py +++ b/test/indexer/__init__.py @@ -93,8 +93,8 @@ class CannedFileTestCase(AzulUnitTestCase): """ @classmethod - def _data_path(cls, module: Literal['service', 'indexer']) -> Path: - return Path(config.project_root) / 'test' / module / 'data' + def _data_path(cls, module: Literal['service', 'indexer'], *path: str) -> Path: + return Path(config.project_root).joinpath('test', module, 'data', *path) @classmethod def _load_canned_file(cls, @@ -120,7 +120,7 @@ def _load_canned_file_version(cls, ) -> Union[MutableJSONs, MutableJSON]: suffix = '' if version is None else '.' + version file_name = f'{uuid}{suffix}.{extension}.json' - with open(cls._data_path('indexer') / file_name, 'r') as infile: + with open(cls._data_path('indexer', file_name), 'r') as infile: return json.load(infile) diff --git a/test/integration_test.py b/test/integration_test.py index 38f49f733a..d4c3394dde 100644 --- a/test/integration_test.py +++ b/test/integration_test.py @@ -289,11 +289,23 @@ def managed_access_sources_by_catalog(self managed_access_sources[catalog].add(ref) return managed_access_sources - def _choose_source(self, + def _select_source(self, catalog: CatalogName, *, public: bool | None = None ) -> SourceRef | None: + """ + Choose an indexed source at random. + + :param catalog: The name of the catalog to select a source from. + + :param public: If none (as by default), allow the source to be either + public or non-public. If true, choose a public source, or + raise an `AssertionError` if the catalog contains no + public sources. If false, choose a non-public source, or + return `None` if the catalog contains no non-public + sources. 
+ """ plugin = self.repository_plugin(catalog) sources = set(config.sources(catalog)) if public is not None: @@ -312,7 +324,7 @@ def _choose_source(self, else: assert False, public if len(sources) == 0: - assert public is False, 'Every catalog should contain a public source' + assert public is False, 'An IT catalog must contain at least one public source' return None else: source = self.random.choice(sorted(sources)) @@ -410,10 +422,10 @@ def _wait_for_indexer(): catalogs: list[Catalog] = [] for catalog in config.integration_test_catalogs: if index: - public_source = self._choose_source(catalog, public=True) - ma_source = self._choose_source(catalog, public=False) - notifications, fqids = self._prepare_notifications(catalog, - sources=alist(public_source, ma_source)) + public_source = self._select_source(catalog, public=True) + ma_source = self._select_source(catalog, public=False) + sources = alist(public_source, ma_source) + notifications, fqids = self._prepare_notifications(catalog, sources) else: with self._service_account_credentials: fqids = self._get_indexed_bundles(catalog) @@ -1905,10 +1917,10 @@ def test_can_bundle_canned_repository(self): self._test_catalog(mock_catalog) def bundle_fqid(self, catalog: CatalogName) -> SourcedBundleFQID: - source = self._choose_source(catalog) + source = self._select_source(catalog) # The plugin will raise an exception if the source lacks a prefix source = source.with_prefix(Prefix.of_everything) - bundle_fqids = self.repository_plugin(catalog).list_bundles(source, '') + bundle_fqids = self.azul_client.list_bundles(catalog, source, prefix='') return self.random.choice(sorted(bundle_fqids)) def _can_bundle(self, diff --git a/test/pfb_test_case.py b/test/pfb_test_case.py deleted file mode 100644 index 57d0641437..0000000000 --- a/test/pfb_test_case.py +++ /dev/null @@ -1,28 +0,0 @@ -import json - -import fastavro - -from indexer import ( - CannedFileTestCase, -) - - -class PFBTestCase(CannedFileTestCase): - - def _assert_pfb_schema(self, schema): - fastavro.parse_schema(schema) - # Parsing successfully proves our schema is valid - with self.assertRaises(KeyError): - fastavro.parse_schema({'this': 'is not', 'an': 'avro schema'}) - - def to_json(records): - return json.dumps(records, indent=4, sort_keys=True) - - results_file = self._data_path('service') / 'pfb_manifest.schema.json' - if results_file.exists(): - with open(results_file, 'r') as f: - expected_records = json.load(f) - self.assertEqual(expected_records, json.loads(to_json(schema))) - else: - with open(results_file, 'w') as f: - f.write(to_json(schema)) diff --git a/test/service/data/pfb_manifest.results.json b/test/service/data/manifest/terra/pfb_entities.json similarity index 100% rename from test/service/data/pfb_manifest.results.json rename to test/service/data/manifest/terra/pfb_entities.json diff --git a/test/service/data/pfb_manifest.schema.json b/test/service/data/manifest/terra/pfb_schema.json similarity index 100% rename from test/service/data/pfb_manifest.schema.json rename to test/service/data/manifest/terra/pfb_schema.json diff --git a/test/service/data/verbatim/anvil/pfb_entities.json b/test/service/data/manifest/verbatim/pfb/anvil/pfb_entities.json similarity index 90% rename from test/service/data/verbatim/anvil/pfb_entities.json rename to test/service/data/manifest/verbatim/pfb/anvil/pfb_entities.json index 29d14d974e..8cb9a00eda 100644 --- a/test/service/data/verbatim/anvil/pfb_entities.json +++ b/test/service/data/manifest/verbatim/pfb/anvil/pfb_entities.json @@ -88,6 
+88,13 @@ "ontology_reference": "", "properties": [], "values": {} + }, + { + "links": [], + "name": "non_schema_orphan_table", + "ontology_reference": "", + "properties": [], + "values": {} } ] }, @@ -132,6 +139,16 @@ }, "relations": [] }, + { + "id": "28ed0f3a-157b-417b-a05a-48f57f9d3a34", + "name": "non_schema_orphan_table", + "object": { + "datarepo_row_id": "28ed0f3a-157b-417b-a05a-48f57f9d3a34", + "non_schema_column": "eggs", + "version": "2022-06-01T00:00:00.000000Z" + }, + "relations": [] + }, { "id": "826dea02-e274-4ffe-aabc-eb3db63ad068", "name": "anvil_biosample", @@ -279,6 +296,26 @@ }, "relations": [] }, + { + "id": "9687b86d-a2ae-a083-b910-a16bcbef1ba4", + "name": "non_schema_orphan_table", + "object": { + "datarepo_row_id": "9687b86d-a2ae-a083-b910-a16bcbef1ba4", + "non_schema_column": "spam", + "version": "2022-06-01T00:00:00.000000Z" + }, + "relations": [] + }, + { + "id": "9db5952c-c454-49d9-8a62-5abb026701c0", + "name": "non_schema_orphan_table", + "object": { + "datarepo_row_id": "9db5952c-c454-49d9-8a62-5abb026701c0", + "non_schema_column": "baked beans", + "version": "2022-06-01T00:00:00.000000Z" + }, + "relations": [] + }, { "id": "15b76f9c-6b46-433f-851d-34e89f1b9ba6", "name": "anvil_file", diff --git a/test/service/data/verbatim/anvil/pfb_schema.json b/test/service/data/manifest/verbatim/pfb/anvil/pfb_schema.json similarity index 98% rename from test/service/data/verbatim/anvil/pfb_schema.json rename to test/service/data/manifest/verbatim/pfb/anvil/pfb_schema.json index 07aee95c55..9bdd6fcf66 100644 --- a/test/service/data/verbatim/anvil/pfb_schema.json +++ b/test/service/data/manifest/verbatim/pfb/anvil/pfb_schema.json @@ -1158,6 +1158,27 @@ ], "name": "anvil_variantcallingactivity", "type": "record" + }, + { + "fields": [ + { + "name": "datarepo_row_id", + "namespace": "non_schema_orphan_table", + "type": "string" + }, + { + "name": "non_schema_column", + "namespace": "non_schema_orphan_table", + "type": "string" + }, + { + "name": "version", + "namespace": "non_schema_orphan_table", + "type": "string" + } + ], + "name": "non_schema_orphan_table", + "type": "record" } ] }, diff --git a/test/service/data/verbatim/hca/pfb_entities.json b/test/service/data/manifest/verbatim/pfb/hca/pfb_entities.json similarity index 100% rename from test/service/data/verbatim/hca/pfb_entities.json rename to test/service/data/manifest/verbatim/pfb/hca/pfb_entities.json diff --git a/test/service/data/verbatim/hca/pfb_schema.json b/test/service/data/manifest/verbatim/pfb/hca/pfb_schema.json similarity index 100% rename from test/service/data/verbatim/hca/pfb_schema.json rename to test/service/data/manifest/verbatim/pfb/hca/pfb_schema.json diff --git a/test/service/test_manifest.py b/test/service/test_manifest.py index dac94047db..6eb1e52e0a 100644 --- a/test/service/test_manifest.py +++ b/test/service/test_manifest.py @@ -29,6 +29,7 @@ ) from typing import ( Optional, + cast, ) from unittest.mock import ( MagicMock, @@ -59,6 +60,7 @@ ) from azul import ( + RequirementError, config, ) from azul.collections import ( @@ -83,12 +85,16 @@ from azul.plugins import ( ManifestFormat, ) +from azul.plugins.metadata.hca import ( + FileTransformer, +) from azul.plugins.repository.dss import ( DSSBundle, ) from azul.service import ( Filters, FiltersJSON, + avro_pfb, manifest_service, ) from azul.service.manifest_service import ( @@ -116,11 +122,9 @@ ) from indexer import ( AnvilCannedBundleTestCase, + CannedFileTestCase, DCP1CannedBundleTestCase, ) -from pfb_test_case import ( - PFBTestCase, -) 
from service import ( DocumentCloningTestCase, StorageServiceTestCase, @@ -135,8 +139,46 @@ def setUpModule(): configure_test_logging(log) +class CannedManifestTestCase(CannedFileTestCase): + """ + Support for tests that deal with canned manifests + """ + + def _canned_manifest_path(self, *path: str) -> Path: + return self._data_path('service', 'manifest', *path) + + def _load_canned_manifest(self, *path: str) -> MutableJSON: + with open(self._canned_manifest_path(*path)) as f: + return json.load(f) + + def _load_canned_pfb(self, *path: str) -> tuple[MutableJSON, MutableJSON]: + schema = self._load_canned_manifest(*path, 'pfb_schema.json') + entities = self._load_canned_manifest(*path, 'pfb_entities.json') + return schema, entities + + def _assert_pfb_schema(self, schema): + fastavro.parse_schema(schema) + # Parsing successfully proves our schema is valid + with self.assertRaises(KeyError): + fastavro.parse_schema({'this': 'is not', 'an': 'avro schema'}) + + actual = json.dumps(schema, indent=4, sort_keys=True) + expected = self._canned_manifest_path('terra', 'pfb_schema.json') + self._assert_or_create_json_can(expected, actual) + + def _assert_or_create_json_can(self, expected: Path, actual: str): + if expected.exists(): + with open(expected, 'r') as f: + expected = json.load(f) + self.assertEqual(expected, json.loads(actual)) + else: + with open(expected, 'w') as f: + f.write(actual) + + class ManifestTestCase(WebServiceTestCase, StorageServiceTestCase, + CannedManifestTestCase, metaclass=ABCMeta): def setUp(self): @@ -296,7 +338,7 @@ class DCP1ManifestTestCase(ManifestTestCase, DCP1CannedBundleTestCase): pass -class TestManifests(DCP1ManifestTestCase, PFBTestCase): +class TestManifests(DCP1ManifestTestCase): def run(self, result: Optional[unittest.result.TestResult] = None @@ -309,7 +351,7 @@ def run(self, _drs_domain_name = 'drs-test.lan' # see canned PFB results - def test_pfb_manifest(self): + def test_terra_pfb_manifest(self): # This test uses canned expectations. It might be difficult to manually # update the can after changes to the indexer. If that is the case, # delete the file and run this test. It will repopulate the file. 
Run @@ -328,10 +370,6 @@ def test_pfb_manifest(self): shared_file_bundle = self._shared_file_bundle(new_bundle_fqid) self._index_bundle(shared_file_bundle, delete=False) - def to_json(records): - # 'default' is specified to handle the conversion of datetime values - return json.dumps(records, indent=4, sort_keys=True, default=str) - # We write entities differently depending on debug so we test both cases for debug in (1, 0): with self.subTest(debug=debug): @@ -343,14 +381,10 @@ def to_json(records): schema = reader.writer_schema self._assert_pfb_schema(schema) records = list(reader) - results_file = Path(__file__).parent / 'data' / 'pfb_manifest.results.json' - if results_file.exists(): - with open(results_file, 'r') as f: - expected_records = json.load(f) - self.assertEqual(expected_records, json.loads(to_json(records))) - else: - with open(results_file, 'w') as f: - f.write(to_json(records)) + expected = self._canned_manifest_path('terra', 'pfb_entities.json') + # 'default' is specified to handle the conversion of datetime values + actual = json.dumps(records, indent=4, sort_keys=True, default=str) + self._assert_or_create_json_can(expected, actual) def _shared_file_bundle(self, bundle): """ @@ -1367,10 +1401,8 @@ def test_verbatim_jsonl_manifest(self): def test_verbatim_pfb_manifest(self): response = self._get_manifest(ManifestFormat.verbatim_pfb, filters={}) self.assertEqual(200, response.status_code) - with open(self._data_path('service') / 'verbatim/hca/pfb_schema.json') as f: - expected_schema = json.load(f) - with open(self._data_path('service') / 'verbatim/hca/pfb_entities.json') as f: - expected_entities = json.load(f) + canned_pfb = self._load_canned_pfb('verbatim', 'pfb', 'hca') + expected_schema, expected_entities = canned_pfb self._assert_pfb(expected_schema, expected_entities, response) @@ -2122,10 +2154,64 @@ def hash_entities(entities: dict[EntityReference, JSON]) -> dict[str, JSON]: return all_entities_by_hash.values(), linked_entities_by_hash.values() def test_verbatim_pfb_manifest(self): - response = self._get_manifest(ManifestFormat.verbatim_pfb, filters={}) - self.assertEqual(200, response.status_code) - with open(self._data_path('service') / 'verbatim/anvil/pfb_schema.json') as f: - expected_schema = json.load(f) - with open(self._data_path('service') / 'verbatim/anvil/pfb_entities.json') as f: - expected_entities = json.load(f) - self._assert_pfb(expected_schema, expected_entities, response) + canned_pfb = self._load_canned_pfb('verbatim', 'pfb', 'anvil') + expected_schema, expected_entities = canned_pfb + + def test(filters): + response = self._get_manifest(ManifestFormat.verbatim_pfb, filters) + self.assertEqual(200, response.status_code) + self._assert_pfb(expected_schema, expected_entities, response) + + with self.subTest(orphans=True): + test({'datasets.dataset_id': {'is': ['52ee7665-7033-63f2-a8d9-ce8e32666739']}}) + + with self.subTest(orphans=False): + # Dynamically edit out references to the orphaned entities (and + # their schemas) that are only expected when filtering by dataset + # ID. This subtest modifies the expectations in place, and therefore + # needs to come last. 
+            self.assertEqual('Entity', expected_schema['name'])
+            object_field_schema = one(
+                field
+                for field in expected_schema['fields']
+                if field['name'] == 'object'
+            )
+            # The `object` field is of a union type, so the schema's `type`
+            # property is an array
+            schemas = object_field_schema['type']
+            # The first AVRO record is the *metadata entity* in PFB terms,
+            # declaring higher level constraints that can't be expressed in the
+            # AVRO schema
+            metadata_entity = expected_entities[0]
+            self.assertEqual('Metadata', metadata_entity['name'])
+            higher_schemas = metadata_entity['object']['nodes']
+            for part in [schemas, higher_schemas, expected_entities]:
+                filtered = [e for e in part if e['name'] != 'non_schema_orphan_table']
+                assert len(filtered) < len(part), 'Expected to filter orphan references'
+                part[:] = filtered
+            test({})
+
+
+class TestPFB(CannedManifestTestCase):
+    """
+    Tests of terra.pfb code that don't require an ES index.
+    """
+
+    def test_terra_pfb_schema(self):
+        self.maxDiff = None
+        field_types = FileTransformer.field_types()
+        schema = avro_pfb.pfb_schema_from_field_types(field_types)
+        self._assert_pfb_schema(schema)
+
+    def test_pfb_metadata_object(self):
+        metadata_entity = avro_pfb.pfb_metadata_entity(FileTransformer.field_types())
+        field_types = FileTransformer.field_types()
+        schema = avro_pfb.pfb_schema_from_field_types(field_types)
+        parsed_schema = fastavro.parse_schema(cast(dict, schema))
+        fastavro.validate(metadata_entity, parsed_schema)
+
+    def test_pfb_entity_id(self):
+        # Terra limits IDs to 254 chars
+        avro_pfb.PFBEntity(id='a' * 254, name='foo', object={})
+        with self.assertRaises(RequirementError):
+            avro_pfb.PFBEntity(id='a' * 255, name='foo', object={})
diff --git a/test/service/test_pfb.py b/test/service/test_pfb.py
deleted file mode 100644
index 8311d37c43..0000000000
--- a/test/service/test_pfb.py
+++ /dev/null
@@ -1,38 +0,0 @@
-from typing import (
-    cast,
-)
-
-import fastavro
-
-import azul
-from azul.plugins.metadata.hca import (
-    FileTransformer,
-)
-from azul.service import (
-    avro_pfb,
-)
-from pfb_test_case import (
-    PFBTestCase,
-)
-
-
-class TestPFB(PFBTestCase):
-
-    def test_pfb_schema(self):
-        self.maxDiff = None
-        field_types = FileTransformer.field_types()
-        schema = avro_pfb.pfb_schema_from_field_types(field_types)
-        self._assert_pfb_schema(schema)
-
-    def test_pfb_metadata_object(self):
-        metadata_entity = avro_pfb.pfb_metadata_entity(FileTransformer.field_types())
-        field_types = FileTransformer.field_types()
-        schema = avro_pfb.pfb_schema_from_field_types(field_types)
-        parsed_schema = fastavro.parse_schema(cast(dict, schema))
-        fastavro.validate(metadata_entity, parsed_schema)
-
-    def test_pfb_entity_id(self):
-        # Terra limits ID's 254 chars
-        avro_pfb.PFBEntity(id='a' * 254, name='foo', object={})
-        with self.assertRaises(azul.RequirementError):
-            avro_pfb.PFBEntity(id='a' * 255, name='foo', object={})
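The canned-expectation helper `_assert_or_create_json_can` introduced in this diff is the crux of how the reworked PFB tests regenerate their cans: compare against the stored JSON if it exists, otherwise write it out so the next run has something to compare against. The following is a minimal, self-contained sketch of that pattern for readers unfamiliar with it; the standalone function name, the sample records, and the /tmp path are illustrative only and are not taken from the Azul code base.

import json
from pathlib import Path


def assert_or_create_json_can(expected: Path, actual: str) -> None:
    # If the canned expectation exists, compare it against the actual output;
    # otherwise create the can from the actual output so that a subsequent
    # test run has something to compare against.
    if expected.exists():
        with open(expected, 'r') as f:
            expected_json = json.load(f)
        assert expected_json == json.loads(actual), f'{expected} does not match actual output'
    else:
        with open(expected, 'w') as f:
            f.write(actual)


# Hypothetical usage with made-up records and an illustrative path;
# 'default' handles values such as datetimes that json can't serialize natively
records = [{'name': 'anvil_file', 'relations': []}]
actual = json.dumps(records, indent=4, sort_keys=True, default=str)
assert_or_create_json_can(Path('/tmp/pfb_entities.json'), actual)

Regenerating the can on a miss is what the comment in `test_terra_pfb_manifest` refers to: delete the canned file and run the test once to repopulate it, then run it again to verify.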