From 24e67bbb4335c421851f443fa2ed1d7ea582b259 Mon Sep 17 00:00:00 2001 From: Henry Fontanier Date: Fri, 6 Dec 2024 14:54:00 +0100 Subject: [PATCH] chore: revamp upsert hooks --- .../hooks/data_source_helpers.ts | 0 .../actions/doc_tracker_retrieval.ts | 30 ++-- .../actions/doc_tracker_suggest_changes.ts | 0 .../hooks/document_tracker/consts.ts | 23 +++ .../hooks/document_tracker/index.ts | 35 +++++ .../hooks/document_tracker}/lib.ts | 74 +++++---- .../lib/document_upsert_hooks/hooks/index.ts | 49 ++++++ .../hooks/document_tracker/consts.ts | 9 -- .../hooks/document_tracker/index.ts | 2 - .../hooks/document_tracker/lib.ts | 21 --- .../document_tracker/suggest_changes/index.ts | 24 --- .../update_tracked_documents/index.ts | 17 -- .../update_tracked_documents/lib.ts | 136 ---------------- .../hooks/index.ts | 145 ------------------ .../hooks/types.ts | 1 - front/lib/upsert_queue.ts | 58 +------ .../[dsId]/documents/[documentId]/index.ts | 39 +---- front/start_worker.ts | 6 +- .../activities.ts | 77 +++------- front/temporal/document_tracker/client.ts | 37 +++++ front/temporal/document_tracker/config.ts | 2 + .../signals.ts | 0 .../worker.ts | 7 +- front/temporal/document_tracker/workflows.ts | 55 +++++++ .../documents_post_process_hooks/client.ts | 66 -------- .../documents_post_process_hooks/workflows.ts | 62 -------- front/temporal/upsert_queue/activities.ts | 17 +- 27 files changed, 310 insertions(+), 682 deletions(-) rename front/lib/{documents_post_process_hooks => document_upsert_hooks}/hooks/data_source_helpers.ts (100%) rename front/lib/{documents_post_process_hooks/hooks/document_tracker/suggest_changes => document_upsert_hooks/hooks/document_tracker}/actions/doc_tracker_retrieval.ts (78%) rename front/lib/{documents_post_process_hooks/hooks/document_tracker/suggest_changes => document_upsert_hooks/hooks/document_tracker}/actions/doc_tracker_suggest_changes.ts (100%) create mode 100644 front/lib/document_upsert_hooks/hooks/document_tracker/consts.ts create mode 100644 front/lib/document_upsert_hooks/hooks/document_tracker/index.ts rename front/lib/{documents_post_process_hooks/hooks/document_tracker/suggest_changes => document_upsert_hooks/hooks/document_tracker}/lib.ts (88%) create mode 100644 front/lib/document_upsert_hooks/hooks/index.ts delete mode 100644 front/lib/documents_post_process_hooks/hooks/document_tracker/consts.ts delete mode 100644 front/lib/documents_post_process_hooks/hooks/document_tracker/index.ts delete mode 100644 front/lib/documents_post_process_hooks/hooks/document_tracker/lib.ts delete mode 100644 front/lib/documents_post_process_hooks/hooks/document_tracker/suggest_changes/index.ts delete mode 100644 front/lib/documents_post_process_hooks/hooks/document_tracker/update_tracked_documents/index.ts delete mode 100644 front/lib/documents_post_process_hooks/hooks/document_tracker/update_tracked_documents/lib.ts delete mode 100644 front/lib/documents_post_process_hooks/hooks/index.ts delete mode 100644 front/lib/documents_post_process_hooks/hooks/types.ts rename front/temporal/{documents_post_process_hooks => document_tracker}/activities.ts (59%) create mode 100644 front/temporal/document_tracker/client.ts create mode 100644 front/temporal/document_tracker/config.ts rename front/temporal/{documents_post_process_hooks => document_tracker}/signals.ts (100%) rename front/temporal/{documents_post_process_hooks => document_tracker}/worker.ts (75%) create mode 100644 front/temporal/document_tracker/workflows.ts delete mode 100644 front/temporal/documents_post_process_hooks/client.ts delete mode 100644 front/temporal/documents_post_process_hooks/workflows.ts diff --git a/front/lib/documents_post_process_hooks/hooks/data_source_helpers.ts b/front/lib/document_upsert_hooks/hooks/data_source_helpers.ts similarity index 100% rename from front/lib/documents_post_process_hooks/hooks/data_source_helpers.ts rename to front/lib/document_upsert_hooks/hooks/data_source_helpers.ts diff --git a/front/lib/documents_post_process_hooks/hooks/document_tracker/suggest_changes/actions/doc_tracker_retrieval.ts b/front/lib/document_upsert_hooks/hooks/document_tracker/actions/doc_tracker_retrieval.ts similarity index 78% rename from front/lib/documents_post_process_hooks/hooks/document_tracker/suggest_changes/actions/doc_tracker_retrieval.ts rename to front/lib/document_upsert_hooks/hooks/document_tracker/actions/doc_tracker_retrieval.ts index 7222e74de8b3..61bac225be93 100644 --- a/front/lib/documents_post_process_hooks/hooks/document_tracker/suggest_changes/actions/doc_tracker_retrieval.ts +++ b/front/lib/document_upsert_hooks/hooks/document_tracker/actions/doc_tracker_retrieval.ts @@ -2,8 +2,8 @@ import * as t from "io-ts"; import { callAction } from "@app/lib/actions/helpers"; import type { Authenticator } from "@app/lib/auth"; -import { getTrackableDataSourceViews } from "@app/lib/documents_post_process_hooks/hooks/document_tracker/lib"; import { cloneBaseConfig, DustProdActionRegistry } from "@app/lib/registry"; +import type { DataSourceViewResource } from "@app/lib/resources/data_source_view_resource"; // Part of the new doc tracker pipeline, performs the retrieval (semantic search) step // it takes {input_text: string} as input @@ -14,23 +14,25 @@ export async function callDocTrackerRetrievalAction( inputText, targetDocumentTokens, topK, - }: { inputText: string; targetDocumentTokens: number; topK: number } + dataSourceViews, + }: { + inputText: string; + targetDocumentTokens: number; + topK: number; + dataSourceViews: DataSourceViewResource[]; + } ): Promise> { - const action = DustProdActionRegistry["doc-tracker-retrieval"]; - const config = cloneBaseConfig(action.config); - - const trackableDataSourceViews = await getTrackableDataSourceViews(auth); - - if (!trackableDataSourceViews.length) { + if (!dataSourceViews.length) { return []; } - config.SEMANTIC_SEARCH.data_sources = trackableDataSourceViews.map( - (view) => ({ - workspace_id: auth.getNonNullableWorkspace().sId, - data_source_id: view.sId, - }) - ); + const action = DustProdActionRegistry["doc-tracker-retrieval"]; + const config = cloneBaseConfig(action.config); + + config.SEMANTIC_SEARCH.data_sources = dataSourceViews.map((view) => ({ + workspace_id: auth.getNonNullableWorkspace().sId, + data_source_id: view.sId, + })); config.SEMANTIC_SEARCH.target_document_tokens = targetDocumentTokens; config.SEMANTIC_SEARCH.top_k = topK; diff --git a/front/lib/documents_post_process_hooks/hooks/document_tracker/suggest_changes/actions/doc_tracker_suggest_changes.ts b/front/lib/document_upsert_hooks/hooks/document_tracker/actions/doc_tracker_suggest_changes.ts similarity index 100% rename from front/lib/documents_post_process_hooks/hooks/document_tracker/suggest_changes/actions/doc_tracker_suggest_changes.ts rename to front/lib/document_upsert_hooks/hooks/document_tracker/actions/doc_tracker_suggest_changes.ts diff --git a/front/lib/document_upsert_hooks/hooks/document_tracker/consts.ts b/front/lib/document_upsert_hooks/hooks/document_tracker/consts.ts new file mode 100644 index 000000000000..2149ef76b42b --- /dev/null +++ b/front/lib/document_upsert_hooks/hooks/document_tracker/consts.ts @@ -0,0 +1,23 @@ +import type { ConnectorProvider } from "@dust-tt/types"; +import { assertNever } from "@dust-tt/types"; + +export function isConnectorTypeTrackable( + connectorType: ConnectorProvider +): boolean { + switch (connectorType) { + case "google_drive": + case "github": + case "notion": + case "microsoft": + case "confluence": + case "intercom": + case "webcrawler": + case "snowflake": + case "zendesk": + return true; + case "slack": + return false; + default: + assertNever(connectorType); + } +} diff --git a/front/lib/document_upsert_hooks/hooks/document_tracker/index.ts b/front/lib/document_upsert_hooks/hooks/document_tracker/index.ts new file mode 100644 index 000000000000..bee65a9ac8c1 --- /dev/null +++ b/front/lib/document_upsert_hooks/hooks/document_tracker/index.ts @@ -0,0 +1,35 @@ +import { getFeatureFlags } from "@app/lib/auth"; +import type { DocumentUpsertHook } from "@app/lib/document_upsert_hooks/hooks"; +import { launchRunDocumentTrackerWorkflow } from "@app/temporal/document_tracker/client"; + +// this hook is meant to suggest changes to tracked documents +// based on new information that has been added to other documents +// it should run on upserts if the workspace has tracked docs +export const documentTrackerUpsertHook: DocumentUpsertHook = { + type: "document_tracker", + fn: async ({ + auth, + dataSourceId, + documentId, + documentHash, + dataSourceConnectorProvider, + }) => { + const owner = auth.workspace(); + if (!owner) { + return; + } + + const flags = await getFeatureFlags(owner); + if (!flags.includes("document_tracker")) { + return; + } + + await launchRunDocumentTrackerWorkflow({ + workspaceId: owner.sId, + dataSourceId, + documentId, + documentHash, + dataSourceConnectorProvider, + }); + }, +}; diff --git a/front/lib/documents_post_process_hooks/hooks/document_tracker/suggest_changes/lib.ts b/front/lib/document_upsert_hooks/hooks/document_tracker/lib.ts similarity index 88% rename from front/lib/documents_post_process_hooks/hooks/document_tracker/suggest_changes/lib.ts rename to front/lib/document_upsert_hooks/hooks/document_tracker/lib.ts index 70b6ff977fc2..cde74a592e8a 100644 --- a/front/lib/documents_post_process_hooks/hooks/document_tracker/suggest_changes/lib.ts +++ b/front/lib/document_upsert_hooks/hooks/document_tracker/lib.ts @@ -1,29 +1,29 @@ +import type { ConnectorProvider, UpsertContext } from "@dust-tt/types"; import { CoreAPI } from "@dust-tt/types"; import sgMail from "@sendgrid/mail"; import { Op } from "sequelize"; import showdown from "showdown"; import config from "@app/lib/api/config"; +import type { Authenticator } from "@app/lib/auth"; import { getFeatureFlags } from "@app/lib/auth"; -import type { - DocumentsPostProcessHookFilterParams, - DocumentsPostProcessHookOnUpsertParams, -} from "@app/lib/documents_post_process_hooks/hooks"; import { getDatasource, getDocumentDiff, -} from "@app/lib/documents_post_process_hooks/hooks/data_source_helpers"; +} from "@app/lib/document_upsert_hooks/hooks/data_source_helpers"; +import { callDocTrackerRetrievalAction } from "@app/lib/document_upsert_hooks/hooks/document_tracker/actions/doc_tracker_retrieval"; +import { callDocTrackerSuggestChangesAction } from "@app/lib/document_upsert_hooks/hooks/document_tracker/actions/doc_tracker_suggest_changes"; +import { isConnectorTypeTrackable } from "@app/lib/document_upsert_hooks/hooks/document_tracker/consts"; import { DocumentTrackerChangeSuggestion, TrackedDocument, } from "@app/lib/models/doc_tracker"; import { DataSourceResource } from "@app/lib/resources/data_source_resource"; +import { DataSourceViewResource } from "@app/lib/resources/data_source_view_resource"; +import { SpaceResource } from "@app/lib/resources/space_resource"; import { UserModel } from "@app/lib/resources/storage/models/user"; import mainLogger from "@app/logger/logger"; -import { callDocTrackerRetrievalAction } from "./actions/doc_tracker_retrieval"; -import { callDocTrackerSuggestChangesAction } from "./actions/doc_tracker_suggest_changes"; - const { SENDGRID_API_KEY } = process.env; // If the sum of all INSERT diffs is less than this number, we skip the hook. @@ -42,13 +42,23 @@ const TOTAL_TARGET_TOKENS = 8192; const MAX_TRACKED_DOCUMENTS = 1; const logger = mainLogger.child({ - postProcessHook: "document_tracker_suggest_changes", + hookType: "document_tracker", }); export async function shouldDocumentTrackerSuggestChangesRun( - params: DocumentsPostProcessHookFilterParams + auth: Authenticator, + { + dataSourceId, + documentId, + dataSourceConnectorProvider, + upsertContext, + }: { + dataSourceId: string; + documentId: string; + dataSourceConnectorProvider: ConnectorProvider; + upsertContext: UpsertContext; + } ): Promise { - const auth = params.auth; const owner = auth.getNonNullableWorkspace(); const flags = await getFeatureFlags(owner); @@ -56,19 +66,6 @@ export async function shouldDocumentTrackerSuggestChangesRun( return false; } - if (params.verb !== "upsert") { - logger.info( - "document_tracker_suggest_changes post process hook should only run for upsert." - ); - return false; - } - - const { - upsertContext, - dataSourceId, - documentId, - dataSourceConnectorProvider, - } = params; const isBatchSync = upsertContext?.sync_type === "batch"; const localLogger = logger.child({ @@ -144,13 +141,19 @@ export async function shouldDocumentTrackerSuggestChangesRun( return false; } -export async function documentTrackerSuggestChangesOnUpsert({ +export async function documentTrackerSuggestChanges({ auth, dataSourceId, documentId, documentHash, documentSourceUrl, -}: DocumentsPostProcessHookOnUpsertParams): Promise { +}: { + auth: Authenticator; + dataSourceId: string; + documentId: string; + documentHash: string; + documentSourceUrl: string; +}): Promise { const owner = auth.workspace(); if (!owner) { throw new Error("Workspace not found."); @@ -261,10 +264,12 @@ export async function documentTrackerSuggestChangesOnUpsert({ }, "Calling doc tracker retrieval action." ); + const dataSourceViews = await getTrackableDataSourceViews(auth); const retrievalResult = await callDocTrackerRetrievalAction(auth, { inputText: diffString, targetDocumentTokens: targetTrackedDocumentTokens, topK: MAX_TRACKED_DOCUMENTS, + dataSourceViews, }); if (!retrievalResult.length) { @@ -510,3 +515,20 @@ function getDocumentTitle(tags: string[]): string | null { } return maybeTitleTag.split("title:")[1]; } + +export async function getTrackableDataSourceViews( + auth: Authenticator +): Promise { + const globalSpace = await SpaceResource.fetchWorkspaceGlobalSpace(auth); + // TODO(DOC_TRACKER): + const views = await DataSourceViewResource.listBySpace(auth, globalSpace); + + // Filter data sources to only include trackable ones + const trackableViews = views.filter( + (view) => + view.dataSource.connectorProvider && + isConnectorTypeTrackable(view.dataSource.connectorProvider) + ); + + return trackableViews; +} diff --git a/front/lib/document_upsert_hooks/hooks/index.ts b/front/lib/document_upsert_hooks/hooks/index.ts new file mode 100644 index 000000000000..571c08da8cd5 --- /dev/null +++ b/front/lib/document_upsert_hooks/hooks/index.ts @@ -0,0 +1,49 @@ +import type { ConnectorProvider, UpsertContext } from "@dust-tt/types"; + +import type { Authenticator } from "@app/lib/auth"; +import { documentTrackerUpsertHook } from "@app/lib/document_upsert_hooks/hooks/document_tracker"; +import { wakeLock } from "@app/lib/wake_lock"; +import logger from "@app/logger/logger"; + +const DUST_WORKSPACE = "0ec9852c2f"; + +const _hooks = { + document_tracker_suggest_changes: documentTrackerUpsertHook, +} as const; + +export const DOCUMENT_UPSERT_HOOKS: Array = + Object.values(_hooks); + +export type DocumentUpsertHook = { + type: string; + fn: (params: { + auth: Authenticator; + dataSourceId: string; + documentId: string; + documentHash: string; + dataSourceConnectorProvider: ConnectorProvider | null; + upsertContext?: UpsertContext; + }) => Promise; +}; + +export function runDocumentUpsertHooks( + params: Parameters[0] +): void { + // TODO(document-tracker): remove this once we have a way to enable/disable + if (params.auth.workspace()?.sId !== DUST_WORKSPACE) { + return; + } + + for (const hook of DOCUMENT_UPSERT_HOOKS) { + void wakeLock(async () => { + try { + await hook.fn(params); + } catch (error) { + logger.error( + { hookType: hook.type, error }, + `Error running document upsert hook` + ); + } + }); + } +} diff --git a/front/lib/documents_post_process_hooks/hooks/document_tracker/consts.ts b/front/lib/documents_post_process_hooks/hooks/document_tracker/consts.ts deleted file mode 100644 index 11344426dcc3..000000000000 --- a/front/lib/documents_post_process_hooks/hooks/document_tracker/consts.ts +++ /dev/null @@ -1,9 +0,0 @@ -import type { ConnectorProvider } from "@dust-tt/types"; - -export const TRACKABLE_CONNECTOR_TYPES: ConnectorProvider[] = [ - "google_drive", - "github", - "notion", - "slack", - "microsoft", -]; diff --git a/front/lib/documents_post_process_hooks/hooks/document_tracker/index.ts b/front/lib/documents_post_process_hooks/hooks/document_tracker/index.ts deleted file mode 100644 index b0a27f3b25ee..000000000000 --- a/front/lib/documents_post_process_hooks/hooks/document_tracker/index.ts +++ /dev/null @@ -1,2 +0,0 @@ -export { documentTrackerSuggestChangesPostProcessHook } from "@app/lib/documents_post_process_hooks/hooks/document_tracker/suggest_changes"; -export { documentTrackerUpdateTrackedDocumentsPostProcessHook } from "@app/lib/documents_post_process_hooks/hooks/document_tracker/update_tracked_documents"; diff --git a/front/lib/documents_post_process_hooks/hooks/document_tracker/lib.ts b/front/lib/documents_post_process_hooks/hooks/document_tracker/lib.ts deleted file mode 100644 index f285e20a0fe5..000000000000 --- a/front/lib/documents_post_process_hooks/hooks/document_tracker/lib.ts +++ /dev/null @@ -1,21 +0,0 @@ -import type { Authenticator } from "@app/lib/auth"; -import { TRACKABLE_CONNECTOR_TYPES } from "@app/lib/documents_post_process_hooks/hooks/document_tracker/consts"; -import { DataSourceViewResource } from "@app/lib/resources/data_source_view_resource"; -import { SpaceResource } from "@app/lib/resources/space_resource"; - -export async function getTrackableDataSourceViews( - auth: Authenticator -): Promise { - const globalSpace = await SpaceResource.fetchWorkspaceGlobalSpace(auth); - // TODO(DOC_TRACKER): - const views = await DataSourceViewResource.listBySpace(auth, globalSpace); - - // Filter data sources to only include trackable ones - const trackableViews = views.filter( - (view) => - view.dataSource.connectorProvider && - TRACKABLE_CONNECTOR_TYPES.includes(view.dataSource.connectorProvider) - ); - - return trackableViews; -} diff --git a/front/lib/documents_post_process_hooks/hooks/document_tracker/suggest_changes/index.ts b/front/lib/documents_post_process_hooks/hooks/document_tracker/suggest_changes/index.ts deleted file mode 100644 index 9f368d88ec24..000000000000 --- a/front/lib/documents_post_process_hooks/hooks/document_tracker/suggest_changes/index.ts +++ /dev/null @@ -1,24 +0,0 @@ -import type { DocumentsPostProcessHook } from "@app/lib/documents_post_process_hooks/hooks"; -import { - documentTrackerSuggestChangesOnUpsert, - shouldDocumentTrackerSuggestChangesRun, -} from "@app/lib/documents_post_process_hooks/hooks/document_tracker/suggest_changes/lib"; - -// this hook is meant to suggest changes to tracked documents -// based on new information that has been added to other documents -// it should run on upserts if the workspace has tracked docs -export const documentTrackerSuggestChangesPostProcessHook: DocumentsPostProcessHook = - { - type: "document_tracker_suggest_changes", - getDebounceMs: async ({ dataSourceConnectorProvider }) => { - if (!dataSourceConnectorProvider) { - return 10000; // 10 seconds - } - if (dataSourceConnectorProvider === "notion") { - return 600000; // 10 minutes - } - return 3600000; // 1 hour - }, - filter: shouldDocumentTrackerSuggestChangesRun, - onUpsert: documentTrackerSuggestChangesOnUpsert, - }; diff --git a/front/lib/documents_post_process_hooks/hooks/document_tracker/update_tracked_documents/index.ts b/front/lib/documents_post_process_hooks/hooks/document_tracker/update_tracked_documents/index.ts deleted file mode 100644 index fa8ebc8c5916..000000000000 --- a/front/lib/documents_post_process_hooks/hooks/document_tracker/update_tracked_documents/index.ts +++ /dev/null @@ -1,17 +0,0 @@ -import type { DocumentsPostProcessHook } from "@app/lib/documents_post_process_hooks/hooks"; -import { - documentTrackerUpdateTrackedDocumentsOnDelete, - documentTrackerUpdateTrackedDocumentsOnUpsert, - shouldDocumentTrackerUpdateTrackedDocumentsRun, -} from "@app/lib/documents_post_process_hooks/hooks/document_tracker/update_tracked_documents/lib"; - -// This hook is meant to update the TrackedDocuments table based on -// DUST_TRACK tags seen in documents. It should run on upserts, and on deletes (to cleanup) -export const documentTrackerUpdateTrackedDocumentsPostProcessHook: DocumentsPostProcessHook = - { - type: "document_tracker_update_tracked_documents", - getDebounceMs: async () => 1000, - filter: shouldDocumentTrackerUpdateTrackedDocumentsRun, - onUpsert: documentTrackerUpdateTrackedDocumentsOnUpsert, - onDelete: documentTrackerUpdateTrackedDocumentsOnDelete, - }; diff --git a/front/lib/documents_post_process_hooks/hooks/document_tracker/update_tracked_documents/lib.ts b/front/lib/documents_post_process_hooks/hooks/document_tracker/update_tracked_documents/lib.ts deleted file mode 100644 index c5d8f0bbbe50..000000000000 --- a/front/lib/documents_post_process_hooks/hooks/document_tracker/update_tracked_documents/lib.ts +++ /dev/null @@ -1,136 +0,0 @@ -import type { ConnectorProvider } from "@dust-tt/types"; - -import { getFeatureFlags } from "@app/lib/auth"; -import { updateTrackedDocuments } from "@app/lib/document_tracker"; -import type { - DocumentsPostProcessHookFilterParams, - DocumentsPostProcessHookOnDeleteParams, - DocumentsPostProcessHookOnUpsertParams, -} from "@app/lib/documents_post_process_hooks/hooks"; -import { getDatasource } from "@app/lib/documents_post_process_hooks/hooks/data_source_helpers"; -import { TRACKABLE_CONNECTOR_TYPES } from "@app/lib/documents_post_process_hooks/hooks/document_tracker/consts"; -import { TrackedDocument } from "@app/lib/models/doc_tracker"; -import mainLogger from "@app/logger/logger"; - -const logger = mainLogger.child({ - postProcessHook: "document_tracker_update_tracked_documents", -}); - -export async function shouldDocumentTrackerUpdateTrackedDocumentsRun( - params: DocumentsPostProcessHookFilterParams -): Promise { - const { auth, dataSourceId, documentId, verb } = params; - const owner = auth.workspace(); - - if (!owner) { - logger.info( - "Workspace not found, document_tracker_update_tracked_documents post process hook should not run." - ); - return false; - } - - const localLogger = logger.child({ - workspaceId: owner.sId, - dataSourceId, - documentId, - }); - - const flags = await getFeatureFlags(owner); - - if (!flags.includes("document_tracker")) { - return false; - } - - const dataSource = await getDatasource(auth, dataSourceId); - - if ( - verb === "upsert" && - params.documentText.includes("DUST_TRACK(") && - TRACKABLE_CONNECTOR_TYPES.includes( - dataSource.connectorProvider as ConnectorProvider - ) - ) { - localLogger.info( - "Document includes DUST_TRACK tags, document_tracker_update_tracked_documents post process hook should run." - ); - return true; - } - - const docIsTracked = !!(await TrackedDocument.count({ - where: { - dataSourceId: dataSource.id, - documentId, - }, - })); - - if (docIsTracked) { - // Always run the document tracker for tracked documents, so we can - // garbage collect the TrackedDocuments if all the DUST_TRACK tags are removed. - - localLogger.info( - "Document is tracked, document_tracker_update_tracked_documents post process hook should run." - ); - return true; - } - - return false; -} - -export async function documentTrackerUpdateTrackedDocumentsOnUpsert({ - auth, - dataSourceId, - documentId, - documentText, -}: DocumentsPostProcessHookOnUpsertParams): Promise { - const owner = auth.workspace(); - if (!owner) { - throw new Error("Workspace not found."); - } - logger.info( - { - workspaceId: owner.sId, - dataSourceId, - documentId, - }, - "Running document_tracker_update_tracked_documents post upsert hook." - ); - - const dataSource = await getDatasource(auth, dataSourceId); - if ( - TRACKABLE_CONNECTOR_TYPES.includes( - dataSource.connectorProvider as ConnectorProvider - ) - ) { - logger.info("Updating tracked documents."); - await updateTrackedDocuments(auth, dataSource.id, documentId, documentText); - } -} - -export async function documentTrackerUpdateTrackedDocumentsOnDelete({ - auth, - dataSourceId, - documentId, -}: DocumentsPostProcessHookOnDeleteParams): Promise { - const owner = auth.workspace(); - if (!owner) { - throw new Error("Workspace not found."); - } - - logger.info( - { - workspaceId: owner.sId, - dataSourceId, - documentId, - }, - "Running document_tracker_update_tracked_documents onDelete." - ); - - const dataSource = await getDatasource(auth, dataSourceId); - - await TrackedDocument.destroy({ - where: { - dataSourceId: dataSource.id, - documentId, - }, - }); -} diff --git a/front/lib/documents_post_process_hooks/hooks/index.ts b/front/lib/documents_post_process_hooks/hooks/index.ts deleted file mode 100644 index 5db376838891..000000000000 --- a/front/lib/documents_post_process_hooks/hooks/index.ts +++ /dev/null @@ -1,145 +0,0 @@ -import type { ConnectorProvider, UpsertContext } from "@dust-tt/types"; - -import type { Authenticator } from "@app/lib/auth"; -import { - documentTrackerSuggestChangesPostProcessHook, - documentTrackerUpdateTrackedDocumentsPostProcessHook, -} from "@app/lib/documents_post_process_hooks/hooks/document_tracker"; -import { DEFAULT_DOCUMENTS_POST_PROCESS_HOOKS_DEBOUNCE_MS } from "@app/lib/documents_post_process_hooks/hooks/types"; - -export const DOCUMENTS_POST_PROCESS_HOOK_TYPES = [ - "document_tracker_update_tracked_documents", - "document_tracker_suggest_changes", -] as const; - -export type DocumentsPostProcessHookType = - (typeof DOCUMENTS_POST_PROCESS_HOOK_TYPES)[number]; - -export type DocumentsPostProcessHookVerb = "upsert" | "delete"; - -export type DocumentsPostProcessHookOnUpsertParams = { - auth: Authenticator; - dataSourceId: string; - documentId: string; - documentSourceUrl?: string; - documentText: string; - documentHash: string; - dataSourceConnectorProvider: ConnectorProvider | null; - upsertContext?: UpsertContext; -}; - -export type DocumentsPostProcessHookOnDeleteParams = { - auth: Authenticator; - dataSourceId: string; - documentId: string; - dataSourceConnectorProvider: ConnectorProvider | null; -}; - -export type DocumentsPostProcessHookFilterParams = - | ({ verb: "upsert" } & DocumentsPostProcessHookOnUpsertParams) - | ({ verb: "delete" } & DocumentsPostProcessHookOnDeleteParams); - -export type DocumentsPostProcessHookDebounceMsParams = { - verb: "upsert"; -} & DocumentsPostProcessHookOnUpsertParams; - -// asyc function that will run in a temporal workflow -// can be expensive to run -// will be retried if it fails (indefinitely) -export type DocumentsPostProcessHookOnUpsert = ( - params: DocumentsPostProcessHookOnUpsertParams -) => Promise; - -// asyc function that will run in a temporal workflow -// can be expensive to run -// will be retried if it fails (indefinitely) -export type DocumentsPostProcessHookOnDelete = ( - params: DocumentsPostProcessHookOnDeleteParams -) => Promise; - -// returns true if the post process hook should run for this document -// returns false if the post process hook should not run for this document -// needs to be relatively quick to run, will run in the same process as calling code -export type DocumentsPostProcessHookFilter = ( - params: DocumentsPostProcessHookFilterParams -) => Promise; - -// How long should the hook sleep before running (debouncing) -// ran in the same process as calling code (no retries, needs to be quick to run) -export type DocumentsPostProcessHookDebounceMs = ( - params: DocumentsPostProcessHookDebounceMsParams -) => Promise; - -export type DocumentsPostProcessHook = { - onUpsert?: DocumentsPostProcessHookOnUpsert; - onDelete?: DocumentsPostProcessHookOnDelete; - filter: DocumentsPostProcessHookFilter; - type: DocumentsPostProcessHookType; - getDebounceMs?: DocumentsPostProcessHookDebounceMs; -}; - -export const DOCUMENTS_POST_PROCESS_HOOKS = [ - documentTrackerUpdateTrackedDocumentsPostProcessHook, - documentTrackerSuggestChangesPostProcessHook, -]; - -export const DOCUMENTS_POST_PROCESS_HOOK_BY_TYPE: Record< - DocumentsPostProcessHookType, - DocumentsPostProcessHook -> = DOCUMENTS_POST_PROCESS_HOOKS.reduce( - (acc, hook) => { - acc[hook.type] = hook; - return acc; - }, - {} as Record -); - -export async function getDocumentsPostUpsertHooksToRun( - params: DocumentsPostProcessHookOnUpsertParams -): Promise> { - // TODO: parallel - const hooksToRun: { - type: DocumentsPostProcessHookType; - debounceMs: number; - }[] = []; - - const paramsWithVerb = { ...params, verb: "upsert" as const }; - - for (const hook of DOCUMENTS_POST_PROCESS_HOOKS) { - if (!hook.onUpsert) { - continue; - } - - if (await hook.filter(paramsWithVerb)) { - const debounceMs = hook.getDebounceMs - ? await hook.getDebounceMs(paramsWithVerb) - : DEFAULT_DOCUMENTS_POST_PROCESS_HOOKS_DEBOUNCE_MS; - hooksToRun.push({ type: hook.type, debounceMs }); - } - } - - return hooksToRun; -} - -export async function getDocumentsPostDeleteHooksToRun( - params: DocumentsPostProcessHookOnDeleteParams -): Promise> { - // TODO: parallel - const hooksToRun: { - type: DocumentsPostProcessHookType; - }[] = []; - - const paramsWithVerb = { ...params, verb: "delete" as const }; - - for (const hook of DOCUMENTS_POST_PROCESS_HOOKS) { - if (!hook.onDelete) { - continue; - } - - if (await hook.filter(paramsWithVerb)) { - hooksToRun.push({ type: hook.type }); - } - } - - return hooksToRun; -} diff --git a/front/lib/documents_post_process_hooks/hooks/types.ts b/front/lib/documents_post_process_hooks/hooks/types.ts deleted file mode 100644 index f5acb8c7450f..000000000000 --- a/front/lib/documents_post_process_hooks/hooks/types.ts +++ /dev/null @@ -1 +0,0 @@ -export const DEFAULT_DOCUMENTS_POST_PROCESS_HOOKS_DEBOUNCE_MS = 10000; diff --git a/front/lib/upsert_queue.ts b/front/lib/upsert_queue.ts index e3daf8c5400f..3334aa9af65e 100644 --- a/front/lib/upsert_queue.ts +++ b/front/lib/upsert_queue.ts @@ -1,30 +1,19 @@ -import type { - CoreAPIDocument, - CoreAPILightDocument, - Result, - UpsertContext, -} from "@dust-tt/types"; +import type { Result } from "@dust-tt/types"; import { Err, FrontDataSourceDocumentSection, Ok, - sectionFullText, UpsertContextSchema, } from "@dust-tt/types"; import { Storage } from "@google-cloud/storage"; import * as t from "io-ts"; import { v4 as uuidv4 } from "uuid"; -import { Authenticator } from "@app/lib/auth"; -import { getDocumentsPostUpsertHooksToRun } from "@app/lib/documents_post_process_hooks/hooks"; import logger from "@app/logger/logger"; import { statsDClient } from "@app/logger/withlogging"; -import { launchRunPostUpsertHooksWorkflow } from "@app/temporal/documents_post_process_hooks/client"; import { launchUpsertDocumentWorkflow } from "@app/temporal/upsert_queue/client"; import { launchUpsertTableWorkflow } from "@app/temporal/upsert_tables/client"; -import type { DataSourceResource } from "./resources/data_source_resource"; - const { DUST_UPSERT_QUEUE_BUCKET, SERVICE_ACCOUNT } = process.env; export const EnqueueUpsertDocument = t.intersection([ @@ -180,48 +169,3 @@ async function enqueueUpsert({ } } } - -export async function runPostUpsertHooks({ - workspaceId, - dataSource, - documentId, - section, - document, - sourceUrl, - upsertContext, -}: { - workspaceId: string; - dataSource: DataSourceResource; - documentId: string; - section: t.TypeOf; - document: CoreAPILightDocument | CoreAPIDocument; - sourceUrl: string | null; - upsertContext?: UpsertContext; -}) { - const fullText = sectionFullText(section); - const auth = await Authenticator.internalAdminForWorkspace(workspaceId); - - const postUpsertHooksToRun = await getDocumentsPostUpsertHooksToRun({ - auth, - dataSourceId: dataSource.sId, - documentId: documentId, - documentText: fullText, - documentHash: document.hash, - dataSourceConnectorProvider: dataSource.connectorProvider || null, - documentSourceUrl: sourceUrl || undefined, - upsertContext, - }); - - // TODO: parallel. - for (const { type: hookType, debounceMs } of postUpsertHooksToRun) { - await launchRunPostUpsertHooksWorkflow( - workspaceId, - dataSource.sId, - documentId, - document.hash, - dataSource.connectorProvider || null, - hookType, - debounceMs - ); - } -} diff --git a/front/pages/api/v1/w/[wId]/spaces/[spaceId]/data_sources/[dsId]/documents/[documentId]/index.ts b/front/pages/api/v1/w/[wId]/spaces/[spaceId]/data_sources/[dsId]/documents/[documentId]/index.ts index 677ee1cdb15b..09f4457fda5c 100644 --- a/front/pages/api/v1/w/[wId]/spaces/[spaceId]/data_sources/[dsId]/documents/[documentId]/index.ts +++ b/front/pages/api/v1/w/[wId]/spaces/[spaceId]/data_sources/[dsId]/documents/[documentId]/index.ts @@ -17,17 +17,13 @@ import { fromError } from "zod-validation-error"; import { withPublicAPIAuthentication } from "@app/lib/api/auth_wrappers"; import apiConfig from "@app/lib/api/config"; -import { Authenticator } from "@app/lib/auth"; -import { getDocumentsPostDeleteHooksToRun } from "@app/lib/documents_post_process_hooks/hooks"; +import type { Authenticator } from "@app/lib/auth"; +import { runDocumentUpsertHooks } from "@app/lib/document_upsert_hooks/hooks"; import { DataSourceResource } from "@app/lib/resources/data_source_resource"; import { SpaceResource } from "@app/lib/resources/space_resource"; -import { - enqueueUpsertDocument, - runPostUpsertHooks, -} from "@app/lib/upsert_queue"; +import { enqueueUpsertDocument } from "@app/lib/upsert_queue"; import logger from "@app/logger/logger"; import { apiError, statsDClient } from "@app/logger/withlogging"; -import { launchRunPostDeleteHooksWorkflow } from "@app/temporal/documents_post_process_hooks/client"; export const config = { api: { @@ -566,13 +562,12 @@ async function handler( data_source: dataSource.toJSON(), }); - await runPostUpsertHooks({ - workspaceId: owner.sId, - dataSource, + runDocumentUpsertHooks({ + auth, + dataSourceId: dataSource.sId, documentId: req.query.documentId as string, - section, - document: upsertRes.value.document, - sourceUrl, + documentHash: upsertRes.value.document.hash, + dataSourceConnectorProvider: dataSource.connectorProvider || null, upsertContext: r.data.upsert_context || undefined, }); return; @@ -612,24 +607,6 @@ async function handler( }, }); - const postDeleteHooksToRun = await getDocumentsPostDeleteHooksToRun({ - auth: await Authenticator.internalAdminForWorkspace(owner.sId), - dataSourceId: dataSource.sId, - documentId: req.query.documentId as string, - dataSourceConnectorProvider: dataSource.connectorProvider || null, - }); - - // TODO: parallel. - for (const { type: hookType } of postDeleteHooksToRun) { - await launchRunPostDeleteHooksWorkflow( - owner.sId, - dataSource.sId, - req.query.documentId as string, - dataSource.connectorProvider || null, - hookType - ); - } - return; default: diff --git a/front/start_worker.ts b/front/start_worker.ts index d0fecfd9a25e..30bb7390cd2d 100644 --- a/front/start_worker.ts +++ b/front/start_worker.ts @@ -4,7 +4,7 @@ import { hideBin } from "yargs/helpers"; import logger from "@app/logger/logger"; import { runPokeWorker } from "@app/poke/temporal/worker"; -import { runPostUpsertHooksWorker } from "@app/temporal/documents_post_process_hooks/worker"; +import { runDocumentTrackerWorker } from "@app/temporal/document_tracker/worker"; import { runHardDeleteWorker } from "@app/temporal/hard_delete/worker"; import { runLabsWorker } from "@app/temporal/labs/worker"; import { runMentionsCountWorker } from "@app/temporal/mentions_count_queue/worker"; @@ -23,7 +23,7 @@ type WorkerName = | "mentions_count" | "permissions_queue" | "poke" - | "post_upsert_hooks" + | "document_tracker" | "production_checks" | "scrub_workspace_queue" | "update_workspace_usage" @@ -36,7 +36,7 @@ const workerFunctions: Record Promise> = { mentions_count: runMentionsCountWorker, permissions_queue: runPermissionsWorker, poke: runPokeWorker, - post_upsert_hooks: runPostUpsertHooksWorker, + document_tracker: runDocumentTrackerWorker, production_checks: runProductionChecksWorker, scrub_workspace_queue: runScrubWorkspaceQueueWorker, update_workspace_usage: runUpdateWorkspaceUsageWorker, diff --git a/front/temporal/documents_post_process_hooks/activities.ts b/front/temporal/document_tracker/activities.ts similarity index 59% rename from front/temporal/documents_post_process_hooks/activities.ts rename to front/temporal/document_tracker/activities.ts index 6e3d239baab6..e6084daaf734 100644 --- a/front/temporal/documents_post_process_hooks/activities.ts +++ b/front/temporal/document_tracker/activities.ts @@ -8,36 +8,27 @@ import { CoreAPI, Err, Ok } from "@dust-tt/types"; import config from "@app/lib/api/config"; import { Authenticator } from "@app/lib/auth"; -import type { DocumentsPostProcessHookType } from "@app/lib/documents_post_process_hooks/hooks"; -import { DOCUMENTS_POST_PROCESS_HOOK_BY_TYPE } from "@app/lib/documents_post_process_hooks/hooks"; +import { documentTrackerSuggestChanges } from "@app/lib/document_upsert_hooks/hooks/document_tracker/lib"; import { Workspace } from "@app/lib/models/workspace"; import { DataSourceResource } from "@app/lib/resources/data_source_resource"; import { withRetries } from "@app/lib/utils/retries"; import logger from "@app/logger/logger"; -export async function runPostUpsertHookActivity( +export async function runDocumentTrackerActivity( workspaceId: string, dataSourceId: string, documentId: string, documentHash: string, - dataSourceConnectorProvider: ConnectorProvider | null, - hookType: DocumentsPostProcessHookType + dataSourceConnectorProvider: ConnectorProvider | null ) { const localLogger = logger.child({ workspaceId, dataSourceId, documentId, dataSourceConnectorProvider, - hookType, }); - const hook = DOCUMENTS_POST_PROCESS_HOOK_BY_TYPE[hookType]; - if (!hook) { - localLogger.error("Unknown documents post process hook type"); - throw new Error(`Unknown documents post process hook type ${hookType}`); - } - - localLogger.info("Running documents post process hook onUpsert function."); + localLogger.info("Running document tracker activity."); const dataSourceDocumentRes = await withRetries(getDataSourceDocument)({ workspaceId, @@ -61,58 +52,34 @@ export async function runPostUpsertHookActivity( const documentText = dataSourceDocument.document.text || ""; const documentSourceUrl = dataSourceDocument.document.source_url || undefined; - if (!hook.onUpsert) { - localLogger.warn("No onUpsert function for documents post process hook"); + if (!documentText) { + localLogger.warn( + { + documentText, + }, + "Document text is empty. Skipping document tracker." + ); return; } - await hook.onUpsert({ - auth: await Authenticator.internalAdminForWorkspace(workspaceId), - dataSourceId, - documentId, - documentText, - documentSourceUrl, - documentHash, - dataSourceConnectorProvider, - }); - localLogger.info("Ran documents post process hook onUpsert function."); -} - -export async function runPostDeleteHookActivity( - workspaceId: string, - dataSourceId: string, - documentId: string, - dataSourceConnectorProvider: ConnectorProvider | null, - hookType: DocumentsPostProcessHookType -) { - const localLogger = logger.child({ - workspaceId, - dataSourceId, - documentId, - dataSourceConnectorProvider, - hookType, - }); - - const hook = DOCUMENTS_POST_PROCESS_HOOK_BY_TYPE[hookType]; - if (!hook) { - localLogger.error("Unknown documents post process hook type"); - throw new Error(`Unknown documents post process hook type ${hookType}`); - } - - localLogger.info("Running documents post process hook onDelete function."); - - if (!hook.onDelete) { - localLogger.warn("No onDelete function for documents post process hook"); + if (!documentSourceUrl) { + localLogger.warn( + { + documentSourceUrl, + }, + "Document source URL is empty. Skipping document tracker." + ); return; } - await hook.onDelete({ + await documentTrackerSuggestChanges({ auth: await Authenticator.internalAdminForWorkspace(workspaceId), dataSourceId, documentId, - dataSourceConnectorProvider, + documentSourceUrl, + documentHash, }); - localLogger.info("Ran documents post process hook ondelete function."); + localLogger.info("Ran documents post process hook onUpsert function."); } async function getDataSourceDocument({ diff --git a/front/temporal/document_tracker/client.ts b/front/temporal/document_tracker/client.ts new file mode 100644 index 000000000000..008453f0a991 --- /dev/null +++ b/front/temporal/document_tracker/client.ts @@ -0,0 +1,37 @@ +import type { ConnectorProvider } from "@dust-tt/types"; + +import { getTemporalClient } from "@app/lib/temporal"; +import { QUEUE_NAME } from "@app/temporal/document_tracker/config"; + +import { newUpsertSignal } from "./signals"; +import { runDocumentTrackerWorkflow } from "./workflows"; + +export async function launchRunDocumentTrackerWorkflow({ + workspaceId, + dataSourceId, + documentId, + documentHash, + dataSourceConnectorProvider, +}: { + workspaceId: string; + dataSourceId: string; + documentId: string; + documentHash: string; + dataSourceConnectorProvider: ConnectorProvider | null; +}) { + const client = await getTemporalClient(); + + await client.workflow.signalWithStart(runDocumentTrackerWorkflow, { + args: [ + workspaceId, + dataSourceId, + documentId, + documentHash, + dataSourceConnectorProvider, + ], + taskQueue: QUEUE_NAME, + workflowId: `workflow-run-document-tracker-${workspaceId}-${dataSourceId}-${documentId}`, + signal: newUpsertSignal, + signalArgs: undefined, + }); +} diff --git a/front/temporal/document_tracker/config.ts b/front/temporal/document_tracker/config.ts new file mode 100644 index 000000000000..425bcc77a916 --- /dev/null +++ b/front/temporal/document_tracker/config.ts @@ -0,0 +1,2 @@ +const QUEUE_VERSION = 1; +export const QUEUE_NAME = `document-tracker-queue-v${QUEUE_VERSION}`; diff --git a/front/temporal/documents_post_process_hooks/signals.ts b/front/temporal/document_tracker/signals.ts similarity index 100% rename from front/temporal/documents_post_process_hooks/signals.ts rename to front/temporal/document_tracker/signals.ts diff --git a/front/temporal/documents_post_process_hooks/worker.ts b/front/temporal/document_tracker/worker.ts similarity index 75% rename from front/temporal/documents_post_process_hooks/worker.ts rename to front/temporal/document_tracker/worker.ts index ee8b8593c1a9..e22db8be1e19 100644 --- a/front/temporal/documents_post_process_hooks/worker.ts +++ b/front/temporal/document_tracker/worker.ts @@ -4,14 +4,15 @@ import { Worker } from "@temporalio/worker"; import { getTemporalWorkerConnection } from "@app/lib/temporal"; import { ActivityInboundLogInterceptor } from "@app/lib/temporal_monitoring"; import logger from "@app/logger/logger"; -import * as activities from "@app/temporal/documents_post_process_hooks/activities"; +import * as activities from "@app/temporal/document_tracker/activities"; +import { QUEUE_NAME } from "@app/temporal/document_tracker/config"; -export async function runPostUpsertHooksWorker() { +export async function runDocumentTrackerWorker() { const { connection, namespace } = await getTemporalWorkerConnection(); const worker = await Worker.create({ workflowsPath: require.resolve("./workflows"), activities, - taskQueue: "post-upsert-hooks-queue", + taskQueue: QUEUE_NAME, connection, namespace, interceptors: { diff --git a/front/temporal/document_tracker/workflows.ts b/front/temporal/document_tracker/workflows.ts new file mode 100644 index 000000000000..a2989d187336 --- /dev/null +++ b/front/temporal/document_tracker/workflows.ts @@ -0,0 +1,55 @@ +import type { ConnectorProvider } from "@dust-tt/types"; +import { proxyActivities, setHandler, sleep } from "@temporalio/workflow"; + +import type * as activities from "./activities"; +import { newUpsertSignal } from "./signals"; + +const { runDocumentTrackerActivity } = proxyActivities({ + startToCloseTimeout: "60 minutes", +}); + +export async function runDocumentTrackerWorkflow( + workspaceId: string, + dataSourceId: string, + documentId: string, + documentHash: string, + dataSourceConnectorProvider: ConnectorProvider | null +) { + void workspaceId; + void dataSourceId; + void documentId; + void documentHash; + void dataSourceConnectorProvider; + + let signaled = false; + const debounceMs = (() => { + if (!dataSourceConnectorProvider) { + return 10000; + } + if (dataSourceConnectorProvider === "notion") { + return 600000; + } + return 3600000; + })(); + + setHandler(newUpsertSignal, () => { + signaled = true; + }); + + while (signaled) { + signaled = false; + + await sleep(debounceMs); + if (signaled) { + continue; + } + + await runDocumentTrackerActivity( + workspaceId, + dataSourceId, + documentId, + documentHash, + dataSourceConnectorProvider + ); + } +} diff --git a/front/temporal/documents_post_process_hooks/client.ts b/front/temporal/documents_post_process_hooks/client.ts deleted file mode 100644 index aaa373856908..000000000000 --- a/front/temporal/documents_post_process_hooks/client.ts +++ /dev/null @@ -1,66 +0,0 @@ -import type { ConnectorProvider } from "@dust-tt/types"; - -import type { DocumentsPostProcessHookType } from "@app/lib/documents_post_process_hooks/hooks"; - -export async function launchRunPostUpsertHooksWorkflow( - workspaceId: string, - dataSourceId: string, - documentId: string, - documentHash: string, - dataSourceConnectorProvider: ConnectorProvider | null, - hookType: DocumentsPostProcessHookType, - debounceMs: number -) { - void workspaceId; - void dataSourceId; - void documentId; - void documentHash; - void dataSourceConnectorProvider; - void hookType; - void debounceMs; - - // const client = await getTemporalClient(); - // await client.workflow.signalWithStart(runPostUpsertHooksWorkflow, { - // args: [ - // workspaceId, - // dataSourceId, - // documentId, - // documentHash, - // dataSourceConnectorProvider, - // hookType, - // debounceMs, - // ], - // taskQueue: QUEUE_NAME, - // workflowId: `workflow-run-post-upsert-hooks-${hookType}-${workspaceId}-${dataSourceId}-${documentId}`, - // signal: newUpsertSignal, - // signalArgs: undefined, - // }); -} - -export async function launchRunPostDeleteHooksWorkflow( - workspaceId: string, - dataSourceId: string, - documentId: string, - dataSourceConnectorProvider: ConnectorProvider | null, - hookType: DocumentsPostProcessHookType -) { - void workspaceId; - void dataSourceId; - void documentId; - void dataSourceConnectorProvider; - void hookType; - - // const client = await getTemporalClient(); - - // await client.workflow.start(runPostDeleteHoosWorkflow, { - // args: [ - // dataSourceId, - // workspaceId, - // documentId, - // dataSourceConnectorProvider, - // hookType, - // ], - // taskQueue: QUEUE_NAME, - // workflowId: `workflow-run-post-delete-hooks-${hookType}-${workspaceId}-${dataSourceId}-${documentId}`, - // }); -} diff --git a/front/temporal/documents_post_process_hooks/workflows.ts b/front/temporal/documents_post_process_hooks/workflows.ts deleted file mode 100644 index cdafcbc29d5c..000000000000 --- a/front/temporal/documents_post_process_hooks/workflows.ts +++ /dev/null @@ -1,62 +0,0 @@ -import type { ConnectorProvider } from "@dust-tt/types"; -import { proxyActivities, setHandler, sleep } from "@temporalio/workflow"; - -import type { DocumentsPostProcessHookType } from "@app/lib/documents_post_process_hooks/hooks"; -import type * as activities from "@app/temporal/documents_post_process_hooks/activities"; - -import { newUpsertSignal } from "./signals"; - -const { runPostUpsertHookActivity, runPostDeleteHookActivity } = - proxyActivities({ - startToCloseTimeout: "60 minute", - }); - -export async function runPostUpsertHooksWorkflow( - workspaceId: string, - dataSourceId: string, - documentId: string, - documentHash: string, - dataSourceConnectorProvider: ConnectorProvider | null, - hookType: DocumentsPostProcessHookType, - debounceMs: number -) { - let signaled = false; - - setHandler(newUpsertSignal, () => { - signaled = true; - }); - - while (signaled) { - signaled = false; - - await sleep(debounceMs); - if (signaled) { - continue; - } - - await runPostUpsertHookActivity( - workspaceId, - dataSourceId, - documentId, - documentHash, - dataSourceConnectorProvider, - hookType - ); - } -} - -export async function runPostDeleteHoosWorkflow( - workspaceId: string, - dataSourceId: string, - documentId: string, - dataSourceConnectorProvider: ConnectorProvider | null, - hookType: DocumentsPostProcessHookType -) { - await runPostDeleteHookActivity( - workspaceId, - dataSourceId, - documentId, - dataSourceConnectorProvider, - hookType - ); -} diff --git a/front/temporal/upsert_queue/activities.ts b/front/temporal/upsert_queue/activities.ts index bc85e13a75a5..16c79d6537cc 100644 --- a/front/temporal/upsert_queue/activities.ts +++ b/front/temporal/upsert_queue/activities.ts @@ -5,12 +5,10 @@ import * as reporter from "io-ts-reporters"; import config from "@app/lib/api/config"; import { Authenticator } from "@app/lib/auth"; +import { runDocumentUpsertHooks } from "@app/lib/document_upsert_hooks/hooks"; import { DataSourceResource } from "@app/lib/resources/data_source_resource"; import type { WorkflowError } from "@app/lib/temporal_monitoring"; -import { - EnqueueUpsertDocument, - runPostUpsertHooks, -} from "@app/lib/upsert_queue"; +import { EnqueueUpsertDocument } from "@app/lib/upsert_queue"; import mainLogger from "@app/logger/logger"; import { statsDClient } from "@app/logger/withlogging"; @@ -143,13 +141,12 @@ export async function upsertDocumentActivity( [] ); - await runPostUpsertHooks({ - workspaceId: upsertQueueItem.workspaceId, - dataSource, + runDocumentUpsertHooks({ + auth, + dataSourceId: dataSource.sId, documentId: upsertQueueItem.documentId, - section: upsertQueueItem.section, - document: upsertRes.value.document, - sourceUrl: upsertQueueItem.sourceUrl, + documentHash: upsertRes.value.document.hash, + dataSourceConnectorProvider: dataSource.connectorProvider || null, upsertContext: upsertQueueItem.upsertContext || undefined, }); }