Skip to content

Commit

Permalink
Add express and refactor webhook handler
Browse files Browse the repository at this point in the history
  • Loading branch information
rafaelbsky committed Dec 5, 2024
1 parent d634f9d commit 0faab33
Show file tree
Hide file tree
Showing 18 changed files with 531 additions and 282 deletions.
7 changes: 7 additions & 0 deletions packages/bsync/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,11 @@
"@atproto/syntax": "workspace:^",
"@bufbuild/protobuf": "^1.5.0",
"@connectrpc/connect": "^1.1.4",
"@connectrpc/connect-express": "^1.1.4",
"@connectrpc/connect-node": "^1.1.4",
"compression": "^1.7.4",
"cors": "^2.8.5",
"express": "^4.21.1",
"http-terminator": "^3.2.0",
"kysely": "^0.22.0",
"pg": "^8.10.0",
Expand All @@ -40,6 +44,9 @@
"@bufbuild/buf": "^1.28.1",
"@bufbuild/protoc-gen-es": "^1.5.0",
"@connectrpc/protoc-gen-connect-es": "^1.1.4",
"@types/compression": "^1.7.5",
"@types/cors": "^2.8.12",
"@types/express": "^4.17.13",
"@types/pg": "^8.6.6",
"get-port": "^5.1.1",
"jest": "^28.1.2",
Expand Down
12 changes: 12 additions & 0 deletions packages/bsync/src/api/health.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import express from 'express'
import AppContext from '../context'

export const createRouter = (ctx: AppContext): express.Router => {
const router = express.Router()

router.get('/_health', async function (_req, res) {
res.send({ version: ctx.cfg.service.version })
})

return router
}
76 changes: 76 additions & 0 deletions packages/bsync/src/api/revenueCat.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import express, { RequestHandler } from 'express'
import { AppContext } from '..'
import { RevenueCatClient } from '../purchases'
import { addPurchaseOperation, RcEventBody } from '../purchases'
import { isValidDid } from '../routes/util'
import { httpLogger as log } from '..'

type AppContextWithRevenueCatClient = AppContext & {
revenueCatClient: RevenueCatClient
}

const auth =
(ctx: AppContextWithRevenueCatClient): RequestHandler =>
(req: express.Request, res: express.Response, next: express.NextFunction) =>
ctx.revenueCatClient.isWebhookAuthorizationValid(
req.header('Authorization'),
)
? next()
: res.status(403).send({
success: false,
error: 'Forbidden: invalid authentication for RevenueCat webhook',
})

const webhookHandler =
(ctx: AppContextWithRevenueCatClient): RequestHandler =>
async (req, res) => {
const { revenueCatClient } = ctx

const body: RcEventBody = req.body

try {
const { app_user_id: actorDid } = body.event

if (!isValidDid(actorDid)) {
return res.status(400).send({
success: false,
error: 'Bad request: invalid DID in app_user_id',
})
}

const entitlements =
await revenueCatClient.getEntitlementIdentifiers(actorDid)

const id = await addPurchaseOperation(ctx.db, actorDid, entitlements)

res.send({ success: true, operationId: id })
} catch (error) {
log.error(error)

res.status(500).send({
success: false,
error:
'Internal server error: an error happened while processing the request',
})
}
}

const assertAppContextWithRevenueCatClient: (
ctx: AppContext,
) => asserts ctx is AppContextWithRevenueCatClient = (ctx: AppContext) => {
if (!ctx.revenueCatClient) {
throw new Error(
'RevenueCat webhook was tried to be set up without configuring a RevenueCat client.',
)
}
}

export const createRouter = (ctx: AppContext): express.Router => {
assertAppContextWithRevenueCatClient(ctx)

const router = express.Router()
router.use(auth(ctx))
router.use(express.json())
router.post('/', webhookHandler(ctx))
return router
}
2 changes: 1 addition & 1 deletion packages/bsync/src/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import Database from './db'
import { createMuteOpChannel } from './db/schema/mute_op'
import { createNotifOpChannel } from './db/schema/notif_op'
import { EventEmitter } from 'stream'
import { RevenueCatClient } from './subscriptions'
import { RevenueCatClient } from './purchases'

