From 7eeb93b8449da3dd535c8880d2ae5d6251e4d47b Mon Sep 17 00:00:00 2001 From: Aric Lasry Date: Fri, 21 Jun 2024 12:06:08 +0200 Subject: [PATCH] Split Gdrive in two workers (full sync and incremental sync). (#5777) * Split Gdrive in two workers (full sync and incremental sync). * s/runGoogleWorker/runGoogleWorkers --- .../google_drive/temporal/client.ts | 11 ++++--- .../google_drive/temporal/config.ts | 4 +-- .../google_drive/temporal/worker.ts | 32 ++++++++++++++++--- connectors/src/start.ts | 4 +-- connectors/src/start_worker.ts | 4 +-- 5 files changed, 40 insertions(+), 15 deletions(-) diff --git a/connectors/src/connectors/google_drive/temporal/client.ts b/connectors/src/connectors/google_drive/temporal/client.ts index aa56e44e144e..832d4fd804f5 100644 --- a/connectors/src/connectors/google_drive/temporal/client.ts +++ b/connectors/src/connectors/google_drive/temporal/client.ts @@ -3,12 +3,15 @@ import { Err, googleDriveIncrementalSyncWorkflowId, Ok } from "@dust-tt/types"; import type { WorkflowHandle } from "@temporalio/client"; import { WorkflowNotFoundError } from "@temporalio/client"; +import { + GDRIVE_FULL_SYNC_QUEUE_NAME, + GDRIVE_INCREMENTAL_SYNC_QUEUE_NAME, +} from "@connectors/connectors/google_drive/temporal/config"; import { dataSourceConfigFromConnector } from "@connectors/lib/api/data_source_config"; import { getTemporalClient, terminateWorkflow } from "@connectors/lib/temporal"; import mainLogger from "@connectors/logger/logger"; import { ConnectorResource } from "@connectors/resources/connector_resource"; -import { QUEUE_NAME } from "./config"; import { googleDriveFullSync, googleDriveFullSyncWorkflowId, @@ -42,7 +45,7 @@ export async function launchGoogleDriveFullSyncWorkflow( await terminateWorkflow(workflowId); await client.workflow.start(googleDriveFullSync, { args: [connectorId, dataSourceConfig], - taskQueue: QUEUE_NAME, + taskQueue: GDRIVE_FULL_SYNC_QUEUE_NAME, workflowId: workflowId, searchAttributes: { connectorId: [connectorId], @@ 
-89,7 +92,7 @@ export async function launchGoogleDriveIncrementalSyncWorkflow( await terminateWorkflow(workflowId); await client.workflow.start(googleDriveIncrementalSync, { args: [connectorId, dataSourceConfig], - taskQueue: QUEUE_NAME, + taskQueue: GDRIVE_INCREMENTAL_SYNC_QUEUE_NAME, workflowId: workflowId, searchAttributes: { connectorId: [connectorId], @@ -143,7 +146,7 @@ export async function launchGoogleGarbageCollector( } await client.workflow.start(googleDriveGarbageCollectorWorkflow, { args: [connector.id, new Date().getTime()], - taskQueue: QUEUE_NAME, + taskQueue: GDRIVE_INCREMENTAL_SYNC_QUEUE_NAME, workflowId: workflowId, searchAttributes: { connectorId: [connectorId], diff --git a/connectors/src/connectors/google_drive/temporal/config.ts b/connectors/src/connectors/google_drive/temporal/config.ts index ebae11b5f571..29d8863b35e7 100644 --- a/connectors/src/connectors/google_drive/temporal/config.ts +++ b/connectors/src/connectors/google_drive/temporal/config.ts @@ -1,3 +1,3 @@ export const WORKFLOW_VERSION = 5; -export const QUEUE_NAME = `google-queue-v${WORKFLOW_VERSION}`; -export const GDRIVE_INCREMENTAL_SYNC_DEBOUNCE_SEC = 10; +export const GDRIVE_FULL_SYNC_QUEUE_NAME = `google-queue-v${WORKFLOW_VERSION}`; +export const GDRIVE_INCREMENTAL_SYNC_QUEUE_NAME = `google-queue-incremental-v${WORKFLOW_VERSION}`; diff --git a/connectors/src/connectors/google_drive/temporal/worker.ts b/connectors/src/connectors/google_drive/temporal/worker.ts index d67c31742222..4e87ee7f0815 100644 --- a/connectors/src/connectors/google_drive/temporal/worker.ts +++ b/connectors/src/connectors/google_drive/temporal/worker.ts @@ -11,14 +11,36 @@ import { import { ActivityInboundLogInterceptor } from "@connectors/lib/temporal_monitoring"; import logger from "@connectors/logger/logger"; -import { QUEUE_NAME } from "./config"; +import { + GDRIVE_FULL_SYNC_QUEUE_NAME, + GDRIVE_INCREMENTAL_SYNC_QUEUE_NAME, +} from "./config"; -export async function runGoogleWorker() { +export async 
function runGoogleWorkers() { const { connection, namespace } = await getTemporalWorkerConnection(); - const worker = await Worker.create({ + const workerFullSync = await Worker.create({ + workflowsPath: require.resolve("./workflows"), + activities: { ...activities, ...sync_status }, + taskQueue: GDRIVE_FULL_SYNC_QUEUE_NAME, + maxConcurrentActivityTaskExecutions: 15, + connection, + maxCachedWorkflows: TEMPORAL_MAXED_CACHED_WORKFLOWS, + reuseV8Context: true, + namespace, + interceptors: { + activityInbound: [ + (ctx: Context) => { + return new ActivityInboundLogInterceptor(ctx, logger); + }, + () => new GoogleDriveCastKnownErrorsInterceptor(), + ], + }, + }); + + const workerIncrementalSync = await Worker.create({ workflowsPath: require.resolve("./workflows"), activities: { ...activities, ...sync_status }, - taskQueue: QUEUE_NAME, + taskQueue: GDRIVE_INCREMENTAL_SYNC_QUEUE_NAME, maxConcurrentActivityTaskExecutions: 30, connection, maxCachedWorkflows: TEMPORAL_MAXED_CACHED_WORKFLOWS, @@ -34,5 +56,5 @@ export async function runGoogleWorker() { }, }); - await worker.run(); + await Promise.all([workerFullSync.run(), workerIncrementalSync.run()]); } diff --git a/connectors/src/start.ts b/connectors/src/start.ts index 20d04932e433..3dbdfe919341 100644 --- a/connectors/src/start.ts +++ b/connectors/src/start.ts @@ -4,7 +4,7 @@ import { startServer } from "@connectors/api_server"; import { runConfluenceWorker } from "@connectors/connectors/confluence/temporal/worker"; import { runGithubWorker } from "./connectors/github/temporal/worker"; -import { runGoogleWorker } from "./connectors/google_drive/temporal/worker"; +import { runGoogleWorkers } from "./connectors/google_drive/temporal/worker"; import { runIntercomWorker } from "./connectors/intercom/temporal/worker"; import { runNotionWorker } from "./connectors/notion/temporal/worker"; import { runSlackWorker } from "./connectors/slack/temporal/worker"; @@ -32,7 +32,7 @@ runNotionWorker().catch((err) => 
runGithubWorker().catch((err) => logger.error(errorFromAny(err), "Error running github worker") ); -runGoogleWorker().catch((err) => +runGoogleWorkers().catch((err) => logger.error(errorFromAny(err), "Error running google worker") ); runIntercomWorker().catch((err) => diff --git a/connectors/src/start_worker.ts b/connectors/src/start_worker.ts index 31f7cc7ac87c..edee922a909b 100644 --- a/connectors/src/start_worker.ts +++ b/connectors/src/start_worker.ts @@ -7,7 +7,7 @@ import { runConfluenceWorker } from "@connectors/connectors/confluence/temporal/ import { runWebCrawlerWorker } from "@connectors/connectors/webcrawler/temporal/worker"; import { runGithubWorker } from "./connectors/github/temporal/worker"; -import { runGoogleWorker } from "./connectors/google_drive/temporal/worker"; +import { runGoogleWorkers } from "./connectors/google_drive/temporal/worker"; import { runIntercomWorker } from "./connectors/intercom/temporal/worker"; import { runNotionWorker } from "./connectors/notion/temporal/worker"; import { runSlackWorker } from "./connectors/slack/temporal/worker"; @@ -19,7 +19,7 @@ setupGlobalErrorHandler(logger); const workerFunctions: Record<ConnectorProvider, () => Promise<void>> = { confluence: runConfluenceWorker, github: runGithubWorker, - google_drive: runGoogleWorker, + google_drive: runGoogleWorkers, intercom: runIntercomWorker, notion: runNotionWorker, slack: runSlackWorker,