diff --git a/.github/workflows/build-and-push-pds-aws.yaml b/.github/workflows/build-and-push-pds-aws.yaml index 959dbfd7e94..08a8b5c23d2 100644 --- a/.github/workflows/build-and-push-pds-aws.yaml +++ b/.github/workflows/build-and-push-pds-aws.yaml @@ -3,7 +3,7 @@ on: push: branches: - main - - signup-queueing-take2 + - signup-queue-notify env: REGISTRY: ${{ secrets.AWS_ECR_REGISTRY_USEAST2_PACKAGES_REGISTRY }} USERNAME: ${{ secrets.AWS_ECR_REGISTRY_USEAST2_PACKAGES_USERNAME }} diff --git a/packages/pds/buf.gen.yaml b/packages/pds/buf.gen.yaml new file mode 100644 index 00000000000..a81e4248719 --- /dev/null +++ b/packages/pds/buf.gen.yaml @@ -0,0 +1,12 @@ +version: v1 +plugins: + - plugin: es + opt: + - target=ts + - import_extension=.ts + out: src/proto + - plugin: connect-es + opt: + - target=ts + - import_extension=.ts + out: src/proto diff --git a/packages/pds/package.json b/packages/pds/package.json index e575d094400..146efc228d5 100644 --- a/packages/pds/package.json +++ b/packages/pds/package.json @@ -29,7 +29,8 @@ "test:sqlite": "jest --testPathIgnorePatterns /tests/proxied/*", "test:log": "tail -50 test.log | pino-pretty", "test:updateSnapshot": "jest --updateSnapshot", - "migration:create": "ts-node ./bin/migration-create.ts" + "migration:create": "ts-node ./bin/migration-create.ts", + "buf:gen": "buf generate ./proto" }, "dependencies": { "@atproto/api": "workspace:^", @@ -42,6 +43,9 @@ "@atproto/syntax": "workspace:^", "@atproto/xrpc": "workspace:^", "@atproto/xrpc-server": "workspace:^", + "@bufbuild/protobuf": "^1.5.0", + "@connectrpc/connect": "^1.1.4", + "@connectrpc/connect-node": "^1.1.4", "@did-plc/lib": "^0.0.1", "better-sqlite3": "^7.6.2", "bytes": "^3.1.2", @@ -66,6 +70,7 @@ "pg": "^8.10.0", "pino": "^8.15.0", "pino-http": "^8.2.1", + "rate-limiter-flexible": "^2.4.1", "sharp": "^0.32.6", "twilio": "^4.20.1", "typed-emitter": "^2.1.0", @@ -77,6 +82,9 @@ "@atproto/bsky": "workspace:^", "@atproto/dev-env": "workspace:^", "@atproto/lex-cli": "workspace:^", + "@bufbuild/buf": "^1.28.1", + "@bufbuild/protoc-gen-es": "^1.5.0", + "@connectrpc/protoc-gen-connect-es": "^1.1.4", "@did-plc/server": "^0.0.1", "@types/cors": "^2.8.12", "@types/disposable-email": "^0.2.0", diff --git a/packages/pds/proto/courier.proto b/packages/pds/proto/courier.proto new file mode 100644 index 00000000000..7e46d4d652f --- /dev/null +++ b/packages/pds/proto/courier.proto @@ -0,0 +1,56 @@ +syntax = "proto3"; + +package courier; +option go_package = "./;courier"; + +import "google/protobuf/struct.proto"; +import "google/protobuf/timestamp.proto"; + +// +// Messages +// + +// Ping +message PingRequest {} +message PingResponse {} + +// Notifications + +enum AppPlatform { + APP_PLATFORM_UNSPECIFIED = 0; + APP_PLATFORM_IOS = 1; + APP_PLATFORM_ANDROID = 2; + APP_PLATFORM_WEB = 3; +} + +message Notification { + string id = 1; + string recipient_did = 2; + string title = 3; + string message = 4; + string collapse_key = 5; + bool always_deliver = 6; + google.protobuf.Timestamp timestamp = 7; + google.protobuf.Struct additional = 8; +} + +message PushNotificationsRequest { + repeated Notification notifications = 1; +} + +message PushNotificationsResponse {} + +message RegisterDeviceTokenRequest { + string did = 1; + string token = 2; + string app_id = 3; + AppPlatform platform = 4; +} + +message RegisterDeviceTokenResponse {} + +service Service { + rpc Ping(PingRequest) returns (PingResponse); + rpc PushNotifications(PushNotificationsRequest) returns (PushNotificationsResponse); + rpc RegisterDeviceToken(RegisterDeviceTokenRequest) returns (RegisterDeviceTokenResponse); +} \ No newline at end of file diff --git a/packages/pds/src/config/config.ts b/packages/pds/src/config/config.ts index a96462e05ab..161b5e1abec 100644 --- a/packages/pds/src/config/config.ts +++ b/packages/pds/src/config/config.ts @@ -211,6 +211,16 @@ export const envToCfg = (env: ServerEnvironment): ServerConfig => { } : { enabled: false } + const courierHttpVersion = env.courierHttpVersion ?? '2' + assert(courierHttpVersion === '1.1' || courierHttpVersion === '2') + const activatorCfg: ServerConfig['activator'] = { + courierUrl: env.courierUrl, + courierHttpVersion, + courierIgnoreBadTls: env.courierIgnoreBadTls, + courierApiKey: env.courierApiKey, + emailsPerDay: env.activatorEmailsPerDay, + } + const crawlersCfg: ServerConfig['crawlers'] = env.crawlers ?? [] return { @@ -227,6 +237,7 @@ export const envToCfg = (env: ServerEnvironment): ServerConfig => { modService: modServiceCfg, redis: redisCfg, rateLimits: rateLimitsCfg, + activator: activatorCfg, crawlers: crawlersCfg, } } @@ -245,6 +256,7 @@ export type ServerConfig = { modService: ModServiceConfig redis: RedisScratchConfig | null rateLimits: RateLimitsConfig + activator: ActivatorConfig crawlers: string[] } @@ -355,6 +367,14 @@ export type RateLimitsConfig = } | { enabled: false } +export type ActivatorConfig = { + courierUrl?: string + courierHttpVersion?: '1.1' | '2' + courierIgnoreBadTls?: boolean + courierApiKey?: string + emailsPerDay?: number +} + export type BksyAppViewConfig = { url: string did: string diff --git a/packages/pds/src/config/env.ts b/packages/pds/src/config/env.ts index 324d74889db..0156bf3445f 100644 --- a/packages/pds/src/config/env.ts +++ b/packages/pds/src/config/env.ts @@ -87,6 +87,13 @@ export const readEnv = (): ServerEnvironment => { redisScratchAddress: envStr('PDS_REDIS_SCRATCH_ADDRESS'), redisScratchPassword: envStr('PDS_REDIS_SCRATCH_PASSWORD'), + // activator + courierUrl: envStr('PDS_COURIER_URL'), + courierHttpVersion: envStr('PDS_COURIER_HTTP_VERSION'), + courierIgnoreBadTls: envBool('PDS_COURIER_IGNORE_BAD_TLS'), + courierApiKey: envStr('PDS_COURIER_API_KEY'), + activatorEmailsPerDay: envInt('PDS_ACTIVATOR_EMAILS_PER_DAY'), + // crawlers crawlers: envList('PDS_CRAWLERS'), @@ -202,6 +209,13 @@ export type ServerEnvironment = { redisScratchAddress?: string redisScratchPassword?: string + // activator + courierUrl?: string + courierHttpVersion?: string + courierIgnoreBadTls?: boolean + courierApiKey?: string + activatorEmailsPerDay?: number + // crawler crawlers?: string[] diff --git a/packages/pds/src/context.ts b/packages/pds/src/context.ts index 75224e3c83a..d5714de47f5 100644 --- a/packages/pds/src/context.ts +++ b/packages/pds/src/context.ts @@ -1,5 +1,6 @@ import * as nodemailer from 'nodemailer' import { Redis } from 'ioredis' +import { RateLimiterRedis } from 'rate-limiter-flexible' import * as plc from '@did-plc/lib' import * as crypto from '@atproto/crypto' import { IdResolver } from '@atproto/identity' @@ -25,6 +26,8 @@ import { TwilioClient } from './twilio' import assert from 'assert' import { SignupLimiter } from './signup-queue/limiter' import { SignupActivator } from './signup-queue/activator' +import { createCourierClient, authWithApiKey as courierAuth } from './courier' +import { DAY } from '@atproto/common' export type AppContextOptions = { db: Database @@ -231,7 +234,33 @@ export class AppContext { } const signupLimiter = new SignupLimiter(db) - const signupActivator = new SignupActivator(db) + const courierClient = cfg.activator.courierUrl + ? createCourierClient({ + baseUrl: cfg.activator.courierUrl, + httpVersion: cfg.activator.courierHttpVersion ?? '2', + nodeOptions: { + rejectUnauthorized: !cfg.activator.courierIgnoreBadTls, + }, + interceptors: cfg.activator.courierApiKey + ? [courierAuth(cfg.activator.courierApiKey)] + : [], + }) + : undefined + const limiter = + cfg.activator.emailsPerDay && redisScratch + ? new RateLimiterRedis({ + storeClient: redisScratch, + duration: DAY / 1000, + points: cfg.activator.emailsPerDay, + }) + : undefined + + const signupActivator = new SignupActivator({ + db, + mailer, + courierClient, + limiter, + }) const pdsAgents = new PdsAgents() diff --git a/packages/pds/src/courier.ts b/packages/pds/src/courier.ts new file mode 100644 index 00000000000..aeb095898f6 --- /dev/null +++ b/packages/pds/src/courier.ts @@ -0,0 +1,41 @@ +import { Service } from './proto/courier_connect' +import { + Code, + ConnectError, + PromiseClient, + createPromiseClient, + Interceptor, +} from '@connectrpc/connect' +import { + createConnectTransport, + ConnectTransportOptions, +} from '@connectrpc/connect-node' + +export type CourierClient = PromiseClient + +export const createCourierClient = ( + opts: ConnectTransportOptions, +): CourierClient => { + const transport = createConnectTransport(opts) + return createPromiseClient(Service, transport) +} + +export { Code } + +export const isCourierError = ( + err: unknown, + code?: Code, +): err is ConnectError => { + if (err instanceof ConnectError) { + return !code || err.code === code + } + return false +} + +export const authWithApiKey = + (apiKey: string): Interceptor => + (next) => + (req) => { + req.header.set('authorization', `Bearer ${apiKey}`) + return next(req) + } diff --git a/packages/pds/src/mailer/index.ts b/packages/pds/src/mailer/index.ts index 92ce8a88c83..3840d1789af 100644 --- a/packages/pds/src/mailer/index.ts +++ b/packages/pds/src/mailer/index.ts @@ -26,6 +26,7 @@ export class ServerMailer { deleteAccount: this.compile('delete-account'), confirmEmail: this.compile('confirm-email'), updateEmail: this.compile('update-email'), + accountActivated: this.compile('account-activated'), } } @@ -65,6 +66,16 @@ export class ServerMailer { }) } + async sendAccountActivated( + params: { handle: string }, + mailOpts: Mail.Options, + ) { + return this.sendTemplate('accountActivated', params, { + subject: 'Your Bluesky Account is Activated!', + ...mailOpts, + }) + } + private async sendTemplate(templateName, params, mailOpts: Mail.Options) { const html = this.templates[templateName]({ ...params, diff --git a/packages/pds/src/mailer/templates/account-activated.hbs b/packages/pds/src/mailer/templates/account-activated.hbs new file mode 100644 index 00000000000..a037d6e5d01 --- /dev/null +++ b/packages/pds/src/mailer/templates/account-activated.hbs @@ -0,0 +1,298 @@ + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/packages/pds/src/proto/courier_connect.ts b/packages/pds/src/proto/courier_connect.ts new file mode 100644 index 00000000000..04d482e0788 --- /dev/null +++ b/packages/pds/src/proto/courier_connect.ts @@ -0,0 +1,50 @@ +// @generated by protoc-gen-connect-es v1.3.0 with parameter "target=ts,import_extension=.ts" +// @generated from file courier.proto (package courier, syntax proto3) +/* eslint-disable */ +// @ts-nocheck + +import { + PingRequest, + PingResponse, + PushNotificationsRequest, + PushNotificationsResponse, + RegisterDeviceTokenRequest, + RegisterDeviceTokenResponse, +} from './courier_pb.ts' +import { MethodKind } from '@bufbuild/protobuf' + +/** + * @generated from service courier.Service + */ +export const Service = { + typeName: 'courier.Service', + methods: { + /** + * @generated from rpc courier.Service.Ping + */ + ping: { + name: 'Ping', + I: PingRequest, + O: PingResponse, + kind: MethodKind.Unary, + }, + /** + * @generated from rpc courier.Service.PushNotifications + */ + pushNotifications: { + name: 'PushNotifications', + I: PushNotificationsRequest, + O: PushNotificationsResponse, + kind: MethodKind.Unary, + }, + /** + * @generated from rpc courier.Service.RegisterDeviceToken + */ + registerDeviceToken: { + name: 'RegisterDeviceToken', + I: RegisterDeviceTokenRequest, + O: RegisterDeviceTokenResponse, + kind: MethodKind.Unary, + }, + }, +} as const diff --git a/packages/pds/src/proto/courier_pb.ts b/packages/pds/src/proto/courier_pb.ts new file mode 100644 index 00000000000..2e88dd630bc --- /dev/null +++ b/packages/pds/src/proto/courier_pb.ts @@ -0,0 +1,473 @@ +// @generated by protoc-gen-es v1.7.1 with parameter "target=ts,import_extension=.ts" +// @generated from file courier.proto (package courier, syntax proto3) +/* eslint-disable */ +// @ts-nocheck + +import type { + BinaryReadOptions, + FieldList, + JsonReadOptions, + JsonValue, + PartialMessage, + PlainMessage, +} from '@bufbuild/protobuf' +import { Message, proto3, Struct, Timestamp } from '@bufbuild/protobuf' + +/** + * @generated from enum courier.AppPlatform + */ +export enum AppPlatform { + /** + * @generated from enum value: APP_PLATFORM_UNSPECIFIED = 0; + */ + UNSPECIFIED = 0, + + /** + * @generated from enum value: APP_PLATFORM_IOS = 1; + */ + IOS = 1, + + /** + * @generated from enum value: APP_PLATFORM_ANDROID = 2; + */ + ANDROID = 2, + + /** + * @generated from enum value: APP_PLATFORM_WEB = 3; + */ + WEB = 3, +} +// Retrieve enum metadata with: proto3.getEnumType(AppPlatform) +proto3.util.setEnumType(AppPlatform, 'courier.AppPlatform', [ + { no: 0, name: 'APP_PLATFORM_UNSPECIFIED' }, + { no: 1, name: 'APP_PLATFORM_IOS' }, + { no: 2, name: 'APP_PLATFORM_ANDROID' }, + { no: 3, name: 'APP_PLATFORM_WEB' }, +]) + +/** + * Ping + * + * @generated from message courier.PingRequest + */ +export class PingRequest extends Message { + constructor(data?: PartialMessage) { + super() + proto3.util.initPartial(data, this) + } + + static readonly runtime: typeof proto3 = proto3 + static readonly typeName = 'courier.PingRequest' + static readonly fields: FieldList = proto3.util.newFieldList(() => []) + + static fromBinary( + bytes: Uint8Array, + options?: Partial, + ): PingRequest { + return new PingRequest().fromBinary(bytes, options) + } + + static fromJson( + jsonValue: JsonValue, + options?: Partial, + ): PingRequest { + return new PingRequest().fromJson(jsonValue, options) + } + + static fromJsonString( + jsonString: string, + options?: Partial, + ): PingRequest { + return new PingRequest().fromJsonString(jsonString, options) + } + + static equals( + a: PingRequest | PlainMessage | undefined, + b: PingRequest | PlainMessage | undefined, + ): boolean { + return proto3.util.equals(PingRequest, a, b) + } +} + +/** + * @generated from message courier.PingResponse + */ +export class PingResponse extends Message { + constructor(data?: PartialMessage) { + super() + proto3.util.initPartial(data, this) + } + + static readonly runtime: typeof proto3 = proto3 + static readonly typeName = 'courier.PingResponse' + static readonly fields: FieldList = proto3.util.newFieldList(() => []) + + static fromBinary( + bytes: Uint8Array, + options?: Partial, + ): PingResponse { + return new PingResponse().fromBinary(bytes, options) + } + + static fromJson( + jsonValue: JsonValue, + options?: Partial, + ): PingResponse { + return new PingResponse().fromJson(jsonValue, options) + } + + static fromJsonString( + jsonString: string, + options?: Partial, + ): PingResponse { + return new PingResponse().fromJsonString(jsonString, options) + } + + static equals( + a: PingResponse | PlainMessage | undefined, + b: PingResponse | PlainMessage | undefined, + ): boolean { + return proto3.util.equals(PingResponse, a, b) + } +} + +/** + * @generated from message courier.Notification + */ +export class Notification extends Message { + /** + * @generated from field: string id = 1; + */ + id = '' + + /** + * @generated from field: string recipient_did = 2; + */ + recipientDid = '' + + /** + * @generated from field: string title = 3; + */ + title = '' + + /** + * @generated from field: string message = 4; + */ + message = '' + + /** + * @generated from field: string collapse_key = 5; + */ + collapseKey = '' + + /** + * @generated from field: bool always_deliver = 6; + */ + alwaysDeliver = false + + /** + * @generated from field: google.protobuf.Timestamp timestamp = 7; + */ + timestamp?: Timestamp + + /** + * @generated from field: google.protobuf.Struct additional = 8; + */ + additional?: Struct + + constructor(data?: PartialMessage) { + super() + proto3.util.initPartial(data, this) + } + + static readonly runtime: typeof proto3 = proto3 + static readonly typeName = 'courier.Notification' + static readonly fields: FieldList = proto3.util.newFieldList(() => [ + { no: 1, name: 'id', kind: 'scalar', T: 9 /* ScalarType.STRING */ }, + { + no: 2, + name: 'recipient_did', + kind: 'scalar', + T: 9 /* ScalarType.STRING */, + }, + { no: 3, name: 'title', kind: 'scalar', T: 9 /* ScalarType.STRING */ }, + { no: 4, name: 'message', kind: 'scalar', T: 9 /* ScalarType.STRING */ }, + { + no: 5, + name: 'collapse_key', + kind: 'scalar', + T: 9 /* ScalarType.STRING */, + }, + { + no: 6, + name: 'always_deliver', + kind: 'scalar', + T: 8 /* ScalarType.BOOL */, + }, + { no: 7, name: 'timestamp', kind: 'message', T: Timestamp }, + { no: 8, name: 'additional', kind: 'message', T: Struct }, + ]) + + static fromBinary( + bytes: Uint8Array, + options?: Partial, + ): Notification { + return new Notification().fromBinary(bytes, options) + } + + static fromJson( + jsonValue: JsonValue, + options?: Partial, + ): Notification { + return new Notification().fromJson(jsonValue, options) + } + + static fromJsonString( + jsonString: string, + options?: Partial, + ): Notification { + return new Notification().fromJsonString(jsonString, options) + } + + static equals( + a: Notification | PlainMessage | undefined, + b: Notification | PlainMessage | undefined, + ): boolean { + return proto3.util.equals(Notification, a, b) + } +} + +/** + * @generated from message courier.PushNotificationsRequest + */ +export class PushNotificationsRequest extends Message { + /** + * @generated from field: repeated courier.Notification notifications = 1; + */ + notifications: Notification[] = [] + + constructor(data?: PartialMessage) { + super() + proto3.util.initPartial(data, this) + } + + static readonly runtime: typeof proto3 = proto3 + static readonly typeName = 'courier.PushNotificationsRequest' + static readonly fields: FieldList = proto3.util.newFieldList(() => [ + { + no: 1, + name: 'notifications', + kind: 'message', + T: Notification, + repeated: true, + }, + ]) + + static fromBinary( + bytes: Uint8Array, + options?: Partial, + ): PushNotificationsRequest { + return new PushNotificationsRequest().fromBinary(bytes, options) + } + + static fromJson( + jsonValue: JsonValue, + options?: Partial, + ): PushNotificationsRequest { + return new PushNotificationsRequest().fromJson(jsonValue, options) + } + + static fromJsonString( + jsonString: string, + options?: Partial, + ): PushNotificationsRequest { + return new PushNotificationsRequest().fromJsonString(jsonString, options) + } + + static equals( + a: + | PushNotificationsRequest + | PlainMessage + | undefined, + b: + | PushNotificationsRequest + | PlainMessage + | undefined, + ): boolean { + return proto3.util.equals(PushNotificationsRequest, a, b) + } +} + +/** + * @generated from message courier.PushNotificationsResponse + */ +export class PushNotificationsResponse extends Message { + constructor(data?: PartialMessage) { + super() + proto3.util.initPartial(data, this) + } + + static readonly runtime: typeof proto3 = proto3 + static readonly typeName = 'courier.PushNotificationsResponse' + static readonly fields: FieldList = proto3.util.newFieldList(() => []) + + static fromBinary( + bytes: Uint8Array, + options?: Partial, + ): PushNotificationsResponse { + return new PushNotificationsResponse().fromBinary(bytes, options) + } + + static fromJson( + jsonValue: JsonValue, + options?: Partial, + ): PushNotificationsResponse { + return new PushNotificationsResponse().fromJson(jsonValue, options) + } + + static fromJsonString( + jsonString: string, + options?: Partial, + ): PushNotificationsResponse { + return new PushNotificationsResponse().fromJsonString(jsonString, options) + } + + static equals( + a: + | PushNotificationsResponse + | PlainMessage + | undefined, + b: + | PushNotificationsResponse + | PlainMessage + | undefined, + ): boolean { + return proto3.util.equals(PushNotificationsResponse, a, b) + } +} + +/** + * @generated from message courier.RegisterDeviceTokenRequest + */ +export class RegisterDeviceTokenRequest extends Message { + /** + * @generated from field: string did = 1; + */ + did = '' + + /** + * @generated from field: string token = 2; + */ + token = '' + + /** + * @generated from field: string app_id = 3; + */ + appId = '' + + /** + * @generated from field: courier.AppPlatform platform = 4; + */ + platform = AppPlatform.UNSPECIFIED + + constructor(data?: PartialMessage) { + super() + proto3.util.initPartial(data, this) + } + + static readonly runtime: typeof proto3 = proto3 + static readonly typeName = 'courier.RegisterDeviceTokenRequest' + static readonly fields: FieldList = proto3.util.newFieldList(() => [ + { no: 1, name: 'did', kind: 'scalar', T: 9 /* ScalarType.STRING */ }, + { no: 2, name: 'token', kind: 'scalar', T: 9 /* ScalarType.STRING */ }, + { no: 3, name: 'app_id', kind: 'scalar', T: 9 /* ScalarType.STRING */ }, + { + no: 4, + name: 'platform', + kind: 'enum', + T: proto3.getEnumType(AppPlatform), + }, + ]) + + static fromBinary( + bytes: Uint8Array, + options?: Partial, + ): RegisterDeviceTokenRequest { + return new RegisterDeviceTokenRequest().fromBinary(bytes, options) + } + + static fromJson( + jsonValue: JsonValue, + options?: Partial, + ): RegisterDeviceTokenRequest { + return new RegisterDeviceTokenRequest().fromJson(jsonValue, options) + } + + static fromJsonString( + jsonString: string, + options?: Partial, + ): RegisterDeviceTokenRequest { + return new RegisterDeviceTokenRequest().fromJsonString(jsonString, options) + } + + static equals( + a: + | RegisterDeviceTokenRequest + | PlainMessage + | undefined, + b: + | RegisterDeviceTokenRequest + | PlainMessage + | undefined, + ): boolean { + return proto3.util.equals(RegisterDeviceTokenRequest, a, b) + } +} + +/** + * @generated from message courier.RegisterDeviceTokenResponse + */ +export class RegisterDeviceTokenResponse extends Message { + constructor(data?: PartialMessage) { + super() + proto3.util.initPartial(data, this) + } + + static readonly runtime: typeof proto3 = proto3 + static readonly typeName = 'courier.RegisterDeviceTokenResponse' + static readonly fields: FieldList = proto3.util.newFieldList(() => []) + + static fromBinary( + bytes: Uint8Array, + options?: Partial, + ): RegisterDeviceTokenResponse { + return new RegisterDeviceTokenResponse().fromBinary(bytes, options) + } + + static fromJson( + jsonValue: JsonValue, + options?: Partial, + ): RegisterDeviceTokenResponse { + return new RegisterDeviceTokenResponse().fromJson(jsonValue, options) + } + + static fromJsonString( + jsonString: string, + options?: Partial, + ): RegisterDeviceTokenResponse { + return new RegisterDeviceTokenResponse().fromJsonString(jsonString, options) + } + + static equals( + a: + | RegisterDeviceTokenResponse + | PlainMessage + | undefined, + b: + | RegisterDeviceTokenResponse + | PlainMessage + | undefined, + ): boolean { + return proto3.util.equals(RegisterDeviceTokenResponse, a, b) + } +} diff --git a/packages/pds/src/signup-queue/activator.ts b/packages/pds/src/signup-queue/activator.ts index 71842243725..34111e947a0 100644 --- a/packages/pds/src/signup-queue/activator.ts +++ b/packages/pds/src/signup-queue/activator.ts @@ -1,9 +1,13 @@ -import { SECOND, jitter, wait } from '@atproto/common' +import { RateLimiterAbstract } from 'rate-limiter-flexible' +import { SECOND, chunkArray, jitter, wait } from '@atproto/common' +import { DisconnectError } from '@atproto/xrpc-server' +import { Timestamp } from '@bufbuild/protobuf' import { limiterLogger as log } from '../logger' import Database from '../db' import { Leader } from '../db/leader' -import { DisconnectError } from '@atproto/xrpc-server' import { getQueueStatus } from './util' +import { ServerMailer } from '../mailer' +import { CourierClient } from '../courier' type LimiterFlags = { disableSignups: boolean @@ -17,16 +21,32 @@ type LimiterStatus = LimiterFlags & { export const ACCOUNT_ACTIVATOR_ID = 1010 +export type ActivatorOpts = { + db: Database + mailer?: ServerMailer + courierClient?: CourierClient + limiter?: RateLimiterAbstract +} + export class SignupActivator { leader: Leader + db: Database + mailer?: ServerMailer + courierClient?: CourierClient + limiter?: RateLimiterAbstract + destroyed = false promise: Promise = Promise.resolve() timer: NodeJS.Timer | undefined status: LimiterStatus - constructor(private db: Database, lockId = ACCOUNT_ACTIVATOR_ID) { - this.leader = new Leader(lockId, this.db) + constructor(opts: ActivatorOpts, lockId = ACCOUNT_ACTIVATOR_ID) { + this.leader = new Leader(lockId, opts.db) + this.db = opts.db + this.mailer = opts.mailer + this.courierClient = opts.courierClient + this.limiter = opts.limiter } async run() { @@ -101,6 +121,60 @@ export class SignupActivator { .execute() log.info({ count: activated.length }, 'activated accounts') - // @TODO send mail/push notifs + + const dids = activated.map((row) => row.did) + await Promise.all([ + this.sendActivationEmails(dids), + this.sendActivationPushNotifs(dids), + ]) + } + + async sendActivationEmails(dids: string[]) { + if (dids.length < 1 || !this.mailer) return + const users = await this.db.db + .selectFrom('user_account') + .innerJoin('did_handle', 'did_handle.did', 'user_account.did') + .where('did_handle.did', 'in', dids) + .select(['user_account.email', 'did_handle.handle']) + .execute() + for (const chunk of chunkArray(users, 100)) { + try { + await this.limiter?.consume('server-mailer-limit', chunk.length) + } catch (err) { + log.error({ err }, 'user activation email rate limit exceeded') + } + try { + await Promise.all( + chunk.map(({ email, handle }) => + this.mailer?.sendAccountActivated({ handle }, { to: email }), + ), + ) + } catch (err) { + log.error({ err, dids: chunk }, 'error sending activation emails') + } + await wait(SECOND) + } + } + + async sendActivationPushNotifs(dids: string[]) { + if (dids.length < 1 || !this.courierClient) return + for (const chunk of chunkArray(dids, 100)) { + const notifications = chunk.map((did) => ({ + id: `${did}-account-activated`, + recipientDid: did, + title: 'Great news!', + message: 'Your Bluesky account is ready to go', + collapseKey: 'account-activated', + alwaysDeliver: true, + timestamp: Timestamp.fromDate(new Date()), + })) + try { + await this.courierClient.pushNotifications({ + notifications, + }) + } catch (err) { + log.error({ err, dids: chunk }, 'error sending activation push notifs') + } + } } } diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index dbd04f5c644..4d81c9ac3ef 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -498,6 +498,15 @@ importers: '@atproto/xrpc-server': specifier: workspace:^ version: link:../xrpc-server + '@bufbuild/protobuf': + specifier: ^1.5.0 + version: 1.7.1 + '@connectrpc/connect': + specifier: ^1.1.4 + version: 1.3.0(@bufbuild/protobuf@1.7.1) + '@connectrpc/connect-node': + specifier: ^1.1.4 + version: 1.3.0(@bufbuild/protobuf@1.7.1)(@connectrpc/connect@1.3.0) '@did-plc/lib': specifier: ^0.0.1 version: 0.0.1 @@ -570,6 +579,9 @@ importers: pino-http: specifier: ^8.2.1 version: 8.2.1 + rate-limiter-flexible: + specifier: ^2.4.1 + version: 2.4.1 sharp: specifier: ^0.32.6 version: 0.32.6 @@ -595,6 +607,15 @@ importers: '@atproto/lex-cli': specifier: workspace:^ version: link:../lex-cli + '@bufbuild/buf': + specifier: ^1.28.1 + version: 1.29.0 + '@bufbuild/protoc-gen-es': + specifier: ^1.5.0 + version: 1.7.1(@bufbuild/protobuf@1.7.1) + '@connectrpc/protoc-gen-connect-es': + specifier: ^1.1.4 + version: 1.3.0(@bufbuild/protoc-gen-es@1.7.1)(@connectrpc/connect@1.3.0) '@did-plc/server': specifier: ^0.0.1 version: 0.0.1 @@ -4266,6 +4287,103 @@ packages: resolution: {integrity: sha512-0hYQ8SB4Db5zvZB4axdMHGwEaQjkZzFjQiN9LVYvIFB2nSUHW9tYpxWriPrWDASIxiaXax83REcLxuSdnGPZtw==} dev: true + /@bufbuild/buf-darwin-arm64@1.29.0: + resolution: {integrity: sha512-5hKxsARoY2WpWq1n5ONFqqGuauHb4yILKXCy37KRYCKiRLWmIP5yI3gWvWHKoH7sUJWTQmBqdJoCvYQr6ahQnw==} + engines: {node: '>=12'} + cpu: [arm64] + os: [darwin] + requiresBuild: true + dev: true + optional: true + + /@bufbuild/buf-darwin-x64@1.29.0: + resolution: {integrity: sha512-wOAPxbPLBns4AHiComWtdO1sx1J1p6mDYTbqmloHuI+B5U2rDbMsoHoe4nBcoMF8+RHxoqjypha29wVo6yzbZg==} + engines: {node: '>=12'} + cpu: [x64] + os: [darwin] + requiresBuild: true + dev: true + optional: true + + /@bufbuild/buf-linux-aarch64@1.29.0: + resolution: {integrity: sha512-jLk2J/wyyM7KNJ/DkLfhy3eS2/Bdb70e/56adMkapSoLJmghnpgxW+oFznMxxQUX5I9BU5hTn1UhDFxgLwhP7g==} + engines: {node: '>=12'} + cpu: [arm64] + os: [linux] + requiresBuild: true + dev: true + optional: true + + /@bufbuild/buf-linux-x64@1.29.0: + resolution: {integrity: sha512-heLOywj3Oaoh69RnTx7tHsuz6rEnvz77bghLEOghsrjBR6Jcpcwc137EZR4kRTIWJNrE8Kmo3RVeXlv144qQIQ==} + engines: {node: '>=12'} + cpu: [x64] + os: [linux] + requiresBuild: true + dev: true + optional: true + + /@bufbuild/buf-win32-arm64@1.29.0: + resolution: {integrity: sha512-Eglyvr3PLqVucuHBcQ61conyBgH9BRaoLpKWcce1gYBVlxMQM1NxjVjGOWihxQ1dXXw5qZXmYfVODf3gSwPMuQ==} + engines: {node: '>=12'} + cpu: [arm64] + os: [win32] + requiresBuild: true + dev: true + optional: true + + /@bufbuild/buf-win32-x64@1.29.0: + resolution: {integrity: sha512-wRk6co+nqHqEq4iLolXgej0jUVlWlTtGHjKaq54lTbKZrwxrBgql6qS06abgNPRASX0++XT9m3QRZ97qEIC/HQ==} + engines: {node: '>=12'} + cpu: [x64] + os: [win32] + requiresBuild: true + dev: true + optional: true + + /@bufbuild/buf@1.29.0: + resolution: {integrity: sha512-euksXeFtvlvAV5j94LqXb69qQcJvFfo8vN1d3cx+IzhOKoipykuQQTq7mOWVo2R0kdk6yIMBLBofOYOsh0Df8g==} + engines: {node: '>=12'} + hasBin: true + requiresBuild: true + optionalDependencies: + '@bufbuild/buf-darwin-arm64': 1.29.0 + '@bufbuild/buf-darwin-x64': 1.29.0 + '@bufbuild/buf-linux-aarch64': 1.29.0 + '@bufbuild/buf-linux-x64': 1.29.0 + '@bufbuild/buf-win32-arm64': 1.29.0 + '@bufbuild/buf-win32-x64': 1.29.0 + dev: true + + /@bufbuild/protobuf@1.7.1: + resolution: {integrity: sha512-UlI3lKLFBjZQJ0cHf47YUH6DzZxZYWk3sf6dKYyPUaXrfXq4z+zZqNO3q0lPUzyJgh14s6VscjcNFBaQBhYd9Q==} + + /@bufbuild/protoc-gen-es@1.7.1(@bufbuild/protobuf@1.7.1): + resolution: {integrity: sha512-N1diiVcDkTTNX+b9rDY8EVgOXu0W8kRmf2w3nbYi8q/hfM6vBg4zry0m4v3ARSgKp60bCey1WUDBuiynm5+PqQ==} + engines: {node: '>=14'} + hasBin: true + peerDependencies: + '@bufbuild/protobuf': 1.7.1 + peerDependenciesMeta: + '@bufbuild/protobuf': + optional: true + dependencies: + '@bufbuild/protobuf': 1.7.1 + '@bufbuild/protoplugin': 1.7.1 + transitivePeerDependencies: + - supports-color + dev: true + + /@bufbuild/protoplugin@1.7.1: + resolution: {integrity: sha512-bnPFXs38IXjL2EdpkthkCa/+SXOxERnXyV///rQj1wyidJmw21wOvqpucuIh25YnPtdrUItcIFFDVCoKPkuCPQ==} + dependencies: + '@bufbuild/protobuf': 1.7.1 + '@typescript/vfs': 1.5.0 + typescript: 4.5.2 + transitivePeerDependencies: + - supports-color + dev: true + /@cbor-extract/cbor-extract-darwin-arm64@2.1.1: resolution: {integrity: sha512-blVBy5MXz6m36Vx0DfLd7PChOQKEs8lK2bD1WJn/vVgG4FXZiZmZb2GECHFvVPA5T7OnODd9xZiL3nMCv6QUhA==} cpu: [arm64] @@ -4517,6 +4635,46 @@ packages: prettier: 2.7.1 dev: true + /@connectrpc/connect-node@1.3.0(@bufbuild/protobuf@1.7.1)(@connectrpc/connect@1.3.0): + resolution: {integrity: sha512-2fV/z/8MUFOkTn2Gbm7T/qvRfkpt/D/w7ykYqACZRH6VNG/faY4lH2wUZiNkwv9tzTrECKOJFyPsaGs3nRYK3w==} + engines: {node: '>=16.0.0'} + peerDependencies: + '@bufbuild/protobuf': ^1.4.2 + '@connectrpc/connect': 1.3.0 + dependencies: + '@bufbuild/protobuf': 1.7.1 + '@connectrpc/connect': 1.3.0(@bufbuild/protobuf@1.7.1) + undici: 5.28.2 + dev: false + + /@connectrpc/connect@1.3.0(@bufbuild/protobuf@1.7.1): + resolution: {integrity: sha512-kTeWxJnLLtxKc2ZSDN0rIBgwfP8RwcLknthX4AKlIAmN9ZC4gGnCbwp+3BKcP/WH5c8zGBAWqSY3zeqCM+ah7w==} + peerDependencies: + '@bufbuild/protobuf': ^1.4.2 + dependencies: + '@bufbuild/protobuf': 1.7.1 + + /@connectrpc/protoc-gen-connect-es@1.3.0(@bufbuild/protoc-gen-es@1.7.1)(@connectrpc/connect@1.3.0): + resolution: {integrity: sha512-UbQN48c0zafo5EFSsh3POIJP6ofYiAgKE1aFOZ2Er4W3flUYihydZdM6TQauPkn7jDj4w9jjLSTTZ9//ecUbPA==} + engines: {node: '>=16.0.0'} + hasBin: true + peerDependencies: + '@bufbuild/protoc-gen-es': ^1.6.0 + '@connectrpc/connect': 1.3.0 + peerDependenciesMeta: + '@bufbuild/protoc-gen-es': + optional: true + '@connectrpc/connect': + optional: true + dependencies: + '@bufbuild/protobuf': 1.7.1 + '@bufbuild/protoc-gen-es': 1.7.1(@bufbuild/protobuf@1.7.1) + '@bufbuild/protoplugin': 1.7.1 + '@connectrpc/connect': 1.3.0(@bufbuild/protobuf@1.7.1) + transitivePeerDependencies: + - supports-color + dev: true + /@cspotcode/source-map-support@0.8.1: resolution: {integrity: sha512-IchNf6dN4tHoMFIn/7OE8LWZ19Y6q/67Bmf6vnGREv8RSbBVb9LPJxEcnwrcwX6ixSvaiGoomAUvu4YSxXrVgw==} engines: {node: '>=12'} @@ -4623,6 +4781,11 @@ packages: - supports-color dev: true + /@fastify/busboy@2.1.0: + resolution: {integrity: sha512-+KpH+QxZU7O4675t3mnkQKcZZg56u+K/Ct2K+N2AZYNVK8kyeo/bI18tI8aPm3tvNNRyTWfj6s5tnGNlcbQRsA==} + engines: {node: '>=14'} + dev: false + /@fastify/deepmerge@1.3.0: resolution: {integrity: sha512-J8TOSBq3SoZbDhM9+R/u77hP93gz/rajSA+K2kGyijPpORPWUXHUpTaleoj+92As0S9uPRP7Oi8IqMf0u+ro6A==} @@ -5609,6 +5772,14 @@ packages: eslint-visitor-keys: 3.4.3 dev: true + /@typescript/vfs@1.5.0: + resolution: {integrity: sha512-AJS307bPgbsZZ9ggCT3wwpg3VbTKMFNHfaY/uF0ahSkYYrPF2dSSKDNIDIQAHm9qJqbLvCsSJH7yN4Vs/CsMMg==} + dependencies: + debug: 4.3.4 + transitivePeerDependencies: + - supports-color + dev: true + /abbrev@1.1.1: resolution: {integrity: sha512-nne9/IiQ/hzIhY6pdDnbBtz7DjPTKrY00P/zvPSm5pOFkl6xuGrGnXn/VtTNNfNtAfZ9/1RtehkszU9qcTii0Q==} dev: true @@ -11027,6 +11198,12 @@ packages: rxjs: 7.8.1 dev: false + /typescript@4.5.2: + resolution: {integrity: sha512-5BlMof9H1yGt0P8/WF+wPNw6GfctgGjXp5hkblpyT+8rkASSmkUKMXrxR0Xg8ThVCi/JnHQiKXeBaEwCeQwMFw==} + engines: {node: '>=4.2.0'} + hasBin: true + dev: true + /typescript@4.8.4: resolution: {integrity: sha512-QCh+85mCy+h0IGff8r5XWzOVSbBO+KfeYrMQh7NJ58QujwcE22u+NUSmUxqF+un70P9GXKxa2HCNiTTMJknyjQ==} engines: {node: '>=4.2.0'} @@ -11061,6 +11238,13 @@ packages: which-boxed-primitive: 1.0.2 dev: true + /undici@5.28.2: + resolution: {integrity: sha512-wh1pHJHnUeQV5Xa8/kyQhO7WFa8M34l026L5P/+2TYiakvGy5Rdc8jWZVyG7ieht/0WgJLEd3kcU5gKx+6GC8w==} + engines: {node: '>=14.0'} + dependencies: + '@fastify/busboy': 2.1.0 + dev: false + /unicode-canonical-property-names-ecmascript@2.0.0: resolution: {integrity: sha512-yY5PpDlfVIU5+y/BSCxAJRBIS1Zc2dDG3Ujq+sR0U+JjUevW2JhocOF+soROYDSaAezOzOKuyyixhD6mBknSmQ==} engines: {node: '>=4'}