Skip to content

Commit

Permalink
Eliminate VersionType in favor of OpType
Browse files Browse the repository at this point in the history
  • Loading branch information
hannes-ucsc committed Oct 29, 2024
1 parent 0949201 commit f079dc1
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 79 deletions.
106 changes: 37 additions & 69 deletions src/azul/indexer/document.py
Original file line number Diff line number Diff line change
Expand Up @@ -1076,20 +1076,6 @@ def filter(self, relation: str, values: list[AnyJSON]) -> list[JSON]:
FieldTypes = Mapping[str, FieldTypes1]
CataloguedFieldTypes = Mapping[CatalogName, FieldTypes]


class VersionType(Enum):
# No versioning; document is created or overwritten as needed
none = auto()

# Writing a document fails with 409 conflict if one with the same ID already
# exists in the index
create_only = auto()

# Use the Elasticsearch "internal" versioning type
# https://www.elastic.co/guide/en/elasticsearch/reference/6.8/docs-index_.html#_version_types
internal = auto()


InternalVersion = tuple[int, int]


Expand All @@ -1112,24 +1098,11 @@ class OpType(Enum):


@attr.s(frozen=False, kw_only=True, auto_attribs=True)
class Document(Generic[C]):
class Document(Generic[C], metaclass=ABCMeta):
needs_translation: ClassVar[bool] = True

coordinates: C

#: By default, instances are fixed to VersionType.none. A subclass may
#: decide to make the version_type attribute mutable but it still needs to
#: declare what VersionType its instances use initially. This is so that
#: factories of these instances can make the necessary preparations.
#:
initial_version_type: ClassVar[VersionType] = VersionType.none
version_type: VersionType = attr.ib(default=initial_version_type,
on_setattr=attr.setters.frozen)

# For VersionType.internal, version is a tuple composed of the sequence
# number and primary term. For VersionType.none and .create_only, it is
# None.
# https://www.elastic.co/guide/en/elasticsearch/reference/7.9/docs-bulk.html#bulk-api-response-body
version: Optional[InternalVersion]

# In the index, the `contents` property is always present and never null in
Expand All @@ -1144,6 +1117,24 @@ class Document(Generic[C]):
def entity(self) -> EntityReference:
return self.coordinates.entity

@property
@abstractmethod
def op_type(self) -> OpType:
"""
Get the ES client method to use when writing this document to the index.
"""
raise NotImplementedError

@op_type.setter
def op_type(self, value: OpType):
"""
Set the ES client method to use when writing this document to the index.
This setter is optional; concrete classes need not implement it. If they
don't, callers should gracefully handle the resulting AttributeError, which
is what Python raises on assignment to a property that lacks a setter.
"""
raise NotImplementedError

