From c47b4fdf2576b34c1d8d8d4803c437f1f9725bed Mon Sep 17 00:00:00 2001
From: Philippe Rolet
Date: Wed, 27 Nov 2024 18:45:38 +0100
Subject: [PATCH] [Google Drive] Cache ignored files during incremental sync (#8946)

Description
---
We get the incremental changes list and, for each change, check whether we
should sync it, then look up and delete the file if needed. This is
suboptimal and in some cases leads to very long syncs during which no file is
actually synced, e.g.
[here](https://dust4ai.slack.com/archives/C05F84CFP0E/p1732663500878059).

It is very likely that changes occur multiple times on the same file over the
course of long incremental syncs, and that we redo costly checks (e.g.
loading parents from the Google API) each time.

This PR marks files that were already ignored so we don't process them again.
This should vastly speed up syncs when there are many changes to files we
don't sync.

Risk
---
Breaking logic on Google Drive syncs.

Deploy
---
connectors
---
 .../google_drive/temporal/activities.ts       | 59 +++++++++++++++++++
 connectors/src/lib/redis.ts                   |  2 +-
 2 files changed, 60 insertions(+), 1 deletion(-)

diff --git a/connectors/src/connectors/google_drive/temporal/activities.ts b/connectors/src/connectors/google_drive/temporal/activities.ts
index b658a52ee6f1..d4861e143105 100644
--- a/connectors/src/connectors/google_drive/temporal/activities.ts
+++ b/connectors/src/connectors/google_drive/temporal/activities.ts
@@ -32,6 +32,7 @@ import {
   GoogleDriveFolders,
   GoogleDriveSyncToken,
 } from "@connectors/lib/models/google_drive";
+import { redisClient } from "@connectors/lib/redis";
 import { heartbeat } from "@connectors/lib/temporal";
 import logger from "@connectors/logger/logger";
 import { ConnectorResource } from "@connectors/resources/connector_resource";
@@ -332,6 +333,10 @@ export async function incrementalSync(
     activity: "incrementalSync",
     runInstance: uuid4(),
   });
+  const redisCli = await redisClient({
+    origin: "google_drive_incremental_sync",
+  });
+
   try {
     const connector = await ConnectorResource.fetchById(connectorId);
     if (!connector) {
@@ -408,6 +413,18 @@ export async function incrementalSync(
       if (!change.file.id) {
         continue;
       }
+
+      if (
+        await alreadySeenAndIgnored({
+          fileId: change.file.id,
+          connectorId,
+          startSyncTs,
+          redisCli,
+        })
+      ) {
+        continue;
+      }
+
       const file = await driveObjectToDustType(change.file, authCredentials);
       if (
         !(await objectIsInFolderSelection(
@@ -430,6 +447,12 @@ export async function incrementalSync(
         if (localFile) {
           await deleteOneFile(connectorId, file);
         }
+        await markAsSeenAndIgnored({
+          fileId: change.file.id,
+          connectorId,
+          startSyncTs,
+          redisCli,
+        });
         continue;
       }
 
@@ -493,6 +516,8 @@ export async function incrementalSync(
     } else {
       throw e;
     }
+  } finally {
+    await redisCli?.quit();
   }
 }
 
@@ -796,3 +821,37 @@ export async function folderHasChildren(
 
   return res.data.files?.length > 0;
 }
+
+async function alreadySeenAndIgnored({
+  fileId,
+  connectorId,
+  startSyncTs,
+  redisCli,
+}: {
+  fileId: string;
+  connectorId: ModelId;
+  startSyncTs: number;
+  redisCli: Awaited<ReturnType<typeof redisClient>>;
+}) {
+  const key = `google_drive_seen_and_ignored_${connectorId}_${startSyncTs}_${fileId}`;
+  const val = await redisCli.get(key);
+  return val !== null;
+}
+
+async function markAsSeenAndIgnored({
+  fileId,
+  connectorId,
+  startSyncTs,
+  redisCli,
+}: {
+  fileId: string;
+  connectorId: ModelId;
+  startSyncTs: number;
+  redisCli: Awaited<ReturnType<typeof redisClient>>;
+}) {
+  const key = `google_drive_seen_and_ignored_${connectorId}_${startSyncTs}_${fileId}`;
+  await redisCli.set(key, "1", {
+    PX: 1000 * 60 * 60 * 24, // 1 day
+  });
+  return;
+}
diff --git a/connectors/src/lib/redis.ts b/connectors/src/lib/redis.ts
index bb4cb5833a01..a6bf090ad7c4 100644
--- a/connectors/src/lib/redis.ts
+++ b/connectors/src/lib/redis.ts
@@ -6,7 +6,7 @@ import { statsDClient } from "@connectors/logger/withlogging";
 
 let client: RedisClientType;
 
-type RedisUsageTagsType = "notion_gc";
+type RedisUsageTagsType = "notion_gc" | "google_drive_incremental_sync";
 
 export async function redisClient({
   origin,
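
Note: below is a minimal sketch of how the seen-and-ignored cache introduced
above behaves within a single sync run, assuming node-redis v4 and a reachable
Redis instance. `IGNORED_TTL_MS`, `ignoredKey`, the sample ids, and the
`REDIS_URI` env var are illustrative only; the actual helpers live in
`activities.ts` as shown in the diff.

```typescript
// Illustrative sketch (not part of the patch): exercises the key scheme used
// by alreadySeenAndIgnored / markAsSeenAndIgnored against a local Redis.
import { createClient } from "redis";

const IGNORED_TTL_MS = 1000 * 60 * 60 * 24; // mirrors the 1-day PX in the patch

// Keys are scoped by connector and by sync run (startSyncTs), so a file
// ignored in one incremental sync is re-evaluated in the next one.
function ignoredKey(connectorId: number, startSyncTs: number, fileId: string) {
  return `google_drive_seen_and_ignored_${connectorId}_${startSyncTs}_${fileId}`;
}

async function main() {
  const redisCli = createClient({ url: process.env.REDIS_URI });
  await redisCli.connect();

  // Hypothetical values for illustration.
  const connectorId = 42;
  const startSyncTs = Date.now();
  const key = ignoredKey(connectorId, startSyncTs, "some-drive-file-id");

  // First change for this file in the run: not cached yet, so the sync
  // would still run the full folder-selection check before ignoring it.
  console.log("seen before:", (await redisCli.get(key)) !== null); // false

  // Mark it as ignored; later changes to the same file in this run are skipped.
  await redisCli.set(key, "1", { PX: IGNORED_TTL_MS });
  console.log("seen after:", (await redisCli.get(key)) !== null); // true

  await redisCli.quit();
}

main().catch(console.error);
```

Scoping keys by `startSyncTs` means the cache is naturally invalidated at the
next sync run, and the 1-day TTL keeps Redis from accumulating stale keys if a
run is abandoned.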