From ba06cec71f5021d6ec6abd94598db9b94fcd71ff Mon Sep 17 00:00:00 2001 From: rafael Date: Thu, 5 Dec 2024 01:18:32 -0300 Subject: [PATCH 1/5] RevenueCat sync in bsync --- packages/bsync/src/config.ts | 32 +++ packages/bsync/src/context.ts | 16 +- .../db/migrations/20241205T030533572Z-subs.ts | 24 ++ packages/bsync/src/db/migrations/index.ts | 1 + packages/bsync/src/db/schema/index.ts | 6 +- packages/bsync/src/db/schema/subs_item.ts | 14 + packages/bsync/src/db/schema/subs_op.ts | 17 ++ packages/bsync/src/index.ts | 7 + packages/bsync/src/subscriptions/index.ts | 2 + .../src/subscriptions/revenueCatClient.ts | 73 ++++++ .../subscriptions/revenueCatWebhookHandler.ts | 174 +++++++++++++ packages/bsync/tests/subscriptions.test.ts | 241 ++++++++++++++++++ 12 files changed, 605 insertions(+), 2 deletions(-) create mode 100644 packages/bsync/src/db/migrations/20241205T030533572Z-subs.ts create mode 100644 packages/bsync/src/db/schema/subs_item.ts create mode 100644 packages/bsync/src/db/schema/subs_op.ts create mode 100644 packages/bsync/src/subscriptions/index.ts create mode 100644 packages/bsync/src/subscriptions/revenueCatClient.ts create mode 100644 packages/bsync/src/subscriptions/revenueCatWebhookHandler.ts create mode 100644 packages/bsync/tests/subscriptions.test.ts diff --git a/packages/bsync/src/config.ts b/packages/bsync/src/config.ts index d5ea4454c2d..c6f562d802b 100644 --- a/packages/bsync/src/config.ts +++ b/packages/bsync/src/config.ts @@ -23,10 +23,25 @@ export const envToCfg = (env: ServerEnvironment): ServerConfig => { apiKeys: new Set(env.apiKeys), } + let revenueCatCfg: RevenueCatConfig | undefined + if (env.revenueCatV1ApiKey) { + assert(env.revenueCatV1ApiUrl, 'missing revenue cat v1 api url') + assert( + env.revenueCatWebhookAuthorization, + 'missing revenue cat webhook authorization', + ) + revenueCatCfg = { + v1ApiKey: env.revenueCatV1ApiKey, + v1ApiUrl: env.revenueCatV1ApiUrl, + webhookAuthorization: env.revenueCatWebhookAuthorization, + } + } + return { service: serviceCfg, db: dbCfg, auth: authCfg, + revenueCat: revenueCatCfg, } } @@ -34,6 +49,7 @@ export type ServerConfig = { service: ServiceConfig db: DatabaseConfig auth: AuthConfig + revenueCat?: RevenueCatConfig } type ServiceConfig = { @@ -55,6 +71,12 @@ type AuthConfig = { apiKeys: Set } +type RevenueCatConfig = { + v1ApiUrl: string + v1ApiKey: string + webhookAuthorization: string +} + export const readEnv = (): ServerEnvironment => { return { // service @@ -70,6 +92,12 @@ export const readEnv = (): ServerEnvironment => { dbMigrate: envBool('BSYNC_DB_MIGRATE'), // secrets apiKeys: envList('BSYNC_API_KEYS'), + // revenue cat + revenueCatV1ApiKey: envStr('BSKY_REVENUE_CAT_V1_API_KEY'), + revenueCatV1ApiUrl: envStr('BSKY_REVENUE_CAT_V1_API_URL'), + revenueCatWebhookAuthorization: envStr( + 'BSKY_REVENUE_CAT_WEBHOOK_AUTHORIZATION', + ), } } @@ -87,4 +115,8 @@ export type ServerEnvironment = { dbMigrate?: boolean // secrets apiKeys: string[] + // revenue cat + revenueCatV1ApiUrl?: string + revenueCatV1ApiKey?: string + revenueCatWebhookAuthorization?: string } diff --git a/packages/bsync/src/context.ts b/packages/bsync/src/context.ts index 66a63510ad2..34c66e94f4e 100644 --- a/packages/bsync/src/context.ts +++ b/packages/bsync/src/context.ts @@ -4,21 +4,25 @@ import Database from './db' import { createMuteOpChannel } from './db/schema/mute_op' import { createNotifOpChannel } from './db/schema/notif_op' import { EventEmitter } from 'stream' +import { RevenueCatClient } from './subscriptions' export type AppContextOptions = { db: Database + revenueCatClient: RevenueCatClient | undefined cfg: ServerConfig shutdown: AbortSignal } export class AppContext { db: Database + revenueCatClient: RevenueCatClient | undefined cfg: ServerConfig shutdown: AbortSignal events: TypedEventEmitter constructor(opts: AppContextOptions) { this.db = opts.db + this.revenueCatClient = opts.revenueCatClient this.cfg = opts.cfg this.shutdown = opts.shutdown this.events = new EventEmitter() as TypedEventEmitter @@ -36,7 +40,17 @@ export class AppContext { poolMaxUses: cfg.db.poolMaxUses, poolIdleTimeoutMs: cfg.db.poolIdleTimeoutMs, }) - return new AppContext({ db, cfg, shutdown, ...overrides }) + + let revenueCatClient: RevenueCatClient | undefined + if (cfg.revenueCat) { + revenueCatClient = new RevenueCatClient({ + v1ApiKey: cfg.revenueCat.v1ApiKey, + v1ApiUrl: cfg.revenueCat.v1ApiUrl, + webhookAuthorization: cfg.revenueCat.webhookAuthorization, + }) + } + + return new AppContext({ db, revenueCatClient, cfg, shutdown, ...overrides }) } } diff --git a/packages/bsync/src/db/migrations/20241205T030533572Z-subs.ts b/packages/bsync/src/db/migrations/20241205T030533572Z-subs.ts new file mode 100644 index 00000000000..12b1a7bcf17 --- /dev/null +++ b/packages/bsync/src/db/migrations/20241205T030533572Z-subs.ts @@ -0,0 +1,24 @@ +import { Kysely, sql } from 'kysely' + +export async function up(db: Kysely): Promise { + await db.schema + .createTable('subs_op') + .addColumn('id', 'bigserial', (col) => col.primaryKey()) + .addColumn('actorDid', 'varchar', (col) => col.notNull()) + .addColumn('entitlements', 'jsonb', (col) => col.notNull()) + .addColumn('createdAt', 'timestamptz', (col) => + col.notNull().defaultTo(sql`CURRENT_TIMESTAMP`), + ) + .execute() + await db.schema + .createTable('subs_item') + .addColumn('actorDid', 'varchar', (col) => col.primaryKey()) + .addColumn('entitlements', 'jsonb', (col) => col.notNull()) + .addColumn('fromId', 'bigint', (col) => col.notNull()) + .execute() +} + +export async function down(db: Kysely): Promise { + await db.schema.dropTable('subs_item').execute() + await db.schema.dropTable('subs_op').execute() +} diff --git a/packages/bsync/src/db/migrations/index.ts b/packages/bsync/src/db/migrations/index.ts index 2d7485b4448..bb0b7923a9d 100644 --- a/packages/bsync/src/db/migrations/index.ts +++ b/packages/bsync/src/db/migrations/index.ts @@ -4,3 +4,4 @@ export * as _20240108T220751294Z from './20240108T220751294Z-init' export * as _20240717T224303472Z from './20240717T224303472Z-notif-ops' +export * as _20241205T030533572Z from './20241205T030533572Z-subs' diff --git a/packages/bsync/src/db/schema/index.ts b/packages/bsync/src/db/schema/index.ts index 922286feb10..dea00f7665e 100644 --- a/packages/bsync/src/db/schema/index.ts +++ b/packages/bsync/src/db/schema/index.ts @@ -3,11 +3,15 @@ import * as muteOp from './mute_op' import * as muteItem from './mute_item' import * as notifOp from './notif_op' import * as notifItem from './notif_item' +import * as subsOp from './subs_op' +import * as subsItem from './subs_item' export type DatabaseSchemaType = muteItem.PartialDB & muteOp.PartialDB & notifItem.PartialDB & - notifOp.PartialDB + notifOp.PartialDB & + subsItem.PartialDB & + subsOp.PartialDB export type DatabaseSchema = Kysely diff --git a/packages/bsync/src/db/schema/subs_item.ts b/packages/bsync/src/db/schema/subs_item.ts new file mode 100644 index 00000000000..523275cec23 --- /dev/null +++ b/packages/bsync/src/db/schema/subs_item.ts @@ -0,0 +1,14 @@ +import { ColumnType, Selectable } from 'kysely' + +export interface SubsItem { + actorDid: string + // https://github.com/kysely-org/kysely/issues/137 + entitlements: ColumnType + fromId: number +} + +export type SubsItemEntry = Selectable + +export const tableName = 'subs_item' + +export type PartialDB = { [tableName]: SubsItem } diff --git a/packages/bsync/src/db/schema/subs_op.ts b/packages/bsync/src/db/schema/subs_op.ts new file mode 100644 index 00000000000..e7feb00a1be --- /dev/null +++ b/packages/bsync/src/db/schema/subs_op.ts @@ -0,0 +1,17 @@ +import { ColumnType, GeneratedAlways, Selectable } from 'kysely' + +export interface SubsOp { + id: GeneratedAlways + actorDid: string + // https://github.com/kysely-org/kysely/issues/137 + entitlements: ColumnType + createdAt: GeneratedAlways +} + +export type SubsOpEntry = Selectable + +export const tableName = 'subs_op' + +export type PartialDB = { [tableName]: SubsOp } + +export const createSubsOpChannel = 'subs_op_create' // used with listen/notify diff --git a/packages/bsync/src/index.ts b/packages/bsync/src/index.ts index bf9b944267f..297326b19a1 100644 --- a/packages/bsync/src/index.ts +++ b/packages/bsync/src/index.ts @@ -8,6 +8,10 @@ import { ServerConfig } from './config' import routes from './routes' import { createMuteOpChannel } from './db/schema/mute_op' import { createNotifOpChannel } from './db/schema/notif_op' +import { + isRevenueCatWebhookUrl, + revenueCatWebhookHandler, +} from './subscriptions' export * from './config' export * from './client' @@ -50,6 +54,9 @@ export class BsyncService { res.setHeader('content-type', 'application/json') return res.end(JSON.stringify({ version: cfg.service.version })) } + if (isRevenueCatWebhookUrl(req.url)) { + return revenueCatWebhookHandler(ctx, req, res) + } handler(req, res) }) return new BsyncService({ ctx, server, ac }) diff --git a/packages/bsync/src/subscriptions/index.ts b/packages/bsync/src/subscriptions/index.ts new file mode 100644 index 00000000000..a1a87a784ee --- /dev/null +++ b/packages/bsync/src/subscriptions/index.ts @@ -0,0 +1,2 @@ +export * from './revenueCatClient' +export * from './revenueCatWebhookHandler' diff --git a/packages/bsync/src/subscriptions/revenueCatClient.ts b/packages/bsync/src/subscriptions/revenueCatClient.ts new file mode 100644 index 00000000000..e71b61e8ed4 --- /dev/null +++ b/packages/bsync/src/subscriptions/revenueCatClient.ts @@ -0,0 +1,73 @@ +type Config = { + v1ApiKey: string + v1ApiUrl: string + webhookAuthorization: string +} + +// Reference: https://www.revenuecat.com/docs/api-v1#tag/customers +export type GetSubscriberResponse = { + subscriber: Subscriber +} + +export type Subscriber = { + entitlements: { + [entitlementIdentifier: string]: Entitlement + } +} + +export type Entitlement = { + expires_date: string +} + +export class RevenueCatClient { + private v1ApiKey: string + private v1ApiUrl: string + private webhookAuthorization: string + + constructor({ v1ApiKey, v1ApiUrl, webhookAuthorization }: Config) { + this.v1ApiKey = v1ApiKey + this.v1ApiUrl = v1ApiUrl + this.webhookAuthorization = webhookAuthorization + } + + private async fetch( + path: string, + method: string = 'GET', + ): Promise { + const url = new URL(path, this.v1ApiUrl) + const res = await fetch(url, { + method, + headers: { + Authorization: `Bearer ${this.v1ApiKey}`, + }, + }) + + if (!res.ok) { + throw new Error(`Failed to fetch ${path}: ${res.statusText}`) + } + + return res.json() as T + } + + private getSubscriber(did: string): Promise { + return this.fetch( + `/subscribers/${encodeURIComponent(did)}`, + ) + } + + async getEntitlementIdentifiers(did: string): Promise { + const { subscriber } = await this.getSubscriber(did) + + const now = Date.now() + return Object.entries(subscriber.entitlements) + .filter( + ([_, entitlement]) => + now < new Date(entitlement.expires_date).valueOf(), + ) + .map(([entitlementIdentifier]) => entitlementIdentifier) + } + + isWebhookAuthorizationValid(authorization: string | undefined): boolean { + return authorization === this.webhookAuthorization + } +} diff --git a/packages/bsync/src/subscriptions/revenueCatWebhookHandler.ts b/packages/bsync/src/subscriptions/revenueCatWebhookHandler.ts new file mode 100644 index 00000000000..4e7fc7fa49e --- /dev/null +++ b/packages/bsync/src/subscriptions/revenueCatWebhookHandler.ts @@ -0,0 +1,174 @@ +import { IncomingMessage, ServerResponse } from 'node:http' +import AppContext from '../context' +import { isValidDid } from '../routes/util' +import { Database } from '..' +import { sql } from 'kysely' +import { createSubsOpChannel } from '../db/schema/subs_op' + +export const isRevenueCatWebhookUrl = (urlStr: string | undefined) => { + if (!urlStr) return false + const url = new URL(urlStr, 'http://host') + return url.pathname === '/webhooks/revenuecat' +} + +const parseBody = async (req: IncomingMessage) => + new Promise((resolve, reject) => { + let body = '' + req.on('data', (chunk) => (body += chunk)) + req.on('end', () => { + try { + resolve(JSON.parse(body)) + } catch (err) { + reject(err) + } + }) + req.on('error', reject) + }) + +// Reference: https://www.revenuecat.com/docs/integrations/webhooks/event-types-and-fields#events-format +type RevenueCatEventBody = { + api_version: '1.0' + event: { + app_user_id: string + type: string + } +} + +export const revenueCatWebhookHandler = async ( + ctx: AppContext, + req: IncomingMessage, + res: ServerResponse, +) => { + const { db, revenueCatClient } = ctx + + res.setHeader('content-type', 'application/json') + + if (!revenueCatClient) { + res.statusCode = 501 + return res.end( + JSON.stringify({ + error: + 'Not Implemented: bsync is being served without RevenueCat support', + }), + ) + } + + if ( + !revenueCatClient.isWebhookAuthorizationValid(req.headers['authorization']) + ) { + res.statusCode = 403 + return res.end( + JSON.stringify({ + error: 'Forbidden: invalid authentication for RevenueCat webhook', + }), + ) + } + + if (req.method !== 'POST') { + res.statusCode = 501 + return res.end( + JSON.stringify({ + error: + 'Not Implemented: only POST method is supported for RevenueCat webhook', + }), + ) + } + + if (req.headers['content-type'] !== 'application/json') { + res.statusCode = 400 + return res.end( + JSON.stringify({ + error: + 'Bad request: body must be JSON with Content-Type: application/json', + }), + ) + } + + let body: RevenueCatEventBody + try { + body = (await parseBody(req)) as RevenueCatEventBody + } catch (error) { + res.statusCode = 400 + return res.end( + JSON.stringify({ + error: 'Bad request: malformed JSON body', + }), + ) + } + + try { + const { app_user_id: actorDid } = body.event + + if (!isValidDid(actorDid)) { + res.statusCode = 400 + return res.end( + JSON.stringify({ + error: 'Bad request: invalid DID in app_user_id', + }), + ) + } + + const entitlements = + await revenueCatClient.getEntitlementIdentifiers(actorDid) + + const id = await db.transaction(async (txn) => { + // create subs op + const id = await createSubsOp(txn, actorDid, entitlements) + // update subs state + await updateSubsItem(txn, id, actorDid, entitlements) + return id + }) + + res.statusCode = 200 + res.end(JSON.stringify({ success: true, operationId: id })) + } catch (error) { + res.statusCode = 500 + return res.end( + JSON.stringify({ + error: + 'Internal server error: an error happened while processing the request', + }), + ) + } +} + +const createSubsOp = async ( + db: Database, + actorDid: string, + entitlements: string[], +) => { + const { ref } = db.db.dynamic + const { id } = await db.db + .insertInto('subs_op') + .values({ + actorDid, + entitlements: JSON.stringify(entitlements), + }) + .returning('id') + .executeTakeFirstOrThrow() + await sql`notify ${ref(createSubsOpChannel)}`.execute(db.db) // emitted transactionally + return id +} + +const updateSubsItem = async ( + db: Database, + fromId: number, + actorDid: string, + entitlements: string[], +) => { + const { ref } = db.db.dynamic + await db.db + .insertInto('subs_item') + .values({ + actorDid, + entitlements: JSON.stringify(entitlements), + fromId, + }) + .onConflict((oc) => + oc.column('actorDid').doUpdateSet({ + entitlements: sql`${ref('excluded.entitlements')}`, + fromId: sql`${ref('excluded.fromId')}`, + }), + ) + .execute() +} diff --git a/packages/bsync/tests/subscriptions.test.ts b/packages/bsync/tests/subscriptions.test.ts new file mode 100644 index 00000000000..8ba8f963f31 --- /dev/null +++ b/packages/bsync/tests/subscriptions.test.ts @@ -0,0 +1,241 @@ +import http from 'node:http' +import { once } from 'node:events' +import getPort from 'get-port' +import { BsyncService, Database, envToCfg } from '../src' +import { Entitlement, GetSubscriberResponse } from '../src/subscriptions' + +const revenueCatWebhookAuthorization = 'Bearer any-token' + +describe('subscriptions', () => { + let bsync: BsyncService + let bsyncUrl: string + + const actorDid = 'did:example:a' + + let revenueCatServer: http.Server + let revenueCatApiMock: jest.Mock + + const TEN_MINUTES = 600_000 + const entitlementValid: Entitlement = { + expires_date: new Date(Date.now() + TEN_MINUTES).toISOString(), + } + const entitlementExpired: Entitlement = { + expires_date: new Date(Date.now() - TEN_MINUTES).toISOString(), + } + + beforeAll(async () => { + const revenueCatPort = await getPort() + + revenueCatApiMock = jest.fn() + revenueCatServer = await createMockRevenueCatService( + revenueCatPort, + revenueCatApiMock, + ) + + bsync = await BsyncService.create( + envToCfg({ + port: await getPort(), + dbUrl: process.env.DB_POSTGRES_URL, + dbSchema: 'bsync_subscriptions', + apiKeys: ['key-1'], + longPollTimeoutMs: 500, + revenueCatV1ApiKey: 'any-key', + revenueCatV1ApiUrl: `http://localhost:${revenueCatPort}`, + revenueCatWebhookAuthorization, + }), + ) + + bsyncUrl = `http://localhost:${bsync.ctx.cfg.service.port}` + + await bsync.ctx.db.migrateToLatestOrThrow() + await bsync.start() + }) + + afterAll(async () => { + await bsync.destroy() + revenueCatServer.close() + await once(revenueCatServer, 'close') + }) + + beforeEach(async () => { + await clearSubs(bsync.ctx.db) + }) + + describe('webhook handler', () => { + it('returns 403 if authorization is invalid', async () => { + const response = await fetch(`${bsyncUrl}/webhooks/revenuecat`, { + method: 'POST', + body: JSON.stringify({ event: { app_user_id: actorDid } }), + headers: { + Authorization: `not ${revenueCatWebhookAuthorization}`, + 'Content-Type': 'application/json', + }, + }) + + expect(response.status).toBe(403) + expect(response.json()).resolves.toMatchObject({ + error: 'Forbidden: invalid authentication for RevenueCat webhook', + }) + }) + + it('stores valid entitlements from the API response, excluding expired', async () => { + revenueCatApiMock.mockReturnValueOnce({ + subscriber: { + entitlements: { entitlementExpired }, + }, + }) + + await callWebhook(bsyncUrl, { + event: { app_user_id: actorDid }, + }) + + const op0 = await bsync.ctx.db.db + .selectFrom('subs_op') + .selectAll() + .where('actorDid', '=', actorDid) + .orderBy('id', 'desc') + .executeTakeFirstOrThrow() + + expect(op0).toMatchObject({ + id: expect.any(Number), + actorDid, + entitlements: [], + createdAt: expect.any(Date), + }) + + await expect( + bsync.ctx.db.db + .selectFrom('subs_item') + .selectAll() + .where('actorDid', '=', actorDid) + .executeTakeFirstOrThrow(), + ).resolves.toMatchObject({ + actorDid, + entitlements: [], + fromId: op0.id, + }) + + revenueCatApiMock.mockReturnValueOnce({ + subscriber: { + entitlements: { entitlementValid, entitlementExpired }, + }, + }) + + await callWebhook(bsyncUrl, { + event: { app_user_id: actorDid }, + }) + + const op1 = await bsync.ctx.db.db + .selectFrom('subs_op') + .selectAll() + .where('actorDid', '=', actorDid) + .orderBy('id', 'desc') + .executeTakeFirstOrThrow() + + expect(op1).toMatchObject({ + id: expect.any(Number), + actorDid, + entitlements: ['entitlementValid'], + createdAt: expect.any(Date), + }) + + await expect( + bsync.ctx.db.db + .selectFrom('subs_item') + .selectAll() + .where('actorDid', '=', actorDid) + .executeTakeFirstOrThrow(), + ).resolves.toMatchObject({ + actorDid, + entitlements: ['entitlementValid'], + fromId: op1.id, + }) + }) + + it('sets empty array in the cache if no entitlements are present at all', async () => { + revenueCatApiMock.mockReturnValue({ + subscriber: { entitlements: {} }, + }) + + await callWebhook(bsyncUrl, { + event: { app_user_id: actorDid }, + }) + + const op = await bsync.ctx.db.db + .selectFrom('subs_op') + .selectAll() + .where('actorDid', '=', actorDid) + .orderBy('id', 'desc') + .executeTakeFirstOrThrow() + + expect(op).toMatchObject({ + id: expect.any(Number), + actorDid, + entitlements: [], + createdAt: expect.any(Date), + }) + + await expect( + bsync.ctx.db.db + .selectFrom('subs_item') + .selectAll() + .where('actorDid', '=', actorDid) + .executeTakeFirstOrThrow(), + ).resolves.toMatchObject({ + actorDid, + entitlements: [], + fromId: op.id, + }) + }) + }) +}) + +const clearSubs = async (db: Database) => { + await db.db.deleteFrom('subs_item').execute() + await db.db.deleteFrom('subs_op').execute() +} + +const callWebhook = async ( + baseUrl: string, + body: Record, +): Promise => { + const response = await fetch(`${baseUrl}/webhooks/revenuecat`, { + method: 'POST', + body: JSON.stringify(body), + headers: { + Authorization: revenueCatWebhookAuthorization, + 'Content-Type': 'application/json', + }, + }) + + if (!response.ok) { + throw new Error( + `Unexpected status on calling the webhook: '${response.status}'`, + ) + } + + return response +} + +const createMockRevenueCatService = async ( + port: number, + apiMock: jest.Mock, +): Promise => { + const server = http.createServer((req, res) => { + if (!req.url) { + throw new Error('Unexpected empty URL in request to RevenueCat mock') + } + + if (/^\/subscribers\/(.*)$/.test(req.url)) { + const response = apiMock(req, res) + res.statusCode = 200 + return res.end(JSON.stringify(response)) + } + + throw new Error('Unexpected URL in request to RevenueCat mock') + }) + + server.listen(port) + await once(server, 'listening') + return server +} From ef2d614cbce141641b240e905fb7f108a66ba3ad Mon Sep 17 00:00:00 2001 From: rafael Date: Thu, 5 Dec 2024 16:50:53 -0300 Subject: [PATCH 2/5] Add express and refactor webhook handler --- packages/bsync/package.json | 7 + packages/bsync/src/api/health.ts | 12 + packages/bsync/src/api/revenueCat.ts | 76 ++++++ packages/bsync/src/context.ts | 4 +- ...bs.ts => 20241205T030533572Z-purchases.ts} | 8 +- packages/bsync/src/db/migrations/index.ts | 2 +- packages/bsync/src/db/schema/index.ts | 8 +- .../schema/{subs_item.ts => purchase_item.ts} | 8 +- .../db/schema/{subs_op.ts => purchase_op.ts} | 10 +- packages/bsync/src/index.ts | 77 +++--- .../src/purchases/addPurchaseOperation.ts | 58 ++++ packages/bsync/src/purchases/index.ts | 3 + .../revenueCatClient.ts | 21 +- .../bsync/src/purchases/revenueCatTypes.ts | 23 ++ packages/bsync/src/subscriptions/index.ts | 2 - .../subscriptions/revenueCatWebhookHandler.ts | 174 ------------ ...ubscriptions.test.ts => purchases.test.ts} | 55 ++-- pnpm-lock.yaml | 257 +++++++++++++++++- 18 files changed, 530 insertions(+), 275 deletions(-) create mode 100644 packages/bsync/src/api/health.ts create mode 100644 packages/bsync/src/api/revenueCat.ts rename packages/bsync/src/db/migrations/{20241205T030533572Z-subs.ts => 20241205T030533572Z-purchases.ts} (80%) rename packages/bsync/src/db/schema/{subs_item.ts => purchase_item.ts} (51%) rename packages/bsync/src/db/schema/{subs_op.ts => purchase_op.ts} (50%) create mode 100644 packages/bsync/src/purchases/addPurchaseOperation.ts create mode 100644 packages/bsync/src/purchases/index.ts rename packages/bsync/src/{subscriptions => purchases}/revenueCatClient.ts (77%) create mode 100644 packages/bsync/src/purchases/revenueCatTypes.ts delete mode 100644 packages/bsync/src/subscriptions/index.ts delete mode 100644 packages/bsync/src/subscriptions/revenueCatWebhookHandler.ts rename packages/bsync/tests/{subscriptions.test.ts => purchases.test.ts} (82%) diff --git a/packages/bsync/package.json b/packages/bsync/package.json index e2de5c0cb55..feade31c73d 100644 --- a/packages/bsync/package.json +++ b/packages/bsync/package.json @@ -29,7 +29,11 @@ "@atproto/syntax": "workspace:^", "@bufbuild/protobuf": "^1.5.0", "@connectrpc/connect": "^1.1.4", + "@connectrpc/connect-express": "^1.1.4", "@connectrpc/connect-node": "^1.1.4", + "compression": "^1.7.4", + "cors": "^2.8.5", + "express": "^4.21.1", "http-terminator": "^3.2.0", "kysely": "^0.22.0", "pg": "^8.10.0", @@ -40,6 +44,9 @@ "@bufbuild/buf": "^1.28.1", "@bufbuild/protoc-gen-es": "^1.5.0", "@connectrpc/protoc-gen-connect-es": "^1.1.4", + "@types/compression": "^1.7.5", + "@types/cors": "^2.8.12", + "@types/express": "^4.17.13", "@types/pg": "^8.6.6", "get-port": "^5.1.1", "jest": "^28.1.2", diff --git a/packages/bsync/src/api/health.ts b/packages/bsync/src/api/health.ts new file mode 100644 index 00000000000..814579ac2d7 --- /dev/null +++ b/packages/bsync/src/api/health.ts @@ -0,0 +1,12 @@ +import express from 'express' +import AppContext from '../context' + +export const createRouter = (ctx: AppContext): express.Router => { + const router = express.Router() + + router.get('/_health', async function (_req, res) { + res.send({ version: ctx.cfg.service.version }) + }) + + return router +} diff --git a/packages/bsync/src/api/revenueCat.ts b/packages/bsync/src/api/revenueCat.ts new file mode 100644 index 00000000000..214fbeae79f --- /dev/null +++ b/packages/bsync/src/api/revenueCat.ts @@ -0,0 +1,76 @@ +import express, { RequestHandler } from 'express' +import { AppContext } from '..' +import { RevenueCatClient } from '../purchases' +import { addPurchaseOperation, RcEventBody } from '../purchases' +import { isValidDid } from '../routes/util' +import { httpLogger as log } from '..' + +type AppContextWithRevenueCatClient = AppContext & { + revenueCatClient: RevenueCatClient +} + +const auth = + (ctx: AppContextWithRevenueCatClient): RequestHandler => + (req: express.Request, res: express.Response, next: express.NextFunction) => + ctx.revenueCatClient.isWebhookAuthorizationValid( + req.header('Authorization'), + ) + ? next() + : res.status(403).send({ + success: false, + error: 'Forbidden: invalid authentication for RevenueCat webhook', + }) + +const webhookHandler = + (ctx: AppContextWithRevenueCatClient): RequestHandler => + async (req, res) => { + const { revenueCatClient } = ctx + + const body: RcEventBody = req.body + + try { + const { app_user_id: actorDid } = body.event + + if (!isValidDid(actorDid)) { + return res.status(400).send({ + success: false, + error: 'Bad request: invalid DID in app_user_id', + }) + } + + const entitlements = + await revenueCatClient.getEntitlementIdentifiers(actorDid) + + const id = await addPurchaseOperation(ctx.db, actorDid, entitlements) + + res.send({ success: true, operationId: id }) + } catch (error) { + log.error(error) + + res.status(500).send({ + success: false, + error: + 'Internal server error: an error happened while processing the request', + }) + } + } + +const assertAppContextWithRevenueCatClient: ( + ctx: AppContext, +) => asserts ctx is AppContextWithRevenueCatClient = (ctx: AppContext) => { + if (!ctx.revenueCatClient) { + throw new Error( + 'RevenueCat webhook was tried to be set up without configuring a RevenueCat client.', + ) + } +} + +export const createRouter = (ctx: AppContext): express.Router => { + assertAppContextWithRevenueCatClient(ctx) + + const router = express.Router() + router.use(auth(ctx)) + router.use(express.json()) + router.post('/', webhookHandler(ctx)) + return router +} diff --git a/packages/bsync/src/context.ts b/packages/bsync/src/context.ts index 34c66e94f4e..b5b0e454f64 100644 --- a/packages/bsync/src/context.ts +++ b/packages/bsync/src/context.ts @@ -4,7 +4,8 @@ import Database from './db' import { createMuteOpChannel } from './db/schema/mute_op' import { createNotifOpChannel } from './db/schema/notif_op' import { EventEmitter } from 'stream' -import { RevenueCatClient } from './subscriptions' +import { RevenueCatClient } from './purchases' +import { createPurchaseOpChannel } from './db/schema/purchase_op' export type AppContextOptions = { db: Database @@ -59,4 +60,5 @@ export default AppContext export type AppEvents = { [createMuteOpChannel]: () => void [createNotifOpChannel]: () => void + [createPurchaseOpChannel]: () => void } diff --git a/packages/bsync/src/db/migrations/20241205T030533572Z-subs.ts b/packages/bsync/src/db/migrations/20241205T030533572Z-purchases.ts similarity index 80% rename from packages/bsync/src/db/migrations/20241205T030533572Z-subs.ts rename to packages/bsync/src/db/migrations/20241205T030533572Z-purchases.ts index 12b1a7bcf17..2c459750556 100644 --- a/packages/bsync/src/db/migrations/20241205T030533572Z-subs.ts +++ b/packages/bsync/src/db/migrations/20241205T030533572Z-purchases.ts @@ -2,7 +2,7 @@ import { Kysely, sql } from 'kysely' export async function up(db: Kysely): Promise { await db.schema - .createTable('subs_op') + .createTable('purchase_op') .addColumn('id', 'bigserial', (col) => col.primaryKey()) .addColumn('actorDid', 'varchar', (col) => col.notNull()) .addColumn('entitlements', 'jsonb', (col) => col.notNull()) @@ -11,7 +11,7 @@ export async function up(db: Kysely): Promise { ) .execute() await db.schema - .createTable('subs_item') + .createTable('purchase_item') .addColumn('actorDid', 'varchar', (col) => col.primaryKey()) .addColumn('entitlements', 'jsonb', (col) => col.notNull()) .addColumn('fromId', 'bigint', (col) => col.notNull()) @@ -19,6 +19,6 @@ export async function up(db: Kysely): Promise { } export async function down(db: Kysely): Promise { - await db.schema.dropTable('subs_item').execute() - await db.schema.dropTable('subs_op').execute() + await db.schema.dropTable('purchase_item').execute() + await db.schema.dropTable('purchase_op').execute() } diff --git a/packages/bsync/src/db/migrations/index.ts b/packages/bsync/src/db/migrations/index.ts index bb0b7923a9d..167c9fdc534 100644 --- a/packages/bsync/src/db/migrations/index.ts +++ b/packages/bsync/src/db/migrations/index.ts @@ -4,4 +4,4 @@ export * as _20240108T220751294Z from './20240108T220751294Z-init' export * as _20240717T224303472Z from './20240717T224303472Z-notif-ops' -export * as _20241205T030533572Z from './20241205T030533572Z-subs' +export * as _20241205T030533572Z from './20241205T030533572Z-purchases' diff --git a/packages/bsync/src/db/schema/index.ts b/packages/bsync/src/db/schema/index.ts index dea00f7665e..6a6267f7531 100644 --- a/packages/bsync/src/db/schema/index.ts +++ b/packages/bsync/src/db/schema/index.ts @@ -3,15 +3,15 @@ import * as muteOp from './mute_op' import * as muteItem from './mute_item' import * as notifOp from './notif_op' import * as notifItem from './notif_item' -import * as subsOp from './subs_op' -import * as subsItem from './subs_item' +import * as purchaseOp from './purchase_op' +import * as purchaseItem from './purchase_item' export type DatabaseSchemaType = muteItem.PartialDB & muteOp.PartialDB & notifItem.PartialDB & notifOp.PartialDB & - subsItem.PartialDB & - subsOp.PartialDB + purchaseItem.PartialDB & + purchaseOp.PartialDB export type DatabaseSchema = Kysely diff --git a/packages/bsync/src/db/schema/subs_item.ts b/packages/bsync/src/db/schema/purchase_item.ts similarity index 51% rename from packages/bsync/src/db/schema/subs_item.ts rename to packages/bsync/src/db/schema/purchase_item.ts index 523275cec23..f90f840399a 100644 --- a/packages/bsync/src/db/schema/subs_item.ts +++ b/packages/bsync/src/db/schema/purchase_item.ts @@ -1,14 +1,14 @@ import { ColumnType, Selectable } from 'kysely' -export interface SubsItem { +export interface PurchaseItem { actorDid: string // https://github.com/kysely-org/kysely/issues/137 entitlements: ColumnType fromId: number } -export type SubsItemEntry = Selectable +export type PurchaseItemEntry = Selectable -export const tableName = 'subs_item' +export const tableName = 'purchase_item' -export type PartialDB = { [tableName]: SubsItem } +export type PartialDB = { [tableName]: PurchaseItem } diff --git a/packages/bsync/src/db/schema/subs_op.ts b/packages/bsync/src/db/schema/purchase_op.ts similarity index 50% rename from packages/bsync/src/db/schema/subs_op.ts rename to packages/bsync/src/db/schema/purchase_op.ts index e7feb00a1be..b451bc0137f 100644 --- a/packages/bsync/src/db/schema/subs_op.ts +++ b/packages/bsync/src/db/schema/purchase_op.ts @@ -1,6 +1,6 @@ import { ColumnType, GeneratedAlways, Selectable } from 'kysely' -export interface SubsOp { +export interface PurchaseOp { id: GeneratedAlways actorDid: string // https://github.com/kysely-org/kysely/issues/137 @@ -8,10 +8,10 @@ export interface SubsOp { createdAt: GeneratedAlways } -export type SubsOpEntry = Selectable +export type PurchaseOpEntry = Selectable -export const tableName = 'subs_op' +export const tableName = 'purchase_op' -export type PartialDB = { [tableName]: SubsOp } +export type PartialDB = { [tableName]: PurchaseOp } -export const createSubsOpChannel = 'subs_op_create' // used with listen/notify +export const createPurchaseOpChannel = 'purchase_op_create' // used with listen/notify diff --git a/packages/bsync/src/index.ts b/packages/bsync/src/index.ts index 297326b19a1..d71e9a7cb34 100644 --- a/packages/bsync/src/index.ts +++ b/packages/bsync/src/index.ts @@ -1,17 +1,20 @@ +import express from 'express' +import cors from 'cors' +import compression from 'compression' import http from 'node:http' import events from 'node:events' import { createHttpTerminator, HttpTerminator } from 'http-terminator' -import { connectNodeAdapter } from '@connectrpc/connect-node' import { dbLogger, loggerMiddleware } from './logger' import AppContext, { AppContextOptions } from './context' import { ServerConfig } from './config' import routes from './routes' import { createMuteOpChannel } from './db/schema/mute_op' import { createNotifOpChannel } from './db/schema/notif_op' -import { - isRevenueCatWebhookUrl, - revenueCatWebhookHandler, -} from './subscriptions' +import * as health from './api/health' +import * as revenueCat from './api/revenueCat' +import { DAY, SECOND } from '@atproto/common' +import { expressConnectMiddleware } from '@connectrpc/connect-express' +import { createPurchaseOpChannel } from './db/schema/purchase_op' export * from './config' export * from './client' @@ -21,20 +24,20 @@ export { httpLogger } from './logger' export class BsyncService { public ctx: AppContext - public server: http.Server + public app: express.Application + public server?: http.Server private ac: AbortController - private terminator: HttpTerminator + private terminator?: HttpTerminator private dbStatsInterval?: NodeJS.Timeout constructor(opts: { ctx: AppContext - server: http.Server + app: express.Application ac: AbortController }) { this.ctx = opts.ctx - this.server = opts.server + this.app = opts.app this.ac = opts.ac - this.terminator = createHttpTerminator({ server: this.server }) } static async create( @@ -43,23 +46,25 @@ export class BsyncService { ): Promise { const ac = new AbortController() const ctx = await AppContext.fromConfig(cfg, ac.signal, overrides) - const handler = connectNodeAdapter({ - routes: routes(ctx), - shutdownSignal: ac.signal, - }) - const server = http.createServer((req, res) => { - loggerMiddleware(req, res) - if (isHealth(req.url)) { - res.statusCode = 200 - res.setHeader('content-type', 'application/json') - return res.end(JSON.stringify({ version: cfg.service.version })) - } - if (isRevenueCatWebhookUrl(req.url)) { - return revenueCatWebhookHandler(ctx, req, res) - } - handler(req, res) - }) - return new BsyncService({ ctx, server, ac }) + + const app = express() + app.use(cors({ maxAge: DAY / SECOND })) + app.use(loggerMiddleware) + app.use(compression()) + + app.use( + expressConnectMiddleware({ + routes: routes(ctx), + shutdownSignal: ac.signal, + }), + ) + + app.use(health.createRouter(ctx)) + if (ctx.revenueCatClient) { + app.use('/webhooks/revenuecat', revenueCat.createRouter(ctx)) + } + + return new BsyncService({ ctx, app, ac }) } async start(): Promise { @@ -77,15 +82,18 @@ export class BsyncService { ) }, 10000) await this.setupAppEvents() - this.server.listen(this.ctx.cfg.service.port) - this.server.keepAliveTimeout = 90000 + + const server = this.app.listen(this.ctx.cfg.service.port) + server.keepAliveTimeout = 90000 + this.server = server + this.terminator = createHttpTerminator({ server: this.server }) await events.once(this.server, 'listening') return this.server } async destroy(): Promise { this.ac.abort() - await this.terminator.terminate() + await this.terminator?.terminate() await this.ctx.db.close() clearInterval(this.dbStatsInterval) this.dbStatsInterval = undefined @@ -106,14 +114,11 @@ export class BsyncService { if (notif.channel === createNotifOpChannel) { this.ctx.events.emit(createNotifOpChannel) } + if (notif.channel === createPurchaseOpChannel) { + this.ctx.events.emit(createPurchaseOpChannel) + } }) } } export default BsyncService - -const isHealth = (urlStr: string | undefined) => { - if (!urlStr) return false - const url = new URL(urlStr, 'http://host') - return url.pathname === '/_health' -} diff --git a/packages/bsync/src/purchases/addPurchaseOperation.ts b/packages/bsync/src/purchases/addPurchaseOperation.ts new file mode 100644 index 00000000000..59eaa2552a6 --- /dev/null +++ b/packages/bsync/src/purchases/addPurchaseOperation.ts @@ -0,0 +1,58 @@ +import { sql } from 'kysely' +import { Database } from '..' +import { createPurchaseOpChannel } from '../db/schema/purchase_op' + +export const addPurchaseOperation = async ( + db: Database, + actorDid: string, + entitlements: string[], +) => { + return db.transaction(async (txn) => { + // create purchase op + const id = await createPurchaseOp(txn, actorDid, entitlements) + // update purchase state + await updatePurchaseItem(txn, id, actorDid, entitlements) + return id + }) +} + +const createPurchaseOp = async ( + db: Database, + actorDid: string, + entitlements: string[], +) => { + const { ref } = db.db.dynamic + const { id } = await db.db + .insertInto('purchase_op') + .values({ + actorDid, + entitlements: JSON.stringify(entitlements), + }) + .returning('id') + .executeTakeFirstOrThrow() + await sql`notify ${ref(createPurchaseOpChannel)}`.execute(db.db) // emitted transactionally + return id +} + +const updatePurchaseItem = async ( + db: Database, + fromId: number, + actorDid: string, + entitlements: string[], +) => { + const { ref } = db.db.dynamic + await db.db + .insertInto('purchase_item') + .values({ + actorDid, + entitlements: JSON.stringify(entitlements), + fromId, + }) + .onConflict((oc) => + oc.column('actorDid').doUpdateSet({ + entitlements: sql`${ref('excluded.entitlements')}`, + fromId: sql`${ref('excluded.fromId')}`, + }), + ) + .execute() +} diff --git a/packages/bsync/src/purchases/index.ts b/packages/bsync/src/purchases/index.ts new file mode 100644 index 00000000000..4a4b515dc03 --- /dev/null +++ b/packages/bsync/src/purchases/index.ts @@ -0,0 +1,3 @@ +export * from './addPurchaseOperation' +export * from './revenueCatClient' +export * from './revenueCatTypes' diff --git a/packages/bsync/src/subscriptions/revenueCatClient.ts b/packages/bsync/src/purchases/revenueCatClient.ts similarity index 77% rename from packages/bsync/src/subscriptions/revenueCatClient.ts rename to packages/bsync/src/purchases/revenueCatClient.ts index e71b61e8ed4..f223a43baaa 100644 --- a/packages/bsync/src/subscriptions/revenueCatClient.ts +++ b/packages/bsync/src/purchases/revenueCatClient.ts @@ -1,24 +1,11 @@ +import { RcGetSubscriberResponse } from './revenueCatTypes' + type Config = { v1ApiKey: string v1ApiUrl: string webhookAuthorization: string } -// Reference: https://www.revenuecat.com/docs/api-v1#tag/customers -export type GetSubscriberResponse = { - subscriber: Subscriber -} - -export type Subscriber = { - entitlements: { - [entitlementIdentifier: string]: Entitlement - } -} - -export type Entitlement = { - expires_date: string -} - export class RevenueCatClient { private v1ApiKey: string private v1ApiUrl: string @@ -49,8 +36,8 @@ export class RevenueCatClient { return res.json() as T } - private getSubscriber(did: string): Promise { - return this.fetch( + private getSubscriber(did: string): Promise { + return this.fetch( `/subscribers/${encodeURIComponent(did)}`, ) } diff --git a/packages/bsync/src/purchases/revenueCatTypes.ts b/packages/bsync/src/purchases/revenueCatTypes.ts new file mode 100644 index 00000000000..9d216287bbd --- /dev/null +++ b/packages/bsync/src/purchases/revenueCatTypes.ts @@ -0,0 +1,23 @@ +// Reference: https://www.revenuecat.com/docs/integrations/webhooks/event-types-and-fields#events-format +export type RcEventBody = { + api_version: '1.0' + event: { + app_user_id: string + type: string + } +} + +// Reference: https://www.revenuecat.com/docs/api-v1#tag/customers +export type RcGetSubscriberResponse = { + subscriber: RcSubscriber +} + +export type RcSubscriber = { + entitlements: { + [entitlementIdentifier: string]: RcEntitlement + } +} + +export type RcEntitlement = { + expires_date: string +} diff --git a/packages/bsync/src/subscriptions/index.ts b/packages/bsync/src/subscriptions/index.ts deleted file mode 100644 index a1a87a784ee..00000000000 --- a/packages/bsync/src/subscriptions/index.ts +++ /dev/null @@ -1,2 +0,0 @@ -export * from './revenueCatClient' -export * from './revenueCatWebhookHandler' diff --git a/packages/bsync/src/subscriptions/revenueCatWebhookHandler.ts b/packages/bsync/src/subscriptions/revenueCatWebhookHandler.ts deleted file mode 100644 index 4e7fc7fa49e..00000000000 --- a/packages/bsync/src/subscriptions/revenueCatWebhookHandler.ts +++ /dev/null @@ -1,174 +0,0 @@ -import { IncomingMessage, ServerResponse } from 'node:http' -import AppContext from '../context' -import { isValidDid } from '../routes/util' -import { Database } from '..' -import { sql } from 'kysely' -import { createSubsOpChannel } from '../db/schema/subs_op' - -export const isRevenueCatWebhookUrl = (urlStr: string | undefined) => { - if (!urlStr) return false - const url = new URL(urlStr, 'http://host') - return url.pathname === '/webhooks/revenuecat' -} - -const parseBody = async (req: IncomingMessage) => - new Promise((resolve, reject) => { - let body = '' - req.on('data', (chunk) => (body += chunk)) - req.on('end', () => { - try { - resolve(JSON.parse(body)) - } catch (err) { - reject(err) - } - }) - req.on('error', reject) - }) - -// Reference: https://www.revenuecat.com/docs/integrations/webhooks/event-types-and-fields#events-format -type RevenueCatEventBody = { - api_version: '1.0' - event: { - app_user_id: string - type: string - } -} - -export const revenueCatWebhookHandler = async ( - ctx: AppContext, - req: IncomingMessage, - res: ServerResponse, -) => { - const { db, revenueCatClient } = ctx - - res.setHeader('content-type', 'application/json') - - if (!revenueCatClient) { - res.statusCode = 501 - return res.end( - JSON.stringify({ - error: - 'Not Implemented: bsync is being served without RevenueCat support', - }), - ) - } - - if ( - !revenueCatClient.isWebhookAuthorizationValid(req.headers['authorization']) - ) { - res.statusCode = 403 - return res.end( - JSON.stringify({ - error: 'Forbidden: invalid authentication for RevenueCat webhook', - }), - ) - } - - if (req.method !== 'POST') { - res.statusCode = 501 - return res.end( - JSON.stringify({ - error: - 'Not Implemented: only POST method is supported for RevenueCat webhook', - }), - ) - } - - if (req.headers['content-type'] !== 'application/json') { - res.statusCode = 400 - return res.end( - JSON.stringify({ - error: - 'Bad request: body must be JSON with Content-Type: application/json', - }), - ) - } - - let body: RevenueCatEventBody - try { - body = (await parseBody(req)) as RevenueCatEventBody - } catch (error) { - res.statusCode = 400 - return res.end( - JSON.stringify({ - error: 'Bad request: malformed JSON body', - }), - ) - } - - try { - const { app_user_id: actorDid } = body.event - - if (!isValidDid(actorDid)) { - res.statusCode = 400 - return res.end( - JSON.stringify({ - error: 'Bad request: invalid DID in app_user_id', - }), - ) - } - - const entitlements = - await revenueCatClient.getEntitlementIdentifiers(actorDid) - - const id = await db.transaction(async (txn) => { - // create subs op - const id = await createSubsOp(txn, actorDid, entitlements) - // update subs state - await updateSubsItem(txn, id, actorDid, entitlements) - return id - }) - - res.statusCode = 200 - res.end(JSON.stringify({ success: true, operationId: id })) - } catch (error) { - res.statusCode = 500 - return res.end( - JSON.stringify({ - error: - 'Internal server error: an error happened while processing the request', - }), - ) - } -} - -const createSubsOp = async ( - db: Database, - actorDid: string, - entitlements: string[], -) => { - const { ref } = db.db.dynamic - const { id } = await db.db - .insertInto('subs_op') - .values({ - actorDid, - entitlements: JSON.stringify(entitlements), - }) - .returning('id') - .executeTakeFirstOrThrow() - await sql`notify ${ref(createSubsOpChannel)}`.execute(db.db) // emitted transactionally - return id -} - -const updateSubsItem = async ( - db: Database, - fromId: number, - actorDid: string, - entitlements: string[], -) => { - const { ref } = db.db.dynamic - await db.db - .insertInto('subs_item') - .values({ - actorDid, - entitlements: JSON.stringify(entitlements), - fromId, - }) - .onConflict((oc) => - oc.column('actorDid').doUpdateSet({ - entitlements: sql`${ref('excluded.entitlements')}`, - fromId: sql`${ref('excluded.fromId')}`, - }), - ) - .execute() -} diff --git a/packages/bsync/tests/subscriptions.test.ts b/packages/bsync/tests/purchases.test.ts similarity index 82% rename from packages/bsync/tests/subscriptions.test.ts rename to packages/bsync/tests/purchases.test.ts index 8ba8f963f31..50705be476f 100644 --- a/packages/bsync/tests/subscriptions.test.ts +++ b/packages/bsync/tests/purchases.test.ts @@ -2,24 +2,24 @@ import http from 'node:http' import { once } from 'node:events' import getPort from 'get-port' import { BsyncService, Database, envToCfg } from '../src' -import { Entitlement, GetSubscriberResponse } from '../src/subscriptions' +import { RcEntitlement, RcGetSubscriberResponse } from '../src/purchases' const revenueCatWebhookAuthorization = 'Bearer any-token' -describe('subscriptions', () => { +describe('purchases', () => { let bsync: BsyncService let bsyncUrl: string const actorDid = 'did:example:a' let revenueCatServer: http.Server - let revenueCatApiMock: jest.Mock + let revenueCatApiMock: jest.Mock const TEN_MINUTES = 600_000 - const entitlementValid: Entitlement = { + const entitlementValid: RcEntitlement = { expires_date: new Date(Date.now() + TEN_MINUTES).toISOString(), } - const entitlementExpired: Entitlement = { + const entitlementExpired: RcEntitlement = { expires_date: new Date(Date.now() - TEN_MINUTES).toISOString(), } @@ -36,7 +36,7 @@ describe('subscriptions', () => { envToCfg({ port: await getPort(), dbUrl: process.env.DB_POSTGRES_URL, - dbSchema: 'bsync_subscriptions', + dbSchema: 'bsync_purchases', apiKeys: ['key-1'], longPollTimeoutMs: 500, revenueCatV1ApiKey: 'any-key', @@ -58,7 +58,7 @@ describe('subscriptions', () => { }) beforeEach(async () => { - await clearSubs(bsync.ctx.db) + await clearPurchases(bsync.ctx.db) }) describe('webhook handler', () => { @@ -78,6 +78,17 @@ describe('subscriptions', () => { }) }) + it('returns 400 if DID is invalid', async () => { + const response = await callWebhook(bsyncUrl, { + event: { app_user_id: 'invalidDid' }, + }) + + expect(response.status).toBe(400) + expect(response.json()).resolves.toMatchObject({ + error: 'Bad request: invalid DID in app_user_id', + }) + }) + it('stores valid entitlements from the API response, excluding expired', async () => { revenueCatApiMock.mockReturnValueOnce({ subscriber: { @@ -90,7 +101,7 @@ describe('subscriptions', () => { }) const op0 = await bsync.ctx.db.db - .selectFrom('subs_op') + .selectFrom('purchase_op') .selectAll() .where('actorDid', '=', actorDid) .orderBy('id', 'desc') @@ -105,7 +116,7 @@ describe('subscriptions', () => { await expect( bsync.ctx.db.db - .selectFrom('subs_item') + .selectFrom('purchase_item') .selectAll() .where('actorDid', '=', actorDid) .executeTakeFirstOrThrow(), @@ -126,7 +137,7 @@ describe('subscriptions', () => { }) const op1 = await bsync.ctx.db.db - .selectFrom('subs_op') + .selectFrom('purchase_op') .selectAll() .where('actorDid', '=', actorDid) .orderBy('id', 'desc') @@ -141,7 +152,7 @@ describe('subscriptions', () => { await expect( bsync.ctx.db.db - .selectFrom('subs_item') + .selectFrom('purchase_item') .selectAll() .where('actorDid', '=', actorDid) .executeTakeFirstOrThrow(), @@ -162,7 +173,7 @@ describe('subscriptions', () => { }) const op = await bsync.ctx.db.db - .selectFrom('subs_op') + .selectFrom('purchase_op') .selectAll() .where('actorDid', '=', actorDid) .orderBy('id', 'desc') @@ -177,7 +188,7 @@ describe('subscriptions', () => { await expect( bsync.ctx.db.db - .selectFrom('subs_item') + .selectFrom('purchase_item') .selectAll() .where('actorDid', '=', actorDid) .executeTakeFirstOrThrow(), @@ -190,16 +201,16 @@ describe('subscriptions', () => { }) }) -const clearSubs = async (db: Database) => { - await db.db.deleteFrom('subs_item').execute() - await db.db.deleteFrom('subs_op').execute() +const clearPurchases = async (db: Database) => { + await db.db.deleteFrom('purchase_item').execute() + await db.db.deleteFrom('purchase_op').execute() } const callWebhook = async ( baseUrl: string, body: Record, ): Promise => { - const response = await fetch(`${baseUrl}/webhooks/revenuecat`, { + return fetch(`${baseUrl}/webhooks/revenuecat`, { method: 'POST', body: JSON.stringify(body), headers: { @@ -207,19 +218,11 @@ const callWebhook = async ( 'Content-Type': 'application/json', }, }) - - if (!response.ok) { - throw new Error( - `Unexpected status on calling the webhook: '${response.status}'`, - ) - } - - return response } const createMockRevenueCatService = async ( port: number, - apiMock: jest.Mock, + apiMock: jest.Mock, ): Promise => { const server = http.createServer((req, res) => { if (!req.url) { diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 0abdcf7ada4..fca90ea56e8 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -314,9 +314,21 @@ importers: '@connectrpc/connect': specifier: ^1.1.4 version: 1.3.0(@bufbuild/protobuf@1.6.0) + '@connectrpc/connect-express': + specifier: ^1.1.4 + version: 1.3.0(@bufbuild/protobuf@1.6.0)(@connectrpc/connect-node@1.3.0)(@connectrpc/connect@1.3.0) '@connectrpc/connect-node': specifier: ^1.1.4 version: 1.3.0(@bufbuild/protobuf@1.6.0)(@connectrpc/connect@1.3.0) + compression: + specifier: ^1.7.4 + version: 1.7.4 + cors: + specifier: ^2.8.5 + version: 2.8.5 + express: + specifier: ^4.21.1 + version: 4.21.1 http-terminator: specifier: ^3.2.0 version: 3.2.0 @@ -342,6 +354,15 @@ importers: '@connectrpc/protoc-gen-connect-es': specifier: ^1.1.4 version: 1.3.0(@bufbuild/protoc-gen-es@1.6.0)(@connectrpc/connect@1.3.0) + '@types/compression': + specifier: ^1.7.5 + version: 1.7.5 + '@types/cors': + specifier: ^2.8.12 + version: 2.8.12 + '@types/express': + specifier: ^4.17.13 + version: 4.17.21 '@types/pg': specifier: ^8.6.6 version: 8.6.6 @@ -6234,6 +6255,12 @@ packages: '@types/connect': 3.4.35 '@types/node': 18.19.56 + /@types/compression@1.7.5: + resolution: {integrity: sha512-AAQvK5pxMpaT+nDvhHrsBhLSYG5yQdtkaJE1WYieSNY2mVFKAgmU4ks65rkZD5oqnGCFLyQpUr1CqI4DmUMyDg==} + dependencies: + '@types/express': 4.17.21 + dev: true + /@types/connect@3.4.35: resolution: {integrity: sha512-cdeYyv4KWoEgpBISTxWvqYsVy444DOqehiF3fM3ne10AmJ62RSyNkUnxMJXHQWRQQX2eR94m5y1IZyDwBjV9FQ==} dependencies: @@ -7126,6 +7153,26 @@ packages: transitivePeerDependencies: - supports-color + /body-parser@1.20.3: + resolution: {integrity: sha512-7rAxByjUMqQ3/bHJy7D6OGXvx/MMc4IqBn/X0fcM1QUcAItpZrBEYhWGem+tzXH90c+G01ypMcYJBO9Y30203g==} + engines: {node: '>= 0.8', npm: 1.2.8000 || >= 1.4.16} + dependencies: + bytes: 3.1.2 + content-type: 1.0.5 + debug: 2.6.9 + depd: 2.0.0 + destroy: 1.2.0 + http-errors: 2.0.0 + iconv-lite: 0.4.24 + on-finished: 2.4.1 + qs: 6.13.0 + raw-body: 2.5.2 + type-is: 1.6.18 + unpipe: 1.0.0 + transitivePeerDependencies: + - supports-color + dev: false + /boolbase@1.0.0: resolution: {integrity: sha512-JZOSA7Mo9sNGB8+UjSgzdLtokWAky1zbztM3WRLCbZ70/3cTANmQmOdR7y2g+J0e2WXywy1yS468tY+IruqEww==} dev: true @@ -7272,6 +7319,17 @@ packages: function-bind: 1.1.1 get-intrinsic: 1.2.1 + /call-bind@1.0.7: + resolution: {integrity: sha512-GHTSNSYICQ7scH7sZ+M2rFopRoLh8t2bLSW6BbgrtLsahOIB5iyAVJf9GjWK3cYTDaMj4XdBpM1cA6pIS0Kv2w==} + engines: {node: '>= 0.4'} + dependencies: + es-define-property: 1.0.0 + es-errors: 1.3.0 + function-bind: 1.1.2 + get-intrinsic: 1.2.4 + set-function-length: 1.2.2 + dev: false + /callsites@3.1.0: resolution: {integrity: sha512-P8BjAsXvZS+VIDUI11hHCQEv74YT67YUi5JJFNWIqL235sBmjX4+qx9Muvls5ivyNENctx46xQLQ3aTuE7ssaQ==} engines: {node: '>=6'} @@ -7589,6 +7647,11 @@ packages: engines: {node: '>= 0.6'} dev: false + /cookie@0.7.1: + resolution: {integrity: sha512-6DnInpx7SJ2AK3+CTUE/ZM0vWTUboZCegxhC2xiIydHR9jNuTAASBrfEpHhiGOZw/nX51bHt6YQl8jsGo4y/0w==} + engines: {node: '>= 0.6'} + dev: false + /cors@2.8.5: resolution: {integrity: sha512-KIHbLJqu73RGr/hnbrO9uBeixNGuvSQjul/jdFvS/KFSIH1hWVd1ng7zOHx+YrEfInLG7q4n6GHQ9cDtxv/P6g==} engines: {node: '>= 0.10'} @@ -7936,6 +7999,15 @@ packages: clone: 1.0.4 dev: true + /define-data-property@1.1.4: + resolution: {integrity: sha512-rBMvIzlpA8v6E+SJZoo++HAYqsLrkg7MSfIinMPFhmkorw7X+dOXVJQs+QT69zGkzMyfDnIMN2Wid1+NbL3T+A==} + engines: {node: '>= 0.4'} + dependencies: + es-define-property: 1.0.0 + es-errors: 1.3.0 + gopd: 1.0.1 + dev: false + /define-properties@1.2.0: resolution: {integrity: sha512-xvqAVKGfT1+UAvPwKTVw/njhdQ8ZhXK4lI0bCIuCMrp2up9nPnaDftrLtmpTazqd1o+UY4zgzU+avtMbDP+ldA==} engines: {node: '>= 0.4'} @@ -8116,6 +8188,11 @@ packages: resolution: {integrity: sha512-TPJXq8JqFaVYm2CWmPvnP2Iyo4ZSM7/QKcSmuMLDObfpH5fi7RUGmd/rTDf+rut/saiDiQEeVTNgAmJEdAOx0w==} engines: {node: '>= 0.8'} + /encodeurl@2.0.0: + resolution: {integrity: sha512-Q0n9HRi4m6JuGIV1eFlmvJB7ZEVxu93IrMyiMsGC0lrMJMWzRgx6WGquyfQgZVb31vhGgXnfmPNNXmxnOkRBrg==} + engines: {node: '>= 0.8'} + dev: false + /encoding@0.1.13: resolution: {integrity: sha512-ETBauow1T35Y/WZMkio9jiM0Z5xjHHmJ4XmjZOq1l/dXz3lr2sRn87nJy20RupqSh1F2m3HHPSp8ShIPQJrJ3A==} requiresBuild: true @@ -8200,6 +8277,18 @@ packages: which-typed-array: 1.1.11 dev: true + /es-define-property@1.0.0: + resolution: {integrity: sha512-jxayLKShrEqqzJ0eumQbVhTYQM27CfT1T35+gCgDFoL82JLsXqTJ76zv6A0YLOgEnLUMvLzsDsGIrl8NFpT2gQ==} + engines: {node: '>= 0.4'} + dependencies: + get-intrinsic: 1.2.4 + dev: false + + /es-errors@1.3.0: + resolution: {integrity: sha512-Zf5H2Kxt2xjTvbJvP2ZWLEICxA6j+hAmMzIlypy4xcBg1vKVnx89Wy0GbS+kf5cwCVFFzdCFh2XSCFNULS6csw==} + engines: {node: '>= 0.4'} + dev: false + /es-set-tostringtag@2.0.1: resolution: {integrity: sha512-g3OMbtlwY3QewlqAiMLI47KywjWZoEytKr8pf6iTC8uJq5bIAH52Z9pnQ8pVL6whrCto53JZDuUIsifGeLorTg==} engines: {node: '>= 0.4'} @@ -8709,6 +8798,45 @@ packages: transitivePeerDependencies: - supports-color + /express@4.21.1: + resolution: {integrity: sha512-YSFlK1Ee0/GC8QaO91tHcDxJiE/X4FbpAyQWkxAvG6AXCuR65YzK8ua6D9hvi/TzUfZMpc+BwuM1IPw8fmQBiQ==} + engines: {node: '>= 0.10.0'} + dependencies: + accepts: 1.3.8 + array-flatten: 1.1.1 + body-parser: 1.20.3 + content-disposition: 0.5.4 + content-type: 1.0.5 + cookie: 0.7.1 + cookie-signature: 1.0.6 + debug: 2.6.9 + depd: 2.0.0 + encodeurl: 2.0.0 + escape-html: 1.0.3 + etag: 1.8.1 + finalhandler: 1.3.1 + fresh: 0.5.2 + http-errors: 2.0.0 + merge-descriptors: 1.0.3 + methods: 1.1.2 + on-finished: 2.4.1 + parseurl: 1.3.3 + path-to-regexp: 0.1.10 + proxy-addr: 2.0.7 + qs: 6.13.0 + range-parser: 1.2.1 + safe-buffer: 5.2.1 + send: 0.19.0 + serve-static: 1.16.2 + setprototypeof: 1.2.0 + statuses: 2.0.1 + type-is: 1.6.18 + utils-merge: 1.0.1 + vary: 1.1.2 + transitivePeerDependencies: + - supports-color + dev: false + /extendable-error@0.1.7: resolution: {integrity: sha512-UOiS2in6/Q0FK0R0q6UY9vYpQ21mr/Qn1KOnte7vsACuNJf514WvCCUHSRCPcgjPT2bAhNIJdlE6bVap1GKmeg==} dev: true @@ -8858,6 +8986,21 @@ packages: transitivePeerDependencies: - supports-color + /finalhandler@1.3.1: + resolution: {integrity: sha512-6BN9trH7bp3qvnrRyzsBz+g3lZxTNZTbVO2EV1CS0WIcDbawYVdYvGflME/9QP0h0pYlCDBCTjYa9nZzMDpyxQ==} + engines: {node: '>= 0.8'} + dependencies: + debug: 2.6.9 + encodeurl: 2.0.0 + escape-html: 1.0.3 + on-finished: 2.4.1 + parseurl: 1.3.3 + statuses: 2.0.1 + unpipe: 1.0.0 + transitivePeerDependencies: + - supports-color + dev: false + /find-up@4.1.0: resolution: {integrity: sha512-PpOwAdQ/YlXQ2vj8a3h8IipDuYRi3wceVQQGYWxNINccq40Anw7BlsEXCMbt1Zt+OLA6Fq9suIpIWD0OsnISlw==} engines: {node: '>=8'} @@ -9010,6 +9153,10 @@ packages: /function-bind@1.1.1: resolution: {integrity: sha512-yIovAzMX49sF8Yl58fSCWJ5svSLuaibPxXQJFLmBObTuCr0Mf1KiPopGM9NiFjiYBCbfaa2Fh6breQ6ANVTI0A==} + /function-bind@1.1.2: + resolution: {integrity: sha512-7XHNxH7qX9xG5mIwxkhumTox/MIRNcOgDrxWsMt2pAr23WHp6MrRlN7FBSFpCpr+oVO0F744iUgR82nJMfG2SA==} + dev: false + /function.prototype.name@1.1.6: resolution: {integrity: sha512-Z5kx79swU5P27WEayXM1tBi5Ze/lbIyiNgU3qyXUOf9b2rgXYyF9Dy9Cx+IQv/Lc8WCG6L82zwUPpSS9hGehIg==} engines: {node: '>= 0.4'} @@ -9061,6 +9208,17 @@ packages: has-proto: 1.0.1 has-symbols: 1.0.3 + /get-intrinsic@1.2.4: + resolution: {integrity: sha512-5uYhsJH8VJBTv7oslg4BznJYhDoRI6waYCxMmCdnTrcCrHA/fCFKoTFz2JKKE0HdDFUF7/oQuhzumXJK7paBRQ==} + engines: {node: '>= 0.4'} + dependencies: + es-errors: 1.3.0 + function-bind: 1.1.2 + has-proto: 1.0.1 + has-symbols: 1.0.3 + hasown: 2.0.2 + dev: false + /get-package-type@0.1.0: resolution: {integrity: sha512-pjzuKtY64GYfWizNAJ0fr9VqttZkNiK2iS430LtIHzjBEr6bX8Am2zm4sW4Ro5wjWW5cAlRL1qAMTcXbjNAO2Q==} engines: {node: '>=8.0.0'} @@ -9202,7 +9360,6 @@ packages: resolution: {integrity: sha512-d65bNlIadxvpb/A2abVdlqKqV563juRnZ1Wtk6s1sIR8uNsXR70xqIzVqxVf1eTqDunwT2MkczEeaezCKTZhwA==} dependencies: get-intrinsic: 1.2.1 - dev: true /graceful-fs@4.2.11: resolution: {integrity: sha512-RbJ5/jmFcNNCcDV5o9eTnBLJ/HszWV0P73bc+Ff4nS/rJj+YaS6IGyiOL0VoBYX+l1Wrl3k63h/KrH+nhJ0XvQ==} @@ -9264,6 +9421,12 @@ packages: dependencies: get-intrinsic: 1.2.1 + /has-property-descriptors@1.0.2: + resolution: {integrity: sha512-55JNKuIW+vq4Ke1BjOTjM2YctQIvCT7GFzHwmfZPGo5wnrgkid0YQtnAleFSqumZm4az3n2BS+erby5ipJdgrg==} + dependencies: + es-define-property: 1.0.0 + dev: false + /has-proto@1.0.1: resolution: {integrity: sha512-7qE+iP+O+bgF9clE5+UoBFzE65mlBiVj3tKCrlNQ0Ogwm0BjpT/gK4SlLYDMybDh5I3TCTKnPPa0oMG7JDYrhg==} engines: {node: '>= 0.4'} @@ -9295,6 +9458,13 @@ packages: inherits: 2.0.4 minimalistic-assert: 1.0.1 + /hasown@2.0.2: + resolution: {integrity: sha512-0hJU9SCPvmMzIBdZFqNPXWa6dqh7WdH0cII9y+CyS8rG3nL48Bclra9HmKhVVUHyPWNH5Y7xDwAB7bfgSjkUMQ==} + engines: {node: '>= 0.4'} + dependencies: + function-bind: 1.1.2 + dev: false + /he@1.2.0: resolution: {integrity: sha512-F/1DnUGPopORZi0ni+CvrCgHQ5FyEAHRLSApuYWMmrbSwoN2Mn/7k+Gl38gJnR7yyDZk6WLXwiGod1JOWNDKGw==} hasBin: true @@ -10660,6 +10830,10 @@ packages: /merge-descriptors@1.0.1: resolution: {integrity: sha512-cCi6g3/Zr1iqQi6ySbseM1Xvooa98N0w31jzUYrXPX2xqObmFGHJ0tQ5u74H3mVh7wLouTseZyYIq39g8cNp1w==} + /merge-descriptors@1.0.3: + resolution: {integrity: sha512-gaNvAS7TZ897/rVaZ0nMtAyxNyi/pdbjbAwUpFQpN70GqnVfOiXpeUUMKRBmzXaSQ8DdTX4/0ms62r2K+hE6mQ==} + dev: false + /merge-stream@2.0.0: resolution: {integrity: sha512-abv/qOcuPfk3URPfDzmZU1LKmuw8kT+0nIHvKrKgFrwifol/doWcdA4ZqsWQ8ENrFKkd67Mfpo/LovbIUsbt3w==} dev: true @@ -11046,6 +11220,11 @@ packages: /object-inspect@1.12.3: resolution: {integrity: sha512-geUvdk7c+eizMNUDkRpW1wJwgfOiOeHbxBR/hLXK1aT6zmVSO0jsQcs7fj6MGw89jC/cjGfLcNOrtMYtGqm81g==} + /object-inspect@1.13.3: + resolution: {integrity: sha512-kDCGIbxkDSXE3euJZZXzc6to7fCrKHNI/hSRQnRuQ+BWjFNzZwiFF8fj/6o2t2G9/jTj8PSIYTfCLelLZEeRpA==} + engines: {node: '>= 0.4'} + dev: false + /object-keys@1.1.1: resolution: {integrity: sha512-NuAESUOUMrlIXOfHKzD6bpPu3tYt3xvjNdRIQ+FeT0lNb4K8WR70CaDxhuNguS2XG+GjkyMwOzsN5ZktImfhLA==} engines: {node: '>= 0.4'} @@ -11283,6 +11462,10 @@ packages: lru-cache: 10.2.0 minipass: 5.0.0 + /path-to-regexp@0.1.10: + resolution: {integrity: sha512-7lf7qcQidTku0Gu3YDPc8DJ1q7OOucfa/BSsIwjuh56VU7katFvuM8hULfkwB3Fns/rsVF7PwPKVw1sl5KQS9w==} + dev: false + /path-to-regexp@0.1.7: resolution: {integrity: sha512-5DFkuoqlv1uYQKxy8omFBeJPQcdoE07Kv2sferDCrAq1ohOU+MSDswDIbnx3YAM60qIOnYa53wBhXW0EbMonrQ==} @@ -12109,6 +12292,13 @@ packages: dependencies: side-channel: 1.0.4 + /qs@6.13.0: + resolution: {integrity: sha512-+38qI9SOr8tfZ4QmJNplMUxqjbe7LKvvZgWdExBOmd+egZTtjLB67Gu0HRX3u/XOq7UU2Nx6nsjvS16Z9uwfpg==} + engines: {node: '>=0.6'} + dependencies: + side-channel: 1.0.6 + dev: false + /querystringify@2.2.0: resolution: {integrity: sha512-FIqgj2EUvTa7R50u0rGsyTftzjYmv/a3hO345bZNrqabNqjtgiDMgmo4mkUjd+nzU5oF3dClKqFIPUKybUyqoQ==} dev: true @@ -12150,6 +12340,16 @@ packages: iconv-lite: 0.4.24 unpipe: 1.0.0 + /raw-body@2.5.2: + resolution: {integrity: sha512-8zGqypfENjCIqGhgXToC8aB2r7YrBX+AQAfIPs/Mlk+BtPTztOvTS01NRW/3Eh60J+a48lt8qsCzirQ6loCVfA==} + engines: {node: '>= 0.8'} + dependencies: + bytes: 3.1.2 + http-errors: 2.0.0 + iconv-lite: 0.4.24 + unpipe: 1.0.0 + dev: false + /rc@1.2.8: resolution: {integrity: sha512-y3bGgqKj3QBdxLbLkomlohkvsA8gdAiUQlSBJnBhfn+BPxg4bc62d8TcBW15wavDfgexCgccckhcZvywyQYPOw==} hasBin: true @@ -12556,6 +12756,27 @@ packages: transitivePeerDependencies: - supports-color + /send@0.19.0: + resolution: {integrity: sha512-dW41u5VfLXu8SJh5bwRmyYUbAoSB3c9uQh6L8h/KtsFREPWpbX1lrljJo186Jc4nmci/sGUZ9a0a0J2zgfq2hw==} + engines: {node: '>= 0.8.0'} + dependencies: + debug: 2.6.9 + depd: 2.0.0 + destroy: 1.2.0 + encodeurl: 1.0.2 + escape-html: 1.0.3 + etag: 1.8.1 + fresh: 0.5.2 + http-errors: 2.0.0 + mime: 1.6.0 + ms: 2.1.3 + on-finished: 2.4.1 + range-parser: 1.2.1 + statuses: 2.0.1 + transitivePeerDependencies: + - supports-color + dev: false + /serialize-javascript@6.0.2: resolution: {integrity: sha512-Saa1xPByTTq2gdeFZYLLo+RFE35NHZkAbqZeWNd3BpzppeVisAqpDjcp8dyf6uIvEqJRd46jemmyA4iFIeVk8g==} dependencies: @@ -12573,10 +12794,34 @@ packages: transitivePeerDependencies: - supports-color + /serve-static@1.16.2: + resolution: {integrity: sha512-VqpjJZKadQB/PEbEwvFdO43Ax5dFBZ2UECszz8bQ7pi7wt//PWe1P6MN7eCnjsatYtBT6EuiClbjSWP2WrIoTw==} + engines: {node: '>= 0.8.0'} + dependencies: + encodeurl: 2.0.0 + escape-html: 1.0.3 + parseurl: 1.3.3 + send: 0.19.0 + transitivePeerDependencies: + - supports-color + dev: false + /set-blocking@2.0.0: resolution: {integrity: sha512-KiKBS8AnWGEyLzofFfmvKwpdPzqiy16LvQfK3yv/fVH7Bj13/wl3JSR1J+rfgRE9q7xUJK4qvgS8raSOeLUehw==} dev: true + /set-function-length@1.2.2: + resolution: {integrity: sha512-pgRc4hJ4/sNjWCSS9AmnS40x3bNMDTknHgL5UaMBTMyJnU90EgWh1Rz+MC9eFu4BuN/UwZjKQuY/1v3rM7HMfg==} + engines: {node: '>= 0.4'} + dependencies: + define-data-property: 1.1.4 + es-errors: 1.3.0 + function-bind: 1.1.2 + get-intrinsic: 1.2.4 + gopd: 1.0.1 + has-property-descriptors: 1.0.2 + dev: false + /setprototypeof@1.2.0: resolution: {integrity: sha512-E5LDX7Wrp85Kil5bhZv46j8jOeboKq5JMmYM3gVGdGH8xFpPWXUMsNrlODCrkoxMEeNi/XZIwuRvY4XNwYMJpw==} @@ -12658,6 +12903,16 @@ packages: get-intrinsic: 1.2.1 object-inspect: 1.12.3 + /side-channel@1.0.6: + resolution: {integrity: sha512-fDW/EZ6Q9RiO8eFG8Hj+7u/oW+XrPTIChwCOM2+th2A6OblDtYYIpve9m+KvI9Z4C9qSEXlaGR6bTEYHReuglA==} + engines: {node: '>= 0.4'} + dependencies: + call-bind: 1.0.7 + es-errors: 1.3.0 + get-intrinsic: 1.2.4 + object-inspect: 1.13.3 + dev: false + /signal-exit@3.0.7: resolution: {integrity: sha512-wnD2ZE+l+SPC/uoS0vXeE9L1+0wuaMqKlfz9AMUo38JsyLSBWSFcHR1Rri62LZc12vLr1gb3jl7iwQhgwpAbGQ==} dev: true From 6a566e09add45c09c03fea58cc78ab4ff8d1eab3 Mon Sep 17 00:00:00 2001 From: rafael Date: Thu, 5 Dec 2024 17:17:11 -0300 Subject: [PATCH 3/5] Validate webhook payload with zod --- packages/bsync/package.json | 3 +- packages/bsync/src/api/revenueCat.ts | 34 +++++++++----- .../bsync/src/purchases/revenueCatTypes.ts | 10 ++++ packages/bsync/tests/purchases.test.ts | 47 ++++++++++++------- pnpm-lock.yaml | 3 ++ 5 files changed, 69 insertions(+), 28 deletions(-) diff --git a/packages/bsync/package.json b/packages/bsync/package.json index feade31c73d..9ffca0942d7 100644 --- a/packages/bsync/package.json +++ b/packages/bsync/package.json @@ -38,7 +38,8 @@ "kysely": "^0.22.0", "pg": "^8.10.0", "pino-http": "^8.2.1", - "typed-emitter": "^2.1.0" + "typed-emitter": "^2.1.0", + "zod": "^3.23.8" }, "devDependencies": { "@bufbuild/buf": "^1.28.1", diff --git a/packages/bsync/src/api/revenueCat.ts b/packages/bsync/src/api/revenueCat.ts index 214fbeae79f..8a63b4f8a92 100644 --- a/packages/bsync/src/api/revenueCat.ts +++ b/packages/bsync/src/api/revenueCat.ts @@ -1,6 +1,6 @@ import express, { RequestHandler } from 'express' import { AppContext } from '..' -import { RevenueCatClient } from '../purchases' +import { rcEventBodySchema, RevenueCatClient } from '../purchases' import { addPurchaseOperation, RcEventBody } from '../purchases' import { isValidDid } from '../routes/util' import { httpLogger as log } from '..' @@ -26,18 +26,30 @@ const webhookHandler = async (req, res) => { const { revenueCatClient } = ctx - const body: RcEventBody = req.body - + let body: RcEventBody try { - const { app_user_id: actorDid } = body.event + body = rcEventBodySchema.parse(req.body) + } catch (error) { + log.error({ error }, 'RevenueCat webhook body schema validation failed') - if (!isValidDid(actorDid)) { - return res.status(400).send({ - success: false, - error: 'Bad request: invalid DID in app_user_id', - }) - } + return res.status(400).send({ + success: false, + error: 'Bad request: body schema validation failed', + }) + } + const { app_user_id: actorDid } = body.event + + if (!isValidDid(actorDid)) { + log.error({ actorDid }, 'RevenueCat webhook got invalid DID') + + return res.status(400).send({ + success: false, + error: 'Bad request: invalid DID in app_user_id', + }) + } + + try { const entitlements = await revenueCatClient.getEntitlementIdentifiers(actorDid) @@ -45,7 +57,7 @@ const webhookHandler = res.send({ success: true, operationId: id }) } catch (error) { - log.error(error) + log.error({ error }, 'Error while processing RevenueCat webhook') res.status(500).send({ success: false, diff --git a/packages/bsync/src/purchases/revenueCatTypes.ts b/packages/bsync/src/purchases/revenueCatTypes.ts index 9d216287bbd..50842d05324 100644 --- a/packages/bsync/src/purchases/revenueCatTypes.ts +++ b/packages/bsync/src/purchases/revenueCatTypes.ts @@ -1,3 +1,5 @@ +import { z } from 'zod' + // Reference: https://www.revenuecat.com/docs/integrations/webhooks/event-types-and-fields#events-format export type RcEventBody = { api_version: '1.0' @@ -21,3 +23,11 @@ export type RcSubscriber = { export type RcEntitlement = { expires_date: string } + +export const rcEventBodySchema = z.object({ + api_version: z.literal('1.0'), + event: z.object({ + app_user_id: z.string(), + type: z.string(), + }), +}) diff --git a/packages/bsync/tests/purchases.test.ts b/packages/bsync/tests/purchases.test.ts index 50705be476f..239c34b8cd2 100644 --- a/packages/bsync/tests/purchases.test.ts +++ b/packages/bsync/tests/purchases.test.ts @@ -2,7 +2,11 @@ import http from 'node:http' import { once } from 'node:events' import getPort from 'get-port' import { BsyncService, Database, envToCfg } from '../src' -import { RcEntitlement, RcGetSubscriberResponse } from '../src/purchases' +import { + RcEntitlement, + RcEventBody, + RcGetSubscriberResponse, +} from '../src/purchases' const revenueCatWebhookAuthorization = 'Bearer any-token' @@ -62,7 +66,7 @@ describe('purchases', () => { }) describe('webhook handler', () => { - it('returns 403 if authorization is invalid', async () => { + it('replies 403 if authorization is invalid', async () => { const response = await fetch(`${bsyncUrl}/webhooks/revenuecat`, { method: 'POST', body: JSON.stringify({ event: { app_user_id: actorDid } }), @@ -78,10 +82,8 @@ describe('purchases', () => { }) }) - it('returns 400 if DID is invalid', async () => { - const response = await callWebhook(bsyncUrl, { - event: { app_user_id: 'invalidDid' }, - }) + it('replies 400 if DID is invalid', async () => { + const response = await callWebhook(bsyncUrl, buildWebhookBody('invalid')) expect(response.status).toBe(400) expect(response.json()).resolves.toMatchObject({ @@ -89,6 +91,17 @@ describe('purchases', () => { }) }) + it('replies 400 if body is invalid', async () => { + const response = await callWebhook(bsyncUrl, { + any: 'thing ', + } as unknown as RcEventBody) + + expect(response.status).toBe(400) + expect(response.json()).resolves.toMatchObject({ + error: 'Bad request: body schema validation failed', + }) + }) + it('stores valid entitlements from the API response, excluding expired', async () => { revenueCatApiMock.mockReturnValueOnce({ subscriber: { @@ -96,9 +109,7 @@ describe('purchases', () => { }, }) - await callWebhook(bsyncUrl, { - event: { app_user_id: actorDid }, - }) + await callWebhook(bsyncUrl, buildWebhookBody(actorDid)) const op0 = await bsync.ctx.db.db .selectFrom('purchase_op') @@ -132,9 +143,7 @@ describe('purchases', () => { }, }) - await callWebhook(bsyncUrl, { - event: { app_user_id: actorDid }, - }) + await callWebhook(bsyncUrl, buildWebhookBody(actorDid)) const op1 = await bsync.ctx.db.db .selectFrom('purchase_op') @@ -168,9 +177,7 @@ describe('purchases', () => { subscriber: { entitlements: {} }, }) - await callWebhook(bsyncUrl, { - event: { app_user_id: actorDid }, - }) + await callWebhook(bsyncUrl, buildWebhookBody(actorDid)) const op = await bsync.ctx.db.db .selectFrom('purchase_op') @@ -206,9 +213,17 @@ const clearPurchases = async (db: Database) => { await db.db.deleteFrom('purchase_op').execute() } +const buildWebhookBody = (actorDid: string): RcEventBody => ({ + api_version: '1.0', + event: { + app_user_id: actorDid, + type: 'INITIAL_PURCHASE', + }, +}) + const callWebhook = async ( baseUrl: string, - body: Record, + body: RcEventBody, ): Promise => { return fetch(`${baseUrl}/webhooks/revenuecat`, { method: 'POST', diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index fca90ea56e8..f22d52497a6 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -344,6 +344,9 @@ importers: typed-emitter: specifier: ^2.1.0 version: 2.1.0 + zod: + specifier: ^3.23.8 + version: 3.23.8 devDependencies: '@bufbuild/buf': specifier: ^1.28.1 From 601dc0a58be5e56ef250bc7570eed03584406d56 Mon Sep 17 00:00:00 2001 From: rafael Date: Thu, 5 Dec 2024 18:40:09 -0300 Subject: [PATCH 4/5] Remove cors middleware --- packages/bsync/package.json | 2 -- packages/bsync/src/index.ts | 3 --- pnpm-lock.yaml | 41 +++++++++++++++++++++++++++++-------- 3 files changed, 33 insertions(+), 13 deletions(-) diff --git a/packages/bsync/package.json b/packages/bsync/package.json index 9ffca0942d7..7767a1f4469 100644 --- a/packages/bsync/package.json +++ b/packages/bsync/package.json @@ -32,7 +32,6 @@ "@connectrpc/connect-express": "^1.1.4", "@connectrpc/connect-node": "^1.1.4", "compression": "^1.7.4", - "cors": "^2.8.5", "express": "^4.21.1", "http-terminator": "^3.2.0", "kysely": "^0.22.0", @@ -46,7 +45,6 @@ "@bufbuild/protoc-gen-es": "^1.5.0", "@connectrpc/protoc-gen-connect-es": "^1.1.4", "@types/compression": "^1.7.5", - "@types/cors": "^2.8.12", "@types/express": "^4.17.13", "@types/pg": "^8.6.6", "get-port": "^5.1.1", diff --git a/packages/bsync/src/index.ts b/packages/bsync/src/index.ts index d71e9a7cb34..cc0c93f6569 100644 --- a/packages/bsync/src/index.ts +++ b/packages/bsync/src/index.ts @@ -1,5 +1,4 @@ import express from 'express' -import cors from 'cors' import compression from 'compression' import http from 'node:http' import events from 'node:events' @@ -12,7 +11,6 @@ import { createMuteOpChannel } from './db/schema/mute_op' import { createNotifOpChannel } from './db/schema/notif_op' import * as health from './api/health' import * as revenueCat from './api/revenueCat' -import { DAY, SECOND } from '@atproto/common' import { expressConnectMiddleware } from '@connectrpc/connect-express' import { createPurchaseOpChannel } from './db/schema/purchase_op' @@ -48,7 +46,6 @@ export class BsyncService { const ctx = await AppContext.fromConfig(cfg, ac.signal, overrides) const app = express() - app.use(cors({ maxAge: DAY / SECOND })) app.use(loggerMiddleware) app.use(compression()) diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index f22d52497a6..96d2b118d3d 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -323,9 +323,6 @@ importers: compression: specifier: ^1.7.4 version: 1.7.4 - cors: - specifier: ^2.8.5 - version: 2.8.5 express: specifier: ^4.21.1 version: 4.21.1 @@ -360,9 +357,6 @@ importers: '@types/compression': specifier: ^1.7.5 version: 1.7.5 - '@types/cors': - specifier: ^2.8.12 - version: 2.8.12 '@types/express': specifier: ^4.17.13 version: 4.17.21 @@ -377,7 +371,7 @@ importers: version: 28.1.2(@types/node@18.19.56)(ts-node@10.8.2) ts-node: specifier: ^10.8.2 - version: 10.8.2(@swc/core@1.3.42)(@types/node@18.19.56)(typescript@5.6.3) + version: 10.8.2(@types/node@18.19.56)(typescript@5.6.3) typescript: specifier: ^5.6.3 version: 5.6.3 @@ -10097,7 +10091,7 @@ packages: pretty-format: 28.1.3 slash: 3.0.0 strip-json-comments: 3.1.1 - ts-node: 10.8.2(@swc/core@1.3.42)(@types/node@18.19.56)(typescript@5.6.3) + ts-node: 10.8.2(@types/node@18.19.56)(typescript@5.6.3) transitivePeerDependencies: - supports-color dev: true @@ -13606,6 +13600,37 @@ packages: yn: 3.1.1 dev: true + /ts-node@10.8.2(@types/node@18.19.56)(typescript@5.6.3): + resolution: {integrity: sha512-LYdGnoGddf1D6v8REPtIH+5iq/gTDuZqv2/UJUU7tKjuEU8xVZorBM+buCGNjj+pGEud+sOoM4CX3/YzINpENA==} + hasBin: true + peerDependencies: + '@swc/core': '>=1.2.50' + '@swc/wasm': '>=1.2.50' + '@types/node': '*' + typescript: '>=2.7' + peerDependenciesMeta: + '@swc/core': + optional: true + '@swc/wasm': + optional: true + dependencies: + '@cspotcode/source-map-support': 0.8.1 + '@tsconfig/node10': 1.0.9 + '@tsconfig/node12': 1.0.11 + '@tsconfig/node14': 1.0.3 + '@tsconfig/node16': 1.0.4 + '@types/node': 18.19.56 + acorn: 8.10.0 + acorn-walk: 8.2.0 + arg: 4.1.3 + create-require: 1.1.1 + diff: 4.0.2 + make-error: 1.3.6 + typescript: 5.6.3 + v8-compile-cache-lib: 3.0.1 + yn: 3.1.1 + dev: true + /tslib@1.14.1: resolution: {integrity: sha512-Xni35NKzjgMrwevysHTCArtLDpPvye8zV/0E4EyYn43P7/7qvQwPh9BGkHewbMulVntbigmcT7rdX3BNo9wRJg==} dev: false From 67cfef3385d60f46ae3a641b95c48ba1f93a46ed Mon Sep 17 00:00:00 2001 From: rafael Date: Fri, 6 Dec 2024 14:00:26 -0300 Subject: [PATCH 5/5] fix typo in env vars --- packages/bsync/src/config.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/bsync/src/config.ts b/packages/bsync/src/config.ts index c6f562d802b..1cb480331ec 100644 --- a/packages/bsync/src/config.ts +++ b/packages/bsync/src/config.ts @@ -93,10 +93,10 @@ export const readEnv = (): ServerEnvironment => { // secrets apiKeys: envList('BSYNC_API_KEYS'), // revenue cat - revenueCatV1ApiKey: envStr('BSKY_REVENUE_CAT_V1_API_KEY'), - revenueCatV1ApiUrl: envStr('BSKY_REVENUE_CAT_V1_API_URL'), + revenueCatV1ApiKey: envStr('BSYNC_REVENUE_CAT_V1_API_KEY'), + revenueCatV1ApiUrl: envStr('BSYNC_REVENUE_CAT_V1_API_URL'), revenueCatWebhookAuthorization: envStr( - 'BSKY_REVENUE_CAT_WEBHOOK_AUTHORIZATION', + 'BSYNC_REVENUE_CAT_WEBHOOK_AUTHORIZATION', ), } }