Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: update SQL query condition in parents migrator to use >= for timestamp comparison #9409

Merged
merged 4 commits into from
Dec 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions connectors/src/connectors/notion/temporal/activities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2047,6 +2047,7 @@ export async function renderAndUpsertPageFromCache({
}),
// TODO(kw_search) remove legacy
parents: [...parentIds, ...legacyParentIds],
parentId: parentIds.length > 1 ? parentIds[1] : null,
loggerArgs,
upsertContext: {
sync_type: isFullSync ? "batch" : "incremental",
Expand Down Expand Up @@ -2603,6 +2604,7 @@ export async function upsertDatabaseStructuredDataFromCache({
tags: [`title:${databaseName}`, "is_database:true"],
// TODO(kw_search) remove legacy
parents: [databaseDocId, ...parentIds, ...legacyParentIds],
parentId: parentIds.length > 1 ? parentIds[1] : null,
loggerArgs,
upsertContext: {
sync_type: "batch",
Expand Down
41 changes: 38 additions & 3 deletions front/migrations/20241211_parents_migrator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type ProviderMigrator = {
) => { parents: string[]; parentId: string | null };
};

const QUERY_BATCH_SIZE = 256;
const QUERY_BATCH_SIZE = 4096;
const DOCUMENT_CONCURRENCY = 16;
const TABLE_CONCURRENCY = 16;

Expand Down Expand Up @@ -264,6 +264,7 @@ async function migrateDocument({
dataSource,
coreDocument,
execute,
skipIfParentsAreAlreadyCorrect,
}: {
action: MigratorAction;
migrator: ProviderMigrator;
Expand All @@ -274,6 +275,7 @@ async function migrateDocument({
document_id: string;
};
execute: boolean;
skipIfParentsAreAlreadyCorrect: boolean;
}) {
let newParents = coreDocument.parents;
let newParentId: string | null = null;
Expand All @@ -295,6 +297,21 @@ async function migrateDocument({
throw e;
}

if (
skipIfParentsAreAlreadyCorrect &&
newParents.every((x, i) => x === coreDocument.parents[i])
) {
logger.info(
{
documentId: coreDocument.document_id,
fromParents: coreDocument.parents,
toParents: newParents,
},
`SKIP document (parents are already correct)`
);
return new Ok(undefined);
}

if (execute) {
const updateRes = await withRetries(
async () => {
Expand Down Expand Up @@ -417,11 +434,13 @@ async function migrateDataSource({
migrator,
dataSource,
execute,
skipIfParentsAreAlreadyCorrect,
}: {
action: MigratorAction;
migrator: ProviderMigrator;
dataSource: DataSourceModel;
execute: boolean;
skipIfParentsAreAlreadyCorrect: boolean;
}) {
const corePrimary = getCorePrimaryDbConnection();

Expand Down Expand Up @@ -449,7 +468,7 @@ async function migrateDataSource({
for (;;) {
const [coreDocumentRows] = (await corePrimary.query(
"SELECT id, parents, document_id, timestamp FROM data_sources_documents " +
"WHERE data_source = ? AND STATUS = ? AND timestamp > ? " +
"WHERE data_source = ? AND STATUS = ? AND timestamp >= ? " +
"ORDER BY timestamp ASC LIMIT ?",
{
replacements: [
Expand Down Expand Up @@ -480,6 +499,7 @@ async function migrateDataSource({
migrator,
dataSource,
coreDocument,
skipIfParentsAreAlreadyCorrect,
execute,
}),
{ concurrency: DOCUMENT_CONCURRENCY }
Expand Down Expand Up @@ -555,12 +575,14 @@ async function migrateAll({
migrator,
nextDataSourceId,
execute,
skipIfParentsAreAlreadyCorrect,
}: {
provider: ConnectorProvider;
action: MigratorAction;
migrator: ProviderMigrator;
nextDataSourceId: number;
execute: boolean;
skipIfParentsAreAlreadyCorrect: boolean;
}) {
// retrieve all data sources for the provider
const dataSources = await DataSourceModel.findAll({
Expand All @@ -584,6 +606,7 @@ async function migrateAll({
action,
dataSource,
execute,
skipIfParentsAreAlreadyCorrect,
});
} else {
logger.info({ dataSourceId: dataSource.id }, "SKIP");
Expand All @@ -601,13 +624,24 @@ makeScript(
type: "string",
required: true,
},
skipIfParentsAreAlreadyCorrect: {
type: "boolean",
required: false,
default: false,
},
nextDataSourceId: {
type: "number",
required: false,
default: 0,
},
},
async ({ provider, action, nextDataSourceId, execute }) => {
async ({
provider,
action,
nextDataSourceId,
execute,
skipIfParentsAreAlreadyCorrect,
}) => {
if (!isMigratorAction(action)) {
console.error(
`Invalid action ${action}, supported actions are "transform" and "clean"`
Expand All @@ -630,6 +664,7 @@ makeScript(
migrator,
nextDataSourceId,
execute,
skipIfParentsAreAlreadyCorrect,
});
}
);
Loading