Skip to content

Commit

Permalink
create an activity for the full upsert
Browse files Browse the repository at this point in the history
  • Loading branch information
aubin-tchoi committed Dec 13, 2024
1 parent 99dbf60 commit 5e9015c
Show file tree
Hide file tree
Showing 2 changed files with 119 additions and 173 deletions.
190 changes: 17 additions & 173 deletions connectors/src/connectors/confluence/lib/cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,60 +2,18 @@ import type {
AdminSuccessResponseType,
ConfluenceCommandType,
ConfluenceUpsertPageResponseType,
ModelId,
} from "@dust-tt/types";
import assert from "assert";
import fs from "fs/promises";

import {
getConfluencePageParentIds,
getSpaceHierarchy,
} from "@connectors/connectors/confluence/lib/hierarchy";
import {
confluenceGetSpaceNameActivity,
fetchConfluenceConfigurationActivity,
getConfluenceClient,
upsertConfluencePageInDb,
upsertConfluencePageToDataSource,
} from "@connectors/connectors/confluence/temporal/activities";
import { dataSourceConfigFromConnector } from "@connectors/lib/api/data_source_config";
import { ConfluencePage } from "@connectors/lib/models/confluence";
import { confluenceUpsertPageWithFullParentsActivity } from "@connectors/connectors/confluence/temporal/activities";
import { default as topLogger } from "@connectors/logger/logger";
import { ConnectorResource } from "@connectors/resources/connector_resource";

interface cachedSpace {
spaceName: string | null;
spaceHierarchy: Record<string, string | null>;
}

async function cacheSpace(
cachedSpaces: Record<string, cachedSpace>,
{
confluenceCloudId,
connectorId,
spaceId,
}: {
confluenceCloudId: string;
connectorId: ModelId;
spaceId: string;
}
): Promise<cachedSpace> {
const cachedSpace = cachedSpaces[spaceId];
if (cachedSpace) {
return cachedSpace;
}
const spaceName = await confluenceGetSpaceNameActivity({
spaceId,
confluenceCloudId,
connectorId,
});
const spaceHierarchy = spaceName
? await getSpaceHierarchy(connectorId, spaceId)
: {}; // not fetching if we couldn't get the space from Confluence API anyway
cachedSpaces[spaceId] = { spaceName, spaceHierarchy };
return { spaceName, spaceHierarchy };
}

