diff --git a/src/azul/indexer/document.py b/src/azul/indexer/document.py
index 3f2860f097..5e8a0cd64d 100644
--- a/src/azul/indexer/document.py
+++ b/src/azul/indexer/document.py
@@ -1076,20 +1076,6 @@ def filter(self, relation: str, values: list[AnyJSON]) -> list[JSON]:
 FieldTypes = Mapping[str, FieldTypes1]
 CataloguedFieldTypes = Mapping[CatalogName, FieldTypes]
 
-
-class VersionType(Enum):
-    # No versioning; document is created or overwritten as needed
-    none = auto()
-
-    # Writing a document fails with 409 conflict if one with the same ID already
-    # exists in the index
-    create_only = auto()
-
-    # Use the Elasticsearch "internal" versioning type
-    # https://www.elastic.co/guide/en/elasticsearch/reference/6.8/docs-index_.html#_version_types
-    internal = auto()
-
-
 InternalVersion = tuple[int, int]
 
 
@@ -1112,17 +1098,11 @@ class OpType(Enum):
 
 
 @attr.s(frozen=False, kw_only=True, auto_attribs=True)
-class Document(Generic[C]):
-    needs_seq_no_primary_term: ClassVar[bool] = False
+class Document(Generic[C], metaclass=ABCMeta):
     needs_translation: ClassVar[bool] = True
 
     coordinates: C
-    version_type: VersionType = VersionType.none
-    # For VersionType.internal, version is a tuple composed of the sequence
-    # number and primary term. For VersionType.none and .create_only, it is
-    # None.
-    # https://www.elastic.co/guide/en/elasticsearch/reference/7.9/docs-bulk.html#bulk-api-response-body
     version: Optional[InternalVersion]
 
     # In the index, the `contents` property is always present and never null in
@@ -1137,6 +1117,24 @@ class Document(Generic[C]):
     def entity(self) -> EntityReference:
         return self.coordinates.entity
 
