From bf29b2816e38522813665dc3281c254d60da3134 Mon Sep 17 00:00:00 2001 From: Aubin Date: Thu, 12 Dec 2024 14:35:18 +0100 Subject: [PATCH 01/18] wip --- .../src/connectors/confluence/lib/cli.ts | 30 +++++++++++++++++++ types/src/connectors/admin/cli.ts | 22 ++++++++++++++ 2 files changed, 52 insertions(+) create mode 100644 connectors/src/connectors/confluence/lib/cli.ts diff --git a/connectors/src/connectors/confluence/lib/cli.ts b/connectors/src/connectors/confluence/lib/cli.ts new file mode 100644 index 000000000000..48b4ab1f8e64 --- /dev/null +++ b/connectors/src/connectors/confluence/lib/cli.ts @@ -0,0 +1,30 @@ +import type { + AdminSuccessResponseType, + ConfluenceCommandType, + ConfluenceUpsertPageResponseType, +} from "@dust-tt/types"; + +import { default as topLogger } from "@connectors/logger/logger"; + +export const confluence = async ({ + command, + args, +}: ConfluenceCommandType): Promise< + AdminSuccessResponseType | ConfluenceUpsertPageResponseType +> => { + const logger = topLogger.child({ majorCommand: "confluence", command, args }); + switch (command) { + case "upsert-page": { + if (!args.connectorId) { + throw new Error("Missing --connectorId argument"); + } + if (!args.pageId) { + throw new Error("Missing --pageId argument"); + } + break; + } + + default: + throw new Error("Unknown Confluence command: " + command); + } +}; diff --git a/types/src/connectors/admin/cli.ts b/types/src/connectors/admin/cli.ts index bd74919adf9a..3af2ce6674fe 100644 --- a/types/src/connectors/admin/cli.ts +++ b/types/src/connectors/admin/cli.ts @@ -23,6 +23,27 @@ export const ConnectorsCommandSchema = t.type({ export type ConnectorsCommandType = t.TypeOf; +/** + * + */ +export const ConfluenceCommandSchema = t.type({ + majorCommand: t.literal("confluence"), + command: t.literal("upsert-page"), + args: t.type({ + connectorId: t.union([t.number, t.undefined]), + pageId: t.union([t.number, t.undefined]), + }), +}); +export type ConfluenceCommandType = t.TypeOf; + +export const ConfluenceUpsertPageResponseSchema = t.type({}); +export type ConfluenceUpsertPageResponseType = t.TypeOf< + typeof ConfluenceUpsertPageResponseSchema +>; +/** + * + */ + export const GithubCommandSchema = t.type({ majorCommand: t.literal("github"), command: t.union([ @@ -280,6 +301,7 @@ export type MicrosoftCommandType = t.TypeOf; export const AdminCommandSchema = t.union([ BatchCommandSchema, ConnectorsCommandSchema, + ConfluenceCommandSchema, GithubCommandSchema, GoogleDriveCommandSchema, IntercomCommandSchema, From 80ae7d25841f6606542f098f6100d6ae67942943 Mon Sep 17 00:00:00 2001 From: Aubin Date: Fri, 13 Dec 2024 08:49:46 +0100 Subject: [PATCH 02/18] refactor: move page upsertion into a dedicated function --- .../confluence/temporal/activities.ts | 151 ++++++++++-------- connectors/src/lib/data_sources.ts | 2 +- 2 files changed, 87 insertions(+), 66 deletions(-) diff --git a/connectors/src/connectors/confluence/temporal/activities.ts b/connectors/src/connectors/confluence/temporal/activities.ts index eed69a283be9..cc3db143b23e 100644 --- a/connectors/src/connectors/confluence/temporal/activities.ts +++ b/connectors/src/connectors/confluence/temporal/activities.ts @@ -22,6 +22,7 @@ import { makePageInternalId } from "@connectors/connectors/confluence/lib/intern import { makeConfluenceDocumentUrl } from "@connectors/connectors/confluence/temporal/workflow_ids"; import { dataSourceConfigFromConnector } from "@connectors/lib/api/data_source_config"; import { concurrentExecutor } from "@connectors/lib/async_utils"; +import type { UpsertToDataSourceParams } from "@connectors/lib/data_sources"; import { deleteFromDataSource, renderDocumentTitleAndContent, @@ -220,7 +221,83 @@ export async function markPageHasVisited({ ); } -async function upsertConfluencePageInDb( +export async function upsertConfluencePageToDataSource( + page: NonNullable>>, + spaceName: string, + confluenceConfig: ConfluenceConfiguration, + syncType: UpsertToDataSourceParams["upsertContext"]["sync_type"], + dataSourceConfig: DataSourceConfig, + loggerArgs: Record +) { + const localLogger = logger.child(loggerArgs); + + const markdown = turndownService.turndown(page.body.storage.value); + const pageCreatedAt = new Date(page.createdAt); + const lastPageVersionCreatedAt = new Date(page.version.createdAt); + + if (markdown) { + const renderedMarkdown = await renderMarkdownSection( + dataSourceConfig, + markdown + ); + const renderedPage = await renderDocumentTitleAndContent({ + dataSourceConfig, + title: `Page ${page.title} Space ${spaceName}`, + createdAt: pageCreatedAt, + updatedAt: lastPageVersionCreatedAt, + content: renderedMarkdown, + }); + + const documentId = makePageInternalId(page.id); + const documentUrl = makeConfluenceDocumentUrl({ + baseUrl: confluenceConfig.url, + suffix: page._links.tinyui, + }); + + // We log the number of labels to help define the importance of labels in the future. + if (page.labels.results.length > 0) { + localLogger.info( + { labelsCount: page.labels.results.length }, + "Confluence page has labels." + ); + } + + // Limit to 10 custom tags. + const customTags = page.labels.results + .slice(0, 10) + .map((l) => `labels:${l.id}`); + + const tags = [ + `createdAt:${pageCreatedAt.getTime()}`, + `space:${spaceName}`, + `title:${page.title}`, + `updatedAt:${lastPageVersionCreatedAt.getTime()}`, + `version:${page.version.number}`, + ...customTags, + ]; + + await upsertToDatasource({ + dataSourceConfig, + documentContent: renderedPage, + documentId, + documentUrl, + loggerArgs, + // Parent Ids will be computed after all page imports within the space have been completed. + parents: [documentId, HiddenContentNodeParentId], + parentId: HiddenContentNodeParentId, + tags, + timestampMs: lastPageVersionCreatedAt.getTime(), + upsertContext: { + sync_type: syncType, + }, + title: page.title, + mimeType: "application/vnd.dust.confluence.page", + async: true, + }); + } +} + +export async function upsertConfluencePageInDb( connectorId: ModelId, page: ConfluencePageWithBodyType, visitedAtMs: number @@ -335,70 +412,14 @@ export async function confluenceCheckAndUpsertPageActivity({ localLogger.info("Upserting Confluence page."); - const markdown = turndownService.turndown(page.body.storage.value); - const pageCreatedAt = new Date(page.createdAt); - const lastPageVersionCreatedAt = new Date(page.version.createdAt); - - if (markdown) { - const renderedMarkdown = await renderMarkdownSection( - dataSourceConfig, - markdown - ); - const renderedPage = await renderDocumentTitleAndContent({ - dataSourceConfig, - title: `Page ${page.title} Space ${spaceName}`, - createdAt: pageCreatedAt, - updatedAt: lastPageVersionCreatedAt, - content: renderedMarkdown, - }); - - const documentId = makePageInternalId(pageId); - const documentUrl = makeConfluenceDocumentUrl({ - baseUrl: confluenceConfig.url, - suffix: page._links.tinyui, - }); - - // We log the number of labels to help define the importance of labels in the future. - if (page.labels.results.length > 0) { - localLogger.info( - { labelsCount: page.labels.results.length }, - "Confluence page has labels." - ); - } - - // Limit to 10 custom tags. - const customTags = page.labels.results - .slice(0, 10) - .map((l) => `labels:${l.id}`); - - const tags = [ - `createdAt:${pageCreatedAt.getTime()}`, - `space:${spaceName}`, - `title:${page.title}`, - `updatedAt:${lastPageVersionCreatedAt.getTime()}`, - `version:${page.version.number}`, - ...customTags, - ]; - - await upsertToDatasource({ - dataSourceConfig, - documentContent: renderedPage, - documentId, - documentUrl, - loggerArgs, - // Parent Ids will be computed after all page imports within the space have been completed. - parents: [documentId, HiddenContentNodeParentId], - parentId: HiddenContentNodeParentId, - tags, - timestampMs: lastPageVersionCreatedAt.getTime(), - upsertContext: { - sync_type: isBatchSync ? "batch" : "incremental", - }, - title: page.title, - mimeType: "application/vnd.dust.confluence.page", - async: true, - }); - } + await upsertConfluencePageToDataSource( + page, + spaceName, + confluenceConfig, + isBatchSync ? "batch" : "incremental", + dataSourceConfig, + loggerArgs + ); localLogger.info("Upserting Confluence page in DB."); diff --git a/connectors/src/lib/data_sources.ts b/connectors/src/lib/data_sources.ts index 387be695dccc..19150100f564 100644 --- a/connectors/src/lib/data_sources.ts +++ b/connectors/src/lib/data_sources.ts @@ -63,7 +63,7 @@ type UpsertContext = { sync_type: "batch" | "incremental"; }; -type UpsertToDataSourceParams = { +export type UpsertToDataSourceParams = { dataSourceConfig: DataSourceConfig; documentId: string; documentContent: CoreAPIDataSourceDocumentSection; From 24b70f53b20e97c82462768c6ae460f938a0f013 Mon Sep 17 00:00:00 2001 From: Aubin Date: Fri, 13 Dec 2024 08:50:06 +0100 Subject: [PATCH 03/18] feat: add page upsertion to the CLI command --- .../src/connectors/confluence/lib/cli.ts | 52 +++++++++++++++++++ types/src/connectors/admin/cli.ts | 2 +- 2 files changed, 53 insertions(+), 1 deletion(-) diff --git a/connectors/src/connectors/confluence/lib/cli.ts b/connectors/src/connectors/confluence/lib/cli.ts index 48b4ab1f8e64..fb93e9a85858 100644 --- a/connectors/src/connectors/confluence/lib/cli.ts +++ b/connectors/src/connectors/confluence/lib/cli.ts @@ -4,7 +4,14 @@ import type { ConfluenceUpsertPageResponseType, } from "@dust-tt/types"; +import { + fetchConfluenceConfigurationActivity, + getConfluenceClient, + upsertConfluencePageToDataSource, +} from "@connectors/connectors/confluence/temporal/activities"; +import { dataSourceConfigFromConnector } from "@connectors/lib/api/data_source_config"; import { default as topLogger } from "@connectors/logger/logger"; +import { ConnectorResource } from "@connectors/resources/connector_resource"; export const confluence = async ({ command, @@ -21,6 +28,51 @@ export const confluence = async ({ if (!args.pageId) { throw new Error("Missing --pageId argument"); } + const connectorId = args.connectorId; + const pageId = args.pageId; + + const connector = await ConnectorResource.fetchById(connectorId); + if (!connector) { + throw new Error("Connector not found."); + } + const dataSourceConfig = dataSourceConfigFromConnector(connector); + const confluenceConfig = + await fetchConfluenceConfigurationActivity(connectorId); + + const loggerArgs = { + connectorId, + dataSourceId: dataSourceConfig.dataSourceId, + pageId, + workspaceId: dataSourceConfig.workspaceId, + }; + const localLogger = logger.child(loggerArgs); + + const client = await getConfluenceClient( + { cloudId: confluenceConfig?.cloudId }, + connector + ); + + const page = await client.getPageById(pageId); + if (!page) { + localLogger.info("Confluence page not found."); + return false; + } + const space = await client.getSpaceById(page.spaceId); + if (!space) { + localLogger.info("Confluence space not found."); + return false; + } + + localLogger.info("Upserting Confluence page."); + await upsertConfluencePageToDataSource( + page, + space.name, + confluenceConfig, + "batch", + dataSourceConfig, + loggerArgs + ); + break; } diff --git a/types/src/connectors/admin/cli.ts b/types/src/connectors/admin/cli.ts index 3af2ce6674fe..5bdf2e3a3c39 100644 --- a/types/src/connectors/admin/cli.ts +++ b/types/src/connectors/admin/cli.ts @@ -31,7 +31,7 @@ export const ConfluenceCommandSchema = t.type({ command: t.literal("upsert-page"), args: t.type({ connectorId: t.union([t.number, t.undefined]), - pageId: t.union([t.number, t.undefined]), + pageId: t.union([t.string, t.undefined]), }), }); export type ConfluenceCommandType = t.TypeOf; From ae07706e35ac1d88b8500c2e97f3dfc33401d4b6 Mon Sep 17 00:00:00 2001 From: Aubin Date: Fri, 13 Dec 2024 10:15:19 +0100 Subject: [PATCH 04/18] add return values for the CLI command --- connectors/src/connectors/confluence/lib/cli.ts | 7 +++---- types/src/connectors/admin/cli.ts | 4 +++- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/connectors/src/connectors/confluence/lib/cli.ts b/connectors/src/connectors/confluence/lib/cli.ts index fb93e9a85858..52d47c89b455 100644 --- a/connectors/src/connectors/confluence/lib/cli.ts +++ b/connectors/src/connectors/confluence/lib/cli.ts @@ -55,12 +55,12 @@ export const confluence = async ({ const page = await client.getPageById(pageId); if (!page) { localLogger.info("Confluence page not found."); - return false; + return { success: false }; } const space = await client.getSpaceById(page.spaceId); if (!space) { localLogger.info("Confluence space not found."); - return false; + return { success: false }; } localLogger.info("Upserting Confluence page."); @@ -72,8 +72,7 @@ export const confluence = async ({ dataSourceConfig, loggerArgs ); - - break; + return { success: true }; } default: diff --git a/types/src/connectors/admin/cli.ts b/types/src/connectors/admin/cli.ts index 5bdf2e3a3c39..7ee84a8c5c8b 100644 --- a/types/src/connectors/admin/cli.ts +++ b/types/src/connectors/admin/cli.ts @@ -36,7 +36,9 @@ export const ConfluenceCommandSchema = t.type({ }); export type ConfluenceCommandType = t.TypeOf; -export const ConfluenceUpsertPageResponseSchema = t.type({}); +export const ConfluenceUpsertPageResponseSchema = t.type({ + success: t.union([t.literal(true), t.literal(false)]), +}); export type ConfluenceUpsertPageResponseType = t.TypeOf< typeof ConfluenceUpsertPageResponseSchema >; From 2f76d2639a3029d606975bf253e6a1b03795a71d Mon Sep 17 00:00:00 2001 From: Aubin Date: Fri, 13 Dec 2024 10:15:34 +0100 Subject: [PATCH 05/18] skip pages that have a skip reason in the db --- connectors/src/connectors/confluence/lib/cli.ts | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/connectors/src/connectors/confluence/lib/cli.ts b/connectors/src/connectors/confluence/lib/cli.ts index 52d47c89b455..c38cb353304e 100644 --- a/connectors/src/connectors/confluence/lib/cli.ts +++ b/connectors/src/connectors/confluence/lib/cli.ts @@ -10,6 +10,7 @@ import { upsertConfluencePageToDataSource, } from "@connectors/connectors/confluence/temporal/activities"; import { dataSourceConfigFromConnector } from "@connectors/lib/api/data_source_config"; +import { ConfluencePage } from "@connectors/lib/models/confluence"; import { default as topLogger } from "@connectors/logger/logger"; import { ConnectorResource } from "@connectors/resources/connector_resource"; @@ -47,6 +48,15 @@ export const confluence = async ({ }; const localLogger = logger.child(loggerArgs); + const pageInDb = await ConfluencePage.findOne({ + attributes: ["parentId", "skipReason", "version"], + where: { connectorId, pageId }, + }); + if (pageInDb && pageInDb.skipReason !== null) { + localLogger.info("Confluence page skipped."); + return { success: false }; + } + const client = await getConfluenceClient( { cloudId: confluenceConfig?.cloudId }, connector From 9055b09b14e068d9fc5a4ae96fc95a186e19d752 Mon Sep 17 00:00:00 2001 From: Aubin Date: Fri, 13 Dec 2024 10:30:41 +0100 Subject: [PATCH 06/18] add the correct parents --- connectors/src/connectors/confluence/lib/cli.ts | 17 ++++++++++++++++- .../confluence/temporal/activities.ts | 13 +++++++------ 2 files changed, 23 insertions(+), 7 deletions(-) diff --git a/connectors/src/connectors/confluence/lib/cli.ts b/connectors/src/connectors/confluence/lib/cli.ts index c38cb353304e..c2210bece368 100644 --- a/connectors/src/connectors/confluence/lib/cli.ts +++ b/connectors/src/connectors/confluence/lib/cli.ts @@ -4,6 +4,10 @@ import type { ConfluenceUpsertPageResponseType, } from "@dust-tt/types"; +import { + getConfluencePageParentIds, + getSpaceHierarchy, +} from "@connectors/connectors/confluence/lib/hierarchy"; import { fetchConfluenceConfigurationActivity, getConfluenceClient, @@ -49,7 +53,7 @@ export const confluence = async ({ const localLogger = logger.child(loggerArgs); const pageInDb = await ConfluencePage.findOne({ - attributes: ["parentId", "skipReason", "version"], + attributes: ["parentId", "skipReason"], where: { connectorId, pageId }, }); if (pageInDb && pageInDb.skipReason !== null) { @@ -73,10 +77,21 @@ export const confluence = async ({ return { success: false }; } + const cachedHierarchy = await getSpaceHierarchy( + connectorId, + page.spaceId + ); + const parentIds = await getConfluencePageParentIds( + connectorId, + { pageId: page.id, parentId: page.parentId, spaceId: page.spaceId }, + cachedHierarchy + ); + localLogger.info("Upserting Confluence page."); await upsertConfluencePageToDataSource( page, space.name, + parentIds, confluenceConfig, "batch", dataSourceConfig, diff --git a/connectors/src/connectors/confluence/temporal/activities.ts b/connectors/src/connectors/confluence/temporal/activities.ts index cc3db143b23e..9ab9a42eddec 100644 --- a/connectors/src/connectors/confluence/temporal/activities.ts +++ b/connectors/src/connectors/confluence/temporal/activities.ts @@ -44,6 +44,7 @@ import { syncStarted, syncSucceeded } from "@connectors/lib/sync_status"; import mainLogger from "@connectors/logger/logger"; import { ConnectorResource } from "@connectors/resources/connector_resource"; import type { DataSourceConfig } from "@connectors/types/data_source_config"; +import assert from "assert"; /** * This type represents the ID that should be passed as parentId to a content node to hide it from the UI. @@ -224,6 +225,7 @@ export async function markPageHasVisited({ export async function upsertConfluencePageToDataSource( page: NonNullable>>, spaceName: string, + parents: string[], confluenceConfig: ConfluenceConfiguration, syncType: UpsertToDataSourceParams["upsertContext"]["sync_type"], dataSourceConfig: DataSourceConfig, @@ -282,14 +284,11 @@ export async function upsertConfluencePageToDataSource( documentId, documentUrl, loggerArgs, - // Parent Ids will be computed after all page imports within the space have been completed. - parents: [documentId, HiddenContentNodeParentId], - parentId: HiddenContentNodeParentId, + parents, + parentId: parents[1], tags, timestampMs: lastPageVersionCreatedAt.getTime(), - upsertContext: { - sync_type: syncType, - }, + upsertContext: { sync_type: syncType }, title: page.title, mimeType: "application/vnd.dust.confluence.page", async: true, @@ -415,6 +414,8 @@ export async function confluenceCheckAndUpsertPageActivity({ await upsertConfluencePageToDataSource( page, spaceName, + // Parent Ids will be computed after all page imports within the space have been completed. + [makePageInternalId(page.id), HiddenContentNodeParentId], confluenceConfig, isBatchSync ? "batch" : "incremental", dataSourceConfig, From d4a6f0e8761009a8b03469c2dd6413813d571736 Mon Sep 17 00:00:00 2001 From: Aubin Date: Fri, 13 Dec 2024 12:50:08 +0100 Subject: [PATCH 07/18] add a command to upsert multiple pages, reading from a JSON file --- .../src/connectors/confluence/lib/cli.ts | 136 ++++++++++++++++++ types/src/connectors/admin/cli.ts | 4 +- 2 files changed, 139 insertions(+), 1 deletion(-) diff --git a/connectors/src/connectors/confluence/lib/cli.ts b/connectors/src/connectors/confluence/lib/cli.ts index c2210bece368..f47931b33f52 100644 --- a/connectors/src/connectors/confluence/lib/cli.ts +++ b/connectors/src/connectors/confluence/lib/cli.ts @@ -2,13 +2,17 @@ import type { AdminSuccessResponseType, ConfluenceCommandType, ConfluenceUpsertPageResponseType, + ModelId, } from "@dust-tt/types"; +import assert from "assert"; +import fs from "fs/promises"; import { getConfluencePageParentIds, getSpaceHierarchy, } from "@connectors/connectors/confluence/lib/hierarchy"; import { + confluenceGetSpaceNameActivity, fetchConfluenceConfigurationActivity, getConfluenceClient, upsertConfluencePageToDataSource, @@ -18,6 +22,39 @@ import { ConfluencePage } from "@connectors/lib/models/confluence"; import { default as topLogger } from "@connectors/logger/logger"; import { ConnectorResource } from "@connectors/resources/connector_resource"; +interface cachedSpace { + spaceName: string | null; + spaceHierarchy: Record; +} + +async function cacheSpace( + cachedSpaces: Record, + { + confluenceCloudId, + connectorId, + spaceId, + }: { + confluenceCloudId: string; + connectorId: ModelId; + spaceId: string; + } +): Promise { + const cachedSpace = cachedSpaces[spaceId]; + if (cachedSpace) { + return cachedSpace; + } + const spaceName = await confluenceGetSpaceNameActivity({ + spaceId, + confluenceCloudId, + connectorId, + }); + const spaceHierarchy = spaceName + ? await getSpaceHierarchy(connectorId, spaceId) + : {}; // not fetching if we couldn't get the space from Confluence API anyway + cachedSpaces[spaceId] = { spaceName, spaceHierarchy }; + return { spaceName, spaceHierarchy }; +} + export const confluence = async ({ command, args, @@ -99,6 +136,105 @@ export const confluence = async ({ ); return { success: true }; } + case "upsert-pages": { + if (!args.connectorId) { + throw new Error("Missing --connectorId argument"); + } + if (!args.file) { + throw new Error("Missing --file argument"); + } + if (!args.keyInFile) { + throw new Error("Missing --keyInFile argument"); + } + const connectorId = args.connectorId; + const file = args.file; + const keyInFile = args.keyInFile; + + // parsing the JSON file + const fileContent = await fs.readFile(file, "utf-8"); + const jsonArray = JSON.parse(fileContent); + assert(Array.isArray(jsonArray), "The file content is not an array."); + + const pageIds = jsonArray.map((entry) => { + assert( + keyInFile in entry, + `Key "${keyInFile}" not found in entry ${JSON.stringify(entry)}` + ); + return entry[keyInFile]; + }); + + // fetching the pages in DB + const pagesInDb = Object.fromEntries( + ( + await ConfluencePage.findAll({ + attributes: ["pageId", "skipReason"], + where: { connectorId, pageId: pageIds }, + }) + ).map((page) => [page.pageId, page]) + ); + + const connector = await ConnectorResource.fetchById(connectorId); + assert(connector !== null, "Connector not found."); + const dataSourceConfig = dataSourceConfigFromConnector(connector); + const confluenceConfig = + await fetchConfluenceConfigurationActivity(connectorId); + const client = await getConfluenceClient( + { cloudId: confluenceConfig?.cloudId }, + connector + ); + + const cachedSpaces: Record = {}; + + for (const pageId of pageIds) { + const loggerArgs = { + connectorId, + dataSourceId: dataSourceConfig.dataSourceId, + pageId, + workspaceId: dataSourceConfig.workspaceId, + }; + const localLogger = logger.child(loggerArgs); + + const pageInDb = pagesInDb[pageId]; + if (pageInDb && pageInDb.skipReason !== null) { + localLogger.info("Confluence page skipped."); + continue; + } + + const page = await client.getPageById(pageId); + if (!page) { + localLogger.info("Confluence page not found."); + continue; + } + // fetching the space if not already cached + const { spaceName, spaceHierarchy } = await cacheSpace(cachedSpaces, { + connectorId, + confluenceCloudId: confluenceConfig?.cloudId, + spaceId: page.spaceId, + }); + if (!spaceName) { + localLogger.info("Confluence space not found."); + continue; + } + + const parentIds = await getConfluencePageParentIds( + connectorId, + { pageId: page.id, parentId: page.parentId, spaceId: page.spaceId }, + spaceHierarchy + ); + + localLogger.info("Upserting Confluence page."); + await upsertConfluencePageToDataSource( + page, + spaceName, + parentIds, + confluenceConfig, + "batch", + dataSourceConfig, + loggerArgs + ); + } + return { success: true }; + } default: throw new Error("Unknown Confluence command: " + command); diff --git a/types/src/connectors/admin/cli.ts b/types/src/connectors/admin/cli.ts index 7ee84a8c5c8b..c594a94b47be 100644 --- a/types/src/connectors/admin/cli.ts +++ b/types/src/connectors/admin/cli.ts @@ -28,10 +28,12 @@ export type ConnectorsCommandType = t.TypeOf; */ export const ConfluenceCommandSchema = t.type({ majorCommand: t.literal("confluence"), - command: t.literal("upsert-page"), + command: t.union([t.literal("upsert-page"), t.literal("upsert-pages")]), args: t.type({ connectorId: t.union([t.number, t.undefined]), pageId: t.union([t.string, t.undefined]), + file: t.union([t.string, t.undefined]), + keyInFile: t.union([t.string, t.undefined]), }), }); export type ConfluenceCommandType = t.TypeOf; From 39c5ad0240c0ec07fac001d924a44a23e1dfebdd Mon Sep 17 00:00:00 2001 From: Aubin Date: Fri, 13 Dec 2024 13:13:03 +0100 Subject: [PATCH 08/18] add db upserts too --- connectors/src/connectors/confluence/lib/cli.ts | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/connectors/src/connectors/confluence/lib/cli.ts b/connectors/src/connectors/confluence/lib/cli.ts index f47931b33f52..5c78659cf92b 100644 --- a/connectors/src/connectors/confluence/lib/cli.ts +++ b/connectors/src/connectors/confluence/lib/cli.ts @@ -15,6 +15,7 @@ import { confluenceGetSpaceNameActivity, fetchConfluenceConfigurationActivity, getConfluenceClient, + upsertConfluencePageInDb, upsertConfluencePageToDataSource, } from "@connectors/connectors/confluence/temporal/activities"; import { dataSourceConfigFromConnector } from "@connectors/lib/api/data_source_config"; @@ -88,6 +89,7 @@ export const confluence = async ({ workspaceId: dataSourceConfig.workspaceId, }; const localLogger = logger.child(loggerArgs); + const visitedAtMs = new Date().getTime(); const pageInDb = await ConfluencePage.findOne({ attributes: ["parentId", "skipReason"], @@ -134,6 +136,7 @@ export const confluence = async ({ dataSourceConfig, loggerArgs ); + await upsertConfluencePageInDb(connector.id, page, visitedAtMs); return { success: true }; } case "upsert-pages": { @@ -184,6 +187,7 @@ export const confluence = async ({ ); const cachedSpaces: Record = {}; + const visitedAtMs = new Date().getTime(); for (const pageId of pageIds) { const loggerArgs = { @@ -232,6 +236,7 @@ export const confluence = async ({ dataSourceConfig, loggerArgs ); + await upsertConfluencePageInDb(connector.id, page, visitedAtMs); } return { success: true }; } From cbff48a79646f4c24e1a2b7aa7c06e029006aefb Mon Sep 17 00:00:00 2001 From: Aubin Date: Fri, 13 Dec 2024 13:28:35 +0100 Subject: [PATCH 09/18] add confluence to the admin cli --- connectors/src/lib/cli.ts | 3 +++ 1 file changed, 3 insertions(+) diff --git a/connectors/src/lib/cli.ts b/connectors/src/lib/cli.ts index b8d4e72fe750..247fe59d99ae 100644 --- a/connectors/src/lib/cli.ts +++ b/connectors/src/lib/cli.ts @@ -16,6 +16,7 @@ import PQueue from "p-queue"; import readline from "readline"; import { getConnectorManager } from "@connectors/connectors"; +import { confluence } from "@connectors/connectors/confluence/lib/cli"; import { github } from "@connectors/connectors/github/lib/cli"; import { google_drive } from "@connectors/connectors/google_drive/lib/cli"; import { intercom } from "@connectors/connectors/intercom/lib/cli"; @@ -34,6 +35,8 @@ export async function runCommand(adminCommand: AdminCommandType) { switch (adminCommand.majorCommand) { case "connectors": return connectors(adminCommand); + case "confluence": + return confluence(adminCommand); case "batch": return batch(adminCommand); case "notion": From eeaf46e336a88f1c02c51679d29daf3bc70f0609 Mon Sep 17 00:00:00 2001 From: Aubin Date: Fri, 13 Dec 2024 13:33:55 +0100 Subject: [PATCH 10/18] fix ConfluenceUpsertPageResponseSchema --- types/src/connectors/admin/cli.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/types/src/connectors/admin/cli.ts b/types/src/connectors/admin/cli.ts index c594a94b47be..5c3fdaab00ab 100644 --- a/types/src/connectors/admin/cli.ts +++ b/types/src/connectors/admin/cli.ts @@ -39,7 +39,7 @@ export const ConfluenceCommandSchema = t.type({ export type ConfluenceCommandType = t.TypeOf; export const ConfluenceUpsertPageResponseSchema = t.type({ - success: t.union([t.literal(true), t.literal(false)]), + success: t.boolean, }); export type ConfluenceUpsertPageResponseType = t.TypeOf< typeof ConfluenceUpsertPageResponseSchema From 042c5a74e1b24bf36264c3728e553e10a3718a29 Mon Sep 17 00:00:00 2001 From: Aubin Date: Fri, 13 Dec 2024 17:05:10 +0100 Subject: [PATCH 11/18] create an activity for the full upsert --- .../src/connectors/confluence/lib/cli.ts | 190 ++---------------- .../confluence/temporal/activities.ts | 102 ++++++++++ 2 files changed, 119 insertions(+), 173 deletions(-) diff --git a/connectors/src/connectors/confluence/lib/cli.ts b/connectors/src/connectors/confluence/lib/cli.ts index 5c78659cf92b..add46a24f422 100644 --- a/connectors/src/connectors/confluence/lib/cli.ts +++ b/connectors/src/connectors/confluence/lib/cli.ts @@ -2,60 +2,18 @@ import type { AdminSuccessResponseType, ConfluenceCommandType, ConfluenceUpsertPageResponseType, - ModelId, } from "@dust-tt/types"; import assert from "assert"; import fs from "fs/promises"; -import { - getConfluencePageParentIds, - getSpaceHierarchy, -} from "@connectors/connectors/confluence/lib/hierarchy"; -import { - confluenceGetSpaceNameActivity, - fetchConfluenceConfigurationActivity, - getConfluenceClient, - upsertConfluencePageInDb, - upsertConfluencePageToDataSource, -} from "@connectors/connectors/confluence/temporal/activities"; -import { dataSourceConfigFromConnector } from "@connectors/lib/api/data_source_config"; -import { ConfluencePage } from "@connectors/lib/models/confluence"; +import { confluenceUpsertPageWithFullParentsActivity } from "@connectors/connectors/confluence/temporal/activities"; import { default as topLogger } from "@connectors/logger/logger"; -import { ConnectorResource } from "@connectors/resources/connector_resource"; interface cachedSpace { spaceName: string | null; spaceHierarchy: Record; } -async function cacheSpace( - cachedSpaces: Record, - { - confluenceCloudId, - connectorId, - spaceId, - }: { - confluenceCloudId: string; - connectorId: ModelId; - spaceId: string; - } -): Promise { - const cachedSpace = cachedSpaces[spaceId]; - if (cachedSpace) { - return cachedSpace; - } - const spaceName = await confluenceGetSpaceNameActivity({ - spaceId, - confluenceCloudId, - connectorId, - }); - const spaceHierarchy = spaceName - ? await getSpaceHierarchy(connectorId, spaceId) - : {}; // not fetching if we couldn't get the space from Confluence API anyway - cachedSpaces[spaceId] = { spaceName, spaceHierarchy }; - return { spaceName, spaceHierarchy }; -} - export const confluence = async ({ command, args, @@ -71,73 +29,12 @@ export const confluence = async ({ if (!args.pageId) { throw new Error("Missing --pageId argument"); } - const connectorId = args.connectorId; - const pageId = args.pageId; - - const connector = await ConnectorResource.fetchById(connectorId); - if (!connector) { - throw new Error("Connector not found."); - } - const dataSourceConfig = dataSourceConfigFromConnector(connector); - const confluenceConfig = - await fetchConfluenceConfigurationActivity(connectorId); - - const loggerArgs = { + const { connectorId, pageId } = args; + const success = await confluenceUpsertPageWithFullParentsActivity({ connectorId, - dataSourceId: dataSourceConfig.dataSourceId, pageId, - workspaceId: dataSourceConfig.workspaceId, - }; - const localLogger = logger.child(loggerArgs); - const visitedAtMs = new Date().getTime(); - - const pageInDb = await ConfluencePage.findOne({ - attributes: ["parentId", "skipReason"], - where: { connectorId, pageId }, }); - if (pageInDb && pageInDb.skipReason !== null) { - localLogger.info("Confluence page skipped."); - return { success: false }; - } - - const client = await getConfluenceClient( - { cloudId: confluenceConfig?.cloudId }, - connector - ); - - const page = await client.getPageById(pageId); - if (!page) { - localLogger.info("Confluence page not found."); - return { success: false }; - } - const space = await client.getSpaceById(page.spaceId); - if (!space) { - localLogger.info("Confluence space not found."); - return { success: false }; - } - - const cachedHierarchy = await getSpaceHierarchy( - connectorId, - page.spaceId - ); - const parentIds = await getConfluencePageParentIds( - connectorId, - { pageId: page.id, parentId: page.parentId, spaceId: page.spaceId }, - cachedHierarchy - ); - - localLogger.info("Upserting Confluence page."); - await upsertConfluencePageToDataSource( - page, - space.name, - parentIds, - confluenceConfig, - "batch", - dataSourceConfig, - loggerArgs - ); - await upsertConfluencePageInDb(connector.id, page, visitedAtMs); - return { success: true }; + return { success }; } case "upsert-pages": { if (!args.connectorId) { @@ -166,79 +63,26 @@ export const confluence = async ({ return entry[keyInFile]; }); - // fetching the pages in DB - const pagesInDb = Object.fromEntries( - ( - await ConfluencePage.findAll({ - attributes: ["pageId", "skipReason"], - where: { connectorId, pageId: pageIds }, - }) - ).map((page) => [page.pageId, page]) - ); - - const connector = await ConnectorResource.fetchById(connectorId); - assert(connector !== null, "Connector not found."); - const dataSourceConfig = dataSourceConfigFromConnector(connector); - const confluenceConfig = - await fetchConfluenceConfigurationActivity(connectorId); - const client = await getConfluenceClient( - { cloudId: confluenceConfig?.cloudId }, - connector - ); - - const cachedSpaces: Record = {}; - const visitedAtMs = new Date().getTime(); + let allSuccesses = true; + const cachedSpaceNames: Record = {}; + const cachedSpaceHierarchies: Record< + string, + Record + > = {}; for (const pageId of pageIds) { - const loggerArgs = { + const success = await confluenceUpsertPageWithFullParentsActivity({ connectorId, - dataSourceId: dataSourceConfig.dataSourceId, pageId, - workspaceId: dataSourceConfig.workspaceId, - }; - const localLogger = logger.child(loggerArgs); - - const pageInDb = pagesInDb[pageId]; - if (pageInDb && pageInDb.skipReason !== null) { - localLogger.info("Confluence page skipped."); - continue; - } - - const page = await client.getPageById(pageId); - if (!page) { - localLogger.info("Confluence page not found."); - continue; - } - // fetching the space if not already cached - const { spaceName, spaceHierarchy } = await cacheSpace(cachedSpaces, { - connectorId, - confluenceCloudId: confluenceConfig?.cloudId, - spaceId: page.spaceId, + cachedSpaceNames, + cachedSpaceHierarchies, }); - if (!spaceName) { - localLogger.info("Confluence space not found."); - continue; + if (!success) { + logger.error({ pageId }, "Failed to upsert page"); + allSuccesses = false; } - - const parentIds = await getConfluencePageParentIds( - connectorId, - { pageId: page.id, parentId: page.parentId, spaceId: page.spaceId }, - spaceHierarchy - ); - - localLogger.info("Upserting Confluence page."); - await upsertConfluencePageToDataSource( - page, - spaceName, - parentIds, - confluenceConfig, - "batch", - dataSourceConfig, - loggerArgs - ); - await upsertConfluencePageInDb(connector.id, page, visitedAtMs); } - return { success: true }; + return { success: allSuccesses }; } default: diff --git a/connectors/src/connectors/confluence/temporal/activities.ts b/connectors/src/connectors/confluence/temporal/activities.ts index 9ab9a42eddec..89459bdc5678 100644 --- a/connectors/src/connectors/confluence/temporal/activities.ts +++ b/connectors/src/connectors/confluence/temporal/activities.ts @@ -323,6 +323,10 @@ interface ConfluenceCheckAndUpsertPageActivityInput { visitedAtMs: number; } +/** + * Upsert a Confluence page without its full parents. + * Operates greedily by stopping if the page is restricted or if there is a version match (unless the page was moved, in this case we have to upsert because the parents have changed). + */ export async function confluenceCheckAndUpsertPageActivity({ connectorId, isBatchSync, @@ -429,6 +433,104 @@ export async function confluenceCheckAndUpsertPageActivity({ return true; } +/** + * Upsert a Confluence page with its full parent hierarchy. + * Expensive operation, it should be reserved to admin actions on a limited set of pages. + */ +export async function confluenceUpsertPageWithFullParentsActivity({ + connectorId, + pageId, + cachedSpaceNames = {}, + cachedSpaceHierarchies = {}, +}: { + connectorId: ModelId; + pageId: string; + cachedSpaceNames?: Record; + cachedSpaceHierarchies?: Record>; +}): Promise { + const connector = await ConnectorResource.fetchById(connectorId); + if (!connector) { + throw new Error("Connector not found."); + } + const dataSourceConfig = dataSourceConfigFromConnector(connector); + const confluenceConfig = + await fetchConfluenceConfigurationActivity(connectorId); + + const loggerArgs = { + connectorId, + dataSourceId: dataSourceConfig.dataSourceId, + pageId, + workspaceId: dataSourceConfig.workspaceId, + }; + const localLogger = logger.child(loggerArgs); + const visitedAtMs = new Date().getTime(); + + const pageInDb = await ConfluencePage.findOne({ + attributes: ["parentId", "skipReason"], + where: { connectorId, pageId }, + }); + if (pageInDb && pageInDb.skipReason !== null) { + localLogger.info("Confluence page skipped."); + return false; + } + + const client = await getConfluenceClient( + { cloudId: confluenceConfig?.cloudId }, + connector + ); + + const hasReadRestrictions = await pageHasReadRestrictions(client, pageId); + if (hasReadRestrictions) { + localLogger.info("Skipping restricted Confluence page."); + return false; + } + + const page = await client.getPageById(pageId); + if (!page) { + localLogger.info("Confluence page not found."); + return false; + } + + let spaceName = cachedSpaceNames[page.spaceId]; + if (!spaceName) { + const space = await client.getSpaceById(page.spaceId); + if (!space) { + localLogger.info("Confluence space not found."); + return false; + } + cachedSpaceNames[page.spaceId] = space.name; + spaceName = space.name; + } + + if (!cachedSpaceHierarchies[page.spaceId]) { + cachedSpaceHierarchies[page.spaceId] = await getSpaceHierarchy( + connectorId, + page.spaceId + ); + } + + const parentIds = await getConfluencePageParentIds( + connectorId, + { pageId: page.id, parentId: page.parentId, spaceId: page.spaceId }, + cachedSpaceHierarchies[page.spaceId] + ); + + localLogger.info("Upserting Confluence page."); + await upsertConfluencePageToDataSource( + page, + spaceName, + parentIds, + confluenceConfig, + "batch", + dataSourceConfig, + loggerArgs + ); + + await upsertConfluencePageInDb(connector.id, page, visitedAtMs); + + return true; +} + export async function confluenceGetActiveChildPageRefsActivity({ connectorId, parentPageId, From 27f2d9716f99f023fdc336d4f89740f2c631749c Mon Sep 17 00:00:00 2001 From: Aubin Date: Fri, 13 Dec 2024 17:45:58 +0100 Subject: [PATCH 12/18] move cli force-upserts to workflows --- .../src/connectors/confluence/lib/cli.ts | 88 +++++++++++++------ .../confluence/temporal/workflows.ts | 37 ++++++++ types/src/connectors/admin/cli.ts | 3 +- 3 files changed, 98 insertions(+), 30 deletions(-) diff --git a/connectors/src/connectors/confluence/lib/cli.ts b/connectors/src/connectors/confluence/lib/cli.ts index add46a24f422..e3e7818bfc5c 100644 --- a/connectors/src/connectors/confluence/lib/cli.ts +++ b/connectors/src/connectors/confluence/lib/cli.ts @@ -6,14 +6,14 @@ import type { import assert from "assert"; import fs from "fs/promises"; -import { confluenceUpsertPageWithFullParentsActivity } from "@connectors/connectors/confluence/temporal/activities"; +import { QUEUE_NAME } from "@connectors/connectors/confluence/temporal/config"; +import { + confluenceUpsertPagesWithFullParentsWorkflow, + confluenceUpsertPageWithFullParentsWorkflow, +} from "@connectors/connectors/confluence/temporal/workflows"; +import { getTemporalClient } from "@connectors/lib/temporal"; import { default as topLogger } from "@connectors/logger/logger"; -interface cachedSpace { - spaceName: string | null; - spaceHierarchy: Record; -} - export const confluence = async ({ command, args, @@ -30,11 +30,34 @@ export const confluence = async ({ throw new Error("Missing --pageId argument"); } const { connectorId, pageId } = args; - const success = await confluenceUpsertPageWithFullParentsActivity({ - connectorId, - pageId, - }); - return { success }; + + const client = await getTemporalClient(); + const workflow = await client.workflow.start( + confluenceUpsertPageWithFullParentsWorkflow, + { + args: [{ connectorId, pageId }], + taskQueue: QUEUE_NAME, + workflowId: `confluence-upsert-page-${connectorId}-${pageId}`, + searchAttributes: { connectorId: [connectorId] }, + memo: { connectorId }, + } + ); + + const { workflowId } = workflow; + const temporalNamespace = process.env.TEMPORAL_NAMESPACE; + if (!temporalNamespace) { + logger.info(`[Admin] Started temporal workflow with id: ${workflowId}`); + } else { + logger.info( + `[Admin] Started temporal workflow with id: ${workflowId} - https://cloud.temporal.io/namespaces/${temporalNamespace}/workflows/${workflowId}` + ); + } + return { + workflowId, + workflowUrl: temporalNamespace + ? `https://cloud.temporal.io/namespaces/${temporalNamespace}/workflows/${workflowId}` + : undefined, + }; } case "upsert-pages": { if (!args.connectorId) { @@ -63,26 +86,33 @@ export const confluence = async ({ return entry[keyInFile]; }); - let allSuccesses = true; - const cachedSpaceNames: Record = {}; - const cachedSpaceHierarchies: Record< - string, - Record - > = {}; - - for (const pageId of pageIds) { - const success = await confluenceUpsertPageWithFullParentsActivity({ - connectorId, - pageId, - cachedSpaceNames, - cachedSpaceHierarchies, - }); - if (!success) { - logger.error({ pageId }, "Failed to upsert page"); - allSuccesses = false; + const client = await getTemporalClient(); + const workflow = await client.workflow.start( + confluenceUpsertPagesWithFullParentsWorkflow, + { + args: [{ connectorId, pageIds }], + taskQueue: QUEUE_NAME, + workflowId: `confluence-upsert-pages-${connectorId}`, + searchAttributes: { connectorId: [connectorId] }, + memo: { connectorId }, } + ); + + const { workflowId } = workflow; + const temporalNamespace = process.env.TEMPORAL_NAMESPACE; + if (!temporalNamespace) { + logger.info(`[Admin] Started temporal workflow with id: ${workflowId}`); + } else { + logger.info( + `[Admin] Started temporal workflow with id: ${workflowId} - https://cloud.temporal.io/namespaces/${temporalNamespace}/workflows/${workflowId}` + ); } - return { success: allSuccesses }; + return { + workflowId, + workflowUrl: temporalNamespace + ? `https://cloud.temporal.io/namespaces/${temporalNamespace}/workflows/${workflowId}` + : undefined, + }; } default: diff --git a/connectors/src/connectors/confluence/temporal/workflows.ts b/connectors/src/connectors/confluence/temporal/workflows.ts index 88d58be69f9d..4e4144dff7a4 100644 --- a/connectors/src/connectors/confluence/temporal/workflows.ts +++ b/connectors/src/connectors/confluence/temporal/workflows.ts @@ -10,6 +10,7 @@ import { import type { ConfluencePageRef } from "@connectors/connectors/confluence/lib/confluence_api"; import type * as activities from "@connectors/connectors/confluence/temporal/activities"; +import { confluenceUpsertPageWithFullParentsActivity } from "@connectors/connectors/confluence/temporal/activities"; import type { SpaceUpdatesSignal } from "@connectors/connectors/confluence/temporal/signals"; import { spaceUpdatesSignal } from "@connectors/connectors/confluence/temporal/signals"; import { @@ -412,3 +413,39 @@ export async function confluencePersonalDataReportingWorkflow() { } } } + +export async function confluenceUpsertPageWithFullParentsWorkflow({ + connectorId, + pageId, +}: { + connectorId: ModelId; + pageId: string; +}) { + await confluenceUpsertPageWithFullParentsActivity({ + connectorId, + pageId, + }); +} + +export async function confluenceUpsertPagesWithFullParentsWorkflow({ + connectorId, + pageIds, +}: { + connectorId: ModelId; + pageIds: string[]; +}) { + const cachedSpaceNames: Record = {}; + const cachedSpaceHierarchies: Record< + string, + Record + > = {}; + + for (const pageId of pageIds) { + await confluenceUpsertPageWithFullParentsActivity({ + connectorId, + pageId, + cachedSpaceNames, + cachedSpaceHierarchies, + }); + } +} diff --git a/types/src/connectors/admin/cli.ts b/types/src/connectors/admin/cli.ts index 5c3fdaab00ab..2d2ff0b74507 100644 --- a/types/src/connectors/admin/cli.ts +++ b/types/src/connectors/admin/cli.ts @@ -39,7 +39,8 @@ export const ConfluenceCommandSchema = t.type({ export type ConfluenceCommandType = t.TypeOf; export const ConfluenceUpsertPageResponseSchema = t.type({ - success: t.boolean, + workflowId: t.string, + workflowUrl: t.union([t.string, t.undefined]), }); export type ConfluenceUpsertPageResponseType = t.TypeOf< typeof ConfluenceUpsertPageResponseSchema From 2be5a29603a413a503b897a2de09c091ca76af56 Mon Sep 17 00:00:00 2001 From: Aubin Date: Fri, 13 Dec 2024 17:58:28 +0100 Subject: [PATCH 13/18] remove some exports and improve types --- .../src/connectors/confluence/temporal/activities.ts | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/connectors/src/connectors/confluence/temporal/activities.ts b/connectors/src/connectors/confluence/temporal/activities.ts index 89459bdc5678..2e33cfd3584e 100644 --- a/connectors/src/connectors/confluence/temporal/activities.ts +++ b/connectors/src/connectors/confluence/temporal/activities.ts @@ -222,7 +222,7 @@ export async function markPageHasVisited({ ); } -export async function upsertConfluencePageToDataSource( +async function upsertConfluencePageToDataSource( page: NonNullable>>, spaceName: string, parents: string[], @@ -296,7 +296,7 @@ export async function upsertConfluencePageToDataSource( } } -export async function upsertConfluencePageInDb( +async function upsertConfluencePageInDb( connectorId: ModelId, page: ConfluencePageWithBodyType, visitedAtMs: number @@ -446,7 +446,10 @@ export async function confluenceUpsertPageWithFullParentsActivity({ connectorId: ModelId; pageId: string; cachedSpaceNames?: Record; - cachedSpaceHierarchies?: Record>; + cachedSpaceHierarchies?: Record< + string, + Awaited> + >; }): Promise { const connector = await ConnectorResource.fetchById(connectorId); if (!connector) { From a385951698eb3409e5c36e43a3c3ce5b17b56657 Mon Sep 17 00:00:00 2001 From: Aubin Date: Fri, 13 Dec 2024 20:00:54 +0100 Subject: [PATCH 14/18] remove unused import --- connectors/src/connectors/confluence/temporal/activities.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/connectors/src/connectors/confluence/temporal/activities.ts b/connectors/src/connectors/confluence/temporal/activities.ts index 2e33cfd3584e..ebf22adf9d5b 100644 --- a/connectors/src/connectors/confluence/temporal/activities.ts +++ b/connectors/src/connectors/confluence/temporal/activities.ts @@ -44,7 +44,6 @@ import { syncStarted, syncSucceeded } from "@connectors/lib/sync_status"; import mainLogger from "@connectors/logger/logger"; import { ConnectorResource } from "@connectors/resources/connector_resource"; import type { DataSourceConfig } from "@connectors/types/data_source_config"; -import assert from "assert"; /** * This type represents the ID that should be passed as parentId to a content node to hide it from the UI. From 647f68736c140fbbd9e8a4a780466cf131cc3183 Mon Sep 17 00:00:00 2001 From: Aubin Date: Fri, 13 Dec 2024 20:06:15 +0100 Subject: [PATCH 15/18] cleanup --- .../confluence/temporal/activities.ts | 49 +++++++++++-------- 1 file changed, 29 insertions(+), 20 deletions(-) diff --git a/connectors/src/connectors/confluence/temporal/activities.ts b/connectors/src/connectors/confluence/temporal/activities.ts index ebf22adf9d5b..14af70f00b12 100644 --- a/connectors/src/connectors/confluence/temporal/activities.ts +++ b/connectors/src/connectors/confluence/temporal/activities.ts @@ -221,15 +221,25 @@ export async function markPageHasVisited({ ); } -async function upsertConfluencePageToDataSource( - page: NonNullable>>, - spaceName: string, - parents: string[], - confluenceConfig: ConfluenceConfiguration, - syncType: UpsertToDataSourceParams["upsertContext"]["sync_type"], - dataSourceConfig: DataSourceConfig, - loggerArgs: Record -) { +interface ConfluenceUpsertPageInput { + page: NonNullable>>; + spaceName: string; + parents: string[]; + confluenceConfig: ConfluenceConfiguration; + syncType?: UpsertToDataSourceParams["upsertContext"]["sync_type"]; + dataSourceConfig: DataSourceConfig; + loggerArgs: Record; +} + +async function upsertConfluencePageToDataSource({ + page, + spaceName, + parents, + confluenceConfig, + syncType = "batch", + dataSourceConfig, + loggerArgs, +}: ConfluenceUpsertPageInput) { const localLogger = logger.child(loggerArgs); const markdown = turndownService.turndown(page.body.storage.value); @@ -414,16 +424,16 @@ export async function confluenceCheckAndUpsertPageActivity({ localLogger.info("Upserting Confluence page."); - await upsertConfluencePageToDataSource( + await upsertConfluencePageToDataSource({ page, spaceName, // Parent Ids will be computed after all page imports within the space have been completed. - [makePageInternalId(page.id), HiddenContentNodeParentId], + parents: [makePageInternalId(page.id), HiddenContentNodeParentId], confluenceConfig, - isBatchSync ? "batch" : "incremental", + syncType: isBatchSync ? "batch" : "incremental", dataSourceConfig, - loggerArgs - ); + loggerArgs, + }); localLogger.info("Upserting Confluence page in DB."); @@ -511,22 +521,21 @@ export async function confluenceUpsertPageWithFullParentsActivity({ ); } - const parentIds = await getConfluencePageParentIds( + const parents = await getConfluencePageParentIds( connectorId, { pageId: page.id, parentId: page.parentId, spaceId: page.spaceId }, cachedSpaceHierarchies[page.spaceId] ); localLogger.info("Upserting Confluence page."); - await upsertConfluencePageToDataSource( + await upsertConfluencePageToDataSource({ page, spaceName, - parentIds, + parents, confluenceConfig, - "batch", dataSourceConfig, - loggerArgs - ); + loggerArgs, + }); await upsertConfluencePageInDb(connector.id, page, visitedAtMs); From f2d142ee332b7bd462cc2a86091e0ed223e226b0 Mon Sep 17 00:00:00 2001 From: Aubin Date: Fri, 13 Dec 2024 21:53:55 +0100 Subject: [PATCH 16/18] uniformize logs --- connectors/src/connectors/confluence/temporal/activities.ts | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/connectors/src/connectors/confluence/temporal/activities.ts b/connectors/src/connectors/confluence/temporal/activities.ts index 14af70f00b12..d9ffcee48a58 100644 --- a/connectors/src/connectors/confluence/temporal/activities.ts +++ b/connectors/src/connectors/confluence/temporal/activities.ts @@ -423,7 +423,6 @@ export async function confluenceCheckAndUpsertPageActivity({ } localLogger.info("Upserting Confluence page."); - await upsertConfluencePageToDataSource({ page, spaceName, @@ -436,7 +435,6 @@ export async function confluenceCheckAndUpsertPageActivity({ }); localLogger.info("Upserting Confluence page in DB."); - await upsertConfluencePageInDb(connector.id, page, visitedAtMs); return true; @@ -537,6 +535,7 @@ export async function confluenceUpsertPageWithFullParentsActivity({ loggerArgs, }); + localLogger.info("Upserting Confluence page in DB."); await upsertConfluencePageInDb(connector.id, page, visitedAtMs); return true; From 436d7e4450684f2278ed5d0ae7af0c63e28d180c Mon Sep 17 00:00:00 2001 From: Aubin Date: Fri, 13 Dec 2024 21:55:13 +0100 Subject: [PATCH 17/18] :sparkles: --- .../connectors/confluence/temporal/activities.ts | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/connectors/src/connectors/confluence/temporal/activities.ts b/connectors/src/connectors/confluence/temporal/activities.ts index d9ffcee48a58..af5104ea2afd 100644 --- a/connectors/src/connectors/confluence/temporal/activities.ts +++ b/connectors/src/connectors/confluence/temporal/activities.ts @@ -93,7 +93,7 @@ export async function getConfluenceClient( ) { const { cloudId, connectorId } = config; - // Ensure connector is fetched if not directly provided. + // Ensure the connector is fetched if not directly provided. const effectiveConnector = connector ?? (connectorId ? await fetchConfluenceConnector(connectorId) : undefined); @@ -334,7 +334,8 @@ interface ConfluenceCheckAndUpsertPageActivityInput { /** * Upsert a Confluence page without its full parents. - * Operates greedily by stopping if the page is restricted or if there is a version match (unless the page was moved, in this case we have to upsert because the parents have changed). + * Operates greedily by stopping if the page is restricted or if there is a version match + * (unless the page was moved, in this case, we have to upsert because the parents have changed). */ export async function confluenceCheckAndUpsertPageActivity({ connectorId, @@ -392,15 +393,15 @@ export async function confluenceCheckAndUpsertPageActivity({ return false; } - // Check version. + // Check the version. const isSameVersion = pageAlreadyInDb && pageAlreadyInDb.version === pageRef.version; - // Check if page was moved. Version is not bumped when a page is moved. + // Check whether the page was moved (the version is not bumped when a page is moved). const pageWasMoved = pageAlreadyInDb && pageAlreadyInDb.parentId !== pageRef.parentId; - // Only index in DB if the page does not exis, has been moved or we want to upsert. + // Only index in DB if the page does not exist, has been moved, or we want to upsert. if (isSameVersion && !forceUpsert && !pageWasMoved) { // Simply record that we visited the page. await markPageHasVisited({ @@ -891,7 +892,7 @@ export async function confluenceGetReportPersonalActionActivity( "Error while reporting Confluence account." ); - // If token has been revoked, return false. + // If the token has been revoked, return false. if (err instanceof ExternalOAuthTokenError) { return false; } From 19716baf6f1b4f970121400d0a0b7fed04521604 Mon Sep 17 00:00:00 2001 From: Aubin Date: Fri, 13 Dec 2024 22:19:19 +0100 Subject: [PATCH 18/18] fix incorrect import --- connectors/src/connectors/confluence/temporal/workflows.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/connectors/src/connectors/confluence/temporal/workflows.ts b/connectors/src/connectors/confluence/temporal/workflows.ts index 4e4144dff7a4..47b260815274 100644 --- a/connectors/src/connectors/confluence/temporal/workflows.ts +++ b/connectors/src/connectors/confluence/temporal/workflows.ts @@ -10,7 +10,6 @@ import { import type { ConfluencePageRef } from "@connectors/connectors/confluence/lib/confluence_api"; import type * as activities from "@connectors/connectors/confluence/temporal/activities"; -import { confluenceUpsertPageWithFullParentsActivity } from "@connectors/connectors/confluence/temporal/activities"; import type { SpaceUpdatesSignal } from "@connectors/connectors/confluence/temporal/signals"; import { spaceUpdatesSignal } from "@connectors/connectors/confluence/temporal/signals"; import { @@ -32,6 +31,7 @@ const { confluenceGetActiveChildPageRefsActivity, confluenceGetRootPageRefsActivity, fetchConfluenceSpaceIdsForConnectorActivity, + confluenceUpsertPageWithFullParentsActivity, confluenceGetReportPersonalActionActivity, fetchConfluenceUserAccountAndConnectorIdsActivity,