diff --git a/src/azul/indexer/document.py b/src/azul/indexer/document.py index 5c8051921a..4e9f7e60ec 100644 --- a/src/azul/indexer/document.py +++ b/src/azul/indexer/document.py @@ -1076,20 +1076,6 @@ def filter(self, relation: str, values: list[AnyJSON]) -> list[JSON]: FieldTypes = Mapping[str, FieldTypes1] CataloguedFieldTypes = Mapping[CatalogName, FieldTypes] - -class VersionType(Enum): - # No versioning; document is created or overwritten as needed - none = auto() - - # Writing a document fails with 409 conflict if one with the same ID already - # exists in the index - create_only = auto() - - # Use the Elasticsearch "internal" versioning type - # https://www.elastic.co/guide/en/elasticsearch/reference/6.8/docs-index_.html#_version_types - internal = auto() - - InternalVersion = tuple[int, int] @@ -1112,24 +1098,11 @@ class OpType(Enum): @attr.s(frozen=False, kw_only=True, auto_attribs=True) -class Document(Generic[C]): +class Document(Generic[C], metaclass=ABCMeta): needs_translation: ClassVar[bool] = True coordinates: C - #: By default, instances are fixed to VersionType.none. A subclass may - #: decide to make the version_type attribute mutable but it still needs to - #: declare what VersionType its instances use initially. This is so that - #: factories of these instances can make the necessary preparations. - #: - initial_version_type: ClassVar[VersionType] = VersionType.none - version_type: VersionType = attr.ib(default=initial_version_type, - on_setattr=attr.setters.frozen) - - # For VersionType.internal, version is a tuple composed of the sequence - # number and primary term. For VersionType.none and .create_only, it is - # None. - # https://www.elastic.co/guide/en/elasticsearch/reference/7.9/docs-bulk.html#bulk-api-response-body version: Optional[InternalVersion] # In the index, the `contents` property is always present and never null in @@ -1144,6 +1117,24 @@ class Document(Generic[C]): def entity(self) -> EntityReference: return self.coordinates.entity + @property + @abstractmethod + def op_type(self) -> OpType: + """ + Get the ES client method to use when writing this document to the index. + """ + raise NotImplementedError + + @op_type.setter + def op_type(self, value: OpType): + """ + Set the ES client method to use when writing this document to the index. + This setter is optional, concrete classes may not implement it. If they + don't, callers should gracefully handle the resulting AttributeError, as + that is what the Python runtime raises for the NotImplementedError. + """ + raise NotImplementedError + @classmethod def field_types(cls, field_types: FieldTypes) -> FieldTypes: return { @@ -1302,14 +1293,11 @@ def from_index(cls, document = cls.translate_fields(document, field_types[coordinates.entity.catalog], forward=False) - if cls.initial_version_type is VersionType.internal: - try: - version = (hit['_seq_no'], hit['_primary_term']) - except KeyError: - assert '_seq_no' not in hit - assert '_primary_term' not in hit - version = None - else: + try: + version = hit['_seq_no'], hit['_primary_term'] + except KeyError: + assert '_seq_no' not in hit + assert '_primary_term' not in hit version = None return cls.from_json(coordinates=coordinates, @@ -1333,31 +1321,17 @@ def to_index(self, :return: Request parameters for indexing """ - op_type = self.op_type coordinates = self.coordinates.with_catalog(catalog) result = { 'index': coordinates.index_name, 'id': self.coordinates.document_id } - if op_type is not OpType.delete: + if self.op_type is not OpType.delete: result['body'] = self._body(field_types[coordinates.entity.catalog]) - - if self.version_type is VersionType.none: - pass - elif self.version_type is VersionType.create_only: - assert op_type is OpType.create, op_type - elif self.version_type is VersionType.internal: - if self.version is not None: - # For internal versioning, self.version is None for new documents - result['if_seq_no'], result['if_primary_term'] = self.version - else: - assert False, self.version_type + if self.version is not None: + result['if_seq_no'], result['if_primary_term'] = self.version return result - @property - def op_type(self) -> OpType: - raise NotImplementedError - def _body(self, field_types: FieldTypes) -> JSON: body = self.to_json() if self.needs_translation: @@ -1377,10 +1351,17 @@ class Contribution(Document[ContributionCoordinates[E]]): contents: JSON source: DocumentSource - #: The version_type attribute will change to VersionType.none if writing + #: The op_type attribute will change to OpType.index if writing #: to Elasticsearch fails with 409 - initial_version_type: ClassVar[VersionType] = VersionType.create_only - version_type: VersionType = initial_version_type + _op_type: OpType = OpType.create + + @property + def op_type(self) -> OpType: + return self._op_type + + @op_type.setter + def op_type(self, op_type: OpType): + self._op_type = op_type def __attrs_post_init__(self): assert self.contents is not None @@ -1438,21 +1419,9 @@ def to_json(self): bundle_version=self.coordinates.bundle.version, bundle_deleted=self.coordinates.deleted) - @property - def op_type(self) -> OpType: - if self.version_type is VersionType.create_only: - return OpType.create - elif self.version_type is VersionType.none: - return OpType.index - else: - assert False, self.version_type - @attr.s(frozen=False, kw_only=True, auto_attribs=True) class Aggregate(Document[AggregateCoordinates]): - initial_version_type: ClassVar[VersionType] = VersionType.internal - version_type: VersionType = attr.ib(default=initial_version_type, - on_setattr=attr.setters.frozen) sources: set[DocumentSource] bundles: Optional[list[BundleFQIDJSON]] num_contributions: int @@ -1560,7 +1529,6 @@ def to_json(self) -> JSON: @property def op_type(self) -> OpType: - assert self.version_type is VersionType.none, self.version_type return OpType.update def _body(self, field_types: FieldTypes) -> JSON: diff --git a/src/azul/indexer/index_service.py b/src/azul/indexer/index_service.py index 70402da1df..b738533ea1 100644 --- a/src/azul/indexer/index_service.py +++ b/src/azul/indexer/index_service.py @@ -80,7 +80,6 @@ OpType, Replica, ReplicaCoordinates, - VersionType, ) from azul.indexer.document_service import ( DocumentService, @@ -438,7 +437,7 @@ def contribute(self, else: entity = CataloguedEntityReference.for_entity(catalog, c.coordinates.entity) # Don't count overwrites, but ensure entry exists - was_overwrite = c.version_type is VersionType.none + was_overwrite = c.op_type is OpType.index tallies[entity] += 0 if was_overwrite else 1 contributions = retry_contributions writer.raise_on_errors() @@ -610,8 +609,6 @@ def _read_contributions(self, num_contributions = sum(tallies.values()) log.info('Reading %i expected contribution(s)', num_contributions) - is_internal_version = Contribution.initial_version_type is VersionType.internal - def pages() -> Iterable[JSONs]: body = dict(query=query) while True: @@ -620,7 +617,7 @@ def pages() -> Iterable[JSONs]: body=body, size=config.contribution_page_size, track_total_hits=False, - seq_no_primary_term=is_internal_version) + seq_no_primary_term=True) hits = response['hits']['hits'] log.debug('Read a page with %i contribution(s)', len(hits)) if hits: @@ -963,14 +960,22 @@ def _on_conflict(self, doc: Document, e: Union[Exception, JSON]): self.retries.add(doc.coordinates) else: action = 'giving up' - if doc.version_type is VersionType.create_only: - log.warning('Document %r exists. Retrying with overwrite.', doc.coordinates) - # Try again but allow overwriting - doc.version_type = VersionType.none - else: + + def warn(): log.warning('There was a conflict with document %r: %r. Total # of errors: %i, %s.', doc.coordinates, e, self.conflicts[doc.coordinates], action) + if doc.op_type is OpType.create: + try: + doc.op_type = OpType.index + except AttributeError: + # We don't expect all Document types will let us modify op_type + warn() + else: + log.warning('Document %r exists. Retrying with overwrite.', doc.coordinates) + else: + warn() + def raise_on_errors(self): if self.errors or self.conflicts: log.warning('Failures: %r', self.errors)