Skip to content

Commit

Permalink
[connectors] check and fix gdrive parents, backfill folders (#9344)
Browse files Browse the repository at this point in the history
  • Loading branch information
tdraier authored Dec 13, 2024
1 parent 0bff6b4 commit a9173eb
Show file tree
Hide file tree
Showing 6 changed files with 341 additions and 5 deletions.
235 changes: 235 additions & 0 deletions connectors/migrations/20241211_fix_gdrive_parents.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,235 @@
import { getGoogleSheetTableId } from "@dust-tt/types";
import { makeScript } from "scripts/helpers";
import { Op } from "sequelize";

import { dataSourceConfigFromConnector } from "@connectors/lib/api/data_source_config";
import {
getDocumentFromDataSource,
getFolderNode,
getTable,
updateDocumentParentsField,
updateTableParentsField,
upsertFolderNode,
} from "@connectors/lib/data_sources";
import {
GoogleDriveFiles,
GoogleDriveSheet,
} from "@connectors/lib/models/google_drive";
import logger from "@connectors/logger/logger";
import { ConnectorModel } from "@connectors/resources/storage/models/connector_model";

const QUERY_BATCH_SIZE = 1024;

/**
 * Checks and repairs Google Drive hierarchy data for one connector:
 *  - builds an in-memory map of driveFileId -> parentId for all files,
 *  - upserts folder nodes whose parents chain is missing or stale,
 *  - updates the `parents` field of documents and tables (sheets) that
 *    disagree with the chain computed from the map.
 *
 * @param connector The google_drive connector to process.
 * @param execute When false, runs in dry-run mode: logs what would change
 *   but performs no writes.
 */
async function migrate({
  connector,
  execute,
}: {
  connector: ConnectorModel;
  execute: boolean;
}) {
  logger.info(`Processing connector ${connector.id}...`);

  const dataSourceConfig = dataSourceConfigFromConnector(connector);

  // First pass: load every file's parent pointer into memory so parent
  // chains can be resolved below without per-file queries.
  const parentsMap: Record<string, string | null> = {};
  let nextId: number | undefined = 0;
  do {
    const googleDriveFiles: GoogleDriveFiles[] = await GoogleDriveFiles.findAll(
      {
        where: {
          connectorId: connector.id,
          id: {
            [Op.gt]: nextId,
          },
        },
        // Fix: the original query had neither ordering nor a limit, so the
        // whole table was fetched in one shot and the keyset cursor below was
        // taken from an arbitrary row (keyset pagination requires ORDER BY).
        // Paginate deterministically, consistent with the loops below.
        order: [["id", "ASC"]],
        limit: QUERY_BATCH_SIZE,
      }
    );

    googleDriveFiles.forEach((file) => {
      parentsMap[file.driveFileId] = file.parentId;
    });

    // Keyset cursor: undefined when the batch is empty, which ends the loop.
    nextId = googleDriveFiles[googleDriveFiles.length - 1]?.id;
  } while (nextId);

  // Second pass: reconcile folders and documents.
  nextId = 0;
  do {
    const googleDriveFiles: GoogleDriveFiles[] = await GoogleDriveFiles.findAll(
      {
        where: {
          connectorId: connector.id,
          id: {
            [Op.gt]: nextId,
          },
        },
        order: [["id", "ASC"]],
        limit: QUERY_BATCH_SIZE,
      }
    );

    for (const file of googleDriveFiles) {
      const internalId = file.dustFileId;
      const driveFileId = file.driveFileId;
      // Walk the parent chain from the file up to the root, using the
      // in-memory map. The resulting array starts with the file itself.
      const parents = [driveFileId];
      let current: string | null = file.parentId;
      while (current) {
        parents.push(current);
        current = parentsMap[current] || null;
      }

      if (file.mimeType === "application/vnd.google-apps.folder") {
        const folder = await getFolderNode({
          dataSourceConfig,
          folderId: internalId,
        });
        // Upsert when the folder node is missing or its parents differ.
        if (!folder || folder.parents.join("/") !== parents.join("/")) {
          logger.info({ folderId: file.driveFileId, parents }, "Upsert folder");

          if (execute) {
            // upsert repository as folder
            await upsertFolderNode({
              dataSourceConfig,
              folderId: file.dustFileId,
              parents,
              parentId: file.parentId,
              title: file.name,
            });
          }
        }
      } else {
        const document = await getDocumentFromDataSource({
          dataSourceConfig,
          documentId: internalId,
        });
        if (document) {
          logger.info(
            {
              documentId: file.driveFileId,
            },
            "Found document"
          );
          if (document.parents.join("/") !== parents.join("/")) {
            logger.info(
              {
                documentId: internalId,
                parents,
                previousParents: document.parents,
              },
              "Update parents for document"
            );

            if (execute) {
              await updateDocumentParentsField({
                dataSourceConfig,
                documentId: file.dustFileId,
                parents,
              });
            }
          }
        }
      }
    }

    nextId = googleDriveFiles[googleDriveFiles.length - 1]?.id;
  } while (nextId);

  // Third pass: reconcile tables backed by Google Sheets.
  nextId = 0;
  do {
    const googleDriveSheets: GoogleDriveSheet[] =
      await GoogleDriveSheet.findAll({
        where: {
          connectorId: connector.id,
          id: {
            [Op.gt]: nextId,
          },
        },
        order: [["id", "ASC"]],
        limit: QUERY_BATCH_SIZE,
      });

    for (const sheet of googleDriveSheets) {
      const tableId = getGoogleSheetTableId(
        sheet.driveFileId,
        sheet.driveSheetId
      );
      // A sheet's chain starts at the table id, then its spreadsheet file,
      // then the file's ancestors from the map.
      const parents = [tableId];
      let current: string | null = sheet.driveFileId;
      while (current) {
        parents.push(current);
        current = parentsMap[current] || null;
      }

      const table = await getTable({ dataSourceConfig, tableId });
      if (table) {
        logger.info(
          {
            tableId: tableId,
          },
          "Found table"
        );

        if (table.parents.join("/") !== parents.join("/")) {
          logger.info(
            {
              tableId,
              parents,
              previousParents: table.parents,
            },
            "Update parents for table"
          );
          if (execute) {
            await updateTableParentsField({
              dataSourceConfig,
              tableId,
              parents,
            });
          }
        }
      }
    }

    nextId = googleDriveSheets[googleDriveSheets.length - 1]?.id;
  } while (nextId);
}

// Entry point. Two modes:
//  - `--connectorId <id>`: migrate that single connector (if it exists);
//  - otherwise: migrate every google_drive connector with id greater than
//    `--nextConnectorId` (default 0), in ascending id order.
makeScript(
  {
    nextConnectorId: {
      type: "number",
      required: false,
      default: 0,
    },
    connectorId: {
      type: "number",
      required: false,
      default: 0,
    },
  },
  async ({ nextConnectorId, connectorId, execute }) => {
    // Single-connector mode: look it up by primary key and stop.
    if (connectorId) {
      const target = await ConnectorModel.findByPk(connectorId);
      if (target) {
        await migrate({ connector: target, execute });
      }
      return;
    }

    // Batch mode: resume from nextConnectorId across all gdrive connectors.
    const candidates = await ConnectorModel.findAll({
      where: {
        type: "google_drive",
        id: {
          [Op.gt]: nextConnectorId,
        },
      },
      order: [["id", "ASC"]],
    });

    for (const candidate of candidates) {
      await migrate({ connector: candidate, execute });
    }
  }
);
84 changes: 80 additions & 4 deletions connectors/src/lib/data_sources.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,23 @@ import type {
import { DustAPI } from "@dust-tt/client";
import type {
CoreAPIDataSourceDocumentSection,
CoreAPIDocument,
CoreAPIFolder,
CoreAPITable,
PostDataSourceDocumentRequestBody,
} from "@dust-tt/types";
import { isValidDate, safeSubstring, sectionFullText } from "@dust-tt/types";
import { MAX_CHUNK_SIZE } from "@dust-tt/types";
import {
isValidDate,
MAX_CHUNK_SIZE,
safeSubstring,
sectionFullText,
} from "@dust-tt/types";
import type { AxiosError, AxiosRequestConfig, AxiosResponse } from "axios";
import axios from "axios";
import tracer from "dd-trace";
import http from "http";
import https from "https";
import type { Branded } from "io-ts";
import type { IntBrand } from "io-ts";
import type { Branded, IntBrand } from "io-ts";
import { fromMarkdown } from "mdast-util-from-markdown";
import { gfmFromMarkdown, gfmToMarkdown } from "mdast-util-gfm";
import { toMarkdown } from "mdast-util-to-markdown";
Expand Down Expand Up @@ -238,6 +243,41 @@ async function _upsertToDatasource({
);
}

/**
 * Fetches a single document from the Dust front API by id.
 *
 * @param dataSourceConfig Workspace/data-source coordinates and API key.
 * @param documentId The document id to look up.
 * @returns The document, or `undefined` when it does not exist on Dust.
 * @throws Rethrows any transport/API error after logging it.
 */
export async function getDocumentFromDataSource({
  dataSourceConfig,
  documentId,
}: {
  dataSourceConfig: DataSourceConfig;
  documentId: string;
}): Promise<CoreAPIDocument | undefined> {
  const localLogger = logger.child({
    documentId,
  });

  // Fix: encode the id — unescaped characters (`&`, `#`, `+`, …) in a raw
  // query-string interpolation would corrupt the request.
  const endpoint =
    `${DUST_FRONT_API}/api/v1/w/${dataSourceConfig.workspaceId}` +
    `/data_sources/${dataSourceConfig.dataSourceId}/documents?document_ids=${encodeURIComponent(documentId)}`;
  const dustRequestConfig: AxiosRequestConfig = {
    headers: {
      Authorization: `Bearer ${dataSourceConfig.workspaceAPIKey}`,
    },
  };

  let dustRequestResult: AxiosResponse;
  try {
    dustRequestResult = await axiosWithTimeout.get(endpoint, dustRequestConfig);
  } catch (e) {
    localLogger.error({ error: e }, "Error getting document from Dust.");
    throw e;
  }
  // The endpoint returns a (possibly empty) list for the requested ids.
  if (dustRequestResult.data.documents.length === 0) {
    localLogger.info("Document doesn't exist on Dust. Ignoring.");
    return;
  }

  return dustRequestResult.data.documents[0];
}

export async function deleteFromDataSource(
dataSourceConfig: DataSourceConfig,
documentId: string,
Expand Down Expand Up @@ -1134,6 +1174,42 @@ export async function deleteTable({
}
}

/**
 * Fetches a folder node from the Dust front API by id.
 *
 * @param dataSourceConfig Workspace/data-source coordinates and API key.
 * @param folderId The folder id to look up.
 * @returns The folder, or `undefined` when the API responds 404.
 * @throws Rethrows any non-404 transport/API error after logging it.
 */
export async function getFolderNode({
  dataSourceConfig,
  folderId,
}: {
  dataSourceConfig: DataSourceConfig;
  folderId: string;
}): Promise<CoreAPIFolder | undefined> {
  const localLogger = logger.child({
    folderId,
  });

  // Fix: encode the id — a raw interpolation of characters such as `/`, `?`
  // or `#` into the URL path would change the route being addressed.
  const endpoint =
    `${DUST_FRONT_API}/api/v1/w/${dataSourceConfig.workspaceId}` +
    `/data_sources/${dataSourceConfig.dataSourceId}/folders/${encodeURIComponent(folderId)}`;
  const dustRequestConfig: AxiosRequestConfig = {
    headers: {
      Authorization: `Bearer ${dataSourceConfig.workspaceAPIKey}`,
    },
  };

  let dustRequestResult: AxiosResponse;
  try {
    dustRequestResult = await axiosWithTimeout.get(endpoint, dustRequestConfig);
  } catch (e) {
    const axiosError = e as AxiosError;
    // 404 is an expected outcome (folder not yet backfilled), not an error.
    if (axiosError?.response?.status === 404) {
      localLogger.info("Folder doesn't exist on Dust. Ignoring.");
      return;
    }
    localLogger.error({ error: e }, "Error getting folder from Dust.");
    throw e;
  }

  return dustRequestResult.data.folder;
}

export async function upsertFolderNode({
dataSourceConfig,
folderId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,13 @@ import { apiError } from "@app/logger/withlogging";
* schema:
* type: string
* - in: query
* name: document_ids
* description: The IDs of the documents to fetch (optional)
* schema:
* type: array
* items:
* type: string
* - in: query
* name: limit
* description: Limit the number of documents returned
* schema:
Expand Down Expand Up @@ -139,10 +146,16 @@ async function handler(
? parseInt(req.query.offset as string)
: 0;

let documentIds = req.query.document_ids;
if (typeof documentIds === "string") {
documentIds = [documentIds];
}

const documents = await coreAPI.getDataSourceDocuments(
{
projectId: dataSource.dustAPIProjectId,
dataSourceId: dataSource.dustAPIDataSourceId,
documentIds,
},
{ limit, offset }
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ async function handler(

if (docRes.isErr()) {
return apiError(req, res, {
status_code: 400,
status_code: 404,
api_error: {
type: "data_source_error",
message: "There was an error retrieving the data source folder.",
Expand Down
Loading

0 comments on commit a9173eb

Please sign in to comment.