diff --git a/connectors/package-lock.json b/connectors/package-lock.json index fd2c7d782a3a..e6cd6fd5eff0 100644 --- a/connectors/package-lock.json +++ b/connectors/package-lock.json @@ -42,6 +42,7 @@ "pg-hstore": "^2.3.4", "pino": "^8.11.0", "pino-pretty": "^10.0.0", + "redis": "^4.6.10", "sequelize": "^6.31.0", "uuid": "^9.0.0" }, @@ -1179,6 +1180,59 @@ "resolved": "https://registry.npmjs.org/@protobufjs/utf8/-/utf8-1.1.0.tgz", "integrity": "sha512-Vvn3zZrhQZkkBE8LSuW3em98c0FwgO4nxzv6OdSxPKJIEKY2bGbHn+mhGIPerzI4twdxaP8/0+06HBpwf345Lw==" }, + "node_modules/@redis/bloom": { + "version": "1.2.0", + "resolved": "https://registry.npmjs.org/@redis/bloom/-/bloom-1.2.0.tgz", + "integrity": "sha512-HG2DFjYKbpNmVXsa0keLHp/3leGJz1mjh09f2RLGGLQZzSHpkmZWuwJbAvo3QcRY8p80m5+ZdXZdYOSBLlp7Cg==", + "peerDependencies": { + "@redis/client": "^1.0.0" + } + }, + "node_modules/@redis/client": { + "version": "1.5.11", + "resolved": "https://registry.npmjs.org/@redis/client/-/client-1.5.11.tgz", + "integrity": "sha512-cV7yHcOAtNQ5x/yQl7Yw1xf53kO0FNDTdDU6bFIMbW6ljB7U7ns0YRM+QIkpoqTAt6zK5k9Fq0QWlUbLcq9AvA==", + "dependencies": { + "cluster-key-slot": "1.1.2", + "generic-pool": "3.9.0", + "yallist": "4.0.0" + }, + "engines": { + "node": ">=14" + } + }, + "node_modules/@redis/graph": { + "version": "1.1.0", + "resolved": "https://registry.npmjs.org/@redis/graph/-/graph-1.1.0.tgz", + "integrity": "sha512-16yZWngxyXPd+MJxeSr0dqh2AIOi8j9yXKcKCwVaKDbH3HTuETpDVPcLujhFYVPtYrngSco31BUcSa9TH31Gqg==", + "peerDependencies": { + "@redis/client": "^1.0.0" + } + }, + "node_modules/@redis/json": { + "version": "1.0.6", + "resolved": "https://registry.npmjs.org/@redis/json/-/json-1.0.6.tgz", + "integrity": "sha512-rcZO3bfQbm2zPRpqo82XbW8zg4G/w4W3tI7X8Mqleq9goQjAGLL7q/1n1ZX4dXEAmORVZ4s1+uKLaUOg7LrUhw==", + "peerDependencies": { + "@redis/client": "^1.0.0" + } + }, + "node_modules/@redis/search": { + "version": "1.1.5", + "resolved": "https://registry.npmjs.org/@redis/search/-/search-1.1.5.tgz", + "integrity": "sha512-hPP8w7GfGsbtYEJdn4n7nXa6xt6hVZnnDktKW4ArMaFQ/m/aR7eFvsLQmG/mn1Upq99btPJk+F27IQ2dYpCoUg==", + "peerDependencies": { + "@redis/client": "^1.0.0" + } + }, + "node_modules/@redis/time-series": { + "version": "1.0.5", + "resolved": "https://registry.npmjs.org/@redis/time-series/-/time-series-1.0.5.tgz", + "integrity": "sha512-IFjIgTusQym2B5IZJG3XKr5llka7ey84fw/NOYqESP5WUfQs9zz1ww/9+qoz4ka/S6KcGBodzlCeZ5UImKbscg==", + "peerDependencies": { + "@redis/client": "^1.0.0" + } + }, "node_modules/@scarf/scarf": { "version": "1.1.1", "resolved": "https://registry.npmjs.org/@scarf/scarf/-/scarf-1.1.1.tgz", @@ -2615,6 +2669,14 @@ "wrap-ansi": "^7.0.0" } }, + "node_modules/cluster-key-slot": { + "version": "1.1.2", + "resolved": "https://registry.npmjs.org/cluster-key-slot/-/cluster-key-slot-1.1.2.tgz", + "integrity": "sha512-RMr0FhtfXemyinomL4hrWcYJxmX6deFdCxpJzhDttxgO1+bcCnkk+9drydLVDmAMG7NE6aN/fl4F7ucU/90gAA==", + "engines": { + "node": ">=0.10.0" + } + }, "node_modules/color-convert": { "version": "2.0.1", "resolved": "https://registry.npmjs.org/color-convert/-/color-convert-2.0.1.tgz", @@ -3651,6 +3713,14 @@ "node": ">=12" } }, + "node_modules/generic-pool": { + "version": "3.9.0", + "resolved": "https://registry.npmjs.org/generic-pool/-/generic-pool-3.9.0.tgz", + "integrity": "sha512-hymDOu5B53XvN4QT9dBmZxPX4CWhBPPLguTZ9MMFeFa/Kg0xWVfylOVNlJji/E7yTZWFd/q9GO5TxDLq156D7g==", + "engines": { + "node": ">= 4" + } + }, "node_modules/get-caller-file": { "version": "2.0.5", "resolved": "https://registry.npmjs.org/get-caller-file/-/get-caller-file-2.0.5.tgz", @@ -5333,6 +5403,19 @@ "node": ">= 12.13.0" } }, + "node_modules/redis": { + "version": "4.6.10", + "resolved": "https://registry.npmjs.org/redis/-/redis-4.6.10.tgz", + "integrity": "sha512-mmbyhuKgDiJ5TWUhiKhBssz+mjsuSI/lSZNPI9QvZOYzWvYGejtb+W3RlDDf8LD6Bdl5/mZeG8O1feUGhXTxEg==", + "dependencies": { + "@redis/bloom": "1.2.0", + "@redis/client": "1.5.11", + "@redis/graph": "1.1.0", + "@redis/json": "1.0.6", + "@redis/search": "1.1.5", + "@redis/time-series": "1.0.5" + } + }, "node_modules/require-directory": { "version": "2.1.1", "resolved": "https://registry.npmjs.org/require-directory/-/require-directory-2.1.1.tgz", diff --git a/connectors/package.json b/connectors/package.json index a2b5ed739cb7..8f89b88296ab 100644 --- a/connectors/package.json +++ b/connectors/package.json @@ -47,6 +47,7 @@ "pg-hstore": "^2.3.4", "pino": "^8.11.0", "pino-pretty": "^10.0.0", + "redis": "^4.6.10", "sequelize": "^6.31.0", "uuid": "^9.0.0" }, diff --git a/connectors/src/connectors/notion/index.ts b/connectors/src/connectors/notion/index.ts index 9169cdc2afdc..6488cbc74d5d 100644 --- a/connectors/src/connectors/notion/index.ts +++ b/connectors/src/connectors/notion/index.ts @@ -18,6 +18,7 @@ import { nango_client, nangoDeleteConnection, } from "@connectors/lib/nango_client"; +import { getAccessTokenFromNango } from "@connectors/lib/nango_helpers"; import { Err, Ok, Result } from "@connectors/lib/result"; import mainLogger from "@connectors/logger/logger"; import { DataSourceConfig } from "@connectors/types/data_source_config"; @@ -44,10 +45,11 @@ export async function createNotionConnector( throw new Error("NANGO_NOTION_CONNECTOR_ID not set"); } - const notionAccessToken = (await nango_client().getToken( - NANGO_NOTION_CONNECTOR_ID, - nangoConnectionId - )) as string; + const notionAccessToken = await getAccessTokenFromNango({ + connectionId: nangoConnectionId, + integrationId: NANGO_NOTION_CONNECTOR_ID, + useCache: false, + }); if (!validateAccessToken(notionAccessToken)) { return new Err(new Error("Notion access token is invalid")); diff --git a/connectors/src/connectors/notion/temporal/activities.ts b/connectors/src/connectors/notion/temporal/activities.ts index 5dc05d06426f..c378513911c8 100644 --- a/connectors/src/connectors/notion/temporal/activities.ts +++ b/connectors/src/connectors/notion/temporal/activities.ts @@ -51,7 +51,7 @@ import { NotionDatabase, NotionPage, } from "@connectors/lib/models"; -import { nango_client } from "@connectors/lib/nango_client"; +import { getAccessTokenFromNango } from "@connectors/lib/nango_helpers"; import { syncStarted, syncSucceeded } from "@connectors/lib/sync_status"; import mainLogger from "@connectors/logger/logger"; @@ -461,10 +461,11 @@ export async function getNotionAccessToken( throw new Error("NANGO_NOTION_CONNECTOR_ID not set"); } - const notionAccessToken = (await nango_client().getToken( - NANGO_NOTION_CONNECTOR_ID, - nangoConnectionId - )) as string; + const notionAccessToken = await getAccessTokenFromNango({ + connectionId: nangoConnectionId, + integrationId: NANGO_NOTION_CONNECTOR_ID, + useCache: true, + }); return notionAccessToken; } diff --git a/connectors/src/connectors/slack/temporal/activities.ts b/connectors/src/connectors/slack/temporal/activities.ts index 18ecd422926f..cc72aee48fd5 100644 --- a/connectors/src/connectors/slack/temporal/activities.ts +++ b/connectors/src/connectors/slack/temporal/activities.ts @@ -25,7 +25,7 @@ import { } from "@connectors/lib/data_sources"; import { WorkflowError } from "@connectors/lib/error"; import { SlackChannel, SlackMessages } from "@connectors/lib/models"; -import { nango_client } from "@connectors/lib/nango_client"; +import { getAccessTokenFromNango } from "@connectors/lib/nango_helpers"; import { reportInitialSyncProgress, syncSucceeded, @@ -682,7 +682,11 @@ export async function getAccessToken( if (!NANGO_SLACK_CONNECTOR_ID) { throw new Error("NANGO_SLACK_CONNECTOR_ID is not defined"); } - return nango_client().getToken(NANGO_SLACK_CONNECTOR_ID, nangoConnectionId); + return getAccessTokenFromNango({ + connectionId: nangoConnectionId, + integrationId: NANGO_SLACK_CONNECTOR_ID, + useCache: true, + }); } export async function saveSuccessSyncActivity(connectorId: string) { diff --git a/connectors/src/lib/nango_helpers.ts b/connectors/src/lib/nango_helpers.ts new file mode 100644 index 000000000000..2768341a122c --- /dev/null +++ b/connectors/src/lib/nango_helpers.ts @@ -0,0 +1,58 @@ +import { redisClient } from "@connectors/lib/redis"; +import { NangoConnectionId } from "@connectors/types/nango_connection_id"; + +import { nango_client } from "./nango_client"; + +const NANGO_ACCESS_TOKEN_TTL_SECONDS = 60 * 5; // 5 minutes + +export async function getAccessTokenFromNango({ + connectionId, + integrationId, + useCache = false, +}: { + connectionId: NangoConnectionId; + integrationId: string; + useCache?: boolean; +}) { + const cacheKey = `nango_access_token:${integrationId}/${connectionId}`; + const redis = await redisClient(); + + const _setCache = (token: string) => + redis.set(cacheKey, token, { + EX: NANGO_ACCESS_TOKEN_TTL_SECONDS, + }); + + if (!useCache) { + const accessToken = await _getAccessTokenFromNango({ + connectionId, + integrationId, + }); + await _setCache(accessToken); + return accessToken; + } + + const maybeAccessToken = await redis.get(cacheKey); + if (maybeAccessToken) { + return maybeAccessToken; + } + const accessToken = await nango_client().getToken( + integrationId, + connectionId + ); + await _setCache(accessToken); + return accessToken; +} + +async function _getAccessTokenFromNango({ + connectionId, + integrationId, +}: { + connectionId: NangoConnectionId; + integrationId: string; +}) { + const accessToken = await nango_client().getToken( + integrationId, + connectionId + ); + return accessToken; +} diff --git a/connectors/src/lib/redis.ts b/connectors/src/lib/redis.ts new file mode 100644 index 000000000000..7e2966714378 --- /dev/null +++ b/connectors/src/lib/redis.ts @@ -0,0 +1,16 @@ +import { createClient } from "redis"; + +export async function redisClient() { + const { REDIS_URI } = process.env; + if (!REDIS_URI) { + throw new Error("REDIS_URI is not defined"); + } + const client = createClient({ + url: REDIS_URI, + }); + client.on("error", (err) => console.log("Redis Client Error", err)); + + await client.connect(); + + return client; +}