From 0ff3838bf8b3f028dbb5717a6ac2354df9fd790e Mon Sep 17 00:00:00 2001 From: Henry Fontanier Date: Mon, 16 Dec 2024 15:45:23 +0100 Subject: [PATCH] fix: update SQL query condition in parents migrator to use >= for timestamp comparison (#9409) * fix: update SQL query condition in parents migrator to use >= for timestamp comparison * more batch sizd * fix notion parentIds * cli arg to skip work --------- Co-authored-by: Henry Fontanier --- .../connectors/notion/temporal/activities.ts | 2 + front/migrations/20241211_parents_migrator.ts | 41 +++++++++++++++++-- 2 files changed, 40 insertions(+), 3 deletions(-) diff --git a/connectors/src/connectors/notion/temporal/activities.ts b/connectors/src/connectors/notion/temporal/activities.ts index ddbeca0a270c..1e32cdc5e3af 100644 --- a/connectors/src/connectors/notion/temporal/activities.ts +++ b/connectors/src/connectors/notion/temporal/activities.ts @@ -2047,6 +2047,7 @@ export async function renderAndUpsertPageFromCache({ }), // TODO(kw_search) remove legacy parents: [...parentIds, ...legacyParentIds], + parentId: parentIds.length > 1 ? parentIds[1] : null, loggerArgs, upsertContext: { sync_type: isFullSync ? "batch" : "incremental", @@ -2603,6 +2604,7 @@ export async function upsertDatabaseStructuredDataFromCache({ tags: [`title:${databaseName}`, "is_database:true"], // TODO(kw_search) remove legacy parents: [databaseDocId, ...parentIds, ...legacyParentIds], + parentId: parentIds.length > 1 ? parentIds[1] : null, loggerArgs, upsertContext: { sync_type: "batch", diff --git a/front/migrations/20241211_parents_migrator.ts b/front/migrations/20241211_parents_migrator.ts index 5f185bd4701e..57500ac220bc 100644 --- a/front/migrations/20241211_parents_migrator.ts +++ b/front/migrations/20241211_parents_migrator.ts @@ -34,7 +34,7 @@ type ProviderMigrator = { ) => { parents: string[]; parentId: string | null }; }; -const QUERY_BATCH_SIZE = 256; +const QUERY_BATCH_SIZE = 4096; const DOCUMENT_CONCURRENCY = 16; const TABLE_CONCURRENCY = 16; @@ -264,6 +264,7 @@ async function migrateDocument({ dataSource, coreDocument, execute, + skipIfParentsAreAlreadyCorrect, }: { action: MigratorAction; migrator: ProviderMigrator; @@ -274,6 +275,7 @@ async function migrateDocument({ document_id: string; }; execute: boolean; + skipIfParentsAreAlreadyCorrect: boolean; }) { let newParents = coreDocument.parents; let newParentId: string | null = null; @@ -295,6 +297,21 @@ async function migrateDocument({ throw e; } + if ( + skipIfParentsAreAlreadyCorrect && + newParents.every((x, i) => x === coreDocument.parents[i]) + ) { + logger.info( + { + documentId: coreDocument.document_id, + fromParents: coreDocument.parents, + toParents: newParents, + }, + `SKIP document (parents are already correct)` + ); + return new Ok(undefined); + } + if (execute) { const updateRes = await withRetries( async () => { @@ -417,11 +434,13 @@ async function migrateDataSource({ migrator, dataSource, execute, + skipIfParentsAreAlreadyCorrect, }: { action: MigratorAction; migrator: ProviderMigrator; dataSource: DataSourceModel; execute: boolean; + skipIfParentsAreAlreadyCorrect: boolean; }) { const corePrimary = getCorePrimaryDbConnection(); @@ -449,7 +468,7 @@ async function migrateDataSource({ for (;;) { const [coreDocumentRows] = (await corePrimary.query( "SELECT id, parents, document_id, timestamp FROM data_sources_documents " + - "WHERE data_source = ? AND STATUS = ? AND timestamp > ? " + + "WHERE data_source = ? AND STATUS = ? AND timestamp >= ? " + "ORDER BY timestamp ASC LIMIT ?", { replacements: [ @@ -480,6 +499,7 @@ async function migrateDataSource({ migrator, dataSource, coreDocument, + skipIfParentsAreAlreadyCorrect, execute, }), { concurrency: DOCUMENT_CONCURRENCY } @@ -555,12 +575,14 @@ async function migrateAll({ migrator, nextDataSourceId, execute, + skipIfParentsAreAlreadyCorrect, }: { provider: ConnectorProvider; action: MigratorAction; migrator: ProviderMigrator; nextDataSourceId: number; execute: boolean; + skipIfParentsAreAlreadyCorrect: boolean; }) { // retrieve all data sources for the provider const dataSources = await DataSourceModel.findAll({ @@ -584,6 +606,7 @@ async function migrateAll({ action, dataSource, execute, + skipIfParentsAreAlreadyCorrect, }); } else { logger.info({ dataSourceId: dataSource.id }, "SKIP"); @@ -601,13 +624,24 @@ makeScript( type: "string", required: true, }, + skipIfParentsAreAlreadyCorrect: { + type: "boolean", + required: false, + default: false, + }, nextDataSourceId: { type: "number", required: false, default: 0, }, }, - async ({ provider, action, nextDataSourceId, execute }) => { + async ({ + provider, + action, + nextDataSourceId, + execute, + skipIfParentsAreAlreadyCorrect, + }) => { if (!isMigratorAction(action)) { console.error( `Invalid action ${action}, supported actions are "transform" and "clean"` @@ -630,6 +664,7 @@ makeScript( migrator, nextDataSourceId, execute, + skipIfParentsAreAlreadyCorrect, }); } );