[connectors] Google drive parent scripts improvement (#9390)
tdraier authored Dec 16, 2024
1 parent 5840b2d commit 36c675d
Showing 3 changed files with 131 additions and 45 deletions.
108 changes: 64 additions & 44 deletions connectors/migrations/20241211_fix_gdrive_parents.ts
@@ -2,32 +2,51 @@ import { getGoogleSheetTableId } from "@dust-tt/types";
import { makeScript } from "scripts/helpers";
import { Op } from "sequelize";

import { getDocumentId } from "@connectors/connectors/google_drive/temporal/utils";
import { dataSourceConfigFromConnector } from "@connectors/lib/api/data_source_config";
import {
getDocumentFromDataSource,
getFolderNode,
getTable,
updateDocumentParentsField,
updateTableParentsField,
upsertFolderNode,
} from "@connectors/lib/data_sources";
import {
GoogleDriveFiles,
GoogleDriveSheet,
} from "@connectors/lib/models/google_drive";
import type { Logger } from "@connectors/logger/logger";
import logger from "@connectors/logger/logger";
import { ConnectorModel } from "@connectors/resources/storage/models/connector_model";

const QUERY_BATCH_SIZE = 1024;

function getParents(
fileId: string | null,
parentsMap: Record<string, string | null>,
logger: Logger
) {
const parents = [];
let current: string | null = fileId;
while (current) {
parents.push(current);
if (typeof parentsMap[current] === "undefined") {
logger.error({ fileId: current }, "Parent not found");
return null;
}
current = parentsMap[current] || null;
}
return parents;
}

async function migrate({
connector,
execute,
}: {
connector: ConnectorModel;
execute: boolean;
}) {
logger.info(`Processing connector ${connector.id}...`);
const childLogger = logger.child({ connectorId: connector.id });
childLogger.info(`Processing connector ${connector.id}...`);

const dataSourceConfig = dataSourceConfigFromConnector(connector);
const parentsMap: Record<string, string | null> = {};
@@ -60,6 +79,9 @@ async function migrate({
id: {
[Op.gt]: nextId,
},
mimeType: {
[Op.or]: ["application/vnd.google-apps.folder", "text/csv"],
},
},
order: [["id", "ASC"]],
limit: QUERY_BATCH_SIZE,
@@ -69,58 +91,57 @@ async function migrate({
for (const file of googleDriveFiles) {
const internalId = file.dustFileId;
const driveFileId = file.driveFileId;
const parents = [driveFileId];
let current: string | null = file.parentId;
while (current) {
parents.push(current);
current = parentsMap[current] || null;
const parents = getParents(
file.parentId,
parentsMap,
childLogger.child({ nodeId: driveFileId })
);
if (!parents) {
continue;
}
parents.unshift(driveFileId);

if (file.mimeType === "application/vnd.google-apps.folder") {
const folder = await getFolderNode({
dataSourceConfig,
folderId: internalId,
});
if (!folder || folder.parents.join("/") !== parents.join("/")) {
logger.info({ folderId: file.driveFileId, parents }, "Upsert folder");
const newParents = parents.map((id) => getDocumentId(id));
if (!folder || folder.parents.join("/") !== newParents.join("/")) {
childLogger.info(
{ folderId: file.driveFileId, parents: newParents },
"Upsert folder"
);

if (execute) {
// upsert repository as folder
await upsertFolderNode({
dataSourceConfig,
folderId: file.dustFileId,
parents,
parents: newParents,
parentId: file.parentId,
title: file.name,
});
}
}
} else {
const document = await getDocumentFromDataSource({
dataSourceConfig,
documentId: internalId,
});
if (document) {
logger.info(
{
documentId: file.driveFileId,
},
"Found document"
);
if (document.parents.join("/") !== parents.join("/")) {
logger.info(
} else if (file.mimeType === "text/csv") {
const tableId = internalId;
parents.unshift(...parents.map((id) => getDocumentId(id)));
const table = await getTable({ dataSourceConfig, tableId });
if (table) {
if (table.parents.join("/") !== parents.join("/")) {
childLogger.info(
{
documentId: internalId,
tableId,
parents,
previousParents: document.parents,
previousParents: table.parents,
},
"Update parents for document"
"Update parents for table"
);

if (execute) {
await updateDocumentParentsField({
await updateTableParentsField({
dataSourceConfig,
documentId: file.dustFileId,
tableId,
parents,
});
}
@@ -151,24 +172,23 @@ async function migrate({
sheet.driveFileId,
sheet.driveSheetId
);
const parents = [tableId];
let current: string | null = sheet.driveFileId;
while (current) {
parents.push(current);
current = parentsMap[current] || null;

const parents = getParents(
sheet.driveFileId,
parentsMap,
childLogger.child({ nodeId: tableId })
);
if (!parents) {
continue;
}

parents.unshift(...parents.map((id) => getDocumentId(id)));
parents.unshift(tableId);

const table = await getTable({ dataSourceConfig, tableId });
if (table) {
logger.info(
{
tableId: tableId,
},
"Found table"
);

if (table.parents.join("/") !== parents.join("/")) {
logger.info(
childLogger.info(
{
tableId,
parents,
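
For reference, a minimal usage sketch of the new getParents helper introduced above: it walks a child-to-parent map from a starting ID toward the root and returns null (after logging an error) when an ancestor is missing from the map. The IDs and map contents below are hypothetical.

```ts
import logger from "@connectors/logger/logger";

// Hypothetical parents map: drive ID -> parent drive ID (null at the root).
const exampleParentsMap: Record<string, string | null> = {
  "file-3": "folder-2",
  "folder-2": "folder-1",
  "folder-1": null,
};

// Resolves the full chain: ["file-3", "folder-2", "folder-1"].
getParents("file-3", exampleParentsMap, logger);

// Returns null and logs "Parent not found" because "orphan" has no map entry.
getParents("orphan", exampleParentsMap, logger);
```
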
64 changes: 64 additions & 0 deletions connectors/migrations/20241212_restore_gdrive_parents.ts
@@ -0,0 +1,64 @@
import * as fs from "fs";
import * as readline from "readline";
import { makeScript } from "scripts/helpers";

import { dataSourceConfigFromConnector } from "@connectors/lib/api/data_source_config";
import { updateDocumentParentsField } from "@connectors/lib/data_sources";
import logger from "@connectors/logger/logger";
import { ConnectorModel } from "@connectors/resources/storage/models/connector_model";

interface LogEntry {
msg: string;
documentId: string;
parents: string[];
previousParents: string[];
}

async function processLogFile(
connector: ConnectorModel,
filePath: string,
execute: boolean
) {
const fileStream = fs.createReadStream(filePath);
const rl = readline.createInterface({
input: fileStream,
crlfDelay: Infinity,
});

const dataSourceConfig = dataSourceConfigFromConnector(connector);

for await (const line of rl) {
const entry = JSON.parse(line) as LogEntry;
const { msg, documentId, previousParents } = entry;
if (
msg === "Update parents for document" &&
documentId &&
previousParents
) {
logger.info(
{ documentId, previousParents },
"Restoring parent for document"
);
if (execute) {
await updateDocumentParentsField({
dataSourceConfig,
documentId: documentId,
parents: previousParents,
});
}
}
}
}

makeScript(
{
connectorId: { type: "number", required: true },
file: { type: "string", required: true },
},
async ({ connectorId, file, execute }) => {
const connector = await ConnectorModel.findByPk(connectorId);
if (connector) {
await processLogFile(connector, file, execute);
}
}
);
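
The restore script reads one JSON object per line and only acts on entries that were logged as "Update parents for document". As a rough sketch, a line in the input file is expected to look like the following (field values are made up for illustration):

```ts
// Hypothetical JSONL line matching the LogEntry interface above; only the
// msg, documentId and previousParents fields are used by processLogFile.
const exampleLogLine = JSON.stringify({
  msg: "Update parents for document",
  documentId: "gdrive-abc123",
  parents: ["gdrive-abc123", "gdrive-folder-def456"],
  previousParents: ["gdrive-abc123"],
});
```
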
4 changes: 3 additions & 1 deletion connectors/src/lib/data_sources.ts
@@ -1214,7 +1214,9 @@ export async function getFolderNode({
return dustRequestResult.data.folder;
}

export async function upsertFolderNode({
export const upsertFolderNode = withRetries(_upsertFolderNode);

export async function _upsertFolderNode({
dataSourceConfig,
folderId,
timestampMs,
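
The data_sources.ts change wraps the folder upsert in withRetries before exporting it under the public name. The repository's actual withRetries helper may differ, but the wrap-and-export pattern is roughly the following sketch (retry count and backoff are assumptions):

```ts
// Illustrative only, not the repo's withRetries implementation: retries a
// failing async function a few times with a linearly growing delay.
function withRetriesSketch<Args extends unknown[], R>(
  fn: (...args: Args) => Promise<R>,
  { retries = 3, delayMs = 500 }: { retries?: number; delayMs?: number } = {}
): (...args: Args) => Promise<R> {
  return async (...args: Args) => {
    let lastError: unknown;
    for (let attempt = 1; attempt <= retries; attempt++) {
      try {
        return await fn(...args);
      } catch (err) {
        lastError = err;
        // Back off a little longer on each failed attempt.
        await new Promise((resolve) => setTimeout(resolve, delayMs * attempt));
      }
    }
    throw lastError;
  };
}

// Same pattern as the diff: export the retrying wrapper as the public name.
// export const upsertFolderNode = withRetriesSketch(_upsertFolderNode);
```
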
