Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Appview v1 generating and ingesting mute ops w/ bsync #2067

Merged
merged 7 commits into from
Jan 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/build-and-push-bsky-aws.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ on:
push:
branches:
- main
- appview-v1-sync-mutes
env:
REGISTRY: ${{ secrets.AWS_ECR_REGISTRY_USEAST2_PACKAGES_REGISTRY }}
USERNAME: ${{ secrets.AWS_ECR_REGISTRY_USEAST2_PACKAGES_USERNAME }}
Expand Down
12 changes: 12 additions & 0 deletions packages/bsky/buf.gen.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
version: v1
plugins:
- plugin: es
opt:
- target=ts
- import_extension=.ts
out: src/proto
- plugin: connect-es
opt:
- target=ts
- import_extension=.ts
out: src/proto
9 changes: 8 additions & 1 deletion packages/bsky/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@
"test": "../dev-infra/with-test-redis-and-db.sh jest",
"test:log": "tail -50 test.log | pino-pretty",
"test:updateSnapshot": "jest --updateSnapshot",
"migration:create": "ts-node ./bin/migration-create.ts"
"migration:create": "ts-node ./bin/migration-create.ts",
"buf:gen": "buf generate ../bsync/proto"
},
"dependencies": {
"@atproto/api": "workspace:^",
Expand All @@ -39,6 +40,9 @@
"@atproto/lexicon": "workspace:^",
"@atproto/repo": "workspace:^",
"@atproto/xrpc-server": "workspace:^",
"@bufbuild/protobuf": "^1.5.0",
"@connectrpc/connect": "^1.1.4",
"@connectrpc/connect-node": "^1.1.4",
"@did-plc/lib": "^0.0.1",
"@isaacs/ttlcache": "^1.4.1",
"compression": "^1.7.4",
Expand All @@ -65,6 +69,9 @@
"@atproto/lex-cli": "workspace:^",
"@atproto/pds": "workspace:^",
"@atproto/xrpc": "workspace:^",
"@bufbuild/buf": "^1.28.1",
"@bufbuild/protoc-gen-es": "^1.5.0",
"@connectrpc/protoc-gen-connect-es": "^1.1.4",
"@did-plc/server": "^0.0.1",
"@types/cors": "^2.8.12",
"@types/express": "^4.17.13",
Expand Down
37 changes: 32 additions & 5 deletions packages/bsky/src/api/app/bsky/graph/muteActor.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import assert from 'node:assert'
import { InvalidRequestError } from '@atproto/xrpc-server'
import { Server } from '../../../../lexicon'
import AppContext from '../../../../context'
import { MuteOperation_Type } from '../../../../proto/bsync_pb'
import { BsyncClient } from '../../../../bsync'

export default function (server: Server, ctx: AppContext) {
server.app.bsky.graph.muteActor({
auth: ctx.authVerifier.standard,
handler: async ({ auth, input }) => {
handler: async ({ req, auth, input }) => {
const { actor } = input.body
const requester = auth.credentials.iss
const db = ctx.db.getPrimary()
Expand All @@ -18,10 +21,34 @@ export default function (server: Server, ctx: AppContext) {
throw new InvalidRequestError('Cannot mute oneself')
}

await ctx.services.graph(db).muteActor({
subjectDid,
mutedByDid: requester,
})
const muteActor = async () => {
await ctx.services.graph(db).muteActor({
subjectDid,
mutedByDid: requester,
})
}

const addBsyncMuteOp = async (bsyncClient: BsyncClient) => {
await bsyncClient.addMuteOperation({
type: MuteOperation_Type.ADD,
actorDid: requester,
subject: subjectDid,
})
}

if (ctx.cfg.bsyncOnlyMutes) {
assert(ctx.bsyncClient)
await addBsyncMuteOp(ctx.bsyncClient)
} else {
await muteActor()
if (ctx.bsyncClient) {
try {
await addBsyncMuteOp(ctx.bsyncClient)
} catch (err) {
req.log.warn(err, 'failed to sync mute op to bsync')
}
}
}
},
})
}
37 changes: 32 additions & 5 deletions packages/bsky/src/api/app/bsky/graph/muteActorList.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
import assert from 'node:assert'
import { InvalidRequestError } from '@atproto/xrpc-server'
import { Server } from '../../../../lexicon'
import * as lex from '../../../../lexicon/lexicons'
import AppContext from '../../../../context'
import { AtUri } from '@atproto/syntax'
import { MuteOperation_Type } from '../../../../proto/bsync_pb'
import { BsyncClient } from '../../../../bsync'

export default function (server: Server, ctx: AppContext) {
server.app.bsky.graph.muteActorList({
auth: ctx.authVerifier.standard,
handler: async ({ auth, input }) => {
handler: async ({ req, auth, input }) => {
const { list } = input.body
const requester = auth.credentials.iss

Expand All @@ -19,10 +22,34 @@ export default function (server: Server, ctx: AppContext) {
throw new InvalidRequestError(`Invalid collection: expected: ${collId}`)
}

await ctx.services.graph(db).muteActorList({
list,
mutedByDid: requester,
})
const muteActorList = async () => {
await ctx.services.graph(db).muteActorList({
list,
mutedByDid: requester,
})
}

const addBsyncMuteOp = async (bsyncClient: BsyncClient) => {
await bsyncClient.addMuteOperation({
type: MuteOperation_Type.ADD,
actorDid: requester,
subject: list,
})
}

if (ctx.cfg.bsyncOnlyMutes) {
assert(ctx.bsyncClient)
await addBsyncMuteOp(ctx.bsyncClient)
} else {
await muteActorList()
if (ctx.bsyncClient) {
try {
await addBsyncMuteOp(ctx.bsyncClient)
} catch (err) {
req.log.warn(err, 'failed to sync mute op to bsync')
}
}
}
},
})
}
37 changes: 32 additions & 5 deletions packages/bsky/src/api/app/bsky/graph/unmuteActor.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import assert from 'node:assert'
import { InvalidRequestError } from '@atproto/xrpc-server'
import { Server } from '../../../../lexicon'
import AppContext from '../../../../context'
import { MuteOperation_Type } from '../../../../proto/bsync_pb'
import { BsyncClient } from '../../../../bsync'

export default function (server: Server, ctx: AppContext) {
server.app.bsky.graph.unmuteActor({
auth: ctx.authVerifier.standard,
handler: async ({ auth, input }) => {
handler: async ({ req, auth, input }) => {
const { actor } = input.body
const requester = auth.credentials.iss
const db = ctx.db.getPrimary()
Expand All @@ -18,10 +21,34 @@ export default function (server: Server, ctx: AppContext) {
throw new InvalidRequestError('Cannot mute oneself')
}

await ctx.services.graph(db).unmuteActor({
subjectDid,
mutedByDid: requester,
})
const unmuteActor = async () => {
await ctx.services.graph(db).unmuteActor({
subjectDid,
mutedByDid: requester,
})
}

const addBsyncMuteOp = async (bsyncClient: BsyncClient) => {
await bsyncClient.addMuteOperation({
type: MuteOperation_Type.REMOVE,
actorDid: requester,
subject: subjectDid,
})
}

if (ctx.cfg.bsyncOnlyMutes) {
assert(ctx.bsyncClient)
await addBsyncMuteOp(ctx.bsyncClient)
} else {
await unmuteActor()
if (ctx.bsyncClient) {
try {
await addBsyncMuteOp(ctx.bsyncClient)
} catch (err) {
req.log.warn(err, 'failed to sync mute op to bsync')
}
}
}
},
})
}
37 changes: 32 additions & 5 deletions packages/bsky/src/api/app/bsky/graph/unmuteActorList.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,45 @@
import assert from 'node:assert'
import { Server } from '../../../../lexicon'
import AppContext from '../../../../context'
import { MuteOperation_Type } from '../../../../proto/bsync_pb'
import { BsyncClient } from '../../../../bsync'

export default function (server: Server, ctx: AppContext) {
server.app.bsky.graph.unmuteActorList({
auth: ctx.authVerifier.standard,
handler: async ({ auth, input }) => {
handler: async ({ req, auth, input }) => {
const { list } = input.body
const requester = auth.credentials.iss
const db = ctx.db.getPrimary()

await ctx.services.graph(db).unmuteActorList({
list,
mutedByDid: requester,
})
const unmuteActorList = async () => {
await ctx.services.graph(db).unmuteActorList({
list,
mutedByDid: requester,
})
}

const addBsyncMuteOp = async (bsyncClient: BsyncClient) => {
await bsyncClient.addMuteOperation({
type: MuteOperation_Type.REMOVE,
actorDid: requester,
subject: list,
})
}

if (ctx.cfg.bsyncOnlyMutes) {
assert(ctx.bsyncClient)
await addBsyncMuteOp(ctx.bsyncClient)
} else {
await unmuteActorList()
if (ctx.bsyncClient) {
try {
await addBsyncMuteOp(ctx.bsyncClient)
} catch (err) {
req.log.warn(err, 'failed to sync mute op to bsync')
}
}
}
},
})
}
41 changes: 41 additions & 0 deletions packages/bsky/src/bsync.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import { Service } from './proto/bsync_connect'
import {
Code,
ConnectError,
PromiseClient,
createPromiseClient,
Interceptor,
} from '@connectrpc/connect'
import {
createConnectTransport,
ConnectTransportOptions,
} from '@connectrpc/connect-node'

export type BsyncClient = PromiseClient<typeof Service>

export const createBsyncClient = (
opts: ConnectTransportOptions,
): BsyncClient => {
const transport = createConnectTransport(opts)
return createPromiseClient(Service, transport)
}

export { Code }

export const isBsyncError = (
err: unknown,
code?: Code,
): err is ConnectError => {
if (err instanceof ConnectError) {
return !code || err.code === code
}
return false
}

export const authWithApiKey =
(apiKey: string): Interceptor =>
(next) =>
(req) => {
req.header.set('authorization', `Bearer ${apiKey}`)
return next(req)
}
37 changes: 37 additions & 0 deletions packages/bsky/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ export interface ServerConfigValues {
imgUriEndpoint?: string
blobCacheLocation?: string
searchEndpoint?: string
bsyncUrl?: string
bsyncApiKey?: string
bsyncHttpVersion?: '1.1' | '2'
bsyncIgnoreBadTls?: boolean
bsyncOnlyMutes?: boolean
adminPassword: string
moderatorPassword: string
triagePassword: string
Expand Down Expand Up @@ -88,6 +93,13 @@ export class ServerConfig {
const imgUriEndpoint = process.env.IMG_URI_ENDPOINT
const blobCacheLocation = process.env.BLOB_CACHE_LOC
const searchEndpoint = process.env.SEARCH_ENDPOINT
const bsyncUrl = process.env.BSKY_BSYNC_URL || undefined
const bsyncApiKey = process.env.BSKY_BSYNC_API_KEY || undefined
const bsyncHttpVersion = process.env.BSKY_BSYNC_HTTP_VERSION || '2'
const bsyncIgnoreBadTls = process.env.BSKY_BSYNC_IGNORE_BAD_TLS === 'true'
const bsyncOnlyMutes = process.env.BSKY_BSYNC_ONLY_MUTES === 'true'
assert(!bsyncOnlyMutes || bsyncUrl, 'bsync-only mutes requires a bsync url')
assert(bsyncHttpVersion === '1.1' || bsyncHttpVersion === '2')
const dbPrimaryPostgresUrl =
overrides?.dbPrimaryPostgresUrl || process.env.DB_PRIMARY_POSTGRES_URL
let dbReplicaPostgresUrls = overrides?.dbReplicaPostgresUrls
Expand Down Expand Up @@ -152,6 +164,11 @@ export class ServerConfig {
imgUriEndpoint,
blobCacheLocation,
searchEndpoint,
bsyncUrl,
bsyncApiKey,
bsyncHttpVersion,
bsyncIgnoreBadTls,
bsyncOnlyMutes,
adminPassword,
moderatorPassword,
triagePassword,
Expand Down Expand Up @@ -268,6 +285,26 @@ export class ServerConfig {
return this.cfg.searchEndpoint
}

get bsyncUrl() {
return this.cfg.bsyncUrl
}

get bsyncApiKey() {
return this.cfg.bsyncApiKey
}

get bsyncOnlyMutes() {
return this.cfg.bsyncOnlyMutes
}

get bsyncHttpVersion() {
return this.cfg.bsyncHttpVersion
}

get bsyncIgnoreBadTls() {
return this.cfg.bsyncIgnoreBadTls
}

get adminPassword() {
return this.cfg.adminPassword
}
Expand Down
Loading