export const confluence = async ({
command,
args,
Expand All @@ -71,73 +29,12 @@ export const confluence = async ({
if (!args.pageId) {
throw new Error("Missing --pageId argument");
}
const connectorId = args.connectorId;
const pageId = args.pageId;

const connector = await ConnectorResource.fetchById(connectorId);
if (!connector) {
throw new Error("Connector not found.");
}
const dataSourceConfig = dataSourceConfigFromConnector(connector);
const confluenceConfig =
await fetchConfluenceConfigurationActivity(connectorId);

const loggerArgs = {
const { connectorId, pageId } = args;
const success = await confluenceUpsertPageWithFullParentsActivity({
connectorId,
dataSourceId: dataSourceConfig.dataSourceId,
pageId,
workspaceId: dataSourceConfig.workspaceId,
};
const localLogger = logger.child(loggerArgs);
const visitedAtMs = new Date().getTime();

const pageInDb = await ConfluencePage.findOne({
attributes: ["parentId", "skipReason"],
where: { connectorId, pageId },
});
if (pageInDb && pageInDb.skipReason !== null) {
localLogger.info("Confluence page skipped.");
return { success: false };
}

const client = await getConfluenceClient(
{ cloudId: confluenceConfig?.cloudId },
connector
);

const page = await client.getPageById(pageId);
if (!page) {
localLogger.info("Confluence page not found.");
return { success: false };
}
const space = await client.getSpaceById(page.spaceId);
if (!space) {
localLogger.info("Confluence space not found.");
return { success: false };
}

const cachedHierarchy = await getSpaceHierarchy(
connectorId,
page.spaceId
);
const parentIds = await getConfluencePageParentIds(
connectorId,
{ pageId: page.id, parentId: page.parentId, spaceId: page.spaceId },
cachedHierarchy
);

localLogger.info("Upserting Confluence page.");
await upsertConfluencePageToDataSource(
page,
space.name,
parentIds,
confluenceConfig,
"batch",
dataSourceConfig,
loggerArgs
);
await upsertConfluencePageInDb(connector.id, page, visitedAtMs);
return { success: true };
return { success };
}
case "upsert-pages": {
if (!args.connectorId) {
Expand Down Expand Up @@ -166,79 +63,26 @@ export const confluence = async ({
return entry[keyInFile];
});

// fetching the pages in DB
const pagesInDb = Object.fromEntries(
(
await ConfluencePage.findAll({
attributes: ["pageId", "skipReason"],
where: { connectorId, pageId: pageIds },
})
).map((page) => [page.pageId, page])
);

const connector = await ConnectorResource.fetchById(connectorId);
assert(connector !== null, "Connector not found.");
const dataSourceConfig = dataSourceConfigFromConnector(connector);
const confluenceConfig =
await fetchConfluenceConfigurationActivity(connectorId);
const client = await getConfluenceClient(
{ cloudId: confluenceConfig?.cloudId },
connector
);

const cachedSpaces: Record<string, cachedSpace> = {};
const visitedAtMs = new Date().getTime();
let allSuccesses = true;
const cachedSpaceNames: Record<string, string> = {};
const cachedSpaceHierarchies: Record<
string,
Record<string, string | null>
> = {};

for (const pageId of pageIds) {
const loggerArgs = {
const success = await confluenceUpsertPageWithFullParentsActivity({
connectorId,
dataSourceId: dataSourceConfig.dataSourceId,
pageId,
workspaceId: dataSourceConfig.workspaceId,
};
const localLogger = logger.child(loggerArgs);

const pageInDb = pagesInDb[pageId];
if (pageInDb && pageInDb.skipReason !== null) {
localLogger.info("Confluence page skipped.");
continue;
}

const page = await client.getPageById(pageId);
if (!page) {
localLogger.info("Confluence page not found.");
continue;
}
// fetching the space if not already cached
const { spaceName, spaceHierarchy } = await cacheSpace(cachedSpaces, {
connectorId,
confluenceCloudId: confluenceConfig?.cloudId,
spaceId: page.spaceId,
cachedSpaceNames,
cachedSpaceHierarchies,
});
if (!spaceName) {
localLogger.info("Confluence space not found.");
continue;
if (!success) {
logger.error({ pageId }, "Failed to upsert page");
allSuccesses = false;
}

const parentIds = await getConfluencePageParentIds(
connectorId,
{ pageId: page.id, parentId: page.parentId, spaceId: page.spaceId },
spaceHierarchy
);

localLogger.info("Upserting Confluence page.");
await upsertConfluencePageToDataSource(
page,
spaceName,
parentIds,
confluenceConfig,
"batch",
dataSourceConfig,
loggerArgs
);
await upsertConfluencePageInDb(connector.id, page, visitedAtMs);
}
return { success: true };
return { success: allSuccesses };
}

default:
Expand Down
102 changes: 102 additions & 0 deletions connectors/src/connectors/confluence/temporal/activities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,10 @@ interface ConfluenceCheckAndUpsertPageActivityInput {
visitedAtMs: number;
}

/**
* Upsert a Confluence page without its full parents.
* Operates greedily by stopping if the page is restricted or if there is a version match (unless the page was moved, in this case we have to upsert because the parents have changed).
*/
export async function confluenceCheckAndUpsertPageActivity({
connectorId,
isBatchSync,
Expand Down Expand Up @@ -428,6 +432,104 @@ export async function confluenceCheckAndUpsertPageActivity({
return true;
}

/**
* Upsert a Confluence page with its full parent hierarchy.
* Expensive operation, it should be reserved to admin actions on a limited set of pages.
*/
export async function confluenceUpsertPageWithFullParentsActivity({
connectorId,
pageId,
cachedSpaceNames = {},
cachedSpaceHierarchies = {},
}: {
connectorId: ModelId;
pageId: string;
cachedSpaceNames?: Record<string, string>;
cachedSpaceHierarchies?: Record<string, Record<string, string | null>>;
}): Promise<boolean> {
const connector = await ConnectorResource.fetchById(connectorId);
if (!connector) {
throw new Error("Connector not found.");
}
const dataSourceConfig = dataSourceConfigFromConnector(connector);
const confluenceConfig =
await fetchConfluenceConfigurationActivity(connectorId);

const loggerArgs = {
connectorId,
dataSourceId: dataSourceConfig.dataSourceId,
pageId,
workspaceId: dataSourceConfig.workspaceId,
};
const localLogger = logger.child(loggerArgs);
const visitedAtMs = new Date().getTime();

const pageInDb = await ConfluencePage.findOne({
attributes: ["parentId", "skipReason"],
where: { connectorId, pageId },
});
if (pageInDb && pageInDb.skipReason !== null) {
localLogger.info("Confluence page skipped.");
return false;
}

const client = await getConfluenceClient(
{ cloudId: confluenceConfig?.cloudId },
connector
);

const hasReadRestrictions = await pageHasReadRestrictions(client, pageId);
if (hasReadRestrictions) {
localLogger.info("Skipping restricted Confluence page.");
return false;
}

const page = await client.getPageById(pageId);
if (!page) {
localLogger.info("Confluence page not found.");
return false;
}

let spaceName = cachedSpaceNames[page.spaceId];
if (!spaceName) {
const space = await client.getSpaceById(page.spaceId);
if (!space) {
localLogger.info("Confluence space not found.");
return false;
}
cachedSpaceNames[page.spaceId] = space.name;
spaceName = space.name;
}

if (!cachedSpaceHierarchies[page.spaceId]) {
cachedSpaceHierarchies[page.spaceId] = await getSpaceHierarchy(
connectorId,
page.spaceId
);
}

const parentIds = await getConfluencePageParentIds(
connectorId,
{ pageId: page.id, parentId: page.parentId, spaceId: page.spaceId },
cachedSpaceHierarchies[page.spaceId]
);

localLogger.info("Upserting Confluence page.");
await upsertConfluencePageToDataSource(
page,
spaceName,
parentIds,
confluenceConfig,
"batch",
dataSourceConfig,
loggerArgs
);

await upsertConfluencePageInDb(connector.id, page, visitedAtMs);

return true;
}

export async function confluenceGetActiveChildPageRefsActivity({
connectorId,
parentPageId,
Expand Down

0 comments on commit 5e9015c

Please sign in to comment.