+    @property
+    @abstractmethod
+    def op_type(self) -> OpType:
+        """
+        Get the ES client method to use when writing this document to the index.
+        """
+        raise NotImplementedError
+
+    @op_type.setter
+    def op_type(self, value: OpType):
+        """
+        Set the ES client method to use when writing this document to the index.
+
+        This setter is optional, concrete classes may not implement it. If they
+        don't, callers should gracefully handle the resulting AttributeError, as
+        that is what the Python runtime raises for the NotImplementedError.
+        """
+        raise NotImplementedError
+
     @classmethod
     def field_types(cls, field_types: FieldTypes) -> FieldTypes:
         return {
@@ -1295,14 +1293,11 @@ def from_index(cls,
             document = cls.translate_fields(document,
                                             field_types[coordinates.entity.catalog],
                                             forward=False)
-        if cls.needs_seq_no_primary_term:
-            try:
-                version = (hit['_seq_no'], hit['_primary_term'])
-            except KeyError:
-                assert '_seq_no' not in hit
-                assert '_primary_term' not in hit
-                version = None
-        else:
+        try:
+            version = hit['_seq_no'], hit['_primary_term']
+        except KeyError:
+            assert '_seq_no' not in hit
+            assert '_primary_term' not in hit
             version = None
         return cls.from_json(coordinates=coordinates,
@@ -1311,59 +1306,34 @@ def from_index(cls,
 
     def to_index(self,
                  catalog: Optional[CatalogName],
-                 field_types: CataloguedFieldTypes,
-                 bulk: bool = False
+                 field_types: CataloguedFieldTypes
                  ) -> JSON:
         """
-        Build request parameters from the document for indexing
+        Prepare a request to write this document to the index. The return value
+        is a dictionary with keyword arguments to the ES client method selected
+        by the :meth:`op_type` property.
 
         :param catalog: An optional catalog name. If None, this document's
                         coordinates must supply it. Otherwise this document's
                         coordinates must supply the same catalog or none at all.
+
         :param field_types: A mapping of field paths to field type
-        :param bulk: If bulk indexing
+
+        :return: Request parameters for indexing
         """
-        op_type = self.op_type
         coordinates = self.coordinates.with_catalog(catalog)
         result = {
-            '_index' if bulk else 'index': coordinates.index_name,
-            **(
-                {}
-                if op_type is OpType.delete else
-                {
-                    '_source' if bulk else 'body':
-                        self._body(field_types[coordinates.entity.catalog])
-                }
-            ),
-            '_id' if bulk else 'id': self.coordinates.document_id
+            'index': coordinates.index_name,
+            'id': self.coordinates.document_id
         }
-        # For non-bulk updates, self.op_type determines which client
-        # method is invoked.
-        if bulk:
-            result['_op_type'] = self.op_type.name
-        if self.version_type is VersionType.none:
-            assert not self.needs_seq_no_primary_term
-        elif self.version_type is VersionType.create_only:
-            assert not self.needs_seq_no_primary_term
-            if bulk:
-                if op_type is OpType.delete:
-                    result['if_seq_no'], result['if_primary_term'] = self.version
-                else:
-                    assert op_type is OpType.create, op_type
-        elif self.version_type is VersionType.internal:
-            assert self.needs_seq_no_primary_term
-            if self.version is not None:
-                # For internal versioning, self.version is None for new documents
-                result['if_seq_no'], result['if_primary_term'] = self.version
-        else:
-            assert False, self.version_type
+        if self.op_type is not OpType.delete:
+            result['body'] = self._body(field_types[coordinates.entity.catalog])
+        if self.version is not None:
+            result['if_seq_no'], result['if_primary_term'] = self.version
+        if self.op_type is OpType.update:
+            result['params'] = {'retry_on_conflict': 3}
         return result
 
-    @property
-    def op_type(self) -> OpType:
-        raise NotImplementedError
-
     def _body(self, field_types: FieldTypes) -> JSON:
         body = self.to_json()
         if self.needs_translation:
@@ -1383,9 +1353,17 @@ class Contribution(Document[ContributionCoordinates[E]]):
     contents: JSON
     source: DocumentSource
 
-    #: The version_type attribute will change to VersionType.none if writing
+    #: The op_type attribute will change to OpType.index if writing
     #: to Elasticsearch fails with 409
-    version_type: VersionType = VersionType.create_only
+    _op_type: OpType = OpType.create
+
+    @property
+    def op_type(self) -> OpType:
+        return self._op_type
+
+    @op_type.setter
+    def op_type(self, op_type: OpType):
+        self._op_type = op_type
 
     def __attrs_post_init__(self):
         assert self.contents is not None
@@ -1443,23 +1421,12 @@ def to_json(self):
                     bundle_version=self.coordinates.bundle.version,
                     bundle_deleted=self.coordinates.deleted)
 
-    @property
-    def op_type(self) -> OpType:
-        if self.version_type is VersionType.create_only:
-            return OpType.create
-        elif self.version_type is VersionType.none:
-            return OpType.index
-        else:
-            assert False, self.version_type
-
 
 @attr.s(frozen=False, kw_only=True, auto_attribs=True)
 class Aggregate(Document[AggregateCoordinates]):
-    version_type: VersionType = VersionType.internal
     sources: set[DocumentSource]
     bundles: Optional[list[BundleFQIDJSON]]
     num_contributions: int
-    needs_seq_no_primary_term: ClassVar[bool] = True
 
     def __attrs_post_init__(self):
        assert isinstance(self.coordinates, AggregateCoordinates)
@@ -1564,7 +1531,6 @@ def to_json(self) -> JSON:
 
     @property
     def op_type(self) -> OpType:
-        assert self.version_type is VersionType.none, self.version_type
         return OpType.update
 
     def _body(self, field_types: FieldTypes) -> JSON:
diff --git a/src/azul/indexer/index_service.py b/src/azul/indexer/index_service.py
index ab9571afdc..fbf0493572 100644
--- a/src/azul/indexer/index_service.py
+++ b/src/azul/indexer/index_service.py
@@ -16,6 +16,7 @@
     attrgetter,
 )
 from typing import (
+    Any,
     MutableSet,
     Optional,
     TYPE_CHECKING,
@@ -79,7 +80,6 @@
     OpType,
     Replica,
     ReplicaCoordinates,
-    VersionType,
 )
 from azul.indexer.document_service import (
     DocumentService,
@@ -437,7 +437,7 @@ def contribute(self,
             else:
                 entity = CataloguedEntityReference.for_entity(catalog, c.coordinates.entity)
                 # Don't count overwrites, but ensure entry exists
-                was_overwrite = c.version_type is VersionType.none
+                was_overwrite = c.op_type is OpType.index
                 tallies[entity] += 0 if was_overwrite else 1
         contributions = retry_contributions
         writer.raise_on_errors()
@@ -617,7 +617,7 @@ def pages() -> Iterable[JSONs]:
                                               body=body,
                                               size=config.contribution_page_size,
                                               track_total_hits=False,
-                                              seq_no_primary_term=Contribution.needs_seq_no_primary_term)
+                                              seq_no_primary_term=True)
                 hits = response['hits']['hits']
                 log.debug('Read a page with %i contribution(s)', len(hits))
                 if hits:
@@ -879,11 +879,26 @@ def _write_bulk(self, documents: Iterable[Document]):
             doc.coordinates: doc
             for doc in documents
         }
-        actions = [
-            doc.to_index(self.catalog, self.field_types, bulk=True)
-            for doc in documents.values()
-        ]
+
+        def expand_action(doc: Any) -> tuple[dict[str, Any], dict[str, Any] | None]:
+            # Document.to_index returns the keyword arguments to the ES client
+            # method referenced by Document.op_type. In bulk requests, these
+            # methods are not invoked individually. This function converts the
+            # keyword arguments returned by Document.to_index to the form
+            # internally used by the ES client's `bulk` method: a pair
+            # consisting of 1) the action and associated metadata and 2) an
+            # optional document source.
+            assert isinstance(doc, Document), doc
+            action = dict(doc.to_index(self.catalog, self.field_types))
+            action.update(action.pop('params', {}))
+            action['_index'] = action.pop('index')
+            action['_id'] = action.pop('id')
+            body = action.pop('body', None)
+            action = {doc.op_type.name: action}
+            return action, body
+
         log.info('Writing documents using streaming_bulk().')
+
         # We cannot use parallel_bulk() for 1024+ actions because Lambda doesn't
         # support shared memory. See the issue below for details.
         #
@@ -894,8 +909,13 @@ def _write_bulk(self, documents: Iterable[Document]):
         # There is no way to split a single action and hence a single document
         # into multiple requests.
         #
+        # Technically, we're not supposed to pass Document instances in the
+        # `action` parameter but we're exploiting the undocumented fact that the
+        # method immediately maps the value of the `expand_action_callback`
+        # parameter over the list passed in the `actions` parameter.
         response = streaming_bulk(client=self.es_client,
-                                  actions=actions,
+                                  actions=list(documents.values()),
+                                  expand_action_callback=expand_action,
                                   refresh=self.refresh,
                                   raise_on_error=False,
                                   max_chunk_bytes=config.max_chunk_size)
@@ -941,14 +961,22 @@ def _on_conflict(self, doc: Document, e: Union[Exception, JSON]):
             self.retries.add(doc.coordinates)
         else:
             action = 'giving up'
-        if doc.version_type is VersionType.create_only:
-            log.warning('Document %r exists. Retrying with overwrite.', doc.coordinates)
-            # Try again but allow overwriting
-            doc.version_type = VersionType.none
-        else:
+
+        def warn():
             log.warning('There was a conflict with document %r: %r. Total # of errors: %i, %s.',
                         doc.coordinates, e, self.conflicts[doc.coordinates], action)
 
+        if doc.op_type is OpType.create:
+            try:
+                doc.op_type = OpType.index
+            except AttributeError:
+                # We don't expect all Document types will let us modify op_type
+                warn()
+            else:
+                log.warning('Document %r exists. Retrying with overwrite.', doc.coordinates)
+        else:
+            warn()
+
     def raise_on_errors(self):
         if self.errors or self.conflicts:
             log.warning('Failures: %r', self.errors)