Skip to content

Commit

Permalink
[connectors] Add a CLI command confluence upsert-page (#9358)
Browse files Browse the repository at this point in the history
* wip

* refactor: move page upsertion into a dedicated function

* feat: add page upsertion to the CLI command

* add return values for the CLI command

* skip pages that have a skip reason in the db

* add the correct parents

* add a command to upsert multiple pages, reading from a JSON file

* add db upserts too

* add confluence to the admin cli

* fix ConfluenceUpsertPageResponseSchema

* create an activity for the full upsert

* move cli force-upserts to workflows

* remove some exports and improve types

* remove unused import

* cleanup

* uniformize logs

* ✨

* fix incorrect import
  • Loading branch information
aubin-tchoi authored Dec 13, 2024
1 parent a75428a commit 2cf24b7
Show file tree
Hide file tree
Showing 6 changed files with 387 additions and 64 deletions.
121 changes: 121 additions & 0 deletions connectors/src/connectors/confluence/lib/cli.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
import type {
AdminSuccessResponseType,
ConfluenceCommandType,
ConfluenceUpsertPageResponseType,
} from "@dust-tt/types";
import assert from "assert";
import fs from "fs/promises";

import { QUEUE_NAME } from "@connectors/connectors/confluence/temporal/config";
import {
confluenceUpsertPagesWithFullParentsWorkflow,
confluenceUpsertPageWithFullParentsWorkflow,
} from "@connectors/connectors/confluence/temporal/workflows";
import { getTemporalClient } from "@connectors/lib/temporal";
import { default as topLogger } from "@connectors/logger/logger";

export const confluence = async ({
command,
args,
}: ConfluenceCommandType): Promise<
AdminSuccessResponseType | ConfluenceUpsertPageResponseType
> => {
const logger = topLogger.child({ majorCommand: "confluence", command, args });
switch (command) {
case "upsert-page": {
if (!args.connectorId) {
throw new Error("Missing --connectorId argument");
}
if (!args.pageId) {
throw new Error("Missing --pageId argument");
}
const { connectorId, pageId } = args;

const client = await getTemporalClient();
const workflow = await client.workflow.start(
confluenceUpsertPageWithFullParentsWorkflow,
{
args: [{ connectorId, pageId }],
taskQueue: QUEUE_NAME,
workflowId: `confluence-upsert-page-${connectorId}-${pageId}`,
searchAttributes: { connectorId: [connectorId] },
memo: { connectorId },
}
);

const { workflowId } = workflow;
const temporalNamespace = process.env.TEMPORAL_NAMESPACE;
if (!temporalNamespace) {
logger.info(`[Admin] Started temporal workflow with id: ${workflowId}`);
} else {
logger.info(
`[Admin] Started temporal workflow with id: ${workflowId} - https://cloud.temporal.io/namespaces/${temporalNamespace}/workflows/${workflowId}`
);
}
return {
workflowId,
workflowUrl: temporalNamespace
? `https://cloud.temporal.io/namespaces/${temporalNamespace}/workflows/${workflowId}`
: undefined,
};
}
case "upsert-pages": {
if (!args.connectorId) {
throw new Error("Missing --connectorId argument");
}
if (!args.file) {
throw new Error("Missing --file argument");
}
if (!args.keyInFile) {
throw new Error("Missing --keyInFile argument");
}
const connectorId = args.connectorId;
const file = args.file;
const keyInFile = args.keyInFile;

// parsing the JSON file
const fileContent = await fs.readFile(file, "utf-8");
const jsonArray = JSON.parse(fileContent);
assert(Array.isArray(jsonArray), "The file content is not an array.");

const pageIds = jsonArray.map((entry) => {
assert(
keyInFile in entry,
`Key "${keyInFile}" not found in entry ${JSON.stringify(entry)}`
);
return entry[keyInFile];
});

const client = await getTemporalClient();
const workflow = await client.workflow.start(
confluenceUpsertPagesWithFullParentsWorkflow,
{
args: [{ connectorId, pageIds }],
taskQueue: QUEUE_NAME,
workflowId: `confluence-upsert-pages-${connectorId}`,
searchAttributes: { connectorId: [connectorId] },
memo: { connectorId },
}
);

const { workflowId } = workflow;
const temporalNamespace = process.env.TEMPORAL_NAMESPACE;
if (!temporalNamespace) {
logger.info(`[Admin] Started temporal workflow with id: ${workflowId}`);
} else {
logger.info(
`[Admin] Started temporal workflow with id: ${workflowId} - https://cloud.temporal.io/namespaces/${temporalNamespace}/workflows/${workflowId}`
);
}
return {
workflowId,
workflowUrl: temporalNamespace
? `https://cloud.temporal.io/namespaces/${temporalNamespace}/workflows/${workflowId}`
: undefined,
};
}

default:
throw new Error("Unknown Confluence command: " + command);
}
};
Loading

0 comments on commit 2cf24b7

Please sign in to comment.