Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/event replay perf #1572

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docker/docker-compose.dev.postgres.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,7 @@ services:
POSTGRES_PASSWORD: postgres
POSTGRES_DB: stacks_blockchain_api
POSTGRES_PORT: 5432
command: >
-c work_mem=256MB
-c maintenance_work_mem=256MB
-c max_wal_size=1GB
98 changes: 14 additions & 84 deletions src/datastore/event-requests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { connectPostgres, PgServer } from './connection';
import { connectPgPool, connectWithRetry } from './connection-legacy';
import * as pgCopyStreams from 'pg-copy-streams';
import * as PgCursor from 'pg-cursor';
import * as readline from 'readline';

export async function exportRawEventRequests(targetStream: Writable): Promise<void> {
const pool = await connectPgPool({
Expand All @@ -28,94 +29,23 @@ export async function exportRawEventRequests(targetStream: Writable): Promise<vo
}

export async function* getRawEventRequests(
readStream: Readable,
onStatusUpdate?: (msg: string) => void
): AsyncGenerator<DbRawEventRequest[], void, unknown> {
// 1. Pipe input stream into a temp table
// 2. Use `pg-cursor` to async read rows from temp table (order by `id` ASC)
// 3. Drop temp table
// 4. Close db connection
const pool = await connectPgPool({
usageName: 'get-raw-events',
pgServer: PgServer.primary,
readStream: Readable
): AsyncGenerator<DbRawEventRequest, void, unknown> {
const rl = readline.createInterface({
input: readStream,
crlfDelay: Infinity,
});
try {
const client = await pool.connect();
try {
await client.query('BEGIN');
await client.query(`
CREATE TEMPORARY TABLE temp_event_observer_requests(
id bigint PRIMARY KEY,
receive_timestamp timestamptz NOT NULL,
event_path text NOT NULL,
payload jsonb NOT NULL
) ON COMMIT DROP
`);
// Use a `temp_raw_tsv` table first to store the raw TSV data as it might come with duplicate
// rows which would trigger the `PRIMARY KEY` constraint in `temp_event_observer_requests`.
// We will "upsert" from the former to the latter before event ingestion.
await client.query(`
CREATE TEMPORARY TABLE temp_raw_tsv
(LIKE temp_event_observer_requests)
ON COMMIT DROP
`);
onStatusUpdate?.('Importing raw event requests into temporary table...');
const importStream = client.query(pgCopyStreams.from(`COPY temp_raw_tsv FROM STDIN`));
await pipelineAsync(readStream, importStream);
onStatusUpdate?.('Removing any duplicate raw event requests...');
await client.query(`
INSERT INTO temp_event_observer_requests
SELECT *
FROM temp_raw_tsv
ON CONFLICT DO NOTHING;
`);
const totallengthQuery = await client.query<{ count: string }>(
`SELECT COUNT(id) count FROM temp_event_observer_requests`
);
const totallength = parseInt(totallengthQuery.rows[0].count);
let lastStatusUpdatePercent = 0;
onStatusUpdate?.('Streaming raw event requests from temporary table...');
const cursor = new PgCursor<{ id: string; event_path: string; payload: string }>(
`
SELECT id, event_path, payload::text
FROM temp_event_observer_requests
ORDER BY id ASC
`
);
const cursorQuery = client.query(cursor);
const rowBatchSize = 100;
let rowsReadCount = 0;
let rows: DbRawEventRequest[] = [];
do {
rows = await new Promise<DbRawEventRequest[]>((resolve, reject) => {
cursorQuery.read(rowBatchSize, (error, rows) => {
if (error) {
reject(error);
} else {
rowsReadCount += rows.length;
if ((rowsReadCount / totallength) * 100 > lastStatusUpdatePercent + 1) {
lastStatusUpdatePercent = Math.floor((rowsReadCount / totallength) * 100);
onStatusUpdate?.(
`Raw event requests processed: ${lastStatusUpdatePercent}% (${rowsReadCount} / ${totallength})`
);
}
resolve(rows);
}
});
});
if (rows.length > 0) {
yield rows;
}
} while (rows.length > 0);
await client.query('COMMIT');
} catch (error) {
await client.query('ROLLBACK');
throw error;
} finally {
client.release();
for await (const line of rl) {
const columns = line.split('\t');
const rawRequest: DbRawEventRequest = {
event_path: columns[2],
payload: columns[3],
};
yield rawRequest;
}
} finally {
await pool.end();
rl.close();
}
}

Expand Down
48 changes: 27 additions & 21 deletions src/event-replay/event-replay.ts
Original file line number Diff line number Diff line change
Expand Up @@ -134,38 +134,44 @@ export async function importEventsFromTsv(

// Import TSV chain data
const readStream = fs.createReadStream(resolvedFilePath);
const rawEventsIterator = getRawEventRequests(readStream, status => {
console.log(status);
});
const rawEventsIterator = getRawEventRequests(readStream);
// Set logger to only output for warnings/errors, otherwise the event replay will result
// in the equivalent of months/years of API log output.
logger.level = 'warn';
// The current import block height. Will be updated with every `/new_block` event.
let blockHeight = 0;
let lastStatusUpdatePercent = 0;
const responses = [];
for await (const rawEvents of rawEventsIterator) {
for (const rawEvent of rawEvents) {
if (eventImportMode === EventImportMode.pruned) {
if (blockHeight === prunedBlockHeight) {
console.log(`Resuming prunable event import...`);
}
for await (const rawEvent of rawEventsIterator) {
if (eventImportMode === EventImportMode.pruned) {
if (blockHeight === prunedBlockHeight) {
console.log(`Resuming prunable event import...`);
}
const response = await httpPostRequest({
host: '127.0.0.1',
port: eventServer.serverAddress.port,
path: rawEvent.event_path,
headers: { 'Content-Type': 'application/json' },
body: Buffer.from(rawEvent.payload, 'utf8'),
throwOnNotOK: true,
});
if (rawEvent.event_path === '/new_block') {
blockHeight = await getDbBlockHeight(db);
if (blockHeight && blockHeight % 1000 === 0) {
}
const response = await httpPostRequest({
host: '127.0.0.1',
port: eventServer.serverAddress.port,
path: rawEvent.event_path,
headers: { 'Content-Type': 'application/json' },
body: Buffer.from(rawEvent.payload, 'utf8'),
throwOnNotOK: true,
});
if (rawEvent.event_path === '/new_block') {
blockHeight = await getDbBlockHeight(db);
if (blockHeight) {
if (blockHeight % 1000 === 0) {
console.log(`Event file block height reached: ${blockHeight}`);
}
const percentProgress = (blockHeight / tsvBlockHeight) * 100;
if (percentProgress > lastStatusUpdatePercent + 1) {
lastStatusUpdatePercent = Math.floor(percentProgress);
console.log(
`Blocks processed: ${lastStatusUpdatePercent}% (${blockHeight} / ${tsvBlockHeight})`
);
}
}
responses.push(response);
}
responses.push(response);
}
await db.finishEventReplay();
console.log(`Event import and playback successful.`);
Expand Down
68 changes: 32 additions & 36 deletions src/tests-event-replay/import-export-tests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -170,21 +170,19 @@ describe('IBD', () => {
return [eventServer, eventServer.closeAsync] as const;
},
async (rawEventsIterator, eventServer) => {
for await (const rawEvents of rawEventsIterator) {
for (const rawEvent of rawEvents) {
ibdRoutesVisited.add(rawEvent.event_path);
const response = await httpPostRequest({
host: '127.0.0.1',
port: eventServer.serverAddress.port,
path: rawEvent.event_path,
headers: { 'Content-Type': 'application/json' },
body: Buffer.from(rawEvent.payload, 'utf8'),
throwOnNotOK: true,
});
if (ibdRoutes.includes(rawEvent.event_path)) {
expect(response.statusCode).toBe(200);
expect(response.response).toBe('IBD mode active.');
}
for await (const rawEvent of rawEventsIterator) {
ibdRoutesVisited.add(rawEvent.event_path);
const response = await httpPostRequest({
host: '127.0.0.1',
port: eventServer.serverAddress.port,
path: rawEvent.event_path,
headers: { 'Content-Type': 'application/json' },
body: Buffer.from(rawEvent.payload, 'utf8'),
throwOnNotOK: true,
});
if (ibdRoutes.includes(rawEvent.event_path)) {
expect(response.statusCode).toBe(200);
expect(response.response).toBe('IBD mode active.');
}
}
}
Expand Down Expand Up @@ -214,27 +212,25 @@ describe('IBD', () => {
return [eventServer, eventServer.closeAsync] as const;
},
async (rawEventsIterator, eventServer) => {
for await (const rawEvents of rawEventsIterator) {
for (const rawEvent of rawEvents) {
ibdRoutesVisited.add(rawEvent.event_path);
const response = await httpPostRequest({
host: '127.0.0.1',
port: eventServer.serverAddress.port,
path: rawEvent.event_path,
headers: { 'Content-Type': 'application/json' },
body: Buffer.from(rawEvent.payload, 'utf8'),
throwOnNotOK: true,
});
if (ibdRoutes.includes(rawEvent.event_path)) {
const chainTip = await db.getChainTip(client, false);
const ibdThreshold = Number.parseInt(process.env.IBD_MODE_UNTIL_BLOCK as string);
if (chainTip.blockHeight < ibdThreshold) {
expect(response.statusCode).toBe(200);
expect(response.response).toBe('IBD mode active.');
} else {
expect(response.statusCode).toBe(200);
expect(response.response).not.toBe('IBD mode active.');
}
for await (const rawEvent of rawEventsIterator) {
ibdRoutesVisited.add(rawEvent.event_path);
const response = await httpPostRequest({
host: '127.0.0.1',
port: eventServer.serverAddress.port,
path: rawEvent.event_path,
headers: { 'Content-Type': 'application/json' },
body: Buffer.from(rawEvent.payload, 'utf8'),
throwOnNotOK: true,
});
if (ibdRoutes.includes(rawEvent.event_path)) {
const chainTip = await db.getChainTip(client, false);
const ibdThreshold = Number.parseInt(process.env.IBD_MODE_UNTIL_BLOCK as string);
if (chainTip.blockHeight < ibdThreshold) {
expect(response.statusCode).toBe(200);
expect(response.response).toBe('IBD mode active.');
} else {
expect(response.statusCode).toBe(200);
expect(response.response).not.toBe('IBD mode active.');
}
}
}
Expand Down
44 changes: 21 additions & 23 deletions src/tests-event-replay/raw-event-request-tests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,31 +55,29 @@ describe('Events table', () => {
return [eventServer, eventServer.closeAsync] as const;
},
async (rawEventsIterator, eventServer) => {
for await (const rawEvents of rawEventsIterator) {
for (const rawEvent of rawEvents) {
try {
if (rawEvent.event_path === '/new_block') {
const payloadJson = JSON.parse(rawEvent.payload);
payloadJson.transactions = undefined;
rawEvent.payload = JSON.stringify(payloadJson);
}
} catch (error) {}
const rawEventRequestCountResultBefore = await getRawEventCount();
const rawEventRequestCountBefore = rawEventRequestCountResultBefore[0];
const response = await httpPostRequest({
host: '127.0.0.1',
port: eventServer.serverAddress.port,
path: rawEvent.event_path,
headers: { 'Content-Type': 'application/json' },
body: Buffer.from(rawEvent.payload, 'utf8'),
throwOnNotOK: false,
});
for await (const rawEvent of rawEventsIterator) {
try {
if (rawEvent.event_path === '/new_block') {
expect(response.statusCode).toBe(500);
const rawEventRequestCountResultAfter = await getRawEventCount();
const rawEventRequestCountAfter = rawEventRequestCountResultAfter[0];
expect(rawEventRequestCountBefore).toEqual(rawEventRequestCountAfter);
const payloadJson = JSON.parse(rawEvent.payload);
payloadJson.transactions = undefined;
rawEvent.payload = JSON.stringify(payloadJson);
}
} catch (error) {}
const rawEventRequestCountResultBefore = await getRawEventCount();
const rawEventRequestCountBefore = rawEventRequestCountResultBefore[0];
const response = await httpPostRequest({
host: '127.0.0.1',
port: eventServer.serverAddress.port,
path: rawEvent.event_path,
headers: { 'Content-Type': 'application/json' },
body: Buffer.from(rawEvent.payload, 'utf8'),
throwOnNotOK: false,
});
if (rawEvent.event_path === '/new_block') {
expect(response.statusCode).toBe(500);
const rawEventRequestCountResultAfter = await getRawEventCount();
const rawEventRequestCountAfter = rawEventRequestCountResultAfter[0];
expect(rawEventRequestCountBefore).toEqual(rawEventRequestCountAfter);
}
}
}
Expand Down
Loading