diff --git a/.github/PULL_REQUEST_TEMPLATE/upgrade.md b/.github/PULL_REQUEST_TEMPLATE/upgrade.md
index 117b82d285..136e5b8373 100644
--- a/.github/PULL_REQUEST_TEMPLATE/upgrade.md
+++ b/.github/PULL_REQUEST_TEMPLATE/upgrade.md
@@ -107,7 +107,7 @@ Connected issue: #0000
### Operator
-- [ ] At least one hour has passed since `anvildev.shared` was last deployed
+- [ ] At least 24 hours have passed since `anvildev.shared` was last deployed
- [ ] Ran `script/export_inspector_findings.py` against `anvildev`, imported results to [Google Sheet](https://docs.google.com/spreadsheets/d/1RWF7g5wRKWPGovLw4jpJGX_XMi8aWLXLOvvE5rxqgH8) and posted screenshot of relevant1 findings as a comment on the connected issue.
- [ ] Propagated the `deploy:shared`, `deploy:gitlab`, `deploy:runner` and `backup:gitlab` labels to the next promotion PRs or this PR carries none of these labels
- [ ] Propagated any specific instructions related to the `deploy:shared`, `deploy:gitlab`, `deploy:runner` and `backup:gitlab` labels, from the description of this PR to that of the next promotion PRs or this PR carries none of these labels
diff --git a/.github/pull_request_template.md.template.py b/.github/pull_request_template.md.template.py
index eb28c95dc8..1fd2f544bd 100644
--- a/.github/pull_request_template.md.template.py
+++ b/.github/pull_request_template.md.template.py
@@ -958,7 +958,7 @@ def emit(t: T, target_branch: str):
*iif(t is T.upgrade, [
{
'type': 'cli',
- 'content': 'At least one hour has passed since `anvildev.shared` was last deployed'
+ 'content': 'At least 24 hours have passed since `anvildev.shared` was last deployed'
},
{
'type': 'cli',
diff --git a/deployments/anvilprod/environment.py b/deployments/anvilprod/environment.py
index e7e584c0b8..9f96cd53f4 100644
--- a/deployments/anvilprod/environment.py
+++ b/deployments/anvilprod/environment.py
@@ -975,7 +975,6 @@ def env() -> Mapping[str, Optional[str]]:
repository=dict(name='tdr_anvil')),
sources=list(filter(None, sources.values())))
for atlas, catalog, sources in [
- ('anvil', 'anvil7', anvil7_sources),
('anvil', 'anvil8', anvil8_sources),
]
for suffix, internal in [
diff --git a/deployments/hammerbox/environment.py b/deployments/hammerbox/environment.py
index dab3c1bca0..89693fcf64 100644
--- a/deployments/hammerbox/environment.py
+++ b/deployments/hammerbox/environment.py
@@ -989,7 +989,6 @@ def env() -> Mapping[str, Optional[str]]:
repository=dict(name='tdr_anvil')),
sources=list(filter(None, sources.values())))
for atlas, catalog, sources in [
- ('anvil', 'anvil7', anvil7_sources),
('anvil', 'anvil8', anvil8_sources),
]
for suffix, internal in [
diff --git a/deployments/prod/environment.py b/deployments/prod/environment.py
index 079999b289..8ce9214a9a 100644
--- a/deployments/prod/environment.py
+++ b/deployments/prod/environment.py
@@ -1181,6 +1181,7 @@ def mkdict(previous_catalog: dict[str, str],
mksrc('bigquery', 'datarepo-c9158593', 'lungmap_prod_20037472ea1d4ddb9cd356a11a6f0f76__20220307_20241002_lm8'),
mksrc('bigquery', 'datarepo-35a6d7ca', 'lungmap_prod_3a02d15f9c6a4ef7852b4ddec733b70b__20241001_20241002_lm8'),
mksrc('bigquery', 'datarepo-131a1234', 'lungmap_prod_4ae8c5c91520437198276935661f6c84__20231004_20241002_lm8'),
+ mksrc('bigquery', 'datarepo-936db385', 'lungmap_prod_6135382f487d4adb9cf84d6634125b68__20230207_20241106_lm8'),
mksrc('bigquery', 'datarepo-3c4905d2', 'lungmap_prod_834e0d1671b64425a8ab022b5000961c__20241001_20241002_lm8'),
mksrc('bigquery', 'datarepo-d7447983', 'lungmap_prod_f899709cae2c4bb988f0131142e6c7ec__20220310_20241002_lm8'),
mksrc('bigquery', 'datarepo-c11ef363', 'lungmap_prod_fdadee7e209745d5bf81cc280bd8348e__20240206_20241002_lm8'),
diff --git a/scripts/can_bundle.py b/scripts/can_bundle.py
index 9314e10ff5..a2021b27c1 100644
--- a/scripts/can_bundle.py
+++ b/scripts/can_bundle.py
@@ -1,6 +1,8 @@
"""
-Download manifest and metadata for a given bundle from the given repository
-source and store them as separate JSON files in the index test data directory.
+Download the contents of a given bundle from the given repository source and
+store it as a single JSON file. Users are expected to be familiar with the
+structure of the bundle FQIDs for the given source and provide the appropriate
+attributes.
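+
+For example (hypothetical argument values), a batched AnVIL bundle could be
+canned like this:
+
+    python scripts/can_bundle.py \
+        --source 'tdr:...' \
+        --uuid aaaabbbb-... \
+        --table-name anvil_file \
+        --batch-prefix 00
+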
Note: silently overwrites the destination file.
"""
@@ -15,10 +17,6 @@
import sys
import uuid
-from more_itertools import (
- one,
-)
-
from azul import (
cache,
config,
@@ -45,6 +43,7 @@
from azul.types import (
AnyJSON,
AnyMutableJSON,
+ JSON,
)
log = logging.getLogger(__name__)
@@ -52,34 +51,45 @@
def main(argv):
parser = argparse.ArgumentParser(description=__doc__, formatter_class=AzulArgumentHelpFormatter)
- default_catalog = config.default_catalog
- plugin_cls = RepositoryPlugin.load(default_catalog)
- plugin = plugin_cls.create(default_catalog)
- if len(plugin.sources) == 1:
- source_arg = {'default': str(one(plugin.sources))}
- else:
- source_arg = {'required': True}
parser.add_argument('--source', '-s',
- **source_arg,
+ required=True,
help='The repository source containing the bundle')
parser.add_argument('--uuid', '-b',
required=True,
help='The UUID of the bundle to can.')
parser.add_argument('--version', '-v',
- help='The version of the bundle to can (default: the latest version).')
+ help='The version of the bundle to can. Required for HCA, ignored for AnVIL.')
+ parser.add_argument('--table-name',
+ help='The BigQuery table of the bundle to can. Only applicable for AnVIL.')
+ parser.add_argument('--batch-prefix',
+ help='The batch prefix of the bundle to can. Only applicable for AnVIL. '
+ 'Use "null" for non-batched bundle formats.')
parser.add_argument('--output-dir', '-O',
default=os.path.join(config.project_root, 'test', 'indexer', 'data'),
help='The path to the output directory (default: %(default)s).')
parser.add_argument('--redaction-key', '-K',
help='Provide a key to redact confidential or sensitive information from the output files')
args = parser.parse_args(argv)
- bundle = fetch_bundle(args.source, args.uuid, args.version)
+ fqid_fields = parse_fqid_fields(args)
+ bundle = fetch_bundle(args.source, fqid_fields)
if args.redaction_key:
redact_bundle(bundle, args.redaction_key.encode())
save_bundle(bundle, args.output_dir)
-def fetch_bundle(source: str, bundle_uuid: str, bundle_version: str) -> Bundle:
+def parse_fqid_fields(args: argparse.Namespace) -> JSON:
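+    """
+    Build the bundle FQID JSON from the command line arguments. The string
+    'null' given for --batch-prefix is mapped to an explicit None, denoting a
+    non-batched bundle format.
+    """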
+ fields = {'uuid': args.uuid, 'version': args.version}
+ if args.table_name is not None:
+ fields['table_name'] = args.table_name
+ batch_prefix = args.batch_prefix
+ if batch_prefix is not None:
+ if batch_prefix == 'null':
+ batch_prefix = None
+ fields['batch_prefix'] = batch_prefix
+ return fields
+
+
+def fetch_bundle(source: str, fqid_args: JSON) -> Bundle:
for catalog in config.catalogs:
plugin = plugin_for(catalog)
try:
@@ -90,10 +100,8 @@ def fetch_bundle(source: str, bundle_uuid: str, bundle_version: str) -> Bundle:
log.debug('Searching for %r in catalog %r', source, catalog)
for plugin_source_spec in plugin.sources:
if source_ref.spec.eq_ignoring_prefix(plugin_source_spec):
- fqid = SourcedBundleFQIDJSON(source=source_ref.to_json(),
- uuid=bundle_uuid,
- version=bundle_version)
- fqid = plugin.resolve_bundle(fqid)
+ fqid = SourcedBundleFQIDJSON(source=source_ref.to_json(), **fqid_args)
+ fqid = plugin.bundle_fqid_from_json(fqid)
bundle = plugin.fetch_bundle(fqid)
log.info('Fetched bundle %r version %r from catalog %r.',
fqid.uuid, fqid.version, catalog)
diff --git a/scripts/post_deploy_tdr.py b/scripts/post_deploy_tdr.py
index 98c680f312..a6566345ea 100644
--- a/scripts/post_deploy_tdr.py
+++ b/scripts/post_deploy_tdr.py
@@ -96,13 +96,13 @@ def verify_source(self,
plugin = self.repository_plugin(catalog)
ref = plugin.resolve_source(str(source_spec))
log.info('TDR client is authorized for API access to %s.', source_spec)
- ref = plugin.partition_source(catalog, ref)
- prefix = ref.spec.prefix
if config.deployment.is_main:
- require(prefix.common == '', source_spec)
+ if source_spec.prefix is not None:
+ require(source_spec.prefix.common == '', source_spec)
self.tdr.check_bigquery_access(source_spec)
else:
- subgraph_count = len(plugin.list_bundles(ref, prefix.common))
+ ref = plugin.partition_source(catalog, ref)
+ subgraph_count = plugin.count_bundles(ref.spec)
require(subgraph_count > 0, 'Common prefix is too long', ref.spec)
require(subgraph_count <= 512, 'Common prefix is too short', ref.spec)
diff --git a/src/azul/azulclient.py b/src/azul/azulclient.py
index 67a90a1f62..5917dfc533 100644
--- a/src/azul/azulclient.py
+++ b/src/azul/azulclient.py
@@ -225,7 +225,11 @@ def list_bundles(self,
source = plugin.resolve_source(source)
else:
assert isinstance(source, SourceRef), source
- return plugin.list_bundles(source, prefix)
+ log.info('Listing bundles with prefix %r in source %r.', prefix, source)
+ bundle_fqids = plugin.list_bundles(source, prefix)
+ log.info('There are %i bundle(s) with prefix %r in source %r.',
+ len(bundle_fqids), prefix, source)
+ return bundle_fqids
@property
def sqs(self):
diff --git a/src/azul/indexer/__init__.py b/src/azul/indexer/__init__.py
index 5d17349c0b..f8228cc835 100644
--- a/src/azul/indexer/__init__.py
+++ b/src/azul/indexer/__init__.py
@@ -2,6 +2,9 @@
ABCMeta,
abstractmethod,
)
+from functools import (
+ total_ordering,
+)
from itertools import (
product,
)
@@ -17,6 +20,7 @@
Self,
TypeVar,
TypedDict,
+ final,
)
import attrs
@@ -45,21 +49,115 @@
BundleVersion = str
-@attrs.frozen(kw_only=True, order=True)
-class BundleFQID(SupportsLessAndGreaterThan):
+# PyCharm can't handle mixing `attrs` with `total_ordering` and falsely claims
+# that comparison operators besides `__lt__` are not defined.
+# noinspection PyDataclass
+@attrs.frozen(kw_only=True, eq=False)
+@total_ordering
+class BundleFQID:
"""
- >>> list(sorted([
- ... BundleFQID(uuid='d', version='e'),
- ... BundleFQID(uuid='a', version='c'),
- ... BundleFQID(uuid='a', version='b'),
- ... ]))
- ... # doctest: +NORMALIZE_WHITESPACE
- [BundleFQID(uuid='a', version='b'),
- BundleFQID(uuid='a', version='c'),
- BundleFQID(uuid='d', version='e')]
+ A fully qualified bundle identifier. The attributes defined in this class
+ must always be sufficient to decide whether two instances of this class or
+ its subclasses identify the same bundle or not. Subclasses may define
+ additional attributes to help describe the bundle, but they are forbidden
+ from using these attributes in the implementations of their `__eq__` or
+ `__hash__` methods, either explicitly or in code generated by `attrs`.
"""
- uuid: BundleUUID = attrs.field(order=str.lower)
- version: BundleVersion = attrs.field(order=str.lower)
+ uuid: BundleUUID
+ version: BundleVersion
+
+ def _nucleus(self) -> tuple[str, str]:
+ return self.uuid.lower(), self.version.lower()
+
+ # We can't use attrs' generated implementation because it always
+ # considers operands with different types to be unequal, regardless of
+ # their inheritance relationships or how their attributes are annotated
+ # (e.g. specifying `eq=False` has no effect). We want instances of
+ # all subclasses to compare equal as long as `uuid` and `version` are
+ # equal. For the same reason, we can't use `typing.Self` in the signature
+ # because it would constrain the RHS to instances of subclasses of the LHS.
+ @final
+ def __eq__(self, other: 'BundleFQID') -> bool:
+ """
+ >>> b1 = BundleFQID(uuid='a', version='b')
+ >>> b2 = BundleFQID(uuid='a', version='b')
+ >>> b1 == b2
+ True
+
+ >>> s1 = SourceRef(id='x', spec=SimpleSourceSpec.parse('y:/0'))
+ >>> sb1 = SourcedBundleFQID(uuid='a', version='b', source=s1)
+ >>> sb2 = SourcedBundleFQID(uuid='a', version='b', source=s1)
+ >>> sb1 == sb2
+ True
+
+ >>> b1 == sb1
+ True
+
+ >>> s2 = SourceRef(id='w', spec=SimpleSourceSpec.parse('z:/0'))
+ >>> sb3 = SourcedBundleFQID(uuid='a', version='b', source=s2)
+ >>> b1 == sb3
+ True
+
+ >>> sb1 == sb3
+ ... # doctest: +NORMALIZE_WHITESPACE
+ Traceback (most recent call last):
+ ...
+ AssertionError: (('a', 'b'),
+ SourceRef(id='x', spec=SimpleSourceSpec(prefix=Prefix(common='', partition=0), name='y')),
+ SourceRef(id='w', spec=SimpleSourceSpec(prefix=Prefix(common='', partition=0), name='z')))
+ """
+ same_bundle = self._nucleus() == other._nucleus()
+ if (
+ same_bundle
+ and isinstance(self, SourcedBundleFQID)
+ and isinstance(other, SourcedBundleFQID)
+ ):
+ assert self.source == other.source, (self._nucleus(), self.source, other.source)
+ return same_bundle
+
+ @final
+ def __hash__(self) -> int:
+ return hash(self._nucleus())
+
+ def __init_subclass__(cls, **kwargs):
+ """
+ >>> @attrs.frozen(kw_only=True)
+ ... class FooBundleFQID(SourcedBundleFQID):
+ ... foo: str
+ Traceback (most recent call last):
+ ...
+ AssertionError:
+
+ >>> @attrs.frozen(kw_only=True, eq=False)
+ ... class FooBundleFQID(SourcedBundleFQID):
+ ... foo: str
+ """
+ super().__init_subclass__(**kwargs)
+ assert cls.__eq__ is BundleFQID.__eq__, cls
+ assert cls.__hash__ is BundleFQID.__hash__, cls
+
+ # attrs doesn't allow `order=True` when `eq=False`
+ def __lt__(self, other: 'BundleFQID') -> bool:
+ """
+ >>> aa = BundleFQID(uuid='a', version='a')
+ >>> ab = BundleFQID(uuid='a', version='b')
+ >>> ba = BundleFQID(uuid='b', version='a')
+ >>> aa < ab < ba
+ True
+
+ >>> ba > ab > aa
+ True
+
+ >>> aa <= ab <= ba
+ True
+
+ >>> ba >= ab >= aa
+ True
+
+ >>> aa != ab != ba
+ True
+ """
+ return self._nucleus() < other._nucleus()
def to_json(self) -> MutableJSON:
return attrs.asdict(self, recurse=False)
@@ -454,6 +552,9 @@ def spec_cls(cls) -> type[SourceSpec]:
spec_cls, ref_cls = get_generic_type_params(cls, SourceSpec, SourceRef)
return spec_cls
+ def with_prefix(self, prefix: Prefix) -> Self:
+ return attrs.evolve(self, spec=attrs.evolve(self.spec, prefix=prefix))
+
class SourcedBundleFQIDJSON(BundleFQIDJSON):
source: SourceJSON
@@ -462,7 +563,7 @@ class SourcedBundleFQIDJSON(BundleFQIDJSON):
BUNDLE_FQID = TypeVar('BUNDLE_FQID', bound='SourcedBundleFQID')
-@attrs.frozen(kw_only=True, order=True)
+@attrs.frozen(kw_only=True, eq=False)
class SourcedBundleFQID(BundleFQID, Generic[SOURCE_REF]):
"""
>>> spec = SimpleSourceSpec(name='', prefix=(Prefix(partition=0)))
@@ -493,10 +594,6 @@ def from_json(cls, json: SourcedBundleFQIDJSON) -> 'SourcedBundleFQID':
source = cls.source_ref_cls().from_json(json.pop('source'))
return cls(source=source, **json)
- def upcast(self) -> BundleFQID:
- return BundleFQID(uuid=self.uuid,
- version=self.version)
-
def to_json(self) -> SourcedBundleFQIDJSON:
return dict(super().to_json(),
source=self.source.to_json())
diff --git a/src/azul/indexer/document.py b/src/azul/indexer/document.py
index 5e8a0cd64d..fe5430967e 100644
--- a/src/azul/indexer/document.py
+++ b/src/azul/indexer/document.py
@@ -504,13 +504,12 @@ class ContributionCoordinates(DocumentCoordinates[E], Generic[E]):
subgraph ("bundle") and represent either the addition of metadata to an
entity or the removal of metadata from an entity.
- Contributions produced by
- transformers don't specify a catalog, the catalog is supplied when the
- contributions are written to the index and it is guaranteed to be the same
- for all contributions produced in response to one notification. When
- contributions are read back during aggregation, they specify a catalog, the
- catalog they were read from. Because of that duality this class has to be
- generic in E, the type of EntityReference.
+ Contributions produced by transformers don't specify a catalog. The catalog
+ is supplied when the contributions are written to the index and it is
+ guaranteed to be the same for all contributions produced in response to one
+ notification. When contributions are read back during aggregation, they
+ specify a catalog, the catalog they were read from. Because of that duality
+ this class has to be generic in E, the type of EntityReference.
"""
doc_type: ClassVar[DocumentType] = DocumentType.contribution
@@ -519,23 +518,6 @@ class ContributionCoordinates(DocumentCoordinates[E], Generic[E]):
deleted: bool
- def __attrs_post_init__(self):
- # If we were to allow instances of subclasses, we'd risk breaking
- # equality and hashing semantics. It is impossible to correctly
- # implement the transitivity property of equality between instances of
- # type and subtype without ignoring the additional attributes added by
- # the subtype. Consider types T and S where S is a subtype of T.
- # Transitivity requires that s1 == s2 for any two instances s1 and s2
- # of S for which s1 == t and s2 == t, where t is any instance of T. The
- # subtype instances s1 and s2 can only be equal to t if they match in
- # all attributes that T defines. The additional attributes introduced
- # by S must be ignored, even when comparing s1 and s2, otherwise s1 and
- # s2 might turn out to be unequal. In this particular case that is not
- # desirable: we do want to be able to compare instances of subtypes of
- # BundleFQID without ignoring any of their attributes.
- concrete_type = type(self.bundle)
- assert concrete_type is BundleFQID, concrete_type
-
@property
def document_id(self) -> str:
return '_'.join((
diff --git a/src/azul/indexer/document_service.py b/src/azul/indexer/document_service.py
index 3f0e7e0ef5..0e9bb11dec 100644
--- a/src/azul/indexer/document_service.py
+++ b/src/azul/indexer/document_service.py
@@ -49,6 +49,14 @@ def metadata_plugin(self, catalog: CatalogName) -> MetadataPlugin:
def aggregate_class(self, catalog: CatalogName) -> Type[Aggregate]:
return self.metadata_plugin(catalog).aggregate_class()
+ @property
+ def always_limit_access(self) -> bool:
+ """
+ True if access restrictions are enforced unconditionally. False, if the
+ filter stage is allowed to weaken them, e.g., based on the entity type.
+ """
+ return True
+
def transformer_types(self,
catalog: CatalogName
) -> Iterable[Type[Transformer]]:
diff --git a/src/azul/indexer/index_service.py b/src/azul/indexer/index_service.py
index fbf0493572..20c28a7beb 100644
--- a/src/azul/indexer/index_service.py
+++ b/src/azul/indexer/index_service.py
@@ -197,7 +197,7 @@ def fetch_bundle(self,
bundle_fqid: SourcedBundleFQIDJSON
) -> Bundle:
plugin = self.repository_plugin(catalog)
- bundle_fqid = plugin.resolve_bundle(bundle_fqid)
+ bundle_fqid = plugin.bundle_fqid_from_json(bundle_fqid)
return plugin.fetch_bundle(bundle_fqid)
def index(self, catalog: CatalogName, bundle: Bundle) -> None:
@@ -212,7 +212,8 @@ def index(self, catalog: CatalogName, bundle: Bundle) -> None:
for contributions, replicas in transforms:
tallies.update(self.contribute(catalog, contributions))
self.replicate(catalog, replicas)
- self.aggregate(tallies)
+ if tallies:
+ self.aggregate(tallies)
def delete(self, catalog: CatalogName, bundle: Bundle) -> None:
"""
diff --git a/src/azul/indexer/transform.py b/src/azul/indexer/transform.py
index 4f6efb7f50..4a2f2c8dc9 100644
--- a/src/azul/indexer/transform.py
+++ b/src/azul/indexer/transform.py
@@ -123,7 +123,7 @@ def _contribution(self,
) -> Contribution:
entity = EntityReference(entity_type=self.entity_type(), entity_id=entity_id)
coordinates = ContributionCoordinates(entity=entity,
- bundle=self.bundle.fqid.upcast(),
+ bundle=self.bundle.fqid,
deleted=self.deleted)
return Contribution(coordinates=coordinates,
version=None,
@@ -133,6 +133,7 @@ def _contribution(self,
def _replica(self,
entity: EntityReference,
*,
+ root_hub: EntityID,
file_hub: EntityID | None,
) -> Replica:
replica_type, contents = self._replicate(entity)
@@ -144,7 +145,7 @@ def _replica(self,
contents=contents,
# The other hubs will be added when the indexer
# consolidates duplicate replicas.
- hub_ids=alist(file_hub))
+ hub_ids=alist(file_hub, root_hub))
@classmethod
@abstractmethod
diff --git a/src/azul/plugins/__init__.py b/src/azul/plugins/__init__.py
index e6f9bef7a3..1f9c19da54 100644
--- a/src/azul/plugins/__init__.py
+++ b/src/azul/plugins/__init__.py
@@ -127,11 +127,25 @@ def order(self) -> str:
@attr.s(auto_attribs=True, frozen=True, kw_only=True)
class SpecialFields:
+ """
+ Azul defines a number of fields in each /index/{entity_type} response that
+ are synthetic (not directly taken from the metadata) and/or are used
+ internally. Their naming is inconsistent between metadata plugin
+ implementations. This class encapsulates the naming of these fields so that
+    we don't need to litter the source with string literals and conditionals.
+
+ It is an incomplete abstraction in that it does not express the name of the
+ inner entity the field is a property of in the /index/{entity_type}
+ response. In that way, the values of the attributes of instances of this
+ class are more akin to a facet name, rather than a field name. However, not
+ every field represented here is actually a facet.
+ """
accessible: ClassVar[FieldName] = 'accessible'
source_id: FieldName
source_spec: FieldName
bundle_uuid: FieldName
bundle_version: FieldName
+ implicit_hub_id: FieldName
class ManifestFormat(Enum):
@@ -409,6 +423,9 @@ def _field_mapping(self) -> _FieldMapping:
@property
@abstractmethod
def special_fields(self) -> SpecialFields:
+ """
+ See :py:class:`SpecialFields`.
+ """
raise NotImplementedError
@property
@@ -433,8 +450,8 @@ def manifest_config(self) -> ManifestConfig:
raise NotImplementedError
def verbatim_pfb_schema(self,
- replicas: Iterable[JSON]
- ) -> tuple[Iterable[JSON], Sequence[str], JSON]:
+ replicas: list[JSON]
+ ) -> list[JSON]:
"""
Generate a PFB schema for the verbatim manifest. The default,
metadata-agnostic implementation loads all replica documents into memory
@@ -445,17 +462,14 @@ def verbatim_pfb_schema(self,
:param replicas: The replica documents to be described by the PFB schema
- :return: a triple of
- 1. the same set of replicas passed to this method
- 2. the set of entity types defined by the PFB schema
- 3. a PFB schema describing the provided replicas
+        :return: a list of PFB entity schemas, one for each distinct replica
+                 type, describing the provided replicas
"""
from azul.service import (
avro_pfb,
)
- replicas = list(replicas)
- replica_types, pfb_schema = avro_pfb.pfb_schema_from_replicas(replicas)
- return replicas, replica_types, pfb_schema
+ return avro_pfb.pfb_schema_from_replicas(replicas)
@abstractmethod
def document_slice(self, entity_type: str) -> DocumentSlice | None:
@@ -583,6 +597,13 @@ def _bundle_fqid_cls(self) -> type[BUNDLE_FQID]:
bundle_cls, spec_cls, ref_cls, fqid_cls = self._generic_params
return fqid_cls
+ def bundle_fqid_from_json(self, fqid: SourcedBundleFQIDJSON) -> BUNDLE_FQID:
+ """
+ Instantiate a :class:`SourcedBundleFQID` from its JSON representation.
+ The expected input matches the output format of `SourcedBundleFQID.to_json`.
+ """
+ return self._bundle_fqid_cls.from_json(fqid)
+
@property
def _bundle_cls(self) -> type[BUNDLE]:
bundle_cls, spec_cls, ref_cls, fqid_cls = self._generic_params
@@ -607,13 +628,11 @@ def _lookup_source_id(self, spec: SOURCE_SPEC) -> str:
"""
raise NotImplementedError
- def resolve_bundle(self, fqid: SourcedBundleFQIDJSON) -> BUNDLE_FQID:
- return self._bundle_fqid_cls.from_json(fqid)
-
@abstractmethod
- def _count_subgraphs(self, source: SOURCE_SPEC) -> int:
+ def count_bundles(self, source: SOURCE_SPEC) -> int:
"""
- The total number of subgraphs in the given source, ignoring its prefix.
+ The total number of subgraphs in the given source. The source's prefix
+ may be None.
"""
raise NotImplementedError
@@ -627,7 +646,7 @@ def partition_source(self,
should be appropriate for indexing in the given catalog.
"""
if source.spec.prefix is None:
- count = self._count_subgraphs(source.spec)
+ count = self.count_bundles(source.spec)
is_main = config.deployment.is_main
is_it = catalog in config.integration_test_catalogs
# We use the "lesser" heuristic during IT to avoid indexing an
@@ -636,7 +655,7 @@ def partition_source(self,
prefix = Prefix.for_main_deployment(count)
else:
prefix = Prefix.for_lesser_deployment(count)
- source = attr.evolve(source, spec=attr.evolve(source.spec, prefix=prefix))
+ source = source.with_prefix(prefix)
return source
@abstractmethod
diff --git a/src/azul/plugins/metadata/anvil/__init__.py b/src/azul/plugins/metadata/anvil/__init__.py
index b3fe445326..ab4c55441c 100644
--- a/src/azul/plugins/metadata/anvil/__init__.py
+++ b/src/azul/plugins/metadata/anvil/__init__.py
@@ -53,9 +53,6 @@
AnvilSearchResponseStage,
AnvilSummaryResponseStage,
)
-from azul.service.avro_pfb import (
- avro_pfb_schema,
-)
from azul.service.manifest_service import (
ManifestFormat,
)
@@ -246,7 +243,8 @@ def special_fields(self) -> SpecialFields:
return SpecialFields(source_id='source_id',
source_spec='source_spec',
bundle_uuid='bundle_uuid',
- bundle_version='bundle_version')
+ bundle_version='bundle_version',
+ implicit_hub_id='datasets.dataset_id')
@property
def implicit_hub_type(self) -> str:
@@ -327,16 +325,24 @@ def recurse(mapping: MetadataPlugin._FieldMapping, path: FieldPath):
return result
def verbatim_pfb_schema(self,
- replicas: Iterable[JSON]
- ) -> tuple[Iterable[JSON], Sequence[str], JSON]:
- entity_schemas = []
- entity_types = []
- for table_schema in sorted(anvil_schema['tables'], key=itemgetter('name')):
- table_name = table_schema['name']
+ replicas: list[JSON]
+ ) -> list[JSON]:
+ table_schemas_by_name = {
+ schema['name']: schema
+ for schema in anvil_schema['tables']
+ }
+ non_schema_replicas = [
+ r for r in replicas
+ if r['replica_type'] not in table_schemas_by_name
+ ]
+ # For tables not described by the AnVIL schema, fall back to building
+ # their PFB schema dynamically from the shapes of the replicas
+ entity_schemas = super().verbatim_pfb_schema(non_schema_replicas)
+ # For the rest, use the AnVIL schema as the basis of the PFB schema
+ for table_name, table_schema in table_schemas_by_name.items():
# FIXME: Improve handling of DUOS replicas
# https://github.com/DataBiosphere/azul/issues/6139
is_duos_type = table_name == 'anvil_dataset'
- entity_types.append(table_name)
field_schemas = [
self._pfb_schema_from_anvil_column(table_name=table_name,
column_name='datarepo_row_id',
@@ -369,7 +375,7 @@ def verbatim_pfb_schema(self,
'type': 'record',
'fields': field_schemas
})
- return replicas, entity_types, avro_pfb_schema(entity_schemas)
+ return entity_schemas
def _pfb_schema_from_anvil_column(self,
*,
diff --git a/src/azul/plugins/metadata/anvil/bundle.py b/src/azul/plugins/metadata/anvil/bundle.py
index 8df46cb8c5..7e6e064f8a 100644
--- a/src/azul/plugins/metadata/anvil/bundle.py
+++ b/src/azul/plugins/metadata/anvil/bundle.py
@@ -122,6 +122,7 @@ class AnvilBundle(Bundle[BUNDLE_FQID], ABC):
# the former to the latter during transformation.
entities: dict[EntityReference, MutableJSON] = attrs.field(factory=dict)
links: set[EntityLink] = attrs.field(factory=set)
+ orphans: dict[EntityReference, MutableJSON] = attrs.field(factory=dict)
def reject_joiner(self, catalog: CatalogName):
# FIXME: Optimize joiner rejection and re-enable it for AnVIL
@@ -129,21 +130,29 @@ def reject_joiner(self, catalog: CatalogName):
pass
def to_json(self) -> MutableJSON:
- return {
- 'entities': {
+ def serialize_entities(entities):
+ return {
str(entity_ref): entity
- for entity_ref, entity in sorted(self.entities.items())
- },
+ for entity_ref, entity in sorted(entities.items())
+ }
+
+ return {
+ 'entities': serialize_entities(self.entities),
+ 'orphans': serialize_entities(self.orphans),
'links': [link.to_json() for link in sorted(self.links)]
}
@classmethod
def from_json(cls, fqid: BUNDLE_FQID, json_: JSON) -> Self:
+ def deserialize_entities(json_entities):
+ return {
+ EntityReference.parse(entity_ref): entity
+ for entity_ref, entity in json_entities.items()
+ }
+
return cls(
fqid=fqid,
- entities={
- EntityReference.parse(entity_ref): entity
- for entity_ref, entity in json_['entities'].items()
- },
- links=set(map(EntityLink.from_json, json_['links']))
+ entities=deserialize_entities(json_['entities']),
+ links=set(map(EntityLink.from_json, json_['links'])),
+ orphans=deserialize_entities(json_['orphans'])
)
diff --git a/src/azul/plugins/metadata/anvil/indexer/transform.py b/src/azul/plugins/metadata/anvil/indexer/transform.py
index 858899bd59..9e2f4f22ed 100644
--- a/src/azul/plugins/metadata/anvil/indexer/transform.py
+++ b/src/azul/plugins/metadata/anvil/indexer/transform.py
@@ -169,6 +169,9 @@ def aggregator(cls, entity_type) -> EntityAggregator:
assert False, entity_type
def estimate(self, partition: BundlePartition) -> int:
+ # Orphans are not considered when deciding whether to partition the
+ # bundle, but if the bundle is partitioned then each orphan will be
+ # replicated in a single partition
return sum(map(partial(self._contains, partition), self.bundle.entities))
def transform(self,
@@ -188,7 +191,8 @@ def _transform(self,
raise NotImplementedError
def _replicate(self, entity: EntityReference) -> tuple[str, JSON]:
- return entity.entity_type, self.bundle.entities[entity]
+ content = ChainMap(self.bundle.entities, self.bundle.orphans)[entity]
+ return entity.entity_type, content
def _convert_entity_type(self, entity_type: str) -> str:
assert entity_type == 'bundle' or entity_type.startswith('anvil_'), entity_type
@@ -406,7 +410,10 @@ def _file(self, file: EntityReference) -> MutableJSON:
uuid=file.entity_id)
def _only_dataset(self) -> EntityReference:
- return one(self._entities_by_type['anvil_dataset'])
+ try:
+ return one(self._entities_by_type['anvil_dataset'])
+ except ValueError:
+ return one(o for o in self.bundle.orphans if o.entity_type == 'anvil_dataset')
@cached_property
def _activity_polymorphic_types(self) -> AbstractSet[str]:
@@ -506,7 +513,9 @@ def _dataset(self, dataset: EntityReference) -> MutableJSON:
return super()._dataset(dataset)
def _list_entities(self) -> Iterable[EntityReference]:
- yield self._singleton()
+ # Suppress contributions for bundles that only contain orphans
+ if self.bundle.entities:
+ yield self._singleton()
@abstractmethod
def _singleton(self) -> EntityReference:
@@ -564,6 +573,23 @@ def _singleton(self) -> EntityReference:
return EntityReference(entity_type='bundle',
entity_id=self.bundle.uuid)
+ def transform(self,
+ partition: BundlePartition
+ ) -> Iterable[Contribution | Replica]:
+ yield from super().transform(partition)
+ if config.enable_replicas:
+ # Replicas are only emitted by the file transformer for entities
+ # that are linked to at least one file. This excludes all orphans,
+            # and a small number of linked entities, usually from primary
+            # bundles that contain no files. Some of the replicas we emit here
+ # will be redundant with those emitted by the file transformer, but
+ # these will be coalesced by the index service before they are
+ # written to ElasticSearch.
+ dataset = self._only_dataset()
+ for entity in chain(self.bundle.orphans, self.bundle.entities):
+ if partition.contains(UUID(entity.entity_id)):
+ yield self._replica(entity, file_hub=None, root_hub=dataset.entity_id)
+
class DatasetTransformer(SingletonTransformer):
@@ -574,13 +600,6 @@ def entity_type(cls) -> str:
def _singleton(self) -> EntityReference:
return self._only_dataset()
- def _transform(self,
- entity: EntityReference
- ) -> Iterable[Contribution | Replica]:
- yield from super()._transform(entity)
- if self._is_duos(entity):
- yield self._replica(entity, file_hub=None)
-
class DonorTransformer(BaseTransformer):
@@ -614,6 +633,7 @@ def _transform(self,
entity: EntityReference
) -> Iterable[Contribution | Replica]:
linked = self._linked_entities(entity)
+ dataset = self._only_dataset()
contents = dict(
activities=self._entities(self._activity, chain.from_iterable(
linked[activity_type]
@@ -627,7 +647,7 @@ def _transform(self,
)
yield self._contribution(contents, entity.entity_id)
if config.enable_replicas:
- yield self._replica(entity, file_hub=entity.entity_id)
+ yield self._replica(entity, file_hub=entity.entity_id, root_hub=dataset.entity_id)
for linked_entity in linked:
yield self._replica(
linked_entity,
@@ -637,4 +657,5 @@ def _transform(self,
# hub IDs field empty for datasets and rely on the tenet
# that every file is an implicit hub of its parent dataset.
file_hub=None if linked_entity.entity_type == 'anvil_dataset' else entity.entity_id,
+ root_hub=dataset.entity_id
)
diff --git a/src/azul/plugins/metadata/anvil/service/filter.py b/src/azul/plugins/metadata/anvil/service/filter.py
index ac49c2c7fb..0b21e2c62b 100644
--- a/src/azul/plugins/metadata/anvil/service/filter.py
+++ b/src/azul/plugins/metadata/anvil/service/filter.py
@@ -5,5 +5,6 @@
class AnvilFilterStage(FilterStage):
+ @property
def _limit_access(self) -> bool:
return self.entity_type != 'datasets'
diff --git a/src/azul/plugins/metadata/hca/__init__.py b/src/azul/plugins/metadata/hca/__init__.py
index 0286a2b489..46a8b04988 100644
--- a/src/azul/plugins/metadata/hca/__init__.py
+++ b/src/azul/plugins/metadata/hca/__init__.py
@@ -282,7 +282,8 @@ def special_fields(self) -> SpecialFields:
return SpecialFields(source_id='sourceId',
source_spec='sourceSpec',
bundle_uuid='bundleUuid',
- bundle_version='bundleVersion')
+ bundle_version='bundleVersion',
+ implicit_hub_id='projectId')
@property
def implicit_hub_type(self) -> str:
diff --git a/src/azul/plugins/metadata/hca/indexer/transform.py b/src/azul/plugins/metadata/hca/indexer/transform.py
index 00380d8564..174d43878e 100644
--- a/src/azul/plugins/metadata/hca/indexer/transform.py
+++ b/src/azul/plugins/metadata/hca/indexer/transform.py
@@ -1470,17 +1470,19 @@ def _transform(self,
for entity_type, values in additional_contents.items():
contents[entity_type].extend(values)
file_id = file.ref.entity_id
+ project_ref = self._api_project.ref
+ project_id = project_ref.entity_id
yield self._contribution(contents, file_id)
if config.enable_replicas:
- yield self._replica(self.api_bundle.ref, file_hub=file_id)
+ yield self._replica(self.api_bundle.ref, file_hub=file_id, root_hub=project_id)
# Projects are linked to every file in their snapshot,
# making an explicit list of hub IDs for the project both
# redundant and impractically large. Therefore, we leave the
# hub IDs field empty for projects and rely on the tenet
# that every file is an implicit hub of its parent project.
- yield self._replica(self._api_project.ref, file_hub=None)
+ yield self._replica(project_ref, file_hub=None, root_hub=project_id)
for linked_entity in visitor.entities:
- yield self._replica(linked_entity, file_hub=file_id)
+ yield self._replica(linked_entity, file_hub=file_id, root_hub=project_id)
def matrix_stratification_values(self, file: api.File) -> JSON:
"""
diff --git a/src/azul/plugins/metadata/hca/service/filter.py b/src/azul/plugins/metadata/hca/service/filter.py
index 86374979b4..c2ba76ab88 100644
--- a/src/azul/plugins/metadata/hca/service/filter.py
+++ b/src/azul/plugins/metadata/hca/service/filter.py
@@ -5,5 +5,6 @@
class HCAFilterStage(FilterStage):
+ @property
def _limit_access(self) -> bool:
return self.entity_type != 'projects'
diff --git a/src/azul/plugins/repository/canned/__init__.py b/src/azul/plugins/repository/canned/__init__.py
index 3c2705fdf5..be4257c134 100644
--- a/src/azul/plugins/repository/canned/__init__.py
+++ b/src/azul/plugins/repository/canned/__init__.py
@@ -26,6 +26,9 @@
from furl import (
furl,
)
+from more_itertools import (
+ ilen,
+)
from azul import (
CatalogName,
@@ -61,9 +64,6 @@
from azul.types import (
JSON,
)
-from azul.uuids import (
- validate_uuid_prefix,
-)
from humancellatlas.data.metadata.helpers.staging_area import (
CannedStagingAreaFactory,
StagingArea,
@@ -163,27 +163,27 @@ def staging_area(self, url: str) -> StagingArea:
ref)
return factory.load_staging_area(path)
- def _count_subgraphs(self, source: SOURCE_SPEC) -> int:
+ def count_bundles(self, source: SOURCE_SPEC) -> int:
staging_area = self.staging_area(source.spec.name)
- return len(staging_area.links)
+ return ilen(
+ links_id
+ for links_id in staging_area.links
+ if source.prefix is None or links_id.startswith(source.prefix.common)
+ )
def list_bundles(self,
source: CannedSourceRef,
prefix: str
) -> list[CannedBundleFQID]:
self._assert_source(source)
- validate_uuid_prefix(prefix)
- log.info('Listing bundles with prefix %r in source %r.', prefix, source)
- bundle_fqids = []
staging_area = self.staging_area(source.spec.name)
- for link in staging_area.links.values():
- if link.uuid.startswith(prefix):
- bundle_fqids.append(CannedBundleFQID(source=source,
- uuid=link.uuid,
- version=link.version))
- log.info('There are %i bundle(s) with prefix %r in source %r.',
- len(bundle_fqids), prefix, source)
- return bundle_fqids
+ return [
+ CannedBundleFQID(source=source,
+ uuid=link.uuid,
+ version=link.version)
+ for link in staging_area.links.values()
+ if link.uuid.startswith(prefix)
+ ]
def fetch_bundle(self, bundle_fqid: CannedBundleFQID) -> CannedBundle:
self._assert_source(bundle_fqid.source)
diff --git a/src/azul/plugins/repository/dss/__init__.py b/src/azul/plugins/repository/dss/__init__.py
index 96a6da973a..4e5809f4a6 100644
--- a/src/azul/plugins/repository/dss/__init__.py
+++ b/src/azul/plugins/repository/dss/__init__.py
@@ -118,7 +118,7 @@ def sources(self) -> AbstractSet[SimpleSourceSpec]:
def _lookup_source_id(self, spec: SimpleSourceSpec) -> str:
return DSSSourceRef.id_from_spec(spec)
- def _count_subgraphs(self, source: SimpleSourceSpec) -> NoReturn:
+ def count_bundles(self, source: SimpleSourceSpec) -> NoReturn:
assert False, 'DSS is EOL'
def list_sources(self,
diff --git a/src/azul/plugins/repository/tdr.py b/src/azul/plugins/repository/tdr.py
index a45cf96483..0ca79cd786 100644
--- a/src/azul/plugins/repository/tdr.py
+++ b/src/azul/plugins/repository/tdr.py
@@ -191,17 +191,6 @@ def _drs_client(cls,
def _lookup_source_id(self, spec: TDRSourceSpec) -> str:
return self.tdr.lookup_source(spec)
- def list_bundles(self,
- source: TDRSourceRef,
- prefix: str
- ) -> list[TDRBundleFQID]:
- self._assert_source(source)
- log.info('Listing bundles with prefix %r in source %r.', prefix, source)
- bundle_fqids = self._list_bundles(source, prefix)
- log.info('There are %i bundle(s) with prefix %r in source %r.',
- len(bundle_fqids), prefix, source)
- return bundle_fqids
-
def fetch_bundle(self, bundle_fqid: TDRBundleFQID) -> TDR_BUNDLE:
self._assert_source(bundle_fqid.source)
now = time.time()
@@ -223,13 +212,6 @@ def _run_sql(self, query):
def _full_table_name(self, source: TDRSourceSpec, table_name: str) -> str:
return source.qualify_table(table_name)
- @abstractmethod
- def _list_bundles(self,
- source: TDRSourceRef,
- prefix: str
- ) -> list[TDRBundleFQID]:
- raise NotImplementedError
-
@abstractmethod
def _emulate_bundle(self, bundle_fqid: TDRBundleFQID) -> TDR_BUNDLE:
raise NotImplementedError
diff --git a/src/azul/plugins/repository/tdr_anvil/__init__.py b/src/azul/plugins/repository/tdr_anvil/__init__.py
index 530850366d..e841615931 100644
--- a/src/azul/plugins/repository/tdr_anvil/__init__.py
+++ b/src/azul/plugins/repository/tdr_anvil/__init__.py
@@ -2,6 +2,7 @@
from enum import (
Enum,
)
+import itertools
import logging
from operator import (
itemgetter,
@@ -10,7 +11,10 @@
AbstractSet,
Callable,
Iterable,
+ Self,
+ cast,
)
+import uuid
import attrs
from more_itertools import (
@@ -24,12 +28,14 @@
uuids,
)
from azul.bigquery import (
+ BigQueryRow,
backtick,
)
from azul.drs import (
DRSURI,
)
from azul.indexer import (
+ Prefix,
SourcedBundleFQIDJSON,
)
from azul.indexer.document import (
@@ -74,58 +80,88 @@
class BundleType(Enum):
"""
- AnVIL snapshots have no inherent notion of a "bundle". When indexing these
- snapshots, we dynamically construct bundles by selecting individual entities
- and following their foreign keys to discover associated entities. The
- initial entity from which this graph traversal begins is termed the
- "bundle entity", and its FQID serves as the basis for the bundle FQID. Each
- member of this enumeration represents a strategy for selecting bundle
- entities.
-
- Our primary such strategy is to use every biosample in a given snapshot as a
- bundle entity. Biosamples were chosen for this role based on a desirable
- balance between the size and number of the resulting bundles as well as the
- degree of overlap between them. The implementation of the graph traversal is
- tightly coupled to this choice, and switching to a different entity type
- would require re-implementing much of the Plugin code. Primary bundles
- consist of at least one biosample (the bundle entity), exactly one dataset,
- and zero or more other entities of assorted types.
-
- Some snapshots include file entities that lack any foreign keys that
- associate the file with any other entity. To ensure that these "orphaned"
- files are indexed, they are also used as bundle entities. As with primary
- bundles, the creation of these supplementary bundles depends on a
- specifically tailored traversal implementation. Supplementary bundles always
- consist of exactly two entities: one file (the bundle entity) and one
- dataset.
-
- The `dataset.description` field is unusual in that it is not stored in
- BigQuery and must be retrieved via Terra's DUOS API. There is only one
- dataset per snapshot, which is referenced in all primary and supplementary
- bundles. Therefore, only one request to DUOS per *snapshot* is necessary,
- but if `description` is retrieved at the same time as the other dataset
- fields, we will make one request per *bundle* instead, potentially
- overloading the DUOS service. Our solution is to retrieve `description` only
- in a dedicated bundle format, once per snapshot, and merge it with the other
- dataset fields during aggregation. This bundle contains only a single
- dataset entity with only the `description` field populated.
+ AnVIL snapshots have no inherent notion of a "bundle". During indexing, we
+ dynamically construct bundles by querying each table in the snapshot. This
+ class enumerates the tables that require special strategies for listing and
+ fetching their bundles.
+
+ Primary bundles are defined by a biosample entity, termed the bundle entity.
+    Each primary bundle includes all of the bundle entity's descendants and all
+    of those entities' ancestors, which are discovered by iteratively
+ following foreign keys. Biosamples were chosen for this role based on a
+ desirable balance between the size and number of the resulting bundles as
+ well as the degree of overlap between them. The implementation of the graph
+ traversal is tightly coupled to this choice, and switching to a different
+ entity type would require re-implementing much of the Plugin code. Primary
+ bundles consist of at least one biosample (the bundle entity), exactly one
+ dataset, and zero or more other entities of assorted types. Primary bundles
+ never contain orphans because they are bijective to rows in the biosample
+ table.
+
+    Supplementary bundles consist of batches of file entities. Supplementary
+    files lack any foreign keys that associate them with any other entity;
+    non-supplementary files in the bundle are classified as orphans. The bundle
+    also includes a dataset entity linked to the supplementary files.
+
+    DUOS bundles consist of a single dataset entity. This "entity" includes only
+ the dataset description retrieved from DUOS, while a copy of the BigQuery
+ row for this dataset is also included as an orphan. We chose this design
+ because there is only one dataset per snapshot, which is referenced in all
+ primary and supplementary bundles. Therefore, only one request to DUOS per
+ *snapshot* is necessary, but if `description` is retrieved at the same time
+ as the other dataset fields, we will make one request per *bundle* instead,
+ potentially overloading the DUOS service. Our solution is to retrieve
+ `description` only in a dedicated bundle format, once per snapshot, and
+ merge it with the other dataset fields during aggregation.
+
+ All other bundles are replica bundles. Replica bundles consist of a batch of
+ rows from an arbitrary BigQuery table, which may or may not be described by
+ the AnVIL schema. Replica bundles only include orphans and have no links.
"""
primary = 'anvil_biosample'
supplementary = 'anvil_file'
duos = 'anvil_dataset'
+ def is_batched(self: Self | str) -> bool:
+ """
+ >>> BundleType.primary.is_batched()
+ False
+
+ >>> BundleType.is_batched('anvil_activity')
+ True
+ """
+ if isinstance(self, str):
+ try:
+ self = BundleType(self)
+ except ValueError:
+ return True
+ return self not in (BundleType.primary, BundleType.duos)
+
class TDRAnvilBundleFQIDJSON(SourcedBundleFQIDJSON):
table_name: str
+ batch_prefix: str | None
-@attrs.frozen(kw_only=True)
+@attrs.frozen(kw_only=True, eq=False)
class TDRAnvilBundleFQID(TDRBundleFQID):
- table_name: BundleType = attrs.field(converter=BundleType)
+ table_name: str
+ batch_prefix: str | None
def to_json(self) -> TDRAnvilBundleFQIDJSON:
- return dict(super().to_json(),
- table_name=self.table_name.value)
+ return cast(TDRAnvilBundleFQIDJSON, super().to_json())
+
+ def __attrs_post_init__(self):
+ should_be_batched = BundleType.is_batched(self.table_name)
+ is_batched = self.is_batched
+ assert is_batched == should_be_batched, self
+ if is_batched:
+ assert len(self.batch_prefix) <= 8, self
+
+ @property
+ def is_batched(self) -> bool:
+ return self.batch_prefix is not None
class TDRAnvilBundle(AnvilBundle[TDRAnvilBundleFQID], TDRBundle):
@@ -137,9 +173,14 @@ def canning_qualifier(cls) -> str:
def add_entity(self,
entity: EntityReference,
version: str,
- row: MutableJSON
+ row: MutableJSON,
+ *,
+ is_orphan: bool = False
) -> None:
- assert entity not in self.entities, entity
+ target = self.orphans if is_orphan else self.entities
+        # In DUOS bundles, the dataset is represented both as an entity and as
+        # an orphan
+ assert entity not in target, entity
metadata = dict(row,
version=version)
if entity.entity_type == 'anvil_file':
@@ -149,7 +190,7 @@ def add_entity(self,
metadata.update(drs_uri=drs_uri,
sha256='',
crc32='')
- self.entities[entity] = metadata
+ target[entity] = metadata
def add_links(self, links: Iterable[EntityLink]):
self.links.update(links)
@@ -167,107 +208,186 @@ def _version(self):
tzinfo=datetime.timezone.utc))
datarepo_row_uuid_version = 4
+ batch_uuid_version = 5
bundle_uuid_version = 10
- def _count_subgraphs(self, source: TDRSourceSpec) -> int:
- rows = self._run_sql(f'''
+ def _batch_uuid(self,
+ source: TDRSourceSpec,
+ table_name: str,
+ batch_prefix: str
+ ) -> str:
+ namespace = uuid.UUID('b8b3ac80-e035-4904-8b02-2d04f9e9a369')
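+        # Version 5 UUIDs are deterministic for a given namespace and name, so
+        # repeated listings of the same source partition yield the same batch
+        # bundle FQIDs.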
+ batch_uuid = uuid.uuid5(namespace, f'{source}:{table_name}:{batch_prefix}')
+ return change_version(str(batch_uuid),
+ self.batch_uuid_version,
+ self.bundle_uuid_version)
+
+ def count_bundles(self, source: TDRSourceSpec) -> int:
+ prefix = '' if source.prefix is None else source.prefix.common
+ primary_count = one(self._run_sql(f'''
SELECT COUNT(*) AS count
FROM {backtick(self._full_table_name(source, BundleType.primary.value))}
- UNION ALL
+ WHERE STARTS_WITH(datarepo_row_id, {prefix!r})
+ '''))['count']
+ duos_count = 0 if config.duos_service_url is None else one(self._run_sql(f'''
SELECT COUNT(*) AS count
- FROM {backtick(self._full_table_name(source, BundleType.supplementary.value))}
- WHERE is_supplementary
- ''')
- return sum(row['count'] for row in rows)
-
- def _list_bundles(self,
- source: TDRSourceRef,
- prefix: str
- ) -> list[TDRAnvilBundleFQID]:
- spec = source.spec
- primary = BundleType.primary.value
- supplementary = BundleType.supplementary.value
- duos = BundleType.duos.value
- rows = list(self._run_sql(f'''
- SELECT datarepo_row_id, {primary!r} AS table_name
- FROM {backtick(self._full_table_name(spec, primary))}
- WHERE STARTS_WITH(datarepo_row_id, '{prefix}')
- UNION ALL
- SELECT datarepo_row_id, {supplementary!r} AS table_name
- FROM {backtick(self._full_table_name(spec, supplementary))} AS supp
- WHERE supp.is_supplementary AND STARTS_WITH(datarepo_row_id, '{prefix}')
- ''' + (
- ''
- if config.duos_service_url is None else
- f'''
- UNION ALL
- SELECT datarepo_row_id, {duos!r} AS table_name
- FROM {backtick(self._full_table_name(spec, duos))}
- '''
- )))
+ FROM {backtick(self._full_table_name(source, BundleType.duos.value))}
+ WHERE STARTS_WITH(datarepo_row_id, {prefix!r})
+ '''))['count']
+        batches_by_table = self._batch_tables(source, prefix)
+        batched_count = sum(num_batches for (_, num_batches) in batches_by_table.values())
+ return primary_count + duos_count + batched_count
+
+ def list_bundles(self,
+ source: TDRSourceRef,
+ prefix: str
+ ) -> list[TDRAnvilBundleFQID]:
+ self._assert_source(source)
bundles = []
- duos_count = 0
- for row in rows:
- # Reversibly tweak the entity UUID to prevent
- # collisions between entity IDs and bundle IDs
- bundle_uuid = uuids.change_version(row['datarepo_row_id'],
- self.datarepo_row_uuid_version,
- self.bundle_uuid_version)
- # We intentionally omit the WHERE clause for datasets so that we can
- # verify our assumption that each snapshot only contains rows for a
- # single dataset. This verification is performed independently and
- # concurrently for every partition, but only one partition actually
- # emits the bundle.
- if row['table_name'] == duos:
- require(0 == duos_count)
- duos_count += 1
- # Ensure that one partition will always contain the DUOS bundle
- # regardless of the choice of common prefix
- if not bundle_uuid.startswith(prefix):
- continue
+ spec = source.spec
+ if config.duos_service_url is not None:
+ row = one(self._run_sql(f'''
+ SELECT datarepo_row_id
+ FROM {backtick(self._full_table_name(spec, BundleType.duos.value))}
+ '''))
+ dataset_row_id = row['datarepo_row_id']
+ # We intentionally omit the WHERE clause for datasets in order
+ # to verify our assumption that each snapshot only contains rows
+ # for a single dataset. This verification is performed
+ # independently and concurrently for every partition, but only
+ # one partition actually emits the bundle.
+ if dataset_row_id.startswith(prefix):
+ bundle_uuid = change_version(dataset_row_id,
+ self.datarepo_row_uuid_version,
+ self.bundle_uuid_version)
+ bundles.append(TDRAnvilBundleFQID(
+ uuid=bundle_uuid,
+ version=self._version,
+ source=source,
+ table_name=BundleType.duos.value,
+ batch_prefix=None,
+ ))
+ for row in self._run_sql(f'''
+ SELECT datarepo_row_id
+ FROM {backtick(self._full_table_name(spec, BundleType.primary.value))}
+ WHERE STARTS_WITH(datarepo_row_id, {prefix!r})
+ '''):
+ bundle_uuid = change_version(row['datarepo_row_id'],
+ self.datarepo_row_uuid_version,
+ self.bundle_uuid_version)
bundles.append(TDRAnvilBundleFQID(
- source=source,
uuid=bundle_uuid,
version=self._version,
- table_name=BundleType(row['table_name'])
+ source=source,
+ table_name=BundleType.primary.value,
+ batch_prefix=None,
))
+ prefix_lengths_by_table = self._batch_tables(source.spec, prefix)
+ for table_name, (batch_prefix_length, _) in prefix_lengths_by_table.items():
+ batch_prefixes = Prefix(common=prefix,
+ partition=batch_prefix_length - len(prefix)).partition_prefixes()
+ for batch_prefix in batch_prefixes:
+ bundle_uuid = self._batch_uuid(spec, table_name, batch_prefix)
+ bundles.append(TDRAnvilBundleFQID(uuid=bundle_uuid,
+ version=self._version,
+ source=source,
+ table_name=table_name,
+ batch_prefix=batch_prefix))
return bundles
- def resolve_bundle(self, fqid: SourcedBundleFQIDJSON) -> TDRAnvilBundleFQID:
- if 'table_name' not in fqid:
- # Resolution of bundles without the table name is expensive, so we
- # only support it during canning.
- assert not config.is_in_lambda, ('Bundle FQID lacks table name', fqid)
- source = self.source_from_json(fqid['source'])
- entity_id = uuids.change_version(fqid['uuid'],
- self.bundle_uuid_version,
- self.datarepo_row_uuid_version)
- rows = self._run_sql(' UNION ALL '.join((
- f'''
- SELECT {bundle_type.value!r} AS table_name
- FROM {backtick(self._full_table_name(source.spec, bundle_type.value))}
- WHERE datarepo_row_id = {entity_id!r}
- '''
- for bundle_type in BundleType
- )))
- fqid = {**fqid, **one(rows)}
- return super().resolve_bundle(fqid)
-
def _emulate_bundle(self, bundle_fqid: TDRAnvilBundleFQID) -> TDRAnvilBundle:
- if bundle_fqid.table_name is BundleType.primary:
+ if bundle_fqid.table_name == BundleType.primary.value:
log.info('Bundle %r is a primary bundle', bundle_fqid.uuid)
return self._primary_bundle(bundle_fqid)
- elif bundle_fqid.table_name is BundleType.supplementary:
+ elif bundle_fqid.table_name == BundleType.supplementary.value:
log.info('Bundle %r is a supplementary bundle', bundle_fqid.uuid)
return self._supplementary_bundle(bundle_fqid)
- elif bundle_fqid.table_name is BundleType.duos:
+ elif bundle_fqid.table_name == BundleType.duos.value:
assert config.duos_service_url is not None, bundle_fqid
log.info('Bundle %r is a DUOS bundle', bundle_fqid.uuid)
return self._duos_bundle(bundle_fqid)
else:
- assert False, bundle_fqid.table_name
+ log.info('Bundle %r is a replica bundle', bundle_fqid.uuid)
+ return self._replica_bundle(bundle_fqid)
+
+ def _batch_tables(self,
+ source: TDRSourceSpec,
+ prefix: str,
+ ) -> dict[str, tuple[int, int]]:
+ """
+ Find a batch prefix length that yields as close to 256 rows per batch
+ as possible for each table within the specified partition. The result's
+ keys are table names and its values are tuples where the first element
+ is the prefix length (*including* the partition prefix) and the second
+ element is the resulting number of batches. Tables are only included in
+ the result if they are non-empty and are used to produce batched bundle
+ formats (i.e. replica and supplementary).
+
+ Because the partitions of a table do not contain exactly the same number
+ of bundles, calculating the batch size statistics for the entire table
+ at once produces a different result than performing the same calculation
+ for any individual partition. We expect the inconsistencies to average
+ out across partitions so that `count_bundles` and `list_bundles` give
+        consistent results as long as the partition size is substantially larger
+ than the batch size.
+
+ This method relies on BigQuery's `AVG` function, which is
+ nondeterministic for floating-point return values. The probability that
+ this affects this method's return value is very small, but nonzero.
+ https://cloud.google.com/bigquery/docs/reference/standard-sql/aggregate_functions#avg
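+
+        For example (illustrative): a table with roughly 4096 rows in the
+        partition would be assigned a batch prefix length of `len(prefix) + 1`,
+        i.e. 16 batches of ~256 rows each, since that average batch size is
+        closer to the target of 256 than one batch of 4096 rows (length
+        `len(prefix)`) or 256 batches of ~16 rows (length `len(prefix) + 2`).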
+ """
+ max_length = 4
+
+ def repeat(fmt):
+ return ', '.join(fmt.format(i=i) for i in range(1, max_length + 1))
+
+ target_size = 256
+ prefix_len = len(prefix)
+ table_names = self.tdr.list_tables(source)
+ # This table is present in all snapshots. It is large and contains no
+ # useful metadata, so we skip indexing replicas from it.
+ table_names.discard('datarepo_row_ids')
+ table_names = sorted(filter(BundleType.is_batched, table_names))
+ log.info('Calculating batch prefix lengths for partition %r of %d tables '
+ 'in source %s', prefix, len(table_names), source)
+        # The extraneous outer 'SELECT *' works around a bug in the BigQuery emulator
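+        # The innermost query counts rows per candidate batch prefix at every
+        # length from 0 to max_length characters past the partition prefix
+        # (via GROUP BY ROLLUP); the outer query then keeps, for each table,
+        # the length whose average batch size is closest to the target.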
+ query = ' UNION ALL '.join(f'''(
+ SELECT * FROM (
+ SELECT
+ {table_name!r} AS table_name,
+ {prefix_len} + LENGTH(CONCAT(
+ {repeat('IFNULL(p{i}, "")')}
+ )) AS batch_prefix_length,
+ AVG(num_rows) AS average_batch_size,
+ COUNT(*) AS num_batches
+ FROM (
+ SELECT
+ {repeat(f'SUBSTR(datarepo_row_id, {prefix_len} + {{i}}, 1) AS p{{i}}')},
+ COUNT(*) AS num_rows
+ FROM {backtick(self._full_table_name(source, table_name))}
+ WHERE STARTS_WITH(datarepo_row_id, {prefix!r})
+ GROUP BY ROLLUP ({repeat('p{i}')})
+ )
+ GROUP BY batch_prefix_length
+ ORDER BY ABS({target_size} - average_batch_size)
+ LIMIT 1
+ )
+ )''' for table_name in table_names)
+
+ def result(row):
+ table_name = row['table_name']
+ prefix_length = row['batch_prefix_length']
+ average_size = row['average_batch_size']
+ num_batches = row['num_batches']
+ log.info('Selected batch prefix length %d for table %r (average '
+ 'batch size %.1f, num batches %d)',
+ prefix_length, table_name, average_size, num_batches)
+ return table_name, (prefix_length, num_batches)
+
+ return dict(map(result, self._run_sql(query)))
def _primary_bundle(self, bundle_fqid: TDRAnvilBundleFQID) -> TDRAnvilBundle:
+ assert not bundle_fqid.is_batched, bundle_fqid
source = bundle_fqid.source
bundle_entity = self._bundle_entity(bundle_fqid)
@@ -317,56 +437,80 @@ def _primary_bundle(self, bundle_fqid: TDRAnvilBundleFQID) -> TDRAnvilBundle:
return result
def _supplementary_bundle(self, bundle_fqid: TDRAnvilBundleFQID) -> TDRAnvilBundle:
- entity_id = uuids.change_version(bundle_fqid.uuid,
- self.bundle_uuid_version,
- self.datarepo_row_uuid_version)
+ assert bundle_fqid.is_batched, bundle_fqid
source = bundle_fqid.source.spec
- table_name = bundle_fqid.table_name.value
result = TDRAnvilBundle(fqid=bundle_fqid)
- columns = self._columns(table_name)
- bundle_entity = dict(one(self._run_sql(f'''
- SELECT {', '.join(sorted(columns))}
- FROM {backtick(self._full_table_name(source, table_name))}
- WHERE datarepo_row_id = '{entity_id}'
- ''')))
- dataset_table = 'anvil_dataset'
- columns = self._columns(dataset_table)
- linked_entity = dict(one(self._run_sql(f'''
- SELECT {', '.join(sorted(columns))}
- FROM {backtick(self._full_table_name(source, dataset_table))}
- ''')))
- link_args = {}
- for entity_type, row, arg in [
- ('anvil_file', bundle_entity, 'outputs'),
- (dataset_table, linked_entity, 'inputs')
- ]:
- entity_ref = EntityReference(entity_type=entity_type, entity_id=row['datarepo_row_id'])
- result.add_entity(entity_ref, self._version, row)
- link_args[arg] = {entity_ref}
- result.add_links({EntityLink(**link_args)})
+ linked_file_refs = set()
+ for file_ref, file_row in self._get_batch(bundle_fqid):
+ is_supplementary = file_row['is_supplementary']
+ result.add_entity(file_ref,
+ self._version,
+ dict(file_row),
+ is_orphan=not is_supplementary)
+ if is_supplementary:
+ linked_file_refs.add(file_ref)
+ dataset_ref, dataset_row = self._get_dataset(source)
+ result.add_entity(dataset_ref, self._version, dict(dataset_row))
+ result.add_links([EntityLink(inputs={dataset_ref}, outputs=linked_file_refs)])
return result
def _duos_bundle(self, bundle_fqid: TDRAnvilBundleFQID) -> TDRAnvilBundle:
+ assert not bundle_fqid.is_batched, bundle_fqid
duos_info = self.tdr.get_duos(bundle_fqid.source)
description = None if duos_info is None else duos_info.get('studyDescription')
- entity_id = change_version(bundle_fqid.uuid,
- self.bundle_uuid_version,
- self.datarepo_row_uuid_version)
- entity = EntityReference(entity_type='anvil_dataset',
- entity_id=entity_id)
+ ref, row = self._get_dataset(bundle_fqid.source.spec)
+ expected_entity_id = change_version(bundle_fqid.uuid,
+ self.bundle_uuid_version,
+ self.datarepo_row_uuid_version)
+ assert ref.entity_id == expected_entity_id, (ref, bundle_fqid)
bundle = TDRAnvilBundle(fqid=bundle_fqid)
- bundle.add_entity(entity=entity,
- version=self._version,
- row={'description': description})
+ bundle.add_entity(ref, self._version, {'description': description})
+ # Classify as orphan to suppress the emission of a contribution
+ bundle.add_entity(ref, self._version, dict(row), is_orphan=True)
return bundle
+ def _replica_bundle(self, bundle_fqid: TDRAnvilBundleFQID) -> TDRAnvilBundle:
+ assert bundle_fqid.is_batched, bundle_fqid
+ source = bundle_fqid.source.spec
+ result = TDRAnvilBundle(fqid=bundle_fqid)
+ batch = self._get_batch(bundle_fqid)
+ dataset = self._get_dataset(source)
+ for (ref, row) in itertools.chain([dataset], batch):
+ result.add_entity(ref, self._version, dict(row), is_orphan=True)
+ return result
+
+ def _get_dataset(self, source: TDRSourceSpec) -> tuple[EntityReference, BigQueryRow]:
+ table_name = 'anvil_dataset'
+ columns = self._columns(table_name)
+ row = one(self._run_sql(f'''
+ SELECT {', '.join(sorted(columns))}
+ FROM {backtick(self._full_table_name(source, table_name))}
+ '''))
+ ref = EntityReference(entity_type=table_name, entity_id=row['datarepo_row_id'])
+ return ref, row
+
+ def _get_batch(self,
+ bundle_fqid: TDRAnvilBundleFQID
+ ) -> Iterable[tuple[EntityReference, BigQueryRow]]:
+ source = bundle_fqid.source.spec
+ batch_prefix = bundle_fqid.batch_prefix
+ table_name = bundle_fqid.table_name
+ columns = self._columns(table_name)
+ for row in self._run_sql(f'''
+ SELECT {', '.join(sorted(columns))}
+ FROM {backtick(self._full_table_name(source, table_name))}
+ WHERE STARTS_WITH(datarepo_row_id, {batch_prefix!r})
+ '''):
+ ref = EntityReference(entity_type=table_name, entity_id=row['datarepo_row_id'])
+ yield ref, row
+
def _bundle_entity(self, bundle_fqid: TDRAnvilBundleFQID) -> KeyReference:
source = bundle_fqid.source
bundle_uuid = bundle_fqid.uuid
entity_id = uuids.change_version(bundle_uuid,
self.bundle_uuid_version,
self.datarepo_row_uuid_version)
- table_name = bundle_fqid.table_name.value
+ table_name = bundle_fqid.table_name
pk_column = table_name.removeprefix('anvil_') + '_id'
bundle_entity = one(self._run_sql(f'''
SELECT {pk_column}
@@ -677,7 +821,8 @@ def convert_column(value):
log.debug('Retrieved %i entities of type %r', len(rows), entity_type)
missing = keys - {row[pk_column] for row in rows}
require(not missing,
- f'Required entities not found in {table_name}: {missing}')
+ f'Found only {len(rows)} out of {len(keys)} expected rows in {table_name}. '
+ f'Missing entities: {missing}')
return rows
else:
return []
@@ -688,6 +833,11 @@ def convert_column(value):
}
def _columns(self, table_name: str) -> set[str]:
- columns = set(self._schema_columns[table_name])
- columns.add('datarepo_row_id')
- return columns
+ try:
+ columns = self._schema_columns[table_name]
+ except KeyError:
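+ # The table is not covered by the schema (e.g. a non-schema orphan
+ # table that only exists in the snapshot), so fall back to selecting
+ # all of its columns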
+ return {'*'}
+ else:
+ columns = set(columns)
+ columns.add('datarepo_row_id')
+ return columns
diff --git a/src/azul/plugins/repository/tdr_hca/__init__.py b/src/azul/plugins/repository/tdr_hca/__init__.py
index 320b0a4f1b..e022196832 100644
--- a/src/azul/plugins/repository/tdr_hca/__init__.py
+++ b/src/azul/plugins/repository/tdr_hca/__init__.py
@@ -278,17 +278,21 @@ def _parse_drs_uri(self,
class Plugin(TDRPlugin[TDRHCABundle, TDRSourceSpec, TDRSourceRef, TDRBundleFQID]):
- def _count_subgraphs(self, source: TDRSourceSpec) -> int:
- rows = self._run_sql(f'''
- SELECT COUNT(*) AS count
- FROM {backtick(self._full_table_name(source, 'links'))}
- ''')
+ def count_bundles(self, source: TDRSourceSpec) -> int:
+ prefix = '' if source.prefix is None else source.prefix.common
+ query = f'''
+ SELECT COUNT(*) AS count
+ FROM {backtick(self._full_table_name(source, 'links'))}
+ WHERE STARTS_WITH(datarepo_row_id, {prefix!r})
+ '''
+ rows = self._run_sql(query)
return one(rows)['count']
- def _list_bundles(self,
- source: TDRSourceRef,
- prefix: str
- ) -> list[TDRBundleFQID]:
+ def list_bundles(self,
+ source: TDRSourceRef,
+ prefix: str
+ ) -> list[TDRBundleFQID]:
+ self._assert_source(source)
current_bundles = self._query_unique_sorted(f'''
SELECT links_id, version
FROM {backtick(self._full_table_name(source.spec, 'links'))}
@@ -466,7 +470,8 @@ def _retrieve_entities(self,
log.debug('Retrieved %i entities of type %r', len(rows), entity_type)
missing = expected - {row[pk_column] for row in rows}
require(not missing,
- f'Required entities not found in {table_name}: {missing}')
+ f'Found only {len(rows)} out of {len(entity_ids)} expected rows in {table_name}. '
+ f'Missing entities: {missing}')
return rows
def _in(self,
diff --git a/src/azul/service/avro_pfb.py b/src/azul/service/avro_pfb.py
index d88be244de..eb1f3b7fbe 100644
--- a/src/azul/service/avro_pfb.py
+++ b/src/azul/service/avro_pfb.py
@@ -17,7 +17,6 @@
ClassVar,
MutableSet,
Self,
- Sequence,
)
from uuid import (
UUID,
@@ -315,18 +314,15 @@ def pfb_schema_from_field_types(field_types: FieldTypes) -> JSON:
return avro_pfb_schema(entity_schemas)
-def pfb_schema_from_replicas(replicas: Iterable[JSON]
- ) -> tuple[Sequence[str], JSON]:
- schemas_by_replica_type = {}
+def pfb_schema_from_replicas(replicas: Iterable[JSON]) -> list[JSON]:
+ schemas_by_replica_type: dict[str, MutableJSON] = {}
for replica in replicas:
replica_type, replica_contents = replica['replica_type'], replica['contents']
_update_replica_schema(schema=schemas_by_replica_type,
path=(replica_type,),
key=replica_type,
value=replica_contents)
- schemas_by_replica_type = sorted(schemas_by_replica_type.items())
- keys, values = zip(*schemas_by_replica_type)
- return keys, avro_pfb_schema(values)
+ return list(schemas_by_replica_type.values())
def avro_pfb_schema(azul_avro_schema: Iterable[JSON]) -> JSON:
diff --git a/src/azul/service/elasticsearch_service.py b/src/azul/service/elasticsearch_service.py
index 2b6b81631c..177a16c87b 100644
--- a/src/azul/service/elasticsearch_service.py
+++ b/src/azul/service/elasticsearch_service.py
@@ -192,14 +192,18 @@ def process_response(self, response: Response) -> Response:
@cached_property
def prepared_filters(self) -> TranslatedFilters:
- filters_json = self.filters.reify(self.plugin, limit_access=self._limit_access())
+ limit_access = self.service.always_limit_access or self._limit_access
+ filters_json = self.filters.reify(self.plugin, limit_access=limit_access)
return self._translate_filters(filters_json)
+ @property
@abstractmethod
def _limit_access(self) -> bool:
"""
Whether to enforce the managed access controls during filter
- reification.
+ reification, provided that the service allows such conditional
+ enforcement of access. If it doesn't, the return value should be
+ ignored, and access must be enforced unconditionally.
"""
raise NotImplementedError
diff --git a/src/azul/service/manifest_service.py b/src/azul/service/manifest_service.py
index c82222fda2..f695a7659f 100644
--- a/src/azul/service/manifest_service.py
+++ b/src/azul/service/manifest_service.py
@@ -1985,7 +1985,7 @@ class VerbatimManifestGenerator(FileBasedManifestGenerator, metaclass=ABCMeta):
@property
def entity_type(self) -> str:
- return 'files'
+ return self.implicit_hub_type if self.include_orphans else 'files'
@property
def included_fields(self) -> list[FieldPath]:
@@ -2001,6 +2001,11 @@ def included_fields(self) -> list[FieldPath]:
def implicit_hub_type(self) -> str:
return self.service.metadata_plugin(self.catalog).implicit_hub_type
+ @property
+ def include_orphans(self) -> bool:
+ special_fields = self.service.metadata_plugin(self.catalog).special_fields
+ return self.filters.explicit.keys() == {special_fields.implicit_hub_id}
+
@attrs.frozen(kw_only=True)
class ReplicaKeys:
"""
@@ -2019,8 +2024,11 @@ def _replica_keys(self) -> Iterable[ReplicaKeys]:
hub_type = self.implicit_hub_type
request = self._create_request()
for hit in request.scan():
+ replica_id = one(hit['contents'][hub_type])['document_id']
+ if self.entity_type != hub_type:
+ replica_id = one(replica_id)
yield self.ReplicaKeys(hub_id=hit['entity_id'],
- replica_id=one(one(hit['contents'][hub_type])['document_id']))
+ replica_id=replica_id)
def _all_replicas(self) -> Iterable[JSON]:
emitted_replica_ids = set()
@@ -2037,14 +2045,7 @@ def _all_replicas(self) -> Iterable[JSON]:
if replica_id not in emitted_replica_ids:
num_new_replicas += 1
yield replica.to_dict()
- # Note that this will be zero for replicas that use implicit
- # hubs, in which case there are actually many hubs
- explicit_hub_count = len(replica.hub_ids)
- # We don't have to track the IDs of replicas with only one
- # hub, since we know that there are no other hubs that could
- # cause their re-emission.
- if explicit_hub_count != 1:
- emitted_replica_ids.add(replica_id)
+ emitted_replica_ids.add(replica_id)
log.info('Found %d replicas (%d already emitted) from page of %d hubs',
num_replicas, num_replicas - num_new_replicas, len(page))
@@ -2107,8 +2108,11 @@ def format(cls) -> ManifestFormat:
def create_file(self) -> tuple[str, Optional[str]]:
plugin = self.service.metadata_plugin(self.catalog)
- replicas = self._all_replicas()
- replicas, replica_types, pfb_schema = plugin.verbatim_pfb_schema(replicas)
+ replicas = list(self._all_replicas())
+ replica_schemas = plugin.verbatim_pfb_schema(replicas)
+ replica_schemas.sort(key=itemgetter('name'))
+ replica_types = [s['name'] for s in replica_schemas]
+ pfb_schema = avro_pfb.avro_pfb_schema(replica_schemas)
pfb_metadata_entity = avro_pfb.pfb_metadata_entity(replica_types, links=False)
def pfb_entities():
diff --git a/src/azul/service/repository_service.py b/src/azul/service/repository_service.py
index 0ea5889f3a..e3257278b6 100644
--- a/src/azul/service/repository_service.py
+++ b/src/azul/service/repository_service.py
@@ -374,3 +374,7 @@ def _hit_to_doc(hit: Hit) -> JSON:
if file_version is not None:
assert file_version == file['version']
return file
+
+ @property
+ def always_limit_access(self) -> bool:
+ return False
diff --git a/src/azul/terra.py b/src/azul/terra.py
index a468c382e3..e5046850a7 100644
--- a/src/azul/terra.py
+++ b/src/azul/terra.py
@@ -39,10 +39,14 @@
bigquery,
)
from google.cloud.bigquery import (
+ DatasetReference,
QueryJob,
QueryJobConfig,
QueryPriority,
)
+from google.cloud.bigquery.table import (
+ RowIterator,
+)
from more_itertools import (
one,
)
@@ -504,13 +508,21 @@ def run_sql(self, query: str) -> BigQueryRows:
else:
assert False
if log.isEnabledFor(logging.DEBUG):
- log.debug('Job info: %s', json.dumps(self._job_info(job)))
+ log.debug('Job info: %s', json.dumps(self._job_info(job, result)))
return result
+ def list_tables(self, source: TDRSourceSpec) -> set[str]:
+ bigquery = self._bigquery(self.credentials.project_id)
+ ref = DatasetReference(project=source.subdomain, dataset_id=source.name)
+ return {
+ table.to_api_repr()['tableReference']['tableId']
+ for table in bigquery.list_tables(ref)
+ }
+
def _trunc_query(self, query: str) -> str:
return trunc_ellipses(query, 2048)
- def _job_info(self, job: QueryJob) -> JSON:
+ def _job_info(self, job: QueryJob, result: RowIterator) -> JSON:
# noinspection PyProtectedMember
stats = job._properties['statistics']['query']
if config.debug < 2:
@@ -518,6 +530,7 @@ def _job_info(self, job: QueryJob) -> JSON:
stats = {k: v for k, v in stats.items() if k not in ignore}
return {
'job_id': job.job_id,
+ 'total_rows': result.total_rows,
'stats': stats,
'query': self._trunc_query(job.query)
}
diff --git a/src/azul/uuids.py b/src/azul/uuids.py
index 194217e9e5..ad1017ad1d 100644
--- a/src/azul/uuids.py
+++ b/src/azul/uuids.py
@@ -124,7 +124,7 @@ def change_version(uuid: str, old_version: int, new_version: int) -> str:
prefix, version, suffix = uuid[:14], uuid[14], uuid[15:]
version = int(version, 16)
assert version == old_version, (uuid, version, old_version)
- uuid = prefix + hex(new_version)[2:] + suffix
+ uuid = f'{prefix}{new_version:x}{suffix}'
assert UUID(uuid).version == new_version, (uuid, old_version)
if new_version in (1, 3, 4, 5):
validate_uuid(uuid)
diff --git a/terraform/authentication.tf.json.template.py b/terraform/authentication.tf.json.template.py
index 782c1581b9..cf291378b5 100644
--- a/terraform/authentication.tf.json.template.py
+++ b/terraform/authentication.tf.json.template.py
@@ -60,6 +60,7 @@
"title": f"azul_{config.deployment_stage}",
"permissions": [
"bigquery.jobs.create",
+ "bigquery.tables.list",
*[
f'bigquery.{resource}.{action}'
for resource in ('capacityCommitments', 'reservations')
diff --git a/test/indexer/data/2370f948-2783-aeb6-afea-e022897f4dcf.tdr.anvil.json b/test/indexer/data/2370f948-2783-aeb6-afea-e022897f4dcf.tdr.anvil.json
index 7da9d5f3cd..9859b200f1 100644
--- a/test/indexer/data/2370f948-2783-aeb6-afea-e022897f4dcf.tdr.anvil.json
+++ b/test/indexer/data/2370f948-2783-aeb6-afea-e022897f4dcf.tdr.anvil.json
@@ -5,5 +5,30 @@
"version": "2022-06-01T00:00:00.000000Z"
}
},
- "links": []
+ "links": [],
+ "orphans": {
+ "anvil_dataset/2370f948-2783-4eb6-afea-e022897f4dcf": {
+ "consent_group": [
+ "DS-BDIS"
+ ],
+ "data_modality": [],
+ "data_use_permission": [
+ "DS-BDIS"
+ ],
+ "datarepo_row_id": "2370f948-2783-4eb6-afea-e022897f4dcf",
+ "dataset_id": "52ee7665-7033-63f2-a8d9-ce8e32666739",
+ "owner": [
+ "Debbie Nickerson"
+ ],
+ "principal_investigator": [],
+ "registered_identifier": [
+ "phs000693"
+ ],
+ "source_datarepo_row_ids": [
+ "workspace_attributes:7a22b629-9d81-4e4d-9297-f9e44ed760bc"
+ ],
+ "title": "ANVIL_CMG_UWASH_DS_BDIS",
+ "version": "2022-06-01T00:00:00.000000Z"
+ }
+ }
}
\ No newline at end of file
diff --git a/test/indexer/data/6c87f0e1-509d-46a4-b845-7584df39263b.tables.tdr.json b/test/indexer/data/6c87f0e1-509d-46a4-b845-7584df39263b.tables.tdr.json
index 422b461de6..8a119af5c6 100644
--- a/test/indexer/data/6c87f0e1-509d-46a4-b845-7584df39263b.tables.tdr.json
+++ b/test/indexer/data/6c87f0e1-509d-46a4-b845-7584df39263b.tables.tdr.json
@@ -3,8 +3,9 @@
"anvil_activity": {
"rows": [
{
- "activity_id": "",
- "activity_type": "",
+ "activity_id": "95aa5a62-0c42-48c7-aa70-e9987cfd9824",
+ "activity_type": "other",
+ "datarepo_row_id": "85342e55-8897-481e-a2c3-b7344071a72c",
"generated_file_id": [],
"source_datarepo_row_ids": [],
"used_biosample_id": [],
@@ -15,9 +16,10 @@
"anvil_alignmentactivity": {
"rows": [
{
- "activity_type": "",
- "alignmentactivity_id": "",
+ "activity_type": "alignment",
+ "alignmentactivity_id": "bb22e6de-7704-4435-8684-a34b1e4ff188",
"data_modality": [],
+ "datarepo_row_id": "a6f3f829-dec6-4be8-9d53-2c963f9206fc",
"generated_file_id": [],
"reference_assembly": [],
"source_datarepo_row_ids": [],
@@ -28,11 +30,12 @@
"anvil_assayactivity": {
"rows": [
{
- "activity_type": "",
+ "activity_type": "assay",
"antibody_id": [],
"assay_type": "",
- "assayactivity_id": "",
+ "assayactivity_id": "d9150ab0-8e4a-46ab-9501-4b91c6f548e1",
"data_modality": [],
+ "datarepo_row_id": "9258bb9f-1e2b-4139-9226-3588154a4632",
"generated_file_id": [],
"source_datarepo_row_ids": [],
"used_biosample_id": []
@@ -241,13 +244,30 @@
"anvil_variantcallingactivity": {
"rows": [
{
- "activity_type": "",
+ "activity_type": "variant calling",
"data_modality": [],
+ "datarepo_row_id": "c826d5b0-5e86-432b-ab53-36eced42ece0",
"generated_file_id": [],
"reference_assembly": [],
"source_datarepo_row_ids": [],
"used_file_id": [],
- "variantcallingactivity_id": ""
+ "variantcallingactivity_id": "8db89e5e-1c8a-487e-b0c1-b26218ac6b7b"
+ }
+ ]
+ },
+ "non_schema_orphan_table": {
+ "rows": [
+ {
+ "datarepo_row_id": "9687b86d-a2ae-a083-b910-a16bcbef1ba4",
+ "non_schema_column": "spam"
+ },
+ {
+ "datarepo_row_id": "28ed0f3a-157b-417b-a05a-48f57f9d3a34",
+ "non_schema_column": "eggs"
+ },
+ {
+ "datarepo_row_id": "9db5952c-c454-49d9-8a62-5abb026701c0",
+ "non_schema_column": "baked beans"
}
]
}
diff --git a/test/indexer/data/826dea02-e274-affe-aabc-eb3db63ad068.results.json b/test/indexer/data/826dea02-e274-affe-aabc-eb3db63ad068.results.json
index 5d3fd82432..2e09718c18 100644
--- a/test/indexer/data/826dea02-e274-affe-aabc-eb3db63ad068.results.json
+++ b/test/indexer/data/826dea02-e274-affe-aabc-eb3db63ad068.results.json
@@ -437,7 +437,8 @@
"entity_id": "1509ef40-d1ba-440d-b298-16b7c173dcd4",
"replica_type": "anvil_sequencingactivity",
"hub_ids": [
- "15b76f9c-6b46-433f-851d-34e89f1b9ba6"
+ "15b76f9c-6b46-433f-851d-34e89f1b9ba6",
+ "2370f948-2783-4eb6-afea-e022897f4dcf"
],
"contents": {
"activity_type": "Sequencing",
@@ -871,7 +872,8 @@
"entity_id": "15b76f9c-6b46-433f-851d-34e89f1b9ba6",
"replica_type": "anvil_file",
"hub_ids": [
- "15b76f9c-6b46-433f-851d-34e89f1b9ba6"
+ "15b76f9c-6b46-433f-851d-34e89f1b9ba6",
+ "2370f948-2783-4eb6-afea-e022897f4dcf"
],
"contents": {
"data_modality": [],
@@ -904,6 +906,7 @@
"replica_type": "anvil_diagnosis",
"hub_ids": [
"15b76f9c-6b46-433f-851d-34e89f1b9ba6",
+ "2370f948-2783-4eb6-afea-e022897f4dcf",
"3b17377b-16b1-431c-9967-e5d01fc5923f"
],
"contents": {
@@ -1477,7 +1480,9 @@
"_source": {
"entity_id": "2370f948-2783-4eb6-afea-e022897f4dcf",
"replica_type": "anvil_dataset",
- "hub_ids": [],
+ "hub_ids": [
+ "2370f948-2783-4eb6-afea-e022897f4dcf"
+ ],
"contents": {
"consent_group": [
"DS-BDIS"
@@ -1748,6 +1753,7 @@
},
"replica_type": "anvil_file",
"hub_ids": [
+ "2370f948-2783-4eb6-afea-e022897f4dcf",
"3b17377b-16b1-431c-9967-e5d01fc5923f"
]
}
@@ -2190,6 +2196,7 @@
"entity_id": "816e364e-1193-4e5b-a91a-14e4b009157c",
"replica_type": "anvil_sequencingactivity",
"hub_ids": [
+ "2370f948-2783-4eb6-afea-e022897f4dcf",
"3b17377b-16b1-431c-9967-e5d01fc5923f"
],
"contents": {
@@ -2939,6 +2946,7 @@
"replica_type": "anvil_biosample",
"hub_ids": [
"15b76f9c-6b46-433f-851d-34e89f1b9ba6",
+ "2370f948-2783-4eb6-afea-e022897f4dcf",
"3b17377b-16b1-431c-9967-e5d01fc5923f"
],
"contents": {
@@ -3519,6 +3527,7 @@
"replica_type": "anvil_diagnosis",
"hub_ids": [
"15b76f9c-6b46-433f-851d-34e89f1b9ba6",
+ "2370f948-2783-4eb6-afea-e022897f4dcf",
"3b17377b-16b1-431c-9967-e5d01fc5923f"
],
"contents": {
@@ -4107,6 +4116,7 @@
"replica_type": "anvil_donor",
"hub_ids": [
"15b76f9c-6b46-433f-851d-34e89f1b9ba6",
+ "2370f948-2783-4eb6-afea-e022897f4dcf",
"3b17377b-16b1-431c-9967-e5d01fc5923f"
]
}
diff --git a/test/indexer/data/826dea02-e274-affe-aabc-eb3db63ad068.tdr.anvil.json b/test/indexer/data/826dea02-e274-affe-aabc-eb3db63ad068.tdr.anvil.json
index 9c461ee994..ec875d5fa8 100644
--- a/test/indexer/data/826dea02-e274-affe-aabc-eb3db63ad068.tdr.anvil.json
+++ b/test/indexer/data/826dea02-e274-affe-aabc-eb3db63ad068.tdr.anvil.json
@@ -231,5 +231,6 @@
"anvil_biosample/826dea02-e274-4ffe-aabc-eb3db63ad068"
]
}
- ]
+ ],
+ "orphans": {}
}
diff --git a/test/indexer/data/9a135c9a-069b-a90e-b588-eaf8d1aeeac9.tdr.anvil.json b/test/indexer/data/9a135c9a-069b-a90e-b588-eaf8d1aeeac9.tdr.anvil.json
new file mode 100644
index 0000000000..a9ff2d7af3
--- /dev/null
+++ b/test/indexer/data/9a135c9a-069b-a90e-b588-eaf8d1aeeac9.tdr.anvil.json
@@ -0,0 +1,44 @@
+{
+ "entities": {},
+ "links": {},
+ "orphans": {
+ "non_schema_orphan_table/9687b86d-a2ae-a083-b910-a16bcbef1ba4": {
+ "datarepo_row_id": "9687b86d-a2ae-a083-b910-a16bcbef1ba4",
+ "non_schema_column": "spam",
+ "version": "2022-06-01T00:00:00.000000Z"
+ },
+ "non_schema_orphan_table/28ed0f3a-157b-417b-a05a-48f57f9d3a34": {
+ "datarepo_row_id": "28ed0f3a-157b-417b-a05a-48f57f9d3a34",
+ "non_schema_column": "eggs",
+ "version": "2022-06-01T00:00:00.000000Z"
+ },
+ "non_schema_orphan_table/9db5952c-c454-49d9-8a62-5abb026701c0": {
+ "datarepo_row_id": "9db5952c-c454-49d9-8a62-5abb026701c0",
+ "non_schema_column": "baked beans",
+ "version": "2022-06-01T00:00:00.000000Z"
+ },
+ "anvil_dataset/2370f948-2783-4eb6-afea-e022897f4dcf": {
+ "consent_group": [
+ "DS-BDIS"
+ ],
+ "data_modality": [],
+ "data_use_permission": [
+ "DS-BDIS"
+ ],
+ "datarepo_row_id": "2370f948-2783-4eb6-afea-e022897f4dcf",
+ "dataset_id": "52ee7665-7033-63f2-a8d9-ce8e32666739",
+ "owner": [
+ "Debbie Nickerson"
+ ],
+ "principal_investigator": [],
+ "registered_identifier": [
+ "phs000693"
+ ],
+ "source_datarepo_row_ids": [
+ "workspace_attributes:7a22b629-9d81-4e4d-9297-f9e44ed760bc"
+ ],
+ "title": "ANVIL_CMG_UWASH_DS_BDIS",
+ "version": "2022-06-01T00:00:00.000000Z"
+ }
+ }
+}
\ No newline at end of file
diff --git a/test/indexer/data/aaa96233-bf27-44c7-82df-b4dc15ad4d9d.2018-11-02T11:33:44.698028Z.results.json b/test/indexer/data/aaa96233-bf27-44c7-82df-b4dc15ad4d9d.2018-11-02T11:33:44.698028Z.results.json
index d7b65274b4..24aab5fdc7 100644
--- a/test/indexer/data/aaa96233-bf27-44c7-82df-b4dc15ad4d9d.2018-11-02T11:33:44.698028Z.results.json
+++ b/test/indexer/data/aaa96233-bf27-44c7-82df-b4dc15ad4d9d.2018-11-02T11:33:44.698028Z.results.json
@@ -70,7 +70,8 @@
"entity_id": "aaa96233-bf27-44c7-82df-b4dc15ad4d9d",
"hub_ids": [
"0c5ac7c0-817e-40d4-b1b1-34c3d5cfecdb",
- "70d1af4a-82c8-478a-8960-e9028b3616ca"
+ "70d1af4a-82c8-478a-8960-e9028b3616ca",
+ "e8642221-4c2c-4fd7-b926-a68bce363c88"
],
"replica_type": "links"
}
@@ -3485,7 +3486,8 @@
"entity_id": "0c5ac7c0-817e-40d4-b1b1-34c3d5cfecdb",
"replica_type": "sequence_file",
"hub_ids": [
- "0c5ac7c0-817e-40d4-b1b1-34c3d5cfecdb"
+ "0c5ac7c0-817e-40d4-b1b1-34c3d5cfecdb",
+ "e8642221-4c2c-4fd7-b926-a68bce363c88"
]
},
"_type": "_doc"
@@ -3524,7 +3526,8 @@
"replica_type": "cell_suspension",
"hub_ids": [
"0c5ac7c0-817e-40d4-b1b1-34c3d5cfecdb",
- "70d1af4a-82c8-478a-8960-e9028b3616ca"
+ "70d1af4a-82c8-478a-8960-e9028b3616ca",
+ "e8642221-4c2c-4fd7-b926-a68bce363c88"
]
},
"_type": "_doc"
@@ -3555,7 +3558,8 @@
"entity_id": "70d1af4a-82c8-478a-8960-e9028b3616ca",
"replica_type": "sequence_file",
"hub_ids": [
- "70d1af4a-82c8-478a-8960-e9028b3616ca"
+ "70d1af4a-82c8-478a-8960-e9028b3616ca",
+ "e8642221-4c2c-4fd7-b926-a68bce363c88"
]
},
"_type": "_doc"
@@ -3608,7 +3612,8 @@
"replica_type": "specimen_from_organism",
"hub_ids": [
"0c5ac7c0-817e-40d4-b1b1-34c3d5cfecdb",
- "70d1af4a-82c8-478a-8960-e9028b3616ca"
+ "70d1af4a-82c8-478a-8960-e9028b3616ca",
+ "e8642221-4c2c-4fd7-b926-a68bce363c88"
]
},
"_type": "_doc"
@@ -3688,7 +3693,9 @@
},
"entity_id": "e8642221-4c2c-4fd7-b926-a68bce363c88",
"replica_type": "project",
- "hub_ids": []
+ "hub_ids": [
+ "e8642221-4c2c-4fd7-b926-a68bce363c88"
+ ]
},
"_type": "_doc"
},
@@ -3750,7 +3757,8 @@
"entity_id": "7b07b9d0-cc0e-4098-9f64-f4a569f7d746",
"hub_ids": [
"0c5ac7c0-817e-40d4-b1b1-34c3d5cfecdb",
- "70d1af4a-82c8-478a-8960-e9028b3616ca"
+ "70d1af4a-82c8-478a-8960-e9028b3616ca",
+ "e8642221-4c2c-4fd7-b926-a68bce363c88"
],
"replica_type": "donor_organism"
},
@@ -3793,7 +3801,8 @@
"entity_id": "9c32cf70-3ed7-4720-badc-5ee71e8a38af",
"hub_ids": [
"0c5ac7c0-817e-40d4-b1b1-34c3d5cfecdb",
- "70d1af4a-82c8-478a-8960-e9028b3616ca"
+ "70d1af4a-82c8-478a-8960-e9028b3616ca",
+ "e8642221-4c2c-4fd7-b926-a68bce363c88"
],
"replica_type": "library_preparation_protocol"
},
@@ -3830,7 +3839,8 @@
"entity_id": "61e629ed-0135-4492-ac8a-5c4ab3ccca8a",
"hub_ids": [
"0c5ac7c0-817e-40d4-b1b1-34c3d5cfecdb",
- "70d1af4a-82c8-478a-8960-e9028b3616ca"
+ "70d1af4a-82c8-478a-8960-e9028b3616ca",
+ "e8642221-4c2c-4fd7-b926-a68bce363c88"
],
"replica_type": "sequencing_protocol"
},
@@ -3856,7 +3866,8 @@
"entity_id": "771ddaf6-3a4f-4314-97fe-6294ff8e25a4",
"hub_ids": [
"0c5ac7c0-817e-40d4-b1b1-34c3d5cfecdb",
- "70d1af4a-82c8-478a-8960-e9028b3616ca"
+ "70d1af4a-82c8-478a-8960-e9028b3616ca",
+ "e8642221-4c2c-4fd7-b926-a68bce363c88"
],
"replica_type": "process"
},
diff --git a/test/indexer/data/6b0f6c0f-5d80-a242-accb-840921351cd5.tdr.anvil.json b/test/indexer/data/c67e7adb-1a9c-a3b9-bc91-eb10a428374a.tdr.anvil.json
similarity index 51%
rename from test/indexer/data/6b0f6c0f-5d80-a242-accb-840921351cd5.tdr.anvil.json
rename to test/indexer/data/c67e7adb-1a9c-a3b9-bc91-eb10a428374a.tdr.anvil.json
index e689fbe3f7..dd3946fbcc 100644
--- a/test/indexer/data/6b0f6c0f-5d80-a242-accb-840921351cd5.tdr.anvil.json
+++ b/test/indexer/data/c67e7adb-1a9c-a3b9-bc91-eb10a428374a.tdr.anvil.json
@@ -27,8 +27,8 @@
"data_modality": [],
"datarepo_row_id": "6b0f6c0f-5d80-4242-accb-840921351cd5",
"file_format": ".txt",
- "file_id": "1fab11f5-7eab-4318-9a58-68d8d06e0715",
"file_md5sum": "S/GBrRjzZAQYqh3rdiPYzA==",
+ "file_id": "1fab11f5-7eab-4318-9a58-68d8d06e0715",
"file_name": "CCDG_13607_B01_GRM_WGS_2019-02-19_chr15.recalibrated_variants.annotated.coding.txt",
"file_ref": "drs://mock_tdr.lan/v1_6c87f0e1-509d-46a4-b845-7584df39263b_1fab11f5-7eab-4318-9a58-68d8d06e0715",
"file_size": 15079345,
@@ -53,5 +53,45 @@
"anvil_file/6b0f6c0f-5d80-4242-accb-840921351cd5"
]
}
- ]
+ ],
+ "orphans": {
+ "anvil_file/15b76f9c-6b46-433f-851d-34e89f1b9ba6": {
+ "data_modality": [],
+ "datarepo_row_id": "15b76f9c-6b46-433f-851d-34e89f1b9ba6",
+ "file_format": ".vcf.gz",
+ "file_id": "1e269f04-4347-4188-b060-1dcc69e71d67",
+ "file_md5sum": "vuxgbuCqKZ/fkT9CWTFmIg==",
+ "file_name": "307500.merged.matefixed.sorted.markeddups.recal.g.vcf.gz",
+ "file_ref": "drs://mock_tdr.lan/v1_6c87f0e1-509d-46a4-b845-7584df39263b_1e269f04-4347-4188-b060-1dcc69e71d67",
+ "file_size": 213021639,
+ "is_supplementary": false,
+ "reference_assembly": [],
+ "source_datarepo_row_ids": [
+ "file_inventory:81d16471-97ac-48fe-99a0-73d9ec62c2c0"
+ ],
+ "version": "2022-06-01T00:00:00.000000Z",
+ "drs_uri": "drs://mock_tdr.lan/v1_6c87f0e1-509d-46a4-b845-7584df39263b_1e269f04-4347-4188-b060-1dcc69e71d67",
+ "sha256": "",
+ "crc32": ""
+ },
+ "anvil_file/3b17377b-16b1-431c-9967-e5d01fc5923f": {
+ "data_modality": [],
+ "datarepo_row_id": "3b17377b-16b1-431c-9967-e5d01fc5923f",
+ "file_format": ".bam",
+ "file_id": "8b722e88-8103-49c1-b351-e64fa7c6ab37",
+ "file_md5sum": "fNn9e1SovzgOROk3BvH6LQ==",
+ "file_name": "307500.merged.matefixed.sorted.markeddups.recal.bam",
+ "file_ref": "drs://mock_tdr.lan/v1_6c87f0e1-509d-46a4-b845-7584df39263b_8b722e88-8103-49c1-b351-e64fa7c6ab37",
+ "file_size": 3306845592,
+ "is_supplementary": false,
+ "reference_assembly": [],
+ "source_datarepo_row_ids": [
+ "file_inventory:9658d94a-511d-4b49-82c3-d0cb07e0cff2"
+ ],
+ "version": "2022-06-01T00:00:00.000000Z",
+ "drs_uri": "drs://mock_tdr.lan/v1_6c87f0e1-509d-46a4-b845-7584df39263b_8b722e88-8103-49c1-b351-e64fa7c6ab37",
+ "sha256": "",
+ "crc32": ""
+ }
+ }
}
\ No newline at end of file
diff --git a/test/indexer/test_anvil.py b/test/indexer/test_anvil.py
index 263f833d1f..1cbc52ba0e 100644
--- a/test/indexer/test_anvil.py
+++ b/test/indexer/test_anvil.py
@@ -105,13 +105,14 @@ def bundle_fqid(cls,
*,
uuid,
version=None,
- table_name=BundleType.primary
+ table_name=BundleType.primary.value,
) -> TDRAnvilBundleFQID:
assert version is None, 'All AnVIL bundles should use the same version'
return TDRAnvilBundleFQID(source=cls.source,
uuid=uuid,
version=cls.version,
- table_name=table_name)
+ table_name=table_name,
+ batch_prefix='' if BundleType.is_batched(table_name) else None)
@classmethod
def primary_bundle(cls) -> TDRAnvilBundleFQID:
@@ -119,13 +120,18 @@ def primary_bundle(cls) -> TDRAnvilBundleFQID:
@classmethod
def supplementary_bundle(cls) -> TDRAnvilBundleFQID:
- return cls.bundle_fqid(uuid='6b0f6c0f-5d80-a242-accb-840921351cd5',
- table_name=BundleType.supplementary)
+ return cls.bundle_fqid(uuid='c67e7adb-1a9c-a3b9-bc91-eb10a428374a',
+ table_name=BundleType.supplementary.value)
@classmethod
def duos_bundle(cls) -> TDRAnvilBundleFQID:
return cls.bundle_fqid(uuid='2370f948-2783-aeb6-afea-e022897f4dcf',
- table_name=BundleType.duos)
+ table_name=BundleType.duos.value)
+
+ @classmethod
+ def replica_bundle(cls) -> TDRAnvilBundleFQID:
+ return cls.bundle_fqid(uuid='9a135c9a-069b-a90e-b588-eaf8d1aeeac9',
+ table_name='non_schema_orphan_table')
class TestAnvilIndexer(AnvilIndexerTestCase,
@@ -166,15 +172,33 @@ def test_indexing(self):
def test_list_and_fetch_bundles(self):
source_ref = self.source
self._make_mock_tdr_tables(source_ref)
- expected_bundle_fqids = sorted([
+ canned_bundle_fqids = [
self.primary_bundle(),
self.supplementary_bundle(),
- self.duos_bundle()
+ self.duos_bundle(),
+ self.replica_bundle(),
+ ]
+ expected_bundle_fqids = sorted(canned_bundle_fqids + [
+ # Replica bundles for the AnVIL schema tables, which we don't can
+ self.bundle_fqid(uuid='cd512d30-7d5f-af35-88f0-efaadabfc17a',
+ table_name='anvil_activity'),
+ self.bundle_fqid(uuid='2fd53d37-b8a3-a797-8477-4056bf2473bb',
+ table_name='anvil_alignmentactivity'),
+ self.bundle_fqid(uuid='a52d7f0a-25c3-a4cf-af95-0fd8ecec8127',
+ table_name='anvil_assayactivity'),
+ self.bundle_fqid(uuid='45e532bb-5af7-a977-8939-24933c154380',
+ table_name='anvil_diagnosis'),
+ self.bundle_fqid(uuid='8cc4686a-76cd-a411-a11c-ec3a9fa1d417',
+ table_name='anvil_donor'),
+ self.bundle_fqid(uuid='88092968-d1ec-ad7e-9112-4b38bd0df4c2',
+ table_name='anvil_sequencingactivity'),
+ self.bundle_fqid(uuid='976fc781-4a33-a062-a67d-2c068e4d5e72',
+ table_name='anvil_variantcallingactivity')
])
plugin = self.plugin_for_source_spec(source_ref.spec)
bundle_fqids = sorted(plugin.list_bundles(source_ref, ''))
self.assertEqual(expected_bundle_fqids, bundle_fqids)
- for bundle_fqid in bundle_fqids:
+ for bundle_fqid in canned_bundle_fqids:
with self.subTest(bundle_fqid=bundle_fqid):
canned_bundle = self._load_canned_bundle(bundle_fqid)
assert isinstance(canned_bundle, TDRAnvilBundle)
@@ -183,6 +207,7 @@ def test_list_and_fetch_bundles(self):
self.assertEqual(canned_bundle.fqid, bundle.fqid)
self.assertEqual(canned_bundle.entities, bundle.entities)
self.assertEqual(canned_bundle.links, bundle.links)
+ self.assertEqual(canned_bundle.orphans, bundle.orphans)
class TestAnvilIndexerWithIndexesSetUp(AnvilIndexerTestCase):
@@ -233,7 +258,26 @@ def test_dataset_description(self):
self.assertDictEqual(doc_counts, {
DocumentType.aggregate: 1,
DocumentType.contribution: 2,
- # No replica is emitted for the primary dataset because we dropped
- # the files (hubs) from its bundle above
- **({DocumentType.replica: 1} if config.enable_replicas else {})
+ **({DocumentType.replica: 2} if config.enable_replicas else {})
})
+
+ def test_orphans(self):
+ bundle = self._index_canned_bundle(self.replica_bundle())
+ assert isinstance(bundle, TDRAnvilBundle)
+ dataset_entity_id = one(
+ ref.entity_id
+ for ref in bundle.orphans
+ if ref.entity_type == 'anvil_dataset'
+ )
+ expected = bundle.orphans if config.enable_replicas else {}
+ actual = {}
+ hits = self._get_all_hits()
+ for hit in hits:
+ qualifier, doc_type = self._parse_index_name(hit)
+ self.assertEqual(DocumentType.replica, doc_type)
+ source = hit['_source']
+ self.assertEqual(source['hub_ids'], [dataset_entity_id])
+ ref = EntityReference(entity_type=source['replica_type'],
+ entity_id=source['entity_id'])
+ actual[ref] = source['contents']
+ self.assertEqual(expected, actual)
diff --git a/test/indexer/test_indexer.py b/test/indexer/test_indexer.py
index 2fbf8206ba..3b24b211cf 100644
--- a/test/indexer/test_indexer.py
+++ b/test/indexer/test_indexer.py
@@ -288,7 +288,7 @@ def test_deletion(self):
self.assertNotEqual(doc.contents, {})
elif doc_type is DocumentType.contribution:
doc = Contribution.from_index(field_types, hit)
- self.assertEqual(bundle_fqid.upcast(), doc.coordinates.bundle)
+ self.assertEqual(bundle_fqid, doc.coordinates.bundle)
self.assertFalse(doc.coordinates.deleted)
elif doc_type is DocumentType.replica:
pass
@@ -315,7 +315,7 @@ def test_deletion(self):
if doc_type is DocumentType.contribution:
doc = Contribution.from_index(field_types, hit)
docs_by_entity[doc.entity].append(doc)
- self.assertEqual(bundle_fqid.upcast(), doc.coordinates.bundle)
+ self.assertEqual(bundle_fqid, doc.coordinates.bundle)
else:
# Since there is only one bundle and it was deleted,
# nothing should be aggregated
@@ -388,7 +388,7 @@ def test_duplicate_notification(self):
coordinates = [
ContributionCoordinates(
entity=entity,
- bundle=bundle.fqid.upcast(),
+ bundle=bundle.fqid,
deleted=False
).with_catalog(self.catalog)
]
@@ -553,7 +553,7 @@ def test_multi_entity_contributing_bundles(self):
entity_id=old_file_uuid,
entity_type='files')
deletion = ContributionCoordinates(entity=entity,
- bundle=bundle_fqid.upcast(),
+ bundle=bundle_fqid,
deleted=True)
index_name, document_id = deletion.index_name, deletion.document_id
hits = [
diff --git a/test/indexer/test_indexer_controller.py b/test/indexer/test_indexer_controller.py
index 0aea488842..7cd3d0aca1 100644
--- a/test/indexer/test_indexer_controller.py
+++ b/test/indexer/test_indexer_controller.py
@@ -232,7 +232,7 @@ def test_contribute_and_aggregate(self):
notified_fqids = list(map(self._fqid_from_notification, notifications))
notified_bundles = [bundles[fqid] for fqid in notified_fqids]
mock_plugin.fetch_bundle.side_effect = notified_bundles
- mock_plugin.resolve_bundle.side_effect = DSSBundleFQID.from_json
+ mock_plugin.bundle_fqid_from_json.side_effect = DSSBundleFQID.from_json
mock_plugin.sources = [source]
with patch.object(IndexService, 'repository_plugin', return_value=mock_plugin):
with patch.object(BundlePartition, 'max_partition_size', 4):
@@ -241,7 +241,7 @@ def test_contribute_and_aggregate(self):
# Assert plugin calls by controller
expected_calls = [call(fqid.to_json()) for fqid in notified_fqids]
- self.assertEqual(expected_calls, mock_plugin.resolve_bundle.mock_calls)
+ self.assertEqual(expected_calls, mock_plugin.bundle_fqid_from_json.mock_calls)
expected_calls = list(map(call, notified_fqids))
self.assertEqual(expected_calls, mock_plugin.fetch_bundle.mock_calls)
diff --git a/test/indexer/test_projects.py b/test/indexer/test_projects.py
index c96cb4e490..d76dc52980 100644
--- a/test/indexer/test_projects.py
+++ b/test/indexer/test_projects.py
@@ -89,7 +89,7 @@ def test_no_duplicate_files_in_specimen(self):
coordinates = AggregateCoordinates(entity=entity)
else:
coordinates = ContributionCoordinates(entity=entity,
- bundle=bundle_fqid.upcast(),
+ bundle=bundle_fqid,
deleted=False)
result = self.es_client.get(index=coordinates.index_name,
id=coordinates.document_id)
diff --git a/test/indexer/test_tdr.py b/test/indexer/test_tdr.py
index 62aab9dfe5..3e59985cbb 100644
--- a/test/indexer/test_tdr.py
+++ b/test/indexer/test_tdr.py
@@ -197,8 +197,7 @@ def setUpClass(cls):
'--dataset=' + cls.source.spec.name
])
- def _make_mock_tdr_tables(self,
- source: TDRSourceRef) -> None:
+ def _make_mock_tdr_tables(self, source: TDRSourceRef) -> JSON:
tables = self._load_canned_file_version(uuid=source.id,
version=None,
extension='tables.tdr')['tables']
@@ -206,6 +205,7 @@ def _make_mock_tdr_tables(self,
self._make_mock_entity_table(source.spec,
table_name,
table_rows['rows'])
+ return tables
def _make_mock_entity_table(self,
source: TDRSourceSpec,
diff --git a/test/integration_test.py b/test/integration_test.py
index 3d3b635abd..c9a79f0a4e 100644
--- a/test/integration_test.py
+++ b/test/integration_test.py
@@ -3,10 +3,8 @@
)
from collections.abc import (
Iterable,
- Iterator,
Mapping,
Sequence,
- Set,
)
from concurrent.futures.thread import (
ThreadPoolExecutor,
@@ -20,10 +18,8 @@
BytesIO,
TextIOWrapper,
)
-import itertools
from itertools import (
count,
- starmap,
)
import json
import os
@@ -43,7 +39,6 @@
Callable,
ContextManager,
IO,
- Optional,
Protocol,
TypedDict,
cast,
@@ -80,6 +75,7 @@
first,
grouper,
one,
+ only,
)
from openapi_spec_validator import (
validate_spec,
@@ -107,6 +103,9 @@
from azul.chalice import (
AzulChaliceApp,
)
+from azul.collections import (
+ alist,
+)
from azul.drs import (
AccessMethod,
)
@@ -117,10 +116,11 @@
http_client,
)
from azul.indexer import (
+ Prefix,
SourceJSON,
SourceRef,
+ SourceSpec,
SourcedBundleFQID,
- SourcedBundleFQIDJSON,
)
from azul.indexer.document import (
EntityReference,
@@ -154,7 +154,6 @@
from azul.plugins.repository.tdr_anvil import (
BundleType,
TDRAnvilBundleFQID,
- TDRAnvilBundleFQIDJSON,
)
from azul.portal_service import (
PortalService,
@@ -287,73 +286,34 @@ def managed_access_sources_by_catalog(self
managed_access_sources[catalog].add(ref)
return managed_access_sources
- def _list_partitions(self,
- catalog: CatalogName,
- *,
- min_bundles: int,
- public_1st: bool
- ) -> Iterator[tuple[SourceRef, str, list[SourcedBundleFQID]]]:
- """
- Iterate through the sources in the given catalog and yield partitions of
- bundle FQIDs until a desired minimum number of bundles are found. For
- each emitted source, every partition is included, even if it's empty.
- """
- total_bundles = 0
- sources = sorted(config.sources(catalog))
- self.random.shuffle(sources)
- if public_1st:
- managed_access_sources = frozenset(
+ def _choose_source(self,
+ catalog: CatalogName,
+ *,
+ public: bool | None = None
+ ) -> SourceRef | None:
+ plugin = self.repository_plugin(catalog)
+ sources = set(config.sources(catalog))
+ if public is not None:
+ ma_sources = {
str(source.spec)
+ # This would raise a KeyError during the can bundle script test
+ # because that test uses a mock catalog, so we only evaluate this
+ # expression when it's actually needed
for source in self.managed_access_sources_by_catalog[catalog]
- )
- index = first(
- i
- for i, source in enumerate(sources)
- if source not in managed_access_sources
- )
- sources[0], sources[index] = sources[index], sources[0]
- plugin = self.azul_client.repository_plugin(catalog)
- # This iteration prefers sources occurring first, so we shuffle them
- # above to neutralize the bias.
- for source in sources:
- source = plugin.resolve_source(source)
- source = plugin.partition_source(catalog, source)
- for prefix in source.spec.prefix.partition_prefixes():
- new_fqids = self.azul_client.list_bundles(catalog, source, prefix)
- total_bundles += len(new_fqids)
- yield source, prefix, new_fqids
- # We postpone this check until after we've yielded all partitions in
- # the current source to ensure test coverage for handling multiple
- # partitions per source
- if total_bundles >= min_bundles:
- break
+ }
+ self.assertIsSubset(ma_sources, sources)
+ if public is True:
+ sources -= ma_sources
+ elif public is False:
+ sources &= ma_sources
+ else:
+ assert False, public
+ if len(sources) == 0:
+ assert public is False, 'Every catalog should contain a public source'
+ return None
else:
- log.warning('Checked all sources and found only %d bundles instead of the '
- 'expected minimum %d', total_bundles, min_bundles)
-
- def _list_managed_access_bundles(self,
- catalog: CatalogName
- ) -> Iterator[tuple[SourceRef, str, list[SourcedBundleFQID]]]:
- sources = self.azul_client.catalog_sources(catalog)
- # We need at least one managed_access bundle per IT. To index them with
- # remote_reindex and avoid collateral bundles, we use as specific a
- # prefix as possible.
- for source in self.managed_access_sources_by_catalog[catalog]:
- assert str(source.spec) in sources
- source = self.repository_plugin(catalog).partition_source(catalog, source)
- bundle_fqids = sorted(
- bundle_fqid
- for bundle_fqid in self.azul_client.list_bundles(catalog, source, prefix='')
- if not (
- # DUOS bundles are too sparse to fulfill the managed access tests
- config.is_anvil_enabled(catalog)
- and cast(TDRAnvilBundleFQID, bundle_fqid).table_name is BundleType.duos
- )
- )
- bundle_fqid = self.random.choice(bundle_fqids)
- prefix = bundle_fqid.uuid[:8]
- new_fqids = self.azul_client.list_bundles(catalog, source, prefix)
- yield source, prefix, new_fqids
+ source = self.random.choice(sorted(sources))
+ return plugin.resolve_source(source)
class IndexingIntegrationTest(IntegrationTestCase, AlwaysTearDownTestCase):
@@ -429,6 +389,8 @@ class Catalog:
name: CatalogName
bundles: set[SourcedBundleFQID]
notifications: list[JSON]
+ public_source: SourceRef
+ ma_source: SourceRef | None
def _wait_for_indexer():
self.azul_client.wait_for_indexer()
@@ -445,12 +407,23 @@ def _wait_for_indexer():
catalogs: list[Catalog] = []
for catalog in config.integration_test_catalogs:
if index:
- notifications, fqids = self._prepare_notifications(catalog)
+ public_source = self._choose_source(catalog, public=True)
+ ma_source = self._choose_source(catalog, public=False)
+ notifications, fqids = self._prepare_notifications(catalog,
+ sources=alist(public_source, ma_source))
else:
- notifications, fqids = [], set()
+ with self._service_account_credentials:
+ fqids = self._get_indexed_bundles(catalog)
+ indexed_sources = {fqid.source for fqid in fqids}
+ ma_sources = self.managed_access_sources_by_catalog[catalog]
+ public_source = one(s for s in indexed_sources if s not in ma_sources)
+ ma_source = only(s for s in indexed_sources if s in ma_sources)
+ notifications = []
catalogs.append(Catalog(name=catalog,
bundles=fqids,
- notifications=notifications))
+ notifications=notifications,
+ public_source=public_source,
+ ma_source=ma_source))
if index:
for catalog in catalogs:
@@ -466,12 +439,9 @@ def _wait_for_indexer():
self._test_manifest_tagging_race(catalog.name)
self._test_dos_and_drs(catalog.name)
self._test_repository_files(catalog.name)
- if index:
- bundle_fqids = catalog.bundles
- else:
- with self._service_account_credentials:
- bundle_fqids = self._get_indexed_bundles(catalog.name)
- self._test_managed_access(catalog=catalog.name, bundle_fqids=bundle_fqids)
+ self._test_managed_access(catalog=catalog.name,
+ public_source=catalog.public_source,
+ ma_source=catalog.ma_source)
if index and delete:
# FIXME: Test delete notifications
@@ -701,7 +671,7 @@ def _get_one_outer_file(self, catalog: CatalogName) -> JSON:
self.fail('No files found')
return one(hits)
- def _source_spec(self, catalog: CatalogName, entity: JSON) -> TDRSourceSpec:
+ def _source_spec(self, catalog: CatalogName, entity: JSON) -> SourceSpec:
if config.is_hca_enabled(catalog):
field = 'sourceSpec'
elif config.is_anvil_enabled(catalog):
@@ -755,9 +725,10 @@ def _uuid_column_name(self, catalog: CatalogName) -> str:
def _test_dos_and_drs(self, catalog: CatalogName):
if config.is_dss_enabled(catalog) and config.dss_direct_access:
- _, file = self._get_one_inner_file(catalog)
- self._test_dos(catalog, file)
- self._test_drs(catalog, file)
+ outer_file, inner_file = self._get_one_inner_file(catalog)
+ source = self._source_spec(catalog, outer_file)
+ self._test_dos(catalog, inner_file)
+ self._test_drs(catalog, source, inner_file)
@property
def _service_account_credentials(self) -> ContextManager:
@@ -802,8 +773,8 @@ def _check_endpoint(self,
method: str,
path: str,
*,
- args: Optional[Mapping[str, Any]] = None,
- endpoint: Optional[furl] = None,
+ args: Mapping[str, Any] | None = None,
+ endpoint: furl | None = None,
fetch: bool = False
) -> bytes:
if endpoint is None:
@@ -1140,13 +1111,14 @@ def _validate_file_content(self, content: ReadableFileObject, file: FileInnerEnt
def _validate_file_response(self,
response: urllib3.HTTPResponse,
- source: TDRSourceSpec,
+ source: SourceSpec,
file: FileInnerEntity):
"""
Note: The response object must have been obtained with stream=True
"""
try:
- if source.name == 'ANVIL_1000G_2019_Dev_20230609_ANV5_202306121732':
+ special = 'ANVIL_1000G_2019_Dev_20230609_ANV5_202306121732'
+ if isinstance(source, TDRSourceSpec) and source.name == special:
# All files in this snapshot were truncated to zero bytes by the
# Broad to save costs. The metadata is not a reliable indication
# of these files' actual size.
@@ -1156,7 +1128,11 @@ def _validate_file_response(self,
finally:
response.close()
- def _test_drs(self, catalog: CatalogName, file: FileInnerEntity):
+ def _test_drs(self,
+ catalog: CatalogName,
+ source: SourceSpec,
+ file: FileInnerEntity
+ ) -> None:
repository_plugin = self.azul_client.repository_plugin(catalog)
drs = repository_plugin.drs_client()
for access_method in AccessMethod:
@@ -1167,7 +1143,7 @@ def _test_drs(self, catalog: CatalogName, file: FileInnerEntity):
self.assertIsNone(access.headers)
if access.method is AccessMethod.https:
response = self._get_url(GET, furl(access.url), stream=True)
- self._validate_file_response(response, file)
+ self._validate_file_response(response, source, file)
elif access.method is AccessMethod.gs:
content = self._get_gs_url_content(furl(access.url), size=self.num_fastq_bytes)
self._validate_file_content(content, file)
@@ -1202,7 +1178,7 @@ def _test_dos(self, catalog: CatalogName, file: FileInnerEntity):
def _get_gs_url_content(self,
url: furl,
- size: Optional[int] = None
+ size: int | None = None
) -> BytesIO:
self.assertEquals('gs', url.scheme)
path = os.environ['GOOGLE_APPLICATION_CREDENTIALS']
@@ -1223,33 +1199,21 @@ def _validate_fastq_content(self, content: ReadableFileObject):
self.assertTrue(lines[2].startswith(b'+'))
def _prepare_notifications(self,
- catalog: CatalogName
+ catalog: CatalogName,
+ sources: Iterable[SourceRef]
) -> tuple[JSONs, set[SourcedBundleFQID]]:
- bundle_fqids: set[SourcedBundleFQID] = set()
+ plugin = self.repository_plugin(catalog)
+ bundle_fqids = set()
notifications = []
-
- def update(source: SourceRef,
- prefix: str,
- partition_bundle_fqids: Iterable[SourcedBundleFQID]):
- bundle_fqids.update(partition_bundle_fqids)
- notifications.append(self.azul_client.reindex_message(catalog,
- source,
- prefix))
-
- list(starmap(update, self._list_managed_access_bundles(catalog)))
- num_bundles = max(self.min_bundles - len(bundle_fqids), 1)
- log.info('Selected %d bundles to satisfy managed access coverage; '
- 'selecting at least %d more', len(bundle_fqids), num_bundles)
- # _list_partitions selects both public and managed access sources at random.
- # If we don't index at least one public source, every request would need
- # service account credentials and we couldn't compare the responses for
- # public and managed access data. `public_1st` ensures that at least
- # one of the sources will be public because sources are indexed starting
- # with the first one yielded by the iteration.
- list(starmap(update, self._list_partitions(catalog,
- min_bundles=num_bundles,
- public_1st=True)))
-
+ for source in sources:
+ source = plugin.partition_source(catalog, source)
+ # Some partitions may be empty, but we include them anyway to
+ # ensure test coverage for handling multiple partitions per source
+ for partition_prefix in source.spec.prefix.partition_prefixes():
+ bundle_fqids.update(self.azul_client.list_bundles(catalog, source, partition_prefix))
+ notifications.append(self.azul_client.reindex_message(catalog,
+ source,
+ partition_prefix))
# Index some bundles again to test that we handle duplicate additions.
# Note: random.choices() may pick the same element multiple times so
# some notifications may end up being sent three or more times.
@@ -1263,7 +1227,7 @@ def update(source: SourceRef,
def _get_indexed_bundles(self,
catalog: CatalogName,
- filters: Optional[JSON] = None
+ filters: JSON | None = None
) -> set[SourcedBundleFQID]:
indexed_fqids = set()
hits = self._get_entities(catalog, 'bundles', filters)
@@ -1272,36 +1236,36 @@ def _get_indexed_bundles(self,
source, bundle = one(hit['sources']), one(hit['bundles'])
source = SourceJSON(id=source[special_fields.source_id],
spec=source[special_fields.source_spec])
- bundle_fqid = SourcedBundleFQIDJSON(uuid=bundle[special_fields.bundle_uuid],
- version=bundle[special_fields.bundle_version],
- source=source)
- if config.is_anvil_enabled(catalog):
- # Every primary bundle contains 1 or more biosamples, 1 dataset,
- # and 0 or more other entities. Biosamples only occur in primary
- # bundles.
- if len(hit['biosamples']) > 0:
- table_name = BundleType.primary
- # Supplementary bundles contain only 1 file and 1 dataset.
- elif len(hit['files']) > 0:
- table_name = BundleType.supplementary
- # DUOS bundles contain only 1 dataset.
- elif len(hit['datasets']) > 0:
- table_name = BundleType.duos
- else:
- assert False, hit
- bundle_fqid = cast(TDRAnvilBundleFQIDJSON, bundle_fqid)
- bundle_fqid['table_name'] = table_name.value
- bundle_fqid = self.repository_plugin(catalog).resolve_bundle(bundle_fqid)
+ source = self.repository_plugin(catalog).source_from_json(source)
+ bundle_fqid = SourcedBundleFQID(uuid=bundle[special_fields.bundle_uuid],
+ version=bundle[special_fields.bundle_version],
+ source=source)
indexed_fqids.add(bundle_fqid)
return indexed_fqids
def _assert_catalog_complete(self,
catalog: CatalogName,
- bundle_fqids: Set[SourcedBundleFQID]
+ bundle_fqids: set[SourcedBundleFQID]
) -> None:
with self.subTest('catalog_complete', catalog=catalog):
expected_fqids = bundle_fqids
- if not config.is_anvil_enabled(catalog):
+ if config.is_anvil_enabled(catalog):
+ # Replica bundles do not add contributions to the index and
+ # therefore do not appear anywhere in the service response
+ # FIXME: Integration test does not assert that replica bundles are indexed
+ # https://github.com/DataBiosphere/azul/issues/6647
+ replica_fqids = {
+ bundle_fqid
+ for bundle_fqid in expected_fqids
+ if cast(TDRAnvilBundleFQID, bundle_fqid).table_name not in (
+ BundleType.primary.value,
+ BundleType.supplementary.value,
+ BundleType.duos.value,
+ )
+ }
+ expected_fqids -= replica_fqids
+ log.info('Ignoring replica bundles %r', replica_fqids)
+ else:
expected_fqids = set(self.azul_client.filter_obsolete_bundle_versions(expected_fqids))
obsolete_fqids = bundle_fqids - expected_fqids
if obsolete_fqids:
@@ -1356,7 +1320,7 @@ def _assert_catalog_empty(self, catalog: CatalogName):
def _get_entities(self,
catalog: CatalogName,
entity_type: EntityType,
- filters: Optional[JSON] = None
+ filters: JSON | None = None
) -> MutableJSONs:
entities = []
size = 100
@@ -1388,40 +1352,34 @@ def _assert_indices_exist(self, catalog: CatalogName):
def _test_managed_access(self,
catalog: CatalogName,
- bundle_fqids: Set[SourcedBundleFQID]
+ public_source: SourceRef,
+ ma_source: SourceRef | None,
) -> None:
with self.subTest('managed_access', catalog=catalog):
- indexed_source_ids = {fqid.source.id for fqid in bundle_fqids}
- managed_access_sources = self.managed_access_sources_by_catalog[catalog]
- managed_access_source_ids = {source.id for source in managed_access_sources}
- self.assertIsSubset(managed_access_source_ids, indexed_source_ids)
-
- if not managed_access_sources:
+ if ma_source is None:
if config.deployment_stage in ('dev', 'sandbox'):
# There should always be at least one managed-access source
# indexed and tested on the default catalog for these deployments
self.assertNotEqual(catalog, config.it_catalog_for(config.default_catalog))
self.skipTest(f'No managed access sources found in catalog {catalog!r}')
-
with self.subTest('managed_access_indices', catalog=catalog):
- self._test_managed_access_indices(catalog, managed_access_source_ids)
+ self._test_managed_access_indices(catalog, public_source, ma_source)
with self.subTest('managed_access_repository_files', catalog=catalog):
- files = self._test_managed_access_repository_files(catalog, managed_access_source_ids)
+ files = self._test_managed_access_repository_files(catalog, ma_source)
with self.subTest('managed_access_summary', catalog=catalog):
self._test_managed_access_summary(catalog, files)
with self.subTest('managed_access_repository_sources', catalog=catalog):
- public_source_ids = self._test_managed_access_repository_sources(catalog,
- indexed_source_ids,
- managed_access_source_ids)
- with self.subTest('managed_access_manifest', catalog=catalog):
- source_id = self.random.choice(sorted(public_source_ids & indexed_source_ids))
- self._test_managed_access_manifest(catalog, files, source_id)
+ self._test_managed_access_repository_sources(catalog,
+ public_source,
+ ma_source)
+ with self.subTest('managed_access_manifest', catalog=catalog):
+ self._test_managed_access_manifest(catalog, files, public_source)
def _test_managed_access_repository_sources(self,
catalog: CatalogName,
- indexed_source_ids: Set[str],
- managed_access_source_ids: Set[str]
- ) -> set[str]:
+ public_source: SourceRef,
+ ma_source: SourceRef
+ ) -> None:
"""
Test the managed access controls for the /repository/sources endpoint
:return: the set of public sources
@@ -1434,11 +1392,14 @@ def list_source_ids() -> set[str]:
return {source['sourceId'] for source in cast(JSONs, response['sources'])}
with self._service_account_credentials:
- self.assertIsSubset(indexed_source_ids, list_source_ids())
+ self.assertIsSubset({public_source.id, ma_source.id}, list_source_ids())
with self._public_service_account_credentials:
public_source_ids = list_source_ids()
+ self.assertIn(public_source.id, public_source_ids)
+ self.assertNotIn(ma_source.id, public_source_ids)
with self._unregistered_service_account_credentials:
self.assertEqual(public_source_ids, list_source_ids())
+ self.assertEqual(public_source_ids, list_source_ids())
invalid_auth = OAuth2('foo')
with self.assertRaises(UnauthorizedError):
TDRClient.for_registered_user(invalid_auth)
@@ -1446,13 +1407,11 @@ def list_source_ids() -> set[str]:
invalid_client = OAuth2Client(credentials_provider=invalid_provider)
with self._authorization_context(invalid_client):
self.assertEqual(401, self._get_url_unchecked(GET, url).status)
- self.assertEqual(set(), list_source_ids() & managed_access_source_ids)
- self.assertEqual(public_source_ids, list_source_ids())
- return public_source_ids
def _test_managed_access_indices(self,
catalog: CatalogName,
- managed_access_source_ids: Set[str]
+ public_source: SourceRef,
+ ma_source: SourceRef
) -> JSONs:
"""
Test the managed-access controls for the /index/bundles and
@@ -1462,11 +1421,6 @@ def _test_managed_access_indices(self,
"""
special_fields = self.metadata_plugin(catalog).special_fields
-
- def source_id_from_hit(hit: JSON) -> str:
- sources: JSONs = hit['sources']
- return one(sources)[special_fields.source_id]
-
bundle_type = self._bundle_type(catalog)
project_type = self._project_type(catalog)
@@ -1479,31 +1433,22 @@ def source_id_from_hit(hit: JSON) -> str:
hits = self._get_entities(catalog, project_type, filters=filters)
if accessible is None:
unfiltered_hits = hits
- accessible_sources, inaccessible_sources = set(), set()
for hit in hits:
- source_id = source_id_from_hit(hit)
- source_accessible = source_id not in managed_access_source_ids
+ source_id = one(hit['sources'])[special_fields.source_id]
+ source_accessible = {public_source.id: True, ma_source.id: False}[source_id]
hit_accessible = one(hit[project_type])[special_fields.accessible]
self.assertEqual(source_accessible, hit_accessible, hit['entryId'])
if accessible is not None:
self.assertEqual(accessible, hit_accessible)
- if source_accessible:
- accessible_sources.add(source_id)
- else:
- inaccessible_sources.add(source_id)
- self.assertIsDisjoint(accessible_sources, inaccessible_sources)
- self.assertIsDisjoint(managed_access_source_ids, accessible_sources)
- self.assertEqual(set() if accessible else managed_access_source_ids,
- inaccessible_sources)
self.assertIsNotNone(unfiltered_hits, 'Cannot recover from subtest failure')
bundle_fqids = self._get_indexed_bundles(catalog)
hit_source_ids = {fqid.source.id for fqid in bundle_fqids}
- self.assertEqual(set(), hit_source_ids & managed_access_source_ids)
+ self.assertEqual(hit_source_ids, {public_source.id})
source_filter = {
special_fields.source_id: {
- 'is': list(managed_access_source_ids)
+ 'is': [ma_source.id]
}
}
params = {
@@ -1512,18 +1457,18 @@ def source_id_from_hit(hit: JSON) -> str:
}
url = config.service_endpoint.set(path=('index', bundle_type), args=params)
response = self._get_url_unchecked(GET, url)
- self.assertEqual(403 if managed_access_source_ids else 200, response.status)
+ self.assertEqual(403, response.status)
with self._service_account_credentials:
bundle_fqids = self._get_indexed_bundles(catalog, filters=source_filter)
hit_source_ids = {fqid.source.id for fqid in bundle_fqids}
- self.assertEqual(managed_access_source_ids, hit_source_ids)
+ self.assertEqual({ma_source.id}, hit_source_ids)
return unfiltered_hits
def _test_managed_access_repository_files(self,
catalog: CatalogName,
- managed_access_source_ids: set[str]
+ ma_source: SourceRef
) -> JSONs:
"""
Test the managed access controls for the /repository/files endpoint
@@ -1533,7 +1478,7 @@ def _test_managed_access_repository_files(self,
with self._service_account_credentials:
files = self._get_entities(catalog, 'files', filters={
special_fields.source_id: {
- 'is': list(managed_access_source_ids)
+ 'is': [ma_source.id]
}
})
managed_access_file_urls = {
@@ -1570,7 +1515,7 @@ def _get_summary_file_count() -> int:
def _test_managed_access_manifest(self,
catalog: CatalogName,
files: JSONs,
- source_id: str
+ public_source: SourceRef
) -> None:
"""
Test the managed access controls for the /manifest/files endpoint and
@@ -1592,7 +1537,7 @@ def bundle_uuids(hit: JSON) -> set[str]:
for file in files
if len(file['sources']) == 1
))
- filters = {special_fields.source_id: {'is': [source_id]}}
+ filters = {special_fields.source_id: {'is': [public_source.id]}}
params = {'size': 1, 'catalog': catalog, 'filters': json.dumps(filters)}
files_url = furl(url=endpoint, path='index/files', args=params)
response = self._get_url_json(GET, files_url)
@@ -1890,10 +1835,7 @@ def _test_catalog(self, catalog: config.Catalog):
fqid = self.bundle_fqid(catalog.name)
log.info('Canning bundle %r from catalog %r', fqid, catalog.name)
with tempfile.TemporaryDirectory() as d:
- self._can_bundle(source=str(fqid.source.spec),
- uuid=fqid.uuid,
- version=fqid.version,
- output_dir=d)
+ self._can_bundle(fqid, output_dir=d)
generated_file = one(os.listdir(d))
with open(os.path.join(d, generated_file)) as f:
bundle_json = json.load(f)
@@ -1918,7 +1860,7 @@ def _test_catalog(self, catalog: config.Catalog):
}
self.assertIsSubset(set(stitched), metadata_ids)
elif metadata_plugin_name == 'anvil':
- self.assertEqual({'entities', 'links'}, bundle_json.keys())
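+            # Canned AnVIL bundles contain orphaned entities in addition to linked entities and links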
+ self.assertEqual({'entities', 'links', 'orphans'}, bundle_json.keys())
entities, links = bundle_json['entities'], bundle_json['links']
self.assertIsInstance(entities, dict)
self.assertIsInstance(links, list)
@@ -1960,26 +1902,29 @@ def test_can_bundle_canned_repository(self):
self._test_catalog(mock_catalog)
def bundle_fqid(self, catalog: CatalogName) -> SourcedBundleFQID:
- # Skip through empty partitions
- bundle_fqids = itertools.chain.from_iterable(
- bundle_fqids
- for _, _, bundle_fqids in self._list_partitions(catalog,
- min_bundles=1,
- public_1st=False)
- )
+ source = self._choose_source(catalog)
+ # The plugin will raise an exception if the source lacks a prefix
+ source = source.with_prefix(Prefix.of_everything)
+ bundle_fqids = self.repository_plugin(catalog).list_bundles(source, '')
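+        # Sorting removes any dependence on the order in which the plugin lists bundles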
return self.random.choice(sorted(bundle_fqids))
def _can_bundle(self,
- source: str,
- uuid: str,
- version: str,
+ fqid: SourcedBundleFQID,
output_dir: str
) -> None:
args = [
- '--source', source,
- '--uuid', uuid,
- '--version', version,
- '--output-dir', output_dir
+ '--uuid', fqid.uuid,
+ '--version', fqid.version,
+ '--source', str(fqid.source.spec),
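+            # AnVIL bundle FQIDs carry a table name and a batch prefix; 'null' denotes an absent prefix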
+ *(
+ [
+ '--table-name', fqid.table_name,
+ '--batch-prefix', 'null' if fqid.batch_prefix is None else fqid.batch_prefix,
+ ]
+ if isinstance(fqid, TDRAnvilBundleFQID) else
+ []
+ ),
+ '--output-dir', output_dir,
]
return self._can_bundle_main(args)
diff --git a/test/service/test_filters.py b/test/service/test_filters.py
index 8eaf55126a..b782a5671a 100644
--- a/test/service/test_filters.py
+++ b/test/service/test_filters.py
@@ -33,7 +33,8 @@ class TestFilterReification(AzulTestCase):
source_id='sourceId',
source_spec=MagicMock(),
bundle_uuid=MagicMock(),
- bundle_version=MagicMock()
+ bundle_version=MagicMock(),
+ implicit_hub_id=MagicMock()
)
@property
diff --git a/test/service/test_manifest.py b/test/service/test_manifest.py
index f57b88d012..dac94047db 100644
--- a/test/service/test_manifest.py
+++ b/test/service/test_manifest.py
@@ -1705,9 +1705,11 @@ def bundles(cls) -> list[SourcedBundleFQID]:
return [
cls.bundle_fqid(uuid='2370f948-2783-aeb6-afea-e022897f4dcf',
version=cls.version),
- cls.bundle_fqid(uuid='6b0f6c0f-5d80-a242-accb-840921351cd5',
+ cls.bundle_fqid(uuid='c67e7adb-1a9c-a3b9-bc91-eb10a428374a',
version=cls.version),
cls.bundle_fqid(uuid='826dea02-e274-affe-aabc-eb3db63ad068',
+ version=cls.version),
+ cls.bundle_fqid(uuid='9a135c9a-069b-a90e-b588-eaf8d1aeeac9',
version=cls.version)
]
@@ -1717,7 +1719,7 @@ def test_compact_manifest(self):
expected = [
(
'bundles.bundle_uuid',
- '6b0f6c0f-5d80-a242-accb-840921351cd5',
+ 'c67e7adb-1a9c-a3b9-bc91-eb10a428374a',
'826dea02-e274-affe-aabc-eb3db63ad068',
'826dea02-e274-affe-aabc-eb3db63ad068'
),
@@ -2085,18 +2087,39 @@ def test_compact_manifest(self):
self._assert_tsv(expected, response)
def test_verbatim_jsonl_manifest(self):
- response = self._get_manifest(ManifestFormat.verbatim_jsonl, filters={})
- self.assertEqual(200, response.status_code)
- expected = {
- # Consolidate entities with the same replica (i.e. datasets)
- json_hash(entity).digest(): {
- 'type': entity_ref.entity_type,
- 'value': entity,
+ all_entities, linked_entities = self._canned_entities()
+ cases = [
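+            # Each case pairs a set of filters with whether orphans are expected in the manifest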
+ ({}, False),
+ ({'datasets.title': {'is': ['ANVIL_CMG_UWASH_DS_BDIS']}}, False),
+ # Orphans should be included only when filtering by dataset ID
+ ({'datasets.dataset_id': {'is': ['52ee7665-7033-63f2-a8d9-ce8e32666739']}}, True)
+ ]
+ for filters, expect_orphans in cases:
+ with self.subTest(filters=filters):
+ response = self._get_manifest(ManifestFormat.verbatim_jsonl, filters=filters)
+ self.assertEqual(200, response.status_code)
+ expected_rows = list(all_entities if expect_orphans else linked_entities)
+ self._assert_jsonl(expected_rows, response)
+
+ def _canned_entities(self):
+
+ def hash_entities(entities: dict[EntityReference, JSON]) -> dict[str, JSON]:
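+            # Key entities by content hash to consolidate duplicates shared between bundles (e.g. datasets)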
+ return {
+ json_hash(contents).digest(): {
+ 'type': ref.entity_type,
+ 'value': contents
+ }
+ for ref, contents in entities.items()
}
- for bundle in self.bundles()
- for entity_ref, entity in self._load_canned_bundle(bundle).entities.items()
- }.values()
- self._assert_jsonl(list(expected), response)
+
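+        # Entities reachable through bundle links are tracked separately from orphans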
+ linked_entities_by_hash, all_entities_by_hash = {}, {}
+ for bundle_fqid in self.bundles():
+ bundle = self._load_canned_bundle(bundle_fqid)
+ linked_entities_by_hash.update(hash_entities(bundle.entities))
+ all_entities_by_hash.update(hash_entities(bundle.orphans))
+ all_entities_by_hash.update(linked_entities_by_hash)
+
+ return all_entities_by_hash.values(), linked_entities_by_hash.values()
def test_verbatim_pfb_manifest(self):
response = self._get_manifest(ManifestFormat.verbatim_pfb, filters={})
diff --git a/test/service/test_request_builder.py b/test/service/test_request_builder.py
index 218f964313..5e89aab8e6 100644
--- a/test/service/test_request_builder.py
+++ b/test/service/test_request_builder.py
@@ -65,7 +65,8 @@ def special_fields(self) -> SpecialFields:
return SpecialFields(source_id='sourceId',
source_spec='sourceSpec',
bundle_uuid='bundleUuid',
- bundle_version='bundleVersion')
+ bundle_version='bundleVersion',
+ implicit_hub_id='projectId')
@property
def _field_mapping(self) -> MetadataPlugin._FieldMapping: