Skip to content

Commit

Permalink
enh: cache nango access tokens (#2229)
Browse files Browse the repository at this point in the history
  • Loading branch information
fontanierh authored Oct 23, 2023
1 parent 5911537 commit e7aa66b
Show file tree
Hide file tree
Showing 7 changed files with 176 additions and 11 deletions.
83 changes: 83 additions & 0 deletions connectors/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions connectors/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
"pg-hstore": "^2.3.4",
"pino": "^8.11.0",
"pino-pretty": "^10.0.0",
"redis": "^4.6.10",
"sequelize": "^6.31.0",
"uuid": "^9.0.0"
},
Expand Down
10 changes: 6 additions & 4 deletions connectors/src/connectors/notion/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import {
nango_client,
nangoDeleteConnection,
} from "@connectors/lib/nango_client";
import { getAccessTokenFromNango } from "@connectors/lib/nango_helpers";
import { Err, Ok, Result } from "@connectors/lib/result";
import mainLogger from "@connectors/logger/logger";
import { DataSourceConfig } from "@connectors/types/data_source_config";
Expand All @@ -44,10 +45,11 @@ export async function createNotionConnector(
throw new Error("NANGO_NOTION_CONNECTOR_ID not set");
}

const notionAccessToken = (await nango_client().getToken(
NANGO_NOTION_CONNECTOR_ID,
nangoConnectionId
)) as string;
const notionAccessToken = await getAccessTokenFromNango({
connectionId: nangoConnectionId,
integrationId: NANGO_NOTION_CONNECTOR_ID,
useCache: false,
});

if (!validateAccessToken(notionAccessToken)) {
return new Err(new Error("Notion access token is invalid"));
Expand Down
11 changes: 6 additions & 5 deletions connectors/src/connectors/notion/temporal/activities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ import {
NotionDatabase,
NotionPage,
} from "@connectors/lib/models";
import { nango_client } from "@connectors/lib/nango_client";
import { getAccessTokenFromNango } from "@connectors/lib/nango_helpers";
import { syncStarted, syncSucceeded } from "@connectors/lib/sync_status";
import mainLogger from "@connectors/logger/logger";

Expand Down Expand Up @@ -461,10 +461,11 @@ export async function getNotionAccessToken(
throw new Error("NANGO_NOTION_CONNECTOR_ID not set");
}

const notionAccessToken = (await nango_client().getToken(
NANGO_NOTION_CONNECTOR_ID,
nangoConnectionId
)) as string;
const notionAccessToken = await getAccessTokenFromNango({
connectionId: nangoConnectionId,
integrationId: NANGO_NOTION_CONNECTOR_ID,
useCache: true,
});

return notionAccessToken;
}
Expand Down
8 changes: 6 additions & 2 deletions connectors/src/connectors/slack/temporal/activities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import {
} from "@connectors/lib/data_sources";
import { WorkflowError } from "@connectors/lib/error";
import { SlackChannel, SlackMessages } from "@connectors/lib/models";
import { nango_client } from "@connectors/lib/nango_client";
import { getAccessTokenFromNango } from "@connectors/lib/nango_helpers";
import {
reportInitialSyncProgress,
syncSucceeded,
Expand Down Expand Up @@ -682,7 +682,11 @@ export async function getAccessToken(
if (!NANGO_SLACK_CONNECTOR_ID) {
throw new Error("NANGO_SLACK_CONNECTOR_ID is not defined");
}
return nango_client().getToken(NANGO_SLACK_CONNECTOR_ID, nangoConnectionId);
return getAccessTokenFromNango({
connectionId: nangoConnectionId,
integrationId: NANGO_SLACK_CONNECTOR_ID,
useCache: true,
});
}

export async function saveSuccessSyncActivity(connectorId: string) {
Expand Down
58 changes: 58 additions & 0 deletions connectors/src/lib/nango_helpers.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import { redisClient } from "@connectors/lib/redis";
import { NangoConnectionId } from "@connectors/types/nango_connection_id";

import { nango_client } from "./nango_client";

const NANGO_ACCESS_TOKEN_TTL_SECONDS = 60 * 5; // 5 minutes

export async function getAccessTokenFromNango({
connectionId,
integrationId,
useCache = false,
}: {
connectionId: NangoConnectionId;
integrationId: string;
useCache?: boolean;
}) {
const cacheKey = `nango_access_token:${integrationId}/${connectionId}`;
const redis = await redisClient();

const _setCache = (token: string) =>
redis.set(cacheKey, token, {
EX: NANGO_ACCESS_TOKEN_TTL_SECONDS,
});

if (!useCache) {
const accessToken = await _getAccessTokenFromNango({
connectionId,
integrationId,
});
await _setCache(accessToken);
return accessToken;
}

const maybeAccessToken = await redis.get(cacheKey);
if (maybeAccessToken) {
return maybeAccessToken;
}
const accessToken = await nango_client().getToken(
integrationId,
connectionId
);
await _setCache(accessToken);
return accessToken;
}

async function _getAccessTokenFromNango({
connectionId,
integrationId,
}: {
connectionId: NangoConnectionId;
integrationId: string;
}) {
const accessToken = await nango_client().getToken(
integrationId,
connectionId
);
return accessToken;
}
16 changes: 16 additions & 0 deletions connectors/src/lib/redis.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import { createClient } from "redis";

export async function redisClient() {
const { REDIS_URI } = process.env;
if (!REDIS_URI) {
throw new Error("REDIS_URI is not defined");
}
const client = createClient({
url: REDIS_URI,
});
client.on("error", (err) => console.log("Redis Client Error", err));

await client.connect();

return client;
}

0 comments on commit e7aa66b

Please sign in to comment.