diff --git a/ingest/ledgerbackend/captive_core_backend.go b/ingest/ledgerbackend/captive_core_backend.go index 9d1f7f7fc0..c8f28974f5 100644 --- a/ingest/ledgerbackend/captive_core_backend.go +++ b/ingest/ledgerbackend/captive_core_backend.go @@ -349,11 +349,11 @@ func (c *CaptiveStellarCore) openOfflineReplaySubprocess(from, to uint32) error ) } - c.stellarCoreRunner = c.stellarCoreRunnerFactory() - err = c.stellarCoreRunner.catchup(from, to) - if err != nil { + stellarCoreRunner := c.stellarCoreRunnerFactory() + if err = stellarCoreRunner.catchup(from, to); err != nil { return errors.Wrap(err, "error running stellar-core") } + c.stellarCoreRunner = stellarCoreRunner // The next ledger should be the first ledger of the checkpoint containing // the requested ledger @@ -375,11 +375,11 @@ func (c *CaptiveStellarCore) openOnlineReplaySubprocess(ctx context.Context, fro return errors.Wrap(err, "error calculating ledger and hash for stellar-core run") } - c.stellarCoreRunner = c.stellarCoreRunnerFactory() - err = c.stellarCoreRunner.runFrom(runFrom, ledgerHash) - if err != nil { + stellarCoreRunner := c.stellarCoreRunnerFactory() + if err = stellarCoreRunner.runFrom(runFrom, ledgerHash); err != nil { return errors.Wrap(err, "error running stellar-core") } + c.stellarCoreRunner = stellarCoreRunner // In the online mode we update nextLedger after streaming the first ledger. // This is to support versions before and after/including v17.1.0 that @@ -556,7 +556,7 @@ func (c *CaptiveStellarCore) isPrepared(ledgerRange Range) bool { return false } - if exited, _ := c.stellarCoreRunner.getProcessExitError(); exited { + if _, exited := c.stellarCoreRunner.getProcessExitError(); exited { return false } @@ -627,9 +627,6 @@ func (c *CaptiveStellarCore) GetLedger(ctx context.Context, sequence uint32) (xd if c.stellarCoreRunner == nil { return xdr.LedgerCloseMeta{}, errors.New("stellar-core cannot be nil, call PrepareRange first") } - if c.closed { - return xdr.LedgerCloseMeta{}, errors.New("stellar-core has an error, call PrepareRange first") - } if sequence < c.nextExpectedSequence() { return xdr.LedgerCloseMeta{}, errors.Errorf( @@ -647,12 +644,17 @@ func (c *CaptiveStellarCore) GetLedger(ctx context.Context, sequence uint32) (xd ) } + ch, ok := c.stellarCoreRunner.getMetaPipe() + if !ok { + return xdr.LedgerCloseMeta{}, errors.New("stellar-core is not running, call PrepareRange first") + } + // Now loop along the range until we find the ledger we want. for { select { case <-ctx.Done(): return xdr.LedgerCloseMeta{}, ctx.Err() - case result, ok := <-c.stellarCoreRunner.getMetaPipe(): + case result, ok := <-ch: found, ledger, err := c.handleMetaPipeResult(sequence, result, ok) if found || err != nil { return ledger, err @@ -732,7 +734,7 @@ func (c *CaptiveStellarCore) checkMetaPipeResult(result metaResult, ok bool) err return err } if !ok || result.err != nil { - exited, err := c.stellarCoreRunner.getProcessExitError() + err, exited := c.stellarCoreRunner.getProcessExitError() if exited && err != nil { // Case 2 - The stellar core process exited unexpectedly with an error message return errors.Wrap(err, "stellar core exited unexpectedly") @@ -775,12 +777,12 @@ func (c *CaptiveStellarCore) GetLatestLedgerSequence(ctx context.Context) (uint3 if c.stellarCoreRunner == nil { return 0, errors.New("stellar-core cannot be nil, call PrepareRange first") } - if c.closed { - return 0, errors.New("stellar-core is closed, call PrepareRange first") - + ch, ok := c.stellarCoreRunner.getMetaPipe() + if !ok { + return 0, errors.New("stellar-core is not running, call PrepareRange first") } if c.lastLedger == nil { - return c.nextExpectedSequence() - 1 + uint32(len(c.stellarCoreRunner.getMetaPipe())), nil + return c.nextExpectedSequence() - 1 + uint32(len(ch)), nil } return *c.lastLedger, nil } diff --git a/ingest/ledgerbackend/captive_core_backend_test.go b/ingest/ledgerbackend/captive_core_backend_test.go index 76319c2f77..f8161aec25 100644 --- a/ingest/ledgerbackend/captive_core_backend_test.go +++ b/ingest/ledgerbackend/captive_core_backend_test.go @@ -42,14 +42,14 @@ func (m *stellarCoreRunnerMock) runFrom(from uint32, hash string) error { return a.Error(0) } -func (m *stellarCoreRunnerMock) getMetaPipe() <-chan metaResult { +func (m *stellarCoreRunnerMock) getMetaPipe() (<-chan metaResult, bool) { a := m.Called() - return a.Get(0).(<-chan metaResult) + return a.Get(0).(<-chan metaResult), a.Bool(1) } -func (m *stellarCoreRunnerMock) getProcessExitError() (bool, error) { +func (m *stellarCoreRunnerMock) getProcessExitError() (error, bool) { a := m.Called() - return a.Bool(0), a.Error(1) + return a.Error(0), a.Bool(1) } func (m *stellarCoreRunnerMock) close() error { @@ -213,7 +213,7 @@ func TestCaptivePrepareRange(t *testing.T) { ctx := context.Background() mockRunner := &stellarCoreRunnerMock{} mockRunner.On("catchup", uint32(100), uint32(200)).Return(nil).Once() - mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan)) + mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan), true) mockRunner.On("context").Return(ctx) mockArchive := &historyarchive.MockArchive{} @@ -251,8 +251,8 @@ func TestCaptivePrepareRangeCrash(t *testing.T) { ctx := context.Background() mockRunner := &stellarCoreRunnerMock{} mockRunner.On("catchup", uint32(100), uint32(200)).Return(nil).Once() - mockRunner.On("getProcessExitError").Return(true, errors.New("exit code -1")) - mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan)) + mockRunner.On("getProcessExitError").Return(errors.New("exit code -1"), true) + mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan), true) mockRunner.On("close").Return(nil).Once() mockRunner.On("context").Return(ctx) @@ -292,7 +292,7 @@ func TestCaptivePrepareRangeTerminated(t *testing.T) { ctx := context.Background() mockRunner := &stellarCoreRunnerMock{} mockRunner.On("catchup", uint32(100), uint32(200)).Return(nil).Once() - mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan)) + mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan), true) mockRunner.On("context").Return(ctx) mockArchive := &historyarchive.MockArchive{} @@ -328,7 +328,7 @@ func TestCaptivePrepareRangeCloseNotFullyTerminated(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) mockRunner := &stellarCoreRunnerMock{} mockRunner.On("catchup", uint32(100), uint32(200)).Return(nil).Twice() - mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan)) + mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan), true) mockRunner.On("context").Return(ctx) mockRunner.On("close").Return(nil) @@ -364,7 +364,7 @@ func TestCaptivePrepareRange_ErrClosingSession(t *testing.T) { ctx := context.Background() mockRunner := &stellarCoreRunnerMock{} mockRunner.On("close").Return(fmt.Errorf("transient error")) - mockRunner.On("getProcessExitError").Return(false, nil) + mockRunner.On("getProcessExitError").Return(nil, false) mockRunner.On("context").Return(ctx) captiveBackend := CaptiveStellarCore{ @@ -440,7 +440,7 @@ func TestCaptivePrepareRange_FromIsAheadOfRootHAS(t *testing.T) { } mockRunner.On("runFrom", uint32(63), "0000000000000000000000000000000000000000000000000000000000000000").Return(nil).Once() - mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan)) + mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan), true) mockRunner.On("context").Return(ctx) assert.NoError(t, captiveBackend.PrepareRange(ctx, UnboundedRange(100))) @@ -481,7 +481,7 @@ func TestCaptivePrepareRangeWithDB_FromIsAheadOfRootHAS(t *testing.T) { LedgerCloseMeta: &meta, } mockRunner.On("runFrom", uint32(99), "").Return(nil).Once() - mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan)) + mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan), true) mockRunner.On("context").Return(ctx) assert.NoError(t, captiveBackend.PrepareRange(ctx, UnboundedRange(100))) @@ -517,7 +517,6 @@ func TestCaptivePrepareRange_ToIsAheadOfRootHAS(t *testing.T) { func TestCaptivePrepareRange_ErrCatchup(t *testing.T) { mockRunner := &stellarCoreRunnerMock{} mockRunner.On("catchup", uint32(100), uint32(192)).Return(errors.New("transient error")).Once() - mockRunner.On("close").Return(nil).Once() mockArchive := &historyarchive.MockArchive{} mockArchive. @@ -552,7 +551,6 @@ func TestCaptivePrepareRange_ErrCatchup(t *testing.T) { func TestCaptivePrepareRangeUnboundedRange_ErrRunFrom(t *testing.T) { mockRunner := &stellarCoreRunnerMock{} mockRunner.On("runFrom", uint32(126), "0000000000000000000000000000000000000000000000000000000000000000").Return(errors.New("transient error")).Once() - mockRunner.On("close").Return(nil).Once() mockArchive := &historyarchive.MockArchive{} mockArchive. @@ -604,9 +602,9 @@ func TestCaptivePrepareRangeUnboundedRange_ReuseSession(t *testing.T) { ctx := context.Background() mockRunner := &stellarCoreRunnerMock{} mockRunner.On("runFrom", uint32(64), "0000000000000000000000000000000000000000000000000000000000000000").Return(nil).Once() - mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan)) + mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan), true) mockRunner.On("context").Return(ctx) - mockRunner.On("getProcessExitError").Return(false, nil) + mockRunner.On("getProcessExitError").Return(nil, false) mockArchive := &historyarchive.MockArchive{} mockArchive. @@ -653,7 +651,7 @@ func TestGetLatestLedgerSequence(t *testing.T) { ctx := context.Background() mockRunner := &stellarCoreRunnerMock{} mockRunner.On("runFrom", uint32(63), "0000000000000000000000000000000000000000000000000000000000000000").Return(nil).Once() - mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan)) + mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan), true) mockRunner.On("context").Return(ctx) mockArchive := &historyarchive.MockArchive{} @@ -699,7 +697,7 @@ func TestGetLatestLedgerSequenceRaceCondition(t *testing.T) { } ctx, cancel := context.WithCancel(context.Background()) mockRunner := &stellarCoreRunnerMock{} - mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan)) + mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan), true) mockRunner.On("context").Return(ctx) mockRunner.On("runFrom", mock.Anything, mock.Anything).Return(nil) @@ -766,9 +764,9 @@ func TestCaptiveGetLedger(t *testing.T) { ctx, cancel := context.WithCancel(ctx) mockRunner := &stellarCoreRunnerMock{} mockRunner.On("catchup", uint32(65), uint32(66)).Return(nil) - mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan)) + mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan), true) mockRunner.On("context").Return(ctx) - mockRunner.On("getProcessExitError").Return(false, nil) + mockRunner.On("getProcessExitError").Return(nil, false) mockArchive := &historyarchive.MockArchive{} mockArchive. @@ -857,7 +855,7 @@ func TestCaptiveGetLedgerCacheLatestLedger(t *testing.T) { defer cancel() mockRunner := &stellarCoreRunnerMock{} mockRunner.On("runFrom", uint32(65), "0101010100000000000000000000000000000000000000000000000000000000").Return(nil).Once() - mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan)) + mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan), true) mockRunner.On("context").Return(ctx) mockArchive := &historyarchive.MockArchive{} @@ -919,7 +917,7 @@ func TestCaptiveGetLedger_NextLedgerIsDifferentToLedgerFromBuffer(t *testing.T) ctx := context.Background() mockRunner := &stellarCoreRunnerMock{} mockRunner.On("catchup", uint32(65), uint32(66)).Return(nil) - mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan)) + mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan), true) mockRunner.On("context").Return(ctx) mockRunner.On("close").Return(nil) @@ -965,7 +963,7 @@ func TestCaptiveGetLedger_NextLedger0RangeFromIsSmallerThanLedgerFromBuffer(t *t ctx := context.Background() mockRunner := &stellarCoreRunnerMock{} mockRunner.On("runFrom", uint32(64), mock.Anything).Return(nil) - mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan)) + mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan), true) mockRunner.On("context").Return(ctx) mockRunner.On("close").Return(nil) @@ -1067,13 +1065,13 @@ func TestCaptiveGetLedger_ErrReadingMetaResult(t *testing.T) { ctx := context.Background() mockRunner := &stellarCoreRunnerMock{} mockRunner.On("catchup", uint32(65), uint32(66)).Return(nil) - mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan)) + mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan), true) ctx, cancel := context.WithCancel(ctx) mockRunner.On("context").Return(ctx) mockRunner.On("close").Return(nil).Run(func(args mock.Arguments) { cancel() }).Once() - mockRunner.On("getProcessExitError").Return(false, nil) + mockRunner.On("getProcessExitError").Return(nil, false) // even if the request to fetch the latest checkpoint succeeds, we should fail at creating the subprocess mockArchive := &historyarchive.MockArchive{} @@ -1125,7 +1123,7 @@ func TestCaptiveGetLedger_ErrClosingAfterLastLedger(t *testing.T) { ctx := context.Background() mockRunner := &stellarCoreRunnerMock{} mockRunner.On("catchup", uint32(65), uint32(66)).Return(nil) - mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan)) + mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan), true) mockRunner.On("context").Return(ctx) mockRunner.On("close").Return(fmt.Errorf("transient error")).Once() @@ -1167,7 +1165,7 @@ func TestCaptiveAfterClose(t *testing.T) { mockRunner := &stellarCoreRunnerMock{} ctx, cancel := context.WithCancel(context.Background()) mockRunner.On("catchup", uint32(65), uint32(66)).Return(nil) - mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan)) + mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan), true) mockRunner.On("context").Return(ctx) mockRunner.On("close").Return(nil).Once() @@ -1222,7 +1220,7 @@ func TestGetLedgerBoundsCheck(t *testing.T) { ctx := context.Background() mockRunner := &stellarCoreRunnerMock{} mockRunner.On("catchup", uint32(128), uint32(130)).Return(nil).Once() - mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan)) + mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan), true) mockRunner.On("context").Return(ctx) mockArchive := &historyarchive.MockArchive{} @@ -1346,9 +1344,9 @@ func TestCaptiveGetLedgerTerminatedUnexpectedly(t *testing.T) { ctx := testCase.ctx mockRunner := &stellarCoreRunnerMock{} mockRunner.On("catchup", uint32(64), uint32(100)).Return(nil).Once() - mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan)) + mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan), true) mockRunner.On("context").Return(ctx) - mockRunner.On("getProcessExitError").Return(testCase.processExited, testCase.processExitedError) + mockRunner.On("getProcessExitError").Return(testCase.processExitedError, testCase.processExited) mockRunner.On("close").Return(nil).Once() mockArchive := &historyarchive.MockArchive{} @@ -1514,7 +1512,7 @@ func TestCaptiveRunFromParams(t *testing.T) { func TestCaptiveIsPrepared(t *testing.T) { mockRunner := &stellarCoreRunnerMock{} mockRunner.On("context").Return(context.Background()).Maybe() - mockRunner.On("getProcessExitError").Return(false, nil) + mockRunner.On("getProcessExitError").Return(nil, false) // c.prepared == nil captiveBackend := CaptiveStellarCore{ @@ -1578,7 +1576,7 @@ func TestCaptiveIsPreparedCoreContextCancelled(t *testing.T) { mockRunner := &stellarCoreRunnerMock{} ctx, cancel := context.WithCancel(context.Background()) mockRunner.On("context").Return(ctx).Maybe() - mockRunner.On("getProcessExitError").Return(false, nil) + mockRunner.On("getProcessExitError").Return(nil, false) rang := UnboundedRange(100) captiveBackend := CaptiveStellarCore{ @@ -1630,7 +1628,7 @@ func TestCaptivePreviousLedgerCheck(t *testing.T) { ctx := context.Background() mockRunner := &stellarCoreRunnerMock{} mockRunner.On("runFrom", uint32(254), "0101010100000000000000000000000000000000000000000000000000000000").Return(nil).Once() - mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan)) + mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan), true) mockRunner.On("context").Return(ctx) mockRunner.On("close").Return(nil).Once() diff --git a/ingest/ledgerbackend/catchup.go b/ingest/ledgerbackend/catchup.go new file mode 100644 index 0000000000..2cd12df0f3 --- /dev/null +++ b/ingest/ledgerbackend/catchup.go @@ -0,0 +1,76 @@ +package ledgerbackend + +import ( + "context" + "fmt" + + "github.com/stellar/go/support/log" +) + +type catchupStream struct { + dir workingDir + from uint32 + to uint32 + coreCmdFactory coreCmdFactory + log *log.Entry + useDB bool +} + +func newCatchupStream(r *stellarCoreRunner, from, to uint32) catchupStream { + // We want to use ephemeral directories in running the catchup command + // (used for the reingestion use case) because it's possible to run parallel + // reingestion where multiple captive cores are active on the same machine. + // Having ephemeral directories will ensure that each ingestion worker will + // have a separate working directory + dir := newWorkingDir(r, true) + return catchupStream{ + dir: dir, + from: from, + to: to, + coreCmdFactory: newCoreCmdFactory(r, dir), + log: r.log, + useDB: r.useDB, + } +} + +func (s catchupStream) getWorkingDir() workingDir { + return s.dir +} + +func (s catchupStream) start(ctx context.Context) (cmdI, pipe, error) { + var err error + var cmd cmdI + var captiveCorePipe pipe + + rangeArg := fmt.Sprintf("%d/%d", s.to, s.to-s.from+1) + params := []string{"catchup", rangeArg, "--metadata-output-stream", s.coreCmdFactory.getPipeName()} + + // horizon operator has specified to use external storage for captive core ledger state + // instruct captive core invocation to not use memory, and in that case + // cc will look at DATABASE property in cfg toml for the external storage source to use. + // when using external storage of ledgers, use new-db to first set the state of + // remote db storage to genesis to purge any prior state and reset. + if s.useDB { + cmd, err = s.coreCmdFactory.newCmd(ctx, stellarCoreRunnerModeOffline, true, "new-db") + if err != nil { + return nil, pipe{}, fmt.Errorf("error creating command: %w", err) + } + if err = cmd.Run(); err != nil { + return nil, pipe{}, fmt.Errorf("error initializing core db: %w", err) + } + } else { + params = append(params, "--in-memory") + } + + cmd, err = s.coreCmdFactory.newCmd(ctx, stellarCoreRunnerModeOffline, true, params...) + if err != nil { + return nil, pipe{}, fmt.Errorf("error creating command: %w", err) + } + + captiveCorePipe, err = s.coreCmdFactory.startCaptiveCore(cmd) + if err != nil { + return nil, pipe{}, fmt.Errorf("error starting `stellar-core run` subprocess: %w", err) + } + + return cmd, captiveCorePipe, nil +} diff --git a/ingest/ledgerbackend/cmd.go b/ingest/ledgerbackend/cmd.go new file mode 100644 index 0000000000..8af729f0a6 --- /dev/null +++ b/ingest/ledgerbackend/cmd.go @@ -0,0 +1,157 @@ +package ledgerbackend + +import ( + "context" + "fmt" + "io/fs" + "io/ioutil" + "math/rand" + "os" + "os/exec" + "time" + + "github.com/stellar/go/support/log" +) + +type isDir interface { + IsDir() bool +} + +type systemCaller interface { + removeAll(path string) error + writeFile(filename string, data []byte, perm fs.FileMode) error + mkdirAll(path string, perm os.FileMode) error + stat(name string) (isDir, error) + command(ctx context.Context, name string, arg ...string) cmdI +} + +type realSystemCaller struct{} + +func (realSystemCaller) removeAll(path string) error { + return os.RemoveAll(path) +} + +func (realSystemCaller) writeFile(filename string, data []byte, perm fs.FileMode) error { + return ioutil.WriteFile(filename, data, perm) +} + +func (realSystemCaller) mkdirAll(path string, perm os.FileMode) error { + return os.MkdirAll(path, perm) +} + +func (realSystemCaller) stat(name string) (isDir, error) { + return os.Stat(name) +} + +func (realSystemCaller) command(ctx context.Context, name string, arg ...string) cmdI { + cmd := exec.CommandContext(ctx, name, arg...) + cmd.Cancel = func() error { + return cmd.Process.Signal(os.Interrupt) + } + cmd.WaitDelay = time.Second * 10 + return &realCmd{Cmd: cmd} +} + +type cmdI interface { + Output() ([]byte, error) + Wait() error + Start() error + Run() error + setDir(dir string) + setLogLineWriter(logWriter *logLineWriter) + setExtraFiles([]*os.File) +} + +type realCmd struct { + *exec.Cmd + logWriter *logLineWriter +} + +func (r *realCmd) setDir(dir string) { + r.Cmd.Dir = dir +} + +func (r *realCmd) setLogLineWriter(logWriter *logLineWriter) { + r.logWriter = logWriter +} + +func (r *realCmd) setExtraFiles(extraFiles []*os.File) { + r.ExtraFiles = extraFiles +} + +func (r *realCmd) Start() error { + if r.logWriter != nil { + r.Cmd.Stdout = r.logWriter + r.Cmd.Stderr = r.logWriter + r.logWriter.Start() + } + err := r.Cmd.Start() + if err != nil && r.logWriter != nil { + r.logWriter.Close() + } + return err +} + +func (r *realCmd) Run() error { + if r.logWriter != nil { + r.Cmd.Stdout = r.logWriter + r.Cmd.Stderr = r.logWriter + r.logWriter.Start() + } + err := r.Cmd.Run() + if r.logWriter != nil { + r.logWriter.Close() + } + return err +} + +func (r *realCmd) Wait() error { + err := r.Cmd.Wait() + if r.logWriter != nil { + r.logWriter.Close() + } + return err +} + +type coreCmdFactory struct { + log *log.Entry + systemCaller systemCaller + executablePath string + dir workingDir + nonce string +} + +func newCoreCmdFactory(r *stellarCoreRunner, dir workingDir) coreCmdFactory { + return coreCmdFactory{ + log: r.log, + systemCaller: r.systemCaller, + executablePath: r.executablePath, + dir: dir, + nonce: fmt.Sprintf( + "captive-stellar-core-%x", + rand.New(rand.NewSource(time.Now().UnixNano())).Uint64(), + ), + } +} + +func (c coreCmdFactory) newCmd(ctx context.Context, mode stellarCoreRunnerMode, redirectOutputToLogs bool, params ...string) (cmdI, error) { + if err := c.dir.createIfNotExists(); err != nil { + return nil, err + } + + if err := c.dir.writeConf(mode); err != nil { + return nil, fmt.Errorf("error writing configuration: %w", err) + } + + allParams := []string{"--conf", c.dir.getConfFileName()} + if redirectOutputToLogs { + allParams = append(allParams, "--console") + } + allParams = append(allParams, params...) + cmd := c.systemCaller.command(ctx, c.executablePath, allParams...) + cmd.setDir(c.dir.path) + if redirectOutputToLogs { + cmd.setLogLineWriter(newLogLineWriter(c.log)) + } + return cmd, nil +} diff --git a/ingest/ledgerbackend/stellar_core_runner_posix.go b/ingest/ledgerbackend/cmd_posix.go similarity index 75% rename from ingest/ledgerbackend/stellar_core_runner_posix.go rename to ingest/ledgerbackend/cmd_posix.go index 6f34a49a75..c36dc208ee 100644 --- a/ingest/ledgerbackend/stellar_core_runner_posix.go +++ b/ingest/ledgerbackend/cmd_posix.go @@ -4,26 +4,25 @@ package ledgerbackend import ( + "fmt" "os" - - "github.com/pkg/errors" ) // Posix-specific methods for the StellarCoreRunner type. -func (c *stellarCoreRunner) getPipeName() string { +func (c coreCmdFactory) getPipeName() string { // The exec.Cmd.ExtraFiles field carries *io.File values that are assigned // to child process fds counting from 3, and we'll be passing exactly one // fd: the write end of the anonymous pipe below. return "fd:3" } -func (c *stellarCoreRunner) start(cmd cmdI) (pipe, error) { +func (c coreCmdFactory) startCaptiveCore(cmd cmdI) (pipe, error) { // First make an anonymous pipe. // Note io.File objects close-on-finalization. readFile, writeFile, err := os.Pipe() if err != nil { - return pipe{}, errors.Wrap(err, "error making a pipe") + return pipe{}, fmt.Errorf("error making a pipe: %w", err) } p := pipe{Reader: readFile, File: writeFile} @@ -34,7 +33,7 @@ func (c *stellarCoreRunner) start(cmd cmdI) (pipe, error) { if err != nil { writeFile.Close() readFile.Close() - return pipe{}, errors.Wrap(err, "error starting stellar-core") + return pipe{}, fmt.Errorf("error starting stellar-core: %w", err) } return p, nil diff --git a/ingest/ledgerbackend/stellar_core_runner_windows.go b/ingest/ledgerbackend/cmd_windows.go similarity index 85% rename from ingest/ledgerbackend/stellar_core_runner_windows.go rename to ingest/ledgerbackend/cmd_windows.go index 47368a55b6..6eb6e4d0d2 100644 --- a/ingest/ledgerbackend/stellar_core_runner_windows.go +++ b/ingest/ledgerbackend/cmd_windows.go @@ -11,11 +11,11 @@ import ( // Windows-specific methods for the stellarCoreRunner type. -func (c *stellarCoreRunner) getPipeName() string { +func (c coreCmdFactory) getPipeName() string { return fmt.Sprintf(`\\.\pipe\%s`, c.nonce) } -func (c *stellarCoreRunner) start(cmd cmdI) (pipe, error) { +func (c coreCmdFactory) startCaptiveCore(cmd cmdI) (pipe, error) { // First set up the server pipe. listener, err := winio.ListenPipe(c.getPipeName(), nil) if err != nil { diff --git a/ingest/ledgerbackend/core_log.go b/ingest/ledgerbackend/core_log.go new file mode 100644 index 0000000000..438bc136b0 --- /dev/null +++ b/ingest/ledgerbackend/core_log.go @@ -0,0 +1,82 @@ +package ledgerbackend + +import ( + "bufio" + "io" + "regexp" + "strings" + "sync" + + "github.com/stellar/go/support/log" +) + +type logLineWriter struct { + pipeReader *io.PipeReader + pipeWriter *io.PipeWriter + wg sync.WaitGroup + log *log.Entry +} + +func newLogLineWriter(log *log.Entry) *logLineWriter { + rd, wr := io.Pipe() + return &logLineWriter{ + pipeReader: rd, + pipeWriter: wr, + log: log, + } +} + +func (l *logLineWriter) Write(p []byte) (n int, err error) { + return l.pipeWriter.Write(p) +} + +func (l *logLineWriter) Close() error { + err := l.pipeWriter.Close() + l.wg.Wait() + return err +} + +func (l *logLineWriter) Start() { + br := bufio.NewReader(l.pipeReader) + l.wg.Add(1) + go func() { + defer l.wg.Done() + dateRx := regexp.MustCompile(`^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3} `) + levelRx := regexp.MustCompile(`\[(\w+) ([A-Z]+)\] (.*)`) + for { + line, err := br.ReadString('\n') + if err != nil { + break + } + line = dateRx.ReplaceAllString(line, "") + line = strings.TrimSpace(line) + + if line == "" { + continue + } + + matches := levelRx.FindStringSubmatch(line) + if len(matches) >= 4 { + // Extract the substrings from the log entry and trim it + category, level := matches[1], matches[2] + line = matches[3] + + levelMapping := map[string]func(string, ...interface{}){ + "FATAL": l.log.Errorf, + "ERROR": l.log.Errorf, + "WARNING": l.log.Warnf, + "INFO": l.log.Infof, + "DEBUG": l.log.Debugf, + } + + writer := l.log.Infof + if f, ok := levelMapping[strings.ToUpper(level)]; ok { + writer = f + } + writer("%s: %s", category, line) + } else { + l.log.Info(line) + } + } + }() +} diff --git a/ingest/ledgerbackend/dir.go b/ingest/ledgerbackend/dir.go new file mode 100644 index 0000000000..d26835936c --- /dev/null +++ b/ingest/ledgerbackend/dir.go @@ -0,0 +1,109 @@ +package ledgerbackend + +import ( + "context" + "errors" + "fmt" + "os" + "path/filepath" + + "github.com/stellar/go/support/log" +) + +type workingDir struct { + ephemeral bool + path string + log *log.Entry + toml *CaptiveCoreToml + systemCaller systemCaller +} + +func newWorkingDir(r *stellarCoreRunner, ephemeral bool) workingDir { + var path string + if ephemeral { + path = filepath.Join(r.storagePath, "captive-core-"+createRandomHexString(8)) + } else { + path = filepath.Join(r.storagePath, "captive-core") + } + return workingDir{ + ephemeral: ephemeral, + path: path, + log: r.log, + toml: r.toml, + systemCaller: r.systemCaller, + } +} + +func (w workingDir) createIfNotExists() error { + info, err := w.systemCaller.stat(w.path) + if os.IsNotExist(err) { + innerErr := w.systemCaller.mkdirAll(w.path, os.FileMode(int(0755))) // rwx|rx|rx + if innerErr != nil { + return fmt.Errorf("failed to create storage directory (%s): %w", w.path, innerErr) + } + } else if !info.IsDir() { + return fmt.Errorf("%s is not a directory", w.path) + } else if err != nil { + return fmt.Errorf("error accessing storage directory (%s): %w", w.path, err) + } + + return nil +} + +func (w workingDir) writeConf(mode stellarCoreRunnerMode) error { + text, err := generateConfig(w.toml, mode) + if err != nil { + return err + } + + w.log.Debugf("captive core config file contents:\n%s", string(text)) + return w.systemCaller.writeFile(w.getConfFileName(), text, 0644) +} + +func (w workingDir) cleanup(coreExitError error) error { + if w.ephemeral || (coreExitError != nil && !errors.Is(coreExitError, context.Canceled)) { + return w.remove() + } + return nil +} + +func (w workingDir) remove() error { + return w.systemCaller.removeAll(w.path) +} + +func generateConfig(captiveCoreToml *CaptiveCoreToml, mode stellarCoreRunnerMode) ([]byte, error) { + if mode == stellarCoreRunnerModeOffline { + var err error + captiveCoreToml, err = captiveCoreToml.CatchupToml() + if err != nil { + return nil, fmt.Errorf("could not generate catch up config: %w", err) + } + } + + if !captiveCoreToml.QuorumSetIsConfigured() { + return nil, fmt.Errorf("captive-core config file does not define any quorum set") + } + + text, err := captiveCoreToml.Marshal() + if err != nil { + return nil, fmt.Errorf("could not marshal captive core config: %w", err) + } + return text, nil +} + +func (w workingDir) getConfFileName() string { + joinedPath := filepath.Join(w.path, "stellar-core.conf") + + // Given that `storagePath` can be anything, we need the full, absolute path + // here so that everything Core needs is created under the storagePath + // subdirectory. + // + // If the path *can't* be absolutely resolved (bizarre), we can still try + // recovering by using the path the user specified directly. + path, err := filepath.Abs(joinedPath) + if err != nil { + w.log.Warnf("Failed to resolve %s as an absolute path: %s", joinedPath, err) + return joinedPath + } + return path +} diff --git a/ingest/ledgerbackend/main.go b/ingest/ledgerbackend/main.go deleted file mode 100644 index 6029b8a1f6..0000000000 --- a/ingest/ledgerbackend/main.go +++ /dev/null @@ -1,92 +0,0 @@ -package ledgerbackend - -import ( - "io/fs" - "io/ioutil" - "os" - "os/exec" -) - -type isDir interface { - IsDir() bool -} - -type systemCaller interface { - removeAll(path string) error - writeFile(filename string, data []byte, perm fs.FileMode) error - mkdirAll(path string, perm os.FileMode) error - stat(name string) (isDir, error) - command(name string, arg ...string) cmdI -} - -type realSystemCaller struct{} - -func (realSystemCaller) removeAll(path string) error { - return os.RemoveAll(path) -} - -func (realSystemCaller) writeFile(filename string, data []byte, perm fs.FileMode) error { - return ioutil.WriteFile(filename, data, perm) -} - -func (realSystemCaller) mkdirAll(path string, perm os.FileMode) error { - return os.MkdirAll(path, perm) -} - -func (realSystemCaller) stat(name string) (isDir, error) { - return os.Stat(name) -} - -func (realSystemCaller) command(name string, arg ...string) cmdI { - cmd := exec.Command(name, arg...) - return &realCmd{Cmd: cmd} -} - -type cmdI interface { - Output() ([]byte, error) - Wait() error - Start() error - Run() error - setDir(dir string) - setStdout(stdout *logLineWriter) - getStdout() *logLineWriter - setStderr(stderr *logLineWriter) - getStderr() *logLineWriter - getProcess() *os.Process - setExtraFiles([]*os.File) -} - -type realCmd struct { - *exec.Cmd - stdout, stderr *logLineWriter -} - -func (r *realCmd) setDir(dir string) { - r.Cmd.Dir = dir -} - -func (r *realCmd) setStdout(stdout *logLineWriter) { - r.stdout = stdout - r.Cmd.Stdout = stdout -} - -func (r *realCmd) getStdout() *logLineWriter { - return r.stdout -} - -func (r *realCmd) setStderr(stderr *logLineWriter) { - r.stderr = stderr - r.Cmd.Stderr = stderr -} - -func (r *realCmd) getStderr() *logLineWriter { - return r.stderr -} - -func (r *realCmd) getProcess() *os.Process { - return r.Cmd.Process -} - -func (r *realCmd) setExtraFiles(extraFiles []*os.File) { - r.ExtraFiles = extraFiles -} diff --git a/ingest/ledgerbackend/mock_cmd_test.go b/ingest/ledgerbackend/mock_cmd_test.go index bf06a9ae86..8be533e8b1 100644 --- a/ingest/ledgerbackend/mock_cmd_test.go +++ b/ingest/ledgerbackend/mock_cmd_test.go @@ -1,7 +1,6 @@ package ledgerbackend import ( - "io" "os" "github.com/stretchr/testify/mock" @@ -35,27 +34,8 @@ func (m *mockCmd) setDir(dir string) { m.Called(dir) } -func (m *mockCmd) setStdout(stdout *logLineWriter) { - m.Called(stdout) -} - -func (m *mockCmd) getStdout() *logLineWriter { - args := m.Called() - return args.Get(0).(*logLineWriter) -} - -func (m *mockCmd) setStderr(stderr *logLineWriter) { - m.Called(stderr) -} - -func (m *mockCmd) getStderr() *logLineWriter { - args := m.Called() - return args.Get(0).(*logLineWriter) -} - -func (m *mockCmd) getProcess() *os.Process { - args := m.Called() - return args.Get(0).(*os.Process) +func (m *mockCmd) setLogLineWriter(logWriter *logLineWriter) { + m.Called(logWriter) } func (m *mockCmd) setExtraFiles(files []*os.File) { @@ -63,15 +43,9 @@ func (m *mockCmd) setExtraFiles(files []*os.File) { } func simpleCommandMock() *mockCmd { - _, writer := io.Pipe() - llw := logLineWriter{pipeWriter: writer} cmdMock := &mockCmd{} cmdMock.On("setDir", mock.Anything) - cmdMock.On("setStdout", mock.Anything) - cmdMock.On("getStdout").Return(&llw) - cmdMock.On("setStderr", mock.Anything) - cmdMock.On("getStderr").Return(&llw) - cmdMock.On("getProcess").Return(&os.Process{}).Maybe() + cmdMock.On("setLogLineWriter", mock.Anything) cmdMock.On("setExtraFiles", mock.Anything) cmdMock.On("Start").Return(nil) return cmdMock diff --git a/ingest/ledgerbackend/mock_system_caller_test.go b/ingest/ledgerbackend/mock_system_caller_test.go index 99e1faede9..7878e39f34 100644 --- a/ingest/ledgerbackend/mock_system_caller_test.go +++ b/ingest/ledgerbackend/mock_system_caller_test.go @@ -1,6 +1,7 @@ package ledgerbackend import ( + "context" "io/fs" "os" @@ -37,8 +38,8 @@ func (m *mockSystemCaller) stat(name string) (isDir, error) { return args.Get(0).(isDir), args.Error(1) } -func (m *mockSystemCaller) command(name string, arg ...string) cmdI { - a := []interface{}{name} +func (m *mockSystemCaller) command(ctx context.Context, name string, arg ...string) cmdI { + a := []interface{}{ctx, name} for _, ar := range arg { a = append(a, ar) } diff --git a/ingest/ledgerbackend/run_from.go b/ingest/ledgerbackend/run_from.go new file mode 100644 index 0000000000..2d02322519 --- /dev/null +++ b/ingest/ledgerbackend/run_from.go @@ -0,0 +1,140 @@ +package ledgerbackend + +import ( + "context" + "encoding/json" + "fmt" + "runtime" + + "github.com/stellar/go/protocols/stellarcore" + "github.com/stellar/go/support/log" +) + +type runFromStream struct { + dir workingDir + from uint32 + hash string + coreCmdFactory coreCmdFactory + log *log.Entry + useDB bool +} + +func newRunFromStream(r *stellarCoreRunner, from uint32, hash string) runFromStream { + // We only use ephemeral directories on windows because there is + // no way to terminate captive core gracefully on windows. + // Having an ephemeral directory ensures that it is wiped out + // whenever we terminate captive core + dir := newWorkingDir(r, runtime.GOOS == "windows") + return runFromStream{ + dir: dir, + from: from, + hash: hash, + coreCmdFactory: newCoreCmdFactory(r, dir), + log: r.log, + useDB: r.useDB, + } +} + +func (s runFromStream) getWorkingDir() workingDir { + return s.dir +} + +func (s runFromStream) offlineInfo(ctx context.Context) (stellarcore.InfoResponse, error) { + cmd, err := s.coreCmdFactory.newCmd(ctx, stellarCoreRunnerModeOnline, false, "offline-info") + if err != nil { + return stellarcore.InfoResponse{}, fmt.Errorf("error creating offline-info cmd: %w", err) + } + output, err := cmd.Output() + if err != nil { + return stellarcore.InfoResponse{}, fmt.Errorf("error executing offline-info cmd: %w", err) + } + var info stellarcore.InfoResponse + err = json.Unmarshal(output, &info) + if err != nil { + return stellarcore.InfoResponse{}, fmt.Errorf("invalid output of offline-info cmd: %w", err) + } + return info, nil +} + +func (s runFromStream) start(ctx context.Context) (cmd cmdI, captiveCorePipe pipe, returnErr error) { + var err error + var createNewDB bool + defer func() { + if returnErr != nil && createNewDB { + // if we could not start captive core remove the new db we created + s.dir.remove() + } + }() + if s.useDB { + // Check if on-disk core DB exists and what's the LCL there. If not what + // we need remove storage dir and start from scratch. + var info stellarcore.InfoResponse + info, err = s.offlineInfo(ctx) + if err != nil { + s.log.Infof("Error running offline-info: %v, removing existing storage-dir contents", err) + createNewDB = true + } else if info.Info.Ledger.Num <= 1 || uint32(info.Info.Ledger.Num) > s.from { + s.log.Infof("Unexpected LCL in Stellar-Core DB: %d (want: %d), removing existing storage-dir contents", info.Info.Ledger.Num, s.from) + createNewDB = true + } + + if createNewDB { + if err = s.dir.remove(); err != nil { + return nil, pipe{}, fmt.Errorf("error removing existing storage-dir contents: %w", err) + } + + cmd, err = s.coreCmdFactory.newCmd(ctx, stellarCoreRunnerModeOnline, true, "new-db") + if err != nil { + return nil, pipe{}, fmt.Errorf("error creating command: %w", err) + } + + if err = cmd.Run(); err != nil { + return nil, pipe{}, fmt.Errorf("error initializing core db: %w", err) + } + + // Do a quick catch-up to set the LCL in core to be our expected starting + // point. + if s.from > 2 { + cmd, err = s.coreCmdFactory.newCmd(ctx, stellarCoreRunnerModeOnline, true, "catchup", fmt.Sprintf("%d/0", s.from-1)) + } else { + cmd, err = s.coreCmdFactory.newCmd(ctx, stellarCoreRunnerModeOnline, true, "catchup", "2/0") + } + if err != nil { + return nil, pipe{}, fmt.Errorf("error creating command: %w", err) + } + + if err = cmd.Run(); err != nil { + return nil, pipe{}, fmt.Errorf("error runing stellar-core catchup: %w", err) + } + } + + cmd, err = s.coreCmdFactory.newCmd( + ctx, + stellarCoreRunnerModeOnline, + true, + "run", + "--metadata-output-stream", s.coreCmdFactory.getPipeName(), + ) + } else { + cmd, err = s.coreCmdFactory.newCmd( + ctx, + stellarCoreRunnerModeOnline, + true, + "run", + "--in-memory", + "--start-at-ledger", fmt.Sprintf("%d", s.from), + "--start-at-hash", s.hash, + "--metadata-output-stream", s.coreCmdFactory.getPipeName(), + ) + } + if err != nil { + return nil, pipe{}, fmt.Errorf("error creating command: %w", err) + } + + captiveCorePipe, err = s.coreCmdFactory.startCaptiveCore(cmd) + if err != nil { + return nil, pipe{}, fmt.Errorf("error starting `stellar-core run` subprocess: %w", err) + } + + return cmd, captiveCorePipe, nil +} diff --git a/ingest/ledgerbackend/stellar_core_runner.go b/ingest/ledgerbackend/stellar_core_runner.go index 57e8c1c0f9..5245051dce 100644 --- a/ingest/ledgerbackend/stellar_core_runner.go +++ b/ingest/ledgerbackend/stellar_core_runner.go @@ -1,33 +1,21 @@ package ledgerbackend import ( - "bufio" "context" - "encoding/json" "fmt" "io" "math/rand" - "os" - "path" - "path/filepath" - "regexp" - "runtime" - "strings" "sync" - "time" - "github.com/pkg/errors" - - "github.com/stellar/go/protocols/stellarcore" "github.com/stellar/go/support/log" ) type stellarCoreRunnerInterface interface { catchup(from, to uint32) error runFrom(from uint32, hash string) error - getMetaPipe() <-chan metaResult + getMetaPipe() (<-chan metaResult, bool) context() context.Context - getProcessExitError() (bool, error) + getProcessExitError() (error, bool) close() error } @@ -51,29 +39,33 @@ type pipe struct { File io.Closer } +type executionState struct { + cmd cmdI + workingDir workingDir + ledgerBuffer *bufferedLedgerMetaReader + pipe pipe + wg sync.WaitGroup + processExitedLock sync.RWMutex + processExited bool + processExitError error + log *log.Entry +} + type stellarCoreRunner struct { executablePath string - - started bool - cmd cmdI - wg sync.WaitGroup - ctx context.Context - cancel context.CancelFunc - ledgerBuffer *bufferedLedgerMetaReader - pipe pipe - mode stellarCoreRunnerMode + ctx context.Context + cancel context.CancelFunc systemCaller systemCaller - lock sync.Mutex - closeOnce sync.Once - processExited bool - processExitError error + stateLock sync.Mutex + state *executionState + + closeOnce sync.Once storagePath string toml *CaptiveCoreToml useDB bool - nonce string log *log.Entry } @@ -96,12 +88,8 @@ func newStellarCoreRunner(config CaptiveCoreConfig) *stellarCoreRunner { cancel: cancel, storagePath: config.StoragePath, useDB: config.UseDB, - nonce: fmt.Sprintf( - "captive-stellar-core-%x", - rand.New(rand.NewSource(time.Now().UnixNano())).Uint64(), - ), - log: config.Log, - toml: config.Toml, + log: config.Log, + toml: config.Toml, systemCaller: realSystemCaller{}, } @@ -109,356 +97,54 @@ func newStellarCoreRunner(config CaptiveCoreConfig) *stellarCoreRunner { return runner } -func (r *stellarCoreRunner) getFullStoragePath() string { - if runtime.GOOS == "windows" || r.mode == stellarCoreRunnerModeOffline { - // On Windows, first we ALWAYS append something to the base storage path, - // because we will delete the directory entirely when Horizon stops. We also - // add a random suffix in order to ensure that there aren't naming - // conflicts. - // This is done because it's impossible to send SIGINT on Windows so - // buckets can become corrupted. - // We also want to use random directories in offline mode (reingestion) - // because it's possible it's running multiple Stellar-Cores on a single - // machine. - return path.Join(r.storagePath, "captive-core-"+createRandomHexString(8)) - } else { - // Use the specified directory to store Captive Core's data: - // https://github.com/stellar/go/issues/3437 - // but be sure to re-use rather than replace it: - // https://github.com/stellar/go/issues/3631 - return path.Join(r.storagePath, "captive-core") - } -} - -func (r *stellarCoreRunner) establishStorageDirectory() error { - info, err := r.systemCaller.stat(r.storagePath) - if os.IsNotExist(err) { - innerErr := r.systemCaller.mkdirAll(r.storagePath, os.FileMode(int(0755))) // rwx|rx|rx - if innerErr != nil { - return errors.Wrap(innerErr, fmt.Sprintf( - "failed to create storage directory (%s)", r.storagePath)) - } - } else if !info.IsDir() { - return errors.New(fmt.Sprintf("%s is not a directory", r.storagePath)) - } else if err != nil { - return errors.Wrap(err, fmt.Sprintf( - "error accessing storage directory (%s)", r.storagePath)) - } - - return nil -} - -func (r *stellarCoreRunner) writeConf() (string, error) { - text, err := generateConfig(r.toml, r.mode) - if err != nil { - return "", err - } - - return string(text), r.systemCaller.writeFile(r.getConfFileName(), text, 0644) -} - -func generateConfig(captiveCoreToml *CaptiveCoreToml, mode stellarCoreRunnerMode) ([]byte, error) { - if mode == stellarCoreRunnerModeOffline { - var err error - captiveCoreToml, err = captiveCoreToml.CatchupToml() - if err != nil { - return nil, errors.Wrap(err, "could not generate catch up config") - } - } - - if !captiveCoreToml.QuorumSetIsConfigured() { - return nil, errors.New("captive-core config file does not define any quorum set") - } - - text, err := captiveCoreToml.Marshal() - if err != nil { - return nil, errors.Wrap(err, "could not marshal captive core config") - } - return text, nil -} - -func (r *stellarCoreRunner) getConfFileName() string { - joinedPath := filepath.Join(r.storagePath, "stellar-core.conf") - - // Given that `storagePath` can be anything, we need the full, absolute path - // here so that everything Core needs is created under the storagePath - // subdirectory. - // - // If the path *can't* be absolutely resolved (bizarre), we can still try - // recovering by using the path the user specified directly. - path, err := filepath.Abs(joinedPath) - if err != nil { - r.log.Warnf("Failed to resolve %s as an absolute path: %s", joinedPath, err) - return joinedPath - } - return path -} - -type logLineWriter struct { - pipeWriter *io.PipeWriter - wg sync.WaitGroup -} - -func (l *logLineWriter) Write(p []byte) (n int, err error) { - return l.pipeWriter.Write(p) -} - -func (l *logLineWriter) Close() error { - err := l.pipeWriter.Close() - l.wg.Wait() - return err -} - -func (r *stellarCoreRunner) getLogLineWriter() *logLineWriter { - rd, wr := io.Pipe() - br := bufio.NewReader(rd) - result := &logLineWriter{ - pipeWriter: wr, - } - // Strip timestamps from log lines from captive stellar-core. We emit our own. - dateRx := regexp.MustCompile(`^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3} `) - result.wg.Add(1) - go func() { - defer result.wg.Done() - levelRx := regexp.MustCompile(`\[(\w+) ([A-Z]+)\] (.*)`) - for { - line, err := br.ReadString('\n') - if err != nil { - break - } - line = dateRx.ReplaceAllString(line, "") - line = strings.TrimSpace(line) - - if line == "" { - continue - } - - matches := levelRx.FindStringSubmatch(line) - if len(matches) >= 4 { - // Extract the substrings from the log entry and trim it - category, level := matches[1], matches[2] - line = matches[3] - - levelMapping := map[string]func(string, ...interface{}){ - "FATAL": r.log.Errorf, - "ERROR": r.log.Errorf, - "WARNING": r.log.Warnf, - "INFO": r.log.Infof, - "DEBUG": r.log.Debugf, - } - - writer := r.log.Infof - if f, ok := levelMapping[strings.ToUpper(level)]; ok { - writer = f - } - writer("%s: %s", category, line) - } else { - r.log.Info(line) - } - } - }() - return result -} - -func (r *stellarCoreRunner) offlineInfo() (stellarcore.InfoResponse, error) { - allParams := []string{"--conf", r.getConfFileName(), "offline-info"} - cmd := r.systemCaller.command(r.executablePath, allParams...) - cmd.setDir(r.storagePath) - output, err := cmd.Output() - if err != nil { - return stellarcore.InfoResponse{}, errors.Wrap(err, "error executing offline-info cmd") - } - var info stellarcore.InfoResponse - err = json.Unmarshal(output, &info) - if err != nil { - return stellarcore.InfoResponse{}, errors.Wrap(err, "invalid output of offline-info cmd") - } - return info, nil -} - -func (r *stellarCoreRunner) createCmd(params ...string) (cmdI, error) { - err := r.establishStorageDirectory() - if err != nil { - return nil, err - } - - if conf, err := r.writeConf(); err != nil { - return nil, errors.Wrap(err, "error writing configuration") - } else { - r.log.Debugf("captive core config file contents:\n%s", conf) - } - - allParams := append([]string{"--conf", r.getConfFileName(), "--console"}, params...) - cmd := r.systemCaller.command(r.executablePath, allParams...) - cmd.setDir(r.storagePath) - cmd.setStdout(r.getLogLineWriter()) - cmd.setStderr(r.getLogLineWriter()) - return cmd, nil -} - // context returns the context.Context instance associated with the running captive core instance func (r *stellarCoreRunner) context() context.Context { return r.ctx } +// runFrom executes the run command with a starting ledger on the captive core subprocess +func (r *stellarCoreRunner) runFrom(from uint32, hash string) error { + return r.startMetaStream(newRunFromStream(r, from, hash)) +} + // catchup executes the catchup command on the captive core subprocess func (r *stellarCoreRunner) catchup(from, to uint32) error { - r.lock.Lock() - defer r.lock.Unlock() - - // check if we have already been closed - if r.ctx.Err() != nil { - return r.ctx.Err() - } - - if r.started { - return errors.New("runner already started") - } - - r.mode = stellarCoreRunnerModeOffline - r.storagePath = r.getFullStoragePath() - - rangeArg := fmt.Sprintf("%d/%d", to, to-from+1) - params := []string{"catchup", rangeArg, "--metadata-output-stream", r.getPipeName()} - - // horizon operator has specified to use external storage for captive core ledger state - // instruct captive core invocation to not use memory, and in that case - // cc will look at DATABASE property in cfg toml for the external storage source to use. - // when using external storage of ledgers, use new-db to first set the state of - // remote db storage to genesis to purge any prior state and reset. - if r.useDB { - cmd, err := r.createCmd("new-db") - if err != nil { - return errors.Wrap(err, "error creating command") - } - if err := cmd.Run(); err != nil { - return errors.Wrap(err, "error initializing core db") - } - } else { - params = append(params, "--in-memory") - } - - var err error - r.cmd, err = r.createCmd(params...) - if err != nil { - return errors.Wrap(err, "error creating command") - } - - r.pipe, err = r.start(r.cmd) - if err != nil { - r.closeLogLineWriters(r.cmd) - return errors.Wrap(err, "error starting `stellar-core catchup` subprocess") - } - - r.started = true - r.ledgerBuffer = newBufferedLedgerMetaReader(r.pipe.Reader) - go r.ledgerBuffer.start() - - if binaryWatcher, err := newFileWatcher(r); err != nil { - r.log.Warnf("could not create captive core binary watcher: %v", err) - } else { - go binaryWatcher.loop() - } - - r.wg.Add(1) - go r.handleExit() + return r.startMetaStream(newCatchupStream(r, from, to)) +} - return nil +type metaStream interface { + getWorkingDir() workingDir + start(ctx context.Context) (cmdI, pipe, error) } -// runFrom executes the run command with a starting ledger on the captive core subprocess -func (r *stellarCoreRunner) runFrom(from uint32, hash string) error { - r.lock.Lock() - defer r.lock.Unlock() +func (r *stellarCoreRunner) startMetaStream(stream metaStream) error { + r.stateLock.Lock() + defer r.stateLock.Unlock() // check if we have already been closed if r.ctx.Err() != nil { return r.ctx.Err() } - if r.started { - return errors.New("runner already started") + if r.state != nil { + return fmt.Errorf("runner already started") } - r.mode = stellarCoreRunnerModeOnline - r.storagePath = r.getFullStoragePath() - - var err error - - if r.useDB { - // Check if on-disk core DB exists and what's the LCL there. If not what - // we need remove storage dir and start from scratch. - removeStorageDir := false - var info stellarcore.InfoResponse - info, err = r.offlineInfo() - if err != nil { - r.log.Infof("Error running offline-info: %v, removing existing storage-dir contents", err) - removeStorageDir = true - } else if uint32(info.Info.Ledger.Num) > from { - r.log.Infof("Unexpected LCL in Stellar-Core DB: %d (want: %d), removing existing storage-dir contents", info.Info.Ledger.Num, from) - removeStorageDir = true - } - - if removeStorageDir { - if err = r.systemCaller.removeAll(r.storagePath); err != nil { - return errors.Wrap(err, "error removing existing storage-dir contents") - } - - var cmd cmdI - cmd, err = r.createCmd("new-db") - if err != nil { - return errors.Wrap(err, "error creating command") - } - - if err = cmd.Run(); err != nil { - return errors.Wrap(err, "error initializing core db") - } - - // Do a quick catch-up to set the LCL in core to be our expected starting - // point. - if from > 2 { - cmd, err = r.createCmd("catchup", fmt.Sprintf("%d/0", from-1)) - } else { - cmd, err = r.createCmd("catchup", "2/0") - } - - if err != nil { - return errors.Wrap(err, "error creating command") - } - - if err = cmd.Run(); err != nil { - return errors.Wrap(err, "error runing stellar-core catchup") - } - } - - r.cmd, err = r.createCmd( - "run", - "--metadata-output-stream", - r.getPipeName(), - ) - } else { - r.cmd, err = r.createCmd( - "run", - "--in-memory", - "--start-at-ledger", fmt.Sprintf("%d", from), - "--start-at-hash", hash, - "--metadata-output-stream", r.getPipeName(), - ) + state := &executionState{ + workingDir: stream.getWorkingDir(), + log: r.log, } + cmd, p, err := stream.start(r.ctx) if err != nil { - return errors.Wrap(err, "error creating command") + state.workingDir.cleanup(nil) + return err } - r.pipe, err = r.start(r.cmd) - if err != nil { - r.closeLogLineWriters(r.cmd) - return errors.Wrap(err, "error starting `stellar-core run` subprocess") - } - - r.started = true - r.ledgerBuffer = newBufferedLedgerMetaReader(r.pipe.Reader) - go r.ledgerBuffer.start() + state.cmd = cmd + state.pipe = p + state.ledgerBuffer = newBufferedLedgerMetaReader(state.pipe.Reader) + go state.ledgerBuffer.start() if binaryWatcher, err := newFileWatcher(r); err != nil { r.log.Warnf("could not create captive core binary watcher: %v", err) @@ -466,101 +152,78 @@ func (r *stellarCoreRunner) runFrom(from uint32, hash string) error { go binaryWatcher.loop() } - r.wg.Add(1) - go r.handleExit() + state.wg.Add(1) + go state.handleExit() + r.state = state return nil } -func (r *stellarCoreRunner) handleExit() { - defer r.wg.Done() - - // Pattern recommended in: - // https://github.com/golang/go/blob/cacac8bdc5c93e7bc71df71981fdf32dded017bf/src/cmd/go/script_test.go#L1091-L1098 - interrupt := os.Interrupt - if runtime.GOOS == "windows" { - // Per https://golang.org/pkg/os/#Signal, “Interrupt is not implemented on - // Windows; using it with os.Process.Signal will return an error.” - // Fall back to Kill instead. - interrupt = os.Kill - } +func (r *stellarCoreRunner) getExecutionState() *executionState { + r.stateLock.Lock() + defer r.stateLock.Unlock() + return r.state +} - errc := make(chan error) - go func() { - select { - case errc <- nil: - return - case <-r.ctx.Done(): - } +func (state *executionState) handleExit() { + defer state.wg.Done() - err := r.cmd.getProcess().Signal(interrupt) - if err == nil { - err = r.ctx.Err() // Report ctx.Err() as the reason we interrupted. - } else if err.Error() == "os: process already finished" { - errc <- nil - return - } + waitErr := state.cmd.Wait() - timer := time.NewTimer(10 * time.Second) - select { - // Report ctx.Err() as the reason we interrupted the process... - case errc <- r.ctx.Err(): - timer.Stop() - return - // ...but after killDelay has elapsed, fall back to a stronger signal. - case <-timer.C: - } + // By closing the pipe file we will send an EOF to the pipe reader used by ledgerBuffer. + if err := state.pipe.File.Close(); err != nil { + state.log.WithError(err).Warn("could not close captive core write pipe") + } - // Wait still hasn't returned. - // Kill the process harder to make sure that it exits. - // - // Ignore any error: if cmd.Process has already terminated, we still - // want to send ctx.Err() (or the error from the Interrupt call) - // to properly attribute the signal that may have terminated it. - _ = r.cmd.getProcess().Kill() + state.processExitedLock.Lock() + defer state.processExitedLock.Unlock() + state.processExited = true + state.processExitError = waitErr +} - errc <- err - }() +func (state *executionState) getProcessExitError() (error, bool) { + state.processExitedLock.RLock() + defer state.processExitedLock.RUnlock() + return state.processExitError, state.processExited +} - waitErr := r.cmd.Wait() - r.closeLogLineWriters(r.cmd) +func (state *executionState) cleanup() error { + // wait for the stellar core process to terminate + state.wg.Wait() - r.lock.Lock() - defer r.lock.Unlock() + // drain meta pipe channel to make sure the ledger buffer goroutine exits + for range state.ledgerBuffer.getChannel() { - // By closing the pipe file we will send an EOF to the pipe reader used by ledgerBuffer. - // We need to do this operation with the lock to ensure that the processExitError is available - // when the ledgerBuffer channel is closed - if closeErr := r.pipe.File.Close(); closeErr != nil { - r.log.WithError(closeErr).Warn("could not close captive core write pipe") } - r.processExited = true - if interruptErr := <-errc; interruptErr != nil { - r.processExitError = interruptErr - } else { - r.processExitError = waitErr + // now it's safe to close the pipe reader + // because the ledger buffer is no longer reading from it + if err := state.pipe.Reader.Close(); err != nil { + state.log.WithError(err).Warn("could not close captive core read pipe") } -} -// closeLogLineWriters closes the go routines created by getLogLineWriter() -func (r *stellarCoreRunner) closeLogLineWriters(cmd cmdI) { - cmd.getStdout().Close() - cmd.getStderr().Close() + processExitError, _ := state.getProcessExitError() + return state.workingDir.cleanup(processExitError) } // getMetaPipe returns a channel which contains ledgers streamed from the captive core subprocess -func (r *stellarCoreRunner) getMetaPipe() <-chan metaResult { - return r.ledgerBuffer.getChannel() +func (r *stellarCoreRunner) getMetaPipe() (<-chan metaResult, bool) { + state := r.getExecutionState() + if state == nil { + return nil, false + } + return state.ledgerBuffer.getChannel(), true } // getProcessExitError returns an exit error (can be nil) of the process and a bool indicating // if the process has exited yet // getProcessExitError is thread safe -func (r *stellarCoreRunner) getProcessExitError() (bool, error) { - r.lock.Lock() - defer r.lock.Unlock() - return r.processExited, r.processExitError +func (r *stellarCoreRunner) getProcessExitError() (error, bool) { + state := r.getExecutionState() + if state == nil { + return nil, false + } + return state.getProcessExitError() } // close kills the captive core process if it is still running and performs @@ -569,43 +232,11 @@ func (r *stellarCoreRunner) getProcessExitError() (bool, error) { func (r *stellarCoreRunner) close() error { var closeError error r.closeOnce.Do(func() { - r.lock.Lock() - // we cancel the context while holding the lock in order to guarantee that - // this captive core instance cannot start once the lock is released. - // catchup() and runFrom() can only execute while holding the lock and if - // the context is canceled both catchup() and runFrom() will abort early - // without performing any side effects (e.g. state mutations). r.cancel() - r.lock.Unlock() - - // only reap captive core sub process and related go routines if we've started - // otherwise, just cleanup the temp dir - if r.started { - // wait for the stellar core process to terminate - r.wg.Wait() - - // drain meta pipe channel to make sure the ledger buffer goroutine exits - for range r.getMetaPipe() { - - } - - // now it's safe to close the pipe reader - // because the ledger buffer is no longer reading from it - r.pipe.Reader.Close() - } - - if r.mode != 0 && (runtime.GOOS == "windows" || - (r.processExitError != nil && r.processExitError != context.Canceled) || - r.mode == stellarCoreRunnerModeOffline) { - // It's impossible to send SIGINT on Windows so buckets can become - // corrupted. If we can't reuse it, then remove it. - // We also remove the storage path if there was an error terminating the - // process (files can be corrupted). - // We remove all files when reingesting to save disk space. - closeError = r.systemCaller.removeAll(r.storagePath) - return + state := r.getExecutionState() + if state != nil { + closeError = state.cleanup() } }) - return closeError } diff --git a/ingest/ledgerbackend/stellar_core_runner_test.go b/ingest/ledgerbackend/stellar_core_runner_test.go index 00cb29137b..f53cd88328 100644 --- a/ingest/ledgerbackend/stellar_core_runner_test.go +++ b/ingest/ledgerbackend/stellar_core_runner_test.go @@ -37,6 +37,7 @@ func TestCloseOffline(t *testing.T) { scMock.On("stat", mock.Anything).Return(isDirImpl(true), nil) scMock.On("writeFile", mock.Anything, mock.Anything, mock.Anything).Return(nil) scMock.On("command", + runner.ctx, "/usr/bin/stellar-core", "--conf", mock.Anything, @@ -78,6 +79,7 @@ func TestCloseOnline(t *testing.T) { scMock.On("stat", mock.Anything).Return(isDirImpl(true), nil) scMock.On("writeFile", mock.Anything, mock.Anything, mock.Anything).Return(nil) scMock.On("command", + runner.ctx, "/usr/bin/stellar-core", "--conf", mock.Anything, @@ -121,6 +123,7 @@ func TestCloseOnlineWithError(t *testing.T) { scMock.On("stat", mock.Anything).Return(isDirImpl(true), nil) scMock.On("writeFile", mock.Anything, mock.Anything, mock.Anything).Return(nil) scMock.On("command", + runner.ctx, "/usr/bin/stellar-core", "--conf", mock.Anything, @@ -141,7 +144,7 @@ func TestCloseOnlineWithError(t *testing.T) { // Wait with calling close until r.processExitError is set to Wait() error for { - _, err := runner.getProcessExitError() + err, _ := runner.getProcessExitError() if err != nil { break } @@ -175,6 +178,7 @@ func TestCloseConcurrency(t *testing.T) { scMock.On("stat", mock.Anything).Return(isDirImpl(true), nil) scMock.On("writeFile", mock.Anything, mock.Anything, mock.Anything).Return(nil) scMock.On("command", + runner.ctx, "/usr/bin/stellar-core", "--conf", mock.Anything, @@ -196,7 +200,7 @@ func TestCloseConcurrency(t *testing.T) { go func() { defer wg.Done() assert.NoError(t, runner.close()) - exited, err := runner.getProcessExitError() + err, exited := runner.getProcessExitError() assert.True(t, exited) assert.Error(t, err) }() @@ -238,12 +242,14 @@ func TestRunFromUseDBLedgersMatch(t *testing.T) { scMock.On("stat", mock.Anything).Return(isDirImpl(true), nil) scMock.On("writeFile", mock.Anything, mock.Anything, mock.Anything).Return(nil) scMock.On("command", + runner.ctx, "/usr/bin/stellar-core", "--conf", mock.Anything, "offline-info", ).Return(offlineInfoCmdMock) scMock.On("command", + runner.ctx, "/usr/bin/stellar-core", "--conf", mock.Anything, @@ -299,12 +305,14 @@ func TestRunFromUseDBLedgersBehind(t *testing.T) { scMock.On("stat", mock.Anything).Return(isDirImpl(true), nil) scMock.On("writeFile", mock.Anything, mock.Anything, mock.Anything).Return(nil) scMock.On("command", + runner.ctx, "/usr/bin/stellar-core", "--conf", mock.Anything, "offline-info", ).Return(offlineInfoCmdMock) scMock.On("command", + runner.ctx, "/usr/bin/stellar-core", "--conf", mock.Anything, @@ -360,12 +368,14 @@ func TestRunFromUseDBLedgersInFront(t *testing.T) { scMock.On("stat", mock.Anything).Return(isDirImpl(true), nil) scMock.On("writeFile", mock.Anything, mock.Anything, mock.Anything).Return(nil) scMock.On("command", + runner.ctx, "/usr/bin/stellar-core", "--conf", mock.Anything, "offline-info", ).Return(offlineInfoCmdMock) scMock.On("command", + runner.ctx, "/usr/bin/stellar-core", "--conf", mock.Anything, @@ -373,6 +383,7 @@ func TestRunFromUseDBLedgersInFront(t *testing.T) { "new-db", ).Return(newDBCmdMock) scMock.On("command", + runner.ctx, "/usr/bin/stellar-core", "--conf", mock.Anything, @@ -381,6 +392,7 @@ func TestRunFromUseDBLedgersInFront(t *testing.T) { "99/0", ).Return(catchupCmdMock) scMock.On("command", + runner.ctx, "/usr/bin/stellar-core", "--conf", mock.Anything,