Skip to content

Commit

Permalink
[r] Fix: Optimistic lock contention on HCA replicas (#6648)
Browse files Browse the repository at this point in the history
  • Loading branch information
nadove-ucsc committed Nov 15, 2024
1 parent 81754fb commit cb3c054
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 8 deletions.
50 changes: 49 additions & 1 deletion src/azul/indexer/aggregate.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@
Entities = JSONs


class TooManyEntities(Exception):
    """
    Raised when an accumulator would have to hold more distinct values than
    its configured maximum size allows.

    The size limit is handed to ``Exception`` as the sole argument, so it is
    available as ``args[0]`` and is what ``str()`` of the exception renders.
    """

    def __init__(self, max_size: int):
        """
        :param max_size: the size limit that would have been exceeded
        """
        super().__init__(max_size)


class Accumulator(metaclass=ABCMeta):
"""
Accumulates multiple values into a single value, not necessarily of the same
Expand Down Expand Up @@ -92,7 +98,12 @@ class SetAccumulator(Accumulator):
smallest values, it returns a sorted list of the first N distinct values.
"""

def __init__(self, max_size=None, key=None) -> None:
def __init__(self,
max_size=None,
key=None,
*,
raise_on_overflow: bool = False
) -> None:
"""
:param max_size: the maximum number of elements to retain
Expand All @@ -101,15 +112,50 @@ def __init__(self, max_size=None, key=None) -> None:
be used. With that default key, if any None values were
placed in the accumulator, the first element, and only the
first element of the returned list will be None.
:param raise_on_overflow: If true, raise TooManyEntities if the size of
the accumulated set would exceed max_size.
"""
super().__init__()
self.value = set()
self.max_size = max_size
self.key = none_safe_key(none_last=True) if key is None else key
self.raise_on_overflow = raise_on_overflow

def accumulate(self, value) -> bool:
"""
:return: True, if the given value was incorporated into the set
>>> s = SetAccumulator(max_size=3)
>>> s.accumulate(1)
True
>>> s.accumulate(1)
False
>>> s.accumulate(2)
True
>>> s.accumulate([1, 2, 3])
True
>>> s.accumulate([2, 3])
False
>>> s.accumulate(4)
False
>>> s.get()
[1, 2, 3]
>>> s = SetAccumulator(max_size=3, raise_on_overflow=True)
>>> s.accumulate([1, 2, 3])
True
>>> s.accumulate(4)
Traceback (most recent call last):
...
azul.indexer.aggregate.TooManyEntities: 3
"""
if self.max_size is None or len(self.value) < self.max_size:
before = len(self.value)
Expand All @@ -126,6 +172,8 @@ def accumulate(self, value) -> bool:
return False
else:
assert False
elif self.raise_on_overflow and value not in self.value:
raise TooManyEntities(self.max_size)
else:
return False

Expand Down
4 changes: 4 additions & 0 deletions src/azul/plugins/metadata/hca/indexer/aggregate.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,8 @@ def _accumulator(self, field) -> Accumulator | None:
none_safe_itemgetter('value', 'unit')))
elif field == 'donor_count':
return UniqueValueCountAccumulator()
elif field == 'document_id':
return SetAccumulator(max_size=100, raise_on_overflow=True)
else:
return super()._accumulator(field)

Expand Down Expand Up @@ -197,6 +199,8 @@ class ProtocolAggregator(SimpleAggregator):
def _accumulator(self, field) -> Accumulator | None:
    """
    Select the accumulator used to aggregate the given field.

    :param field: the name of the field to be aggregated

    :return: a frequency set accumulator capped at 100 entries for
             ``assay_type``, a set accumulator capped at 100 entries for
             ``document_id``, and whatever the parent class chooses for
             any other field
    """
    if field == 'assay_type':
        return FrequencySetAccumulator(max_size=100)
    elif field == 'document_id':
        # With raise_on_overflow, a new value arriving at a full set raises
        # TooManyEntities instead of being silently left out.
        return SetAccumulator(max_size=100, raise_on_overflow=True)
    else:
        return super()._accumulator(field)

Expand Down
7 changes: 6 additions & 1 deletion src/azul/plugins/metadata/hca/indexer/transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -1430,7 +1430,12 @@ def entity_type(cls) -> str:
@classmethod
def hot_entity_types(cls) -> dict[EntityType, EntityType]:
return {
'project': 'projects'
'project': 'projects',
'donor_organism': 'donors',
'analysis_protocol': 'analysis_protocols',
'imaging_protocol': 'imaging_protocols',
'library_preparation_protocol': 'library_preparation_protocols',
'sequencing_protocol': 'sequencing_protocols'
}

def _entities(self) -> Iterable[api.File]:
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit cb3c054

Please sign in to comment.