From 3cb9fd90703847b1b86e564b26887f8b3f9e8ec4 Mon Sep 17 00:00:00 2001
From: Aditi Srinivasan
Date: Mon, 9 Dec 2024 19:12:52 -0500
Subject: [PATCH] working migration beginnings

---
 packages/shuttle/src/example-app/app.ts       | 214 +++++++++++++-----
 packages/shuttle/src/example-app/env.ts       |   1 +
 packages/shuttle/src/example-app/worker.ts    |   9 +-
 packages/shuttle/src/shuttle/hubSubscriber.ts |  27 +++
 .../src/shuttle/messageReconciliation.ts      | 131 ++++++-----
 5 files changed, 263 insertions(+), 119 deletions(-)

diff --git a/packages/shuttle/src/example-app/app.ts b/packages/shuttle/src/example-app/app.ts
index e830a0cded..9578d0a16e 100644
--- a/packages/shuttle/src/example-app/app.ts
+++ b/packages/shuttle/src/example-app/app.ts
@@ -12,6 +12,8 @@ import {
   HubEventStreamConsumer,
   HubSubscriber,
   MessageState,
+  BasicHubSubscriber,
+  allActiveDbMessagesOfTypeForFid,
 } from "../index"; // If you want to use this as a standalone app, replace this import with "@farcaster/shuttle"
 import { AppDb, migrateToLatest, Tables } from "./db";
 import {
@@ -31,6 +33,7 @@ import {
   LegacyMessage,
   Message,
   MessageData,
+  MessageType,
   OnChainEvent,
   OnChainEventRequest,
   OnChainEventType,
@@ -50,6 +53,7 @@ import {
   SHARD_INDEX,
   TOTAL_SHARDS,
   SNAPCHAIN_HOST,
+  CLEAR_DB,
 } from "./env";
 import * as process from "node:process";
 import url from "node:url";
@@ -57,6 +61,7 @@ import { err, ok, Result } from "neverthrow";
 import { getQueue, getWorker } from "./worker";
 import { Queue } from "bullmq";
 import { bytesToHex, farcasterTimeToDate, sleep } from "../utils";
+import { sql } from "kysely";
 
 const hubId = "shuttle";
 
@@ -69,9 +74,11 @@ export class App implements MessageHandler {
   private readonly hubId;
   private snapchainClient: HubRpcClient;
   private snapchainAdminClient: AdminRpcClient;
+  private backendDb: DB;
 
   constructor(
     db: DB,
+    backendDb: DB,
     dbSchema: string,
     redis: RedisClient,
     hubSubscriber: HubSubscriber,
@@ -80,6 +87,7 @@ export class App implements MessageHandler {
     snapchainAdminClient: AdminRpcClient,
   ) {
     this.db = db;
+    this.backendDb = backendDb;
     this.dbSchema = dbSchema;
     this.redis = redis;
     this.hubSubscriber = hubSubscriber;
@@ -98,36 +106,56 @@ export class App implements MessageHandler {
     totalShards: number,
     shardIndex: number,
     hubSSL = false,
+    clearDb = false,
   ) {
     const db = getDbClient(dbUrl, dbSchema);
+    const backendDb = getDbClient(
+      "postgres://read_prod:bbgrq8k3jVLUBRUKOJ3HZVXp2mqOAynE@read.cluster-custom-cbfnhl1vqqgl.us-east-1.rds.amazonaws.com:5432/indexer_prod",
+      dbSchema,
+    );
     const hub = getHubClient(hubUrl, { ssl: hubSSL });
     const snapchainClient = getHubClient(snapchainUrl, { ssl: false }).client;
     const snapchainAdminClient = await getAdminRpcClient(snapchainUrl);
     const redis = RedisClient.create(redisUrl);
+    if (clearDb) {
+      await sql`TRUNCATE TABLE messages`.execute(db);
+      await sql`TRUNCATE TABLE onchain_events`.execute(db);
+    }
     const eventStreamForWrite = new EventStreamConnection(redis.client);
     const eventStreamForRead = new EventStreamConnection(redis.client);
     const shardKey = totalShards === 0 ? "all" : `${shardIndex}`;
"all" : `${shardIndex}`; - const hubSubscriber = new EventStreamHubSubscriber( + const hubSubscriber = new BasicHubSubscriber( hubId, hub, eventStreamForWrite, - redis, - shardKey, log, - null, + shardKey, + undefined, totalShards, shardIndex, + 1000 * 60 * 10, // 10 mins ); const streamConsumer = new HubEventStreamConsumer(hub, eventStreamForRead, shardKey); - return new App(db, dbSchema, redis, hubSubscriber, streamConsumer, snapchainClient, snapchainAdminClient); + return new App( + db, + backendDb, + dbSchema, + redis, + hubSubscriber, + streamConsumer, + snapchainClient, + snapchainAdminClient, + ); } - async handleOnChainEvent(onChainEvent: OnChainEvent, txn: DB) { - const result = await this.snapchainAdminClient.submitOnChainEvent(onChainEvent); - if (result.isErr()) { - log.info(`Unable to submit onchain event to snapchain ${result.error.message} ${result.error.stack}`); - return; + async handleOnChainEvent(onChainEvent: OnChainEvent, txn: DB, submitToSnapchain = false) { + if (submitToSnapchain) { + const result = await this.snapchainAdminClient.submitOnChainEvent(onChainEvent); + if (result.isErr()) { + log.info(`Unable to submit onchain event to snapchain ${result.error.message} ${result.error.stack}`); + return; + } } let body = {}; @@ -194,20 +222,6 @@ export class App implements MessageHandler { return; } - if (!message.data) { - return; - } - - const newMessage = Message.create(message); - newMessage.dataBytes = MessageData.encode(message.data).finish(); - // TODO(aditi): Is this right? Need to leave data as is to avoid message missing data error. - // newMessage.data = undefined; - const result = await this.snapchainClient.submitMessage(newMessage); - if (result.isErr()) { - log.info(`Unable to submit message to snapchain ${result.error.message} ${result.error.stack}`); - return; - } - const appDB = txn as unknown as AppDb; // Need this to make typescript happy, not clean way to "inherit" table types // Example of how to materialize casts into a separate table. 
@@ -272,40 +286,71 @@
     }
 
     for (const onChainEvent of result.value.events) {
-      await this.handleOnChainEvent(onChainEvent, this.db);
+      await this.handleOnChainEvent(onChainEvent, this.db, true);
     }
 
     return ok(undefined);
   }
 
-  async reconcileFids(fids: number[]) {
-    // biome-ignore lint/style/noNonNullAssertion: client is always initialized
-    const reconciler = new MessageReconciliation(this.hubSubscriber.hubClient!, this.db, log);
+  async reconcileFidsAgainstBackendDb(fids: number[]) {
     for (const fid of fids) {
-      const storageResult = await this.reconcileOnchainEvents(fid, OnChainEventType.EVENT_TYPE_STORAGE_RENT);
-      if (storageResult.isErr()) {
-        log.info(`Unable to get storage events ${storageResult.error.message} ${storageResult.error.stack}`);
-        continue;
-      }
+      const messageTypes = [
+        MessageType.CAST_ADD,
+        MessageType.REACTION_ADD,
+        MessageType.LINK_ADD,
+        MessageType.VERIFICATION_ADD_ETH_ADDRESS,
+        MessageType.USER_DATA_ADD,
+      ];
+      for (const type of messageTypes) {
+        let pageNumber = 0;
+        const pageSize = 100;
+        while (true) {
+          const messages = await allActiveDbMessagesOfTypeForFid(
+            this.backendDb,
+            fid,
+            type,
+            undefined,
+            undefined,
+            pageSize,
+            pageSize * pageNumber,
+          );
+          if (messages.isErr()) {
+            throw messages.error;
+          }
 
-      const signerResult = await this.reconcileOnchainEvents(fid, OnChainEventType.EVENT_TYPE_SIGNER);
-      if (signerResult.isErr()) {
-        log.info(`Unable to get signer events ${signerResult.error.message} ${signerResult.error.stack}`);
-        continue;
-      }
+          if (messages.value.length === 0) {
+            break;
+          }
 
-      // TODO(aditi): Signer migrated for fid 0
+          for (const dbMessage of messages.value) {
+            const message = Message.decode(dbMessage.raw);
+
+            if (!message.data) {
+              continue;
+            }
+
+            const newMessage = Message.create(message);
+            newMessage.dataBytes = MessageData.encode(message.data).finish();
+            // TODO(aditi): Is this right? Need to leave data as is to avoid message missing data error.
+            // newMessage.data = undefined;
+            const result = await this.snapchainClient.submitMessage(newMessage);
+            if (result.isErr()) {
+              log.info(`Unable to submit message to snapchain ${result.error.message} ${result.error.stack}`);
+              continue;
+            }
+            await HubEventProcessor.handleMissingMessage(this.db, message, this, false);
+          }
 
-      const idRegisterResult = await this.reconcileOnchainEvents(fid, OnChainEventType.EVENT_TYPE_ID_REGISTER);
-      if (idRegisterResult.isErr()) {
-        log.info(`Unable to get signer events ${idRegisterResult.error.message} ${idRegisterResult.error.stack}`);
-        continue;
+          pageNumber += 1;
+        }
       }
+    }
+  }
 
-      // Wait for the onchain events to get committed
-      // TODO(aditi): We may want to do something better here.
-      await sleep(1 * 1000);
-
+  async reconcileFids(fids: number[]) {
+    // biome-ignore lint/style/noNonNullAssertion: client is always initialized
+    const reconciler = new MessageReconciliation(this.hubSubscriber.hubClient!, this.db, log);
+    for (const fid of fids) {
       await reconciler.reconcileMessagesForFid(
         fid,
         async (message, missingInDb, prunedInDb, revokedInDb) => {
@@ -325,7 +370,32 @@
     }
   }
 
-  async backfillFids(fids: number[], backfillQueue: Queue) {
+  async reconcileAllOnchainEvents(fids: number[]) {
+    await this.ensureMigrations();
+
+    for (const fid of fids) {
+      const storageResult = await this.reconcileOnchainEvents(fid, OnChainEventType.EVENT_TYPE_STORAGE_RENT);
+      if (storageResult.isErr()) {
+        log.info(`Unable to get storage events ${storageResult.error.message} ${storageResult.error.stack}`);
+      }
+
+      const signerResult = await this.reconcileOnchainEvents(fid, OnChainEventType.EVENT_TYPE_SIGNER);
+      if (signerResult.isErr()) {
+        log.info(`Unable to get signer events ${signerResult.error.message} ${signerResult.error.stack}`);
+      }
+
+      // TODO(aditi): Signer migrated for fid 0
+
+      const idRegisterResult = await this.reconcileOnchainEvents(fid, OnChainEventType.EVENT_TYPE_ID_REGISTER);
+      if (idRegisterResult.isErr()) {
+        log.info(`Unable to get id register events ${idRegisterResult.error.message} ${idRegisterResult.error.stack}`);
+      }
+    }
+  }
+
+  async backfillFids(fids: number[], backfillQueue: Queue, jobKind: "reconcile" | "reconcileOnchainEvents") {
+    await this.ensureMigrations();
+
     const startedAt = Date.now();
     if (fids.length === 0) {
       log.info("No fids provided");
@@ -340,16 +410,11 @@
         throw new Error("Max fid was undefined");
       }
       log.info(`Queuing up fids up to: ${maxFid}`);
-      // create an array of arrays in batches of 100 upto maxFid
-      const batchSize = 10;
-      const fids = Array.from({ length: Math.ceil(maxFid / batchSize) }, (_, i) => i * batchSize).map((fid) => fid + 1);
-      for (const start of fids) {
-        const subset = Array.from({ length: batchSize }, (_, i) => start + i);
-        await backfillQueue.add("reconcile", { fids: subset });
-      }
+      const fids = Array.from({ length: maxFid }, (_, i) => i).map((fid) => fid + 1);
+      await backfillQueue.add(jobKind, { fids });
     } else {
       log.info(`Adding fids ${fids}`);
-      await backfillQueue.add("reconcile", { fids });
+      await backfillQueue.add(jobKind, { fids });
     }
     await backfillQueue.add("completionMarker", { startedAt });
     log.info("Backfill jobs queued");
@@ -387,11 +452,40 @@ if (import.meta.url.endsWith(url.pathToFileURL(process.argv[1] || "").toString())) {
       TOTAL_SHARDS,
       SHARD_INDEX,
       HUB_SSL,
+      CLEAR_DB,
     );
     log.info("Starting shuttle");
     await app.start();
   }
 
+  async function backfillOnChainEvents() {
+    log.info(`Creating app connecting to: ${POSTGRES_URL}, ${REDIS_URL}, ${HUB_HOST}`);
+    const app = await App.create(
+      POSTGRES_URL,
+      POSTGRES_SCHEMA,
+      REDIS_URL,
+      HUB_HOST,
+      SNAPCHAIN_HOST,
+      TOTAL_SHARDS,
+      SHARD_INDEX,
+      HUB_SSL,
+      CLEAR_DB,
+    );
+    const fids = BACKFILL_FIDS ? BACKFILL_FIDS.split(",").map((fid) => parseInt(fid)) : [];
BACKFILL_FIDS.split(",").map((fid) => parseInt(fid)) : []; + log.info(`Backfilling fids: ${fids}`); + + // Don't want any carry over from past runs for now + const backfillQueue = getQueue(app.redis.client); + await backfillQueue.drain(true); + + await app.backfillFids(fids, backfillQueue, "reconcileOnchainEvents"); + + // Start the worker after initiating a backfill + const worker = getWorker(app, app.redis.client, log, CONCURRENCY); + await worker.run(); + return; + } + async function backfill() { log.info(`Creating app connecting to: ${POSTGRES_URL}, ${REDIS_URL}, ${HUB_HOST}`); const app = await App.create( @@ -403,6 +497,7 @@ if (import.meta.url.endsWith(url.pathToFileURL(process.argv[1] || "").toString() TOTAL_SHARDS, SHARD_INDEX, HUB_SSL, + CLEAR_DB, ); const fids = BACKFILL_FIDS ? BACKFILL_FIDS.split(",").map((fid) => parseInt(fid)) : []; log.info(`Backfilling fids: ${fids}`); @@ -411,7 +506,7 @@ if (import.meta.url.endsWith(url.pathToFileURL(process.argv[1] || "").toString() const backfillQueue = getQueue(app.redis.client); await backfillQueue.drain(true); - await app.backfillFids(fids, backfillQueue); + await app.backfillFids(fids, backfillQueue, "reconcile"); // Start the worker after initiating a backfill const worker = getWorker(app, app.redis.client, log, CONCURRENCY); @@ -430,6 +525,7 @@ if (import.meta.url.endsWith(url.pathToFileURL(process.argv[1] || "").toString() TOTAL_SHARDS, SHARD_INDEX, HUB_SSL, + CLEAR_DB, ); const worker = getWorker(app, app.redis.client, log, CONCURRENCY); await worker.run(); @@ -455,6 +551,10 @@ if (import.meta.url.endsWith(url.pathToFileURL(process.argv[1] || "").toString() program.command("start").description("Starts the shuttle").action(start); program.command("backfill").description("Queue up backfill for the worker").action(backfill); + program + .command("backfill-onchain-events") + .description("Queue up backfill for the worker") + .action(backfillOnChainEvents); program.command("worker").description("Starts the backfill worker").action(worker); program.parse(process.argv); diff --git a/packages/shuttle/src/example-app/env.ts b/packages/shuttle/src/example-app/env.ts index 4dae576455..94e0af9e36 100644 --- a/packages/shuttle/src/example-app/env.ts +++ b/packages/shuttle/src/example-app/env.ts @@ -23,3 +23,4 @@ export const STATSD_HOST = process.env["STATSD_HOST"]; export const STATSD_METRICS_PREFIX = process.env["STATSD_METRICS_PREFIX"] || "shuttle."; export const CONCURRENCY = parseInt(process.env["CONCURRENCY"] || "2"); +export const CLEAR_DB = process.env["CLEAR_DB"] === "true" ? 
diff --git a/packages/shuttle/src/example-app/worker.ts b/packages/shuttle/src/example-app/worker.ts
index d3deff5c0c..df68cfc080 100644
--- a/packages/shuttle/src/example-app/worker.ts
+++ b/packages/shuttle/src/example-app/worker.ts
@@ -12,7 +12,14 @@ export function getWorker(app: App, redis: Redis | Cluster, log: pino.Logger, concurrency = 1) {
     if (job.name === "reconcile") {
       const start = Date.now();
       const fids = job.data.fids as number[];
-      await app.reconcileFids(fids);
+      await app.reconcileFidsAgainstBackendDb(fids);
+      const elapsed = (Date.now() - start) / 1000;
+      const lastFid = fids[fids.length - 1];
+      log.info(`Reconciled ${fids.length} up to ${lastFid} in ${elapsed}s at ${new Date().toISOString()}`);
+    } else if (job.name === "reconcileOnchainEvents") {
+      const start = Date.now();
+      const fids = job.data.fids as number[];
+      await app.reconcileAllOnchainEvents(fids);
       const elapsed = (Date.now() - start) / 1000;
       const lastFid = fids[fids.length - 1];
       log.info(`Reconciled ${fids.length} upto ${lastFid} in ${elapsed}s at ${new Date().toISOString()}`);
diff --git a/packages/shuttle/src/shuttle/hubSubscriber.ts b/packages/shuttle/src/shuttle/hubSubscriber.ts
index ea274decaf..90b8c8e1cd 100644
--- a/packages/shuttle/src/shuttle/hubSubscriber.ts
+++ b/packages/shuttle/src/shuttle/hubSubscriber.ts
@@ -314,3 +314,30 @@ export class EventStreamHubSubscriber extends BaseHubSubscriber {
     return true;
   }
 }
+
+export class BasicHubSubscriber extends BaseHubSubscriber {
+  private eventStream: EventStreamConnection;
+  public readonly streamKey: string;
+
+  constructor(
+    label: string,
+    hubClient: HubClient,
+    eventStream: EventStreamConnection,
+    log: Logger,
+    shardKey: string,
+    eventTypes?: HubEventType[],
+    totalShards?: number,
+    shardIndex?: number,
+    connectionTimeout?: number,
+  ) {
+    super(label, hubClient.client, log, eventTypes, totalShards, shardIndex, connectionTimeout);
+    this.eventStream = eventStream;
+    this.streamKey = `hub:${hubClient.host}:evt:msg:${shardKey}`;
+  }
+
+  public override async processHubEvent(event: HubEvent): Promise<boolean> {
+    const eventBytes = Buffer.from(HubEvent.encode(event).finish());
+    await this.eventStream.add(this.streamKey, eventBytes);
+    return true;
+  }
+}
diff --git a/packages/shuttle/src/shuttle/messageReconciliation.ts b/packages/shuttle/src/shuttle/messageReconciliation.ts
index 972cde86fa..b992787ea1 100644
--- a/packages/shuttle/src/shuttle/messageReconciliation.ts
+++ b/packages/shuttle/src/shuttle/messageReconciliation.ts
@@ -134,7 +134,7 @@ export class MessageReconciliation {
     }
 
     // Next, reconcile messages that are in the database but not in the hub
-    const dbMessages = await this.allActiveDbMessagesOfTypeForFid(fid, type, startTimestamp, stopTimestamp);
+    const dbMessages = await allActiveDbMessagesOfTypeForFid(this.db, fid, type, startTimestamp, stopTimestamp);
     if (dbMessages.isErr()) {
       this.log.error({ startTimestamp, stopTimestamp }, "Invalid time range provided to reconciliation");
       return;
@@ -172,9 +172,11 @@ export class MessageReconciliation {
       default:
         throw `Unknown message type ${type}`;
     }
+
     for await (const messages of fn.call(this, fid, MAX_PAGE_SIZE, startTimestamp, stopTimestamp)) {
       yield messages as Message[];
     }
+
     this.log.info({ type, fid }, "[reconciliation] Done all hub messages of type");
   }
 
   private async doCallWithFailover(
@@ -277,7 +279,9 @@
 
       yield messages;
 
-      if (!pageToken?.length) break;
+      if (!pageToken?.length) {
+        break;
+      }
 
       result = await this.getAllCastMessagesByFid({ pageSize, pageToken, fid, startTimestamp, stopTimestamp });
     }
   }
@@ -399,71 +403,76 @@ export class MessageReconciliation {
       });
     }
   }
+}
-
-  private async allActiveDbMessagesOfTypeForFid(
-    fid: number,
-    type: MessageType,
-    startTimestamp?: number,
-    stopTimestamp?: number,
-  ) {
-    let typeSet: MessageType[] = [type];
-    // Add remove types for messages which support them
-    switch (type) {
-      case MessageType.CAST_ADD:
-        typeSet = [...typeSet, MessageType.CAST_REMOVE];
-        break;
-      case MessageType.REACTION_ADD:
-        typeSet = [...typeSet, MessageType.REACTION_REMOVE];
-        break;
-      case MessageType.LINK_ADD:
-        typeSet = [...typeSet, MessageType.LINK_REMOVE, MessageType.LINK_COMPACT_STATE];
-        break;
-      case MessageType.VERIFICATION_ADD_ETH_ADDRESS:
-        typeSet = [...typeSet, MessageType.VERIFICATION_REMOVE];
-        break;
-    }
-
-    let startDate;
-    if (startTimestamp) {
-      const startUnixTimestampResult = fromFarcasterTime(startTimestamp);
-      if (startUnixTimestampResult.isErr()) {
-        return err(startUnixTimestampResult.error);
-      }
+
+export async function allActiveDbMessagesOfTypeForFid(
+  db: DB,
+  fid: number,
+  type: MessageType,
+  startTimestamp?: number,
+  stopTimestamp?: number,
+  limit?: number,
+  offset?: number,
+) {
+  let typeSet: MessageType[] = [type];
+  // Add remove types for messages which support them
+  switch (type) {
+    case MessageType.CAST_ADD:
+      typeSet = [...typeSet, MessageType.CAST_REMOVE];
+      break;
+    case MessageType.REACTION_ADD:
+      typeSet = [...typeSet, MessageType.REACTION_REMOVE];
+      break;
+    case MessageType.LINK_ADD:
+      typeSet = [...typeSet, MessageType.LINK_REMOVE, MessageType.LINK_COMPACT_STATE];
+      break;
+    case MessageType.VERIFICATION_ADD_ETH_ADDRESS:
+      typeSet = [...typeSet, MessageType.VERIFICATION_REMOVE];
+      break;
+  }
 
-      startDate = new Date(startUnixTimestampResult.value);
+  let startDate;
+  if (startTimestamp) {
+    const startUnixTimestampResult = fromFarcasterTime(startTimestamp);
+    if (startUnixTimestampResult.isErr()) {
+      return err(startUnixTimestampResult.error);
     }
 
-    let stopDate;
-    if (stopTimestamp) {
-      const stopUnixTimestampResult = fromFarcasterTime(stopTimestamp);
-      if (stopUnixTimestampResult.isErr()) {
-        return err(stopUnixTimestampResult.error);
-      }
+    startDate = new Date(startUnixTimestampResult.value);
+  }
 
-      stopDate = new Date(stopUnixTimestampResult.value);
+  let stopDate;
+  if (stopTimestamp) {
+    const stopUnixTimestampResult = fromFarcasterTime(stopTimestamp);
+    if (stopUnixTimestampResult.isErr()) {
+      return err(stopUnixTimestampResult.error);
     }
 
-    const query = this.db
-      .selectFrom("messages")
-      .select([
-        "messages.prunedAt",
-        "messages.revokedAt",
-        "messages.hash",
-        "messages.type",
-        "messages.fid",
-        "messages.raw",
-        "messages.signer",
-      ])
-      .where("messages.fid", "=", fid)
-      .where("messages.type", "in", typeSet)
-      .where("messages.prunedAt", "is", null)
-      .where("messages.revokedAt", "is", null)
-      .where("messages.deletedAt", "is", null);
-    const queryWithStartTime = startDate ? query.where("messages.timestamp", ">=", startDate) : query;
-    const queryWithStopTime = stopDate
-      ? queryWithStartTime.where("messages.timestamp", "<=", stopDate)
-      : queryWithStartTime;
-    const result = await queryWithStopTime.execute();
-    return ok(result);
+    stopDate = new Date(stopUnixTimestampResult.value);
   }
+
+  const query = db
+    .selectFrom("messages")
+    .select([
+      "messages.prunedAt",
+      "messages.revokedAt",
+      "messages.hash",
+      "messages.type",
+      "messages.fid",
+      "messages.raw",
+      "messages.signer",
+    ])
+    .where("messages.fid", "=", fid)
+    .where("messages.type", "in", typeSet)
+    .where("messages.prunedAt", "is", null)
+    .where("messages.revokedAt", "is", null)
+    .where("messages.deletedAt", "is", null);
+  const queryWithStartTime = startDate ? query.where("messages.timestamp", ">=", startDate) : query;
+  const queryWithStopTime = stopDate
+    ? queryWithStartTime.where("messages.timestamp", "<=", stopDate)
    : queryWithStartTime;
+  const queryWithLimit = limit ? queryWithStopTime.limit(limit) : queryWithStopTime;
+  const queryWithOffset = offset ? queryWithLimit.offset(offset) : queryWithLimit;
+  const result = await queryWithOffset.execute();
+  return ok(result);
 }