Skip to content

Commit

Permalink
🚑 Scaling fetching data from gdrive
Browse files Browse the repository at this point in the history
  • Loading branch information
naelob committed Oct 8, 2024
1 parent a12f389 commit 00f052d
Show file tree
Hide file tree
Showing 8 changed files with 252 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ export class BullQueueService {
public readonly syncJobsQueue: Queue,
@InjectQueue(Queues.FAILED_PASSTHROUGH_REQUESTS_HANDLER)
public readonly failedPassthroughRequestsQueue: Queue,
@InjectQueue(Queues.THIRD_PARTY_DATA_INGESTION)
public readonly thirdPartyDataIngestionQueue: Queue,
@InjectQueue(Queues.RAG_DOCUMENT_PROCESSING)
private ragDocumentQueue: Queue,
) {}
Expand All @@ -35,6 +37,9 @@ export class BullQueueService {
/**
 * Exposes the queue injected for `Queues.RAG_DOCUMENT_PROCESSING`.
 *
 * @returns the privately held RAG document queue instance.
 */
getRagDocumentQueue() {
  const { ragDocumentQueue } = this;
  return ragDocumentQueue;
}
/**
 * Exposes the queue injected for `Queues.THIRD_PARTY_DATA_INGESTION`.
 *
 * @returns the third-party data ingestion queue instance.
 */
getThirdPartyDataIngestionQueue() {
  const { thirdPartyDataIngestionQueue } = this;
  return thirdPartyDataIngestionQueue;
}

async removeRepeatableJob(jobName: string) {
const jobs = await this.syncJobsQueue.getRepeatableJobs();
Expand Down
1 change: 1 addition & 0 deletions packages/api/src/@core/@core-services/queues/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ export enum Queues {
SYNC_JOBS_WORKER = 'SYNC_JOBS_WORKER', // Queue which syncs data from remote 3rd parties
FAILED_PASSTHROUGH_REQUESTS_HANDLER = 'FAILED_PASSTHROUGH_REQUESTS_HANDLER', // Queue which handles failed passthrough request due to rate limit and retries it with backOff
RAG_DOCUMENT_PROCESSING = 'RAG_DOCUMENT_PROCESSING',
THIRD_PARTY_DATA_INGESTION = 'THIRD_PARTY_DATA_INGESTION',
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,16 @@ export class IngestDataService {
.filter((p) => p.shouldPassToService)
.map((p) => p.param);

const ingestParams = params
.filter((p) => p.shouldPassToIngest)
.reduce((acc, p) => ({ ...acc, [p.paramName]: p.param }), {});

// Construct the syncParam object dynamically
const syncParam: SyncParam = {
linkedUserId,
custom_properties: remoteProperties,
custom_field_mappings: customFieldMappings,
ingestParams: ingestParams,
};

serviceParams.forEach((param, index) => {
Expand Down Expand Up @@ -124,11 +130,7 @@ export class IngestDataService {
return;
}

const sourceObject: U[] = resp.data;

const ingestParams = params
.filter((p) => p.shouldPassToIngest)
.reduce((acc, p) => ({ ...acc, [p.paramName]: p.param }), {});
/*const sourceObject: U[] = resp.data;
await this.ingestData<T, U>(
sourceObject,
Expand All @@ -138,7 +140,7 @@ export class IngestDataService {
commonObject,
customFieldMappings,
ingestParams,
);
);*/
} catch (syncError) {
this.logger.error(
`Error syncing ${integrationId} ${commonObject}: ${syncError.message}`,
Expand Down
3 changes: 3 additions & 0 deletions packages/api/src/@core/utils/helpers.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
/**
 * Returns a promise that settles after a timer of `ms` milliseconds fires.
 *
 * @param ms - delay handed to `setTimeout`, in milliseconds.
 * @returns a promise that resolves with no value once the timer elapses.
 */
export function sleep(ms: number): Promise<void> {
  return new Promise<void>((done) => {
    setTimeout(done, ms);
  });
}
14 changes: 14 additions & 0 deletions packages/api/src/@core/utils/types/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,22 @@ export interface IBaseSync {

/**
 * Argument object accepted by `IBaseObjectService.sync`.
 *
 * Besides the named fields, any additional key is allowed through the
 * index signature.
 */
export type SyncParam = {
  /** Identifier of the linked user the sync runs for. */
  linkedUserId: string;
  /** Optional slug / remote id pairs describing custom field mappings. */
  custom_field_mappings?: Array<{
    slug: string;
    remote_id: string;
  }>;
  /** Free-form parameters forwarded to the ingestion step. */
  ingestParams: Record<string, any>;
  [key: string]: any;
};
/**
 * Contract for an object service that can both fetch remote data (`sync`)
 * and hand already-fetched records to ingestion (`ingestData`).
 */
export interface IBaseObjectService {
  /** Runs a sync for the linked user described by `data`. */
  sync(data: SyncParam): Promise<ApiResponse<any>>;

  /**
   * Ingests a batch of source records for one connection.
   *
   * @param sourceData - records to ingest.
   * @param connectionId - identifier of the connection the records belong to.
   * @param customFieldMappings - optional slug / remote id pairs.
   * @param extraParams - optional free-form parameters.
   */
  ingestData(
    sourceData: Array<any>,
    connectionId: string,
    customFieldMappings?: Array<{
      slug: string;
      remote_id: string;
    }>,
    extraParams?: Record<string, any>,
  ): Promise<Array<any>>;
}
217 changes: 197 additions & 20 deletions packages/api/src/filestorage/file/services/googledrive/index.ts
Original file line number Diff line number Diff line change
@@ -1,32 +1,180 @@
import { EncryptionService } from '@@core/@core-services/encryption/encryption.service';
import { LoggerService } from '@@core/@core-services/logger/logger.service';
import { PrismaService } from '@@core/@core-services/prisma/prisma.service';
import { ApiResponse } from '@@core/utils/types';
import { BullQueueService } from '@@core/@core-services/queues/shared.service';
import { IngestDataService } from '@@core/@core-services/unification/ingest-data.service';
import { SyncParam } from '@@core/utils/types/interface';
import { FileStorageObject } from '@filestorage/@lib/@types';
import { IFileService } from '@filestorage/file/types';
import { UnifiedFilestorageFileOutput } from '@filestorage/file/types/model.unified';
import { Injectable } from '@nestjs/common';
import axios from 'axios';
import { OAuth2Client } from 'google-auth-library';
import { google } from 'googleapis';
import { ServiceRegistry } from '../registry.service';
import { GoogleDriveFileOutput } from './types';

// Passed as `pageSize` to the `drive.files.list` calls made by `sync` and
// `processBatch`, so one queued job corresponds to at most this many files.
const BATCH_SIZE = 1000; // Number of files to process in each batch
// Used by `rateLimitedRequest` to derive its delay: 1000 / API_RATE_LIMIT ms
// are waited before each wrapped request is issued.
const API_RATE_LIMIT = 10; // Requests per second

@Injectable()
export class GoogleDriveService implements IFileService {
/**
 * Wires the service's collaborators, labels the logger with this service's
 * context and registers the instance under the `googledrive` key.
 */
constructor(
  private prisma: PrismaService,
  private logger: LoggerService,
  private cryptoService: EncryptionService,
  private registry: ServiceRegistry,
  private ingestService: IngestDataService,
  private bullQueueService: BullQueueService,
) {
  // Context label has the form "<FILE OBJECT IN UPPER CASE>:<class name>".
  const loggerContext = `${FileStorageObject.file.toUpperCase()}:${
    GoogleDriveService.name
  }`;
  this.logger.setContext(loggerContext);
  this.registry.registerService('googledrive', this);
}

async sync(data: SyncParam): Promise<ApiResponse<GoogleDriveFileOutput[]>> {
/**
 * Forwards a batch of Google Drive files to the shared ingest service,
 * tagging it as provider `googledrive`, vertical `filestorage`, object `file`.
 *
 * @param sourceData - raw Google Drive file records to ingest.
 * @param connectionId - identifier of the connection the files belong to.
 * @param customFieldMappings - optional slug / remote id pairs.
 * @param extraParams - optional free-form parameters passed through as-is.
 * @returns whatever the ingest service resolves with for this batch.
 */
async ingestData(
  sourceData: GoogleDriveFileOutput[],
  connectionId: string,
  customFieldMappings?: {
    slug: string;
    remote_id: string;
  }[],
  extraParams?: { [key: string]: any },
): Promise<UnifiedFilestorageFileOutput[]> {
  const unifiedFiles = this.ingestService.ingestData<
    UnifiedFilestorageFileOutput,
    GoogleDriveFileOutput
  >(
    sourceData,
    'googledrive',
    connectionId,
    'filestorage',
    'file',
    customFieldMappings,
    extraParams,
  );
  return unifiedFiles;
}

/**
 * Starts a Google Drive file sync for a linked user by fanning the work out
 * to the third-party data ingestion queue.
 *
 * The method pages through `drive.files.list` requesting only
 * `nextPageToken`, and enqueues one `fs_file_googledrive` job per page.
 * Each job carries the token of the page it has to fetch (`undefined` for
 * the first page); the file metadata itself is fetched by `processBatch`.
 *
 * @param data - sync parameters; `linkedUserId`, `custom_field_mappings`
 *   and `ingestParams` are read, and the whole object is spread into each
 *   job payload.
 * @returns an empty 200 response once every page has been enqueued, or
 *   `undefined` when the user has no Google Drive connection.
 */
async sync(data: SyncParam) {
  const { linkedUserId, custom_field_mappings, ingestParams } = data;
  const connection = await this.prisma.connections.findFirst({
    where: {
      id_linked_user: linkedUserId,
      provider_slug: 'googledrive',
      vertical: 'filestorage',
    },
  });

  // No Google Drive connection for this user: nothing to enqueue.
  if (!connection) return;

  const auth = new OAuth2Client();
  auth.setCredentials({
    access_token: this.cryptoService.decrypt(connection.access_token),
  });
  const drive = google.drive({ version: 'v3', auth });

  // Incremental sync: only list files modified after the newest stored file.
  const lastSyncTime = await this.getLastSyncTime(connection.id_connection);
  const query = lastSyncTime
    ? `trashed = false and modifiedTime > '${lastSyncTime.toISOString()}'`
    : 'trashed = false';

  const ingestionQueue =
    this.bullQueueService.getThirdPartyDataIngestionQueue();

  let pageToken: string | undefined;
  do {
    const response = await this.rateLimitedRequest(() =>
      drive.files.list({
        q: query,
        fields: 'nextPageToken',
        pageSize: BATCH_SIZE,
        pageToken: pageToken,
        // Same ordering as `processBatch`, which replays these tokens.
        // NOTE(review): page tokens are presumably only valid with the
        // request parameters that produced them — confirm against Drive docs.
        orderBy: 'modifiedTime',
      }),
    );

    // Enqueue the page that was just listed (its own token), not the next
    // one. This way the first page is queued first, and a failure while
    // listing a later page cannot leave the first page unqueued.
    await ingestionQueue.add('fs_file_googledrive', {
      ...data,
      pageToken,
      query,
      connectionId: connection.id_connection,
      custom_field_mappings,
      ingestParams,
    });

    pageToken = response.data.nextPageToken || undefined;
  } while (pageToken);

  return {
    data: [],
    message: 'Google Drive sync completed',
    statusCode: 200,
  };
}

/**
 * Processes one `fs_file_googledrive` job: fetches a single page of Google
 * Drive file metadata and hands it to `ingestData`.
 *
 * @param job - queue job whose `data` holds `linkedUserId`, `query`,
 *   `pageToken`, `connectionId`, `custom_field_mappings` and `ingestParams`
 *   as enqueued by `sync`.
 * @returns nothing; resolves early without ingesting when the linked user
 *   has no Google Drive connection.
 */
async processBatch(job: any) {
  const {
    linkedUserId,
    query,
    pageToken,
    connectionId,
    custom_field_mappings,
    ingestParams,
  } = job.data;
  // The connection is looked up again so the access token is read at
  // processing time rather than carried in the job payload.
  const connection = await this.prisma.connections.findFirst({
    where: {
      id_linked_user: linkedUserId,
      provider_slug: 'googledrive',
      vertical: 'filestorage',
    },
  });

  if (!connection) return;

  const auth = new OAuth2Client();
  auth.setCredentials({
    access_token: this.cryptoService.decrypt(connection.access_token),
  });
  const drive = google.drive({ version: 'v3', auth });

  const response = await this.rateLimitedRequest(() =>
    drive.files.list({
      q: query,
      fields:
        'files(id, name, mimeType, modifiedTime, size, parents, webViewLink)',
      pageSize: BATCH_SIZE,
      pageToken: pageToken,
      orderBy: 'modifiedTime',
    }),
  );

  // Treat a response without a `files` array as an empty page instead of
  // throwing a TypeError on `.map`.
  const remoteFiles = response.data.files || [];

  const files: GoogleDriveFileOutput[] = remoteFiles.map((file) => ({
    id: file.id!,
    name: file.name!,
    mimeType: file.mimeType!,
    modifiedTime: file.modifiedTime!,
    size: file.size!,
    parents: file.parents,
    webViewLink: file.webViewLink,
  }));

  await this.ingestData(
    files,
    connectionId,
    custom_field_mappings,
    ingestParams,
  );
}

/**
 * Runs `request` after a fixed delay of `1000 / API_RATE_LIMIT` milliseconds.
 *
 * The delay is applied per call; there is no state shared between calls.
 *
 * Errors from `request` now propagate to the caller. Previously the request
 * ran inside an async timer callback within `new Promise`, which had no
 * reject path: a failing request left the returned promise pending forever
 * and surfaced only as an unhandled rejection.
 *
 * @param request - factory that issues the actual request.
 * @returns the value `request` resolves with.
 * @throws whatever `request` throws or rejects with.
 */
private async rateLimitedRequest<T>(request: () => Promise<T>): Promise<T> {
  await new Promise<void>((resolve) =>
    setTimeout(resolve, 1000 / API_RATE_LIMIT),
  );
  return request();
}

/*async sync(data: SyncParam): Promise<ApiResponse<GoogleDriveFileOutput[]>> {
try {
const { linkedUserId, id_folder } = data;
Expand All @@ -46,34 +194,63 @@ export class GoogleDriveService implements IFileService {
});
const drive = google.drive({ version: 'v3', auth });
const response = await drive.files.list({
q: 'trashed = false',
fields:
'files(id, name, mimeType, modifiedTime, size, parents, webViewLink)',
pageSize: 1000, // Adjust as needed
});

const files: GoogleDriveFileOutput[] = response.data.files.map(
(file) => ({
id: file.id!,
name: file.name!,
mimeType: file.mimeType!,
modifiedTime: file.modifiedTime!,
size: file.size!,
parents: file.parents,
webViewLink: file.webViewLink,
}),
const lastSyncTime = await this.getLastSyncTime(connection.id_connection);
console.log(
'last updated time for google drive file is ' +
JSON.stringify(lastSyncTime),
);
let pageToken: string | undefined;
let allFiles: GoogleDriveFileOutput[] = [];
const query = lastSyncTime
? `trashed = false and modifiedTime > '${lastSyncTime.toISOString()}'`
: 'trashed = false';
do {
const response = await drive.files.list({
q: query,
fields:
'nextPageToken, files(id, name, mimeType, modifiedTime, size, parents, webViewLink)',
pageSize: 1000,
pageToken: pageToken,
orderBy: 'modifiedTime',
});
const files: GoogleDriveFileOutput[] = response.data.files.map(
(file) => ({
id: file.id!,
name: file.name!,
mimeType: file.mimeType!,
modifiedTime: file.modifiedTime!,
size: file.size!,
parents: file.parents,
webViewLink: file.webViewLink,
}),
);
allFiles = allFiles.concat(files);
pageToken = response.data.nextPageToken;
if (pageToken) {
await sleep(100); // Wait 100ms between requests to avoid hitting rate limits
}
} while (pageToken);
this.logger.log(`Synced googledrive files !`);
return {
data: files,
data: allFiles,
message: 'Google Drive files retrieved',
statusCode: 200,
};
} catch (error) {
throw error;
}
}*/

/**
 * Looks up the `fs_files` row with the greatest `modified_at` for the given
 * connection and returns that timestamp.
 *
 * @param connectionId - value matched against `id_connection`.
 * @returns the row's `modified_at`, or `null` when the connection has no
 *   stored files.
 */
private async getLastSyncTime(connectionId: string): Promise<Date | null> {
  const mostRecentFile = await this.prisma.fs_files.findFirst({
    where: { id_connection: connectionId },
    orderBy: { modified_at: 'desc' },
  });
  if (!mostRecentFile) {
    return null;
  }
  return mostRecentFile.modified_at;
}

async downloadFile(fileId: string, connection: any): Promise<Buffer> {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import { Process, Processor } from '@nestjs/bull';
import { Injectable } from '@nestjs/common';
import { Job } from 'bull';
import { Queues } from '@@core/@core-services/queues/types';
import { GoogleDriveService } from '.';

/**
 * Queue processor attached to `Queues.THIRD_PARTY_DATA_INGESTION` that
 * handles jobs named `fs_file_googledrive` by delegating to
 * `GoogleDriveService.processBatch`.
 */
@Injectable()
@Processor(Queues.THIRD_PARTY_DATA_INGESTION)
export class GoogleDriveQueueProcessor {
  constructor(private readonly googleDriveService: GoogleDriveService) {}

  /**
   * Processes a single Google Drive batch job.
   *
   * Failures are written to the console and then rethrown so the queue
   * still sees the job as failed.
   *
   * @param job - the job to hand to `processBatch`.
   */
  @Process('fs_file_googledrive')
  async handleGoogleDriveSync(job: Job) {
    try {
      await this.googleDriveService.processBatch(job);
    } catch (failure) {
      const logLine = `Failed to process Google Drive sync job: ${failure.message}`;
      console.error(logLine);
      throw failure;
    }
  }
}
2 changes: 1 addition & 1 deletion packages/api/src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ async function bootstrap() {
extendedSpecs['x-speakeasy-name-override'];
addSpeakeasyGroup(document);

await generatePanoraParamsSpec(document);
// TODO: await generatePanoraParamsSpec(document);

useContainer(app.select(AppModule), { fallbackOnErrors: true });

Expand Down

0 comments on commit 00f052d

Please sign in to comment.