forked from simpleledger/SLPDB
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathindex.ts
288 lines (254 loc) · 11.6 KB
/
index.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
import * as dotenv from 'dotenv';
dotenv.config()
import { Bit } from './bit';
import { Db } from './db';
import { RpcClient } from './rpc';
import { Config } from './config';
import { SlpdbStatus } from './status';
import { Info, ChainSyncCheckpoint } from './info';
import { SlpGraphManager } from './slpgraphmanager';
import { TokenFilterRule, TokenFilter } from './filters';
import { BlockchainInfoResult } from 'bitcoin-com-rest';
const sp = require("synchronized-promise");
new RpcClient({ useGrpc: Boolean(Config.grpc.url) });
let getBlockchainInfoSync: () => BlockchainInfoResult = sp(RpcClient.getBlockchainInfo);
let setNetworkSync: (network: string) => void = sp(Info.setNetwork);
let chain = getBlockchainInfoSync().chain;
let network = chain === 'test' || chain === 'regtest' ? 'testnet' : 'mainnet';
setNetworkSync(network);
let db = new Db({
dbName: network === 'mainnet' ? Config.db.name : Config.db.name_testnet,
dbUrl: Config.db.url,
config: Config.db
});
let bit = new Bit(db);
new SlpdbStatus(db, process.argv);
let tokenManager: SlpGraphManager;
const daemon = {
run: async function({ startHeight, loadFromDb=true }: { startHeight?: number, loadFromDb?: boolean} ) {
// persist updated SLPDB status every 10 minutes
await SlpdbStatus.loadPreviousAttributes();
setInterval(async function() {
await SlpdbStatus.saveStatus();
}, 60000);
await bit.init();
// test RPC connection
console.log("[INFO] Testing RPC connection...");
await RpcClient.getBlockCount();
console.log("[INFO] RPC is initialized.");
// set start height override
if(startHeight)
await Info.updateBlockCheckpoint(startHeight, null);
await SlpdbStatus.saveStatus();
// try to load tokens filter yaml
let filter = TokenFilter.loadFromFile();
// check for confirmed collection schema update
let schema = await Info.getConfirmedCollectionSchema();
if(!schema || schema !== Config.db.confirmed_schema_version) {
await Info.setConfirmedCollectionSchema(Config.db.confirmed_schema_version);
await Info.checkpointReset();
console.log("[INFO] Schema version for the confirmed collection was updated. Reseting block checkpoint reset to", (await Info.getBlockCheckpoint()).height)
}
let lastSynchronized = <ChainSyncCheckpoint>await Info.getBlockCheckpoint((await Info.getNetwork()) === 'mainnet' ? Config.core.from : Config.core.from_testnet);
console.log("reprocessFrom: ", lastSynchronized.height);
if(lastSynchronized.height > await bit.requestheight()) {
lastSynchronized = await Bit.checkForBlockReorg(lastSynchronized);
//throw Error("Config.core.from or Config.core.from_testnet cannot be larger than the current blockchain height (check the config.ts file)");
}
console.time('[PERF] Indexing Keys');
let from = (await Info.getNetwork()) === 'mainnet' ? Config.core.from : Config.core.from_testnet;
if (lastSynchronized.height === from) {
console.log('[INFO] Indexing MongoDB With Configured Keys...', new Date());
await db.confirmedIndex();
}
console.timeEnd('[PERF] Indexing Keys');
console.log('[INFO] Synchronizing SLPDB with BCH blockchain data...', new Date());
console.time('[PERF] Initial Block Sync');
await SlpdbStatus.changeStateToStartupBlockSync({
network, getSyncdCheckpoint: async () => await Info.getBlockCheckpoint()
});
await bit.processBlocksForTNA();
await bit.processCurrentMempoolForTNA();
console.timeEnd('[PERF] Initial Block Sync');
console.log('[INFO] SLPDB Synchronization with BCH blockchain data complete.', new Date());
console.log('[INFO] Starting to processing SLP Data.', new Date());
let currentHeight = await RpcClient.getBlockCount();
tokenManager = new SlpGraphManager(db, currentHeight, network, bit, filter);
bit._slpGraphManager = tokenManager;
bit.listenToZmq();
await bit.checkForMissingMempoolTxns(undefined, true);
let onComplete = async () => {
await tokenManager._startupQueue.onIdle();
console.log("[INFO] Starting to process graph based on recent mempool and block activity");
tokenManager._updatesQueue.start();
await tokenManager._updatesQueue.onIdle();
console.log("[INFO] Updates from recent mempool and block activity complete");
await tokenManager.fixMissingTokenTimestamps();
await tokenManager._bit.handleConfirmedTxnsMissingSlpMetadata();
await SlpdbStatus.changeStateToRunning({
getSlpMempoolSize: () => tokenManager._bit.slpMempool.size
});
console.log("[INFO] initAllTokens complete");
}
await tokenManager.initAllTokens({ reprocessFrom: lastSynchronized.height, onComplete, loadFromDb: loadFromDb });
// // look for burned token transactions every hour after startup
// setInterval(async function() {
// if(tokenManager._startupQueue.size === 0 && tokenManager._startupQueue.pending === 0) {
// await tokenManager.searchForNonSlpBurnTransactions();
// }
// }, 3600000);
}
}
const util = {
reprocess_token: async function(tokenId: string) {
let network = (await RpcClient.getBlockchainInfo())!.chain === 'test' ? 'testnet' : 'mainnet'
await Info.setNetwork(network);
await bit.init();
console.log('[INFO] Synchronizing SLPDB with BCH blockchain data...', new Date());
console.time('[PERF] Initial Block Sync');
await bit.processBlocksForTNA();
await bit.processCurrentMempoolForTNA();
console.timeEnd('[PERF] Initial Block Sync');
console.log('[INFO] SLPDB Synchronization with BCH blockchain data complete.', new Date());
let filter = new TokenFilter();
filter.addRule(new TokenFilterRule({ name: "unknown", info: tokenId, type: 'include-single'}));
let currentHeight = await RpcClient.getBlockCount();
let tokenManager = new SlpGraphManager(db, currentHeight, network, bit, filter);
bit._slpGraphManager = tokenManager;
bit.listenToZmq();
await tokenManager.initAllTokens({ reprocessFrom: 0, loadFromDb: false });
await tokenManager._startupQueue.onIdle();
console.log("[INFO] Reprocess done.");
process.exit(1);
},
reset_to_block: async function(block_height: number) { //592340
let includeTokenIds = [
"8aab2185354926d72c6a8f6bf7e403daaf1469c02e00a5ad5981b84ea776d980",
]
let filter = new TokenFilter();
includeTokenIds.forEach(i => {
filter.addRule(new TokenFilterRule({ name: "unknown", info: i, type: 'include-single'}));
});
let network = (await RpcClient.getBlockchainInfo())!.chain === 'test' ? 'testnet' : 'mainnet';
await Info.setNetwork(network);
let currentHeight = await RpcClient.getBlockCount();
let tokenManager = new SlpGraphManager(db, currentHeight, network, bit, filter);
await tokenManager.initAllTokens({ reprocessFrom: 0, reprocessTo: block_height });
await tokenManager._startupQueue.onIdle();
let blockhash = await RpcClient.getBlockHash(block_height+1);
await tokenManager.onBlockHash(blockhash);
await Info.updateBlockCheckpoint(block_height, null);
console.log("[INFO] Reset block done.")
process.exit(1);
}
}
const start = async function() {
let args = process.argv;
if (args.length > 2) {
if(args[2] === "run") {
let options: any = {};
if(args.includes("--reprocess")) {
console.log("[INFO] Reprocessing all tokens.");
options.loadFromDb = false;
}
if(args.includes("--startHeight")) {
let index = args.indexOf("--startHeight");
console.log("[INFO] Resync from startHeight:", index);
options.startHeight = parseInt(args[index+1]);
}
await daemon.run(options);
}
else if(args[2] === "reprocess")
await util.reprocess_token(process.argv[3]);
else if(args[2] === "goToBlock")
await util.reset_to_block(parseInt(process.argv[3]));
} else {
throw Error("No command provided after 'node ./index.js'.");
}
}
// @ts-ignore
process.on('uncaughtException', async (err: any, origin: any) => {
console.log("[ERROR] uncaughtException", err);
var message;
if(err.stack)
message = `[${(new Date()).toUTCString()}] ${err.stack}`;
else if(err.message)
message = `[${(new Date()).toUTCString()}] ${err.message}`;
else if(typeof message === 'string')
message = `[${(new Date()).toUTCString()}] ${err}`;
else if(typeof message === 'object')
message = `[${(new Date()).toUTCString()}] ${JSON.stringify(err)}`;
else
message = `[${(new Date()).toUTCString()}] SLPDB exited for an unknown reason.`
try {
await SlpdbStatus.logExitReason(message);
console.log(err);
console.log('[INFO] Shutting down SLPDB...', new Date().toString());
await db.exit();
} catch(error) {
console.log("[ERROR] Could not log to DB:", error);
} finally {
process.exit(0);
}
});
process.on('unhandledRejection', async (err: any, promise: any) => {
console.log("[ERROR] unhandledRejection", err);
var message;
if(err.stack)
message = `[${(new Date()).toUTCString()}] ${err.stack}`;
else if(err.message)
message = `[${(new Date()).toUTCString()}] ${err.message}`;
else if(typeof message === 'string')
message = `[${(new Date()).toUTCString()}] ${err}`;
else if(typeof message === 'object')
message = `[${(new Date()).toUTCString()}] ${JSON.stringify(err)}`;
else
message = `[${(new Date()).toUTCString()}] SLPDB exited for an unknown reason.`
try {
await SlpdbStatus.logExitReason(message);
console.log(err);
console.log('[INFO] Shutting down SLPDB...', new Date().toString());
await db.exit();
} catch(error) {
console.log("[ERROR] Could not log to DB:", error);
} finally {
process.exit(0);
}
});
process.on('SIGINT', async () => {
await shutdown('SIGINT');
});
process.on('SIGTERM', async () => {
await shutdown('SIGTERM');
});
async function shutdown(signal: string) {
console.log(`[INFO] Got ${signal}. Graceful shutdown start ${new Date().toISOString()}`);
try {
console.log('[INFO] Block sync processing stopped.');
} catch(_) {}
try {
bit._zmqItemQueue.pause();
console.log('[INFO] ZMQ processing stopped.');
} catch (_) {}
try {
await tokenManager.stop();
let tokens = Array.from(tokenManager._tokens);
for (let i = 0; i < tokens.length; i++) {
await tokens[i][1].stop();
}
console.log('[INFO] Token graph processing stopped.');
} catch (_) {}
try {
await SlpdbStatus.logExitReason(signal);
const sleep = (ms: number) => new Promise(resolve => setTimeout(resolve, ms));
await sleep(2000);
console.log('[INFO] Final telemetry update complete.');
} catch(_) {}
try {
await db.exit();
console.log('[INFO] Closed mongo DB connection.');
} catch (_) {}
console.log(`[INFO] Graceful shutdown completed ${new Date().toISOString()}`);
process.exit();
}
start();