@classmethod
def field_types(cls, field_types: FieldTypes) -> FieldTypes:
return {
Expand Down Expand Up @@ -1302,14 +1293,11 @@ def from_index(cls,
document = cls.translate_fields(document,
field_types[coordinates.entity.catalog],
forward=False)
if cls.initial_version_type is VersionType.internal:
try:
version = (hit['_seq_no'], hit['_primary_term'])
except KeyError:
assert '_seq_no' not in hit
assert '_primary_term' not in hit
version = None
else:
try:
version = hit['_seq_no'], hit['_primary_term']
except KeyError:
assert '_seq_no' not in hit
assert '_primary_term' not in hit
version = None

return cls.from_json(coordinates=coordinates,
Expand All @@ -1333,31 +1321,17 @@ def to_index(self,
:return: Request parameters for indexing
"""
op_type = self.op_type
coordinates = self.coordinates.with_catalog(catalog)
result = {
'index': coordinates.index_name,
'id': self.coordinates.document_id
}
if op_type is not OpType.delete:
if self.op_type is not OpType.delete:
result['body'] = self._body(field_types[coordinates.entity.catalog])

if self.version_type is VersionType.none:
pass
elif self.version_type is VersionType.create_only:
assert op_type is OpType.create, op_type
elif self.version_type is VersionType.internal:
if self.version is not None:
# For internal versioning, self.version is None for new documents
result['if_seq_no'], result['if_primary_term'] = self.version
else:
assert False, self.version_type
if self.version is not None:
result['if_seq_no'], result['if_primary_term'] = self.version
return result

@property
def op_type(self) -> OpType:
raise NotImplementedError

def _body(self, field_types: FieldTypes) -> JSON:
body = self.to_json()
if self.needs_translation:
Expand All @@ -1377,10 +1351,17 @@ class Contribution(Document[ContributionCoordinates[E]]):
contents: JSON
source: DocumentSource

#: The version_type attribute will change to VersionType.none if writing
#: The op_type attribute will change to OpType.index if writing
#: to Elasticsearch fails with 409
initial_version_type: ClassVar[VersionType] = VersionType.create_only
version_type: VersionType = initial_version_type
_op_type: OpType = OpType.create

@property
def op_type(self) -> OpType:
return self._op_type

@op_type.setter
def op_type(self, op_type: OpType):
self._op_type = op_type

def __attrs_post_init__(self):
assert self.contents is not None
Expand Down Expand Up @@ -1438,21 +1419,9 @@ def to_json(self):
bundle_version=self.coordinates.bundle.version,
bundle_deleted=self.coordinates.deleted)

@property
def op_type(self) -> OpType:
if self.version_type is VersionType.create_only:
return OpType.create
elif self.version_type is VersionType.none:
return OpType.index
else:
assert False, self.version_type


@attr.s(frozen=False, kw_only=True, auto_attribs=True)
class Aggregate(Document[AggregateCoordinates]):
initial_version_type: ClassVar[VersionType] = VersionType.internal
version_type: VersionType = attr.ib(default=initial_version_type,
on_setattr=attr.setters.frozen)
sources: set[DocumentSource]
bundles: Optional[list[BundleFQIDJSON]]
num_contributions: int
Expand Down Expand Up @@ -1560,7 +1529,6 @@ def to_json(self) -> JSON:

@property
def op_type(self) -> OpType:
assert self.version_type is VersionType.none, self.version_type
return OpType.update

def _body(self, field_types: FieldTypes) -> JSON:
Expand Down
25 changes: 15 additions & 10 deletions src/azul/indexer/index_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@
OpType,
Replica,
ReplicaCoordinates,
VersionType,
)
from azul.indexer.document_service import (
DocumentService,
Expand Down Expand Up @@ -438,7 +437,7 @@ def contribute(self,
else:
entity = CataloguedEntityReference.for_entity(catalog, c.coordinates.entity)
# Don't count overwrites, but ensure entry exists
was_overwrite = c.version_type is VersionType.none
was_overwrite = c.op_type is OpType.index
tallies[entity] += 0 if was_overwrite else 1
contributions = retry_contributions
writer.raise_on_errors()
Expand Down Expand Up @@ -610,8 +609,6 @@ def _read_contributions(self,
num_contributions = sum(tallies.values())
log.info('Reading %i expected contribution(s)', num_contributions)

is_internal_version = Contribution.initial_version_type is VersionType.internal

def pages() -> Iterable[JSONs]:
body = dict(query=query)
while True:
Expand All @@ -620,7 +617,7 @@ def pages() -> Iterable[JSONs]:
body=body,
size=config.contribution_page_size,
track_total_hits=False,
seq_no_primary_term=is_internal_version)
seq_no_primary_term=True)
hits = response['hits']['hits']
log.debug('Read a page with %i contribution(s)', len(hits))
if hits:
Expand Down Expand Up @@ -963,14 +960,22 @@ def _on_conflict(self, doc: Document, e: Union[Exception, JSON]):
self.retries.add(doc.coordinates)
else:
action = 'giving up'
if doc.version_type is VersionType.create_only:
log.warning('Document %r exists. Retrying with overwrite.', doc.coordinates)
# Try again but allow overwriting
doc.version_type = VersionType.none
else:

def warn():
log.warning('There was a conflict with document %r: %r. Total # of errors: %i, %s.',
doc.coordinates, e, self.conflicts[doc.coordinates], action)

if doc.op_type is OpType.create:
try:
doc.op_type = OpType.index
except AttributeError:
# Not every Document type lets us modify op_type
warn()
else:
log.warning('Document %r exists. Retrying with overwrite.', doc.coordinates)
else:
warn()

def raise_on_errors(self):
if self.errors or self.conflicts:
log.warning('Failures: %r', self.errors)
Expand Down

0 comments on commit f079dc1

Please sign in to comment.