diff --git a/.env.example b/.env.example index 8760d042c..739a4a294 100644 --- a/.env.example +++ b/.env.example @@ -112,22 +112,16 @@ BOX_FILESTORAGE_CLOUD_CLIENT_SECRET= # Onedrive ONEDRIVE_FILESTORAGE_CLOUD_CLIENT_ID= ONEDRIVE_FILESTORAGE_CLOUD_CLIENT_SECRET= -# dropbox +# Dropbox DROPBOX_FILESTORAGE_CLOUD_CLIENT_ID= DROPBOX_FILESTORAGE_CLOUD_CLIENT_SECRET= - # Google Drive GOOGLEDRIVE_FILESTORAGE_CLOUD_CLIENT_ID= GOOGLEDRIVE_FILESTORAGE_CLOUD_CLIENT_SECRET= - -# Google Drive +# Sharepoint SHAREPOINT_FILESTORAGE_CLOUD_CLIENT_ID= SHAREPOINT_FILESTORAGE_CLOUD_CLIENT_SECRET= -# Google Drive -DROPBOX_FILESTORAGE_CLOUD_CLIENT_ID= -DROPBOX_FILESTORAGE_CLOUD_CLIENT_SECRET= - # ================================================ # ECOMMERCE diff --git a/docker-compose.dev.yml b/docker-compose.dev.yml index 556022e07..4371eaf1a 100644 --- a/docker-compose.dev.yml +++ b/docker-compose.dev.yml @@ -218,7 +218,7 @@ services: NEXT_PUBLIC_POSTHOG_KEY: ${POSTHOG_KEY} NEXT_PUBLIC_POSTHOG_HOST: ${POSTHOG_HOST} NEXT_PUBLIC_DISTRIBUTION: ${DISTRIBUTION} - NEXT_PUBLIC_BACKEND_DOMAIN: https://localhost:3000 + NEXT_PUBLIC_BACKEND_DOMAIN: http://localhost:3000 NEXT_PUBLIC_MAGIC_LINK_DOMAIN: ${NEXT_PUBLIC_MAGIC_LINK_DOMAIN} NEXT_PUBLIC_WEBAPP_DOMAIN: ${NEXT_PUBLIC_WEBAPP_DOMAIN} NEXT_PUBLIC_REDIRECT_WEBHOOK_INGRESS: ${REDIRECT_TUNNEL_INGRESS} diff --git a/packages/api/src/@core/connections/filestorage/services/sharepoint/sharepoint.service.ts b/packages/api/src/@core/connections/filestorage/services/sharepoint/sharepoint.service.ts index 826ed1206..77f009463 100644 --- a/packages/api/src/@core/connections/filestorage/services/sharepoint/sharepoint.service.ts +++ b/packages/api/src/@core/connections/filestorage/services/sharepoint/sharepoint.service.ts @@ -137,7 +137,7 @@ export class SharepointConnectionService extends AbstractBaseConnectionService { // get site_id from tenant and sitename const site_details = await axios.get( - `https://graph.microsoft.com/v1.0/sites/${site}.sharepoint.com:/sites/${tenant}`, + `https://graph.microsoft.com/v1.0/sites/${tenant}.sharepoint.com:/sites/${site}`, { headers: { Authorization: `Bearer ${data.access_token}`, diff --git a/packages/api/src/filestorage/drive/services/sharepoint/index.ts b/packages/api/src/filestorage/drive/services/sharepoint/index.ts index 9fece13b1..9f26564ac 100644 --- a/packages/api/src/filestorage/drive/services/sharepoint/index.ts +++ b/packages/api/src/filestorage/drive/services/sharepoint/index.ts @@ -6,7 +6,7 @@ import { SyncParam } from '@@core/utils/types/interface'; import { FileStorageObject } from '@filestorage/@lib/@types'; import { IDriveService } from '@filestorage/drive/types'; import { Injectable } from '@nestjs/common'; -import axios from 'axios'; +import axios, { AxiosRequestConfig, AxiosResponse } from 'axios'; import { ServiceRegistry } from '../registry.service'; import { SharepointDriveOutput } from './types'; import { DesunifyReturnType } from '@@core/utils/types/desunify.input'; @@ -14,6 +14,9 @@ import { OriginalDriveOutput } from '@@core/utils/types/original/original.file-s @Injectable() export class SharepointService implements IDriveService { + private readonly MAX_RETRIES: number = 6; + private readonly INITIAL_BACKOFF_MS: number = 1000; + constructor( private prisma: PrismaService, private logger: LoggerService, @@ -30,8 +33,11 @@ export class SharepointService implements IDriveService { driveData: DesunifyReturnType, linkedUserId: string, ): Promise> { - // No API to add drive in Sharepoint - return; + return { + data: null, + message: 'Add drive not supported for SharePoint.', + statusCode: 501, + }; } async sync(data: SyncParam): Promise> { @@ -46,26 +52,95 @@ export class SharepointService implements IDriveService { }, }); - const resp = await axios.get(`${connection.account_url}/drives`, { + const config: AxiosRequestConfig = { + method: 'get', + url: `${connection.account_url}/drives`, headers: { 'Content-Type': 'application/json', Authorization: `Bearer ${this.cryptoService.decrypt( connection.access_token, )}`, }, - }); + }; + + const resp: AxiosResponse = await this.makeRequestWithRetry(config); const drives: SharepointDriveOutput[] = resp.data.value; - this.logger.log(`Synced sharepoint drives !`); + this.logger.log(`Synced SharePoint drives successfully.`); return { data: drives, - message: 'Sharepoint drives retrived', + message: 'SharePoint drives retrieved successfully.', statusCode: 200, }; - } catch (error) { - console.log(error.response); + } catch (error: any) { + this.logger.error( + `Error syncing SharePoint drives: ${error.message}`, + error, + ); throw error; } } + + private async makeRequestWithRetry( + config: AxiosRequestConfig, + ): Promise { + let attempts = 0; + let backoff: number = this.INITIAL_BACKOFF_MS; + + while (attempts < this.MAX_RETRIES) { + try { + const response: AxiosResponse = await axios(config); + return response; + } catch (error: any) { + attempts++; + + if ( + (error.response && error.response.status === 429) || + (error.response && error.response.status >= 500) || + error.code === 'ECONNABORTED' || + error.code === 'ETIMEDOUT' || + error.response?.code === 'ETIMEDOUT' + ) { + const retryAfter = this.getRetryAfter( + error.response?.headers['retry-after'], + ); + const delayTime: number = Math.max(retryAfter * 1000, backoff); + + this.logger.warn( + `Request failed with ${ + error.code || error.response?.status + }. Retrying in ${delayTime}ms (Attempt ${attempts}/${ + this.MAX_RETRIES + })`, + ); + + await this.delay(delayTime); + backoff *= 2; + continue; + } + + this.logger.error(`Request failed: ${error.message}`, error); + throw error; + } + } + + this.logger.error( + 'Max retry attempts reached. Request failed.', + SharepointService.name, + ); + throw new Error('Max retry attempts reached.'); + } + + private getRetryAfter(retryAfterHeader: string | undefined): number { + if (!retryAfterHeader) { + return 1; + } + const retryAfterSeconds: number = parseInt(retryAfterHeader, 10); + return isNaN(retryAfterSeconds) ? 1 : retryAfterSeconds; + } + + private delay(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); + } } diff --git a/packages/api/src/filestorage/drive/services/sharepoint/types.ts b/packages/api/src/filestorage/drive/services/sharepoint/types.ts index 8ce603089..f24e144d0 100644 --- a/packages/api/src/filestorage/drive/services/sharepoint/types.ts +++ b/packages/api/src/filestorage/drive/services/sharepoint/types.ts @@ -56,6 +56,8 @@ export interface IdentitySet { readonly phone?: Identity; /** Identity representing a user. */ readonly user?: Identity; + /** Identity representing a group. */ + readonly group?: Identity; } /** diff --git a/packages/api/src/filestorage/file/file.module.ts b/packages/api/src/filestorage/file/file.module.ts index e4d8dc121..573278bd0 100644 --- a/packages/api/src/filestorage/file/file.module.ts +++ b/packages/api/src/filestorage/file/file.module.ts @@ -19,6 +19,7 @@ import { SyncService } from './sync/sync.service'; import { GoogleDriveQueueProcessor } from './services/googledrive/processor'; import { FolderModule } from '../folder/folder.module'; import { OnedriveQueueProcessor } from './services/onedrive/processor'; +import { SharepointQueueProcessor } from './services/sharepoint/processor'; @Module({ imports: [forwardRef(() => FolderModule)], @@ -45,6 +46,7 @@ import { OnedriveQueueProcessor } from './services/onedrive/processor'; GoogleDriveService, GoogleDriveQueueProcessor, OnedriveQueueProcessor, + SharepointQueueProcessor, ], exports: [SyncService, ServiceRegistry, GoogleDriveService], }) diff --git a/packages/api/src/filestorage/file/services/sharepoint/index.ts b/packages/api/src/filestorage/file/services/sharepoint/index.ts index cf96e4f82..7e7643920 100644 --- a/packages/api/src/filestorage/file/services/sharepoint/index.ts +++ b/packages/api/src/filestorage/file/services/sharepoint/index.ts @@ -6,17 +6,31 @@ import { SyncParam } from '@@core/utils/types/interface'; import { FileStorageObject } from '@filestorage/@lib/@types'; import { IFileService } from '@filestorage/file/types'; import { Injectable } from '@nestjs/common'; -import axios from 'axios'; +import axios, { AxiosRequestConfig, AxiosResponse } from 'axios'; import { ServiceRegistry } from '../registry.service'; import { SharepointFileOutput } from './types'; +import { SharepointService as SharepointFolderService } from '@filestorage/folder/services/sharepoint'; +import { IngestDataService } from '@@core/@core-services/unification/ingest-data.service'; +import { Connection } from '@@core/connections/@utils/types'; +import { UnifiedFilestorageFileOutput } from '@filestorage/file/types/model.unified'; +import { BullQueueService } from '@@core/@core-services/queues/shared.service'; +import { SharepointPermissionOutput } from '@filestorage/permission/services/sharepoint/types'; +import { UnifiedFilestoragePermissionOutput } from '@filestorage/permission/types/model.unified'; @Injectable() export class SharepointService implements IFileService { + private readonly MAX_RETRIES: number = 6; + private readonly INITIAL_BACKOFF_MS: number = 1000; + private readonly BATCH_SIZE: number = 20; + constructor( private prisma: PrismaService, private logger: LoggerService, private cryptoService: EncryptionService, private registry: ServiceRegistry, + private ingestService: IngestDataService, + private sharepointFolderService: SharepointFolderService, + private bullQueueService: BullQueueService, ) { this.logger.setContext( `${FileStorageObject.file.toUpperCase()}:${SharepointService.name}`, @@ -24,9 +38,13 @@ export class SharepointService implements IFileService { this.registry.registerService('sharepoint', this); } - async sync(data: SyncParam): Promise> { + async sync( + data: SyncParam, + deltaLink?: string, + ): Promise> { try { - const { linkedUserId, id_folder } = data; + const { linkedUserId, custom_field_mappings, ingestParams, id_folder } = + data; const connection = await this.prisma.connections.findFirst({ where: { @@ -36,34 +54,103 @@ export class SharepointService implements IFileService { }, }); - const foldersToSync = ['root']; + // if id_folder is provided, sync only the files in the specified folder if (id_folder) { const folder = await this.prisma.fs_folders.findUnique({ where: { id_fs_folder: id_folder as string, }, + select: { + remote_id: true, + }, }); - if (folder && folder.remote_id !== 'root') { - foldersToSync.push(folder.remote_id); + if (folder) { + const files = await this.syncFolder(connection, folder.remote_id); + return { + data: files, + message: 'SharePoint files retrieved from specified folder', + statusCode: 200, + }; } } - const allFiles: SharepointFileOutput[] = []; + // if deltaLink is provided + if (deltaLink) { + this.logger.log( + `Syncing SharePoint files from deltaLink: ${deltaLink}`, + ); + let files: SharepointFileOutput[] = []; + let nextDeltaLink: string | null = null; + try { + const { files: batchFiles, nextDeltaLink: batchNextDeltaLink } = + await this.getFilesToSync(connection, deltaLink, 10); + + files = batchFiles; + nextDeltaLink = batchNextDeltaLink; + } catch (error: any) { + if (error.response?.status === 410) { + // Delta token expired, start fresh sync + const newDeltaLink = `${connection.account_url}/drive/root/delta?$top=1000`; + return this.sync(data, newDeltaLink); + } + await this.bullQueueService + .getSyncJobsQueue() + .add('fs_file_sharepoint', { + ...data, + deltaLink: deltaLink, + connectionId: connection.id_connection, + }); + + this.logger.error( + `Got 410 error while syncing SharePoint files. Added sync from /delta endpoint to queue to retry.`, + error, + ); + } + + if (files.length > 0) { + const ingestedFiles = await this.ingestFiles( + files, + connection, + custom_field_mappings, + ingestParams, + ); + + this.logger.log( + `Ingested ${ingestedFiles.length} files from SharePoint.`, + ); + } - for (const folderId of foldersToSync) { - const files = await this.syncFolder(connection, folderId); - allFiles.push(...files); + // more files to sync + if (nextDeltaLink) { + await this.bullQueueService + .getThirdPartyDataIngestionQueue() + .add('fs_file_sharepoint', { + ...data, + deltaLink: nextDeltaLink, + connectionId: connection.id_connection, + }); + } else { + this.logger.log(`No more files to sync from SharePoint.`); + } + } else { + const lastSyncTime = await this.getLastSyncTime( + connection.id_connection, + ); + const deltaLink = lastSyncTime + ? `${ + connection.account_url + }/drive/root/delta?$top=1000&token=${lastSyncTime.toISOString()}` + : `${connection.account_url}/drive/root/delta?$top=1000`; + + await this.sync(data, deltaLink); } - this.logger.log( - `Synced SharePoint files from root and specified folder!`, - ); return { - data: allFiles, - message: 'SharePoint files retrieved from root and specified folder', + data: [], + message: 'SharePoint files retrieved', statusCode: 200, }; - } catch (error) { + } catch (error: any) { this.logger.error( `Error syncing SharePoint files: ${error.message}`, error, @@ -72,59 +159,400 @@ export class SharepointService implements IFileService { } } - private async syncFolder( - connection: any, - folderId: string, - ): Promise { - const resp = await axios.get( - `${connection.account_url}/drive/items/${folderId}/children`, - { + private async getFilesToSync( + connection: Connection, + deltaLink: string, + maxApiCalls: number, + ) { + const files: SharepointFileOutput[] = []; + let nextDeltaLink: string | null = deltaLink; + + for (let i = 0; i < maxApiCalls; i++) { + const resp = await this.makeRequestWithRetry({ + timeout: 30000, + method: 'get', + url: deltaLink, headers: { - 'Content-Type': 'application/json', Authorization: `Bearer ${this.cryptoService.decrypt( connection.access_token, )}`, }, + }); + + const batchFiles = resp.data.value?.filter((elem: any) => !elem.folder); + files.push(...batchFiles); + nextDeltaLink = resp.data['@odata.nextLink']; + + if (!resp.data.value?.length) { + nextDeltaLink = null; + break; + } + } + + return { files, nextDeltaLink }; + } + + async processBatch(job: any) { + const { + linkedUserId, + deltaLink, + connectionId, + custom_field_mappings, + ingestParams, + } = job.data; + + await this.sync( + { + linkedUserId, + custom_field_mappings, + ingestParams, }, + deltaLink, + ); + } + + private async ingestFiles( + files: SharepointFileOutput[], + connection: Connection, + customFieldMappings?: { + slug: string; + remote_id: string; + }[], + extraParams?: { [key: string]: any }, + ) { + // Sort files by lastModifiedDateTime in descending order (newest first) + const sortedFiles = [...files].sort((a, b) => { + const dateA = new Date(a.lastModifiedDateTime).getTime(); + const dateB = new Date(b.lastModifiedDateTime).getTime(); + return dateB - dateA; + }); + + // Deduplicate files by remote_id, keeping only the first occurrence (which will be the latest version) + const uniqueFiles = sortedFiles.reduce((acc, file) => { + if (!acc.has(file.id)) { + acc.set(file.id, file); + } + return acc; + }, new Map()); + + this.logger.log( + `Deduplicating ${files.length} delta files to ${uniqueFiles.size} unique files`, + 'sharepoint files ingestion', ); - const files: SharepointFileOutput[] = resp.data.value.filter( - (elem) => !elem.folder, // files don't have a folder property + await this.ingestPermissionsForFiles( + Array.from(uniqueFiles.values()), + connection, ); - // Add permissions (shared link is also included in permissions in SharePoint) - await Promise.all( - files.map(async (driveItem) => { - const resp = await axios.get( - `${connection.account_url}/drive/items/${driveItem.id}/permissions`, - { + return this.ingestService.ingestData< + UnifiedFilestorageFileOutput, + SharepointFileOutput + >( + Array.from(uniqueFiles.values()), + 'sharepoint', + connection.id_connection, + 'filestorage', + 'file', + customFieldMappings, + extraParams, + ); + } + + private async ingestPermissionsForFiles( + allFiles: SharepointFileOutput[], + connection: Connection, + ): Promise { + const allPermissions: SharepointPermissionOutput[] = []; + const fileIdToRemotePermissionIdMap: Map = new Map(); + const batchSize = 4; + + const files = allFiles.filter((f) => !f.deleted); + + for (let i = 0; i < files.length; i += batchSize) { + const batch = files.slice(i, i + batchSize); + const permissions = await Promise.all( + batch.map(async (file) => { + const permissionConfig: AxiosRequestConfig = { + timeout: 30000, + method: 'get', + url: `${connection.account_url}/drive/items/${file.id}/permissions`, headers: { 'Content-Type': 'application/json', Authorization: `Bearer ${this.cryptoService.decrypt( connection.access_token, )}`, }, - }, - ); - driveItem.permissions = resp.data.value; - }), + }; + + const resp = await this.makeRequestWithRetry(permissionConfig); + const permissions = resp.data.value; + fileIdToRemotePermissionIdMap.set( + file.id, + permissions.map((p) => p.id), + ); + return permissions; + }), + ); + + this.delay(1000); + + allPermissions.push(...permissions.flat()); + } + + const uniquePermissions = Array.from( + new Map( + allPermissions.map((permission) => [permission.id, permission]), + ).values(), ); - return files; + await this.assignUserAndGroupIdsToPermissions(uniquePermissions); + + const syncedPermissions = await this.ingestService.ingestData< + UnifiedFilestoragePermissionOutput, + SharepointPermissionOutput + >( + uniquePermissions, + 'sharepoint', + connection.id_connection, + 'filestorage', + 'permission', + ); + + this.logger.log( + `Ingested ${syncedPermissions.length} permissions for files.`, + ); + + const permissionIdMap: Map = new Map( + syncedPermissions.map((permission) => [ + permission.remote_id, + permission.id_fs_permission, + ]), + ); + + files.forEach((file) => { + if (fileIdToRemotePermissionIdMap.has(file.id)) { + file.internal_permissions = fileIdToRemotePermissionIdMap + .get(file.id) + ?.map((permission) => permissionIdMap.get(permission)) + .filter((id) => id !== undefined); + } + }); + + return allFiles; } - async downloadFile(fileId: string, connection: any): Promise { - const response = await axios.get( - `${connection.account_url}/drive/items/${fileId}/content`, - { + private async assignUserAndGroupIdsToPermissions( + permissions: SharepointPermissionOutput[], + ): Promise { + const userLookupCache: Map = new Map(); + const groupLookupCache: Map = new Map(); + + for (const permission of permissions) { + if (permission.grantedToV2?.user?.id) { + const remote_user_id = permission.grantedToV2.user.id; + if (userLookupCache.has(remote_user_id)) { + permission.internal_user_id = userLookupCache.get(remote_user_id); + continue; + } + const user = await this.prisma.fs_users.findFirst({ + where: { + remote_id: remote_user_id, + }, + select: { + id_fs_user: true, + }, + }); + if (user) { + permission.internal_user_id = user.id_fs_user; + userLookupCache.set(remote_user_id, user.id_fs_user); + } + } + + if (permission.grantedToV2?.group?.id) { + const remote_group_id = permission.grantedToV2.group.id; + if (groupLookupCache.has(remote_group_id)) { + permission.internal_group_id = groupLookupCache.get(remote_group_id); + continue; + } + const group = await this.prisma.fs_groups.findFirst({ + where: { + remote_id: remote_group_id, + }, + select: { + id_fs_group: true, + }, + }); + if (group) { + permission.internal_group_id = group.id_fs_group; + groupLookupCache.set(remote_group_id, group.id_fs_group); + } + } + } + } + + private async syncFolder( + connection: any, + folderId: string, + ): Promise { + try { + const config: AxiosRequestConfig = { + timeout: 30000, + method: 'get', + url: `${connection.account_url}/drive/items/${folderId}/children`, headers: { + 'Content-Type': 'application/json', Authorization: `Bearer ${this.cryptoService.decrypt( connection.access_token, )}`, }, - responseType: 'arraybuffer', + }; + + const resp: AxiosResponse = await this.makeRequestWithRetry(config); + + const files: SharepointFileOutput[] = resp.data.value.filter( + (elem: any) => !elem.folder, + ); + + await this.ingestPermissionsForFiles(files, connection); + + return files; + } catch (error: any) { + if (error.response?.status === 404) { + const internalFolder = await this.prisma.fs_folders.findFirst({ + where: { + remote_id: folderId, + id_connection: connection.id_connection, + }, + select: { + id_fs_folder: true, + remote_was_deleted: true, + }, + }); + if (internalFolder && !internalFolder.remote_was_deleted) { + this.logger.debug( + `Folder ${internalFolder.id_fs_folder} not found in SharePoint, marking as deleted in internal database.`, + ); + await this.sharepointFolderService.handleDeletedFolder( + internalFolder.id_fs_folder, + connection, + ); + } + return []; + } + throw error; + } + } + + private async getLastSyncTime(connectionId: string): Promise { + const lastSync = await this.prisma.fs_files.findFirst({ + where: { id_connection: connectionId }, + orderBy: { remote_modified_at: { sort: 'desc', nulls: 'last' } }, + }); + this.logger.log(`Last sync time: ${lastSync?.remote_modified_at}`); + return lastSync ? lastSync.remote_modified_at : null; + } + + async downloadFile(fileId: string, connection: any): Promise { + const config: AxiosRequestConfig = { + method: 'get', + url: `${connection.account_url}/drive/items/${fileId}/content`, + headers: { + Authorization: `Bearer ${this.cryptoService.decrypt( + connection.access_token, + )}`, }, - ); + responseType: 'arraybuffer', + }; + + const response: AxiosResponse = await this.makeRequestWithRetry(config); return Buffer.from(response.data); } + + private async makeRequestWithRetry( + config: AxiosRequestConfig, + ): Promise { + let attempts = 0; + let backoff: number = this.INITIAL_BACKOFF_MS; + + while (attempts < this.MAX_RETRIES) { + try { + const response: AxiosResponse = await axios(config); + return response; + } catch (error: any) { + attempts++; + + // Handle rate limiting + if (error.response && error.response.status === 429) { + const retryAfter: number = this.getRetryAfter( + error.response.headers['retry-after'], + ); + const delayTime: number = Math.max(retryAfter * 1000, backoff); + + this.logger.warn( + `Rate limit hit. Retrying request in ${delayTime}ms (Attempt ${attempts}/${this.MAX_RETRIES})`, + ); + + await this.delay(delayTime); + backoff *= 2; // Exponential backoff + continue; + } + + // Handle timeout errors + if ( + error.code === 'ECONNABORTED' || + error.code === 'ETIMEDOUT' || + error.response?.code === 'ETIMEDOUT' + ) { + const delayTime: number = backoff; + + this.logger.warn( + `Request timeout. Retrying in ${delayTime}ms (Attempt ${attempts}/${this.MAX_RETRIES})`, + ); + + await this.delay(delayTime); + backoff *= 2; + continue; + } + + // Handle server errors (500+) + if (error.response && error.response.status >= 500) { + const delayTime: number = backoff; + + this.logger.warn( + `Server error ${error.response.status}. Retrying in ${delayTime}ms (Attempt ${attempts}/${this.MAX_RETRIES})`, + ); + + await this.delay(delayTime); + backoff *= 2; + continue; + } + + // handle 410 gone errors + if (error.response?.status === 410 && config.url.includes('delta')) { + // todo: handle 410 gone errors + } + + throw error; + } + } + + this.logger.error( + 'Max retry attempts reached. Request failed.', + SharepointService.name, + ); + throw new Error('Max retry attempts reached.'); + } + + private getRetryAfter(retryAfterHeader: string | undefined): number { + if (!retryAfterHeader) { + return 1; + } + const retryAfterSeconds: number = parseInt(retryAfterHeader, 10); + return isNaN(retryAfterSeconds) ? 1 : retryAfterSeconds; + } + + private delay(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); + } } diff --git a/packages/api/src/filestorage/file/services/sharepoint/mappers.ts b/packages/api/src/filestorage/file/services/sharepoint/mappers.ts index 899638c53..eb2023d0c 100644 --- a/packages/api/src/filestorage/file/services/sharepoint/mappers.ts +++ b/packages/api/src/filestorage/file/services/sharepoint/mappers.ts @@ -88,48 +88,25 @@ export class SharepointFileMapper implements IFileMapper { } } - const opts: any = {}; - if (file.permissions?.length) { - const permissions = await this.coreUnificationService.unify< - OriginalPermissionOutput[] - >({ - sourceObject: file.permissions, - targetType: FileStorageObject.permission, - providerName: 'sharepoint', - vertical: 'filestorage', - connectionId, - customFieldMappings: [], - }); - opts.permissions = permissions; - - // shared link - if (file.permissions.some((p) => p.link)) { - const sharedLinks = - await this.coreUnificationService.unify({ - sourceObject: file.permissions.find((p) => p.link), - targetType: FileStorageObject.sharedlink, - providerName: 'sharepoint', - vertical: 'filestorage', - connectionId, - customFieldMappings: [], - }); - opts.shared_links = sharedLinks; - } - } - - // todo: handle folder - return { remote_id: file.id, remote_data: file, + remote_folder_id: file.parentReference?.id, + remote_drive_id: file.driveId || file?.parentReference?.driveId || null, + remote_created_at: file.createdDateTime + ? new Date(file.createdDateTime) + : null, + remote_modified_at: file.lastModifiedDateTime + ? new Date(file.lastModifiedDateTime) + : null, + remote_was_deleted: file.deleted ? true : false, name: file.name, file_url: file.webUrl, mime_type: file.file.mimeType, size: file.size.toString(), folder_id: null, - // permission: opts.permissions?.[0] || null, - permissions: null, - shared_link: opts.shared_links?.[0] || null, + permissions: file.internal_permissions, + shared_link: null, field_mappings, }; } diff --git a/packages/api/src/filestorage/file/services/sharepoint/processor.ts b/packages/api/src/filestorage/file/services/sharepoint/processor.ts new file mode 100644 index 000000000..25f8ac02a --- /dev/null +++ b/packages/api/src/filestorage/file/services/sharepoint/processor.ts @@ -0,0 +1,21 @@ +import { Process, Processor } from '@nestjs/bull'; +import { Injectable } from '@nestjs/common'; +import { Job } from 'bull'; +import { Queues } from '@@core/@core-services/queues/types'; +import { SharepointService } from '.'; + +@Injectable() +@Processor(Queues.THIRD_PARTY_DATA_INGESTION) +export class SharepointQueueProcessor { + constructor(private readonly sharepointService: SharepointService) {} + + @Process({ name: 'fs_file_sharepoint', concurrency: 1 }) + async handleSharePointSync(job: Job): Promise { + try { + await this.sharepointService.processBatch(job); + } catch (error) { + console.error(`Failed to process SharePoint sync job: ${error.message}`); + throw error; + } + } +} diff --git a/packages/api/src/filestorage/file/services/sharepoint/types.ts b/packages/api/src/filestorage/file/services/sharepoint/types.ts index 32c34810a..46ba3fdb0 100644 --- a/packages/api/src/filestorage/file/services/sharepoint/types.ts +++ b/packages/api/src/filestorage/file/services/sharepoint/types.ts @@ -57,6 +57,9 @@ export interface SharepointFileOutput { readonly video?: Video; /** WebDAV compatible URL for the item. */ readonly webDavUrl?: string; + + // INTERNAL FIELDS + internal_permissions?: string[]; } /** diff --git a/packages/api/src/filestorage/folder/folder.module.ts b/packages/api/src/filestorage/folder/folder.module.ts index ce09e9200..080b6c0cf 100644 --- a/packages/api/src/filestorage/folder/folder.module.ts +++ b/packages/api/src/filestorage/folder/folder.module.ts @@ -43,6 +43,6 @@ import { FileModule } from '../file/file.module'; DropboxFolderMapper, GoogleDriveFolderService, ], - exports: [SyncService, OnedriveService], + exports: [SyncService, OnedriveService, SharepointService], }) export class FolderModule {} diff --git a/packages/api/src/filestorage/folder/services/sharepoint/index.ts b/packages/api/src/filestorage/folder/services/sharepoint/index.ts index b83c4bf40..57d02f3ac 100644 --- a/packages/api/src/filestorage/folder/services/sharepoint/index.ts +++ b/packages/api/src/filestorage/folder/services/sharepoint/index.ts @@ -1,17 +1,27 @@ -import { EncryptionService } from '@@core/@core-services/encryption/encryption.service'; -import { LoggerService } from '@@core/@core-services/logger/logger.service'; +import { Injectable } from '@nestjs/common'; +import { IFolderService } from '@filestorage/folder/types'; +import { FileStorageObject } from '@filestorage/@lib/@types'; +import axios, { AxiosRequestConfig, AxiosResponse } from 'axios'; import { PrismaService } from '@@core/@core-services/prisma/prisma.service'; -import { IngestDataService } from '@@core/@core-services/unification/ingest-data.service'; +import { LoggerService } from '@@core/@core-services/logger/logger.service'; +import { EncryptionService } from '@@core/@core-services/encryption/encryption.service'; import { ApiResponse } from '@@core/utils/types'; -import { SyncParam } from '@@core/utils/types/interface'; -import { FileStorageObject } from '@filestorage/@lib/@types'; -import { IFolderService } from '@filestorage/folder/types'; -import { Injectable } from '@nestjs/common'; -import axios from 'axios'; import { ServiceRegistry } from '../registry.service'; import { SharepointFolderInput, SharepointFolderOutput } from './types'; +import { SyncParam } from '@@core/utils/types/interface'; +import { IngestDataService } from '@@core/@core-services/unification/ingest-data.service'; +import { UnifiedFilestorageFileOutput } from '@filestorage/file/types/model.unified'; +import { v4 as uuidv4 } from 'uuid'; +import { Connection } from '@@core/connections/@utils/types'; +import { fs_folders } from '@prisma/client'; +import { SharepointPermissionOutput } from '@filestorage/permission/services/sharepoint/types'; +import { UnifiedFilestoragePermissionOutput } from '@filestorage/permission/types/model.unified'; + @Injectable() export class SharepointService implements IFolderService { + private readonly MAX_RETRIES: number = 6; + private readonly INITIAL_BACKOFF_MS: number = 1000; + constructor( private prisma: PrismaService, private logger: LoggerService, @@ -38,40 +48,48 @@ export class SharepointService implements IFolderService { }, }); - // Currently adding in root folder, might need to change - const resp = await axios.post( - `${connection.account_url}/drive/root/children`, - JSON.stringify({ + if (!connection) { + return { + data: null, + message: 'Connection not found', + statusCode: 404, + }; + } + + const config: AxiosRequestConfig = { + method: 'post', + url: `${connection.account_url}/drive/items/${folderData.parentReference?.id}/children`, + headers: { + 'Content-Type': 'application/json', + Authorization: `Bearer ${this.cryptoService.decrypt( + connection.access_token, + )}`, + }, + data: { name: folderData.name, folder: {}, - '@microsoft.graph.conflictBehavior': 'rename', // 'rename' | 'fail' | 'replace' - }), - { - headers: { - 'Content-Type': 'application/json', - Authorization: `Bearer ${this.cryptoService.decrypt( - connection.access_token, - )}`, - }, + '@microsoft.graph.conflictBehavior': 'rename', }, - ); + }; + + const response = await this.makeRequestWithRetry(config); return { - data: resp.data, - message: 'Sharepoint folder created', + data: response.data, + message: 'SharePoint folder created successfully', statusCode: 201, }; } catch (error) { - console.log(error.response?.data); + this.logger.error('Error creating SharePoint folder', error); throw error; } } - async iterativeGetSharepointFolders( - remote_folder_id: string, - linkedUserId: string, - ): Promise { + async sync(data: SyncParam): Promise> { try { + this.logger.log('Syncing SharePoint folders'); + const { linkedUserId } = data; + const connection = await this.prisma.connections.findFirst({ where: { id_linked_user: linkedUserId, @@ -80,118 +98,625 @@ export class SharepointService implements IFolderService { }, }); - // get root folder - const rootFolderData = await axios.get( - `${connection.account_url}/drive/root`, - { - headers: { - 'Content-Type': 'application/json', - Authorization: `Bearer ${this.cryptoService.decrypt( - connection.access_token, - )}`, - }, - }, + if (!connection) { + throw new Error('SharePoint connection not found.'); + } + + const lastSyncTime = await this.getLastSyncTime(connection.id_connection); + + let folders; + + if (!lastSyncTime) { + folders = await this.iterativeGetSharepointFolders(connection); + } else { + this.logger.log('Syncing SharePoint folders incrementally'); + folders = await this.incrementalGetSharepointFolders( + connection, + lastSyncTime, + ); + } + + this.logger.log(`Ingesting permissions for ${folders.length} folders`); + + await this.ingestPermissionsForFolders(folders, connection); + + this.logger.log( + `${folders.length} SharePoint folders synced successfully.`, ); - let result = [rootFolderData.data], - depth = 0, - batch = [remote_folder_id]; + return { + data: folders, + message: 'SharePoint folders synced', + statusCode: 200, + }; + } catch (error) { + this.logger.error('Error in SharePoint sync:', error); + throw error; + } + } - while (batch.length > 0) { - if (depth > 5) { - // todo: handle this better - break; - } + async iterativeGetSharepointFolders( + connection: Connection, + ): Promise { + const result: SharepointFolderOutput[] = []; - const nestedFolders = await Promise.all( - batch.map(async (folder_id) => { - const resp = await axios.get( - `${connection.account_url}/drive/items/${folder_id}/children`, - { - headers: { - 'Content-Type': 'application/json', - Authorization: `Bearer ${this.cryptoService.decrypt( - connection.access_token, - )}`, - }, + // Get root folder first + const rootConfig: AxiosRequestConfig = { + timeout: 30000, + method: 'get', + url: `${connection.account_url}/drive/root/children`, + headers: { + 'Content-Type': 'application/json', + Authorization: `Bearer ${this.cryptoService.decrypt( + connection.access_token, + )}`, + }, + }; + + const rootResp = await this.makeRequestWithRetry(rootConfig); + const rootFolders: SharepointFolderOutput[] = rootResp.data.value.filter( + (driveItem: any) => driveItem.folder, + ); + + result.push( + ...rootFolders.map((f) => ({ + ...f, + internal_id: uuidv4(), + internal_parent_folder_id: null, + })), + ); + + let batch = rootFolders.map((folder) => ({ + remote_folder_id: folder.id, + internal_id: folder.internal_id, + internal_parent_folder_id: null, + })); + + while (batch.length > 0) { + const nestedFoldersPromises = batch.map( + async (folder: { + remote_folder_id: string; + internal_id: string; + internal_parent_folder_id: string | null; + }) => { + try { + const childrenConfig: AxiosRequestConfig = { + timeout: 30000, + method: 'get', + url: `${connection.account_url}/drive/items/${folder.remote_folder_id}/children`, + headers: { + 'Content-Type': 'application/json', + Authorization: `Bearer ${this.cryptoService.decrypt( + connection.access_token, + )}`, }, - ); + }; - // Add permissions (shared link is also included in permissions in SharePoint) - // await Promise.all( - // resp.data.value.map(async (driveItem) => { - // const resp = await axios.get( - // `${connection.account_url}/drive/items/${driveItem.id}/permissions`, - // { - // headers: { - // 'Content-Type': 'application/json', - // Authorization: `Bearer ${this.cryptoService.decrypt( - // connection.access_token, - // )}`, - // }, - // }, - // ); - // driveItem.permissions = resp.data.value; - // }), - // ); - - const folders = resp.data.value.filter( - (driveItem) => driveItem.folder, + const resp = await this.makeRequestWithRetry(childrenConfig); + const folders: SharepointFolderOutput[] = resp.data.value.filter( + (driveItem: any) => driveItem.folder, ); - // const files = resp.data.value.filter( - // (driveItem) => !driveItem.folder, - // ); - - // await this.ingestService.ingestData< - // UnifiedFilestorageFileOutput, - // SharepointFileOutput - // >( - // files, - // 'sharepoint', - // connection.id_connection, - // 'filestorage', - // FileStorageObject.file, - // ); - - return folders; - }), + return folders.map((f) => ({ + ...f, + internal_id: uuidv4(), + internal_parent_folder_id: folder.internal_id, + })); + } catch (error: any) { + if (error.response?.status === 404) { + const f = await this.prisma.fs_folders.findFirst({ + where: { + remote_id: folder.remote_folder_id, + id_connection: connection.id_connection, + }, + select: { + id_fs_folder: true, + }, + }); + await this.handleDeletedFolder(f.id_fs_folder, connection); + return []; + } + throw error; + } + }, + ); + + const nestedFolders = await Promise.all(nestedFoldersPromises); + result.push(...nestedFolders.flat()); + + batch = nestedFolders.flat().map((folder) => ({ + remote_folder_id: folder.id, + internal_id: folder.internal_id, + internal_parent_folder_id: folder.internal_parent_folder_id, + })); + + if (batch.length > 0) { + await this.delay(1000); // Prevent rate limiting + } + } + + return result; + } + + async incrementalGetSharepointFolders( + connection: Connection, + lastSyncTime: Date, + ): Promise { + let deltaLink = `${ + connection.account_url + }/drive/root/delta?token=${lastSyncTime.toISOString()}`; + + const deltaConfig: AxiosRequestConfig = { + timeout: 30000, + method: 'get', + url: deltaLink, + headers: { + 'Content-Type': 'application/json', + Authorization: `Bearer ${this.cryptoService.decrypt( + connection.access_token, + )}`, + }, + }; + + const sharepointFolders: SharepointFolderOutput[] = []; + + do { + const deltaResponse = await this.makeRequestWithRetry(deltaConfig); + if (!deltaResponse.data.value) { + break; + } + deltaLink = deltaResponse.data.deltaLink; + sharepointFolders.push( + ...deltaResponse.data.value.filter((f: any) => f.folder), + ); + } while (deltaLink); + + // Sort folders by lastModifiedDateTime in descending order (newest first) + const sortedFolders = [...sharepointFolders].sort((a, b) => { + const dateA = new Date(a.lastModifiedDateTime).getTime(); + const dateB = new Date(b.lastModifiedDateTime).getTime(); + return dateB - dateA; + }); + + const uniqueFolders = sortedFolders.reduce((acc, folder) => { + if (!acc.has(folder.id)) { + acc.set(folder.id, folder); + } + return acc; + }, new Map()); + + const deletedFolders = Array.from(uniqueFolders.values()).filter( + (f) => f.deleted, + ); + const updatedFolders = Array.from(uniqueFolders.values()).filter( + (f) => !f.deleted, + ); + + const foldersToSync = await this.assignParentIds( + updatedFolders, + connection, + ); + + return [...foldersToSync, ...deletedFolders]; + } + + private async assignParentIds( + folders: SharepointFolderOutput[], + connection: Connection, + ): Promise { + const folderIdToInternalIdMap: Map = new Map(); + const foldersToSync: SharepointFolderOutput[] = []; + let remainingFolders: SharepointFolderOutput[] = [...folders]; + const parentLookupCache: Map = new Map(); + + while (remainingFolders.length > 0) { + const foldersStillPending: SharepointFolderOutput[] = []; + + for (const folder of remainingFolders) { + const parentId = folder.parentReference?.id || 'root'; + const internalParentId = await this.resolveParentId( + parentId, + folderIdToInternalIdMap, + connection.id_connection, + parentLookupCache, ); - // nestedFolders = [[subfolder1, subfolder2], [subfolder3, subfolder4]] - result = result.concat(nestedFolders.flat()); - batch = nestedFolders.flat().map((folder) => folder.id); - this.logger.log(`Batch size: ${batch.length} at depth ${depth}`); - depth++; + if (internalParentId) { + const folder_internal_id = await this.getIntenalIdForFolder( + folder.id, + connection, + ); + foldersToSync.push({ + ...folder, + internal_parent_folder_id: + internalParentId === 'root' ? null : internalParentId, + internal_id: folder_internal_id, + }); + folderIdToInternalIdMap.set(folder.id, folder_internal_id); + } else { + foldersStillPending.push(folder); + } } - return result; - } catch (error) { - throw error; + if (foldersStillPending.length === remainingFolders.length) { + const remote_folders = new Map( + foldersToSync.map((folder) => [folder.id, folder]), + ); + + await this.handleUnresolvedFolders( + foldersStillPending, + foldersToSync, + remote_folders, + parentLookupCache, + folderIdToInternalIdMap, + connection, + ); + break; + } + + remainingFolders = foldersStillPending; } + + return foldersToSync; } - async sync(data: SyncParam): Promise> { - try { - this.logger.log('Syncing sharepoint folders'); - const { linkedUserId } = data; + private async getIntenalIdForFolder( + folderId: string, + connection: Connection, + ): Promise { + const folder = await this.prisma.fs_folders.findFirst({ + where: { remote_id: folderId, id_connection: connection.id_connection }, + select: { id_fs_folder: true }, + }); + return folder?.id_fs_folder || uuidv4(); + } + + private async resolveParentId( + parentId: string, + idMap: Map, + connectionId: string, + cache: Map, + ): Promise { + if (parentId === 'root') return 'root'; + if (cache.has(parentId)) return cache.get(parentId)!; + if (idMap.has(parentId)) return idMap.get(parentId)!; + + const folder = await this.prisma.fs_folders.findFirst({ + where: { + remote_id: parentId, + id_connection: connectionId, + }, + select: { + id_fs_folder: true, + }, + }); + + const result = folder?.id_fs_folder || null; + cache.set(parentId, result); + return result; + } + + private async handleUnresolvedFolders( + pending: SharepointFolderOutput[], + output: SharepointFolderOutput[], + remote_folders: Map, + parentLookupCache: Map, + idCache: Map, + connection: Connection, + ): Promise { + this.logger.warn( + `${pending.length} folders could not be resolved in the initial pass. Attempting to resolve remaining folders.`, + ); + + const getInternalParentRecursive = async ( + folder: SharepointFolderOutput, + visitedIds: Set = new Set(), + ): Promise => { + const remote_parent_id = folder.parentReference?.id || 'root'; - const folders = await this.iterativeGetSharepointFolders( - 'root', - linkedUserId, + if (visitedIds.has(remote_parent_id)) { + this.logger.warn(`Circular reference detected for folder ${folder.id}`); + return null; + } + + visitedIds.add(remote_parent_id); + + const internal_parent_id = await this.resolveParentId( + remote_parent_id, + idCache, + connection.id_connection, + parentLookupCache, ); - this.logger.log(`${folders.length} sharepoint folders found`); - this.logger.log(`Synced sharepoint folders !`); + if (internal_parent_id) { + return internal_parent_id; + } - return { - data: folders, - message: 'Sharepoint folders synced', - statusCode: 200, - }; - } catch (error) { - this.logger.log('Error in sharepoint sync '); - throw error; + try { + const parentFolder = + remote_folders.get(remote_parent_id) || + (await this.makeRequestWithRetry({ + timeout: 30000, + method: 'get', + url: `${connection.account_url}/drive/items/${remote_parent_id}`, + headers: { + 'Content-Type': 'application/json', + Authorization: `Bearer ${this.cryptoService.decrypt( + connection.access_token, + )}`, + }, + })); + + if (!parentFolder) { + return null; + } + + return getInternalParentRecursive( + parentFolder as SharepointFolderOutput, + visitedIds, + ); + } catch (error) { + this.logger.error( + `Failed to resolve parent for folder ${folder.id}`, + error, + ); + return null; + } + }; + + await Promise.all( + pending.map(async (folder) => { + const internal_parent_id = await getInternalParentRecursive(folder); + const folder_internal_id = uuidv4(); + idCache.set(folder.id, folder_internal_id); + output.push({ + ...folder, + internal_parent_folder_id: internal_parent_id, + internal_id: folder_internal_id, + }); + }), + ); + } + + private async ingestPermissionsForFolders( + allFolders: SharepointFolderOutput[], + connection: Connection, + ): Promise { + const allPermissions: SharepointPermissionOutput[] = []; + const folderIdToRemotePermissionIdMap: Map = new Map(); + const batchSize = 4; + + const folders = allFolders.filter((f) => !f.deleted); + + for (let i = 0; i < folders.length; i += batchSize) { + const batch = folders.slice(i, i + batchSize); + const permissions = await Promise.all( + batch.map(async (folder) => { + const permissionConfig: AxiosRequestConfig = { + timeout: 30000, + method: 'get', + url: `${connection.account_url}/drive/items/${folder.id}/permissions`, + headers: { + 'Content-Type': 'application/json', + Authorization: `Bearer ${this.cryptoService.decrypt( + connection.access_token, + )}`, + }, + }; + + const resp = await this.makeRequestWithRetry(permissionConfig); + const permissions = resp.data.value; + folderIdToRemotePermissionIdMap.set( + folder.id, + permissions.map((p) => p.id), + ); + return permissions; + }), + ); + + this.delay(1000); + + allPermissions.push(...permissions.flat()); } + + const uniquePermissions = Array.from( + new Map( + allPermissions.map((permission) => [permission.id, permission]), + ).values(), + ); + + await this.assignUserAndGroupIdsToPermissions(uniquePermissions); + + const syncedPermissions = await this.ingestService.ingestData< + UnifiedFilestoragePermissionOutput, + SharepointPermissionOutput + >( + uniquePermissions, + 'sharepoint', + connection.id_connection, + 'filestorage', + 'permission', + ); + + this.logger.log( + `Ingested ${syncedPermissions.length} permissions for folders.`, + ); + + const permissionIdMap: Map = new Map( + syncedPermissions.map((permission) => [ + permission.remote_id, + permission.id_fs_permission, + ]), + ); + + folders.forEach((folder) => { + if (folderIdToRemotePermissionIdMap.has(folder.id)) { + folder.internal_permissions = folderIdToRemotePermissionIdMap + .get(folder.id) + ?.map((permission) => permissionIdMap.get(permission)) + .filter((id) => id !== undefined); + } + }); + + return allFolders; + } + + private async assignUserAndGroupIdsToPermissions( + permissions: SharepointPermissionOutput[], + ): Promise { + const userLookupCache: Map = new Map(); + const groupLookupCache: Map = new Map(); + + for (const permission of permissions) { + if (permission.grantedToV2?.user?.id) { + const remote_user_id = permission.grantedToV2.user.id; + if (userLookupCache.has(remote_user_id)) { + permission.internal_user_id = userLookupCache.get(remote_user_id); + continue; + } + const user = await this.prisma.fs_users.findFirst({ + where: { + remote_id: remote_user_id, + }, + select: { + id_fs_user: true, + }, + }); + if (user) { + permission.internal_user_id = user.id_fs_user; + userLookupCache.set(remote_user_id, user.id_fs_user); + } + } + + if (permission.grantedToV2?.group?.id) { + const remote_group_id = permission.grantedToV2.group.id; + if (groupLookupCache.has(remote_group_id)) { + permission.internal_group_id = groupLookupCache.get(remote_group_id); + continue; + } + const group = await this.prisma.fs_groups.findFirst({ + where: { + remote_id: remote_group_id, + }, + select: { + id_fs_group: true, + }, + }); + if (group) { + permission.internal_group_id = group.id_fs_group; + groupLookupCache.set(remote_group_id, group.id_fs_group); + } + } + } + } + + async handleDeletedFolder( + folderId: string, + connection: Connection, + ): Promise { + await this.prisma.fs_folders.update({ + where: { + id_fs_folder: folderId, + }, + data: { + remote_was_deleted: true, + }, + }); + } + + private async getLastSyncTime(connectionId: string): Promise { + const lastSync = await this.prisma.fs_folders.findFirst({ + where: { id_connection: connectionId }, + orderBy: { remote_modified_at: { sort: 'desc', nulls: 'last' } }, + }); + this.logger.log(`Last sync time: ${lastSync?.remote_modified_at}`); + return lastSync ? lastSync.remote_modified_at : null; + } + + private async makeRequestWithRetry( + config: AxiosRequestConfig, + ): Promise { + let attempts = 0; + let backoff: number = this.INITIAL_BACKOFF_MS; + + while (attempts < this.MAX_RETRIES) { + try { + const response: AxiosResponse = await axios(config); + return response; + } catch (error: any) { + attempts++; + + // Handle rate limiting + if (error.response && error.response.status === 429) { + const retryAfter: number = this.getRetryAfter( + error.response.headers['retry-after'], + ); + const delayTime: number = Math.max(retryAfter * 1000, backoff); + + this.logger.warn( + `Rate limit hit. Retrying request in ${delayTime}ms (Attempt ${attempts}/${this.MAX_RETRIES})`, + ); + + await this.delay(delayTime); + backoff *= 2; // Exponential backoff + continue; + } + + // Handle timeout errors + if ( + error.code === 'ECONNABORTED' || + error.code === 'ETIMEDOUT' || + error.response?.code === 'ETIMEDOUT' + ) { + const delayTime: number = backoff; + + this.logger.warn( + `Request timeout. Retrying in ${delayTime}ms (Attempt ${attempts}/${this.MAX_RETRIES})`, + ); + + await this.delay(delayTime); + backoff *= 2; + continue; + } + + // Handle server errors (500+) + if (error.response && error.response.status >= 500) { + const delayTime: number = backoff; + + this.logger.warn( + `Server error ${error.response.status}. Retrying in ${delayTime}ms (Attempt ${attempts}/${this.MAX_RETRIES})`, + ); + + await this.delay(delayTime); + backoff *= 2; + continue; + } + + // handle 410 gone errors + if (error.response?.status === 410 && config.url.includes('delta')) { + // todo: handle 410 gone errors + } + + throw error; + } + } + + this.logger.error( + 'Max retry attempts reached. Request failed.', + SharepointService.name, + ); + throw new Error('Max retry attempts reached.'); + } + + private getRetryAfter(retryAfterHeader: string | undefined): number { + if (!retryAfterHeader) { + return 1; + } + const retryAfterSeconds: number = parseInt(retryAfterHeader, 10); + return isNaN(retryAfterSeconds) ? 1 : retryAfterSeconds; + } + + private delay(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); } } diff --git a/packages/api/src/filestorage/folder/services/sharepoint/mappers.ts b/packages/api/src/filestorage/folder/services/sharepoint/mappers.ts index 1a51f2d0d..b5ce7b5c4 100644 --- a/packages/api/src/filestorage/folder/services/sharepoint/mappers.ts +++ b/packages/api/src/filestorage/folder/services/sharepoint/mappers.ts @@ -124,19 +124,25 @@ export class SharepointFolderMapper implements IFolderMapper { } const result = { + id: folder.internal_id ?? null, + parent_folder_id: folder.internal_parent_folder_id ?? null, remote_id: folder.id, remote_data: folder, + remote_drive_id: + folder.driveId || folder?.parentReference?.driveId || null, + remote_created_at: folder.createdDateTime + ? new Date(folder.createdDateTime) + : null, + remote_modified_at: folder.lastModifiedDateTime + ? new Date(folder.lastModifiedDateTime) + : null, + remote_was_deleted: folder.deleted ? true : false, name: folder.name, folder_url: folder.webUrl, description: folder.description, drive_id: null, - parent_folder_id: await this.utils.getFolderIdFromRemote( - folder.parentReference?.id, - connectionId, - ), - // permission: opts.permissions?.[0] || null, - permissions: null, - size: folder.size.toString(), + permissions: folder.internal_permissions, + size: folder.size?.toString() || '0', shared_link: opts.shared_links?.[0] || null, field_mappings, }; diff --git a/packages/api/src/filestorage/folder/services/sharepoint/types.ts b/packages/api/src/filestorage/folder/services/sharepoint/types.ts index dcd4a7afb..19198c2f4 100644 --- a/packages/api/src/filestorage/folder/services/sharepoint/types.ts +++ b/packages/api/src/filestorage/folder/services/sharepoint/types.ts @@ -57,6 +57,11 @@ export interface SharepointFolderInput { readonly lastModifiedBy?: IdentitySet; /** If this property is non-null, it indicates that the driveItem is the top-most driveItem in the drive. */ readonly root?: any; + + // internal fields + internal_id?: string; + internal_parent_folder_id?: string; + internal_permissions?: string[]; } /** diff --git a/packages/api/src/filestorage/group/services/sharepoint/index.ts b/packages/api/src/filestorage/group/services/sharepoint/index.ts index f839eccdf..2f45529ef 100644 --- a/packages/api/src/filestorage/group/services/sharepoint/index.ts +++ b/packages/api/src/filestorage/group/services/sharepoint/index.ts @@ -6,12 +6,16 @@ import { SyncParam } from '@@core/utils/types/interface'; import { FileStorageObject } from '@filestorage/@lib/@types'; import { IGroupService } from '@filestorage/group/types'; import { Injectable } from '@nestjs/common'; -import axios from 'axios'; +import axios, { AxiosRequestConfig, AxiosResponse } from 'axios'; import { ServiceRegistry } from '../registry.service'; import { SharepointGroupOutput } from './types'; +import { connections } from '@prisma/client'; @Injectable() export class SharepointService implements IGroupService { + private readonly MAX_RETRIES: number = 6; + private readonly INITIAL_BACKOFF_MS: number = 1000; + constructor( private prisma: PrismaService, private logger: LoggerService, @@ -27,6 +31,7 @@ export class SharepointService implements IGroupService { async sync(data: SyncParam): Promise> { try { const { linkedUserId } = data; + const connection = await this.prisma.connections.findFirst({ where: { id_linked_user: linkedUserId, @@ -34,28 +39,154 @@ export class SharepointService implements IGroupService { vertical: 'filestorage', }, }); - // remove /sites/site_id from account_url + + // remove /sites/site_id from account_url for accessing groups endpoint const url = connection.account_url.replace(/\/sites\/.+$/, ''); - // ref: https://learn.microsoft.com/en-us/graph/api/user-list?view=graph-rest-1.0&tabs=http - const resp = await axios.get(`${url}/groups`, { + const config: AxiosRequestConfig = { + method: 'get', + url: `${url}/groups`, headers: { 'Content-Type': 'application/json', Authorization: `Bearer ${this.cryptoService.decrypt( connection.access_token, )}`, }, - }); + }; + + const resp: AxiosResponse = await this.makeRequestWithRetry(config); + const groups: SharepointGroupOutput[] = resp.data.value; + await this.assignUsersForGroups(groups, connection); - this.logger.log(`Synced sharepoint groups !`); + this.logger.log( + `Synced ${groups.length} SharePoint groups successfully.`, + ); return { - data: resp.data.value, - message: 'Sharepoint groups retrieved', + data: groups, + message: 'SharePoint groups retrieved successfully', statusCode: 200, }; - } catch (error) { + } catch (error: any) { + this.logger.error( + `Error syncing SharePoint groups: ${error.message}`, + error, + ); throw error; } } + + private async assignUsersForGroups( + groups: SharepointGroupOutput[], + connection: connections, + ) { + const userLookupCache: Map = new Map(); + + for (const group of groups) { + const users = await this.fetchUsersForGroup(group, connection); + const internalUsers = await Promise.all( + users.map(async (user) => { + if (userLookupCache.has(user.id)) { + return userLookupCache.get(user.id); + } + const internalUser = await this.prisma.fs_users.findFirst({ + where: { + remote_id: user.id, + }, + select: { + id_fs_user: true, + }, + }); + userLookupCache.set(user.id, internalUser?.id_fs_user || null); + return internalUser?.id_fs_user || null; + }), + ); + group.internal_users = internalUsers.filter( + (user) => user !== null, + ) as string[]; + } + } + + private async fetchUsersForGroup( + group: SharepointGroupOutput, + connection: connections, + ) { + const url = connection.account_url.replace(/\/sites\/.+$/, ''); + const config: AxiosRequestConfig = { + timeout: 10000, + method: 'get', + url: `${url}/groups/${group.id}/members`, + headers: { + 'Content-Type': 'application/json', + Authorization: `Bearer ${this.cryptoService.decrypt( + connection.access_token, + )}`, + }, + }; + + const resp: AxiosResponse = await this.makeRequestWithRetry(config); + return resp.data.value; + } + + private async makeRequestWithRetry( + config: AxiosRequestConfig, + ): Promise { + let attempts = 0; + let backoff: number = this.INITIAL_BACKOFF_MS; + + while (attempts < this.MAX_RETRIES) { + try { + const response: AxiosResponse = await axios(config); + return response; + } catch (error: any) { + attempts++; + + if ( + (error.response && error.response.status === 429) || + (error.response && error.response.status >= 500) || + error.code === 'ECONNABORTED' || + error.code === 'ETIMEDOUT' || + error.response?.code === 'ETIMEDOUT' + ) { + const retryAfter = this.getRetryAfter( + error.response?.headers['retry-after'], + ); + const delayTime: number = Math.max(retryAfter * 1000, backoff); + + this.logger.warn( + `Request failed with ${ + error.code || error.response?.status + }. Retrying in ${delayTime}ms (Attempt ${attempts}/${ + this.MAX_RETRIES + })`, + ); + + await this.delay(delayTime); + backoff *= 2; + continue; + } + + this.logger.error(`Request failed: ${error.message}`, error); + throw error; + } + } + + this.logger.error( + 'Max retry attempts reached. Request failed.', + SharepointService.name, + ); + throw new Error('Max retry attempts reached.'); + } + + private getRetryAfter(retryAfterHeader: string | undefined): number { + if (!retryAfterHeader) { + return 1; + } + const retryAfterSeconds: number = parseInt(retryAfterHeader, 10); + return isNaN(retryAfterSeconds) ? 1 : retryAfterSeconds; + } + + private delay(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); + } } diff --git a/packages/api/src/filestorage/group/services/sharepoint/mappers.ts b/packages/api/src/filestorage/group/services/sharepoint/mappers.ts index 3cd197384..5cbab4f2b 100644 --- a/packages/api/src/filestorage/group/services/sharepoint/mappers.ts +++ b/packages/api/src/filestorage/group/services/sharepoint/mappers.ts @@ -44,7 +44,6 @@ export class SharepointGroupMapper implements IGroupMapper { customFieldMappings, ); } - // Handling array of SharepointGroupOutput return Promise.all( source.map((group) => this.mapSingleGroupToUnified(group, connectionId, customFieldMappings), @@ -67,16 +66,13 @@ export class SharepointGroupMapper implements IGroupMapper { } } - // todo: do something about users - // https://graph.microsoft.com/groups/group-id/members - return { remote_id: group.id, remote_data: group, - name: group.mailNickname, + name: group.displayName || group.mailNickname, remote_was_deleted: group.deletedDateTime !== null, field_mappings, - users: [], + users: group.internal_users || [], }; } } diff --git a/packages/api/src/filestorage/group/services/sharepoint/types.ts b/packages/api/src/filestorage/group/services/sharepoint/types.ts index a54c0b5e6..ca065406d 100644 --- a/packages/api/src/filestorage/group/services/sharepoint/types.ts +++ b/packages/api/src/filestorage/group/services/sharepoint/types.ts @@ -125,6 +125,12 @@ export interface SharepointGroupInput { * /groups?$filter=startsWith(displayName,'Role')&$select=id,displayName&$expand=owners($select=id,userPrincipalName,displayName). */ owners?: DirectoryObject[]; + + // INTERNAL + /** + * Internal UUIDs of users that are members of this group. + */ + internal_users?: string[]; } /** diff --git a/packages/api/src/filestorage/permission/services/sharepoint/index.ts b/packages/api/src/filestorage/permission/services/sharepoint/index.ts index 2168542de..ca1f13319 100644 --- a/packages/api/src/filestorage/permission/services/sharepoint/index.ts +++ b/packages/api/src/filestorage/permission/services/sharepoint/index.ts @@ -1,7 +1,7 @@ import { Injectable } from '@nestjs/common'; import { IPermissionService } from '@filestorage/permission/types'; import { FileStorageObject } from '@panora/shared'; -import axios from 'axios'; +import axios, { AxiosRequestConfig, AxiosResponse } from 'axios'; import { PrismaService } from '@@core/@core-services/prisma/prisma.service'; import { LoggerService } from '@@core/@core-services/logger/logger.service'; import { EncryptionService } from '@@core/@core-services/encryption/encryption.service'; @@ -12,6 +12,9 @@ import { SyncParam } from '@@core/utils/types/interface'; @Injectable() export class SharepointService implements IPermissionService { + private readonly MAX_RETRIES: number = 6; + private readonly INITIAL_BACKOFF_MS: number = 1000; + constructor( private prisma: PrismaService, private logger: LoggerService, @@ -29,7 +32,6 @@ export class SharepointService implements IPermissionService { ): Promise> { try { const { linkedUserId, extra } = data; - // TODO: where it comes from ?? extra?: { object_name: 'folder' | 'file'; value: string }, const connection = await this.prisma.connections.findFirst({ where: { @@ -38,43 +40,115 @@ export class SharepointService implements IPermissionService { vertical: 'filestorage', }, }); + let remote_id; - if (extra.object_name == 'folder') { - const a = await this.prisma.fs_folders.findUnique({ + if (extra.object_name === 'folder') { + const folder = await this.prisma.fs_folders.findUnique({ where: { id_fs_folder: extra.value, }, }); - remote_id = a.remote_id; - } - if (extra.object_name == 'file') { - const a = await this.prisma.fs_files.findUnique({ + remote_id = folder.remote_id; + } else if (extra.object_name === 'file') { + const file = await this.prisma.fs_files.findUnique({ where: { id_fs_file: extra.value, }, }); - - remote_id = a.remote_id; + remote_id = file.remote_id; } - const resp = await axios.get( - `${connection.account_url}/drive/items/${remote_id}/permissions`, - { - headers: { - Authorization: `Bearer ${this.cryptoService.decrypt( - connection.access_token, - )}`, - }, + const config: AxiosRequestConfig = { + method: 'get', + url: `${connection.account_url}/drive/items/${remote_id}/permissions`, + headers: { + 'Content-Type': 'application/json', + Authorization: `Bearer ${this.cryptoService.decrypt( + connection.access_token, + )}`, }, + }; + + const response = await this.makeRequestWithRetry(config); + const permissions = response.data.value; + + this.logger.log( + `Synced ${permissions.length} SharePoint permissions successfully.`, ); return { - data: resp.data.value as SharepointPermissionOutput[], - message: 'Synced sharepoint permissions !', + data: permissions, + message: 'SharePoint permissions retrieved successfully', statusCode: 200, }; - } catch (error) { + } catch (error: any) { + this.logger.error( + `Error syncing SharePoint permissions: ${error.message}`, + error, + ); throw error; } } + + private async makeRequestWithRetry( + config: AxiosRequestConfig, + ): Promise { + let attempts = 0; + let backoff: number = this.INITIAL_BACKOFF_MS; + + while (attempts < this.MAX_RETRIES) { + try { + const response: AxiosResponse = await axios(config); + return response; + } catch (error: any) { + attempts++; + + if ( + (error.response && error.response.status === 429) || + (error.response && error.response.status >= 500) || + error.code === 'ECONNABORTED' || + error.code === 'ETIMEDOUT' || + error.response?.code === 'ETIMEDOUT' + ) { + const retryAfter = this.getRetryAfter( + error.response?.headers['retry-after'], + ); + const delayTime: number = Math.max(retryAfter * 1000, backoff); + + this.logger.warn( + `Request failed with ${ + error.code || error.response?.status + }. Retrying in ${delayTime}ms (Attempt ${attempts}/${ + this.MAX_RETRIES + })`, + ); + + await this.delay(delayTime); + backoff *= 2; + continue; + } + + this.logger.error(`Request failed: ${error.message}`, error); + throw error; + } + } + + this.logger.error( + 'Max retry attempts reached. Request failed.', + SharepointService.name, + ); + throw new Error('Max retry attempts reached.'); + } + + private getRetryAfter(retryAfterHeader: string | undefined): number { + if (!retryAfterHeader) { + return 1; + } + const retryAfterSeconds: number = parseInt(retryAfterHeader, 10); + return isNaN(retryAfterSeconds) ? 1 : retryAfterSeconds; + } + + private delay(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); + } } diff --git a/packages/api/src/filestorage/permission/services/sharepoint/mappers.ts b/packages/api/src/filestorage/permission/services/sharepoint/mappers.ts index 5eb501301..094998c50 100644 --- a/packages/api/src/filestorage/permission/services/sharepoint/mappers.ts +++ b/packages/api/src/filestorage/permission/services/sharepoint/mappers.ts @@ -80,15 +80,15 @@ export class SharepointPermissionMapper implements IPermissionMapper { return { remote_id: permission.id, remote_data: permission, - roles: permission.roles.map((role) => role.toUpperCase()), + roles: permission.roles?.map((role) => role.toUpperCase()), type: permission.link?.type === 'edit' ? 'WRITE' : permission.link?.type === 'view' ? 'READ' : permission.link?.type, - user_id: null, - group_id: null, + user_id: permission.internal_user_id, + group_id: permission.internal_group_id, field_mappings, }; } diff --git a/packages/api/src/filestorage/permission/services/sharepoint/types.ts b/packages/api/src/filestorage/permission/services/sharepoint/types.ts index 1f2639914..6c48dd759 100644 --- a/packages/api/src/filestorage/permission/services/sharepoint/types.ts +++ b/packages/api/src/filestorage/permission/services/sharepoint/types.ts @@ -27,6 +27,10 @@ export interface SharepointPermissionOutput { shareId?: string; /** A format of yyyy-MM-ddTHH:mm:ssZ of DateTimeOffset indicates the expiration time of the permission. DateTime.MinValue indicates there's no expiration set for this permission. Optional. */ expirationDateTime?: string; + + // INTERNAL + internal_user_id?: string; + internal_group_id?: string; } /** diff --git a/packages/api/src/filestorage/user/services/sharepoint/index.ts b/packages/api/src/filestorage/user/services/sharepoint/index.ts index d9bf83e8f..0410290d0 100644 --- a/packages/api/src/filestorage/user/services/sharepoint/index.ts +++ b/packages/api/src/filestorage/user/services/sharepoint/index.ts @@ -6,12 +6,15 @@ import { SyncParam } from '@@core/utils/types/interface'; import { FileStorageObject } from '@filestorage/@lib/@types'; import { IUserService } from '@filestorage/user/types'; import { Injectable } from '@nestjs/common'; -import axios from 'axios'; +import axios, { AxiosRequestConfig, AxiosResponse } from 'axios'; import { ServiceRegistry } from '../registry.service'; import { SharepointUserOutput } from './types'; @Injectable() export class SharepointService implements IUserService { + private readonly MAX_RETRIES: number = 6; + private readonly INITIAL_BACKOFF_MS: number = 1000; + constructor( private prisma: PrismaService, private logger: LoggerService, @@ -39,29 +42,98 @@ export class SharepointService implements IUserService { // remove /sites/site_id from account_url const url = connection.account_url.replace(/\/sites\/.+$/, ''); - // ref: https://learn.microsoft.com/en-us/graph/api/user-list?view=graph-rest-1.0&tabs=http - const resp = await axios.get(`${url}/users`, { + const config: AxiosRequestConfig = { + method: 'get', + url: `${url}/users`, headers: { 'Content-Type': 'application/json', Authorization: `Bearer ${this.cryptoService.decrypt( connection.access_token, )}`, }, - }); + }; - this.logger.log(`Synced sharepoint users !`); + const resp: AxiosResponse = await this.makeRequestWithRetry(config); + const users: SharepointUserOutput[] = resp.data.value; + + this.logger.log( + `Synchronized ${users.length} SharePoint users successfully.`, + ); return { - data: resp.data.value, - message: 'Sharepoint users retrieved', + data: users, + message: 'SharePoint users retrieved successfully.', statusCode: 200, }; - } catch (error) { + } catch (error: any) { this.logger.error( - `Error syncing sharepoint users !`, - error.message || 'Unknown', + `Error syncing SharePoint users: ${error.message}`, + error, + SharepointService.name, ); throw error; } } + + private async makeRequestWithRetry( + config: AxiosRequestConfig, + ): Promise { + let attempts = 0; + let backoff: number = this.INITIAL_BACKOFF_MS; + + while (attempts < this.MAX_RETRIES) { + try { + const response: AxiosResponse = await axios(config); + return response; + } catch (error: any) { + attempts++; + + if ( + (error.response && error.response.status === 429) || + (error.response && error.response.status >= 500) || + error.code === 'ECONNABORTED' || + error.code === 'ETIMEDOUT' || + error.response?.code === 'ETIMEDOUT' + ) { + const retryAfter = this.getRetryAfter( + error.response?.headers['retry-after'], + ); + const delayTime: number = Math.max(retryAfter * 1000, backoff); + + this.logger.warn( + `Request failed with ${ + error.code || error.response?.status + }. Retrying in ${delayTime}ms (Attempt ${attempts}/${ + this.MAX_RETRIES + })`, + ); + + await this.delay(delayTime); + backoff *= 2; + continue; + } + + this.logger.error(`Request failed: ${error.message}`, error); + throw error; + } + } + + this.logger.error( + 'Max retry attempts reached. Request failed.', + SharepointService.name, + ); + throw new Error('Max retry attempts reached.'); + } + + private getRetryAfter(retryAfterHeader: string | undefined): number { + if (!retryAfterHeader) { + return 1; + } + const retryAfterSeconds: number = parseInt(retryAfterHeader, 10); + return isNaN(retryAfterSeconds) ? 1 : retryAfterSeconds; + } + + private delay(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); + } }