
Fix: Left-overs from orphan-related PRs (#6691, PR #6705)
dsotirho-ucsc committed Nov 25, 2024
2 parents 6e8ec5e + da2e515 commit f08b173
Showing 17 changed files with 311 additions and 192 deletions.
9 changes: 7 additions & 2 deletions src/azul/indexer/index_service.py
@@ -212,8 +212,7 @@ def index(self, catalog: CatalogName, bundle: Bundle) -> None:
for contributions, replicas in transforms:
tallies.update(self.contribute(catalog, contributions))
self.replicate(catalog, replicas)
if tallies:
self.aggregate(tallies)
self.aggregate(tallies)

def delete(self, catalog: CatalogName, bundle: Bundle) -> None:
"""
@@ -461,6 +460,12 @@ def aggregate(self, tallies: CataloguedTallies):
Also note that the input tallies can refer to entities from different
catalogs.
"""
# Attempting to filter by an empty array of coordinates while reading
# the aggregates will fail with a 400 error from ElasticSearch. This
# happens when indexing replica bundles for AnVIL, since they emit no
# contributions.
if not tallies:
return
# Use catalog specified in each tally
writer = self._create_writer(DocumentType.aggregate, catalog=None)
while True:
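For context, here is a minimal sketch of the behaviour described in the new comment; the helper name and signature are made up for illustration and are not the index service's actual code:

```python
# Illustrative only: replica-only bundles (e.g. AnVIL replica bundles) yield
# no contributions, so their tallies are empty and there are no document
# coordinates to filter the aggregate read by. Elasticsearch rejects such an
# empty filter with a 400 response, hence the early return.
from collections import Counter

def aggregate(tallies: Counter, read_aggregates_by_coordinates) -> None:
    if not tallies:
        return
    read_aggregates_by_coordinates(list(tallies.keys()))
```

With the guard inside `aggregate` itself, the `if tallies:` check at the call site in `index` becomes redundant, which is what the first hunk above removes.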
20 changes: 9 additions & 11 deletions src/azul/plugins/metadata/anvil/bundle.py
@@ -130,29 +130,27 @@ def reject_joiner(self, catalog: CatalogName):
pass

def to_json(self) -> MutableJSON:
def serialize_entities(entities):
def to_json(entities):
return {
str(entity_ref): entity
for entity_ref, entity in sorted(entities.items())
}

return {
'entities': serialize_entities(self.entities),
'orphans': serialize_entities(self.orphans),
'entities': to_json(self.entities),
'orphans': to_json(self.orphans),
'links': [link.to_json() for link in sorted(self.links)]
}

@classmethod
def from_json(cls, fqid: BUNDLE_FQID, json_: JSON) -> Self:
def deserialize_entities(json_entities):
def from_json(entities):
return {
EntityReference.parse(entity_ref): entity
for entity_ref, entity in json_entities.items()
for entity_ref, entity in entities.items()
}

return cls(
fqid=fqid,
entities=deserialize_entities(json_['entities']),
links=set(map(EntityLink.from_json, json_['links'])),
orphans=deserialize_entities(json_['orphans'])
)
return cls(fqid=fqid,
entities=from_json(json_['entities']),
links=set(map(EntityLink.from_json, json_['links'])),
orphans=from_json(json_['orphans']))
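For illustration, a self-contained sketch of the round trip performed by the nested `to_json`/`from_json` helpers; this `EntityReference` is a stand-in that assumes a `type/id` string form, not the actual azul class:

```python
from dataclasses import dataclass


@dataclass(frozen=True, order=True)
class EntityReference:
    entity_type: str
    entity_id: str

    def __str__(self) -> str:
        return f'{self.entity_type}/{self.entity_id}'

    @classmethod
    def parse(cls, s: str) -> 'EntityReference':
        entity_type, entity_id = s.split('/', 1)
        return cls(entity_type, entity_id)


entities = {EntityReference('anvil_file', '123'): {'size': 42}}
# Serialize: stringify the keys, sorted for deterministic JSON output ...
as_json = {str(ref): row for ref, row in sorted(entities.items())}
# ... and deserialize: parse the keys back into references.
assert {EntityReference.parse(ref): row for ref, row in as_json.items()} == entities
```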
18 changes: 9 additions & 9 deletions src/azul/plugins/metadata/anvil/indexer/transform.py
@@ -170,8 +170,8 @@ def aggregator(cls, entity_type) -> EntityAggregator:

def estimate(self, partition: BundlePartition) -> int:
# Orphans are not considered when deciding whether to partition the
# bundle, but if the bundle is partitioned then each orphan will be
# replicated in a single partition
# bundle, but if the bundle is partitioned then each partition will only
# emit replicas for the orphans that it contains
return sum(map(partial(self._contains, partition), self.bundle.entities))

def transform(self,
@@ -578,13 +578,13 @@ def transform(self,
) -> Iterable[Contribution | Replica]:
yield from super().transform(partition)
if config.enable_replicas:
# Replicas are only emitted by the file transformer for entities
# that are linked to at least one file. This excludes all orphans,
# and a small number of linked entities, usually from primary
# bundles don't include any files. Some of the replicas we emit here
# will be redundant with those emitted by the file transformer, but
# these will be coalesced by the index service before they are
# written to ElasticSearch.
# The file transformer only emits replicas for entities that are
# linked to at least one file. This excludes all orphans, and a
# small number of linked entities, usually from primary bundles that
# don't include any files. Some of the replicas we emit here will be
# redundant with those emitted by the file transformer, but these
# will be consolidated by the index service before they are written
# to ElasticSearch.
dataset = self._only_dataset()
for entity in chain(self.bundle.orphans, self.bundle.entities):
if partition.contains(UUID(entity.entity_id)):
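A rough sketch of the `partition.contains(UUID(...))` check used in the loop above; this `PrefixPartition` is a hypothetical stand-in for `BundlePartition`, assuming a partition owns the slice of the UUID space that starts with its prefix:

```python
from uuid import UUID


class PrefixPartition:
    """Hypothetical stand-in: owns all UUIDs whose hex form starts with a prefix."""

    def __init__(self, prefix: str) -> None:
        self.prefix = prefix.lower()

    def contains(self, uuid: UUID) -> bool:
        return uuid.hex.startswith(self.prefix)


partition = PrefixPartition('a')
assert partition.contains(UUID('aaaaaaaa-0000-4000-8000-000000000000'))
assert not partition.contains(UUID('bbbbbbbb-0000-4000-8000-000000000000'))
```

Under that assumption, each partition emits replicas only for the orphans and entities whose row IDs it contains, so no replica is emitted by more than one partition.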
13 changes: 5 additions & 8 deletions src/azul/plugins/repository/canned/__init__.py
@@ -26,9 +26,6 @@
from furl import (
furl,
)
from more_itertools import (
ilen,
)

from azul import (
CatalogName,
@@ -165,11 +162,11 @@ def staging_area(self, url: str) -> StagingArea:

def count_bundles(self, source: SOURCE_SPEC) -> int:
staging_area = self.staging_area(source.spec.name)
return ilen(
links_id
for links_id in staging_area.links
if source.prefix is None or links_id.startswith(source.prefix.common)
)
if source.prefix is None:
return len(staging_area.links)
else:
prefix = source.prefix.common
return sum(1 for links_id in staging_area.links if links_id.startswith(prefix))

def list_bundles(self,
source: CannedSourceRef,
137 changes: 80 additions & 57 deletions src/azul/plugins/repository/tdr_anvil/__init__.py
@@ -11,7 +11,6 @@
AbstractSet,
Callable,
Iterable,
Self,
cast,
)
import uuid
@@ -80,63 +79,86 @@

class BundleType(Enum):
"""
AnVIL snapshots have no inherent notion of a "bundle". During indexing, we
dynamically construct bundles by querying each table in the snapshot. This
class enumerates the tables that require special strategies for listing and
fetching their bundles.

Primary bundles are defined by a biosample entity, termed the bundle entity.
Each primary bundle includes all of the bundle entity's descendants and all of
those entities' ancestors, which are discovered by iteratively
following foreign keys. Biosamples were chosen for this role based on a
desirable balance between the size and number of the resulting bundles as
well as the degree of overlap between them. The implementation of the graph
traversal is tightly coupled to this choice, and switching to a different
entity type would require re-implementing much of the Plugin code. Primary
bundles consist of at least one biosample (the bundle entity), exactly one
dataset, and zero or more other entities of assorted types. Primary bundles
never contain orphans because they are bijective to rows in the biosample
table.

Unlike HCA, AnVIL has no inherent notion of a "bundle". Its data model is
strictly relational: each row in a table represents an entity, each entity
has a primary key, and entities reference each other via foreign keys.
During indexing, we dynamically construct bundles by querying each table in
the snapshot. This class enumerates the tables that require special
strategies for listing and fetching their bundles.

An orphan is defined as an AnVIL entity that does not appear in any of
Azul's `/index/{entity_type}` responses. Bundles *can* contain orphans, but they will
only ever manifest as replicas in our index. A *local orphan* is an entity
in a bundle that is not referenced anywhere in that bundle's links. Local
orphans may or may not be true/global orphans (because they may still be
referenced in *other* bundles' links), but all global orphans are always
local orphans. Bundles only contain local orphans from the table that
matches the bundle's `table_name` attribute.

Primary bundles are defined by a biosample entity, termed the *bundle
entity*. Each primary bundle includes all of the bundle entity's descendants
and all of those entities' ancestors. Descendants and ancestors are
discovered by iteratively following foreign keys. Biosamples were chosen to
act as the bundle entities for primary bundles based on a desirable balance
between the size and number of the resulting bundles as well as the degree
of overlap between them. The implementation of the graph traversal is
tightly coupled to this choice, and switching to a different bundle entity
type would require re-implementing much of the Plugin code. Primary bundles
consist of at least one biosample (the bundle entity), exactly one dataset
entity, and zero or more other entities of assorted types. Primary bundles
never contain local orphans because they are bijective to rows in the
biosample table.

Supplementary bundles consist of batches of file entities, which may include
supplementary files, which lack any foreign keys that associate them with
any other entity. Non-supplementary files in the bundle are classified as
orphans. The bundle also includes a dataset entity linked to the
supplementary files.
Duos bundles consist of a single dataset entity. This "entity" includes only
supplementary files. Supplementary files lack any foreign keys that would
associate them with any other entity. Each supplementary bundle also
includes a dataset entity, and we create synthetic links between the
supplementary files and the dataset. Without these links, the relationship
between these files and their parent dataset would not be properly
represented in the service response. Supplementary files therefore are never
local or global orphans.

Normal (non-supplementary) files are not linked to the dataset and thus are
local orphans within these bundles. This is because these files may also
appear in primary bundles. If they do, then those bundles will contribute
them to the index alongside all of their linked entities. If they don't,
then they are global orphans. In either case, it would be pointless for a
supplementary bundle to emit contributions for them, hence we treat them as
orphans.

DUOS bundles consist of a single dataset entity. This "entity" includes only
the dataset description retrieved from DUOS, while a copy of the BigQuery
row for this dataset is also included as an orphan. We chose this design
because there is only one dataset per snapshot, which is referenced in all
primary and supplementary bundles. Therefore, only one request to DUOS per
*snapshot* is necessary, but if `description` is retrieved at the same time
as the other dataset fields, we will make one request per *bundle* instead,
bundles. Therefore, only one request to DUOS per *snapshot* is necessary. If
the DUOS `description` were retrieved at the same time as the other fields
of the dataset entity, we would make one request per *bundle* instead,
potentially overloading the DUOS service. Our solution is to retrieve
`description` only in a dedicated bundle format, once per snapshot, and
merge it with the other dataset fields during aggregation.
`description` only in a bundle of this dedicated DUOS type, once per
snapshot, and merge it with the other dataset fields during aggregation.

All other bundles are replica bundles. Replica bundles consist of a batch of
rows from an arbitrary BigQuery table, which may or may not be described by
the AnVIL schema. Replica bundles only include orphans and have no links.
the AnVIL schema, and the snapshot's dataset entity. Replica bundles contain
no links and thus all of their entities are local orphans.
"""
primary = 'anvil_biosample'
supplementary = 'anvil_file'
duos = 'anvil_dataset'

def is_batched(self: Self | str) -> bool:
@classmethod
def is_batched(cls, table_name: str) -> bool:
"""
>>> BundleType.primary.is_batched()
True if bundles for the table of the given name represent batches of
rows, or False if each bundle represents a single row.
>>> BundleType.is_batched(BundleType.primary.value)
False
>>> BundleType.is_batched('anvil_activity')
True
"""
if isinstance(self, str):
try:
self = BundleType(self)
except ValueError:
return True
return self not in (BundleType.primary, BundleType.duos)
return table_name not in (cls.primary.value, cls.duos.value)


class TDRAnvilBundleFQIDJSON(SourcedBundleFQIDJSON):
@@ -245,28 +267,28 @@ def list_bundles(self,
self._assert_source(source)
bundles = []
spec = source.spec

if config.duos_service_url is not None:
# We intentionally omit the WHERE clause for datasets in order to
# verify our assumption that each snapshot only contains rows for a
# single dataset. This verification is performed independently and
# concurrently for every partition, but only one partition actually
# emits the bundle.
row = one(self._run_sql(f'''
SELECT datarepo_row_id
FROM {backtick(self._full_table_name(spec, BundleType.duos.value))}
'''))
dataset_row_id = row['datarepo_row_id']
# We intentionally omit the WHERE clause for datasets in order
# to verify our assumption that each snapshot only contains rows
# for a single dataset. This verification is performed
# independently and concurrently for every partition, but only
# one partition actually emits the bundle.
if dataset_row_id.startswith(prefix):
bundle_uuid = change_version(dataset_row_id,
self.datarepo_row_uuid_version,
self.bundle_uuid_version)
bundles.append(TDRAnvilBundleFQID(
uuid=bundle_uuid,
version=self._version,
source=source,
table_name=BundleType.duos.value,
batch_prefix=None,
))
bundle_fqid = TDRAnvilBundleFQID(uuid=bundle_uuid,
version=self._version,
source=source,
table_name=BundleType.duos.value,
batch_prefix=None)
bundles.append(bundle_fqid)
for row in self._run_sql(f'''
SELECT datarepo_row_id
FROM {backtick(self._full_table_name(spec, BundleType.primary.value))}
@@ -275,13 +297,12 @@ def list_bundles(self,
bundle_uuid = change_version(row['datarepo_row_id'],
self.datarepo_row_uuid_version,
self.bundle_uuid_version)
bundles.append(TDRAnvilBundleFQID(
uuid=bundle_uuid,
version=self._version,
source=source,
table_name=BundleType.primary.value,
batch_prefix=None,
))
bundle_fqid = TDRAnvilBundleFQID(uuid=bundle_uuid,
version=self._version,
source=source,
table_name=BundleType.primary.value,
batch_prefix=None)
bundles.append(bundle_fqid)
prefix_lengths_by_table = self._batch_tables(source.spec, prefix)
for table_name, (batch_prefix_length, _) in prefix_lengths_by_table.items():
batch_prefixes = Prefix(common=prefix,
@@ -351,6 +372,8 @@ def repeat(fmt):
log.info('Calculating batch prefix lengths for partition %r of %d tables '
'in source %s', prefix, len(table_names), source)
# The extraneous outer 'SELECT *' works around a bug in BigQuery emulator
# FIXME: BigQuery Emulator rejects valid query
# https://github.com/DataBiosphere/azul/issues/6704
query = ' UNION ALL '.join(f'''(
SELECT * FROM (
SELECT
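The `change_version` calls above derive bundle UUIDs from BigQuery `datarepo_row_id` values by swapping the UUID version field. A hypothetical illustration of the idea (not azul's implementation; the concrete version numbers are assumptions):

```python
def swap_uuid_version(uuid_str: str, old_version: int, new_version: int) -> str:
    # The version nibble is the first hex digit of the third dash-separated
    # group, e.g. the '4' in 'xxxxxxxx-xxxx-4xxx-xxxx-xxxxxxxxxxxx'.
    groups = uuid_str.split('-')
    assert groups[2][0] == f'{old_version:x}', uuid_str
    groups[2] = f'{new_version:x}' + groups[2][1:]
    return '-'.join(groups)


row_id = '79a81f3e-b5d2-4ab0-8a7f-1f2d8e3c4b5a'   # a version-4 row UUID
bundle_uuid = swap_uuid_version(row_id, 4, 10)    # assumed version numbers
assert bundle_uuid == '79a81f3e-b5d2-aab0-8a7f-1f2d8e3c4b5a'
```

Deriving bundle UUIDs this way keeps listing deterministic: the same row always maps to the same bundle UUID, and the mapping can be reversed by swapping the version back.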
6 changes: 6 additions & 0 deletions src/azul/service/manifest_service.py
@@ -1985,6 +1985,8 @@ class VerbatimManifestGenerator(FileBasedManifestGenerator, metaclass=ABCMeta):

@property
def entity_type(self) -> str:
# Orphans only have projects/datasets as hubs, so we need to retrieve
# aggregates of those types in order to join against orphan replicas
return self.implicit_hub_type if self.include_orphans else 'files'

@property
@@ -2003,6 +2005,9 @@ def implicit_hub_type(self) -> str:

@property
def include_orphans(self) -> bool:
# When filtering only by project/dataset ID, we need to include
# *everything* in the selected projects/datasets, even rows that don't
# appear anywhere in the rest of the service response.
special_fields = self.service.metadata_plugin(self.catalog).special_fields
return self.filters.explicit.keys() == {special_fields.implicit_hub_id}
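A small sketch of the check above using plain dicts; the field name stands in for the catalog's implicit hub ID and is not the actual `special_fields` value:

```python
implicit_hub_id = 'datasets.dataset_id'

only_hub_filter = {'datasets.dataset_id': {'is': ['2ae00f5c-0000-4000-8000-000000000000']}}
narrower_filter = {'datasets.dataset_id': {'is': ['2ae00f5c-0000-4000-8000-000000000000']},
                   'files.file_format': {'is': ['vcf']}}

# Orphans are included only when the dataset/project ID is the *sole*
# explicit filter, i.e. the request asks for everything in those hubs.
assert only_hub_filter.keys() == {implicit_hub_id}
assert not narrower_filter.keys() == {implicit_hub_id}
```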

@@ -2110,6 +2115,7 @@ def create_file(self) -> tuple[str, Optional[str]]:
plugin = self.service.metadata_plugin(self.catalog)
replicas = list(self._all_replicas())
replica_schemas = plugin.verbatim_pfb_schema(replicas)
# Ensure field order is consistent for unit tests
replica_schemas.sort(key=itemgetter('name'))
replica_types = [s['name'] for s in replica_schemas]
pfb_schema = avro_pfb.avro_pfb_schema(replica_schemas)
6 changes: 3 additions & 3 deletions test/indexer/__init__.py
@@ -93,8 +93,8 @@ class CannedFileTestCase(AzulUnitTestCase):
"""

@classmethod
def _data_path(cls, module: Literal['service', 'indexer']) -> Path:
return Path(config.project_root) / 'test' / module / 'data'
def _data_path(cls, module: Literal['service', 'indexer'], *path: str) -> Path:
return Path(config.project_root).joinpath('test', module, 'data', *path)

@classmethod
def _load_canned_file(cls,
@@ -120,7 +120,7 @@ def _load_canned_file_version(cls,
) -> Union[MutableJSONs, MutableJSON]:
suffix = '' if version is None else '.' + version
file_name = f'{uuid}{suffix}.{extension}.json'
with open(cls._data_path('indexer') / file_name, 'r') as infile:
with open(cls._data_path('indexer', file_name), 'r') as infile:
return json.load(infile)


