From e908d8a7c93d425c38d48d96ad2eaee34cff14bf Mon Sep 17 00:00:00 2001 From: dholms Date: Tue, 30 Jan 2024 15:41:55 -0600 Subject: [PATCH] cleanup, add cfg, add rate limiting on email --- packages/pds/package.json | 1 + packages/pds/src/config/config.ts | 20 ++++++ packages/pds/src/config/env.ts | 14 ++++ packages/pds/src/context.ts | 31 +++++++- packages/pds/src/signup-queue/activator.ts | 84 ++++++++++++++++++++-- packages/pds/src/signup-queue/notifier.ts | 30 -------- pnpm-lock.yaml | 3 + 7 files changed, 147 insertions(+), 36 deletions(-) delete mode 100644 packages/pds/src/signup-queue/notifier.ts diff --git a/packages/pds/package.json b/packages/pds/package.json index ce6e617607f..146efc228d5 100644 --- a/packages/pds/package.json +++ b/packages/pds/package.json @@ -70,6 +70,7 @@ "pg": "^8.10.0", "pino": "^8.15.0", "pino-http": "^8.2.1", + "rate-limiter-flexible": "^2.4.1", "sharp": "^0.32.6", "twilio": "^4.20.1", "typed-emitter": "^2.1.0", diff --git a/packages/pds/src/config/config.ts b/packages/pds/src/config/config.ts index 6652a2a29eb..1c39f65a81b 100644 --- a/packages/pds/src/config/config.ts +++ b/packages/pds/src/config/config.ts @@ -210,6 +210,16 @@ export const envToCfg = (env: ServerEnvironment): ServerConfig => { } : { enabled: false } + const courierHttpVersion = env.courierHttpVersion ?? '2' + assert(courierHttpVersion === '1.1' || courierHttpVersion === '2') + const activatorCfg: ServerConfig['activator'] = { + courierUrl: env.courierUrl, + courierHttpVersion, + courierIgnoreBadTls: env.courierIgnoreBadTls, + courierApiKey: env.courierApiKey, + emailsPerDay: env.activatorEmailsPerDay, + } + const crawlersCfg: ServerConfig['crawlers'] = env.crawlers ?? [] return { @@ -226,6 +236,7 @@ export const envToCfg = (env: ServerEnvironment): ServerConfig => { modService: modServiceCfg, redis: redisCfg, rateLimits: rateLimitsCfg, + activator: activatorCfg, crawlers: crawlersCfg, } } @@ -244,6 +255,7 @@ export type ServerConfig = { modService: ModServiceConfig redis: RedisScratchConfig | null rateLimits: RateLimitsConfig + activator: ActivatorConfig crawlers: string[] } @@ -353,6 +365,14 @@ export type RateLimitsConfig = } | { enabled: false } +export type ActivatorConfig = { + courierUrl?: string + courierHttpVersion?: '1.1' | '2' + courierIgnoreBadTls?: boolean + courierApiKey?: string + emailsPerDay?: number +} + export type BksyAppViewConfig = { url: string did: string diff --git a/packages/pds/src/config/env.ts b/packages/pds/src/config/env.ts index 1d332f9b8ff..b8d6cdc5b4b 100644 --- a/packages/pds/src/config/env.ts +++ b/packages/pds/src/config/env.ts @@ -86,6 +86,13 @@ export const readEnv = (): ServerEnvironment => { redisScratchAddress: envStr('PDS_REDIS_SCRATCH_ADDRESS'), redisScratchPassword: envStr('PDS_REDIS_SCRATCH_PASSWORD'), + // activator + courierUrl: envStr('PDS_COURIER_URL'), + courierHttpVersion: envStr('PDS_COURIER_HTTP_VERSION'), + courierIgnoreBadTls: envBool('PDS_COURIER_IGNORE_BAD_TLS'), + courierApiKey: envStr('PDS_COURIER_API_KEY'), + activatorEmailsPerDay: envInt('PDS_ACTIVATOR_EMAILS_PER_DAY'), + // crawlers crawlers: envList('PDS_CRAWLERS'), @@ -200,6 +207,13 @@ export type ServerEnvironment = { redisScratchAddress?: string redisScratchPassword?: string + // activator + courierUrl?: string + courierHttpVersion?: string + courierIgnoreBadTls?: boolean + courierApiKey?: string + activatorEmailsPerDay?: number + // crawler crawlers?: string[] diff --git a/packages/pds/src/context.ts b/packages/pds/src/context.ts index 75224e3c83a..d5714de47f5 100644 --- a/packages/pds/src/context.ts +++ b/packages/pds/src/context.ts @@ -1,5 +1,6 @@ import * as nodemailer from 'nodemailer' import { Redis } from 'ioredis' +import { RateLimiterRedis } from 'rate-limiter-flexible' import * as plc from '@did-plc/lib' import * as crypto from '@atproto/crypto' import { IdResolver } from '@atproto/identity' @@ -25,6 +26,8 @@ import { TwilioClient } from './twilio' import assert from 'assert' import { SignupLimiter } from './signup-queue/limiter' import { SignupActivator } from './signup-queue/activator' +import { createCourierClient, authWithApiKey as courierAuth } from './courier' +import { DAY } from '@atproto/common' export type AppContextOptions = { db: Database @@ -231,7 +234,33 @@ export class AppContext { } const signupLimiter = new SignupLimiter(db) - const signupActivator = new SignupActivator(db) + const courierClient = cfg.activator.courierUrl + ? createCourierClient({ + baseUrl: cfg.activator.courierUrl, + httpVersion: cfg.activator.courierHttpVersion ?? '2', + nodeOptions: { + rejectUnauthorized: !cfg.activator.courierIgnoreBadTls, + }, + interceptors: cfg.activator.courierApiKey + ? [courierAuth(cfg.activator.courierApiKey)] + : [], + }) + : undefined + const limiter = + cfg.activator.emailsPerDay && redisScratch + ? new RateLimiterRedis({ + storeClient: redisScratch, + duration: DAY / 1000, + points: cfg.activator.emailsPerDay, + }) + : undefined + + const signupActivator = new SignupActivator({ + db, + mailer, + courierClient, + limiter, + }) const pdsAgents = new PdsAgents() diff --git a/packages/pds/src/signup-queue/activator.ts b/packages/pds/src/signup-queue/activator.ts index 99cd05c1e4e..94a98c57556 100644 --- a/packages/pds/src/signup-queue/activator.ts +++ b/packages/pds/src/signup-queue/activator.ts @@ -1,9 +1,13 @@ -import { SECOND, jitter, wait } from '@atproto/common' +import { RateLimiterAbstract } from 'rate-limiter-flexible' +import { SECOND, chunkArray, jitter, wait } from '@atproto/common' +import { DisconnectError } from '@atproto/xrpc-server' +import { Timestamp } from '@bufbuild/protobuf' import { limiterLogger as log } from '../logger' import Database from '../db' import { Leader } from '../db/leader' -import { DisconnectError } from '@atproto/xrpc-server' import { getQueueStatus } from './util' +import { ServerMailer } from '../mailer' +import { CourierClient } from '../courier' type LimiterFlags = { disableSignups: boolean @@ -17,16 +21,32 @@ type LimiterStatus = LimiterFlags & { export const ACCOUNT_ACTIVATOR_ID = 1010 +export type ActivatorOpts = { + db: Database + mailer?: ServerMailer + courierClient?: CourierClient + limiter?: RateLimiterAbstract +} + export class SignupActivator { leader: Leader + db: Database + mailer?: ServerMailer + courierClient?: CourierClient + limiter?: RateLimiterAbstract + destroyed = false promise: Promise = Promise.resolve() timer: NodeJS.Timer | undefined status: LimiterStatus - constructor(private db: Database, lockId = ACCOUNT_ACTIVATOR_ID) { - this.leader = new Leader(lockId, this.db) + constructor(opts: ActivatorOpts, lockId = ACCOUNT_ACTIVATOR_ID) { + this.leader = new Leader(lockId, opts.db) + this.db = opts.db + this.mailer = opts.mailer + this.courierClient = opts.courierClient + this.limiter = opts.limiter } async run() { @@ -99,6 +119,60 @@ export class SignupActivator { .execute() log.info({ count: activated.length }, 'activated accounts') - // @TODO send mail/push notifs + + const dids = activated.map((row) => row.did) + await Promise.all([ + this.sendActivationEmails(dids), + this.sendActivationPushNotifs(dids), + ]) + } + + async sendActivationEmails(dids: string[]) { + if (dids.length < 1 || !this.mailer) return + const users = await this.db.db + .selectFrom('user_account') + .innerJoin('did_handle', 'did_handle.did', 'user_account.did') + .where('did_handle.did', 'in', dids) + .select(['user_account.email', 'did_handle.handle']) + .execute() + for (const chunk of chunkArray(users, 100)) { + try { + await this.limiter?.consume('server-mailer-limit', chunk.length) + } catch (err) { + log.error({ err }, 'user activation email rate limit exceeded') + } + try { + await Promise.all( + chunk.map(({ email, handle }) => + this.mailer?.sendAccountActivated({ handle }, { to: email }), + ), + ) + } catch (err) { + log.error({ err, dids: chunk }, 'error sending activation emails') + } + await wait(SECOND) + } + } + + async sendActivationPushNotifs(dids: string[]) { + if (dids.length < 1 || !this.courierClient) return + for (const chunk of chunkArray(dids, 100)) { + const notifications = chunk.map((did) => ({ + id: `${did}-account-activated`, + recipientDid: did, + title: 'Great news!', + message: 'Your Bluesky account is ready to go', + collapseKey: 'account-activated', + alwaysDeliver: true, + timestamp: Timestamp.fromDate(new Date()), + })) + try { + await this.courierClient.pushNotifications({ + notifications, + }) + } catch (err) { + log.error({ err, dids: chunk }, 'error sending activation push notifs') + } + } } } diff --git a/packages/pds/src/signup-queue/notifier.ts b/packages/pds/src/signup-queue/notifier.ts deleted file mode 100644 index 19ae94e2e98..00000000000 --- a/packages/pds/src/signup-queue/notifier.ts +++ /dev/null @@ -1,30 +0,0 @@ -import { Timestamp } from '@bufbuild/protobuf' -import { CourierClient } from '../courier' -import { ServerMailer } from '../mailer' - -export class ActivationNotifier { - constructor( - public mailer: ServerMailer, - public courierClient: CourierClient, - ) {} - - async sendEmail(email: string, handle: string) { - await this.mailer.sendAccountActivated({ handle }, { to: email }) - } - - async sendPushNotif(did: string) { - await this.courierClient.pushNotifications({ - notifications: [ - { - id: `${did}-account-activated`, - recipientDid: did, - title: 'Great news!', - message: 'Your Bluesky account is ready to go', - collapseKey: 'account-activated', - alwaysDeliver: true, - timestamp: Timestamp.fromDate(new Date()), - }, - ], - }) - } -} diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index c6680c575a1..4d81c9ac3ef 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -579,6 +579,9 @@ importers: pino-http: specifier: ^8.2.1 version: 8.2.1 + rate-limiter-flexible: + specifier: ^2.4.1 + version: 2.4.1 sharp: specifier: ^0.32.6 version: 0.32.6