From 245174aef4a65ae16cf20bdfb2e1e1460d0c2186 Mon Sep 17 00:00:00 2001 From: Sebastien Flory Date: Thu, 12 Dec 2024 16:58:48 +0100 Subject: [PATCH] Avoid duplicate slack message, cleanup script and batch delete (#9331) --- ...0241212_clean_slack_messages_duplicates.ts | 66 +++++++++++++++++++ .../connectors/slack/temporal/activities.ts | 64 ++++++++++++++---- 2 files changed, 119 insertions(+), 11 deletions(-) create mode 100644 connectors/migrations/20241212_clean_slack_messages_duplicates.ts diff --git a/connectors/migrations/20241212_clean_slack_messages_duplicates.ts b/connectors/migrations/20241212_clean_slack_messages_duplicates.ts new file mode 100644 index 000000000000..16b0c387f3dd --- /dev/null +++ b/connectors/migrations/20241212_clean_slack_messages_duplicates.ts @@ -0,0 +1,66 @@ +import { makeScript } from "scripts/helpers"; +import { QueryTypes, Sequelize } from "sequelize"; + +const { CONNECTORS_DATABASE_URI } = process.env; + +makeScript({}, async ({ execute }) => { + const sequelize = new Sequelize(CONNECTORS_DATABASE_URI as string, { + logging: false, + }); + + // Select distinct connectorId on slack_messages + const connectorIds = ( + await sequelize.query<{ connectorId: number }>( + 'SELECT DISTINCT "connectorId" FROM slack_messages', + { + type: QueryTypes.SELECT, + } + ) + ).map((c) => c.connectorId); + + for (const connectorId of connectorIds) { + const duplicates = await sequelize.query<{ + min_id: string; + documentId: string; + total: number; + }>( + 'SELECT min(id) as min_id, "documentId", count(*) as total FROM slack_messages WHERE "connectorId" = $1 GROUP BY "documentId" HAVING count(*) > 1', + { + type: QueryTypes.SELECT, + bind: [connectorId], + } + ); + + if (duplicates.length > 0) { + console.log( + `${duplicates.length} duplicates slack messages for connector ${connectorId}` + ); + + for (const { min_id, documentId, total } of duplicates) { + const deleteQuery = `DELETE FROM slack_messages WHERE id > $1 AND "documentId" = $2`; + if (execute) { + await sequelize.query(deleteQuery, { + bind: [Number(min_id), documentId], + type: QueryTypes.DELETE, + }); + } else { + const countQuery = `SELECT count(*) as count FROM slack_messages WHERE id > $1 AND "documentId" = $2`; + const counts = await sequelize.query<{ count: number }>(countQuery, { + bind: [Number(min_id), documentId], + type: QueryTypes.SELECT, + }); + if (!counts[0]) { + throw new Error(`No results for ${countQuery}`); + } + if (counts[0].count != total - 1) { + throw new Error( + `Expected to delete ${total - 1} but would deleted ${counts[0].count}` + ); + } else { + console.log(`OK: Would delete ${counts[0].count} slack messages.`); + } + } + } + } + } +}); diff --git a/connectors/src/connectors/slack/temporal/activities.ts b/connectors/src/connectors/slack/temporal/activities.ts index 02e35a0bcbe6..d5f17e42c604 100644 --- a/connectors/src/connectors/slack/temporal/activities.ts +++ b/connectors/src/connectors/slack/temporal/activities.ts @@ -37,6 +37,7 @@ import { reportInitialSyncProgress, syncSucceeded, } from "@connectors/lib/sync_status"; +import { heartbeat } from "@connectors/lib/temporal"; import mainLogger from "@connectors/logger/logger"; import { ConnectorResource } from "@connectors/resources/connector_resource"; import { SlackConfigurationResource } from "@connectors/resources/slack_configuration_resource"; @@ -574,13 +575,26 @@ export async function syncNonThreaded( const tags = getTagsForPage(documentId, channelId, channelName); - await SlackMessages.upsert({ - connectorId: connectorId, - channelId: channelId, - messageTs: undefined, - documentId: documentId, + // Only create the document if it doesn't already exist based on the documentId + const existingMessages = await SlackMessages.findAll({ + where: { + connectorId: connectorId, + channelId: channelId, + documentId: documentId, + }, + order: [["id", "ASC"]], + limit: 1, }); + if (existingMessages.length === 0) { + await SlackMessages.create({ + connectorId: connectorId, + channelId: channelId, + messageTs: undefined, + documentId: documentId, + }); + } + await upsertToDatasource({ dataSourceConfig, documentId, @@ -768,12 +782,28 @@ export async function syncThread( const tags = getTagsForPage(documentId, channelId, channelName, threadTs); - await SlackMessages.upsert({ - connectorId: connectorId, - channelId: channelId, - messageTs: threadTs, - documentId: documentId, + // Only create the document if it doesn't already exist based on the documentId + const existingMessages = await SlackMessages.findAll({ + where: { + connectorId: connectorId, + channelId: channelId, + documentId: documentId, + }, + order: [["id", "ASC"]], + limit: 1, }); + if (existingMessages[0]) { + await existingMessages[0].update({ + messageTs: threadTs, + }); + } else { + await SlackMessages.create({ + connectorId: connectorId, + channelId: channelId, + messageTs: threadTs, + documentId: documentId, + }); + } await upsertToDatasource({ dataSourceConfig, @@ -1121,9 +1151,21 @@ export async function deleteChannel(channelId: string, connectorId: ModelId) { // We delete from the remote datasource first because we would rather double delete remotely // than miss one. await deleteFromDataSource(dataSourceConfig, slackMessage.documentId); - await slackMessage.destroy(); nbDeleted++; + + if (nbDeleted % 50 === 0) { + await heartbeat(); + } } + + // Batch delete after we deleted from the remote datasource + await SlackMessages.destroy({ + where: { + channelId: channelId, + connectorId: connectorId, + id: slackMessages.map((s) => s.id), + }, + }); } while (slackMessages.length === maxMessages); logger.info( { nbDeleted, channelId, connectorId },