diff --git a/.github/workflows/build-and-push-bsky-aws.yaml b/.github/workflows/build-and-push-bsky-aws.yaml index 36b1aa23cb3..d56d56d0030 100644 --- a/.github/workflows/build-and-push-bsky-aws.yaml +++ b/.github/workflows/build-and-push-bsky-aws.yaml @@ -3,6 +3,7 @@ on: push: branches: - main + - appview-v1-sync-mutes env: REGISTRY: ${{ secrets.AWS_ECR_REGISTRY_USEAST2_PACKAGES_REGISTRY }} USERNAME: ${{ secrets.AWS_ECR_REGISTRY_USEAST2_PACKAGES_USERNAME }} diff --git a/packages/bsky/buf.gen.yaml b/packages/bsky/buf.gen.yaml new file mode 100644 index 00000000000..a81e4248719 --- /dev/null +++ b/packages/bsky/buf.gen.yaml @@ -0,0 +1,12 @@ +version: v1 +plugins: + - plugin: es + opt: + - target=ts + - import_extension=.ts + out: src/proto + - plugin: connect-es + opt: + - target=ts + - import_extension=.ts + out: src/proto diff --git a/packages/bsky/package.json b/packages/bsky/package.json index 8385dab926b..754d3c614fe 100644 --- a/packages/bsky/package.json +++ b/packages/bsky/package.json @@ -28,7 +28,8 @@ "test": "../dev-infra/with-test-redis-and-db.sh jest", "test:log": "tail -50 test.log | pino-pretty", "test:updateSnapshot": "jest --updateSnapshot", - "migration:create": "ts-node ./bin/migration-create.ts" + "migration:create": "ts-node ./bin/migration-create.ts", + "buf:gen": "buf generate ../bsync/proto" }, "dependencies": { "@atproto/api": "workspace:^", @@ -39,6 +40,9 @@ "@atproto/lexicon": "workspace:^", "@atproto/repo": "workspace:^", "@atproto/xrpc-server": "workspace:^", + "@bufbuild/protobuf": "^1.5.0", + "@connectrpc/connect": "^1.1.4", + "@connectrpc/connect-node": "^1.1.4", "@did-plc/lib": "^0.0.1", "@isaacs/ttlcache": "^1.4.1", "compression": "^1.7.4", @@ -65,6 +69,9 @@ "@atproto/lex-cli": "workspace:^", "@atproto/pds": "workspace:^", "@atproto/xrpc": "workspace:^", + "@bufbuild/buf": "^1.28.1", + "@bufbuild/protoc-gen-es": "^1.5.0", + "@connectrpc/protoc-gen-connect-es": "^1.1.4", "@did-plc/server": "^0.0.1", "@types/cors": "^2.8.12", "@types/express": "^4.17.13", diff --git a/packages/bsky/src/api/app/bsky/graph/muteActor.ts b/packages/bsky/src/api/app/bsky/graph/muteActor.ts index acf72bdd2eb..72a1635c55d 100644 --- a/packages/bsky/src/api/app/bsky/graph/muteActor.ts +++ b/packages/bsky/src/api/app/bsky/graph/muteActor.ts @@ -1,11 +1,14 @@ +import assert from 'node:assert' import { InvalidRequestError } from '@atproto/xrpc-server' import { Server } from '../../../../lexicon' import AppContext from '../../../../context' +import { MuteOperation_Type } from '../../../../proto/bsync_pb' +import { BsyncClient } from '../../../../bsync' export default function (server: Server, ctx: AppContext) { server.app.bsky.graph.muteActor({ auth: ctx.authVerifier.standard, - handler: async ({ auth, input }) => { + handler: async ({ req, auth, input }) => { const { actor } = input.body const requester = auth.credentials.iss const db = ctx.db.getPrimary() @@ -18,10 +21,34 @@ export default function (server: Server, ctx: AppContext) { throw new InvalidRequestError('Cannot mute oneself') } - await ctx.services.graph(db).muteActor({ - subjectDid, - mutedByDid: requester, - }) + const muteActor = async () => { + await ctx.services.graph(db).muteActor({ + subjectDid, + mutedByDid: requester, + }) + } + + const addBsyncMuteOp = async (bsyncClient: BsyncClient) => { + await bsyncClient.addMuteOperation({ + type: MuteOperation_Type.ADD, + actorDid: requester, + subject: subjectDid, + }) + } + + if (ctx.cfg.bsyncOnlyMutes) { + assert(ctx.bsyncClient) + await addBsyncMuteOp(ctx.bsyncClient) + } else { + await muteActor() + if (ctx.bsyncClient) { + try { + await addBsyncMuteOp(ctx.bsyncClient) + } catch (err) { + req.log.warn(err, 'failed to sync mute op to bsync') + } + } + } }, }) } diff --git a/packages/bsky/src/api/app/bsky/graph/muteActorList.ts b/packages/bsky/src/api/app/bsky/graph/muteActorList.ts index d732c3cd89f..f2ee8bfeea0 100644 --- a/packages/bsky/src/api/app/bsky/graph/muteActorList.ts +++ b/packages/bsky/src/api/app/bsky/graph/muteActorList.ts @@ -1,13 +1,16 @@ +import assert from 'node:assert' import { InvalidRequestError } from '@atproto/xrpc-server' import { Server } from '../../../../lexicon' import * as lex from '../../../../lexicon/lexicons' import AppContext from '../../../../context' import { AtUri } from '@atproto/syntax' +import { MuteOperation_Type } from '../../../../proto/bsync_pb' +import { BsyncClient } from '../../../../bsync' export default function (server: Server, ctx: AppContext) { server.app.bsky.graph.muteActorList({ auth: ctx.authVerifier.standard, - handler: async ({ auth, input }) => { + handler: async ({ req, auth, input }) => { const { list } = input.body const requester = auth.credentials.iss @@ -19,10 +22,34 @@ export default function (server: Server, ctx: AppContext) { throw new InvalidRequestError(`Invalid collection: expected: ${collId}`) } - await ctx.services.graph(db).muteActorList({ - list, - mutedByDid: requester, - }) + const muteActorList = async () => { + await ctx.services.graph(db).muteActorList({ + list, + mutedByDid: requester, + }) + } + + const addBsyncMuteOp = async (bsyncClient: BsyncClient) => { + await bsyncClient.addMuteOperation({ + type: MuteOperation_Type.ADD, + actorDid: requester, + subject: list, + }) + } + + if (ctx.cfg.bsyncOnlyMutes) { + assert(ctx.bsyncClient) + await addBsyncMuteOp(ctx.bsyncClient) + } else { + await muteActorList() + if (ctx.bsyncClient) { + try { + await addBsyncMuteOp(ctx.bsyncClient) + } catch (err) { + req.log.warn(err, 'failed to sync mute op to bsync') + } + } + } }, }) } diff --git a/packages/bsky/src/api/app/bsky/graph/unmuteActor.ts b/packages/bsky/src/api/app/bsky/graph/unmuteActor.ts index 5308aef4f47..e35e00202ef 100644 --- a/packages/bsky/src/api/app/bsky/graph/unmuteActor.ts +++ b/packages/bsky/src/api/app/bsky/graph/unmuteActor.ts @@ -1,11 +1,14 @@ +import assert from 'node:assert' import { InvalidRequestError } from '@atproto/xrpc-server' import { Server } from '../../../../lexicon' import AppContext from '../../../../context' +import { MuteOperation_Type } from '../../../../proto/bsync_pb' +import { BsyncClient } from '../../../../bsync' export default function (server: Server, ctx: AppContext) { server.app.bsky.graph.unmuteActor({ auth: ctx.authVerifier.standard, - handler: async ({ auth, input }) => { + handler: async ({ req, auth, input }) => { const { actor } = input.body const requester = auth.credentials.iss const db = ctx.db.getPrimary() @@ -18,10 +21,34 @@ export default function (server: Server, ctx: AppContext) { throw new InvalidRequestError('Cannot mute oneself') } - await ctx.services.graph(db).unmuteActor({ - subjectDid, - mutedByDid: requester, - }) + const unmuteActor = async () => { + await ctx.services.graph(db).unmuteActor({ + subjectDid, + mutedByDid: requester, + }) + } + + const addBsyncMuteOp = async (bsyncClient: BsyncClient) => { + await bsyncClient.addMuteOperation({ + type: MuteOperation_Type.REMOVE, + actorDid: requester, + subject: subjectDid, + }) + } + + if (ctx.cfg.bsyncOnlyMutes) { + assert(ctx.bsyncClient) + await addBsyncMuteOp(ctx.bsyncClient) + } else { + await unmuteActor() + if (ctx.bsyncClient) { + try { + await addBsyncMuteOp(ctx.bsyncClient) + } catch (err) { + req.log.warn(err, 'failed to sync mute op to bsync') + } + } + } }, }) } diff --git a/packages/bsky/src/api/app/bsky/graph/unmuteActorList.ts b/packages/bsky/src/api/app/bsky/graph/unmuteActorList.ts index 059fa5605d9..18d612d7c95 100644 --- a/packages/bsky/src/api/app/bsky/graph/unmuteActorList.ts +++ b/packages/bsky/src/api/app/bsky/graph/unmuteActorList.ts @@ -1,18 +1,45 @@ +import assert from 'node:assert' import { Server } from '../../../../lexicon' import AppContext from '../../../../context' +import { MuteOperation_Type } from '../../../../proto/bsync_pb' +import { BsyncClient } from '../../../../bsync' export default function (server: Server, ctx: AppContext) { server.app.bsky.graph.unmuteActorList({ auth: ctx.authVerifier.standard, - handler: async ({ auth, input }) => { + handler: async ({ req, auth, input }) => { const { list } = input.body const requester = auth.credentials.iss const db = ctx.db.getPrimary() - await ctx.services.graph(db).unmuteActorList({ - list, - mutedByDid: requester, - }) + const unmuteActorList = async () => { + await ctx.services.graph(db).unmuteActorList({ + list, + mutedByDid: requester, + }) + } + + const addBsyncMuteOp = async (bsyncClient: BsyncClient) => { + await bsyncClient.addMuteOperation({ + type: MuteOperation_Type.REMOVE, + actorDid: requester, + subject: list, + }) + } + + if (ctx.cfg.bsyncOnlyMutes) { + assert(ctx.bsyncClient) + await addBsyncMuteOp(ctx.bsyncClient) + } else { + await unmuteActorList() + if (ctx.bsyncClient) { + try { + await addBsyncMuteOp(ctx.bsyncClient) + } catch (err) { + req.log.warn(err, 'failed to sync mute op to bsync') + } + } + } }, }) } diff --git a/packages/bsky/src/bsync.ts b/packages/bsky/src/bsync.ts new file mode 100644 index 00000000000..40a21a9c2c5 --- /dev/null +++ b/packages/bsky/src/bsync.ts @@ -0,0 +1,41 @@ +import { Service } from './proto/bsync_connect' +import { + Code, + ConnectError, + PromiseClient, + createPromiseClient, + Interceptor, +} from '@connectrpc/connect' +import { + createConnectTransport, + ConnectTransportOptions, +} from '@connectrpc/connect-node' + +export type BsyncClient = PromiseClient + +export const createBsyncClient = ( + opts: ConnectTransportOptions, +): BsyncClient => { + const transport = createConnectTransport(opts) + return createPromiseClient(Service, transport) +} + +export { Code } + +export const isBsyncError = ( + err: unknown, + code?: Code, +): err is ConnectError => { + if (err instanceof ConnectError) { + return !code || err.code === code + } + return false +} + +export const authWithApiKey = + (apiKey: string): Interceptor => + (next) => + (req) => { + req.header.set('authorization', `Bearer ${apiKey}`) + return next(req) + } diff --git a/packages/bsky/src/config.ts b/packages/bsky/src/config.ts index faa1ac7953d..da695cef4fb 100644 --- a/packages/bsky/src/config.ts +++ b/packages/bsky/src/config.ts @@ -31,6 +31,11 @@ export interface ServerConfigValues { imgUriEndpoint?: string blobCacheLocation?: string searchEndpoint?: string + bsyncUrl?: string + bsyncApiKey?: string + bsyncHttpVersion?: '1.1' | '2' + bsyncIgnoreBadTls?: boolean + bsyncOnlyMutes?: boolean adminPassword: string moderatorPassword: string triagePassword: string @@ -88,6 +93,13 @@ export class ServerConfig { const imgUriEndpoint = process.env.IMG_URI_ENDPOINT const blobCacheLocation = process.env.BLOB_CACHE_LOC const searchEndpoint = process.env.SEARCH_ENDPOINT + const bsyncUrl = process.env.BSKY_BSYNC_URL || undefined + const bsyncApiKey = process.env.BSKY_BSYNC_API_KEY || undefined + const bsyncHttpVersion = process.env.BSKY_BSYNC_HTTP_VERSION || '2' + const bsyncIgnoreBadTls = process.env.BSKY_BSYNC_IGNORE_BAD_TLS === 'true' + const bsyncOnlyMutes = process.env.BSKY_BSYNC_ONLY_MUTES === 'true' + assert(!bsyncOnlyMutes || bsyncUrl, 'bsync-only mutes requires a bsync url') + assert(bsyncHttpVersion === '1.1' || bsyncHttpVersion === '2') const dbPrimaryPostgresUrl = overrides?.dbPrimaryPostgresUrl || process.env.DB_PRIMARY_POSTGRES_URL let dbReplicaPostgresUrls = overrides?.dbReplicaPostgresUrls @@ -152,6 +164,11 @@ export class ServerConfig { imgUriEndpoint, blobCacheLocation, searchEndpoint, + bsyncUrl, + bsyncApiKey, + bsyncHttpVersion, + bsyncIgnoreBadTls, + bsyncOnlyMutes, adminPassword, moderatorPassword, triagePassword, @@ -268,6 +285,26 @@ export class ServerConfig { return this.cfg.searchEndpoint } + get bsyncUrl() { + return this.cfg.bsyncUrl + } + + get bsyncApiKey() { + return this.cfg.bsyncApiKey + } + + get bsyncOnlyMutes() { + return this.cfg.bsyncOnlyMutes + } + + get bsyncHttpVersion() { + return this.cfg.bsyncHttpVersion + } + + get bsyncIgnoreBadTls() { + return this.cfg.bsyncIgnoreBadTls + } + get adminPassword() { return this.cfg.adminPassword } diff --git a/packages/bsky/src/context.ts b/packages/bsky/src/context.ts index 9a3eb222cdf..fb5993a5aa4 100644 --- a/packages/bsky/src/context.ts +++ b/packages/bsky/src/context.ts @@ -13,6 +13,7 @@ import { MountedAlgos } from './feed-gen/types' import { NotificationServer } from './notifications' import { Redis } from './redis' import { AuthVerifier } from './auth-verifier' +import { BsyncClient } from './bsync' export class AppContext { constructor( @@ -27,6 +28,7 @@ export class AppContext { redis: Redis backgroundQueue: BackgroundQueue searchAgent?: AtpAgent + bsyncClient?: BsyncClient algos: MountedAlgos notifServer: NotificationServer authVerifier: AuthVerifier @@ -77,6 +79,10 @@ export class AppContext { return this.opts.searchAgent } + get bsyncClient(): BsyncClient | undefined { + return this.opts.bsyncClient + } + get authVerifier(): AuthVerifier { return this.opts.authVerifier } diff --git a/packages/bsky/src/index.ts b/packages/bsky/src/index.ts index 7c89a997310..48dcc82fc39 100644 --- a/packages/bsky/src/index.ts +++ b/packages/bsky/src/index.ts @@ -34,6 +34,7 @@ import { AtpAgent } from '@atproto/api' import { Keypair } from '@atproto/crypto' import { Redis } from './redis' import { AuthVerifier } from './auth-verifier' +import { authWithApiKey, createBsyncClient } from './bsync' export type { ServerConfigValues } from './config' export type { MountedAlgos } from './feed-gen/types' @@ -135,6 +136,17 @@ export class BskyAppView { triagePass: config.triagePassword, }) + const bsyncClient = config.bsyncUrl + ? createBsyncClient({ + baseUrl: config.bsyncUrl, + httpVersion: config.bsyncHttpVersion ?? '2', + nodeOptions: { rejectUnauthorized: !config.bsyncIgnoreBadTls }, + interceptors: config.bsyncApiKey + ? [authWithApiKey(config.bsyncApiKey)] + : [], + }) + : undefined + const ctx = new AppContext({ db, cfg: config, @@ -146,6 +158,7 @@ export class BskyAppView { redis, backgroundQueue, searchAgent, + bsyncClient, algos, notifServer, authVerifier, diff --git a/packages/bsky/src/ingester/config.ts b/packages/bsky/src/ingester/config.ts index 5c157571f2a..0a3d9e79e5a 100644 --- a/packages/bsky/src/ingester/config.ts +++ b/packages/bsky/src/ingester/config.ts @@ -10,6 +10,10 @@ export interface IngesterConfigValues { redisPassword?: string repoProvider: string labelProvider?: string + bsyncUrl?: string + bsyncApiKey?: string + bsyncHttpVersion?: '1.1' | '2' + bsyncIgnoreBadTls?: boolean ingesterPartitionCount: number ingesterNamespace?: string ingesterSubLockId?: number @@ -42,6 +46,16 @@ export class IngesterConfig { overrides?.redisPassword || process.env.REDIS_PASSWORD || undefined const repoProvider = overrides?.repoProvider || process.env.REPO_PROVIDER // E.g. ws://abc.com:4000 const labelProvider = overrides?.labelProvider || process.env.LABEL_PROVIDER + const bsyncUrl = + overrides?.bsyncUrl || process.env.BSKY_BSYNC_URL || undefined + const bsyncApiKey = + overrides?.bsyncApiKey || process.env.BSKY_BSYNC_API_KEY || undefined + const bsyncHttpVersion = + overrides?.bsyncHttpVersion || process.env.BSKY_BSYNC_HTTP_VERSION || '2' + const bsyncIgnoreBadTls = + overrides?.bsyncIgnoreBadTls || + process.env.BSKY_BSYNC_IGNORE_BAD_TLS === 'true' + assert(bsyncHttpVersion === '1.1' || bsyncHttpVersion === '2') const ingesterPartitionCount = overrides?.ingesterPartitionCount || maybeParseInt(process.env.INGESTER_PARTITION_COUNT) @@ -72,6 +86,10 @@ export class IngesterConfig { redisPassword, repoProvider, labelProvider, + bsyncUrl, + bsyncApiKey, + bsyncHttpVersion, + bsyncIgnoreBadTls, ingesterPartitionCount, ingesterSubLockId, ingesterNamespace, @@ -117,6 +135,22 @@ export class IngesterConfig { return this.cfg.labelProvider } + get bsyncUrl() { + return this.cfg.bsyncUrl + } + + get bsyncApiKey() { + return this.cfg.bsyncApiKey + } + + get bsyncHttpVersion() { + return this.cfg.bsyncHttpVersion + } + + get bsyncIgnoreBadTls() { + return this.cfg.bsyncIgnoreBadTls + } + get ingesterPartitionCount() { return this.cfg.ingesterPartitionCount } diff --git a/packages/bsky/src/ingester/context.ts b/packages/bsky/src/ingester/context.ts index 797545b9f98..debf9843ea6 100644 --- a/packages/bsky/src/ingester/context.ts +++ b/packages/bsky/src/ingester/context.ts @@ -2,6 +2,7 @@ import { PrimaryDatabase } from '../db' import { Redis } from '../redis' import { IngesterConfig } from './config' import { LabelSubscription } from './label-subscription' +import { MuteSubscription } from './mute-subscription' export class IngesterContext { constructor( @@ -10,6 +11,7 @@ export class IngesterContext { redis: Redis cfg: IngesterConfig labelSubscription?: LabelSubscription + muteSubscription?: MuteSubscription }, ) {} @@ -28,6 +30,10 @@ export class IngesterContext { get labelSubscription(): LabelSubscription | undefined { return this.opts.labelSubscription } + + get muteSubscription(): MuteSubscription | undefined { + return this.opts.muteSubscription + } } export default IngesterContext diff --git a/packages/bsky/src/ingester/index.ts b/packages/bsky/src/ingester/index.ts index b923b92c09c..76225f13d38 100644 --- a/packages/bsky/src/ingester/index.ts +++ b/packages/bsky/src/ingester/index.ts @@ -5,7 +5,9 @@ import { Redis } from '../redis' import { IngesterConfig } from './config' import { IngesterContext } from './context' import { IngesterSubscription } from './subscription' +import { authWithApiKey, createBsyncClient } from '../bsync' import { LabelSubscription } from './label-subscription' +import { MuteSubscription } from './mute-subscription' export { IngesterConfig } from './config' export type { IngesterConfigValues } from './config' @@ -27,14 +29,28 @@ export class BskyIngester { cfg: IngesterConfig }): BskyIngester { const { db, redis, cfg } = opts + const bsyncClient = cfg.bsyncUrl + ? createBsyncClient({ + baseUrl: cfg.bsyncUrl, + httpVersion: cfg.bsyncHttpVersion ?? '2', + nodeOptions: { rejectUnauthorized: !cfg.bsyncIgnoreBadTls }, + interceptors: cfg.bsyncApiKey + ? [authWithApiKey(cfg.bsyncApiKey)] + : [], + }) + : undefined const labelSubscription = cfg.labelProvider ? new LabelSubscription(db, cfg.labelProvider) : undefined + const muteSubscription = bsyncClient + ? new MuteSubscription(db, redis, bsyncClient) + : undefined const ctx = new IngesterContext({ db, redis, cfg, labelSubscription, + muteSubscription, }) const sub = new IngesterSubscription(ctx, { service: cfg.repoProvider, @@ -73,11 +89,13 @@ export class BskyIngester { ) }, 500) await this.ctx.labelSubscription?.start() + await this.ctx.muteSubscription?.start() this.sub.run() return this } async destroy(opts?: { skipDb: boolean }): Promise { + await this.ctx.muteSubscription?.destroy() await this.ctx.labelSubscription?.destroy() await this.sub.destroy() clearInterval(this.subStatsInterval) diff --git a/packages/bsky/src/ingester/mute-subscription.ts b/packages/bsky/src/ingester/mute-subscription.ts new file mode 100644 index 00000000000..9adb685c160 --- /dev/null +++ b/packages/bsky/src/ingester/mute-subscription.ts @@ -0,0 +1,213 @@ +import assert from 'node:assert' +import { PrimaryDatabase } from '../db' +import { Redis } from '../redis' +import { BsyncClient, Code, isBsyncError } from '../bsync' +import { MuteOperation, MuteOperation_Type } from '../proto/bsync_pb' +import logger from './logger' +import { wait } from '@atproto/common' +import { + AtUri, + InvalidDidError, + ensureValidAtUri, + ensureValidDid, +} from '@atproto/syntax' +import { ids } from '../lexicon/lexicons' + +const CURSOR_KEY = 'ingester:mute:cursor' + +export class MuteSubscription { + ac = new AbortController() + running: Promise | undefined + cursor: string | null = null + + constructor( + public db: PrimaryDatabase, + public redis: Redis, + public bsyncClient: BsyncClient, + ) {} + + async start() { + if (this.running) return + this.ac = new AbortController() + this.running = this.run() + .catch((err) => { + // allow this to cause an unhandled rejection, let deployment handle the crash. + logger.error({ err }, 'mute subscription crashed') + throw err + }) + .finally(() => (this.running = undefined)) + } + + private async run() { + this.cursor = await this.getCursor() + while (!this.ac.signal.aborted) { + try { + // get page of mute ops, long-polling + const page = await this.bsyncClient.scanMuteOperations( + { + limit: 100, + cursor: this.cursor ?? undefined, + }, + { signal: this.ac.signal }, + ) + if (!page.cursor) { + throw new BadResponseError('cursor is missing') + } + // process + const now = new Date() + for (const op of page.operations) { + if (this.ac.signal.aborted) return + if (op.type === MuteOperation_Type.ADD) { + await this.handleAddOp(op, now) + } else if (op.type === MuteOperation_Type.REMOVE) { + await this.handleRemoveOp(op) + } else if (op.type === MuteOperation_Type.CLEAR) { + await this.handleClearOp(op) + } else { + logger.warn( + { id: op.id, type: op.type }, + 'unknown mute subscription op type', + ) + } + } + // update cursor + await this.setCursor(page.cursor) + this.cursor = page.cursor + } catch (err) { + if (isBsyncError(err, Code.Canceled)) { + return // canceled, probably from destroy() + } + if (err instanceof BadResponseError) { + logger.warn({ err }, 'bad response from bsync') + } else { + logger.error({ err }, 'unexpected error processing mute subscription') + } + await wait(1000) // wait a second before trying again + } + } + } + + async handleAddOp(op: MuteOperation, createdAt: Date) { + assert(op.type === MuteOperation_Type.ADD) + if (!isValidDid(op.actorDid)) { + logger.warn({ id: op.id, type: op.type }, 'bad actor in mute op') + return + } + if (isValidDid(op.subject)) { + await this.db.db + .insertInto('mute') + .values({ + subjectDid: op.subject, + mutedByDid: op.actorDid, + createdAt: createdAt.toISOString(), + }) + .onConflict((oc) => oc.doNothing()) + .execute() + } else { + const listUri = isValidAtUri(op.subject) + ? new AtUri(op.subject) + : undefined + if (listUri?.collection !== ids.AppBskyGraphList) { + logger.warn({ id: op.id, type: op.type }, 'bad subject in mute op') + return + } + await this.db.db + .insertInto('list_mute') + .values({ + listUri: op.subject, + mutedByDid: op.actorDid, + createdAt: createdAt.toISOString(), + }) + .onConflict((oc) => oc.doNothing()) + .execute() + } + } + + async handleRemoveOp(op: MuteOperation) { + assert(op.type === MuteOperation_Type.REMOVE) + if (!isValidDid(op.actorDid)) { + logger.warn({ id: op.id, type: op.type }, 'bad actor in mute op') + return + } + if (isValidDid(op.subject)) { + await this.db.db + .deleteFrom('mute') + .where('subjectDid', '=', op.subject) + .where('mutedByDid', '=', op.actorDid) + .execute() + } else { + const listUri = isValidAtUri(op.subject) + ? new AtUri(op.subject) + : undefined + if (listUri?.collection !== ids.AppBskyGraphList) { + logger.warn({ id: op.id, type: op.type }, 'bad subject in mute op') + return + } + await this.db.db + .deleteFrom('list_mute') + .where('listUri', '=', op.subject) + .where('mutedByDid', '=', op.actorDid) + .execute() + } + } + + async handleClearOp(op: MuteOperation) { + assert(op.type === MuteOperation_Type.CLEAR) + if (!isValidDid(op.actorDid)) { + logger.warn({ id: op.id, type: op.type }, 'bad actor in mute op') + return + } + if (op.subject) { + logger.warn({ id: op.id, type: op.type }, 'bad subject in mute op') + return + } + await this.db.db + .deleteFrom('mute') + .where('mutedByDid', '=', op.actorDid) + .execute() + await this.db.db + .deleteFrom('list_mute') + .where('mutedByDid', '=', op.actorDid) + .execute() + } + + async getCursor(): Promise { + return await this.redis.get(CURSOR_KEY) + } + + async setCursor(cursor: string): Promise { + await this.redis.set(CURSOR_KEY, cursor) + } + + async destroy() { + this.ac.abort() + await this.running + } + + get destroyed() { + return this.ac.signal.aborted + } +} + +class BadResponseError extends Error {} + +const isValidDid = (did: string) => { + try { + ensureValidDid(did) + return true + } catch (err) { + if (err instanceof InvalidDidError) { + return false + } + throw err + } +} + +const isValidAtUri = (uri: string) => { + try { + ensureValidAtUri(uri) + return true + } catch { + return false + } +} diff --git a/packages/bsync/src/gen/bsync_connect.ts b/packages/bsky/src/proto/bsync_connect.ts similarity index 100% rename from packages/bsync/src/gen/bsync_connect.ts rename to packages/bsky/src/proto/bsync_connect.ts diff --git a/packages/bsync/src/gen/bsync_pb.ts b/packages/bsky/src/proto/bsync_pb.ts similarity index 100% rename from packages/bsync/src/gen/bsync_pb.ts rename to packages/bsky/src/proto/bsync_pb.ts diff --git a/packages/bsky/tests/subscription/mutes.test.ts b/packages/bsky/tests/subscription/mutes.test.ts new file mode 100644 index 00000000000..9b3f194050b --- /dev/null +++ b/packages/bsky/tests/subscription/mutes.test.ts @@ -0,0 +1,170 @@ +import AtpAgent from '@atproto/api' +import { wait } from '@atproto/common' +import { TestNetwork, SeedClient, basicSeed, TestBsync } from '@atproto/dev-env' +import assert from 'assert' + +describe('sync mutes', () => { + let network: TestNetwork + let bsync: TestBsync + let pdsAgent: AtpAgent + let sc: SeedClient + + beforeAll(async () => { + assert(process.env.DB_POSTGRES_URL) + bsync = await TestBsync.create({ + dbSchema: 'bsync_subscription_mutes', + dbUrl: process.env.DB_POSTGRES_URL, + }) + network = await TestNetwork.create({ + dbPostgresSchema: 'bsky_subscription_mutes', + bsky: { + bsyncUrl: bsync.url, + bsyncApiKey: [...bsync.ctx.cfg.auth.apiKeys][0], + bsyncHttpVersion: '1.1', + bsyncOnlyMutes: true, + ingester: { + bsyncUrl: bsync.url, + bsyncApiKey: [...bsync.ctx.cfg.auth.apiKeys][0], + bsyncHttpVersion: '1.1', + }, + }, + }) + pdsAgent = network.pds.getClient() + sc = network.getSeedClient() + await basicSeed(sc) + }) + + afterAll(async () => { + await network.close() + await bsync.close() + }) + + it('mutes and unmutes actors.', async () => { + await pdsAgent.api.app.bsky.graph.muteActor( + { actor: sc.dids.alice }, + { headers: sc.getHeaders(sc.dids.bob), encoding: 'application/json' }, + ) + await pdsAgent.api.app.bsky.graph.muteActor( + { actor: sc.dids.carol }, + { headers: sc.getHeaders(sc.dids.bob), encoding: 'application/json' }, + ) + await pdsAgent.api.app.bsky.graph.muteActor( + { actor: sc.dids.dan }, + { headers: sc.getHeaders(sc.dids.bob), encoding: 'application/json' }, + ) + await processAllMuteOps(network, bsync) + const { data: mutes1 } = await pdsAgent.api.app.bsky.graph.getMutes( + {}, + { headers: sc.getHeaders(sc.dids.bob) }, + ) + expect(mutes1.mutes.map((mute) => mute.did)).toEqual([ + sc.dids.dan, + sc.dids.carol, + sc.dids.alice, + ]) + await pdsAgent.api.app.bsky.graph.unmuteActor( + { actor: sc.dids.carol }, + { headers: sc.getHeaders(sc.dids.bob), encoding: 'application/json' }, + ) + await processAllMuteOps(network, bsync) + const { data: mutes2 } = await pdsAgent.api.app.bsky.graph.getMutes( + {}, + { headers: sc.getHeaders(sc.dids.bob) }, + ) + expect(mutes2.mutes.map((mute) => mute.did)).toEqual([ + sc.dids.dan, + sc.dids.alice, + ]) + }) + + it('mutes and unmutes lists.', async () => { + // create lists + const list1 = await pdsAgent.api.app.bsky.graph.list.create( + { repo: sc.dids.bob }, + { + name: 'mod list 1', + purpose: 'app.bsky.graph.defs#modlist', + createdAt: new Date().toISOString(), + }, + sc.getHeaders(sc.dids.bob), + ) + const list2 = await pdsAgent.api.app.bsky.graph.list.create( + { repo: sc.dids.bob }, + { + name: 'mod list 2', + purpose: 'app.bsky.graph.defs#modlist', + createdAt: new Date().toISOString(), + }, + sc.getHeaders(sc.dids.bob), + ) + const list3 = await pdsAgent.api.app.bsky.graph.list.create( + { repo: sc.dids.bob }, + { + name: 'mod list 3', + purpose: 'app.bsky.graph.defs#modlist', + createdAt: new Date().toISOString(), + }, + sc.getHeaders(sc.dids.bob), + ) + await network.processAll() + await pdsAgent.api.app.bsky.graph.muteActorList( + { list: list1.uri }, + { headers: sc.getHeaders(sc.dids.bob), encoding: 'application/json' }, + ) + await pdsAgent.api.app.bsky.graph.muteActorList( + { list: list2.uri }, + { headers: sc.getHeaders(sc.dids.bob), encoding: 'application/json' }, + ) + await pdsAgent.api.app.bsky.graph.muteActorList( + { list: list3.uri }, + { headers: sc.getHeaders(sc.dids.bob), encoding: 'application/json' }, + ) + await processAllMuteOps(network, bsync) + const { data: listmutes1 } = await pdsAgent.api.app.bsky.graph.getListMutes( + {}, + { headers: sc.getHeaders(sc.dids.bob) }, + ) + expect(listmutes1.lists.map((list) => list.uri)).toEqual([ + list3.uri, + list2.uri, + list1.uri, + ]) + await pdsAgent.api.app.bsky.graph.unmuteActorList( + { list: list2.uri }, + { headers: sc.getHeaders(sc.dids.bob), encoding: 'application/json' }, + ) + await processAllMuteOps(network, bsync) + const { data: listmutes2 } = await pdsAgent.api.app.bsky.graph.getListMutes( + {}, + { headers: sc.getHeaders(sc.dids.bob) }, + ) + expect(listmutes2.lists.map((list) => list.uri)).toEqual([ + list3.uri, + list1.uri, + ]) + }) +}) + +async function processAllMuteOps(network: TestNetwork, bsync: TestBsync) { + const getBsyncCursor = async () => { + const result = await bsync.ctx.db.db + .selectFrom('mute_op') + .orderBy('id', 'desc') + .select('id') + .limit(1) + .executeTakeFirst() + return result?.id.toString() ?? null + } + assert(network.bsky.ingester.ctx.muteSubscription) + let total = 0 + while ( + (await getBsyncCursor()) !== + network.bsky.ingester.ctx.muteSubscription.cursor + ) { + if (total > 5000) { + throw new Error('timeout while processing mute ops') + } + await wait(50) + total += 50 + } +} diff --git a/packages/bsync/buf.gen.yaml b/packages/bsync/buf.gen.yaml index 9cccf918873..a81e4248719 100644 --- a/packages/bsync/buf.gen.yaml +++ b/packages/bsync/buf.gen.yaml @@ -4,10 +4,9 @@ plugins: opt: - target=ts - import_extension=.ts - - out: src/gen + out: src/proto - plugin: connect-es opt: - target=ts - import_extension=.ts - out: src/gen + out: src/proto diff --git a/packages/bsync/src/client.ts b/packages/bsync/src/client.ts index ea38f3d01ee..d6c3b9df774 100644 --- a/packages/bsync/src/client.ts +++ b/packages/bsync/src/client.ts @@ -7,7 +7,7 @@ import { ConnectTransportOptions, createConnectTransport, } from '@connectrpc/connect-node' -import { Service } from './gen/bsync_connect' +import { Service } from './proto/bsync_connect' export type BsyncClient = PromiseClient diff --git a/packages/bsync/src/db/schema/mute_op.ts b/packages/bsync/src/db/schema/mute_op.ts index 0efa56daf7b..3ac99276718 100644 --- a/packages/bsync/src/db/schema/mute_op.ts +++ b/packages/bsync/src/db/schema/mute_op.ts @@ -1,5 +1,5 @@ import { GeneratedAlways, Selectable } from 'kysely' -import { MuteOperation_Type } from '../../gen/bsync_pb' +import { MuteOperation_Type } from '../../proto/bsync_pb' export interface MuteOp { id: GeneratedAlways diff --git a/packages/bsync/src/proto/bsync_connect.ts b/packages/bsync/src/proto/bsync_connect.ts new file mode 100644 index 00000000000..94a37266bfd --- /dev/null +++ b/packages/bsync/src/proto/bsync_connect.ts @@ -0,0 +1,54 @@ +// @generated by protoc-gen-connect-es v1.3.0 with parameter "target=ts,import_extension=.ts" +// @generated from file bsync.proto (package bsync, syntax proto3) +/* eslint-disable */ +// @ts-nocheck + +import { + AddMuteOperationRequest, + AddMuteOperationResponse, + PingRequest, + PingResponse, + ScanMuteOperationsRequest, + ScanMuteOperationsResponse, +} from './bsync_pb.ts' +import { MethodKind } from '@bufbuild/protobuf' + +/** + * @generated from service bsync.Service + */ +export const Service = { + typeName: 'bsync.Service', + methods: { + /** + * Sync + * + * @generated from rpc bsync.Service.AddMuteOperation + */ + addMuteOperation: { + name: 'AddMuteOperation', + I: AddMuteOperationRequest, + O: AddMuteOperationResponse, + kind: MethodKind.Unary, + }, + /** + * @generated from rpc bsync.Service.ScanMuteOperations + */ + scanMuteOperations: { + name: 'ScanMuteOperations', + I: ScanMuteOperationsRequest, + O: ScanMuteOperationsResponse, + kind: MethodKind.Unary, + }, + /** + * Ping + * + * @generated from rpc bsync.Service.Ping + */ + ping: { + name: 'Ping', + I: PingRequest, + O: PingResponse, + kind: MethodKind.Unary, + }, + }, +} as const diff --git a/packages/bsync/src/proto/bsync_pb.ts b/packages/bsync/src/proto/bsync_pb.ts new file mode 100644 index 00000000000..aafd4012b25 --- /dev/null +++ b/packages/bsync/src/proto/bsync_pb.ts @@ -0,0 +1,459 @@ +// @generated by protoc-gen-es v1.6.0 with parameter "target=ts,import_extension=.ts" +// @generated from file bsync.proto (package bsync, syntax proto3) +/* eslint-disable */ +// @ts-nocheck + +import type { + BinaryReadOptions, + FieldList, + JsonReadOptions, + JsonValue, + PartialMessage, + PlainMessage, +} from '@bufbuild/protobuf' +import { Message, proto3 } from '@bufbuild/protobuf' + +/** + * @generated from message bsync.MuteOperation + */ +export class MuteOperation extends Message { + /** + * @generated from field: string id = 1; + */ + id = '' + + /** + * @generated from field: bsync.MuteOperation.Type type = 2; + */ + type = MuteOperation_Type.UNSPECIFIED + + /** + * @generated from field: string actor_did = 3; + */ + actorDid = '' + + /** + * @generated from field: string subject = 4; + */ + subject = '' + + constructor(data?: PartialMessage) { + super() + proto3.util.initPartial(data, this) + } + + static readonly runtime: typeof proto3 = proto3 + static readonly typeName = 'bsync.MuteOperation' + static readonly fields: FieldList = proto3.util.newFieldList(() => [ + { no: 1, name: 'id', kind: 'scalar', T: 9 /* ScalarType.STRING */ }, + { + no: 2, + name: 'type', + kind: 'enum', + T: proto3.getEnumType(MuteOperation_Type), + }, + { no: 3, name: 'actor_did', kind: 'scalar', T: 9 /* ScalarType.STRING */ }, + { no: 4, name: 'subject', kind: 'scalar', T: 9 /* ScalarType.STRING */ }, + ]) + + static fromBinary( + bytes: Uint8Array, + options?: Partial, + ): MuteOperation { + return new MuteOperation().fromBinary(bytes, options) + } + + static fromJson( + jsonValue: JsonValue, + options?: Partial, + ): MuteOperation { + return new MuteOperation().fromJson(jsonValue, options) + } + + static fromJsonString( + jsonString: string, + options?: Partial, + ): MuteOperation { + return new MuteOperation().fromJsonString(jsonString, options) + } + + static equals( + a: MuteOperation | PlainMessage | undefined, + b: MuteOperation | PlainMessage | undefined, + ): boolean { + return proto3.util.equals(MuteOperation, a, b) + } +} + +/** + * @generated from enum bsync.MuteOperation.Type + */ +export enum MuteOperation_Type { + /** + * @generated from enum value: TYPE_UNSPECIFIED = 0; + */ + UNSPECIFIED = 0, + + /** + * @generated from enum value: TYPE_ADD = 1; + */ + ADD = 1, + + /** + * @generated from enum value: TYPE_REMOVE = 2; + */ + REMOVE = 2, + + /** + * @generated from enum value: TYPE_CLEAR = 3; + */ + CLEAR = 3, +} +// Retrieve enum metadata with: proto3.getEnumType(MuteOperation_Type) +proto3.util.setEnumType(MuteOperation_Type, 'bsync.MuteOperation.Type', [ + { no: 0, name: 'TYPE_UNSPECIFIED' }, + { no: 1, name: 'TYPE_ADD' }, + { no: 2, name: 'TYPE_REMOVE' }, + { no: 3, name: 'TYPE_CLEAR' }, +]) + +/** + * @generated from message bsync.AddMuteOperationRequest + */ +export class AddMuteOperationRequest extends Message { + /** + * @generated from field: bsync.MuteOperation.Type type = 1; + */ + type = MuteOperation_Type.UNSPECIFIED + + /** + * @generated from field: string actor_did = 2; + */ + actorDid = '' + + /** + * @generated from field: string subject = 3; + */ + subject = '' + + constructor(data?: PartialMessage) { + super() + proto3.util.initPartial(data, this) + } + + static readonly runtime: typeof proto3 = proto3 + static readonly typeName = 'bsync.AddMuteOperationRequest' + static readonly fields: FieldList = proto3.util.newFieldList(() => [ + { + no: 1, + name: 'type', + kind: 'enum', + T: proto3.getEnumType(MuteOperation_Type), + }, + { no: 2, name: 'actor_did', kind: 'scalar', T: 9 /* ScalarType.STRING */ }, + { no: 3, name: 'subject', kind: 'scalar', T: 9 /* ScalarType.STRING */ }, + ]) + + static fromBinary( + bytes: Uint8Array, + options?: Partial, + ): AddMuteOperationRequest { + return new AddMuteOperationRequest().fromBinary(bytes, options) + } + + static fromJson( + jsonValue: JsonValue, + options?: Partial, + ): AddMuteOperationRequest { + return new AddMuteOperationRequest().fromJson(jsonValue, options) + } + + static fromJsonString( + jsonString: string, + options?: Partial, + ): AddMuteOperationRequest { + return new AddMuteOperationRequest().fromJsonString(jsonString, options) + } + + static equals( + a: + | AddMuteOperationRequest + | PlainMessage + | undefined, + b: + | AddMuteOperationRequest + | PlainMessage + | undefined, + ): boolean { + return proto3.util.equals(AddMuteOperationRequest, a, b) + } +} + +/** + * @generated from message bsync.AddMuteOperationResponse + */ +export class AddMuteOperationResponse extends Message { + /** + * @generated from field: bsync.MuteOperation operation = 1; + */ + operation?: MuteOperation + + constructor(data?: PartialMessage) { + super() + proto3.util.initPartial(data, this) + } + + static readonly runtime: typeof proto3 = proto3 + static readonly typeName = 'bsync.AddMuteOperationResponse' + static readonly fields: FieldList = proto3.util.newFieldList(() => [ + { no: 1, name: 'operation', kind: 'message', T: MuteOperation }, + ]) + + static fromBinary( + bytes: Uint8Array, + options?: Partial, + ): AddMuteOperationResponse { + return new AddMuteOperationResponse().fromBinary(bytes, options) + } + + static fromJson( + jsonValue: JsonValue, + options?: Partial, + ): AddMuteOperationResponse { + return new AddMuteOperationResponse().fromJson(jsonValue, options) + } + + static fromJsonString( + jsonString: string, + options?: Partial, + ): AddMuteOperationResponse { + return new AddMuteOperationResponse().fromJsonString(jsonString, options) + } + + static equals( + a: + | AddMuteOperationResponse + | PlainMessage + | undefined, + b: + | AddMuteOperationResponse + | PlainMessage + | undefined, + ): boolean { + return proto3.util.equals(AddMuteOperationResponse, a, b) + } +} + +/** + * @generated from message bsync.ScanMuteOperationsRequest + */ +export class ScanMuteOperationsRequest extends Message { + /** + * @generated from field: string cursor = 1; + */ + cursor = '' + + /** + * @generated from field: int32 limit = 2; + */ + limit = 0 + + constructor(data?: PartialMessage) { + super() + proto3.util.initPartial(data, this) + } + + static readonly runtime: typeof proto3 = proto3 + static readonly typeName = 'bsync.ScanMuteOperationsRequest' + static readonly fields: FieldList = proto3.util.newFieldList(() => [ + { no: 1, name: 'cursor', kind: 'scalar', T: 9 /* ScalarType.STRING */ }, + { no: 2, name: 'limit', kind: 'scalar', T: 5 /* ScalarType.INT32 */ }, + ]) + + static fromBinary( + bytes: Uint8Array, + options?: Partial, + ): ScanMuteOperationsRequest { + return new ScanMuteOperationsRequest().fromBinary(bytes, options) + } + + static fromJson( + jsonValue: JsonValue, + options?: Partial, + ): ScanMuteOperationsRequest { + return new ScanMuteOperationsRequest().fromJson(jsonValue, options) + } + + static fromJsonString( + jsonString: string, + options?: Partial, + ): ScanMuteOperationsRequest { + return new ScanMuteOperationsRequest().fromJsonString(jsonString, options) + } + + static equals( + a: + | ScanMuteOperationsRequest + | PlainMessage + | undefined, + b: + | ScanMuteOperationsRequest + | PlainMessage + | undefined, + ): boolean { + return proto3.util.equals(ScanMuteOperationsRequest, a, b) + } +} + +/** + * @generated from message bsync.ScanMuteOperationsResponse + */ +export class ScanMuteOperationsResponse extends Message { + /** + * @generated from field: repeated bsync.MuteOperation operations = 1; + */ + operations: MuteOperation[] = [] + + /** + * @generated from field: string cursor = 2; + */ + cursor = '' + + constructor(data?: PartialMessage) { + super() + proto3.util.initPartial(data, this) + } + + static readonly runtime: typeof proto3 = proto3 + static readonly typeName = 'bsync.ScanMuteOperationsResponse' + static readonly fields: FieldList = proto3.util.newFieldList(() => [ + { + no: 1, + name: 'operations', + kind: 'message', + T: MuteOperation, + repeated: true, + }, + { no: 2, name: 'cursor', kind: 'scalar', T: 9 /* ScalarType.STRING */ }, + ]) + + static fromBinary( + bytes: Uint8Array, + options?: Partial, + ): ScanMuteOperationsResponse { + return new ScanMuteOperationsResponse().fromBinary(bytes, options) + } + + static fromJson( + jsonValue: JsonValue, + options?: Partial, + ): ScanMuteOperationsResponse { + return new ScanMuteOperationsResponse().fromJson(jsonValue, options) + } + + static fromJsonString( + jsonString: string, + options?: Partial, + ): ScanMuteOperationsResponse { + return new ScanMuteOperationsResponse().fromJsonString(jsonString, options) + } + + static equals( + a: + | ScanMuteOperationsResponse + | PlainMessage + | undefined, + b: + | ScanMuteOperationsResponse + | PlainMessage + | undefined, + ): boolean { + return proto3.util.equals(ScanMuteOperationsResponse, a, b) + } +} + +/** + * Ping + * + * @generated from message bsync.PingRequest + */ +export class PingRequest extends Message { + constructor(data?: PartialMessage) { + super() + proto3.util.initPartial(data, this) + } + + static readonly runtime: typeof proto3 = proto3 + static readonly typeName = 'bsync.PingRequest' + static readonly fields: FieldList = proto3.util.newFieldList(() => []) + + static fromBinary( + bytes: Uint8Array, + options?: Partial, + ): PingRequest { + return new PingRequest().fromBinary(bytes, options) + } + + static fromJson( + jsonValue: JsonValue, + options?: Partial, + ): PingRequest { + return new PingRequest().fromJson(jsonValue, options) + } + + static fromJsonString( + jsonString: string, + options?: Partial, + ): PingRequest { + return new PingRequest().fromJsonString(jsonString, options) + } + + static equals( + a: PingRequest | PlainMessage | undefined, + b: PingRequest | PlainMessage | undefined, + ): boolean { + return proto3.util.equals(PingRequest, a, b) + } +} + +/** + * @generated from message bsync.PingResponse + */ +export class PingResponse extends Message { + constructor(data?: PartialMessage) { + super() + proto3.util.initPartial(data, this) + } + + static readonly runtime: typeof proto3 = proto3 + static readonly typeName = 'bsync.PingResponse' + static readonly fields: FieldList = proto3.util.newFieldList(() => []) + + static fromBinary( + bytes: Uint8Array, + options?: Partial, + ): PingResponse { + return new PingResponse().fromBinary(bytes, options) + } + + static fromJson( + jsonValue: JsonValue, + options?: Partial, + ): PingResponse { + return new PingResponse().fromJson(jsonValue, options) + } + + static fromJsonString( + jsonString: string, + options?: Partial, + ): PingResponse { + return new PingResponse().fromJsonString(jsonString, options) + } + + static equals( + a: PingResponse | PlainMessage | undefined, + b: PingResponse | PlainMessage | undefined, + ): boolean { + return proto3.util.equals(PingResponse, a, b) + } +} diff --git a/packages/bsync/src/routes/add-mute-operation.ts b/packages/bsync/src/routes/add-mute-operation.ts index fd535cd824b..2b118c443d4 100644 --- a/packages/bsync/src/routes/add-mute-operation.ts +++ b/packages/bsync/src/routes/add-mute-operation.ts @@ -6,8 +6,8 @@ import { ensureValidDid, } from '@atproto/syntax' import { Code, ConnectError, ServiceImpl } from '@connectrpc/connect' -import { Service } from '../gen/bsync_connect' -import { AddMuteOperationResponse, MuteOperation_Type } from '../gen/bsync_pb' +import { Service } from '../proto/bsync_connect' +import { AddMuteOperationResponse, MuteOperation_Type } from '../proto/bsync_pb' import AppContext from '../context' import { createMuteOpChannel } from '../db/schema/mute_op' import { authWithApiKey } from './auth' diff --git a/packages/bsync/src/routes/index.ts b/packages/bsync/src/routes/index.ts index 726949f4c64..6ed2689580e 100644 --- a/packages/bsync/src/routes/index.ts +++ b/packages/bsync/src/routes/index.ts @@ -1,6 +1,6 @@ import { sql } from 'kysely' import { ConnectRouter } from '@connectrpc/connect' -import { Service } from '../gen/bsync_connect' +import { Service } from '../proto/bsync_connect' import AppContext from '../context' import addMuteOperation from './add-mute-operation' import scanMuteOperations from './scan-mute-operations' diff --git a/packages/bsync/src/routes/scan-mute-operations.ts b/packages/bsync/src/routes/scan-mute-operations.ts index c0f4f403ef8..671632531b0 100644 --- a/packages/bsync/src/routes/scan-mute-operations.ts +++ b/packages/bsync/src/routes/scan-mute-operations.ts @@ -1,7 +1,7 @@ import { once } from 'node:events' import { Code, ConnectError, ServiceImpl } from '@connectrpc/connect' -import { Service } from '../gen/bsync_connect' -import { ScanMuteOperationsResponse } from '../gen/bsync_pb' +import { Service } from '../proto/bsync_connect' +import { ScanMuteOperationsResponse } from '../proto/bsync_pb' import AppContext from '../context' import { createMuteOpChannel } from '../db/schema/mute_op' import { authWithApiKey } from './auth' diff --git a/packages/bsync/tests/mutes.test.ts b/packages/bsync/tests/mutes.test.ts index 03dee110f85..231df3063dc 100644 --- a/packages/bsync/tests/mutes.test.ts +++ b/packages/bsync/tests/mutes.test.ts @@ -8,7 +8,7 @@ import { createClient, envToCfg, } from '../src' -import { MuteOperation, MuteOperation_Type } from '../src/gen/bsync_pb' +import { MuteOperation, MuteOperation_Type } from '../src/proto/bsync_pb' describe('mutes', () => { let bsync: BsyncService diff --git a/packages/dev-env/package.json b/packages/dev-env/package.json index 2592e52b81f..352ff883e60 100644 --- a/packages/dev-env/package.json +++ b/packages/dev-env/package.json @@ -27,6 +27,7 @@ "dependencies": { "@atproto/api": "workspace:^", "@atproto/bsky": "workspace:^", + "@atproto/bsync": "workspace:^", "@atproto/common-web": "workspace:^", "@atproto/crypto": "workspace:^", "@atproto/identity": "workspace:^", diff --git a/packages/dev-env/src/bsync.ts b/packages/dev-env/src/bsync.ts new file mode 100644 index 00000000000..b0059638535 --- /dev/null +++ b/packages/dev-env/src/bsync.ts @@ -0,0 +1,36 @@ +import getPort from 'get-port' +import * as bsync from '@atproto/bsync' +import { BsyncConfig } from './types' + +export class TestBsync { + constructor( + public url: string, + public port: number, + public service: bsync.BsyncService, + ) {} + + static async create(cfg: BsyncConfig): Promise { + const port = cfg.port || (await getPort()) + const url = `http://localhost:${port}` + + const config = bsync.envToCfg({ + port, + apiKeys: cfg.apiKeys ?? ['api-key'], + ...cfg, + }) + + const service = await bsync.BsyncService.create(config) + await service.ctx.db.migrateToLatestOrThrow() + await service.start() + + return new TestBsync(url, port, service) + } + + get ctx(): bsync.AppContext { + return this.service.ctx + } + + async close() { + await this.service.destroy() + } +} diff --git a/packages/dev-env/src/index.ts b/packages/dev-env/src/index.ts index 5e8524edf68..fe11f2275c9 100644 --- a/packages/dev-env/src/index.ts +++ b/packages/dev-env/src/index.ts @@ -1,4 +1,5 @@ export * from './bsky' +export * from './bsync' export * from './network' export * from './network-no-appview' export * from './pds' diff --git a/packages/dev-env/src/types.ts b/packages/dev-env/src/types.ts index 51d22976da2..c3307910459 100644 --- a/packages/dev-env/src/types.ts +++ b/packages/dev-env/src/types.ts @@ -1,5 +1,6 @@ import * as pds from '@atproto/pds' import * as bsky from '@atproto/bsky' +import * as bsync from '@atproto/bsync' import * as ozone from '@atproto/ozone' import { ImageInvalidator } from '@atproto/bsky' import { ExportableKeypair } from '@atproto/crypto' @@ -28,6 +29,10 @@ export type BskyConfig = Partial & { ingester?: Partial } +export type BsyncConfig = Partial & { + dbUrl: string +} + export type OzoneConfig = Partial & { plcUrl: string appviewUrl: string diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 09aca984856..d7dcdc7db48 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -189,6 +189,15 @@ importers: '@atproto/xrpc-server': specifier: workspace:^ version: link:../xrpc-server + '@bufbuild/protobuf': + specifier: ^1.5.0 + version: 1.6.0 + '@connectrpc/connect': + specifier: ^1.1.4 + version: 1.3.0(@bufbuild/protobuf@1.6.0) + '@connectrpc/connect-node': + specifier: ^1.1.4 + version: 1.3.0(@bufbuild/protobuf@1.6.0)(@connectrpc/connect@1.3.0) '@did-plc/lib': specifier: ^0.0.1 version: 0.0.1 @@ -259,6 +268,15 @@ importers: '@atproto/xrpc': specifier: workspace:^ version: link:../xrpc + '@bufbuild/buf': + specifier: ^1.28.1 + version: 1.28.1 + '@bufbuild/protoc-gen-es': + specifier: ^1.5.0 + version: 1.6.0(@bufbuild/protobuf@1.6.0) + '@connectrpc/protoc-gen-connect-es': + specifier: ^1.1.4 + version: 1.3.0(@bufbuild/protoc-gen-es@1.6.0)(@connectrpc/connect@1.3.0) '@did-plc/server': specifier: ^0.0.1 version: 0.0.1 @@ -396,6 +414,9 @@ importers: '@atproto/bsky': specifier: workspace:^ version: link:../bsky + '@atproto/bsync': + specifier: workspace:^ + version: link:../bsync '@atproto/common-web': specifier: workspace:^ version: link:../common-web