diff --git a/connectors/migrations/20241211_fix_gdrive_parents.ts b/connectors/migrations/20241211_fix_gdrive_parents.ts index af1125c2660b..fe18c1bc871d 100644 --- a/connectors/migrations/20241211_fix_gdrive_parents.ts +++ b/connectors/migrations/20241211_fix_gdrive_parents.ts @@ -2,12 +2,11 @@ import { getGoogleSheetTableId } from "@dust-tt/types"; import { makeScript } from "scripts/helpers"; import { Op } from "sequelize"; +import { getDocumentId } from "@connectors/connectors/google_drive/temporal/utils"; import { dataSourceConfigFromConnector } from "@connectors/lib/api/data_source_config"; import { - getDocumentFromDataSource, getFolderNode, getTable, - updateDocumentParentsField, updateTableParentsField, upsertFolderNode, } from "@connectors/lib/data_sources"; @@ -15,11 +14,30 @@ import { GoogleDriveFiles, GoogleDriveSheet, } from "@connectors/lib/models/google_drive"; +import type { Logger } from "@connectors/logger/logger"; import logger from "@connectors/logger/logger"; import { ConnectorModel } from "@connectors/resources/storage/models/connector_model"; const QUERY_BATCH_SIZE = 1024; +function getParents( + fileId: string | null, + parentsMap: Record, + logger: Logger +) { + const parents = []; + let current: string | null = fileId; + while (current) { + parents.push(current); + if (typeof parentsMap[current] === "undefined") { + logger.error({ fileId: current }, "Parent not found"); + return null; + } + current = parentsMap[current] || null; + } + return parents; +} + async function migrate({ connector, execute, @@ -27,7 +45,8 @@ async function migrate({ connector: ConnectorModel; execute: boolean; }) { - logger.info(`Processing connector ${connector.id}...`); + const childLogger = logger.child({ connectorId: connector.id }); + childLogger.info(`Processing connector ${connector.id}...`); const dataSourceConfig = dataSourceConfigFromConnector(connector); const parentsMap: Record = {}; @@ -60,6 +79,9 @@ async function migrate({ id: { [Op.gt]: nextId, }, + mimeType: { + [Op.or]: ["application/vnd.google-apps.folder", "text/csv"], + }, }, order: [["id", "ASC"]], limit: QUERY_BATCH_SIZE, @@ -69,58 +91,57 @@ async function migrate({ for (const file of googleDriveFiles) { const internalId = file.dustFileId; const driveFileId = file.driveFileId; - const parents = [driveFileId]; - let current: string | null = file.parentId; - while (current) { - parents.push(current); - current = parentsMap[current] || null; + const parents = getParents( + file.parentId, + parentsMap, + childLogger.child({ nodeId: driveFileId }) + ); + if (!parents) { + continue; } + parents.unshift(driveFileId); if (file.mimeType === "application/vnd.google-apps.folder") { const folder = await getFolderNode({ dataSourceConfig, folderId: internalId, }); - if (!folder || folder.parents.join("/") !== parents.join("/")) { - logger.info({ folderId: file.driveFileId, parents }, "Upsert folder"); + const newParents = parents.map((id) => getDocumentId(id)); + if (!folder || folder.parents.join("/") !== newParents.join("/")) { + childLogger.info( + { folderId: file.driveFileId, parents: newParents }, + "Upsert folder" + ); if (execute) { // upsert repository as folder await upsertFolderNode({ dataSourceConfig, folderId: file.dustFileId, - parents, + parents: newParents, parentId: file.parentId, title: file.name, }); } } - } else { - const document = await getDocumentFromDataSource({ - dataSourceConfig, - documentId: internalId, - }); - if (document) { - logger.info( - { - documentId: file.driveFileId, - }, - "Found document" - ); - if (document.parents.join("/") !== parents.join("/")) { - logger.info( + } else if (file.mimeType === "text/csv") { + const tableId = internalId; + parents.unshift(...parents.map((id) => getDocumentId(id))); + const table = await getTable({ dataSourceConfig, tableId }); + if (table) { + if (table.parents.join("/") !== parents.join("/")) { + childLogger.info( { - documentId: internalId, + tableId, parents, - previousParents: document.parents, + previousParents: table.parents, }, - "Update parents for document" + "Update parents for table" ); - if (execute) { - await updateDocumentParentsField({ + await updateTableParentsField({ dataSourceConfig, - documentId: file.dustFileId, + tableId, parents, }); } @@ -151,24 +172,23 @@ async function migrate({ sheet.driveFileId, sheet.driveSheetId ); - const parents = [tableId]; - let current: string | null = sheet.driveFileId; - while (current) { - parents.push(current); - current = parentsMap[current] || null; + + const parents = getParents( + sheet.driveFileId, + parentsMap, + childLogger.child({ nodeId: tableId }) + ); + if (!parents) { + continue; } + parents.unshift(...parents.map((id) => getDocumentId(id))); + parents.unshift(tableId); + const table = await getTable({ dataSourceConfig, tableId }); if (table) { - logger.info( - { - tableId: tableId, - }, - "Found table" - ); - if (table.parents.join("/") !== parents.join("/")) { - logger.info( + childLogger.info( { tableId, parents, diff --git a/connectors/migrations/20241212_restore_gdrive_parents.ts b/connectors/migrations/20241212_restore_gdrive_parents.ts new file mode 100644 index 000000000000..ac394bdfc536 --- /dev/null +++ b/connectors/migrations/20241212_restore_gdrive_parents.ts @@ -0,0 +1,64 @@ +import * as fs from "fs"; +import * as readline from "readline"; +import { makeScript } from "scripts/helpers"; + +import { dataSourceConfigFromConnector } from "@connectors/lib/api/data_source_config"; +import { updateDocumentParentsField } from "@connectors/lib/data_sources"; +import logger from "@connectors/logger/logger"; +import { ConnectorModel } from "@connectors/resources/storage/models/connector_model"; + +interface LogEntry { + msg: string; + documentId: string; + parents: string[]; + previousParents: string[]; +} + +async function processLogFile( + connector: ConnectorModel, + filePath: string, + execute: boolean +) { + const fileStream = fs.createReadStream(filePath); + const rl = readline.createInterface({ + input: fileStream, + crlfDelay: Infinity, + }); + + const dataSourceConfig = dataSourceConfigFromConnector(connector); + + for await (const line of rl) { + const entry = JSON.parse(line) as LogEntry; + const { msg, documentId, previousParents } = entry; + if ( + msg === "Update parents for document" && + documentId && + previousParents + ) { + logger.info( + { documentId, previousParents }, + "Restoring parent for document" + ); + if (execute) { + await updateDocumentParentsField({ + dataSourceConfig, + documentId: documentId, + parents: previousParents, + }); + } + } + } +} + +makeScript( + { + connectorId: { type: "number", required: true }, + file: { type: "string", required: true }, + }, + async ({ connectorId, file, execute }) => { + const connector = await ConnectorModel.findByPk(connectorId); + if (connector) { + await processLogFile(connector, file, execute); + } + } +); diff --git a/connectors/src/lib/data_sources.ts b/connectors/src/lib/data_sources.ts index 19150100f564..7abd7653b947 100644 --- a/connectors/src/lib/data_sources.ts +++ b/connectors/src/lib/data_sources.ts @@ -1214,7 +1214,9 @@ export async function getFolderNode({ return dustRequestResult.data.folder; } -export async function upsertFolderNode({ +export const upsertFolderNode = withRetries(_upsertFolderNode); + +export async function _upsertFolderNode({ dataSourceConfig, folderId, timestampMs,