Skip to content

Commit

Permalink
update dataplane, sync with bsky proto updates
Browse files Browse the repository at this point in the history
  • Loading branch information
devinivy committed Dec 19, 2023
1 parent 70cf5c2 commit 9574a86
Show file tree
Hide file tree
Showing 13 changed files with 73 additions and 42 deletions.
2 changes: 1 addition & 1 deletion packages/bsky/src/api/app/bsky/feed/getAuthorFeed.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ export const skeleton = async (inputs: {
})
return {
actor,
uris: res.uris,
uris: res.items.map((item) => item.repost || item.uri),
cursor: parseString(res.cursor),
}
}
Expand Down
2 changes: 1 addition & 1 deletion packages/bsky/src/api/app/bsky/feed/getLikes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ const skeleton = async (inputs: {
}): Promise<Skeleton> => {
const { ctx, params } = inputs
const likesRes = await ctx.hydrator.dataplane.getLikesBySubject({
subjectUri: params.uri,
subject: { uri: params.uri, cid: params.cid },
cursor: params.cursor,
limit: params.limit,
})
Expand Down
2 changes: 1 addition & 1 deletion packages/bsky/src/api/app/bsky/feed/getTimeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ export const skeleton = async (inputs: {
cursor: params.cursor,
})
return {
uris: res.uris,
uris: res.items.map((item) => item.repost || item.uri),
cursor: parseString(res.cursor),
}
}
Expand Down
4 changes: 2 additions & 2 deletions packages/bsky/src/api/app/bsky/graph/getFollowers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,14 @@ const skeleton = async (input: SkeletonFnInput<Context, Params>) => {
if (!subjectDid) {
throw new InvalidRequestError(`Actor not found: ${params.actor}`)
}
const { uris, cursor } = await ctx.hydrator.graph.getActorFollowers({
const { followers, cursor } = await ctx.hydrator.graph.getActorFollowers({
did: subjectDid,
cursor: params.cursor,
limit: params.limit,
})
return {
subjectDid,
followUris: uris,
followUris: followers.map((f) => f.uri),
cursor: cursor || undefined,
}
}
Expand Down
4 changes: 2 additions & 2 deletions packages/bsky/src/api/app/bsky/graph/getFollows.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,14 @@ const skeleton = async (input: SkeletonFnInput<Context, Params>) => {
if (!subjectDid) {
throw new InvalidRequestError(`Actor not found: ${params.actor}`)
}
const { uris, cursor } = await ctx.hydrator.graph.getActorFollows({
const { follows, cursor } = await ctx.hydrator.graph.getActorFollows({
did: subjectDid,
cursor: params.cursor,
limit: params.limit,
})
return {
subjectDid,
followUris: uris,
followUris: follows.map((f) => f.uri),
cursor: cursor || undefined,
}
}
Expand Down
26 changes: 14 additions & 12 deletions packages/bsky/src/api/app/bsky/graph/getList.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {
} from '../../../../pipeline'
import { Hydrator, mergeStates } from '../../../../hydration/hydrator'
import { Views } from '../../../../views'
import { ListItemInfo } from '../../../../data-plane/gen/bsky_pb'

export default function (server: Server, ctx: AppContext) {
const getList = createPipeline(skeleton, hydration, noRules, presentation)
Expand All @@ -32,14 +33,14 @@ const skeleton = async (
input: SkeletonFnInput<Context, Params>,
): Promise<SkeletonState> => {
const { ctx, params } = input
const { listitemUris, cursor } = await ctx.hydrator.dataplane.getListMembers({
const { listitems, cursor } = await ctx.hydrator.dataplane.getListMembers({
listUri: params.list,
limit: params.limit,
cursor: params.cursor,
})
return {
listUri: params.list,
listitemUris: listitemUris,
listitems,
cursor: cursor || undefined,
}
}
Expand All @@ -49,24 +50,25 @@ const hydration = async (
) => {
const { ctx, params, skeleton } = input
const { viewer } = params
const { listUri, listitemUris } = skeleton
const [listState, listitemState] = await Promise.all([
const { listUri, listitems } = skeleton
const [listState, profileState] = await Promise.all([
ctx.hydrator.hydrateLists([listUri], viewer),
ctx.hydrator.hydrateListItems(listitemUris, viewer),
ctx.hydrator.hydrateProfiles(
listitems.map(({ did }) => did),
viewer,
),
])
return mergeStates(listState, listitemState)
return mergeStates(listState, profileState)
}

const presentation = (
input: PresentationFnInput<Context, Params, SkeletonState>,
) => {
const { ctx, skeleton, hydration } = input
const { listUri, listitemUris, cursor } = skeleton
const { listUri, listitems, cursor } = skeleton
const list = ctx.views.list(listUri, hydration)
const items = mapDefined(listitemUris, (uri) => {
const listitem = hydration.listItems?.get(uri)
if (!listitem) return
const subject = ctx.views.profile(listitem.record.subject, hydration)
const items = mapDefined(listitems, ({ uri, did }) => {
const subject = ctx.views.profile(did, hydration)
if (!subject) return
return { uri, subject }
})
Expand All @@ -87,6 +89,6 @@ type Params = QueryParams & {

type SkeletonState = {
listUri: string
listitemUris: string[]
listitems: ListItemInfo[]
cursor?: string
}
13 changes: 10 additions & 3 deletions packages/bsky/src/data-plane/server/routes/feeds.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ export default (db: Database): Partial<ServiceImpl<typeof Service>> => ({
const feedItems = await builder.execute()

return {
uris: feedItems.map((row) => row.uri),
items: feedItems.map(feedItemFromRow),
cursor: keyset.packFromResult(feedItems),
}
},
Expand Down Expand Up @@ -99,7 +99,7 @@ export default (db: Database): Partial<ServiceImpl<typeof Service>> => ({
.slice(0, limit)

return {
uris: feedItems.map((item) => item.uri),
items: feedItems.map(feedItemFromRow),
cursor: keyset.packFromResult(feedItems),
}
},
Expand All @@ -124,8 +124,15 @@ export default (db: Database): Partial<ServiceImpl<typeof Service>> => ({
const feedItems = await builder.execute()

return {
uris: feedItems.map((item) => item.uri),
uris: feedItems.map((item) => item.uri), // @TODO consider switching to FeedItemInfo[]
cursor: keyset.packFromResult(feedItems),
}
},
})

const feedItemFromRow = (row: { postUri: string; uri: string }) => {
return {
uri: row.postUri,
repost: row.uri === row.postUri ? undefined : row.uri,
}
}
6 changes: 4 additions & 2 deletions packages/bsky/src/data-plane/server/routes/follows.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ export default (db: Database): Partial<ServiceImpl<typeof Service>> => ({
.select([
'follow.uri as uri',
'follow.cid as cid',
'follow.subjectDid as subjectDid',
'follow.sortAt as sortAt',
])

Expand All @@ -46,7 +47,7 @@ export default (db: Database): Partial<ServiceImpl<typeof Service>> => ({

const followers = await followersReq.execute()
return {
uris: followers.map((f) => f.uri),
followers: followers.map((f) => ({ uri: f.uri, did: f.subjectDid })),
cursor: keyset.packFromResult(followers),
}
},
Expand All @@ -62,6 +63,7 @@ export default (db: Database): Partial<ServiceImpl<typeof Service>> => ({
.select([
'follow.uri as uri',
'follow.cid as cid',
'follow.subjectDid as subjectDid',
'follow.sortAt as sortAt',
])

Expand All @@ -76,7 +78,7 @@ export default (db: Database): Partial<ServiceImpl<typeof Service>> => ({
const follows = await followsReq.execute()

return {
uris: follows.map((f) => f.uri),
follows: follows.map((f) => ({ uri: f.uri, did: f.subjectDid })),
cursor: keyset.packFromResult(follows),
}
},
Expand Down
23 changes: 17 additions & 6 deletions packages/bsky/src/data-plane/server/routes/likes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,17 @@ import { keyBy } from '@atproto/common'

export default (db: Database): Partial<ServiceImpl<typeof Service>> => ({
async getLikesBySubject(req) {
const { subjectUri, cursor, limit } = req
const { subject, cursor, limit } = req
const { ref } = db.db.dynamic

if (!subject?.uri) {
return { uris: [] }
}

// @NOTE ignoring subject.cid
let builder = db.db
.selectFrom('like')
.where('like.subject', '=', subjectUri)
.where('like.subject', '=', subject?.uri)
.selectAll('like')

const keyset = new TimeCidKeyset(ref('like.sortAt'), ref('like.cid'))
Expand All @@ -30,18 +35,24 @@ export default (db: Database): Partial<ServiceImpl<typeof Service>> => ({
},

async getLikesByActorAndSubjects(req) {
const { actorDid, subjectUris } = req
if (subjectUris.length === 0) {
const { actorDid, refs } = req
if (refs.length === 0) {
return { uris: [] }
}
// @NOTE ignoring ref.cid
const res = await db.db
.selectFrom('like')
.where('creator', '=', actorDid)
.where('subject', 'in', subjectUris)
.where(
'subject',
'in',
refs.map(({ uri }) => uri),
)
.selectAll()
.execute()
const bySubject = keyBy(res, 'subject')
const uris = req.subjectUris.map((uri) => bySubject[uri]?.uri)
// @TODO handling undefineds properly, or do we need to turn them into empty strings?
const uris = refs.map(({ uri }) => bySubject[uri]?.uri)
return { uris }
},

Expand Down
5 changes: 4 additions & 1 deletion packages/bsky/src/data-plane/server/routes/lists.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,10 @@ export default (db: Database): Partial<ServiceImpl<typeof Service>> => ({

const listItems = await builder.execute()
return {
listitemUris: listItems.map((item) => item.uri),
listitems: listItems.map((item) => ({
uri: item.uri,
did: item.subjectDid,
})),
cursor: keyset.packFromResult(listItems),
}
},
Expand Down
13 changes: 9 additions & 4 deletions packages/bsky/src/data-plane/server/routes/reposts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,23 @@ export default (db: Database): Partial<ServiceImpl<typeof Service>> => ({
},

async getRepostsByActorAndSubjects(req) {
const { actorDid, subjectUris } = req
if (subjectUris.length === 0) {
const { actorDid, refs } = req
if (refs.length === 0) {
return { uris: [] }
}
const res = await db.db
.selectFrom('repost')
.where('creator', '=', actorDid)
.where('subject', 'in', subjectUris)
.where(
'subject',
'in',
refs.map(({ uri }) => uri),
)
.selectAll()
.execute()
const bySubject = keyBy(res, 'subject')
const uris = req.subjectUris.map((uri) => bySubject[uri]?.uri)
// @TODO handling undefineds properly, or do we need to turn them into empty strings?
const uris = refs.map(({ uri }) => bySubject[uri]?.uri)
return { uris }
},

Expand Down
6 changes: 3 additions & 3 deletions packages/bsky/src/hydration/feed.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,11 @@ export class FeedHydrator {
const [likes, reposts] = await Promise.all([
this.dataplane.getLikesByActorAndSubjects({
actorDid: viewer,
subjectUris: uris,
refs: uris.map((uri) => ({ uri })),
}),
this.dataplane.getRepostsByActorAndSubjects({
actorDid: viewer,
subjectUris: uris,
refs: uris.map((uri) => ({ uri })),
}),
])
return uris.reduce((acc, uri, i) => {
Expand Down Expand Up @@ -119,7 +119,7 @@ export class FeedHydrator {
): Promise<FeedGenViewerStates> {
const likes = await this.dataplane.getLikesByActorAndSubjects({
actorDid: viewer,
subjectUris: uris,
refs: uris.map((uri) => ({ uri })),
})
return uris.reduce((acc, uri, i) => {
return acc.set(uri, {
Expand Down
9 changes: 5 additions & 4 deletions packages/bsky/src/hydration/graph.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { Record as ListRecord } from '../lexicon/types/app/bsky/graph/list'
import { Record as ListItemRecord } from '../lexicon/types/app/bsky/graph/listitem'
import { DataPlaneClient } from '../data-plane/client'
import { HydrationMap, RecordInfo, parseRecord } from './util'
import { FollowInfo } from '../data-plane/gen/bsky_pb'

export type List = RecordInfo<ListRecord>
export type Lists = HydrationMap<List>
Expand Down Expand Up @@ -160,27 +161,27 @@ export class GraphHydrator {
did: string
cursor?: string
limit?: number
}): Promise<{ uris: string[]; cursor: string }> {
}): Promise<{ follows: FollowInfo[]; cursor: string }> {
const { did, cursor, limit } = input
const res = await this.dataplane.getFollows({
actorDid: did,
cursor,
limit,
})
return { uris: res.uris, cursor: res.cursor }
return { follows: res.follows, cursor: res.cursor }
}

async getActorFollowers(input: {
did: string
cursor?: string
limit?: number
}): Promise<{ uris: string[]; cursor: string }> {
}): Promise<{ followers: FollowInfo[]; cursor: string }> {
const { did, cursor, limit } = input
const res = await this.dataplane.getFollowers({
actorDid: did,
cursor,
limit,
})
return { uris: res.uris, cursor: res.cursor }
return { followers: res.followers, cursor: res.cursor }
}
}

0 comments on commit 9574a86

Please sign in to comment.