Skip to content

Commit

Permalink
implement getBlocks, block application fix
Browse files Browse the repository at this point in the history
  • Loading branch information
devinivy committed Dec 13, 2023
1 parent cd4b277 commit 265a2e3
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 39 deletions.
106 changes: 72 additions & 34 deletions packages/bsky/src/api/app/bsky/graph/getBlocks.ts
Original file line number Diff line number Diff line change
@@ -1,47 +1,85 @@
import { mapDefined } from '@atproto/common'
import { Server } from '../../../../lexicon'
import { paginate, TimeCidKeyset } from '../../../../db/pagination'
import { QueryParams } from '../../../../lexicon/types/app/bsky/graph/getBlocks'
import AppContext from '../../../../context'
import { notSoftDeletedClause } from '../../../../db/util'
import {
createPipelineNew,
HydrationFnInput,
noRulesNew,
PresentationFnInput,
SkeletonFnInput,
} from '../../../../pipeline'
import { Hydrator } from '../../../../hydration/hydrator'
import { Views } from '../../../../views'

export default function (server: Server, ctx: AppContext) {
const getBlocks = createPipelineNew(
skeleton,
hydration,
noRulesNew,
presentation,
)
server.app.bsky.graph.getBlocks({
auth: ctx.authVerifier,
handler: async ({ params, auth }) => {
const { limit, cursor } = params
const requester = auth.credentials.did
const db = ctx.db.getReplica()
const { ref } = db.db.dynamic

let blocksReq = db.db
.selectFrom('actor_block')
.where('actor_block.creator', '=', requester)
.innerJoin('actor as subject', 'subject.did', 'actor_block.subjectDid')
.where(notSoftDeletedClause(ref('subject')))
.selectAll('subject')
.select(['actor_block.cid as cid', 'actor_block.sortAt as sortAt'])

const keyset = new TimeCidKeyset(
ref('actor_block.sortAt'),
ref('actor_block.cid'),
)
blocksReq = paginate(blocksReq, {
limit,
cursor,
keyset,
})

const blocksRes = await blocksReq.execute()

const actorService = ctx.services.actor(db)
const blocks = await actorService.views.profilesList(blocksRes, requester)

const viewer = auth.credentials.did
const result = await getBlocks({ ...params, viewer }, ctx)
return {
encoding: 'application/json',
body: {
blocks,
cursor: keyset.packFromResult(blocksRes),
},
body: result,
}
},
})
}

const skeleton = async (input: SkeletonFnInput<Context, Params>) => {
const { params, ctx } = input
const { blockUris, cursor } = await ctx.hydrator.dataplane.getBlocks({
actorDid: params.viewer,
cursor: params.cursor,
limit: params.limit,
})
const blocks = await ctx.hydrator.graph.getBlocks(blockUris)
const blockedDids = mapDefined(
blockUris,
(uri) => blocks.get(uri)?.record.subject,
)
return {
blockedDids,
cursor: cursor || undefined,
}
}

const hydration = async (
input: HydrationFnInput<Context, Params, SkeletonState>,
) => {
const { ctx, params, skeleton } = input
const { viewer } = params
const { blockedDids } = skeleton
return ctx.hydrator.hydrateProfiles(blockedDids, viewer)
}

const presentation = (
input: PresentationFnInput<Context, Params, SkeletonState>,
) => {
const { ctx, hydration, skeleton } = input
const { blockedDids, cursor } = skeleton
const blocks = mapDefined(blockedDids, (did) => {
return ctx.views.profile(did, hydration)
})
return { blocks, cursor }
}

type Context = {
hydrator: Hydrator
views: Views
}

type Params = QueryParams & {
viewer: string
}

type SkeletonState = {
blockedDids: string[]
cursor?: string
}
2 changes: 1 addition & 1 deletion packages/bsky/src/api/app/bsky/graph/getMutes.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { mapDefined } from '@atproto/common'
import { Server } from '../../../../lexicon'
import { QueryParams } from '../../../../lexicon/types/app/bsky/graph/getMutes'
import AppContext from '../../../../context'
Expand All @@ -10,7 +11,6 @@ import {
createPipelineNew,
noRulesNew,
} from '../../../../pipeline'
import { mapDefined } from '@atproto/common'

export default function (server: Server, ctx: AppContext) {
const getMutes = createPipelineNew(
Expand Down
16 changes: 12 additions & 4 deletions packages/bsky/src/hydration/graph.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { Record as FollowRecord } from '../lexicon/types/app/bsky/graph/follow'
import { Record as BlockRecord } from '../lexicon/types/app/bsky/graph/block'
import { Record as ListRecord } from '../lexicon/types/app/bsky/graph/list'
import { Record as ListItemRecord } from '../lexicon/types/app/bsky/graph/listitem'
import { DataPlaneClient } from '../data-plane/client'
Expand All @@ -21,6 +22,8 @@ export type ListViewerStates = HydrationMap<ListViewerState>
export type Follow = RecordInfo<FollowRecord>
export type Follows = HydrationMap<Follow>

export type Block = RecordInfo<BlockRecord>

export type RelationshipPair = [didA: string, didB: string]

const dedupePairs = (pairs: RelationshipPair[]): RelationshipPair[] => {
Expand Down Expand Up @@ -127,10 +130,7 @@ export class GraphHydrator {
}

async getBidirectionalBlocks(pairs: RelationshipPair[]): Promise<Blocks> {
const deduped = dedupePairs(pairs).map((pair) => ({
a: pair[0],
b: pair[0],
}))
const deduped = dedupePairs(pairs).map(([a, b]) => ({ a, b }))
const res = await this.dataplane.getBlockExistence({ pairs: deduped })
const blocks = new Blocks()
for (let i = 0; i < deduped.length; i++) {
Expand All @@ -148,6 +148,14 @@ export class GraphHydrator {
}, new HydrationMap<Follow>())
}

async getBlocks(uris: string[], includeTakedowns = false): Promise<Follows> {
const res = await this.dataplane.getBlockRecords({ uris })
return uris.reduce((acc, uri, i) => {
const record = parseRecord<BlockRecord>(res.records[i], includeTakedowns)
return acc.set(uri, record ?? null)
}, new HydrationMap<Block>())
}

async getActorFollows(input: {
did: string
cursor?: string
Expand Down
1 change: 1 addition & 0 deletions packages/bsky/src/hydration/hydrator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -545,6 +545,7 @@ export const mergeStates = (
postBlocks: mergeMaps(stateA.postBlocks, stateB.postBlocks),
reposts: mergeMaps(stateA.reposts, stateB.reposts),
follows: mergeMaps(stateA.follows, stateB.follows),
followBlocks: mergeMaps(stateA.followBlocks, stateB.followBlocks),
lists: mergeMaps(stateA.lists, stateB.lists),
listViewers: mergeMaps(stateA.listViewers, stateB.listViewers),
listItems: mergeMaps(stateA.listItems, stateB.listItems),
Expand Down

0 comments on commit 265a2e3

Please sign in to comment.