working migration beginnings
aditiharini committed Dec 10, 2024
1 parent 7c1e6c5 commit 3cb9fd9
Showing 5 changed files with 263 additions and 119 deletions.
214 changes: 157 additions & 57 deletions packages/shuttle/src/example-app/app.ts
@@ -12,6 +12,8 @@ import {
HubEventStreamConsumer,
HubSubscriber,
MessageState,
BasicHubSubscriber,
allActiveDbMessagesOfTypeForFid,
} from "../index"; // If you want to use this as a standalone app, replace this import with "@farcaster/shuttle"
import { AppDb, migrateToLatest, Tables } from "./db";
import {
@@ -31,6 +33,7 @@
LegacyMessage,
Message,
MessageData,
MessageType,
OnChainEvent,
OnChainEventRequest,
OnChainEventType,
@@ -50,13 +53,15 @@ import {
SHARD_INDEX,
TOTAL_SHARDS,
SNAPCHAIN_HOST,
CLEAR_DB,
} from "./env";
import * as process from "node:process";
import url from "node:url";
import { err, ok, Result } from "neverthrow";
import { getQueue, getWorker } from "./worker";
import { Queue } from "bullmq";
import { bytesToHex, farcasterTimeToDate, sleep } from "../utils";
import { sql } from "kysely";

const hubId = "shuttle";

@@ -69,9 +74,11 @@ export class App implements MessageHandler {
private readonly hubId;
private snapchainClient: HubRpcClient;
private snapchainAdminClient: AdminRpcClient;
private backendDb: DB;

constructor(
db: DB,
backendDb: DB,
dbSchema: string,
redis: RedisClient,
hubSubscriber: HubSubscriber,
@@ -80,6 +87,7 @@
snapchainAdminClient: AdminRpcClient,
) {
this.db = db;
this.backendDb = backendDb;
this.dbSchema = dbSchema;
this.redis = redis;
this.hubSubscriber = hubSubscriber;
@@ -98,36 +106,56 @@
totalShards: number,
shardIndex: number,
hubSSL = false,
clearDb = false,
) {
const db = getDbClient(dbUrl, dbSchema);
const backendDb = getDbClient(
"postgres://read_prod:bbgrq8k3jVLUBRUKOJ3HZVXp2mqOAynE@read.cluster-custom-cbfnhl1vqqgl.us-east-1.rds.amazonaws.com:5432/indexer_prod",
dbSchema,
);
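// Note: this backend DSN ships credentials in source. Reading it from the environment
// instead (e.g. a hypothetical BACKEND_POSTGRES_URL exported from env.ts alongside
// POSTGRES_URL) would keep the secret out of the repo:
//
//   const backendDb = getDbClient(process.env["BACKEND_POSTGRES_URL"] ?? "", dbSchema);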
const hub = getHubClient(hubUrl, { ssl: hubSSL });
const snapchainClient = getHubClient(snapchainUrl, { ssl: false }).client;
const snapchainAdminClient = await getAdminRpcClient(snapchainUrl);
const redis = RedisClient.create(redisUrl);
if (clearDb) {
await sql`TRUNCATE TABLE messages`.execute(db);
await sql`TRUNCATE TABLE onchain_events`.execute(db);
}
const eventStreamForWrite = new EventStreamConnection(redis.client);
const eventStreamForRead = new EventStreamConnection(redis.client);
const shardKey = totalShards === 0 ? "all" : `${shardIndex}`;
- const hubSubscriber = new EventStreamHubSubscriber(
+ const hubSubscriber = new BasicHubSubscriber(
hubId,
hub,
eventStreamForWrite,
redis,
shardKey,
log,
null,
shardKey,
undefined,
totalShards,
shardIndex,
1000 * 60 * 10, // 10 mins
);
const streamConsumer = new HubEventStreamConsumer(hub, eventStreamForRead, shardKey);

- return new App(db, dbSchema, redis, hubSubscriber, streamConsumer, snapchainClient, snapchainAdminClient);
+ return new App(
+ db,
+ backendDb,
+ dbSchema,
+ redis,
+ hubSubscriber,
+ streamConsumer,
+ snapchainClient,
+ snapchainAdminClient,
+ );
}

- async handleOnChainEvent(onChainEvent: OnChainEvent, txn: DB) {
- const result = await this.snapchainAdminClient.submitOnChainEvent(onChainEvent);
- if (result.isErr()) {
- log.info(`Unable to submit onchain event to snapchain ${result.error.message} ${result.error.stack}`);
- return;
+ async handleOnChainEvent(onChainEvent: OnChainEvent, txn: DB, submitToSnapchain = false) {
+ if (submitToSnapchain) {
+ const result = await this.snapchainAdminClient.submitOnChainEvent(onChainEvent);
+ if (result.isErr()) {
+ log.info(`Unable to submit onchain event to snapchain ${result.error.message} ${result.error.stack}`);
+ return;
+ }
}

let body = {};
@@ -194,20 +222,6 @@
return;
}

- if (!message.data) {
- return;
- }

- const newMessage = Message.create(message);
- newMessage.dataBytes = MessageData.encode(message.data).finish();
- // TODO(aditi): Is this right? Need to leave data as is to avoid message missing data error.
- // newMessage.data = undefined;
- const result = await this.snapchainClient.submitMessage(newMessage);
- if (result.isErr()) {
- log.info(`Unable to submit message to snapchain ${result.error.message} ${result.error.stack}`);
- return;
- }

const appDB = txn as unknown as AppDb; // Need this to make typescript happy, not clean way to "inherit" table types

// Example of how to materialize casts into a separate table. Insert casts into a separate table, and mark them as deleted when removed
@@ -272,40 +286,71 @@
}

for (const onChainEvent of result.value.events) {
- await this.handleOnChainEvent(onChainEvent, this.db);
+ await this.handleOnChainEvent(onChainEvent, this.db, true);
}

return ok(undefined);
}

- async reconcileFids(fids: number[]) {
- // biome-ignore lint/style/noNonNullAssertion: client is always initialized
- const reconciler = new MessageReconciliation(this.hubSubscriber.hubClient!, this.db, log);
+ async reconcileFidsAgainstBackendDb(fids: number[]) {
for (const fid of fids) {
- const storageResult = await this.reconcileOnchainEvents(fid, OnChainEventType.EVENT_TYPE_STORAGE_RENT);
- if (storageResult.isErr()) {
- log.info(`Unable to get storage events ${storageResult.error.message} ${storageResult.error.stack}`);
- continue;
- }
+ const messageTypes = [
+ MessageType.CAST_ADD,
+ MessageType.REACTION_ADD,
+ MessageType.LINK_ADD,
+ MessageType.VERIFICATION_ADD_ETH_ADDRESS,
+ MessageType.USER_DATA_ADD,
+ ];
+ for (const type of messageTypes) {
+ let pageNumber = 0;
+ const pageSize = 100;
+ while (true) {
+ const messages = await allActiveDbMessagesOfTypeForFid(
+ this.backendDb,
+ fid,
+ type,
+ undefined,
+ undefined,
+ pageSize,
+ pageSize * pageNumber,
+ );
+ if (messages.isErr()) {
+ throw messages.error;
+ }

- const signerResult = await this.reconcileOnchainEvents(fid, OnChainEventType.EVENT_TYPE_SIGNER);
- if (signerResult.isErr()) {
- log.info(`Unable to get signer events ${signerResult.error.message} ${signerResult.error.stack}`);
- continue;
- }
+ if (messages.value.length === 0) {
+ break;
+ }

- // TODO(aditi): Signer migrated for fid 0
+ for (const dbMessage of messages.value) {
+ const message = Message.decode(dbMessage.raw);

+ if (!message.data) {
+ continue; // skip this message; a `return` here would abandon the remaining pages for this fid
+ }

+ const newMessage = Message.create(message);
+ newMessage.dataBytes = MessageData.encode(message.data).finish();
+ // TODO(aditi): Is this right? Need to leave data as is to avoid message missing data error.
+ // newMessage.data = undefined;
+ const result = await this.snapchainClient.submitMessage(newMessage);
+ if (result.isErr()) {
+ log.info(`Unable to submit message to snapchain ${result.error.message} ${result.error.stack}`);
+ continue;
+ }
+ await HubEventProcessor.handleMissingMessage(this.db, message, this, false);
+ }

- const idRegisterResult = await this.reconcileOnchainEvents(fid, OnChainEventType.EVENT_TYPE_ID_REGISTER);
- if (idRegisterResult.isErr()) {
- log.info(`Unable to get signer events ${idRegisterResult.error.message} ${idRegisterResult.error.stack}`);
- continue;
+ pageNumber += 1;
+ }
+ }
}
}
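// A note on the paging above: a LIMIT/OFFSET walk (offset = pageSize * pageNumber) only
// visits each row exactly once if allActiveDbMessagesOfTypeForFid applies a stable
// ORDER BY; without one, Postgres may return pages that skip or repeat rows. A keyset
// variant avoids that. A sketch against the backend Kysely instance, assuming the
// messages table exposes a monotonically increasing id column:
//
//   let lastId = 0;
//   while (true) {
//     const rows = await db
//       .selectFrom("messages")
//       .selectAll()
//       .where("fid", "=", fid)
//       .where("type", "=", type)
//       .where("id", ">", lastId)
//       .orderBy("id", "asc")
//       .limit(pageSize)
//       .execute();
//     if (rows.length === 0) break;
//     lastId = rows[rows.length - 1].id;
//     // ...decode and submit each row as in the loop above...
//   }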

- // Wait for the onchain events to get committed
- // TODO(aditi): We may want to do something better here.
- await sleep(1 * 1000);

+ async reconcileFids(fids: number[]) {
+ // biome-ignore lint/style/noNonNullAssertion: client is always initialized
+ const reconciler = new MessageReconciliation(this.hubSubscriber.hubClient!, this.db, log);
for (const fid of fids) {
await reconciler.reconcileMessagesForFid(
fid,
async (message, missingInDb, prunedInDb, revokedInDb) => {
@@ -325,7 +370,32 @@
}
}

- async backfillFids(fids: number[], backfillQueue: Queue) {
+ async reconcileAllOnchainEvents(fids: number[]) {
+ await this.ensureMigrations();

+ for (const fid of fids) {
+ const storageResult = await this.reconcileOnchainEvents(fid, OnChainEventType.EVENT_TYPE_STORAGE_RENT);
+ if (storageResult.isErr()) {
+ log.info(`Unable to get storage events ${storageResult.error.message} ${storageResult.error.stack}`);
+ }

+ const signerResult = await this.reconcileOnchainEvents(fid, OnChainEventType.EVENT_TYPE_SIGNER);
+ if (signerResult.isErr()) {
+ log.info(`Unable to get signer events ${signerResult.error.message} ${signerResult.error.stack}`);
+ }

+ // TODO(aditi): Signer migrated for fid 0

+ const idRegisterResult = await this.reconcileOnchainEvents(fid, OnChainEventType.EVENT_TYPE_ID_REGISTER);
+ if (idRegisterResult.isErr()) {
+ log.info(`Unable to get signer events ${idRegisterResult.error.message} ${idRegisterResult.error.stack}`);
+ }
+ }
+ }

+ async backfillFids(fids: number[], backfillQueue: Queue, jobKind: "reconcile" | "reconcileOnchainEvents") {
+ await this.ensureMigrations();

const startedAt = Date.now();
if (fids.length === 0) {
log.info("No fids provided");
Expand All @@ -340,16 +410,11 @@ export class App implements MessageHandler {
throw new Error("Max fid was undefined");
}
log.info(`Queuing up fids up to: ${maxFid}`);
- // create an array of arrays in batches of 100 upto maxFid
- const batchSize = 10;
- const fids = Array.from({ length: Math.ceil(maxFid / batchSize) }, (_, i) => i * batchSize).map((fid) => fid + 1);
- for (const start of fids) {
- const subset = Array.from({ length: batchSize }, (_, i) => start + i);
- await backfillQueue.add("reconcile", { fids: subset });
- }
+ const fids = Array.from({ length: maxFid }, (_, i) => i).map((fid) => fid + 1);
+ await backfillQueue.add(jobKind, { fids });
} else {
log.info(`Adding fids ${fids}`);
await backfillQueue.add("reconcile", { fids });
await backfillQueue.add(jobKind, { fids });
}
await backfillQueue.add("completionMarker", { startedAt });
log.info("Backfill jobs queued");
@@ -387,11 +452,40 @@ if (import.meta.url.endsWith(url.pathToFileURL(process.argv[1] || "").toString()
TOTAL_SHARDS,
SHARD_INDEX,
HUB_SSL,
CLEAR_DB,
);
log.info("Starting shuttle");
await app.start();
}

async function backfillOnChainEvents() {
log.info(`Creating app connecting to: ${POSTGRES_URL}, ${REDIS_URL}, ${HUB_HOST}`);
const app = await App.create(
POSTGRES_URL,
POSTGRES_SCHEMA,
REDIS_URL,
HUB_HOST,
SNAPCHAIN_HOST,
TOTAL_SHARDS,
SHARD_INDEX,
HUB_SSL,
CLEAR_DB,
);
const fids = BACKFILL_FIDS ? BACKFILL_FIDS.split(",").map((fid) => parseInt(fid)) : [];
log.info(`Backfilling fids: ${fids}`);

// Don't want any carry over from past runs for now
const backfillQueue = getQueue(app.redis.client);
await backfillQueue.drain(true);

await app.backfillFids(fids, backfillQueue, "reconcileOnchainEvents");

// Start the worker after initiating a backfill
const worker = getWorker(app, app.redis.client, log, CONCURRENCY);
await worker.run();
return;
}
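// backfill() below differs from backfillOnChainEvents() only in the jobKind it enqueues;
// a shared runner would keep the two entry points in sync. A sketch, using only
// definitions already imported in this file:
//
//   async function runBackfill(jobKind: "reconcile" | "reconcileOnchainEvents") {
//     const app = await App.create(POSTGRES_URL, POSTGRES_SCHEMA, REDIS_URL, HUB_HOST,
//       SNAPCHAIN_HOST, TOTAL_SHARDS, SHARD_INDEX, HUB_SSL, CLEAR_DB);
//     const fids = BACKFILL_FIDS ? BACKFILL_FIDS.split(",").map((fid) => parseInt(fid)) : [];
//     const backfillQueue = getQueue(app.redis.client);
//     await backfillQueue.drain(true); // no carry-over from past runs
//     await app.backfillFids(fids, backfillQueue, jobKind);
//     const worker = getWorker(app, app.redis.client, log, CONCURRENCY);
//     await worker.run();
//   }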

async function backfill() {
log.info(`Creating app connecting to: ${POSTGRES_URL}, ${REDIS_URL}, ${HUB_HOST}`);
const app = await App.create(
@@ -403,6 +497,7 @@
TOTAL_SHARDS,
SHARD_INDEX,
HUB_SSL,
CLEAR_DB,
);
const fids = BACKFILL_FIDS ? BACKFILL_FIDS.split(",").map((fid) => parseInt(fid)) : [];
log.info(`Backfilling fids: ${fids}`);
@@ -411,7 +506,7 @@ if (import.meta.url.endsWith(url.pathToFileURL(process.argv[1] || "").toString()
const backfillQueue = getQueue(app.redis.client);
await backfillQueue.drain(true);

- await app.backfillFids(fids, backfillQueue);
+ await app.backfillFids(fids, backfillQueue, "reconcile");

// Start the worker after initiating a backfill
const worker = getWorker(app, app.redis.client, log, CONCURRENCY);
@@ -430,6 +525,7 @@ if (import.meta.url.endsWith(url.pathToFileURL(process.argv[1] || "").toString()
TOTAL_SHARDS,
SHARD_INDEX,
HUB_SSL,
CLEAR_DB,
);
const worker = getWorker(app, app.redis.client, log, CONCURRENCY);
await worker.run();
@@ -455,6 +551,10 @@

program.command("start").description("Starts the shuttle").action(start);
program.command("backfill").description("Queue up backfill for the worker").action(backfill);
program
.command("backfill-onchain-events")
.description("Queue up onchain event backfill for the worker")
.action(backfillOnChainEvents);
program.command("worker").description("Starts the backfill worker").action(worker);

program.parse(process.argv);
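Taken together, the app.ts changes define a small contract between the backfill producer and the worker: two reconcile job kinds that each carry a batch of fids, plus a completion marker. A sketch of that contract as explicit types (inferred from the call sites above; nothing in this diff declares them):

  type BackfillJobKind = "reconcile" | "reconcileOnchainEvents";

  interface BackfillJobData {
    fids: number[];
  }

  interface CompletionMarkerData {
    startedAt: number; // Date.now() when the backfill was enqueued
  }

Note that the old batching is also gone: when no BACKFILL_FIDS are supplied, backfillFids now enqueues a single job containing every fid up to maxFid, rather than one job per batch of ten, so one worker run processes the entire backfill as a unit.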
1 change: 1 addition & 0 deletions packages/shuttle/src/example-app/env.ts
@@ -23,3 +23,4 @@ export const STATSD_HOST = process.env["STATSD_HOST"];
export const STATSD_METRICS_PREFIX = process.env["STATSD_METRICS_PREFIX"] || "shuttle.";

export const CONCURRENCY = parseInt(process.env["CONCURRENCY"] || "2");
export const CLEAR_DB = process.env["CLEAR_DB"] === "true";
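CLEAR_DB gates the TRUNCATE of the messages and onchain_events tables in App.create, so it defaults to false and only the exact string "true" enables it. If more boolean flags accumulate, a small parsing helper keeps them consistent; a sketch, not part of this diff:

  export function envFlag(name: string, defaultValue = false): boolean {
    const raw = process.env[name];
    return raw === undefined ? defaultValue : raw.toLowerCase() === "true";
  }

  // Usage: export const CLEAR_DB = envFlag("CLEAR_DB");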
9 changes: 8 additions & 1 deletion packages/shuttle/src/example-app/worker.ts
@@ -12,7 +12,14 @@ export function getWorker(app: App, redis: Redis | Cluster, log: pino.Logger, co
if (job.name === "reconcile") {
const start = Date.now();
const fids = job.data.fids as number[];
- await app.reconcileFids(fids);
+ await app.reconcileFidsAgainstBackendDb(fids);
const elapsed = (Date.now() - start) / 1000;
const lastFid = fids[fids.length - 1];
log.info(`Reconciled ${fids.length} up to ${lastFid} in ${elapsed}s at ${new Date().toISOString()}`);
+ } else if (job.name === "reconcileOnchainEvents") {
+ const start = Date.now();
+ const fids = job.data.fids as number[];
+ await app.reconcileAllOnchainEvents(fids);
+ const elapsed = (Date.now() - start) / 1000;
+ const lastFid = fids[fids.length - 1];
+ log.info(`Reconciled ${fids.length} up to ${lastFid} in ${elapsed}s at ${new Date().toISOString()}`);
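The two branches are identical apart from the App method they call. A table-driven dispatch would collapse the duplicated timing and logging; a sketch, assuming the job names stay in sync with the jobKind values that backfillFids enqueues:

  const reconcilers: Record<string, (fids: number[]) => Promise<void>> = {
    reconcile: (fids) => app.reconcileFidsAgainstBackendDb(fids),
    reconcileOnchainEvents: (fids) => app.reconcileAllOnchainEvents(fids),
  };

  const reconcile = reconcilers[job.name];
  if (reconcile) {
    const start = Date.now();
    const fids = job.data.fids as number[];
    await reconcile(fids);
    const elapsed = (Date.now() - start) / 1000;
    const lastFid = fids[fids.length - 1];
    log.info(`Reconciled ${fids.length} up to ${lastFid} in ${elapsed}s at ${new Date().toISOString()}`);
  }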
