enh(notion connector): avoid long-running activities (#1367)
fontanierh authored Sep 11, 2023
1 parent c330da5 commit 9ede5ab
Showing 5 changed files with 434 additions and 123 deletions.
85 changes: 68 additions & 17 deletions connectors/src/connectors/notion/lib/notion_api.ts
@@ -12,6 +12,7 @@ import {
GetPageResponse,
PageObjectResponse,
PartialBlockObjectResponse,
QueryDatabaseResponse,
RichTextItemResponse,
SearchResponse,
} from "@notionhq/client/build/src/api-endpoints";
@@ -209,23 +210,6 @@ export async function getPagesAndDatabasesEditedSince(
// through its pages.
try {
editedDbs[pageOrDb.id] = lastEditedTime;

// Note: we don't want to optimize this step due to Notion not always returning all the
// dbs (so if we miss it at initial sync and it gets touched we will miss all its old
// pages here again. It's a lot of additional work but it helps catching as much as we
// can from Notion). The caller of this function filters the edited page based on our
// knowledge of it in DB so this won't create extraneous upserts.
for await (const child of iteratePaginatedAPIWithRetries(
notionClient.databases.query,
{
database_id: pageOrDb.id,
},
localLogger.child({ databaseId: pageOrDb.id })
)) {
if (isFullPage(child)) {
editedPages[child.id] = lastEditedTime;
}
}
} catch (e) {
if (
APIResponseError.isAPIResponseError(e) &&
@@ -252,6 +236,73 @@ getPagesAndDatabasesEditedSince(
};
}

export async function getDatabaseChildPages({
notionAccessToken,
databaseId,
loggerArgs,
cursor,
retry = { retries: 5, backoffFactor: 2 },
}: {
notionAccessToken: string;
databaseId: string;
loggerArgs: Record<string, string | number>;
cursor: string | null;
retry?: { retries: number; backoffFactor: number };
}): Promise<{
pages: { id: string; lastEditedTs: number }[];
nextCursor: string | null;
}> {
const localLogger = logger.child(loggerArgs);

const notionClient = new Client({ auth: notionAccessToken });
let resultsPage: QueryDatabaseResponse | null = null;
const pages: Record<string, number> = {};

let tries = 0;
while (tries < retry.retries) {
const tryLogger = localLogger.child({ tries, maxTries: retry.retries });
tryLogger.info("Fetching result page from Notion API.");
try {
resultsPage = await notionClient.databases.query({
database_id: databaseId,
start_cursor: cursor || undefined,
});
for (const r of resultsPage.results) {
if (isFullPage(r)) {
const lastEditedTime = new Date(r.last_edited_time).getTime();
pages[r.id] = lastEditedTime;
}
}

tryLogger.info(
{ count: resultsPage.results.length },
"Received result page from Notion API."
);

return {
pages: Object.entries(pages).map(([id, lastEditedTs]) => ({
id,
lastEditedTs,
})),
nextCursor: resultsPage.has_more ? resultsPage.next_cursor : null,
};
} catch (e) {
tryLogger.error(
{ error: e },
"Error fetching result page from Notion API."
);
tries += 1;
if (tries >= retry.retries) {
throw e;
}
const sleepTime = 500 * retry.backoffFactor ** tries;
tryLogger.info({ sleepTime }, "Sleeping before retrying.");
await new Promise((resolve) => setTimeout(resolve, sleepTime));
}
}

throw new Error("Unreachable.");
}

const NOTION_UNAUTHORIZED_ACCESS_ERROR_CODES = [
"object_not_found",
"unauthorized",
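To illustrate the new contract: getDatabaseChildPages fetches exactly one results page per call and hands back a nextCursor, so callers page through a database one short request at a time (with the default retry of { retries: 5, backoffFactor: 2 }, a failed request sleeps 500 * backoffFactor ** tries ms before the next attempt). A minimal, hypothetical driver loop — not part of this commit, and the import path is assumed from the repo layout — might look like:

import { getDatabaseChildPages } from "@connectors/connectors/notion/lib/notion_api";

// Hypothetical helper (illustration only): accumulates all child pages of a
// database by looping one results page per call until nextCursor is null.
async function fetchAllChildPages(
  notionAccessToken: string,
  databaseId: string
): Promise<{ id: string; lastEditedTs: number }[]> {
  const allPages: { id: string; lastEditedTs: number }[] = [];
  let cursor: string | null = null;
  do {
    // Each call performs a single databases.query request (plus retries).
    const { pages, nextCursor } = await getDatabaseChildPages({
      notionAccessToken,
      databaseId,
      loggerArgs: { databaseId },
      cursor,
    });
    allPages.push(...pages);
    cursor = nextCursor;
  } while (cursor !== null);
  return allPages;
}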
130 changes: 129 additions & 1 deletion connectors/src/connectors/notion/temporal/activities.ts
@@ -12,6 +12,7 @@ import {
isDuringGarbageCollectStartWindow,
} from "@connectors/connectors/notion/lib/garbage_collect";
import {
getDatabaseChildPages,
getPagesAndDatabasesEditedSince,
getParsedDatabase,
getParsedPage,
@@ -45,6 +46,129 @@ const logger = mainLogger.child({ provider: "notion" });

const GARBAGE_COLLECTION_INTERVAL_HOURS = 12;

export async function getDatabaseChildPagesActivity({
databaseId,
dataSourceInfo,
accessToken,
cursor,
loggerArgs,
excludeUpToDatePages,
}: {
databaseId: string;
dataSourceInfo: DataSourceInfo;
accessToken: string;
cursor: string | null;
loggerArgs: Record<string, string | number>;
excludeUpToDatePages: boolean;
}): Promise<{
pageIds: string[];
nextCursor: string | null;
}> {
const localLoggerArgs = {
...loggerArgs,
databaseId,
dataSourceName: dataSourceInfo.dataSourceName,
workspaceId: dataSourceInfo.workspaceId,
};
const localLogger = logger.child(localLoggerArgs);

const connector = await Connector.findOne({
where: {
type: "notion",
workspaceId: dataSourceInfo.workspaceId,
dataSourceName: dataSourceInfo.dataSourceName,
},
});
if (!connector) {
throw new Error("Could not find connector");
}

let res;
try {
res = await getDatabaseChildPages({
notionAccessToken: accessToken,
databaseId,
loggerArgs: localLoggerArgs,
cursor,
});
} catch (e) {
// Sometimes a cursor will consistently fail with 500.
// In this case, there is not much we can do, so we just give up and move on.
// Notion workspaces are resynced daily so nothing is lost forever.
const potentialNotionError = e as {
body: unknown;
code: string;
status: number;
};
if (
potentialNotionError.code === "internal_server_error" &&
potentialNotionError.status === 500
) {
if (Context.current().info.attempt > 20) {
localLogger.error(
{
error: potentialNotionError,
attempt: Context.current().info.attempt,
},
"Failed to get Notion database children result page with cursor. Giving up and moving on"
);
return {
pageIds: [],
nextCursor: null,
};
}
}

throw e;
}

const { pages, nextCursor } = res;

if (!excludeUpToDatePages) {
return {
pageIds: pages.map((p) => p.id),
nextCursor,
};
}

// We exclude pages that we have already seen since their last edit (based on the
// lastEditedTs received from getDatabaseChildPages).
const existingPages = await NotionPage.findAll({
where: {
notionPageId: pages.map((p) => p.id),
connectorId: connector.id,
},
attributes: ["notionPageId", "lastSeenTs"],
});
if (existingPages.length > 0) {
localLogger.info({ count: existingPages.length }, "Found existing pages");
}

const lastSeenTsByPageId = new Map<string, number>();
for (const page of existingPages) {
lastSeenTsByPageId.set(page.notionPageId, page.lastSeenTs.getTime());
}
const filteredPageIds = pages
.filter(({ id, lastEditedTs }) => {
const ts = lastSeenTsByPageId.get(id);
return !ts || ts < lastEditedTs;
})
.map((p) => p.id);

localLogger.info(
{
initial_count: pages.length,
filtered_count: pages.length - filteredPageIds.length,
},
"Filtered out pages already up to date."
);

return {
pageIds: filteredPageIds,
nextCursor,
};
}

export async function notionGetToSyncActivity(
dataSourceInfo: DataSourceInfo,
accessToken: string,
@@ -150,7 +274,11 @@ export async function notionGetToSyncActivity(
},
attributes: ["notionPageId", "lastSeenTs"],
});
localLogger.info({ count: existingPages.length }, "Found existing pages");

if (existingPages.length > 0) {
localLogger.info({ count: existingPages.length }, "Found existing pages");
}

const lastSeenTsByPageId = new Map<string, number>();
for (const page of existingPages) {
lastSeenTsByPageId.set(page.notionPageId, page.lastSeenTs.getTime());
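This activity replaces a single long-running per-database iteration with many short executions: the workflow calls it once per results page and threads the cursor through. A hypothetical workflow-side loop — the workflow name, proxy options, and the shape assumed for DataSourceInfo are illustrative, not from this commit — could look like:

import { proxyActivities } from "@temporalio/workflow";
import type * as activities from "./activities";

const { getDatabaseChildPagesActivity } = proxyActivities<typeof activities>({
  startToCloseTimeout: "10 minutes",
});

// Hypothetical workflow (illustration only): pages through a database one
// short activity execution at a time, so no single activity runs long.
export async function syncDatabaseChildPagesWorkflow(params: {
  databaseId: string;
  dataSourceInfo: { workspaceId: string; dataSourceName: string };
  accessToken: string;
}): Promise<string[]> {
  const pageIds: string[] = [];
  let cursor: string | null = null;
  do {
    const { pageIds: batch, nextCursor } = await getDatabaseChildPagesActivity({
      databaseId: params.databaseId,
      dataSourceInfo: params.dataSourceInfo,
      accessToken: params.accessToken,
      cursor,
      loggerArgs: { databaseId: params.databaseId },
      excludeUpToDatePages: true,
    });
    pageIds.push(...batch);
    cursor = nextCursor;
  } while (cursor !== null);
  return pageIds;
}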
2 changes: 1 addition & 1 deletion connectors/src/connectors/notion/temporal/config.ts
@@ -1,2 +1,2 @@
export const WORKFLOW_VERSION = 18;
export const WORKFLOW_VERSION = 19;
export const QUEUE_NAME = `notion-queue-v${WORKFLOW_VERSION}`;
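Because QUEUE_NAME embeds WORKFLOW_VERSION, this bump also moves workers and newly started workflows onto a fresh task queue (notion-queue-v19), keeping them isolated from any workflows still running against the v18 queue.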
1 change: 0 additions & 1 deletion connectors/src/connectors/notion/temporal/worker.ts
Original file line number Diff line number Diff line change
@@ -14,7 +14,6 @@ export async function runNotionWorker() {
workflowsPath: require.resolve("./workflows"),
activities,
taskQueue: QUEUE_NAME,
maxConcurrentActivityTaskExecutions: 3,
connection,
namespace,
interceptors: {
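Dropping maxConcurrentActivityTaskExecutions removes the explicit cap of 3 concurrent activity executions and lets the worker fall back to the Temporal SDK default — presumably safe now that each activity fetches a single results page rather than iterating an entire database.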