export type AppContextOptions = {
db: Database
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { Kysely, sql } from 'kysely'

export async function up(db: Kysely<unknown>): Promise<void> {
await db.schema
.createTable('subs_op')
.createTable('purchase_op')
.addColumn('id', 'bigserial', (col) => col.primaryKey())
.addColumn('actorDid', 'varchar', (col) => col.notNull())
.addColumn('entitlements', 'jsonb', (col) => col.notNull())
Expand All @@ -11,14 +11,14 @@ export async function up(db: Kysely<unknown>): Promise<void> {
)
.execute()
await db.schema
.createTable('subs_item')
.createTable('purchase_item')
.addColumn('actorDid', 'varchar', (col) => col.primaryKey())
.addColumn('entitlements', 'jsonb', (col) => col.notNull())
.addColumn('fromId', 'bigint', (col) => col.notNull())
.execute()
}

export async function down(db: Kysely<unknown>): Promise<void> {
await db.schema.dropTable('subs_item').execute()
await db.schema.dropTable('subs_op').execute()
await db.schema.dropTable('purchase_item').execute()
await db.schema.dropTable('purchase_op').execute()
}
2 changes: 1 addition & 1 deletion packages/bsync/src/db/migrations/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@

export * as _20240108T220751294Z from './20240108T220751294Z-init'
export * as _20240717T224303472Z from './20240717T224303472Z-notif-ops'
export * as _20241205T030533572Z from './20241205T030533572Z-subs'
export * as _20241205T030533572Z from './20241205T030533572Z-purchases'
8 changes: 4 additions & 4 deletions packages/bsync/src/db/schema/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@ import * as muteOp from './mute_op'
import * as muteItem from './mute_item'
import * as notifOp from './notif_op'
import * as notifItem from './notif_item'
import * as subsOp from './subs_op'
import * as subsItem from './subs_item'
import * as purchaseOp from './purchase_op'
import * as purchaseItem from './purchase_item'

export type DatabaseSchemaType = muteItem.PartialDB &
muteOp.PartialDB &
notifItem.PartialDB &
notifOp.PartialDB &
subsItem.PartialDB &
subsOp.PartialDB
purchaseItem.PartialDB &
purchaseOp.PartialDB

export type DatabaseSchema = Kysely<DatabaseSchemaType>

Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import { ColumnType, Selectable } from 'kysely'

export interface SubsItem {
export interface PurchaseItem {
actorDid: string
// https://github.com/kysely-org/kysely/issues/137
entitlements: ColumnType<string[], string, string>
fromId: number
}

export type SubsItemEntry = Selectable<SubsItem>
export type PurchaseItemEntry = Selectable<PurchaseItem>

export const tableName = 'subs_item'
export const tableName = 'purchase_item'

export type PartialDB = { [tableName]: SubsItem }
export type PartialDB = { [tableName]: PurchaseItem }
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
import { ColumnType, GeneratedAlways, Selectable } from 'kysely'

export interface SubsOp {
export interface PurchaseOp {
id: GeneratedAlways<number>
actorDid: string
// https://github.com/kysely-org/kysely/issues/137
entitlements: ColumnType<string[], string, string>
createdAt: GeneratedAlways<Date>
}

export type SubsOpEntry = Selectable<SubsOp>
export type PurchaseOpEntry = Selectable<PurchaseOp>

export const tableName = 'subs_op'
export const tableName = 'purchase_op'

export type PartialDB = { [tableName]: SubsOp }
export type PartialDB = { [tableName]: PurchaseOp }

export const createSubsOpChannel = 'subs_op_create' // used with listen/notify
export const createPurchaseOpChannel = 'purchase_op_create' // used with listen/notify
73 changes: 37 additions & 36 deletions packages/bsync/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
import express from 'express'
import cors from 'cors'
import compression from 'compression'
import http from 'node:http'
import events from 'node:events'
import { createHttpTerminator, HttpTerminator } from 'http-terminator'
import { connectNodeAdapter } from '@connectrpc/connect-node'
import { dbLogger, loggerMiddleware } from './logger'
import AppContext, { AppContextOptions } from './context'
import { ServerConfig } from './config'
import routes from './routes'
import { createMuteOpChannel } from './db/schema/mute_op'
import { createNotifOpChannel } from './db/schema/notif_op'
import {
isRevenueCatWebhookUrl,
revenueCatWebhookHandler,
} from './subscriptions'
import * as health from './api/health'
import * as revenueCat from './api/revenueCat'
import { DAY, SECOND } from '@atproto/common'
import { expressConnectMiddleware } from '@connectrpc/connect-express'

export * from './config'
export * from './client'
Expand All @@ -21,20 +23,20 @@ export { httpLogger } from './logger'

export class BsyncService {
public ctx: AppContext
public server: http.Server
public app: express.Application
public server?: http.Server
private ac: AbortController
private terminator: HttpTerminator
private terminator?: HttpTerminator
private dbStatsInterval?: NodeJS.Timeout

constructor(opts: {
ctx: AppContext
server: http.Server
app: express.Application
ac: AbortController
}) {
this.ctx = opts.ctx
this.server = opts.server
this.app = opts.app
this.ac = opts.ac
this.terminator = createHttpTerminator({ server: this.server })
}

static async create(
Expand All @@ -43,23 +45,25 @@ export class BsyncService {
): Promise<BsyncService> {
const ac = new AbortController()
const ctx = await AppContext.fromConfig(cfg, ac.signal, overrides)
const handler = connectNodeAdapter({
routes: routes(ctx),
shutdownSignal: ac.signal,
})
const server = http.createServer((req, res) => {
loggerMiddleware(req, res)
if (isHealth(req.url)) {
res.statusCode = 200
res.setHeader('content-type', 'application/json')
return res.end(JSON.stringify({ version: cfg.service.version }))
}
if (isRevenueCatWebhookUrl(req.url)) {
return revenueCatWebhookHandler(ctx, req, res)
}
handler(req, res)
})
return new BsyncService({ ctx, server, ac })

const app = express()
app.use(cors({ maxAge: DAY / SECOND }))
app.use(loggerMiddleware)
app.use(compression())

app.use(
expressConnectMiddleware({
routes: routes(ctx),
shutdownSignal: ac.signal,
}),
)

app.use(health.createRouter(ctx))
if (ctx.revenueCatClient) {
app.use('/webhooks/revenuecat', revenueCat.createRouter(ctx))
}

return new BsyncService({ ctx, app, ac })
}

async start(): Promise<http.Server> {
Expand All @@ -77,15 +81,18 @@ export class BsyncService {
)
}, 10000)
await this.setupAppEvents()
this.server.listen(this.ctx.cfg.service.port)
this.server.keepAliveTimeout = 90000

const server = this.app.listen(this.ctx.cfg.service.port)
server.keepAliveTimeout = 90000
this.server = server
this.terminator = createHttpTerminator({ server: this.server })
await events.once(this.server, 'listening')
return this.server
}

async destroy(): Promise<void> {
this.ac.abort()
await this.terminator.terminate()
await this.terminator?.terminate()
await this.ctx.db.close()
clearInterval(this.dbStatsInterval)
this.dbStatsInterval = undefined
Expand All @@ -111,9 +118,3 @@ export class BsyncService {
}

export default BsyncService

const isHealth = (urlStr: string | undefined) => {
if (!urlStr) return false
const url = new URL(urlStr, 'http://host')
return url.pathname === '/_health'
}
Loading

0 comments on commit 0faab33

Please sign in to comment.