From 6d4ab322cf8cf345b0fd92802ea94c54b4f73b9a Mon Sep 17 00:00:00 2001 From: Hannes Schmidt Date: Wed, 23 Oct 2024 23:06:57 -0700 Subject: [PATCH 1/7] [r] Refactor how document version type determines the retrieval of seq_no and primary_term --- src/azul/indexer/document.py | 25 ++++++++++++++++--------- src/azul/indexer/index_service.py | 4 +++- 2 files changed, 19 insertions(+), 10 deletions(-) diff --git a/src/azul/indexer/document.py b/src/azul/indexer/document.py index 3f2860f097..b04cdc299e 100644 --- a/src/azul/indexer/document.py +++ b/src/azul/indexer/document.py @@ -1113,11 +1113,18 @@ class OpType(Enum): @attr.s(frozen=False, kw_only=True, auto_attribs=True) class Document(Generic[C]): - needs_seq_no_primary_term: ClassVar[bool] = False needs_translation: ClassVar[bool] = True coordinates: C - version_type: VersionType = VersionType.none + + #: By default, instances are fixed to VersionType.none. A subclass may + #: decide to make the version_type attribute mutable but it still needs to + #: declare what VersionType its instances use initially. This is so that + #: factories of these instances can make the necessary preparations. + #: + initial_version_type: ClassVar[VersionType] = VersionType.none + version_type: VersionType = attr.ib(default=initial_version_type, + on_setattr=attr.setters.frozen) # For VersionType.internal, version is a tuple composed of the sequence # number and primary term. 
For VersionType.none and .create_only, it is @@ -1295,7 +1302,7 @@ def from_index(cls, document = cls.translate_fields(document, field_types[coordinates.entity.catalog], forward=False) - if cls.needs_seq_no_primary_term: + if cls.initial_version_type is VersionType.internal: try: version = (hit['_seq_no'], hit['_primary_term']) except KeyError: @@ -1343,16 +1350,14 @@ def to_index(self, if bulk: result['_op_type'] = self.op_type.name if self.version_type is VersionType.none: - assert not self.needs_seq_no_primary_term + pass elif self.version_type is VersionType.create_only: - assert not self.needs_seq_no_primary_term if bulk: if op_type is OpType.delete: result['if_seq_no'], result['if_primary_term'] = self.version else: assert op_type is OpType.create, op_type elif self.version_type is VersionType.internal: - assert self.needs_seq_no_primary_term if self.version is not None: # For internal versioning, self.version is None for new documents result['if_seq_no'], result['if_primary_term'] = self.version @@ -1385,7 +1390,8 @@ class Contribution(Document[ContributionCoordinates[E]]): #: The version_type attribute will change to VersionType.none if writing #: to Elasticsearch fails with 409 - version_type: VersionType = VersionType.create_only + initial_version_type: ClassVar[VersionType] = VersionType.create_only + version_type: VersionType = initial_version_type def __attrs_post_init__(self): assert self.contents is not None @@ -1455,11 +1461,12 @@ def op_type(self) -> OpType: @attr.s(frozen=False, kw_only=True, auto_attribs=True) class Aggregate(Document[AggregateCoordinates]): - version_type: VersionType = VersionType.internal + initial_version_type: ClassVar[VersionType] = VersionType.internal + version_type: VersionType = attr.ib(default=initial_version_type, + on_setattr=attr.setters.frozen) sources: set[DocumentSource] bundles: Optional[list[BundleFQIDJSON]] num_contributions: int - needs_seq_no_primary_term: ClassVar[bool] = True def __attrs_post_init__(self): 
assert isinstance(self.coordinates, AggregateCoordinates) diff --git a/src/azul/indexer/index_service.py b/src/azul/indexer/index_service.py index ab9571afdc..1ec3ba0706 100644 --- a/src/azul/indexer/index_service.py +++ b/src/azul/indexer/index_service.py @@ -609,6 +609,8 @@ def _read_contributions(self, num_contributions = sum(tallies.values()) log.info('Reading %i expected contribution(s)', num_contributions) + is_internal_version = Contribution.initial_version_type is VersionType.internal + def pages() -> Iterable[JSONs]: body = dict(query=query) while True: @@ -617,7 +619,7 @@ def pages() -> Iterable[JSONs]: body=body, size=config.contribution_page_size, track_total_hits=False, - seq_no_primary_term=Contribution.needs_seq_no_primary_term) + seq_no_primary_term=is_internal_version) hits = response['hits']['hits'] log.debug('Read a page with %i contribution(s)', len(hits)) if hits: From e689fdef601b582d698d11eca18e50df6ae365a5 Mon Sep 17 00:00:00 2001 From: Hannes Schmidt Date: Thu, 24 Oct 2024 11:20:10 -0700 Subject: [PATCH 2/7] Fix incomplete extraction of local variable --- src/azul/indexer/document.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/azul/indexer/document.py b/src/azul/indexer/document.py index b04cdc299e..a0135802ea 100644 --- a/src/azul/indexer/document.py +++ b/src/azul/indexer/document.py @@ -1348,7 +1348,7 @@ def to_index(self, # For non-bulk updates, self.op_type determines which client # method is invoked. if bulk: - result['_op_type'] = self.op_type.name + result['_op_type'] = op_type.name if self.version_type is VersionType.none: pass elif self.version_type is VersionType.create_only: From 26198525acad661ae083e26f1c9c5f68c2ff2469 Mon Sep 17 00:00:00 2001 From: Hannes Schmidt Date: Thu, 24 Oct 2024 12:32:40 -0700 Subject: [PATCH 3/7] [r] Remove code handling an impossible combination of state VersionType.create_only is only used by contributions and contributions are never deleted individually. 
Contributions are deleted either when an entire index is removed or with a delete-by-query when deindexing a source. In other words, the combination of `VersionType.create_only` and `OpType.delete` cannot occur when the deleted code is executed. --- src/azul/indexer/document.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/src/azul/indexer/document.py b/src/azul/indexer/document.py index a0135802ea..7e30dad362 100644 --- a/src/azul/indexer/document.py +++ b/src/azul/indexer/document.py @@ -1352,11 +1352,7 @@ def to_index(self, if self.version_type is VersionType.none: pass elif self.version_type is VersionType.create_only: - if bulk: - if op_type is OpType.delete: - result['if_seq_no'], result['if_primary_term'] = self.version - else: - assert op_type is OpType.create, op_type + assert op_type is OpType.create, op_type elif self.version_type is VersionType.internal: if self.version is not None: # For internal versioning, self.version is None for new documents From 2880eef752625f356ebce0ee527f88be888541c4 Mon Sep 17 00:00:00 2001 From: Hannes Schmidt Date: Thu, 24 Oct 2024 13:10:08 -0700 Subject: [PATCH 4/7] [r] Remove bulk request concerns from Document.to_index --- src/azul/indexer/document.py | 23 ++++++----------------- src/azul/indexer/index_service.py | 30 +++++++++++++++++++++++++----- 2 files changed, 31 insertions(+), 22 deletions(-) diff --git a/src/azul/indexer/document.py b/src/azul/indexer/document.py index 7e30dad362..6d66099a28 100644 --- a/src/azul/indexer/document.py +++ b/src/azul/indexer/document.py @@ -1318,8 +1318,7 @@ def from_index(cls, def to_index(self, catalog: Optional[CatalogName], - field_types: CataloguedFieldTypes, - bulk: bool = False + field_types: CataloguedFieldTypes ) -> JSON: """ Build request parameters from the document for indexing @@ -1328,27 +1327,17 @@ def to_index(self, coordinates must supply it. Otherwise this document's coordinates must supply the same catalog or none at all. 
:param field_types: A mapping of field paths to field type - :param bulk: If bulk indexing :return: Request parameters for indexing """ op_type = self.op_type coordinates = self.coordinates.with_catalog(catalog) result = { - '_index' if bulk else 'index': coordinates.index_name, - **( - {} - if op_type is OpType.delete else - { - '_source' if bulk else 'body': - self._body(field_types[coordinates.entity.catalog]) - } - ), - '_id' if bulk else 'id': self.coordinates.document_id + 'index': coordinates.index_name, + 'id': self.coordinates.document_id } - # For non-bulk updates, self.op_type determines which client - # method is invoked. - if bulk: - result['_op_type'] = op_type.name + if op_type is not OpType.delete: + result['body'] = self._body(field_types[coordinates.entity.catalog]) + if self.version_type is VersionType.none: pass elif self.version_type is VersionType.create_only: diff --git a/src/azul/indexer/index_service.py b/src/azul/indexer/index_service.py index 1ec3ba0706..70402da1df 100644 --- a/src/azul/indexer/index_service.py +++ b/src/azul/indexer/index_service.py @@ -16,6 +16,7 @@ attrgetter, ) from typing import ( + Any, MutableSet, Optional, TYPE_CHECKING, @@ -881,11 +882,25 @@ def _write_bulk(self, documents: Iterable[Document]): doc.coordinates: doc for doc in documents } - actions = [ - doc.to_index(self.catalog, self.field_types, bulk=True) - for doc in documents.values() - ] + + def expand_action(doc: Any) -> tuple[dict[str, Any], dict[str, Any] | None]: + # Document.to_index returns the keyword arguments to the ES client + # method referenced by Document.op_type. In bulk requests, these + # methods are not invoked individually. This function converts the + # keyword arguments returned by Document.to_index to the form + # internally used by the ES client's `bulk` method: a pair + # consisting of 1) the action and associated metadata and 2) an + # optional document source. 
+ assert isinstance(doc, Document), doc + action = dict(doc.to_index(self.catalog, self.field_types)) + action['_index'] = action.pop('index') + action['_id'] = action.pop('id') + body = action.pop('body', None) + action = {doc.op_type.name: action} + return action, body + log.info('Writing documents using streaming_bulk().') + # We cannot use parallel_bulk() for 1024+ actions because Lambda doesn't # support shared memory. See the issue below for details. # @@ -896,8 +911,13 @@ def _write_bulk(self, documents: Iterable[Document]): # There is no way to split a single action and hence a single document # into multiple requests. # + # Technically, we're not supposed to pass Document instances in the + # `action` parameter but we're exploiting the undocumented fact that the + # method immediately maps the value of the `expand_action_callback` + # parameter over the list passed in the `actions` parameter. response = streaming_bulk(client=self.es_client, - actions=actions, + actions=list(documents.values()), + expand_action_callback=expand_action, refresh=self.refresh, raise_on_error=False, max_chunk_bytes=config.max_chunk_size) From 0f15761d0c08d4a96c4a9f24d4a0339279370619 Mon Sep 17 00:00:00 2001 From: Hannes Schmidt Date: Thu, 24 Oct 2024 17:41:50 -0700 Subject: [PATCH 5/7] Improve docstring --- src/azul/indexer/document.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/azul/indexer/document.py b/src/azul/indexer/document.py index 6d66099a28..5c8051921a 100644 --- a/src/azul/indexer/document.py +++ b/src/azul/indexer/document.py @@ -1321,12 +1321,16 @@ def to_index(self, field_types: CataloguedFieldTypes ) -> JSON: """ - Build request parameters from the document for indexing + Prepare a request to write this document to the index. The return value + is a dictionary with keyword arguments to the ES client method selected + by the :meth:`op_type` property. :param catalog: An optional catalog name. 
If None, this document's coordinates must supply it. Otherwise this document's coordinates must supply the same catalog or none at all. + :param field_types: A mapping of field paths to field type + :return: Request parameters for indexing """ op_type = self.op_type From 7878aac50a50045805fdea0cdb9528f32eff619e Mon Sep 17 00:00:00 2001 From: Hannes Schmidt Date: Thu, 24 Oct 2024 17:44:01 -0700 Subject: [PATCH 6/7] [r] Eliminate VersionType in favor of OpType --- src/azul/indexer/document.py | 106 +++++++++++------------------- src/azul/indexer/index_service.py | 25 ++++--- 2 files changed, 52 insertions(+), 79 deletions(-) diff --git a/src/azul/indexer/document.py b/src/azul/indexer/document.py index 5c8051921a..4e9f7e60ec 100644 --- a/src/azul/indexer/document.py +++ b/src/azul/indexer/document.py @@ -1076,20 +1076,6 @@ def filter(self, relation: str, values: list[AnyJSON]) -> list[JSON]: FieldTypes = Mapping[str, FieldTypes1] CataloguedFieldTypes = Mapping[CatalogName, FieldTypes] - -class VersionType(Enum): - # No versioning; document is created or overwritten as needed - none = auto() - - # Writing a document fails with 409 conflict if one with the same ID already - # exists in the index - create_only = auto() - - # Use the Elasticsearch "internal" versioning type - # https://www.elastic.co/guide/en/elasticsearch/reference/6.8/docs-index_.html#_version_types - internal = auto() - - InternalVersion = tuple[int, int] @@ -1112,24 +1098,11 @@ class OpType(Enum): @attr.s(frozen=False, kw_only=True, auto_attribs=True) -class Document(Generic[C]): +class Document(Generic[C], metaclass=ABCMeta): needs_translation: ClassVar[bool] = True coordinates: C - #: By default, instances are fixed to VersionType.none. A subclass may - #: decide to make the version_type attribute mutable but it still needs to - #: declare what VersionType its instances use initially. This is so that - #: factories of these instances can make the necessary preparations. 
- #: - initial_version_type: ClassVar[VersionType] = VersionType.none - version_type: VersionType = attr.ib(default=initial_version_type, - on_setattr=attr.setters.frozen) - - # For VersionType.internal, version is a tuple composed of the sequence - # number and primary term. For VersionType.none and .create_only, it is - # None. - # https://www.elastic.co/guide/en/elasticsearch/reference/7.9/docs-bulk.html#bulk-api-response-body version: Optional[InternalVersion] # In the index, the `contents` property is always present and never null in @@ -1144,6 +1117,24 @@ class Document(Generic[C]): def entity(self) -> EntityReference: return self.coordinates.entity + @property + @abstractmethod + def op_type(self) -> OpType: + """ + Get the ES client method to use when writing this document to the index. + """ + raise NotImplementedError + + @op_type.setter + def op_type(self, value: OpType): + """ + Set the ES client method to use when writing this document to the index. + This setter is optional, concrete classes may not implement it. If they + don't, callers should gracefully handle the resulting AttributeError, as + that is what the Python runtime raises for the NotImplementedError. 
+ """ + raise NotImplementedError + @classmethod def field_types(cls, field_types: FieldTypes) -> FieldTypes: return { @@ -1302,14 +1293,11 @@ def from_index(cls, document = cls.translate_fields(document, field_types[coordinates.entity.catalog], forward=False) - if cls.initial_version_type is VersionType.internal: - try: - version = (hit['_seq_no'], hit['_primary_term']) - except KeyError: - assert '_seq_no' not in hit - assert '_primary_term' not in hit - version = None - else: + try: + version = hit['_seq_no'], hit['_primary_term'] + except KeyError: + assert '_seq_no' not in hit + assert '_primary_term' not in hit version = None return cls.from_json(coordinates=coordinates, @@ -1333,31 +1321,17 @@ def to_index(self, :return: Request parameters for indexing """ - op_type = self.op_type coordinates = self.coordinates.with_catalog(catalog) result = { 'index': coordinates.index_name, 'id': self.coordinates.document_id } - if op_type is not OpType.delete: + if self.op_type is not OpType.delete: result['body'] = self._body(field_types[coordinates.entity.catalog]) - - if self.version_type is VersionType.none: - pass - elif self.version_type is VersionType.create_only: - assert op_type is OpType.create, op_type - elif self.version_type is VersionType.internal: - if self.version is not None: - # For internal versioning, self.version is None for new documents - result['if_seq_no'], result['if_primary_term'] = self.version - else: - assert False, self.version_type + if self.version is not None: + result['if_seq_no'], result['if_primary_term'] = self.version return result - @property - def op_type(self) -> OpType: - raise NotImplementedError - def _body(self, field_types: FieldTypes) -> JSON: body = self.to_json() if self.needs_translation: @@ -1377,10 +1351,17 @@ class Contribution(Document[ContributionCoordinates[E]]): contents: JSON source: DocumentSource - #: The version_type attribute will change to VersionType.none if writing + #: The op_type attribute will change to 
OpType.index if writing #: to Elasticsearch fails with 409 - initial_version_type: ClassVar[VersionType] = VersionType.create_only - version_type: VersionType = initial_version_type + _op_type: OpType = OpType.create + + @property + def op_type(self) -> OpType: + return self._op_type + + @op_type.setter + def op_type(self, op_type: OpType): + self._op_type = op_type def __attrs_post_init__(self): assert self.contents is not None @@ -1438,21 +1419,9 @@ def to_json(self): bundle_version=self.coordinates.bundle.version, bundle_deleted=self.coordinates.deleted) - @property - def op_type(self) -> OpType: - if self.version_type is VersionType.create_only: - return OpType.create - elif self.version_type is VersionType.none: - return OpType.index - else: - assert False, self.version_type - @attr.s(frozen=False, kw_only=True, auto_attribs=True) class Aggregate(Document[AggregateCoordinates]): - initial_version_type: ClassVar[VersionType] = VersionType.internal - version_type: VersionType = attr.ib(default=initial_version_type, - on_setattr=attr.setters.frozen) sources: set[DocumentSource] bundles: Optional[list[BundleFQIDJSON]] num_contributions: int @@ -1560,7 +1529,6 @@ def to_json(self) -> JSON: @property def op_type(self) -> OpType: - assert self.version_type is VersionType.none, self.version_type return OpType.update def _body(self, field_types: FieldTypes) -> JSON: diff --git a/src/azul/indexer/index_service.py b/src/azul/indexer/index_service.py index 70402da1df..b738533ea1 100644 --- a/src/azul/indexer/index_service.py +++ b/src/azul/indexer/index_service.py @@ -80,7 +80,6 @@ OpType, Replica, ReplicaCoordinates, - VersionType, ) from azul.indexer.document_service import ( DocumentService, @@ -438,7 +437,7 @@ def contribute(self, else: entity = CataloguedEntityReference.for_entity(catalog, c.coordinates.entity) # Don't count overwrites, but ensure entry exists - was_overwrite = c.version_type is VersionType.none + was_overwrite = c.op_type is OpType.index 
tallies[entity] += 0 if was_overwrite else 1 contributions = retry_contributions writer.raise_on_errors() @@ -610,8 +609,6 @@ def _read_contributions(self, num_contributions = sum(tallies.values()) log.info('Reading %i expected contribution(s)', num_contributions) - is_internal_version = Contribution.initial_version_type is VersionType.internal - def pages() -> Iterable[JSONs]: body = dict(query=query) while True: @@ -620,7 +617,7 @@ def pages() -> Iterable[JSONs]: body=body, size=config.contribution_page_size, track_total_hits=False, - seq_no_primary_term=is_internal_version) + seq_no_primary_term=True) hits = response['hits']['hits'] log.debug('Read a page with %i contribution(s)', len(hits)) if hits: @@ -963,14 +960,22 @@ def _on_conflict(self, doc: Document, e: Union[Exception, JSON]): self.retries.add(doc.coordinates) else: action = 'giving up' - if doc.version_type is VersionType.create_only: - log.warning('Document %r exists. Retrying with overwrite.', doc.coordinates) - # Try again but allow overwriting - doc.version_type = VersionType.none - else: + + def warn(): log.warning('There was a conflict with document %r: %r. Total # of errors: %i, %s.', doc.coordinates, e, self.conflicts[doc.coordinates], action) + if doc.op_type is OpType.create: + try: + doc.op_type = OpType.index + except AttributeError: + # We don't expect all Document types will let us modify op_type + warn() + else: + log.warning('Document %r exists. Retrying with overwrite.', doc.coordinates) + else: + warn() + def raise_on_errors(self): if self.errors or self.conflicts: log.warning('Failures: %r', self.errors) From da706a68c27f51020824e24dbfcdc62eee4a38a8 Mon Sep 17 00:00:00 2001 From: Hannes Schmidt Date: Thu, 24 Oct 2024 20:31:59 -0700 Subject: [PATCH 7/7] [r p] Add server-side retries for scripted ES document updates (#6648) This change alleviates the contention (#6648) a little bit but doesn't resolve it. 
Once #6648 is fully addressed, this should significantly reduce the remaining bits of contention, of which there were just a few dozen incidents per reindex before we fixed the absence of donor and protocol replicas, which are the main drivers of the contention. --- src/azul/indexer/document.py | 2 ++ src/azul/indexer/index_service.py | 1 + 2 files changed, 3 insertions(+) diff --git a/src/azul/indexer/document.py b/src/azul/indexer/document.py index 4e9f7e60ec..5e8a0cd64d 100644 --- a/src/azul/indexer/document.py +++ b/src/azul/indexer/document.py @@ -1330,6 +1330,8 @@ def to_index(self, result['body'] = self._body(field_types[coordinates.entity.catalog]) if self.version is not None: result['if_seq_no'], result['if_primary_term'] = self.version + if self.op_type is OpType.update: + result['params'] = {'retry_on_conflict': 3} return result def _body(self, field_types: FieldTypes) -> JSON: diff --git a/src/azul/indexer/index_service.py b/src/azul/indexer/index_service.py index b738533ea1..fbf0493572 100644 --- a/src/azul/indexer/index_service.py +++ b/src/azul/indexer/index_service.py @@ -890,6 +890,7 @@ def expand_action(doc: Any) -> tuple[dict[str, Any], dict[str, Any] | None]: # optional document source. assert isinstance(doc, Document), doc action = dict(doc.to_index(self.catalog, self.field_types)) + action.update(action.pop('params', {})) action['_index'] = action.pop('index') action['_id'] = action.pop('id') body = action.pop('body', None)