Skip to content

Commit

Permalink
move cli force-upserts to workflows
Browse files Browse the repository at this point in the history
  • Loading branch information
aubin-tchoi committed Dec 13, 2024
1 parent 5e9015c commit 86277f9
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 30 deletions.
88 changes: 59 additions & 29 deletions connectors/src/connectors/confluence/lib/cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@ import type {
import assert from "assert";
import fs from "fs/promises";

import { confluenceUpsertPageWithFullParentsActivity } from "@connectors/connectors/confluence/temporal/activities";
import { QUEUE_NAME } from "@connectors/connectors/confluence/temporal/config";
import {
confluenceUpsertPagesWithFullParentsWorkflow,
confluenceUpsertPageWithFullParentsWorkflow,
} from "@connectors/connectors/confluence/temporal/workflows";
import { getTemporalClient } from "@connectors/lib/temporal";
import { default as topLogger } from "@connectors/logger/logger";

interface cachedSpace {
spaceName: string | null;
spaceHierarchy: Record<string, string | null>;
}

export const confluence = async ({
command,
args,
Expand All @@ -30,11 +30,34 @@ export const confluence = async ({
throw new Error("Missing --pageId argument");
}
const { connectorId, pageId } = args;
const success = await confluenceUpsertPageWithFullParentsActivity({
connectorId,
pageId,
});
return { success };

const client = await getTemporalClient();
const workflow = await client.workflow.start(
confluenceUpsertPageWithFullParentsWorkflow,
{
args: [{ connectorId, pageId }],
taskQueue: QUEUE_NAME,
workflowId: `confluence-upsert-page-${connectorId}-${pageId}`,
searchAttributes: { connectorId: [connectorId] },
memo: { connectorId },
}
);

const { workflowId } = workflow;
const temporalNamespace = process.env.TEMPORAL_NAMESPACE;
if (!temporalNamespace) {
logger.info(`[Admin] Started temporal workflow with id: ${workflowId}`);
} else {
logger.info(
`[Admin] Started temporal workflow with id: ${workflowId} - https://cloud.temporal.io/namespaces/${temporalNamespace}/workflows/${workflowId}`
);
}
return {
workflowId,
workflowUrl: temporalNamespace
? `https://cloud.temporal.io/namespaces/${temporalNamespace}/workflows/${workflowId}`
: undefined,
};
}
case "upsert-pages": {
if (!args.connectorId) {
Expand Down Expand Up @@ -63,26 +86,33 @@ export const confluence = async ({
return entry[keyInFile];
});

let allSuccesses = true;
const cachedSpaceNames: Record<string, string> = {};
const cachedSpaceHierarchies: Record<
string,
Record<string, string | null>
> = {};

for (const pageId of pageIds) {
const success = await confluenceUpsertPageWithFullParentsActivity({
connectorId,
pageId,
cachedSpaceNames,
cachedSpaceHierarchies,
});
if (!success) {
logger.error({ pageId }, "Failed to upsert page");
allSuccesses = false;
const client = await getTemporalClient();
const workflow = await client.workflow.start(
confluenceUpsertPagesWithFullParentsWorkflow,
{
args: [{ connectorId, pageIds }],
taskQueue: QUEUE_NAME,
workflowId: `confluence-upsert-pages-${connectorId}`,
searchAttributes: { connectorId: [connectorId] },
memo: { connectorId },
}
);

const { workflowId } = workflow;
const temporalNamespace = process.env.TEMPORAL_NAMESPACE;
if (!temporalNamespace) {
logger.info(`[Admin] Started temporal workflow with id: ${workflowId}`);
} else {
logger.info(
`[Admin] Started temporal workflow with id: ${workflowId} - https://cloud.temporal.io/namespaces/${temporalNamespace}/workflows/${workflowId}`
);
}
return { success: allSuccesses };
return {
workflowId,
workflowUrl: temporalNamespace
? `https://cloud.temporal.io/namespaces/${temporalNamespace}/workflows/${workflowId}`
: undefined,
};
}

default:
Expand Down
37 changes: 37 additions & 0 deletions connectors/src/connectors/confluence/temporal/workflows.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import {

import type { ConfluencePageRef } from "@connectors/connectors/confluence/lib/confluence_api";
import type * as activities from "@connectors/connectors/confluence/temporal/activities";
import { confluenceUpsertPageWithFullParentsActivity } from "@connectors/connectors/confluence/temporal/activities";
import type { SpaceUpdatesSignal } from "@connectors/connectors/confluence/temporal/signals";
import { spaceUpdatesSignal } from "@connectors/connectors/confluence/temporal/signals";
import {
Expand Down Expand Up @@ -412,3 +413,39 @@ export async function confluencePersonalDataReportingWorkflow() {
}
}
}

export async function confluenceUpsertPageWithFullParentsWorkflow({
connectorId,
pageId,
}: {
connectorId: ModelId;
pageId: string;
}) {
await confluenceUpsertPageWithFullParentsActivity({
connectorId,
pageId,
});
}

export async function confluenceUpsertPagesWithFullParentsWorkflow({
connectorId,
pageIds,
}: {
connectorId: ModelId;
pageIds: string[];
}) {
const cachedSpaceNames: Record<string, string> = {};
const cachedSpaceHierarchies: Record<
string,
Record<string, string | null>
> = {};

for (const pageId of pageIds) {
await confluenceUpsertPageWithFullParentsActivity({
connectorId,
pageId,
cachedSpaceNames,
cachedSpaceHierarchies,
});
}
}
3 changes: 2 additions & 1 deletion types/src/connectors/admin/cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ export const ConfluenceCommandSchema = t.type({
export type ConfluenceCommandType = t.TypeOf<typeof ConfluenceCommandSchema>;

export const ConfluenceUpsertPageResponseSchema = t.type({
success: t.boolean,
workflowId: t.string,
workflowUrl: t.union([t.string, t.undefined]),
});
export type ConfluenceUpsertPageResponseType = t.TypeOf<
typeof ConfluenceUpsertPageResponseSchema
Expand Down

0 comments on commit 86277f9

Please sign in to comment.