diff --git a/.github/PULL_REQUEST_TEMPLATE/upgrade.md b/.github/PULL_REQUEST_TEMPLATE/upgrade.md index 117b82d285..136e5b8373 100644 --- a/.github/PULL_REQUEST_TEMPLATE/upgrade.md +++ b/.github/PULL_REQUEST_TEMPLATE/upgrade.md @@ -107,7 +107,7 @@ Connected issue: #0000 ### Operator -- [ ] At least one hour has passed since `anvildev.shared` was last deployed +- [ ] At least 24 hours have passed since `anvildev.shared` was last deployed - [ ] Ran `script/export_inspector_findings.py` against `anvildev`, imported results to [Google Sheet](https://docs.google.com/spreadsheets/d/1RWF7g5wRKWPGovLw4jpJGX_XMi8aWLXLOvvE5rxqgH8) and posted screenshot of relevant1 findings as a comment on the connected issue. - [ ] Propagated the `deploy:shared`, `deploy:gitlab`, `deploy:runner` and `backup:gitlab` labels to the next promotion PRs or this PR carries none of these labels - [ ] Propagated any specific instructions related to the `deploy:shared`, `deploy:gitlab`, `deploy:runner` and `backup:gitlab` labels, from the description of this PR to that of the next promotion PRs or this PR carries none of these labels diff --git a/.github/pull_request_template.md.template.py b/.github/pull_request_template.md.template.py index eb28c95dc8..1fd2f544bd 100644 --- a/.github/pull_request_template.md.template.py +++ b/.github/pull_request_template.md.template.py @@ -958,7 +958,7 @@ def emit(t: T, target_branch: str): *iif(t is T.upgrade, [ { 'type': 'cli', - 'content': 'At least one hour has passed since `anvildev.shared` was last deployed' + 'content': 'At least 24 hours have passed since `anvildev.shared` was last deployed' }, { 'type': 'cli', diff --git a/deployments/anvilprod/environment.py b/deployments/anvilprod/environment.py index e7e584c0b8..9f96cd53f4 100644 --- a/deployments/anvilprod/environment.py +++ b/deployments/anvilprod/environment.py @@ -975,7 +975,6 @@ def env() -> Mapping[str, Optional[str]]: repository=dict(name='tdr_anvil')), sources=list(filter(None, sources.values()))) for atlas, catalog, sources in [ - ('anvil', 'anvil7', anvil7_sources), ('anvil', 'anvil8', anvil8_sources), ] for suffix, internal in [ diff --git a/deployments/hammerbox/environment.py b/deployments/hammerbox/environment.py index dab3c1bca0..89693fcf64 100644 --- a/deployments/hammerbox/environment.py +++ b/deployments/hammerbox/environment.py @@ -989,7 +989,6 @@ def env() -> Mapping[str, Optional[str]]: repository=dict(name='tdr_anvil')), sources=list(filter(None, sources.values()))) for atlas, catalog, sources in [ - ('anvil', 'anvil7', anvil7_sources), ('anvil', 'anvil8', anvil8_sources), ] for suffix, internal in [ diff --git a/deployments/prod/environment.py b/deployments/prod/environment.py index 079999b289..8ce9214a9a 100644 --- a/deployments/prod/environment.py +++ b/deployments/prod/environment.py @@ -1181,6 +1181,7 @@ def mkdict(previous_catalog: dict[str, str], mksrc('bigquery', 'datarepo-c9158593', 'lungmap_prod_20037472ea1d4ddb9cd356a11a6f0f76__20220307_20241002_lm8'), mksrc('bigquery', 'datarepo-35a6d7ca', 'lungmap_prod_3a02d15f9c6a4ef7852b4ddec733b70b__20241001_20241002_lm8'), mksrc('bigquery', 'datarepo-131a1234', 'lungmap_prod_4ae8c5c91520437198276935661f6c84__20231004_20241002_lm8'), + mksrc('bigquery', 'datarepo-936db385', 'lungmap_prod_6135382f487d4adb9cf84d6634125b68__20230207_20241106_lm8'), mksrc('bigquery', 'datarepo-3c4905d2', 'lungmap_prod_834e0d1671b64425a8ab022b5000961c__20241001_20241002_lm8'), mksrc('bigquery', 'datarepo-d7447983', 
'lungmap_prod_f899709cae2c4bb988f0131142e6c7ec__20220310_20241002_lm8'), mksrc('bigquery', 'datarepo-c11ef363', 'lungmap_prod_fdadee7e209745d5bf81cc280bd8348e__20240206_20241002_lm8'), diff --git a/scripts/can_bundle.py b/scripts/can_bundle.py index 9314e10ff5..a2021b27c1 100644 --- a/scripts/can_bundle.py +++ b/scripts/can_bundle.py @@ -1,6 +1,8 @@ """ -Download manifest and metadata for a given bundle from the given repository -source and store them as separate JSON files in the index test data directory. +Download the contents of a given bundle from the given repository source and +store it as a single JSON file. Users are expected to be familiar with the +structure of the bundle FQIDs for the given source and provide the appropriate +attributes. Note: silently overwrites the destination file. """ @@ -15,10 +17,6 @@ import sys import uuid -from more_itertools import ( - one, -) - from azul import ( cache, config, @@ -45,6 +43,7 @@ from azul.types import ( AnyJSON, AnyMutableJSON, + JSON, ) log = logging.getLogger(__name__) @@ -52,34 +51,45 @@ def main(argv): parser = argparse.ArgumentParser(description=__doc__, formatter_class=AzulArgumentHelpFormatter) - default_catalog = config.default_catalog - plugin_cls = RepositoryPlugin.load(default_catalog) - plugin = plugin_cls.create(default_catalog) - if len(plugin.sources) == 1: - source_arg = {'default': str(one(plugin.sources))} - else: - source_arg = {'required': True} parser.add_argument('--source', '-s', - **source_arg, + required=True, help='The repository source containing the bundle') parser.add_argument('--uuid', '-b', required=True, help='The UUID of the bundle to can.') parser.add_argument('--version', '-v', - help='The version of the bundle to can (default: the latest version).') + help='The version of the bundle to can. Required for HCA, ignored for AnVIL.') + parser.add_argument('--table-name', + help='The BigQuery table of the bundle to can. Only applicable for AnVIL.') + parser.add_argument('--batch-prefix', + help='The batch prefix of the bundle to can. Only applicable for AnVIL. 
' + 'Use "null" for non-batched bundle formats.') parser.add_argument('--output-dir', '-O', default=os.path.join(config.project_root, 'test', 'indexer', 'data'), help='The path to the output directory (default: %(default)s).') parser.add_argument('--redaction-key', '-K', help='Provide a key to redact confidential or sensitive information from the output files') args = parser.parse_args(argv) - bundle = fetch_bundle(args.source, args.uuid, args.version) + fqid_fields = parse_fqid_fields(args) + bundle = fetch_bundle(args.source, fqid_fields) if args.redaction_key: redact_bundle(bundle, args.redaction_key.encode()) save_bundle(bundle, args.output_dir) -def fetch_bundle(source: str, bundle_uuid: str, bundle_version: str) -> Bundle: +def parse_fqid_fields(args: argparse.Namespace) -> JSON: + fields = {'uuid': args.uuid, 'version': args.version} + if args.table_name is not None: + fields['table_name'] = args.table_name + batch_prefix = args.batch_prefix + if batch_prefix is not None: + if batch_prefix == 'null': + batch_prefix = None + fields['batch_prefix'] = batch_prefix + return fields + + +def fetch_bundle(source: str, fqid_args: JSON) -> Bundle: for catalog in config.catalogs: plugin = plugin_for(catalog) try: @@ -90,10 +100,8 @@ def fetch_bundle(source: str, bundle_uuid: str, bundle_version: str) -> Bundle: log.debug('Searching for %r in catalog %r', source, catalog) for plugin_source_spec in plugin.sources: if source_ref.spec.eq_ignoring_prefix(plugin_source_spec): - fqid = SourcedBundleFQIDJSON(source=source_ref.to_json(), - uuid=bundle_uuid, - version=bundle_version) - fqid = plugin.resolve_bundle(fqid) + fqid = SourcedBundleFQIDJSON(source=source_ref.to_json(), **fqid_args) + fqid = plugin.bundle_fqid_from_json(fqid) bundle = plugin.fetch_bundle(fqid) log.info('Fetched bundle %r version %r from catalog %r.', fqid.uuid, fqid.version, catalog) diff --git a/scripts/post_deploy_tdr.py b/scripts/post_deploy_tdr.py index 98c680f312..a6566345ea 100644 --- a/scripts/post_deploy_tdr.py +++ b/scripts/post_deploy_tdr.py @@ -96,13 +96,13 @@ def verify_source(self, plugin = self.repository_plugin(catalog) ref = plugin.resolve_source(str(source_spec)) log.info('TDR client is authorized for API access to %s.', source_spec) - ref = plugin.partition_source(catalog, ref) - prefix = ref.spec.prefix if config.deployment.is_main: - require(prefix.common == '', source_spec) + if source_spec.prefix is not None: + require(source_spec.prefix.common == '', source_spec) self.tdr.check_bigquery_access(source_spec) else: - subgraph_count = len(plugin.list_bundles(ref, prefix.common)) + ref = plugin.partition_source(catalog, ref) + subgraph_count = plugin.count_bundles(ref.spec) require(subgraph_count > 0, 'Common prefix is too long', ref.spec) require(subgraph_count <= 512, 'Common prefix is too short', ref.spec) diff --git a/src/azul/azulclient.py b/src/azul/azulclient.py index 67a90a1f62..5917dfc533 100644 --- a/src/azul/azulclient.py +++ b/src/azul/azulclient.py @@ -225,7 +225,11 @@ def list_bundles(self, source = plugin.resolve_source(source) else: assert isinstance(source, SourceRef), source - return plugin.list_bundles(source, prefix) + log.info('Listing bundles with prefix %r in source %r.', prefix, source) + bundle_fqids = plugin.list_bundles(source, prefix) + log.info('There are %i bundle(s) with prefix %r in source %r.', + len(bundle_fqids), prefix, source) + return bundle_fqids @property def sqs(self): diff --git a/src/azul/indexer/__init__.py b/src/azul/indexer/__init__.py index 5d17349c0b..f8228cc835 
100644 --- a/src/azul/indexer/__init__.py +++ b/src/azul/indexer/__init__.py @@ -2,6 +2,9 @@ ABCMeta, abstractmethod, ) +from functools import ( + total_ordering, +) from itertools import ( product, ) @@ -17,6 +20,7 @@ Self, TypeVar, TypedDict, + final, ) import attrs @@ -45,21 +49,115 @@ BundleVersion = str -@attrs.frozen(kw_only=True, order=True) -class BundleFQID(SupportsLessAndGreaterThan): +# PyCharm can't handle mixing `attrs` with `total_ordering` and falsely claims +# that comparison operators besides `__lt__` are not defined. +# noinspection PyDataclass +@attrs.frozen(kw_only=True, eq=False) +@total_ordering +class BundleFQID: """ - >>> list(sorted([ - ... BundleFQID(uuid='d', version='e'), - ... BundleFQID(uuid='a', version='c'), - ... BundleFQID(uuid='a', version='b'), - ... ])) - ... # doctest: +NORMALIZE_WHITESPACE - [BundleFQID(uuid='a', version='b'), - BundleFQID(uuid='a', version='c'), - BundleFQID(uuid='d', version='e')] + A fully qualified bundle identifier. The attributes defined in this class + must always be sufficient to decide whether two instances of this class or + its subclasses identify the same bundle or not. Subclasses may define + additional attributes to help describe the bundle, but they are forbidden + from using these attributes in the implementations of their `__eq__` or + `__hash__` methods, either explicitly or in code generated by `attrs`. """ - uuid: BundleUUID = attrs.field(order=str.lower) - version: BundleVersion = attrs.field(order=str.lower) + uuid: BundleUUID + version: BundleVersion + + def _nucleus(self) -> tuple[str, str]: + return self.uuid.lower(), self.version.lower() + + # We can't use attrs' generated implementation because it always + # considers operands with different types to be unequal, regardless of + # their inheritance relationships or how their attributes are annotated + # (e.g. specifying `eq=False` has no effect). We want instances of + # all subclasses to compare equal as long as `uuid` and `version` are + # equal. For the same reason, we can't use `typing.Self` in the signature + # because it would constrain the RHS to instances of subclasses of the LHS. + @final + def __eq__(self, other: 'BundleFQID') -> bool: + """ + >>> b1 = BundleFQID(uuid='a', version='b') + >>> b2 = BundleFQID(uuid='a', version='b') + >>> b1 == b2 + True + + >>> s1 = SourceRef(id='x', spec=SimpleSourceSpec.parse('y:/0')) + >>> sb1 = SourcedBundleFQID(uuid='a', version='b', source=s1) + >>> sb2 = SourcedBundleFQID(uuid='a', version='b', source=s1) + >>> sb1 == sb2 + True + + >>> b1 == sb1 + True + + >>> s2 = SourceRef(id='w', spec=SimpleSourceSpec.parse('z:/0')) + >>> sb3 = SourcedBundleFQID(uuid='a', version='b', source=s2) + >>> b1 == sb3 + True + + >>> sb1 == sb3 + ... # doctest: +NORMALIZE_WHITESPACE + Traceback (most recent call last): + ... + AssertionError: (('a', 'b'), + SourceRef(id='x', spec=SimpleSourceSpec(prefix=Prefix(common='', partition=0), name='y')), + SourceRef(id='w', spec=SimpleSourceSpec(prefix=Prefix(common='', partition=0), name='z'))) + """ + same_bundle = self._nucleus() == other._nucleus() + if ( + same_bundle + and isinstance(self, SourcedBundleFQID) + and isinstance(other, SourcedBundleFQID) + ): + assert self.source == other.source, (self._nucleus(), self.source, other.source) + return same_bundle + + @final + def __hash__(self) -> int: + return hash(self._nucleus()) + + def __init_subclass__(cls, **kwargs): + """ + >>> @attrs.frozen(kw_only=True) + ... class FooBundleFQID(SourcedBundleFQID): + ... 
foo: str + Traceback (most recent call last): + ... + AssertionError: + + >>> @attrs.frozen(kw_only=True, eq=False) + ... class FooBundleFQID(SourcedBundleFQID): + ... foo: str + """ + super().__init_subclass__(**kwargs) + assert cls.__eq__ is BundleFQID.__eq__, cls + assert cls.__hash__ is BundleFQID.__hash__, cls + + # attrs doesn't allow `order=True` when `eq=False` + def __lt__(self, other: 'BundleFQID') -> bool: + """ + >>> aa = BundleFQID(uuid='a', version='a') + >>> ab = BundleFQID(uuid='a', version='b') + >>> ba = BundleFQID(uuid='b', version='a') + >>> aa < ab < ba + True + + >>> ba > ab > aa + True + + >>> aa <= ab <= ba + True + + >>> ba >= ab >= aa + True + + >>> aa != ab != ba + True + """ + return self._nucleus() < other._nucleus() def to_json(self) -> MutableJSON: return attrs.asdict(self, recurse=False) @@ -454,6 +552,9 @@ def spec_cls(cls) -> type[SourceSpec]: spec_cls, ref_cls = get_generic_type_params(cls, SourceSpec, SourceRef) return spec_cls + def with_prefix(self, prefix: Prefix) -> Self: + return attrs.evolve(self, spec=attrs.evolve(self.spec, prefix=prefix)) + class SourcedBundleFQIDJSON(BundleFQIDJSON): source: SourceJSON @@ -462,7 +563,7 @@ class SourcedBundleFQIDJSON(BundleFQIDJSON): BUNDLE_FQID = TypeVar('BUNDLE_FQID', bound='SourcedBundleFQID') -@attrs.frozen(kw_only=True, order=True) +@attrs.frozen(kw_only=True, eq=False) class SourcedBundleFQID(BundleFQID, Generic[SOURCE_REF]): """ >>> spec = SimpleSourceSpec(name='', prefix=(Prefix(partition=0))) @@ -493,10 +594,6 @@ def from_json(cls, json: SourcedBundleFQIDJSON) -> 'SourcedBundleFQID': source = cls.source_ref_cls().from_json(json.pop('source')) return cls(source=source, **json) - def upcast(self) -> BundleFQID: - return BundleFQID(uuid=self.uuid, - version=self.version) - def to_json(self) -> SourcedBundleFQIDJSON: return dict(super().to_json(), source=self.source.to_json()) diff --git a/src/azul/indexer/document.py b/src/azul/indexer/document.py index 5e8a0cd64d..fe5430967e 100644 --- a/src/azul/indexer/document.py +++ b/src/azul/indexer/document.py @@ -504,13 +504,12 @@ class ContributionCoordinates(DocumentCoordinates[E], Generic[E]): subgraph ("bundle") and represent either the addition of metadata to an entity or the removal of metadata from an entity. - Contributions produced by - transformers don't specify a catalog, the catalog is supplied when the - contributions are written to the index and it is guaranteed to be the same - for all contributions produced in response to one notification. When - contributions are read back during aggregation, they specify a catalog, the - catalog they were read from. Because of that duality this class has to be - generic in E, the type of EntityReference. + Contributions produced by transformers don't specify a catalog. The catalog + is supplied when the contributions are written to the index and it is + guaranteed to be the same for all contributions produced in response to one + notification. When contributions are read back during aggregation, they + specify a catalog, the catalog they were read from. Because of that duality + this class has to be generic in E, the type of EntityReference. """ doc_type: ClassVar[DocumentType] = DocumentType.contribution @@ -519,23 +518,6 @@ class ContributionCoordinates(DocumentCoordinates[E], Generic[E]): deleted: bool - def __attrs_post_init__(self): - # If we were to allow instances of subclasses, we'd risk breaking - # equality and hashing semantics. 
It is impossible to correctly - # implement the transitivity property of equality between instances of - # type and subtype without ignoring the additional attributes added by - # the subtype. Consider types T and S where S is a subtype of T. - # Transitivity requires that s1 == s2 for any two instances s1 and s2 - # of S for which s1 == t and s2 == t, where t is any instance of T. The - # subtype instances s1 and s2 can only be equal to t if they match in - # all attributes that T defines. The additional attributes introduced - # by S must be ignored, even when comparing s1 and s2, otherwise s1 and - # s2 might turn out to be unequal. In this particular case that is not - # desirable: we do want to be able to compare instances of subtypes of - # BundleFQID without ignoring any of their attributes. - concrete_type = type(self.bundle) - assert concrete_type is BundleFQID, concrete_type - @property def document_id(self) -> str: return '_'.join(( diff --git a/src/azul/indexer/document_service.py b/src/azul/indexer/document_service.py index 3f0e7e0ef5..0e9bb11dec 100644 --- a/src/azul/indexer/document_service.py +++ b/src/azul/indexer/document_service.py @@ -49,6 +49,14 @@ def metadata_plugin(self, catalog: CatalogName) -> MetadataPlugin: def aggregate_class(self, catalog: CatalogName) -> Type[Aggregate]: return self.metadata_plugin(catalog).aggregate_class() + @property + def always_limit_access(self) -> bool: + """ + True if access restrictions are enforced unconditionally. False, if the + filter stage is allowed to weaken them, e.g., based on the entity type. + """ + return True + def transformer_types(self, catalog: CatalogName ) -> Iterable[Type[Transformer]]: diff --git a/src/azul/indexer/index_service.py b/src/azul/indexer/index_service.py index fbf0493572..20c28a7beb 100644 --- a/src/azul/indexer/index_service.py +++ b/src/azul/indexer/index_service.py @@ -197,7 +197,7 @@ def fetch_bundle(self, bundle_fqid: SourcedBundleFQIDJSON ) -> Bundle: plugin = self.repository_plugin(catalog) - bundle_fqid = plugin.resolve_bundle(bundle_fqid) + bundle_fqid = plugin.bundle_fqid_from_json(bundle_fqid) return plugin.fetch_bundle(bundle_fqid) def index(self, catalog: CatalogName, bundle: Bundle) -> None: @@ -212,7 +212,8 @@ def index(self, catalog: CatalogName, bundle: Bundle) -> None: for contributions, replicas in transforms: tallies.update(self.contribute(catalog, contributions)) self.replicate(catalog, replicas) - self.aggregate(tallies) + if tallies: + self.aggregate(tallies) def delete(self, catalog: CatalogName, bundle: Bundle) -> None: """ diff --git a/src/azul/indexer/transform.py b/src/azul/indexer/transform.py index 4f6efb7f50..4a2f2c8dc9 100644 --- a/src/azul/indexer/transform.py +++ b/src/azul/indexer/transform.py @@ -123,7 +123,7 @@ def _contribution(self, ) -> Contribution: entity = EntityReference(entity_type=self.entity_type(), entity_id=entity_id) coordinates = ContributionCoordinates(entity=entity, - bundle=self.bundle.fqid.upcast(), + bundle=self.bundle.fqid, deleted=self.deleted) return Contribution(coordinates=coordinates, version=None, @@ -133,6 +133,7 @@ def _contribution(self, def _replica(self, entity: EntityReference, *, + root_hub: EntityID, file_hub: EntityID | None, ) -> Replica: replica_type, contents = self._replicate(entity) @@ -144,7 +145,7 @@ def _replica(self, contents=contents, # The other hubs will be added when the indexer # consolidates duplicate replicas. 
- hub_ids=alist(file_hub)) + hub_ids=alist(file_hub, root_hub)) @classmethod @abstractmethod diff --git a/src/azul/plugins/__init__.py b/src/azul/plugins/__init__.py index e6f9bef7a3..1f9c19da54 100644 --- a/src/azul/plugins/__init__.py +++ b/src/azul/plugins/__init__.py @@ -127,11 +127,25 @@ def order(self) -> str: @attr.s(auto_attribs=True, frozen=True, kw_only=True) class SpecialFields: + """ + Azul defines a number of fields in each /index/{entity_type} response that + are synthetic (not directly taken from the metadata) and/or are used + internally. Their naming is inconsistent between metadata plugin + implementations. This class encapsulates the naming of these fields so that + we don't need to litter the source with string literals and conditionals. + + It is an incomplete abstraction in that it does not express the name of the + inner entity the field is a property of in the /index/{entity_type} + response. In that way, the values of the attributes of instances of this + class are more akin to a facet name, rather than a field name. However, not + every field represented here is actually a facet. + """ accessible: ClassVar[FieldName] = 'accessible' source_id: FieldName source_spec: FieldName bundle_uuid: FieldName bundle_version: FieldName + implicit_hub_id: FieldName class ManifestFormat(Enum): @@ -409,6 +423,9 @@ def _field_mapping(self) -> _FieldMapping: @property @abstractmethod def special_fields(self) -> SpecialFields: + """ + See :py:class:`SpecialFields`. + """ raise NotImplementedError @property @@ -433,8 +450,8 @@ def manifest_config(self) -> ManifestConfig: raise NotImplementedError def verbatim_pfb_schema(self, - replicas: Iterable[JSON] - ) -> tuple[Iterable[JSON], Sequence[str], JSON]: + replicas: list[JSON] + ) -> list[JSON]: """ Generate a PFB schema for the verbatim manifest. The default, metadata-agnostic implementation loads all replica documents into memory @@ -445,17 +462,14 @@ def verbatim_pfb_schema(self, :param replicas: The replica documents to be described by the PFB schema - :return: a triple of - 1. the same set of replicas passed to this method - 2. the set of entity types defined by the PFB schema - 3. a PFB schema describing the provided replicas + :return: a list of Avro record schemas, one per entity type, describing + the provided replicas """ from azul.service import ( avro_pfb, ) - replicas = list(replicas) - replica_types, pfb_schema = avro_pfb.pfb_schema_from_replicas(replicas) - return replicas, replica_types, pfb_schema + return avro_pfb.pfb_schema_from_replicas(replicas) @abstractmethod def document_slice(self, entity_type: str) -> DocumentSlice | None: @@ -583,6 +597,13 @@ def _bundle_fqid_cls(self) -> type[BUNDLE_FQID]: bundle_cls, spec_cls, ref_cls, fqid_cls = self._generic_params return fqid_cls + def bundle_fqid_from_json(self, fqid: SourcedBundleFQIDJSON) -> BUNDLE_FQID: + """ + Instantiate a :class:`SourcedBundleFQID` from its JSON representation. + The expected input matches the output format of `SourcedBundleFQID.to_json`. 
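+ For example, for any FQID `fqid` emitted by this plugin, `plugin.bundle_fqid_from_json(fqid.to_json())` is expected to yield an FQID that compares equal to `fqid`; the round trip should be lossless. 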
+ """ + return self._bundle_fqid_cls.from_json(fqid) + @property def _bundle_cls(self) -> type[BUNDLE]: bundle_cls, spec_cls, ref_cls, fqid_cls = self._generic_params @@ -607,13 +628,11 @@ def _lookup_source_id(self, spec: SOURCE_SPEC) -> str: """ raise NotImplementedError - def resolve_bundle(self, fqid: SourcedBundleFQIDJSON) -> BUNDLE_FQID: - return self._bundle_fqid_cls.from_json(fqid) - @abstractmethod - def _count_subgraphs(self, source: SOURCE_SPEC) -> int: + def count_bundles(self, source: SOURCE_SPEC) -> int: """ - The total number of subgraphs in the given source, ignoring its prefix. + The total number of subgraphs in the given source. The source's prefix + may be None. """ raise NotImplementedError @@ -627,7 +646,7 @@ def partition_source(self, should be appropriate for indexing in the given catalog. """ if source.spec.prefix is None: - count = self._count_subgraphs(source.spec) + count = self.count_bundles(source.spec) is_main = config.deployment.is_main is_it = catalog in config.integration_test_catalogs # We use the "lesser" heuristic during IT to avoid indexing an @@ -636,7 +655,7 @@ def partition_source(self, prefix = Prefix.for_main_deployment(count) else: prefix = Prefix.for_lesser_deployment(count) - source = attr.evolve(source, spec=attr.evolve(source.spec, prefix=prefix)) + source = source.with_prefix(prefix) return source @abstractmethod diff --git a/src/azul/plugins/metadata/anvil/__init__.py b/src/azul/plugins/metadata/anvil/__init__.py index b3fe445326..ab4c55441c 100644 --- a/src/azul/plugins/metadata/anvil/__init__.py +++ b/src/azul/plugins/metadata/anvil/__init__.py @@ -53,9 +53,6 @@ AnvilSearchResponseStage, AnvilSummaryResponseStage, ) -from azul.service.avro_pfb import ( - avro_pfb_schema, -) from azul.service.manifest_service import ( ManifestFormat, ) @@ -246,7 +243,8 @@ def special_fields(self) -> SpecialFields: return SpecialFields(source_id='source_id', source_spec='source_spec', bundle_uuid='bundle_uuid', - bundle_version='bundle_version') + bundle_version='bundle_version', + implicit_hub_id='datasets.dataset_id') @property def implicit_hub_type(self) -> str: @@ -327,16 +325,24 @@ def recurse(mapping: MetadataPlugin._FieldMapping, path: FieldPath): return result def verbatim_pfb_schema(self, - replicas: Iterable[JSON] - ) -> tuple[Iterable[JSON], Sequence[str], JSON]: - entity_schemas = [] - entity_types = [] - for table_schema in sorted(anvil_schema['tables'], key=itemgetter('name')): - table_name = table_schema['name'] + replicas: list[JSON] + ) -> list[JSON]: + table_schemas_by_name = { + schema['name']: schema + for schema in anvil_schema['tables'] + } + non_schema_replicas = [ + r for r in replicas + if r['replica_type'] not in table_schemas_by_name + ] + # For tables not described by the AnVIL schema, fall back to building + # their PFB schema dynamically from the shapes of the replicas + entity_schemas = super().verbatim_pfb_schema(non_schema_replicas) + # For the rest, use the AnVIL schema as the basis of the PFB schema + for table_name, table_schema in table_schemas_by_name.items(): # FIXME: Improve handling of DUOS replicas # https://github.com/DataBiosphere/azul/issues/6139 is_duos_type = table_name == 'anvil_dataset' - entity_types.append(table_name) field_schemas = [ self._pfb_schema_from_anvil_column(table_name=table_name, column_name='datarepo_row_id', @@ -369,7 +375,7 @@ def verbatim_pfb_schema(self, 'type': 'record', 'fields': field_schemas }) - return replicas, entity_types, avro_pfb_schema(entity_schemas) + return entity_schemas def 
_pfb_schema_from_anvil_column(self, *, diff --git a/src/azul/plugins/metadata/anvil/bundle.py b/src/azul/plugins/metadata/anvil/bundle.py index 8df46cb8c5..7e6e064f8a 100644 --- a/src/azul/plugins/metadata/anvil/bundle.py +++ b/src/azul/plugins/metadata/anvil/bundle.py @@ -122,6 +122,7 @@ class AnvilBundle(Bundle[BUNDLE_FQID], ABC): # the former to the latter during transformation. entities: dict[EntityReference, MutableJSON] = attrs.field(factory=dict) links: set[EntityLink] = attrs.field(factory=set) + orphans: dict[EntityReference, MutableJSON] = attrs.field(factory=dict) def reject_joiner(self, catalog: CatalogName): # FIXME: Optimize joiner rejection and re-enable it for AnVIL @@ -129,21 +130,29 @@ def reject_joiner(self, catalog: CatalogName): pass def to_json(self) -> MutableJSON: - return { - 'entities': { + def serialize_entities(entities): + return { str(entity_ref): entity - for entity_ref, entity in sorted(self.entities.items()) - }, + for entity_ref, entity in sorted(entities.items()) + } + + return { + 'entities': serialize_entities(self.entities), + 'orphans': serialize_entities(self.orphans), 'links': [link.to_json() for link in sorted(self.links)] } @classmethod def from_json(cls, fqid: BUNDLE_FQID, json_: JSON) -> Self: + def deserialize_entities(json_entities): + return { + EntityReference.parse(entity_ref): entity + for entity_ref, entity in json_entities.items() + } + return cls( fqid=fqid, - entities={ - EntityReference.parse(entity_ref): entity - for entity_ref, entity in json_['entities'].items() - }, - links=set(map(EntityLink.from_json, json_['links'])) + entities=deserialize_entities(json_['entities']), + links=set(map(EntityLink.from_json, json_['links'])), + orphans=deserialize_entities(json_['orphans']) ) diff --git a/src/azul/plugins/metadata/anvil/indexer/transform.py b/src/azul/plugins/metadata/anvil/indexer/transform.py index 858899bd59..9e2f4f22ed 100644 --- a/src/azul/plugins/metadata/anvil/indexer/transform.py +++ b/src/azul/plugins/metadata/anvil/indexer/transform.py @@ -169,6 +169,9 @@ def aggregator(cls, entity_type) -> EntityAggregator: assert False, entity_type def estimate(self, partition: BundlePartition) -> int: + # Orphans are not considered when deciding whether to partition the + # bundle, but if the bundle is partitioned then each orphan will be + # replicated in a single partition return sum(map(partial(self._contains, partition), self.bundle.entities)) def transform(self, @@ -188,7 +191,8 @@ def _transform(self, raise NotImplementedError def _replicate(self, entity: EntityReference) -> tuple[str, JSON]: - return entity.entity_type, self.bundle.entities[entity] + content = ChainMap(self.bundle.entities, self.bundle.orphans)[entity] + return entity.entity_type, content def _convert_entity_type(self, entity_type: str) -> str: assert entity_type == 'bundle' or entity_type.startswith('anvil_'), entity_type @@ -406,7 +410,10 @@ def _file(self, file: EntityReference) -> MutableJSON: uuid=file.entity_id) def _only_dataset(self) -> EntityReference: - return one(self._entities_by_type['anvil_dataset']) + try: + return one(self._entities_by_type['anvil_dataset']) + except ValueError: + return one(o for o in self.bundle.orphans if o.entity_type == 'anvil_dataset') @cached_property def _activity_polymorphic_types(self) -> AbstractSet[str]: @@ -506,7 +513,9 @@ def _dataset(self, dataset: EntityReference) -> MutableJSON: return super()._dataset(dataset) def _list_entities(self) -> Iterable[EntityReference]: - yield self._singleton() + # Suppress 
contributions for bundles that only contain orphans + if self.bundle.entities: + yield self._singleton() @abstractmethod def _singleton(self) -> EntityReference: @@ -564,6 +573,23 @@ def _singleton(self) -> EntityReference: return EntityReference(entity_type='bundle', entity_id=self.bundle.uuid) + def transform(self, + partition: BundlePartition + ) -> Iterable[Contribution | Replica]: + yield from super().transform(partition) + if config.enable_replicas: + # Replicas are only emitted by the file transformer for entities + # that are linked to at least one file. This excludes all orphans, + # and a small number of linked entities, usually from primary + # bundles don't include any files. Some of the replicas we emit here + # will be redundant with those emitted by the file transformer, but + # these will be coalesced by the index service before they are + # written to ElasticSearch. + dataset = self._only_dataset() + for entity in chain(self.bundle.orphans, self.bundle.entities): + if partition.contains(UUID(entity.entity_id)): + yield self._replica(entity, file_hub=None, root_hub=dataset.entity_id) + class DatasetTransformer(SingletonTransformer): @@ -574,13 +600,6 @@ def entity_type(cls) -> str: def _singleton(self) -> EntityReference: return self._only_dataset() - def _transform(self, - entity: EntityReference - ) -> Iterable[Contribution | Replica]: - yield from super()._transform(entity) - if self._is_duos(entity): - yield self._replica(entity, file_hub=None) - class DonorTransformer(BaseTransformer): @@ -614,6 +633,7 @@ def _transform(self, entity: EntityReference ) -> Iterable[Contribution | Replica]: linked = self._linked_entities(entity) + dataset = self._only_dataset() contents = dict( activities=self._entities(self._activity, chain.from_iterable( linked[activity_type] @@ -627,7 +647,7 @@ def _transform(self, ) yield self._contribution(contents, entity.entity_id) if config.enable_replicas: - yield self._replica(entity, file_hub=entity.entity_id) + yield self._replica(entity, file_hub=entity.entity_id, root_hub=dataset.entity_id) for linked_entity in linked: yield self._replica( linked_entity, @@ -637,4 +657,5 @@ def _transform(self, # hub IDs field empty for datasets and rely on the tenet # that every file is an implicit hub of its parent dataset. 
file_hub=None if linked_entity.entity_type == 'anvil_dataset' else entity.entity_id, + root_hub=dataset.entity_id ) diff --git a/src/azul/plugins/metadata/anvil/service/filter.py b/src/azul/plugins/metadata/anvil/service/filter.py index ac49c2c7fb..0b21e2c62b 100644 --- a/src/azul/plugins/metadata/anvil/service/filter.py +++ b/src/azul/plugins/metadata/anvil/service/filter.py @@ -5,5 +5,6 @@ class AnvilFilterStage(FilterStage): + @property def _limit_access(self) -> bool: return self.entity_type != 'datasets' diff --git a/src/azul/plugins/metadata/hca/__init__.py b/src/azul/plugins/metadata/hca/__init__.py index 0286a2b489..46a8b04988 100644 --- a/src/azul/plugins/metadata/hca/__init__.py +++ b/src/azul/plugins/metadata/hca/__init__.py @@ -282,7 +282,8 @@ def special_fields(self) -> SpecialFields: return SpecialFields(source_id='sourceId', source_spec='sourceSpec', bundle_uuid='bundleUuid', - bundle_version='bundleVersion') + bundle_version='bundleVersion', + implicit_hub_id='projectId') @property def implicit_hub_type(self) -> str: diff --git a/src/azul/plugins/metadata/hca/indexer/transform.py b/src/azul/plugins/metadata/hca/indexer/transform.py index 00380d8564..174d43878e 100644 --- a/src/azul/plugins/metadata/hca/indexer/transform.py +++ b/src/azul/plugins/metadata/hca/indexer/transform.py @@ -1470,17 +1470,19 @@ def _transform(self, for entity_type, values in additional_contents.items(): contents[entity_type].extend(values) file_id = file.ref.entity_id + project_ref = self._api_project.ref + project_id = project_ref.entity_id yield self._contribution(contents, file_id) if config.enable_replicas: - yield self._replica(self.api_bundle.ref, file_hub=file_id) + yield self._replica(self.api_bundle.ref, file_hub=file_id, root_hub=project_id) # Projects are linked to every file in their snapshot, # making an explicit list of hub IDs for the project both # redundant and impractically large. Therefore, we leave the # hub IDs field empty for projects and rely on the tenet # that every file is an implicit hub of its parent project. 
- yield self._replica(self._api_project.ref, file_hub=None) + yield self._replica(project_ref, file_hub=None, root_hub=project_id) for linked_entity in visitor.entities: - yield self._replica(linked_entity, file_hub=file_id) + yield self._replica(linked_entity, file_hub=file_id, root_hub=project_id) def matrix_stratification_values(self, file: api.File) -> JSON: """ diff --git a/src/azul/plugins/metadata/hca/service/filter.py b/src/azul/plugins/metadata/hca/service/filter.py index 86374979b4..c2ba76ab88 100644 --- a/src/azul/plugins/metadata/hca/service/filter.py +++ b/src/azul/plugins/metadata/hca/service/filter.py @@ -5,5 +5,6 @@ class HCAFilterStage(FilterStage): + @property def _limit_access(self) -> bool: return self.entity_type != 'projects' diff --git a/src/azul/plugins/repository/canned/__init__.py b/src/azul/plugins/repository/canned/__init__.py index 3c2705fdf5..be4257c134 100644 --- a/src/azul/plugins/repository/canned/__init__.py +++ b/src/azul/plugins/repository/canned/__init__.py @@ -26,6 +26,9 @@ from furl import ( furl, ) +from more_itertools import ( + ilen, +) from azul import ( CatalogName, @@ -61,9 +64,6 @@ from azul.types import ( JSON, ) -from azul.uuids import ( - validate_uuid_prefix, -) from humancellatlas.data.metadata.helpers.staging_area import ( CannedStagingAreaFactory, StagingArea, @@ -163,27 +163,27 @@ def staging_area(self, url: str) -> StagingArea: ref) return factory.load_staging_area(path) - def _count_subgraphs(self, source: SOURCE_SPEC) -> int: + def count_bundles(self, source: SOURCE_SPEC) -> int: staging_area = self.staging_area(source.spec.name) - return len(staging_area.links) + return ilen( + links_id + for links_id in staging_area.links + if source.prefix is None or links_id.startswith(source.prefix.common) + ) def list_bundles(self, source: CannedSourceRef, prefix: str ) -> list[CannedBundleFQID]: self._assert_source(source) - validate_uuid_prefix(prefix) - log.info('Listing bundles with prefix %r in source %r.', prefix, source) - bundle_fqids = [] staging_area = self.staging_area(source.spec.name) - for link in staging_area.links.values(): - if link.uuid.startswith(prefix): - bundle_fqids.append(CannedBundleFQID(source=source, - uuid=link.uuid, - version=link.version)) - log.info('There are %i bundle(s) with prefix %r in source %r.', - len(bundle_fqids), prefix, source) - return bundle_fqids + return [ + CannedBundleFQID(source=source, + uuid=link.uuid, + version=link.version) + for link in staging_area.links.values() + if link.uuid.startswith(prefix) + ] def fetch_bundle(self, bundle_fqid: CannedBundleFQID) -> CannedBundle: self._assert_source(bundle_fqid.source) diff --git a/src/azul/plugins/repository/dss/__init__.py b/src/azul/plugins/repository/dss/__init__.py index 96a6da973a..4e5809f4a6 100644 --- a/src/azul/plugins/repository/dss/__init__.py +++ b/src/azul/plugins/repository/dss/__init__.py @@ -118,7 +118,7 @@ def sources(self) -> AbstractSet[SimpleSourceSpec]: def _lookup_source_id(self, spec: SimpleSourceSpec) -> str: return DSSSourceRef.id_from_spec(spec) - def _count_subgraphs(self, source: SimpleSourceSpec) -> NoReturn: + def count_bundles(self, source: SimpleSourceSpec) -> NoReturn: assert False, 'DSS is EOL' def list_sources(self, diff --git a/src/azul/plugins/repository/tdr.py b/src/azul/plugins/repository/tdr.py index a45cf96483..0ca79cd786 100644 --- a/src/azul/plugins/repository/tdr.py +++ b/src/azul/plugins/repository/tdr.py @@ -191,17 +191,6 @@ def _drs_client(cls, def _lookup_source_id(self, spec: TDRSourceSpec) -> str: 
return self.tdr.lookup_source(spec) - def list_bundles(self, - source: TDRSourceRef, - prefix: str - ) -> list[TDRBundleFQID]: - self._assert_source(source) - log.info('Listing bundles with prefix %r in source %r.', prefix, source) - bundle_fqids = self._list_bundles(source, prefix) - log.info('There are %i bundle(s) with prefix %r in source %r.', - len(bundle_fqids), prefix, source) - return bundle_fqids - def fetch_bundle(self, bundle_fqid: TDRBundleFQID) -> TDR_BUNDLE: self._assert_source(bundle_fqid.source) now = time.time() @@ -223,13 +212,6 @@ def _run_sql(self, query): def _full_table_name(self, source: TDRSourceSpec, table_name: str) -> str: return source.qualify_table(table_name) - @abstractmethod - def _list_bundles(self, - source: TDRSourceRef, - prefix: str - ) -> list[TDRBundleFQID]: - raise NotImplementedError - @abstractmethod def _emulate_bundle(self, bundle_fqid: TDRBundleFQID) -> TDR_BUNDLE: raise NotImplementedError diff --git a/src/azul/plugins/repository/tdr_anvil/__init__.py b/src/azul/plugins/repository/tdr_anvil/__init__.py index 530850366d..e841615931 100644 --- a/src/azul/plugins/repository/tdr_anvil/__init__.py +++ b/src/azul/plugins/repository/tdr_anvil/__init__.py @@ -2,6 +2,7 @@ from enum import ( Enum, ) +import itertools import logging from operator import ( itemgetter, @@ -10,7 +11,10 @@ AbstractSet, Callable, Iterable, + Self, + cast, ) +import uuid import attrs from more_itertools import ( @@ -24,12 +28,14 @@ uuids, ) from azul.bigquery import ( + BigQueryRow, backtick, ) from azul.drs import ( DRSURI, ) from azul.indexer import ( + Prefix, SourcedBundleFQIDJSON, ) from azul.indexer.document import ( @@ -74,58 +80,88 @@ class BundleType(Enum): """ - AnVIL snapshots have no inherent notion of a "bundle". When indexing these - snapshots, we dynamically construct bundles by selecting individual entities - and following their foreign keys to discover associated entities. The - initial entity from which this graph traversal begins is termed the - "bundle entity", and its FQID serves as the basis for the bundle FQID. Each - member of this enumeration represents a strategy for selecting bundle - entities. - - Our primary such strategy is to use every biosample in a given snapshot as a - bundle entity. Biosamples were chosen for this role based on a desirable - balance between the size and number of the resulting bundles as well as the - degree of overlap between them. The implementation of the graph traversal is - tightly coupled to this choice, and switching to a different entity type - would require re-implementing much of the Plugin code. Primary bundles - consist of at least one biosample (the bundle entity), exactly one dataset, - and zero or more other entities of assorted types. - - Some snapshots include file entities that lack any foreign keys that - associate the file with any other entity. To ensure that these "orphaned" - files are indexed, they are also used as bundle entities. As with primary - bundles, the creation of these supplementary bundles depends on a - specifically tailored traversal implementation. Supplementary bundles always - consist of exactly two entities: one file (the bundle entity) and one - dataset. - - The `dataset.description` field is unusual in that it is not stored in - BigQuery and must be retrieved via Terra's DUOS API. There is only one - dataset per snapshot, which is referenced in all primary and supplementary - bundles. 
Therefore, only one request to DUOS per *snapshot* is necessary, - but if `description` is retrieved at the same time as the other dataset - fields, we will make one request per *bundle* instead, potentially - overloading the DUOS service. Our solution is to retrieve `description` only - in a dedicated bundle format, once per snapshot, and merge it with the other - dataset fields during aggregation. This bundle contains only a single - dataset entity with only the `description` field populated. + AnVIL snapshots have no inherent notion of a "bundle". During indexing, we + dynamically construct bundles by querying each table in the snapshot. This + class enumerates the tables that require special strategies for listing and + fetching their bundles. + + Primary bundles are defined by a biosample entity, termed the bundle entity. + Each primary bundle includes all of the bundle entity's descendants and all of + those entities' ancestors, which are discovered by iteratively + following foreign keys. Biosamples were chosen for this role based on a + desirable balance between the size and number of the resulting bundles as + well as the degree of overlap between them. The implementation of the graph + traversal is tightly coupled to this choice, and switching to a different + entity type would require re-implementing much of the Plugin code. Primary + bundles consist of at least one biosample (the bundle entity), exactly one + dataset, and zero or more other entities of assorted types. Primary bundles + never contain orphans because they are bijective to rows in the biosample + table. + + Supplementary bundles consist of batches of file entities, which may include + supplementary files, which lack any foreign keys that associate them with + any other entity. Non-supplementary files in the bundle are classified as + orphans. The bundle also includes a dataset entity linked to the + supplementary files. + + DUOS bundles consist of a single dataset entity. This "entity" includes only + the dataset description retrieved from DUOS, while a copy of the BigQuery + row for this dataset is also included as an orphan. We chose this design + because there is only one dataset per snapshot, which is referenced in all + primary and supplementary bundles. Therefore, only one request to DUOS per + *snapshot* is necessary, but if `description` is retrieved at the same time + as the other dataset fields, we will make one request per *bundle* instead, + potentially overloading the DUOS service. Our solution is to retrieve + `description` only in a dedicated bundle format, once per snapshot, and + merge it with the other dataset fields during aggregation. + + All other bundles are replica bundles. Replica bundles consist of a batch of + rows from an arbitrary BigQuery table, which may or may not be described by + the AnVIL schema. Replica bundles only include orphans and have no links. 
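+ + For illustration, with hypothetical numbers: a snapshot containing 100 biosample rows, one dataset row, and two other non-empty tables yields 100 primary bundles, one DUOS bundle (assuming DUOS is configured), and one supplementary or replica bundle per batch prefix chosen for each remaining table. 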
""" primary = 'anvil_biosample' supplementary = 'anvil_file' duos = 'anvil_dataset' + def is_batched(self: Self | str) -> bool: + """ + >>> BundleType.primary.is_batched() + False + + >>> BundleType.is_batched('anvil_activity') + True + """ + if isinstance(self, str): + try: + self = BundleType(self) + except ValueError: + return True + return self not in (BundleType.primary, BundleType.duos) + class TDRAnvilBundleFQIDJSON(SourcedBundleFQIDJSON): table_name: str + batch_prefix: str | None -@attrs.frozen(kw_only=True) +@attrs.frozen(kw_only=True, eq=False) class TDRAnvilBundleFQID(TDRBundleFQID): - table_name: BundleType = attrs.field(converter=BundleType) + table_name: str + batch_prefix: str | None def to_json(self) -> TDRAnvilBundleFQIDJSON: - return dict(super().to_json(), - table_name=self.table_name.value) + return cast(TDRAnvilBundleFQIDJSON, super().to_json()) + + def __attrs_post_init__(self): + should_be_batched = BundleType.is_batched(self.table_name) + is_batched = self.is_batched + assert is_batched == should_be_batched, self + if is_batched: + assert len(self.batch_prefix) <= 8, self + + @property + def is_batched(self) -> bool: + return self.batch_prefix is not None class TDRAnvilBundle(AnvilBundle[TDRAnvilBundleFQID], TDRBundle): @@ -137,9 +173,14 @@ def canning_qualifier(cls) -> str: def add_entity(self, entity: EntityReference, version: str, - row: MutableJSON + row: MutableJSON, + *, + is_orphan: bool = False ) -> None: - assert entity not in self.entities, entity + target = self.orphans if is_orphan else self.entities + # In DUOS bundles, the dataset is represented as both as entity and an + # orphan + assert entity not in target, entity metadata = dict(row, version=version) if entity.entity_type == 'anvil_file': @@ -149,7 +190,7 @@ def add_entity(self, metadata.update(drs_uri=drs_uri, sha256='', crc32='') - self.entities[entity] = metadata + target[entity] = metadata def add_links(self, links: Iterable[EntityLink]): self.links.update(links) @@ -167,107 +208,186 @@ def _version(self): tzinfo=datetime.timezone.utc)) datarepo_row_uuid_version = 4 + batch_uuid_version = 5 bundle_uuid_version = 10 - def _count_subgraphs(self, source: TDRSourceSpec) -> int: - rows = self._run_sql(f''' + def _batch_uuid(self, + source: TDRSourceSpec, + table_name: str, + batch_prefix: str + ) -> str: + namespace = uuid.UUID('b8b3ac80-e035-4904-8b02-2d04f9e9a369') + batch_uuid = uuid.uuid5(namespace, f'{source}:{table_name}:{batch_prefix}') + return change_version(str(batch_uuid), + self.batch_uuid_version, + self.bundle_uuid_version) + + def count_bundles(self, source: TDRSourceSpec) -> int: + prefix = '' if source.prefix is None else source.prefix.common + primary_count = one(self._run_sql(f''' SELECT COUNT(*) AS count FROM {backtick(self._full_table_name(source, BundleType.primary.value))} - UNION ALL + WHERE STARTS_WITH(datarepo_row_id, {prefix!r}) + '''))['count'] + duos_count = 0 if config.duos_service_url is None else one(self._run_sql(f''' SELECT COUNT(*) AS count - FROM {backtick(self._full_table_name(source, BundleType.supplementary.value))} - WHERE is_supplementary - ''') - return sum(row['count'] for row in rows) - - def _list_bundles(self, - source: TDRSourceRef, - prefix: str - ) -> list[TDRAnvilBundleFQID]: - spec = source.spec - primary = BundleType.primary.value - supplementary = BundleType.supplementary.value - duos = BundleType.duos.value - rows = list(self._run_sql(f''' - SELECT datarepo_row_id, {primary!r} AS table_name - FROM {backtick(self._full_table_name(spec, 
primary))} - WHERE STARTS_WITH(datarepo_row_id, '{prefix}') - UNION ALL - SELECT datarepo_row_id, {supplementary!r} AS table_name - FROM {backtick(self._full_table_name(spec, supplementary))} AS supp - WHERE supp.is_supplementary AND STARTS_WITH(datarepo_row_id, '{prefix}') - ''' + ( - '' - if config.duos_service_url is None else - f''' - UNION ALL - SELECT datarepo_row_id, {duos!r} AS table_name - FROM {backtick(self._full_table_name(spec, duos))} - ''' - ))) + FROM {backtick(self._full_table_name(source, BundleType.duos.value))} + WHERE STARTS_WITH(datarepo_row_id, {prefix!r}) + '''))['count'] + sizes_by_table = self._batch_tables(source, prefix) + batched_count = sum(batch_size for (_, batch_size) in sizes_by_table.values()) + return primary_count + duos_count + batched_count + + def list_bundles(self, + source: TDRSourceRef, + prefix: str + ) -> list[TDRAnvilBundleFQID]: + self._assert_source(source) bundles = [] - duos_count = 0 - for row in rows: - # Reversibly tweak the entity UUID to prevent - # collisions between entity IDs and bundle IDs - bundle_uuid = uuids.change_version(row['datarepo_row_id'], - self.datarepo_row_uuid_version, - self.bundle_uuid_version) - # We intentionally omit the WHERE clause for datasets so that we can - # verify our assumption that each snapshot only contains rows for a - # single dataset. This verification is performed independently and - # concurrently for every partition, but only one partition actually - # emits the bundle. - if row['table_name'] == duos: - require(0 == duos_count) - duos_count += 1 - # Ensure that one partition will always contain the DUOS bundle - # regardless of the choice of common prefix - if not bundle_uuid.startswith(prefix): - continue + spec = source.spec + if config.duos_service_url is not None: + row = one(self._run_sql(f''' + SELECT datarepo_row_id + FROM {backtick(self._full_table_name(spec, BundleType.duos.value))} + ''')) + dataset_row_id = row['datarepo_row_id'] + # We intentionally omit the WHERE clause for datasets in order + # to verify our assumption that each snapshot only contains rows + # for a single dataset. This verification is performed + # independently and concurrently for every partition, but only + # one partition actually emits the bundle. 
+ if dataset_row_id.startswith(prefix): + bundle_uuid = change_version(dataset_row_id, + self.datarepo_row_uuid_version, + self.bundle_uuid_version) + bundles.append(TDRAnvilBundleFQID( + uuid=bundle_uuid, + version=self._version, + source=source, + table_name=BundleType.duos.value, + batch_prefix=None, + )) + for row in self._run_sql(f''' + SELECT datarepo_row_id + FROM {backtick(self._full_table_name(spec, BundleType.primary.value))} + WHERE STARTS_WITH(datarepo_row_id, {prefix!r}) + '''): + bundle_uuid = change_version(row['datarepo_row_id'], + self.datarepo_row_uuid_version, + self.bundle_uuid_version) bundles.append(TDRAnvilBundleFQID( - source=source, uuid=bundle_uuid, version=self._version, - table_name=BundleType(row['table_name']) + source=source, + table_name=BundleType.primary.value, + batch_prefix=None, )) + prefix_lengths_by_table = self._batch_tables(source.spec, prefix) + for table_name, (batch_prefix_length, _) in prefix_lengths_by_table.items(): + batch_prefixes = Prefix(common=prefix, + partition=batch_prefix_length - len(prefix)).partition_prefixes() + for batch_prefix in batch_prefixes: + bundle_uuid = self._batch_uuid(spec, table_name, batch_prefix) + bundles.append(TDRAnvilBundleFQID(uuid=bundle_uuid, + version=self._version, + source=source, + table_name=table_name, + batch_prefix=batch_prefix)) return bundles - def resolve_bundle(self, fqid: SourcedBundleFQIDJSON) -> TDRAnvilBundleFQID: - if 'table_name' not in fqid: - # Resolution of bundles without the table name is expensive, so we - # only support it during canning. - assert not config.is_in_lambda, ('Bundle FQID lacks table name', fqid) - source = self.source_from_json(fqid['source']) - entity_id = uuids.change_version(fqid['uuid'], - self.bundle_uuid_version, - self.datarepo_row_uuid_version) - rows = self._run_sql(' UNION ALL '.join(( - f''' - SELECT {bundle_type.value!r} AS table_name - FROM {backtick(self._full_table_name(source.spec, bundle_type.value))} - WHERE datarepo_row_id = {entity_id!r} - ''' - for bundle_type in BundleType - ))) - fqid = {**fqid, **one(rows)} - return super().resolve_bundle(fqid) - def _emulate_bundle(self, bundle_fqid: TDRAnvilBundleFQID) -> TDRAnvilBundle: - if bundle_fqid.table_name is BundleType.primary: + if bundle_fqid.table_name == BundleType.primary.value: log.info('Bundle %r is a primary bundle', bundle_fqid.uuid) return self._primary_bundle(bundle_fqid) - elif bundle_fqid.table_name is BundleType.supplementary: + elif bundle_fqid.table_name == BundleType.supplementary.value: log.info('Bundle %r is a supplementary bundle', bundle_fqid.uuid) return self._supplementary_bundle(bundle_fqid) - elif bundle_fqid.table_name is BundleType.duos: + elif bundle_fqid.table_name == BundleType.duos.value: assert config.duos_service_url is not None, bundle_fqid log.info('Bundle %r is a DUOS bundle', bundle_fqid.uuid) return self._duos_bundle(bundle_fqid) else: - assert False, bundle_fqid.table_name + log.info('Bundle %r is a replica bundle', bundle_fqid.uuid) + return self._replica_bundle(bundle_fqid) + + def _batch_tables(self, + source: TDRSourceSpec, + prefix: str, + ) -> dict[str, tuple[int, int]]: + """ + Find a batch prefix length that yields as close to 256 rows per batch + as possible for each table within the specified partition. The result's + keys are table names and its values are tuples where the first element + is the prefix length (*including* the partition prefix) and the second + element is the resulting number of batches. 
Tables are only included in + the result if they are non-empty and are used to produce batched bundle + formats (i.e. replica and supplementary). + + Because the partitions of a table do not contain exactly the same number + of bundles, calculating the batch size statistics for the entire table + at once produces a different result than performing the same calculation + for any individual partition. We expect the inconsistencies to average + out across partitions so that `count_bundles` and `list_bundles` give + consistent results as long the partition size is substantially larger + than the batch size. + + This method relies on BigQuery's `AVG` function, which is + nondeterministic for floating-point return values. The probability that + this affects this method's return value is very small, but nonzero. + https://cloud.google.com/bigquery/docs/reference/standard-sql/aggregate_functions#avg + """ + max_length = 4 + + def repeat(fmt): + return ', '.join(fmt.format(i=i) for i in range(1, max_length + 1)) + + target_size = 256 + prefix_len = len(prefix) + table_names = self.tdr.list_tables(source) + # This table is present in all snapshots. It is large and contains no + # useful metadata, so we skip indexing replicas from it. + table_names.discard('datarepo_row_ids') + table_names = sorted(filter(BundleType.is_batched, table_names)) + log.info('Calculating batch prefix lengths for partition %r of %d tables ' + 'in source %s', prefix, len(table_names), source) + # The extraneous outer 'SELECT *' works around a bug in BigQuery emulator + query = ' UNION ALL '.join(f'''( + SELECT * FROM ( + SELECT + {table_name!r} AS table_name, + {prefix_len} + LENGTH(CONCAT( + {repeat('IFNULL(p{i}, "")')} + )) AS batch_prefix_length, + AVG(num_rows) AS average_batch_size, + COUNT(*) AS num_batches + FROM ( + SELECT + {repeat(f'SUBSTR(datarepo_row_id, {prefix_len} + {{i}}, 1) AS p{{i}}')}, + COUNT(*) AS num_rows + FROM {backtick(self._full_table_name(source, table_name))} + WHERE STARTS_WITH(datarepo_row_id, {prefix!r}) + GROUP BY ROLLUP ({repeat('p{i}')}) + ) + GROUP BY batch_prefix_length + ORDER BY ABS({target_size} - average_batch_size) + LIMIT 1 + ) + )''' for table_name in table_names) + + def result(row): + table_name = row['table_name'] + prefix_length = row['batch_prefix_length'] + average_size = row['average_batch_size'] + num_batches = row['num_batches'] + log.info('Selected batch prefix length %d for table %r (average ' + 'batch size %.1f, num batches %d)', + prefix_length, table_name, average_size, num_batches) + return table_name, (prefix_length, num_batches) + + return dict(map(result, self._run_sql(query))) def _primary_bundle(self, bundle_fqid: TDRAnvilBundleFQID) -> TDRAnvilBundle: + assert not bundle_fqid.is_batched, bundle_fqid source = bundle_fqid.source bundle_entity = self._bundle_entity(bundle_fqid) @@ -317,56 +437,80 @@ def _primary_bundle(self, bundle_fqid: TDRAnvilBundleFQID) -> TDRAnvilBundle: return result def _supplementary_bundle(self, bundle_fqid: TDRAnvilBundleFQID) -> TDRAnvilBundle: - entity_id = uuids.change_version(bundle_fqid.uuid, - self.bundle_uuid_version, - self.datarepo_row_uuid_version) + assert bundle_fqid.is_batched, bundle_fqid source = bundle_fqid.source.spec - table_name = bundle_fqid.table_name.value result = TDRAnvilBundle(fqid=bundle_fqid) - columns = self._columns(table_name) - bundle_entity = dict(one(self._run_sql(f''' - SELECT {', '.join(sorted(columns))} - FROM {backtick(self._full_table_name(source, table_name))} - WHERE datarepo_row_id = '{entity_id}' 
- '''))) - dataset_table = 'anvil_dataset' - columns = self._columns(dataset_table) - linked_entity = dict(one(self._run_sql(f''' - SELECT {', '.join(sorted(columns))} - FROM {backtick(self._full_table_name(source, dataset_table))} - '''))) - link_args = {} - for entity_type, row, arg in [ - ('anvil_file', bundle_entity, 'outputs'), - (dataset_table, linked_entity, 'inputs') - ]: - entity_ref = EntityReference(entity_type=entity_type, entity_id=row['datarepo_row_id']) - result.add_entity(entity_ref, self._version, row) - link_args[arg] = {entity_ref} - result.add_links({EntityLink(**link_args)}) + linked_file_refs = set() + for file_ref, file_row in self._get_batch(bundle_fqid): + is_supplementary = file_row['is_supplementary'] + result.add_entity(file_ref, + self._version, + dict(file_row), + is_orphan=not is_supplementary) + if is_supplementary: + linked_file_refs.add(file_ref) + dataset_ref, dataset_row = self._get_dataset(source) + result.add_entity(dataset_ref, self._version, dict(dataset_row)) + result.add_links([EntityLink(inputs={dataset_ref}, outputs=linked_file_refs)]) return result def _duos_bundle(self, bundle_fqid: TDRAnvilBundleFQID) -> TDRAnvilBundle: + assert not bundle_fqid.is_batched, bundle_fqid duos_info = self.tdr.get_duos(bundle_fqid.source) description = None if duos_info is None else duos_info.get('studyDescription') - entity_id = change_version(bundle_fqid.uuid, - self.bundle_uuid_version, - self.datarepo_row_uuid_version) - entity = EntityReference(entity_type='anvil_dataset', - entity_id=entity_id) + ref, row = self._get_dataset(bundle_fqid.source.spec) + expected_entity_id = change_version(bundle_fqid.uuid, + self.bundle_uuid_version, + self.datarepo_row_uuid_version) + assert ref.entity_id == expected_entity_id, (ref, bundle_fqid) bundle = TDRAnvilBundle(fqid=bundle_fqid) - bundle.add_entity(entity=entity, - version=self._version, - row={'description': description}) + bundle.add_entity(ref, self._version, {'description': description}) + # Classify as orphan to suppress the emission of a contribution + bundle.add_entity(ref, self._version, dict(row), is_orphan=True) return bundle + def _replica_bundle(self, bundle_fqid: TDRAnvilBundleFQID) -> TDRAnvilBundle: + assert bundle_fqid.is_batched, bundle_fqid + source = bundle_fqid.source.spec + result = TDRAnvilBundle(fqid=bundle_fqid) + batch = self._get_batch(bundle_fqid) + dataset = self._get_dataset(source) + for (ref, row) in itertools.chain([dataset], batch): + result.add_entity(ref, self._version, dict(row), is_orphan=True) + return result + + def _get_dataset(self, source: TDRSourceSpec) -> tuple[EntityReference, BigQueryRow]: + table_name = 'anvil_dataset' + columns = self._columns(table_name) + row = one(self._run_sql(f''' + SELECT {', '.join(sorted(columns))} + FROM {backtick(self._full_table_name(source, table_name))} + ''')) + ref = EntityReference(entity_type=table_name, entity_id=row['datarepo_row_id']) + return ref, row + + def _get_batch(self, + bundle_fqid: TDRAnvilBundleFQID + ) -> Iterable[tuple[EntityReference, BigQueryRow]]: + source = bundle_fqid.source.spec + batch_prefix = bundle_fqid.batch_prefix + table_name = bundle_fqid.table_name + columns = self._columns(table_name) + for row in self._run_sql(f''' + SELECT {', '.join(sorted(columns))} + FROM {backtick(self._full_table_name(source, table_name))} + WHERE STARTS_WITH(datarepo_row_id, {batch_prefix!r}) + '''): + ref = EntityReference(entity_type=table_name, entity_id=row['datarepo_row_id']) + yield ref, row + def _bundle_entity(self, 
bundle_fqid: TDRAnvilBundleFQID) -> KeyReference: source = bundle_fqid.source bundle_uuid = bundle_fqid.uuid entity_id = uuids.change_version(bundle_uuid, self.bundle_uuid_version, self.datarepo_row_uuid_version) - table_name = bundle_fqid.table_name.value + table_name = bundle_fqid.table_name pk_column = table_name.removeprefix('anvil_') + '_id' bundle_entity = one(self._run_sql(f''' SELECT {pk_column} @@ -677,7 +821,8 @@ def convert_column(value): log.debug('Retrieved %i entities of type %r', len(rows), entity_type) missing = keys - {row[pk_column] for row in rows} require(not missing, - f'Required entities not found in {table_name}: {missing}') + f'Found only {len(rows)} out of {len(keys)} expected rows in {table_name}. ' + f'Missing entities: {missing}') return rows else: return [] @@ -688,6 +833,11 @@ def convert_column(value): } def _columns(self, table_name: str) -> set[str]: - columns = set(self._schema_columns[table_name]) - columns.add('datarepo_row_id') - return columns + try: + columns = self._schema_columns[table_name] + except KeyError: + return {'*'} + else: + columns = set(columns) + columns.add('datarepo_row_id') + return columns diff --git a/src/azul/plugins/repository/tdr_hca/__init__.py b/src/azul/plugins/repository/tdr_hca/__init__.py index 320b0a4f1b..e022196832 100644 --- a/src/azul/plugins/repository/tdr_hca/__init__.py +++ b/src/azul/plugins/repository/tdr_hca/__init__.py @@ -278,17 +278,21 @@ def _parse_drs_uri(self, class Plugin(TDRPlugin[TDRHCABundle, TDRSourceSpec, TDRSourceRef, TDRBundleFQID]): - def _count_subgraphs(self, source: TDRSourceSpec) -> int: - rows = self._run_sql(f''' - SELECT COUNT(*) AS count - FROM {backtick(self._full_table_name(source, 'links'))} - ''') + def count_bundles(self, source: TDRSourceSpec) -> int: + prefix = '' if source.prefix is None else source.prefix.common + query = f''' + SELECT COUNT(*) AS count + FROM {backtick(self._full_table_name(source, 'links'))} + WHERE STARTS_WITH(datarepo_row_id, {prefix!r}) + ''' + rows = self._run_sql(query) return one(rows)['count'] - def _list_bundles(self, - source: TDRSourceRef, - prefix: str - ) -> list[TDRBundleFQID]: + def list_bundles(self, + source: TDRSourceRef, + prefix: str + ) -> list[TDRBundleFQID]: + self._assert_source(source) current_bundles = self._query_unique_sorted(f''' SELECT links_id, version FROM {backtick(self._full_table_name(source.spec, 'links'))} @@ -466,7 +470,8 @@ def _retrieve_entities(self, log.debug('Retrieved %i entities of type %r', len(rows), entity_type) missing = expected - {row[pk_column] for row in rows} require(not missing, - f'Required entities not found in {table_name}: {missing}') + f'Found only {len(rows)} out of {len(entity_ids)} expected rows in {table_name}. 
' + f'Missing entities: {missing}') return rows def _in(self, diff --git a/src/azul/service/avro_pfb.py b/src/azul/service/avro_pfb.py index d88be244de..eb1f3b7fbe 100644 --- a/src/azul/service/avro_pfb.py +++ b/src/azul/service/avro_pfb.py @@ -17,7 +17,6 @@ ClassVar, MutableSet, Self, - Sequence, ) from uuid import ( UUID, @@ -315,18 +314,15 @@ def pfb_schema_from_field_types(field_types: FieldTypes) -> JSON: return avro_pfb_schema(entity_schemas) -def pfb_schema_from_replicas(replicas: Iterable[JSON] - ) -> tuple[Sequence[str], JSON]: - schemas_by_replica_type = {} +def pfb_schema_from_replicas(replicas: Iterable[JSON]) -> list[JSON]: + schemas_by_replica_type: dict[str, MutableJSON] = {} for replica in replicas: replica_type, replica_contents = replica['replica_type'], replica['contents'] _update_replica_schema(schema=schemas_by_replica_type, path=(replica_type,), key=replica_type, value=replica_contents) - schemas_by_replica_type = sorted(schemas_by_replica_type.items()) - keys, values = zip(*schemas_by_replica_type) - return keys, avro_pfb_schema(values) + return list(schemas_by_replica_type.values()) def avro_pfb_schema(azul_avro_schema: Iterable[JSON]) -> JSON: diff --git a/src/azul/service/elasticsearch_service.py b/src/azul/service/elasticsearch_service.py index 2b6b81631c..177a16c87b 100644 --- a/src/azul/service/elasticsearch_service.py +++ b/src/azul/service/elasticsearch_service.py @@ -192,14 +192,18 @@ def process_response(self, response: Response) -> Response: @cached_property def prepared_filters(self) -> TranslatedFilters: - filters_json = self.filters.reify(self.plugin, limit_access=self._limit_access()) + limit_access = self.service.always_limit_access or self._limit_access + filters_json = self.filters.reify(self.plugin, limit_access=limit_access) return self._translate_filters(filters_json) + @property @abstractmethod def _limit_access(self) -> bool: """ Whether to enforce the managed access controls during filter - reification. + reification, provided that the service allows such conditional + enforcement of access. If it doesn't, the return value should be + ignored, and access must be enforced unconditionally. 
""" raise NotImplementedError diff --git a/src/azul/service/manifest_service.py b/src/azul/service/manifest_service.py index c82222fda2..f695a7659f 100644 --- a/src/azul/service/manifest_service.py +++ b/src/azul/service/manifest_service.py @@ -1985,7 +1985,7 @@ class VerbatimManifestGenerator(FileBasedManifestGenerator, metaclass=ABCMeta): @property def entity_type(self) -> str: - return 'files' + return self.implicit_hub_type if self.include_orphans else 'files' @property def included_fields(self) -> list[FieldPath]: @@ -2001,6 +2001,11 @@ def included_fields(self) -> list[FieldPath]: def implicit_hub_type(self) -> str: return self.service.metadata_plugin(self.catalog).implicit_hub_type + @property + def include_orphans(self) -> bool: + special_fields = self.service.metadata_plugin(self.catalog).special_fields + return self.filters.explicit.keys() == {special_fields.implicit_hub_id} + @attrs.frozen(kw_only=True) class ReplicaKeys: """ @@ -2019,8 +2024,11 @@ def _replica_keys(self) -> Iterable[ReplicaKeys]: hub_type = self.implicit_hub_type request = self._create_request() for hit in request.scan(): + replica_id = one(hit['contents'][hub_type])['document_id'] + if self.entity_type != hub_type: + replica_id = one(replica_id) yield self.ReplicaKeys(hub_id=hit['entity_id'], - replica_id=one(one(hit['contents'][hub_type])['document_id'])) + replica_id=replica_id) def _all_replicas(self) -> Iterable[JSON]: emitted_replica_ids = set() @@ -2037,14 +2045,7 @@ def _all_replicas(self) -> Iterable[JSON]: if replica_id not in emitted_replica_ids: num_new_replicas += 1 yield replica.to_dict() - # Note that this will be zero for replicas that use implicit - # hubs, in which case there are actually many hubs - explicit_hub_count = len(replica.hub_ids) - # We don't have to track the IDs of replicas with only one - # hub, since we know that there are no other hubs that could - # cause their re-emission. 
- if explicit_hub_count != 1: - emitted_replica_ids.add(replica_id) + emitted_replica_ids.add(replica_id) log.info('Found %d replicas (%d already emitted) from page of %d hubs', num_replicas, num_replicas - num_new_replicas, len(page)) @@ -2107,8 +2108,11 @@ def format(cls) -> ManifestFormat: def create_file(self) -> tuple[str, Optional[str]]: plugin = self.service.metadata_plugin(self.catalog) - replicas = self._all_replicas() - replicas, replica_types, pfb_schema = plugin.verbatim_pfb_schema(replicas) + replicas = list(self._all_replicas()) + replica_schemas = plugin.verbatim_pfb_schema(replicas) + replica_schemas.sort(key=itemgetter('name')) + replica_types = [s['name'] for s in replica_schemas] + pfb_schema = avro_pfb.avro_pfb_schema(replica_schemas) pfb_metadata_entity = avro_pfb.pfb_metadata_entity(replica_types, links=False) def pfb_entities(): diff --git a/src/azul/service/repository_service.py b/src/azul/service/repository_service.py index 0ea5889f3a..e3257278b6 100644 --- a/src/azul/service/repository_service.py +++ b/src/azul/service/repository_service.py @@ -374,3 +374,7 @@ def _hit_to_doc(hit: Hit) -> JSON: if file_version is not None: assert file_version == file['version'] return file + + @property + def always_limit_access(self) -> bool: + return False diff --git a/src/azul/terra.py b/src/azul/terra.py index a468c382e3..e5046850a7 100644 --- a/src/azul/terra.py +++ b/src/azul/terra.py @@ -39,10 +39,14 @@ bigquery, ) from google.cloud.bigquery import ( + DatasetReference, QueryJob, QueryJobConfig, QueryPriority, ) +from google.cloud.bigquery.table import ( + RowIterator, +) from more_itertools import ( one, ) @@ -504,13 +508,21 @@ def run_sql(self, query: str) -> BigQueryRows: else: assert False if log.isEnabledFor(logging.DEBUG): - log.debug('Job info: %s', json.dumps(self._job_info(job))) + log.debug('Job info: %s', json.dumps(self._job_info(job, result))) return result + def list_tables(self, source: TDRSourceSpec) -> set[str]: + bigquery = self._bigquery(self.credentials.project_id) + ref = DatasetReference(project=source.subdomain, dataset_id=source.name) + return { + table.to_api_repr()['tableReference']['tableId'] + for table in bigquery.list_tables(ref) + } + def _trunc_query(self, query: str) -> str: return trunc_ellipses(query, 2048) - def _job_info(self, job: QueryJob) -> JSON: + def _job_info(self, job: QueryJob, result: RowIterator) -> JSON: # noinspection PyProtectedMember stats = job._properties['statistics']['query'] if config.debug < 2: @@ -518,6 +530,7 @@ def _job_info(self, job: QueryJob) -> JSON: stats = {k: v for k, v in stats.items() if k not in ignore} return { 'job_id': job.job_id, + 'total_rows': result.total_rows, 'stats': stats, 'query': self._trunc_query(job.query) } diff --git a/src/azul/uuids.py b/src/azul/uuids.py index 194217e9e5..ad1017ad1d 100644 --- a/src/azul/uuids.py +++ b/src/azul/uuids.py @@ -124,7 +124,7 @@ def change_version(uuid: str, old_version: int, new_version: int) -> str: prefix, version, suffix = uuid[:14], uuid[14], uuid[15:] version = int(version, 16) assert version == old_version, (uuid, version, old_version) - uuid = prefix + hex(new_version)[2:] + suffix + uuid = f'{prefix}{new_version:x}{suffix}' assert UUID(uuid).version == new_version, (uuid, old_version) if new_version in (1, 3, 4, 5): validate_uuid(uuid) diff --git a/terraform/authentication.tf.json.template.py b/terraform/authentication.tf.json.template.py index 782c1581b9..cf291378b5 100644 --- a/terraform/authentication.tf.json.template.py +++ 
b/terraform/authentication.tf.json.template.py @@ -60,6 +60,7 @@ "title": f"azul_{config.deployment_stage}", "permissions": [ "bigquery.jobs.create", + "bigquery.tables.list", *[ f'bigquery.{resource}.{action}' for resource in ('capacityCommitments', 'reservations') diff --git a/test/indexer/data/2370f948-2783-aeb6-afea-e022897f4dcf.tdr.anvil.json b/test/indexer/data/2370f948-2783-aeb6-afea-e022897f4dcf.tdr.anvil.json index 7da9d5f3cd..9859b200f1 100644 --- a/test/indexer/data/2370f948-2783-aeb6-afea-e022897f4dcf.tdr.anvil.json +++ b/test/indexer/data/2370f948-2783-aeb6-afea-e022897f4dcf.tdr.anvil.json @@ -5,5 +5,30 @@ "version": "2022-06-01T00:00:00.000000Z" } }, - "links": [] + "links": [], + "orphans": { + "anvil_dataset/2370f948-2783-4eb6-afea-e022897f4dcf": { + "consent_group": [ + "DS-BDIS" + ], + "data_modality": [], + "data_use_permission": [ + "DS-BDIS" + ], + "datarepo_row_id": "2370f948-2783-4eb6-afea-e022897f4dcf", + "dataset_id": "52ee7665-7033-63f2-a8d9-ce8e32666739", + "owner": [ + "Debbie Nickerson" + ], + "principal_investigator": [], + "registered_identifier": [ + "phs000693" + ], + "source_datarepo_row_ids": [ + "workspace_attributes:7a22b629-9d81-4e4d-9297-f9e44ed760bc" + ], + "title": "ANVIL_CMG_UWASH_DS_BDIS", + "version": "2022-06-01T00:00:00.000000Z" + } + } } \ No newline at end of file diff --git a/test/indexer/data/6c87f0e1-509d-46a4-b845-7584df39263b.tables.tdr.json b/test/indexer/data/6c87f0e1-509d-46a4-b845-7584df39263b.tables.tdr.json index 422b461de6..8a119af5c6 100644 --- a/test/indexer/data/6c87f0e1-509d-46a4-b845-7584df39263b.tables.tdr.json +++ b/test/indexer/data/6c87f0e1-509d-46a4-b845-7584df39263b.tables.tdr.json @@ -3,8 +3,9 @@ "anvil_activity": { "rows": [ { - "activity_id": "", - "activity_type": "", + "activity_id": "95aa5a62-0c42-48c7-aa70-e9987cfd9824", + "activity_type": "other", + "datarepo_row_id": "85342e55-8897-481e-a2c3-b7344071a72c", "generated_file_id": [], "source_datarepo_row_ids": [], "used_biosample_id": [], @@ -15,9 +16,10 @@ "anvil_alignmentactivity": { "rows": [ { - "activity_type": "", - "alignmentactivity_id": "", + "activity_type": "alignment", + "alignmentactivity_id": "bb22e6de-7704-4435-8684-a34b1e4ff188", "data_modality": [], + "datarepo_row_id": "a6f3f829-dec6-4be8-9d53-2c963f9206fc", "generated_file_id": [], "reference_assembly": [], "source_datarepo_row_ids": [], @@ -28,11 +30,12 @@ "anvil_assayactivity": { "rows": [ { - "activity_type": "", + "activity_type": "assay", "antibody_id": [], "assay_type": "", - "assayactivity_id": "", + "assayactivity_id": "d9150ab0-8e4a-46ab-9501-4b91c6f548e1", "data_modality": [], + "datarepo_row_id": "9258bb9f-1e2b-4139-9226-3588154a4632", "generated_file_id": [], "source_datarepo_row_ids": [], "used_biosample_id": [] @@ -241,13 +244,30 @@ "anvil_variantcallingactivity": { "rows": [ { - "activity_type": "", + "activity_type": "variant calling", "data_modality": [], + "datarepo_row_id": "c826d5b0-5e86-432b-ab53-36eced42ece0", "generated_file_id": [], "reference_assembly": [], "source_datarepo_row_ids": [], "used_file_id": [], - "variantcallingactivity_id": "" + "variantcallingactivity_id": "8db89e5e-1c8a-487e-b0c1-b26218ac6b7b" + } + ] + }, + "non_schema_orphan_table": { + "rows": [ + { + "datarepo_row_id": "9687b86d-a2ae-a083-b910-a16bcbef1ba4", + "non_schema_column": "spam" + }, + { + "datarepo_row_id": "28ed0f3a-157b-417b-a05a-48f57f9d3a34", + "non_schema_column": "eggs" + }, + { + "datarepo_row_id": "9db5952c-c454-49d9-8a62-5abb026701c0", + "non_schema_column": "baked beans" } ] } diff 
--git a/test/indexer/data/826dea02-e274-affe-aabc-eb3db63ad068.results.json b/test/indexer/data/826dea02-e274-affe-aabc-eb3db63ad068.results.json index 5d3fd82432..2e09718c18 100644 --- a/test/indexer/data/826dea02-e274-affe-aabc-eb3db63ad068.results.json +++ b/test/indexer/data/826dea02-e274-affe-aabc-eb3db63ad068.results.json @@ -437,7 +437,8 @@ "entity_id": "1509ef40-d1ba-440d-b298-16b7c173dcd4", "replica_type": "anvil_sequencingactivity", "hub_ids": [ - "15b76f9c-6b46-433f-851d-34e89f1b9ba6" + "15b76f9c-6b46-433f-851d-34e89f1b9ba6", + "2370f948-2783-4eb6-afea-e022897f4dcf" ], "contents": { "activity_type": "Sequencing", @@ -871,7 +872,8 @@ "entity_id": "15b76f9c-6b46-433f-851d-34e89f1b9ba6", "replica_type": "anvil_file", "hub_ids": [ - "15b76f9c-6b46-433f-851d-34e89f1b9ba6" + "15b76f9c-6b46-433f-851d-34e89f1b9ba6", + "2370f948-2783-4eb6-afea-e022897f4dcf" ], "contents": { "data_modality": [], @@ -904,6 +906,7 @@ "replica_type": "anvil_diagnosis", "hub_ids": [ "15b76f9c-6b46-433f-851d-34e89f1b9ba6", + "2370f948-2783-4eb6-afea-e022897f4dcf", "3b17377b-16b1-431c-9967-e5d01fc5923f" ], "contents": { @@ -1477,7 +1480,9 @@ "_source": { "entity_id": "2370f948-2783-4eb6-afea-e022897f4dcf", "replica_type": "anvil_dataset", - "hub_ids": [], + "hub_ids": [ + "2370f948-2783-4eb6-afea-e022897f4dcf" + ], "contents": { "consent_group": [ "DS-BDIS" @@ -1748,6 +1753,7 @@ }, "replica_type": "anvil_file", "hub_ids": [ + "2370f948-2783-4eb6-afea-e022897f4dcf", "3b17377b-16b1-431c-9967-e5d01fc5923f" ] } @@ -2190,6 +2196,7 @@ "entity_id": "816e364e-1193-4e5b-a91a-14e4b009157c", "replica_type": "anvil_sequencingactivity", "hub_ids": [ + "2370f948-2783-4eb6-afea-e022897f4dcf", "3b17377b-16b1-431c-9967-e5d01fc5923f" ], "contents": { @@ -2939,6 +2946,7 @@ "replica_type": "anvil_biosample", "hub_ids": [ "15b76f9c-6b46-433f-851d-34e89f1b9ba6", + "2370f948-2783-4eb6-afea-e022897f4dcf", "3b17377b-16b1-431c-9967-e5d01fc5923f" ], "contents": { @@ -3519,6 +3527,7 @@ "replica_type": "anvil_diagnosis", "hub_ids": [ "15b76f9c-6b46-433f-851d-34e89f1b9ba6", + "2370f948-2783-4eb6-afea-e022897f4dcf", "3b17377b-16b1-431c-9967-e5d01fc5923f" ], "contents": { @@ -4107,6 +4116,7 @@ "replica_type": "anvil_donor", "hub_ids": [ "15b76f9c-6b46-433f-851d-34e89f1b9ba6", + "2370f948-2783-4eb6-afea-e022897f4dcf", "3b17377b-16b1-431c-9967-e5d01fc5923f" ] } diff --git a/test/indexer/data/826dea02-e274-affe-aabc-eb3db63ad068.tdr.anvil.json b/test/indexer/data/826dea02-e274-affe-aabc-eb3db63ad068.tdr.anvil.json index 9c461ee994..ec875d5fa8 100644 --- a/test/indexer/data/826dea02-e274-affe-aabc-eb3db63ad068.tdr.anvil.json +++ b/test/indexer/data/826dea02-e274-affe-aabc-eb3db63ad068.tdr.anvil.json @@ -231,5 +231,6 @@ "anvil_biosample/826dea02-e274-4ffe-aabc-eb3db63ad068" ] } - ] + ], + "orphans": {} } diff --git a/test/indexer/data/9a135c9a-069b-a90e-b588-eaf8d1aeeac9.tdr.anvil.json b/test/indexer/data/9a135c9a-069b-a90e-b588-eaf8d1aeeac9.tdr.anvil.json new file mode 100644 index 0000000000..a9ff2d7af3 --- /dev/null +++ b/test/indexer/data/9a135c9a-069b-a90e-b588-eaf8d1aeeac9.tdr.anvil.json @@ -0,0 +1,44 @@ +{ + "entities": {}, + "links": {}, + "orphans": { + "non_schema_orphan_table/9687b86d-a2ae-a083-b910-a16bcbef1ba4": { + "datarepo_row_id": "9687b86d-a2ae-a083-b910-a16bcbef1ba4", + "non_schema_column": "spam", + "version": "2022-06-01T00:00:00.000000Z" + }, + "non_schema_orphan_table/28ed0f3a-157b-417b-a05a-48f57f9d3a34": { + "datarepo_row_id": "28ed0f3a-157b-417b-a05a-48f57f9d3a34", + "non_schema_column": "eggs", + "version": 
"2022-06-01T00:00:00.000000Z" + }, + "non_schema_orphan_table/9db5952c-c454-49d9-8a62-5abb026701c0": { + "datarepo_row_id": "9db5952c-c454-49d9-8a62-5abb026701c0", + "non_schema_column": "baked beans", + "version": "2022-06-01T00:00:00.000000Z" + }, + "anvil_dataset/2370f948-2783-4eb6-afea-e022897f4dcf": { + "consent_group": [ + "DS-BDIS" + ], + "data_modality": [], + "data_use_permission": [ + "DS-BDIS" + ], + "datarepo_row_id": "2370f948-2783-4eb6-afea-e022897f4dcf", + "dataset_id": "52ee7665-7033-63f2-a8d9-ce8e32666739", + "owner": [ + "Debbie Nickerson" + ], + "principal_investigator": [], + "registered_identifier": [ + "phs000693" + ], + "source_datarepo_row_ids": [ + "workspace_attributes:7a22b629-9d81-4e4d-9297-f9e44ed760bc" + ], + "title": "ANVIL_CMG_UWASH_DS_BDIS", + "version": "2022-06-01T00:00:00.000000Z" + } + } +} \ No newline at end of file diff --git a/test/indexer/data/aaa96233-bf27-44c7-82df-b4dc15ad4d9d.2018-11-02T11:33:44.698028Z.results.json b/test/indexer/data/aaa96233-bf27-44c7-82df-b4dc15ad4d9d.2018-11-02T11:33:44.698028Z.results.json index d7b65274b4..24aab5fdc7 100644 --- a/test/indexer/data/aaa96233-bf27-44c7-82df-b4dc15ad4d9d.2018-11-02T11:33:44.698028Z.results.json +++ b/test/indexer/data/aaa96233-bf27-44c7-82df-b4dc15ad4d9d.2018-11-02T11:33:44.698028Z.results.json @@ -70,7 +70,8 @@ "entity_id": "aaa96233-bf27-44c7-82df-b4dc15ad4d9d", "hub_ids": [ "0c5ac7c0-817e-40d4-b1b1-34c3d5cfecdb", - "70d1af4a-82c8-478a-8960-e9028b3616ca" + "70d1af4a-82c8-478a-8960-e9028b3616ca", + "e8642221-4c2c-4fd7-b926-a68bce363c88" ], "replica_type": "links" } @@ -3485,7 +3486,8 @@ "entity_id": "0c5ac7c0-817e-40d4-b1b1-34c3d5cfecdb", "replica_type": "sequence_file", "hub_ids": [ - "0c5ac7c0-817e-40d4-b1b1-34c3d5cfecdb" + "0c5ac7c0-817e-40d4-b1b1-34c3d5cfecdb", + "e8642221-4c2c-4fd7-b926-a68bce363c88" ] }, "_type": "_doc" @@ -3524,7 +3526,8 @@ "replica_type": "cell_suspension", "hub_ids": [ "0c5ac7c0-817e-40d4-b1b1-34c3d5cfecdb", - "70d1af4a-82c8-478a-8960-e9028b3616ca" + "70d1af4a-82c8-478a-8960-e9028b3616ca", + "e8642221-4c2c-4fd7-b926-a68bce363c88" ] }, "_type": "_doc" @@ -3555,7 +3558,8 @@ "entity_id": "70d1af4a-82c8-478a-8960-e9028b3616ca", "replica_type": "sequence_file", "hub_ids": [ - "70d1af4a-82c8-478a-8960-e9028b3616ca" + "70d1af4a-82c8-478a-8960-e9028b3616ca", + "e8642221-4c2c-4fd7-b926-a68bce363c88" ] }, "_type": "_doc" @@ -3608,7 +3612,8 @@ "replica_type": "specimen_from_organism", "hub_ids": [ "0c5ac7c0-817e-40d4-b1b1-34c3d5cfecdb", - "70d1af4a-82c8-478a-8960-e9028b3616ca" + "70d1af4a-82c8-478a-8960-e9028b3616ca", + "e8642221-4c2c-4fd7-b926-a68bce363c88" ] }, "_type": "_doc" @@ -3688,7 +3693,9 @@ }, "entity_id": "e8642221-4c2c-4fd7-b926-a68bce363c88", "replica_type": "project", - "hub_ids": [] + "hub_ids": [ + "e8642221-4c2c-4fd7-b926-a68bce363c88" + ] }, "_type": "_doc" }, @@ -3750,7 +3757,8 @@ "entity_id": "7b07b9d0-cc0e-4098-9f64-f4a569f7d746", "hub_ids": [ "0c5ac7c0-817e-40d4-b1b1-34c3d5cfecdb", - "70d1af4a-82c8-478a-8960-e9028b3616ca" + "70d1af4a-82c8-478a-8960-e9028b3616ca", + "e8642221-4c2c-4fd7-b926-a68bce363c88" ], "replica_type": "donor_organism" }, @@ -3793,7 +3801,8 @@ "entity_id": "9c32cf70-3ed7-4720-badc-5ee71e8a38af", "hub_ids": [ "0c5ac7c0-817e-40d4-b1b1-34c3d5cfecdb", - "70d1af4a-82c8-478a-8960-e9028b3616ca" + "70d1af4a-82c8-478a-8960-e9028b3616ca", + "e8642221-4c2c-4fd7-b926-a68bce363c88" ], "replica_type": "library_preparation_protocol" }, @@ -3830,7 +3839,8 @@ "entity_id": "61e629ed-0135-4492-ac8a-5c4ab3ccca8a", "hub_ids": [ 
"0c5ac7c0-817e-40d4-b1b1-34c3d5cfecdb", - "70d1af4a-82c8-478a-8960-e9028b3616ca" + "70d1af4a-82c8-478a-8960-e9028b3616ca", + "e8642221-4c2c-4fd7-b926-a68bce363c88" ], "replica_type": "sequencing_protocol" }, @@ -3856,7 +3866,8 @@ "entity_id": "771ddaf6-3a4f-4314-97fe-6294ff8e25a4", "hub_ids": [ "0c5ac7c0-817e-40d4-b1b1-34c3d5cfecdb", - "70d1af4a-82c8-478a-8960-e9028b3616ca" + "70d1af4a-82c8-478a-8960-e9028b3616ca", + "e8642221-4c2c-4fd7-b926-a68bce363c88" ], "replica_type": "process" }, diff --git a/test/indexer/data/6b0f6c0f-5d80-a242-accb-840921351cd5.tdr.anvil.json b/test/indexer/data/c67e7adb-1a9c-a3b9-bc91-eb10a428374a.tdr.anvil.json similarity index 51% rename from test/indexer/data/6b0f6c0f-5d80-a242-accb-840921351cd5.tdr.anvil.json rename to test/indexer/data/c67e7adb-1a9c-a3b9-bc91-eb10a428374a.tdr.anvil.json index e689fbe3f7..dd3946fbcc 100644 --- a/test/indexer/data/6b0f6c0f-5d80-a242-accb-840921351cd5.tdr.anvil.json +++ b/test/indexer/data/c67e7adb-1a9c-a3b9-bc91-eb10a428374a.tdr.anvil.json @@ -27,8 +27,8 @@ "data_modality": [], "datarepo_row_id": "6b0f6c0f-5d80-4242-accb-840921351cd5", "file_format": ".txt", - "file_id": "1fab11f5-7eab-4318-9a58-68d8d06e0715", "file_md5sum": "S/GBrRjzZAQYqh3rdiPYzA==", + "file_id": "1fab11f5-7eab-4318-9a58-68d8d06e0715", "file_name": "CCDG_13607_B01_GRM_WGS_2019-02-19_chr15.recalibrated_variants.annotated.coding.txt", "file_ref": "drs://mock_tdr.lan/v1_6c87f0e1-509d-46a4-b845-7584df39263b_1fab11f5-7eab-4318-9a58-68d8d06e0715", "file_size": 15079345, @@ -53,5 +53,45 @@ "anvil_file/6b0f6c0f-5d80-4242-accb-840921351cd5" ] } - ] + ], + "orphans": { + "anvil_file/15b76f9c-6b46-433f-851d-34e89f1b9ba6": { + "data_modality": [], + "datarepo_row_id": "15b76f9c-6b46-433f-851d-34e89f1b9ba6", + "file_format": ".vcf.gz", + "file_id": "1e269f04-4347-4188-b060-1dcc69e71d67", + "file_md5sum": "vuxgbuCqKZ/fkT9CWTFmIg==", + "file_name": "307500.merged.matefixed.sorted.markeddups.recal.g.vcf.gz", + "file_ref": "drs://mock_tdr.lan/v1_6c87f0e1-509d-46a4-b845-7584df39263b_1e269f04-4347-4188-b060-1dcc69e71d67", + "file_size": 213021639, + "is_supplementary": false, + "reference_assembly": [], + "source_datarepo_row_ids": [ + "file_inventory:81d16471-97ac-48fe-99a0-73d9ec62c2c0" + ], + "version": "2022-06-01T00:00:00.000000Z", + "drs_uri": "drs://mock_tdr.lan/v1_6c87f0e1-509d-46a4-b845-7584df39263b_1e269f04-4347-4188-b060-1dcc69e71d67", + "sha256": "", + "crc32": "" + }, + "anvil_file/3b17377b-16b1-431c-9967-e5d01fc5923f": { + "data_modality": [], + "datarepo_row_id": "3b17377b-16b1-431c-9967-e5d01fc5923f", + "file_format": ".bam", + "file_id": "8b722e88-8103-49c1-b351-e64fa7c6ab37", + "file_md5sum": "fNn9e1SovzgOROk3BvH6LQ==", + "file_name": "307500.merged.matefixed.sorted.markeddups.recal.bam", + "file_ref": "drs://mock_tdr.lan/v1_6c87f0e1-509d-46a4-b845-7584df39263b_8b722e88-8103-49c1-b351-e64fa7c6ab37", + "file_size": 3306845592, + "is_supplementary": false, + "reference_assembly": [], + "source_datarepo_row_ids": [ + "file_inventory:9658d94a-511d-4b49-82c3-d0cb07e0cff2" + ], + "version": "2022-06-01T00:00:00.000000Z", + "drs_uri": "drs://mock_tdr.lan/v1_6c87f0e1-509d-46a4-b845-7584df39263b_8b722e88-8103-49c1-b351-e64fa7c6ab37", + "sha256": "", + "crc32": "" + } + } } \ No newline at end of file diff --git a/test/indexer/test_anvil.py b/test/indexer/test_anvil.py index 263f833d1f..1cbc52ba0e 100644 --- a/test/indexer/test_anvil.py +++ b/test/indexer/test_anvil.py @@ -105,13 +105,14 @@ def bundle_fqid(cls, *, uuid, version=None, - table_name=BundleType.primary + 
table_name=BundleType.primary.value, ) -> TDRAnvilBundleFQID: assert version is None, 'All AnVIL bundles should use the same version' return TDRAnvilBundleFQID(source=cls.source, uuid=uuid, version=cls.version, - table_name=table_name) + table_name=table_name, + batch_prefix='' if BundleType.is_batched(table_name) else None) @classmethod def primary_bundle(cls) -> TDRAnvilBundleFQID: @@ -119,13 +120,18 @@ def primary_bundle(cls) -> TDRAnvilBundleFQID: @classmethod def supplementary_bundle(cls) -> TDRAnvilBundleFQID: - return cls.bundle_fqid(uuid='6b0f6c0f-5d80-a242-accb-840921351cd5', - table_name=BundleType.supplementary) + return cls.bundle_fqid(uuid='c67e7adb-1a9c-a3b9-bc91-eb10a428374a', + table_name=BundleType.supplementary.value) @classmethod def duos_bundle(cls) -> TDRAnvilBundleFQID: return cls.bundle_fqid(uuid='2370f948-2783-aeb6-afea-e022897f4dcf', - table_name=BundleType.duos) + table_name=BundleType.duos.value) + + @classmethod + def replica_bundle(cls) -> TDRAnvilBundleFQID: + return cls.bundle_fqid(uuid='9a135c9a-069b-a90e-b588-eaf8d1aeeac9', + table_name='non_schema_orphan_table') class TestAnvilIndexer(AnvilIndexerTestCase, @@ -166,15 +172,33 @@ def test_indexing(self): def test_list_and_fetch_bundles(self): source_ref = self.source self._make_mock_tdr_tables(source_ref) - expected_bundle_fqids = sorted([ + canned_bundle_fqids = [ self.primary_bundle(), self.supplementary_bundle(), - self.duos_bundle() + self.duos_bundle(), + self.replica_bundle(), + ] + expected_bundle_fqids = sorted(canned_bundle_fqids + [ + # Replica bundles for the AnVIL schema tables, which we don't can + self.bundle_fqid(uuid='cd512d30-7d5f-af35-88f0-efaadabfc17a', + table_name='anvil_activity'), + self.bundle_fqid(uuid='2fd53d37-b8a3-a797-8477-4056bf2473bb', + table_name='anvil_alignmentactivity'), + self.bundle_fqid(uuid='a52d7f0a-25c3-a4cf-af95-0fd8ecec8127', + table_name='anvil_assayactivity'), + self.bundle_fqid(uuid='45e532bb-5af7-a977-8939-24933c154380', + table_name='anvil_diagnosis'), + self.bundle_fqid(uuid='8cc4686a-76cd-a411-a11c-ec3a9fa1d417', + table_name='anvil_donor'), + self.bundle_fqid(uuid='88092968-d1ec-ad7e-9112-4b38bd0df4c2', + table_name='anvil_sequencingactivity'), + self.bundle_fqid(uuid='976fc781-4a33-a062-a67d-2c068e4d5e72', + table_name='anvil_variantcallingactivity') ]) plugin = self.plugin_for_source_spec(source_ref.spec) bundle_fqids = sorted(plugin.list_bundles(source_ref, '')) self.assertEqual(expected_bundle_fqids, bundle_fqids) - for bundle_fqid in bundle_fqids: + for bundle_fqid in canned_bundle_fqids: with self.subTest(bundle_fqid=bundle_fqid): canned_bundle = self._load_canned_bundle(bundle_fqid) assert isinstance(canned_bundle, TDRAnvilBundle) @@ -183,6 +207,7 @@ def test_list_and_fetch_bundles(self): self.assertEqual(canned_bundle.fqid, bundle.fqid) self.assertEqual(canned_bundle.entities, bundle.entities) self.assertEqual(canned_bundle.links, bundle.links) + self.assertEqual(canned_bundle.orphans, bundle.orphans) class TestAnvilIndexerWithIndexesSetUp(AnvilIndexerTestCase): @@ -233,7 +258,26 @@ def test_dataset_description(self): self.assertDictEqual(doc_counts, { DocumentType.aggregate: 1, DocumentType.contribution: 2, - # No replica is emitted for the primary dataset because we dropped - # the files (hubs) from its bundle above - **({DocumentType.replica: 1} if config.enable_replicas else {}) + **({DocumentType.replica: 2} if config.enable_replicas else {}) }) + + def test_orphans(self): + bundle = self._index_canned_bundle(self.replica_bundle()) + assert 
isinstance(bundle, TDRAnvilBundle) + dataset_entity_id = one( + ref.entity_id + for ref in bundle.orphans + if ref.entity_type == 'anvil_dataset' + ) + expected = bundle.orphans if config.enable_replicas else {} + actual = {} + hits = self._get_all_hits() + for hit in hits: + qualifier, doc_type = self._parse_index_name(hit) + self.assertEqual(DocumentType.replica, doc_type) + source = hit['_source'] + self.assertEqual(source['hub_ids'], [dataset_entity_id]) + ref = EntityReference(entity_type=source['replica_type'], + entity_id=source['entity_id']) + actual[ref] = source['contents'] + self.assertEqual(expected, actual) diff --git a/test/indexer/test_indexer.py b/test/indexer/test_indexer.py index 2fbf8206ba..3b24b211cf 100644 --- a/test/indexer/test_indexer.py +++ b/test/indexer/test_indexer.py @@ -288,7 +288,7 @@ def test_deletion(self): self.assertNotEqual(doc.contents, {}) elif doc_type is DocumentType.contribution: doc = Contribution.from_index(field_types, hit) - self.assertEqual(bundle_fqid.upcast(), doc.coordinates.bundle) + self.assertEqual(bundle_fqid, doc.coordinates.bundle) self.assertFalse(doc.coordinates.deleted) elif doc_type is DocumentType.replica: pass @@ -315,7 +315,7 @@ def test_deletion(self): if doc_type is DocumentType.contribution: doc = Contribution.from_index(field_types, hit) docs_by_entity[doc.entity].append(doc) - self.assertEqual(bundle_fqid.upcast(), doc.coordinates.bundle) + self.assertEqual(bundle_fqid, doc.coordinates.bundle) else: # Since there is only one bundle and it was deleted, # nothing should be aggregated @@ -388,7 +388,7 @@ def test_duplicate_notification(self): coordinates = [ ContributionCoordinates( entity=entity, - bundle=bundle.fqid.upcast(), + bundle=bundle.fqid, deleted=False ).with_catalog(self.catalog) ] @@ -553,7 +553,7 @@ def test_multi_entity_contributing_bundles(self): entity_id=old_file_uuid, entity_type='files') deletion = ContributionCoordinates(entity=entity, - bundle=bundle_fqid.upcast(), + bundle=bundle_fqid, deleted=True) index_name, document_id = deletion.index_name, deletion.document_id hits = [ diff --git a/test/indexer/test_indexer_controller.py b/test/indexer/test_indexer_controller.py index 0aea488842..7cd3d0aca1 100644 --- a/test/indexer/test_indexer_controller.py +++ b/test/indexer/test_indexer_controller.py @@ -232,7 +232,7 @@ def test_contribute_and_aggregate(self): notified_fqids = list(map(self._fqid_from_notification, notifications)) notified_bundles = [bundles[fqid] for fqid in notified_fqids] mock_plugin.fetch_bundle.side_effect = notified_bundles - mock_plugin.resolve_bundle.side_effect = DSSBundleFQID.from_json + mock_plugin.bundle_fqid_from_json.side_effect = DSSBundleFQID.from_json mock_plugin.sources = [source] with patch.object(IndexService, 'repository_plugin', return_value=mock_plugin): with patch.object(BundlePartition, 'max_partition_size', 4): @@ -241,7 +241,7 @@ def test_contribute_and_aggregate(self): # Assert plugin calls by controller expected_calls = [call(fqid.to_json()) for fqid in notified_fqids] - self.assertEqual(expected_calls, mock_plugin.resolve_bundle.mock_calls) + self.assertEqual(expected_calls, mock_plugin.bundle_fqid_from_json.mock_calls) expected_calls = list(map(call, notified_fqids)) self.assertEqual(expected_calls, mock_plugin.fetch_bundle.mock_calls) diff --git a/test/indexer/test_projects.py b/test/indexer/test_projects.py index c96cb4e490..d76dc52980 100644 --- a/test/indexer/test_projects.py +++ b/test/indexer/test_projects.py @@ -89,7 +89,7 @@ def 
test_no_duplicate_files_in_specimen(self): coordinates = AggregateCoordinates(entity=entity) else: coordinates = ContributionCoordinates(entity=entity, - bundle=bundle_fqid.upcast(), + bundle=bundle_fqid, deleted=False) result = self.es_client.get(index=coordinates.index_name, id=coordinates.document_id) diff --git a/test/indexer/test_tdr.py b/test/indexer/test_tdr.py index 62aab9dfe5..3e59985cbb 100644 --- a/test/indexer/test_tdr.py +++ b/test/indexer/test_tdr.py @@ -197,8 +197,7 @@ def setUpClass(cls): '--dataset=' + cls.source.spec.name ]) - def _make_mock_tdr_tables(self, - source: TDRSourceRef) -> None: + def _make_mock_tdr_tables(self, source: TDRSourceRef) -> JSON: tables = self._load_canned_file_version(uuid=source.id, version=None, extension='tables.tdr')['tables'] @@ -206,6 +205,7 @@ def _make_mock_tdr_tables(self, self._make_mock_entity_table(source.spec, table_name, table_rows['rows']) + return tables def _make_mock_entity_table(self, source: TDRSourceSpec, diff --git a/test/integration_test.py b/test/integration_test.py index 3d3b635abd..c9a79f0a4e 100644 --- a/test/integration_test.py +++ b/test/integration_test.py @@ -3,10 +3,8 @@ ) from collections.abc import ( Iterable, - Iterator, Mapping, Sequence, - Set, ) from concurrent.futures.thread import ( ThreadPoolExecutor, @@ -20,10 +18,8 @@ BytesIO, TextIOWrapper, ) -import itertools from itertools import ( count, - starmap, ) import json import os @@ -43,7 +39,6 @@ Callable, ContextManager, IO, - Optional, Protocol, TypedDict, cast, @@ -80,6 +75,7 @@ first, grouper, one, + only, ) from openapi_spec_validator import ( validate_spec, @@ -107,6 +103,9 @@ from azul.chalice import ( AzulChaliceApp, ) +from azul.collections import ( + alist, +) from azul.drs import ( AccessMethod, ) @@ -117,10 +116,11 @@ http_client, ) from azul.indexer import ( + Prefix, SourceJSON, SourceRef, + SourceSpec, SourcedBundleFQID, - SourcedBundleFQIDJSON, ) from azul.indexer.document import ( EntityReference, @@ -154,7 +154,6 @@ from azul.plugins.repository.tdr_anvil import ( BundleType, TDRAnvilBundleFQID, - TDRAnvilBundleFQIDJSON, ) from azul.portal_service import ( PortalService, @@ -287,73 +286,34 @@ def managed_access_sources_by_catalog(self managed_access_sources[catalog].add(ref) return managed_access_sources - def _list_partitions(self, - catalog: CatalogName, - *, - min_bundles: int, - public_1st: bool - ) -> Iterator[tuple[SourceRef, str, list[SourcedBundleFQID]]]: - """ - Iterate through the sources in the given catalog and yield partitions of - bundle FQIDs until a desired minimum number of bundles are found. For - each emitted source, every partition is included, even if it's empty. 
- """ - total_bundles = 0 - sources = sorted(config.sources(catalog)) - self.random.shuffle(sources) - if public_1st: - managed_access_sources = frozenset( + def _choose_source(self, + catalog: CatalogName, + *, + public: bool | None = None + ) -> SourceRef | None: + plugin = self.repository_plugin(catalog) + sources = set(config.sources(catalog)) + if public is not None: + ma_sources = { str(source.spec) + # This would raise a KeyError during the can bundle script test + # due to it using a mock catalog, so we only evaluate it when + # it's actually needed for source in self.managed_access_sources_by_catalog[catalog] - ) - index = first( - i - for i, source in enumerate(sources) - if source not in managed_access_sources - ) - sources[0], sources[index] = sources[index], sources[0] - plugin = self.azul_client.repository_plugin(catalog) - # This iteration prefers sources occurring first, so we shuffle them - # above to neutralize the bias. - for source in sources: - source = plugin.resolve_source(source) - source = plugin.partition_source(catalog, source) - for prefix in source.spec.prefix.partition_prefixes(): - new_fqids = self.azul_client.list_bundles(catalog, source, prefix) - total_bundles += len(new_fqids) - yield source, prefix, new_fqids - # We postpone this check until after we've yielded all partitions in - # the current source to ensure test coverage for handling multiple - # partitions per source - if total_bundles >= min_bundles: - break + } + self.assertIsSubset(ma_sources, sources) + if public is True: + sources -= ma_sources + elif public is False: + sources &= ma_sources + else: + assert False, public + if len(sources) == 0: + assert public is False, 'Every catalog should contain a public source' + return None else: - log.warning('Checked all sources and found only %d bundles instead of the ' - 'expected minimum %d', total_bundles, min_bundles) - - def _list_managed_access_bundles(self, - catalog: CatalogName - ) -> Iterator[tuple[SourceRef, str, list[SourcedBundleFQID]]]: - sources = self.azul_client.catalog_sources(catalog) - # We need at least one managed_access bundle per IT. To index them with - # remote_reindex and avoid collateral bundles, we use as specific a - # prefix as possible. 
- for source in self.managed_access_sources_by_catalog[catalog]: - assert str(source.spec) in sources - source = self.repository_plugin(catalog).partition_source(catalog, source) - bundle_fqids = sorted( - bundle_fqid - for bundle_fqid in self.azul_client.list_bundles(catalog, source, prefix='') - if not ( - # DUOS bundles are too sparse to fulfill the managed access tests - config.is_anvil_enabled(catalog) - and cast(TDRAnvilBundleFQID, bundle_fqid).table_name is BundleType.duos - ) - ) - bundle_fqid = self.random.choice(bundle_fqids) - prefix = bundle_fqid.uuid[:8] - new_fqids = self.azul_client.list_bundles(catalog, source, prefix) - yield source, prefix, new_fqids + source = self.random.choice(sorted(sources)) + return plugin.resolve_source(source) class IndexingIntegrationTest(IntegrationTestCase, AlwaysTearDownTestCase): @@ -429,6 +389,8 @@ class Catalog: name: CatalogName bundles: set[SourcedBundleFQID] notifications: list[JSON] + public_source: SourceRef + ma_source: SourceRef | None def _wait_for_indexer(): self.azul_client.wait_for_indexer() @@ -445,12 +407,23 @@ def _wait_for_indexer(): catalogs: list[Catalog] = [] for catalog in config.integration_test_catalogs: if index: - notifications, fqids = self._prepare_notifications(catalog) + public_source = self._choose_source(catalog, public=True) + ma_source = self._choose_source(catalog, public=False) + notifications, fqids = self._prepare_notifications(catalog, + sources=alist(public_source, ma_source)) else: - notifications, fqids = [], set() + with self._service_account_credentials: + fqids = self._get_indexed_bundles(catalog) + indexed_sources = {fqid.source for fqid in fqids} + ma_sources = self.managed_access_sources_by_catalog[catalog] + public_source = one(s for s in indexed_sources if s not in ma_sources) + ma_source = only(s for s in indexed_sources if s in ma_sources) + notifications = [] catalogs.append(Catalog(name=catalog, bundles=fqids, - notifications=notifications)) + notifications=notifications, + public_source=public_source, + ma_source=ma_source)) if index: for catalog in catalogs: @@ -466,12 +439,9 @@ def _wait_for_indexer(): self._test_manifest_tagging_race(catalog.name) self._test_dos_and_drs(catalog.name) self._test_repository_files(catalog.name) - if index: - bundle_fqids = catalog.bundles - else: - with self._service_account_credentials: - bundle_fqids = self._get_indexed_bundles(catalog.name) - self._test_managed_access(catalog=catalog.name, bundle_fqids=bundle_fqids) + self._test_managed_access(catalog=catalog.name, + public_source=catalog.public_source, + ma_source=catalog.ma_source) if index and delete: # FIXME: Test delete notifications @@ -701,7 +671,7 @@ def _get_one_outer_file(self, catalog: CatalogName) -> JSON: self.fail('No files found') return one(hits) - def _source_spec(self, catalog: CatalogName, entity: JSON) -> TDRSourceSpec: + def _source_spec(self, catalog: CatalogName, entity: JSON) -> SourceSpec: if config.is_hca_enabled(catalog): field = 'sourceSpec' elif config.is_anvil_enabled(catalog): @@ -755,9 +725,10 @@ def _uuid_column_name(self, catalog: CatalogName) -> str: def _test_dos_and_drs(self, catalog: CatalogName): if config.is_dss_enabled(catalog) and config.dss_direct_access: - _, file = self._get_one_inner_file(catalog) - self._test_dos(catalog, file) - self._test_drs(catalog, file) + outer_file, inner_file = self._get_one_inner_file(catalog) + source = self._source_spec(catalog, outer_file) + self._test_dos(catalog, inner_file) + self._test_drs(catalog, source, inner_file) 
@property def _service_account_credentials(self) -> ContextManager: @@ -802,8 +773,8 @@ def _check_endpoint(self, method: str, path: str, *, - args: Optional[Mapping[str, Any]] = None, - endpoint: Optional[furl] = None, + args: Mapping[str, Any] | None = None, + endpoint: furl | None = None, fetch: bool = False ) -> bytes: if endpoint is None: @@ -1140,13 +1111,14 @@ def _validate_file_content(self, content: ReadableFileObject, file: FileInnerEnt def _validate_file_response(self, response: urllib3.HTTPResponse, - source: TDRSourceSpec, + source: SourceSpec, file: FileInnerEntity): """ Note: The response object must have been obtained with stream=True """ try: - if source.name == 'ANVIL_1000G_2019_Dev_20230609_ANV5_202306121732': + special = 'ANVIL_1000G_2019_Dev_20230609_ANV5_202306121732' + if isinstance(source, TDRSourceSpec) and source.name == special: # All files in this snapshot were truncated to zero bytes by the # Broad to save costs. The metadata is not a reliable indication # of these files' actual size. @@ -1156,7 +1128,11 @@ def _validate_file_response(self, finally: response.close() - def _test_drs(self, catalog: CatalogName, file: FileInnerEntity): + def _test_drs(self, + catalog: CatalogName, + source: SourceSpec, + file: FileInnerEntity + ) -> None: repository_plugin = self.azul_client.repository_plugin(catalog) drs = repository_plugin.drs_client() for access_method in AccessMethod: @@ -1167,7 +1143,7 @@ def _test_drs(self, catalog: CatalogName, file: FileInnerEntity): self.assertIsNone(access.headers) if access.method is AccessMethod.https: response = self._get_url(GET, furl(access.url), stream=True) - self._validate_file_response(response, file) + self._validate_file_response(response, source, file) elif access.method is AccessMethod.gs: content = self._get_gs_url_content(furl(access.url), size=self.num_fastq_bytes) self._validate_file_content(content, file) @@ -1202,7 +1178,7 @@ def _test_dos(self, catalog: CatalogName, file: FileInnerEntity): def _get_gs_url_content(self, url: furl, - size: Optional[int] = None + size: int | None = None ) -> BytesIO: self.assertEquals('gs', url.scheme) path = os.environ['GOOGLE_APPLICATION_CREDENTIALS'] @@ -1223,33 +1199,21 @@ def _validate_fastq_content(self, content: ReadableFileObject): self.assertTrue(lines[2].startswith(b'+')) def _prepare_notifications(self, - catalog: CatalogName + catalog: CatalogName, + sources: Iterable[SourceRef] ) -> tuple[JSONs, set[SourcedBundleFQID]]: - bundle_fqids: set[SourcedBundleFQID] = set() + plugin = self.repository_plugin(catalog) + bundle_fqids = set() notifications = [] - - def update(source: SourceRef, - prefix: str, - partition_bundle_fqids: Iterable[SourcedBundleFQID]): - bundle_fqids.update(partition_bundle_fqids) - notifications.append(self.azul_client.reindex_message(catalog, - source, - prefix)) - - list(starmap(update, self._list_managed_access_bundles(catalog))) - num_bundles = max(self.min_bundles - len(bundle_fqids), 1) - log.info('Selected %d bundles to satisfy managed access coverage; ' - 'selecting at least %d more', len(bundle_fqids), num_bundles) - # _list_partitions selects both public and managed access sources at random. - # If we don't index at least one public source, every request would need - # service account credentials and we couldn't compare the responses for - # public and managed access data. `public_1st` ensures that at least - # one of the sources will be public because sources are indexed starting - # with the first one yielded by the iteration. 
- list(starmap(update, self._list_partitions(catalog, - min_bundles=num_bundles, - public_1st=True))) - + for source in sources: + source = plugin.partition_source(catalog, source) + # Some partitions may be empty, but we include them anyway to + # ensure test coverage for handling multiple partitions per source + for partition_prefix in source.spec.prefix.partition_prefixes(): + bundle_fqids.update(self.azul_client.list_bundles(catalog, source, partition_prefix)) + notifications.append(self.azul_client.reindex_message(catalog, + source, + partition_prefix)) # Index some bundles again to test that we handle duplicate additions. # Note: random.choices() may pick the same element multiple times so # some notifications may end up being sent three or more times. @@ -1263,7 +1227,7 @@ def update(source: SourceRef, def _get_indexed_bundles(self, catalog: CatalogName, - filters: Optional[JSON] = None + filters: JSON | None = None ) -> set[SourcedBundleFQID]: indexed_fqids = set() hits = self._get_entities(catalog, 'bundles', filters) @@ -1272,36 +1236,36 @@ def _get_indexed_bundles(self, source, bundle = one(hit['sources']), one(hit['bundles']) source = SourceJSON(id=source[special_fields.source_id], spec=source[special_fields.source_spec]) - bundle_fqid = SourcedBundleFQIDJSON(uuid=bundle[special_fields.bundle_uuid], - version=bundle[special_fields.bundle_version], - source=source) - if config.is_anvil_enabled(catalog): - # Every primary bundle contains 1 or more biosamples, 1 dataset, - # and 0 or more other entities. Biosamples only occur in primary - # bundles. - if len(hit['biosamples']) > 0: - table_name = BundleType.primary - # Supplementary bundles contain only 1 file and 1 dataset. - elif len(hit['files']) > 0: - table_name = BundleType.supplementary - # DUOS bundles contain only 1 dataset. 
- elif len(hit['datasets']) > 0: - table_name = BundleType.duos - else: - assert False, hit - bundle_fqid = cast(TDRAnvilBundleFQIDJSON, bundle_fqid) - bundle_fqid['table_name'] = table_name.value - bundle_fqid = self.repository_plugin(catalog).resolve_bundle(bundle_fqid) + source = self.repository_plugin(catalog).source_from_json(source) + bundle_fqid = SourcedBundleFQID(uuid=bundle[special_fields.bundle_uuid], + version=bundle[special_fields.bundle_version], + source=source) indexed_fqids.add(bundle_fqid) return indexed_fqids def _assert_catalog_complete(self, catalog: CatalogName, - bundle_fqids: Set[SourcedBundleFQID] + bundle_fqids: set[SourcedBundleFQID] ) -> None: with self.subTest('catalog_complete', catalog=catalog): expected_fqids = bundle_fqids - if not config.is_anvil_enabled(catalog): + if config.is_anvil_enabled(catalog): + # Replica bundles do not add contributions to the index and + # therefore do not appear anywhere in the service response + # FIXME: Integration test does not assert that replica bundles are indexed + # https://github.com/DataBiosphere/azul/issues/6647 + replica_fqids = { + bundle_fqid + for bundle_fqid in expected_fqids + if cast(TDRAnvilBundleFQID, bundle_fqid).table_name not in ( + BundleType.primary.value, + BundleType.supplementary.value, + BundleType.duos.value, + ) + } + expected_fqids -= replica_fqids + log.info('Ignoring replica bundles %r', replica_fqids) + else: expected_fqids = set(self.azul_client.filter_obsolete_bundle_versions(expected_fqids)) obsolete_fqids = bundle_fqids - expected_fqids if obsolete_fqids: @@ -1356,7 +1320,7 @@ def _assert_catalog_empty(self, catalog: CatalogName): def _get_entities(self, catalog: CatalogName, entity_type: EntityType, - filters: Optional[JSON] = None + filters: JSON | None = None ) -> MutableJSONs: entities = [] size = 100 @@ -1388,40 +1352,34 @@ def _assert_indices_exist(self, catalog: CatalogName): def _test_managed_access(self, catalog: CatalogName, - bundle_fqids: Set[SourcedBundleFQID] + public_source: SourceRef, + ma_source: SourceRef | None, ) -> None: with self.subTest('managed_access', catalog=catalog): - indexed_source_ids = {fqid.source.id for fqid in bundle_fqids} - managed_access_sources = self.managed_access_sources_by_catalog[catalog] - managed_access_source_ids = {source.id for source in managed_access_sources} - self.assertIsSubset(managed_access_source_ids, indexed_source_ids) - - if not managed_access_sources: + if ma_source is None: if config.deployment_stage in ('dev', 'sandbox'): # There should always be at least one managed-access source # indexed and tested on the default catalog for these deployments self.assertNotEqual(catalog, config.it_catalog_for(config.default_catalog)) self.skipTest(f'No managed access sources found in catalog {catalog!r}') - with self.subTest('managed_access_indices', catalog=catalog): - self._test_managed_access_indices(catalog, managed_access_source_ids) + self._test_managed_access_indices(catalog, public_source, ma_source) with self.subTest('managed_access_repository_files', catalog=catalog): - files = self._test_managed_access_repository_files(catalog, managed_access_source_ids) + files = self._test_managed_access_repository_files(catalog, ma_source) with self.subTest('managed_access_summary', catalog=catalog): self._test_managed_access_summary(catalog, files) with self.subTest('managed_access_repository_sources', catalog=catalog): - public_source_ids = self._test_managed_access_repository_sources(catalog, - indexed_source_ids, - 
managed_access_source_ids)
-            with self.subTest('managed_access_manifest', catalog=catalog):
-                source_id = self.random.choice(sorted(public_source_ids & indexed_source_ids))
-                self._test_managed_access_manifest(catalog, files, source_id)
+                self._test_managed_access_repository_sources(catalog,
+                                                             public_source,
+                                                             ma_source)
+            with self.subTest('managed_access_manifest', catalog=catalog):
+                self._test_managed_access_manifest(catalog, files, public_source)

     def _test_managed_access_repository_sources(self,
                                                  catalog: CatalogName,
-                                                 indexed_source_ids: Set[str],
-                                                 managed_access_source_ids: Set[str]
-                                                 ) -> set[str]:
+                                                 public_source: SourceRef,
+                                                 ma_source: SourceRef
+                                                 ) -> None:
         """
         Test the managed access controls for the /repository/sources endpoint
         :return: the set of public sources
@@ -1434,11 +1392,14 @@ def list_source_ids() -> set[str]:
             return {source['sourceId'] for source in cast(JSONs, response['sources'])}

         with self._service_account_credentials:
-            self.assertIsSubset(indexed_source_ids, list_source_ids())
+            self.assertIsSubset({public_source.id, ma_source.id}, list_source_ids())
         with self._public_service_account_credentials:
             public_source_ids = list_source_ids()
+            self.assertIn(public_source.id, public_source_ids)
+            self.assertNotIn(ma_source.id, public_source_ids)
         with self._unregistered_service_account_credentials:
             self.assertEqual(public_source_ids, list_source_ids())
+        self.assertEqual(public_source_ids, list_source_ids())
         invalid_auth = OAuth2('foo')
         with self.assertRaises(UnauthorizedError):
             TDRClient.for_registered_user(invalid_auth)
@@ -1446,13 +1407,11 @@ def list_source_ids() -> set[str]:
         invalid_client = OAuth2Client(credentials_provider=invalid_provider)
         with self._authorization_context(invalid_client):
             self.assertEqual(401, self._get_url_unchecked(GET, url).status)
-            self.assertEqual(set(), list_source_ids() & managed_access_source_ids)
-        self.assertEqual(public_source_ids, list_source_ids())
-        return public_source_ids

     def _test_managed_access_indices(self,
                                      catalog: CatalogName,
-                                     managed_access_source_ids: Set[str]
+                                     public_source: SourceRef,
+                                     ma_source: SourceRef
                                      ) -> JSONs:
         """
         Test the managed-access controls for the /index/bundles and
@@ -1462,11 +1421,6 @@ def _test_managed_access_indices(self,
         """

         special_fields = self.metadata_plugin(catalog).special_fields
-
-        def source_id_from_hit(hit: JSON) -> str:
-            sources: JSONs = hit['sources']
-            return one(sources)[special_fields.source_id]
-
         bundle_type = self._bundle_type(catalog)
         project_type = self._project_type(catalog)

@@ -1479,31 +1433,22 @@ def source_id_from_hit(hit: JSON) -> str:
             hits = self._get_entities(catalog, project_type, filters=filters)
             if accessible is None:
                 unfiltered_hits = hits
-            accessible_sources, inaccessible_sources = set(), set()
             for hit in hits:
-                source_id = source_id_from_hit(hit)
-                source_accessible = source_id not in managed_access_source_ids
+                source_id = one(hit['sources'])[special_fields.source_id]
+                source_accessible = {public_source.id: True, ma_source.id: False}[source_id]
                 hit_accessible = one(hit[project_type])[special_fields.accessible]
                 self.assertEqual(source_accessible, hit_accessible, hit['entryId'])
                 if accessible is not None:
                     self.assertEqual(accessible, hit_accessible)
-                if source_accessible:
-                    accessible_sources.add(source_id)
-                else:
-                    inaccessible_sources.add(source_id)
-            self.assertIsDisjoint(accessible_sources, inaccessible_sources)
-            self.assertIsDisjoint(managed_access_source_ids, accessible_sources)
-            self.assertEqual(set() if accessible else managed_access_source_ids,
-                             inaccessible_sources)

         self.assertIsNotNone(unfiltered_hits,
                              'Cannot recover from subtest failure')
         bundle_fqids = self._get_indexed_bundles(catalog)
         hit_source_ids = {fqid.source.id for fqid in bundle_fqids}
-        self.assertEqual(set(), hit_source_ids & managed_access_source_ids)
+        self.assertEqual(hit_source_ids, {public_source.id})
         source_filter = {
             special_fields.source_id: {
-                'is': list(managed_access_source_ids)
+                'is': [ma_source.id]
             }
         }
         params = {
@@ -1512,18 +1457,18 @@ def source_id_from_hit(hit: JSON) -> str:
         }
         url = config.service_endpoint.set(path=('index', bundle_type), args=params)
         response = self._get_url_unchecked(GET, url)
-        self.assertEqual(403 if managed_access_source_ids else 200, response.status)
+        self.assertEqual(403, response.status)

         with self._service_account_credentials:
             bundle_fqids = self._get_indexed_bundles(catalog, filters=source_filter)

         hit_source_ids = {fqid.source.id for fqid in bundle_fqids}
-        self.assertEqual(managed_access_source_ids, hit_source_ids)
+        self.assertEqual({ma_source.id}, hit_source_ids)
         return unfiltered_hits

     def _test_managed_access_repository_files(self,
                                               catalog: CatalogName,
-                                              managed_access_source_ids: set[str]
+                                              ma_source: SourceRef
                                               ) -> JSONs:
         """
         Test the managed access controls for the /repository/files endpoint
@@ -1533,7 +1478,7 @@ def _test_managed_access_repository_files(self,
         with self._service_account_credentials:
             files = self._get_entities(catalog, 'files', filters={
                 special_fields.source_id: {
-                    'is': list(managed_access_source_ids)
+                    'is': [ma_source.id]
                 }
             })
         managed_access_file_urls = {
@@ -1570,7 +1515,7 @@ def _get_summary_file_count() -> int:
     def _test_managed_access_manifest(self,
                                       catalog: CatalogName,
                                       files: JSONs,
-                                      source_id: str
+                                      public_source: SourceRef
                                       ) -> None:
         """
         Test the managed access controls for the /manifest/files endpoint and
@@ -1592,7 +1537,7 @@ def bundle_uuids(hit: JSON) -> set[str]:
             for file in files
             if len(file['sources']) == 1
         ))
-        filters = {special_fields.source_id: {'is': [source_id]}}
+        filters = {special_fields.source_id: {'is': [public_source.id]}}
         params = {'size': 1, 'catalog': catalog, 'filters': json.dumps(filters)}
         files_url = furl(url=endpoint, path='index/files', args=params)
         response = self._get_url_json(GET, files_url)
@@ -1890,10 +1835,7 @@ def _test_catalog(self, catalog: config.Catalog):
         fqid = self.bundle_fqid(catalog.name)
         log.info('Canning bundle %r from catalog %r', fqid, catalog.name)
         with tempfile.TemporaryDirectory() as d:
-            self._can_bundle(source=str(fqid.source.spec),
-                             uuid=fqid.uuid,
-                             version=fqid.version,
-                             output_dir=d)
+            self._can_bundle(fqid, output_dir=d)
             generated_file = one(os.listdir(d))
             with open(os.path.join(d, generated_file)) as f:
                 bundle_json = json.load(f)
@@ -1918,7 +1860,7 @@ def _test_catalog(self, catalog: config.Catalog):
             }
             self.assertIsSubset(set(stitched), metadata_ids)
         elif metadata_plugin_name == 'anvil':
-            self.assertEqual({'entities', 'links'}, bundle_json.keys())
+            self.assertEqual({'entities', 'links', 'orphans'}, bundle_json.keys())
             entities, links = bundle_json['entities'], bundle_json['links']
             self.assertIsInstance(entities, dict)
             self.assertIsInstance(links, list)
@@ -1960,26 +1902,29 @@ def test_can_bundle_canned_repository(self):
         self._test_catalog(mock_catalog)

     def bundle_fqid(self, catalog: CatalogName) -> SourcedBundleFQID:
-        # Skip through empty partitions
-        bundle_fqids = itertools.chain.from_iterable(
-            bundle_fqids
-            for _, _, bundle_fqids in self._list_partitions(catalog,
-                                                            min_bundles=1,
-                                                            public_1st=False)
-        )
+        source = self._choose_source(catalog)
+        # The plugin will raise an exception if the source lacks a prefix
+        source = source.with_prefix(Prefix.of_everything)
+        bundle_fqids = self.repository_plugin(catalog).list_bundles(source, '')
         return self.random.choice(sorted(bundle_fqids))

     def _can_bundle(self,
-                    source: str,
-                    uuid: str,
-                    version: str,
+                    fqid: SourcedBundleFQID,
                     output_dir: str
                     ) -> None:
         args = [
-            '--source', source,
-            '--uuid', uuid,
-            '--version', version,
-            '--output-dir', output_dir
+            '--uuid', fqid.uuid,
+            '--version', fqid.version,
+            '--source', str(fqid.source.spec),
+            *(
+                [
+                    '--table-name', fqid.table_name,
+                    '--batch-prefix', 'null' if fqid.batch_prefix is None else fqid.batch_prefix,
+                ]
+                if isinstance(fqid, TDRAnvilBundleFQID) else
+                []
+            ),
+            '--output-dir', output_dir,
         ]
         return self._can_bundle_main(args)

diff --git a/test/service/test_filters.py b/test/service/test_filters.py
index 8eaf55126a..b782a5671a 100644
--- a/test/service/test_filters.py
+++ b/test/service/test_filters.py
@@ -33,7 +33,8 @@ class TestFilterReification(AzulTestCase):
         source_id='sourceId',
         source_spec=MagicMock(),
         bundle_uuid=MagicMock(),
-        bundle_version=MagicMock()
+        bundle_version=MagicMock(),
+        implicit_hub_id=MagicMock()
     )

     @property
diff --git a/test/service/test_manifest.py b/test/service/test_manifest.py
index f57b88d012..dac94047db 100644
--- a/test/service/test_manifest.py
+++ b/test/service/test_manifest.py
@@ -1705,9 +1705,11 @@ def bundles(cls) -> list[SourcedBundleFQID]:
         return [
             cls.bundle_fqid(uuid='2370f948-2783-aeb6-afea-e022897f4dcf',
                             version=cls.version),
-            cls.bundle_fqid(uuid='6b0f6c0f-5d80-a242-accb-840921351cd5',
+            cls.bundle_fqid(uuid='c67e7adb-1a9c-a3b9-bc91-eb10a428374a',
                             version=cls.version),
             cls.bundle_fqid(uuid='826dea02-e274-affe-aabc-eb3db63ad068',
+                            version=cls.version),
+            cls.bundle_fqid(uuid='9a135c9a-069b-a90e-b588-eaf8d1aeeac9',
                             version=cls.version)
         ]

@@ -1717,7 +1719,7 @@ def test_compact_manifest(self):
         expected = [
             (
                 'bundles.bundle_uuid',
-                '6b0f6c0f-5d80-a242-accb-840921351cd5',
+                'c67e7adb-1a9c-a3b9-bc91-eb10a428374a',
                 '826dea02-e274-affe-aabc-eb3db63ad068',
                 '826dea02-e274-affe-aabc-eb3db63ad068'
             ),
@@ -2085,18 +2087,39 @@ def test_compact_manifest(self):
         self._assert_tsv(expected, response)

     def test_verbatim_jsonl_manifest(self):
-        response = self._get_manifest(ManifestFormat.verbatim_jsonl, filters={})
-        self.assertEqual(200, response.status_code)
-        expected = {
-            # Consolidate entities with the same replica (i.e. datasets)
-            json_hash(entity).digest(): {
-                'type': entity_ref.entity_type,
-                'value': entity,
+        all_entities, linked_entities = self._canned_entities()
+        cases = [
+            ({}, False),
+            ({'datasets.title': {'is': ['ANVIL_CMG_UWASH_DS_BDIS']}}, False),
+            # Orphans should be included only when filtering by dataset ID
+            ({'datasets.dataset_id': {'is': ['52ee7665-7033-63f2-a8d9-ce8e32666739']}}, True)
+        ]
+        for filters, expect_orphans in cases:
+            with self.subTest(filters=filters):
+                response = self._get_manifest(ManifestFormat.verbatim_jsonl, filters=filters)
+                self.assertEqual(200, response.status_code)
+                expected_rows = list(all_entities if expect_orphans else linked_entities)
+                self._assert_jsonl(expected_rows, response)
+
+    def _canned_entities(self):
+
+        def hash_entities(entities: dict[EntityReference, JSON]) -> dict[str, JSON]:
+            return {
+                json_hash(contents).digest(): {
+                    'type': ref.entity_type,
+                    'value': contents
+                }
+                for ref, contents in entities.items()
             }
-            for bundle in self.bundles()
-            for entity_ref, entity in self._load_canned_bundle(bundle).entities.items()
-        }.values()
-        self._assert_jsonl(list(expected), response)
+
+        linked_entities_by_hash, all_entities_by_hash = {}, {}
+        for bundle_fqid in self.bundles():
+            bundle = self._load_canned_bundle(bundle_fqid)
+            linked_entities_by_hash.update(hash_entities(bundle.entities))
+            all_entities_by_hash.update(hash_entities(bundle.orphans))
+        all_entities_by_hash.update(linked_entities_by_hash)
+
+        return all_entities_by_hash.values(), linked_entities_by_hash.values()

     def test_verbatim_pfb_manifest(self):
         response = self._get_manifest(ManifestFormat.verbatim_pfb, filters={})
diff --git a/test/service/test_request_builder.py b/test/service/test_request_builder.py
index 218f964313..5e89aab8e6 100644
--- a/test/service/test_request_builder.py
+++ b/test/service/test_request_builder.py
@@ -65,7 +65,8 @@ def special_fields(self) -> SpecialFields:
         return SpecialFields(source_id='sourceId',
                              source_spec='sourceSpec',
                              bundle_uuid='bundleUuid',
-                             bundle_version='bundleVersion')
+                             bundle_version='bundleVersion',
+                             implicit_hub_id='projectId')

     @property
     def _field_mapping(self) -> MetadataPlugin._FieldMapping: