Skip to content

Commit

Permalink
Misc fixes (#5352)
Browse files Browse the repository at this point in the history
  • Loading branch information
2opremio authored Jun 17, 2024
1 parent 3a31ed7 commit 100dc4f
Show file tree
Hide file tree
Showing 12 changed files with 71 additions and 59 deletions.
2 changes: 1 addition & 1 deletion exp/services/ledgerexporter/internal/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (a *App) init(ctx context.Context, runtimeSettings RuntimeSettings) error {
if a.config, err = NewConfig(runtimeSettings); err != nil {
return errors.Wrap(err, "Could not load configuration")
}
if archive, err = a.config.GenerateHistoryArchive(ctx); err != nil {
if archive, err = a.config.GenerateHistoryArchive(ctx, logger); err != nil {
return err
}
if err = a.config.ValidateAndSetLedgerRange(ctx, archive); err != nil {
Expand Down
4 changes: 3 additions & 1 deletion exp/services/ledgerexporter/internal/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/stellar/go/historyarchive"
"github.com/stellar/go/ingest/ledgerbackend"
"github.com/stellar/go/network"
"github.com/stellar/go/support/log"

"github.com/pelletier/go-toml"

Expand Down Expand Up @@ -142,12 +143,13 @@ func (config *Config) ValidateAndSetLedgerRange(ctx context.Context, archive his
return nil
}

func (config *Config) GenerateHistoryArchive(ctx context.Context) (historyarchive.ArchiveInterface, error) {
func (config *Config) GenerateHistoryArchive(ctx context.Context, entry *log.Entry) (historyarchive.ArchiveInterface, error) {
return historyarchive.NewArchivePool(config.StellarCoreConfig.HistoryArchiveUrls, historyarchive.ArchiveOptions{
ConnectOptions: storage.ConnectOptions{
UserAgent: config.UserAgent,
Context: ctx,
},
Logger: logger,
})
}

Expand Down
4 changes: 2 additions & 2 deletions exp/services/ledgerexporter/internal/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func TestGenerateHistoryArchiveFromPreconfiguredNetwork(t *testing.T) {
RuntimeSettings{StartLedger: 2, EndLedger: 3, ConfigFilePath: "test/valid_captive_core_preconfigured.toml", Mode: Append})
require.NoError(t, err)

_, err = config.GenerateHistoryArchive(ctx)
_, err = config.GenerateHistoryArchive(ctx, nil)
require.NoError(t, err)
}

Expand All @@ -56,7 +56,7 @@ func TestGenerateHistoryArchiveFromManulConfiguredNetwork(t *testing.T) {
RuntimeSettings{StartLedger: 2, EndLedger: 3, ConfigFilePath: "test/valid_captive_core_manual.toml", Mode: Append})
require.NoError(t, err)

_, err = config.GenerateHistoryArchive(ctx)
_, err = config.GenerateHistoryArchive(ctx, nil)
require.NoError(t, err)
}

Expand Down
2 changes: 2 additions & 0 deletions historyarchive/archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
log "github.com/sirupsen/logrus"

"github.com/stellar/go/support/errors"
supportlog "github.com/stellar/go/support/log"
"github.com/stellar/go/support/storage"
"github.com/stellar/go/xdr"
)
Expand All @@ -43,6 +44,7 @@ type CommandOptions struct {
type ArchiveOptions struct {
storage.ConnectOptions

Logger *supportlog.Entry
// NetworkPassphrase defines the expected network of history archive. It is
// checked when getting HAS. If network passphrase does not match, error is
// returned.
Expand Down
7 changes: 5 additions & 2 deletions historyarchive/archive_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"github.com/pkg/errors"

log "github.com/stellar/go/support/log"
"github.com/stellar/go/xdr"

Expand All @@ -19,6 +20,7 @@ import (
// An ArchivePool is just a collection of `ArchiveInterface`s so that we can
// distribute requests fairly throughout the pool.
type ArchivePool struct {
logger *log.Entry
backoff backoff.BackOff
pool []ArchiveInterface
curr int
Expand Down Expand Up @@ -46,6 +48,7 @@ func NewArchivePoolWithBackoff(archiveURLs []string, opts ArchiveOptions, strate
ap := ArchivePool{
pool: make([]ArchiveInterface, 0, len(archiveURLs)),
backoff: strategy,
logger: opts.Logger,
}
var lastErr error

Expand Down Expand Up @@ -107,8 +110,8 @@ func (pa *ArchivePool) runRoundRobin(runner func(ai ArchiveInterface) error) err
}

// Intentionally avoid logging context errors
if stats := ai.GetStats(); len(stats) > 0 {
log.WithField("error", err).Warnf(
if stats := ai.GetStats(); len(stats) > 0 && pa.logger != nil {
pa.logger.WithField("error", err).Warnf(
"Encountered an error with archive '%s'",
stats[0].GetBackendName())
}
Expand Down
7 changes: 4 additions & 3 deletions ingest/ledgerbackend/captive_core_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ func NewCaptive(config CaptiveCoreConfig) (*CaptiveStellarCore, error) {
archivePool, err := historyarchive.NewArchivePool(
config.HistoryArchiveURLs,
historyarchive.ArchiveOptions{
Logger: config.Log,
NetworkPassphrase: config.NetworkPassphrase,
CheckpointFrequency: config.CheckpointFrequency,
ConnectOptions: storage.ConnectOptions{
Expand Down Expand Up @@ -780,14 +781,14 @@ func (c *CaptiveStellarCore) GetLatestLedgerSequence(ctx context.Context) (uint3
// all subsequent calls to PrepareRange(), GetLedger(), etc will fail.
// Close is thread-safe and can be called from another go routine.
func (c *CaptiveStellarCore) Close() error {
// after the CaptiveStellarCore context is canceled all subsequent calls to PrepareRange() will fail
c.cancel()

c.stellarCoreLock.RLock()
defer c.stellarCoreLock.RUnlock()

c.closed = true

// after the CaptiveStellarCore context is canceled all subsequent calls to PrepareRange() will fail
c.cancel()

// TODO: Sucks to ignore the error here, but no worse than it was before,
// so...
if c.ledgerHashStore != nil {
Expand Down
26 changes: 14 additions & 12 deletions ingest/ledgerbackend/main.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package ledgerbackend

import (
"io"
"io/fs"
"io/ioutil"
"os"
Expand Down Expand Up @@ -40,7 +39,7 @@ func (realSystemCaller) stat(name string) (isDir, error) {

func (realSystemCaller) command(name string, arg ...string) cmdI {
cmd := exec.Command(name, arg...)
return &realCmd{cmd}
return &realCmd{Cmd: cmd}
}

type cmdI interface {
Expand All @@ -49,36 +48,39 @@ type cmdI interface {
Start() error
Run() error
setDir(dir string)
setStdout(stdout io.Writer)
getStdout() io.Writer
setStderr(stderr io.Writer)
getStderr() io.Writer
setStdout(stdout *logLineWriter)
getStdout() *logLineWriter
setStderr(stderr *logLineWriter)
getStderr() *logLineWriter
getProcess() *os.Process
setExtraFiles([]*os.File)
}

type realCmd struct {
*exec.Cmd
stdout, stderr *logLineWriter
}

func (r *realCmd) setDir(dir string) {
r.Cmd.Dir = dir
}

func (r *realCmd) setStdout(stdout io.Writer) {
func (r *realCmd) setStdout(stdout *logLineWriter) {
r.stdout = stdout
r.Cmd.Stdout = stdout
}

func (r *realCmd) getStdout() io.Writer {
return r.Cmd.Stdout
func (r *realCmd) getStdout() *logLineWriter {
return r.stdout
}

func (r *realCmd) setStderr(stderr io.Writer) {
func (r *realCmd) setStderr(stderr *logLineWriter) {
r.stderr = stderr
r.Cmd.Stderr = stderr
}

func (r *realCmd) getStderr() io.Writer {
return r.Cmd.Stderr
func (r *realCmd) getStderr() *logLineWriter {
return r.stderr
}

func (r *realCmd) getProcess() *os.Process {
Expand Down
17 changes: 9 additions & 8 deletions ingest/ledgerbackend/mock_cmd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,22 +35,22 @@ func (m *mockCmd) setDir(dir string) {
m.Called(dir)
}

func (m *mockCmd) setStdout(stdout io.Writer) {
func (m *mockCmd) setStdout(stdout *logLineWriter) {
m.Called(stdout)
}

func (m *mockCmd) getStdout() io.Writer {
func (m *mockCmd) getStdout() *logLineWriter {
args := m.Called()
return args.Get(0).(io.Writer)
return args.Get(0).(*logLineWriter)
}

func (m *mockCmd) setStderr(stderr io.Writer) {
func (m *mockCmd) setStderr(stderr *logLineWriter) {
m.Called(stderr)
}

func (m *mockCmd) getStderr() io.Writer {
func (m *mockCmd) getStderr() *logLineWriter {
args := m.Called()
return args.Get(0).(io.Writer)
return args.Get(0).(*logLineWriter)
}

func (m *mockCmd) getProcess() *os.Process {
Expand All @@ -64,12 +64,13 @@ func (m *mockCmd) setExtraFiles(files []*os.File) {

func simpleCommandMock() *mockCmd {
_, writer := io.Pipe()
llw := logLineWriter{pipeWriter: writer}
cmdMock := &mockCmd{}
cmdMock.On("setDir", mock.Anything)
cmdMock.On("setStdout", mock.Anything)
cmdMock.On("getStdout").Return(writer)
cmdMock.On("getStdout").Return(&llw)
cmdMock.On("setStderr", mock.Anything)
cmdMock.On("getStderr").Return(writer)
cmdMock.On("getStderr").Return(&llw)
cmdMock.On("getProcess").Return(&os.Process{}).Maybe()
cmdMock.On("setExtraFiles", mock.Anything)
cmdMock.On("Start").Return(nil)
Expand Down
29 changes: 24 additions & 5 deletions ingest/ledgerbackend/stellar_core_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,13 +194,32 @@ func (r *stellarCoreRunner) getConfFileName() string {
return path
}

func (r *stellarCoreRunner) getLogLineWriter() io.Writer {
type logLineWriter struct {
pipeWriter *io.PipeWriter
wg sync.WaitGroup
}

func (l *logLineWriter) Write(p []byte) (n int, err error) {
return l.pipeWriter.Write(p)
}

func (l *logLineWriter) Close() error {
err := l.pipeWriter.Close()
l.wg.Wait()
return err
}

func (r *stellarCoreRunner) getLogLineWriter() *logLineWriter {
rd, wr := io.Pipe()
br := bufio.NewReader(rd)

result := &logLineWriter{
pipeWriter: wr,
}
// Strip timestamps from log lines from captive stellar-core. We emit our own.
dateRx := regexp.MustCompile(`^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3} `)
result.wg.Add(1)
go func() {
defer result.wg.Done()
levelRx := regexp.MustCompile(`\[(\w+) ([A-Z]+)\] (.*)`)
for {
line, err := br.ReadString('\n')
Expand Down Expand Up @@ -238,7 +257,7 @@ func (r *stellarCoreRunner) getLogLineWriter() io.Writer {
}
}
}()
return wr
return result
}

func (r *stellarCoreRunner) offlineInfo() (stellarcore.InfoResponse, error) {
Expand Down Expand Up @@ -526,8 +545,8 @@ func (r *stellarCoreRunner) handleExit() {

// closeLogLineWriters closes the go routines created by getLogLineWriter()
func (r *stellarCoreRunner) closeLogLineWriters(cmd cmdI) {
cmd.getStdout().(*io.PipeWriter).Close()
cmd.getStderr().(*io.PipeWriter).Close()
cmd.getStdout().Close()
cmd.getStderr().Close()
}

// getMetaPipe returns a channel which contains ledgers streamed from the captive core subprocess
Expand Down
1 change: 1 addition & 0 deletions services/horizon/internal/ingest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ func NewSystem(config Config) (System, error) {
archive, err := historyarchive.NewArchivePool(
config.HistoryArchiveURLs,
historyarchive.ArchiveOptions{
Logger: log.WithField("subservice", "archive"),
NetworkPassphrase: config.NetworkPassphrase,
CheckpointFrequency: config.CheckpointFrequency,
ConnectOptions: storage.ConnectOptions{
Expand Down
7 changes: 5 additions & 2 deletions support/datastore/history_archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ package datastore
import (
"context"

log "github.com/sirupsen/logrus"

"github.com/stellar/go/historyarchive"
"github.com/stellar/go/network"
"github.com/stellar/go/support/errors"
"github.com/stellar/go/support/log"
supportlog "github.com/stellar/go/support/log"
"github.com/stellar/go/support/storage"
)

Expand All @@ -15,7 +17,7 @@ const (
Testnet = "testnet"
)

func CreateHistoryArchiveFromNetworkName(ctx context.Context, networkName string, userAgent string) (historyarchive.ArchiveInterface, error) {
func CreateHistoryArchiveFromNetworkName(ctx context.Context, networkName string, userAgent string, logger *supportlog.Entry) (historyarchive.ArchiveInterface, error) {
var historyArchiveUrls []string
switch networkName {
case Pubnet:
Expand All @@ -27,6 +29,7 @@ func CreateHistoryArchiveFromNetworkName(ctx context.Context, networkName string
}

return historyarchive.NewArchivePool(historyArchiveUrls, historyarchive.ArchiveOptions{
Logger: logger,
ConnectOptions: storage.ConnectOptions{
UserAgent: userAgent,
Context: ctx,
Expand Down
24 changes: 1 addition & 23 deletions support/db/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,29 +55,7 @@ func (s *Session) context(requestCtx context.Context) (context.Context, context.

// Begin binds this session to a new transaction.
func (s *Session) Begin(ctx context.Context) error {
if s.tx != nil {
return errors.New("already in transaction")
}
ctx, cancel, err := s.context(ctx)
if err != nil {
return err
}

tx, err := s.DB.BeginTxx(ctx, nil)
if err != nil {
if knownErr := s.handleError(err, ctx); knownErr != nil {
cancel()
return knownErr
}

cancel()
return errors.Wrap(err, "beginx failed")
}
log.Debug("sql: begin")
s.tx = tx
s.txOptions = nil
s.txCancel = cancel
return nil
return s.BeginTx(ctx, nil)
}

// BeginTx binds this session to a new transaction which is configured with the
Expand Down

0 comments on commit 100dc4f

Please sign in to comment.