Skip to content

Commit

Permalink
Merge pull request #8 from pinax-network/fix/logging
Browse files Browse the repository at this point in the history
improve logging
  • Loading branch information
DenisCarriere authored Mar 3, 2024
2 parents 852bf85 + 5abb06a commit 4b8152d
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 9 deletions.
24 changes: 16 additions & 8 deletions index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import fs from "fs";
import path from "path";
import pkg from "./package.json" assert { type: "json" };
import { setup, fileCursor } from "substreams-sink";
import { CSVRunOptions } from "./bin/cli.js"
import { EntityChanges, getValuesInEntityChange } from "@substreams/sink-entity-changes/zod"
Expand All @@ -10,6 +11,8 @@ import { parseClock } from "./src/parseClock.js";
import { parseSchema } from "./src/parseSchema.js";

export async function action(options: CSVRunOptions ) {
console.log(`[substreams-sink-csv] v${pkg.version}`);

// handle file system manifest
// can be removed when issue resolved
// https://github.com/substreams-js/substreams-js/issues/62
Expand All @@ -29,14 +32,17 @@ export async function action(options: CSVRunOptions ) {

// Cursor
const moduleHash = await getModuleHash(options);
console.log(JSON.stringify({manifest: options.manifest, moduleName: options.moduleName, moduleHash}));
const { name, cursorFile, clockFile, sessionFile } = parseFilename(moduleHash, options);
const startCursor = fs.existsSync(cursorFile) ? fs.readFileSync(cursorFile, "utf8") : '';
console.log(JSON.stringify({name, cursorFile, clockFile, sessionFile}));

// CSV writer (append)
const clockWriter = fs.createWriteStream(clockFile, {flags: "a"});
const writers: Map<string, fs.WriteStream> = new Map();

for ( const [table, columns] of tables ) {
console.log(JSON.stringify({table, columns}));
const filename = `${name}-${table}.csv`;
const writer = fs.createWriteStream(filename, {flags: "a"});
if ( !fs.existsSync(filename) ) writer.write(columns.join(",") + "\n");
Expand All @@ -57,20 +63,23 @@ export async function action(options: CSVRunOptions ) {
let totalBytesRead = 0;
let totalBytesWritten = 0;
let traceId = "";
let start_block = 0;
let workers = 0;
let resolvedStartBlock = 0;
let maxParallelWorkers = 0;
let runningJobs = 0;

emitter.on("session", (session) => {
fs.writeFileSync(sessionFile, JSON.stringify(session, null, 2));
traceId = session.traceId;
start_block = Number(session.resolvedStartBlock);
workers = Number(session.maxParallelWorkers)
resolvedStartBlock = Number(session.resolvedStartBlock);
maxParallelWorkers = Number(session.maxParallelWorkers)
console.log(JSON.stringify({traceId, resolvedStartBlock, maxParallelWorkers}));
});

emitter.on("progress", (progress) => {
if ( progress.processedBytes ) {
totalBytesRead += Number(progress.processedBytes.totalBytesRead);
totalBytesWritten += Number(progress.processedBytes.totalBytesWritten);
runningJobs = progress.runningJobs.length;
}
log();
});
Expand All @@ -85,6 +94,8 @@ export async function action(options: CSVRunOptions ) {
// Stream Messages
emitter.on("anyMessage", async (data, cursor, clock) => {
const { block_num, block_id, timestamp, seconds } = parseClock(clock);
last_block_num = block_num;
last_timestamp = timestamp;

// block header
for ( const entityChange of EntityChanges.parse(data).entityChanges ) {
Expand Down Expand Up @@ -121,10 +132,7 @@ export async function action(options: CSVRunOptions ) {
});

function log() {
logUpdate(`[substreams-sink-csv]
trace_id=${traceId} start_block=${start_block} module_hash=${moduleHash} workers=${workers}
last_block_num=${last_block_num} last_timestamp=${last_timestamp} blocks=${blocks} rows=${rows} bytes_read=${totalBytesRead} bytes_written=${totalBytesWritten}
`);
logUpdate(JSON.stringify({last_block_num, last_timestamp, blocks, rows, totalBytesRead, totalBytesWritten, runningJobs}));
}

fileCursor.onCursor(emitter, cursorFile);
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"version": "0.2.6",
"version": "0.2.7",
"name": "substreams-sink-csv",
"description": "Substreams Sink CSV",
"type": "module",
Expand Down

0 comments on commit 4b8152d

Please sign in to comment.