diff --git a/packages/bsky/src/data-plane/server/routes/blocks.ts b/packages/bsky/src/data-plane/server/routes/blocks.ts index 66a0fdaaec5..333163945dc 100644 --- a/packages/bsky/src/data-plane/server/routes/blocks.ts +++ b/packages/bsky/src/data-plane/server/routes/blocks.ts @@ -87,7 +87,7 @@ export default (db: Database): Partial> => ({ .limit(1) .executeTakeFirst() return { - subscribed: !!res, + listblockUri: res?.uri, } }, diff --git a/packages/bsky/src/data-plane/server/routes/feed-gens.ts b/packages/bsky/src/data-plane/server/routes/feed-gens.ts index 92df9a1a082..2dc46a98386 100644 --- a/packages/bsky/src/data-plane/server/routes/feed-gens.ts +++ b/packages/bsky/src/data-plane/server/routes/feed-gens.ts @@ -1,29 +1,9 @@ import { ServiceImpl } from '@connectrpc/connect' import { Service } from '../../gen/bsky_connect' -import * as ui8 from 'uint8arrays' import { Database } from '../../../db' -import { keyBy } from '@atproto/common' import { TimeCidKeyset, paginate } from '../../../db/pagination' export default (db: Database): Partial> => ({ - async getFeedGenerators(req) { - if (req.uris.length === 0) { - return { records: [] } - } - const res = await db.db - .selectFrom('record') - .selectAll() - .where('uri', 'in', req.uris) - .execute() - const byUri = keyBy(res, 'uri') - const records = req.uris.map((uri) => { - const row = byUri[uri] - const json = row ? row.json : JSON.stringify(null) - return ui8.fromString(json, 'utf8') - }) - return { records } - }, - async getActorFeeds(req) { const { actorDid, limit, cursor } = req diff --git a/packages/bsky/src/data-plane/server/routes/feeds.ts b/packages/bsky/src/data-plane/server/routes/feeds.ts index f0620b8d631..27f3fb5f2b3 100644 --- a/packages/bsky/src/data-plane/server/routes/feeds.ts +++ b/packages/bsky/src/data-plane/server/routes/feeds.ts @@ -5,12 +5,13 @@ import { TimeCidKeyset, paginate } from '../../../db/pagination' export default (db: Database): Partial> => ({ async getAuthorFeed(req) { - const { actorDid, limit, cursor, repliesOnly, mediaOnly } = req + const { actorDid, limit, cursor, noReplies, mediaOnly } = req const { ref } = db.db.dynamic // defaults to posts, reposts, and replies let builder = db.db .selectFrom('feed_item') + .innerJoin('post', 'post.uri', 'feed_item.postUri') .selectAll('feed_item') .where('originatorDid', '=', actorDid) @@ -25,8 +26,10 @@ export default (db: Database): Partial> => ({ .select('post_embed_image.postUri') .whereRef('post_embed_image.postUri', '=', 'feed_item.postUri'), ) - } else if (repliesOnly) { - // @TODO + } else if (noReplies) { + builder = builder.where((qb) => + qb.where('post.replyParent', 'is', null).orWhere('type', '=', 'repost'), + ) } const keyset = new TimeCidKeyset( diff --git a/packages/bsky/src/data-plane/server/routes/follows.ts b/packages/bsky/src/data-plane/server/routes/follows.ts index 3d3fc94a1f9..863b325d205 100644 --- a/packages/bsky/src/data-plane/server/routes/follows.ts +++ b/packages/bsky/src/data-plane/server/routes/follows.ts @@ -78,24 +78,30 @@ export default (db: Database): Partial> => ({ cursor: keyset.packFromResult(follows), } }, - async getFollowersCount(req) { + async getFollowerCounts(req) { + if (req.dids.length === 0) { + return { counts: [] } + } const res = await db.db .selectFrom('profile_agg') - .select('followersCount') - .where('did', '=', req.actorDid) - .executeTakeFirst() - return { - count: res?.followersCount, - } + .selectAll() + .where('did', 'in', req.dids) + .execute() + const byDid = keyBy(res, 'did') + const counts = req.dids.map((did) => byDid[did]?.followersCount ?? 0) + return { counts } }, - async getFollowsCount(req) { + async getFollowCounts(req) { + if (req.dids.length === 0) { + return { counts: [] } + } const res = await db.db .selectFrom('profile_agg') - .select('followsCount') - .where('did', '=', req.actorDid) - .executeTakeFirst() - return { - count: res?.followsCount, - } + .selectAll() + .where('did', 'in', req.dids) + .execute() + const byDid = keyBy(res, 'did') + const counts = req.dids.map((did) => byDid[did]?.followsCount ?? 0) + return { counts } }, }) diff --git a/packages/bsky/src/data-plane/server/routes/labels.ts b/packages/bsky/src/data-plane/server/routes/labels.ts index d7c94f74b0d..6896ac4c327 100644 --- a/packages/bsky/src/data-plane/server/routes/labels.ts +++ b/packages/bsky/src/data-plane/server/routes/labels.ts @@ -9,20 +9,20 @@ export default (db: Database): Partial> => ({ if (subjects.length === 0 || issuers.length === 0) { return { records: [] } } - const labels = await db.db + const res = await db.db .selectFrom('label') .where('src', 'in', issuers) .where('uri', 'in', subjects) .selectAll() .execute() - const records = labels.map((l) => { + const labels = res.map((l) => { const formatted = { ...l, cid: l.cid === '' ? undefined : l.cid, } return ui8.fromString(JSON.stringify(formatted), 'utf8') }) - return { records } + return { labels } }, }) diff --git a/packages/bsky/src/data-plane/server/routes/likes.ts b/packages/bsky/src/data-plane/server/routes/likes.ts index ef448369b6b..058e22b0407 100644 --- a/packages/bsky/src/data-plane/server/routes/likes.ts +++ b/packages/bsky/src/data-plane/server/routes/likes.ts @@ -2,6 +2,7 @@ import { ServiceImpl } from '@connectrpc/connect' import { Service } from '../../gen/bsky_connect' import { Database } from '../../../db' import { TimeCidKeyset, paginate } from '../../../db/pagination' +import { keyBy } from '@atproto/common' export default (db: Database): Partial> => ({ async getLikesBySubject(req) { @@ -28,15 +29,20 @@ export default (db: Database): Partial> => ({ } }, - async getLikeByActorAndSubject(req) { - const { actorDid, subjectUri } = req + async getLikesByActorAndSubjects(req) { + const { actorDid, subjectUris } = req + if (subjectUris.length === 0) { + return { uris: [] } + } const res = await db.db .selectFrom('like') .where('creator', '=', actorDid) - .where('subject', '=', subjectUri) - .select('uri') - .executeTakeFirst() - return { uri: res?.uri } + .where('subject', 'in', subjectUris) + .selectAll() + .execute() + const bySubject = keyBy(res, 'subject') + const uris = req.subjectUris.map((uri) => bySubject[uri]?.uri) + return { uris } }, async getActorLikes(req) { @@ -64,14 +70,17 @@ export default (db: Database): Partial> => ({ } }, - async getLikesCount(req) { + async getLikeCounts(req) { + if (req.uris.length === 0) { + return { counts: [] } + } const res = await db.db .selectFrom('post_agg') - .where('uri', '=', req.subjectUri) - .select('likeCount') - .executeTakeFirst() - return { - count: res?.likeCount, - } + .where('uri', 'in', req.uris) + .selectAll() + .execute() + const byUri = keyBy(res, 'uri') + const counts = req.uris.map((uri) => byUri[uri]?.likeCount ?? 0) + return { counts } }, }) diff --git a/packages/bsky/src/data-plane/server/routes/lists.ts b/packages/bsky/src/data-plane/server/routes/lists.ts index 1cf3f6a2d72..d3ccd880cde 100644 --- a/packages/bsky/src/data-plane/server/routes/lists.ts +++ b/packages/bsky/src/data-plane/server/routes/lists.ts @@ -1,6 +1,5 @@ import { ServiceImpl } from '@connectrpc/connect' import { Service } from '../../gen/bsky_connect' -import * as ui8 from 'uint8arrays' import { Database } from '../../../db' import { countAll } from '../../../db/util' import { keyBy } from '@atproto/common' @@ -51,18 +50,6 @@ export default (db: Database): Partial> => ({ } }, - async getList(req) { - const res = await db.db - .selectFrom('record') - .where('uri', '=', req.listUri) - .select('json') - .executeTakeFirst() - const record = res ? ui8.fromString(res.json, 'utf8') : undefined - return { - record, - } - }, - async getListCount(req) { const res = await db.db .selectFrom('list_item') diff --git a/packages/bsky/src/data-plane/server/routes/posts.ts b/packages/bsky/src/data-plane/server/routes/posts.ts index 97c25fc7754..f9621cbce5e 100644 --- a/packages/bsky/src/data-plane/server/routes/posts.ts +++ b/packages/bsky/src/data-plane/server/routes/posts.ts @@ -1,38 +1,33 @@ import { ServiceImpl } from '@connectrpc/connect' import { Service } from '../../gen/bsky_connect' import { keyBy } from '@atproto/common' -import * as ui8 from 'uint8arrays' import { Database } from '../../../db' export default (db: Database): Partial> => ({ - async getPosts(req) { + async getPostReplyCounts(req) { if (req.uris.length === 0) { - return { records: [] } + return { counts: [] } } const res = await db.db - .selectFrom('record') - .selectAll() + .selectFrom('post_agg') + .select(['uri', 'replyCount']) .where('uri', 'in', req.uris) .execute() const byUri = keyBy(res, 'uri') - const records = req.uris.map((uri) => { - const row = byUri[uri] - const json = row ? row.json : JSON.stringify(null) - return ui8.fromString(json, 'utf8') - }) - return { records } + const counts = req.uris.map((uri) => byUri[uri]?.replyCount ?? 0) + return { counts } }, - async getPostReplyCount(req) { - if (req.uris.length === 0) { + async getPostCounts(req) { + if (req.dids.length === 0) { return { counts: [] } } const res = await db.db - .selectFrom('post_agg') - .select(['uri', 'replyCount']) - .where('uri', 'in', req.uris) + .selectFrom('profile_agg') + .selectAll() + .where('did', 'in', req.dids) .execute() - const byUri = keyBy(res, 'uri') - const counts = req.uris.map((uri) => byUri[uri]?.replyCount ?? 0) + const byDid = keyBy(res, 'did') + const counts = req.dids.map((did) => byDid[did]?.postsCount ?? 0) return { counts } }, }) diff --git a/packages/bsky/src/data-plane/server/routes/profile.ts b/packages/bsky/src/data-plane/server/routes/profile.ts index bb0ab452885..5d4af42a3cd 100644 --- a/packages/bsky/src/data-plane/server/routes/profile.ts +++ b/packages/bsky/src/data-plane/server/routes/profile.ts @@ -1,43 +1,30 @@ import { ServiceImpl } from '@connectrpc/connect' import { Service } from '../../gen/bsky_connect' import { keyBy } from '@atproto/common' -import * as ui8 from 'uint8arrays' import { Database } from '../../../db' +import { getRecords } from './records' export default (db: Database): Partial> => ({ - async getProfiles(req) { + async getActors(req) { const { dids } = req if (dids.length === 0) { - return { records: [] } + return { actors: [] } } - const uris = dids.map((did) => `at://${did}/app.bsky.actor.profile/self`) - const res = await db.db - .selectFrom('record') - .selectAll() - .where('uri', 'in', uris) - .execute() - const byUri = keyBy(res, 'uri') - const records = uris.map((uri) => { - const row = byUri[uri] - const json = row ? row.json : JSON.stringify(null) - return ui8.fromString(json, 'utf8') + const profileUris = dids.map( + (did) => `at://${did}/app.bsky.actor.profile/self`, + ) + const [handlesRes, profiles] = await Promise.all([ + db.db.selectFrom('actor').where('did', 'in', dids).selectAll().execute(), + getRecords(db)({ uris: profileUris }), + ]) + const byDid = keyBy(handlesRes, 'did') + const actors = dids.map((did, i) => { + return { + handle: byDid[did]?.handle ?? undefined, + profile: profiles[i], + } }) - return { records } - }, - - async getHandles(req) { - const { dids } = req - if (dids.length === 0) { - return { handles: [] } - } - const res = await db.db - .selectFrom('actor') - .where('did', 'in', dids) - .selectAll() - .execute() - const byDid = keyBy(res, 'did') - const handles = dids.map((did) => byDid[did]?.handle ?? '') - return { handles } + return { actors } }, async getDidsByHandles(req) { diff --git a/packages/bsky/src/data-plane/server/routes/records.ts b/packages/bsky/src/data-plane/server/routes/records.ts index face8d56fab..6fc872b54e9 100644 --- a/packages/bsky/src/data-plane/server/routes/records.ts +++ b/packages/bsky/src/data-plane/server/routes/records.ts @@ -19,7 +19,7 @@ export default (db: Database): Partial> => ({ getThreadGateRecords: getRecords(db), }) -const getRecords = +export const getRecords = (db: Database) => async (req: { uris: string[] }): Promise<{ records: Record[] }> => { if (req.uris.length === 0) { diff --git a/packages/bsky/src/data-plane/server/routes/relationships.ts b/packages/bsky/src/data-plane/server/routes/relationships.ts new file mode 100644 index 00000000000..90e3adbbe20 --- /dev/null +++ b/packages/bsky/src/data-plane/server/routes/relationships.ts @@ -0,0 +1,148 @@ +import { ServiceImpl } from '@connectrpc/connect' +import { Service } from '../../gen/bsky_connect' +import { keyBy } from '@atproto/common' +import { Database } from '../../../db' +import { sql } from 'kysely' +import { valuesList } from '../../../db/util' + +export default (db: Database): Partial> => ({ + async getRelationships(req) { + const { actorDid, targetDids } = req + if (targetDids.length === 0) { + return { relationships: [] } + } + const { ref } = db.db.dynamic + const res = await db.db + .selectFrom('actor') + .where('did', 'in', targetDids) + .select([ + 'actor.did', + db.db + .selectFrom('mute') + .where('mute.mutedByDid', '=', actorDid) + .whereRef('mute.subjectDid', '=', ref('actor.did')) + .select(sql`${true}`.as('val')) + .as('muted'), + db.db + .selectFrom('list_item') + .innerJoin('list_mute', 'list_mute.listUri', 'list_item.listUri') + .where('list_mute.mutedByDid', '=', actorDid) + .whereRef('list_item.subjectDid', '=', ref('actor.did')) + .select('list_item.listUri') + .as('mutedByList'), + db.db + .selectFrom('actor_block') + .where('actor_block.creator', '=', actorDid) + .whereRef('actor_block.subjectDid', '=', ref('actor.did')) + .select('uri') + .as('blocking'), + db.db + .selectFrom('actor_block') + .where('actor_block.subjectDid', '=', actorDid) + .whereRef('actor_block.creator', '=', ref('actor.did')) + .select('uri') + .as('blockedBy'), + db.db + .selectFrom('list_item') + .innerJoin('list_block', 'list_block.subjectUri', 'list_item.listUri') + .where('list_block.creator', '=', actorDid) + .whereRef('list_item.subjectDid', '=', ref('actor.did')) + .select('list_item.listUri') + .as('blockingByList'), + db.db + .selectFrom('list_item') + .innerJoin('list_block', 'list_block.subjectUri', 'list_item.listUri') + .where('list_item.subjectDid', '=', actorDid) + .whereRef('list_block.creator', '=', ref('actor.did')) + .select('list_item.listUri') + .as('blockedByList'), + db.db + .selectFrom('follow') + .where('follow.creator', '=', actorDid) + .whereRef('follow.subjectDid', '=', ref('actor.did')) + .select('uri') + .as('following'), + db.db + .selectFrom('follow') + .where('follow.subjectDid', '=', actorDid) + .whereRef('follow.creator', '=', ref('actor.did')) + .select('uri') + .as('followedBy'), + ]) + .execute() + const byDid = keyBy(res, 'did') + const relationships = targetDids.map((did) => { + const row = byDid[did] ?? {} + return { + muted: row.muted ?? false, + mutedByList: row.mutedByList ?? '', + blockedBy: row.blockedBy ?? '', + blocking: row.blocking ?? '', + blockedByList: row.blockedByList ?? '', + blockingByList: row.blockingByList ?? '', + following: row.following ?? '', + followedBy: row.followedBy ?? '', + } + }) + return { relationships } + }, + + async getBlockExistence(req) { + const { pairs } = req + if (pairs.length === 0) { + return { exists: [] } + } + const { ref } = db.db.dynamic + const sourceRef = ref('pair.source') + const targetRef = ref('pair.target') + const values = valuesList(pairs.map((p) => sql`${p.a}, ${p.b}`)) + const res = await db.db + .selectFrom(values.as(sql`pair (source, target)`)) + .select([ + sql`${sourceRef}`.as('source'), + sql`${targetRef}`.as('target'), + ]) + .whereExists((qb) => + qb + .selectFrom('actor_block') + .whereRef('actor_block.creator', '=', sourceRef) + .whereRef('actor_block.subjectDid', '=', targetRef) + .select('uri'), + ) + .orWhereExists((qb) => + qb + .selectFrom('actor_block') + .whereRef('actor_block.creator', '=', targetRef) + .whereRef('actor_block.subjectDid', '=', sourceRef) + .select('uri'), + ) + .orWhereExists((qb) => + qb + .selectFrom('list_item') + .innerJoin('list_block', 'list_block.subjectUri', 'list_item.listUri') + .whereRef('list_block.creator', '=', sourceRef) + .whereRef('list_item.subjectDid', '=', targetRef) + .select('list_item.listUri'), + ) + .orWhereExists((qb) => + qb + .selectFrom('list_item') + .innerJoin('list_block', 'list_block.subjectUri', 'list_item.listUri') + .whereRef('list_block.creator', '=', targetRef) + .whereRef('list_item.subjectDid', '=', sourceRef) + .select('list_item.listUri'), + ) + .execute() + const existMap = res.reduce((acc, cur) => { + const key = [cur.source, cur.target].sort().join(',') + return acc.set(key, true) + }, {} as Map) + const exists = pairs.map((pair) => { + const key = [pair.a, pair.b].sort().join(',') + return existMap.get(key) === true + }) + return { + exists, + } + }, +}) diff --git a/packages/bsky/src/data-plane/server/routes/reposts.ts b/packages/bsky/src/data-plane/server/routes/reposts.ts index 0e995aa0b61..51cf95752e3 100644 --- a/packages/bsky/src/data-plane/server/routes/reposts.ts +++ b/packages/bsky/src/data-plane/server/routes/reposts.ts @@ -2,6 +2,7 @@ import { ServiceImpl } from '@connectrpc/connect' import { Service } from '../../gen/bsky_connect' import { Database } from '../../../db' import { TimeCidKeyset, paginate } from '../../../db/pagination' +import { keyBy } from '@atproto/common' export default (db: Database): Partial> => ({ async getRepostsBySubject(req) { @@ -28,15 +29,20 @@ export default (db: Database): Partial> => ({ } }, - async getRepostByActorAndSubject(req) { - const { actorDid, subjectUri } = req + async getRepostsByActorAndSubjects(req) { + const { actorDid, subjectUris } = req + if (subjectUris.length === 0) { + return { uris: [] } + } const res = await db.db .selectFrom('repost') .where('creator', '=', actorDid) - .where('subject', '=', subjectUri) - .select('uri') - .executeTakeFirst() - return { uri: res?.uri } + .where('subject', 'in', subjectUris) + .selectAll() + .execute() + const bySubject = keyBy(res, 'subject') + const uris = req.subjectUris.map((uri) => bySubject[uri]?.uri) + return { uris } }, async getActorReposts(req) { @@ -64,14 +70,17 @@ export default (db: Database): Partial> => ({ } }, - async getRepostsCount(req) { + async getRepostCounts(req) { + if (req.uris.length === 0) { + return { counts: [] } + } const res = await db.db .selectFrom('post_agg') - .where('uri', '=', req.subjectUri) - .select('repostCount') - .executeTakeFirst() - return { - count: res?.repostCount, - } + .where('uri', 'in', req.uris) + .selectAll() + .execute() + const byUri = keyBy(res, 'uri') + const counts = req.uris.map((uri) => byUri[uri]?.repostCount ?? 0) + return { counts } }, }) diff --git a/packages/bsky/src/data-plane/server/routes/suggestions.ts b/packages/bsky/src/data-plane/server/routes/suggestions.ts index 40ce8f13a5e..20f29c61f4b 100644 --- a/packages/bsky/src/data-plane/server/routes/suggestions.ts +++ b/packages/bsky/src/data-plane/server/routes/suggestions.ts @@ -3,7 +3,7 @@ import { Service } from '../../gen/bsky_connect' import { Database } from '../../../db' export default (db: Database): Partial> => ({ - async getSuggestions(req) { + async getFollowSuggestions(req) { const alreadyIncluded = parseCursor(req.cursor) const suggestions = await db.db .selectFrom('suggested_follow')