From b426b5d8eb3ce283d24253fd576ba413de3e4427 Mon Sep 17 00:00:00 2001 From: Henry Fontanier Date: Wed, 3 Apr 2024 18:44:38 +0200 Subject: [PATCH 1/7] feat: pause connectors --- connectors/src/api/webhooks/webhook_github.ts | 10 ++++++++ .../src/api/webhooks/webhook_google_drive.ts | 21 ++++++++++++++++ .../src/api/webhooks/webhook_intercom.ts | 13 +++++++++- connectors/src/connectors/github/index.ts | 12 +++++++++ .../src/connectors/google_drive/index.ts | 9 +++++++ connectors/src/connectors/index.ts | 23 +++++++++++++++++ connectors/src/connectors/intercom/index.ts | 16 ++++++++++++ connectors/src/connectors/interface.ts | 4 +++ connectors/src/connectors/notion/index.ts | 25 +++++++++++++++++++ connectors/src/connectors/slack/index.ts | 11 ++++++++ connectors/src/connectors/webcrawler/index.ts | 15 +++++++++++ .../webcrawler/temporal/activities.ts | 7 +++++- .../src/resources/connector_resource.ts | 12 +++++++++ .../storage/models/connector_model.ts | 13 +++++++++- 14 files changed, 188 insertions(+), 3 deletions(-) diff --git a/connectors/src/api/webhooks/webhook_github.ts b/connectors/src/api/webhooks/webhook_github.ts index b0560489d09d..e4e031087cde 100644 --- a/connectors/src/api/webhooks/webhook_github.ts +++ b/connectors/src/api/webhooks/webhook_github.ts @@ -140,6 +140,16 @@ const _webhookGithubAPIHandler = async ( const enabledConnectors: ConnectorResource[] = []; for (const connector of connectors) { + if (connector.pausedAt) { + logger.info( + { + connectorId: connector.id, + installationId, + }, + "Skipping webhook for Github connector because it is paused." + ); + continue; + } const connectorState = githubConnectorStates[connector.id]; if (!connectorState) { logger.error( diff --git a/connectors/src/api/webhooks/webhook_google_drive.ts b/connectors/src/api/webhooks/webhook_google_drive.ts index 198c9320604f..99bf5c904eec 100644 --- a/connectors/src/api/webhooks/webhook_google_drive.ts +++ b/connectors/src/api/webhooks/webhook_google_drive.ts @@ -6,6 +6,7 @@ import { launchGoogleDriveIncrementalSyncWorkflow } from "@connectors/connectors import { GoogleDriveWebhook } from "@connectors/lib/models/google_drive"; import logger from "@connectors/logger/logger"; import { apiError, withLogging } from "@connectors/logger/withlogging"; +import { ConnectorResource } from "@connectors/resources/connector_resource"; type GoogleDriveWebhookResBody = WithConnectorsAPIErrorReponse; @@ -41,6 +42,26 @@ const _webhookGoogleDriveAPIHandler = async ( }, }); } + const connector = await ConnectorResource.fetchById(webhook.connectorId); + if (!connector) { + return apiError(req, res, { + status_code: 404, + api_error: { + message: "Connector not found", + type: "invalid_request_error", + }, + }); + } + if (connector.pausedAt) { + logger.info( + { + connectorId: webhook.connectorId, + webhookId: webhook.webhookId, + }, + "Did not signal a Gdrive webhook to the incremenal sync workflow because the connector is paused" + ); + return res.status(200).end(); + } const workflowRes = await launchGoogleDriveIncrementalSyncWorkflow( webhook.connectorId diff --git a/connectors/src/api/webhooks/webhook_intercom.ts b/connectors/src/api/webhooks/webhook_intercom.ts index 881b381448bb..ca1ed2c37fb1 100644 --- a/connectors/src/api/webhooks/webhook_intercom.ts +++ b/connectors/src/api/webhooks/webhook_intercom.ts @@ -91,6 +91,7 @@ const _webhookIntercomAPIHandler = async ( const connector = await ConnectorResource.fetchById( intercomWorskpace.connectorId ); + if (!connector || connector.type !== "intercom") { logger.error( { @@ -101,8 +102,18 @@ const _webhookIntercomAPIHandler = async ( return res.status(200).end(); } - // Check we have the permissions to sync this conversation + if (connector?.pausedAt) { + logger.info( + { + connectorId: connector.id, + }, + "[Intercom] Received webhook for paused connector, skipping." + ); + return res.status(200).end(); + } + if (!conversation.team_assignee_id) { + // Check we have the permissions to sync this conversation logger.info( "[Intercom] Received webhook for conversation without team, skipping." ); diff --git a/connectors/src/connectors/github/index.ts b/connectors/src/connectors/github/index.ts index 7af15b70e79d..6c0d1e7eae04 100644 --- a/connectors/src/connectors/github/index.ts +++ b/connectors/src/connectors/github/index.ts @@ -142,6 +142,18 @@ export async function stopGithubConnector( } } +export async function pauseGithubWebhooks( + connectorId: ModelId +): Promise> { + const connector = await ConnectorResource.fetchById(connectorId); + if (!connector) { + logger.error({ connectorId }, "Connector not found"); + return new Err(new Error("Connector not found")); + } + await connector.update({ pausedAt: new Date() }); + return new Ok(undefined); +} + export async function resumeGithubConnector( connectorId: ModelId ): Promise> { diff --git a/connectors/src/connectors/google_drive/index.ts b/connectors/src/connectors/google_drive/index.ts index 8f9af43fd5a9..302d8ff20d4c 100644 --- a/connectors/src/connectors/google_drive/index.ts +++ b/connectors/src/connectors/google_drive/index.ts @@ -825,3 +825,12 @@ export async function googleDriveGarbageCollect(connectorId: ModelId) { return launchGoogleGarbageCollector(connectorId); } + +export async function pauseGoogleDriveWebhooks(connectorId: ModelId) { + const connector = await ConnectorResource.fetchById(connectorId); + if (!connector) { + return new Err(new Error(`Connector not found with id ${connectorId}`)); + } + await connector.update({ pausedAt: new Date() }); + return new Ok(undefined); +} diff --git a/connectors/src/connectors/index.ts b/connectors/src/connectors/index.ts index 683a7c38a2ba..cf9d2f5a0db6 100644 --- a/connectors/src/connectors/index.ts +++ b/connectors/src/connectors/index.ts @@ -18,6 +18,7 @@ import { createGithubConnector, fullResyncGithubConnector, getGithubConfig, + pauseGithubWebhooks, resumeGithubConnector, retrieveGithubConnectorPermissions, retrieveGithubContentNodeParents, @@ -31,6 +32,7 @@ import { createGoogleDriveConnector, getGoogleDriveConfig, googleDriveGarbageCollect, + pauseGoogleDriveWebhooks, retrieveGoogleDriveConnectorPermissions, retrieveGoogleDriveContentNodeParents, retrieveGoogleDriveContentNodes, @@ -43,6 +45,7 @@ import { cleanupIntercomConnector, createIntercomConnector, fullResyncIntercomSyncWorkflow, + pauseIntercomConnector, resumeIntercomConnector, retrieveIntercomConnectorPermissions, retrieveIntercomContentNodeParents, @@ -59,6 +62,7 @@ import type { ConnectorCreatorOAuth, ConnectorCreatorUrl, ConnectorGarbageCollector, + ConnectorPauser, ConnectorPermissionRetriever, ConnectorPermissionSetter, ConnectorResumer, @@ -72,6 +76,7 @@ import { cleanupNotionConnector, createNotionConnector, fullResyncNotionConnector, + pauseNotionConnector, resumeNotionConnector, retrieveNotionConnectorPermissions, retrieveNotionContentNodeParents, @@ -83,6 +88,7 @@ import { cleanupSlackConnector, createSlackConnector, getSlackConfig, + pauseSlackWebhooks, retrieveSlackConnectorPermissions, retrieveSlackContentNodes, setSlackConfig, @@ -95,6 +101,7 @@ import logger from "@connectors/logger/logger"; import { cleanupWebcrawlerConnector, createWebcrawlerConnector, + pauseWebcrawlerConnector, retrieveWebcrawlerConnectorPermissions, retrieveWebCrawlerContentNodeParents, retrieveWebCrawlerContentNodes, @@ -322,3 +329,19 @@ export const GARBAGE_COLLECT_BY_TYPE: Record< throw new Error("Not implemented"); }, }; + +// If the connector has webhooks: stop processing them. +// If the connector has long-running workflows: stop them. +// Exclude the connector from the prod checks. +export const PAUSE_CONNECTOR_BY_TYPE: Record< + ConnectorProvider, + ConnectorPauser +> = { + confluence: stopConfluenceConnector, + slack: pauseSlackWebhooks, + notion: pauseNotionConnector, + github: pauseGithubWebhooks, + google_drive: pauseGoogleDriveWebhooks, + intercom: pauseIntercomConnector, + webcrawler: pauseWebcrawlerConnector, +}; diff --git a/connectors/src/connectors/intercom/index.ts b/connectors/src/connectors/intercom/index.ts index 448322385620..4675ceb77766 100644 --- a/connectors/src/connectors/intercom/index.ts +++ b/connectors/src/connectors/intercom/index.ts @@ -740,3 +740,19 @@ export async function retrieveIntercomContentNodeParents( return new Ok(parents); } + +export async function pauseIntercomConnector(connectorId: ModelId) { + const connector = await ConnectorResource.fetchById(connectorId); + if (!connector) { + logger.error({ connectorId }, "[Intercom] Connector not found."); + return new Err(new Error("Connector not found")); + } + + await connector.update({ pausedAt: new Date() }); + const stopRes = await stopIntercomConnector(connectorId); + if (stopRes.isErr()) { + return stopRes; + } + + return new Ok(undefined); +} diff --git a/connectors/src/connectors/interface.ts b/connectors/src/connectors/interface.ts index 488b37733dd0..9d8dd9e7db79 100644 --- a/connectors/src/connectors/interface.ts +++ b/connectors/src/connectors/interface.ts @@ -90,3 +90,7 @@ export type ConnectorConfigGetter = ( export type ConnectorGarbageCollector = ( connectorId: ModelId ) => Promise>; + +export type ConnectorPauser = ( + connectorId: ModelId +) => Promise>; diff --git a/connectors/src/connectors/notion/index.ts b/connectors/src/connectors/notion/index.ts index 5f2372112edb..daa8964c79a4 100644 --- a/connectors/src/connectors/notion/index.ts +++ b/connectors/src/connectors/notion/index.ts @@ -205,6 +205,31 @@ export async function stopNotionConnector( return new Ok(undefined); } +export async function pauseNotionConnector( + connectorId: ModelId +): Promise> { + const connector = await ConnectorResource.fetchById(connectorId); + + if (!connector) { + logger.error( + { + connectorId, + }, + "Notion connector not found." + ); + + return new Err(new Error("Connector not found")); + } + + await connector.update({ pausedAt: new Date() }); + const stopRes = await stopNotionConnector(connector.id); + if (stopRes.isErr()) { + return stopRes; + } + + return new Ok(undefined); +} + export async function resumeNotionConnector( connectorId: ModelId ): Promise> { diff --git a/connectors/src/connectors/slack/index.ts b/connectors/src/connectors/slack/index.ts index a7ab6551121b..cdd5c1af933e 100644 --- a/connectors/src/connectors/slack/index.ts +++ b/connectors/src/connectors/slack/index.ts @@ -642,3 +642,14 @@ export async function setSlackConfig( } } } + +export async function pauseSlackWebhooks(connectorId: ModelId) { + const connector = await ConnectorResource.fetchById(connectorId); + if (!connector) { + return new Err(new Error(`Connector not found with id ${connectorId}`)); + } + await connector.update({ + pausedAt: new Date(), + }); + return new Ok(undefined); +} diff --git a/connectors/src/connectors/webcrawler/index.ts b/connectors/src/connectors/webcrawler/index.ts index bdf31c0329f3..bd7ec5ee06dc 100644 --- a/connectors/src/connectors/webcrawler/index.ts +++ b/connectors/src/connectors/webcrawler/index.ts @@ -270,6 +270,21 @@ export async function stopWebcrawlerConnector( return new Ok(undefined); } +export async function pauseWebcrawlerConnector( + connectorId: ModelId +): Promise> { + const connector = await ConnectorResource.fetchById(connectorId); + if (!connector) { + throw new Error("Connector not found."); + } + await connector.update({ pausedAt: new Date() }); + const stopRes = await stopCrawlWebsiteWorkflow(connectorId); + if (stopRes.isErr()) { + return stopRes; + } + return new Ok(undefined); +} + export async function cleanupWebcrawlerConnector( connectorId: ModelId ): Promise> { diff --git a/connectors/src/connectors/webcrawler/temporal/activities.ts b/connectors/src/connectors/webcrawler/temporal/activities.ts index 928194b308bc..3de53caab89a 100644 --- a/connectors/src/connectors/webcrawler/temporal/activities.ts +++ b/connectors/src/connectors/webcrawler/temporal/activities.ts @@ -413,5 +413,10 @@ export async function getWebsitesToCrawl() { allConnectorIds.push(...websites.map((w) => w.connectorId)); } - return allConnectorIds; + const connectors = await ConnectorResource.fetchByIds(allConnectorIds); + const unPausedConnectorIds = connectors + .filter((c) => !c.pausedAt) + .map((c) => c.id); + + return unPausedConnectorIds; } diff --git a/connectors/src/resources/connector_resource.ts b/connectors/src/resources/connector_resource.ts index f36e49553211..2af0c139f698 100644 --- a/connectors/src/resources/connector_resource.ts +++ b/connectors/src/resources/connector_resource.ts @@ -114,6 +114,18 @@ export class ConnectorResource extends BaseResource { return new this(this.model, blob.get()); } + static async fetchByIds(ids: number[]) { + const blobs = await ConnectorResource.model.findAll({ + where: { + id: ids, + }, + }); + + return blobs.map( + (b: ConnectorModel) => new ConnectorResource(ConnectorModel, b.get()) + ); + } + async delete(): Promise> { return sequelizeConnection.transaction(async (transaction) => { try { diff --git a/connectors/src/resources/storage/models/connector_model.ts b/connectors/src/resources/storage/models/connector_model.ts index fd20ebc3d6f1..d05259e038a4 100644 --- a/connectors/src/resources/storage/models/connector_model.ts +++ b/connectors/src/resources/storage/models/connector_model.ts @@ -34,6 +34,8 @@ export class ConnectorModel extends Model< declare firstSuccessfulSyncTime?: Date; declare firstSyncProgress?: string; declare lastGCTime: Date | null; + + declare pausedAt?: Date | null; } ConnectorModel.init( @@ -105,10 +107,19 @@ ConnectorModel.init( type: DataTypes.DATE, allowNull: true, }, + pausedAt: { + type: DataTypes.DATE, + allowNull: true, + }, }, { sequelize: sequelizeConnection, modelName: "connectors", - indexes: [{ fields: ["workspaceId", "dataSourceName"], unique: true }], + indexes: [ + { fields: ["workspaceId", "dataSourceName"], unique: true }, + { + fields: ["pausedAt"], + }, + ], } ); From c914e35ded0427b550c96fe92a6f0f65d66bdd27 Mon Sep 17 00:00:00 2001 From: Henry Fontanier Date: Wed, 3 Apr 2024 18:49:20 +0200 Subject: [PATCH 2/7] endpoint --- connectors/src/api/pause_connector.ts | 58 +++++++++++++++++++++++++++ connectors/src/api_server.ts | 2 + 2 files changed, 60 insertions(+) create mode 100644 connectors/src/api/pause_connector.ts diff --git a/connectors/src/api/pause_connector.ts b/connectors/src/api/pause_connector.ts new file mode 100644 index 000000000000..07bcec582606 --- /dev/null +++ b/connectors/src/api/pause_connector.ts @@ -0,0 +1,58 @@ +import type { WithConnectorsAPIErrorReponse } from "@dust-tt/types"; +import type { Request, Response } from "express"; + +import { PAUSE_CONNECTOR_BY_TYPE } from "@connectors/connectors"; +import { errorFromAny } from "@connectors/lib/error"; +import logger from "@connectors/logger/logger"; +import { apiError, withLogging } from "@connectors/logger/withlogging"; +import { ConnectorResource } from "@connectors/resources/connector_resource"; + +type ConnectorPauseResBody = WithConnectorsAPIErrorReponse<{ + connectorId: string; +}>; + +const _pauseConnectorAPIHandler = async ( + req: Request<{ connector_id: string }, ConnectorPauseResBody>, + res: Response +) => { + try { + const connector = await ConnectorResource.fetchById( + req.params.connector_id + ); + if (!connector) { + return apiError(req, res, { + api_error: { + type: "connector_not_found", + message: "Connector not found", + }, + status_code: 404, + }); + } + const connectorPauser = PAUSE_CONNECTOR_BY_TYPE[connector.type]; + + const pauseRes = await connectorPauser(connector.id); + + if (pauseRes.isErr()) { + return apiError(req, res, { + status_code: 500, + api_error: { + type: "internal_server_error", + message: pauseRes.error.message, + }, + }); + } + + return res.sendStatus(204); + } catch (e) { + logger.error(errorFromAny(e), "Failed to pause the connector"); + return apiError(req, res, { + status_code: 500, + api_error: { + type: "internal_server_error", + message: "Could not pause the connector", + }, + }); + } +}; + +export const pauseConnectorAPIHandler = withLogging(_pauseConnectorAPIHandler); diff --git a/connectors/src/api_server.ts b/connectors/src/api_server.ts index e087c60944fa..552ef2a8e300 100644 --- a/connectors/src/api_server.ts +++ b/connectors/src/api_server.ts @@ -11,6 +11,7 @@ import { getConnectorAPIHandler } from "@connectors/api/get_connector"; import { getConnectorPermissionsAPIHandler } from "@connectors/api/get_connector_permissions"; import { getContentNodesParentsAPIHandler } from "@connectors/api/get_content_node_parents"; import { getContentNodesAPIHandler } from "@connectors/api/get_content_nodes"; +import { pauseConnectorAPIHandler } from "@connectors/api/pause_connector"; import { resumeConnectorAPIHandler } from "@connectors/api/resume_connector"; import { setConnectorPermissionsAPIHandler } from "@connectors/api/set_connector_permissions"; import { @@ -89,6 +90,7 @@ export function startServer(port: number) { app.post("/connectors/create/:connector_provider", createConnectorAPIHandler); app.post("/connectors/update/:connector_id/", getConnectorUpdateAPIHandler); app.post("/connectors/stop/:connector_id", stopConnectorAPIHandler); + app.post("/connectors/pause/:connector_id", pauseConnectorAPIHandler); app.post("/connectors/resume/:connector_id", resumeConnectorAPIHandler); app.delete("/connectors/delete/:connector_id", deleteConnectorAPIHandler); app.get("/connectors/:connector_id", getConnectorAPIHandler); From 5797c0e5f719ab066445b84e2f265a29822c2a09 Mon Sep 17 00:00:00 2001 From: Henry Fontanier Date: Wed, 3 Apr 2024 18:50:54 +0200 Subject: [PATCH 3/7] front bindings --- types/src/front/lib/connectors_api.ts | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/types/src/front/lib/connectors_api.ts b/types/src/front/lib/connectors_api.ts index 5e376921020e..978e5f632bca 100644 --- a/types/src/front/lib/connectors_api.ts +++ b/types/src/front/lib/connectors_api.ts @@ -183,6 +183,20 @@ export class ConnectorsAPI { return this._resultFromResponse(res); } + async pauseConnector( + connectorId: string + ): Promise> { + const res = await fetch( + `${CONNECTORS_API}/connectors/pause/${encodeURIComponent(connectorId)}`, + { + method: "POST", + headers: this.getDefaultHeaders(), + } + ); + + return this._resultFromResponse(res); + } + async resumeConnector( connectorId: string ): Promise> { From f5efd8df7148ccab17c27e7e66b50b5c6c024226 Mon Sep 17 00:00:00 2001 From: Henry Fontanier Date: Thu, 4 Apr 2024 14:39:15 +0200 Subject: [PATCH 4/7] terminate the workflows --- connectors/src/connectors/github/index.ts | 2 ++ connectors/src/connectors/google_drive/index.ts | 2 ++ connectors/src/connectors/slack/index.ts | 2 ++ 3 files changed, 6 insertions(+) diff --git a/connectors/src/connectors/github/index.ts b/connectors/src/connectors/github/index.ts index 6c0d1e7eae04..16c8b3ca5b7a 100644 --- a/connectors/src/connectors/github/index.ts +++ b/connectors/src/connectors/github/index.ts @@ -28,6 +28,7 @@ import { GithubDiscussion, GithubIssue, } from "@connectors/lib/models/github"; +import { terminateAllWorkflowsForConnectorId } from "@connectors/lib/temporal"; import mainLogger from "@connectors/logger/logger"; import { ConnectorResource } from "@connectors/resources/connector_resource"; import type { DataSourceConfig } from "@connectors/types/data_source_config"; @@ -151,6 +152,7 @@ export async function pauseGithubWebhooks( return new Err(new Error("Connector not found")); } await connector.update({ pausedAt: new Date() }); + await terminateAllWorkflowsForConnectorId(connectorId); return new Ok(undefined); } diff --git a/connectors/src/connectors/google_drive/index.ts b/connectors/src/connectors/google_drive/index.ts index 302d8ff20d4c..ff5b8a7e0a15 100644 --- a/connectors/src/connectors/google_drive/index.ts +++ b/connectors/src/connectors/google_drive/index.ts @@ -63,6 +63,7 @@ import { getGoogleCredentials, } from "@connectors/connectors/google_drive/temporal/utils"; import { concurrentExecutor } from "@connectors/lib/async_utils"; +import { terminateAllWorkflowsForConnectorId } from "@connectors/lib/temporal"; import { ConnectorResource } from "@connectors/resources/connector_resource"; import { FILE_ATTRIBUTES_TO_FETCH } from "@connectors/types/google_drive"; @@ -832,5 +833,6 @@ export async function pauseGoogleDriveWebhooks(connectorId: ModelId) { return new Err(new Error(`Connector not found with id ${connectorId}`)); } await connector.update({ pausedAt: new Date() }); + await terminateAllWorkflowsForConnectorId(connectorId); return new Ok(undefined); } diff --git a/connectors/src/connectors/slack/index.ts b/connectors/src/connectors/slack/index.ts index cdd5c1af933e..64b5ab19a2e5 100644 --- a/connectors/src/connectors/slack/index.ts +++ b/connectors/src/connectors/slack/index.ts @@ -30,6 +30,7 @@ import { nango_client, nangoDeleteConnection, } from "@connectors/lib/nango_client.js"; +import { terminateAllWorkflowsForConnectorId } from "@connectors/lib/temporal"; import logger from "@connectors/logger/logger"; import { ConnectorResource } from "@connectors/resources/connector_resource"; import type { DataSourceConfig } from "@connectors/types/data_source_config.js"; @@ -651,5 +652,6 @@ export async function pauseSlackWebhooks(connectorId: ModelId) { await connector.update({ pausedAt: new Date(), }); + await terminateAllWorkflowsForConnectorId(connectorId); return new Ok(undefined); } From 789e1a32d1c974b1336be797d37f4d26b18d5732 Mon Sep 17 00:00:00 2001 From: Henry Fontanier Date: Thu, 4 Apr 2024 14:41:03 +0200 Subject: [PATCH 5/7] better naming --- connectors/src/connectors/github/index.ts | 2 +- connectors/src/connectors/google_drive/index.ts | 2 +- connectors/src/connectors/index.ts | 12 ++++++------ connectors/src/connectors/slack/index.ts | 2 +- 4 files changed, 9 insertions(+), 9 deletions(-) diff --git a/connectors/src/connectors/github/index.ts b/connectors/src/connectors/github/index.ts index 16c8b3ca5b7a..0e56945ffd52 100644 --- a/connectors/src/connectors/github/index.ts +++ b/connectors/src/connectors/github/index.ts @@ -143,7 +143,7 @@ export async function stopGithubConnector( } } -export async function pauseGithubWebhooks( +export async function pauseGithubConnector( connectorId: ModelId ): Promise> { const connector = await ConnectorResource.fetchById(connectorId); diff --git a/connectors/src/connectors/google_drive/index.ts b/connectors/src/connectors/google_drive/index.ts index ff5b8a7e0a15..5fc125e1f6ef 100644 --- a/connectors/src/connectors/google_drive/index.ts +++ b/connectors/src/connectors/google_drive/index.ts @@ -827,7 +827,7 @@ export async function googleDriveGarbageCollect(connectorId: ModelId) { return launchGoogleGarbageCollector(connectorId); } -export async function pauseGoogleDriveWebhooks(connectorId: ModelId) { +export async function pauseGoogleDriveConnector(connectorId: ModelId) { const connector = await ConnectorResource.fetchById(connectorId); if (!connector) { return new Err(new Error(`Connector not found with id ${connectorId}`)); diff --git a/connectors/src/connectors/index.ts b/connectors/src/connectors/index.ts index cf9d2f5a0db6..02376c08bdfb 100644 --- a/connectors/src/connectors/index.ts +++ b/connectors/src/connectors/index.ts @@ -18,7 +18,7 @@ import { createGithubConnector, fullResyncGithubConnector, getGithubConfig, - pauseGithubWebhooks, + pauseGithubConnector, resumeGithubConnector, retrieveGithubConnectorPermissions, retrieveGithubContentNodeParents, @@ -32,7 +32,7 @@ import { createGoogleDriveConnector, getGoogleDriveConfig, googleDriveGarbageCollect, - pauseGoogleDriveWebhooks, + pauseGoogleDriveConnector, retrieveGoogleDriveConnectorPermissions, retrieveGoogleDriveContentNodeParents, retrieveGoogleDriveContentNodes, @@ -88,7 +88,7 @@ import { cleanupSlackConnector, createSlackConnector, getSlackConfig, - pauseSlackWebhooks, + pauseSlackConnector, retrieveSlackConnectorPermissions, retrieveSlackContentNodes, setSlackConfig, @@ -338,10 +338,10 @@ export const PAUSE_CONNECTOR_BY_TYPE: Record< ConnectorPauser > = { confluence: stopConfluenceConnector, - slack: pauseSlackWebhooks, + slack: pauseSlackConnector, notion: pauseNotionConnector, - github: pauseGithubWebhooks, - google_drive: pauseGoogleDriveWebhooks, + github: pauseGithubConnector, + google_drive: pauseGoogleDriveConnector, intercom: pauseIntercomConnector, webcrawler: pauseWebcrawlerConnector, }; diff --git a/connectors/src/connectors/slack/index.ts b/connectors/src/connectors/slack/index.ts index 64b5ab19a2e5..a86d424137f4 100644 --- a/connectors/src/connectors/slack/index.ts +++ b/connectors/src/connectors/slack/index.ts @@ -644,7 +644,7 @@ export async function setSlackConfig( } } -export async function pauseSlackWebhooks(connectorId: ModelId) { +export async function pauseSlackConnector(connectorId: ModelId) { const connector = await ConnectorResource.fetchById(connectorId); if (!connector) { return new Err(new Error(`Connector not found with id ${connectorId}`)); From 60ac34475a409bd8f2fb3fa7081775ad6d933fb7 Mon Sep 17 00:00:00 2001 From: Henry Fontanier Date: Thu, 4 Apr 2024 14:43:49 +0200 Subject: [PATCH 6/7] more instance methods --- connectors/src/api/webhooks/webhook_github.ts | 2 +- .../src/api/webhooks/webhook_google_drive.ts | 2 +- connectors/src/api/webhooks/webhook_intercom.ts | 2 +- connectors/src/connectors/github/index.ts | 2 +- connectors/src/connectors/google_drive/index.ts | 2 +- .../google_drive/pauseGoogleDriveWebhooks.ts | 15 +++++++++++++++ connectors/src/connectors/intercom/index.ts | 2 +- connectors/src/connectors/notion/index.ts | 2 +- connectors/src/connectors/slack/index.ts | 4 +--- connectors/src/connectors/webcrawler/index.ts | 2 +- .../connectors/webcrawler/temporal/activities.ts | 2 +- connectors/src/resources/connector_resource.ts | 8 ++++++++ 12 files changed, 33 insertions(+), 12 deletions(-) create mode 100644 connectors/src/connectors/google_drive/pauseGoogleDriveWebhooks.ts diff --git a/connectors/src/api/webhooks/webhook_github.ts b/connectors/src/api/webhooks/webhook_github.ts index e4e031087cde..5499b0480073 100644 --- a/connectors/src/api/webhooks/webhook_github.ts +++ b/connectors/src/api/webhooks/webhook_github.ts @@ -140,7 +140,7 @@ const _webhookGithubAPIHandler = async ( const enabledConnectors: ConnectorResource[] = []; for (const connector of connectors) { - if (connector.pausedAt) { + if (connector.isPaused()) { logger.info( { connectorId: connector.id, diff --git a/connectors/src/api/webhooks/webhook_google_drive.ts b/connectors/src/api/webhooks/webhook_google_drive.ts index 99bf5c904eec..2aed4f7cb2c9 100644 --- a/connectors/src/api/webhooks/webhook_google_drive.ts +++ b/connectors/src/api/webhooks/webhook_google_drive.ts @@ -52,7 +52,7 @@ const _webhookGoogleDriveAPIHandler = async ( }, }); } - if (connector.pausedAt) { + if (connector.isPaused()) { logger.info( { connectorId: webhook.connectorId, diff --git a/connectors/src/api/webhooks/webhook_intercom.ts b/connectors/src/api/webhooks/webhook_intercom.ts index ca1ed2c37fb1..8968e3ef02bd 100644 --- a/connectors/src/api/webhooks/webhook_intercom.ts +++ b/connectors/src/api/webhooks/webhook_intercom.ts @@ -102,7 +102,7 @@ const _webhookIntercomAPIHandler = async ( return res.status(200).end(); } - if (connector?.pausedAt) { + if (connector?.isPaused()) { logger.info( { connectorId: connector.id, diff --git a/connectors/src/connectors/github/index.ts b/connectors/src/connectors/github/index.ts index 0e56945ffd52..99402686996f 100644 --- a/connectors/src/connectors/github/index.ts +++ b/connectors/src/connectors/github/index.ts @@ -151,7 +151,7 @@ export async function pauseGithubConnector( logger.error({ connectorId }, "Connector not found"); return new Err(new Error("Connector not found")); } - await connector.update({ pausedAt: new Date() }); + await connector.markAsPaused(); await terminateAllWorkflowsForConnectorId(connectorId); return new Ok(undefined); } diff --git a/connectors/src/connectors/google_drive/index.ts b/connectors/src/connectors/google_drive/index.ts index 5fc125e1f6ef..dd2f9d334914 100644 --- a/connectors/src/connectors/google_drive/index.ts +++ b/connectors/src/connectors/google_drive/index.ts @@ -832,7 +832,7 @@ export async function pauseGoogleDriveConnector(connectorId: ModelId) { if (!connector) { return new Err(new Error(`Connector not found with id ${connectorId}`)); } - await connector.update({ pausedAt: new Date() }); + await connector.markAsPaused(); await terminateAllWorkflowsForConnectorId(connectorId); return new Ok(undefined); } diff --git a/connectors/src/connectors/google_drive/pauseGoogleDriveWebhooks.ts b/connectors/src/connectors/google_drive/pauseGoogleDriveWebhooks.ts new file mode 100644 index 000000000000..661aa3b34866 --- /dev/null +++ b/connectors/src/connectors/google_drive/pauseGoogleDriveWebhooks.ts @@ -0,0 +1,15 @@ +import type { ModelId } from "@dust-tt/types"; +import { Err, Ok } from "@dust-tt/types"; + +import { terminateAllWorkflowsForConnectorId } from "@connectors/lib/temporal"; +import { ConnectorResource } from "@connectors/resources/connector_resource"; + +export async function pauseGoogleDriveWebhooks(connectorId: ModelId) { + const connector = await ConnectorResource.fetchById(connectorId); + if (!connector) { + return new Err(new Error(`Connector not found with id ${connectorId}`)); + } + await connector.markAsPaused(); + await terminateAllWorkflowsForConnectorId(connectorId); + return new Ok(undefined); +} diff --git a/connectors/src/connectors/intercom/index.ts b/connectors/src/connectors/intercom/index.ts index 4675ceb77766..48c6a4874fa6 100644 --- a/connectors/src/connectors/intercom/index.ts +++ b/connectors/src/connectors/intercom/index.ts @@ -748,7 +748,7 @@ export async function pauseIntercomConnector(connectorId: ModelId) { return new Err(new Error("Connector not found")); } - await connector.update({ pausedAt: new Date() }); + await connector.markAsPaused(); const stopRes = await stopIntercomConnector(connectorId); if (stopRes.isErr()) { return stopRes; diff --git a/connectors/src/connectors/notion/index.ts b/connectors/src/connectors/notion/index.ts index daa8964c79a4..cbe5f14c271b 100644 --- a/connectors/src/connectors/notion/index.ts +++ b/connectors/src/connectors/notion/index.ts @@ -221,7 +221,7 @@ export async function pauseNotionConnector( return new Err(new Error("Connector not found")); } - await connector.update({ pausedAt: new Date() }); + await connector.markAsPaused(); const stopRes = await stopNotionConnector(connector.id); if (stopRes.isErr()) { return stopRes; diff --git a/connectors/src/connectors/slack/index.ts b/connectors/src/connectors/slack/index.ts index a86d424137f4..59bb40d9441b 100644 --- a/connectors/src/connectors/slack/index.ts +++ b/connectors/src/connectors/slack/index.ts @@ -649,9 +649,7 @@ export async function pauseSlackConnector(connectorId: ModelId) { if (!connector) { return new Err(new Error(`Connector not found with id ${connectorId}`)); } - await connector.update({ - pausedAt: new Date(), - }); + await connector.markAsPaused(); await terminateAllWorkflowsForConnectorId(connectorId); return new Ok(undefined); } diff --git a/connectors/src/connectors/webcrawler/index.ts b/connectors/src/connectors/webcrawler/index.ts index bd7ec5ee06dc..2f2d9bf577fc 100644 --- a/connectors/src/connectors/webcrawler/index.ts +++ b/connectors/src/connectors/webcrawler/index.ts @@ -277,7 +277,7 @@ export async function pauseWebcrawlerConnector( if (!connector) { throw new Error("Connector not found."); } - await connector.update({ pausedAt: new Date() }); + await connector.markAsPaused(); const stopRes = await stopCrawlWebsiteWorkflow(connectorId); if (stopRes.isErr()) { return stopRes; diff --git a/connectors/src/connectors/webcrawler/temporal/activities.ts b/connectors/src/connectors/webcrawler/temporal/activities.ts index 3de53caab89a..84f40982261f 100644 --- a/connectors/src/connectors/webcrawler/temporal/activities.ts +++ b/connectors/src/connectors/webcrawler/temporal/activities.ts @@ -415,7 +415,7 @@ export async function getWebsitesToCrawl() { const connectors = await ConnectorResource.fetchByIds(allConnectorIds); const unPausedConnectorIds = connectors - .filter((c) => !c.pausedAt) + .filter((c) => !c.isPaused()) .map((c) => c.id); return unPausedConnectorIds; diff --git a/connectors/src/resources/connector_resource.ts b/connectors/src/resources/connector_resource.ts index 2af0c139f698..157e15c98ad5 100644 --- a/connectors/src/resources/connector_resource.ts +++ b/connectors/src/resources/connector_resource.ts @@ -144,4 +144,12 @@ export class ConnectorResource extends BaseResource { } }); } + + isPaused() { + return !!this.pausedAt; + } + + async markAsPaused() { + return this.update({ pausedAt: new Date() }); + } } From 7204d913ac9e3cdaa7345673dd67f1c409706733 Mon Sep 17 00:00:00 2001 From: Henry Fontanier Date: Thu, 4 Apr 2024 14:44:50 +0200 Subject: [PATCH 7/7] r --- connectors/src/api/webhooks/webhook_intercom.ts | 2 +- connectors/src/resources/connector_resource.ts | 4 ++-- connectors/src/resources/storage/models/connector_model.ts | 7 +------ 3 files changed, 4 insertions(+), 9 deletions(-) diff --git a/connectors/src/api/webhooks/webhook_intercom.ts b/connectors/src/api/webhooks/webhook_intercom.ts index 8968e3ef02bd..c28f07c9023f 100644 --- a/connectors/src/api/webhooks/webhook_intercom.ts +++ b/connectors/src/api/webhooks/webhook_intercom.ts @@ -102,7 +102,7 @@ const _webhookIntercomAPIHandler = async ( return res.status(200).end(); } - if (connector?.isPaused()) { + if (connector.isPaused()) { logger.info( { connectorId: connector.id, diff --git a/connectors/src/resources/connector_resource.ts b/connectors/src/resources/connector_resource.ts index 157e15c98ad5..c1cf2ae61ae8 100644 --- a/connectors/src/resources/connector_resource.ts +++ b/connectors/src/resources/connector_resource.ts @@ -1,4 +1,4 @@ -import type { ConnectorProvider, Result } from "@dust-tt/types"; +import type { ConnectorProvider, ModelId, Result } from "@dust-tt/types"; import { Err, Ok } from "@dust-tt/types"; import type { Attributes, @@ -114,7 +114,7 @@ export class ConnectorResource extends BaseResource { return new this(this.model, blob.get()); } - static async fetchByIds(ids: number[]) { + static async fetchByIds(ids: ModelId[]) { const blobs = await ConnectorResource.model.findAll({ where: { id: ids, diff --git a/connectors/src/resources/storage/models/connector_model.ts b/connectors/src/resources/storage/models/connector_model.ts index d05259e038a4..0e129b357577 100644 --- a/connectors/src/resources/storage/models/connector_model.ts +++ b/connectors/src/resources/storage/models/connector_model.ts @@ -115,11 +115,6 @@ ConnectorModel.init( { sequelize: sequelizeConnection, modelName: "connectors", - indexes: [ - { fields: ["workspaceId", "dataSourceName"], unique: true }, - { - fields: ["pausedAt"], - }, - ], + indexes: [{ fields: ["workspaceId", "dataSourceName"], unique: true }], } );