Skip to content

Commit

Permalink
follow endpoints using data plane
Browse files Browse the repository at this point in the history
  • Loading branch information
devinivy committed Dec 8, 2023
1 parent 39aa7b4 commit 5cd6fc7
Show file tree
Hide file tree
Showing 9 changed files with 262 additions and 215 deletions.
168 changes: 78 additions & 90 deletions packages/bsky/src/api/app/bsky/graph/getFollowers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,34 +3,34 @@ import { InvalidRequestError } from '@atproto/xrpc-server'
import { Server } from '../../../../lexicon'
import { QueryParams } from '../../../../lexicon/types/app/bsky/graph/getFollowers'
import AppContext from '../../../../context'
import { Database } from '../../../../db'
import { notSoftDeletedClause } from '../../../../db/util'
import { paginate, TimeCidKeyset } from '../../../../db/pagination'
import { Actor } from '../../../../db/tables/actor'
import { ActorInfoMap, ActorService } from '../../../../services/actor'
import { BlockAndMuteState, GraphService } from '../../../../services/graph'
import { createPipeline } from '../../../../pipeline'
import {
HydrationFnInput,
PresentationFnInput,
RulesFnInput,
SkeletonFnInput,
createPipelineNew,
} from '../../../../pipeline'
import { didFromUri } from '../../../../hydration/util'
import { Hydrator } from '../../../../hydration/hydrator'
import { Views } from '../../../../views'

export default function (server: Server, ctx: AppContext) {
const getFollowers = createPipeline(
const getFollowers = createPipelineNew(
skeleton,
hydration,
noBlocksInclInvalid,
noBlocks,
presentation,
)
server.app.bsky.graph.getFollowers({
auth: ctx.authOptionalAccessOrRoleVerifier,
handler: async ({ params, auth }) => {
const db = ctx.db.getReplica()
const actorService = ctx.services.actor(db)
const graphService = ctx.services.graph(db)
const viewer = 'did' in auth.credentials ? auth.credentials.did : null
const canViewTakendownProfile =
auth.credentials.type === 'role' && auth.credentials.triage

const result = await getFollowers(
{ ...params, viewer, canViewTakendownProfile },
{ db, actorService, graphService },
ctx,
)

return {
Expand All @@ -41,91 +41,85 @@ export default function (server: Server, ctx: AppContext) {
})
}

const skeleton = async (
params: Params,
ctx: Context,
): Promise<SkeletonState> => {
const { db, actorService } = ctx
const { limit, cursor, actor, canViewTakendownProfile } = params
const { ref } = db.db.dynamic

const subject = await actorService.getActor(actor, canViewTakendownProfile)
if (!subject) {
throw new InvalidRequestError(`Actor not found: ${actor}`)
const skeleton = async (input: SkeletonFnInput<Context, Params>) => {
const { params, ctx } = input
const [subjectDid] = await ctx.hydrator.actor.getDidsDefined([params.actor])
if (!subjectDid) {
throw new InvalidRequestError(`Actor not found: ${params.actor}`)
}

let followersReq = db.db
.selectFrom('follow')
.where('follow.subjectDid', '=', subject.did)
.innerJoin('actor as creator', 'creator.did', 'follow.creator')
.if(!canViewTakendownProfile, (qb) =>
qb.where(notSoftDeletedClause(ref('creator'))),
)
.selectAll('creator')
.select(['follow.cid as cid', 'follow.sortAt as sortAt'])

const keyset = new TimeCidKeyset(ref('follow.sortAt'), ref('follow.cid'))
followersReq = paginate(followersReq, {
limit,
cursor,
keyset,
const result = await ctx.hydrator.graph.getActorFollowers({
did: subjectDid,
cursor: params.cursor,
limit: params.limit,
})

const followers = await followersReq.execute()
return {
params,
followers,
subject,
cursor: keyset.packFromResult(followers),
subjectDid,
followUris: result.uris,
cursor: result.cursor,
}
}

const hydration = async (state: SkeletonState, ctx: Context) => {
const { graphService, actorService } = ctx
const { params, followers, subject } = state
const hydration = async (
input: HydrationFnInput<Context, Params, SkeletonState>,
) => {
const { ctx, params, skeleton } = input
const { viewer } = params
const [actors, bam] = await Promise.all([
actorService.views.profiles([subject, ...followers], viewer),
graphService.getBlockAndMuteState(
followers.flatMap((item) => {
if (viewer) {
return [
[viewer, item.did],
[subject.did, item.did],
]
}
return [[subject.did, item.did]]
}),
),
])
return { ...state, bam, actors }
const { followUris, subjectDid } = skeleton
// @TODO hydrate follows w/ block info
const follows = await ctx.hydrator.graph.getFollows(followUris, {
disallowBlock: true,
})
const dids = [subjectDid]
for (const [uri, follow] of follows) {
if (follow) {
dids.push(didFromUri(uri))
}
}
return ctx.hydrator.hydrateProfiles(dids, viewer)
}

const noBlocksInclInvalid = (state: HydrationState) => {
const { subject } = state
const { viewer } = state.params
state.followers = state.followers.filter(
(item) =>
!state.bam.block([subject.did, item.did]) &&
(!viewer || !state.bam.block([viewer, item.did])),
)
return state
const noBlocks = (input: RulesFnInput<Context, Params, SkeletonState>) => {
const { skeleton, params, hydration, ctx } = input
const { viewer } = params
if (viewer) {
skeleton.followUris = skeleton.followUris.filter((followUri) => {
const followerDid = didFromUri(followUri)
return !ctx.views.viewerBlockExists(followerDid, hydration)
})
}
return skeleton
}

const presentation = (state: HydrationState) => {
const { params, followers, subject, actors, cursor } = state
const subjectView = actors[subject.did]
const followersView = mapDefined(followers, (item) => actors[item.did])
if (!subjectView) {
const presentation = (
input: PresentationFnInput<Context, Params, SkeletonState>,
) => {
const { ctx, hydration, skeleton, params } = input
const { subjectDid, followUris, cursor } = skeleton
const isTakendown = (did: string) =>
ctx.views.actorIsTakendown(did, hydration)

const subject = ctx.views.profile(subjectDid, hydration)
if (
!subject ||
(!params.canViewTakendownProfile && isTakendown(subjectDid))
) {
throw new InvalidRequestError(`Actor not found: ${params.actor}`)
}
return { followers: followersView, subject: subjectView, cursor }

const followers = mapDefined(followUris, (followUri) => {
const followerDid = didFromUri(followUri)
if (!params.canViewTakendownProfile && isTakendown(followerDid)) {
return
}
return ctx.views.profile(didFromUri(followUri), hydration)
})

return { followers, subject, cursor }
}

type Context = {
db: Database
actorService: ActorService
graphService: GraphService
hydrator: Hydrator
views: Views
}

type Params = QueryParams & {
Expand All @@ -134,13 +128,7 @@ type Params = QueryParams & {
}

type SkeletonState = {
params: Params
followers: Actor[]
subject: Actor
subjectDid: string
followUris: string[]
cursor?: string
}

type HydrationState = SkeletonState & {
bam: BlockAndMuteState
actors: ActorInfoMap
}
Loading

0 comments on commit 5cd6fc7

Please sign in to comment.