From f87352a62c6b9ca9369b695d387dfc3fa2213741 Mon Sep 17 00:00:00 2001 From: Hannes Schmidt Date: Thu, 24 Oct 2024 20:31:59 -0700 Subject: [PATCH] Add server-side retries for scripted ES document updates --- src/azul/indexer/document.py | 2 ++ src/azul/indexer/index_service.py | 1 + 2 files changed, 3 insertions(+) diff --git a/src/azul/indexer/document.py b/src/azul/indexer/document.py index 4e9f7e60ec..5e8a0cd64d 100644 --- a/src/azul/indexer/document.py +++ b/src/azul/indexer/document.py @@ -1330,6 +1330,8 @@ def to_index(self, result['body'] = self._body(field_types[coordinates.entity.catalog]) if self.version is not None: result['if_seq_no'], result['if_primary_term'] = self.version + if self.op_type is OpType.update: + result['params'] = {'retry_on_conflict': 3} return result def _body(self, field_types: FieldTypes) -> JSON: diff --git a/src/azul/indexer/index_service.py b/src/azul/indexer/index_service.py index b738533ea1..fbf0493572 100644 --- a/src/azul/indexer/index_service.py +++ b/src/azul/indexer/index_service.py @@ -890,6 +890,7 @@ def expand_action(doc: Any) -> tuple[dict[str, Any], dict[str, Any] | None]: # optional document source. assert isinstance(doc, Document), doc action = dict(doc.to_index(self.catalog, self.field_types)) + action.update(action.pop('params', {})) action['_index'] = action.pop('index') action['_id'] = action.pop('id') body = action.pop('body', None)