diff --git a/packages/bsync/src/config.ts b/packages/bsync/src/config.ts index d5ea4454c2d..c6f562d802b 100644 --- a/packages/bsync/src/config.ts +++ b/packages/bsync/src/config.ts @@ -23,10 +23,25 @@ export const envToCfg = (env: ServerEnvironment): ServerConfig => { apiKeys: new Set(env.apiKeys), } + let revenueCatCfg: RevenueCatConfig | undefined + if (env.revenueCatV1ApiKey) { + assert(env.revenueCatV1ApiUrl, 'missing revenue cat v1 api url') + assert( + env.revenueCatWebhookAuthorization, + 'missing revenue cat webhook authorization', + ) + revenueCatCfg = { + v1ApiKey: env.revenueCatV1ApiKey, + v1ApiUrl: env.revenueCatV1ApiUrl, + webhookAuthorization: env.revenueCatWebhookAuthorization, + } + } + return { service: serviceCfg, db: dbCfg, auth: authCfg, + revenueCat: revenueCatCfg, } } @@ -34,6 +49,7 @@ export type ServerConfig = { service: ServiceConfig db: DatabaseConfig auth: AuthConfig + revenueCat?: RevenueCatConfig } type ServiceConfig = { @@ -55,6 +71,12 @@ type AuthConfig = { apiKeys: Set } +type RevenueCatConfig = { + v1ApiUrl: string + v1ApiKey: string + webhookAuthorization: string +} + export const readEnv = (): ServerEnvironment => { return { // service @@ -70,6 +92,12 @@ export const readEnv = (): ServerEnvironment => { dbMigrate: envBool('BSYNC_DB_MIGRATE'), // secrets apiKeys: envList('BSYNC_API_KEYS'), + // revenue cat + revenueCatV1ApiKey: envStr('BSKY_REVENUE_CAT_V1_API_KEY'), + revenueCatV1ApiUrl: envStr('BSKY_REVENUE_CAT_V1_API_URL'), + revenueCatWebhookAuthorization: envStr( + 'BSKY_REVENUE_CAT_WEBHOOK_AUTHORIZATION', + ), } } @@ -87,4 +115,8 @@ export type ServerEnvironment = { dbMigrate?: boolean // secrets apiKeys: string[] + // revenue cat + revenueCatV1ApiUrl?: string + revenueCatV1ApiKey?: string + revenueCatWebhookAuthorization?: string } diff --git a/packages/bsync/src/context.ts b/packages/bsync/src/context.ts index 66a63510ad2..34c66e94f4e 100644 --- a/packages/bsync/src/context.ts +++ b/packages/bsync/src/context.ts @@ -4,21 +4,25 @@ import Database from './db' import { createMuteOpChannel } from './db/schema/mute_op' import { createNotifOpChannel } from './db/schema/notif_op' import { EventEmitter } from 'stream' +import { RevenueCatClient } from './subscriptions' export type AppContextOptions = { db: Database + revenueCatClient: RevenueCatClient | undefined cfg: ServerConfig shutdown: AbortSignal } export class AppContext { db: Database + revenueCatClient: RevenueCatClient | undefined cfg: ServerConfig shutdown: AbortSignal events: TypedEventEmitter constructor(opts: AppContextOptions) { this.db = opts.db + this.revenueCatClient = opts.revenueCatClient this.cfg = opts.cfg this.shutdown = opts.shutdown this.events = new EventEmitter() as TypedEventEmitter @@ -36,7 +40,17 @@ export class AppContext { poolMaxUses: cfg.db.poolMaxUses, poolIdleTimeoutMs: cfg.db.poolIdleTimeoutMs, }) - return new AppContext({ db, cfg, shutdown, ...overrides }) + + let revenueCatClient: RevenueCatClient | undefined + if (cfg.revenueCat) { + revenueCatClient = new RevenueCatClient({ + v1ApiKey: cfg.revenueCat.v1ApiKey, + v1ApiUrl: cfg.revenueCat.v1ApiUrl, + webhookAuthorization: cfg.revenueCat.webhookAuthorization, + }) + } + + return new AppContext({ db, revenueCatClient, cfg, shutdown, ...overrides }) } } diff --git a/packages/bsync/src/db/migrations/20241205T030533572Z-subs.ts b/packages/bsync/src/db/migrations/20241205T030533572Z-subs.ts new file mode 100644 index 00000000000..12b1a7bcf17 --- /dev/null +++ b/packages/bsync/src/db/migrations/20241205T030533572Z-subs.ts @@ -0,0 +1,24 @@ +import { Kysely, sql } from 'kysely' + +export async function up(db: Kysely): Promise { + await db.schema + .createTable('subs_op') + .addColumn('id', 'bigserial', (col) => col.primaryKey()) + .addColumn('actorDid', 'varchar', (col) => col.notNull()) + .addColumn('entitlements', 'jsonb', (col) => col.notNull()) + .addColumn('createdAt', 'timestamptz', (col) => + col.notNull().defaultTo(sql`CURRENT_TIMESTAMP`), + ) + .execute() + await db.schema + .createTable('subs_item') + .addColumn('actorDid', 'varchar', (col) => col.primaryKey()) + .addColumn('entitlements', 'jsonb', (col) => col.notNull()) + .addColumn('fromId', 'bigint', (col) => col.notNull()) + .execute() +} + +export async function down(db: Kysely): Promise { + await db.schema.dropTable('subs_item').execute() + await db.schema.dropTable('subs_op').execute() +} diff --git a/packages/bsync/src/db/migrations/index.ts b/packages/bsync/src/db/migrations/index.ts index 2d7485b4448..bb0b7923a9d 100644 --- a/packages/bsync/src/db/migrations/index.ts +++ b/packages/bsync/src/db/migrations/index.ts @@ -4,3 +4,4 @@ export * as _20240108T220751294Z from './20240108T220751294Z-init' export * as _20240717T224303472Z from './20240717T224303472Z-notif-ops' +export * as _20241205T030533572Z from './20241205T030533572Z-subs' diff --git a/packages/bsync/src/db/schema/index.ts b/packages/bsync/src/db/schema/index.ts index 922286feb10..dea00f7665e 100644 --- a/packages/bsync/src/db/schema/index.ts +++ b/packages/bsync/src/db/schema/index.ts @@ -3,11 +3,15 @@ import * as muteOp from './mute_op' import * as muteItem from './mute_item' import * as notifOp from './notif_op' import * as notifItem from './notif_item' +import * as subsOp from './subs_op' +import * as subsItem from './subs_item' export type DatabaseSchemaType = muteItem.PartialDB & muteOp.PartialDB & notifItem.PartialDB & - notifOp.PartialDB + notifOp.PartialDB & + subsItem.PartialDB & + subsOp.PartialDB export type DatabaseSchema = Kysely diff --git a/packages/bsync/src/db/schema/subs_item.ts b/packages/bsync/src/db/schema/subs_item.ts new file mode 100644 index 00000000000..523275cec23 --- /dev/null +++ b/packages/bsync/src/db/schema/subs_item.ts @@ -0,0 +1,14 @@ +import { ColumnType, Selectable } from 'kysely' + +export interface SubsItem { + actorDid: string + // https://github.com/kysely-org/kysely/issues/137 + entitlements: ColumnType + fromId: number +} + +export type SubsItemEntry = Selectable + +export const tableName = 'subs_item' + +export type PartialDB = { [tableName]: SubsItem } diff --git a/packages/bsync/src/db/schema/subs_op.ts b/packages/bsync/src/db/schema/subs_op.ts new file mode 100644 index 00000000000..e7feb00a1be --- /dev/null +++ b/packages/bsync/src/db/schema/subs_op.ts @@ -0,0 +1,17 @@ +import { ColumnType, GeneratedAlways, Selectable } from 'kysely' + +export interface SubsOp { + id: GeneratedAlways + actorDid: string + // https://github.com/kysely-org/kysely/issues/137 + entitlements: ColumnType + createdAt: GeneratedAlways +} + +export type SubsOpEntry = Selectable + +export const tableName = 'subs_op' + +export type PartialDB = { [tableName]: SubsOp } + +export const createSubsOpChannel = 'subs_op_create' // used with listen/notify diff --git a/packages/bsync/src/index.ts b/packages/bsync/src/index.ts index bf9b944267f..297326b19a1 100644 --- a/packages/bsync/src/index.ts +++ b/packages/bsync/src/index.ts @@ -8,6 +8,10 @@ import { ServerConfig } from './config' import routes from './routes' import { createMuteOpChannel } from './db/schema/mute_op' import { createNotifOpChannel } from './db/schema/notif_op' +import { + isRevenueCatWebhookUrl, + revenueCatWebhookHandler, +} from './subscriptions' export * from './config' export * from './client' @@ -50,6 +54,9 @@ export class BsyncService { res.setHeader('content-type', 'application/json') return res.end(JSON.stringify({ version: cfg.service.version })) } + if (isRevenueCatWebhookUrl(req.url)) { + return revenueCatWebhookHandler(ctx, req, res) + } handler(req, res) }) return new BsyncService({ ctx, server, ac }) diff --git a/packages/bsync/src/subscriptions/index.ts b/packages/bsync/src/subscriptions/index.ts new file mode 100644 index 00000000000..a1a87a784ee --- /dev/null +++ b/packages/bsync/src/subscriptions/index.ts @@ -0,0 +1,2 @@ +export * from './revenueCatClient' +export * from './revenueCatWebhookHandler' diff --git a/packages/bsync/src/subscriptions/revenueCatClient.ts b/packages/bsync/src/subscriptions/revenueCatClient.ts new file mode 100644 index 00000000000..e71b61e8ed4 --- /dev/null +++ b/packages/bsync/src/subscriptions/revenueCatClient.ts @@ -0,0 +1,73 @@ +type Config = { + v1ApiKey: string + v1ApiUrl: string + webhookAuthorization: string +} + +// Reference: https://www.revenuecat.com/docs/api-v1#tag/customers +export type GetSubscriberResponse = { + subscriber: Subscriber +} + +export type Subscriber = { + entitlements: { + [entitlementIdentifier: string]: Entitlement + } +} + +export type Entitlement = { + expires_date: string +} + +export class RevenueCatClient { + private v1ApiKey: string + private v1ApiUrl: string + private webhookAuthorization: string + + constructor({ v1ApiKey, v1ApiUrl, webhookAuthorization }: Config) { + this.v1ApiKey = v1ApiKey + this.v1ApiUrl = v1ApiUrl + this.webhookAuthorization = webhookAuthorization + } + + private async fetch( + path: string, + method: string = 'GET', + ): Promise { + const url = new URL(path, this.v1ApiUrl) + const res = await fetch(url, { + method, + headers: { + Authorization: `Bearer ${this.v1ApiKey}`, + }, + }) + + if (!res.ok) { + throw new Error(`Failed to fetch ${path}: ${res.statusText}`) + } + + return res.json() as T + } + + private getSubscriber(did: string): Promise { + return this.fetch( + `/subscribers/${encodeURIComponent(did)}`, + ) + } + + async getEntitlementIdentifiers(did: string): Promise { + const { subscriber } = await this.getSubscriber(did) + + const now = Date.now() + return Object.entries(subscriber.entitlements) + .filter( + ([_, entitlement]) => + now < new Date(entitlement.expires_date).valueOf(), + ) + .map(([entitlementIdentifier]) => entitlementIdentifier) + } + + isWebhookAuthorizationValid(authorization: string | undefined): boolean { + return authorization === this.webhookAuthorization + } +} diff --git a/packages/bsync/src/subscriptions/revenueCatWebhookHandler.ts b/packages/bsync/src/subscriptions/revenueCatWebhookHandler.ts new file mode 100644 index 00000000000..4e7fc7fa49e --- /dev/null +++ b/packages/bsync/src/subscriptions/revenueCatWebhookHandler.ts @@ -0,0 +1,174 @@ +import { IncomingMessage, ServerResponse } from 'node:http' +import AppContext from '../context' +import { isValidDid } from '../routes/util' +import { Database } from '..' +import { sql } from 'kysely' +import { createSubsOpChannel } from '../db/schema/subs_op' + +export const isRevenueCatWebhookUrl = (urlStr: string | undefined) => { + if (!urlStr) return false + const url = new URL(urlStr, 'http://host') + return url.pathname === '/webhooks/revenuecat' +} + +const parseBody = async (req: IncomingMessage) => + new Promise((resolve, reject) => { + let body = '' + req.on('data', (chunk) => (body += chunk)) + req.on('end', () => { + try { + resolve(JSON.parse(body)) + } catch (err) { + reject(err) + } + }) + req.on('error', reject) + }) + +// Reference: https://www.revenuecat.com/docs/integrations/webhooks/event-types-and-fields#events-format +type RevenueCatEventBody = { + api_version: '1.0' + event: { + app_user_id: string + type: string + } +} + +export const revenueCatWebhookHandler = async ( + ctx: AppContext, + req: IncomingMessage, + res: ServerResponse, +) => { + const { db, revenueCatClient } = ctx + + res.setHeader('content-type', 'application/json') + + if (!revenueCatClient) { + res.statusCode = 501 + return res.end( + JSON.stringify({ + error: + 'Not Implemented: bsync is being served without RevenueCat support', + }), + ) + } + + if ( + !revenueCatClient.isWebhookAuthorizationValid(req.headers['authorization']) + ) { + res.statusCode = 403 + return res.end( + JSON.stringify({ + error: 'Forbidden: invalid authentication for RevenueCat webhook', + }), + ) + } + + if (req.method !== 'POST') { + res.statusCode = 501 + return res.end( + JSON.stringify({ + error: + 'Not Implemented: only POST method is supported for RevenueCat webhook', + }), + ) + } + + if (req.headers['content-type'] !== 'application/json') { + res.statusCode = 400 + return res.end( + JSON.stringify({ + error: + 'Bad request: body must be JSON with Content-Type: application/json', + }), + ) + } + + let body: RevenueCatEventBody + try { + body = (await parseBody(req)) as RevenueCatEventBody + } catch (error) { + res.statusCode = 400 + return res.end( + JSON.stringify({ + error: 'Bad request: malformed JSON body', + }), + ) + } + + try { + const { app_user_id: actorDid } = body.event + + if (!isValidDid(actorDid)) { + res.statusCode = 400 + return res.end( + JSON.stringify({ + error: 'Bad request: invalid DID in app_user_id', + }), + ) + } + + const entitlements = + await revenueCatClient.getEntitlementIdentifiers(actorDid) + + const id = await db.transaction(async (txn) => { + // create subs op + const id = await createSubsOp(txn, actorDid, entitlements) + // update subs state + await updateSubsItem(txn, id, actorDid, entitlements) + return id + }) + + res.statusCode = 200 + res.end(JSON.stringify({ success: true, operationId: id })) + } catch (error) { + res.statusCode = 500 + return res.end( + JSON.stringify({ + error: + 'Internal server error: an error happened while processing the request', + }), + ) + } +} + +const createSubsOp = async ( + db: Database, + actorDid: string, + entitlements: string[], +) => { + const { ref } = db.db.dynamic + const { id } = await db.db + .insertInto('subs_op') + .values({ + actorDid, + entitlements: JSON.stringify(entitlements), + }) + .returning('id') + .executeTakeFirstOrThrow() + await sql`notify ${ref(createSubsOpChannel)}`.execute(db.db) // emitted transactionally + return id +} + +const updateSubsItem = async ( + db: Database, + fromId: number, + actorDid: string, + entitlements: string[], +) => { + const { ref } = db.db.dynamic + await db.db + .insertInto('subs_item') + .values({ + actorDid, + entitlements: JSON.stringify(entitlements), + fromId, + }) + .onConflict((oc) => + oc.column('actorDid').doUpdateSet({ + entitlements: sql`${ref('excluded.entitlements')}`, + fromId: sql`${ref('excluded.fromId')}`, + }), + ) + .execute() +} diff --git a/packages/bsync/tests/subscriptions.test.ts b/packages/bsync/tests/subscriptions.test.ts new file mode 100644 index 00000000000..31a6151a027 --- /dev/null +++ b/packages/bsync/tests/subscriptions.test.ts @@ -0,0 +1,241 @@ +import http, { IncomingMessage, ServerResponse } from 'node:http' +import { once } from 'node:events' +import getPort from 'get-port' +import { BsyncService, Database, envToCfg } from '../src' +import { Entitlement, GetSubscriberResponse } from '../src/subscriptions' + +const revenueCatWebhookAuthorization = 'Bearer any-token' + +describe('subscriptions', () => { + let bsync: BsyncService + let bsyncUrl: string + + const actorDid = 'did:example:a' + + let revenueCatServer: http.Server + let revenueCatApiMock: jest.Mock + + const TEN_MINUTES = 600_000 + const entitlementValid: Entitlement = { + expires_date: new Date(Date.now() + TEN_MINUTES).toISOString(), + } + const entitlementExpired: Entitlement = { + expires_date: new Date(Date.now() - TEN_MINUTES).toISOString(), + } + + beforeAll(async () => { + const revenueCatPort = await getPort() + + revenueCatApiMock = jest.fn() + revenueCatServer = await createMockRevenueCatService( + revenueCatPort, + revenueCatApiMock, + ) + + bsync = await BsyncService.create( + envToCfg({ + port: await getPort(), + dbUrl: process.env.DB_POSTGRES_URL, + dbSchema: 'bsync_subscriptions', + apiKeys: ['key-1'], + longPollTimeoutMs: 500, + revenueCatV1ApiKey: 'any-key', + revenueCatV1ApiUrl: `http://localhost:${revenueCatPort}`, + revenueCatWebhookAuthorization, + }), + ) + + bsyncUrl = `http://localhost:${bsync.ctx.cfg.service.port}` + + await bsync.ctx.db.migrateToLatestOrThrow() + await bsync.start() + }) + + afterAll(async () => { + await bsync.destroy() + revenueCatServer.close() + await once(revenueCatServer, 'close') + }) + + beforeEach(async () => { + await clearSubs(bsync.ctx.db) + }) + + describe('webhook handler', () => { + it('returns 403 if authorization is invalid', async () => { + const response = await fetch(`${bsyncUrl}/webhooks/revenuecat`, { + method: 'POST', + body: JSON.stringify({ event: { app_user_id: actorDid } }), + headers: { + Authorization: `not ${revenueCatWebhookAuthorization}`, + 'Content-Type': 'application/json', + }, + }) + + expect(response.status).toBe(403) + expect(response.json()).resolves.toMatchObject({ + error: 'Forbidden: invalid authentication for RevenueCat webhook', + }) + }) + + it('stores valid entitlements from the API response, excluding expired', async () => { + revenueCatApiMock.mockReturnValueOnce({ + subscriber: { + entitlements: { entitlementExpired }, + }, + }) + + await callWebhook(bsyncUrl, { + event: { app_user_id: actorDid }, + }) + + const op0 = await bsync.ctx.db.db + .selectFrom('subs_op') + .selectAll() + .where('actorDid', '=', actorDid) + .orderBy('id', 'desc') + .executeTakeFirstOrThrow() + + expect(op0).toMatchObject({ + id: expect.any(Number), + actorDid, + entitlements: [], + createdAt: expect.any(Date), + }) + + await expect( + bsync.ctx.db.db + .selectFrom('subs_item') + .selectAll() + .where('actorDid', '=', actorDid) + .executeTakeFirstOrThrow(), + ).resolves.toMatchObject({ + actorDid, + entitlements: [], + fromId: op0.id, + }) + + revenueCatApiMock.mockReturnValueOnce({ + subscriber: { + entitlements: { entitlementValid, entitlementExpired }, + }, + }) + + await callWebhook(bsyncUrl, { + event: { app_user_id: actorDid }, + }) + + const op1 = await bsync.ctx.db.db + .selectFrom('subs_op') + .selectAll() + .where('actorDid', '=', actorDid) + .orderBy('id', 'desc') + .executeTakeFirstOrThrow() + + expect(op1).toMatchObject({ + id: expect.any(Number), + actorDid, + entitlements: ['entitlementValid'], + createdAt: expect.any(Date), + }) + + await expect( + bsync.ctx.db.db + .selectFrom('subs_item') + .selectAll() + .where('actorDid', '=', actorDid) + .executeTakeFirstOrThrow(), + ).resolves.toMatchObject({ + actorDid, + entitlements: ['entitlementValid'], + fromId: op1.id, + }) + }) + + it('sets empty array in the cache if no entitlements are present at all', async () => { + revenueCatApiMock.mockReturnValue({ + subscriber: { entitlements: {} }, + }) + + await callWebhook(bsyncUrl, { + event: { app_user_id: actorDid }, + }) + + const op = await bsync.ctx.db.db + .selectFrom('subs_op') + .selectAll() + .where('actorDid', '=', actorDid) + .orderBy('id', 'desc') + .executeTakeFirstOrThrow() + + expect(op).toMatchObject({ + id: expect.any(Number), + actorDid, + entitlements: [], + createdAt: expect.any(Date), + }) + + await expect( + bsync.ctx.db.db + .selectFrom('subs_item') + .selectAll() + .where('actorDid', '=', actorDid) + .executeTakeFirstOrThrow(), + ).resolves.toMatchObject({ + actorDid, + entitlements: [], + fromId: op.id, + }) + }) + }) +}) + +const clearSubs = async (db: Database) => { + await db.db.deleteFrom('subs_item').execute() + await db.db.deleteFrom('subs_op').execute() +} + +const callWebhook = async ( + baseUrl: string, + body: Record, +): Promise => { + const response = await fetch(`${baseUrl}/webhooks/revenuecat`, { + method: 'POST', + body: JSON.stringify(body), + headers: { + Authorization: revenueCatWebhookAuthorization, + 'Content-Type': 'application/json', + }, + }) + + if (!response.ok) { + throw new Error( + `Unexpected status on calling the webhook: '${response.status}'`, + ) + } + + return response +} + +const createMockRevenueCatService = async ( + port: number, + apiMock: jest.Mock, +): Promise => { + const server = http.createServer((req, res) => { + if (!req.url) { + throw new Error('Unexpected empty URL in request to RevenueCat mock') + } + + if (/^\/subscribers\/(.*)$/.test(req.url)) { + const response = apiMock(req, res) + res.statusCode = 200 + return res.end(JSON.stringify(response)) + } + + throw new Error('Unexpected URL in request to RevenueCat mock') + }) + + server.listen(port) + await once(server, 'listening') + return server +}