diff --git a/connectors/src/connectors/confluence/temporal/activities.ts b/connectors/src/connectors/confluence/temporal/activities.ts index cf24f26b34a7..b1153b97549e 100644 --- a/connectors/src/connectors/confluence/temporal/activities.ts +++ b/connectors/src/connectors/confluence/temporal/activities.ts @@ -412,9 +412,12 @@ export async function confluenceGetTopLevelPageIdsActivity({ pageCursor ); - localLogger.info("Found Confluence top-level pages in space.", { - topLevelPagesCount: childPageIds.length, - }); + localLogger.info( + { + topLevelPagesCount: childPageIds.length, + }, + "Found Confluence top-level pages in space." + ); return { topLevelPageIds: childPageIds, nextPageCursor }; } @@ -554,9 +557,12 @@ export async function confluenceRemoveSpaceActivity( }, }); - localLogger.info("Delete Confluence space", { - numberOfPages: allPages.length, - }); + localLogger.info( + { + numberOfPages: allPages.length, + }, + "Delete Confluence space" + ); for (const page of allPages) { await deletePage(connectorId, page.pageId, dataSourceConfig); diff --git a/connectors/src/connectors/confluence/temporal/workflows.ts b/connectors/src/connectors/confluence/temporal/workflows.ts index 7c5c5eebf326..67b8c1344dd6 100644 --- a/connectors/src/connectors/confluence/temporal/workflows.ts +++ b/connectors/src/connectors/confluence/temporal/workflows.ts @@ -1,6 +1,7 @@ import type { ModelId } from "@dust-tt/types"; import type { WorkflowInfo } from "@temporalio/workflow"; import { + continueAsNew, executeChild, proxyActivities, setHandler, @@ -39,6 +40,8 @@ const { startToCloseTimeout: "20 minutes", }); +const MAX_HISTORY_LENGTH = 30_000; + export async function confluenceSyncWorkflow({ connectorId, spaceIdsToBrowse, @@ -205,7 +208,7 @@ export async function confluenceSpaceSyncWorkflow( spaceName, confluenceCloudId, visitedAtMs, - topLevelPageId: pageId, + topLevelPageIds: [pageId], }, ], memo, @@ -232,7 +235,7 @@ interface confluenceSyncTopLevelChildPagesWorkflowInput { isBatchSync: boolean; spaceId: string; spaceName: string; - topLevelPageId: string; + topLevelPageIds: string[]; visitedAtMs: number; } @@ -245,8 +248,8 @@ interface confluenceSyncTopLevelChildPagesWorkflowInput { export async function confluenceSyncTopLevelChildPagesWorkflow( params: confluenceSyncTopLevelChildPagesWorkflowInput ) { - const { spaceName, topLevelPageId, visitedAtMs } = params; - const stack = [topLevelPageId]; + const { spaceName, topLevelPageIds, visitedAtMs } = params; + const stack = [...topLevelPageIds]; while (stack.length > 0) { const currentPageId = stack.pop(); @@ -278,6 +281,14 @@ export async function confluenceSyncTopLevelChildPagesWorkflow( stack.push(...childPageIds); } while (nextPageCursor !== null); + + // If additional pages are pending and workflow limits are reached, continue in a new workflow. + if (stack.length > 0 && workflowInfo().historyLength > MAX_HISTORY_LENGTH) { + await continueAsNew({ + ...params, + topLevelPageIds: stack, + }); + } } }