diff --git a/connectors/migrations/20241211_fix_gdrive_parents.ts b/connectors/migrations/20241211_fix_gdrive_parents.ts
new file mode 100644
index 000000000000..af1125c2660b
--- /dev/null
+++ b/connectors/migrations/20241211_fix_gdrive_parents.ts
@@ -0,0 +1,239 @@
+import { getGoogleSheetTableId } from "@dust-tt/types";
+import { makeScript } from "scripts/helpers";
+import { Op } from "sequelize";
+
+import { dataSourceConfigFromConnector } from "@connectors/lib/api/data_source_config";
+import {
+  getDocumentFromDataSource,
+  getFolderNode,
+  getTable,
+  updateDocumentParentsField,
+  updateTableParentsField,
+  upsertFolderNode,
+} from "@connectors/lib/data_sources";
+import {
+  GoogleDriveFiles,
+  GoogleDriveSheet,
+} from "@connectors/lib/models/google_drive";
+import logger from "@connectors/logger/logger";
+import { ConnectorModel } from "@connectors/resources/storage/models/connector_model";
+
+const QUERY_BATCH_SIZE = 1024;
+
+async function migrate({
+  connector,
+  execute,
+}: {
+  connector: ConnectorModel;
+  execute: boolean;
+}) {
+  logger.info(`Processing connector ${connector.id}...`);
+
+  const dataSourceConfig = dataSourceConfigFromConnector(connector);
+  const parentsMap: Record<string, string | null> = {};
+  let nextId: number | undefined = 0;
+  do {
+    const googleDriveFiles: GoogleDriveFiles[] = await GoogleDriveFiles.findAll(
+      {
+        where: {
+          connectorId: connector.id,
+          id: {
+            [Op.gt]: nextId,
+          },
+        },
+      }
+    );
+
+    googleDriveFiles.forEach((file) => {
+      parentsMap[file.driveFileId] = file.parentId;
+    });
+
+    nextId = googleDriveFiles[googleDriveFiles.length - 1]?.id;
+  } while (nextId);
+
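+  // Second pass: rebuild each file's full parent chain from the in-memory map
+  // and compare it with what Dust has stored, updating documents and folders on drift.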
+  nextId = 0;
+  do {
+    const googleDriveFiles: GoogleDriveFiles[] = await GoogleDriveFiles.findAll(
+      {
+        where: {
+          connectorId: connector.id,
+          id: {
+            [Op.gt]: nextId,
+          },
+        },
+        order: [["id", "ASC"]],
+        limit: QUERY_BATCH_SIZE,
+      }
+    );
+
+    for (const file of googleDriveFiles) {
+      const internalId = file.dustFileId;
+      const driveFileId = file.driveFileId;
+      const parents = [driveFileId];
+      let current: string | null = file.parentId;
+      while (current) {
+        parents.push(current);
+        current = parentsMap[current] || null;
+      }
+
+      if (file.mimeType === "application/vnd.google-apps.folder") {
+        const folder = await getFolderNode({
+          dataSourceConfig,
+          folderId: internalId,
+        });
+        if (!folder || folder.parents.join("/") !== parents.join("/")) {
+          logger.info({ folderId: file.driveFileId, parents }, "Upsert folder");
+
+          if (execute) {
+            // upsert drive file as folder
+            await upsertFolderNode({
+              dataSourceConfig,
+              folderId: file.dustFileId,
+              parents,
+              parentId: file.parentId,
+              title: file.name,
+            });
+          }
+        }
+      } else {
+        const document = await getDocumentFromDataSource({
+          dataSourceConfig,
+          documentId: internalId,
+        });
+        if (document) {
+          logger.info(
+            {
+              documentId: file.driveFileId,
+            },
+            "Found document"
+          );
+          if (document.parents.join("/") !== parents.join("/")) {
+            logger.info(
+              {
+                documentId: internalId,
+                parents,
+                previousParents: document.parents,
+              },
+              "Update parents for document"
+            );
+
+            if (execute) {
+              await updateDocumentParentsField({
+                dataSourceConfig,
+                documentId: file.dustFileId,
+                parents,
+              });
+            }
+          }
+        }
+      }
+    }
+
+    nextId = googleDriveFiles[googleDriveFiles.length - 1]?.id;
+  } while (nextId);
+
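+  // Third pass: same parent-chain fix for Google Sheets, whose table IDs
+  // combine the spreadsheet's drive file ID with the individual sheet ID.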
exist on Dust. Ignoring."); + return; + } + + return dustRequestResult.data.documents[0]; +} + export async function deleteFromDataSource( dataSourceConfig: DataSourceConfig, documentId: string, @@ -1134,6 +1174,42 @@ export async function deleteTable({ } } +export async function getFolderNode({ + dataSourceConfig, + folderId, +}: { + dataSourceConfig: DataSourceConfig; + folderId: string; +}): Promise { + const localLogger = logger.child({ + folderId, + }); + + const endpoint = + `${DUST_FRONT_API}/api/v1/w/${dataSourceConfig.workspaceId}` + + `/data_sources/${dataSourceConfig.dataSourceId}/folders/${folderId}`; + const dustRequestConfig: AxiosRequestConfig = { + headers: { + Authorization: `Bearer ${dataSourceConfig.workspaceAPIKey}`, + }, + }; + + let dustRequestResult: AxiosResponse; + try { + dustRequestResult = await axiosWithTimeout.get(endpoint, dustRequestConfig); + } catch (e) { + const axiosError = e as AxiosError; + if (axiosError?.response?.status === 404) { + localLogger.info("Folder doesn't exist on Dust. Ignoring."); + return; + } + localLogger.error({ error: e }, "Error getting folder from Dust."); + throw e; + } + + return dustRequestResult.data.folder; +} + export async function upsertFolderNode({ dataSourceConfig, folderId, diff --git a/front/pages/api/v1/w/[wId]/spaces/[spaceId]/data_sources/[dsId]/documents/index.ts b/front/pages/api/v1/w/[wId]/spaces/[spaceId]/data_sources/[dsId]/documents/index.ts index 7108dbf5583b..5eb59447d241 100644 --- a/front/pages/api/v1/w/[wId]/spaces/[spaceId]/data_sources/[dsId]/documents/index.ts +++ b/front/pages/api/v1/w/[wId]/spaces/[spaceId]/data_sources/[dsId]/documents/index.ts @@ -41,6 +41,13 @@ import { apiError } from "@app/logger/withlogging"; * schema: * type: string * - in: query + * name: document_ids + * description: The IDs of the documents to fetch (optional) + * schema: + * type: array + * items: + * type: string + * - in: query * name: limit * description: Limit the number of documents returned * schema: @@ -139,10 +146,16 @@ async function handler( ? 
+export async function getFolderNode({
+  dataSourceConfig,
+  folderId,
+}: {
+  dataSourceConfig: DataSourceConfig;
+  folderId: string;
+}): Promise<CoreAPIFolder | undefined> {
+  const localLogger = logger.child({
+    folderId,
+  });
+
+  const endpoint =
+    `${DUST_FRONT_API}/api/v1/w/${dataSourceConfig.workspaceId}` +
+    `/data_sources/${dataSourceConfig.dataSourceId}/folders/${folderId}`;
+  const dustRequestConfig: AxiosRequestConfig = {
+    headers: {
+      Authorization: `Bearer ${dataSourceConfig.workspaceAPIKey}`,
+    },
+  };
+
+  let dustRequestResult: AxiosResponse;
+  try {
+    dustRequestResult = await axiosWithTimeout.get(endpoint, dustRequestConfig);
+  } catch (e) {
+    const axiosError = e as AxiosError;
+    if (axiosError?.response?.status === 404) {
+      localLogger.info("Folder doesn't exist on Dust. Ignoring.");
+      return;
+    }
+    localLogger.error({ error: e }, "Error getting folder from Dust.");
+    throw e;
+  }
+
+  return dustRequestResult.data.folder;
+}
+
 export async function upsertFolderNode({
   dataSourceConfig,
   folderId,
diff --git a/front/pages/api/v1/w/[wId]/spaces/[spaceId]/data_sources/[dsId]/documents/index.ts b/front/pages/api/v1/w/[wId]/spaces/[spaceId]/data_sources/[dsId]/documents/index.ts
index 7108dbf5583b..5eb59447d241 100644
--- a/front/pages/api/v1/w/[wId]/spaces/[spaceId]/data_sources/[dsId]/documents/index.ts
+++ b/front/pages/api/v1/w/[wId]/spaces/[spaceId]/data_sources/[dsId]/documents/index.ts
@@ -41,6 +41,13 @@ import { apiError } from "@app/logger/withlogging";
  *         schema:
  *           type: string
  *       - in: query
+ *         name: document_ids
+ *         description: The IDs of the documents to fetch (optional)
+ *         schema:
+ *           type: array
+ *           items:
+ *             type: string
+ *       - in: query
  *         name: limit
  *         description: Limit the number of documents returned
  *         schema:
@@ -139,10 +146,16 @@ async function handler(
         ? parseInt(req.query.offset as string)
         : 0;
 
+      let documentIds = req.query.document_ids;
+      if (typeof documentIds === "string") {
+        documentIds = [documentIds];
+      }
+
       const documents = await coreAPI.getDataSourceDocuments(
         {
           projectId: dataSource.dustAPIProjectId,
           dataSourceId: dataSource.dustAPIDataSourceId,
+          documentIds,
         },
         { limit, offset }
       );
diff --git a/front/pages/api/v1/w/[wId]/spaces/[spaceId]/data_sources/[dsId]/folders/[fId].ts b/front/pages/api/v1/w/[wId]/spaces/[spaceId]/data_sources/[dsId]/folders/[fId].ts
index 53559fec417b..9afa14435552 100644
--- a/front/pages/api/v1/w/[wId]/spaces/[spaceId]/data_sources/[dsId]/folders/[fId].ts
+++ b/front/pages/api/v1/w/[wId]/spaces/[spaceId]/data_sources/[dsId]/folders/[fId].ts
@@ -67,7 +67,7 @@ async function handler(
 
       if (docRes.isErr()) {
         return apiError(req, res, {
-          status_code: 400,
+          status_code: 404,
           api_error: {
             type: "data_source_error",
             message: "There was an error retrieving the data source folder.",
diff --git a/front/public/swagger.json b/front/public/swagger.json
index 7a8a858239bc..7477c469ded6 100644
--- a/front/public/swagger.json
+++ b/front/public/swagger.json
@@ -2301,6 +2301,17 @@
              "type": "string"
            }
          },
+          {
+            "in": "query",
+            "name": "document_ids",
+            "description": "The IDs of the documents to fetch (optional)",
+            "schema": {
+              "type": "array",
+              "items": {
+                "type": "string"
+              }
+            }
+          },
          {
            "in": "query",
            "name": "limit",
diff --git a/types/src/core/data_source.ts b/types/src/core/data_source.ts
index 23dcedc4f484..411e9178722d 100644
--- a/types/src/core/data_source.ts
+++ b/types/src/core/data_source.ts
@@ -48,6 +48,7 @@ export type CoreAPIDocument = {
   data_source_id: string;
   created: number;
   document_id: string;
+  parents: string[];
   parent_id: string | null;
   timestamp: number;
   tags: string[];
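
Note on the migration's core loop (illustrative, not part of the change set): each pass rebuilds a node's parent chain by walking `parentsMap` upward until it reaches a root. Below is a minimal standalone TypeScript sketch of that walk; the `buildParentChain` name and the cycle guard are additions for illustration only, not code from the diff (the migration itself trusts Drive parent data to be acyclic):

    // Returns [nodeId, parentId, grandparentId, ...] up to the root.
    // parentsMap maps a drive file ID to its parent's drive file ID (or null).
    function buildParentChain(
      nodeId: string,
      parentsMap: Record<string, string | null>
    ): string[] {
      const chain = [nodeId];
      const seen = new Set<string>([nodeId]); // guard against cyclic parent data
      let current = parentsMap[nodeId] ?? null;
      while (current && !seen.has(current)) {
        chain.push(current);
        seen.add(current);
        current = parentsMap[current] ?? null;
      }
      return chain;
    }

Since all writes in the script are gated on `execute`, it is expected to dry-run (log only) by default and to apply changes only when the `--execute` flag handled by `makeScript` is passed.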