Skip to content

Commit

Permalink
add rpc poller for light blocks + --dev flag
Browse files Browse the repository at this point in the history
* added `tools poll-rpc-blocks` command to launch an RPC-based poller that acts as a firehose extractor node, printing base64-encoded protobuf blocks to stdout (used by the 'dev' node-type)
* added `--dev` flag to the `start` command to simplify running a local firehose+substreams stack from a development node (ex: Hardhat).
  * This flag overrides the `--reader-node-path`, instead pointing to the fireeth binary itself.
  * This flag overrides the `--reader-node-type`, setting it to `dev` instead of `geth`.
    This node type has the following default `reader-node-arguments`: `tools poll-rpc-blocks http://localhost:8545 0`
  * It also removes `node` from the list of default apps
  • Loading branch information
sduchesneau committed Oct 24, 2023
1 parent c496607 commit 800d283
Show file tree
Hide file tree
Showing 9 changed files with 490 additions and 11 deletions.
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,17 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). See [MAINTAINERS.md](./MAINTAINERS.md)
for instructions to keep up to date.

## Unreleased

## Added

* added `tools poll-rpc-blocks` command to launch an RPC-based poller that acts as a firehose extractor node, printing base64-encoded protobuf blocks to stdout (used by the 'dev' node-type). It creates "light" blocks, without traces and ordinals.
* added `--dev` flag to the `start` command to simplify running a local firehose+substreams stack from a development node (ex: Hardhat).
* This flag overrides the `--reader-node-path`, instead pointing to the fireeth binary itself.
* This flag overrides the `--reader-node-type`, setting it to `dev` instead of `geth`.
This node type has the following default `reader-node-arguments`: `tools poll-rpc-blocks http://localhost:8545 0`
* It also removes `node` from the list of default apps

## v1.4.19

