diff --git a/connectors/src/connectors/google_drive/index.ts b/connectors/src/connectors/google_drive/index.ts index 7e83350fe655..aee0404aff48 100644 --- a/connectors/src/connectors/google_drive/index.ts +++ b/connectors/src/connectors/google_drive/index.ts @@ -421,7 +421,7 @@ export async function retrieveGoogleDriveConnectorPermissions({ } else if (filterPermission === null) { if (parentInternalId === null) { // Return the list of remote shared drives. - const drives = await getDrivesIds(c.connectionId); + const drives = await getDrivesIds(c.id); const resources: ConnectorResource[] = await Promise.all( drives.map(async (d): Promise => { const driveObject = await getGoogleDriveObject(authCredentials, d.id); diff --git a/connectors/src/connectors/google_drive/temporal/activities.ts b/connectors/src/connectors/google_drive/temporal/activities.ts index ca72c0814cab..65bf084009db 100644 --- a/connectors/src/connectors/google_drive/temporal/activities.ts +++ b/connectors/src/connectors/google_drive/temporal/activities.ts @@ -129,14 +129,19 @@ export async function getDriveClient( throw new Error("Invalid auth_credentials type"); } -export async function getDrivesIds(nangoConnectionId: string): Promise< +export async function getDrivesIds(connectorId: ModelId): Promise< { id: string; name: string; sharedDrive: boolean; }[] > { - const drive = await getDriveClient(nangoConnectionId); + const connector = await Connector.findByPk(connectorId); + if (!connector) { + throw new Error(`Connector ${connectorId} not found`); + } + const drive = await getDriveClient(connector.connectionId); + let nextPageToken: string | undefined | null = undefined; const ids: { id: string; name: string; sharedDrive: boolean }[] = []; const myDriveRes = await drive.files.get({ fileId: "root" }); @@ -177,7 +182,6 @@ export async function getDrivesIds(nangoConnectionId: string): Promise< export async function syncFiles( connectorId: ModelId, - nangoConnectionId: string, dataSourceConfig: DataSourceConfig, driveFolderId: string, startSyncTs: number, @@ -187,7 +191,11 @@ export async function syncFiles( count: number; subfolders: string[]; }> { - const authCredentials = await getAuthObject(nangoConnectionId); + const connector = await Connector.findByPk(connectorId); + if (!connector) { + throw new Error(`Connector ${connectorId} not found`); + } + const authCredentials = await getAuthObject(connector.connectionId); const driveFolder = await getGoogleDriveObject( authCredentials, driveFolderId @@ -575,7 +583,6 @@ async function objectIsInFolders( export async function incrementalSync( connectorId: ModelId, - nangoConnectionId: string, dataSourceConfig: DataSourceConfig, driveId: string, sharedDrive: boolean, @@ -586,23 +593,21 @@ export async function incrementalSync( provider: "google_drive", connectorId: connectorId, driveId: driveId, - nangoConnectionId: nangoConnectionId, activity: "incrementalSync", runInstance: uuid4(), }); try { + const connector = await Connector.findByPk(connectorId); + if (!connector) { + throw new Error(`Connector ${connectorId} not found`); + } if (!nextPageToken) { - nextPageToken = await getSyncPageToken( - connectorId, - nangoConnectionId, - driveId, - sharedDrive - ); + nextPageToken = await getSyncPageToken(connectorId, driveId, sharedDrive); } const selectedFoldersIds = await getFoldersToSync(connectorId); - const authCredentials = await getAuthObject(nangoConnectionId); + const authCredentials = await getAuthObject(connector.connectionId); const driveClient = await getDriveClient(authCredentials); let opts: drive_v3.Params$Resource$Changes$List = { @@ -739,10 +744,13 @@ export async function incrementalSync( async function getSyncPageToken( connectorId: ModelId, - nangoConnectionId: string, driveId: string, sharedDrive: boolean ) { + const connector = await Connector.findByPk(connectorId); + if (!connector) { + throw new Error(`Connector ${connectorId} not found`); + } const last = await GoogleDriveSyncToken.findOne({ where: { connectorId: connectorId, @@ -752,7 +760,7 @@ async function getSyncPageToken( if (last) { return last.syncToken; } - const driveClient = await getDriveClient(nangoConnectionId); + const driveClient = await getDriveClient(connector.connectionId); let lastSyncToken = undefined; if (!lastSyncToken) { let opts = {}; @@ -947,11 +955,10 @@ export async function populateSyncTokens(connectorId: ModelId) { if (!connector) { throw new Error(`Connector ${connectorId} not found`); } - const drivesIds = await getDrivesIds(connector.connectionId); + const drivesIds = await getDrivesIds(connector.id); for (const drive of drivesIds) { const lastSyncToken = await getSyncPageToken( connectorId, - connector.connectionId, drive.id, drive.sharedDrive ); diff --git a/connectors/src/connectors/google_drive/temporal/client.ts b/connectors/src/connectors/google_drive/temporal/client.ts index fc7935421722..668284d6bc7b 100644 --- a/connectors/src/connectors/google_drive/temporal/client.ts +++ b/connectors/src/connectors/google_drive/temporal/client.ts @@ -36,7 +36,6 @@ export async function launchGoogleDriveFullSyncWorkflow( const connectorIdModelId = parseInt(connectorId, 10) as ModelId; const dataSourceConfig = dataSourceConfigFromConnector(connector); - const nangoConnectionId = connector.connectionId; const workflowId = googleDriveFullSyncWorkflowId(connectorId); try { @@ -50,7 +49,7 @@ export async function launchGoogleDriveFullSyncWorkflow( } } await client.workflow.start(googleDriveFullSync, { - args: [connectorIdModelId, nangoConnectionId, dataSourceConfig], + args: [connectorIdModelId, dataSourceConfig], taskQueue: "google-queue", workflowId: workflowId, @@ -90,12 +89,11 @@ export async function launchGoogleDriveIncrementalSyncWorkflow( const connectorIdModelId = parseInt(connectorId, 10) as ModelId; const dataSourceConfig = dataSourceConfigFromConnector(connector); - const nangoConnectionId = connector.connectionId; const workflowId = googleDriveIncrementalSyncWorkflowId(connectorId); try { await client.workflow.signalWithStart(googleDriveIncrementalSync, { - args: [connectorIdModelId, nangoConnectionId, dataSourceConfig], + args: [connectorIdModelId, dataSourceConfig], taskQueue: "google-queue", workflowId: workflowId, signal: newWebhookSignal, diff --git a/connectors/src/connectors/google_drive/temporal/workflows.ts b/connectors/src/connectors/google_drive/temporal/workflows.ts index e65f70d3971c..f50c0f94c48a 100644 --- a/connectors/src/connectors/google_drive/temporal/workflows.ts +++ b/connectors/src/connectors/google_drive/temporal/workflows.ts @@ -36,7 +36,6 @@ const { reportInitialSyncProgress, syncSucceeded } = proxyActivities< export async function googleDriveFullSync( connectorId: ModelId, - nangoConnectionId: string, dataSourceConfig: DataSourceConfig, garbageCollect = true, foldersToBrowse: string[] | undefined = undefined, @@ -63,7 +62,6 @@ export async function googleDriveFullSync( do { const res = await syncFiles( connectorId, - nangoConnectionId, dataSourceConfig, folder, startSyncTs, @@ -82,7 +80,6 @@ export async function googleDriveFullSync( if (workflowInfo().historyLength > 4000) { await continueAsNew( connectorId, - nangoConnectionId, dataSourceConfig, garbageCollect, foldersToBrowse, @@ -109,7 +106,6 @@ export function googleDriveFullSyncWorkflowId(connectorId: string) { export async function googleDriveIncrementalSync( connectorId: ModelId, - nangoConnectionId: string, dataSourceConfig: DataSourceConfig ) { let signaled = false; @@ -128,14 +124,13 @@ export async function googleDriveIncrementalSync( } } console.log(`Processing after debouncing ${debounceCount} time(s)`); - const drivesIds = await getDrivesIds(nangoConnectionId); + const drivesIds = await getDrivesIds(connectorId); const startSyncTs = new Date().getTime(); for (const googleDrive of drivesIds) { let nextPageToken: undefined | string = undefined; do { nextPageToken = await incrementalSync( connectorId, - nangoConnectionId, dataSourceConfig, googleDrive.id, googleDrive.sharedDrive,