Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Plumb feed items and cids to dataplane, avoid empty calls #2013

Merged
merged 3 commits into from
Jan 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/bsky/proto/bsky.proto
Original file line number Diff line number Diff line change
Expand Up @@ -765,7 +765,7 @@ message GetListFeedRequest {
}

message GetListFeedResponse {
repeated string uris = 1;
repeated FeedItem items = 1;
string cursor = 2;
}

Expand Down
17 changes: 9 additions & 8 deletions packages/bsky/src/api/app/bsky/feed/getActorLikes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { Views } from '../../../../views'
import { DataPlaneClient } from '../../../../data-plane'
import { parseString } from '../../../../hydration/util'
import { creatorFromUri } from '../../../../views/util'
import { FeedItem } from '../../../../hydration/feed'

export default function (server: Server, ctx: AppContext) {
const getActorLikes = createPipeline(
Expand Down Expand Up @@ -56,10 +57,10 @@ const skeleton = async (inputs: {
cursor,
})

const postUris = likesRes.likes.map((l) => l.subject)
const items = likesRes.likes.map((l) => ({ post: { uri: l.subject } }))

return {
postUris,
items,
cursor: parseString(likesRes.cursor),
}
}
Expand All @@ -70,7 +71,7 @@ const hydration = async (inputs: {
skeleton: Skeleton
}) => {
const { ctx, params, skeleton } = inputs
return await ctx.hydrator.hydrateFeedPosts(skeleton.postUris, params.viewer)
return await ctx.hydrator.hydrateFeedItems(skeleton.items, params.viewer)
}

const noPostBlocks = (inputs: {
Expand All @@ -79,8 +80,8 @@ const noPostBlocks = (inputs: {
hydration: HydrationState
}) => {
const { ctx, skeleton, hydration } = inputs
skeleton.postUris = skeleton.postUris.filter((uri) => {
const creator = creatorFromUri(uri)
skeleton.items = skeleton.items.filter((item) => {
const creator = creatorFromUri(item.post.uri)
return !ctx.views.viewerBlockExists(creator, hydration)
})
return skeleton
Expand All @@ -92,8 +93,8 @@ const presentation = (inputs: {
hydration: HydrationState
}) => {
const { ctx, skeleton, hydration } = inputs
const feed = mapDefined(skeleton.postUris, (uri) =>
ctx.views.feedViewPost(uri, hydration),
const feed = mapDefined(skeleton.items, (item) =>
ctx.views.feedViewPost(item, hydration),
)
return {
feed,
Expand All @@ -110,6 +111,6 @@ type Context = {
type Params = QueryParams & { viewer: string | null }

type Skeleton = {
postUris: string[]
items: FeedItem[]
cursor?: string
}
20 changes: 13 additions & 7 deletions packages/bsky/src/api/app/bsky/feed/getAuthorFeed.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import { Views } from '../../../../views'
import { DataPlaneClient } from '../../../../data-plane'
import { parseString } from '../../../../hydration/util'
import { Actor } from '../../../../hydration/actor'
import { FeedItem } from '../../../../hydration/feed'

export default function (server: Server, ctx: AppContext) {
const getAuthorFeed = createPipeline(
Expand Down Expand Up @@ -66,7 +67,12 @@ export const skeleton = async (inputs: {
})
return {
actor,
uris: res.items.map((item) => item.repost || item.uri),
items: res.items.map((item) => ({
post: { uri: item.uri, cid: item.cid || undefined },
repost: item.repost
? { uri: item.repost, cid: item.repostCid || undefined }
: undefined,
})),
cursor: parseString(res.cursor),
}
}
Expand All @@ -78,7 +84,7 @@ const hydration = async (inputs: {
}): Promise<HydrationState> => {
const { ctx, params, skeleton } = inputs
const [feedPostState, profileViewerState = {}] = await Promise.all([
ctx.hydrator.hydrateFeedPosts(skeleton.uris, params.viewer),
ctx.hydrator.hydrateFeedItems(skeleton.items, params.viewer),
params.viewer
? ctx.hydrator.actor.getProfileViewerStates(
[skeleton.actor.did],
Expand Down Expand Up @@ -108,8 +114,8 @@ const noBlocksOrMutedReposts = (inputs: {
'BlockedByActor',
)
}
skeleton.uris = skeleton.uris.filter((uri) => {
const bam = ctx.views.feedItemBlocksAndMutes(uri, hydration)
skeleton.items = skeleton.items.filter((item) => {
const bam = ctx.views.feedItemBlocksAndMutes(item, hydration)
return (
!bam.authorBlocked &&
!bam.originatorBlocked &&
Expand All @@ -125,8 +131,8 @@ const presentation = (inputs: {
hydration: HydrationState
}) => {
const { ctx, skeleton, hydration } = inputs
const feed = mapDefined(skeleton.uris, (uri) =>
ctx.views.feedViewPost(uri, hydration),
const feed = mapDefined(skeleton.items, (item) =>
ctx.views.feedViewPost(item, hydration),
)
return { feed, cursor: skeleton.cursor }
}
Expand All @@ -141,6 +147,6 @@ type Params = QueryParams & { viewer: string | null }

type Skeleton = {
actor: Actor
uris: string[]
items: FeedItem[]
cursor?: string
}
38 changes: 18 additions & 20 deletions packages/bsky/src/api/app/bsky/feed/getFeed.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { mapDefined } from '@atproto/common'
import {
InvalidRequestError,
UpstreamFailureError,
Expand All @@ -15,15 +16,15 @@ import { QueryParams as GetFeedParams } from '../../../../lexicon/types/app/bsky
import { OutputSchema as SkeletonOutput } from '../../../../lexicon/types/app/bsky/feed/getFeedSkeleton'
import { Server } from '../../../../lexicon'
import AppContext from '../../../../context'
import { AlgoResponse, AlgoResponseItem } from '../../../feed-gen/types'
import { AlgoResponse, toFeedItem } from '../../../feed-gen/types'
import {
HydrationFnInput,
PresentationFnInput,
RulesFnInput,
SkeletonFnInput,
createPipeline,
} from '../../../../pipeline'
import { mapDefined } from '@atproto/common'
import { FeedItem } from '../../../../hydration/feed'

export default function (server: Server, ctx: AppContext) {
const getFeed = createPipeline(
Expand Down Expand Up @@ -59,13 +60,16 @@ const skeleton = async (
const { ctx, params } = inputs
const timerSkele = new ServerTimer('skele').start()
const localAlgo = ctx.algos[params.feed]
const { feedItems, cursor, ...passthrough } =
localAlgo !== undefined
? await localAlgo(ctx, params, params.viewer)
: await skeletonFromFeedGen(ctx, params)
const {
feedItems: algoItems,
cursor,
...passthrough
} = localAlgo !== undefined
? await localAlgo(ctx, params, params.viewer)
: await skeletonFromFeedGen(ctx, params)
return {
cursor,
feedItems,
items: algoItems.map(toFeedItem),
timerSkele: timerSkele.stop(),
timerHydr: new ServerTimer('hydr').start(),
passthrough,
Expand All @@ -77,9 +81,8 @@ const hydration = async (
) => {
const { ctx, params, skeleton } = inputs
const timerHydr = new ServerTimer('hydr').start()
const feedItemUris = skeleton.feedItems.map((item) => item.itemUri)
const hydration = await ctx.hydrator.hydrateFeedPosts(
feedItemUris,
const hydration = await ctx.hydrator.hydrateFeedItems(
skeleton.items,
params.viewer,
)
skeleton.timerHydr = timerHydr.stop()
Expand All @@ -88,8 +91,8 @@ const hydration = async (

const noBlocksOrMutes = (inputs: RulesFnInput<Context, Params, Skeleton>) => {
const { ctx, skeleton, hydration } = inputs
skeleton.feedItems = skeleton.feedItems.filter((item) => {
const bam = ctx.views.feedItemBlocksAndMutes(item.itemUri, hydration)
skeleton.items = skeleton.items.filter((item) => {
const bam = ctx.views.feedItemBlocksAndMutes(item, hydration)
return (
!bam.authorBlocked &&
!bam.authorMuted &&
Expand All @@ -104,13 +107,8 @@ const presentation = (
inputs: PresentationFnInput<Context, Params, Skeleton>,
) => {
const { ctx, params, skeleton, hydration } = inputs
const feed = mapDefined(skeleton.feedItems, (item) => {
const view = ctx.views.feedViewPost(item.itemUri, hydration)
if (view?.post.uri !== item.postUri) {
return undefined
} else {
return view
}
const feed = mapDefined(skeleton.items, (item) => {
return ctx.views.feedViewPost(item, hydration)
}).slice(0, params.limit)
return {
feed,
Expand All @@ -126,7 +124,7 @@ type Context = AppContext
type Params = GetFeedParams & { viewer: string | null; authorization?: string }

type Skeleton = {
feedItems: AlgoResponseItem[]
items: FeedItem[]
passthrough: Record<string, unknown> // pass through additional items in feedgen response
cursor?: string
timerSkele: ServerTimer
Expand Down
20 changes: 13 additions & 7 deletions packages/bsky/src/api/app/bsky/feed/getListFeed.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { Views } from '../../../../views'
import { DataPlaneClient } from '../../../../data-plane'
import { mapDefined } from '@atproto/common'
import { parseString } from '../../../../hydration/util'
import { FeedItem } from '../../../../hydration/feed'

export default function (server: Server, ctx: AppContext) {
const getListFeed = createPipeline(
Expand Down Expand Up @@ -47,7 +48,12 @@ export const skeleton = async (inputs: {
cursor: params.cursor,
})
return {
uris: res.uris,
items: res.items.map((item) => ({
post: { uri: item.uri, cid: item.cid || undefined },
repost: item.repost
? { uri: item.repost, cid: item.repostCid || undefined }
: undefined,
})),
cursor: parseString(res.cursor),
}
}
Expand All @@ -58,7 +64,7 @@ const hydration = async (inputs: {
skeleton: Skeleton
}): Promise<HydrationState> => {
const { ctx, params, skeleton } = inputs
return ctx.hydrator.hydrateFeedPosts(skeleton.uris, params.viewer)
return ctx.hydrator.hydrateFeedItems(skeleton.items, params.viewer)
}

const noBlocksOrMutes = (inputs: {
Expand All @@ -67,8 +73,8 @@ const noBlocksOrMutes = (inputs: {
hydration: HydrationState
}): Skeleton => {
const { ctx, skeleton, hydration } = inputs
skeleton.uris = skeleton.uris.filter((uri) => {
const bam = ctx.views.feedItemBlocksAndMutes(uri, hydration)
skeleton.items = skeleton.items.filter((item) => {
const bam = ctx.views.feedItemBlocksAndMutes(item, hydration)
return (
!bam.authorBlocked &&
!bam.authorMuted &&
Expand All @@ -85,8 +91,8 @@ const presentation = (inputs: {
hydration: HydrationState
}) => {
const { ctx, skeleton, hydration } = inputs
const feed = mapDefined(skeleton.uris, (uri) =>
ctx.views.feedViewPost(uri, hydration),
const feed = mapDefined(skeleton.items, (item) =>
ctx.views.feedViewPost(item, hydration),
)
return { feed, cursor: skeleton.cursor }
}
Expand All @@ -100,6 +106,6 @@ type Context = {
type Params = QueryParams & { viewer: string | null }

type Skeleton = {
uris: string[]
items: FeedItem[]
cursor?: string
}
5 changes: 4 additions & 1 deletion packages/bsky/src/api/app/bsky/feed/getPostThread.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,10 @@ const hydration = async (
inputs: HydrationFnInput<Context, Params, Skeleton>,
) => {
const { ctx, params, skeleton } = inputs
return ctx.hydrator.hydrateThreadPosts(skeleton.uris, params.viewer)
return ctx.hydrator.hydrateThreadPosts(
skeleton.uris.map((uri) => ({ uri })),
params.viewer,
)
}

const presentation = (
Expand Down
5 changes: 4 additions & 1 deletion packages/bsky/src/api/app/bsky/feed/getPosts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@ const hydration = async (inputs: {
skeleton: Skeleton
}) => {
const { ctx, params, skeleton } = inputs
return ctx.hydrator.hydratePosts(skeleton.posts, params.viewer)
return ctx.hydrator.hydratePosts(
skeleton.posts.map((uri) => ({ uri })),
params.viewer,
)
}

const noBlocks = (inputs: {
Expand Down
20 changes: 13 additions & 7 deletions packages/bsky/src/api/app/bsky/feed/getTimeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { Views } from '../../../../views'
import { DataPlaneClient } from '../../../../data-plane'
import { parseString } from '../../../../hydration/util'
import { mapDefined } from '@atproto/common'
import { FeedItem } from '../../../../hydration/feed'

export default function (server: Server, ctx: AppContext) {
const getTimeline = createPipeline(
Expand Down Expand Up @@ -47,7 +48,12 @@ export const skeleton = async (inputs: {
cursor: params.cursor,
})
return {
uris: res.items.map((item) => item.repost || item.uri),
items: res.items.map((item) => ({
post: { uri: item.uri, cid: item.cid || undefined },
repost: item.repost
? { uri: item.repost, cid: item.repostCid || undefined }
: undefined,
})),
cursor: parseString(res.cursor),
}
}
Expand All @@ -58,7 +64,7 @@ const hydration = async (inputs: {
skeleton: Skeleton
}): Promise<HydrationState> => {
const { ctx, params, skeleton } = inputs
return ctx.hydrator.hydrateFeedPosts(skeleton.uris, params.viewer)
return ctx.hydrator.hydrateFeedItems(skeleton.items, params.viewer)
}

const noBlocksOrMutes = (inputs: {
Expand All @@ -67,8 +73,8 @@ const noBlocksOrMutes = (inputs: {
hydration: HydrationState
}): Skeleton => {
const { ctx, skeleton, hydration } = inputs
skeleton.uris = skeleton.uris.filter((uri) => {
const bam = ctx.views.feedItemBlocksAndMutes(uri, hydration)
skeleton.items = skeleton.items.filter((item) => {
const bam = ctx.views.feedItemBlocksAndMutes(item, hydration)
return (
!bam.authorBlocked &&
!bam.authorMuted &&
Expand All @@ -85,8 +91,8 @@ const presentation = (inputs: {
hydration: HydrationState
}) => {
const { ctx, skeleton, hydration } = inputs
const feed = mapDefined(skeleton.uris, (uri) =>
ctx.views.feedViewPost(uri, hydration),
const feed = mapDefined(skeleton.items, (item) =>
ctx.views.feedViewPost(item, hydration),
)
return { feed, cursor: skeleton.cursor }
}
Expand All @@ -100,6 +106,6 @@ type Context = {
type Params = QueryParams & { viewer: string }

type Skeleton = {
uris: string[]
items: FeedItem[]
cursor?: string
}
5 changes: 4 additions & 1 deletion packages/bsky/src/api/app/bsky/feed/searchPosts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,10 @@ const hydration = async (
inputs: HydrationFnInput<Context, Params, Skeleton>,
) => {
const { ctx, params, skeleton } = inputs
return ctx.hydrator.hydratePosts(skeleton.posts, params.viewer)
return ctx.hydrator.hydratePosts(
skeleton.posts.map((uri) => ({ uri })),
params.viewer,
)
}

const noBlocks = (inputs: RulesFnInput<Context, Params, Skeleton>) => {
Expand Down
Loading