From 100dc4fa6043cf65dd17f53f352cbf6752823847 Mon Sep 17 00:00:00 2001 From: Alfonso Acosta Date: Mon, 17 Jun 2024 20:35:18 +0200 Subject: [PATCH] Misc fixes (#5352) --- exp/services/ledgerexporter/internal/app.go | 2 +- .../ledgerexporter/internal/config.go | 4 ++- .../ledgerexporter/internal/config_test.go | 4 +-- historyarchive/archive.go | 2 ++ historyarchive/archive_pool.go | 7 +++-- ingest/ledgerbackend/captive_core_backend.go | 7 +++-- ingest/ledgerbackend/main.go | 26 +++++++++-------- ingest/ledgerbackend/mock_cmd_test.go | 17 ++++++----- ingest/ledgerbackend/stellar_core_runner.go | 29 +++++++++++++++---- services/horizon/internal/ingest/main.go | 1 + support/datastore/history_archive.go | 7 +++-- support/db/session.go | 24 +-------------- 12 files changed, 71 insertions(+), 59 deletions(-) diff --git a/exp/services/ledgerexporter/internal/app.go b/exp/services/ledgerexporter/internal/app.go index eb0f544ec3..54c7a72096 100644 --- a/exp/services/ledgerexporter/internal/app.go +++ b/exp/services/ledgerexporter/internal/app.go @@ -107,7 +107,7 @@ func (a *App) init(ctx context.Context, runtimeSettings RuntimeSettings) error { if a.config, err = NewConfig(runtimeSettings); err != nil { return errors.Wrap(err, "Could not load configuration") } - if archive, err = a.config.GenerateHistoryArchive(ctx); err != nil { + if archive, err = a.config.GenerateHistoryArchive(ctx, logger); err != nil { return err } if err = a.config.ValidateAndSetLedgerRange(ctx, archive); err != nil { diff --git a/exp/services/ledgerexporter/internal/config.go b/exp/services/ledgerexporter/internal/config.go index ff0ffbe2bf..b040e2dcc9 100644 --- a/exp/services/ledgerexporter/internal/config.go +++ b/exp/services/ledgerexporter/internal/config.go @@ -11,6 +11,7 @@ import ( "github.com/stellar/go/historyarchive" "github.com/stellar/go/ingest/ledgerbackend" "github.com/stellar/go/network" + "github.com/stellar/go/support/log" "github.com/pelletier/go-toml" @@ -142,12 +143,13 @@ func (config *Config) ValidateAndSetLedgerRange(ctx context.Context, archive his return nil } -func (config *Config) GenerateHistoryArchive(ctx context.Context) (historyarchive.ArchiveInterface, error) { +func (config *Config) GenerateHistoryArchive(ctx context.Context, entry *log.Entry) (historyarchive.ArchiveInterface, error) { return historyarchive.NewArchivePool(config.StellarCoreConfig.HistoryArchiveUrls, historyarchive.ArchiveOptions{ ConnectOptions: storage.ConnectOptions{ UserAgent: config.UserAgent, Context: ctx, }, + Logger: logger, }) } diff --git a/exp/services/ledgerexporter/internal/config_test.go b/exp/services/ledgerexporter/internal/config_test.go index 37dc574012..6523df7605 100644 --- a/exp/services/ledgerexporter/internal/config_test.go +++ b/exp/services/ledgerexporter/internal/config_test.go @@ -46,7 +46,7 @@ func TestGenerateHistoryArchiveFromPreconfiguredNetwork(t *testing.T) { RuntimeSettings{StartLedger: 2, EndLedger: 3, ConfigFilePath: "test/valid_captive_core_preconfigured.toml", Mode: Append}) require.NoError(t, err) - _, err = config.GenerateHistoryArchive(ctx) + _, err = config.GenerateHistoryArchive(ctx, nil) require.NoError(t, err) } @@ -56,7 +56,7 @@ func TestGenerateHistoryArchiveFromManulConfiguredNetwork(t *testing.T) { RuntimeSettings{StartLedger: 2, EndLedger: 3, ConfigFilePath: "test/valid_captive_core_manual.toml", Mode: Append}) require.NoError(t, err) - _, err = config.GenerateHistoryArchive(ctx) + _, err = config.GenerateHistoryArchive(ctx, nil) require.NoError(t, err) } diff --git a/historyarchive/archive.go b/historyarchive/archive.go index d52c41ec43..4f9e14380f 100644 --- a/historyarchive/archive.go +++ b/historyarchive/archive.go @@ -23,6 +23,7 @@ import ( log "github.com/sirupsen/logrus" "github.com/stellar/go/support/errors" + supportlog "github.com/stellar/go/support/log" "github.com/stellar/go/support/storage" "github.com/stellar/go/xdr" ) @@ -43,6 +44,7 @@ type CommandOptions struct { type ArchiveOptions struct { storage.ConnectOptions + Logger *supportlog.Entry // NetworkPassphrase defines the expected network of history archive. It is // checked when getting HAS. If network passphrase does not match, error is // returned. diff --git a/historyarchive/archive_pool.go b/historyarchive/archive_pool.go index 259f8ff48a..48178ade26 100644 --- a/historyarchive/archive_pool.go +++ b/historyarchive/archive_pool.go @@ -10,6 +10,7 @@ import ( "time" "github.com/pkg/errors" + log "github.com/stellar/go/support/log" "github.com/stellar/go/xdr" @@ -19,6 +20,7 @@ import ( // An ArchivePool is just a collection of `ArchiveInterface`s so that we can // distribute requests fairly throughout the pool. type ArchivePool struct { + logger *log.Entry backoff backoff.BackOff pool []ArchiveInterface curr int @@ -46,6 +48,7 @@ func NewArchivePoolWithBackoff(archiveURLs []string, opts ArchiveOptions, strate ap := ArchivePool{ pool: make([]ArchiveInterface, 0, len(archiveURLs)), backoff: strategy, + logger: opts.Logger, } var lastErr error @@ -107,8 +110,8 @@ func (pa *ArchivePool) runRoundRobin(runner func(ai ArchiveInterface) error) err } // Intentionally avoid logging context errors - if stats := ai.GetStats(); len(stats) > 0 { - log.WithField("error", err).Warnf( + if stats := ai.GetStats(); len(stats) > 0 && pa.logger != nil { + pa.logger.WithField("error", err).Warnf( "Encountered an error with archive '%s'", stats[0].GetBackendName()) } diff --git a/ingest/ledgerbackend/captive_core_backend.go b/ingest/ledgerbackend/captive_core_backend.go index 9dfc82ac02..879049f873 100644 --- a/ingest/ledgerbackend/captive_core_backend.go +++ b/ingest/ledgerbackend/captive_core_backend.go @@ -179,6 +179,7 @@ func NewCaptive(config CaptiveCoreConfig) (*CaptiveStellarCore, error) { archivePool, err := historyarchive.NewArchivePool( config.HistoryArchiveURLs, historyarchive.ArchiveOptions{ + Logger: config.Log, NetworkPassphrase: config.NetworkPassphrase, CheckpointFrequency: config.CheckpointFrequency, ConnectOptions: storage.ConnectOptions{ @@ -780,14 +781,14 @@ func (c *CaptiveStellarCore) GetLatestLedgerSequence(ctx context.Context) (uint3 // all subsequent calls to PrepareRange(), GetLedger(), etc will fail. // Close is thread-safe and can be called from another go routine. func (c *CaptiveStellarCore) Close() error { + // after the CaptiveStellarCore context is canceled all subsequent calls to PrepareRange() will fail + c.cancel() + c.stellarCoreLock.RLock() defer c.stellarCoreLock.RUnlock() c.closed = true - // after the CaptiveStellarCore context is canceled all subsequent calls to PrepareRange() will fail - c.cancel() - // TODO: Sucks to ignore the error here, but no worse than it was before, // so... if c.ledgerHashStore != nil { diff --git a/ingest/ledgerbackend/main.go b/ingest/ledgerbackend/main.go index 4a5d119de2..6029b8a1f6 100644 --- a/ingest/ledgerbackend/main.go +++ b/ingest/ledgerbackend/main.go @@ -1,7 +1,6 @@ package ledgerbackend import ( - "io" "io/fs" "io/ioutil" "os" @@ -40,7 +39,7 @@ func (realSystemCaller) stat(name string) (isDir, error) { func (realSystemCaller) command(name string, arg ...string) cmdI { cmd := exec.Command(name, arg...) - return &realCmd{cmd} + return &realCmd{Cmd: cmd} } type cmdI interface { @@ -49,36 +48,39 @@ type cmdI interface { Start() error Run() error setDir(dir string) - setStdout(stdout io.Writer) - getStdout() io.Writer - setStderr(stderr io.Writer) - getStderr() io.Writer + setStdout(stdout *logLineWriter) + getStdout() *logLineWriter + setStderr(stderr *logLineWriter) + getStderr() *logLineWriter getProcess() *os.Process setExtraFiles([]*os.File) } type realCmd struct { *exec.Cmd + stdout, stderr *logLineWriter } func (r *realCmd) setDir(dir string) { r.Cmd.Dir = dir } -func (r *realCmd) setStdout(stdout io.Writer) { +func (r *realCmd) setStdout(stdout *logLineWriter) { + r.stdout = stdout r.Cmd.Stdout = stdout } -func (r *realCmd) getStdout() io.Writer { - return r.Cmd.Stdout +func (r *realCmd) getStdout() *logLineWriter { + return r.stdout } -func (r *realCmd) setStderr(stderr io.Writer) { +func (r *realCmd) setStderr(stderr *logLineWriter) { + r.stderr = stderr r.Cmd.Stderr = stderr } -func (r *realCmd) getStderr() io.Writer { - return r.Cmd.Stderr +func (r *realCmd) getStderr() *logLineWriter { + return r.stderr } func (r *realCmd) getProcess() *os.Process { diff --git a/ingest/ledgerbackend/mock_cmd_test.go b/ingest/ledgerbackend/mock_cmd_test.go index a28b6c8d01..bf06a9ae86 100644 --- a/ingest/ledgerbackend/mock_cmd_test.go +++ b/ingest/ledgerbackend/mock_cmd_test.go @@ -35,22 +35,22 @@ func (m *mockCmd) setDir(dir string) { m.Called(dir) } -func (m *mockCmd) setStdout(stdout io.Writer) { +func (m *mockCmd) setStdout(stdout *logLineWriter) { m.Called(stdout) } -func (m *mockCmd) getStdout() io.Writer { +func (m *mockCmd) getStdout() *logLineWriter { args := m.Called() - return args.Get(0).(io.Writer) + return args.Get(0).(*logLineWriter) } -func (m *mockCmd) setStderr(stderr io.Writer) { +func (m *mockCmd) setStderr(stderr *logLineWriter) { m.Called(stderr) } -func (m *mockCmd) getStderr() io.Writer { +func (m *mockCmd) getStderr() *logLineWriter { args := m.Called() - return args.Get(0).(io.Writer) + return args.Get(0).(*logLineWriter) } func (m *mockCmd) getProcess() *os.Process { @@ -64,12 +64,13 @@ func (m *mockCmd) setExtraFiles(files []*os.File) { func simpleCommandMock() *mockCmd { _, writer := io.Pipe() + llw := logLineWriter{pipeWriter: writer} cmdMock := &mockCmd{} cmdMock.On("setDir", mock.Anything) cmdMock.On("setStdout", mock.Anything) - cmdMock.On("getStdout").Return(writer) + cmdMock.On("getStdout").Return(&llw) cmdMock.On("setStderr", mock.Anything) - cmdMock.On("getStderr").Return(writer) + cmdMock.On("getStderr").Return(&llw) cmdMock.On("getProcess").Return(&os.Process{}).Maybe() cmdMock.On("setExtraFiles", mock.Anything) cmdMock.On("Start").Return(nil) diff --git a/ingest/ledgerbackend/stellar_core_runner.go b/ingest/ledgerbackend/stellar_core_runner.go index 7f883b69c5..57e8c1c0f9 100644 --- a/ingest/ledgerbackend/stellar_core_runner.go +++ b/ingest/ledgerbackend/stellar_core_runner.go @@ -194,13 +194,32 @@ func (r *stellarCoreRunner) getConfFileName() string { return path } -func (r *stellarCoreRunner) getLogLineWriter() io.Writer { +type logLineWriter struct { + pipeWriter *io.PipeWriter + wg sync.WaitGroup +} + +func (l *logLineWriter) Write(p []byte) (n int, err error) { + return l.pipeWriter.Write(p) +} + +func (l *logLineWriter) Close() error { + err := l.pipeWriter.Close() + l.wg.Wait() + return err +} + +func (r *stellarCoreRunner) getLogLineWriter() *logLineWriter { rd, wr := io.Pipe() br := bufio.NewReader(rd) - + result := &logLineWriter{ + pipeWriter: wr, + } // Strip timestamps from log lines from captive stellar-core. We emit our own. dateRx := regexp.MustCompile(`^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3} `) + result.wg.Add(1) go func() { + defer result.wg.Done() levelRx := regexp.MustCompile(`\[(\w+) ([A-Z]+)\] (.*)`) for { line, err := br.ReadString('\n') @@ -238,7 +257,7 @@ func (r *stellarCoreRunner) getLogLineWriter() io.Writer { } } }() - return wr + return result } func (r *stellarCoreRunner) offlineInfo() (stellarcore.InfoResponse, error) { @@ -526,8 +545,8 @@ func (r *stellarCoreRunner) handleExit() { // closeLogLineWriters closes the go routines created by getLogLineWriter() func (r *stellarCoreRunner) closeLogLineWriters(cmd cmdI) { - cmd.getStdout().(*io.PipeWriter).Close() - cmd.getStderr().(*io.PipeWriter).Close() + cmd.getStdout().Close() + cmd.getStderr().Close() } // getMetaPipe returns a channel which contains ledgers streamed from the captive core subprocess diff --git a/services/horizon/internal/ingest/main.go b/services/horizon/internal/ingest/main.go index 7e109c391a..0769faee4f 100644 --- a/services/horizon/internal/ingest/main.go +++ b/services/horizon/internal/ingest/main.go @@ -240,6 +240,7 @@ func NewSystem(config Config) (System, error) { archive, err := historyarchive.NewArchivePool( config.HistoryArchiveURLs, historyarchive.ArchiveOptions{ + Logger: log.WithField("subservice", "archive"), NetworkPassphrase: config.NetworkPassphrase, CheckpointFrequency: config.CheckpointFrequency, ConnectOptions: storage.ConnectOptions{ diff --git a/support/datastore/history_archive.go b/support/datastore/history_archive.go index 123240dc6f..9fd291bac7 100644 --- a/support/datastore/history_archive.go +++ b/support/datastore/history_archive.go @@ -3,10 +3,12 @@ package datastore import ( "context" + log "github.com/sirupsen/logrus" + "github.com/stellar/go/historyarchive" "github.com/stellar/go/network" "github.com/stellar/go/support/errors" - "github.com/stellar/go/support/log" + supportlog "github.com/stellar/go/support/log" "github.com/stellar/go/support/storage" ) @@ -15,7 +17,7 @@ const ( Testnet = "testnet" ) -func CreateHistoryArchiveFromNetworkName(ctx context.Context, networkName string, userAgent string) (historyarchive.ArchiveInterface, error) { +func CreateHistoryArchiveFromNetworkName(ctx context.Context, networkName string, userAgent string, logger *supportlog.Entry) (historyarchive.ArchiveInterface, error) { var historyArchiveUrls []string switch networkName { case Pubnet: @@ -27,6 +29,7 @@ func CreateHistoryArchiveFromNetworkName(ctx context.Context, networkName string } return historyarchive.NewArchivePool(historyArchiveUrls, historyarchive.ArchiveOptions{ + Logger: logger, ConnectOptions: storage.ConnectOptions{ UserAgent: userAgent, Context: ctx, diff --git a/support/db/session.go b/support/db/session.go index 6b5c2b18c0..dc3c98cafb 100644 --- a/support/db/session.go +++ b/support/db/session.go @@ -55,29 +55,7 @@ func (s *Session) context(requestCtx context.Context) (context.Context, context. // Begin binds this session to a new transaction. func (s *Session) Begin(ctx context.Context) error { - if s.tx != nil { - return errors.New("already in transaction") - } - ctx, cancel, err := s.context(ctx) - if err != nil { - return err - } - - tx, err := s.DB.BeginTxx(ctx, nil) - if err != nil { - if knownErr := s.handleError(err, ctx); knownErr != nil { - cancel() - return knownErr - } - - cancel() - return errors.Wrap(err, "beginx failed") - } - log.Debug("sql: begin") - s.tx = tx - s.txOptions = nil - s.txCancel = cancel - return nil + return s.BeginTx(ctx, nil) } // BeginTx binds this session to a new transaction which is configured with the