Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Slack Sync updates slack docs parents Field #1295

Merged
merged 35 commits into from
Sep 8, 2023
Merged
Show file tree
Hide file tree
Changes from 33 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
0a42e8d
doc update of parents field
philipperolet Aug 30, 2023
605fbe3
core api to update parents (+ refactor of update tags)
philipperolet Aug 30, 2023
8004a79
spolu review
philipperolet Aug 31, 2023
876225a
logging
philipperolet Aug 31, 2023
ca0e3f3
review 2
philipperolet Sep 1, 2023
406ca6f
fixed qdrant payload update via document hash
philipperolet Sep 1, 2023
7303190
WIP
philipperolet Sep 2, 2023
cf71c42
wip2
philipperolet Sep 4, 2023
f2eff93
wip 3
philipperolet Sep 5, 2023
9a30d02
wip 4
philipperolet Sep 5, 2023
2c7efc6
add parents endpoint to front
philipperolet Sep 5, 2023
5628ee9
cleaning
philipperolet Sep 5, 2023
1bdc7c8
Merge branch 'main' into ds-parents-field-notion
philipperolet Sep 5, 2023
120e2de
removed js Set in parents (semantics don't work)
philipperolet Sep 5, 2023
b8a3091
cleaning
philipperolet Sep 5, 2023
31a853d
Merge branch 'main' into ds-parents-field-notion
philipperolet Sep 5, 2023
2462e98
remove SyncWorkflowResult
philipperolet Sep 6, 2023
a350434
controlled memoization of getParents
philipperolet Sep 6, 2023
458bc7c
cleaning
philipperolet Sep 6, 2023
aaec779
handle void returns from pqueue execution in notion workflow
philipperolet Sep 6, 2023
0acb000
fix: document is of class object at runtime, not notionpage or notiondb
philipperolet Sep 6, 2023
ed66a6a
cleaning
philipperolet Sep 6, 2023
51ffbb1
bump notion workflow version
philipperolet Sep 6, 2023
7c000fe
pass timestamp to activity execution for better memoization
philipperolet Sep 6, 2023
83bdaa4
henry's suggestions
philipperolet Sep 6, 2023
16f74e1
migration script
philipperolet Sep 6, 2023
853d7ea
cleaning
philipperolet Sep 6, 2023
af02ecb
leaner getParents + fixes on potential inconsistencies
philipperolet Sep 6, 2023
c8bbaa7
Slack connector parents field update during sync
philipperolet Sep 6, 2023
ffad866
Migration for existing slack docs
philipperolet Sep 6, 2023
9ae414a
renaming
philipperolet Sep 6, 2023
974cd69
Merge branch 'ds-parents-field-notion' into ds-parents-slack
philipperolet Sep 6, 2023
c9a1682
smaller chunks
philipperolet Sep 8, 2023
3bfe90a
migration again
philipperolet Sep 8, 2023
ce34865
Merge branch 'main' into ds-parents-slack
philipperolet Sep 8, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 61 additions & 0 deletions connectors/migrations/20230906_2_slack_fill_parents_field.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import { updateDocumentParentsField } from "@connectors/lib/data_sources";
import { Connector, SlackMessages } from "@connectors/lib/models";

async function main() {
// if first arg is "all", update all connectors, else update only the
// connector for the corresponding workspace id
const connectors =
process.argv[2] === "all"
? await Connector.findAll({
where: {
type: "slack",
},
})
: await Connector.findAll({
where: {
type: "slack",
workspaceId: process.argv[2],
},
});

for (const connector of connectors) {
console.log(`Updating parents field for connector ${connector.id}`);
await updateParentsFieldForConnector(connector);
}
}

async function updateParentsFieldForConnector(connector: Connector) {
// get all distinct documentIds and their channel ids from slack messages in
// this connector
const documentIdsAndChannels = await SlackMessages.findAll({
where: {
connectorId: connector.id,
},
attributes: ["documentId", "channelId"],
group: ["documentId", "channelId"],
});
// update all parents fields for all pages and databases by chunks of 128
const chunkSize = 32;
for (let i = 0; i < documentIdsAndChannels.length; i += chunkSize) {
const chunk = documentIdsAndChannels.slice(i, i + chunkSize);
console.log(`Updating ${chunk.length} documents`);
// update parents field for each document of the chunk, in parallel
await Promise.all(
chunk.map((documentIdAndChannel) =>
updateDocumentParentsField(connector, documentIdAndChannel.documentId, [
documentIdAndChannel.channelId,
])
)
);
}
}

main()
.then(() => {
console.log("Done");
process.exit(0);
})
.catch((err) => {
console.error(err);
process.exit(1);
});
52 changes: 52 additions & 0 deletions connectors/migrations/20230906_notion_fill_parents_field.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import { updateAllParentsFields } from "@connectors/connectors/notion/lib/parents";
import { Connector, NotionDatabase, NotionPage } from "@connectors/lib/models";

async function main() {
// if first arg is "all", update all connectors, else update only the
// connector for the corresponding workspace id
const connectors =
process.argv[2] === "all"
? await Connector.findAll({
where: {
type: "notion",
},
})
: await Connector.findAll({
where: {
type: "notion",
workspaceId: process.argv[2],
},
});

for (const connector of connectors) {
console.log(`Updating parents field for connector ${connector.id}`);
await updateParentsFieldForConnector(connector);
}
}

async function updateParentsFieldForConnector(connector: Connector) {
// get all pages and databases for this connector
const pages = await NotionPage.findAll({
where: {
connectorId: connector.id,
},
});
const databases = await NotionDatabase.findAll({
where: {
connectorId: connector.id,
},
});

// update all parents fields for all pages and databases
await updateAllParentsFields(connector, [...pages, ...databases]);
}

main()
.then(() => {
console.log("Done");
process.exit(0);
})
.catch((err) => {
console.error(err);
process.exit(1);
});
56 changes: 56 additions & 0 deletions connectors/src/connectors/notion/lib/connectors_db_helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -218,3 +218,59 @@ export async function getNotionDatabaseFromConnectorsDb(

return NotionDatabase.findOne({ where });
}

/**
* Get children *that are pages* of a given notion page or database
*
* !! Not children *of a page*
*/
export async function getPageChildrenOf(
dataSourceInfo: DataSourceInfo,
notionId: string
): Promise<NotionPage[]> {
const connector = await Connector.findOne({
where: {
type: "notion",
workspaceId: dataSourceInfo.workspaceId,
dataSourceName: dataSourceInfo.dataSourceName,
},
});
if (!connector) {
throw new Error("Could not find connector");
}

return NotionPage.findAll({
where: {
parentId: notionId,
connectorId: connector.id,
},
});
}

/**
* Get children *that are databases* of a given notion page or database
*
* !! Not children *of a database*
*/
export async function getDatabaseChildrenOf(
dataSourceInfo: DataSourceInfo,
notionId: string
): Promise<NotionDatabase[]> {
const connector = await Connector.findOne({
where: {
type: "notion",
workspaceId: dataSourceInfo.workspaceId,
dataSourceName: dataSourceInfo.dataSourceName,
},
});
if (!connector) {
throw new Error("Could not find connector");
}

return NotionDatabase.findAll({
where: {
parentId: notionId,
connectorId: connector.id,
},
});
}
143 changes: 143 additions & 0 deletions connectors/src/connectors/notion/lib/parents.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
import memoize from "lodash.memoize";

import {
getDatabaseChildrenOf,
getNotionDatabaseFromConnectorsDb,
getNotionPageFromConnectorsDb,
getPageChildrenOf,
} from "@connectors/connectors/notion/lib/connectors_db_helpers";
import { updateDocumentParentsField } from "@connectors/lib/data_sources";
import { NotionDatabase, NotionPage } from "@connectors/lib/models";
import {
DataSourceConfig,
DataSourceInfo,
} from "@connectors/types/data_source_config";

/** Compute the parents field for a notion pageOrDb See the [Design
* Doc](https://www.notion.so/dust-tt/Engineering-e0f834b5be5a43569baaf76e9c41adf2?p=3d26536a4e0a464eae0c3f8f27a7af97&pm=s)
* and the field documentation [in
* core](https://github.com/dust-tt/dust/blob/main/core/src/data_sources/data_source.rs)
* for relevant details
*
* @param memoizationKey optional key to control memoization of this function (not actually used by the functio)
*
*/
async function _getParents(
dataSourceInfo: DataSourceInfo,
pageOrDbId: string,
// eslint-disable-next-line @typescript-eslint/no-unused-vars -- used for memoization
memoizationKey?: string
): Promise<string[]> {
const parents: string[] = [pageOrDbId];
const pageOrDb =
(await getNotionPageFromConnectorsDb(dataSourceInfo, pageOrDbId)) ||
(await getNotionDatabaseFromConnectorsDb(dataSourceInfo, pageOrDbId));
if (!pageOrDb) {
// pageOrDb is either not synced yet (not an issue, see design doc) or
// is not in Dust's scope, in both cases we can just return the page id
return parents;
}
switch (pageOrDb.parentType) {
case "workspace":
return parents;
case "block":
// rare cases in which doing something here is useful
// are ignored for now, see the design doc for details
return parents;
case "page":
case "database": {
return parents.concat(
// parentId cannot be undefined here
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
await getParents(dataSourceInfo, pageOrDb.parentId!, memoizationKey)
);
}
default:
throw new Error(`Unhandled parent type ${pageOrDb.parentType}`);
}
}

export const getParents = memoize(
_getParents,
(dataSourceInfo, pageOrDbId, memoizationKey) => {
return `${dataSourceInfo.dataSourceName}:${pageOrDbId}:${memoizationKey}`;
}
);

export async function updateAllParentsFields(
dataSourceConfig: DataSourceConfig,
pageOrDbs: (NotionPage | NotionDatabase)[],
memoizationKey?: string
): Promise<number> {
/* Computing all descendants, then updating, ensures the field is updated only
once per page, limiting the load on the Datasource */
const pagesToUpdate = await getPagesToUpdate(pageOrDbs, dataSourceConfig);

// Update everybody's parents field. Use of a memoization key to control
// sharing memoization across updateAllParentsFields calls, which
// can be desired or not depending on the use case
for (const page of pagesToUpdate) {
const parents = await getParents(
dataSourceConfig,
page.notionPageId,
memoizationKey
);

await updateDocumentParentsField(
dataSourceConfig,
`notion-${page.notionPageId}`,
parents
);
}
return pagesToUpdate.length;
}

/** Get ids of all pages whose parents field should be updated: initial pages in
* pageOrDbs, and all the descendants of pageOrDbs that are pages (including
* children of databases)
*
* Note: databases are not stored in the Datasource, so they don't need to be
* updated
*/
async function getPagesToUpdate(
pageOrDbs: (NotionPage | NotionDatabase)[],
dataSourceConfig: DataSourceConfig
): Promise<NotionPage[]> {
const pagesToUpdate: NotionPage[] = [];

let i = 0;
while (i < pageOrDbs.length) {
// Visit next document and if it's a page add it to update list
const pageOrDb = pageOrDbs[i++] as NotionPage | NotionDatabase;
const pageOrDbId = notionPageOrDbId(pageOrDb);
if ((pageOrDb as NotionPage).notionPageId) {
pagesToUpdate.push(pageOrDb as NotionPage);
}

// Get children of the document
const pageChildren = await getPageChildrenOf(dataSourceConfig, pageOrDbId);
const databaseChildren = await getDatabaseChildrenOf(
dataSourceConfig,
pageOrDbId
);

// If they haven't yet been visited, add them to documents visited
// and to the list of documents whose children should be fetched
for (const child of [...pageChildren, ...databaseChildren]) {
if (
!pageOrDbs.some((d) => notionPageOrDbId(d) === notionPageOrDbId(child))
) {
pageOrDbs.push(child);
}
}
}

return pagesToUpdate;
}

function notionPageOrDbId(pageOrDb: NotionPage | NotionDatabase): string {
return (
(pageOrDb as NotionPage).notionPageId ||
(pageOrDb as NotionDatabase).notionDatabaseId
);
}
Loading