From 00f052d1a5ae8c4c1ecefb4d6549178cb5a4bf11 Mon Sep 17 00:00:00 2001 From: nael Date: Tue, 8 Oct 2024 05:20:06 +0200 Subject: [PATCH] :ambulance: Scaling fetching data from gdrive --- .../@core-services/queues/shared.service.ts | 5 + .../src/@core/@core-services/queues/types.ts | 1 + .../unification/ingest-data.service.ts | 14 +- packages/api/src/@core/utils/helpers.ts | 3 + .../api/src/@core/utils/types/interface.ts | 14 ++ .../file/services/googledrive/index.ts | 217 ++++++++++++++++-- .../file/services/googledrive/processor.ts | 23 ++ packages/api/src/main.ts | 2 +- 8 files changed, 252 insertions(+), 27 deletions(-) create mode 100644 packages/api/src/@core/utils/helpers.ts create mode 100644 packages/api/src/filestorage/file/services/googledrive/processor.ts diff --git a/packages/api/src/@core/@core-services/queues/shared.service.ts b/packages/api/src/@core/@core-services/queues/shared.service.ts index b059e2529..0c208b9f4 100644 --- a/packages/api/src/@core/@core-services/queues/shared.service.ts +++ b/packages/api/src/@core/@core-services/queues/shared.service.ts @@ -14,6 +14,8 @@ export class BullQueueService { public readonly syncJobsQueue: Queue, @InjectQueue(Queues.FAILED_PASSTHROUGH_REQUESTS_HANDLER) public readonly failedPassthroughRequestsQueue: Queue, + @InjectQueue(Queues.THIRD_PARTY_DATA_INGESTION) + public readonly thirdPartyDataIngestionQueue: Queue, @InjectQueue(Queues.RAG_DOCUMENT_PROCESSING) private ragDocumentQueue: Queue, ) {} @@ -35,6 +37,9 @@ export class BullQueueService { getRagDocumentQueue() { return this.ragDocumentQueue; } + getThirdPartyDataIngestionQueue() { + return this.thirdPartyDataIngestionQueue; + } async removeRepeatableJob(jobName: string) { const jobs = await this.syncJobsQueue.getRepeatableJobs(); diff --git a/packages/api/src/@core/@core-services/queues/types.ts b/packages/api/src/@core/@core-services/queues/types.ts index 5cd578e4b..be54c4139 100644 --- a/packages/api/src/@core/@core-services/queues/types.ts +++ 
b/packages/api/src/@core/@core-services/queues/types.ts @@ -4,4 +4,5 @@ export enum Queues { SYNC_JOBS_WORKER = 'SYNC_JOBS_WORKER', // Queue which syncs data from remote 3rd parties FAILED_PASSTHROUGH_REQUESTS_HANDLER = 'FAILED_PASSTHROUGH_REQUESTS_HANDLER', // Queue which handles failed passthrough request due to rate limit and retries it with backOff RAG_DOCUMENT_PROCESSING = 'RAG_DOCUMENT_PROCESSING', + THIRD_PARTY_DATA_INGESTION = 'THIRD_PARTY_DATA_INGESTION', } diff --git a/packages/api/src/@core/@core-services/unification/ingest-data.service.ts b/packages/api/src/@core/@core-services/unification/ingest-data.service.ts index 74f11f6b0..24cf8179d 100644 --- a/packages/api/src/@core/@core-services/unification/ingest-data.service.ts +++ b/packages/api/src/@core/@core-services/unification/ingest-data.service.ts @@ -85,10 +85,16 @@ export class IngestDataService { .filter((p) => p.shouldPassToService) .map((p) => p.param); + const ingestParams = params + .filter((p) => p.shouldPassToIngest) + .reduce((acc, p) => ({ ...acc, [p.paramName]: p.param }), {}); + // Construct the syncParam object dynamically const syncParam: SyncParam = { linkedUserId, custom_properties: remoteProperties, + custom_field_mappings: customFieldMappings, + ingestParams: ingestParams, }; serviceParams.forEach((param, index) => { @@ -124,11 +130,7 @@ export class IngestDataService { return; } - const sourceObject: U[] = resp.data; - - const ingestParams = params - .filter((p) => p.shouldPassToIngest) - .reduce((acc, p) => ({ ...acc, [p.paramName]: p.param }), {}); + /*const sourceObject: U[] = resp.data; await this.ingestData( sourceObject, @@ -138,7 +140,7 @@ export class IngestDataService { commonObject, customFieldMappings, ingestParams, - ); + );*/ } catch (syncError) { this.logger.error( `Error syncing ${integrationId} ${commonObject}: ${syncError.message}`, diff --git a/packages/api/src/@core/utils/helpers.ts b/packages/api/src/@core/utils/helpers.ts new file mode 100644 index 
000000000..421bda080 --- /dev/null +++ b/packages/api/src/@core/utils/helpers.ts @@ -0,0 +1,3 @@ +export function sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} diff --git a/packages/api/src/@core/utils/types/interface.ts b/packages/api/src/@core/utils/types/interface.ts index e744718c0..f04be63fc 100644 --- a/packages/api/src/@core/utils/types/interface.ts +++ b/packages/api/src/@core/utils/types/interface.ts @@ -68,8 +68,22 @@ export interface IBaseSync { export type SyncParam = { linkedUserId: string; + custom_field_mappings?: { + slug: string; + remote_id: string; + }[]; + ingestParams: { [key: string]: any }; [key: string]: any; }; export interface IBaseObjectService { sync(data: SyncParam): Promise>; + ingestData( + sourceData: any[], + connectionId: string, + customFieldMappings?: { + slug: string; + remote_id: string; + }[], + extraParams?: { [key: string]: any }, + ): Promise; } diff --git a/packages/api/src/filestorage/file/services/googledrive/index.ts b/packages/api/src/filestorage/file/services/googledrive/index.ts index 6fbf1c0d3..a5af0e204 100644 --- a/packages/api/src/filestorage/file/services/googledrive/index.ts +++ b/packages/api/src/filestorage/file/services/googledrive/index.ts @@ -1,10 +1,12 @@ import { EncryptionService } from '@@core/@core-services/encryption/encryption.service'; import { LoggerService } from '@@core/@core-services/logger/logger.service'; import { PrismaService } from '@@core/@core-services/prisma/prisma.service'; -import { ApiResponse } from '@@core/utils/types'; +import { BullQueueService } from '@@core/@core-services/queues/shared.service'; +import { IngestDataService } from '@@core/@core-services/unification/ingest-data.service'; import { SyncParam } from '@@core/utils/types/interface'; import { FileStorageObject } from '@filestorage/@lib/@types'; import { IFileService } from '@filestorage/file/types'; +import { UnifiedFilestorageFileOutput } from 
'@filestorage/file/types/model.unified'; import { Injectable } from '@nestjs/common'; import axios from 'axios'; import { OAuth2Client } from 'google-auth-library'; @@ -12,6 +14,9 @@ import { google } from 'googleapis'; import { ServiceRegistry } from '../registry.service'; import { GoogleDriveFileOutput } from './types'; +const BATCH_SIZE = 1000; // Number of files to process in each batch +const API_RATE_LIMIT = 10; // Requests per second + @Injectable() export class GoogleDriveService implements IFileService { constructor( @@ -19,6 +24,8 @@ export class GoogleDriveService implements IFileService { private logger: LoggerService, private cryptoService: EncryptionService, private registry: ServiceRegistry, + private ingestService: IngestDataService, + private bullQueueService: BullQueueService, ) { this.logger.setContext( FileStorageObject.file.toUpperCase() + ':' + GoogleDriveService.name, @@ -26,7 +33,148 @@ export class GoogleDriveService implements IFileService { this.registry.registerService('googledrive', this); } - async sync(data: SyncParam): Promise> { + async ingestData( + sourceData: GoogleDriveFileOutput[], + connectionId: string, + customFieldMappings?: { + slug: string; + remote_id: string; + }[], + extraParams?: { [key: string]: any }, + ): Promise { + return this.ingestService.ingestData< + UnifiedFilestorageFileOutput, + GoogleDriveFileOutput + >( + sourceData, + 'googledrive', + connectionId, + 'filestorage', + 'file', + customFieldMappings, + extraParams, + ); + } + + async sync(data: SyncParam) { + const { linkedUserId, custom_field_mappings, ingestParams } = data; + const connection = await this.prisma.connections.findFirst({ + where: { + id_linked_user: linkedUserId, + provider_slug: 'googledrive', + vertical: 'filestorage', + }, + }); + + if (!connection) return; + + const auth = new OAuth2Client(); + auth.setCredentials({ + access_token: this.cryptoService.decrypt(connection.access_token), + }); + const drive = google.drive({ version: 'v3', 
auth }); + + const lastSyncTime = await this.getLastSyncTime(connection.id_connection); + const query = lastSyncTime + ? `trashed = false and modifiedTime > '${lastSyncTime.toISOString()}'` + : 'trashed = false'; + + let pageToken: string | undefined; + do { + const response = await this.rateLimitedRequest(() => + drive.files.list({ + q: query, + fields: 'nextPageToken', + pageSize: BATCH_SIZE, + pageToken: pageToken, + }), + ); + + await this.bullQueueService + .getThirdPartyDataIngestionQueue() + .add('fs_file_googledrive', { + ...data, + pageToken: response.data.nextPageToken, + query, + connectionId: connection.id_connection, + custom_field_mappings, + ingestParams, + }); + + pageToken = response.data.nextPageToken; + } while (pageToken); + + return { + data: [], + message: 'Google Drive sync completed', + statusCode: 200, + }; + } + + async processBatch(job: any) { + const { + linkedUserId, + query, + pageToken, + connectionId, + custom_field_mappings, + ingestParams, + } = job.data; + const connection = await this.prisma.connections.findFirst({ + where: { + id_linked_user: linkedUserId, + provider_slug: 'googledrive', + vertical: 'filestorage', + }, + }); + + if (!connection) return; + + const auth = new OAuth2Client(); + auth.setCredentials({ + access_token: this.cryptoService.decrypt(connection.access_token), + }); + const drive = google.drive({ version: 'v3', auth }); + + const response = await this.rateLimitedRequest(() => + drive.files.list({ + q: query, + fields: + 'files(id, name, mimeType, modifiedTime, size, parents, webViewLink)', + pageSize: BATCH_SIZE, + pageToken: pageToken, + orderBy: 'modifiedTime', + }), + ); + + const files: GoogleDriveFileOutput[] = response.data.files.map((file) => ({ + id: file.id!, + name: file.name!, + mimeType: file.mimeType!, + modifiedTime: file.modifiedTime!, + size: file.size!, + parents: file.parents, + webViewLink: file.webViewLink, + })); + + await this.ingestData( + files, + connectionId, + 
custom_field_mappings, + ingestParams, + ); + } + + private async rateLimitedRequest(request: () => Promise): Promise { + return new Promise((resolve) => { + setTimeout(async () => { + const result = await request(); + resolve(result); + }, 1000 / API_RATE_LIMIT); + }); + } + + /*async sync(data: SyncParam): Promise> { try { const { linkedUserId, id_folder } = data; @@ -46,34 +194,63 @@ export class GoogleDriveService implements IFileService { }); const drive = google.drive({ version: 'v3', auth }); - const response = await drive.files.list({ - q: 'trashed = false', - fields: - 'files(id, name, mimeType, modifiedTime, size, parents, webViewLink)', - pageSize: 1000, // Adjust as needed - }); - - const files: GoogleDriveFileOutput[] = response.data.files.map( - (file) => ({ - id: file.id!, - name: file.name!, - mimeType: file.mimeType!, - modifiedTime: file.modifiedTime!, - size: file.size!, - parents: file.parents, - webViewLink: file.webViewLink, - }), + const lastSyncTime = await this.getLastSyncTime(connection.id_connection); + console.log( + 'last updated time for google drive file is ' + + JSON.stringify(lastSyncTime), ); + let pageToken: string | undefined; + let allFiles: GoogleDriveFileOutput[] = []; + + const query = lastSyncTime + ? 
`trashed = false and modifiedTime > '${lastSyncTime.toISOString()}'` + : 'trashed = false'; + + do { + const response = await drive.files.list({ + q: query, + fields: + 'nextPageToken, files(id, name, mimeType, modifiedTime, size, parents, webViewLink)', + pageSize: 1000, + pageToken: pageToken, + orderBy: 'modifiedTime', + }); + + const files: GoogleDriveFileOutput[] = response.data.files.map( + (file) => ({ + id: file.id!, + name: file.name!, + mimeType: file.mimeType!, + modifiedTime: file.modifiedTime!, + size: file.size!, + parents: file.parents, + webViewLink: file.webViewLink, + }), + ); + allFiles = allFiles.concat(files); + pageToken = response.data.nextPageToken; + if (pageToken) { + await sleep(100); // Wait 100ms between requests to avoid hitting rate limits + } + } while (pageToken); this.logger.log(`Synced googledrive files !`); return { - data: files, + data: allFiles, message: 'Google Drive files retrieved', statusCode: 200, }; } catch (error) { throw error; } + }*/ + + private async getLastSyncTime(connectionId: string): Promise { + const lastSync = await this.prisma.fs_files.findFirst({ + where: { id_connection: connectionId }, + orderBy: { modified_at: 'desc' }, + }); + return lastSync ? 
lastSync.modified_at : null; } async downloadFile(fileId: string, connection: any): Promise { diff --git a/packages/api/src/filestorage/file/services/googledrive/processor.ts b/packages/api/src/filestorage/file/services/googledrive/processor.ts new file mode 100644 index 000000000..2ae382e97 --- /dev/null +++ b/packages/api/src/filestorage/file/services/googledrive/processor.ts @@ -0,0 +1,23 @@ +import { Process, Processor } from '@nestjs/bull'; +import { Injectable } from '@nestjs/common'; +import { Job } from 'bull'; +import { Queues } from '@@core/@core-services/queues/types'; +import { GoogleDriveService } from '.'; + +@Injectable() +@Processor(Queues.THIRD_PARTY_DATA_INGESTION) +export class GoogleDriveQueueProcessor { + constructor(private readonly googleDriveService: GoogleDriveService) {} + + @Process('fs_file_googledrive') + async handleGoogleDriveSync(job: Job) { + try { + await this.googleDriveService.processBatch(job); + } catch (error) { + console.error( + `Failed to process Google Drive sync job: ${error.message}`, + ); + throw error; + } + } +} diff --git a/packages/api/src/main.ts b/packages/api/src/main.ts index 4ee9678d4..ca08b7e59 100644 --- a/packages/api/src/main.ts +++ b/packages/api/src/main.ts @@ -95,7 +95,7 @@ async function bootstrap() { extendedSpecs['x-speakeasy-name-override']; addSpeakeasyGroup(document); - await generatePanoraParamsSpec(document); + // TODO: await generatePanoraParamsSpec(document); useContainer(app.select(AppModule), { fallbackOnErrors: true });