From 86277f9151e58a58fa9aa0e9d853d3c8513ef2fd Mon Sep 17 00:00:00 2001 From: Aubin Date: Fri, 13 Dec 2024 17:45:58 +0100 Subject: [PATCH] move cli force-upserts to workflows --- .../src/connectors/confluence/lib/cli.ts | 88 +++++++++++++------ .../confluence/temporal/workflows.ts | 37 ++++++++ types/src/connectors/admin/cli.ts | 3 +- 3 files changed, 98 insertions(+), 30 deletions(-) diff --git a/connectors/src/connectors/confluence/lib/cli.ts b/connectors/src/connectors/confluence/lib/cli.ts index add46a24f4223..e3e7818bfc5cb 100644 --- a/connectors/src/connectors/confluence/lib/cli.ts +++ b/connectors/src/connectors/confluence/lib/cli.ts @@ -6,14 +6,14 @@ import type { import assert from "assert"; import fs from "fs/promises"; -import { confluenceUpsertPageWithFullParentsActivity } from "@connectors/connectors/confluence/temporal/activities"; +import { QUEUE_NAME } from "@connectors/connectors/confluence/temporal/config"; +import { + confluenceUpsertPagesWithFullParentsWorkflow, + confluenceUpsertPageWithFullParentsWorkflow, +} from "@connectors/connectors/confluence/temporal/workflows"; +import { getTemporalClient } from "@connectors/lib/temporal"; import { default as topLogger } from "@connectors/logger/logger"; -interface cachedSpace { - spaceName: string | null; - spaceHierarchy: Record; -} - export const confluence = async ({ command, args, @@ -30,11 +30,34 @@ export const confluence = async ({ throw new Error("Missing --pageId argument"); } const { connectorId, pageId } = args; - const success = await confluenceUpsertPageWithFullParentsActivity({ - connectorId, - pageId, - }); - return { success }; + + const client = await getTemporalClient(); + const workflow = await client.workflow.start( + confluenceUpsertPageWithFullParentsWorkflow, + { + args: [{ connectorId, pageId }], + taskQueue: QUEUE_NAME, + workflowId: `confluence-upsert-page-${connectorId}-${pageId}`, + searchAttributes: { connectorId: [connectorId] }, + memo: { connectorId }, + } + ); + + const { workflowId } = workflow; + const temporalNamespace = process.env.TEMPORAL_NAMESPACE; + if (!temporalNamespace) { + logger.info(`[Admin] Started temporal workflow with id: ${workflowId}`); + } else { + logger.info( + `[Admin] Started temporal workflow with id: ${workflowId} - https://cloud.temporal.io/namespaces/${temporalNamespace}/workflows/${workflowId}` + ); + } + return { + workflowId, + workflowUrl: temporalNamespace + ? `https://cloud.temporal.io/namespaces/${temporalNamespace}/workflows/${workflowId}` + : undefined, + }; } case "upsert-pages": { if (!args.connectorId) { @@ -63,26 +86,33 @@ export const confluence = async ({ return entry[keyInFile]; }); - let allSuccesses = true; - const cachedSpaceNames: Record = {}; - const cachedSpaceHierarchies: Record< - string, - Record - > = {}; - - for (const pageId of pageIds) { - const success = await confluenceUpsertPageWithFullParentsActivity({ - connectorId, - pageId, - cachedSpaceNames, - cachedSpaceHierarchies, - }); - if (!success) { - logger.error({ pageId }, "Failed to upsert page"); - allSuccesses = false; + const client = await getTemporalClient(); + const workflow = await client.workflow.start( + confluenceUpsertPagesWithFullParentsWorkflow, + { + args: [{ connectorId, pageIds }], + taskQueue: QUEUE_NAME, + workflowId: `confluence-upsert-pages-${connectorId}`, + searchAttributes: { connectorId: [connectorId] }, + memo: { connectorId }, } + ); + + const { workflowId } = workflow; + const temporalNamespace = process.env.TEMPORAL_NAMESPACE; + if (!temporalNamespace) { + logger.info(`[Admin] Started temporal workflow with id: ${workflowId}`); + } else { + logger.info( + `[Admin] Started temporal workflow with id: ${workflowId} - https://cloud.temporal.io/namespaces/${temporalNamespace}/workflows/${workflowId}` + ); } - return { success: allSuccesses }; + return { + workflowId, + workflowUrl: temporalNamespace + ? `https://cloud.temporal.io/namespaces/${temporalNamespace}/workflows/${workflowId}` + : undefined, + }; } default: diff --git a/connectors/src/connectors/confluence/temporal/workflows.ts b/connectors/src/connectors/confluence/temporal/workflows.ts index 816a08beaabc5..8167519d12ace 100644 --- a/connectors/src/connectors/confluence/temporal/workflows.ts +++ b/connectors/src/connectors/confluence/temporal/workflows.ts @@ -10,6 +10,7 @@ import { import type { ConfluencePageRef } from "@connectors/connectors/confluence/lib/confluence_api"; import type * as activities from "@connectors/connectors/confluence/temporal/activities"; +import { confluenceUpsertPageWithFullParentsActivity } from "@connectors/connectors/confluence/temporal/activities"; import type { SpaceUpdatesSignal } from "@connectors/connectors/confluence/temporal/signals"; import { spaceUpdatesSignal } from "@connectors/connectors/confluence/temporal/signals"; import { @@ -412,3 +413,39 @@ export async function confluencePersonalDataReportingWorkflow() { } } } + +export async function confluenceUpsertPageWithFullParentsWorkflow({ + connectorId, + pageId, +}: { + connectorId: ModelId; + pageId: string; +}) { + await confluenceUpsertPageWithFullParentsActivity({ + connectorId, + pageId, + }); +} + +export async function confluenceUpsertPagesWithFullParentsWorkflow({ + connectorId, + pageIds, +}: { + connectorId: ModelId; + pageIds: string[]; +}) { + const cachedSpaceNames: Record = {}; + const cachedSpaceHierarchies: Record< + string, + Record + > = {}; + + for (const pageId of pageIds) { + await confluenceUpsertPageWithFullParentsActivity({ + connectorId, + pageId, + cachedSpaceNames, + cachedSpaceHierarchies, + }); + } +} diff --git a/types/src/connectors/admin/cli.ts b/types/src/connectors/admin/cli.ts index d14eb833e89e8..73e48f6bdbe66 100644 --- a/types/src/connectors/admin/cli.ts +++ b/types/src/connectors/admin/cli.ts @@ -39,7 +39,8 @@ export const ConfluenceCommandSchema = t.type({ export type ConfluenceCommandType = t.TypeOf; export const ConfluenceUpsertPageResponseSchema = t.type({ - success: t.boolean, + workflowId: t.string, + workflowUrl: t.union([t.string, t.undefined]), }); export type ConfluenceUpsertPageResponseType = t.TypeOf< typeof ConfluenceUpsertPageResponseSchema