diff --git a/keep/api/models/db/alert.py b/keep/api/models/db/alert.py
index 2c2e70014..b635b8488 100644
--- a/keep/api/models/db/alert.py
+++ b/keep/api/models/db/alert.py
@@ -361,6 +361,8 @@ class AlertRaw(SQLModel, table=True):
     id: UUID = Field(default_factory=uuid4, primary_key=True)
     tenant_id: str = Field(foreign_key="tenant.id")
     raw_alert: dict = Field(sa_column=Column(JSON))
+    timestamp: datetime = Field(default_factory=datetime.utcnow)
+    provider_type: str | None = Field(default=None)
 
     class Config:
         arbitrary_types_allowed = True
diff --git a/keep/api/models/db/migrations/versions/2024-11-08-20-58_895fe80117aa.py b/keep/api/models/db/migrations/versions/2024-11-08-20-58_895fe80117aa.py
new file mode 100644
index 000000000..fb11c774d
--- /dev/null
+++ b/keep/api/models/db/migrations/versions/2024-11-08-20-58_895fe80117aa.py
@@ -0,0 +1,41 @@
+"""Add timestamp and provider_type to alertraw
+
+Revision ID: 895fe80117aa
+Revises: ef0b5b0df41c
+Create Date: 2024-11-08 20:58:40.201477
+
+"""
+
+import sqlalchemy as sa
+from alembic import op
+
+# revision identifiers, used by Alembic.
+revision = "895fe80117aa"
+down_revision = "ef0b5b0df41c"
+branch_labels = None
+depends_on = None
+
+
+def upgrade() -> None:
+    # ### commands auto generated by Alembic - please adjust! ###
+
+    with op.batch_alter_table("alertraw", schema=None) as batch_op:
+        batch_op.add_column(
+            sa.Column(
+                "timestamp",
+                sa.DateTime(),
+                nullable=False,
+                server_default=sa.text("CURRENT_TIMESTAMP"),
+            )
+        )
+        batch_op.add_column(sa.Column("provider_type", sa.String(255), nullable=True))
+
+    # ### end Alembic commands ###
+
+
+def downgrade() -> None:
+    # ### commands auto generated by Alembic - please adjust! ###
+    with op.batch_alter_table("alertraw", schema=None) as batch_op:
+        batch_op.drop_column("timestamp")
+        batch_op.drop_column("provider_type")
+    # ### end Alembic commands ###
diff --git a/keep/api/tasks/process_event_task.py b/keep/api/tasks/process_event_task.py
index 2e97a3ad5..cbd8ead68 100644
--- a/keep/api/tasks/process_event_task.py
+++ b/keep/api/tasks/process_event_task.py
@@ -86,6 +86,7 @@ def __save_to_db(
         alert = AlertRaw(
             tenant_id=tenant_id,
             raw_alert=raw_event,
+            provider_type=provider_type,
         )
         session.add(alert)
         # add audit to the deduplicated events
@@ -568,6 +569,8 @@ def process_event(
         )
     except Exception:
         logger.exception("Error processing event", extra=extra_dict)
+        # In case of exception, save the raw events so failed alerts can be inspected
+        __save_error_alerts(tenant_id, provider_type, raw_event)
         # Retrying only if context is present (running the job in arq worker)
         if bool(ctx):
             raise Retry(defer=ctx["job_try"] * TIMES_TO_RETRY_JOB)
@@ -576,5 +579,42 @@
     logger.info("Event processed", extra=extra_dict)
 
 
+def __save_error_alerts(
+    tenant_id, provider_type, raw_events: dict | list[dict] | list[AlertDto] | None
+):
+    if not raw_events:
+        logger.info("No raw events to save as errors")
+        return
+
+    session = None
+    try:
+        logger.info("Getting database session")
+        session = get_session_sync()
+
+        # Normalize a single event to a list so both shapes share one code path
+        if isinstance(raw_events, dict):
+            raw_events = [raw_events]
+
+        logger.info(f"Saving {len(raw_events)} error alerts")
+        for raw_event in raw_events:
+            # Convert AlertDto to dict if needed
+            if isinstance(raw_event, AlertDto):
+                raw_event = raw_event.dict()
+
+            alert = AlertRaw(
+                tenant_id=tenant_id, raw_alert=raw_event, provider_type=provider_type
+            )
+            session.add(alert)
+        # Commit once for the whole batch rather than per alert
+        session.commit()
+        logger.info("Successfully saved error alerts")
+    except Exception:
+        logger.exception("Failed to save error alerts")
+    finally:
+        # session is still None if get_session_sync() itself raised
+        if session is not None:
+            session.close()
+
+
 async def async_process_event(*args, **kwargs):
     return process_event(*args, **kwargs)