Skip to content

Commit

Permalink
fix: Use SELECT FOR UPDATE to prevent race conditions in set_last_ale…
Browse files Browse the repository at this point in the history
…rt (#2742)
  • Loading branch information
VladimirFilonov authored Dec 3, 2024
1 parent c0ba444 commit 2d9b83e
Showing 1 changed file with 31 additions and 36 deletions.
67 changes: 31 additions & 36 deletions keep/api/core/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -4564,18 +4564,23 @@ def get_activity_report(


def get_last_alert_by_fingerprint(
tenant_id: str, fingerprint: str, session: Optional[Session] = None
tenant_id: str, fingerprint: str, session: Optional[Session] = None,
for_update: bool = False
) -> Optional[LastAlert]:
with existed_or_new_session(session) as session:
return session.exec(
query = (
select(LastAlert)
.where(
and_(
LastAlert.tenant_id == tenant_id,
LastAlert.fingerprint == fingerprint,
)
)
).first()
)
if for_update:
query = query.with_for_update()
return session.exec(query).first()


def set_last_alert(
tenant_id: str, alert: Alert, session: Optional[Session] = None
Expand All @@ -4584,41 +4589,31 @@ def set_last_alert(
f"Set last alert for `{alert.fingerprint}`"
)
with existed_or_new_session(session) as session:
last_alert = get_last_alert_by_fingerprint(tenant_id, alert.fingerprint, session)

# To prevent rare, but possible race condition
# For example if older alert failed to process
# and retried after new one
with session.begin_nested() as transaction:
last_alert = get_last_alert_by_fingerprint(tenant_id, alert.fingerprint, session, for_update=True)

if last_alert and last_alert.timestamp.replace(tzinfo=tz.UTC) < alert.timestamp.replace(tzinfo=tz.UTC):
# To prevent rare, but possible race condition
# For example if older alert failed to process
# and retried after new one
if last_alert and last_alert.timestamp.replace(tzinfo=tz.UTC) < alert.timestamp.replace(tzinfo=tz.UTC):

logger.info(
f"Update last alert for `{alert.fingerprint}`: {last_alert.alert_id} -> {alert.id}"
)
last_alert.timestamp = alert.timestamp
last_alert.alert_id = alert.id
session.add(last_alert)
session.commit()
logger.info(
f"Update last alert for `{alert.fingerprint}`: {last_alert.alert_id} -> {alert.id}"
)
last_alert.timestamp = alert.timestamp
last_alert.alert_id = alert.id
session.add(last_alert)

elif not last_alert:
logger.info(
f"No last alert for `{alert.fingerprint}`, creating new"
)
last_alert = LastAlert(
tenant_id=tenant_id,
fingerprint=alert.fingerprint,
timestamp=alert.timestamp,
alert_id=alert.id,
)
elif not last_alert:
logger.info(
f"No last alert for `{alert.fingerprint}`, creating new"
)
last_alert = LastAlert(
tenant_id=tenant_id,
fingerprint=alert.fingerprint,
timestamp=alert.timestamp,
alert_id=alert.id,
)

try:
session.add(last_alert)
session.commit()
except IntegrityError as ex:
reason = ex.args[0]
if "Duplicate entry" in reason:
logger.info(
f"Duplicate primary key for `{alert.fingerprint}`. Retrying."
)
session.rollback()
return set_last_alert(tenant_id, alert, session)
transaction.commit()

0 comments on commit 2d9b83e

Please sign in to comment.