Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[connectors] Add a CLI command confluence upsert-page #9358

Merged
merged 18 commits into from
Dec 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 121 additions & 0 deletions connectors/src/connectors/confluence/lib/cli.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
import type {
AdminSuccessResponseType,
ConfluenceCommandType,
ConfluenceUpsertPageResponseType,
} from "@dust-tt/types";
import assert from "assert";
import fs from "fs/promises";

import { QUEUE_NAME } from "@connectors/connectors/confluence/temporal/config";
import {
confluenceUpsertPagesWithFullParentsWorkflow,
confluenceUpsertPageWithFullParentsWorkflow,
} from "@connectors/connectors/confluence/temporal/workflows";
import { getTemporalClient } from "@connectors/lib/temporal";
import { default as topLogger } from "@connectors/logger/logger";

export const confluence = async ({
command,
args,
}: ConfluenceCommandType): Promise<
AdminSuccessResponseType | ConfluenceUpsertPageResponseType
> => {
const logger = topLogger.child({ majorCommand: "confluence", command, args });
switch (command) {
case "upsert-page": {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a bit of a risk of having some divergence between the cli code and the activities code, we generally try to call an activity from here even if that means it's async.

All that being said I'm fine with having a direct implementation. Any chance we can dry a bit these 2 commands?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep I agree, I was contemplating adding a new activity but in the end we won't do that in the workflow (upsert and check parents right after)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could be an option on the activity + a new workflow to do just one page with that option set

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm interesting

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sounds good, doing that 👍

Copy link
Contributor Author

@aubin-tchoi aubin-tchoi Dec 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually it's a bit too convoluted to use the same activity: one fetches the page + upserts without parent and is optimized for the workflow (it does some version checking), the other one fetches the page, fetches the space using the page data, resolves parents and does an upsert with parents.

I added a separate activity to do the upsertWithFullParents and added workflows since we might want to do these with retries and history
SGTY @spolu ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yess!

if (!args.connectorId) {
throw new Error("Missing --connectorId argument");
}
if (!args.pageId) {
throw new Error("Missing --pageId argument");
}
const { connectorId, pageId } = args;

const client = await getTemporalClient();
const workflow = await client.workflow.start(
confluenceUpsertPageWithFullParentsWorkflow,
{
args: [{ connectorId, pageId }],
taskQueue: QUEUE_NAME,
workflowId: `confluence-upsert-page-${connectorId}-${pageId}`,
searchAttributes: { connectorId: [connectorId] },
memo: { connectorId },
}
);

const { workflowId } = workflow;
const temporalNamespace = process.env.TEMPORAL_NAMESPACE;
if (!temporalNamespace) {
logger.info(`[Admin] Started temporal workflow with id: ${workflowId}`);
} else {
logger.info(
`[Admin] Started temporal workflow with id: ${workflowId} - https://cloud.temporal.io/namespaces/${temporalNamespace}/workflows/${workflowId}`
);
}
return {
workflowId,
workflowUrl: temporalNamespace
? `https://cloud.temporal.io/namespaces/${temporalNamespace}/workflows/${workflowId}`
: undefined,
};
}
case "upsert-pages": {
if (!args.connectorId) {
throw new Error("Missing --connectorId argument");
}
if (!args.file) {
throw new Error("Missing --file argument");
}
if (!args.keyInFile) {
throw new Error("Missing --keyInFile argument");
}
const connectorId = args.connectorId;
const file = args.file;
const keyInFile = args.keyInFile;

// parsing the JSON file
const fileContent = await fs.readFile(file, "utf-8");
const jsonArray = JSON.parse(fileContent);
assert(Array.isArray(jsonArray), "The file content is not an array.");

const pageIds = jsonArray.map((entry) => {
assert(
keyInFile in entry,
`Key "${keyInFile}" not found in entry ${JSON.stringify(entry)}`
);
return entry[keyInFile];
});

const client = await getTemporalClient();
const workflow = await client.workflow.start(
confluenceUpsertPagesWithFullParentsWorkflow,
{
args: [{ connectorId, pageIds }],
taskQueue: QUEUE_NAME,
workflowId: `confluence-upsert-pages-${connectorId}`,
searchAttributes: { connectorId: [connectorId] },
memo: { connectorId },
}
);

const { workflowId } = workflow;
const temporalNamespace = process.env.TEMPORAL_NAMESPACE;
if (!temporalNamespace) {
logger.info(`[Admin] Started temporal workflow with id: ${workflowId}`);
} else {
logger.info(
`[Admin] Started temporal workflow with id: ${workflowId} - https://cloud.temporal.io/namespaces/${temporalNamespace}/workflows/${workflowId}`
);
}
return {
workflowId,
workflowUrl: temporalNamespace
? `https://cloud.temporal.io/namespaces/${temporalNamespace}/workflows/${workflowId}`
: undefined,
};
}

default:
throw new Error("Unknown Confluence command: " + command);
}
};
Loading
Loading