diff --git a/connectors/src/connectors/google_drive/temporal/activities.ts b/connectors/src/connectors/google_drive/temporal/activities.ts index b658a52ee6f1..d4861e143105 100644 --- a/connectors/src/connectors/google_drive/temporal/activities.ts +++ b/connectors/src/connectors/google_drive/temporal/activities.ts @@ -32,6 +32,7 @@ import { GoogleDriveFolders, GoogleDriveSyncToken, } from "@connectors/lib/models/google_drive"; +import { redisClient } from "@connectors/lib/redis"; import { heartbeat } from "@connectors/lib/temporal"; import logger from "@connectors/logger/logger"; import { ConnectorResource } from "@connectors/resources/connector_resource"; @@ -332,6 +333,10 @@ export async function incrementalSync( activity: "incrementalSync", runInstance: uuid4(), }); + const redisCli = await redisClient({ + origin: "google_drive_incremental_sync", + }); + try { const connector = await ConnectorResource.fetchById(connectorId); if (!connector) { @@ -408,6 +413,18 @@ export async function incrementalSync( if (!change.file.id) { continue; } + + if ( + await alreadySeenAndIgnored({ + fileId: change.file.id, + connectorId, + startSyncTs, + redisCli, + }) + ) { + continue; + } + const file = await driveObjectToDustType(change.file, authCredentials); if ( !(await objectIsInFolderSelection( @@ -430,6 +447,12 @@ export async function incrementalSync( if (localFile) { await deleteOneFile(connectorId, file); } + await markAsSeenAndIgnored({ + fileId: change.file.id, + connectorId, + startSyncTs, + redisCli, + }); continue; } @@ -493,6 +516,8 @@ export async function incrementalSync( } else { throw e; } + } finally { + await redisCli?.quit(); } } @@ -796,3 +821,37 @@ export async function folderHasChildren( return res.data.files?.length > 0; } + +async function alreadySeenAndIgnored({ + fileId, + connectorId, + startSyncTs, + redisCli, +}: { + fileId: string; + connectorId: ModelId; + startSyncTs: number; + redisCli: Awaited>; +}) { + const key = `google_drive_seen_and_ignored_${connectorId}_${startSyncTs}_${fileId}`; + const val = await redisCli.get(key); + return val !== null; +} + +async function markAsSeenAndIgnored({ + fileId, + connectorId, + startSyncTs, + redisCli, +}: { + fileId: string; + connectorId: ModelId; + startSyncTs: number; + redisCli: Awaited>; +}) { + const key = `google_drive_seen_and_ignored_${connectorId}_${startSyncTs}_${fileId}`; + await redisCli.set(key, "1", { + PX: 1000 * 60 * 60 * 24, // 1 day + }); + return; +} diff --git a/connectors/src/lib/redis.ts b/connectors/src/lib/redis.ts index bb4cb5833a01..a6bf090ad7c4 100644 --- a/connectors/src/lib/redis.ts +++ b/connectors/src/lib/redis.ts @@ -6,7 +6,7 @@ import { statsDClient } from "@connectors/logger/withlogging"; let client: RedisClientType; -type RedisUsageTagsType = "notion_gc"; +type RedisUsageTagsType = "notion_gc" | "google_drive_incremental_sync"; export async function redisClient({ origin,