package cmd
import (
"encoding/hex"
"flag"
"fmt"
"net"
"os"
"os/signal"
"sync"
"syscall"
"time"
"github.com/deso-protocol/go-deadlock"
"github.com/DataDog/datadog-go/v5/statsd"
"github.com/btcsuite/btcd/addrmgr"
"github.com/btcsuite/btcd/wire"
"github.com/davecgh/go-spew/spew"
"github.com/deso-protocol/core/lib"
"github.com/deso-protocol/core/migrate"
"github.com/dgraph-io/badger/v3"
"github.com/go-pg/pg/v10"
"github.com/golang/glog"
migrations "github.com/robinjoseph08/go-pg-migrations/v3"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
"gopkg.in/DataDog/dd-trace-go.v1/profiler"
)
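// Node wraps a full DeSo node: the core server, its chain and transaction-index
// databases, an optional Postgres backend, and the network listeners, together with
// the plumbing used to start, stop, and restart everything cleanly.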
type Node struct {
Server *lib.Server
ChainDB *badger.DB
TXIndex *lib.TXIndex
Params *lib.DeSoParams
Config *Config
Postgres *lib.Postgres
Listeners []net.Listener
// IsRunning is false when a NewNode is created, set to true on Start(), set to false
// after Stop() is called. Mainly used in testing.
IsRunning bool
// runningMutex is held whenever we call Start() or Stop() on the node.
runningMutex sync.Mutex
// internalExitChan is used internally to signal that a node should close.
internalExitChan chan struct{}
// nodeMessageChan is passed to the core engine and used to trigger node actions such as a restart or database reset.
nodeMessageChan chan lib.NodeMessage
// stopWaitGroup allows us to wait for the node to fully close.
stopWaitGroup sync.WaitGroup
}
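// NewNode constructs a Node from the given config. The node does nothing until
// Start() is called.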
func NewNode(config *Config) *Node {
result := Node{}
result.Config = config
result.Params = config.Params
result.internalExitChan = make(chan struct{})
result.nodeMessageChan = make(chan lib.NodeMessage)
return &result
}
// Start is the main function used to kick off the node. The exitChannels are optionally passed by the caller to receive
// signals from the node. In particular, exitChannels will be closed by the node when the node is shutting down for good.
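//
// A minimal usage sketch (the config construction and channel name below are
// illustrative assumptions, not part of this file):
//
//	config := LoadConfig()              // hypothetical helper that builds a *Config
//	node := NewNode(config)
//	shutdownChan := make(chan struct{})
//	go node.Start(&shutdownChan)
//	<-shutdownChan                      // closed by the node once it has shut down for good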
func (node *Node) Start(exitChannels ...*chan struct{}) {
// TODO: Replace glog with logrus so we can also get rid of flag library
flag.Set("log_dir", node.Config.LogDirectory)
flag.Set("v", fmt.Sprintf("%d", node.Config.GlogV))
flag.Set("vmodule", node.Config.GlogVmodule)
flag.Set("alsologtostderr", fmt.Sprintf("%t", !node.Config.NoLogToStdErr))
flag.Parse()
glog.CopyStandardLogTo("INFO")
node.runningMutex.Lock()
defer node.runningMutex.Unlock()
node.internalExitChan = make(chan struct{})
node.nodeMessageChan = make(chan lib.NodeMessage)
// listenToNodeMessages handles the messages received from the engine through the nodeMessageChan.
go node.listenToNodeMessages()
// Print config
node.Config.Print()
// Check for regtest mode
if node.Config.Regtest {
node.Params.EnableRegtest(node.Config.RegtestAccelerated)
}
// Validate params
validateParams(node.Params)
// This is a bit of a hack, and we should deprecate it. We rely on the GlobalDeSoParams static variable in only one
// place in the core code, namely in encoder migrations. Encoder migrations allow us to update the core database
// schema without requiring a resync. GlobalDeSoParams is used so that encoders know whether we're on mainnet or testnet.
lib.GlobalDeSoParams = *node.Params
// Setup Datadog span tracer and profiler
if node.Config.DatadogProfiler {
tracer.Start()
err := profiler.Start(profiler.WithProfileTypes(profiler.CPUProfile, profiler.BlockProfile, profiler.MutexProfile, profiler.GoroutineProfile, profiler.HeapProfile))
if err != nil {
glog.Fatal(err)
}
}
if node.Config.TimeEvents {
lib.Mode = lib.EnableTimer
}
// Setup statsd
statsdClient, err := statsd.New(fmt.Sprintf("%s:%d", os.Getenv("DD_AGENT_HOST"), 8125))
if err != nil {
glog.Fatal(err)
}
// Setup listeners and peers
desoAddrMgr := addrmgr.New(node.Config.DataDirectory, net.LookupIP)
desoAddrMgr.Start()
// This gets the listening addresses on the protocol port for the local interfaces,
// e.g. [{127.0.0.1 18000 } {::1 18000 }], along with the associated listener structs.
_, node.Listeners = GetAddrsToListenOn(node.Config.ProtocolPort)
// If --connect-ips is not passed, we will connect to addresses from
// --add-ips, the DNSSeeds, and the DNSSeedGenerators.
if len(node.Config.ConnectIPs) == 0 {
glog.Infof("Looking for AddIPs: %v", len(node.Config.AddIPs))
for _, host := range node.Config.AddIPs {
addIPsForHost(desoAddrMgr, host, node.Params)
}
glog.Infof("Looking for DNSSeeds: %v", len(node.Params.DNSSeeds))
for _, host := range node.Params.DNSSeeds {
addIPsForHost(desoAddrMgr, host, node.Params)
}
// This is where we connect to addresses generated from the DNSSeedGenerators.
if !node.Config.PrivateMode {
go addSeedAddrsFromPrefixes(desoAddrMgr, node.Params)
}
}
// Setup chain database
dbDir := lib.GetBadgerDbPath(node.Config.DataDirectory)
opts := lib.PerformanceBadgerOptions(dbDir)
opts.ValueDir = dbDir
node.ChainDB, err = badger.Open(opts)
if err != nil {
panic(err)
}
// Setup snapshot logger
if node.Config.LogDBSummarySnapshots {
lib.StartDBSummarySnapshots(node.ChainDB)
}
// Validate that we weren't passed incompatible Hypersync flags
lib.ValidateHyperSyncFlags(node.Config.HyperSync, node.Config.SyncType)
// Setup postgres using a remote URI. Postgres is not currently supported when we're in hypersync mode.
if node.Config.HyperSync && node.Config.PostgresURI != "" {
glog.Fatal("--postgres-uri is not supported when --hypersync=true. We're " +
"working on Hypersync support for Postgres though!")
}
var db *pg.DB
if node.Config.PostgresURI != "" {
options, err := pg.ParseURL(node.Config.PostgresURI)
if err != nil {
panic(err)
}
db = pg.Connect(options)
node.Postgres = lib.NewPostgres(db)
// LoadMigrations registers all the migration files in the migrate package.
// See LoadMigrations for more info.
migrate.LoadMigrations()
// Migrate the database after loading all the migrations. This is equivalent
// to running "go run migrate.go migrate". See migrate.go for a migrations CLI tool
err = migrations.Run(db, "migrate", []string{"", "migrate"})
if err != nil {
panic(err)
}
}
// Setup eventManager
eventManager := lib.NewEventManager()
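// If a PoS validator seed is provided, derive the BLS keystore that holds the
// validator's signing key for Proof-of-Stake duties.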
var blsKeystore *lib.BLSKeystore
if node.Config.PosValidatorSeed != "" {
blsKeystore, err = lib.NewBLSKeystore(node.Config.PosValidatorSeed)
if err != nil {
panic(err)
}
}
// Setup the server. shouldRestart is set whenever we detect an issue that requires restarting the node after a
// recovery process, just in case. These issues usually arise when the node was shut down unexpectedly mid-operation.
// The node performs regular health checks to detect when this occurs.
shouldRestart := false
node.Server, err, shouldRestart = lib.NewServer(
node.Params,
node.Config.Regtest,
node.Listeners,
desoAddrMgr,
node.Config.ConnectIPs,
node.ChainDB,
node.Postgres,
node.Config.TargetOutboundPeers,
node.Config.MaxInboundPeers,
node.Config.MinerPublicKeys,
node.Config.NumMiningThreads,
node.Config.OneInboundPerIp,
node.Config.PeerConnectionRefreshIntervalMillis,
node.Config.HyperSync,
node.Config.SyncType,
node.Config.MaxSyncBlockHeight,
node.Config.DisableEncoderMigrations,
node.Config.RateLimitFeerate,
node.Config.MinFeerate,
node.Config.StallTimeoutSeconds,
node.Config.MaxBlockTemplatesCache,
node.Config.MinBlockUpdateInterval,
node.Config.BlockCypherAPIKey,
true,
node.Config.SnapshotBlockHeightPeriod,
node.Config.DataDirectory,
node.Config.MempoolDumpDirectory,
node.Config.DisableNetworking,
node.Config.ReadOnlyMode,
node.Config.IgnoreInboundInvs,
statsdClient,
node.Config.BlockProducerSeed,
node.Config.TrustedBlockProducerPublicKeys,
node.Config.TrustedBlockProducerStartHeight,
eventManager,
node.nodeMessageChan,
node.Config.ForceChecksum,
node.Config.StateChangeDir,
node.Config.HypersyncMaxQueueSize,
blsKeystore,
node.Config.MempoolBackupIntervalMillis,
node.Config.MempoolMaxValidationViewConnects,
node.Config.TransactionValidationRefreshIntervalMillis,
node.Config.StateSyncerMempoolTxnSyncLimit,
node.Config.CheckpointSyncingProviders,
)
if err != nil {
// shouldRestart can be true if, on the previous run, we did not finish flushing all ancestral
// records to the DB. In this case, the snapshot is corrupted and needs to be recomputed. See the
// comment at the top of snapshot.go for more information on how this works.
if shouldRestart {
glog.Infof(lib.CLog(lib.Red, fmt.Sprintf("Start: Got an error while starting server and shouldRestart "+
"is true. Node will be erased and resynced. Error: (%v)", err)))
node.nodeMessageChan <- lib.NodeErase
return
}
panic(err)
}
if !shouldRestart {
node.Server.Start()
// Setup TXIndex - not compatible with postgres
if node.Config.TXIndex && node.Postgres == nil {
node.TXIndex, err = lib.NewTXIndex(node.Server.GetBlockchain(), node.Params, node.Config.DataDirectory)
if err != nil {
glog.Fatal(err)
}
node.Server.TxIndex = node.TXIndex
if !shouldRestart {
node.TXIndex.Start()
}
}
}
node.IsRunning = true
if shouldRestart {
if node.nodeMessageChan != nil {
node.nodeMessageChan <- lib.NodeRestart
}
}
// Detect when an interrupt (Ctrl-C) or termination signal is sent. The channel is
// buffered so a signal delivered before we start receiving isn't dropped.
syscallChannel := make(chan os.Signal, 1)
signal.Notify(syscallChannel, syscall.SIGINT, syscall.SIGTERM)
go func() {
// If internalExitChan is closed, we don't signal a shutdown to the parent context through
// the exitChannels; an internal exit just means the node is restarting itself in the background.
select {
case _, open := <-node.internalExitChan:
if !open {
return
}
case <-syscallChannel:
}
node.Stop()
for _, channel := range exitChannels {
if *channel != nil {
close(*channel)
*channel = nil
}
}
glog.Info(lib.CLog(lib.Yellow, "Core node shutdown complete"))
}()
}
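// Stop gracefully shuts the node down: it stops the server, the snapshot, and the
// TXIndex, closes the databases, waits for the closes to finish, and finally closes
// internalExitChan so background goroutines can unwind.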
func (node *Node) Stop() {
node.runningMutex.Lock()
defer node.runningMutex.Unlock()
if !node.IsRunning {
return
}
node.IsRunning = false
glog.Infof(lib.CLog(lib.Yellow, "Node is shutting down. This might take a minute. Please don't "+
"close the node now or else you might corrupt the state."))
// Server
glog.Infof(lib.CLog(lib.Yellow, "Node.Stop: Stopping server..."))
if node.Server != nil {
node.Server.Stop()
}
glog.Infof(lib.CLog(lib.Yellow, "Node.Stop: Server successfully stopped."))
// Snapshot
if node.Server != nil && node.Server.GetBlockchain() != nil {
snap := node.Server.GetBlockchain().Snapshot()
if snap != nil {
glog.Infof(lib.CLog(lib.Yellow, "Node.Stop: Stopping snapshot..."))
snap.Stop()
glog.Infof(lib.CLog(lib.Yellow, "Node.Stop: Snapshot successfully stopped."))
}
}
// TXIndex
if node.TXIndex != nil {
glog.Infof(lib.CLog(lib.Yellow, "Node.Stop: Stopping TXIndex..."))
node.TXIndex.Stop()
node.closeDb(node.TXIndex.TXIndexChain.DB(), "txindex")
glog.Infof(lib.CLog(lib.Yellow, "Node.Stop: TXIndex successfully stopped."))
}
// Databases
glog.Infof(lib.CLog(lib.Yellow, "Node.Stop: Closing all databases..."))
if node.ChainDB != nil {
node.closeDb(node.ChainDB, "chain")
}
if node.Server != nil && node.Server.GetBlockchain() != nil {
blockchainDb := node.Server.GetBlockchain().DB()
node.closeDb(blockchainDb, "blockchain DB")
}
node.stopWaitGroup.Wait()
glog.Infof(lib.CLog(lib.Yellow, "Node.Stop: Databases successfully closed."))
if node.internalExitChan != nil {
close(node.internalExitChan)
node.internalExitChan = nil
}
}
// closeDb closes a database and handles the stopWaitGroup accordingly. We close databases in a goroutine to speed up the process.
func (node *Node) closeDb(db *badger.DB, dbName string) {
node.stopWaitGroup.Add(1)
glog.Infof("Node.closeDb: Preparing to close %v db", dbName)
go func() {
defer node.stopWaitGroup.Done()
if err := db.Close(); err != nil {
glog.Fatalf(lib.CLog(lib.Red, fmt.Sprintf("Node.Stop: Problem closing %v db: err: (%v)", dbName, err)))
} else {
glog.Infof(lib.CLog(lib.Yellow, fmt.Sprintf("Node.closeDb: Closed %v Db", dbName)))
}
}()
}
// listenToNodeMessages listens to the communication from the engine through the nodeMessageChan. There are currently
// two main operations that the engine can request. These are a regular node restart, and a restart with a database
// erase. The latter may seem a little harsh, but it is only triggered when the node is really broken and there's
// no way we can recover.
func (node *Node) listenToNodeMessages(exitChannels ...*chan struct{}) {
select {
case <-node.internalExitChan:
break
case operation := <-node.nodeMessageChan:
if !node.IsRunning {
panic("Node.listenToNodeMessages: Node is currently not running, nodeMessageChan should've not been called!")
}
glog.Infof("Node.listenToNodeMessages: Stopping node")
node.Stop()
glog.Infof("Node.listenToNodeMessages: Finished stopping node")
switch operation {
case lib.NodeErase:
glog.Error("Not actually erasing node")
// TODO: Clean up this path. This NodeErase code was added when we upgraded to HyperSync,
// but it's not worth compromising the node if a false positive sends us here.
//if err := os.RemoveAll(node.Config.DataDirectory); err != nil {
// glog.Fatal(lib.CLog(lib.Red, fmt.Sprintf("IMPORTANT: Problem removing the directory (%v), you "+
// "should run `rm -rf %v` to delete it manually. Error: (%v)", node.Config.DataDirectory,
// node.Config.DataDirectory, err)))
// return
//}
}
glog.Infof("Node.listenToNodeMessages: Restarting node")
// Wait a few seconds so that all peer messages we've sent while closing the node get propagated in the network.
go node.Start(exitChannels...)
break
}
}
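// validateParams sanity-checks the DeSoParams we were given and calls glog.Fatalf on the
// first inconsistency it finds (missing genesis info, a genesis merkle root or block hash
// that doesn't match, an invalid difficulty target, etc.).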
func validateParams(params *lib.DeSoParams) {
if params.BitcoinBurnAddress == "" {
glog.Fatalf("The DeSoParams being used are missing the BitcoinBurnAddress field.")
}
// Check that TimeBetweenDifficultyRetargets is evenly divisible
// by TimeBetweenBlocks.
if params.TimeBetweenBlocks == 0 {
glog.Fatalf("The DeSoParams being used have TimeBetweenBlocks=0")
}
numBlocks := params.TimeBetweenDifficultyRetargets / params.TimeBetweenBlocks
truncatedTime := params.TimeBetweenBlocks * numBlocks
if truncatedTime != params.TimeBetweenDifficultyRetargets {
glog.Fatalf("TimeBetweenDifficultyRetargets (%v) should be evenly divisible by "+
"TimeBetweenBlocks (%v)", params.TimeBetweenDifficultyRetargets,
params.TimeBetweenBlocks)
}
if params.GenesisBlock == nil || params.GenesisBlockHashHex == "" {
glog.Fatalf("The DeSoParams are missing genesis block info.")
}
// Compute the merkle root for the genesis block and make sure it matches.
merkle, _, err := lib.ComputeMerkleRoot(params.GenesisBlock.Txns)
if err != nil {
glog.Fatalf("Could not compute a merkle root for the genesis block: %v", err)
}
if *merkle != *params.GenesisBlock.Header.TransactionMerkleRoot {
glog.Fatalf("Genesis block merkle root (%s) not equal to computed merkle root (%s)",
hex.EncodeToString(params.GenesisBlock.Header.TransactionMerkleRoot[:]),
hex.EncodeToString(merkle[:]))
}
genesisHash, err := params.GenesisBlock.Header.Hash()
if err != nil {
glog.Fatalf("Problem hashing header for the GenesisBlock in "+
"the DeSoParams (%+v): %v", params.GenesisBlock.Header, err)
}
genesisHashHex := hex.EncodeToString(genesisHash[:])
if genesisHashHex != params.GenesisBlockHashHex {
glog.Fatalf("GenesisBlockHash in DeSoParams (%s) does not match the block "+
"hash computed (%s) %d %d", params.GenesisBlockHashHex, genesisHashHex, len(params.GenesisBlockHashHex), len(genesisHashHex))
}
if params.MinDifficultyTargetHex == "" {
glog.Fatalf("The DeSoParams MinDifficultyTargetHex (%s) should be non-empty",
params.MinDifficultyTargetHex)
}
// Decode the MinDifficultyTargetHex and make sure it is a valid 32-byte value.
hexBytes, err := hex.DecodeString(params.MinDifficultyTargetHex)
if err != nil || len(hexBytes) != 32 {
glog.Fatalf("The DeSoParams MinDifficultyTargetHex (%s) with length (%d) is "+
"invalid: %v", params.MinDifficultyTargetHex, len(params.MinDifficultyTargetHex), err)
}
if params.MaxDifficultyRetargetFactor == 0 {
glog.Fatalf("The DeSoParams MaxDifficultyRetargetFactor is unset")
}
}
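// GetAddrsToListenOn returns the TCP addresses we can listen on for the given protocol
// port, along with a listener for each. Link-local addresses and interfaces we fail to
// bind are skipped.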
func GetAddrsToListenOn(protocolPort uint16) ([]net.TCPAddr, []net.Listener) {
listeningAddrs := []net.TCPAddr{}
listeners := []net.Listener{}
ifaceAddrs, err := net.InterfaceAddrs()
if err != nil {
return nil, nil
}
for _, iAddr := range ifaceAddrs {
ifaceIP, _, err := net.ParseCIDR(iAddr.String())
if err != nil {
continue
}
if ifaceIP.IsLinkLocalUnicast() {
continue
}
netAddr := net.TCPAddr{
IP: ifaceIP,
Port: int(protocolPort),
}
listener, err := net.Listen(netAddr.Network(), netAddr.String())
if err != nil {
continue
}
listeners = append(listeners, listener)
listeningAddrs = append(listeningAddrs, netAddr)
}
return listeningAddrs, listeners
}
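// addIPsForHost resolves the given DNS host and adds up to 5 of the returned IPs to the
// address manager, stamping each with a randomized "last seen" time.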
func addIPsForHost(desoAddrMgr *addrmgr.AddrManager, host string, params *lib.DeSoParams) {
ipAddrs, err := net.LookupIP(host)
if err != nil {
glog.V(2).Infof("_addSeedAddrs: DNS discovery failed on seed host (continuing on): %s %v\n", host, err)
return
}
if len(ipAddrs) == 0 {
glog.V(2).Infof("_addSeedAddrs: No IPs found for host: %s\n", host)
return
}
// Don't take more than 5 IPs per host.
ipsPerHost := 5
if len(ipAddrs) > ipsPerHost {
glog.V(1).Infof("_addSeedAddrs: Truncating IPs found from %d to %d\n", len(ipAddrs), ipsPerHost)
ipAddrs = ipAddrs[:ipsPerHost]
}
glog.V(1).Infof("_addSeedAddrs: Adding seed IPs from seed %s: %v\n", host, ipAddrs)
// Convert the addresses to wire.NetAddressV2 values.
netAddrs, err := lib.SafeMakeSliceWithLength[*wire.NetAddressV2](uint64(len(ipAddrs)))
if err != nil {
glog.V(2).Infof("_addSeedAddrs: Problem creating netAddrs slice with length %d", len(ipAddrs))
return
}
for ii, ip := range ipAddrs {
netAddrs[ii] = wire.NetAddressV2FromBytes(
// We initialize addresses with a
// randomly selected "last seen time" between 3
// and 7 days ago similar to what bitcoind does.
time.Now().Add(-1*time.Second*time.Duration(lib.SecondsIn3Days+
lib.RandInt32(lib.SecondsIn4Days))),
0,
ip,
params.DefaultSocketPort)
}
glog.V(1).Infof("_addSeedAddrs: Computed the following wire.NetAddress'es: %s", spew.Sdump(netAddrs))
// Normally the second argument is the source that told us about the
// addresses we're adding. In this case, since the source is a DNS seed,
// just use the first address in the fetch as the source.
desoAddrMgr.AddAddresses(netAddrs, netAddrs[0])
}
// addSeedAddrsFromPrefixes continuously adds IPs derived from the DNSSeedGenerators by
// iterating a numeric value (typically in the range [0, 10]) between each generator's
// prefix and suffix. The DNS lookups run in their own goroutine, so this returns immediately.
func addSeedAddrsFromPrefixes(desoAddrMgr *addrmgr.AddrManager, params *lib.DeSoParams) {
MaxIterations := 20
go func() {
for dnsNumber := 0; dnsNumber < MaxIterations; dnsNumber++ {
var wg deadlock.WaitGroup
for _, dnsGeneratorOuter := range params.DNSSeedGenerators {
wg.Add(1)
go func(dnsGenerator []string) {
dnsString := fmt.Sprintf("%s%d%s", dnsGenerator[0], dnsNumber, dnsGenerator[1])
glog.V(2).Infof("_addSeedAddrsFromPrefixes: Querying DNS seed: %s", dnsString)
addIPsForHost(desoAddrMgr, dnsString, params)
wg.Done()
}(dnsGeneratorOuter)
}
wg.Wait()
}
}()
}