Skip to content

Commit

Permalink
[r p] Fix: Optimistic lock contention on HCA replicas (#6648, PR #6652)
Browse files Browse the repository at this point in the history
  • Loading branch information
achave11-ucsc committed Oct 30, 2024
2 parents ccc785f + da706a6 commit 24aaa42
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 95 deletions.
130 changes: 48 additions & 82 deletions src/azul/indexer/document.py
Original file line number Diff line number Diff line change
Expand Up @@ -1076,20 +1076,6 @@ def filter(self, relation: str, values: list[AnyJSON]) -> list[JSON]:
FieldTypes = Mapping[str, FieldTypes1]
CataloguedFieldTypes = Mapping[CatalogName, FieldTypes]


class VersionType(Enum):
    """
    Strategies for resolving concurrent writes of the same document
    in the Elasticsearch index.
    """

    #: No versioning; the document is created or overwritten as needed.
    none = auto()

    #: Writing a document fails with a 409 conflict if one with the same ID
    #: already exists in the index.
    create_only = auto()

    #: Use the Elasticsearch "internal" versioning type.
    #: https://www.elastic.co/guide/en/elasticsearch/reference/6.8/docs-index_.html#_version_types
    internal = auto()


InternalVersion = tuple[int, int]


Expand All @@ -1112,17 +1098,11 @@ class OpType(Enum):


@attr.s(frozen=False, kw_only=True, auto_attribs=True)
class Document(Generic[C]):
needs_seq_no_primary_term: ClassVar[bool] = False
class Document(Generic[C], metaclass=ABCMeta):
needs_translation: ClassVar[bool] = True

coordinates: C
version_type: VersionType = VersionType.none

# For VersionType.internal, version is a tuple composed of the sequence
# number and primary term. For VersionType.none and .create_only, it is
# None.
# https://www.elastic.co/guide/en/elasticsearch/reference/7.9/docs-bulk.html#bulk-api-response-body
version: Optional[InternalVersion]

# In the index, the `contents` property is always present and never null in
Expand All @@ -1137,6 +1117,24 @@ class Document(Generic[C]):
def entity(self) -> EntityReference:
return self.coordinates.entity

@property
@abstractmethod
def op_type(self) -> OpType:
    """
    The ES client method to use when writing this document to the index.

    Concrete subclasses must implement this property; the return value
    selects between e.g. ``create``, ``index``, ``update`` and ``delete``.
    """
    raise NotImplementedError

@op_type.setter
def op_type(self, value: OpType):
    """
    Set the ES client method to use when writing this document to the index.

    Implementing this setter is optional. A concrete subclass that defines
    ``op_type`` as a plain read-only property causes assignment to raise
    AttributeError (that is what the Python runtime raises when a property
    has no setter), and callers should handle that gracefully. This default
    implementation raises NotImplementedError instead.
    """
    raise NotImplementedError

@classmethod
def field_types(cls, field_types: FieldTypes) -> FieldTypes:
return {
Expand Down Expand Up @@ -1295,14 +1293,11 @@ def from_index(cls,
document = cls.translate_fields(document,
field_types[coordinates.entity.catalog],
forward=False)
if cls.needs_seq_no_primary_term:
try:
version = (hit['_seq_no'], hit['_primary_term'])
except KeyError:
assert '_seq_no' not in hit
assert '_primary_term' not in hit
version = None
else:
try:
version = hit['_seq_no'], hit['_primary_term']
except KeyError:
assert '_seq_no' not in hit
assert '_primary_term' not in hit
version = None

return cls.from_json(coordinates=coordinates,
Expand All @@ -1311,59 +1306,34 @@ def from_index(cls,

def to_index(self,
catalog: Optional[CatalogName],
field_types: CataloguedFieldTypes,
bulk: bool = False
field_types: CataloguedFieldTypes
) -> JSON:
"""
Build request parameters from the document for indexing
Prepare a request to write this document to the index. The return value
is a dictionary with keyword arguments to the ES client method selected
by the :meth:`op_type` property.
:param catalog: An optional catalog name. If None, this document's
coordinates must supply it. Otherwise this document's
coordinates must supply the same catalog or none at all.
:param field_types: A mapping of field paths to field type
:param bulk: If bulk indexing
:return: Request parameters for indexing
"""
op_type = self.op_type
coordinates = self.coordinates.with_catalog(catalog)
result = {
'_index' if bulk else 'index': coordinates.index_name,
**(
{}
if op_type is OpType.delete else
{
'_source' if bulk else 'body':
self._body(field_types[coordinates.entity.catalog])
}
),
'_id' if bulk else 'id': self.coordinates.document_id
'index': coordinates.index_name,
'id': self.coordinates.document_id
}
# For non-bulk updates, self.op_type determines which client
# method is invoked.
if bulk:
result['_op_type'] = self.op_type.name
if self.version_type is VersionType.none:
assert not self.needs_seq_no_primary_term
elif self.version_type is VersionType.create_only:
assert not self.needs_seq_no_primary_term
if bulk:
if op_type is OpType.delete:
result['if_seq_no'], result['if_primary_term'] = self.version
else:
assert op_type is OpType.create, op_type
elif self.version_type is VersionType.internal:
assert self.needs_seq_no_primary_term
if self.version is not None:
# For internal versioning, self.version is None for new documents
result['if_seq_no'], result['if_primary_term'] = self.version
else:
assert False, self.version_type
if self.op_type is not OpType.delete:
result['body'] = self._body(field_types[coordinates.entity.catalog])
if self.version is not None:
result['if_seq_no'], result['if_primary_term'] = self.version
if self.op_type is OpType.update:
result['params'] = {'retry_on_conflict': 3}
return result

@property
def op_type(self) -> OpType:
raise NotImplementedError

def _body(self, field_types: FieldTypes) -> JSON:
body = self.to_json()
if self.needs_translation:
Expand All @@ -1383,9 +1353,17 @@ class Contribution(Document[ContributionCoordinates[E]]):
contents: JSON
source: DocumentSource

#: The version_type attribute will change to VersionType.none if writing
#: The op_type attribute will change to OpType.index if writing
#: to Elasticsearch fails with 409
version_type: VersionType = VersionType.create_only
_op_type: OpType = OpType.create

@property
def op_type(self) -> OpType:
    """
    The ES client method to use when writing this contribution. Starts out
    as ``OpType.create`` (via the ``_op_type`` attribute default) and may be
    switched by the setter, e.g. to ``OpType.index`` after a 409 conflict.
    """
    return self._op_type

@op_type.setter
def op_type(self, op_type: OpType):
    # Contributions allow op_type to be reassigned so that a failed
    # create-only write can be retried as an overwrite.
    self._op_type = op_type

def __attrs_post_init__(self):
assert self.contents is not None
Expand Down Expand Up @@ -1443,23 +1421,12 @@ def to_json(self):
bundle_version=self.coordinates.bundle.version,
bundle_deleted=self.coordinates.deleted)

@property
def op_type(self) -> OpType:
    """
    The ES client method to use when writing this contribution: ``create``
    for create-only versioning, ``index`` (overwrite) when versioning is
    disabled. Any other version type is a programming error.
    """
    version_type = self.version_type
    if version_type is VersionType.create_only:
        return OpType.create
    assert version_type is VersionType.none, version_type
    return OpType.index


@attr.s(frozen=False, kw_only=True, auto_attribs=True)
class Aggregate(Document[AggregateCoordinates]):
version_type: VersionType = VersionType.internal
sources: set[DocumentSource]
bundles: Optional[list[BundleFQIDJSON]]
num_contributions: int
needs_seq_no_primary_term: ClassVar[bool] = True

def __attrs_post_init__(self):
assert isinstance(self.coordinates, AggregateCoordinates)
Expand Down Expand Up @@ -1564,7 +1531,6 @@ def to_json(self) -> JSON:

@property
def op_type(self) -> OpType:
    # NOTE(review): this assert reads self.version_type, which only exists
    # in the pre-change version of this class — confirm it was removed
    # together with the VersionType machinery.
    assert self.version_type is VersionType.none, self.version_type
    # Aggregates are always written via the `update` client method.
    return OpType.update

def _body(self, field_types: FieldTypes) -> JSON:
Expand Down
54 changes: 41 additions & 13 deletions src/azul/indexer/index_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
attrgetter,
)
from typing import (
Any,
MutableSet,
Optional,
TYPE_CHECKING,
Expand Down Expand Up @@ -79,7 +80,6 @@
OpType,
Replica,
ReplicaCoordinates,
VersionType,
)
from azul.indexer.document_service import (
DocumentService,
Expand Down Expand Up @@ -437,7 +437,7 @@ def contribute(self,
else:
entity = CataloguedEntityReference.for_entity(catalog, c.coordinates.entity)
# Don't count overwrites, but ensure entry exists
was_overwrite = c.version_type is VersionType.none
was_overwrite = c.op_type is OpType.index
tallies[entity] += 0 if was_overwrite else 1
contributions = retry_contributions
writer.raise_on_errors()
Expand Down Expand Up @@ -617,7 +617,7 @@ def pages() -> Iterable[JSONs]:
body=body,
size=config.contribution_page_size,
track_total_hits=False,
seq_no_primary_term=Contribution.needs_seq_no_primary_term)
seq_no_primary_term=True)
hits = response['hits']['hits']
log.debug('Read a page with %i contribution(s)', len(hits))
if hits:
Expand Down Expand Up @@ -879,11 +879,26 @@ def _write_bulk(self, documents: Iterable[Document]):
doc.coordinates: doc
for doc in documents
}
actions = [
doc.to_index(self.catalog, self.field_types, bulk=True)
for doc in documents.values()
]

def expand_action(doc: Any) -> tuple[dict[str, Any], dict[str, Any] | None]:
    # Document.to_index returns the keyword arguments to the ES client
    # method referenced by Document.op_type. In bulk requests, these
    # methods are not invoked individually. This function converts the
    # keyword arguments returned by Document.to_index to the form
    # internally used by the ES client's `bulk` method: a pair
    # consisting of 1) the action and associated metadata and 2) an
    # optional document source.
    assert isinstance(doc, Document), doc
    # Copy so the pops below don't mutate the dict returned by to_index
    action = dict(doc.to_index(self.catalog, self.field_types))
    # Flatten 'params' (e.g. retry_on_conflict for updates) into the
    # action metadata, where the bulk API expects them
    action.update(action.pop('params', {}))
    # Bulk actions use underscore-prefixed keys for index and document ID
    action['_index'] = action.pop('index')
    action['_id'] = action.pop('id')
    # 'body' is absent for deletes; pass None as the source in that case
    body = action.pop('body', None)
    # The bulk wire format keys the action metadata by the op type name
    action = {doc.op_type.name: action}
    return action, body

log.info('Writing documents using streaming_bulk().')

# We cannot use parallel_bulk() for 1024+ actions because Lambda doesn't
# support shared memory. See the issue below for details.
#
Expand All @@ -894,8 +909,13 @@ def _write_bulk(self, documents: Iterable[Document]):
# There is no way to split a single action and hence a single document
# into multiple requests.
#
# Technically, we're not supposed to pass Document instances in the
# `action` parameter but we're exploiting the undocumented fact that the
# method immediately maps the value of the `expand_action_callback`
# parameter over the list passed in the `actions` parameter.
response = streaming_bulk(client=self.es_client,
actions=actions,
actions=list(documents.values()),
expand_action_callback=expand_action,
refresh=self.refresh,
raise_on_error=False,
max_chunk_bytes=config.max_chunk_size)
Expand Down Expand Up @@ -941,14 +961,22 @@ def _on_conflict(self, doc: Document, e: Union[Exception, JSON]):
self.retries.add(doc.coordinates)
else:
action = 'giving up'
if doc.version_type is VersionType.create_only:
log.warning('Document %r exists. Retrying with overwrite.', doc.coordinates)
# Try again but allow overwriting
doc.version_type = VersionType.none
else:

def warn():
    # Log the conflict along with the running per-document error count and
    # the pending disposition ('retrying'/'giving up'), all closed over
    # from the enclosing _on_conflict scope.
    log.warning('There was a conflict with document %r: %r. Total # of errors: %i, %s.',
                doc.coordinates, e, self.conflicts[doc.coordinates], action)

if doc.op_type is OpType.create:
try:
doc.op_type = OpType.index
except AttributeError:
# We don't expect all Document types will let us modify op_type
warn()
else:
log.warning('Document %r exists. Retrying with overwrite.', doc.coordinates)
else:
warn()

def raise_on_errors(self):
if self.errors or self.conflicts:
log.warning('Failures: %r', self.errors)
Expand Down

0 comments on commit 24aaa42

Please sign in to comment.