diff --git a/packages/bsky/src/api/app/bsky/graph/getMutes.ts b/packages/bsky/src/api/app/bsky/graph/getMutes.ts index e69803d144a..9b334cf4829 100644 --- a/packages/bsky/src/api/app/bsky/graph/getMutes.ts +++ b/packages/bsky/src/api/app/bsky/graph/getMutes.ts @@ -1,5 +1,5 @@ import { Server } from '../../../../lexicon' -import { paginate, TimeCidKeyset } from '../../../../db/pagination' +import { CreatedAtDidKeyset, paginate } from '../../../../db/pagination' import AppContext from '../../../../context' import { notSoftDeletedClause } from '../../../../db/util' @@ -44,12 +44,3 @@ export default function (server: Server, ctx: AppContext) { }, }) } - -export class CreatedAtDidKeyset extends TimeCidKeyset<{ - createdAt: string - did: string // dids are treated identically to cids in TimeCidKeyset -}> { - labelResult(result: { createdAt: string; did: string }) { - return { primary: result.createdAt, secondary: result.did } - } -} diff --git a/packages/bsky/src/data-plane/server/index.ts b/packages/bsky/src/data-plane/server/index.ts index 1c29a776a29..6ceec06fd72 100644 --- a/packages/bsky/src/data-plane/server/index.ts +++ b/packages/bsky/src/data-plane/server/index.ts @@ -2,13 +2,15 @@ import http from 'http' import events from 'events' import express from 'express' import { expressConnectMiddleware } from '@connectrpc/connect-express' -import routes from './routes' +import createRoutes from './routes' +import { Database } from '../../db' export class DataPlaneServer { constructor(public server: http.Server) {} - static async create(port: number) { + static async create(db: Database, port: number) { const app = express() + const routes = createRoutes(db) app.use(expressConnectMiddleware({ routes })) const server = app.listen(port) await events.once(server, 'listening') diff --git a/packages/bsky/src/data-plane/server/routes.ts b/packages/bsky/src/data-plane/server/routes.ts deleted file mode 100644 index aa3fa5a6573..00000000000 --- a/packages/bsky/src/data-plane/server/routes.ts +++ /dev/null @@ -1,12 +0,0 @@ -import { ConnectRouter } from '@connectrpc/connect' -import { Service } from '../gen/bsky_connect' - -export default (router: ConnectRouter) => - router.service(Service, { - async getFollowers(req) { - return { - uris: [req.actorDid], - cursor: 'test-cursor', - } - }, - }) diff --git a/packages/bsky/src/data-plane/server/routes/blocks.ts b/packages/bsky/src/data-plane/server/routes/blocks.ts new file mode 100644 index 00000000000..66a0fdaaec5 --- /dev/null +++ b/packages/bsky/src/data-plane/server/routes/blocks.ts @@ -0,0 +1,121 @@ +import { ServiceImpl } from '@connectrpc/connect' +import { Service } from '../../gen/bsky_connect' +import { Database } from '../../../db' +import { TimeCidKeyset, paginate } from '../../../db/pagination' + +export default (db: Database): Partial> => ({ + async getBidirectionalBlock(req) { + const { actorDid, targetDid } = req + const res = await db.db + .selectFrom('actor_block') + .where((qb) => + qb + .where('actor_block.creator', '=', actorDid) + .where('actor_block.subjectDid', '=', targetDid), + ) + .orWhere((qb) => + qb + .where('actor_block.creator', '=', targetDid) + .where('actor_block.subjectDid', '=', actorDid), + ) + .limit(1) + .selectAll() + .executeTakeFirst() + + return { + blockUri: res?.uri, + } + }, + + async getBlocks(req) { + const { actorDid, cursor, limit } = req + const { ref } = db.db.dynamic + + let builder = db.db + .selectFrom('actor_block') + .where('actor_block.creator', '=', actorDid) + .selectAll() + + const keyset = new TimeCidKeyset( + ref('actor_block.sortAt'), + ref('actor_block.cid'), + ) + builder = paginate(builder, { + limit, + cursor, + keyset, + }) + + const blocks = await builder.execute() + return { + blockUris: blocks.map((b) => b.uri), + cursor: keyset.packFromResult(blocks), + } + }, + + async getBidirectionalBlockViaList(req) { + const { actorDid, targetDid } = req + const res = await db.db + .selectFrom('list_block') + .innerJoin('list_item', 'list_item.listUri', 'list_block.subjectUri') + .where((qb) => + qb + .where('list_block.creator', '=', actorDid) + .where('list_item.subjectDid', '=', targetDid), + ) + .orWhere((qb) => + qb + .where('list_block.creator', '=', targetDid) + .where('list_item.subjectDid', '=', actorDid), + ) + .limit(1) + .selectAll('list_block') + .executeTakeFirst() + + return { + listUri: res?.subjectUri, + } + }, + + async getBlocklistSubscription(req) { + const { actorDid, listUri } = req + const res = await db.db + .selectFrom('list_block') + .where('creator', '=', actorDid) + .where('subjectUri', '=', listUri) + .selectAll() + .limit(1) + .executeTakeFirst() + return { + subscribed: !!res, + } + }, + + async getBlocklistSubscriptions(req) { + const { actorDid, limit, cursor } = req + const { ref } = db.db.dynamic + let builder = db.db + .selectFrom('list') + .whereExists( + db.db + .selectFrom('list_block') + .where('list_block.creator', '=', actorDid) + .whereRef('list_block.subjectUri', '=', ref('list.uri')) + .selectAll(), + ) + .selectAll('list') + + const keyset = new TimeCidKeyset(ref('list.createdAt'), ref('list.cid')) + builder = paginate(builder, { + limit, + cursor, + keyset, + }) + const lists = await builder.execute() + + return { + listUris: lists.map((l) => l.uri), + cursor: keyset.packFromResult(lists), + } + }, +}) diff --git a/packages/bsky/src/data-plane/server/routes/feed-gens.ts b/packages/bsky/src/data-plane/server/routes/feed-gens.ts new file mode 100644 index 00000000000..92df9a1a082 --- /dev/null +++ b/packages/bsky/src/data-plane/server/routes/feed-gens.ts @@ -0,0 +1,67 @@ +import { ServiceImpl } from '@connectrpc/connect' +import { Service } from '../../gen/bsky_connect' +import * as ui8 from 'uint8arrays' +import { Database } from '../../../db' +import { keyBy } from '@atproto/common' +import { TimeCidKeyset, paginate } from '../../../db/pagination' + +export default (db: Database): Partial> => ({ + async getFeedGenerators(req) { + if (req.uris.length === 0) { + return { records: [] } + } + const res = await db.db + .selectFrom('record') + .selectAll() + .where('uri', 'in', req.uris) + .execute() + const byUri = keyBy(res, 'uri') + const records = req.uris.map((uri) => { + const row = byUri[uri] + const json = row ? row.json : JSON.stringify(null) + return ui8.fromString(json, 'utf8') + }) + return { records } + }, + + async getActorFeeds(req) { + const { actorDid, limit, cursor } = req + + const { ref } = db.db.dynamic + let builder = db.db + .selectFrom('feed_generator') + .selectAll() + .where('feed_generator.creator', '=', actorDid) + + const keyset = new TimeCidKeyset( + ref('feed_generator.createdAt'), + ref('feed_generator.cid'), + ) + builder = paginate(builder, { + limit, + cursor, + keyset, + }) + const feeds = await builder.execute() + + return { + uris: feeds.map((f) => f.uri), + cursor: keyset.packFromResult(feeds), + } + }, + + async getSuggestedFeeds() { + const feeds = await db.db + .selectFrom('suggested_feed') + .orderBy('suggested_feed.order', 'asc') + .selectAll() + .execute() + return { + uris: feeds.map((f) => f.uri), + } + }, + + async getFeedGeneratorStatus() { + throw new Error('unimplemented') + }, +}) diff --git a/packages/bsky/src/data-plane/server/routes/feeds.ts b/packages/bsky/src/data-plane/server/routes/feeds.ts new file mode 100644 index 00000000000..f0620b8d631 --- /dev/null +++ b/packages/bsky/src/data-plane/server/routes/feeds.ts @@ -0,0 +1,128 @@ +import { ServiceImpl } from '@connectrpc/connect' +import { Service } from '../../gen/bsky_connect' +import { Database } from '../../../db' +import { TimeCidKeyset, paginate } from '../../../db/pagination' + +export default (db: Database): Partial> => ({ + async getAuthorFeed(req) { + const { actorDid, limit, cursor, repliesOnly, mediaOnly } = req + const { ref } = db.db.dynamic + + // defaults to posts, reposts, and replies + let builder = db.db + .selectFrom('feed_item') + .selectAll('feed_item') + .where('originatorDid', '=', actorDid) + + if (mediaOnly) { + builder = builder + // only your own posts + .where('type', '=', 'post') + // only posts with media + .whereExists((qb) => + qb + .selectFrom('post_embed_image') + .select('post_embed_image.postUri') + .whereRef('post_embed_image.postUri', '=', 'feed_item.postUri'), + ) + } else if (repliesOnly) { + // @TODO + } + + const keyset = new TimeCidKeyset( + ref('feed_item.sortAt'), + ref('feed_item.cid'), + ) + + builder = paginate(builder, { + limit, + cursor, + keyset, + }) + + const feedItems = await builder.execute() + + return { + uris: feedItems.map((row) => row.uri), + cursor: keyset.packFromResult(feedItems), + } + }, + + async getTimeline(req) { + const { actorDid, limit, cursor } = req + const { ref } = db.db.dynamic + + const keyset = new TimeCidKeyset( + ref('feed_item.sortAt'), + ref('feed_item.cid'), + ) + + let followQb = db.db + .selectFrom('feed_item') + .innerJoin('follow', 'follow.subjectDid', 'feed_item.originatorDid') + .where('follow.creator', '=', actorDid) + .selectAll('feed_item') + + followQb = paginate(followQb, { + limit, + cursor, + keyset, + tryIndex: true, + }) + + let selfQb = db.db + .selectFrom('feed_item') + .where('feed_item.originatorDid', '=', actorDid) + .selectAll('feed_item') + + selfQb = paginate(selfQb, { + limit: Math.min(limit, 10), + cursor, + keyset, + tryIndex: true, + }) + + const [followRes, selfRes] = await Promise.all([ + followQb.execute(), + selfQb.execute(), + ]) + + const feedItems = [...followRes, ...selfRes] + .sort((a, b) => { + if (a.sortAt > b.sortAt) return -1 + if (a.sortAt < b.sortAt) return 1 + return a.cid > b.cid ? -1 : 1 + }) + .slice(0, limit) + + return { + uris: feedItems.map((item) => item.uri), + cursor: keyset.packFromResult(feedItems), + } + }, + + async getListFeed(req) { + const { listUri, cursor, limit } = req + const { ref } = db.db.dynamic + + let builder = db.db + .selectFrom('post') + .selectAll() + .innerJoin('list_item', 'list_item.subjectDid', 'post.creator') + .where('list_item.listUri', '=', listUri) + + const keyset = new TimeCidKeyset(ref('post.sortAt'), ref('post.cid')) + builder = paginate(builder, { + limit, + cursor, + keyset, + tryIndex: true, + }) + const feedItems = await builder.execute() + + return { + uris: feedItems.map((item) => item.uri), + cursor: keyset.packFromResult(feedItems), + } + }, +}) diff --git a/packages/bsky/src/data-plane/server/routes/follows.ts b/packages/bsky/src/data-plane/server/routes/follows.ts new file mode 100644 index 00000000000..3d3fc94a1f9 --- /dev/null +++ b/packages/bsky/src/data-plane/server/routes/follows.ts @@ -0,0 +1,101 @@ +import { ServiceImpl } from '@connectrpc/connect' +import { Service } from '../../gen/bsky_connect' +import { Database } from '../../../db' +import { TimeCidKeyset, paginate } from '../../../db/pagination' +import { keyBy } from '@atproto/common' + +export default (db: Database): Partial> => ({ + async getActorFollowsActors(req) { + const { actorDid, targetDids } = req + if (targetDids.length < 1) { + return { uris: [] } + } + const res = await db.db + .selectFrom('follow') + .where('follow.creator', '=', actorDid) + .where('follow.subjectDid', 'in', targetDids) + .selectAll() + .execute() + const bySubject = keyBy(res, 'subjectDid') + const uris = targetDids.map((did) => bySubject[did]?.uri ?? '') + return { + uris, + } + }, + async getFollowers(req) { + const { actorDid, limit, cursor } = req + const { ref } = db.db.dynamic + let followersReq = db.db + .selectFrom('follow') + .where('follow.subjectDid', '=', actorDid) + .innerJoin('actor as creator', 'creator.did', 'follow.creator') + .selectAll('creator') + .select([ + 'follow.uri as uri', + 'follow.cid as cid', + 'follow.sortAt as sortAt', + ]) + + const keyset = new TimeCidKeyset(ref('follow.sortAt'), ref('follow.cid')) + followersReq = paginate(followersReq, { + limit, + cursor, + keyset, + }) + + const followers = await followersReq.execute() + return { + uris: followers.map((f) => f.uri), + cursor: keyset.packFromResult(followers), + } + }, + async getFollows(req) { + const { actorDid, limit, cursor } = req + const { ref } = db.db.dynamic + + let followsReq = db.db + .selectFrom('follow') + .where('follow.creator', '=', actorDid) + .innerJoin('actor as subject', 'subject.did', 'follow.subjectDid') + .selectAll('subject') + .select([ + 'follow.uri as uri', + 'follow.cid as cid', + 'follow.sortAt as sortAt', + ]) + + const keyset = new TimeCidKeyset(ref('follow.sortAt'), ref('follow.cid')) + followsReq = paginate(followsReq, { + limit, + cursor, + keyset, + }) + + const follows = await followsReq.execute() + + return { + uris: follows.map((f) => f.uri), + cursor: keyset.packFromResult(follows), + } + }, + async getFollowersCount(req) { + const res = await db.db + .selectFrom('profile_agg') + .select('followersCount') + .where('did', '=', req.actorDid) + .executeTakeFirst() + return { + count: res?.followersCount, + } + }, + async getFollowsCount(req) { + const res = await db.db + .selectFrom('profile_agg') + .select('followsCount') + .where('did', '=', req.actorDid) + .executeTakeFirst() + return { + count: res?.followsCount, + } + }, +}) diff --git a/packages/bsky/src/data-plane/server/routes/index.ts b/packages/bsky/src/data-plane/server/routes/index.ts new file mode 100644 index 00000000000..bc0b439fcf4 --- /dev/null +++ b/packages/bsky/src/data-plane/server/routes/index.ts @@ -0,0 +1,46 @@ +import { ConnectRouter } from '@connectrpc/connect' +import { Service } from '../../gen/bsky_connect' + +import blocks from './blocks' +import feedGens from './feed-gens' +import feeds from './feeds' +import follows from './follows' +import labels from './labels' +import likes from './likes' +import lists from './lists' +import moderation from './moderation' +import mutes from './mutes' +import notifs from './notifs' +import posts from './posts' +import profile from './profile' +import reposts from './reposts' +import search from './search' +import suggestions from './suggestions' +import sync from './sync' +import threads from './threads' +import { Database } from '../../../db' + +export default (db: Database) => (router: ConnectRouter) => + router.service(Service, { + ...blocks(db), + ...feedGens(db), + ...feeds(db), + ...follows(db), + ...labels(db), + ...likes(db), + ...lists(db), + ...moderation(db), + ...mutes(db), + ...notifs(db), + ...posts(db), + ...profile(db), + ...reposts(db), + ...search(db), + ...suggestions(db), + ...sync(db), + ...threads(db), + + async ping() { + return {} + }, + }) diff --git a/packages/bsky/src/data-plane/server/routes/labels.ts b/packages/bsky/src/data-plane/server/routes/labels.ts new file mode 100644 index 00000000000..d7c94f74b0d --- /dev/null +++ b/packages/bsky/src/data-plane/server/routes/labels.ts @@ -0,0 +1,28 @@ +import { ServiceImpl } from '@connectrpc/connect' +import { Service } from '../../gen/bsky_connect' +import { Database } from '../../../db' +import * as ui8 from 'uint8arrays' + +export default (db: Database): Partial> => ({ + async getLabels(req) { + const { subjects, issuers } = req + if (subjects.length === 0 || issuers.length === 0) { + return { records: [] } + } + const labels = await db.db + .selectFrom('label') + .where('src', 'in', issuers) + .where('uri', 'in', subjects) + .selectAll() + .execute() + + const records = labels.map((l) => { + const formatted = { + ...l, + cid: l.cid === '' ? undefined : l.cid, + } + return ui8.fromString(JSON.stringify(formatted), 'utf8') + }) + return { records } + }, +}) diff --git a/packages/bsky/src/data-plane/server/routes/likes.ts b/packages/bsky/src/data-plane/server/routes/likes.ts new file mode 100644 index 00000000000..ef448369b6b --- /dev/null +++ b/packages/bsky/src/data-plane/server/routes/likes.ts @@ -0,0 +1,77 @@ +import { ServiceImpl } from '@connectrpc/connect' +import { Service } from '../../gen/bsky_connect' +import { Database } from '../../../db' +import { TimeCidKeyset, paginate } from '../../../db/pagination' + +export default (db: Database): Partial> => ({ + async getLikesBySubject(req) { + const { subjectUri, cursor, limit } = req + const { ref } = db.db.dynamic + + let builder = db.db + .selectFrom('like') + .where('like.subject', '=', subjectUri) + .selectAll('like') + + const keyset = new TimeCidKeyset(ref('like.sortAt'), ref('like.cid')) + builder = paginate(builder, { + limit, + cursor, + keyset, + }) + + const likes = await builder.execute() + + return { + uris: likes.map((l) => l.uri), + cursor: keyset.packFromResult(likes), + } + }, + + async getLikeByActorAndSubject(req) { + const { actorDid, subjectUri } = req + const res = await db.db + .selectFrom('like') + .where('creator', '=', actorDid) + .where('subject', '=', subjectUri) + .select('uri') + .executeTakeFirst() + return { uri: res?.uri } + }, + + async getActorLikes(req) { + const { actorDid, limit, cursor } = req + const { ref } = db.db.dynamic + + let builder = db.db + .selectFrom('like') + .where('like.creator', '=', actorDid) + .selectAll() + + const keyset = new TimeCidKeyset(ref('like.sortAt'), ref('like.cid')) + + builder = paginate(builder, { + limit, + cursor, + keyset, + }) + + const likes = await builder.execute() + + return { + uris: likes.map((l) => l.uri), + cursor: keyset.packFromResult(likes), + } + }, + + async getLikesCount(req) { + const res = await db.db + .selectFrom('post_agg') + .where('uri', '=', req.subjectUri) + .select('likeCount') + .executeTakeFirst() + return { + count: res?.likeCount, + } + }, +}) diff --git a/packages/bsky/src/data-plane/server/routes/lists.ts b/packages/bsky/src/data-plane/server/routes/lists.ts new file mode 100644 index 00000000000..1cf3f6a2d72 --- /dev/null +++ b/packages/bsky/src/data-plane/server/routes/lists.ts @@ -0,0 +1,76 @@ +import { ServiceImpl } from '@connectrpc/connect' +import { Service } from '../../gen/bsky_connect' +import * as ui8 from 'uint8arrays' +import { Database } from '../../../db' +import { countAll } from '../../../db/util' +import { keyBy } from '@atproto/common' +import { TimeCidKeyset, paginate } from '../../../db/pagination' + +export default (db: Database): Partial> => ({ + async getListMembers(req) { + const { listUri, cursor, limit } = req + const { ref } = db.db.dynamic + let builder = db.db + .selectFrom('list_item') + .where('listUri', '=', listUri) + .selectAll() + + const keyset = new TimeCidKeyset( + ref('list_item.sortAt'), + ref('list_item.cid'), + ) + + builder = paginate(builder, { + limit, + cursor, + keyset, + }) + + const listItems = await builder.execute() + return { + dids: listItems.map((item) => item.subjectDid), + cursor: keyset.packFromResult(listItems), + } + }, + + async getListMembership(req) { + const { actorDid, listUris } = req + if (listUris.length === 0) { + return { listitemUris: [] } + } + const res = await db.db + .selectFrom('list_item') + .where('subjectDid', '=', actorDid) + .where('listUri', 'in', listUris) + .selectAll() + .execute() + const byListUri = keyBy(res, 'listUri') + const listitemUris = listUris.map((uri) => byListUri[uri]?.uri ?? '') + return { + listitemUris, + } + }, + + async getList(req) { + const res = await db.db + .selectFrom('record') + .where('uri', '=', req.listUri) + .select('json') + .executeTakeFirst() + const record = res ? ui8.fromString(res.json, 'utf8') : undefined + return { + record, + } + }, + + async getListCount(req) { + const res = await db.db + .selectFrom('list_item') + .select(countAll.as('count')) + .where('list_item.listUri', '=', req.listUri) + .executeTakeFirst() + return { + count: res?.count, + } + }, +}) diff --git a/packages/bsky/src/data-plane/server/routes/moderation.ts b/packages/bsky/src/data-plane/server/routes/moderation.ts new file mode 100644 index 00000000000..2f86aa42462 --- /dev/null +++ b/packages/bsky/src/data-plane/server/routes/moderation.ts @@ -0,0 +1,23 @@ +import { ServiceImpl } from '@connectrpc/connect' +import { Service } from '../../gen/bsky_connect' +import { Database } from '../../../db' +import { sql } from 'kysely' + +export default (db: Database): Partial> => ({ + async getBlobTakedown(req) { + const { cid } = req + const takedown = await db.db + .selectFrom('moderation_subject_status') + .select('id') + .where('blobCids', '@>', sql`CAST(${JSON.stringify([cid])} AS JSONB)`) + .where('takendown', 'is', true) + .executeTakeFirst() + return { + takenDown: !!takedown, + } + }, + + async updateTakedown(_req) { + throw new Error('unimplemented') + }, +}) diff --git a/packages/bsky/src/data-plane/server/routes/mutes.ts b/packages/bsky/src/data-plane/server/routes/mutes.ts new file mode 100644 index 00000000000..a25042556b6 --- /dev/null +++ b/packages/bsky/src/data-plane/server/routes/mutes.ts @@ -0,0 +1,109 @@ +import { ServiceImpl } from '@connectrpc/connect' +import { Service } from '../../gen/bsky_connect' +import { Database } from '../../../db' +import { + CreatedAtDidKeyset, + TimeCidKeyset, + paginate, +} from '../../../db/pagination' + +export default (db: Database): Partial> => ({ + async getActorMutesActor(req) { + const { actorDid, targetDid } = req + const res = await db.db + .selectFrom('mute') + .selectAll() + .where('mutedByDid', '=', actorDid) + .where('subjectDid', '=', targetDid) + .executeTakeFirst() + return { + muted: !!res, + } + }, + + async getMutes(req) { + const { actorDid, limit, cursor } = req + const { ref } = db.db.dynamic + + let builder = db.db + .selectFrom('mute') + .innerJoin('actor', 'actor.did', 'mute.subjectDid') + .where('mute.mutedByDid', '=', actorDid) + .selectAll('actor') + .select('mute.createdAt as createdAt') + + const keyset = new CreatedAtDidKeyset( + ref('mute.createdAt'), + ref('mute.subjectDid'), + ) + builder = paginate(builder, { + limit, + cursor, + keyset, + }) + + const mutes = await builder.execute() + + return { + dids: mutes.map((m) => m.did), + cursor: keyset.packFromResult(mutes), + } + }, + + async getActorMutesActorViaList(req) { + const { actorDid, targetDid } = req + const res = await db.db + .selectFrom('list_mute') + .innerJoin('list_item', 'list_item.listUri', 'list_mute.listUri') + .where('list_mute.mutedByDid', '=', actorDid) + .where('list_item.subjectDid', '=', targetDid) + .select('list_mute.listUri') + .limit(1) + .executeTakeFirst() + return { + listUri: res?.listUri, + } + }, + + async getMutelistSubscription(req) { + const { actorDid, listUri } = req + const res = await db.db + .selectFrom('list_mute') + .where('mutedByDid', '=', actorDid) + .where('listUri', '=', listUri) + .selectAll() + .limit(1) + .executeTakeFirst() + return { + subscribed: !!res, + } + }, + + async getMutelistSubscriptions(req) { + const { actorDid, limit, cursor } = req + const { ref } = db.db.dynamic + let builder = db.db + .selectFrom('list') + .whereExists( + db.db + .selectFrom('list_mute') + .where('list_mute.mutedByDid', '=', actorDid) + .whereRef('list_mute.listUri', '=', ref('list.uri')) + .selectAll(), + ) + .selectAll('list') + + const keyset = new TimeCidKeyset(ref('list.createdAt'), ref('list.cid')) + builder = paginate(builder, { + limit, + cursor, + keyset, + }) + const lists = await builder.execute() + + return { + listUris: lists.map((l) => l.uri), + cursor: keyset.packFromResult(lists), + } + }, +}) diff --git a/packages/bsky/src/data-plane/server/routes/notifs.ts b/packages/bsky/src/data-plane/server/routes/notifs.ts new file mode 100644 index 00000000000..aa77357982f --- /dev/null +++ b/packages/bsky/src/data-plane/server/routes/notifs.ts @@ -0,0 +1,117 @@ +import { ServiceImpl } from '@connectrpc/connect' +import { Service } from '../../gen/bsky_connect' +import { Database } from '../../../db' +import { countAll, excluded } from '../../../db/util' +import { sql } from 'kysely' +import { TimeCidKeyset, paginate } from '../../../db/pagination' + +export default (db: Database): Partial> => ({ + async getNotifications(req) { + const { actorDid, limit, cursor } = req + const { ref } = db.db.dynamic + let builder = db.db + .selectFrom('notification as notif') + .where('notif.did', '=', actorDid) + .where((clause) => + clause + .where('reasonSubject', 'is', null) + .orWhereExists( + db.db + .selectFrom('record as subject') + .selectAll() + .whereRef('subject.uri', '=', ref('notif.reasonSubject')), + ), + ) + .select([ + 'notif.author as authorDid', + 'notif.recordUri as uri', + 'notif.recordCid as cid', + 'notif.reason as reason', + 'notif.reasonSubject as reasonSubject', + 'notif.sortAt as sortAt', + ]) + + const keyset = new TimeCidKeyset( + ref('notif.sortAt'), + ref('notif.recordCid'), + ) + builder = paginate(builder, { + cursor, + limit, + keyset, + tryIndex: true, + }) + + const notifsRes = await builder.execute() + const notifications = notifsRes.map((notif) => ({ + uri: notif.uri, + reason: notif.reason, + timestamp: { + nanos: new Date(notif.sortAt).getTime() * 1000, + }, + })) + return { + notifications, + cursor: keyset.packFromResult(notifsRes), + } + }, + + async getNotificationSeen(req) { + const res = await db.db + .selectFrom('actor_state') + .where('did', '=', req.actorDid) + .selectAll() + .executeTakeFirst() + if (!res) { + return {} + } + const nanos = new Date(res.lastSeenNotifs).getTime() * 1000 + return { + timestamp: { + nanos, + }, + } + }, + + async getUnreadNotificationCount(req) { + const { actorDid } = req + const { ref } = db.db.dynamic + const result = await db.db + .selectFrom('notification') + .select(countAll.as('count')) + .innerJoin('actor', 'actor.did', 'notification.did') + .leftJoin('actor_state', 'actor_state.did', 'actor.did') + .innerJoin('record', 'record.uri', 'notification.recordUri') + // Ensure to hit notification_did_sortat_idx, handling case where lastSeenNotifs is null. + .where('notification.did', '=', actorDid) + .where( + 'notification.sortAt', + '>', + sql`coalesce(${ref('actor_state.lastSeenNotifs')}, ${''})`, + ) + .executeTakeFirst() + + return { + count: result?.count, + } + }, + + async updateNotificationSeen(req) { + const { actorDid, timestamp } = req + if (!timestamp) { + return + } + const lastSeenNotifs = new Date( + Math.floor(timestamp.nanos / 1000), + ).toISOString() + await db.db + .insertInto('actor_state') + .values({ did: actorDid, lastSeenNotifs }) + .onConflict((oc) => + oc.column('did').doUpdateSet({ + lastSeenNotifs: excluded(db.db, 'lastSeenNotifs'), + }), + ) + .executeTakeFirst() + }, +}) diff --git a/packages/bsky/src/data-plane/server/routes/posts.ts b/packages/bsky/src/data-plane/server/routes/posts.ts new file mode 100644 index 00000000000..97c25fc7754 --- /dev/null +++ b/packages/bsky/src/data-plane/server/routes/posts.ts @@ -0,0 +1,38 @@ +import { ServiceImpl } from '@connectrpc/connect' +import { Service } from '../../gen/bsky_connect' +import { keyBy } from '@atproto/common' +import * as ui8 from 'uint8arrays' +import { Database } from '../../../db' + +export default (db: Database): Partial> => ({ + async getPosts(req) { + if (req.uris.length === 0) { + return { records: [] } + } + const res = await db.db + .selectFrom('record') + .selectAll() + .where('uri', 'in', req.uris) + .execute() + const byUri = keyBy(res, 'uri') + const records = req.uris.map((uri) => { + const row = byUri[uri] + const json = row ? row.json : JSON.stringify(null) + return ui8.fromString(json, 'utf8') + }) + return { records } + }, + async getPostReplyCount(req) { + if (req.uris.length === 0) { + return { counts: [] } + } + const res = await db.db + .selectFrom('post_agg') + .select(['uri', 'replyCount']) + .where('uri', 'in', req.uris) + .execute() + const byUri = keyBy(res, 'uri') + const counts = req.uris.map((uri) => byUri[uri]?.replyCount ?? 0) + return { counts } + }, +}) diff --git a/packages/bsky/src/data-plane/server/routes/profile.ts b/packages/bsky/src/data-plane/server/routes/profile.ts new file mode 100644 index 00000000000..bb0ab452885 --- /dev/null +++ b/packages/bsky/src/data-plane/server/routes/profile.ts @@ -0,0 +1,57 @@ +import { ServiceImpl } from '@connectrpc/connect' +import { Service } from '../../gen/bsky_connect' +import { keyBy } from '@atproto/common' +import * as ui8 from 'uint8arrays' +import { Database } from '../../../db' + +export default (db: Database): Partial> => ({ + async getProfiles(req) { + const { dids } = req + if (dids.length === 0) { + return { records: [] } + } + const uris = dids.map((did) => `at://${did}/app.bsky.actor.profile/self`) + const res = await db.db + .selectFrom('record') + .selectAll() + .where('uri', 'in', uris) + .execute() + const byUri = keyBy(res, 'uri') + const records = uris.map((uri) => { + const row = byUri[uri] + const json = row ? row.json : JSON.stringify(null) + return ui8.fromString(json, 'utf8') + }) + return { records } + }, + + async getHandles(req) { + const { dids } = req + if (dids.length === 0) { + return { handles: [] } + } + const res = await db.db + .selectFrom('actor') + .where('did', 'in', dids) + .selectAll() + .execute() + const byDid = keyBy(res, 'did') + const handles = dids.map((did) => byDid[did]?.handle ?? '') + return { handles } + }, + + async getDidsByHandles(req) { + const { handles } = req + if (handles.length === 0) { + return { dids: [] } + } + const res = await db.db + .selectFrom('actor') + .where('handle', 'in', handles) + .selectAll() + .execute() + const byHandle = keyBy(res, 'handle') + const dids = handles.map((handle) => byHandle[handle]?.did ?? '') + return { dids } + }, +}) diff --git a/packages/bsky/src/data-plane/server/routes/reposts.ts b/packages/bsky/src/data-plane/server/routes/reposts.ts new file mode 100644 index 00000000000..0e995aa0b61 --- /dev/null +++ b/packages/bsky/src/data-plane/server/routes/reposts.ts @@ -0,0 +1,77 @@ +import { ServiceImpl } from '@connectrpc/connect' +import { Service } from '../../gen/bsky_connect' +import { Database } from '../../../db' +import { TimeCidKeyset, paginate } from '../../../db/pagination' + +export default (db: Database): Partial> => ({ + async getRepostsBySubject(req) { + const { subjectUri, cursor, limit } = req + const { ref } = db.db.dynamic + + let builder = db.db + .selectFrom('repost') + .where('repost.subject', '=', subjectUri) + .selectAll('repost') + + const keyset = new TimeCidKeyset(ref('repost.sortAt'), ref('repost.cid')) + builder = paginate(builder, { + limit, + cursor, + keyset, + }) + + const reposts = await builder.execute() + + return { + uris: reposts.map((l) => l.uri), + cursor: keyset.packFromResult(reposts), + } + }, + + async getRepostByActorAndSubject(req) { + const { actorDid, subjectUri } = req + const res = await db.db + .selectFrom('repost') + .where('creator', '=', actorDid) + .where('subject', '=', subjectUri) + .select('uri') + .executeTakeFirst() + return { uri: res?.uri } + }, + + async getActorReposts(req) { + const { actorDid, limit, cursor } = req + const { ref } = db.db.dynamic + + let builder = db.db + .selectFrom('repost') + .where('repost.creator', '=', actorDid) + .selectAll() + + const keyset = new TimeCidKeyset(ref('repost.sortAt'), ref('repost.cid')) + + builder = paginate(builder, { + limit, + cursor, + keyset, + }) + + const reposts = await builder.execute() + + return { + uris: reposts.map((l) => l.uri), + cursor: keyset.packFromResult(reposts), + } + }, + + async getRepostsCount(req) { + const res = await db.db + .selectFrom('post_agg') + .where('uri', '=', req.subjectUri) + .select('repostCount') + .executeTakeFirst() + return { + count: res?.repostCount, + } + }, +}) diff --git a/packages/bsky/src/data-plane/server/routes/search.ts b/packages/bsky/src/data-plane/server/routes/search.ts new file mode 100644 index 00000000000..7638d9399ca --- /dev/null +++ b/packages/bsky/src/data-plane/server/routes/search.ts @@ -0,0 +1,12 @@ +import { ServiceImpl } from '@connectrpc/connect' +import { Service } from '../../gen/bsky_connect' +import { Database } from '../../../db' + +export default (db: Database): Partial> => ({ + async searchActors(req) { + throw new Error('unimplemented') + }, + async searchPosts(req) { + throw new Error('unimplemented') + }, +}) diff --git a/packages/bsky/src/data-plane/server/routes/suggestions.ts b/packages/bsky/src/data-plane/server/routes/suggestions.ts new file mode 100644 index 00000000000..40ce8f13a5e --- /dev/null +++ b/packages/bsky/src/data-plane/server/routes/suggestions.ts @@ -0,0 +1,60 @@ +import { ServiceImpl } from '@connectrpc/connect' +import { Service } from '../../gen/bsky_connect' +import { Database } from '../../../db' + +export default (db: Database): Partial> => ({ + async getSuggestions(req) { + const alreadyIncluded = parseCursor(req.cursor) + const suggestions = await db.db + .selectFrom('suggested_follow') + .innerJoin('actor', 'actor.did', 'suggested_follow.did') + .if(alreadyIncluded.length > 0, (qb) => + qb.where('suggested_follow.order', 'not in', alreadyIncluded), + ) + .selectAll() + .orderBy('suggested_follow.order', 'asc') + .execute() + + // always include first two + const firstTwo = suggestions.filter( + (row) => row.order === 1 || row.order === 2, + ) + const rest = suggestions.filter((row) => row.order !== 1 && row.order !== 2) + const limited = firstTwo.concat(shuffle(rest)).slice(0, req.limit) + + // if the result set ends up getting larger, consider using a seed included in the cursor for for the randomized shuffle + const cursor = + limited.length > 0 + ? limited + .map((row) => row.order.toString()) + .concat(alreadyIncluded.map((id) => id.toString())) + .join(':') + : undefined + + return { + dids: suggestions.map((s) => s.did), + cursor, + } + }, +}) + +const parseCursor = (cursor?: string): number[] => { + if (!cursor) { + return [] + } + try { + return cursor + .split(':') + .map((id) => parseInt(id, 10)) + .filter((id) => !isNaN(id)) + } catch { + return [] + } +} + +const shuffle = (arr: T[]): T[] => { + return arr + .map((value) => ({ value, sort: Math.random() })) + .sort((a, b) => a.sort - b.sort) + .map(({ value }) => value) +} diff --git a/packages/bsky/src/data-plane/server/routes/sync.ts b/packages/bsky/src/data-plane/server/routes/sync.ts new file mode 100644 index 00000000000..11d8e87b746 --- /dev/null +++ b/packages/bsky/src/data-plane/server/routes/sync.ts @@ -0,0 +1,16 @@ +import { ServiceImpl } from '@connectrpc/connect' +import { Service } from '../../gen/bsky_connect' +import { Database } from '../../../db' + +export default (db: Database): Partial> => ({ + async getLatestRev(req) { + const res = await db.db + .selectFrom('actor_sync') + .where('did', '=', req.actorDid) + .select('repoRev') + .executeTakeFirst() + return { + rev: res?.repoRev ?? undefined, + } + }, +}) diff --git a/packages/bsky/src/data-plane/server/routes/threads.ts b/packages/bsky/src/data-plane/server/routes/threads.ts new file mode 100644 index 00000000000..34c15b53fce --- /dev/null +++ b/packages/bsky/src/data-plane/server/routes/threads.ts @@ -0,0 +1,54 @@ +import { ServiceImpl } from '@connectrpc/connect' +import { Service } from '../../gen/bsky_connect' +import * as ui8 from 'uint8arrays' +import { Database } from '../../../db' +import { keyBy } from '@atproto/common' +import { + getAncestorsAndSelfQb, + getDescendentsQb, +} from '../../../services/util/post' + +export default (db: Database): Partial> => ({ + async getThread(req) { + const { postUri, above, below } = req + const [ancestors, descendents] = await Promise.all([ + getAncestorsAndSelfQb(db.db, { + uri: postUri, + parentHeight: above, + }) + .selectFrom('ancestor') + .selectAll() + .execute(), + getDescendentsQb(db.db, { + uri: postUri, + depth: below, + }) + .selectFrom('descendent') + .selectAll() + .execute(), + ]) + const uris = [ + ...ancestors.map((p) => p.uri), + ...descendents.map((p) => p.uri), + ] + return { uris } + }, + + async getThreadgates(req) { + if (req.uris.length === 0) { + return { records: [] } + } + const res = await db.db + .selectFrom('record') + .selectAll() + .where('uri', 'in', req.uris) + .execute() + const byUri = keyBy(res, 'uri') + const records = req.uris.map((uri) => { + const row = byUri[uri] + const json = row ? row.json : JSON.stringify(null) + return ui8.fromString(json, 'utf8') + }) + return { records } + }, +}) diff --git a/packages/bsky/src/db/pagination.ts b/packages/bsky/src/db/pagination.ts index b38c69e5ada..d5887ae1fff 100644 --- a/packages/bsky/src/db/pagination.ts +++ b/packages/bsky/src/db/pagination.ts @@ -106,6 +106,15 @@ export class TimeCidKeyset< } } +export class CreatedAtDidKeyset extends TimeCidKeyset<{ + createdAt: string + did: string // dids are treated identically to cids in TimeCidKeyset +}> { + labelResult(result: { createdAt: string; did: string }) { + return { primary: result.createdAt, secondary: result.did } + } +} + export const paginate = < QB extends AnyQb, K extends GenericKeyset, diff --git a/packages/bsky/tests/data-plane.test.ts b/packages/bsky/tests/data-plane.test.ts deleted file mode 100644 index a4164d5464c..00000000000 --- a/packages/bsky/tests/data-plane.test.ts +++ /dev/null @@ -1,25 +0,0 @@ -import { DataPlaneServer } from '../src/data-plane/server' -import { - createDataPlaneClient, - DataPlaneClient, -} from '../src/data-plane/client' - -describe('data plane', () => { - let server: DataPlaneServer - let client: DataPlaneClient - - beforeAll(async () => { - server = await DataPlaneServer.create(1337) - client = createDataPlaneClient('http://localhost:1337', '1.1') - }) - - afterAll(async () => { - await server.stop() - }) - - it('works', async () => { - const res = await client.getFollowers({ actorDid: 'did:example:test' }) - expect(res.uris).toEqual(['did:example:test']) - expect(res.cursor).toEqual('test-cursor') - }) -}) diff --git a/packages/common-web/src/arrays.ts b/packages/common-web/src/arrays.ts index 51598fc86f1..36c2f0dcb27 100644 --- a/packages/common-web/src/arrays.ts +++ b/packages/common-web/src/arrays.ts @@ -1,3 +1,10 @@ +export const keyBy = (arr: T[], key: string): Record => { + return arr.reduce((acc, cur) => { + acc[cur[key]] = cur + return acc + }, {} as Record) +} + export const mapDefined = ( arr: T[], fn: (obj: T) => S | undefined,