Skip to content

Commit

Permalink
feat: add "pause" method to all connectors (#4558)
Browse files Browse the repository at this point in the history
* feat: pause connectors

* endpoint

* front bindings

* terminate the workflows

* better naming

* more instance methods

* r

---------

Co-authored-by: Henry Fontanier <[email protected]>
  • Loading branch information
fontanierh and Henry Fontanier authored Apr 4, 2024
1 parent b458004 commit 12bd151
Show file tree
Hide file tree
Showing 18 changed files with 284 additions and 3 deletions.
58 changes: 58 additions & 0 deletions connectors/src/api/pause_connector.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import type { WithConnectorsAPIErrorReponse } from "@dust-tt/types";
import type { Request, Response } from "express";

import { PAUSE_CONNECTOR_BY_TYPE } from "@connectors/connectors";
import { errorFromAny } from "@connectors/lib/error";
import logger from "@connectors/logger/logger";
import { apiError, withLogging } from "@connectors/logger/withlogging";
import { ConnectorResource } from "@connectors/resources/connector_resource";

type ConnectorPauseResBody = WithConnectorsAPIErrorReponse<{
connectorId: string;
}>;

const _pauseConnectorAPIHandler = async (
req: Request<{ connector_id: string }, ConnectorPauseResBody>,
res: Response<ConnectorPauseResBody>
) => {
try {
const connector = await ConnectorResource.fetchById(
req.params.connector_id
);
if (!connector) {
return apiError(req, res, {
api_error: {
type: "connector_not_found",
message: "Connector not found",
},
status_code: 404,
});
}
const connectorPauser = PAUSE_CONNECTOR_BY_TYPE[connector.type];

const pauseRes = await connectorPauser(connector.id);

if (pauseRes.isErr()) {
return apiError(req, res, {
status_code: 500,
api_error: {
type: "internal_server_error",
message: pauseRes.error.message,
},
});
}

return res.sendStatus(204);
} catch (e) {
logger.error(errorFromAny(e), "Failed to pause the connector");
return apiError(req, res, {
status_code: 500,
api_error: {
type: "internal_server_error",
message: "Could not pause the connector",
},
});
}
};

export const pauseConnectorAPIHandler = withLogging(_pauseConnectorAPIHandler);
10 changes: 10 additions & 0 deletions connectors/src/api/webhooks/webhook_github.ts
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,16 @@ const _webhookGithubAPIHandler = async (

const enabledConnectors: ConnectorResource[] = [];
for (const connector of connectors) {
if (connector.isPaused()) {
logger.info(
{
connectorId: connector.id,
installationId,
},
"Skipping webhook for Github connector because it is paused."
);
continue;
}
const connectorState = githubConnectorStates[connector.id];
if (!connectorState) {
logger.error(
Expand Down
21 changes: 21 additions & 0 deletions connectors/src/api/webhooks/webhook_google_drive.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { launchGoogleDriveIncrementalSyncWorkflow } from "@connectors/connectors
import { GoogleDriveWebhook } from "@connectors/lib/models/google_drive";
import logger from "@connectors/logger/logger";
import { apiError, withLogging } from "@connectors/logger/withlogging";
import { ConnectorResource } from "@connectors/resources/connector_resource";

type GoogleDriveWebhookResBody = WithConnectorsAPIErrorReponse<null>;

Expand Down Expand Up @@ -41,6 +42,26 @@ const _webhookGoogleDriveAPIHandler = async (
},
});
}
const connector = await ConnectorResource.fetchById(webhook.connectorId);
if (!connector) {
return apiError(req, res, {
status_code: 404,
api_error: {
message: "Connector not found",
type: "invalid_request_error",
},
});
}
if (connector.isPaused()) {
logger.info(
{
connectorId: webhook.connectorId,
webhookId: webhook.webhookId,
},
"Did not signal a Gdrive webhook to the incremenal sync workflow because the connector is paused"
);
return res.status(200).end();
}

const workflowRes = await launchGoogleDriveIncrementalSyncWorkflow(
webhook.connectorId
Expand Down
13 changes: 12 additions & 1 deletion connectors/src/api/webhooks/webhook_intercom.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ const _webhookIntercomAPIHandler = async (
const connector = await ConnectorResource.fetchById(
intercomWorskpace.connectorId
);

if (!connector || connector.type !== "intercom") {
logger.error(
{
Expand All @@ -101,8 +102,18 @@ const _webhookIntercomAPIHandler = async (
return res.status(200).end();
}

// Check we have the permissions to sync this conversation
if (connector.isPaused()) {
logger.info(
{
connectorId: connector.id,
},
"[Intercom] Received webhook for paused connector, skipping."
);
return res.status(200).end();
}

if (!conversation.team_assignee_id) {
// Check we have the permissions to sync this conversation
logger.info(
"[Intercom] Received webhook for conversation without team, skipping."
);
Expand Down
2 changes: 2 additions & 0 deletions connectors/src/api_server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { getConnectorAPIHandler } from "@connectors/api/get_connector";
import { getConnectorPermissionsAPIHandler } from "@connectors/api/get_connector_permissions";
import { getContentNodesParentsAPIHandler } from "@connectors/api/get_content_node_parents";
import { getContentNodesAPIHandler } from "@connectors/api/get_content_nodes";
import { pauseConnectorAPIHandler } from "@connectors/api/pause_connector";
import { resumeConnectorAPIHandler } from "@connectors/api/resume_connector";
import { setConnectorPermissionsAPIHandler } from "@connectors/api/set_connector_permissions";
import {
Expand Down Expand Up @@ -89,6 +90,7 @@ export function startServer(port: number) {
app.post("/connectors/create/:connector_provider", createConnectorAPIHandler);
app.post("/connectors/update/:connector_id/", getConnectorUpdateAPIHandler);
app.post("/connectors/stop/:connector_id", stopConnectorAPIHandler);
app.post("/connectors/pause/:connector_id", pauseConnectorAPIHandler);
app.post("/connectors/resume/:connector_id", resumeConnectorAPIHandler);
app.delete("/connectors/delete/:connector_id", deleteConnectorAPIHandler);
app.get("/connectors/:connector_id", getConnectorAPIHandler);
Expand Down
14 changes: 14 additions & 0 deletions connectors/src/connectors/github/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import {
GithubDiscussion,
GithubIssue,
} from "@connectors/lib/models/github";
import { terminateAllWorkflowsForConnectorId } from "@connectors/lib/temporal";
import mainLogger from "@connectors/logger/logger";
import { ConnectorResource } from "@connectors/resources/connector_resource";
import type { DataSourceConfig } from "@connectors/types/data_source_config";
Expand Down Expand Up @@ -142,6 +143,19 @@ export async function stopGithubConnector(
}
}

export async function pauseGithubConnector(
connectorId: ModelId
): Promise<Result<undefined, Error>> {
const connector = await ConnectorResource.fetchById(connectorId);
if (!connector) {
logger.error({ connectorId }, "Connector not found");
return new Err(new Error("Connector not found"));
}
await connector.markAsPaused();
await terminateAllWorkflowsForConnectorId(connectorId);
return new Ok(undefined);
}

export async function resumeGithubConnector(
connectorId: ModelId
): Promise<Result<undefined, Error>> {
Expand Down
11 changes: 11 additions & 0 deletions connectors/src/connectors/google_drive/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ import {
getGoogleCredentials,
} from "@connectors/connectors/google_drive/temporal/utils";
import { concurrentExecutor } from "@connectors/lib/async_utils";
import { terminateAllWorkflowsForConnectorId } from "@connectors/lib/temporal";
import { ConnectorResource } from "@connectors/resources/connector_resource";
import { FILE_ATTRIBUTES_TO_FETCH } from "@connectors/types/google_drive";

Expand Down Expand Up @@ -825,3 +826,13 @@ export async function googleDriveGarbageCollect(connectorId: ModelId) {

return launchGoogleGarbageCollector(connectorId);
}

export async function pauseGoogleDriveConnector(connectorId: ModelId) {
const connector = await ConnectorResource.fetchById(connectorId);
if (!connector) {
return new Err(new Error(`Connector not found with id ${connectorId}`));
}
await connector.markAsPaused();
await terminateAllWorkflowsForConnectorId(connectorId);
return new Ok(undefined);
}
15 changes: 15 additions & 0 deletions connectors/src/connectors/google_drive/pauseGoogleDriveWebhooks.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import type { ModelId } from "@dust-tt/types";
import { Err, Ok } from "@dust-tt/types";

import { terminateAllWorkflowsForConnectorId } from "@connectors/lib/temporal";
import { ConnectorResource } from "@connectors/resources/connector_resource";

export async function pauseGoogleDriveWebhooks(connectorId: ModelId) {
const connector = await ConnectorResource.fetchById(connectorId);
if (!connector) {
return new Err(new Error(`Connector not found with id ${connectorId}`));
}
await connector.markAsPaused();
await terminateAllWorkflowsForConnectorId(connectorId);
return new Ok(undefined);
}
23 changes: 23 additions & 0 deletions connectors/src/connectors/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import {
createGithubConnector,
fullResyncGithubConnector,
getGithubConfig,
pauseGithubConnector,
resumeGithubConnector,
retrieveGithubConnectorPermissions,
retrieveGithubContentNodeParents,
Expand All @@ -31,6 +32,7 @@ import {
createGoogleDriveConnector,
getGoogleDriveConfig,
googleDriveGarbageCollect,
pauseGoogleDriveConnector,
retrieveGoogleDriveConnectorPermissions,
retrieveGoogleDriveContentNodeParents,
retrieveGoogleDriveContentNodes,
Expand All @@ -43,6 +45,7 @@ import {
cleanupIntercomConnector,
createIntercomConnector,
fullResyncIntercomSyncWorkflow,
pauseIntercomConnector,
resumeIntercomConnector,
retrieveIntercomConnectorPermissions,
retrieveIntercomContentNodeParents,
Expand All @@ -59,6 +62,7 @@ import type {
ConnectorCreatorOAuth,
ConnectorCreatorUrl,
ConnectorGarbageCollector,
ConnectorPauser,
ConnectorPermissionRetriever,
ConnectorPermissionSetter,
ConnectorResumer,
Expand All @@ -72,6 +76,7 @@ import {
cleanupNotionConnector,
createNotionConnector,
fullResyncNotionConnector,
pauseNotionConnector,
resumeNotionConnector,
retrieveNotionConnectorPermissions,
retrieveNotionContentNodeParents,
Expand All @@ -83,6 +88,7 @@ import {
cleanupSlackConnector,
createSlackConnector,
getSlackConfig,
pauseSlackConnector,
retrieveSlackConnectorPermissions,
retrieveSlackContentNodes,
setSlackConfig,
Expand All @@ -95,6 +101,7 @@ import logger from "@connectors/logger/logger";
import {
cleanupWebcrawlerConnector,
createWebcrawlerConnector,
pauseWebcrawlerConnector,
retrieveWebcrawlerConnectorPermissions,
retrieveWebCrawlerContentNodeParents,
retrieveWebCrawlerContentNodes,
Expand Down Expand Up @@ -322,3 +329,19 @@ export const GARBAGE_COLLECT_BY_TYPE: Record<
throw new Error("Not implemented");
},
};

// If the connector has webhooks: stop processing them.
// If the connector has long-running workflows: stop them.
// Exclude the connector from the prod checks.
export const PAUSE_CONNECTOR_BY_TYPE: Record<
ConnectorProvider,
ConnectorPauser
> = {
confluence: stopConfluenceConnector,
slack: pauseSlackConnector,
notion: pauseNotionConnector,
github: pauseGithubConnector,
google_drive: pauseGoogleDriveConnector,
intercom: pauseIntercomConnector,
webcrawler: pauseWebcrawlerConnector,
};
16 changes: 16 additions & 0 deletions connectors/src/connectors/intercom/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -740,3 +740,19 @@ export async function retrieveIntercomContentNodeParents(

return new Ok(parents);
}

export async function pauseIntercomConnector(connectorId: ModelId) {
const connector = await ConnectorResource.fetchById(connectorId);
if (!connector) {
logger.error({ connectorId }, "[Intercom] Connector not found.");
return new Err(new Error("Connector not found"));
}

await connector.markAsPaused();
const stopRes = await stopIntercomConnector(connectorId);
if (stopRes.isErr()) {
return stopRes;
}

return new Ok(undefined);
}
4 changes: 4 additions & 0 deletions connectors/src/connectors/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,3 +90,7 @@ export type ConnectorConfigGetter = (
export type ConnectorGarbageCollector = (
connectorId: ModelId
) => Promise<Result<string, Error>>;

export type ConnectorPauser = (
connectorId: ModelId
) => Promise<Result<undefined, Error>>;
25 changes: 25 additions & 0 deletions connectors/src/connectors/notion/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,31 @@ export async function stopNotionConnector(
return new Ok(undefined);
}

export async function pauseNotionConnector(
connectorId: ModelId
): Promise<Result<undefined, Error>> {
const connector = await ConnectorResource.fetchById(connectorId);

if (!connector) {
logger.error(
{
connectorId,
},
"Notion connector not found."
);

return new Err(new Error("Connector not found"));
}

await connector.markAsPaused();
const stopRes = await stopNotionConnector(connector.id);
if (stopRes.isErr()) {
return stopRes;
}

return new Ok(undefined);
}

export async function resumeNotionConnector(
connectorId: ModelId
): Promise<Result<undefined, Error>> {
Expand Down
11 changes: 11 additions & 0 deletions connectors/src/connectors/slack/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import {
nango_client,
nangoDeleteConnection,
} from "@connectors/lib/nango_client.js";
import { terminateAllWorkflowsForConnectorId } from "@connectors/lib/temporal";
import logger from "@connectors/logger/logger";
import { ConnectorResource } from "@connectors/resources/connector_resource";
import type { DataSourceConfig } from "@connectors/types/data_source_config.js";
Expand Down Expand Up @@ -642,3 +643,13 @@ export async function setSlackConfig(
}
}
}

export async function pauseSlackConnector(connectorId: ModelId) {
const connector = await ConnectorResource.fetchById(connectorId);
if (!connector) {
return new Err(new Error(`Connector not found with id ${connectorId}`));
}
await connector.markAsPaused();
await terminateAllWorkflowsForConnectorId(connectorId);
return new Ok(undefined);
}
Loading

0 comments on commit 12bd151

Please sign in to comment.