From 2d9b83eaacf2264822701e3e8d4833e576664b9f Mon Sep 17 00:00:00 2001 From: Vladimir Filonov Date: Tue, 3 Dec 2024 20:28:50 +0400 Subject: [PATCH] fix: Use SELECT FOR UPDATE to prevent race conditions in set_last_alert (#2742) --- keep/api/core/db.py | 67 +++++++++++++++++++++------------------------ 1 file changed, 31 insertions(+), 36 deletions(-) diff --git a/keep/api/core/db.py b/keep/api/core/db.py index 0d360659f..7823eb49e 100644 --- a/keep/api/core/db.py +++ b/keep/api/core/db.py @@ -4564,10 +4564,11 @@ def get_activity_report( def get_last_alert_by_fingerprint( - tenant_id: str, fingerprint: str, session: Optional[Session] = None + tenant_id: str, fingerprint: str, session: Optional[Session] = None, + for_update: bool = False ) -> Optional[LastAlert]: with existed_or_new_session(session) as session: - return session.exec( + query = ( select(LastAlert) .where( and_( @@ -4575,7 +4576,11 @@ def get_last_alert_by_fingerprint( LastAlert.fingerprint == fingerprint, ) ) - ).first() + ) + if for_update: + query = query.with_for_update() + return session.exec(query).first() + def set_last_alert( tenant_id: str, alert: Alert, session: Optional[Session] = None @@ -4584,41 +4589,31 @@ def set_last_alert( f"Set last alert for `{alert.fingerprint}`" ) with existed_or_new_session(session) as session: - last_alert = get_last_alert_by_fingerprint(tenant_id, alert.fingerprint, session) - - # To prevent rare, but possible race condition - # For example if older alert failed to process - # and retried after new one + with session.begin_nested() as transaction: + last_alert = get_last_alert_by_fingerprint(tenant_id, alert.fingerprint, session, for_update=True) - if last_alert and last_alert.timestamp.replace(tzinfo=tz.UTC) < alert.timestamp.replace(tzinfo=tz.UTC): + # To prevent rare, but possible race condition + # For example if older alert failed to process + # and retried after new one + if last_alert and last_alert.timestamp.replace(tzinfo=tz.UTC) < alert.timestamp.replace(tzinfo=tz.UTC): - logger.info( - f"Update last alert for `{alert.fingerprint}`: {last_alert.alert_id} -> {alert.id}" - ) - last_alert.timestamp = alert.timestamp - last_alert.alert_id = alert.id - session.add(last_alert) - session.commit() + logger.info( + f"Update last alert for `{alert.fingerprint}`: {last_alert.alert_id} -> {alert.id}" + ) + last_alert.timestamp = alert.timestamp + last_alert.alert_id = alert.id + session.add(last_alert) - elif not last_alert: - logger.info( - f"No last alert for `{alert.fingerprint}`, creating new" - ) - last_alert = LastAlert( - tenant_id=tenant_id, - fingerprint=alert.fingerprint, - timestamp=alert.timestamp, - alert_id=alert.id, - ) + elif not last_alert: + logger.info( + f"No last alert for `{alert.fingerprint}`, creating new" + ) + last_alert = LastAlert( + tenant_id=tenant_id, + fingerprint=alert.fingerprint, + timestamp=alert.timestamp, + alert_id=alert.id, + ) - try: session.add(last_alert) - session.commit() - except IntegrityError as ex: - reason = ex.args[0] - if "Duplicate entry" in reason: - logger.info( - f"Duplicate primary key for `{alert.fingerprint}`. Retrying." - ) - session.rollback() - return set_last_alert(tenant_id, alert, session) + transaction.commit() \ No newline at end of file