diff --git a/package-lock.json b/package-lock.json index 20863ce..c807444 100644 --- a/package-lock.json +++ b/package-lock.json @@ -25,12 +25,14 @@ "kysely": "^0.27.4", "multiformats": "^9.9.0", "pino": "^9.3.2", - "uhtml": "^4.5.9" + "uhtml": "^4.5.9", + "ws": "^8.18.0" }, "devDependencies": { "@atproto/lex-cli": "^0.4.1", "@types/better-sqlite3": "^7.6.11", "@types/express": "^4.17.21", + "@types/ws": "^8.5.13", "pino-pretty": "^11.0.0", "rimraf": "^5.0.0", "ts-node": "^10.9.2", @@ -1450,6 +1452,16 @@ "@types/send": "*" } }, + "node_modules/@types/ws": { + "version": "8.5.13", + "resolved": "https://registry.npmjs.org/@types/ws/-/ws-8.5.13.tgz", + "integrity": "sha512-osM/gWBTPKgHV8XkTunnegTRIsvF6owmf5w+JtAfOw472dptdm0dlGv4xCt6GwQRcC2XVOvvRE/0bAoQcL2QkA==", + "dev": true, + "license": "MIT", + "dependencies": { + "@types/node": "*" + } + }, "node_modules/@webreflection/signal": { "version": "2.1.2", "resolved": "https://registry.npmjs.org/@webreflection/signal/-/signal-2.1.2.tgz", diff --git a/package.json b/package.json index c89956d..7c52871 100644 --- a/package.json +++ b/package.json @@ -30,12 +30,14 @@ "kysely": "^0.27.4", "multiformats": "^9.9.0", "pino": "^9.3.2", - "uhtml": "^4.5.9" + "uhtml": "^4.5.9", + "ws": "^8.18.0" }, "devDependencies": { "@atproto/lex-cli": "^0.4.1", "@types/better-sqlite3": "^7.6.11", "@types/express": "^4.17.21", + "@types/ws": "^8.5.13", "pino-pretty": "^11.0.0", "rimraf": "^5.0.0", "ts-node": "^10.9.2", diff --git a/src/ingester.ts b/src/firehose-ingester.ts similarity index 95% rename from src/ingester.ts rename to src/firehose-ingester.ts index bd09c32..6f73a9c 100644 --- a/src/ingester.ts +++ b/src/firehose-ingester.ts @@ -4,7 +4,7 @@ import { Firehose } from '@atproto/sync' import type { Database } from '#/db' import * as Status from '#/lexicon/types/xyz/statusphere/status' -export function createIngester(db: Database, idResolver: IdResolver) { +export function createFirehoseIngester(db: Database, idResolver: IdResolver) { 
const logger = pino({ name: 'firehose ingestion' }) return new Firehose({ idResolver, @@ -53,4 +53,4 @@ export function createIngester(db: Database, idResolver: IdResolver) { excludeIdentity: true, excludeAccount: true, }) -} +} \ No newline at end of file diff --git a/src/index.ts b/src/index.ts index 04e378c..febcaee 100755 --- a/src/index.ts +++ b/src/index.ts @@ -7,17 +7,17 @@ import { Firehose } from '@atproto/sync' import { createDb, migrateToLatest } from '#/db' import { env } from '#/lib/env' -import { createIngester } from '#/ingester' +import { createJetstreamIngester, Jetstream } from '#/jetstream-ingester' +import { createFirehoseIngester } from '#/firehose-ingester' import { createRouter } from '#/routes' import { createClient } from '#/auth/client' import { createBidirectionalResolver, createIdResolver, BidirectionalResolver } from '#/id-resolver' import type { Database } from '#/db' -import { IdResolver, MemoryCache } from '@atproto/identity' // Application state passed to the router and elsewhere export type AppContext = { db: Database - ingester: Firehose + ingester: Jetstream | Firehose logger: pino.Logger oauthClient: OAuthClient resolver: BidirectionalResolver @@ -41,7 +41,11 @@ export class Server { // Create the atproto utilities const oauthClient = await createClient(db) const baseIdResolver = createIdResolver() - const ingester = createIngester(db, baseIdResolver) + + // Uncomment whichever ingester you want to use + // const ingester = createFirehoseIngester(db, baseIdResolver) + const ingester = createJetstreamIngester(db) + const resolver = createBidirectionalResolver(baseIdResolver) const ctx = { db, diff --git a/src/jetstream-ingester.ts b/src/jetstream-ingester.ts new file mode 100644 index 0000000..983aac5 --- /dev/null +++ b/src/jetstream-ingester.ts @@ -0,0 +1,116 @@ +import WebSocket from 'ws' +import pino from 'pino'; +import type { Database } from '#/db' +import * as Status from '#/lexicon/types/xyz/statusphere/status' + +export 
function createJetstreamIngester(db: Database) {
  // Builds a Jetstream consumer that mirrors `xyz.statusphere.status` records
  // into the `status` table: create/update commits are upserted, delete
  // commits remove the row. Call `.start()` on the result to open the socket.
  const logger = pino({ name: 'websocket ingestion' });

  return new Jetstream({
    db,
    handleEvent: async (evt) => {
      const commit = evt.commit;
      // Identity/account events carry no commit; other collections are ignored.
      if (!commit || commit.collection !== STATUS_COLLECTION) return;

      // Rows are keyed by the full AT-URI. The record key alone is only unique
      // within a single repo + collection, so it cannot identify a row.
      const uri = buildAtUri(evt.did, commit.collection, commit.rkey);

      if (commit.operation === 'create' || commit.operation === 'update') {
        const record = commit.record;
        if (!Status.isRecord(record) || !Status.validateRecord(record).success) {
          return;
        }

        const indexedAt = new Date().toISOString();
        await db
          .insertInto('status')
          .values({
            uri,
            authorDid: evt.did,
            status: record.status,
            createdAt: record.createdAt,
            indexedAt,
          })
          .onConflict((oc) =>
            oc.column('uri').doUpdateSet({
              status: record.status,
              indexedAt,
            })
          )
          .execute();
      } else if (commit.operation === 'delete') {
        await db.deleteFrom('status').where('uri', '=', uri).execute();
      }
    },
    onError: (err) => {
      logger.error({ err }, 'error during WebSocket ingestion');
    },
    wantedCollections: [STATUS_COLLECTION],
  });
}

/** Collection NSID of the status records this app indexes. */
const STATUS_COLLECTION = 'xyz.statusphere.status';

/** Public Jetstream endpoint the ingester subscribes to. */
const JETSTREAM_SUBSCRIBE_URL = 'wss://jetstream2.us-east.bsky.network/subscribe';

/** Builds the `at://<did>/<collection>/<rkey>` URI that identifies a record. */
function buildAtUri(did: string, collection: string, rkey: string): string {
  return `at://${did}/${collection}/${rkey}`;
}

/** Commit payload of a Jetstream event (only the fields this file reads). */
export type JetstreamCommit = {
  operation: 'create' | 'update' | 'delete';
  collection: string;
  rkey: string;
  /** Present on create/update commits; must be validated before use. */
  record?: unknown;
};

/** A decoded Jetstream message (only the fields this file reads). */
export type JetstreamEvent = {
  did: string;
  /** Absent on non-commit events. */
  commit?: JetstreamCommit;
};

/**
 * Minimal Jetstream WebSocket client.
 *
 * Opens one subscription filtered to `wantedCollections`, JSON-decodes every
 * message and passes it to `handleEvent`. Parse failures, handler rejections
 * and socket errors are all routed to `onError`. There is no automatic
 * reconnect: after the socket closes, `start()` may be called again.
 */
export class Jetstream {
  private readonly db: Database;
  private readonly handleEvent: (evt: JetstreamEvent) => Promise<void>;
  private readonly onError: (err: unknown) => void;
  private ws?: WebSocket;
  private isStarted = false;
  private readonly wantedCollections: string[];

  constructor({
    db,
    handleEvent,
    onError,
    wantedCollections,
  }: {
    db: Database;
    handleEvent: (evt: JetstreamEvent) => Promise<void>;
    onError: (err: unknown) => void;
    wantedCollections: string[];
  }) {
    this.db = db;
    this.handleEvent = handleEvent;
    this.onError = onError;
    this.wantedCollections = wantedCollections;
  }

  /**
   * Returns the subscribe URL with one `wantedCollections` query parameter per
   * collection. Jetstream expects the parameter to be repeated; a single
   * comma-joined value would be read as one (invalid) collection name.
   */
  constructUrlWithQuery = (): string => {
    const params = new URLSearchParams();
    for (const collection of this.wantedCollections) {
      params.append('wantedCollections', collection);
    }
    return `${JETSTREAM_SUBSCRIBE_URL}?${params.toString()}`;
  };

  /** Opens the WebSocket and starts dispatching events. No-op if already started. */
  start() {
    if (this.isStarted) return;
    this.isStarted = true;

    const ws = new WebSocket(this.constructUrlWithQuery());
    this.ws = ws;

    ws.on('open', () => {
      console.log('WebSocket connection opened.');
    });

    ws.on('message', async (data) => {
      try {
        const event = JSON.parse(data.toString());
        await this.handleEvent(event);
      } catch (err) {
        this.onError(err);
      }
    });

    ws.on('error', (err) => {
      this.onError(err);
    });

    ws.on('close', (code, reason) => {
      console.log(`WebSocket closed. Code: ${code}, Reason: ${reason.toString()}`);
      // A late close event from a socket that was already replaced (destroy()
      // followed by start()) must not mark the new connection as stopped.
      if (this.ws === ws) {
        this.ws = undefined;
        this.isStarted = false;
      }
    });
  }

  /** Closes the current socket, if any, and allows `start()` to be called again. */
  destroy() {
    const ws = this.ws;
    if (!ws) return;
    // Detach first so the close handler sees this socket as no longer current.
    this.ws = undefined;
    this.isStarted = false;
    ws.close();
  }
}