* Bumped substreams to `v1.1.18` with a regression fix for when a substreams has a start block in the reversible segment
Expand Down
1 change: 1 addition & 0 deletions cmd/fireeth/cli/common-flag.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ func RegisterCommonFlags(_ *zap.Logger, cmd *cobra.Command) error {
cmd.Flags().String("common-forked-blocks-store-url", ForkedBlocksStoreURL, "[COMMON] Store URL (with prefix) to read/write forked block files that we want to keep")
cmd.Flags().String("common-index-store-url", IndexStoreURL, "[COMMON] Store URL (with prefix) to read/write index files.")
cmd.Flags().String("common-live-blocks-addr", RelayerServingAddr, "[COMMON] gRPC endpoint to get real-time blocks.")
cmd.Flags().Bool("dev", false, "[COMMON] Runs in local dev mode (remove 'node' app from defaults, overrides '--reader-node-path' to this program itself, overrides '--reader-node-arguments' to 'tools poll-rpc-blocks http://localhost:8545 0'")

cmd.Flags().IntSlice("common-block-index-sizes", []int{100000, 100000, 10000, 1000}, "index bundle sizes that that are considered valid when looking for block indexes")
cmd.Flags().Bool("common-blocks-cache-enabled", false, FlagDescription(`
Expand Down
26 changes: 22 additions & 4 deletions cmd/fireeth/cli/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/streamingfast/dlauncher/launcher"
"github.com/streamingfast/firehose-ethereum/codec"
nodemanager "github.com/streamingfast/firehose-ethereum/node-manager"
"github.com/streamingfast/firehose-ethereum/node-manager/dev"
"github.com/streamingfast/firehose-ethereum/node-manager/geth"
"github.com/streamingfast/firehose-ethereum/node-manager/openeth"
"github.com/streamingfast/logging"
Expand Down Expand Up @@ -253,7 +254,7 @@ func isGenesisBootstrapper(bootstrapDataURL string) bool {

func getSupervisedProcessLogger(isReader bool, nodeType string) *zap.Logger {
switch nodeType {
case "geth":
case "geth", "dev":
if isReader {
return readerGethLogger
} else {
Expand All @@ -266,7 +267,7 @@ func getSupervisedProcessLogger(isReader bool, nodeType string) *zap.Logger {
return nodeOpenEthereumLogger
}
default:
panic(fmt.Errorf("unknown node type %q, only knows about %q and %q", nodeType, "geth", "openethereum"))
panic(fmt.Errorf("unknown node type %q, only knows about %q, %q and %q", nodeType, "geth", "openethereum", "dev"))
}
}

Expand All @@ -284,6 +285,9 @@ var nodeArgsByTypeAndRole = map[string]nodeArgsByRole{
//"dev-miner": ...
"reader": "--network-id={network-id} --ipc-path={node-ipc-path} --base-path={node-data-dir} --port=" + ReaderNodeP2PPort + " --jsonrpc-port=" + ReaderNodeRPCPort + " --jsonrpc-apis=web3,net,eth,parity,parity,parity_pubsub,parity_accounts,parity_set --firehose-enabled --no-warp",
},
"dev": {
"reader": "tools poll-rpc-blocks http://localhost:8545 0",
},
}

func registerEthereumNodeFlags(cmd *cobra.Command) error {
Expand All @@ -301,8 +305,8 @@ func registerCommonNodeFlags(cmd *cobra.Command, isReader bool) {
managerAPIAddr = ReaderNodeManagerAPIAddr
}

cmd.Flags().String(prefix+"path", "geth", "command that will be launched by the node manager")
cmd.Flags().String(prefix+"type", "geth", "one of: ['geth','openethereum']")
cmd.Flags().String(prefix+"path", "geth", "command that will be launched by the node manager (ignored on type 'dev')")
cmd.Flags().String(prefix+"type", "dev", "one of: ['dev', 'geth','openethereum']")
cmd.Flags().String(prefix+"arguments", "", "If not empty, overrides the list of default node arguments (computed from node type and role). Start with '+' to append to default args instead of replacing. You can use the {public-ip} token, that will be matched against space-separated hostname:IP pairs in PUBLIC_IPS env var, taking hostname from HOSTNAME env var.")
cmd.Flags().String(prefix+"data-dir", "{sf-data-dir}/{node-role}/data", "Directory for node data ({node-role} is either reader, peering or dev-miner)")
cmd.Flags().String(prefix+"ipc-path", "{sf-data-dir}/{node-role}/ipc", "IPC path cannot be more than 64chars on geth")
Expand Down Expand Up @@ -450,6 +454,20 @@ func buildSuperviser(
) (nodeManager.ChainSuperviser, error) {

switch nodeType {
case "dev":
superviser, err := dev.NewSuperviser(
nodePath,
nodeArguments,
metricsAndReadinessManager.UpdateHeadBlock,
appLogger,
supervisedProcessLogger,
)
if err != nil {
return nil, fmt.Errorf("unable to create chain superviser: %w", err)
}

return superviser, nil

case "geth":
superviser, err := geth.NewGethSuperviser(
nodePath,
Expand Down
11 changes: 11 additions & 0 deletions cmd/fireeth/cli/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package cli
import (
"context"
"fmt"
"os"
"path/filepath"
"strings"

Expand Down Expand Up @@ -70,6 +71,14 @@ func sfStartE(cmd *cobra.Command, args []string) (err error) {
}

func Start(ctx context.Context, dataDir string, args []string) (err error) {

devMode := viper.GetBool("dev")
if devMode {
viper.Set("reader-node-type", "dev")
viper.Set("reader-node-path", os.Args[0])
// viper.Set("reader-node-arguments", "tools poll-rpc-blocks http://localhost:8545 0") // set in node-type template 'dev'
}

dataDirAbs, err := filepath.Abs(dataDir)
if err != nil {
return fmt.Errorf("unable to setup directory structure: %w", err)
Expand Down Expand Up @@ -122,6 +131,8 @@ func Start(ctx context.Context, dataDir string, args []string) (err error) {
zlog.Debug("launcher created")
runByDefault := func(app string) bool {
switch app {
case "node":
return !devMode
case "reader-node-stdin":
return false
}
Expand Down
4 changes: 4 additions & 0 deletions codec/console_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -1087,6 +1087,10 @@ func (ctx *parseCtx) readCodeChange(line string) error {
// Formats
// FIRE BLOCK <NUMBER (u64 string)> <HASH (hex string)> <LIB NUMBER (u64 string)> <LIB ID (hex string)> <proto (base64 string)>
func (ctx *parseCtx) readBlock(line string) (*bstream.Block, error) {
if ctx.blockVersion == 0 {
return nil, fmt.Errorf("cannot start reading block: INIT not done")
}

start := time.Now()

chunks, err := SplitInBoundedChunks(line, 6)
Expand Down
227 changes: 227 additions & 0 deletions node-manager/dev/bootstrap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,227 @@
// Copyright 2021 dfuse Platform Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package dev

import (
"archive/tar"
"context"
"fmt"
"io"
"io/ioutil"
"os"
"os/exec"
"path/filepath"
"time"

"github.com/streamingfast/dstore"
"go.uber.org/zap"
)

// GenesisBootstrapper needs to write genesis file, static node file, then run a command like 'geth init'
type GenesisBootstrapper struct {
dataDir string
genesisFileURL string
cmdArgs []string
nodePath string
// staticNodesFilepath string
logger *zap.Logger
}

func NewGenesisBootstrapper(dataDir string, genesisFileURL string, nodePath string, cmdArgs []string, logger *zap.Logger) *GenesisBootstrapper {
return &GenesisBootstrapper{
dataDir: dataDir,
genesisFileURL: genesisFileURL,
nodePath: nodePath,
cmdArgs: cmdArgs,
logger: logger,
}
}

func downloadDstoreObject(url string, destpath string) error {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

reader, _, _, err := dstore.OpenObject(ctx, url)
if err != nil {
return fmt.Errorf("cannot get file from store: %w", err)
}
defer reader.Close()
data, err := ioutil.ReadAll(reader)
if err != nil {
return err
}

return ioutil.WriteFile(destpath, data, 0644)
}

func (b *GenesisBootstrapper) Bootstrap() error {
if b.genesisFileURL == "" || isBootstrapped(b.dataDir, b.logger) {
return nil
}

genesisFilePath := filepath.Join(b.dataDir, "genesis.json")

b.logger.Info("running bootstrap sequence", zap.String("data_dir", b.dataDir), zap.String("genesis_file_path", genesisFilePath))
if err := os.MkdirAll(b.dataDir, 0755); err != nil {
return fmt.Errorf("cannot create folder %s to bootstrap node: %w", b.dataDir, err)
}

if !fileExists(genesisFilePath) {
b.logger.Info("fetching genesis file", zap.String("source_url", b.genesisFileURL))
if err := downloadDstoreObject(b.genesisFileURL, genesisFilePath); err != nil {
return err
}
}

cmd := exec.Command(b.nodePath, b.cmdArgs...)
b.logger.Info("running node init command (creating genesis block from genesis.json)", zap.Stringer("cmd", cmd))
if output, err := runCmd(cmd); err != nil {
return fmt.Errorf("failed to init node (output %s): %w", output, err)
}

return nil
}

func NewTarballBootstrapper(
url string,
dataDir string,
logger *zap.Logger,
) *TarballBootstrapper {
return &TarballBootstrapper{
url: url,
dataDir: dataDir,
logger: logger,
}
}

type TarballBootstrapper struct {
url string
dataDir string
logger *zap.Logger
}

func isBootstrapped(dataDir string, logger *zap.Logger) bool {
var foundCURRENT bool
err := filepath.Walk(dataDir,
func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if info.IsDir() {
return nil
}
if filepath.Base(path) == "CURRENT" {
foundCURRENT = true
return io.EOF
}
return nil
})
if err != nil && !os.IsNotExist(err) && err != io.EOF {
logger.Warn("error while checking for bootstrapped status", zap.Error(err))
}

return foundCURRENT
}

func (b *TarballBootstrapper) isBootstrapped() bool {
return isBootstrapped(b.dataDir, b.logger)
}

func (b *TarballBootstrapper) Bootstrap() error {
if b.isBootstrapped() {
return nil
}

b.logger.Info("bootstrapping geth chain data from pre-built data", zap.String("bootstrap_data_url", b.url))

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Minute)
defer cancel()

reader, _, _, err := dstore.OpenObject(ctx, b.url, dstore.Compression("zstd"))
if err != nil {
return fmt.Errorf("cannot get snapshot from gstore: %w", err)
}
defer reader.Close()

b.createChainData(reader)
return nil
}

func (b *TarballBootstrapper) createChainData(reader io.Reader) error {
err := os.MkdirAll(b.dataDir, os.ModePerm)
if err != nil {
return fmt.Errorf("unable to create blocks log file: %w", err)
}

b.logger.Info("extracting bootstrapping data into node data directory", zap.String("data_dir", b.dataDir))
tr := tar.NewReader(reader)
for {
header, err := tr.Next()
if err != nil {
if err == io.EOF {
return nil
}

return err
}

path := filepath.Join(b.dataDir, header.Name)
b.logger.Debug("about to write content of entry", zap.String("name", header.Name), zap.String("path", path), zap.Bool("is_dir", header.FileInfo().IsDir()))
if header.FileInfo().IsDir() {
err = os.MkdirAll(path, os.ModePerm)
if err != nil {
return fmt.Errorf("unable to create directory: %w", err)
}

continue
}

file, err := os.Create(path)
if err != nil {
return fmt.Errorf("unable to create file: %w", err)
}

if _, err := io.Copy(file, tr); err != nil {
file.Close()
return err
}
file.Close()
}
}

func dirExists(filename string) bool {
info, err := os.Stat(filename)
if os.IsNotExist(err) {
return false
}
return info.IsDir()
}

func fileExists(filename string) bool {
info, err := os.Stat(filename)
if os.IsNotExist(err) {
return false
}
return !info.IsDir()
}
func runCmd(cmd *exec.Cmd) (string, error) {
// This runs (and wait) the command, combines both stdout and stderr in a single stream and return everything
out, err := cmd.CombinedOutput()
if err == nil {
return "", nil
}

return string(out), err
}
Loading

0 comments on commit 800d283

Please sign in to comment.