diff --git a/connectors/src/connectors/confluence/lib/cli.ts b/connectors/src/connectors/confluence/lib/cli.ts index 5c78659cf92b3..add46a24f4223 100644 --- a/connectors/src/connectors/confluence/lib/cli.ts +++ b/connectors/src/connectors/confluence/lib/cli.ts @@ -2,60 +2,18 @@ import type { AdminSuccessResponseType, ConfluenceCommandType, ConfluenceUpsertPageResponseType, - ModelId, } from "@dust-tt/types"; import assert from "assert"; import fs from "fs/promises"; -import { - getConfluencePageParentIds, - getSpaceHierarchy, -} from "@connectors/connectors/confluence/lib/hierarchy"; -import { - confluenceGetSpaceNameActivity, - fetchConfluenceConfigurationActivity, - getConfluenceClient, - upsertConfluencePageInDb, - upsertConfluencePageToDataSource, -} from "@connectors/connectors/confluence/temporal/activities"; -import { dataSourceConfigFromConnector } from "@connectors/lib/api/data_source_config"; -import { ConfluencePage } from "@connectors/lib/models/confluence"; +import { confluenceUpsertPageWithFullParentsActivity } from "@connectors/connectors/confluence/temporal/activities"; import { default as topLogger } from "@connectors/logger/logger"; -import { ConnectorResource } from "@connectors/resources/connector_resource"; interface cachedSpace { spaceName: string | null; spaceHierarchy: Record; } -async function cacheSpace( - cachedSpaces: Record, - { - confluenceCloudId, - connectorId, - spaceId, - }: { - confluenceCloudId: string; - connectorId: ModelId; - spaceId: string; - } -): Promise { - const cachedSpace = cachedSpaces[spaceId]; - if (cachedSpace) { - return cachedSpace; - } - const spaceName = await confluenceGetSpaceNameActivity({ - spaceId, - confluenceCloudId, - connectorId, - }); - const spaceHierarchy = spaceName - ? await getSpaceHierarchy(connectorId, spaceId) - : {}; // not fetching if we couldn't get the space from Confluence API anyway - cachedSpaces[spaceId] = { spaceName, spaceHierarchy }; - return { spaceName, spaceHierarchy }; -} - export const confluence = async ({ command, args, @@ -71,73 +29,12 @@ export const confluence = async ({ if (!args.pageId) { throw new Error("Missing --pageId argument"); } - const connectorId = args.connectorId; - const pageId = args.pageId; - - const connector = await ConnectorResource.fetchById(connectorId); - if (!connector) { - throw new Error("Connector not found."); - } - const dataSourceConfig = dataSourceConfigFromConnector(connector); - const confluenceConfig = - await fetchConfluenceConfigurationActivity(connectorId); - - const loggerArgs = { + const { connectorId, pageId } = args; + const success = await confluenceUpsertPageWithFullParentsActivity({ connectorId, - dataSourceId: dataSourceConfig.dataSourceId, pageId, - workspaceId: dataSourceConfig.workspaceId, - }; - const localLogger = logger.child(loggerArgs); - const visitedAtMs = new Date().getTime(); - - const pageInDb = await ConfluencePage.findOne({ - attributes: ["parentId", "skipReason"], - where: { connectorId, pageId }, }); - if (pageInDb && pageInDb.skipReason !== null) { - localLogger.info("Confluence page skipped."); - return { success: false }; - } - - const client = await getConfluenceClient( - { cloudId: confluenceConfig?.cloudId }, - connector - ); - - const page = await client.getPageById(pageId); - if (!page) { - localLogger.info("Confluence page not found."); - return { success: false }; - } - const space = await client.getSpaceById(page.spaceId); - if (!space) { - localLogger.info("Confluence space not found."); - return { success: false }; - } - - const cachedHierarchy = await getSpaceHierarchy( - connectorId, - page.spaceId - ); - const parentIds = await getConfluencePageParentIds( - connectorId, - { pageId: page.id, parentId: page.parentId, spaceId: page.spaceId }, - cachedHierarchy - ); - - localLogger.info("Upserting Confluence page."); - await upsertConfluencePageToDataSource( - page, - space.name, - parentIds, - confluenceConfig, - "batch", - dataSourceConfig, - loggerArgs - ); - await upsertConfluencePageInDb(connector.id, page, visitedAtMs); - return { success: true }; + return { success }; } case "upsert-pages": { if (!args.connectorId) { @@ -166,79 +63,26 @@ export const confluence = async ({ return entry[keyInFile]; }); - // fetching the pages in DB - const pagesInDb = Object.fromEntries( - ( - await ConfluencePage.findAll({ - attributes: ["pageId", "skipReason"], - where: { connectorId, pageId: pageIds }, - }) - ).map((page) => [page.pageId, page]) - ); - - const connector = await ConnectorResource.fetchById(connectorId); - assert(connector !== null, "Connector not found."); - const dataSourceConfig = dataSourceConfigFromConnector(connector); - const confluenceConfig = - await fetchConfluenceConfigurationActivity(connectorId); - const client = await getConfluenceClient( - { cloudId: confluenceConfig?.cloudId }, - connector - ); - - const cachedSpaces: Record = {}; - const visitedAtMs = new Date().getTime(); + let allSuccesses = true; + const cachedSpaceNames: Record = {}; + const cachedSpaceHierarchies: Record< + string, + Record + > = {}; for (const pageId of pageIds) { - const loggerArgs = { + const success = await confluenceUpsertPageWithFullParentsActivity({ connectorId, - dataSourceId: dataSourceConfig.dataSourceId, pageId, - workspaceId: dataSourceConfig.workspaceId, - }; - const localLogger = logger.child(loggerArgs); - - const pageInDb = pagesInDb[pageId]; - if (pageInDb && pageInDb.skipReason !== null) { - localLogger.info("Confluence page skipped."); - continue; - } - - const page = await client.getPageById(pageId); - if (!page) { - localLogger.info("Confluence page not found."); - continue; - } - // fetching the space if not already cached - const { spaceName, spaceHierarchy } = await cacheSpace(cachedSpaces, { - connectorId, - confluenceCloudId: confluenceConfig?.cloudId, - spaceId: page.spaceId, + cachedSpaceNames, + cachedSpaceHierarchies, }); - if (!spaceName) { - localLogger.info("Confluence space not found."); - continue; + if (!success) { + logger.error({ pageId }, "Failed to upsert page"); + allSuccesses = false; } - - const parentIds = await getConfluencePageParentIds( - connectorId, - { pageId: page.id, parentId: page.parentId, spaceId: page.spaceId }, - spaceHierarchy - ); - - localLogger.info("Upserting Confluence page."); - await upsertConfluencePageToDataSource( - page, - spaceName, - parentIds, - confluenceConfig, - "batch", - dataSourceConfig, - loggerArgs - ); - await upsertConfluencePageInDb(connector.id, page, visitedAtMs); } - return { success: true }; + return { success: allSuccesses }; } default: diff --git a/connectors/src/connectors/confluence/temporal/activities.ts b/connectors/src/connectors/confluence/temporal/activities.ts index de68546773840..d4ee8541878ce 100644 --- a/connectors/src/connectors/confluence/temporal/activities.ts +++ b/connectors/src/connectors/confluence/temporal/activities.ts @@ -321,6 +321,10 @@ interface ConfluenceCheckAndUpsertPageActivityInput { visitedAtMs: number; } +/** + * Upsert a Confluence page without its full parents. + * Operates greedily by stopping if the page is restricted or if there is a version match (unless the page was moved, in this case we have to upsert because the parents have changed). + */ export async function confluenceCheckAndUpsertPageActivity({ connectorId, isBatchSync, @@ -428,6 +432,104 @@ export async function confluenceCheckAndUpsertPageActivity({ return true; } +/** + * Upsert a Confluence page with its full parent hierarchy. + * Expensive operation, it should be reserved to admin actions on a limited set of pages. + */ +export async function confluenceUpsertPageWithFullParentsActivity({ + connectorId, + pageId, + cachedSpaceNames = {}, + cachedSpaceHierarchies = {}, +}: { + connectorId: ModelId; + pageId: string; + cachedSpaceNames?: Record; + cachedSpaceHierarchies?: Record>; +}): Promise { + const connector = await ConnectorResource.fetchById(connectorId); + if (!connector) { + throw new Error("Connector not found."); + } + const dataSourceConfig = dataSourceConfigFromConnector(connector); + const confluenceConfig = + await fetchConfluenceConfigurationActivity(connectorId); + + const loggerArgs = { + connectorId, + dataSourceId: dataSourceConfig.dataSourceId, + pageId, + workspaceId: dataSourceConfig.workspaceId, + }; + const localLogger = logger.child(loggerArgs); + const visitedAtMs = new Date().getTime(); + + const pageInDb = await ConfluencePage.findOne({ + attributes: ["parentId", "skipReason"], + where: { connectorId, pageId }, + }); + if (pageInDb && pageInDb.skipReason !== null) { + localLogger.info("Confluence page skipped."); + return false; + } + + const client = await getConfluenceClient( + { cloudId: confluenceConfig?.cloudId }, + connector + ); + + const hasReadRestrictions = await pageHasReadRestrictions(client, pageId); + if (hasReadRestrictions) { + localLogger.info("Skipping restricted Confluence page."); + return false; + } + + const page = await client.getPageById(pageId); + if (!page) { + localLogger.info("Confluence page not found."); + return false; + } + + let spaceName = cachedSpaceNames[page.spaceId]; + if (!spaceName) { + const space = await client.getSpaceById(page.spaceId); + if (!space) { + localLogger.info("Confluence space not found."); + return false; + } + cachedSpaceNames[page.spaceId] = space.name; + spaceName = space.name; + } + + if (!cachedSpaceHierarchies[page.spaceId]) { + cachedSpaceHierarchies[page.spaceId] = await getSpaceHierarchy( + connectorId, + page.spaceId + ); + } + + const parentIds = await getConfluencePageParentIds( + connectorId, + { pageId: page.id, parentId: page.parentId, spaceId: page.spaceId }, + cachedSpaceHierarchies[page.spaceId] + ); + + localLogger.info("Upserting Confluence page."); + await upsertConfluencePageToDataSource( + page, + spaceName, + parentIds, + confluenceConfig, + "batch", + dataSourceConfig, + loggerArgs + ); + + await upsertConfluencePageInDb(connector.id, page, visitedAtMs); + + return true; +} + export async function confluenceGetActiveChildPageRefsActivity({ connectorId, parentPageId,