Skip to content

Commit

Permalink
Split Gdrive in two workers (full sync and incremental sync). (#5777)
Browse files Browse the repository at this point in the history
* Split Gdrive in two workers (full sync and incremental sync).

* s/runGoogleWorker/runGoogleWorkers
  • Loading branch information
lasryaric authored Jun 21, 2024
1 parent 789eaef commit 7eeb93b
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 15 deletions.
11 changes: 7 additions & 4 deletions connectors/src/connectors/google_drive/temporal/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@ import { Err, googleDriveIncrementalSyncWorkflowId, Ok } from "@dust-tt/types";
import type { WorkflowHandle } from "@temporalio/client";
import { WorkflowNotFoundError } from "@temporalio/client";

import {
GDRIVE_FULL_SYNC_QUEUE_NAME,
GDRIVE_INCREMENTAL_SYNC_QUEUE_NAME,
} from "@connectors/connectors/google_drive/temporal/config";
import { dataSourceConfigFromConnector } from "@connectors/lib/api/data_source_config";
import { getTemporalClient, terminateWorkflow } from "@connectors/lib/temporal";
import mainLogger from "@connectors/logger/logger";
import { ConnectorResource } from "@connectors/resources/connector_resource";

import { QUEUE_NAME } from "./config";
import {
googleDriveFullSync,
googleDriveFullSyncWorkflowId,
Expand Down Expand Up @@ -42,7 +45,7 @@ export async function launchGoogleDriveFullSyncWorkflow(
await terminateWorkflow(workflowId);
await client.workflow.start(googleDriveFullSync, {
args: [connectorId, dataSourceConfig],
taskQueue: QUEUE_NAME,
taskQueue: GDRIVE_FULL_SYNC_QUEUE_NAME,
workflowId: workflowId,
searchAttributes: {
connectorId: [connectorId],
Expand Down Expand Up @@ -89,7 +92,7 @@ export async function launchGoogleDriveIncrementalSyncWorkflow(
await terminateWorkflow(workflowId);
await client.workflow.start(googleDriveIncrementalSync, {
args: [connectorId, dataSourceConfig],
taskQueue: QUEUE_NAME,
taskQueue: GDRIVE_INCREMENTAL_SYNC_QUEUE_NAME,
workflowId: workflowId,
searchAttributes: {
connectorId: [connectorId],
Expand Down Expand Up @@ -143,7 +146,7 @@ export async function launchGoogleGarbageCollector(
}
await client.workflow.start(googleDriveGarbageCollectorWorkflow, {
args: [connector.id, new Date().getTime()],
taskQueue: QUEUE_NAME,
taskQueue: GDRIVE_INCREMENTAL_SYNC_QUEUE_NAME,
workflowId: workflowId,
searchAttributes: {
connectorId: [connectorId],
Expand Down
4 changes: 2 additions & 2 deletions connectors/src/connectors/google_drive/temporal/config.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
export const WORKFLOW_VERSION = 5;
export const QUEUE_NAME = `google-queue-v${WORKFLOW_VERSION}`;
export const GDRIVE_INCREMENTAL_SYNC_DEBOUNCE_SEC = 10;
export const GDRIVE_FULL_SYNC_QUEUE_NAME = `google-queue-v${WORKFLOW_VERSION}`;
export const GDRIVE_INCREMENTAL_SYNC_QUEUE_NAME = `google-queue-incremental-v${WORKFLOW_VERSION}`;
32 changes: 27 additions & 5 deletions connectors/src/connectors/google_drive/temporal/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,36 @@ import {
import { ActivityInboundLogInterceptor } from "@connectors/lib/temporal_monitoring";
import logger from "@connectors/logger/logger";

import { QUEUE_NAME } from "./config";
import {
GDRIVE_FULL_SYNC_QUEUE_NAME,
GDRIVE_INCREMENTAL_SYNC_QUEUE_NAME,
} from "./config";

export async function runGoogleWorker() {
export async function runGoogleWorkers() {
const { connection, namespace } = await getTemporalWorkerConnection();
const worker = await Worker.create({
const workerFullSync = await Worker.create({
workflowsPath: require.resolve("./workflows"),
activities: { ...activities, ...sync_status },
taskQueue: GDRIVE_FULL_SYNC_QUEUE_NAME,
maxConcurrentActivityTaskExecutions: 15,
connection,
maxCachedWorkflows: TEMPORAL_MAXED_CACHED_WORKFLOWS,
reuseV8Context: true,
namespace,
interceptors: {
activityInbound: [
(ctx: Context) => {
return new ActivityInboundLogInterceptor(ctx, logger);
},
() => new GoogleDriveCastKnownErrorsInterceptor(),
],
},
});

const workerIncrementalSync = await Worker.create({
workflowsPath: require.resolve("./workflows"),
activities: { ...activities, ...sync_status },
taskQueue: QUEUE_NAME,
taskQueue: GDRIVE_INCREMENTAL_SYNC_QUEUE_NAME,
maxConcurrentActivityTaskExecutions: 30,
connection,
maxCachedWorkflows: TEMPORAL_MAXED_CACHED_WORKFLOWS,
Expand All @@ -34,5 +56,5 @@ export async function runGoogleWorker() {
},
});

await worker.run();
await Promise.all([workerFullSync.run(), workerIncrementalSync.run()]);
}
4 changes: 2 additions & 2 deletions connectors/src/start.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { startServer } from "@connectors/api_server";
import { runConfluenceWorker } from "@connectors/connectors/confluence/temporal/worker";

import { runGithubWorker } from "./connectors/github/temporal/worker";
import { runGoogleWorker } from "./connectors/google_drive/temporal/worker";
import { runGoogleWorkers } from "./connectors/google_drive/temporal/worker";
import { runIntercomWorker } from "./connectors/intercom/temporal/worker";
import { runNotionWorker } from "./connectors/notion/temporal/worker";
import { runSlackWorker } from "./connectors/slack/temporal/worker";
Expand Down Expand Up @@ -32,7 +32,7 @@ runNotionWorker().catch((err) =>
runGithubWorker().catch((err) =>
logger.error(errorFromAny(err), "Error running github worker")
);
runGoogleWorker().catch((err) =>
runGoogleWorkers().catch((err) =>
logger.error(errorFromAny(err), "Error running google worker")
);
runIntercomWorker().catch((err) =>
Expand Down
4 changes: 2 additions & 2 deletions connectors/src/start_worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { runConfluenceWorker } from "@connectors/connectors/confluence/temporal/
import { runWebCrawlerWorker } from "@connectors/connectors/webcrawler/temporal/worker";

import { runGithubWorker } from "./connectors/github/temporal/worker";
import { runGoogleWorker } from "./connectors/google_drive/temporal/worker";
import { runGoogleWorkers } from "./connectors/google_drive/temporal/worker";
import { runIntercomWorker } from "./connectors/intercom/temporal/worker";
import { runNotionWorker } from "./connectors/notion/temporal/worker";
import { runSlackWorker } from "./connectors/slack/temporal/worker";
Expand All @@ -19,7 +19,7 @@ setupGlobalErrorHandler(logger);
const workerFunctions: Record<ConnectorProvider, () => Promise<void>> = {
confluence: runConfluenceWorker,
github: runGithubWorker,
google_drive: runGoogleWorker,
google_drive: runGoogleWorkers,
intercom: runIntercomWorker,
notion: runNotionWorker,
slack: runSlackWorker,
Expand Down

0 comments on commit 7eeb93b

Please sign in to comment.