Skip to content

Commit

Permalink
Remove NangoConnectionId from Gdrive workflow state
Browse files Browse the repository at this point in the history
  • Loading branch information
lasryaric committed Nov 6, 2023
1 parent 0226fa0 commit b6dd6bc
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 28 deletions.
2 changes: 1 addition & 1 deletion connectors/src/connectors/google_drive/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ export async function retrieveGoogleDriveConnectorPermissions({
} else if (filterPermission === null) {
if (parentInternalId === null) {
// Return the list of remote shared drives.
const drives = await getDrivesIds(c.connectionId);
const drives = await getDrivesIds(c.id);
const resources: ConnectorResource[] = await Promise.all(
drives.map(async (d): Promise<ConnectorResource> => {
const driveObject = await getGoogleDriveObject(authCredentials, d.id);
Expand Down
41 changes: 24 additions & 17 deletions connectors/src/connectors/google_drive/temporal/activities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -129,14 +129,19 @@ export async function getDriveClient(
throw new Error("Invalid auth_credentials type");
}

export async function getDrivesIds(nangoConnectionId: string): Promise<
export async function getDrivesIds(connectorId: ModelId): Promise<
{
id: string;
name: string;
sharedDrive: boolean;
}[]
> {
const drive = await getDriveClient(nangoConnectionId);
const connector = await Connector.findByPk(connectorId);
if (!connector) {
throw new Error(`Connector ${connectorId} not found`);
}
const drive = await getDriveClient(connector.connectionId);

let nextPageToken: string | undefined | null = undefined;
const ids: { id: string; name: string; sharedDrive: boolean }[] = [];
const myDriveRes = await drive.files.get({ fileId: "root" });
Expand Down Expand Up @@ -177,7 +182,6 @@ export async function getDrivesIds(nangoConnectionId: string): Promise<

export async function syncFiles(
connectorId: ModelId,
nangoConnectionId: string,
dataSourceConfig: DataSourceConfig,
driveFolderId: string,
startSyncTs: number,
Expand All @@ -187,7 +191,11 @@ export async function syncFiles(
count: number;
subfolders: string[];
}> {
const authCredentials = await getAuthObject(nangoConnectionId);
const connector = await Connector.findByPk(connectorId);
if (!connector) {
throw new Error(`Connector ${connectorId} not found`);
}
const authCredentials = await getAuthObject(connector.connectionId);
const driveFolder = await getGoogleDriveObject(
authCredentials,
driveFolderId
Expand Down Expand Up @@ -575,7 +583,6 @@ async function objectIsInFolders(

export async function incrementalSync(
connectorId: ModelId,
nangoConnectionId: string,
dataSourceConfig: DataSourceConfig,
driveId: string,
sharedDrive: boolean,
Expand All @@ -586,23 +593,21 @@ export async function incrementalSync(
provider: "google_drive",
connectorId: connectorId,
driveId: driveId,
nangoConnectionId: nangoConnectionId,
activity: "incrementalSync",
runInstance: uuid4(),
});
try {
const connector = await Connector.findByPk(connectorId);
if (!connector) {
throw new Error(`Connector ${connectorId} not found`);
}
if (!nextPageToken) {
nextPageToken = await getSyncPageToken(
connectorId,
nangoConnectionId,
driveId,
sharedDrive
);
nextPageToken = await getSyncPageToken(connectorId, driveId, sharedDrive);
}

const selectedFoldersIds = await getFoldersToSync(connectorId);

const authCredentials = await getAuthObject(nangoConnectionId);
const authCredentials = await getAuthObject(connector.connectionId);
const driveClient = await getDriveClient(authCredentials);

let opts: drive_v3.Params$Resource$Changes$List = {
Expand Down Expand Up @@ -739,10 +744,13 @@ export async function incrementalSync(

async function getSyncPageToken(
connectorId: ModelId,
nangoConnectionId: string,
driveId: string,
sharedDrive: boolean
) {
const connector = await Connector.findByPk(connectorId);
if (!connector) {
throw new Error(`Connector ${connectorId} not found`);
}
const last = await GoogleDriveSyncToken.findOne({
where: {
connectorId: connectorId,
Expand All @@ -752,7 +760,7 @@ async function getSyncPageToken(
if (last) {
return last.syncToken;
}
const driveClient = await getDriveClient(nangoConnectionId);
const driveClient = await getDriveClient(connector.connectionId);
let lastSyncToken = undefined;
if (!lastSyncToken) {
let opts = {};
Expand Down Expand Up @@ -947,11 +955,10 @@ export async function populateSyncTokens(connectorId: ModelId) {
if (!connector) {
throw new Error(`Connector ${connectorId} not found`);
}
const drivesIds = await getDrivesIds(connector.connectionId);
const drivesIds = await getDrivesIds(connector.id);
for (const drive of drivesIds) {
const lastSyncToken = await getSyncPageToken(
connectorId,
connector.connectionId,
drive.id,
drive.sharedDrive
);
Expand Down
6 changes: 2 additions & 4 deletions connectors/src/connectors/google_drive/temporal/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ export async function launchGoogleDriveFullSyncWorkflow(
const connectorIdModelId = parseInt(connectorId, 10) as ModelId;

const dataSourceConfig = dataSourceConfigFromConnector(connector);
const nangoConnectionId = connector.connectionId;

const workflowId = googleDriveFullSyncWorkflowId(connectorId);
try {
Expand All @@ -50,7 +49,7 @@ export async function launchGoogleDriveFullSyncWorkflow(
}
}
await client.workflow.start(googleDriveFullSync, {
args: [connectorIdModelId, nangoConnectionId, dataSourceConfig],
args: [connectorIdModelId, dataSourceConfig],
taskQueue: "google-queue",
workflowId: workflowId,

Expand Down Expand Up @@ -90,12 +89,11 @@ export async function launchGoogleDriveIncrementalSyncWorkflow(
const connectorIdModelId = parseInt(connectorId, 10) as ModelId;

const dataSourceConfig = dataSourceConfigFromConnector(connector);
const nangoConnectionId = connector.connectionId;

const workflowId = googleDriveIncrementalSyncWorkflowId(connectorId);
try {
await client.workflow.signalWithStart(googleDriveIncrementalSync, {
args: [connectorIdModelId, nangoConnectionId, dataSourceConfig],
args: [connectorIdModelId, dataSourceConfig],
taskQueue: "google-queue",
workflowId: workflowId,
signal: newWebhookSignal,
Expand Down
7 changes: 1 addition & 6 deletions connectors/src/connectors/google_drive/temporal/workflows.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ const { reportInitialSyncProgress, syncSucceeded } = proxyActivities<

export async function googleDriveFullSync(
connectorId: ModelId,
nangoConnectionId: string,
dataSourceConfig: DataSourceConfig,
garbageCollect = true,
foldersToBrowse: string[] | undefined = undefined,
Expand All @@ -63,7 +62,6 @@ export async function googleDriveFullSync(
do {
const res = await syncFiles(
connectorId,
nangoConnectionId,
dataSourceConfig,
folder,
startSyncTs,
Expand All @@ -82,7 +80,6 @@ export async function googleDriveFullSync(
if (workflowInfo().historyLength > 4000) {
await continueAsNew<typeof googleDriveFullSync>(
connectorId,
nangoConnectionId,
dataSourceConfig,
garbageCollect,
foldersToBrowse,
Expand All @@ -109,7 +106,6 @@ export function googleDriveFullSyncWorkflowId(connectorId: string) {

export async function googleDriveIncrementalSync(
connectorId: ModelId,
nangoConnectionId: string,
dataSourceConfig: DataSourceConfig
) {
let signaled = false;
Expand All @@ -128,14 +124,13 @@ export async function googleDriveIncrementalSync(
}
}
console.log(`Processing after debouncing ${debounceCount} time(s)`);
const drivesIds = await getDrivesIds(nangoConnectionId);
const drivesIds = await getDrivesIds(connectorId);
const startSyncTs = new Date().getTime();
for (const googleDrive of drivesIds) {
let nextPageToken: undefined | string = undefined;
do {
nextPageToken = await incrementalSync(
connectorId,
nangoConnectionId,
dataSourceConfig,
googleDrive.id,
googleDrive.sharedDrive,
Expand Down

0 comments on commit b6dd6bc

Please sign in to comment.