Skip to content

Commit

Permalink
exp/services/ledgerexporter: Add metadata to exported files (#5324)
Browse files Browse the repository at this point in the history
* Add versioning in docker build
* Remove unused docker buildargs. Improve error handling when parsing core version

---------
Co-authored-by: tamirms <[email protected]>
  • Loading branch information
urvisavla authored Jun 3, 2024
1 parent 083b7bb commit 34b0e4c
Show file tree
Hide file tree
Showing 17 changed files with 659 additions and 96 deletions.
4 changes: 2 additions & 2 deletions exp/services/ledgerexporter/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ SUDO := $(shell docker version >/dev/null 2>&1 || echo "sudo")

# https://github.com/opencontainers/image-spec/blob/master/annotations.md
BUILD_DATE := $(shell date -u +%FT%TZ)
VERSION ?= $(shell git rev-parse --short HEAD)
VERSION ?= 1.0.0-$(shell git rev-parse --short HEAD)
DOCKER_IMAGE := stellar/ledger-exporter

docker-build:
cd ../../../ && \
$(SUDO) docker build --platform linux/amd64 --pull --label org.opencontainers.image.created="$(BUILD_DATE)" \
--build-arg VERSION=$(VERSION) \
--build-arg GOFLAGS="-ldflags=-X=github.com/stellar/go/exp/services/ledgerexporter/internal.version=$(VERSION)" \
$(if $(STELLAR_CORE_VERSION), --build-arg STELLAR_CORE_VERSION=$(STELLAR_CORE_VERSION)) \
-f exp/services/ledgerexporter/docker/Dockerfile \
-t $(DOCKER_IMAGE):$(VERSION) \
Expand Down
1 change: 1 addition & 0 deletions exp/services/ledgerexporter/docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ RUN go mod download

COPY . ./

ARG GOFLAGS
RUN go install github.com/stellar/go/exp/services/ledgerexporter

FROM ubuntu:22.04
Expand Down
11 changes: 7 additions & 4 deletions exp/services/ledgerexporter/internal/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ const (
)

var (
logger = log.New().WithField("service", "ledger-exporter")
logger = log.New().WithField("service", "ledger-exporter")
version = "develop"
)

func NewDataAlreadyExportedError(Start uint32, End uint32) *DataAlreadyExportedError {
Expand Down Expand Up @@ -95,13 +96,15 @@ func (a *App) init(ctx context.Context) error {
var err error
var archive historyarchive.ArchiveInterface

logger.Infof("Starting Ledger Exporter with version %s", version)

registry := prometheus.NewRegistry()
registry.MustRegister(
collectors.NewProcessCollector(collectors.ProcessCollectorOpts{Namespace: "ledger_exporter"}),
collectors.NewGoCollector(),
)

if a.config, err = NewConfig(ctx, a.flags); err != nil {
if a.config, err = NewConfig(a.flags); err != nil {
return errors.Wrap(err, "Could not load configuration")
}
if archive, err = datastore.CreateHistoryArchiveFromNetworkName(ctx, a.config.Network); err != nil {
Expand All @@ -126,7 +129,7 @@ func (a *App) init(ctx context.Context) error {
}

queue := NewUploadQueue(uploadQueueCapacity, registry)
if a.exportManager, err = NewExportManager(a.config.LedgerBatchConfig, a.ledgerBackend, queue, registry); err != nil {
if a.exportManager, err = NewExportManager(a.config, a.ledgerBackend, queue, registry); err != nil {
return err
}
a.uploader = NewUploader(a.dataStore, queue, registry)
Expand Down Expand Up @@ -256,7 +259,7 @@ func (a *App) Run() {
// newLedgerBackend Creates and initializes captive core ledger backend
// Currently, only supports captive-core as ledger backend
func newLedgerBackend(config *Config, prometheusRegistry *prometheus.Registry) (ledgerbackend.LedgerBackend, error) {
captiveConfig, err := config.GenerateCaptiveCoreConfig()
captiveConfig, err := config.generateCaptiveCoreConfig()
if err != nil {
return nil, err
}
Expand Down
41 changes: 36 additions & 5 deletions exp/services/ledgerexporter/internal/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package ledgerexporter
import (
"context"
_ "embed"
"fmt"
"os/exec"
"strings"

"github.com/stellar/go/historyarchive"
"github.com/stellar/go/ingest/ledgerbackend"
Expand Down Expand Up @@ -45,15 +47,16 @@ type Config struct {
StartLedger uint32
EndLedger uint32
Resume bool

CoreVersion string
}

// This will generate the config based on commandline flags and toml
//
// ctx - the caller context
// flags - command line flags
//
// return - *Config or an error if any range validation failed.
func NewConfig(ctx context.Context, flags Flags) (*Config, error) {
func NewConfig(flags Flags) (*Config, error) {
config := &Config{}

config.StartLedger = uint32(flags.StartLedger)
Expand Down Expand Up @@ -100,17 +103,21 @@ func (config *Config) ValidateAndSetLedgerRange(ctx context.Context, archive his
return nil
}

func (config *Config) GenerateCaptiveCoreConfig() (ledgerbackend.CaptiveCoreConfig, error) {
func (config *Config) generateCaptiveCoreConfig() (ledgerbackend.CaptiveCoreConfig, error) {
coreConfig := &config.StellarCoreConfig

// Look for stellar-core binary in $PATH, if not supplied
if coreConfig.StellarCoreBinaryPath == "" {
if config.StellarCoreConfig.StellarCoreBinaryPath == "" {
var err error
if coreConfig.StellarCoreBinaryPath, err = exec.LookPath("stellar-core"); err != nil {
if config.StellarCoreConfig.StellarCoreBinaryPath, err = exec.LookPath("stellar-core"); err != nil {
return ledgerbackend.CaptiveCoreConfig{}, errors.Wrap(err, "Failed to find stellar-core binary")
}
}

if err := config.setCoreVersionInfo(); err != nil {
return ledgerbackend.CaptiveCoreConfig{}, fmt.Errorf("failed to set stellar-core version info: %w", err)
}

var captiveCoreConfig []byte
// Default network config
switch config.Network {
Expand Down Expand Up @@ -151,6 +158,30 @@ func (config *Config) GenerateCaptiveCoreConfig() (ledgerbackend.CaptiveCoreConf
}, nil
}

// By default, it points to exec.Command, overridden for testing purpose
var execCommand = exec.Command

// Executes the "stellar-core version" command and parses its output to extract
// the core version
// The output of the "version" command is expected to be a multi-line string where the
// first line is the core version in format "vX.Y.Z-*".
func (c *Config) setCoreVersionInfo() (err error) {
versionCmd := execCommand(c.StellarCoreConfig.StellarCoreBinaryPath, "version")
versionOutput, err := versionCmd.Output()
if err != nil {
return fmt.Errorf("failed to execute stellar-core version command: %w", err)
}

// Split the output into lines
rows := strings.Split(string(versionOutput), "\n")
if len(rows) == 0 || len(rows[0]) == 0 {
return fmt.Errorf("stellar-core version not found")
}
c.CoreVersion = rows[0]
logger.Infof("stellar-core version: %s", c.CoreVersion)
return nil
}

func (config *Config) processToml(tomlPath string) error {
// Load config TOML file
cfg, err := toml.LoadFile(tomlPath)
Expand Down
89 changes: 75 additions & 14 deletions exp/services/ledgerexporter/internal/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@ package ledgerexporter
import (
"context"
"fmt"
"os"
"os/exec"
"testing"

"github.com/stretchr/testify/require"

"github.com/stellar/go/historyarchive"
"github.com/stellar/go/support/errors"
)

func TestNewConfigResumeEnabled(t *testing.T) {
Expand All @@ -16,8 +19,7 @@ func TestNewConfigResumeEnabled(t *testing.T) {
mockArchive := &historyarchive.MockArchive{}
mockArchive.On("GetRootHAS").Return(historyarchive.HistoryArchiveState{CurrentLedger: 5}, nil).Once()

config, err := NewConfig(ctx,
Flags{StartLedger: 1, EndLedger: 2, ConfigFilePath: "test/test.toml", Resume: true})
config, err := NewConfig(Flags{StartLedger: 1, EndLedger: 2, ConfigFilePath: "test/test.toml", Resume: true})
config.ValidateAndSetLedgerRange(ctx, mockArchive)
require.NoError(t, err)
require.Equal(t, config.DataStoreConfig.Type, "ABC")
Expand All @@ -30,23 +32,19 @@ func TestNewConfigResumeEnabled(t *testing.T) {
}

func TestNewConfigResumeDisabled(t *testing.T) {
ctx := context.Background()

mockArchive := &historyarchive.MockArchive{}
mockArchive.On("GetRootHAS").Return(historyarchive.HistoryArchiveState{CurrentLedger: 5}, nil).Once()

// resume disabled by default
config, err := NewConfig(ctx,
Flags{StartLedger: 1, EndLedger: 2, ConfigFilePath: "test/test.toml"})
config, err := NewConfig(Flags{StartLedger: 1, EndLedger: 2, ConfigFilePath: "test/test.toml"})
require.NoError(t, err)
require.False(t, config.Resume)
}

func TestInvalidTomlConfig(t *testing.T) {
ctx := context.Background()

_, err := NewConfig(ctx,
Flags{StartLedger: 1, EndLedger: 2, ConfigFilePath: "test/no_network.toml", Resume: true})
_, err := NewConfig(Flags{StartLedger: 1, EndLedger: 2, ConfigFilePath: "test/no_network.toml", Resume: true})
require.ErrorContains(t, err, "Invalid TOML config")
}

Expand Down Expand Up @@ -111,8 +109,7 @@ func TestValidateStartAndEndLedger(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
config, err := NewConfig(ctx,
Flags{StartLedger: tt.startLedger, EndLedger: tt.endLedger, ConfigFilePath: "test/validate_start_end.toml"})
config, err := NewConfig(Flags{StartLedger: tt.startLedger, EndLedger: tt.endLedger, ConfigFilePath: "test/validate_start_end.toml"})
require.NoError(t, err)
err = config.ValidateAndSetLedgerRange(ctx, mockArchive)
if tt.errMsg != "" {
Expand Down Expand Up @@ -189,8 +186,7 @@ func TestAdjustedLedgerRangeBoundedMode(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
config, err := NewConfig(ctx,
Flags{StartLedger: tt.start, EndLedger: tt.end, ConfigFilePath: tt.configFile})
config, err := NewConfig(Flags{StartLedger: tt.start, EndLedger: tt.end, ConfigFilePath: tt.configFile})
require.NoError(t, err)
err = config.ValidateAndSetLedgerRange(ctx, mockArchive)
require.NoError(t, err)
Expand Down Expand Up @@ -258,8 +254,7 @@ func TestAdjustedLedgerRangeUnBoundedMode(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
config, err := NewConfig(ctx,
Flags{StartLedger: tt.start, EndLedger: tt.end, ConfigFilePath: tt.configFile})
config, err := NewConfig(Flags{StartLedger: tt.start, EndLedger: tt.end, ConfigFilePath: tt.configFile})
require.NoError(t, err)
err = config.ValidateAndSetLedgerRange(ctx, mockArchive)
require.NoError(t, err)
Expand All @@ -268,3 +263,69 @@ func TestAdjustedLedgerRangeUnBoundedMode(t *testing.T) {
})
}
}

var cmdOut = ""

func fakeExecCommand(command string, args ...string) *exec.Cmd {
cs := append([]string{"-test.run=TestExecCmdHelperProcess", "--", command}, args...)
cmd := exec.Command(os.Args[0], cs...)
cmd.Env = append(os.Environ(), "GO_EXEC_CMD_HELPER_PROCESS=1", "CMD_OUT="+cmdOut)
return cmd
}

func TestExecCmdHelperProcess(t *testing.T) {
if os.Getenv("GO_EXEC_CMD_HELPER_PROCESS") != "1" {
return
}
fmt.Fprintf(os.Stdout, os.Getenv("CMD_OUT"))
os.Exit(0)
}

func TestSetCoreVersionInfo(t *testing.T) {
tests := []struct {
name string
commandOutput string
expectedError error
expectedCoreVer string
}{
{
name: "version found",
commandOutput: "v20.2.0-2-g6e73c0a88\n" +
"rust version: rustc 1.74.1 (a28077b28 2023-12-04)\n" +
"soroban-env-host: \n" +
" curr:\n" +
" package version: 20.2.0\n" +
" git version: 1bfc0f2a2ee134efc1e1b0d5270281d0cba61c2e\n" +
" ledger protocol version: 20\n" +
" pre-release version: 0\n" +
" rs-stellar-xdr:\n" +
" package version: 20.1.0\n" +
" git version: 8b9d623ef40423a8462442b86997155f2c04d3a1\n" +
" base XDR git version: b96148cd4acc372cc9af17b909ffe4b12c43ecb6\n",
expectedError: nil,
expectedCoreVer: "v20.2.0-2-g6e73c0a88",
},
{
name: "core version not found",
commandOutput: "",
expectedError: errors.New("stellar-core version not found"),
expectedCoreVer: "",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
config := Config{}

cmdOut = tt.commandOutput
execCommand = fakeExecCommand
err := config.setCoreVersionInfo()

if tt.expectedError != nil {
require.EqualError(t, err, tt.expectedError.Error())
} else {
require.NoError(t, err)
require.Equal(t, tt.expectedCoreVer, config.CoreVersion)
}
})
}
}
37 changes: 18 additions & 19 deletions exp/services/ledgerexporter/internal/exportmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,21 @@ import (
"github.com/prometheus/client_golang/prometheus"

"github.com/stellar/go/ingest/ledgerbackend"
"github.com/stellar/go/support/datastore"
"github.com/stellar/go/xdr"
)

type ExportManager struct {
config datastore.LedgerBatchConfig
config *Config
ledgerBackend ledgerbackend.LedgerBackend
currentMetaArchive *LedgerMetaArchive
currentMetaArchive *xdr.LedgerCloseMetaBatch
queue UploadQueue
latestLedgerMetric *prometheus.GaugeVec
}

// NewExportManager creates a new ExportManager with the provided configuration.
func NewExportManager(config datastore.LedgerBatchConfig, backend ledgerbackend.LedgerBackend, queue UploadQueue, prometheusRegistry *prometheus.Registry) (*ExportManager, error) {
if config.LedgersPerFile < 1 {
return nil, errors.Errorf("Invalid ledgers per file (%d): must be at least 1", config.LedgersPerFile)
func NewExportManager(config *Config, backend ledgerbackend.LedgerBackend, queue UploadQueue, prometheusRegistry *prometheus.Registry) (*ExportManager, error) {
if config.LedgerBatchConfig.LedgersPerFile < 1 {
return nil, errors.Errorf("Invalid ledgers per file (%d): must be at least 1", config.LedgerBatchConfig.LedgersPerFile)
}

latestLedgerMetric := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Expand All @@ -45,32 +44,32 @@ func (e *ExportManager) AddLedgerCloseMeta(ctx context.Context, ledgerCloseMeta
ledgerSeq := ledgerCloseMeta.LedgerSequence()

// Determine the object key for the given ledger sequence
objectKey := e.config.GetObjectKeyFromSequenceNumber(ledgerSeq)
objectKey := e.config.LedgerBatchConfig.GetObjectKeyFromSequenceNumber(ledgerSeq)

if e.currentMetaArchive != nil && e.currentMetaArchive.ObjectKey != objectKey {
return errors.New("Current meta archive object key mismatch")
}
if e.currentMetaArchive == nil {
endSeq := ledgerSeq + e.config.LedgersPerFile - 1
if ledgerSeq < e.config.LedgersPerFile {
endSeq := ledgerSeq + e.config.LedgerBatchConfig.LedgersPerFile - 1
if ledgerSeq < e.config.LedgerBatchConfig.LedgersPerFile {
// Special case: Adjust the end ledger sequence for the first batch.
// Since the start ledger is 2 instead of 0, we want to ensure that the end ledger sequence
// does not exceed LedgersPerFile.
// For example, if LedgersPerFile is 64, the file name for the first batch should be 0-63, not 2-66.
endSeq = e.config.LedgersPerFile - 1
endSeq = e.config.LedgerBatchConfig.LedgersPerFile - 1
}

// Create a new LedgerMetaArchive and add it to the map.
e.currentMetaArchive = NewLedgerMetaArchive(objectKey, ledgerSeq, endSeq)
// Create a new LedgerCloseMetaBatch
e.currentMetaArchive = &xdr.LedgerCloseMetaBatch{StartSequence: xdr.Uint32(ledgerSeq), EndSequence: xdr.Uint32(endSeq)}
}

if err := e.currentMetaArchive.Data.AddLedger(ledgerCloseMeta); err != nil {
if err := e.currentMetaArchive.AddLedger(ledgerCloseMeta); err != nil {
return errors.Wrapf(err, "failed to add ledger %d", ledgerSeq)
}

if ledgerSeq >= uint32(e.currentMetaArchive.Data.EndSequence) {
// Current archive is full, send it for upload
if err := e.queue.Enqueue(ctx, e.currentMetaArchive); err != nil {
if ledgerSeq >= uint32(e.currentMetaArchive.EndSequence) {
ledgerMetaArchive, err := NewLedgerMetaArchiveFromXDR(e.config, objectKey, *e.currentMetaArchive)
if err != nil {
return err
}
if err := e.queue.Enqueue(ctx, ledgerMetaArchive); err != nil {
return err
}
e.currentMetaArchive = nil
Expand Down
Loading

0 comments on commit 34b0e4c

Please sign in to comment.