diff --git a/packages/aws/package.json b/packages/aws/package.json index 58bb98e0f80..8bf82637175 100644 --- a/packages/aws/package.json +++ b/packages/aws/package.json @@ -24,6 +24,7 @@ "update-main-to-dist": "node ../../update-main-to-dist.js packages/aws" }, "dependencies": { + "@atproto/common": "workspace:^", "@atproto/crypto": "workspace:^", "@atproto/repo": "workspace:^", "@aws-sdk/client-cloudfront": "^3.261.0", diff --git a/packages/aws/src/bunny.ts b/packages/aws/src/bunny.ts new file mode 100644 index 00000000000..62cfeedc891 --- /dev/null +++ b/packages/aws/src/bunny.ts @@ -0,0 +1,36 @@ +import { handleAllSettledErrors } from '@atproto/common' +import { ImageInvalidator } from './types' + +export type BunnyConfig = { + accessKey: string + urlPrefix: string +} + +const API_PURGE_URL = 'https://api.bunny.net/purge' + +export class BunnyInvalidator implements ImageInvalidator { + constructor(public cfg: BunnyConfig) {} + async invalidate(_subject: string, paths: string[]) { + const results = await Promise.allSettled( + paths.map(async (path) => + purgeUrl({ + url: this.cfg.urlPrefix + path, + accessKey: this.cfg.accessKey, + }), + ), + ) + handleAllSettledErrors(results) + } +} + +export default BunnyInvalidator + +async function purgeUrl(opts: { accessKey: string; url: string }) { + const search = new URLSearchParams() + search.set('async', 'true') + search.set('url', opts.url) + await fetch(API_PURGE_URL + '?' + search.toString(), { + method: 'post', + headers: { AccessKey: opts.accessKey }, + }) +} diff --git a/packages/aws/src/cloudfront.ts b/packages/aws/src/cloudfront.ts index 30708caee25..0577e4cc613 100644 --- a/packages/aws/src/cloudfront.ts +++ b/packages/aws/src/cloudfront.ts @@ -1,4 +1,5 @@ import * as aws from '@aws-sdk/client-cloudfront' +import { ImageInvalidator } from './types' export type CloudfrontConfig = { distributionId: string @@ -33,9 +34,3 @@ export class CloudfrontInvalidator implements ImageInvalidator { } export default CloudfrontInvalidator - -// @NOTE keep in sync with same interface in pds/src/image/invalidator.ts -// this is separate to avoid the dependency on @atproto/pds. -interface ImageInvalidator { - invalidate(subject: string, paths: string[]): Promise -} diff --git a/packages/aws/src/index.ts b/packages/aws/src/index.ts index a836847afc2..aa11f2c7ef0 100644 --- a/packages/aws/src/index.ts +++ b/packages/aws/src/index.ts @@ -1,3 +1,6 @@ export * from './kms' export * from './s3' export * from './cloudfront' +export * from './bunny' +export * from './util' +export * from './types' diff --git a/packages/aws/src/types.ts b/packages/aws/src/types.ts new file mode 100644 index 00000000000..9c5a3e2dd85 --- /dev/null +++ b/packages/aws/src/types.ts @@ -0,0 +1,5 @@ +// @NOTE keep in sync with same interface in bsky/src/image/invalidator.ts +// this is separate to avoid the dependency on @atproto/bsky. +export interface ImageInvalidator { + invalidate(subject: string, paths: string[]): Promise +} diff --git a/packages/aws/src/util.ts b/packages/aws/src/util.ts new file mode 100644 index 00000000000..8f603055bea --- /dev/null +++ b/packages/aws/src/util.ts @@ -0,0 +1,14 @@ +import { handleAllSettledErrors } from '@atproto/common' +import { ImageInvalidator } from './types' + +export class MultiImageInvalidator implements ImageInvalidator { + constructor(public invalidators: ImageInvalidator[]) {} + async invalidate(subject: string, paths: string[]) { + const results = await Promise.allSettled( + this.invalidators.map((invalidator) => + invalidator.invalidate(subject, paths), + ), + ) + handleAllSettledErrors(results) + } +} diff --git a/packages/bsky/src/indexer/subscription.ts b/packages/bsky/src/indexer/subscription.ts index 76c9b7297df..abc672db3b0 100644 --- a/packages/bsky/src/indexer/subscription.ts +++ b/packages/bsky/src/indexer/subscription.ts @@ -1,7 +1,7 @@ import assert from 'node:assert' import { CID } from 'multiformats/cid' import { AtUri } from '@atproto/syntax' -import { cborDecode, wait } from '@atproto/common' +import { cborDecode, wait, handleAllSettledErrors } from '@atproto/common' import { DisconnectError } from '@atproto/xrpc-server' import { WriteOpAction, @@ -343,23 +343,3 @@ type PreparedDelete = { } type PreparedWrite = PreparedCreate | PreparedUpdate | PreparedDelete - -function handleAllSettledErrors(results: PromiseSettledResult[]) { - const errors = results.filter(isRejected).map((res) => res.reason) - if (errors.length === 0) { - return - } - if (errors.length === 1) { - throw errors[0] - } - throw new AggregateError( - errors, - 'Multiple errors: ' + errors.map((err) => err?.message).join('\n'), - ) -} - -function isRejected( - result: PromiseSettledResult, -): result is PromiseRejectedResult { - return result.status === 'rejected' -} diff --git a/packages/common-web/src/async.ts b/packages/common-web/src/async.ts index 9e173d835fe..01e381ed337 100644 --- a/packages/common-web/src/async.ts +++ b/packages/common-web/src/async.ts @@ -124,3 +124,25 @@ export class AsyncBufferFullError extends Error { super(`ReachedMaxBufferSize: ${maxSize}`) } } + +export const handleAllSettledErrors = ( + results: PromiseSettledResult[], +) => { + const errors = results.filter(isRejected).map((res) => res.reason) + if (errors.length === 0) { + return + } + if (errors.length === 1) { + throw errors[0] + } + throw new AggregateError( + errors, + 'Multiple errors: ' + errors.map((err) => err?.message).join('\n'), + ) +} + +const isRejected = ( + result: PromiseSettledResult, +): result is PromiseRejectedResult => { + return result.status === 'rejected' +} diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 0e22af0c4e1..6c899913626 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -123,6 +123,9 @@ importers: packages/aws: dependencies: + '@atproto/common': + specifier: workspace:^ + version: link:../common '@atproto/crypto': specifier: workspace:^ version: link:../crypto diff --git a/services/bsky/api.js b/services/bsky/api.js index ec38c55ae55..5363f7661f0 100644 --- a/services/bsky/api.js +++ b/services/bsky/api.js @@ -13,7 +13,11 @@ require('dd-trace') // Only works with commonjs // Tracer code above must come before anything else const path = require('path') const assert = require('assert') -const { CloudfrontInvalidator } = require('@atproto/aws') +const { + BunnyInvalidator, + CloudfrontInvalidator, + MultiImageInvalidator, +} = require('@atproto/aws') const { DatabaseCoordinator, PrimaryDatabase, @@ -59,17 +63,38 @@ const main = async () => { imgUriEndpoint: env.imgUriEndpoint, blobCacheLocation: env.blobCacheLocation, }) + + // configure zero, one, or both image invalidators + let imgInvalidator + const bunnyInvalidator = env.bunnyAccessKey + ? new BunnyInvalidator({ + accessKey: env.bunnyAccessKey, + urlPrefix: cfg.imgUriEndpoint, + }) + : undefined const cfInvalidator = env.cfDistributionId ? new CloudfrontInvalidator({ distributionId: env.cfDistributionId, pathPrefix: cfg.imgUriEndpoint && new URL(cfg.imgUriEndpoint).pathname, }) : undefined + + if (bunnyInvalidator && imgInvalidator) { + imgInvalidator = new MultiImageInvalidator([ + bunnyInvalidator, + imgInvalidator, + ]) + } else if (bunnyInvalidator) { + imgInvalidator = bunnyInvalidator + } else if (cfInvalidator) { + imgInvalidator = cfInvalidator + } + const algos = env.feedPublisherDid ? makeAlgos(env.feedPublisherDid) : {} const bsky = BskyAppView.create({ db, config: cfg, - imgInvalidator: cfInvalidator, + imgInvalidator, algos, }) // separate db needed for more permissions @@ -127,6 +152,7 @@ const getEnv = () => ({ imgUriKey: process.env.IMG_URI_KEY, imgUriEndpoint: process.env.IMG_URI_ENDPOINT, blobCacheLocation: process.env.BLOB_CACHE_LOC, + bunnyAccessKey: process.env.BUNNY_ACCESS_KEY, cfDistributionId: process.env.CF_DISTRIBUTION_ID, feedPublisherDid: process.env.FEED_PUBLISHER_DID, }) diff --git a/services/bsky/indexer.js b/services/bsky/indexer.js index 7ab287133df..beac2c114d7 100644 --- a/services/bsky/indexer.js +++ b/services/bsky/indexer.js @@ -3,7 +3,7 @@ require('dd-trace/init') // Only works with commonjs // Tracer code above must come before anything else -const { CloudfrontInvalidator } = require('@atproto/aws') +const { CloudfrontInvalidator, BunnyInvalidator } = require('@atproto/aws') const { IndexerConfig, BskyIndexer, @@ -25,12 +25,32 @@ const main = async () => { dbPostgresUrl: env.dbPostgresUrl, dbPostgresSchema: env.dbPostgresSchema, }) + + // configure zero, one, or both image invalidators + let imgInvalidator + const bunnyInvalidator = env.bunnyAccessKey + ? new BunnyInvalidator({ + accessKey: env.bunnyAccessKey, + urlPrefix: cfg.imgUriEndpoint, + }) + : undefined const cfInvalidator = env.cfDistributionId ? new CloudfrontInvalidator({ distributionId: env.cfDistributionId, pathPrefix: cfg.imgUriEndpoint && new URL(cfg.imgUriEndpoint).pathname, }) : undefined + if (bunnyInvalidator && imgInvalidator) { + imgInvalidator = new MultiImageInvalidator([ + bunnyInvalidator, + imgInvalidator, + ]) + } else if (bunnyInvalidator) { + imgInvalidator = bunnyInvalidator + } else if (cfInvalidator) { + imgInvalidator = cfInvalidator + } + const redis = new Redis( cfg.redisSentinelName ? { @@ -47,7 +67,7 @@ const main = async () => { db, redis, cfg, - imgInvalidator: cfInvalidator, + imgInvalidator, }) await indexer.start() process.on('SIGTERM', async () => { @@ -77,6 +97,7 @@ const getEnv = () => ({ dbPoolSize: maybeParseInt(process.env.DB_POOL_SIZE), dbPoolMaxUses: maybeParseInt(process.env.DB_POOL_MAX_USES), dbPoolIdleTimeoutMs: maybeParseInt(process.env.DB_POOL_IDLE_TIMEOUT_MS), + bunnyAccessKey: process.env.BUNNY_ACCESS_KEY, cfDistributionId: process.env.CF_DISTRIBUTION_ID, imgUriEndpoint: process.env.IMG_URI_ENDPOINT, })