
[connectors] check and fix gdrive parents, backfill folders #9344

Merged (16 commits, Dec 13, 2024)
235 changes: 235 additions & 0 deletions connectors/migrations/20241211_fix_gdrive_parents.ts
@@ -0,0 +1,235 @@
import { getGoogleSheetTableId } from "@dust-tt/types";
import { makeScript } from "scripts/helpers";
import { Op } from "sequelize";

import { dataSourceConfigFromConnector } from "@connectors/lib/api/data_source_config";
import {
getDocumentFromDataSource,
getFolderNode,
getTable,
updateDocumentParentsField,
updateTableParentsField,
upsertFolderNode,
} from "@connectors/lib/data_sources";
import {
GoogleDriveFiles,
GoogleDriveSheet,
} from "@connectors/lib/models/google_drive";
import logger from "@connectors/logger/logger";
import { ConnectorModel } from "@connectors/resources/storage/models/connector_model";

const QUERY_BATCH_SIZE = 1024;

async function migrate({
connector,
execute,
}: {
connector: ConnectorModel;
execute: boolean;
}) {
logger.info(`Processing connector ${connector.id}...`);

const dataSourceConfig = dataSourceConfigFromConnector(connector);
const parentsMap: Record<string, string | null> = {};
let nextId: number | undefined = 0;
do {
    const googleDriveFiles: GoogleDriveFiles[] = await GoogleDriveFiles.findAll(
      {
        where: {
          connectorId: connector.id,
          id: {
            [Op.gt]: nextId,
          },
        },
        // Paginate deterministically: without ORDER BY and LIMIT, the last row
        // of the result set is not guaranteed to carry the highest id.
        order: [["id", "ASC"]],
        limit: QUERY_BATCH_SIZE,
      }
    );

googleDriveFiles.forEach((file) => {
parentsMap[file.driveFileId] = file.parentId;
});

nextId = googleDriveFiles[googleDriveFiles.length - 1]?.id;
} while (nextId);

nextId = 0;
do {
const googleDriveFiles: GoogleDriveFiles[] = await GoogleDriveFiles.findAll(
{
where: {
connectorId: connector.id,
id: {
[Op.gt]: nextId,
},
},
order: [["id", "ASC"]],
limit: QUERY_BATCH_SIZE,
}
);

for (const file of googleDriveFiles) {
const internalId = file.dustFileId;
const driveFileId = file.driveFileId;
const parents = [driveFileId];
let current: string | null = file.parentId;
while (current) {
parents.push(current);
current = parentsMap[current] || null;
}

if (file.mimeType === "application/vnd.google-apps.folder") {
const folder = await getFolderNode({
dataSourceConfig,
folderId: internalId,
});
if (!folder || folder.parents.join("/") !== parents.join("/")) {
logger.info({ folderId: file.driveFileId, parents }, "Upsert folder");

if (execute) {
            // upsert the folder node with its recomputed parent chain
await upsertFolderNode({
dataSourceConfig,
folderId: file.dustFileId,
parents,
parentId: file.parentId,
title: file.name,
});
}
}
} else {
const document = await getDocumentFromDataSource({
dataSourceConfig,
documentId: internalId,
});
if (document) {
logger.info(
{
documentId: file.driveFileId,
},
"Found document"
);
if (document.parents.join("/") !== parents.join("/")) {
          logger.info(
            {
              documentId: internalId,
              parents,
              previousParents: document.parents,
            },
            "Update parents for document"
          );

[Contributor] nit: we'll likely have ~10M lines of log for the run; maybe output to a file? (that will, I guess, weigh in the ballpark of gigabytes)

[Author] yes, the log is already pretty long; it will be better on one line

[Author] some logs are generated by data_sources, and that does not allow using a custom logger. It may be easier to redirect the script output to a file.

[Contributor] yes, that's what I meant; sorry for being unclear

if (execute) {
await updateDocumentParentsField({
dataSourceConfig,
documentId: file.dustFileId,
parents,
});
}
}
}
}
}

nextId = googleDriveFiles[googleDriveFiles.length - 1]?.id;
} while (nextId);

nextId = 0;
do {
const googleDriveSheets: GoogleDriveSheet[] =
await GoogleDriveSheet.findAll({
where: {
connectorId: connector.id,
id: {
[Op.gt]: nextId,
},
},
order: [["id", "ASC"]],
limit: QUERY_BATCH_SIZE,
});

for (const sheet of googleDriveSheets) {
const tableId = getGoogleSheetTableId(
sheet.driveFileId,
sheet.driveSheetId
);
const parents = [tableId];
let current: string | null = sheet.driveFileId;
while (current) {
parents.push(current);
current = parentsMap[current] || null;
}

const table = await getTable({ dataSourceConfig, tableId });
if (table) {
logger.info(
{
tableId: tableId,
},
"Found table"
);

if (table.parents.join("/") !== parents.join("/")) {
logger.info(
{
tableId,
parents,
previousParents: table.parents,
},
"Update parents for table"
);
if (execute) {
await updateTableParentsField({
dataSourceConfig,
tableId,
parents,
});
}
}
}
}

nextId = googleDriveSheets[googleDriveSheets.length - 1]?.id;
} while (nextId);
}
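The parent-chain walk in `migrate` (seed the list with the file's own id, then follow `parentsMap` upward until a root) can be sketched in isolation. This is an illustrative helper, not the migration's code, and the ids below are made up:

```typescript
// Rebuild the full ancestor chain for a file, child first, by walking a
// child -> parent map until a root (null or missing parent) is reached.
function buildParentChain(
  fileId: string,
  parentsMap: Record<string, string | null>
): string[] {
  const chain = [fileId];
  let current = parentsMap[fileId] ?? null;
  while (current) {
    chain.push(current);
    current = parentsMap[current] ?? null;
  }
  return chain;
}

// "c" sits in folder "b", which sits in root folder "a".
const map: Record<string, string | null> = { c: "b", b: "a", a: null };
console.log(buildParentChain("c", map)); // ["c", "b", "a"]
```

Note that a cycle in the stored parent ids would make this walk (and the migration's inline version) loop forever; the script assumes the persisted hierarchy is acyclic.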

makeScript(
{
nextConnectorId: {
type: "number",
required: false,
default: 0,
},
connectorId: {
type: "number",
required: false,
default: 0,
},
},
async ({ nextConnectorId, connectorId, execute }) => {
if (connectorId) {
const connector = await ConnectorModel.findByPk(connectorId);
if (connector) {
await migrate({
connector,
execute,
});
}
} else {
const connectors = await ConnectorModel.findAll({
where: {
type: "google_drive",
id: {
[Op.gt]: nextConnectorId,
},
},
order: [["id", "ASC"]],
});

for (const connector of connectors) {
await migrate({
connector,
execute,
});
}
}
}
);
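Each of the three loops in the migration uses the same keyset-pagination shape: fetch rows with `id > nextId` in ascending order, process the batch, then advance `nextId` to the last row's id until a fetch comes back empty. A generic in-memory sketch of that pattern (the `paginate` generator is our name, not the codebase's):

```typescript
// Keyset pagination: repeatedly take the next batch of rows whose id is
// strictly greater than the last id seen, in ascending order.
function* paginate<T extends { id: number }>(
  rows: T[],
  batchSize: number
): Generator<T[]> {
  const sorted = [...rows].sort((a, b) => a.id - b.id);
  let nextId = 0;
  for (;;) {
    const batch = sorted.filter((r) => r.id > nextId).slice(0, batchSize);
    if (batch.length === 0) return;
    yield batch;
    nextId = batch[batch.length - 1].id;
  }
}

const rows = [5, 3, 1, 4, 2].map((id) => ({ id }));
console.log([...paginate(rows, 2)].map((b) => b.map((r) => r.id)));
// [[1, 2], [3, 4], [5]]
```

Unlike OFFSET-based paging, this stays correct and fast even when earlier rows are deleted mid-run, which is why the script keys on the last seen id.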
84 changes: 80 additions & 4 deletions connectors/src/lib/data_sources.ts
@@ -5,18 +5,23 @@ import type {
import { DustAPI } from "@dust-tt/client";
import type {
CoreAPIDataSourceDocumentSection,
CoreAPIDocument,
CoreAPIFolder,
CoreAPITable,
PostDataSourceDocumentRequestBody,
} from "@dust-tt/types";
import { isValidDate, safeSubstring, sectionFullText } from "@dust-tt/types";
import { MAX_CHUNK_SIZE } from "@dust-tt/types";
import {
isValidDate,
MAX_CHUNK_SIZE,
safeSubstring,
sectionFullText,
} from "@dust-tt/types";
import type { AxiosError, AxiosRequestConfig, AxiosResponse } from "axios";
import axios from "axios";
import tracer from "dd-trace";
import http from "http";
import https from "https";
import type { Branded } from "io-ts";
import type { IntBrand } from "io-ts";
import type { Branded, IntBrand } from "io-ts";
import { fromMarkdown } from "mdast-util-from-markdown";
import { gfmFromMarkdown, gfmToMarkdown } from "mdast-util-gfm";
import { toMarkdown } from "mdast-util-to-markdown";
@@ -238,6 +243,41 @@ async function _upsertToDatasource({
);
}

export async function getDocumentFromDataSource({
dataSourceConfig,
documentId,
}: {
dataSourceConfig: DataSourceConfig;
documentId: string;
}): Promise<CoreAPIDocument | undefined> {
const localLogger = logger.child({
documentId,
});

const endpoint =
`${DUST_FRONT_API}/api/v1/w/${dataSourceConfig.workspaceId}` +
`/data_sources/${dataSourceConfig.dataSourceId}/documents?document_ids=${documentId}`;
const dustRequestConfig: AxiosRequestConfig = {
headers: {
Authorization: `Bearer ${dataSourceConfig.workspaceAPIKey}`,
},
};

let dustRequestResult: AxiosResponse;
try {
dustRequestResult = await axiosWithTimeout.get(endpoint, dustRequestConfig);
} catch (e) {
localLogger.error({ error: e }, "Error getting document from Dust.");
throw e;
}
if (dustRequestResult.data.documents.length === 0) {
localLogger.info("Document doesn't exist on Dust. Ignoring.");
    return;
  }

  return dustRequestResult.data.documents[0];
}

[Contributor] IMO the function should throw there or return a Result; the caller can decide what to do if the doc is not found, wdyt?

[Author] I kept the logic of the existing getTable. For me, both are fine.

export async function deleteFromDataSource(
dataSourceConfig: DataSourceConfig,
documentId: string,
@@ -1134,6 +1174,42 @@ export async function deleteTable({
}
}

export async function getFolderNode({
dataSourceConfig,
folderId,
}: {
dataSourceConfig: DataSourceConfig;
folderId: string;
}): Promise<CoreAPIFolder | undefined> {
const localLogger = logger.child({
folderId,
});

const endpoint =
`${DUST_FRONT_API}/api/v1/w/${dataSourceConfig.workspaceId}` +
`/data_sources/${dataSourceConfig.dataSourceId}/folders/${folderId}`;
const dustRequestConfig: AxiosRequestConfig = {
headers: {
Authorization: `Bearer ${dataSourceConfig.workspaceAPIKey}`,
},
};

let dustRequestResult: AxiosResponse;
try {
dustRequestResult = await axiosWithTimeout.get(endpoint, dustRequestConfig);
} catch (e) {
const axiosError = e as AxiosError;
if (axiosError?.response?.status === 404) {
localLogger.info("Folder doesn't exist on Dust. Ignoring.");
      return;
    }
    localLogger.error({ error: e }, "Error getting folder from Dust.");
    throw e;
  }

  return dustRequestResult.data.folder;
}

[Contributor] A 400 would be a bad request? Did you mean 404?

[Contributor] I think the function should keep the throw, though; it's the caller that should decide what to do on error, wdyt?

[Author] Right now it's a 400, but it should be a 404; updating. The getTable was logging and returning null, and I kept the same logic here.

export async function upsertFolderNode({
dataSourceConfig,
folderId,
@@ -41,6 +41,13 @@ import { apiError } from "@app/logger/withlogging";
* schema:
* type: string
* - in: query
* name: document_ids
* description: The IDs of the documents to fetch (optional)
* schema:
* type: array
* items:
* type: string
* - in: query
* name: limit
* description: Limit the number of documents returned
* schema:
@@ -139,10 +146,16 @@ async function handler(
? parseInt(req.query.offset as string)
: 0;

let documentIds = req.query.document_ids;
if (typeof documentIds === "string") {
      documentIds = [documentIds];
    }

[Contributor] Can we fail if it's a string, and pass only if it's an array (of 1 if we want a single doc)?

[Author] We can't pass an array of 1: if we pass a single value it will be a string, otherwise an array (that's how req.query is parsed).

[Contributor] OK! I never remember that kind of thing :)
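The parsing quirk the author describes (`?document_ids=a` yields a string, while `?document_ids=a&document_ids=b` yields an array) is commonly handled with a small normalizer; this is a sketch, and `toArray` is our name, not the endpoint's code:

```typescript
// Normalize a query value that may arrive as a string, an array, or be absent.
function toArray(value: string | string[] | undefined): string[] {
  if (value === undefined) return [];
  return typeof value === "string" ? [value] : value;
}

console.log(toArray("a"));        // ["a"]
console.log(toArray(["a", "b"])); // ["a", "b"]
console.log(toArray(undefined));  // []
```

Centralizing this keeps each handler from re-implementing the string-vs-array branch for every repeatable query parameter.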

const documents = await coreAPI.getDataSourceDocuments(
{
projectId: dataSource.dustAPIProjectId,
dataSourceId: dataSource.dustAPIDataSourceId,
documentIds,
},
{ limit, offset }
);
@@ -67,7 +67,7 @@ async function handler(

if (docRes.isErr()) {
return apiError(req, res, {
status_code: 400,
status_code: 404,
api_error: {
type: "data_source_error",
message: "There was an error retrieving the data source folder.",