Skip to content

Commit

Permalink
Avoid duplicate slack message, cleanup script and batch delete (#9331)
Browse files Browse the repository at this point in the history
  • Loading branch information
Fraggle authored Dec 12, 2024
1 parent c59b403 commit 245174a
Show file tree
Hide file tree
Showing 2 changed files with 119 additions and 11 deletions.
66 changes: 66 additions & 0 deletions connectors/migrations/20241212_clean_slack_messages_duplicates.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import { makeScript } from "scripts/helpers";
import { QueryTypes, Sequelize } from "sequelize";

const { CONNECTORS_DATABASE_URI } = process.env;

makeScript({}, async ({ execute }) => {
const sequelize = new Sequelize(CONNECTORS_DATABASE_URI as string, {
logging: false,
});

// Select distinct connectorId on slack_messages
const connectorIds = (
await sequelize.query<{ connectorId: number }>(
'SELECT DISTINCT "connectorId" FROM slack_messages',
{
type: QueryTypes.SELECT,
}
)
).map((c) => c.connectorId);

for (const connectorId of connectorIds) {
const duplicates = await sequelize.query<{
min_id: string;
documentId: string;
total: number;
}>(
'SELECT min(id) as min_id, "documentId", count(*) as total FROM slack_messages WHERE "connectorId" = $1 GROUP BY "documentId" HAVING count(*) > 1',
{
type: QueryTypes.SELECT,
bind: [connectorId],
}
);

if (duplicates.length > 0) {
console.log(
`${duplicates.length} duplicates slack messages for connector ${connectorId}`
);

for (const { min_id, documentId, total } of duplicates) {
const deleteQuery = `DELETE FROM slack_messages WHERE id > $1 AND "documentId" = $2`;
if (execute) {
await sequelize.query(deleteQuery, {
bind: [Number(min_id), documentId],
type: QueryTypes.DELETE,
});
} else {
const countQuery = `SELECT count(*) as count FROM slack_messages WHERE id > $1 AND "documentId" = $2`;
const counts = await sequelize.query<{ count: number }>(countQuery, {
bind: [Number(min_id), documentId],
type: QueryTypes.SELECT,
});
if (!counts[0]) {
throw new Error(`No results for ${countQuery}`);
}
if (counts[0].count != total - 1) {
throw new Error(
`Expected to delete ${total - 1} but would deleted ${counts[0].count}`
);
} else {
console.log(`OK: Would delete ${counts[0].count} slack messages.`);
}
}
}
}
}
});
64 changes: 53 additions & 11 deletions connectors/src/connectors/slack/temporal/activities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import {
reportInitialSyncProgress,
syncSucceeded,
} from "@connectors/lib/sync_status";
import { heartbeat } from "@connectors/lib/temporal";
import mainLogger from "@connectors/logger/logger";
import { ConnectorResource } from "@connectors/resources/connector_resource";
import { SlackConfigurationResource } from "@connectors/resources/slack_configuration_resource";
Expand Down Expand Up @@ -574,13 +575,26 @@ export async function syncNonThreaded(

const tags = getTagsForPage(documentId, channelId, channelName);

await SlackMessages.upsert({
connectorId: connectorId,
channelId: channelId,
messageTs: undefined,
documentId: documentId,
// Only create the document if it doesn't already exist based on the documentId
const existingMessages = await SlackMessages.findAll({
where: {
connectorId: connectorId,
channelId: channelId,
documentId: documentId,
},
order: [["id", "ASC"]],
limit: 1,
});

if (existingMessages.length === 0) {
await SlackMessages.create({
connectorId: connectorId,
channelId: channelId,
messageTs: undefined,
documentId: documentId,
});
}

await upsertToDatasource({
dataSourceConfig,
documentId,
Expand Down Expand Up @@ -768,12 +782,28 @@ export async function syncThread(

const tags = getTagsForPage(documentId, channelId, channelName, threadTs);

await SlackMessages.upsert({
connectorId: connectorId,
channelId: channelId,
messageTs: threadTs,
documentId: documentId,
// Only create the document if it doesn't already exist based on the documentId
const existingMessages = await SlackMessages.findAll({
where: {
connectorId: connectorId,
channelId: channelId,
documentId: documentId,
},
order: [["id", "ASC"]],
limit: 1,
});
if (existingMessages[0]) {
await existingMessages[0].update({
messageTs: threadTs,
});
} else {
await SlackMessages.create({
connectorId: connectorId,
channelId: channelId,
messageTs: threadTs,
documentId: documentId,
});
}

await upsertToDatasource({
dataSourceConfig,
Expand Down Expand Up @@ -1121,9 +1151,21 @@ export async function deleteChannel(channelId: string, connectorId: ModelId) {
// We delete from the remote datasource first because we would rather double delete remotely
// than miss one.
await deleteFromDataSource(dataSourceConfig, slackMessage.documentId);
await slackMessage.destroy();
nbDeleted++;

if (nbDeleted % 50 === 0) {
await heartbeat();
}
}

// Batch delete after we deleted from the remote datasource
await SlackMessages.destroy({
where: {
channelId: channelId,
connectorId: connectorId,
id: slackMessages.map((s) => s.id),
},
});
} while (slackMessages.length === maxMessages);
logger.info(
{ nbDeleted, channelId, connectorId },
Expand Down

0 comments on commit 245174a

Please sign in to comment.