Skip to content

Commit

Permalink
fix: update SQL query condition in parents migrator to use >= for timestamp comparison (#9409)
Browse files Browse the repository at this point in the history

* fix: update SQL query condition in parents migrator to use >= for timestamp comparison

* more batch size

* fix notion parentIds

* cli arg to skip work

---------

Co-authored-by: Henry Fontanier <[email protected]>
  • Loading branch information
fontanierh and Henry Fontanier authored Dec 16, 2024
1 parent 8591626 commit 0ff3838
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 3 deletions.
2 changes: 2 additions & 0 deletions connectors/src/connectors/notion/temporal/activities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2047,6 +2047,7 @@ export async function renderAndUpsertPageFromCache({
}),
// TODO(kw_search) remove legacy
parents: [...parentIds, ...legacyParentIds],
parentId: parentIds.length > 1 ? parentIds[1] : null,
loggerArgs,
upsertContext: {
sync_type: isFullSync ? "batch" : "incremental",
Expand Down Expand Up @@ -2603,6 +2604,7 @@ export async function upsertDatabaseStructuredDataFromCache({
tags: [`title:${databaseName}`, "is_database:true"],
// TODO(kw_search) remove legacy
parents: [databaseDocId, ...parentIds, ...legacyParentIds],
parentId: parentIds.length > 1 ? parentIds[1] : null,
loggerArgs,
upsertContext: {
sync_type: "batch",
Expand Down
41 changes: 38 additions & 3 deletions front/migrations/20241211_parents_migrator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type ProviderMigrator = {
) => { parents: string[]; parentId: string | null };
};

const QUERY_BATCH_SIZE = 256;
const QUERY_BATCH_SIZE = 4096;
const DOCUMENT_CONCURRENCY = 16;
const TABLE_CONCURRENCY = 16;

Expand Down Expand Up @@ -264,6 +264,7 @@ async function migrateDocument({
dataSource,
coreDocument,
execute,
skipIfParentsAreAlreadyCorrect,
}: {
action: MigratorAction;
migrator: ProviderMigrator;
Expand All @@ -274,6 +275,7 @@ async function migrateDocument({
document_id: string;
};
execute: boolean;
skipIfParentsAreAlreadyCorrect: boolean;
}) {
let newParents = coreDocument.parents;
let newParentId: string | null = null;
Expand All @@ -295,6 +297,21 @@ async function migrateDocument({
throw e;
}

if (
skipIfParentsAreAlreadyCorrect &&
newParents.every((x, i) => x === coreDocument.parents[i])
) {
logger.info(
{
documentId: coreDocument.document_id,
fromParents: coreDocument.parents,
toParents: newParents,
},
`SKIP document (parents are already correct)`
);
return new Ok(undefined);
}

if (execute) {
const updateRes = await withRetries(
async () => {
Expand Down Expand Up @@ -417,11 +434,13 @@ async function migrateDataSource({
migrator,
dataSource,
execute,
skipIfParentsAreAlreadyCorrect,
}: {
action: MigratorAction;
migrator: ProviderMigrator;
dataSource: DataSourceModel;
execute: boolean;
skipIfParentsAreAlreadyCorrect: boolean;
}) {
const corePrimary = getCorePrimaryDbConnection();

Expand Down Expand Up @@ -449,7 +468,7 @@ async function migrateDataSource({
for (;;) {
const [coreDocumentRows] = (await corePrimary.query(
"SELECT id, parents, document_id, timestamp FROM data_sources_documents " +
"WHERE data_source = ? AND STATUS = ? AND timestamp > ? " +
"WHERE data_source = ? AND STATUS = ? AND timestamp >= ? " +
"ORDER BY timestamp ASC LIMIT ?",
{
replacements: [
Expand Down Expand Up @@ -480,6 +499,7 @@ async function migrateDataSource({
migrator,
dataSource,
coreDocument,
skipIfParentsAreAlreadyCorrect,
execute,
}),
{ concurrency: DOCUMENT_CONCURRENCY }
Expand Down Expand Up @@ -555,12 +575,14 @@ async function migrateAll({
migrator,
nextDataSourceId,
execute,
skipIfParentsAreAlreadyCorrect,
}: {
provider: ConnectorProvider;
action: MigratorAction;
migrator: ProviderMigrator;
nextDataSourceId: number;
execute: boolean;
skipIfParentsAreAlreadyCorrect: boolean;
}) {
// retrieve all data sources for the provider
const dataSources = await DataSourceModel.findAll({
Expand All @@ -584,6 +606,7 @@ async function migrateAll({
action,
dataSource,
execute,
skipIfParentsAreAlreadyCorrect,
});
} else {
logger.info({ dataSourceId: dataSource.id }, "SKIP");
Expand All @@ -601,13 +624,24 @@ makeScript(
type: "string",
required: true,
},
skipIfParentsAreAlreadyCorrect: {
type: "boolean",
required: false,
default: false,
},
nextDataSourceId: {
type: "number",
required: false,
default: 0,
},
},
async ({ provider, action, nextDataSourceId, execute }) => {
async ({
provider,
action,
nextDataSourceId,
execute,
skipIfParentsAreAlreadyCorrect,
}) => {
if (!isMigratorAction(action)) {
console.error(
`Invalid action ${action}, supported actions are "transform" and "clean"`
Expand All @@ -630,6 +664,7 @@ makeScript(
migrator,
nextDataSourceId,
execute,
skipIfParentsAreAlreadyCorrect,
});
}
);

0 comments on commit 0ff3838

Please sign in to comment.