From 12bd151c5b7836406da17115699f05fe6484f28a Mon Sep 17 00:00:00 2001 From: Henry Fontanier Date: Thu, 4 Apr 2024 15:56:37 +0200 Subject: [PATCH] feat: add "pause" method to all connectors (#4558) * feat: pause connectors * endpoint * front bindings * terminate the workflows * better naming * more instance methods * r --------- Co-authored-by: Henry Fontanier --- connectors/src/api/pause_connector.ts | 58 +++++++++++++++++++ connectors/src/api/webhooks/webhook_github.ts | 10 ++++ .../src/api/webhooks/webhook_google_drive.ts | 21 +++++++ .../src/api/webhooks/webhook_intercom.ts | 13 ++++- connectors/src/api_server.ts | 2 + connectors/src/connectors/github/index.ts | 14 +++++ .../src/connectors/google_drive/index.ts | 11 ++++ .../google_drive/pauseGoogleDriveWebhooks.ts | 15 +++++ connectors/src/connectors/index.ts | 23 ++++++++ connectors/src/connectors/intercom/index.ts | 16 +++++ connectors/src/connectors/interface.ts | 4 ++ connectors/src/connectors/notion/index.ts | 25 ++++++++ connectors/src/connectors/slack/index.ts | 11 ++++ connectors/src/connectors/webcrawler/index.ts | 15 +++++ .../webcrawler/temporal/activities.ts | 7 ++- .../src/resources/connector_resource.ts | 22 ++++++- .../storage/models/connector_model.ts | 6 ++ types/src/front/lib/connectors_api.ts | 14 +++++ 18 files changed, 284 insertions(+), 3 deletions(-) create mode 100644 connectors/src/api/pause_connector.ts create mode 100644 connectors/src/connectors/google_drive/pauseGoogleDriveWebhooks.ts diff --git a/connectors/src/api/pause_connector.ts b/connectors/src/api/pause_connector.ts new file mode 100644 index 000000000000..07bcec582606 --- /dev/null +++ b/connectors/src/api/pause_connector.ts @@ -0,0 +1,58 @@ +import type { WithConnectorsAPIErrorReponse } from "@dust-tt/types"; +import type { Request, Response } from "express"; + +import { PAUSE_CONNECTOR_BY_TYPE } from "@connectors/connectors"; +import { errorFromAny } from "@connectors/lib/error"; +import logger from "@connectors/logger/logger"; +import { apiError, withLogging } from "@connectors/logger/withlogging"; +import { ConnectorResource } from "@connectors/resources/connector_resource"; + +type ConnectorPauseResBody = WithConnectorsAPIErrorReponse<{ + connectorId: string; +}>; + +const _pauseConnectorAPIHandler = async ( + req: Request<{ connector_id: string }, ConnectorPauseResBody>, + res: Response +) => { + try { + const connector = await ConnectorResource.fetchById( + req.params.connector_id + ); + if (!connector) { + return apiError(req, res, { + api_error: { + type: "connector_not_found", + message: "Connector not found", + }, + status_code: 404, + }); + } + const connectorPauser = PAUSE_CONNECTOR_BY_TYPE[connector.type]; + + const pauseRes = await connectorPauser(connector.id); + + if (pauseRes.isErr()) { + return apiError(req, res, { + status_code: 500, + api_error: { + type: "internal_server_error", + message: pauseRes.error.message, + }, + }); + } + + return res.sendStatus(204); + } catch (e) { + logger.error(errorFromAny(e), "Failed to pause the connector"); + return apiError(req, res, { + status_code: 500, + api_error: { + type: "internal_server_error", + message: "Could not pause the connector", + }, + }); + } +}; + +export const pauseConnectorAPIHandler = withLogging(_pauseConnectorAPIHandler); diff --git a/connectors/src/api/webhooks/webhook_github.ts b/connectors/src/api/webhooks/webhook_github.ts index b0560489d09d..5499b0480073 100644 --- a/connectors/src/api/webhooks/webhook_github.ts +++ b/connectors/src/api/webhooks/webhook_github.ts @@ -140,6 +140,16 @@ const _webhookGithubAPIHandler = async ( const enabledConnectors: ConnectorResource[] = []; for (const connector of connectors) { + if (connector.isPaused()) { + logger.info( + { + connectorId: connector.id, + installationId, + }, + "Skipping webhook for Github connector because it is paused." + ); + continue; + } const connectorState = githubConnectorStates[connector.id]; if (!connectorState) { logger.error( diff --git a/connectors/src/api/webhooks/webhook_google_drive.ts b/connectors/src/api/webhooks/webhook_google_drive.ts index 198c9320604f..2aed4f7cb2c9 100644 --- a/connectors/src/api/webhooks/webhook_google_drive.ts +++ b/connectors/src/api/webhooks/webhook_google_drive.ts @@ -6,6 +6,7 @@ import { launchGoogleDriveIncrementalSyncWorkflow } from "@connectors/connectors import { GoogleDriveWebhook } from "@connectors/lib/models/google_drive"; import logger from "@connectors/logger/logger"; import { apiError, withLogging } from "@connectors/logger/withlogging"; +import { ConnectorResource } from "@connectors/resources/connector_resource"; type GoogleDriveWebhookResBody = WithConnectorsAPIErrorReponse; @@ -41,6 +42,26 @@ const _webhookGoogleDriveAPIHandler = async ( }, }); } + const connector = await ConnectorResource.fetchById(webhook.connectorId); + if (!connector) { + return apiError(req, res, { + status_code: 404, + api_error: { + message: "Connector not found", + type: "invalid_request_error", + }, + }); + } + if (connector.isPaused()) { + logger.info( + { + connectorId: webhook.connectorId, + webhookId: webhook.webhookId, + }, + "Did not signal a Gdrive webhook to the incremenal sync workflow because the connector is paused" + ); + return res.status(200).end(); + } const workflowRes = await launchGoogleDriveIncrementalSyncWorkflow( webhook.connectorId diff --git a/connectors/src/api/webhooks/webhook_intercom.ts b/connectors/src/api/webhooks/webhook_intercom.ts index 881b381448bb..c28f07c9023f 100644 --- a/connectors/src/api/webhooks/webhook_intercom.ts +++ b/connectors/src/api/webhooks/webhook_intercom.ts @@ -91,6 +91,7 @@ const _webhookIntercomAPIHandler = async ( const connector = await ConnectorResource.fetchById( intercomWorskpace.connectorId ); + if (!connector || connector.type !== "intercom") { logger.error( { @@ -101,8 +102,18 @@ const _webhookIntercomAPIHandler = async ( return res.status(200).end(); } - // Check we have the permissions to sync this conversation + if (connector.isPaused()) { + logger.info( + { + connectorId: connector.id, + }, + "[Intercom] Received webhook for paused connector, skipping." + ); + return res.status(200).end(); + } + if (!conversation.team_assignee_id) { + // Check we have the permissions to sync this conversation logger.info( "[Intercom] Received webhook for conversation without team, skipping." ); diff --git a/connectors/src/api_server.ts b/connectors/src/api_server.ts index e087c60944fa..552ef2a8e300 100644 --- a/connectors/src/api_server.ts +++ b/connectors/src/api_server.ts @@ -11,6 +11,7 @@ import { getConnectorAPIHandler } from "@connectors/api/get_connector"; import { getConnectorPermissionsAPIHandler } from "@connectors/api/get_connector_permissions"; import { getContentNodesParentsAPIHandler } from "@connectors/api/get_content_node_parents"; import { getContentNodesAPIHandler } from "@connectors/api/get_content_nodes"; +import { pauseConnectorAPIHandler } from "@connectors/api/pause_connector"; import { resumeConnectorAPIHandler } from "@connectors/api/resume_connector"; import { setConnectorPermissionsAPIHandler } from "@connectors/api/set_connector_permissions"; import { @@ -89,6 +90,7 @@ export function startServer(port: number) { app.post("/connectors/create/:connector_provider", createConnectorAPIHandler); app.post("/connectors/update/:connector_id/", getConnectorUpdateAPIHandler); app.post("/connectors/stop/:connector_id", stopConnectorAPIHandler); + app.post("/connectors/pause/:connector_id", pauseConnectorAPIHandler); app.post("/connectors/resume/:connector_id", resumeConnectorAPIHandler); app.delete("/connectors/delete/:connector_id", deleteConnectorAPIHandler); app.get("/connectors/:connector_id", getConnectorAPIHandler); diff --git a/connectors/src/connectors/github/index.ts b/connectors/src/connectors/github/index.ts index 7af15b70e79d..99402686996f 100644 --- a/connectors/src/connectors/github/index.ts +++ b/connectors/src/connectors/github/index.ts @@ -28,6 +28,7 @@ import { GithubDiscussion, GithubIssue, } from "@connectors/lib/models/github"; +import { terminateAllWorkflowsForConnectorId } from "@connectors/lib/temporal"; import mainLogger from "@connectors/logger/logger"; import { ConnectorResource } from "@connectors/resources/connector_resource"; import type { DataSourceConfig } from "@connectors/types/data_source_config"; @@ -142,6 +143,19 @@ export async function stopGithubConnector( } } +export async function pauseGithubConnector( + connectorId: ModelId +): Promise> { + const connector = await ConnectorResource.fetchById(connectorId); + if (!connector) { + logger.error({ connectorId }, "Connector not found"); + return new Err(new Error("Connector not found")); + } + await connector.markAsPaused(); + await terminateAllWorkflowsForConnectorId(connectorId); + return new Ok(undefined); +} + export async function resumeGithubConnector( connectorId: ModelId ): Promise> { diff --git a/connectors/src/connectors/google_drive/index.ts b/connectors/src/connectors/google_drive/index.ts index 8f9af43fd5a9..dd2f9d334914 100644 --- a/connectors/src/connectors/google_drive/index.ts +++ b/connectors/src/connectors/google_drive/index.ts @@ -63,6 +63,7 @@ import { getGoogleCredentials, } from "@connectors/connectors/google_drive/temporal/utils"; import { concurrentExecutor } from "@connectors/lib/async_utils"; +import { terminateAllWorkflowsForConnectorId } from "@connectors/lib/temporal"; import { ConnectorResource } from "@connectors/resources/connector_resource"; import { FILE_ATTRIBUTES_TO_FETCH } from "@connectors/types/google_drive"; @@ -825,3 +826,13 @@ export async function googleDriveGarbageCollect(connectorId: ModelId) { return launchGoogleGarbageCollector(connectorId); } + +export async function pauseGoogleDriveConnector(connectorId: ModelId) { + const connector = await ConnectorResource.fetchById(connectorId); + if (!connector) { + return new Err(new Error(`Connector not found with id ${connectorId}`)); + } + await connector.markAsPaused(); + await terminateAllWorkflowsForConnectorId(connectorId); + return new Ok(undefined); +} diff --git a/connectors/src/connectors/google_drive/pauseGoogleDriveWebhooks.ts b/connectors/src/connectors/google_drive/pauseGoogleDriveWebhooks.ts new file mode 100644 index 000000000000..661aa3b34866 --- /dev/null +++ b/connectors/src/connectors/google_drive/pauseGoogleDriveWebhooks.ts @@ -0,0 +1,15 @@ +import type { ModelId } from "@dust-tt/types"; +import { Err, Ok } from "@dust-tt/types"; + +import { terminateAllWorkflowsForConnectorId } from "@connectors/lib/temporal"; +import { ConnectorResource } from "@connectors/resources/connector_resource"; + +export async function pauseGoogleDriveWebhooks(connectorId: ModelId) { + const connector = await ConnectorResource.fetchById(connectorId); + if (!connector) { + return new Err(new Error(`Connector not found with id ${connectorId}`)); + } + await connector.markAsPaused(); + await terminateAllWorkflowsForConnectorId(connectorId); + return new Ok(undefined); +} diff --git a/connectors/src/connectors/index.ts b/connectors/src/connectors/index.ts index 683a7c38a2ba..02376c08bdfb 100644 --- a/connectors/src/connectors/index.ts +++ b/connectors/src/connectors/index.ts @@ -18,6 +18,7 @@ import { createGithubConnector, fullResyncGithubConnector, getGithubConfig, + pauseGithubConnector, resumeGithubConnector, retrieveGithubConnectorPermissions, retrieveGithubContentNodeParents, @@ -31,6 +32,7 @@ import { createGoogleDriveConnector, getGoogleDriveConfig, googleDriveGarbageCollect, + pauseGoogleDriveConnector, retrieveGoogleDriveConnectorPermissions, retrieveGoogleDriveContentNodeParents, retrieveGoogleDriveContentNodes, @@ -43,6 +45,7 @@ import { cleanupIntercomConnector, createIntercomConnector, fullResyncIntercomSyncWorkflow, + pauseIntercomConnector, resumeIntercomConnector, retrieveIntercomConnectorPermissions, retrieveIntercomContentNodeParents, @@ -59,6 +62,7 @@ import type { ConnectorCreatorOAuth, ConnectorCreatorUrl, ConnectorGarbageCollector, + ConnectorPauser, ConnectorPermissionRetriever, ConnectorPermissionSetter, ConnectorResumer, @@ -72,6 +76,7 @@ import { cleanupNotionConnector, createNotionConnector, fullResyncNotionConnector, + pauseNotionConnector, resumeNotionConnector, retrieveNotionConnectorPermissions, retrieveNotionContentNodeParents, @@ -83,6 +88,7 @@ import { cleanupSlackConnector, createSlackConnector, getSlackConfig, + pauseSlackConnector, retrieveSlackConnectorPermissions, retrieveSlackContentNodes, setSlackConfig, @@ -95,6 +101,7 @@ import logger from "@connectors/logger/logger"; import { cleanupWebcrawlerConnector, createWebcrawlerConnector, + pauseWebcrawlerConnector, retrieveWebcrawlerConnectorPermissions, retrieveWebCrawlerContentNodeParents, retrieveWebCrawlerContentNodes, @@ -322,3 +329,19 @@ export const GARBAGE_COLLECT_BY_TYPE: Record< throw new Error("Not implemented"); }, }; + +// If the connector has webhooks: stop processing them. +// If the connector has long-running workflows: stop them. +// Exclude the connector from the prod checks. +export const PAUSE_CONNECTOR_BY_TYPE: Record< + ConnectorProvider, + ConnectorPauser +> = { + confluence: stopConfluenceConnector, + slack: pauseSlackConnector, + notion: pauseNotionConnector, + github: pauseGithubConnector, + google_drive: pauseGoogleDriveConnector, + intercom: pauseIntercomConnector, + webcrawler: pauseWebcrawlerConnector, +}; diff --git a/connectors/src/connectors/intercom/index.ts b/connectors/src/connectors/intercom/index.ts index 448322385620..48c6a4874fa6 100644 --- a/connectors/src/connectors/intercom/index.ts +++ b/connectors/src/connectors/intercom/index.ts @@ -740,3 +740,19 @@ export async function retrieveIntercomContentNodeParents( return new Ok(parents); } + +export async function pauseIntercomConnector(connectorId: ModelId) { + const connector = await ConnectorResource.fetchById(connectorId); + if (!connector) { + logger.error({ connectorId }, "[Intercom] Connector not found."); + return new Err(new Error("Connector not found")); + } + + await connector.markAsPaused(); + const stopRes = await stopIntercomConnector(connectorId); + if (stopRes.isErr()) { + return stopRes; + } + + return new Ok(undefined); +} diff --git a/connectors/src/connectors/interface.ts b/connectors/src/connectors/interface.ts index 488b37733dd0..9d8dd9e7db79 100644 --- a/connectors/src/connectors/interface.ts +++ b/connectors/src/connectors/interface.ts @@ -90,3 +90,7 @@ export type ConnectorConfigGetter = ( export type ConnectorGarbageCollector = ( connectorId: ModelId ) => Promise>; + +export type ConnectorPauser = ( + connectorId: ModelId +) => Promise>; diff --git a/connectors/src/connectors/notion/index.ts b/connectors/src/connectors/notion/index.ts index 5f2372112edb..cbe5f14c271b 100644 --- a/connectors/src/connectors/notion/index.ts +++ b/connectors/src/connectors/notion/index.ts @@ -205,6 +205,31 @@ export async function stopNotionConnector( return new Ok(undefined); } +export async function pauseNotionConnector( + connectorId: ModelId +): Promise> { + const connector = await ConnectorResource.fetchById(connectorId); + + if (!connector) { + logger.error( + { + connectorId, + }, + "Notion connector not found." + ); + + return new Err(new Error("Connector not found")); + } + + await connector.markAsPaused(); + const stopRes = await stopNotionConnector(connector.id); + if (stopRes.isErr()) { + return stopRes; + } + + return new Ok(undefined); +} + export async function resumeNotionConnector( connectorId: ModelId ): Promise> { diff --git a/connectors/src/connectors/slack/index.ts b/connectors/src/connectors/slack/index.ts index a7ab6551121b..59bb40d9441b 100644 --- a/connectors/src/connectors/slack/index.ts +++ b/connectors/src/connectors/slack/index.ts @@ -30,6 +30,7 @@ import { nango_client, nangoDeleteConnection, } from "@connectors/lib/nango_client.js"; +import { terminateAllWorkflowsForConnectorId } from "@connectors/lib/temporal"; import logger from "@connectors/logger/logger"; import { ConnectorResource } from "@connectors/resources/connector_resource"; import type { DataSourceConfig } from "@connectors/types/data_source_config.js"; @@ -642,3 +643,13 @@ export async function setSlackConfig( } } } + +export async function pauseSlackConnector(connectorId: ModelId) { + const connector = await ConnectorResource.fetchById(connectorId); + if (!connector) { + return new Err(new Error(`Connector not found with id ${connectorId}`)); + } + await connector.markAsPaused(); + await terminateAllWorkflowsForConnectorId(connectorId); + return new Ok(undefined); +} diff --git a/connectors/src/connectors/webcrawler/index.ts b/connectors/src/connectors/webcrawler/index.ts index bdf31c0329f3..2f2d9bf577fc 100644 --- a/connectors/src/connectors/webcrawler/index.ts +++ b/connectors/src/connectors/webcrawler/index.ts @@ -270,6 +270,21 @@ export async function stopWebcrawlerConnector( return new Ok(undefined); } +export async function pauseWebcrawlerConnector( + connectorId: ModelId +): Promise> { + const connector = await ConnectorResource.fetchById(connectorId); + if (!connector) { + throw new Error("Connector not found."); + } + await connector.markAsPaused(); + const stopRes = await stopCrawlWebsiteWorkflow(connectorId); + if (stopRes.isErr()) { + return stopRes; + } + return new Ok(undefined); +} + export async function cleanupWebcrawlerConnector( connectorId: ModelId ): Promise> { diff --git a/connectors/src/connectors/webcrawler/temporal/activities.ts b/connectors/src/connectors/webcrawler/temporal/activities.ts index 928194b308bc..84f40982261f 100644 --- a/connectors/src/connectors/webcrawler/temporal/activities.ts +++ b/connectors/src/connectors/webcrawler/temporal/activities.ts @@ -413,5 +413,10 @@ export async function getWebsitesToCrawl() { allConnectorIds.push(...websites.map((w) => w.connectorId)); } - return allConnectorIds; + const connectors = await ConnectorResource.fetchByIds(allConnectorIds); + const unPausedConnectorIds = connectors + .filter((c) => !c.isPaused()) + .map((c) => c.id); + + return unPausedConnectorIds; } diff --git a/connectors/src/resources/connector_resource.ts b/connectors/src/resources/connector_resource.ts index f36e49553211..c1cf2ae61ae8 100644 --- a/connectors/src/resources/connector_resource.ts +++ b/connectors/src/resources/connector_resource.ts @@ -1,4 +1,4 @@ -import type { ConnectorProvider, Result } from "@dust-tt/types"; +import type { ConnectorProvider, ModelId, Result } from "@dust-tt/types"; import { Err, Ok } from "@dust-tt/types"; import type { Attributes, @@ -114,6 +114,18 @@ export class ConnectorResource extends BaseResource { return new this(this.model, blob.get()); } + static async fetchByIds(ids: ModelId[]) { + const blobs = await ConnectorResource.model.findAll({ + where: { + id: ids, + }, + }); + + return blobs.map( + (b: ConnectorModel) => new ConnectorResource(ConnectorModel, b.get()) + ); + } + async delete(): Promise> { return sequelizeConnection.transaction(async (transaction) => { try { @@ -132,4 +144,12 @@ export class ConnectorResource extends BaseResource { } }); } + + isPaused() { + return !!this.pausedAt; + } + + async markAsPaused() { + return this.update({ pausedAt: new Date() }); + } } diff --git a/connectors/src/resources/storage/models/connector_model.ts b/connectors/src/resources/storage/models/connector_model.ts index fd20ebc3d6f1..0e129b357577 100644 --- a/connectors/src/resources/storage/models/connector_model.ts +++ b/connectors/src/resources/storage/models/connector_model.ts @@ -34,6 +34,8 @@ export class ConnectorModel extends Model< declare firstSuccessfulSyncTime?: Date; declare firstSyncProgress?: string; declare lastGCTime: Date | null; + + declare pausedAt?: Date | null; } ConnectorModel.init( @@ -105,6 +107,10 @@ ConnectorModel.init( type: DataTypes.DATE, allowNull: true, }, + pausedAt: { + type: DataTypes.DATE, + allowNull: true, + }, }, { sequelize: sequelizeConnection, diff --git a/types/src/front/lib/connectors_api.ts b/types/src/front/lib/connectors_api.ts index 5e376921020e..978e5f632bca 100644 --- a/types/src/front/lib/connectors_api.ts +++ b/types/src/front/lib/connectors_api.ts @@ -183,6 +183,20 @@ export class ConnectorsAPI { return this._resultFromResponse(res); } + async pauseConnector( + connectorId: string + ): Promise> { + const res = await fetch( + `${CONNECTORS_API}/connectors/pause/${encodeURIComponent(connectorId)}`, + { + method: "POST", + headers: this.getDefaultHeaders(), + } + ); + + return this._resultFromResponse(res); + } + async resumeConnector( connectorId: string ): Promise> {