[connectors] Google drive parent scripts improvement #9390

Merged · 7 commits · Dec 16, 2024
108 changes: 64 additions & 44 deletions connectors/migrations/20241211_fix_gdrive_parents.ts
@@ -2,32 +2,51 @@ import { getGoogleSheetTableId } from "@dust-tt/types";
import { makeScript } from "scripts/helpers";
import { Op } from "sequelize";

import { getDocumentId } from "@connectors/connectors/google_drive/temporal/utils";
import { dataSourceConfigFromConnector } from "@connectors/lib/api/data_source_config";
import {
getDocumentFromDataSource,
getFolderNode,
getTable,
updateDocumentParentsField,
updateTableParentsField,
upsertFolderNode,
} from "@connectors/lib/data_sources";
import {
GoogleDriveFiles,
GoogleDriveSheet,
} from "@connectors/lib/models/google_drive";
import type { Logger } from "@connectors/logger/logger";
import logger from "@connectors/logger/logger";
import { ConnectorModel } from "@connectors/resources/storage/models/connector_model";

const QUERY_BATCH_SIZE = 1024;

function getParents(
fileId: string | null,
parentsMap: Record<string, string | null>,
logger: Logger
) {
const parents = [];
let current: string | null = fileId;
while (current) {
parents.push(current);
if (typeof parentsMap[current] === "undefined") {
logger.error({ fileId: current }, "Parent not found");
return null;
}
current = parentsMap[current] || null;
}
return parents;
}

async function migrate({
connector,
execute,
}: {
connector: ConnectorModel;
execute: boolean;
}) {
logger.info(`Processing connector ${connector.id}...`);
const childLogger = logger.child({ connectorId: connector.id });
childLogger.info(`Processing connector ${connector.id}...`);

const dataSourceConfig = dataSourceConfigFromConnector(connector);
const parentsMap: Record<string, string | null> = {};
@@ -60,6 +79,9 @@ async function migrate({
id: {
[Op.gt]: nextId,
},
mimeType: {
[Op.or]: ["application/vnd.google-apps.folder", "text/csv"],
},
},
order: [["id", "ASC"]],
limit: QUERY_BATCH_SIZE,
@@ -69,58 +91,57 @@
for (const file of googleDriveFiles) {
const internalId = file.dustFileId;
const driveFileId = file.driveFileId;
const parents = [driveFileId];
let current: string | null = file.parentId;
while (current) {
parents.push(current);
current = parentsMap[current] || null;
const parents = getParents(
file.parentId,
parentsMap,
childLogger.child({ nodeId: driveFileId })
);
if (!parents) {
continue;
}
parents.unshift(driveFileId);

if (file.mimeType === "application/vnd.google-apps.folder") {
const folder = await getFolderNode({
dataSourceConfig,
folderId: internalId,
});
if (!folder || folder.parents.join("/") !== parents.join("/")) {
logger.info({ folderId: file.driveFileId, parents }, "Upsert folder");
const newParents = parents.map((id) => getDocumentId(id));
if (!folder || folder.parents.join("/") !== newParents.join("/")) {
childLogger.info(
{ folderId: file.driveFileId, parents: newParents },
"Upsert folder"
);

if (execute) {
// upsert repository as folder
await upsertFolderNode({
dataSourceConfig,
folderId: file.dustFileId,
parents,
parents: newParents,
parentId: file.parentId,
title: file.name,
});
}
}
} else {
const document = await getDocumentFromDataSource({
dataSourceConfig,
documentId: internalId,
});
if (document) {
logger.info(
{
documentId: file.driveFileId,
},
"Found document"
);
if (document.parents.join("/") !== parents.join("/")) {
logger.info(
} else if (file.mimeType === "text/csv") {
const tableId = internalId;
parents.unshift(...parents.map((id) => getDocumentId(id)));
const table = await getTable({ dataSourceConfig, tableId });
if (table) {
if (table.parents.join("/") !== parents.join("/")) {
childLogger.info(
{
documentId: internalId,
tableId,
parents,
previousParents: document.parents,
previousParents: table.parents,
},
"Update parents for document"
"Update parents for table"
);

if (execute) {
await updateDocumentParentsField({
await updateTableParentsField({
dataSourceConfig,
documentId: file.dustFileId,
tableId,
parents,
});
}
@@ -151,24 +172,23 @@ async function migrate({
sheet.driveFileId,
sheet.driveSheetId
);
const parents = [tableId];
let current: string | null = sheet.driveFileId;
while (current) {
parents.push(current);
current = parentsMap[current] || null;

const parents = getParents(
sheet.driveFileId,
parentsMap,
childLogger.child({ nodeId: tableId })
);
if (!parents) {
continue;
}

parents.unshift(...parents.map((id) => getDocumentId(id)));
parents.unshift(tableId);

const table = await getTable({ dataSourceConfig, tableId });
if (table) {
logger.info(
{
tableId: tableId,
},
"Found table"
);

if (table.parents.join("/") !== parents.join("/")) {
logger.info(
childLogger.info(
{
tableId,
parents,
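For reference, a minimal standalone sketch (not part of the diff) of how the new getParents helper behaves: it walks parentsMap upward from the given node and returns null as soon as an ancestor is missing, so the migration can skip that node. The ids fileA/folderB/orphanC and the console-backed logger are illustrative assumptions.

// Sketch only: mirrors the logic of getParents above with a simplified logger type.
type MiniLogger = { error: (ctx: object, msg: string) => void };

function getParentsSketch(
  fileId: string | null,
  parentsMap: Record<string, string | null>,
  logger: MiniLogger
): string[] | null {
  const parents: string[] = [];
  let current: string | null = fileId;
  while (current) {
    parents.push(current);
    if (typeof parentsMap[current] === "undefined") {
      // Ancestor never seen in the map: signal the caller to skip this node.
      logger.error({ fileId: current }, "Parent not found");
      return null;
    }
    current = parentsMap[current] || null;
  }
  return parents;
}

// Illustrative map: fileA lives in folderB, folderB sits at the drive root.
const sampleParentsMap: Record<string, string | null> = {
  fileA: "folderB",
  folderB: null,
};

console.log(getParentsSketch("fileA", sampleParentsMap, console));
// -> ["fileA", "folderB"]
console.log(getParentsSketch("orphanC", sampleParentsMap, console));
// -> logs "Parent not found" and returns null

In the migration itself, the node's own id (driveFileId for files, tableId for sheets) is then unshifted onto the returned chain before comparing against the stored parents.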
64 changes: 64 additions & 0 deletions connectors/migrations/20241212_restore_gdrive_parents.ts
@@ -0,0 +1,64 @@
import * as fs from "fs";
import * as readline from "readline";
import { makeScript } from "scripts/helpers";

import { dataSourceConfigFromConnector } from "@connectors/lib/api/data_source_config";
import { updateDocumentParentsField } from "@connectors/lib/data_sources";
import logger from "@connectors/logger/logger";
import { ConnectorModel } from "@connectors/resources/storage/models/connector_model";

interface LogEntry {
msg: string;
documentId: string;
parents: string[];
previousParents: string[];
}

async function processLogFile(
connector: ConnectorModel,
filePath: string,
execute: boolean
) {
const fileStream = fs.createReadStream(filePath);
const rl = readline.createInterface({
input: fileStream,
crlfDelay: Infinity,
});

const dataSourceConfig = dataSourceConfigFromConnector(connector);

for await (const line of rl) {
const entry = JSON.parse(line) as LogEntry;
const { msg, documentId, previousParents } = entry;
if (
msg === "Update parents for document" &&
documentId &&
previousParents
) {
logger.info(
{ documentId, previousParents },
"Restoring parent for document"
);
if (execute) {
await updateDocumentParentsField({
dataSourceConfig,
documentId: documentId,
parents: previousParents,
});
}
}
}
}

makeScript(
{
connectorId: { type: "number", required: true },
file: { type: "string", required: true },
},
async ({ connectorId, file, execute }) => {
const connector = await ConnectorModel.findByPk(connectorId);
if (connector) {
await processLogFile(connector, file, execute);
}
}
);
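A hedged sketch of the input this restore script expects: a JSONL file whose lines match the LogEntry interface above, typically captured from a prior run of the 20241211 migration. Only entries with msg "Update parents for document" are acted on. The ids below are placeholders, not real Dust internal ids.

// Illustrative only: building one log line of the expected shape and applying the
// same filter the script uses before calling updateDocumentParentsField.
const exampleLine = JSON.stringify({
  msg: "Update parents for document",
  documentId: "gdrive-document-abc123",
  parents: ["gdrive-document-abc123", "gdrive-folder-def456"],
  previousParents: ["gdrive-document-abc123"],
});

const entry = JSON.parse(exampleLine);
const shouldRestore =
  entry.msg === "Update parents for document" &&
  entry.documentId &&
  entry.previousParents;
console.log(shouldRestore ? "would restore previousParents" : "skipped");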
4 changes: 3 additions & 1 deletion connectors/src/lib/data_sources.ts
@@ -1210,7 +1210,9 @@ export async function getFolderNode({
return dustRequestResult.data.folder;
}

export async function upsertFolderNode({
export const upsertFolderNode = withRetries(_upsertFolderNode);

export async function _upsertFolderNode({
dataSourceConfig,
folderId,
timestampMs,
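The data_sources.ts change follows a wrap-and-rename pattern: the original implementation becomes _upsertFolderNode and the exported name now points at a retry-wrapped version, so callers transparently get retries. Below is a minimal sketch of that pattern with a hypothetical withRetries for illustration only; the real helper in the connectors codebase may have a different signature and backoff policy.

// Sketch of the wrapping pattern, not the actual connectors implementation.
function withRetriesSketch<Args extends unknown[], R>(
  fn: (...args: Args) => Promise<R>,
  { retries = 3, delayMs = 500 } = {}
): (...args: Args) => Promise<R> {
  return async (...args: Args) => {
    let lastError: unknown;
    for (let attempt = 0; attempt < retries; attempt++) {
      try {
        return await fn(...args);
      } catch (err) {
        lastError = err;
        // Simple linear backoff between attempts.
        await new Promise((resolve) => setTimeout(resolve, delayMs * (attempt + 1)));
      }
    }
    throw lastError;
  };
}

async function _upsertFolderNodeSketch(folderId: string): Promise<void> {
  // Placeholder for the real API call that upserts the folder node.
  console.log(`upserting folder ${folderId}`);
}

// Callers keep using the original name and get retries for free.
const upsertFolderNodeSketch = withRetriesSketch(_upsertFolderNodeSketch);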