diff --git a/connectors/src/connectors/confluence/lib/cli.ts b/connectors/src/connectors/confluence/lib/cli.ts new file mode 100644 index 000000000000..e3e7818bfc5c --- /dev/null +++ b/connectors/src/connectors/confluence/lib/cli.ts @@ -0,0 +1,121 @@ +import type { + AdminSuccessResponseType, + ConfluenceCommandType, + ConfluenceUpsertPageResponseType, +} from "@dust-tt/types"; +import assert from "assert"; +import fs from "fs/promises"; + +import { QUEUE_NAME } from "@connectors/connectors/confluence/temporal/config"; +import { + confluenceUpsertPagesWithFullParentsWorkflow, + confluenceUpsertPageWithFullParentsWorkflow, +} from "@connectors/connectors/confluence/temporal/workflows"; +import { getTemporalClient } from "@connectors/lib/temporal"; +import { default as topLogger } from "@connectors/logger/logger"; + +export const confluence = async ({ + command, + args, +}: ConfluenceCommandType): Promise< + AdminSuccessResponseType | ConfluenceUpsertPageResponseType +> => { + const logger = topLogger.child({ majorCommand: "confluence", command, args }); + switch (command) { + case "upsert-page": { + if (!args.connectorId) { + throw new Error("Missing --connectorId argument"); + } + if (!args.pageId) { + throw new Error("Missing --pageId argument"); + } + const { connectorId, pageId } = args; + + const client = await getTemporalClient(); + const workflow = await client.workflow.start( + confluenceUpsertPageWithFullParentsWorkflow, + { + args: [{ connectorId, pageId }], + taskQueue: QUEUE_NAME, + workflowId: `confluence-upsert-page-${connectorId}-${pageId}`, + searchAttributes: { connectorId: [connectorId] }, + memo: { connectorId }, + } + ); + + const { workflowId } = workflow; + const temporalNamespace = process.env.TEMPORAL_NAMESPACE; + if (!temporalNamespace) { + logger.info(`[Admin] Started temporal workflow with id: ${workflowId}`); + } else { + logger.info( + `[Admin] Started temporal workflow with id: ${workflowId} - https://cloud.temporal.io/namespaces/${temporalNamespace}/workflows/${workflowId}` + ); + } + return { + workflowId, + workflowUrl: temporalNamespace + ? `https://cloud.temporal.io/namespaces/${temporalNamespace}/workflows/${workflowId}` + : undefined, + }; + } + case "upsert-pages": { + if (!args.connectorId) { + throw new Error("Missing --connectorId argument"); + } + if (!args.file) { + throw new Error("Missing --file argument"); + } + if (!args.keyInFile) { + throw new Error("Missing --keyInFile argument"); + } + const connectorId = args.connectorId; + const file = args.file; + const keyInFile = args.keyInFile; + + // parsing the JSON file + const fileContent = await fs.readFile(file, "utf-8"); + const jsonArray = JSON.parse(fileContent); + assert(Array.isArray(jsonArray), "The file content is not an array."); + + const pageIds = jsonArray.map((entry) => { + assert( + keyInFile in entry, + `Key "${keyInFile}" not found in entry ${JSON.stringify(entry)}` + ); + return entry[keyInFile]; + }); + + const client = await getTemporalClient(); + const workflow = await client.workflow.start( + confluenceUpsertPagesWithFullParentsWorkflow, + { + args: [{ connectorId, pageIds }], + taskQueue: QUEUE_NAME, + workflowId: `confluence-upsert-pages-${connectorId}`, + searchAttributes: { connectorId: [connectorId] }, + memo: { connectorId }, + } + ); + + const { workflowId } = workflow; + const temporalNamespace = process.env.TEMPORAL_NAMESPACE; + if (!temporalNamespace) { + logger.info(`[Admin] Started temporal workflow with id: ${workflowId}`); + } else { + logger.info( + `[Admin] Started temporal workflow with id: ${workflowId} - https://cloud.temporal.io/namespaces/${temporalNamespace}/workflows/${workflowId}` + ); + } + return { + workflowId, + workflowUrl: temporalNamespace + ? `https://cloud.temporal.io/namespaces/${temporalNamespace}/workflows/${workflowId}` + : undefined, + }; + } + + default: + throw new Error("Unknown Confluence command: " + command); + } +}; diff --git a/connectors/src/connectors/confluence/temporal/activities.ts b/connectors/src/connectors/confluence/temporal/activities.ts index eed69a283be9..af5104ea2afd 100644 --- a/connectors/src/connectors/confluence/temporal/activities.ts +++ b/connectors/src/connectors/confluence/temporal/activities.ts @@ -22,6 +22,7 @@ import { makePageInternalId } from "@connectors/connectors/confluence/lib/intern import { makeConfluenceDocumentUrl } from "@connectors/connectors/confluence/temporal/workflow_ids"; import { dataSourceConfigFromConnector } from "@connectors/lib/api/data_source_config"; import { concurrentExecutor } from "@connectors/lib/async_utils"; +import type { UpsertToDataSourceParams } from "@connectors/lib/data_sources"; import { deleteFromDataSource, renderDocumentTitleAndContent, @@ -92,7 +93,7 @@ export async function getConfluenceClient( ) { const { cloudId, connectorId } = config; - // Ensure connector is fetched if not directly provided. + // Ensure the connector is fetched if not directly provided. const effectiveConnector = connector ?? (connectorId ? await fetchConfluenceConnector(connectorId) : undefined); @@ -220,6 +221,90 @@ export async function markPageHasVisited({ ); } +interface ConfluenceUpsertPageInput { + page: NonNullable>>; + spaceName: string; + parents: string[]; + confluenceConfig: ConfluenceConfiguration; + syncType?: UpsertToDataSourceParams["upsertContext"]["sync_type"]; + dataSourceConfig: DataSourceConfig; + loggerArgs: Record; +} + +async function upsertConfluencePageToDataSource({ + page, + spaceName, + parents, + confluenceConfig, + syncType = "batch", + dataSourceConfig, + loggerArgs, +}: ConfluenceUpsertPageInput) { + const localLogger = logger.child(loggerArgs); + + const markdown = turndownService.turndown(page.body.storage.value); + const pageCreatedAt = new Date(page.createdAt); + const lastPageVersionCreatedAt = new Date(page.version.createdAt); + + if (markdown) { + const renderedMarkdown = await renderMarkdownSection( + dataSourceConfig, + markdown + ); + const renderedPage = await renderDocumentTitleAndContent({ + dataSourceConfig, + title: `Page ${page.title} Space ${spaceName}`, + createdAt: pageCreatedAt, + updatedAt: lastPageVersionCreatedAt, + content: renderedMarkdown, + }); + + const documentId = makePageInternalId(page.id); + const documentUrl = makeConfluenceDocumentUrl({ + baseUrl: confluenceConfig.url, + suffix: page._links.tinyui, + }); + + // We log the number of labels to help define the importance of labels in the future. + if (page.labels.results.length > 0) { + localLogger.info( + { labelsCount: page.labels.results.length }, + "Confluence page has labels." + ); + } + + // Limit to 10 custom tags. + const customTags = page.labels.results + .slice(0, 10) + .map((l) => `labels:${l.id}`); + + const tags = [ + `createdAt:${pageCreatedAt.getTime()}`, + `space:${spaceName}`, + `title:${page.title}`, + `updatedAt:${lastPageVersionCreatedAt.getTime()}`, + `version:${page.version.number}`, + ...customTags, + ]; + + await upsertToDatasource({ + dataSourceConfig, + documentContent: renderedPage, + documentId, + documentUrl, + loggerArgs, + parents, + parentId: parents[1], + tags, + timestampMs: lastPageVersionCreatedAt.getTime(), + upsertContext: { sync_type: syncType }, + title: page.title, + mimeType: "application/vnd.dust.confluence.page", + async: true, + }); + } +} + async function upsertConfluencePageInDb( connectorId: ModelId, page: ConfluencePageWithBodyType, @@ -247,6 +332,11 @@ interface ConfluenceCheckAndUpsertPageActivityInput { visitedAtMs: number; } +/** + * Upsert a Confluence page without its full parents. + * Operates greedily by stopping if the page is restricted or if there is a version match + * (unless the page was moved, in this case, we have to upsert because the parents have changed). + */ export async function confluenceCheckAndUpsertPageActivity({ connectorId, isBatchSync, @@ -303,15 +393,15 @@ export async function confluenceCheckAndUpsertPageActivity({ return false; } - // Check version. + // Check the version. const isSameVersion = pageAlreadyInDb && pageAlreadyInDb.version === pageRef.version; - // Check if page was moved. Version is not bumped when a page is moved. + // Check whether the page was moved (the version is not bumped when a page is moved). const pageWasMoved = pageAlreadyInDb && pageAlreadyInDb.parentId !== pageRef.parentId; - // Only index in DB if the page does not exis, has been moved or we want to upsert. + // Only index in DB if the page does not exist, has been moved, or we want to upsert. if (isSameVersion && !forceUpsert && !pageWasMoved) { // Simply record that we visited the page. await markPageHasVisited({ @@ -334,74 +424,119 @@ export async function confluenceCheckAndUpsertPageActivity({ } localLogger.info("Upserting Confluence page."); + await upsertConfluencePageToDataSource({ + page, + spaceName, + // Parent Ids will be computed after all page imports within the space have been completed. + parents: [makePageInternalId(page.id), HiddenContentNodeParentId], + confluenceConfig, + syncType: isBatchSync ? "batch" : "incremental", + dataSourceConfig, + loggerArgs, + }); - const markdown = turndownService.turndown(page.body.storage.value); - const pageCreatedAt = new Date(page.createdAt); - const lastPageVersionCreatedAt = new Date(page.version.createdAt); + localLogger.info("Upserting Confluence page in DB."); + await upsertConfluencePageInDb(connector.id, page, visitedAtMs); - if (markdown) { - const renderedMarkdown = await renderMarkdownSection( - dataSourceConfig, - markdown - ); - const renderedPage = await renderDocumentTitleAndContent({ - dataSourceConfig, - title: `Page ${page.title} Space ${spaceName}`, - createdAt: pageCreatedAt, - updatedAt: lastPageVersionCreatedAt, - content: renderedMarkdown, - }); + return true; +} - const documentId = makePageInternalId(pageId); - const documentUrl = makeConfluenceDocumentUrl({ - baseUrl: confluenceConfig.url, - suffix: page._links.tinyui, - }); +/** + * Upsert a Confluence page with its full parent hierarchy. + * Expensive operation, it should be reserved to admin actions on a limited set of pages. + */ +export async function confluenceUpsertPageWithFullParentsActivity({ + connectorId, + pageId, + cachedSpaceNames = {}, + cachedSpaceHierarchies = {}, +}: { + connectorId: ModelId; + pageId: string; + cachedSpaceNames?: Record; + cachedSpaceHierarchies?: Record< + string, + Awaited> + >; +}): Promise { + const connector = await ConnectorResource.fetchById(connectorId); + if (!connector) { + throw new Error("Connector not found."); + } + const dataSourceConfig = dataSourceConfigFromConnector(connector); + const confluenceConfig = + await fetchConfluenceConfigurationActivity(connectorId); - // We log the number of labels to help define the importance of labels in the future. - if (page.labels.results.length > 0) { - localLogger.info( - { labelsCount: page.labels.results.length }, - "Confluence page has labels." - ); - } + const loggerArgs = { + connectorId, + dataSourceId: dataSourceConfig.dataSourceId, + pageId, + workspaceId: dataSourceConfig.workspaceId, + }; + const localLogger = logger.child(loggerArgs); + const visitedAtMs = new Date().getTime(); - // Limit to 10 custom tags. - const customTags = page.labels.results - .slice(0, 10) - .map((l) => `labels:${l.id}`); + const pageInDb = await ConfluencePage.findOne({ + attributes: ["parentId", "skipReason"], + where: { connectorId, pageId }, + }); + if (pageInDb && pageInDb.skipReason !== null) { + localLogger.info("Confluence page skipped."); + return false; + } - const tags = [ - `createdAt:${pageCreatedAt.getTime()}`, - `space:${spaceName}`, - `title:${page.title}`, - `updatedAt:${lastPageVersionCreatedAt.getTime()}`, - `version:${page.version.number}`, - ...customTags, - ]; + const client = await getConfluenceClient( + { cloudId: confluenceConfig?.cloudId }, + connector + ); - await upsertToDatasource({ - dataSourceConfig, - documentContent: renderedPage, - documentId, - documentUrl, - loggerArgs, - // Parent Ids will be computed after all page imports within the space have been completed. - parents: [documentId, HiddenContentNodeParentId], - parentId: HiddenContentNodeParentId, - tags, - timestampMs: lastPageVersionCreatedAt.getTime(), - upsertContext: { - sync_type: isBatchSync ? "batch" : "incremental", - }, - title: page.title, - mimeType: "application/vnd.dust.confluence.page", - async: true, - }); + const hasReadRestrictions = await pageHasReadRestrictions(client, pageId); + if (hasReadRestrictions) { + localLogger.info("Skipping restricted Confluence page."); + return false; } - localLogger.info("Upserting Confluence page in DB."); + const page = await client.getPageById(pageId); + if (!page) { + localLogger.info("Confluence page not found."); + return false; + } + let spaceName = cachedSpaceNames[page.spaceId]; + if (!spaceName) { + const space = await client.getSpaceById(page.spaceId); + if (!space) { + localLogger.info("Confluence space not found."); + return false; + } + cachedSpaceNames[page.spaceId] = space.name; + spaceName = space.name; + } + + if (!cachedSpaceHierarchies[page.spaceId]) { + cachedSpaceHierarchies[page.spaceId] = await getSpaceHierarchy( + connectorId, + page.spaceId + ); + } + + const parents = await getConfluencePageParentIds( + connectorId, + { pageId: page.id, parentId: page.parentId, spaceId: page.spaceId }, + cachedSpaceHierarchies[page.spaceId] + ); + + localLogger.info("Upserting Confluence page."); + await upsertConfluencePageToDataSource({ + page, + spaceName, + parents, + confluenceConfig, + dataSourceConfig, + loggerArgs, + }); + + localLogger.info("Upserting Confluence page in DB."); await upsertConfluencePageInDb(connector.id, page, visitedAtMs); return true; @@ -757,7 +892,7 @@ export async function confluenceGetReportPersonalActionActivity( "Error while reporting Confluence account." ); - // If token has been revoked, return false. + // If the token has been revoked, return false. if (err instanceof ExternalOAuthTokenError) { return false; } diff --git a/connectors/src/connectors/confluence/temporal/workflows.ts b/connectors/src/connectors/confluence/temporal/workflows.ts index 88d58be69f9d..47b260815274 100644 --- a/connectors/src/connectors/confluence/temporal/workflows.ts +++ b/connectors/src/connectors/confluence/temporal/workflows.ts @@ -31,6 +31,7 @@ const { confluenceGetActiveChildPageRefsActivity, confluenceGetRootPageRefsActivity, fetchConfluenceSpaceIdsForConnectorActivity, + confluenceUpsertPageWithFullParentsActivity, confluenceGetReportPersonalActionActivity, fetchConfluenceUserAccountAndConnectorIdsActivity, @@ -412,3 +413,39 @@ export async function confluencePersonalDataReportingWorkflow() { } } } + +export async function confluenceUpsertPageWithFullParentsWorkflow({ + connectorId, + pageId, +}: { + connectorId: ModelId; + pageId: string; +}) { + await confluenceUpsertPageWithFullParentsActivity({ + connectorId, + pageId, + }); +} + +export async function confluenceUpsertPagesWithFullParentsWorkflow({ + connectorId, + pageIds, +}: { + connectorId: ModelId; + pageIds: string[]; +}) { + const cachedSpaceNames: Record = {}; + const cachedSpaceHierarchies: Record< + string, + Record + > = {}; + + for (const pageId of pageIds) { + await confluenceUpsertPageWithFullParentsActivity({ + connectorId, + pageId, + cachedSpaceNames, + cachedSpaceHierarchies, + }); + } +} diff --git a/connectors/src/lib/cli.ts b/connectors/src/lib/cli.ts index b8d4e72fe750..247fe59d99ae 100644 --- a/connectors/src/lib/cli.ts +++ b/connectors/src/lib/cli.ts @@ -16,6 +16,7 @@ import PQueue from "p-queue"; import readline from "readline"; import { getConnectorManager } from "@connectors/connectors"; +import { confluence } from "@connectors/connectors/confluence/lib/cli"; import { github } from "@connectors/connectors/github/lib/cli"; import { google_drive } from "@connectors/connectors/google_drive/lib/cli"; import { intercom } from "@connectors/connectors/intercom/lib/cli"; @@ -34,6 +35,8 @@ export async function runCommand(adminCommand: AdminCommandType) { switch (adminCommand.majorCommand) { case "connectors": return connectors(adminCommand); + case "confluence": + return confluence(adminCommand); case "batch": return batch(adminCommand); case "notion": diff --git a/connectors/src/lib/data_sources.ts b/connectors/src/lib/data_sources.ts index 387be695dccc..19150100f564 100644 --- a/connectors/src/lib/data_sources.ts +++ b/connectors/src/lib/data_sources.ts @@ -63,7 +63,7 @@ type UpsertContext = { sync_type: "batch" | "incremental"; }; -type UpsertToDataSourceParams = { +export type UpsertToDataSourceParams = { dataSourceConfig: DataSourceConfig; documentId: string; documentContent: CoreAPIDataSourceDocumentSection; diff --git a/types/src/connectors/admin/cli.ts b/types/src/connectors/admin/cli.ts index bd74919adf9a..2d2ff0b74507 100644 --- a/types/src/connectors/admin/cli.ts +++ b/types/src/connectors/admin/cli.ts @@ -23,6 +23,32 @@ export const ConnectorsCommandSchema = t.type({ export type ConnectorsCommandType = t.TypeOf; +/** + * + */ +export const ConfluenceCommandSchema = t.type({ + majorCommand: t.literal("confluence"), + command: t.union([t.literal("upsert-page"), t.literal("upsert-pages")]), + args: t.type({ + connectorId: t.union([t.number, t.undefined]), + pageId: t.union([t.string, t.undefined]), + file: t.union([t.string, t.undefined]), + keyInFile: t.union([t.string, t.undefined]), + }), +}); +export type ConfluenceCommandType = t.TypeOf; + +export const ConfluenceUpsertPageResponseSchema = t.type({ + workflowId: t.string, + workflowUrl: t.union([t.string, t.undefined]), +}); +export type ConfluenceUpsertPageResponseType = t.TypeOf< + typeof ConfluenceUpsertPageResponseSchema +>; +/** + * + */ + export const GithubCommandSchema = t.type({ majorCommand: t.literal("github"), command: t.union([ @@ -280,6 +306,7 @@ export type MicrosoftCommandType = t.TypeOf; export const AdminCommandSchema = t.union([ BatchCommandSchema, ConnectorsCommandSchema, + ConfluenceCommandSchema, GithubCommandSchema, GoogleDriveCommandSchema, IntercomCommandSchema,