Skip to content

Commit

Permalink
Handle large Confluence workflows (#4214)
Browse files Browse the repository at this point in the history
* Handle large Confluence workflows

* Address comment from review.
  • Loading branch information
flvndvd authored Mar 8, 2024
1 parent e8a4940 commit 348e0de
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 10 deletions.
18 changes: 12 additions & 6 deletions connectors/src/connectors/confluence/temporal/activities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -412,9 +412,12 @@ export async function confluenceGetTopLevelPageIdsActivity({
pageCursor
);

localLogger.info("Found Confluence top-level pages in space.", {
topLevelPagesCount: childPageIds.length,
});
localLogger.info(
{
topLevelPagesCount: childPageIds.length,
},
"Found Confluence top-level pages in space."
);

return { topLevelPageIds: childPageIds, nextPageCursor };
}
Expand Down Expand Up @@ -554,9 +557,12 @@ export async function confluenceRemoveSpaceActivity(
},
});

localLogger.info("Delete Confluence space", {
numberOfPages: allPages.length,
});
localLogger.info(
{
numberOfPages: allPages.length,
},
"Delete Confluence space"
);

for (const page of allPages) {
await deletePage(connectorId, page.pageId, dataSourceConfig);
Expand Down
25 changes: 21 additions & 4 deletions connectors/src/connectors/confluence/temporal/workflows.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import type { ModelId } from "@dust-tt/types";
import type { WorkflowInfo } from "@temporalio/workflow";
import {
continueAsNew,
executeChild,
proxyActivities,
setHandler,
Expand Down Expand Up @@ -39,6 +40,11 @@ const {
startToCloseTimeout: "20 minutes",
});

// Set a conservative threshold at which to continue in a new workflow and
// avoid exceeding Temporal's workflow history length limit,
// since a Confluence page can have an unbounded number of child pages.
const TEMPORAL_WORKFLOW_MAX_HISTORY_LENGTH = 30_000;

export async function confluenceSyncWorkflow({
connectorId,
spaceIdsToBrowse,
Expand Down Expand Up @@ -205,7 +211,7 @@ export async function confluenceSpaceSyncWorkflow(
spaceName,
confluenceCloudId,
visitedAtMs,
topLevelPageId: pageId,
topLevelPageIds: [pageId],
},
],
memo,
Expand All @@ -232,7 +238,7 @@ interface confluenceSyncTopLevelChildPagesWorkflowInput {
isBatchSync: boolean;
spaceId: string;
spaceName: string;
topLevelPageId: string;
topLevelPageIds: string[];
visitedAtMs: number;
}

Expand All @@ -245,8 +251,8 @@ interface confluenceSyncTopLevelChildPagesWorkflowInput {
export async function confluenceSyncTopLevelChildPagesWorkflow(
params: confluenceSyncTopLevelChildPagesWorkflowInput
) {
const { spaceName, topLevelPageId, visitedAtMs } = params;
const stack = [topLevelPageId];
const { spaceName, topLevelPageIds, visitedAtMs } = params;
const stack = [...topLevelPageIds];

while (stack.length > 0) {
const currentPageId = stack.pop();
Expand Down Expand Up @@ -278,6 +284,17 @@ export async function confluenceSyncTopLevelChildPagesWorkflow(

stack.push(...childPageIds);
} while (nextPageCursor !== null);

// If additional pages are pending and workflow limits are reached, continue in a new workflow.
if (
stack.length > 0 &&
workflowInfo().historyLength > TEMPORAL_WORKFLOW_MAX_HISTORY_LENGTH
) {
await continueAsNew<typeof confluenceSyncTopLevelChildPagesWorkflow>({
...params,
topLevelPageIds: stack,
});
}
}
}

Expand Down

0 comments on commit 348e0de

Please sign in to comment.