Skip to content

Commit

Permalink
enh: simplify types in notion connector (#1390)
Browse files Browse the repository at this point in the history
  • Loading branch information
fontanierh authored Sep 11, 2023
1 parent 0177fee commit a8a273b
Show file tree
Hide file tree
Showing 2 changed files with 130 additions and 93 deletions.
2 changes: 1 addition & 1 deletion connectors/src/connectors/notion/temporal/config.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
export const WORKFLOW_VERSION = 22;
export const WORKFLOW_VERSION = 23;
export const QUEUE_NAME = `notion-queue-v${WORKFLOW_VERSION}`;
221 changes: 129 additions & 92 deletions connectors/src/connectors/notion/temporal/workflows.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,13 @@ export async function notionSyncWorkflow(
const childWorkflowQueue = new PQueue({
concurrency: MAX_CONCURRENT_CHILD_WORKFLOWS,
});
const pageUpsertPromises: Promise<
(UpsertActivityResult | void)[] | void

const promises: Promise<
{
upsertPageResults: UpsertActivityResult[];
upsertDatabaseResults: UpsertActivityResult[];
}[]
>[] = [];
const databaseUpsertPromises: Promise<UpsertActivityResult | void>[] = [];

// we go through each result page of the notion search API
do {
Expand Down Expand Up @@ -127,11 +130,11 @@ export async function notionSyncWorkflow(
cursor = nextCursor;
pageIndex += 1;

// this function creates a promise for every page to upsert and every database to upsert
// for each database to upsert, it will also fetch all children pages and add those to the list of
// pages to upsert.
const { upsertDatabasePromises, upsertPagePromises } =
await getUpsertPromises({
// this function triggers child workflows to process batches of pages and databases.
// the worflow that processes databases will itself trigger child workflows to process
// batches of child pages.
promises.push(
getUpsertResults({
dataSourceConfig,
notionAccessToken,
pageIds,
Expand All @@ -142,32 +145,23 @@ export async function notionSyncWorkflow(
isBatchSync: isInitialSync,
skipUpToDatePages,
queue: childWorkflowQueue,
});

pageUpsertPromises.push(...upsertPagePromises);
databaseUpsertPromises.push(...upsertDatabasePromises);
})
);
} while (cursor);

// wait for all child workflows to finish
const pageUpsertResults = await Promise.all(pageUpsertPromises);
const databaseUpsertResults = await Promise.all(databaseUpsertPromises);

// remove potential void results indicating execution timeouts
// we could do this with a `[...x, ...y].filter(Boolean)`, but we'd have to
// cast the result as `UpsertActivityResult[]` so we loose type safety
const allActivityUpsertResults: UpsertActivityResult[] = [];
for (const result of [
...pageUpsertResults.flat(),
...databaseUpsertResults,
]) {
if (result) {
allActivityUpsertResults.push(result);
}
}
const results = (await Promise.all(promises)).flat();

const pageUpsertResults = results.flatMap((r) => r.upsertPageResults);
const databaseUpsertResults = results.flatMap(
(r) => r.upsertDatabaseResults
);

const allResults = [...pageUpsertResults, ...databaseUpsertResults];

await updateParentsFieldsActivity(
dataSourceConfig,
allActivityUpsertResults,
allResults,
new Date().getTime()
);

Expand Down Expand Up @@ -248,24 +242,32 @@ export async function notionSyncResultPageDatabaseWorkflow(
isGarbageCollectionRun = false,
isBatchSync = false
): Promise<{
upsertPagePromises: Promise<(UpsertActivityResult | void)[] | void>[];
upsertDatabasePromises: Promise<UpsertActivityResult | void>[];
upsertPageResults: UpsertActivityResult[];
upsertDatabaseResults: UpsertActivityResult[];
}> {
const upsertQueue = new PQueue({
concurrency: MAX_PENDING_UPSERT_ACTIVITIES,
});
const workflowQueue = new PQueue({
concurrency: MAX_CONCURRENT_CHILD_WORKFLOWS,
});

const upsertPagePromises: Promise<(UpsertActivityResult | void)[] | void>[] =
[];
const upsertDatabasePromises: Promise<UpsertActivityResult | void>[] = [];
const databaseUpsertPromises: Promise<UpsertActivityResult | void>[] = [];
const resultsPromises: Promise<
{
upsertPageResults: UpsertActivityResult[];
upsertDatabaseResults: UpsertActivityResult[];
}[]
>[] = [];

for (const [databaseIndex, databaseId] of databaseIds.entries()) {
const loggerArgs = {
dataSourceName: dataSourceConfig.dataSourceName,
workspaceId: dataSourceConfig.workspaceId,
databaseIndex,
};
upsertDatabasePromises.push(

databaseUpsertPromises.push(
upsertQueue.add(() =>
notionUpsertDatabaseActivity(
notionAccessToken,
Expand All @@ -278,6 +280,15 @@ export async function notionSyncResultPageDatabaseWorkflow(
);
}

// wait for all db upserts before moving on to the children pages
// otherwise we don't have control over concurrency
const dbUpsertResults: UpsertActivityResult[] = [];
for (const result of await Promise.all(databaseUpsertPromises)) {
if (result) {
dbUpsertResults.push(result);
}
}

for (const databaseId of databaseIds) {
let cursor: string | null = null;
let pageIndex = 0;
Expand All @@ -304,7 +315,7 @@ export async function notionSyncResultPageDatabaseWorkflow(
});
cursor = nextCursor;
pageIndex += 1;
const newPromises = await getUpsertPromises({
const upsertResultsPromise = getUpsertResults({
dataSourceConfig,
notionAccessToken,
pageIds,
Expand All @@ -314,24 +325,23 @@ export async function notionSyncResultPageDatabaseWorkflow(
pageIndex,
isBatchSync,
skipUpToDatePages: false,
queue: upsertQueue,
queue: workflowQueue,
childWorkflowsNameSuffix: `database-children-${databaseId}`,
});

upsertPagePromises.push(...(await newPromises.upsertPagePromises));
upsertDatabasePromises.push(
...(await newPromises.upsertDatabasePromises)
);
resultsPromises.push(upsertResultsPromise);
} while (cursor);
}

const upsertResults = (await Promise.all(resultsPromises)).flat();

return {
upsertPagePromises,
upsertDatabasePromises,
upsertPageResults: upsertResults.flatMap((r) => r.upsertPageResults),
upsertDatabaseResults: dbUpsertResults,
};
}

async function getUpsertPromises({
async function getUpsertResults({
dataSourceConfig,
notionAccessToken,
pageIds,
Expand All @@ -355,16 +365,19 @@ async function getUpsertPromises({
skipUpToDatePages: boolean;
queue: PQueue;
childWorkflowsNameSuffix?: string;
}): Promise<{
upsertPagePromises: Promise<(UpsertActivityResult | void)[] | void>[];
upsertDatabasePromises: Promise<UpsertActivityResult | void>[];
}> {
}): Promise<
{
upsertPageResults: UpsertActivityResult[];
upsertDatabaseResults: UpsertActivityResult[];
}[]
> {
let pagesToSync: string[] = [];
let databasesToSync: string[] = [];

const upsertPagePromises: Promise<(UpsertActivityResult | void)[] | void>[] =
[];
const upsertDatabasePromises: Promise<UpsertActivityResult | void>[] = [];
const promises: Promise<{
upsertPageResults: UpsertActivityResult[];
upsertDatabaseResults: UpsertActivityResult[];
}>[] = [];

if (isGarbageCollectionRun) {
// Mark pages and databases as visited to avoid deleting them and return pages and databases
Expand All @@ -383,10 +396,7 @@ async function getUpsertPromises({
}

if (!pagesToSync.length && !databasesToSync.length) {
return {
upsertPagePromises: [],
upsertDatabasePromises: [],
};
return [];
}

if (pagesToSync.length) {
Expand All @@ -406,20 +416,42 @@ async function getUpsertPromises({
if (childWorkflowsNameSuffix) {
workflowId += `-${childWorkflowsNameSuffix}`;
}
upsertPagePromises.push(
queue.add(() =>
executeChild(notionSyncResultPageWorkflow, {
workflowId,
args: [
dataSourceConfig,
notionAccessToken,
batch,
runTimestamp,
isBatchSync,
],
parentClosePolicy: ParentClosePolicy.PARENT_CLOSE_POLICY_TERMINATE,

promises.push(
queue
.add(() =>
executeChild(notionSyncResultPageWorkflow, {
workflowId,
args: [
dataSourceConfig,
notionAccessToken,
batch,
runTimestamp,
isBatchSync,
],
parentClosePolicy:
ParentClosePolicy.PARENT_CLOSE_POLICY_TERMINATE,
})
)
.then((r) => {
if (!r) {
return {
upsertPageResults: [],
upsertDatabaseResults: [],
};
}
const pageResults: UpsertActivityResult[] = [];
for (const result of r) {
if (result) {
pageResults.push(result);
}
}

return {
upsertPageResults: pageResults,
upsertDatabaseResults: [],
};
})
)
);
}
}
Expand All @@ -444,33 +476,38 @@ async function getUpsertPromises({
if (childWorkflowsNameSuffix) {
workflowId += `-${childWorkflowsNameSuffix}`;
}
const queueAddRes = await queue.add(() =>
executeChild(notionSyncResultPageDatabaseWorkflow, {
workflowId,
args: [
dataSourceConfig,
notionAccessToken,
batch,
runTimestamp,
isGarbageCollectionRun,
isBatchSync,
],
parentClosePolicy: ParentClosePolicy.PARENT_CLOSE_POLICY_TERMINATE,
})

promises.push(
queue
.add(() =>
executeChild(notionSyncResultPageDatabaseWorkflow, {
workflowId,
args: [
dataSourceConfig,
notionAccessToken,
batch,
runTimestamp,
isGarbageCollectionRun,
isBatchSync,
],
parentClosePolicy:
ParentClosePolicy.PARENT_CLOSE_POLICY_TERMINATE,
})
)
.then((r) => {
if (!r) {
return {
upsertPageResults: [],
upsertDatabaseResults: [],
};
}
return r;
})
);
if (queueAddRes) {
const {
upsertPagePromises: newUpsertPagePromises,
upsertDatabasePromises: newUpsertDatabasePromises,
} = queueAddRes;
upsertPagePromises.push(...newUpsertPagePromises);
upsertDatabasePromises.push(...newUpsertDatabasePromises);
}
}
}

return {
upsertPagePromises,
upsertDatabasePromises,
};
const results = await Promise.all(promises);

return results;
}

0 comments on commit a8a273b

Please sign in to comment.