From e566bef1cc83397df082d79a5c473e955f55bd41 Mon Sep 17 00:00:00 2001 From: Daniel Holmgren Date: Tue, 5 Dec 2023 13:53:05 -0600 Subject: [PATCH] Cache labels in Redis (#1897) * cache did docs in redis * drop table * expire from redis * fix tests * add cache class * update api * refactor * filter negative labels * fix up dev-env * refactor did cache to use new redis cache class * tidy * ensure caching negatives * redis cache tests * remove timeout on did cache * fix ns in test * rename driver * add timeout & fail open * add test for timeout & fail open * small pr feedback * refactor caches * bugfixg * test for caching negative values * little more to cache * wire up cache cfg * switch from redis scratch to redis * fix build issues * use different redis clients for tests * fix test * fix flaky test * use separate db for redis cache --- packages/bsky/src/cache/read-through.ts | 143 +++++++++++ packages/bsky/src/config.ts | 65 ++++- packages/bsky/src/context.ts | 14 +- packages/bsky/src/daemon/index.ts | 3 - packages/bsky/src/daemon/services.ts | 10 +- packages/bsky/src/db/database-schema.ts | 2 - .../20231205T000257238Z-remove-did-cache.ts | 14 ++ packages/bsky/src/db/migrations/index.ts | 1 + packages/bsky/src/db/tables/did-cache.ts | 13 - packages/bsky/src/did-cache.ts | 83 ++----- packages/bsky/src/index.ts | 32 +-- packages/bsky/src/indexer/context.ts | 5 + packages/bsky/src/indexer/index.ts | 17 +- packages/bsky/src/label-cache.ts | 90 ------- packages/bsky/src/logger.ts | 2 + packages/bsky/src/redis.ts | 39 ++- packages/bsky/src/services/actor/index.ts | 23 +- packages/bsky/src/services/actor/views.ts | 20 +- packages/bsky/src/services/feed/index.ts | 40 ++- packages/bsky/src/services/feed/views.ts | 30 ++- packages/bsky/src/services/index.ts | 28 +-- packages/bsky/src/services/label/index.ts | 83 +++++-- packages/bsky/src/services/types.ts | 4 + packages/bsky/tests/admin/repo-search.test.ts | 1 + packages/bsky/tests/did-cache.test.ts | 25 +- 
packages/bsky/tests/redis-cache.test.ts | 231 ++++++++++++++++++ packages/dev-env/src/bsky.ts | 37 ++- services/bsky/api.js | 18 ++ services/bsky/indexer.js | 17 ++ 29 files changed, 802 insertions(+), 288 deletions(-) create mode 100644 packages/bsky/src/cache/read-through.ts create mode 100644 packages/bsky/src/db/migrations/20231205T000257238Z-remove-did-cache.ts delete mode 100644 packages/bsky/src/db/tables/did-cache.ts delete mode 100644 packages/bsky/src/label-cache.ts create mode 100644 packages/bsky/src/services/types.ts create mode 100644 packages/bsky/tests/redis-cache.test.ts diff --git a/packages/bsky/src/cache/read-through.ts b/packages/bsky/src/cache/read-through.ts new file mode 100644 index 00000000000..b414026e086 --- /dev/null +++ b/packages/bsky/src/cache/read-through.ts @@ -0,0 +1,143 @@ +import { cacheLogger as log } from '../logger' +import { Redis } from '../redis' + +export type CacheItem = { + val: T | null // null here is for negative caching + updatedAt: number +} + +export type CacheOptions = { + staleTTL: number + maxTTL: number + fetchMethod: (key: string) => Promise + fetchManyMethod?: (keys: string[]) => Promise> +} + +export class ReadThroughCache { + constructor(public redis: Redis, public opts: CacheOptions) {} + + private async _fetchMany(keys: string[]): Promise> { + if (this.opts.fetchManyMethod) { + return this.opts.fetchManyMethod(keys) + } + const got = await Promise.all(keys.map((k) => this.opts.fetchMethod(k))) + const result: Record = {} + for (let i = 0; i < keys.length; i++) { + result[keys[i]] = got[i] ?? 
null + } + return result + } + + private async fetchAndCache(key: string): Promise { + const fetched = await this.opts.fetchMethod(key) + this.set(key, fetched).catch((err) => + log.error({ err, key }, 'failed to set cache value'), + ) + return fetched + } + + private async fetchAndCacheMany(keys: string[]): Promise> { + const fetched = await this._fetchMany(keys) + this.setMany(fetched).catch((err) => + log.error({ err, keys }, 'failed to set cache values'), + ) + return removeNulls(fetched) + } + + async get(key: string, opts?: { revalidate?: boolean }): Promise { + if (opts?.revalidate) { + return this.fetchAndCache(key) + } + let cached: CacheItem | null + try { + const got = await this.redis.get(key) + cached = got ? JSON.parse(got) : null + } catch (err) { + cached = null + log.warn({ key, err }, 'failed to fetch value from cache') + } + if (!cached || this.isExpired(cached)) { + return this.fetchAndCache(key) + } + if (this.isStale(cached)) { + this.fetchAndCache(key).catch((err) => + log.warn({ key, err }, 'failed to refresh stale cache value'), + ) + } + return cached.val + } + + async getMany( + keys: string[], + opts?: { revalidate?: boolean }, + ): Promise> { + if (opts?.revalidate) { + return this.fetchAndCacheMany(keys) + } + let cached: Record + try { + cached = await this.redis.getMulti(keys) + } catch (err) { + cached = {} + log.warn({ keys, err }, 'failed to fetch values from cache') + } + + const stale: string[] = [] + const toFetch: string[] = [] + const results: Record = {} + for (const key of keys) { + const val = cached[key] ? 
(JSON.parse(cached[key]) as CacheItem) : null + if (!val || this.isExpired(val)) { + toFetch.push(key) + } else if (this.isStale(val)) { + stale.push(key) + } else if (val.val) { + results[key] = val.val + } + } + const fetched = await this.fetchAndCacheMany(toFetch) + this.fetchAndCacheMany(stale).catch((err) => + log.warn({ keys, err }, 'failed to refresh stale cache values'), + ) + return { + ...results, + ...fetched, + } + } + + async set(key: string, val: T | null) { + await this.setMany({ [key]: val }) + } + + async setMany(vals: Record) { + const items: Record = {} + for (const key of Object.keys(vals)) { + items[key] = JSON.stringify({ + val: vals[key], + updatedAt: Date.now(), + }) + } + await this.redis.setMulti(items, this.opts.maxTTL) + } + + async clearEntry(key: string) { + await this.redis.del(key) + } + + isExpired(result: CacheItem) { + return Date.now() > result.updatedAt + this.opts.maxTTL + } + + isStale(result: CacheItem) { + return Date.now() > result.updatedAt + this.opts.staleTTL + } +} + +const removeNulls = (obj: Record): Record => { + return Object.entries(obj).reduce((acc, [key, val]) => { + if (val !== null) { + acc[key] = val + } + return acc + }, {} as Record) +} diff --git a/packages/bsky/src/config.ts b/packages/bsky/src/config.ts index 679f99c59a4..c4dfe26393e 100644 --- a/packages/bsky/src/config.ts +++ b/packages/bsky/src/config.ts @@ -1,5 +1,11 @@ import assert from 'assert' -import { DAY, HOUR, parseIntWithFallback } from '@atproto/common' +import { + DAY, + HOUR, + MINUTE, + SECOND, + parseIntWithFallback, +} from '@atproto/common' export interface ServerConfigValues { version: string @@ -12,9 +18,15 @@ export interface ServerConfigValues { dbReplicaPostgresUrls?: string[] dbReplicaTags?: Record // E.g. 
{ timeline: [0], thread: [1] } dbPostgresSchema?: string + redisHost?: string // either set redis host, or both sentinel name and hosts + redisSentinelName?: string + redisSentinelHosts?: string[] + redisPassword?: string didPlcUrl: string didCacheStaleTTL: number didCacheMaxTTL: number + labelCacheStaleTTL: number + labelCacheMaxTTL: number handleResolveNameservers?: string[] imgUriEndpoint?: string blobCacheLocation?: string @@ -38,6 +50,19 @@ export class ServerConfig { const feedGenDid = process.env.FEED_GEN_DID const envPort = parseInt(process.env.PORT || '', 10) const port = isNaN(envPort) ? 2584 : envPort + const redisHost = + overrides?.redisHost || process.env.REDIS_HOST || undefined + const redisSentinelName = + overrides?.redisSentinelName || + process.env.REDIS_SENTINEL_NAME || + undefined + const redisSentinelHosts = + overrides?.redisSentinelHosts || + (process.env.REDIS_SENTINEL_HOSTS + ? process.env.REDIS_SENTINEL_HOSTS.split(',') + : []) + const redisPassword = + overrides?.redisPassword || process.env.REDIS_PASSWORD || undefined const didPlcUrl = process.env.DID_PLC_URL || 'http://localhost:2582' const didCacheStaleTTL = parseIntWithFallback( process.env.DID_CACHE_STALE_TTL, @@ -47,6 +72,14 @@ export class ServerConfig { process.env.DID_CACHE_MAX_TTL, DAY, ) + const labelCacheStaleTTL = parseIntWithFallback( + process.env.LABEL_CACHE_STALE_TTL, + 30 * SECOND, + ) + const labelCacheMaxTTL = parseIntWithFallback( + process.env.LABEL_CACHE_MAX_TTL, + MINUTE, + ) const handleResolveNameservers = process.env.HANDLE_RESOLVE_NAMESERVERS ? 
process.env.HANDLE_RESOLVE_NAMESERVERS.split(',') : [] @@ -93,9 +126,15 @@ export class ServerConfig { dbReplicaPostgresUrls, dbReplicaTags, dbPostgresSchema, + redisHost, + redisSentinelName, + redisSentinelHosts, + redisPassword, didPlcUrl, didCacheStaleTTL, didCacheMaxTTL, + labelCacheStaleTTL, + labelCacheMaxTTL, handleResolveNameservers, imgUriEndpoint, blobCacheLocation, @@ -162,6 +201,22 @@ export class ServerConfig { return this.cfg.dbPostgresSchema } + get redisHost() { + return this.cfg.redisHost + } + + get redisSentinelName() { + return this.cfg.redisSentinelName + } + + get redisSentinelHosts() { + return this.cfg.redisSentinelHosts + } + + get redisPassword() { + return this.cfg.redisPassword + } + get didCacheStaleTTL() { return this.cfg.didCacheStaleTTL } @@ -170,6 +225,14 @@ export class ServerConfig { return this.cfg.didCacheMaxTTL } + get labelCacheStaleTTL() { + return this.cfg.labelCacheStaleTTL + } + + get labelCacheMaxTTL() { + return this.cfg.labelCacheMaxTTL + } + get handleResolveNameservers() { return this.cfg.handleResolveNameservers } diff --git a/packages/bsky/src/context.ts b/packages/bsky/src/context.ts index 3488c6a5c02..8c8db6b2a3c 100644 --- a/packages/bsky/src/context.ts +++ b/packages/bsky/src/context.ts @@ -8,11 +8,11 @@ import { ServerConfig } from './config' import { ImageUriBuilder } from './image/uri' import { Services } from './services' import * as auth from './auth' -import DidSqlCache from './did-cache' +import DidRedisCache from './did-cache' import { BackgroundQueue } from './background' import { MountedAlgos } from './feed-gen/types' -import { LabelCache } from './label-cache' import { NotificationServer } from './notifications' +import { Redis } from './redis' export class AppContext { public moderationPushAgent: AtpAgent | undefined @@ -24,8 +24,8 @@ export class AppContext { services: Services signingKey: Keypair idResolver: IdResolver - didCache: DidSqlCache - labelCache: LabelCache + didCache: DidRedisCache + 
redis: Redis backgroundQueue: BackgroundQueue searchAgent?: AtpAgent algos: MountedAlgos @@ -70,12 +70,12 @@ export class AppContext { return this.opts.idResolver } - get didCache(): DidSqlCache { + get didCache(): DidRedisCache { return this.opts.didCache } - get labelCache(): LabelCache { - return this.opts.labelCache + get redis(): Redis { + return this.opts.redis } get notifServer(): NotificationServer { diff --git a/packages/bsky/src/daemon/index.ts b/packages/bsky/src/daemon/index.ts index 9d6388dd381..80da01edc2f 100644 --- a/packages/bsky/src/daemon/index.ts +++ b/packages/bsky/src/daemon/index.ts @@ -4,7 +4,6 @@ import { DaemonConfig } from './config' import { DaemonContext } from './context' import { createServices } from './services' import { ImageUriBuilder } from '../image/uri' -import { LabelCache } from '../label-cache' import { NotificationsDaemon } from './notifications' import logger from './logger' @@ -28,10 +27,8 @@ export class BskyDaemon { static create(opts: { db: PrimaryDatabase; cfg: DaemonConfig }): BskyDaemon { const { db, cfg } = opts const imgUriBuilder = new ImageUriBuilder('https://daemon.invalid') // will not be used by daemon - const labelCache = new LabelCache(db) const services = createServices({ imgUriBuilder, - labelCache, }) const ctx = new DaemonContext({ db, diff --git a/packages/bsky/src/daemon/services.ts b/packages/bsky/src/daemon/services.ts index a4e7935523c..93141d13a08 100644 --- a/packages/bsky/src/daemon/services.ts +++ b/packages/bsky/src/daemon/services.ts @@ -1,15 +1,17 @@ import { PrimaryDatabase } from '../db' import { ActorService } from '../services/actor' import { ImageUriBuilder } from '../image/uri' -import { LabelCache } from '../label-cache' +import { GraphService } from '../services/graph' +import { LabelService } from '../services/label' export function createServices(resources: { imgUriBuilder: ImageUriBuilder - labelCache: LabelCache }): Services { - const { imgUriBuilder, labelCache } = resources + 
const { imgUriBuilder } = resources + const graph = GraphService.creator(imgUriBuilder) + const label = LabelService.creator(null) return { - actor: ActorService.creator(imgUriBuilder, labelCache), + actor: ActorService.creator(imgUriBuilder, graph, label), } } diff --git a/packages/bsky/src/db/database-schema.ts b/packages/bsky/src/db/database-schema.ts index e43ade819e6..70ac6495c9b 100644 --- a/packages/bsky/src/db/database-schema.ts +++ b/packages/bsky/src/db/database-schema.ts @@ -24,7 +24,6 @@ import * as actorSync from './tables/actor-sync' import * as record from './tables/record' import * as notification from './tables/notification' import * as notificationPushToken from './tables/notification-push-token' -import * as didCache from './tables/did-cache' import * as moderation from './tables/moderation' import * as label from './tables/label' import * as algo from './tables/algo' @@ -57,7 +56,6 @@ export type DatabaseSchemaType = duplicateRecord.PartialDB & record.PartialDB & notification.PartialDB & notificationPushToken.PartialDB & - didCache.PartialDB & moderation.PartialDB & label.PartialDB & algo.PartialDB & diff --git a/packages/bsky/src/db/migrations/20231205T000257238Z-remove-did-cache.ts b/packages/bsky/src/db/migrations/20231205T000257238Z-remove-did-cache.ts new file mode 100644 index 00000000000..6b57a88bbb9 --- /dev/null +++ b/packages/bsky/src/db/migrations/20231205T000257238Z-remove-did-cache.ts @@ -0,0 +1,14 @@ +import { Kysely } from 'kysely' + +export async function up(db: Kysely): Promise { + await db.schema.dropTable('did_cache').execute() +} + +export async function down(db: Kysely): Promise { + await db.schema + .createTable('did_cache') + .addColumn('did', 'varchar', (col) => col.primaryKey()) + .addColumn('doc', 'jsonb', (col) => col.notNull()) + .addColumn('updatedAt', 'bigint', (col) => col.notNull()) + .execute() +} diff --git a/packages/bsky/src/db/migrations/index.ts b/packages/bsky/src/db/migrations/index.ts index 
da86bfdc669..f3ed5bc4dbd 100644 --- a/packages/bsky/src/db/migrations/index.ts +++ b/packages/bsky/src/db/migrations/index.ts @@ -31,3 +31,4 @@ export * as _20230906T222220386Z from './20230906T222220386Z-thread-gating' export * as _20230920T213858047Z from './20230920T213858047Z-add-tags-to-post' export * as _20230929T192920807Z from './20230929T192920807Z-record-cursor-indexes' export * as _20231003T202833377Z from './20231003T202833377Z-create-moderation-subject-status' +export * as _20231205T000257238Z from './20231205T000257238Z-remove-did-cache' diff --git a/packages/bsky/src/db/tables/did-cache.ts b/packages/bsky/src/db/tables/did-cache.ts deleted file mode 100644 index b3865548725..00000000000 --- a/packages/bsky/src/db/tables/did-cache.ts +++ /dev/null @@ -1,13 +0,0 @@ -import { DidDocument } from '@atproto/identity' - -export interface DidCache { - did: string - doc: DidDocument - updatedAt: number -} - -export const tableName = 'did_cache' - -export type PartialDB = { - [tableName]: DidCache -} diff --git a/packages/bsky/src/did-cache.ts b/packages/bsky/src/did-cache.ts index e08b09ca7e7..2c4d6e43c1d 100644 --- a/packages/bsky/src/did-cache.ts +++ b/packages/bsky/src/did-cache.ts @@ -1,81 +1,55 @@ import PQueue from 'p-queue' import { CacheResult, DidCache, DidDocument } from '@atproto/identity' -import { PrimaryDatabase } from './db' -import { excluded } from './db/util' -import { dbLogger } from './logger' +import { cacheLogger as log } from './logger' +import { Redis } from './redis' -export class DidSqlCache implements DidCache { - public pQueue: PQueue | null //null during teardown +type CacheOptions = { + staleTTL: number + maxTTL: number +} + +export class DidRedisCache implements DidCache { + public pQueue: PQueue | null // null during teardown - constructor( - // @TODO perhaps could use both primary and non-primary. not high enough - // throughput to matter right now. also may just move this over to redis before long! 
- public db: PrimaryDatabase, - public staleTTL: number, - public maxTTL: number, - ) { + constructor(public redis: Redis, public opts: CacheOptions) { this.pQueue = new PQueue() } - async cacheDid( - did: string, - doc: DidDocument, - prevResult?: CacheResult, - ): Promise { - if (prevResult) { - await this.db.db - .updateTable('did_cache') - .set({ doc, updatedAt: Date.now() }) - .where('did', '=', did) - .where('updatedAt', '=', prevResult.updatedAt) - .execute() - } else { - await this.db.db - .insertInto('did_cache') - .values({ did, doc, updatedAt: Date.now() }) - .onConflict((oc) => - oc.column('did').doUpdateSet({ - doc: excluded(this.db.db, 'doc'), - updatedAt: excluded(this.db.db, 'updatedAt'), - }), - ) - .executeTakeFirst() - } + async cacheDid(did: string, doc: DidDocument): Promise { + const item = JSON.stringify({ + doc, + updatedAt: Date.now(), + }) + await this.redis.set(did, item, this.opts.maxTTL) } async refreshCache( did: string, getDoc: () => Promise, - prevResult?: CacheResult, ): Promise { this.pQueue?.add(async () => { try { const doc = await getDoc() if (doc) { - await this.cacheDid(did, doc, prevResult) + await this.cacheDid(did, doc) } else { await this.clearEntry(did) } } catch (err) { - dbLogger.error({ did, err }, 'refreshing did cache failed') + log.error({ did, err }, 'refreshing did cache failed') } }) } async checkCache(did: string): Promise { - const res = await this.db.db - .selectFrom('did_cache') - .where('did', '=', did) - .selectAll() - .executeTakeFirst() - if (!res) return null - + const got = await this.redis.get(did) + if (!got) return null + const { doc, updatedAt } = JSON.parse(got) as CacheResult const now = Date.now() - const updatedAt = new Date(res.updatedAt).getTime() - const expired = now > updatedAt + this.maxTTL - const stale = now > updatedAt + this.staleTTL + const expired = now > updatedAt + this.opts.maxTTL + const stale = now > updatedAt + this.opts.staleTTL return { - doc: res.doc, + doc, updatedAt, did, 
stale, @@ -84,14 +58,11 @@ export class DidSqlCache implements DidCache { } async clearEntry(did: string): Promise { - await this.db.db - .deleteFrom('did_cache') - .where('did', '=', did) - .executeTakeFirst() + await this.redis.del(did) } async clear(): Promise { - await this.db.db.deleteFrom('did_cache').execute() + throw new Error('Not implemented for redis cache') } async processAll() { @@ -107,4 +78,4 @@ export class DidSqlCache implements DidCache { } } -export default DidSqlCache +export default DidRedisCache diff --git a/packages/bsky/src/index.ts b/packages/bsky/src/index.ts index 7ceba61f990..97acd1e7d09 100644 --- a/packages/bsky/src/index.ts +++ b/packages/bsky/src/index.ts @@ -16,17 +16,17 @@ import { ImageUriBuilder } from './image/uri' import { BlobDiskCache, ImageProcessingServer } from './image/server' import { createServices } from './services' import AppContext from './context' -import DidSqlCache from './did-cache' +import DidRedisCache from './did-cache' import { ImageInvalidator, ImageProcessingServerInvalidator, } from './image/invalidator' import { BackgroundQueue } from './background' import { MountedAlgos } from './feed-gen/types' -import { LabelCache } from './label-cache' import { NotificationServer } from './notifications' import { AtpAgent } from '@atproto/api' import { Keypair } from '@atproto/crypto' +import { Redis } from './redis' export type { ServerConfigValues } from './config' export type { MountedAlgos } from './feed-gen/types' @@ -56,23 +56,24 @@ export class BskyAppView { static create(opts: { db: DatabaseCoordinator + redis: Redis config: ServerConfig signingKey: Keypair imgInvalidator?: ImageInvalidator algos?: MountedAlgos }): BskyAppView { - const { db, config, signingKey, algos = {} } = opts + const { db, redis, config, signingKey, algos = {} } = opts let maybeImgInvalidator = opts.imgInvalidator const app = express() app.use(cors()) app.use(loggerMiddleware) app.use(compression()) - const didCache = new DidSqlCache( - 
db.getPrimary(), - config.didCacheStaleTTL, - config.didCacheMaxTTL, - ) + const didCache = new DidRedisCache(redis.withNamespace('did-doc'), { + staleTTL: config.didCacheStaleTTL, + maxTTL: config.didCacheMaxTTL, + }) + const idResolver = new IdResolver({ plcUrl: config.didPlcUrl, didCache, @@ -103,7 +104,7 @@ export class BskyAppView { } const backgroundQueue = new BackgroundQueue(db.getPrimary()) - const labelCache = new LabelCache(db.getPrimary()) + const notifServer = new NotificationServer(db.getPrimary()) const searchAgent = config.searchEndpoint ? new AtpAgent({ service: config.searchEndpoint }) @@ -112,7 +113,11 @@ export class BskyAppView { const services = createServices({ imgUriBuilder, imgInvalidator, - labelCache, + labelCacheOpts: { + redis: redis.withNamespace('label'), + staleTTL: config.labelCacheStaleTTL, + maxTTL: config.labelCacheMaxTTL, + }, }) const ctx = new AppContext({ @@ -123,7 +128,7 @@ export class BskyAppView { signingKey, idResolver, didCache, - labelCache, + redis, backgroundQueue, searchAgent, algos, @@ -186,7 +191,6 @@ export class BskyAppView { 'background queue stats', ) }, 10000) - this.ctx.labelCache.start() const server = this.app.listen(this.ctx.cfg.port) this.server = server server.keepAliveTimeout = 90000 @@ -197,11 +201,11 @@ export class BskyAppView { return server } - async destroy(opts?: { skipDb: boolean }): Promise { - this.ctx.labelCache.stop() + async destroy(opts?: { skipDb: boolean; skipRedis: boolean }): Promise { await this.ctx.didCache.destroy() await this.terminator?.terminate() await this.ctx.backgroundQueue.destroy() + if (!opts?.skipRedis) await this.ctx.redis.destroy() if (!opts?.skipDb) await this.ctx.db.close() clearInterval(this.dbStatsInterval) } diff --git a/packages/bsky/src/indexer/context.ts b/packages/bsky/src/indexer/context.ts index e7fe24580fa..1ce2fbf1ea2 100644 --- a/packages/bsky/src/indexer/context.ts +++ b/packages/bsky/src/indexer/context.ts @@ -12,6 +12,7 @@ export class IndexerContext { 
private opts: { db: PrimaryDatabase redis: Redis + redisCache: Redis cfg: IndexerConfig services: Services idResolver: IdResolver @@ -29,6 +30,10 @@ export class IndexerContext { return this.opts.redis } + get redisCache(): Redis { + return this.opts.redisCache + } + get cfg(): IndexerConfig { return this.opts.cfg } diff --git a/packages/bsky/src/indexer/index.ts b/packages/bsky/src/indexer/index.ts index ed8188d353b..496cff67c73 100644 --- a/packages/bsky/src/indexer/index.ts +++ b/packages/bsky/src/indexer/index.ts @@ -2,7 +2,7 @@ import express from 'express' import { IdResolver } from '@atproto/identity' import { BackgroundQueue } from '../background' import { PrimaryDatabase } from '../db' -import DidSqlCache from '../did-cache' +import DidRedisCache from '../did-cache' import log from './logger' import { dbLogger } from '../logger' import { IndexerConfig } from './config' @@ -40,15 +40,15 @@ export class BskyIndexer { static create(opts: { db: PrimaryDatabase redis: Redis + redisCache: Redis cfg: IndexerConfig imgInvalidator?: ImageInvalidator }): BskyIndexer { - const { db, redis, cfg } = opts - const didCache = new DidSqlCache( - db, - cfg.didCacheStaleTTL, - cfg.didCacheMaxTTL, - ) + const { db, redis, redisCache, cfg } = opts + const didCache = new DidRedisCache(redisCache.withNamespace('did-doc'), { + staleTTL: cfg.didCacheStaleTTL, + maxTTL: cfg.didCacheMaxTTL, + }) const idResolver = new IdResolver({ plcUrl: cfg.didPlcUrl, didCache, @@ -81,6 +81,7 @@ export class BskyIndexer { const ctx = new IndexerContext({ db, redis, + redisCache, cfg, services, idResolver, @@ -139,7 +140,9 @@ export class BskyIndexer { if (this.closeServer) await this.closeServer() await this.sub.destroy() clearInterval(this.subStatsInterval) + await this.ctx.didCache.destroy() if (!opts?.skipRedis) await this.ctx.redis.destroy() + if (!opts?.skipRedis) await this.ctx.redisCache.destroy() if (!opts?.skipDb) await this.ctx.db.close() clearInterval(this.dbStatsInterval) } diff --git 
a/packages/bsky/src/label-cache.ts b/packages/bsky/src/label-cache.ts deleted file mode 100644 index b162a2d30bd..00000000000 --- a/packages/bsky/src/label-cache.ts +++ /dev/null @@ -1,90 +0,0 @@ -import { wait } from '@atproto/common' -import { PrimaryDatabase } from './db' -import { Label } from './db/tables/label' -import { labelerLogger as log } from './logger' - -export class LabelCache { - bySubject: Record = {} - latestLabel = '' - refreshes = 0 - - destroyed = false - - constructor(public db: PrimaryDatabase) {} - - start() { - this.poll() - } - - async fullRefresh() { - const allLabels = await this.db.db.selectFrom('label').selectAll().execute() - this.wipeCache() - this.processLabels(allLabels) - } - - async partialRefresh() { - const labels = await this.db.db - .selectFrom('label') - .selectAll() - .where('cts', '>', this.latestLabel) - .execute() - this.processLabels(labels) - } - - async poll() { - try { - if (this.destroyed) return - if (this.refreshes >= 120) { - await this.fullRefresh() - this.refreshes = 0 - } else { - await this.partialRefresh() - this.refreshes++ - } - } catch (err) { - log.error( - { err, latestLabel: this.latestLabel, refreshes: this.refreshes }, - 'label cache failed to refresh', - ) - } - await wait(500) - this.poll() - } - - processLabels(labels: Label[]) { - for (const label of labels) { - if (label.cts > this.latestLabel) { - this.latestLabel = label.cts - } - this.bySubject[label.uri] ??= [] - this.bySubject[label.uri].push(label) - } - } - - wipeCache() { - this.bySubject = {} - } - - stop() { - this.destroyed = true - } - - forSubject(subject: string, includeNeg = false): Label[] { - const labels = this.bySubject[subject] ?? [] - return includeNeg ? 
labels : labels.filter((l) => l.neg === false) - } - - forSubjects(subjects: string[], includeNeg?: boolean): Label[] { - let labels: Label[] = [] - const alreadyAdded = new Set() - for (const subject of subjects) { - if (alreadyAdded.has(subject)) { - continue - } - const subLabels = this.forSubject(subject, includeNeg) - labels = [...labels, ...subLabels] - alreadyAdded.add(subject) - } - return labels - } -} diff --git a/packages/bsky/src/logger.ts b/packages/bsky/src/logger.ts index e975dff550a..d6fad590eef 100644 --- a/packages/bsky/src/logger.ts +++ b/packages/bsky/src/logger.ts @@ -3,6 +3,8 @@ import { subsystemLogger } from '@atproto/common' export const dbLogger: ReturnType = subsystemLogger('bsky:db') +export const cacheLogger: ReturnType = + subsystemLogger('bsky:cache') export const subLogger: ReturnType = subsystemLogger('bsky:sub') export const labelerLogger: ReturnType = diff --git a/packages/bsky/src/redis.ts b/packages/bsky/src/redis.ts index 72d895be24c..ce9d2cecc62 100644 --- a/packages/bsky/src/redis.ts +++ b/packages/bsky/src/redis.ts @@ -11,12 +11,16 @@ export class Redis { name: opts.sentinel, sentinels: opts.hosts.map((h) => addressParts(h, 26379)), password: opts.password, + db: opts.db, + commandTimeout: opts.commandTimeout, }) } else if ('host' in opts) { assert(opts.host) this.driver = new RedisDriver({ ...addressParts(opts.host), password: opts.password, + db: opts.db, + commandTimeout: opts.commandTimeout, }) } else { assert(opts.driver) @@ -25,6 +29,10 @@ export class Redis { this.namespace = opts.namespace } + withNamespace(namespace: string): Redis { + return new Redis({ driver: this.driver, namespace }) + } + async readStreams( streams: StreamRef[], opts: { count: number; blockMs?: number }, @@ -97,10 +105,35 @@ export class Redis { return await this.driver.get(this.ns(key)) } - async set(key: string, val: string | number) { + async set(key: string, val: string | number, ttlMs?: number) { + if (ttlMs !== undefined) { + return 
this.setMulti({ [key]: val }) + } await this.driver.set(this.ns(key), val) } + async getMulti(keys: string[]) { + const namespaced = keys.map((k) => this.ns(k)) + const got = await this.driver.mget(...namespaced) + const results = {} + for (let i = 0; i < keys.length; i++) { + const key = keys[i] + results[key] = got[i] + } + return results + } + + async setMulti(vals: Record, ttlMs?: number) { + let builder = this.driver.multi({ pipeline: true }) + for (const key of Object.keys(vals)) { + builder = builder.set(this.ns(key), vals[key]) + if (ttlMs !== undefined) { + builder = builder.pexpire(key, ttlMs) + } + } + await builder.exec() + } + async del(key: string) { return await this.driver.del(this.ns(key)) } @@ -152,9 +185,11 @@ export type RedisOptions = ( ) & { password?: string namespace?: string + db?: number + commandTimeout?: number } -function addressParts( +export function addressParts( addr: string, defaultPort = 6379, ): { host: string; port: number } { diff --git a/packages/bsky/src/services/actor/index.ts b/packages/bsky/src/services/actor/index.ts index a44e81e8f8f..7ef61529926 100644 --- a/packages/bsky/src/services/actor/index.ts +++ b/packages/bsky/src/services/actor/index.ts @@ -5,24 +5,33 @@ import { notSoftDeletedClause } from '../../db/util' import { ActorViews } from './views' import { ImageUriBuilder } from '../../image/uri' import { Actor } from '../../db/tables/actor' -import { LabelCache } from '../../label-cache' import { TimeCidKeyset, paginate } from '../../db/pagination' import { SearchKeyset, getUserSearchQuery } from '../util/search' +import { FromDb } from '../types' +import { GraphService } from '../graph' +import { LabelService } from '../label' export * from './types' export class ActorService { + views: ActorViews + constructor( public db: Database, public imgUriBuilder: ImageUriBuilder, - public labelCache: LabelCache, - ) {} - - static creator(imgUriBuilder: ImageUriBuilder, labelCache: LabelCache) { - return (db: Database) => 
new ActorService(db, imgUriBuilder, labelCache) + private graph: FromDb, + private label: FromDb, + ) { + this.views = new ActorViews(this.db, this.imgUriBuilder, graph, label) } - views = new ActorViews(this.db, this.imgUriBuilder, this.labelCache) + static creator( + imgUriBuilder: ImageUriBuilder, + graph: FromDb, + label: FromDb, + ) { + return (db: Database) => new ActorService(db, imgUriBuilder, graph, label) + } async getActorDid(handleOrDid: string): Promise { if (handleOrDid.startsWith('did:')) { diff --git a/packages/bsky/src/services/actor/views.ts b/packages/bsky/src/services/actor/views.ts index 5c40eac308b..b60dcedcbaf 100644 --- a/packages/bsky/src/services/actor/views.ts +++ b/packages/bsky/src/services/actor/views.ts @@ -11,7 +11,6 @@ import { Actor } from '../../db/tables/actor' import { ImageUriBuilder } from '../../image/uri' import { LabelService, Labels, getSelfLabels } from '../label' import { BlockAndMuteState, GraphService } from '../graph' -import { LabelCache } from '../../label-cache' import { ActorInfoMap, ProfileDetailHydrationState, @@ -21,17 +20,24 @@ import { toMapByDid, } from './types' import { ListInfoMap } from '../graph/types' +import { FromDb } from '../types' export class ActorViews { + services: { + label: LabelService + graph: GraphService + } + constructor( private db: Database, private imgUriBuilder: ImageUriBuilder, - private labelCache: LabelCache, - ) {} - - services = { - label: LabelService.creator(this.labelCache)(this.db), - graph: GraphService.creator(this.imgUriBuilder)(this.db), + private graph: FromDb, + private label: FromDb, + ) { + this.services = { + label: label(db), + graph: graph(db), + } } async profiles( diff --git a/packages/bsky/src/services/feed/index.ts b/packages/bsky/src/services/feed/index.ts index 2323e6a74be..a8768518d70 100644 --- a/packages/bsky/src/services/feed/index.ts +++ b/packages/bsky/src/services/feed/index.ts @@ -1,6 +1,7 @@ import { sql } from 'kysely' import { AtUri } from 
'@atproto/syntax' import { jsonStringToLex } from '@atproto/lexicon' +import { mapDefined } from '@atproto/common' import { Database } from '../../db' import { countAll, noMatch, notSoftDeletedClause } from '../../db/util' import { ImageUriBuilder } from '../../image/uri' @@ -42,29 +43,42 @@ import { RelationshipPair, } from '../graph' import { FeedViews } from './views' -import { LabelCache } from '../../label-cache' import { threadgateToPostUri, postToThreadgateUri } from './util' -import { mapDefined } from '@atproto/common' +import { FromDb } from '../types' export * from './types' export class FeedService { + views: FeedViews + services: { + label: LabelService + actor: ActorService + graph: GraphService + } + constructor( public db: Database, public imgUriBuilder: ImageUriBuilder, - public labelCache: LabelCache, - ) {} - - views = new FeedViews(this.db, this.imgUriBuilder, this.labelCache) - - services = { - label: LabelService.creator(this.labelCache)(this.db), - actor: ActorService.creator(this.imgUriBuilder, this.labelCache)(this.db), - graph: GraphService.creator(this.imgUriBuilder)(this.db), + private actor: FromDb, + private label: FromDb, + private graph: FromDb, + ) { + this.views = new FeedViews(this.db, this.imgUriBuilder, actor, graph) + this.services = { + label: label(this.db), + actor: actor(this.db), + graph: graph(this.db), + } } - static creator(imgUriBuilder: ImageUriBuilder, labelCache: LabelCache) { - return (db: Database) => new FeedService(db, imgUriBuilder, labelCache) + static creator( + imgUriBuilder: ImageUriBuilder, + actor: FromDb, + label: FromDb, + graph: FromDb, + ) { + return (db: Database) => + new FeedService(db, imgUriBuilder, actor, label, graph) } selectPostQb() { diff --git a/packages/bsky/src/services/feed/views.ts b/packages/bsky/src/services/feed/views.ts index a848c88caa0..f013570e2d7 100644 --- a/packages/bsky/src/services/feed/views.ts +++ b/packages/bsky/src/services/feed/views.ts @@ -1,4 +1,5 @@ import { 
mapDefined } from '@atproto/common' +import { AtUri } from '@atproto/syntax' import { Database } from '../../db' import { FeedViewPost, @@ -37,26 +38,35 @@ import { } from './types' import { Labels, getSelfLabels } from '../label' import { ImageUriBuilder } from '../../image/uri' -import { LabelCache } from '../../label-cache' import { ActorInfoMap, ActorService } from '../actor' import { ListInfoMap, GraphService } from '../graph' -import { AtUri } from '@atproto/syntax' +import { FromDb } from '../types' import { parseThreadGate } from './util' export class FeedViews { + services: { + actor: ActorService + graph: GraphService + } + constructor( public db: Database, public imgUriBuilder: ImageUriBuilder, - public labelCache: LabelCache, - ) {} - - static creator(imgUriBuilder: ImageUriBuilder, labelCache: LabelCache) { - return (db: Database) => new FeedViews(db, imgUriBuilder, labelCache) + private actor: FromDb, + private graph: FromDb, + ) { + this.services = { + actor: actor(this.db), + graph: graph(this.db), + } } - services = { - actor: ActorService.creator(this.imgUriBuilder, this.labelCache)(this.db), - graph: GraphService.creator(this.imgUriBuilder)(this.db), + static creator( + imgUriBuilder: ImageUriBuilder, + actor: FromDb, + graph: FromDb, + ) { + return (db: Database) => new FeedViews(db, imgUriBuilder, actor, graph) } formatFeedGeneratorView( diff --git a/packages/bsky/src/services/index.ts b/packages/bsky/src/services/index.ts index c3fe47e6eff..2e5b4725681 100644 --- a/packages/bsky/src/services/index.ts +++ b/packages/bsky/src/services/index.ts @@ -1,25 +1,29 @@ -import { Database, PrimaryDatabase } from '../db' import { ImageUriBuilder } from '../image/uri' import { ActorService } from './actor' import { FeedService } from './feed' import { GraphService } from './graph' import { ModerationService } from './moderation' -import { LabelService } from './label' +import { LabelCacheOpts, LabelService } from './label' import { ImageInvalidator } from 
'../image/invalidator' -import { LabelCache } from '../label-cache' +import { FromDb, FromDbPrimary } from './types' export function createServices(resources: { imgUriBuilder: ImageUriBuilder imgInvalidator: ImageInvalidator - labelCache: LabelCache + labelCacheOpts: LabelCacheOpts }): Services { - const { imgUriBuilder, imgInvalidator, labelCache } = resources + const { imgUriBuilder, imgInvalidator, labelCacheOpts } = resources + const label = LabelService.creator(labelCacheOpts) + const graph = GraphService.creator(imgUriBuilder) + const actor = ActorService.creator(imgUriBuilder, graph, label) + const moderation = ModerationService.creator(imgUriBuilder, imgInvalidator) + const feed = FeedService.creator(imgUriBuilder, actor, label, graph) return { - actor: ActorService.creator(imgUriBuilder, labelCache), - feed: FeedService.creator(imgUriBuilder, labelCache), - graph: GraphService.creator(imgUriBuilder), - moderation: ModerationService.creator(imgUriBuilder, imgInvalidator), - label: LabelService.creator(labelCache), + actor, + feed, + moderation, + graph, + label, } } @@ -30,7 +34,3 @@ export type Services = { moderation: FromDbPrimary label: FromDb } - -type FromDb = (db: Database) => T - -type FromDbPrimary = (db: PrimaryDatabase) => T diff --git a/packages/bsky/src/services/label/index.ts b/packages/bsky/src/services/label/index.ts index f44b0439ddf..ed6691c09d0 100644 --- a/packages/bsky/src/services/label/index.ts +++ b/packages/bsky/src/services/label/index.ts @@ -3,15 +3,36 @@ import { AtUri, normalizeDatetimeAlways } from '@atproto/syntax' import { Database } from '../../db' import { Label, isSelfLabels } from '../../lexicon/types/com/atproto/label/defs' import { ids } from '../../lexicon/lexicons' -import { LabelCache } from '../../label-cache' +import { ReadThroughCache } from '../../cache/read-through' +import { Redis } from '../../redis' export type Labels = Record +export type LabelCacheOpts = { + redis: Redis + staleTTL: number + maxTTL: number 
+} + export class LabelService { - constructor(public db: Database, public cache: LabelCache | null) {} + public cache: ReadThroughCache | null - static creator(cache: LabelCache | null) { - return (db: Database) => new LabelService(db, cache) + constructor(public db: Database, cacheOpts: LabelCacheOpts | null) { + if (cacheOpts) { + this.cache = new ReadThroughCache(cacheOpts.redis, { + ...cacheOpts, + fetchMethod: async (subject: string) => { + const res = await fetchLabelsForSubjects(db, [subject]) + return res[subject] ?? null + }, + fetchManyMethod: (subjects: string[]) => + fetchLabelsForSubjects(db, subjects), + }) + } + } + + static creator(cacheOpts: LabelCacheOpts | null) { + return (db: Database) => new LabelService(db, cacheOpts) } async formatAndCreate( @@ -72,24 +93,19 @@ export class LabelService { }, ): Promise { if (subjects.length < 1) return {} - const res = - this.cache === null || opts?.skipCache - ? await this.db.db - .selectFrom('label') - .where('label.uri', 'in', subjects) - .if(!opts?.includeNeg, (qb) => qb.where('neg', '=', false)) - .selectAll() - .execute() - : this.cache.forSubjects(subjects, opts?.includeNeg) - return res.reduce((acc, cur) => { - acc[cur.uri] ??= [] - acc[cur.uri].push({ - ...cur, - cid: cur.cid === '' ? undefined : cur.cid, - neg: cur.neg, - }) - return acc - }, {} as Labels) + const res = this.cache + ? await this.cache.getMany(subjects, { revalidate: opts?.skipCache }) + : await fetchLabelsForSubjects(this.db, subjects) + + if (opts?.includeNeg) { + return res + } + + const noNegs: Labels = {} + for (const [key, val] of Object.entries(res)) { + noNegs[key] = val.filter((label) => !label.neg) + } + return noNegs } // gets labels for any record. when did is present, combine labels for both did & profile record. 
@@ -171,3 +187,26 @@ export function getSelfLabels(details: { return { src, uri, cid, val, cts, neg: false } }) } + +const fetchLabelsForSubjects = async ( + db: Database, + subjects: string[], +): Promise> => { + if (subjects.length < 1) { + return {} + } + const res = await db.db + .selectFrom('label') + .where('label.uri', 'in', subjects) + .selectAll() + .execute() + return res.reduce((acc, cur) => { + acc[cur.uri] ??= [] + acc[cur.uri].push({ + ...cur, + cid: cur.cid === '' ? undefined : cur.cid, + neg: cur.neg, + }) + return acc + }, {} as Record) +} diff --git a/packages/bsky/src/services/types.ts b/packages/bsky/src/services/types.ts new file mode 100644 index 00000000000..2039d6c07de --- /dev/null +++ b/packages/bsky/src/services/types.ts @@ -0,0 +1,4 @@ +import { Database, PrimaryDatabase } from '../db' + +export type FromDb = (db: Database) => T +export type FromDbPrimary = (db: PrimaryDatabase) => T diff --git a/packages/bsky/tests/admin/repo-search.test.ts b/packages/bsky/tests/admin/repo-search.test.ts index 837c4b2154a..9e643ba12e0 100644 --- a/packages/bsky/tests/admin/repo-search.test.ts +++ b/packages/bsky/tests/admin/repo-search.test.ts @@ -17,6 +17,7 @@ describe('admin repo search view', () => { sc = network.getSeedClient() await usersBulkSeed(sc) headers = network.pds.adminAuthHeaders() + await network.processAll() }) afterAll(async () => { diff --git a/packages/bsky/tests/did-cache.test.ts b/packages/bsky/tests/did-cache.test.ts index d0b94147bc6..8314981102e 100644 --- a/packages/bsky/tests/did-cache.test.ts +++ b/packages/bsky/tests/did-cache.test.ts @@ -1,14 +1,16 @@ import { TestNetwork, SeedClient } from '@atproto/dev-env' import userSeed from './seeds/users' import { IdResolver } from '@atproto/identity' -import DidSqlCache from '../src/did-cache' +import DidRedisCache from '../src/did-cache' import { wait } from '@atproto/common' +import { Redis } from '../src' describe('did cache', () => { let network: TestNetwork let sc: SeedClient
let idResolver: IdResolver - let didCache: DidSqlCache + let redis: Redis + let didCache: DidRedisCache let alice: string let bob: string @@ -20,6 +22,7 @@ describe('did cache', () => { dbPostgresSchema: 'bsky_did_cache', }) idResolver = network.bsky.indexer.ctx.idResolver + redis = network.bsky.indexer.ctx.redis didCache = network.bsky.indexer.ctx.didCache sc = network.getSeedClient() await userSeed(sc) @@ -50,7 +53,12 @@ describe('did cache', () => { }) it('clears cache and repopulates', async () => { - await idResolver.did.cache?.clear() + await Promise.all([ + idResolver.did.cache?.clearEntry(alice), + idResolver.did.cache?.clearEntry(bob), + idResolver.did.cache?.clearEntry(carol), + idResolver.did.cache?.clearEntry(dan), + ]) const docsCleared = await Promise.all([ idResolver.did.cache?.checkCache(alice), idResolver.did.cache?.checkCache(bob), @@ -81,7 +89,10 @@ describe('did cache', () => { }) it('accurately reports expired dids & refreshes the cache', async () => { - const didCache = new DidSqlCache(network.bsky.ctx.db.getPrimary(), 1, 60000) + const didCache = new DidRedisCache(redis.withNamespace('did-doc'), { + staleTTL: 1, + maxTTL: 60000, + }) const shortCacheResolver = new IdResolver({ plcUrl: network.bsky.ctx.cfg.didPlcUrl, didCache, @@ -110,7 +121,10 @@ describe('did cache', () => { }) it('does not return expired dids & refreshes the cache', async () => { - const didCache = new DidSqlCache(network.bsky.ctx.db.getPrimary(), 0, 1) + const didCache = new DidRedisCache(redis.withNamespace('did-doc'), { + staleTTL: 0, + maxTTL: 1, + }) const shortExpireResolver = new IdResolver({ plcUrl: network.bsky.ctx.cfg.didPlcUrl, didCache, @@ -125,5 +139,6 @@ describe('did cache', () => { // see that the resolver does not return expired value & instead force refreshes const staleGet = await shortExpireResolver.did.resolve(alice) expect(staleGet?.id).toEqual(alice) + await didCache.destroy() }) }) diff --git a/packages/bsky/tests/redis-cache.test.ts 
b/packages/bsky/tests/redis-cache.test.ts new file mode 100644 index 00000000000..dc975a66161 --- /dev/null +++ b/packages/bsky/tests/redis-cache.test.ts @@ -0,0 +1,231 @@ +import { wait } from '@atproto/common' +import { Redis } from '../src/' +import { ReadThroughCache } from '../src/cache/read-through' + +describe('redis cache', () => { + let redis: Redis + + beforeAll(async () => { + redis = new Redis({ host: process.env.REDIS_HOST || '' }) + }) + + afterAll(async () => { + await redis.destroy() + }) + + it('caches according to namespace', async () => { + const ns1 = redis.withNamespace('ns1') + const ns2 = redis.withNamespace('ns2') + await Promise.all([ + ns1.set('key', 'a'), + ns2.set('key', 'b'), + redis.set('key', 'c'), + ]) + const got = await Promise.all([ + ns1.get('key'), + ns2.get('key'), + redis.get('key'), + ]) + expect(got[0]).toEqual('a') + expect(got[1]).toEqual('b') + expect(got[2]).toEqual('c') + + await Promise.all([ + ns1.setMulti({ key1: 'a', key2: 'b' }), + ns2.setMulti({ key1: 'c', key2: 'd' }), + redis.setMulti({ key1: 'e', key2: 'f' }), + ]) + const gotMany = await Promise.all([ + ns1.getMulti(['key1', 'key2']), + ns2.getMulti(['key1', 'key2']), + redis.getMulti(['key1', 'key2']), + ]) + expect(gotMany[0]['key1']).toEqual('a') + expect(gotMany[0]['key2']).toEqual('b') + expect(gotMany[1]['key1']).toEqual('c') + expect(gotMany[1]['key2']).toEqual('d') + expect(gotMany[2]['key1']).toEqual('e') + expect(gotMany[2]['key2']).toEqual('f') + }) + + it('caches values when empty', async () => { + const vals = { + '1': 'a', + '2': 'b', + '3': 'c', + } + let hits = 0 + const cache = new ReadThroughCache(redis.withNamespace('test1'), { + staleTTL: 60000, + maxTTL: 60000, + fetchMethod: async (key) => { + hits++ + return vals[key] + }, + }) + const got = await Promise.all([ + cache.get('1'), + cache.get('2'), + cache.get('3'), + ]) + expect(got[0]).toEqual('a') + expect(got[1]).toEqual('b') + expect(got[2]).toEqual('c') + expect(hits).toBe(3) + + 
const refetched = await Promise.all([ + cache.get('1'), + cache.get('2'), + cache.get('3'), + ]) + expect(refetched[0]).toEqual('a') + expect(refetched[1]).toEqual('b') + expect(refetched[2]).toEqual('c') + expect(hits).toBe(3) + }) + + it('skips and refreshes cache when requested', async () => { + let val = 'a' + let hits = 0 + const cache = new ReadThroughCache(redis.withNamespace('test2'), { + staleTTL: 60000, + maxTTL: 60000, + fetchMethod: async () => { + hits++ + return val + }, + }) + + const try1 = await cache.get('1') + expect(try1).toEqual('a') + expect(hits).toBe(1) + + val = 'b' + + const try2 = await cache.get('1') + expect(try2).toEqual('a') + expect(hits).toBe(1) + + const try3 = await cache.get('1', { revalidate: true }) + expect(try3).toEqual('b') + expect(hits).toBe(2) + + const try4 = await cache.get('1') + expect(try4).toEqual('b') + expect(hits).toBe(2) + }) + + it('accurately reports stale entries & refreshes the cache', async () => { + let val = 'a' + let hits = 0 + const cache = new ReadThroughCache(redis.withNamespace('test3'), { + staleTTL: 1, + maxTTL: 60000, + fetchMethod: async () => { + hits++ + return val + }, + }) + + const try1 = await cache.get('1') + expect(try1).toEqual('a') + + await wait(5) + + val = 'b' + + const try2 = await cache.get('1') + // cache gives us stale value while it revalidates + expect(try2).toEqual('a') + + await wait(5) + + const try3 = await cache.get('1') + expect(try3).toEqual('b') + expect(hits).toEqual(3) + }) + + it('does not return expired dids & refreshes the cache', async () => { + let val = 'a' + let hits = 0 + const cache = new ReadThroughCache(redis.withNamespace('test4'), { + staleTTL: 0, + maxTTL: 1, + fetchMethod: async () => { + hits++ + return val + }, + }) + + const try1 = await cache.get('1') + expect(try1).toEqual('a') + + await wait(5) + + val = 'b' + + const try2 = await cache.get('1') + expect(try2).toEqual('b') + expect(hits).toBe(2) + }) + + it('caches negative values', async () => { 
+ let val: string | null = null + let hits = 0 + const cache = new ReadThroughCache(redis.withNamespace('test5'), { + staleTTL: 60000, + maxTTL: 60000, + fetchMethod: async () => { + hits++ + return val + }, + }) + + const try1 = await cache.get('1') + expect(try1).toEqual(null) + expect(hits).toBe(1) + + val = 'b' + + const try2 = await cache.get('1') + // returns cached negative value + expect(try2).toEqual(null) + expect(hits).toBe(1) + + const try3 = await cache.get('1', { revalidate: true }) + expect(try3).toEqual('b') + expect(hits).toEqual(2) + + const try4 = await cache.get('1') + expect(try4).toEqual('b') + expect(hits).toEqual(2) + }) + + it('times out and fails open', async () => { + let val = 'a' + let hits = 0 + const cache = new ReadThroughCache(redis.withNamespace('test6'), { + staleTTL: 60000, + maxTTL: 60000, + fetchMethod: async () => { + hits++ + return val + }, + }) + + const try1 = await cache.get('1') + expect(try1).toEqual('a') + + const orig = cache.redis.driver.get + cache.redis.driver.get = async (key) => { + await wait(600) + return orig(key) + } + + val = 'b' + + const try2 = await cache.get('1') + expect(try2).toEqual('b') + expect(hits).toBe(2) + }) +}) diff --git a/packages/dev-env/src/bsky.ts b/packages/dev-env/src/bsky.ts index 8320130eb43..94d58eb790f 100644 --- a/packages/dev-env/src/bsky.ts +++ b/packages/dev-env/src/bsky.ts @@ -2,7 +2,7 @@ import assert from 'assert' import getPort from 'get-port' import * as ui8 from 'uint8arrays' import * as bsky from '@atproto/bsky' -import { DAY, HOUR, wait } from '@atproto/common-web' +import { DAY, HOUR, MINUTE, SECOND, wait } from '@atproto/common-web' import { AtpAgent } from '@atproto/api' import { Secp256k1Keypair, randomIntFromSeed } from '@atproto/crypto' import { Client as PlcClient } from '@did-plc/lib' @@ -42,6 +42,8 @@ export class TestBsky { serverDid, didCacheStaleTTL: HOUR, didCacheMaxTTL: DAY, + labelCacheStaleTTL: 30 * SECOND, + labelCacheMaxTTL: MINUTE, ...cfg, // Each test 
suite gets its own lock id for the repo subscription adminPassword: ADMIN_PASSWORD, @@ -73,18 +75,26 @@ export class TestBsky { } await migrationDb.close() + const ns = cfg.dbPostgresSchema + ? await randomIntFromSeed(cfg.dbPostgresSchema, 1000000) + : undefined + assert(config.redisHost) + const redisCache = new bsky.Redis({ + host: config.redisHost, + namespace: `ns${ns}`, + db: 1, + }) + // api server const server = bsky.BskyAppView.create({ db, + redis: redisCache, config, algos: cfg.algos, imgInvalidator: cfg.imgInvalidator, signingKey: serviceKeypair, }) // indexer - const ns = cfg.dbPostgresSchema - ? await randomIntFromSeed(cfg.dbPostgresSchema, 1000000) - : undefined const indexerCfg = new bsky.IndexerConfig({ version: '0.0.0', didCacheStaleTTL: HOUR, @@ -110,12 +120,14 @@ export class TestBsky { assert(indexerCfg.redisHost) const indexerRedis = new bsky.Redis({ host: indexerCfg.redisHost, - namespace: indexerCfg.indexerNamespace, + namespace: `ns${ns}`, }) + const indexer = bsky.BskyIndexer.create({ cfg: indexerCfg, db: db.getPrimary(), redis: indexerRedis, + redisCache, imgInvalidator: cfg.imgInvalidator, }) // ingester @@ -144,8 +156,6 @@ export class TestBsky { await indexer.start() await server.start() - // we refresh label cache by hand in `processAll` instead of on a timer - server.ctx.labelCache.stop() return new TestBsky(url, port, server, indexer, ingester) } @@ -184,14 +194,13 @@ export class TestBsky { await Promise.all([ this.ctx.backgroundQueue.processAll(), this.indexer.ctx.backgroundQueue.processAll(), - this.ctx.labelCache.fullRefresh(), ]) } async close() { - await this.server.destroy({ skipDb: true }) + await this.server.destroy({ skipDb: true, skipRedis: true }) await this.ingester.destroy({ skipDb: true }) - await this.indexer.destroy() // closes shared db + await this.indexer.destroy() // closes shared db & redis } } @@ -264,6 +273,12 @@ export async function getIndexers( host: baseCfg.redisHost, namespace: baseCfg.indexerNamespace, 
}) + const redisCache = new bsky.Redis({ + host: baseCfg.redisHost, + namespace: baseCfg.indexerNamespace, + db: 1, + }) + const indexers = await Promise.all( opts.partitionIdsByIndexer.map(async (indexerPartitionIds) => { const cfg = new bsky.IndexerConfig({ @@ -272,7 +287,7 @@ export async function getIndexers( indexerSubLockId: uniqueLockId(), indexerPort: await getPort(), }) - return bsky.BskyIndexer.create({ cfg, db, redis }) + return bsky.BskyIndexer.create({ cfg, db, redis, redisCache }) }), ) await db.migrateToLatestOrThrow() diff --git a/services/bsky/api.js b/services/bsky/api.js index cf4e59afb37..815a3df544d 100644 --- a/services/bsky/api.js +++ b/services/bsky/api.js @@ -77,6 +77,23 @@ const main = async () => { blobCacheLocation: env.blobCacheLocation, }) + const redis = new Redis( + config.redisSentinelName + ? { + sentinel: config.redisSentinelName, + hosts: config.redisSentinelHosts, + password: config.redisPassword, + db: 1, + commandTimeout: 500, + } + : { + host: config.redisHost, + password: config.redisPassword, + db: 1, + commandTimeout: 500, + }, + ) + const signingKey = await Secp256k1Keypair.import(env.serviceSigningKey) // configure zero, one, or more image invalidators @@ -108,6 +125,7 @@ const main = async () => { const algos = env.feedPublisherDid ? makeAlgos(env.feedPublisherDid) : {} const bsky = BskyAppView.create({ db, + redis, signingKey, config: cfg, imgInvalidator, diff --git a/services/bsky/indexer.js b/services/bsky/indexer.js index beac2c114d7..c7327339ff2 100644 --- a/services/bsky/indexer.js +++ b/services/bsky/indexer.js @@ -63,9 +63,26 @@ const main = async () => { password: cfg.redisPassword, }, ) + + const redisCache = new Redis( + cfg.redisSentinelName + ? 
{ + sentinel: cfg.redisSentinelName, + hosts: cfg.redisSentinelHosts, + password: cfg.redisPassword, + db: 1, + } + : { + host: cfg.redisHost, + password: cfg.redisPassword, + db: 1, + }, + ) + const indexer = BskyIndexer.create({ db, redis, + redisCache, cfg, imgInvalidator, })