Skip to content

Commit

Permalink
fix: dont return anything from notion workflows (#1426)
Browse files Browse the repository at this point in the history
* enh: dont return anything from notion workflows

* fix migration script

* use date col

* unused import

* fix logic

* outdated comment

* remove upsert activity result type

* fix case when notion is circular
  • Loading branch information
fontanierh authored Sep 13, 2023
1 parent 938191c commit 8e87493
Show file tree
Hide file tree
Showing 6 changed files with 194 additions and 183 deletions.
6 changes: 5 additions & 1 deletion connectors/migrations/20230906_notion_fill_parents_field.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,11 @@ async function updateParentsFieldForConnector(connector: Connector) {
});

// update all parents fields for all pages and databases
await updateAllParentsFields(connector, [...pages, ...databases]);
await updateAllParentsFields(
connector,
pages.map((p) => p.notionPageId),
databases.map((d) => d.notionDatabaseId)
);
}

main()
Expand Down
12 changes: 12 additions & 0 deletions connectors/src/connectors/notion/lib/connectors_db_helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ export async function upsertNotionPageInConnectorsDb({
notionUrl,
lastUpsertedTs,
skipReason,
lastCreatedOrMovedRunTs,
}: {
dataSourceInfo: DataSourceInfo;
notionPageId: string;
Expand All @@ -22,6 +23,7 @@ export async function upsertNotionPageInConnectorsDb({
notionUrl?: string | null;
lastUpsertedTs?: number;
skipReason?: string;
lastCreatedOrMovedRunTs?: number;
}): Promise<NotionPage> {
const connector = await Connector.findOne({
where: {
Expand All @@ -48,6 +50,7 @@ export async function upsertNotionPageInConnectorsDb({
notionUrl?: string;
lastUpsertedTs?: Date;
skipReason?: string;
lastCreatedOrMovedRunTs?: Date;
} = {
lastSeenTs: new Date(lastSeenTs),
};
Expand All @@ -69,6 +72,9 @@ export async function upsertNotionPageInConnectorsDb({
if (notionUrl) {
updateParams.notionUrl = notionUrl;
}
if (lastCreatedOrMovedRunTs) {
updateParams.lastCreatedOrMovedRunTs = new Date(lastCreatedOrMovedRunTs);
}

if (page) {
return page.update(updateParams);
Expand Down Expand Up @@ -123,6 +129,7 @@ export async function upsertNotionDatabaseInConnectorsDb({
title,
notionUrl,
skipReason,
lastCreatedOrMovedRunTs,
}: {
dataSourceInfo: DataSourceInfo;
notionDatabaseId: string;
Expand All @@ -132,6 +139,7 @@ export async function upsertNotionDatabaseInConnectorsDb({
title?: string | null;
notionUrl?: string | null;
skipReason?: string;
lastCreatedOrMovedRunTs?: number;
}): Promise<NotionDatabase> {
const connector = await Connector.findOne({
where: {
Expand All @@ -157,6 +165,7 @@ export async function upsertNotionDatabaseInConnectorsDb({
title?: string;
notionUrl?: string;
skipReason?: string;
lastCreatedOrMovedRunTs?: Date;
} = {
lastSeenTs: new Date(lastSeenTs),
};
Expand All @@ -175,6 +184,9 @@ export async function upsertNotionDatabaseInConnectorsDb({
if (notionUrl) {
updateParams.notionUrl = notionUrl;
}
if (lastCreatedOrMovedRunTs) {
updateParams.lastCreatedOrMovedRunTs = new Date(lastCreatedOrMovedRunTs);
}

if (database) {
return database.update(updateParams);
Expand Down
110 changes: 70 additions & 40 deletions connectors/src/connectors/notion/lib/parents.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import memoize from "lodash.memoize";
import PQueue from "p-queue";

import {
getDatabaseChildrenOf,
Expand Down Expand Up @@ -78,34 +79,42 @@ export const getParents = memoize(

export async function updateAllParentsFields(
dataSourceConfig: DataSourceConfig,
pageOrDbs: (NotionPage | NotionDatabase)[],
createdOrMovedNotionPageIds: string[],
createdOrMovedNotionDatabaseIds: string[],
memoizationKey?: string
): Promise<number> {
/* Computing all descendants, then updating, ensures the field is updated only
once per page, limiting the load on the Datasource */
const pagesToUpdate = await getPagesToUpdate(pageOrDbs, dataSourceConfig);
const pageIdsToUpdate = await getPagesToUpdate(
createdOrMovedNotionPageIds,
createdOrMovedNotionDatabaseIds,
dataSourceConfig
);

// Update everybody's parents field. Use of a memoization key to control
// sharing memoization across updateAllParentsFields calls, which
// can be desired or not depending on the use case
for (let i = 0; i < pagesToUpdate.length; i += 16) {
const chunk = pagesToUpdate.slice(i, i + 16);
// updates are done in batches of 16
const promises = chunk.map(async (page) => {
const parents = await getParents(
dataSourceConfig,
page.notionPageId,
memoizationKey
);
await updateDocumentParentsField(
dataSourceConfig,
`notion-${page.notionPageId}`,
parents
);
});
await Promise.all(promises);
const q = new PQueue({ concurrency: 16 });
const promises: Promise<void>[] = [];
for (const pageId of pageIdsToUpdate) {
promises.push(
q.add(async () => {
const parents = await getParents(
dataSourceConfig,
pageId,
memoizationKey
);
await updateDocumentParentsField(
dataSourceConfig,
`notion-${pageId}`,
parents
);
})
);
}
return pagesToUpdate.length;

await Promise.all(promises);
return pageIdsToUpdate.size;
}

/** Get ids of all pages whose parents field should be updated: initial pages in
Expand All @@ -116,39 +125,60 @@ export async function updateAllParentsFields(
* updated
*/
async function getPagesToUpdate(
pageOrDbs: (NotionPage | NotionDatabase)[],
createdOrMovedNotionPageIds: string[],
createdOrMovedNotionDatabaseIds: string[],
dataSourceConfig: DataSourceConfig
): Promise<NotionPage[]> {
const pagesToUpdate: NotionPage[] = [];

let i = 0;
while (i < pageOrDbs.length) {
// Visit next document and if it's a page add it to update list
const pageOrDb = pageOrDbs[i++] as NotionPage | NotionDatabase;
const pageOrDbId = notionPageOrDbId(pageOrDb);
if ((pageOrDb as NotionPage).notionPageId) {
pagesToUpdate.push(pageOrDb as NotionPage);
): Promise<Set<string>> {
const pageIdsToUpdate: Set<string> = new Set([
...createdOrMovedNotionPageIds,
]);

// we need to look at all descendants of these objects, and add
// those that are pages to pageIdsToUpdate
const toProcess = new Set([
...createdOrMovedNotionPageIds,
...createdOrMovedNotionDatabaseIds,
]);

const shift = () => {
for (const pageOrDbId of toProcess) {
toProcess.delete(pageOrDbId);
return pageOrDbId;
}
};
const visited = new Set<string>();

while (toProcess.size > 0) {
const pageOrDbIdToProcess = shift() as string; // guaranteed to be defined as toUpdate.size > 0
visited.add(pageOrDbIdToProcess);

const pageChildren = await getPageChildrenOf(
dataSourceConfig,
pageOrDbIdToProcess
);

// add page children to pageIdsToUpdate
for (const child of pageChildren) {
const childId = notionPageOrDbId(child);
pageIdsToUpdate.add(childId);
}

// Get children of the document
const pageChildren = await getPageChildrenOf(dataSourceConfig, pageOrDbId);
const databaseChildren = await getDatabaseChildrenOf(
dataSourceConfig,
pageOrDbId
pageOrDbIdToProcess
);

// If they haven't yet been visited, add them to documents visited
// and to the list of documents whose children should be fetched
// add all page and DB children to toProcess
for (const child of [...pageChildren, ...databaseChildren]) {
if (
!pageOrDbs.some((d) => notionPageOrDbId(d) === notionPageOrDbId(child))
) {
pageOrDbs.push(child);
if (visited.has(notionPageOrDbId(child))) {
continue;
}
const childId = notionPageOrDbId(child);
toProcess.add(childId);
}
}

return pagesToUpdate;
return pageIdsToUpdate;
}

function notionPageOrDbId(pageOrDb: NotionPage | NotionDatabase): string {
Expand Down
Loading

0 comments on commit 8e87493

Please sign in to comment.